From 0d2c533d59fdaefbfd0767870cd958c3ffac636b Mon Sep 17 00:00:00 2001 From: zzh Date: Wed, 23 Apr 2025 22:08:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E4=B8=B2=E8=A1=8C?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/sort/sort.cpp | 261 +++++++++++++++++++++++++--------------------- 1 file changed, 143 insertions(+), 118 deletions(-) diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index 32aa9fc..27acdbb 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -25,6 +25,7 @@ namespace nsgv { SortArg gSortArg; // 参数 HeaderBuf gInHdr; // 输入文件的header bool gIsBigEndian; +DataBuffer gMarginBuf; // 用于存放跨两个gz block的解压数据 }; // namespace nsgv @@ -38,11 +39,40 @@ struct BamSortData { char *qname; // pointer to qname }; +struct BamPointer { + uint64_t offset = 0; // 地址偏移量 + uint32_t bamLen = 0; +}; + +struct BamPtrArr { + vector bamArr; + int curIdx = 0; // +}; + +struct OneBlock { + uint8_t data[SINGLE_BLOCK_SIZE]; + uint32_t blockLen = 0; // 解压后的数据长度 + uint64_t blockId = 0; // 按照顺序排列的block ID +}; + +struct ThreadBlockArr { + vector blockArr; // 解压后的数据 + int curIdx = 0; // 当前解压数据对应的vector的索引 +}; + // thread data for sorting struct SortData { - vector> bamArr; // for each thread - vector uData; // for each thread uncompressed data - vector *blockAddrArr; + /* 用在多线程解压中的数据 */ + vector *blockAddrArr; // 等待解压的gz block + vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 + volatile int uncompressFinish = 0; // 是否解压完成 + + /* 单线程解析用到的数据 */ + DataBuffer bamData; // 解压后的数据存放在连续的地址空间中 + BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 + + + int round = 0; }; @@ -188,101 +218,110 @@ int parseBam(uint8_t *data, bam1_t *b) { // multi-thread uncompress bam blocks static void mtUncompressBlock(void *data, long idx, int tid) { auto &p = *(SortData*) data; - auto &bamArr = p.bamArr[tid]; - auto &udata = p.uData[tid]; + auto &blockItemArr = p.blockItemArr[tid]; - // if (udata.maxLen < nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) { - // udata.allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS); - // } - // return; - - // spdlog::info("start: {} end: {} size: {} idx: {}, tid: {}", start, end, p.blockAddrArr->size(), idx, tid); - - long start = idx; - int step = nsgv::gSortArg.NUM_THREADS; - // long stop = p.blockAddrArr->size(); - long stop = idx + 1; - - // int chunkSize = (p.blockAddrArr->size() + nsgv::gSortArg.NUM_THREADS - 1) / nsgv::gSortArg.NUM_THREADS; - // long start = idx * chunkSize; - // int step = 1; - // long stop = std::min((idx + 1) * chunkSize, (long)p.blockAddrArr->size()); - - uint8_t buf[65535]; - // if (tid == 0 ) spdlog::info("round:{} tid:{} start:{} stop:{}", p.round, tid, start, stop); - - for (long i = start; i < stop; i += step) { - uint8_t *block = (*p.blockAddrArr)[i]; - size_t dlen = 0x10000; // 65535 - if (udata.maxLen - udata.curLen < dlen) { - // spdlog::info("re alloc buf, maxLen: {} M", udata.maxLen / 1024 / 1024); - udata.maxLen += INCREASE_SIZE; - udata.data = (uint8_t *)realloc(udata.data, udata.maxLen); - } - int block_length = unpackInt16(&block[16]) + 1; - uint32_t crc = le_to_u32(block + block_length - 8); - int ret = bgzfUncompress(buf, &dlen, (Bytef *)block + 18, block_length - 18, crc); - // spdlog::info("BamSortData size: {}, bam size: {}", sizeof(BamSortData), *(uint32_t *)(udata.data + - // udata.curLen)); - // spdlog::info("i: {} slen: {} dlen: {}", i, block_length, dlen); - // 不用对bam进行完全解析,反正只需要排序就行 - // int pos = 0; - // uint8_t *ud = udata.data + udata.curLen; - // bam1_t *bp = bam_init1(); - //while (pos < dlen) { - // bamArr.push_back(BamSortData()); - // BamSortData &b = bamArr.back(); - // b.bamLen = *(uint32_t *)(buf + pos); - // // // spdlog::info("bamLen: {}", b.bamLen); - // pos += 4 + b.bamLen; - // // if (b.bamLen > 1000) - // // spdlog::info("tid: {} record len: {}, dlen: {}", tid, b.bamLen, dlen); - // // exit(0); - // // int bamLen = parseBam(buf, bp); - // // exit(0); - // // spdlog::info("bamLen: {}", b.bamLen); - // // pos += bamLen; - //} - // exit(0); - // usleep(1); - PROF_T_BEG(mem_copy); - memcpy(udata.data + udata.curLen, buf, dlen); - PROF_T_END(tid, mem_copy); - udata.curLen += dlen * 1; + if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) { + blockItemArr.blockArr.push_back(OneBlock()); } -} -// multi-thread sort bam blocks -static void mtSortBlocks(void *data, long idx, int tid) { - auto &p = *(SortData *)data; - auto &bamArr = p.bamArr[tid]; - auto &uData = p.uData[tid]; + auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; + + uint8_t *block = (*p.blockAddrArr)[idx]; + + size_t dlen = SINGLE_BLOCK_SIZE; // 65535 + int block_length = unpackInt16(&block[16]) + 1; + uint32_t crc = le_to_u32(block + block_length - 8); + int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); + + blockItem.blockId = idx; + blockItem.blockLen = dlen; } static void mergeSortedBlocks(const SortData &sortData) { } -static void *nonBlockingUncompress(void *data) { - PROF_G_BEG(uncompress); - if (((SortData *)data)->round % 10 == 0) - spdlog::info("round-{} block size: {}", ((SortData *)data)->round++, ((SortData *)data)->blockAddrArr->size()); - else - ((SortData *)data)->round++; +static void *threadMergeParseBlocks(void *data) { + return nullptr; + + auto &p = *(SortData *)data; + DataBuffer &buf = p.bamData; - // spdlog::info("block arr size: {}", ((SortData *)data)->blockAddrArr->size()); - kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size()); - // kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, nsgv::gSortArg.NUM_THREADS); - PROF_G_END(uncompress); + int idx = 0; + int tIdx[nsgv::gSortArg.NUM_THREADS] = {0}; + while (!p.uncompressFinish) { + for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { + auto &blockItemArr = p.blockItemArr[t]; + if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) { + //spdlog::info("1. thread-{} arr idx: {}, block id: {}, size: {}, idx: {}", t, tIdx[t], + // blockItemArr.blockArr[tIdx[t]].blockId, blockItemArr.curIdx, idx); + ++tIdx[t]; + ++idx; + break; + } + } + } + // 处理结尾数据 + int totalBlockNum = 0; + for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { + auto &blockItemArr = p.blockItemArr[t]; + totalBlockNum += blockItemArr.curIdx; + } +/* spdlog::info("here, idx: {}, total: {}", idx, totalBlockNum); + while (idx < totalBlockNum) { + for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { + auto &blockItemArr = p.blockItemArr[t]; + // spdlog::info("t:{}, tidx:{}, block id:{}, idx:{}", t, tIdx[t], blockItemArr.blockArr[tIdx[t]].blockId, idx); + if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) { + //spdlog::info("2. thread-{} block id: {}, size: {}", t, tIdx[t], blockItemArr.curIdx); + ++tIdx[t]; + ++idx; + break; + } + } + // break; + } +*/ + //spdlog::info("threadMergeParseBlocks exit, idx: {}", idx); + // exit(0); return nullptr; } -static size_t uncommpressedDataSize(const SortData &sortData) { - size_t curSize = 0; - for (int i = 0; i < sortData.uData.size(); ++i) { - curSize += sortData.uData[i].curLen; +static void *nonBlockingUncompress(void *data) { + PROF_G_BEG(uncompress); + pthread_t parseTid = 0; + + if (((SortData *)data)->round % 10 == 0) + spdlog::info("round-{} block size: {}", ((SortData *)data)->round, ((SortData *)data)->blockAddrArr->size()); + + SortData &sortData = *(SortData *)data; + for (auto &arr: sortData.blockItemArr) arr.curIdx = 0; + sortData.uncompressFinish = 0; + + // 1. 开启一个线程,用来将每个线程解压的数据放到一起 + pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData); + + // 2. 调用并行解压的线程函数 + kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size()); + sortData.uncompressFinish = 1; + int totalBlockNum = 0; + for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { + auto &blockItemArr = sortData.blockItemArr[t]; + totalBlockNum += blockItemArr.curIdx; } - return curSize; + // spdlog::info("uncompress finish, total block num: {}", totalBlockNum); + + // 3. 等待负责数据合并的线程结束 + pthread_join(parseTid, NULL); + + // 4. 如果当前读入的数据超过设定的参数,则进行一次排序 + + PROF_G_END(uncompress); + + // for (auto &arr : sortData.blockItemArr) spdlog::info("block size: {}", arr.curIdx); + + ((SortData *)data)->round++; + return nullptr; } template @@ -292,22 +331,6 @@ static void switchPointer(T *p1, T *p2) { *p2 = tmp; } -static size_t threadMemSize(SortData &sortData) { - size_t memSize = 0; - for (int i = 0; i < sortData.uData.size(); ++i) { - memSize += sortData.uData[i].curLen; - } - return memSize; -} - -static size_t threadMaxMemSize(SortData &sortData) { - size_t memSize = 0; - for (int i = 0; i < sortData.uData.size(); ++i) { - memSize += sortData.uData[i].maxLen; - } - return memSize; -} - void *threadRead(void *data) { spdlog::info("test file start"); FILE *fp = (FILE*) data; @@ -322,6 +345,8 @@ void *threadRead(void *data) { return NULL; } + +// entry function int doSort() { #if 0 @@ -358,13 +383,23 @@ int doSort() { return 0; #endif + // 初始化算法用到的数据结构 SortData sortData; - sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS); - sortData.uData.resize(nsgv::gSortArg.NUM_THREADS); - for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) { - sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS); + //sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS); + //sortData.uData.resize(nsgv::gSortArg.NUM_THREADS); + //for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) { + // sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS); + //} + + sortData.blockItemArr.resize(nsgv::gSortArg.NUM_THREADS); + for(auto &arr: sortData.blockItemArr) { + arr.blockArr.resize(1000); } + // sortData.bamPtrArr.resize(nsgv::gSortArg.NUM_THREADS); + + + nsgv::gIsBigEndian = ed_is_big(); // 打开文件 @@ -437,7 +472,7 @@ int doSort() { memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock curStartAddrArr->push_back(&curBlock[0]); } - + /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */ blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1; if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ @@ -447,11 +482,12 @@ int doSort() { break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ } } + /* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */ lastBufRemain = readState - curReadPos; - // spdlog::info("lastBufRemain: {}", lastBufRemain); if (lastBufRemain > 0) { memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock } + // spdlog::info("lastBufRemain: {}", lastBufRemain); totalBlocks += curStartAddrArr->size(); PROF_G_END(parse_block); @@ -460,22 +496,10 @@ int doSort() { // 并行解压 PROF_G_BEG(sort); pthread_join(uncompressTid, NULL); - { - // 如果sortData的解压buf满了,那么进行排序并输出 - if (true) { - //if (threadMemSize(sortData) + MARGIN_SIZE >= nsgv::gSortArg.MAX_MEM) { - // spdlog::info("clear buf data, buf size: {} G, {} G", threadMemSize(sortData) / 1024.0 / 1024 / 1024, threadMaxMemSize(sortData) / 1024.0 / 1024 / 1024); - for (int i = 0; i < sortData.uData.size(); ++i) { - sortData.uData[i].curLen = 0; // 清空数据 - sortData.bamArr[i].clear(); - } - } - } PROF_G_END(sort); sortData.blockAddrArr = curStartAddrArr; pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData); - //nonBlockingUncompress(&sortData); #endif // 交换buf指针 @@ -498,6 +522,7 @@ err: free(fbuf[1]); free(fbuf[2]); free(fbuf[3]); + free(fbuf[4]); fclose(fpr); // fclose(fpw);