diff --git a/src/sort/sam_io.cpp b/src/sort/sam_io.cpp index 281d45b..9a807b7 100644 --- a/src/sort/sam_io.cpp +++ b/src/sort/sam_io.cpp @@ -13,6 +13,19 @@ namespace nsgv { extern bool gIsBigEndian; }; +size_t fileSize(FILE *fp) { + spdlog::info("test file start"); + size_t bufSize = 64L * 1024 * 1024; + uint8_t *testBuf = (uint8_t *)malloc(bufSize); + size_t readState = 0; + size_t fileSize = 0; + while ((readState = fread(testBuf, 1, bufSize, fp)) > 0) { + fileSize += readState; + } + spdlog::info("test file size: {}", fileSize); + return fileSize; +} + int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc) { struct libdeflate_decompressor *z = libdeflate_alloc_decompressor(); if (!z) { diff --git a/src/sort/sam_io.h b/src/sort/sam_io.h index f9b41b3..cd83246 100644 --- a/src/sort/sam_io.h +++ b/src/sort/sam_io.h @@ -44,12 +44,6 @@ struct HeaderBuf { sam_hdr_t *header; }; -// 解压之后的数据结构 -struct UBlockBuf { - uint8_t *data; - -}; - static int unpackInt16(const uint8_t *buffer) { return buffer[0] | buffer[1] << 8; } void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf); diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index c56047f..adbbcf2 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -21,6 +22,8 @@ #include "util/profiling.h" #include "util/yarn.h" +using std::string; + #define BAM_BLOCK_SIZE 16L * 1024 * 1024 namespace nsgv { @@ -42,224 +45,6 @@ struct BamSortData { char *qname; // pointer to qname }; -// thread data for sorting -struct SortData { - /* 用在多线程解压中的数据 */ - vector *blockAddrArr; // 等待解压的gz block - vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 - volatile int uncompressFinish = 0; // 是否解压完成 - - /* 单线程解析用到的数据 */ - DataBuffer bamData; // 解压后的数据存放在连续的地址空间中 - BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 - - - - int round = 0; -}; - - -int sam_realloc_bam_data(bam1_t *b, size_t desired) { - uint32_t new_m_data; - uint8_t *new_data; - new_m_data = desired; - kroundup32(new_m_data); // next power of 2 - new_m_data += 32; // reduces malloc arena migrations? - if (new_m_data < desired) { - errno = ENOMEM; // Not strictly true but we can't store the size - return -1; - } -#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION - if (new_m_data > FUZZ_ALLOC_LIMIT) { - errno = ENOMEM; - return -1; - } -#endif - if ((bam_get_mempolicy(b) & BAM_USER_OWNS_DATA) == 0) { - new_data = (uint8_t*)realloc(b->data, new_m_data); - } else { - if ((new_data = (uint8_t *)malloc(new_m_data)) != NULL) { - if (b->l_data > 0) - memcpy(new_data, b->data, b->l_data < b->m_data ? b->l_data : b->m_data); - bam_set_mempolicy(b, bam_get_mempolicy(b) & (~BAM_USER_OWNS_DATA)); - } - } - if (!new_data) - return -1; - b->data = new_data; - b->m_data = new_m_data; - return 0; -} - -static inline int realloc_bam_data(bam1_t *b, size_t desired) { - if (desired <= b->m_data) - return 0; - return sam_realloc_bam_data(b, desired); -} - -// Fix bad records where qname is not terminated correctly. -static int fixup_missing_qname_nul(bam1_t *b) { - bam1_core_t *c = &b->core; - - // Note this is called before c->l_extranul is added to c->l_qname - if (c->l_extranul > 0) { - b->data[c->l_qname++] = '\0'; - c->l_extranul--; - } else { - if (b->l_data > INT_MAX - 4) - return -1; - if (realloc_bam_data(b, b->l_data + 4) < 0) - return -1; - b->l_data += 4; - b->data[c->l_qname++] = '\0'; - c->l_extranul = 3; - } - return 0; -} - -static void bam_cigar2rqlens(int n_cigar, const uint32_t *cigar, hts_pos_t *rlen, hts_pos_t *qlen) { - int k; - *rlen = *qlen = 0; - for (k = 0; k < n_cigar; ++k) { - int type = bam_cigar_type(bam_cigar_op(cigar[k])); - int len = bam_cigar_oplen(cigar[k]); - if (type & 1) - *qlen += len; - if (type & 2) - *rlen += len; - } -} - - -// multi-thread uncompress bam blocks -static void mtUncompressBlock1(void *data, long idx, int tid) { - auto &p = *(SortData*) data; - auto &blockItemArr = p.blockItemArr[tid]; - - if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) { - blockItemArr.blockArr.push_back(OneBlock()); - } - - auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; - - uint8_t *block = (*p.blockAddrArr)[idx]; - - size_t dlen = SINGLE_BLOCK_SIZE; // 65535 - int block_length = unpackInt16(&block[16]) + 1; - uint32_t crc = le_to_u32(block + block_length - 8); - int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); - - blockItem.blockId = idx; - blockItem.blockLen = dlen; -} - -static void mergeSortedBlocks(const SortData &sortData) { - -} - -static void *threadMergeParseBlocks(void *data) { - return nullptr; - - auto &p = *(SortData *)data; - DataBuffer &buf = p.bamData; - - int idx = 0; - int tIdx[nsgv::gSortArg.NUM_THREADS] = {0}; - while (!p.uncompressFinish) { - for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { - auto &blockItemArr = p.blockItemArr[t]; - if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) { - //spdlog::info("1. thread-{} arr idx: {}, block id: {}, size: {}, idx: {}", t, tIdx[t], - // blockItemArr.blockArr[tIdx[t]].blockId, blockItemArr.curIdx, idx); - ++tIdx[t]; - ++idx; - break; - } - } - } - // 处理结尾数据 - int totalBlockNum = 0; - for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { - auto &blockItemArr = p.blockItemArr[t]; - totalBlockNum += blockItemArr.curIdx; - } -/* spdlog::info("here, idx: {}, total: {}", idx, totalBlockNum); - while (idx < totalBlockNum) { - for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { - auto &blockItemArr = p.blockItemArr[t]; - // spdlog::info("t:{}, tidx:{}, block id:{}, idx:{}", t, tIdx[t], blockItemArr.blockArr[tIdx[t]].blockId, idx); - if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) { - //spdlog::info("2. thread-{} block id: {}, size: {}", t, tIdx[t], blockItemArr.curIdx); - ++tIdx[t]; - ++idx; - break; - } - } - // break; - } -*/ - //spdlog::info("threadMergeParseBlocks exit, idx: {}", idx); - // exit(0); - return nullptr; -} - -static void *nonBlockingUncompress(void *data) { - PROF_G_BEG(uncompress); - pthread_t parseTid = 0; - - if (((SortData *)data)->round % 10 == 0) - spdlog::info("round-{} block size: {}", ((SortData *)data)->round, ((SortData *)data)->blockAddrArr->size()); - - SortData &sortData = *(SortData *)data; - for (auto &arr: sortData.blockItemArr) arr.curIdx = 0; - sortData.uncompressFinish = 0; - - // 1. 开启一个线程,用来将每个线程解压的数据放到一起 - pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData); - - // 2. 调用并行解压的线程函数 - kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock1, data, ((SortData *)data)->blockAddrArr->size()); - sortData.uncompressFinish = 1; - int totalBlockNum = 0; - for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) { - auto &blockItemArr = sortData.blockItemArr[t]; - totalBlockNum += blockItemArr.curIdx; - } - // spdlog::info("uncompress finish, total block num: {}", totalBlockNum); - - // 3. 等待负责数据合并的线程结束 - pthread_join(parseTid, NULL); - - // 4. 如果当前读入的数据超过设定的参数,则进行一次排序 - - PROF_G_END(uncompress); - - // for (auto &arr : sortData.blockItemArr) spdlog::info("block size: {}", arr.curIdx); - - ((SortData *)data)->round++; - return nullptr; -} - -template -static void switchPointer(T *p1, T *p2) { - const T tmp = *p1; - *p1 = *p2; - *p2 = tmp; -} - -void *threadRead(void *data) { - spdlog::info("test file start"); - FILE *fp = (FILE*) data; - size_t bufSize = 64L * 1024 * 1024; - uint8_t *testBuf = (uint8_t *)malloc(bufSize); - size_t readState = 0; - size_t fileSize = 0; - while ((readState = fread(testBuf, 1, bufSize, fp)) > 0) { - fileSize += readState; - } - spdlog::info("test file size: {}", fileSize); - return NULL; -} /* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) { @@ -272,10 +57,6 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE * readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr); if (readState == 0) { return 0; } - //if (p.readOrder == 278) { - // spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); - //} - readData.startAddrArr.clear(); /* 处理上一个不完整的block */ @@ -283,7 +64,7 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE * if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息 memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos); halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度 - spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); + // spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); } memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos); curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度 @@ -291,7 +72,7 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE * readData.startAddrArr.push_back(readData.blockBuf); } /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ - while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */ + while (curReadPos + BLOCK_HEADER_LENGTH <= readState) { /* 确保能解析block长度 */ blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1; if (blockLen > maxBlockLen) { maxBlockLen = blockLen; } if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ @@ -307,11 +88,9 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE * if (halfBlock.readPos > 0) { memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock } - //if (p.readOrder == 277) { - // spdlog::info("tail - last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); - //} - //spdlog::info("block num-1: {}", readData.startAddrArr.size()); - //spdlog::info("max block len: {}", maxBlockLen); + + // spdlog::info("block num-1: {}", readData.startAddrArr.size()); + //spdlog::info("read order: {}, max block len: {}", p.readOrder, maxBlockLen); return readState; } @@ -361,6 +140,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) { if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) { blockItemArr.blockArr.push_back(OneBlock()); } + auto &bamItemArr = p.bamItemArr[tid]; auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; @@ -381,15 +161,33 @@ static void mtUncompressBlock(void *data, long idx, int tid) { uint32_t bamLen = 0; uint32_t bamNum = 0; PROF_G_BEG(mem_copy); - // memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen); /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { - memcpy(&bamLen, &blockItem.data[nextBamStart], 4); - nextBamStart += 4 + bamLen; + bamItemArr.bamArr.push_back(OneBam()); + OneBam &bam = bamItemArr.bamArr.back(); + + bam.addr = &blockItem.data[nextBamStart]; + uint8_t *curAddr = &blockItem.data[nextBamStart]; + memcpy(&bamLen, curAddr, 4); + curAddr += 4; + if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen); + bam.bamLen = bamLen; + bam.tid = le_to_u32(curAddr); + bam.pos = le_to_i32(curAddr + 4); + uint32_t x2 = le_to_u32(curAddr + 8); + bam.qnameLen = x2 & 0xff; + bam.qnameAddr = (char *)(curAddr + 32); + // spdlog::info("bam len: {}", bamLen); + nextBamStart += 4 + bamLen; ++bamNum; } + if (nextBamStart != blockItem.blockLen) { + spdlog::error("Block content does not contain integer number of bam records!"); + exit(0); + } blockItem.bamNum = bamNum; + PROF_G_END(mem_copy); // 判断是否超过内存阈值 @@ -447,10 +245,13 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) { /* 将gz block进行解压,并进行线程内排序 */ static void doFirstPipeUncompress(FirstPipeArg &p) { // return; + PROF_G_BEG(uncompress); ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM]; uncompressData.readDataPtr = &readData; uncompressData.ResetBlockArr(); + uncompressData.ResetBamArr(); + #if 1 kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size()); #else @@ -458,9 +259,13 @@ static void doFirstPipeUncompress(FirstPipeArg &p) { #endif //spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx, // uncompressData.blockItemArr[0].blockArr.size()); + // 判断是否超过内存阈值 + // auto &bam = uncompressData.bamItemArr[0].bamArr.back(); + // string qname(bam.qnameAddr, bam.qnameLen); + // spdlog::info("bam name:{}", qname); - + PROF_G_END(uncompress); } /* FirstPipe step-2 并行解压gz blocks*/ @@ -499,29 +304,25 @@ static void *firstPipeUncompress(void *data) { /* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */ void doFirstPipeMergeSort(FirstPipeArg &p) { + PROF_G_BEG(sort); UncompressData &uncompressData = p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM]; - DataBuffer &buf = p.mergeSortData.bamData; - BlockHeap blockHeap; - blockHeap.Init(&uncompressData.blockItemArr); - // spdlog::info("block heap size: {}", blockHeap.Size()); - // spdlog::info("all block bytes: {}", blockHeap.AllBlockBytes()); - // size_t allBlockBytes = blockHeap.AllBlockBytes(); - // if (buf.maxLen < buf.curLen + allBlockBytes) { - // // spdlog::info("here "); - // buf.reAllocMem(buf.curLen + allBlockBytes); - // } - // memset(buf.data, 0, buf.maxLen); - - const OneBlock *pB; - size_t bamNum = 0; size_t blockNum = 0; - while ((pB = blockHeap.Pop()) != nullptr && pB->blockLen > 0) { - ++blockNum; - bamNum += pB->bamNum; + for (int i = 0; i < p.numThread; ++i) { + blockNum += uncompressData.blockItemArr[i].curIdx; } - buf.curLen = 0; - spdlog::info("block num: {}, bam num: {}", blockNum, bamNum); + size_t bamNum = blockNum * 256; + + size_t curBlockBamMemSize = blockNum * SINGLE_BLOCK_SIZE + bamNum * 32; + /* 内存使用量达到阈值后,进行排序 */ + if (curBlockBamMemSize > nsgv::gSortArg.MAX_MEM / 2) { + uncompressData.ResetBlockArr(); + uncompressData.ResetBamArr(); + } + + //spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockBytes, bamNum * 32); + spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32); + PROF_G_END(sort); } /* FirstPipe step-3 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 */ @@ -566,6 +367,7 @@ static void bamSortFirstPipe() { firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128); } + PROF_G_BEG(mid_all); /* create threads */ pthread_t pipeThreadIdArr[3]; // 3-stage pipeline pthread_create(&pipeThreadIdArr[0], NULL, firstPipeReadFile, &firstPipeArg); @@ -573,6 +375,7 @@ static void bamSortFirstPipe() { pthread_create(&pipeThreadIdArr[2], NULL, firstPipeMergeSort, &firstPipeArg); for (int i = 0; i < 3; ++i) pthread_join(pipeThreadIdArr[i], NULL); + PROF_G_END(mid_all); } /* IO同步的方式进行排序 */ @@ -607,16 +410,23 @@ static void bamSortSerialFirstPipe() { fclose(fpr); } +/* 对sam文件进行排序 */ +static void samSortFirstPipe() { + +} + + // entry function int doSort() { #if 1 nsgv::gIsBigEndian = ed_is_big(); bamSortFirstPipe(); - // //bamSortSerialFirstPipe(); - return 0; + spdlog::info("OneBam size: {}", sizeof(OneBam)); + //bamSortSerialFirstPipe(); #else + /* 打开输入bam文件 */ samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr); if (!inBamFp) { @@ -647,151 +457,9 @@ int doSort() { } sam_close(inBamFp); spdlog::info("max record len: {}", max_bam_len); - return 0; #endif - // 初始化算法用到的数据结构 - SortData sortData; - //sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS); - //sortData.uData.resize(nsgv::gSortArg.NUM_THREADS); - //for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) { - // sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS); - //} - - sortData.blockItemArr.resize(nsgv::gSortArg.NUM_THREADS); - for(auto &arr: sortData.blockItemArr) { -// arr.blockArr.resize(1000); - } - // sortData.bamPtrArr.resize(nsgv::gSortArg.NUM_THREADS); - - - - nsgv::gIsBigEndian = ed_is_big(); - - // 打开文件 - FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb"); - - // threadRead(fpr); exit(0); - - parseSamHeader(fpr, nsgv::gInHdr); - // exit(0); - - // FILE *fpw = fopen(nsgv::gSortArg.OUTPUT_FILE.c_str(), "rb"); - - const size_t READ_BUFSIZE = 4L * 1024 * 1024 * nsgv::gSortArg.NUM_THREADS; - - uint8_t *fbuf[5]; - fbuf[0] = (uint8_t *)malloc(READ_BUFSIZE); - fbuf[1] = (uint8_t *)malloc(READ_BUFSIZE); - fbuf[2] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); - fbuf[3] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); - fbuf[4] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); - - - uint8_t *curBuf = fbuf[0]; - uint8_t *preBuf = fbuf[1]; - uint8_t *preBlock = fbuf[2]; - uint8_t *curBlock = fbuf[3]; - uint8_t *halfBlock = fbuf[4]; - - size_t fsize = 0; - size_t block_num = 0; - int round = 0; - size_t totalBlocks = 0; - - /* - 有几种情况 - 第一种:上一个buf里,只剩17字节,那么不能解析下一个block的长度 - 第二种:上一个buf里,完全不剩字节,即上一个buf读入完成时正好卡在下一个block开头 - 第三种:上一个buf里,剩余18字节,只能解析下一个block的长度 - 第四种:上一个buf里,剩余超过18字节,但是没有放下所有block的数据 - - 简而言之:1. 够解析下一个block的header,不够解析完整的block - 2. 不够解析下一个block的header。 - */ - size_t readState = 0; - size_t curReadPos = 0; - int blockLen = 0; - vector startAddrArr[2]; - vector *curStartAddrArr = &startAddrArr[0]; - vector *preStartAddrArr = &startAddrArr[1]; - size_t lastBufRemain = 0; - pthread_t uncompressTid = 0; - pthread_t sortMergeTid = 0; - - PROF_G_BEG(mid_all); - - PROF_G_BEG(read); - readState = fread(curBuf, 1, READ_BUFSIZE, fpr); - PROF_G_END(read); - - while (readState > 0) { - fsize += readState; - PROF_G_BEG(parse_block); - - curStartAddrArr->clear(); - curReadPos = blockLen - lastBufRemain; /* 上一个buf里剩余的block数据 */ - /* 处理上一个不完整的block */ - if (lastBufRemain > 0) { // 上一轮有剩余 - memcpy(curBlock, halfBlock, lastBufRemain); - memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock - curStartAddrArr->push_back(&curBlock[0]); - } - /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ - while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */ - blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1; - if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ - curStartAddrArr->push_back(&curBuf[curReadPos]); - curReadPos += blockLen; - } else { - break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ - } - } - /* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */ - lastBufRemain = readState - curReadPos; - if (lastBufRemain > 0) { - memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock - } - // spdlog::info("lastBufRemain: {}", lastBufRemain); - - totalBlocks += curStartAddrArr->size(); - PROF_G_END(parse_block); - -#if 0 - // 并行解压 - PROF_G_BEG(sort); - pthread_join(uncompressTid, NULL); - PROF_G_END(sort); - - sortData.blockAddrArr = curStartAddrArr; - pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData); -#endif - - // 交换buf指针 - switchPointer(&curBuf, &preBuf); - switchPointer(&curStartAddrArr, &preStartAddrArr); - switchPointer(&curBlock, &preBlock); - - PROF_G_BEG(read); - readState = fread(curBuf, 1, READ_BUFSIZE, fpr); - PROF_G_END(read); - } - pthread_join(uncompressTid, NULL); - PROF_G_END(mid_all); - - - spdlog::info("total blocks: {}", totalBlocks); - fprintf(stderr, "file size: %ld\n\n", fsize); -err: - free(fbuf[0]); - free(fbuf[1]); - free(fbuf[2]); - free(fbuf[3]); - free(fbuf[4]); - fclose(fpr); - // fclose(fpw); - return 0; } diff --git a/src/sort/sort.h b/src/sort/sort.h index b198d54..6366969 100644 --- a/src/sort/sort.h +++ b/src/sort/sort.h @@ -40,6 +40,20 @@ struct ReadData { } }; /* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/ +struct OneBam { + uint16_t bamLen = 0; + uint16_t qnameLen = 0; + uint32_t tid = 0; + char *qnameAddr = 0; // qname的地址 + uint64_t pos = 0; // mapping 位置 + uint8_t *addr = 0; // 地址 +}; + +struct ThreadBamArr { + vector bamArr; + int curIdx = 0; // +}; + struct OneBlock { uint8_t data[SINGLE_BLOCK_SIZE]; uint32_t blockLen = 0; // 解压后的数据长度 @@ -52,6 +66,8 @@ struct ThreadBlockArr { }; struct UncompressData { vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 + vector bamItemArr; // 每个thread一个,用来保存解压后的bam数据 + ReadData *readDataPtr = nullptr; // 读取数据的指针 UncompressData() { } @@ -59,12 +75,14 @@ struct UncompressData { UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); } void Resize(int numThread) { blockItemArr.resize(numThread); + bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(128); } } void Resize(int numThread, int vecInitSize) { blockItemArr.resize(numThread); + bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(vecInitSize); } @@ -74,6 +92,12 @@ struct UncompressData { blockItemArr[i].curIdx = 0; } } + void ResetBamArr() { + for (int i = 0; i < bamItemArr.size(); ++i) { + // bamItemArr[i].curIdx = 0; + bamItemArr[i].bamArr.clear(); + } + } }; /* block 排序堆 */ struct BlockArrIdIdx { @@ -146,21 +170,12 @@ struct BlockHeap { }; /* for step-3 serial merge blocks and sort them */ -struct BamPointer { - uint64_t offset = 0; // 地址偏移量 - uint32_t bamLen = 0; -}; - -struct BamPtrArr { - vector bamArr; - int curIdx = 0; // -}; struct MergeSortData { DataBuffer bamData; // 用来保存解压后的数据 - BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 + // BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 MergeSortData() { - bamPtrArr.bamArr.reserve(128); + // bamPtrArr.bamArr.reserve(128); } };