From dbe57f345ccaaaf43848950b4e7201deb00f771e Mon Sep 17 00:00:00 2001 From: zzh Date: Fri, 25 Apr 2025 15:34:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E5=B9=B6=E8=A1=8C?= =?UTF-8?q?=E6=B5=81=E6=B0=B4=E7=BA=BF=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + src/sort/sam_io.h | 5 + src/sort/sort.cpp | 405 +++++++++++++++++++++++++++++++++++----------- src/sort/sort.h | 196 ++++++++++++++++++++++ src/util/yarn.cpp | 395 ++++++++++++++++++++++++++++++++++++++++++++ src/util/yarn.h | 169 +++++++++++++++++++ 6 files changed, 1076 insertions(+), 96 deletions(-) create mode 100644 src/sort/sort.h create mode 100644 src/util/yarn.cpp create mode 100644 src/util/yarn.h diff --git a/.gitignore b/.gitignore index 96caa21..51c06a1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # ---> C++ # Prerequisites *.d +*.bak +*.log /test /.vscode /build diff --git a/src/sort/sam_io.h b/src/sort/sam_io.h index 0896329..f9b41b3 100644 --- a/src/sort/sam_io.h +++ b/src/sort/sam_io.h @@ -15,6 +15,11 @@ struct DataBuffer { maxLen = 0; data = nullptr; } + DataBuffer(size_t initSize) { + curLen = 0; + maxLen = initSize; + data = (uint8_t *)malloc(maxLen); + } ~DataBuffer() { if (data) free(data); diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index 27acdbb..e1422aa 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -1,3 +1,5 @@ +#include "sort.h" + #include #include #include @@ -15,8 +17,9 @@ #include "sam_io.h" #include "sort_args.h" -#include "util/profiling.h" #include "const_val.h" +#include "util/profiling.h" +#include "util/yarn.h" #define BAM_BLOCK_SIZE 16L * 1024 * 1024 @@ -39,27 +42,6 @@ struct BamSortData { char *qname; // pointer to qname }; -struct BamPointer { - uint64_t offset = 0; // 地址偏移量 - uint32_t bamLen = 0; -}; - -struct BamPtrArr { - vector bamArr; - int curIdx = 0; // -}; - -struct OneBlock { - uint8_t data[SINGLE_BLOCK_SIZE]; - uint32_t blockLen = 0; // 解压后的数据长度 - uint64_t blockId = 0; // 按照顺序排列的block ID -}; - -struct ThreadBlockArr { - vector blockArr; // 解压后的数据 - int curIdx = 0; // 当前解压数据对应的vector的索引 -}; - // thread data for sorting struct SortData { /* 用在多线程解压中的数据 */ @@ -148,75 +130,9 @@ static void bam_cigar2rqlens(int n_cigar, const uint32_t *cigar, hts_pos_t *rlen } } -int parseBam(uint8_t *data, bam1_t *b) { - bam1_core_t *c = &b->core; - int32_t block_len, ret, i; - uint32_t new_l_data; - uint8_t tmp[32], *x; - - b->l_data = 0; - - block_len = *(uint32_t *)data; - spdlog::info("record_len: {}", block_len); - data += 4; - x = data; - - c->tid = le_to_u32(x); - c->pos = le_to_i32(x + 4); - uint32_t x2 = le_to_u32(x + 8); - c->bin = x2 >> 16; - c->qual = x2 >> 8 & 0xff; - c->l_qname = x2 & 0xff; - c->l_extranul = (c->l_qname % 4 != 0) ? (4 - c->l_qname % 4) : 0; - uint32_t x3 = le_to_u32(x + 12); - c->flag = x3 >> 16; - c->n_cigar = x3 & 0xffff; - c->l_qseq = le_to_u32(x + 16); - c->mtid = le_to_u32(x + 20); - c->mpos = le_to_i32(x + 24); - c->isize = le_to_i32(x + 28); - - new_l_data = block_len - 32 + c->l_extranul; - if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1) - return -4; - if (((uint64_t)c->n_cigar << 2) + c->l_qname + c->l_extranul + (((uint64_t)c->l_qseq + 1) >> 1) + c->l_qseq > - (uint64_t)new_l_data) - return -4; - if (realloc_bam_data(b, new_l_data) < 0) - return -4; - b->l_data = new_l_data; - - b->data = data; - data += c->l_qname; - - if (b->data[c->l_qname - 1] != '\0') { // try to fix missing nul termination - if (fixup_missing_qname_nul(b) < 0) - return -4; - } - for (i = 0; i < c->l_extranul; ++i) b->data[c->l_qname + i] = '\0'; - c->l_qname += c->l_extranul; - data += b->l_data - c->l_qname; - - - // TODO: consider making this conditional - if (c->n_cigar > 0) { // recompute "bin" and check CIGAR-qlen consistency - hts_pos_t rlen, qlen; - bam_cigar2rqlens(c->n_cigar, bam_get_cigar(b), &rlen, &qlen); - if ((b->core.flag & BAM_FUNMAP) || rlen == 0) - rlen = 1; - b->core.bin = hts_reg2bin(b->core.pos, b->core.pos + rlen, 14, 5); - // Sanity check for broken CIGAR alignments - if (c->l_qseq > 0 && !(c->flag & BAM_FUNMAP) && qlen != c->l_qseq) { - hts_log_error("CIGAR and query sequence lengths differ for %s", bam_get_qname(b)); - return -4; - } - } - - return 4 + block_len; -} // multi-thread uncompress bam blocks -static void mtUncompressBlock(void *data, long idx, int tid) { +static void mtUncompressBlock1(void *data, long idx, int tid) { auto &p = *(SortData*) data; auto &blockItemArr = p.blockItemArr[tid]; @@ -243,7 +159,7 @@ static void mergeSortedBlocks(const SortData &sortData) { static void *threadMergeParseBlocks(void *data) { return nullptr; - + auto &p = *(SortData *)data; DataBuffer &buf = p.bamData; @@ -302,7 +218,7 @@ static void *nonBlockingUncompress(void *data) { pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData); // 2. 调用并行解压的线程函数 - kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size()); + kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock1, data, ((SortData *)data)->blockAddrArr->size()); sortData.uncompressFinish = 1; int totalBlockNum = 0; for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { @@ -345,11 +261,308 @@ void *threadRead(void *data) { return NULL; } +/* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */ +static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) { + ReadData &readData = p.readData[p.readOrder % p.READ_BUF_NUM]; + size_t readState = 0; + size_t curReadPos = 0; + int blockLen = 0; + int maxBlockLen = 0; + + readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr); + if (readState == 0) { return 0; } + + //if (p.readOrder == 278) { + // spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); + //} + + readData.startAddrArr.clear(); + + /* 处理上一个不完整的block */ + if (halfBlock.readPos > 0) { // 上一轮有剩余 + if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息 + memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos); + halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度 + spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); + } + memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos); + curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度 + memcpy(&readData.blockBuf[halfBlock.readPos], readData.dataBuf, curReadPos); // 将不完整的block剩余数据拷贝到curBlock + readData.startAddrArr.push_back(readData.blockBuf); + } + /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ + while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */ + blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1; + if (blockLen > maxBlockLen) { maxBlockLen = blockLen; } + if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ + readData.startAddrArr.push_back(&readData.dataBuf[curReadPos]); + curReadPos += blockLen; + } else { + break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ + } + } + /* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */ + halfBlock.readPos = readState - curReadPos; + halfBlock.curLen = blockLen; + if (halfBlock.readPos > 0) { + memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock + } + //if (p.readOrder == 277) { + // spdlog::info("tail - last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); + //} + //spdlog::info("block num-1: {}", readData.startAddrArr.size()); + //spdlog::info("max block len: {}", maxBlockLen); + + return readState; +} + + +/* FirstPipe step-1 读取文件线程 */ +static void *firstPipeReadFile(void *data) { + FirstPipeArg &p = *(FirstPipeArg *)data; + /* 1. set up */ + FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb"); + parseSamHeader(fpr, nsgv::gInHdr); + size_t fileSize = 0; + DataBuffer halfBlock(SINGLE_BLOCK_SIZE); + + /* 2. do the work */ + while (true) { + // self dependency + yarn::DEPENDENCY_NOT_TO_BE(p.readSig, p.READ_BUF_NUM); + + PROF_G_BEG(read); + size_t readState = doFirstPipeReadFile(p, halfBlock, fpr); + PROF_G_END(read); + + if (readState == 0) { + yarn::SIGNAL_FINISH(p.readSig, p.readFinish); + break; + } + + // update self status + yarn::UPDATE_SIG_ORDER(p.readSig, p.readOrder); + + fileSize += readState; + } + + spdlog::info("read file order: {}, file size: {}", p.readOrder, fileSize); + /* 3. clean up */ + fclose(fpr); + return nullptr; +} + +// multi-thread uncompress bam blocks +static void mtUncompressBlock(void *data, long idx, int tid) { + UncompressData &p = *(UncompressData *)data; + ReadData &readData = *p.readDataPtr; + + auto &blockItemArr = p.blockItemArr[tid]; + if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) { + blockItemArr.blockArr.push_back(OneBlock()); + } + + auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; + + uint8_t *block = readData.startAddrArr[idx]; + + size_t dlen = SINGLE_BLOCK_SIZE; // 65535 + int block_length = unpackInt16(&block[16]) + 1; + uint32_t crc = le_to_u32(block + block_length - 8); + int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, + block_length - BLOCK_HEADER_LENGTH, crc); + if (ret != 0) { + spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret); + exit(0); + } + blockItem.blockId = idx; + blockItem.blockLen = dlen; + uint32_t nextBamStart = 0; + uint32_t bamLen = 0; + uint32_t bamNum = 0; + PROF_G_BEG(mem_copy); + // memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen); + /* 解析每个bam */ + while (nextBamStart + 4 <= blockItem.blockLen) { + memcpy(&bamLen, &blockItem.data[nextBamStart], 4); + nextBamStart += 4 + bamLen; + // spdlog::info("bam len: {}", bamLen); + ++bamNum; + } + blockItem.bamNum = bamNum; + PROF_G_END(mem_copy); +} + +/* 将gz block进行解压 */ +static void doFirstPipeUncompress(FirstPipeArg &p) { + // return; + ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; + UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM]; + uncompressData.readDataPtr = &readData; + uncompressData.ResetBlockArr(); + + kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size()); + //spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx, + // uncompressData.blockItemArr[0].blockArr.size()); + +} + +/* FirstPipe step-2 并行解压gz blocks*/ +static void *firstPipeUncompress(void *data) { + FirstPipeArg &p = *(FirstPipeArg *)data; + + /* 2. do the work */ + while (true) { + // previous dependency + yarn::DEPENDENCY_NOT_TO_BE(p.readSig, 0); + // self dependency + yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); + + if (p.readFinish) { + while (p.uncompressOrder < p.readOrder) { + yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); + doFirstPipeUncompress(p); + yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder); + } + yarn::SIGNAL_FINISH(p.uncompressSig, p.uncompressFinish); + break; + } + + doFirstPipeUncompress(p); + + // update status + yarn::CONSUME_SIGNAL(p.readSig); + yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder); + } + + spdlog::info("uncompress order: {}", p.uncompressOrder); + return nullptr; +} + +/* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */ +void doFirstPipeMergeSort(FirstPipeArg &p) { + UncompressData &uncompressData = p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM]; + DataBuffer &buf = p.mergeSortData.bamData; + + BlockHeap blockHeap; + blockHeap.Init(&uncompressData.blockItemArr); + // spdlog::info("block heap size: {}", blockHeap.Size()); + // spdlog::info("all block bytes: {}", blockHeap.AllBlockBytes()); + // size_t allBlockBytes = blockHeap.AllBlockBytes(); + // if (buf.maxLen < buf.curLen + allBlockBytes) { + // // spdlog::info("here "); + // buf.reAllocMem(buf.curLen + allBlockBytes); + // } + // memset(buf.data, 0, buf.maxLen); + + const OneBlock *pB; + size_t bamNum = 0; + size_t blockNum = 0; + while ((pB = blockHeap.Pop()) != nullptr && pB->blockLen > 0) { + ++blockNum; + bamNum += pB->bamNum; + } + buf.curLen = 0; + spdlog::info("block num: {}, bam num: {}", blockNum, bamNum); +} + +/* FirstPipe step-3 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 */ +static void *firstPipeMergeSort(void *data) { + FirstPipeArg &p = *(FirstPipeArg *)data; + + while (true) { + yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, 0); + + if (p.uncompressFinish) { + spdlog::info("uncompress finish, cur sort order: {}", p.mergeSortOrder); + while (p.mergeSortOrder < p.uncompressOrder) { + doFirstPipeMergeSort(p); + p.mergeSortOrder += 1; + } + /* 需要检测一下buf中是否还有数据,如果还有,则需要进行排序,可以不输出到中间文件,直接进行second-pipe的归并排序 */ + break; + } + + doFirstPipeMergeSort(p); + + p.mergeSortOrder += 1; + + // update status + yarn::CONSUME_SIGNAL(p.uncompressSig); + } + + spdlog::info("merge sort order: {}", p.mergeSortOrder); + return nullptr; +} + +/* 对bam文件进行排序 */ +static void bamSortFirstPipe() { + /* set up*/ + FirstPipeArg firstPipeArg; + firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS; + const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread; + for (int i = 0; isize(); PROF_G_END(parse_block); -#if 1 +#if 0 // 并行解压 PROF_G_BEG(sort); pthread_join(uncompressTid, NULL); @@ -527,4 +739,5 @@ err: // fclose(fpw); return 0; -} \ No newline at end of file +} + diff --git a/src/sort/sort.h b/src/sort/sort.h new file mode 100644 index 0000000..ee4e599 --- /dev/null +++ b/src/sort/sort.h @@ -0,0 +1,196 @@ +#pragma once + +#include + +#include +#include + +#include "const_val.h" +#include "sam_io.h" +#include "util/yarn.h" + +using std::priority_queue; +using std::vector; + +/* for step-1 read data from bam file */ +struct ReadData { + uint8_t *dataBuf = nullptr; + uint8_t *blockBuf = nullptr; + int readBufSize = 0; // 读入的buf大小 + vector startAddrArr; // 存放每个block的起始地址 + ReadData() { } + ReadData(int readBufSize_) { + readBufSize = readBufSize_; + dataBuf = (uint8_t *)malloc(readBufSize); + blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); + } + ~ReadData() { + if (dataBuf) free(dataBuf); + if (blockBuf) free(blockBuf); + } + void Resize(int readBufSize_) { + if (dataBuf) free(dataBuf); + if (blockBuf) free(blockBuf); + readBufSize = readBufSize_; + dataBuf = (uint8_t *)malloc(readBufSize); + blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); + } +}; +/* for step-2 parallel uncompress gz blocks */ +struct OneBlock { + uint8_t data[SINGLE_BLOCK_SIZE]; + uint32_t blockLen = 0; // 解压后的数据长度 + uint64_t blockId = 0; // 按照顺序排列的block ID + uint64_t bamNum = 0; // 解压后的bam数量 +}; +struct ThreadBlockArr { + vector blockArr; // 解压后的数据 + int curIdx = 0; // 当前解压数据对应的vector的索引 +}; +struct UncompressData { + vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 + ReadData *readDataPtr = nullptr; // 读取数据的指针 + + UncompressData() { } + UncompressData(int numThread) { Resize(numThread); } + UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); } + void Resize(int numThread) { + blockItemArr.resize(numThread); + for (int i = 0; i < numThread; ++i) { + blockItemArr[i].blockArr.reserve(128); + } + } + void Resize(int numThread, int vecInitSize) { + blockItemArr.resize(numThread); + for (int i = 0; i < numThread; ++i) { + blockItemArr[i].blockArr.reserve(vecInitSize); + } + } + void ResetBlockArr() { + for (int i = 0; i < blockItemArr.size(); ++i) { + blockItemArr[i].curIdx = 0; + } + } +}; +/* block 排序堆 */ +struct BlockArrIdIdx { + int arrId = 0; + size_t arrIdx = 0; // 下一个待读入数据的idx + const OneBlock *block = nullptr; +}; + +struct BlockGreaterThan{ + bool operator()(const BlockArrIdIdx &a, const BlockArrIdIdx &b) const { + return a.block->blockId > b.block->blockId; + } +}; +/* 用来排序 */ +struct BlockHeap { + vector *arr2d; + priority_queue, BlockGreaterThan> minHeap; + size_t popNum = 0; + + int Init(vector *_arr2d) { + arr2d = _arr2d; + if (arr2d == nullptr) { + return -1; + } + for (int i = 0; i < arr2d->size(); ++i) { + auto &v = (*arr2d)[i]; + if (v.curIdx > 0) { + minHeap.push({i, 1, &v.blockArr[0]}); + } + } + return 0; + } + + const OneBlock *Pop() { + const OneBlock *ret = nullptr; + if (!minHeap.empty()) { + auto minVal = minHeap.top(); + minHeap.pop(); + ++popNum; + ret = minVal.block; + auto &v = (*arr2d)[minVal.arrId]; + if (v.curIdx > minVal.arrIdx) { + minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.blockArr[minVal.arrIdx]}); + } + } + return ret; + } + + size_t AllBlockBytes() { + size_t bytes = 0; + if (arr2d != nullptr) { + for (auto &v : *arr2d) { + for (int i = 0; i < v.curIdx; ++i) { + bytes += v.blockArr[i].blockLen; + } + } + } + return bytes; + } + + size_t Size() { + size_t len = 0; + if (arr2d != nullptr) { + for (auto &v : *arr2d) { + len += v.curIdx; + } + } + return len - popNum; + } +}; + +/* for step-3 serial merge blocks and sort them */ +struct BamPointer { + uint64_t offset = 0; // 地址偏移量 + uint32_t bamLen = 0; +}; + +struct BamPtrArr { + vector bamArr; + int curIdx = 0; // +}; + +struct MergeSortData { + DataBuffer bamData; // 用来保存解压后的数据 + BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 + MergeSortData() { + bamPtrArr.bamArr.reserve(128); + } +}; + +/* 第一阶段的多线程流水线参数 */ +struct FirstPipeArg { + static const int READ_BUF_NUM = 2; // 读入的buf数量 + static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量 + + int numThread = 0; // 线程数 + + uint64_t readOrder = 0; // 读取文件 + uint64_t uncompressOrder = 0; // 并行解压gz block + uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 + + volatile int readFinish = 0; + volatile int uncompressFinish = 0; + + yarn::lock_t *readSig; + yarn::lock_t *uncompressSig; + + ReadData readData[READ_BUF_NUM]; + UncompressData uncompressData[UNCOMPRESS_BUF_NUM]; + MergeSortData mergeSortData; + + FirstPipeArg() + { + readSig = yarn::NEW_LOCK(0); + uncompressSig = yarn::NEW_LOCK(0); + } +}; + +/* 第二阶段的多线程流水线参数 */ +struct PipeSecondArg +{ + /* data */ +}; \ No newline at end of file diff --git a/src/util/yarn.cpp b/src/util/yarn.cpp new file mode 100644 index 0000000..7945210 --- /dev/null +++ b/src/util/yarn.cpp @@ -0,0 +1,395 @@ +/* yarn.c -- generic thread operations implemented using pthread functions + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + * For conditions of distribution and use, see copyright notice in yarn.h + */ + +/* Basic thread operations implemented using the POSIX pthread library. All + pthread references are isolated within this module to allow alternate + implementations with other thread libraries. See yarn.h for the description + of these operations. */ + +/* Version history: + 1.0 19 Oct 2008 First version + 1.1 26 Oct 2008 No need to set the stack size -- remove + Add yarn_abort() function for clean-up on error exit + 1.2 19 Dec 2011 (changes reversed in 1.3) + 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c + Update thread portability #defines per IEEE 1003.1-2008 + Fix documentation in yarn.h for yarn_prefix + 1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr + Accept and do nothing for NULL argument to free_lock() + 1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel() + Normalize the code style + 1.6 3 Apr 2019 Add debugging information to fail() error messages + 1.7 12 Apr 2020 Fix use after free bug in ignition() + */ + +// For thread portability. +#define _XOPEN_SOURCE 700 +#define _POSIX_C_SOURCE 200809L +#define _THREAD_SAFE + +// Use large file functions if available. +#define _FILE_OFFSET_BITS 64 + +// External libraries and entities referenced. +#include // pthread_t, pthread_create(), pthread_join(), +#include // fprintf(), stderr +#include // exit(), malloc(), free(), NULL +// pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), +// PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), +// pthread_self(), pthread_equal(), +// pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), +// pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), +// pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), +// pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() +#include // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN + +// Interface definition. +#include "yarn.h" + +// Constants. +#define local static // for non-exported functions and globals + +namespace yarn { + +// Error handling external globals, resettable by application. +char *yarn_prefix = (char *)"yarn"; +void (*yarn_abort)(int) = NULL; + +// Immediately exit -- use for errors that shouldn't ever happen. +local void fail(int err, char const *file, long line, char const *func) { + fprintf(stderr, "%s: ", yarn_prefix); + switch (err) { + case EPERM: + fputs("already unlocked", stderr); + break; + case ESRCH: + fputs("no such thread", stderr); + break; + case EDEADLK: + fputs("resource deadlock", stderr); + break; + case ENOMEM: + fputs("out of memory", stderr); + break; + case EBUSY: + fputs("can't destroy locked resource", stderr); + break; + case EINVAL: + fputs("invalid request", stderr); + break; + case EAGAIN: + fputs("resource unavailable", stderr); + break; + default: + fprintf(stderr, "internal error %d", err); + } + fprintf(stderr, " (%s:%ld:%s)\n", file, line, func); + if (yarn_abort != NULL) + yarn_abort(err); + exit(err); +} + +// Memory handling routines provided by user. If none are provided, malloc() +// and free() are used, which are therefore assumed to be thread-safe. +typedef void *(*malloc_t)(size_t); +typedef void (*free_t)(void *); +local malloc_t my_malloc_f = malloc; +local free_t my_free = free; + +// Use user-supplied allocation routines instead of malloc() and free(). +void yarn_mem(malloc_t lease, free_t vacate) { + my_malloc_f = lease; + my_free = vacate; +} + +// Memory allocation that cannot fail (from the point of view of the caller). +local void *my_malloc(size_t size, char const *file, long line) { + void *block; + + if ((block = my_malloc_f(size)) == NULL) + fail(ENOMEM, file, line, "malloc"); + return block; +} + +// -- Lock functions -- + +struct lock_s { + pthread_mutex_t mutex; + pthread_cond_t cond; + long value; +}; + +lock_t *new_lock_(long initial, char const *file, long line) { + lock_t *bolt = (lock_t *)my_malloc(sizeof(struct lock_s), file, line); + int ret = pthread_mutex_init(&(bolt->mutex), NULL); + if (ret) + fail(ret, file, line, "mutex_init"); + ret = pthread_cond_init(&(bolt->cond), NULL); + if (ret) + fail(ret, file, line, "cond_init"); + bolt->value = initial; + return bolt; +} + +void possess_(lock_t *bolt, char const *file, long line) { + int ret = pthread_mutex_lock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_lock"); +} + +void release_(lock_t *bolt, char const *file, long line) { + int ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +void twist_(lock_t *bolt, enum twist_op op, long val, char const *file, long line) { + if (op == TO) + bolt->value = val; + else if (op == BY) + bolt->value += val; + int ret = pthread_cond_broadcast(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_broadcast"); + ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +#define until(a) while (!(a)) + +void wait_for_(lock_t *bolt, enum wait_op op, long val, char const *file, long line) { + switch (op) { + case TO_BE: + until(bolt->value == val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case NOT_TO_BE: + until(bolt->value != val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_MORE_THAN: + until(bolt->value > val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_LESS_THAN: + until(bolt->value < val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + } +} + +long peek_lock(lock_t *bolt) { return bolt->value; } + +void free_lock_(lock_t *bolt, char const *file, long line) { + if (bolt == NULL) + return; + int ret = pthread_cond_destroy(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_destroy"); + ret = pthread_mutex_destroy(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_destroy"); + my_free(bolt); +} + +// -- Thread functions (uses the lock_t functions above) -- + +struct thread_s { + pthread_t id; + int done; // true if this thread has exited + thread *next; // for list of all launched threads +}; + +// List of threads launched but not joined, count of threads exited but not +// joined (incremented by ignition() just before exiting). +local lock_t threads_lock = { + PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, + 0 // number of threads exited but not joined +}; +local thread *threads = NULL; // list of extant threads + +// Structure in which to pass the probe and its payload to ignition(). +struct capsule { + void (*probe)(void *); + void *payload; + char const *file; + long line; +}; + +// Mark the calling thread as done and alert join_all(). +local void reenter(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // find this thread in the threads list by matching the thread id + pthread_t me = pthread_self(); + possess_(&(threads_lock), capsule->file, capsule->line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (pthread_equal(match->id, me)) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, capsule->file, capsule->line, "reenter lost"); + + // mark this thread as done and move it to the head of the list + match->done = 1; + if (threads != match) { + *prior = match->next; + match->next = threads; + threads = match; + } + + // update the count of threads to be joined and alert join_all() + twist_(&(threads_lock), BY, +1, capsule->file, capsule->line); + + // free the capsule resource, even if the thread is cancelled (though yarn + // doesn't use pthread_cancel() -- you never know) + my_free(capsule); +} + +// All threads go through this routine. Just before a thread exits, it marks +// itself as done in the threads list and alerts join_all() so that the thread +// resources can be released. Use a cleanup stack so that the marking occurs +// even if the thread is cancelled. +local void *ignition(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // run reenter() before leaving + pthread_cleanup_push(reenter, arg); + + // execute the requested function with argument + capsule->probe(capsule->payload); + + // mark this thread as done, letting join_all() know, and free capsule + pthread_cleanup_pop(1); + + // exit thread + return NULL; +} + +// Not all POSIX implementations create threads as joinable by default, so that +// is made explicit here. +thread *launch_(void (*probe)(void *), void *payload, char const *file, long line) { + // construct the requested call and argument for the ignition() routine + // (allocated instead of automatic so that we're sure this will still be + // there when ignition() actually starts up -- ignition() will free this + // allocation) + struct capsule *capsule = (struct capsule *)my_malloc(sizeof(struct capsule), file, line); + capsule->probe = probe; + capsule->payload = payload; + capsule->file = file; + capsule->line = line; + + // assure this thread is in the list before join_all() or ignition() looks + // for it + possess_(&(threads_lock), file, line); + + // create the thread and call ignition() from that thread + thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line); + pthread_attr_t attr; + int ret = pthread_attr_init(&attr); + if (ret) + fail(ret, file, line, "attr_init"); + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (ret) + fail(ret, file, line, "attr_setdetachstate"); + ret = pthread_create(&(th->id), &attr, ignition, capsule); + if (ret) + fail(ret, file, line, "create"); + ret = pthread_attr_destroy(&attr); + if (ret) + fail(ret, file, line, "attr_destroy"); + + // put the thread in the threads list for join_all() + th->done = 0; + th->next = threads; + threads = th; + release_(&(threads_lock), file, line); + return th; +} + +void join_(thread *ally, char const *file, long line) { + // wait for thread to exit and return its resources + int ret = pthread_join(ally->id, NULL); + if (ret) + fail(ret, file, line, "join"); + + // find the thread in the threads list + possess_(&(threads_lock), file, line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match == ally) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join lost"); + + // remove thread from list and update exited count, free thread + if (match->done) + threads_lock.value--; + *prior = match->next; + release_(&(threads_lock), file, line); + my_free(ally); +} + +// This implementation of join_all() only attempts to join threads that have +// announced that they have exited (see ignition()). When there are many +// threads, this is faster than waiting for some random thread to exit while a +// bunch of other threads have already exited. +int join_all_(char const *file, long line) { + // grab the threads list and initialize the joined count + int count = 0; + possess_(&(threads_lock), file, line); + + // do until threads list is empty + while (threads != NULL) { + // wait until at least one thread has reentered + wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line); + + // find the first thread marked done (should be at or near the top) + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match->done) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join_all lost"); + + // join the thread (will be almost immediate), remove from the threads + // list, update the reenter count, and free the thread + int ret = pthread_join(match->id, NULL); + if (ret) + fail(ret, file, line, "join"); + threads_lock.value--; + *prior = match->next; + my_free(match); + count++; + } + + // let go of the threads list and return the number of threads joined + release_(&(threads_lock), file, line); + return count; +} + +}; // namespace yarn \ No newline at end of file diff --git a/src/util/yarn.h b/src/util/yarn.h new file mode 100644 index 0000000..cac9274 --- /dev/null +++ b/src/util/yarn.h @@ -0,0 +1,169 @@ +/* yarn.h -- generic interface for thread operations + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + */ + +/* + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution. + + Mark Adler + madler@alumni.caltech.edu + */ + +/* Basic thread operations + + This interface isolates the local operating system implementation of threads + from the application in order to facilitate platform independent use of + threads. All of the implementation details are deliberately hidden. + + Assuming adequate system resources and proper use, none of these functions + can fail. As a result, any errors encountered will cause an exit() to be + executed, or the execution of your own optionally-provided abort function. + + These functions allow the simple launching and joining of threads, and the + locking of objects and synchronization of changes of objects. The latter is + implemented with a single lock_t type that contains an integer value. The + value can be ignored for simple exclusive access to an object, or the value + can be used to signal and wait for changes to an object. + + -- Arguments -- + + thread *thread; identifier for launched thread, used by join + void probe(void *); pointer to function "probe", run when thread starts + void *payload; single argument passed to the probe function + lock_t *lock_t; a lock_t with a value -- used for exclusive access to + an object and to synchronize threads waiting for + changes to an object + long val; value to set lock_t, increment lock_t, or wait for + int n; number of threads joined + + -- Thread functions -- + + thread = launch(probe, payload) - launch a thread -- exit via probe() return + join(thread) - join a thread and by joining end it, waiting for the thread + to exit if it hasn't already -- will free the resources allocated by + launch() (don't try to join the same thread more than once) + n = join_all() - join all threads launched by launch() that are not joined + yet and free the resources allocated by the launches, usually to clean + up when the thread processing is done -- join_all() returns an int with + the count of the number of threads joined (join_all() should only be + called from the main thread, and should only be called after any calls + of join() have completed) + + -- Lock functions -- + + lock_t = new_lock(val) - create a new lock_t with initial value val (lock_t is + created in the released state) + possess(lock_t) - acquire exclusive possession of a lock_t, waiting if necessary + twist(lock_t, [TO | BY], val) - set lock_t to or increment lock_t by val, signal + all threads waiting on this lock_t and then release the lock_t -- must + possess the lock_t before calling (twist releases, so don't do a + release() after a twist() on the same lock_t) + wait_for(lock_t, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val) + - wait on lock_t value to be, not to be, be greater than, or be less than + val -- must possess the lock_t before calling, will possess the lock_t on + return but the lock_t is released while waiting to permit other threads + to use twist() to change the value and signal the change (so make sure + that the object is in a usable state when waiting) + release(lock_t) - release a possessed lock_t (do not try to release a lock_t that + the current thread does not possess) + val = peek_lock(lock_t) - return the value of the lock_t (assumes that lock_t is + already possessed, no possess or release is done by peek_lock()) + free_lock(lock_t) - free the resources allocated by new_lock() (application + must assure that the lock_t is released before calling free_lock()) + + -- Memory allocation --- + + yarn_mem(better_malloc, better_free) - set the memory allocation and free + routines for use by the yarn routines where the supplied routines have + the same interface and operation as malloc() and free(), and may be + provided in order to supply thread-safe memory allocation routines or + for any other reason -- by default malloc() and free() will be used + + -- Error control -- + + yarn_prefix - a char pointer to a string that will be the prefix for any + error messages that these routines generate before exiting -- if not + changed by the application, "yarn" will be used + yarn_abort - an external function that will be executed when there is an + internal yarn error, due to out of memory or misuse -- this function + may exit to abort the application, or if it returns, the yarn error + handler will exit (set to NULL by default for no action) + */ +#pragma once + +#include + +namespace yarn { + +extern char *yarn_prefix; +extern void (*yarn_abort)(int); + +void yarn_mem(void *(*)(size_t), void (*)(void *)); + +typedef struct thread_s thread; +thread *launch_(void (*)(void *), void *, char const *, long); +#define LAUNCH(a, b) launch_(a, b, __FILE__, __LINE__) +void join_(thread *, char const *, long); +#define JOIN(a) join_(a, __FILE__, __LINE__) +int join_all_(char const *, long); +#define JOIN_ALL() join_all_(__FILE__, __LINE__) + +typedef struct lock_s lock_t; +lock_t *new_lock_(long, char const *, long); +#define NEW_LOCK(a) new_lock_(a, __FILE__, __LINE__) +void possess_(lock_t *, char const *, long); +#define POSSESS(a) possess_(a, __FILE__, __LINE__) +void release_(lock_t *, char const *, long); +// #define release(a) release_(a, __FILE__, __LINE__) +#define RELEASE(a) release_(a, __FILE__, __LINE__) +enum twist_op { TO, BY }; +void twist_(lock_t *, enum twist_op, long, char const *, long); +#define TWIST(a, b, c) twist_(a, b, c, __FILE__, __LINE__) +enum wait_op { + TO_BE, + /* or */ NOT_TO_BE, /* that is the question */ + TO_BE_MORE_THAN, + TO_BE_LESS_THAN +}; +void wait_for_(lock_t *, enum wait_op, long, char const *, long); +#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__) +long peek_lock(lock_t *); +#define PEEK_LOCK(a) peek_lock(a) +void free_lock_(lock_t *, char const *, long); +#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__) + +#define DEPENDENCY_NOT_TO_BE(sig, request) \ + possess_(sig, __FILE__, __LINE__); \ + wait_for_(sig, yarn::NOT_TO_BE, request, __FILE__, __LINE__); \ + release_(sig, __FILE__, __LINE__); + +#define UPDATE_SIG_ORDER(sig, order) \ + possess_(sig, __FILE__, __LINE__); \ + order += 1; \ + twist_(sig, yarn::BY, 1, __FILE__, __LINE__); + +#define SIGNAL_FINISH(sig, finish) \ + possess_(sig, __FILE__, __LINE__); \ + finish = 1; \ + twist_(sig, yarn::BY, 1, __FILE__, __LINE__); + +#define CONSUME_SIGNAL(sig) \ + possess_(sig, __FILE__, __LINE__); \ + twist_(sig, yarn::BY, -1, __FILE__, __LINE__); + +}; // namespace yarn