From 3560b5a5046f6e5665d8bd702a5c714fc380ef21 Mon Sep 17 00:00:00 2001
From: zzh
Date: Sun, 14 Dec 2025 00:45:50 +0800
Subject: [PATCH] =?UTF-8?q?=E4=B8=A4=E7=A7=8D=E4=B8=8D=E5=90=8C=E7=9A=84?=
 =?UTF-8?q?=E5=A4=9A=E7=BA=BF=E7=A8=8B=E6=95=B0=E6=8D=AE=E5=88=86=E9=85=8D?=
 =?UTF-8?q?=E7=AD=96=E7=95=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/sort/sort.cpp | 74 ++++++++++++++++++++++++++++++++++++++++-------
 src/sort/sort.h   |  6 +++-
 2 files changed, 69 insertions(+), 11 deletions(-)

diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp
index e1422aa..c56047f 100644
--- a/src/sort/sort.cpp
+++ b/src/sort/sort.cpp
@@ -391,19 +391,75 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
     }
     blockItem.bamNum = bamNum;
     PROF_G_END(mem_copy);
+
+    // 判断是否超过内存阈值
+    if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
+
+    }
 }
 
-/* 将gz block进行解压 */
+// multi-thread uncompress bam blocks
+static void mtUncompressBlockBatch(void* data, long idx, int tid) {
+    UncompressData& p = *(UncompressData*)data;
+    ReadData& readData = *p.readDataPtr;
+
+    int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
+    int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
+
+    auto& blockItemArr = p.blockItemArr[tid];
+
+    if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) {
+        blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx));
+    }
+
+    for (int i = startIdx; i < stopIdx; ++i) {
+        auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
+
+        uint8_t* block = readData.startAddrArr[i];
+
+        size_t dlen = SINGLE_BLOCK_SIZE; // 65535
+        int block_length = unpackInt16(&block[16]) + 1;
+        uint32_t crc = le_to_u32(block + block_length - 8);
+        int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
+        if (ret != 0) {
+            spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret);
+            exit(0);
+        }
+        blockItem.blockId = i;
+        blockItem.blockLen = dlen;
+        uint32_t nextBamStart = 0;
+        uint32_t bamLen = 0;
+        uint32_t bamNum = 0;
+        PROF_G_BEG(mem_copy);
+        // memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
+        /* 解析每个bam */
+        while (nextBamStart + 4 <= blockItem.blockLen) {
+            memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
+            nextBamStart += 4 + bamLen;
+            // spdlog::info("bam len: {}", bamLen);
+            ++bamNum;
+        }
+        blockItem.bamNum = bamNum;
+        PROF_G_END(mem_copy);
+    }
+}
+
+/* 将gz block进行解压,并进行线程内排序 */
 static void doFirstPipeUncompress(FirstPipeArg &p) {
     // return;
     ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
     UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
     uncompressData.readDataPtr = &readData;
     uncompressData.ResetBlockArr();
-
+#if 1
     kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
+#else
+    kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS);
+#endif
     //spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
     //                 uncompressData.blockItemArr[0].blockArr.size());
+    // 判断是否超过内存阈值
+
 }
 
 
@@ -418,6 +474,7 @@ static void *firstPipeUncompress(void *data) {
 
     // self dependency
     yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
+    PROF_G_BEG(uncompress);
     if (p.readFinish) {
         while (p.uncompressOrder < p.readOrder) {
             yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
@@ -429,6 +486,7 @@ static void *firstPipeUncompress(void *data) {
     }
 
     doFirstPipeUncompress(p);
+    PROF_G_END(uncompress);
 
     // update status
     yarn::CONSUME_SIGNAL(p.readSig);
@@ -495,7 +553,7 @@ static void *firstPipeMergeSort(void *data) {
     return nullptr;
 }
 
-/* 对bam文件进行排序 */
+/* 对bam文件进行排序第一阶段,线程内排序,线程间merge,输入到中间bam文件 */
 static void bamSortFirstPipe() {
     /* set up*/
     FirstPipeArg firstPipeArg;
@@ -549,20 +607,16 @@ static void bamSortSerialFirstPipe() {
     fclose(fpr);
 }
 
-/* 对sam文件进行排序 */
-static void samSortFirstPipe() {
-
-}
-
-
 // entry function
 int doSort() {
+
+#if 1
     nsgv::gIsBigEndian = ed_is_big();
     bamSortFirstPipe();
     //
     //bamSortSerialFirstPipe();
     return 0;
-#if 1
+#else
     /* 打开输入bam文件 */
     samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
     if (!inBamFp) {
diff --git a/src/sort/sort.h b/src/sort/sort.h
index ee4e599..b198d54 100644
--- a/src/sort/sort.h
+++ b/src/sort/sort.h
@@ -12,6 +12,9 @@
 using std::priority_queue;
 using std::vector;
 
+#define START_IDX(i, nt, nele) ((i) * (nele) / (nt))
+#define STOP_IDX(i, nt, nele) (((i)+1) * (nele) / (nt))
+
 /* for step-1 read data from bam file */
 struct ReadData {
     uint8_t *dataBuf = nullptr;
@@ -36,7 +39,7 @@ struct ReadData {
         blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
     }
 };
-/* for step-2 parallel uncompress gz blocks */
+/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/
 struct OneBlock {
     uint8_t data[SINGLE_BLOCK_SIZE];
     uint32_t blockLen = 0; // 解压后的数据长度
@@ -174,6 +177,7 @@ struct FirstPipeArg {
 
     volatile int readFinish = 0;
     volatile int uncompressFinish = 0;
+    volatile int uncompressBufFull = 0;
 
     yarn::lock_t *readSig;
     yarn::lock_t *uncompressSig;