#include "sort.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "const_val.h" #include "sam_io.h" #include "sort_args.h" #include "sort_impl.h" #include "util/profiling.h" #include "util/yarn.h" #include "process_header.h" #include "phase_1.h" #include "phase_2.h" #include "compress.h" #include "decompress.h" using std::string; #define BAM_BLOCK_SIZE 16L * 1024 * 1024 namespace nsgv { SortArg gSortArg; // 参数 HeaderBuf gInHdr; // 输入文件的header bool gIsBigEndian; DataBuffer gMarginBuf; // 用于存放跨两个gz block的解压数据 }; // namespace nsgv struct BamSortData { uint16_t flag; uint16_t qnameLen; uint16_t bamLen; int32_t tid; int64_t pos; uint8_t *uDataPos; char *qname; // pointer to qname }; /* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) { ReadBuffer &readData = p.readData[p.readOrder % p.READ_BUF_NUM]; size_t readState = 0; size_t curReadPos = 0; int blockLen = 0; int maxBlockLen = 0; readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr); if (readState == 0) { return 0; } readData.startAddrArr.clear(); /* 处理上一个不完整的block */ // 需要一个额外的空间来保存上一个不完整的block数据,因为readbuffer里的data和block都会用来解析,而且解析之后都会清空。 if (halfBlock.readPos > 0) { // 上一轮有剩余 if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息 memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos); halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度 // spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); } memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos); curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度 memcpy(&readData.blockBuf[halfBlock.readPos], readData.dataBuf, curReadPos); // 将不完整的block剩余数据拷贝到curBlock readData.startAddrArr.push_back(readData.blockBuf); } /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ while (curReadPos + BLOCK_HEADER_LENGTH <= readState) { /* 确保能解析block长度 */ blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1; if (blockLen > maxBlockLen) { maxBlockLen = blockLen; } if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ readData.startAddrArr.push_back(&readData.dataBuf[curReadPos]); curReadPos += blockLen; } else { break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ } } /* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */ halfBlock.readPos = readState - curReadPos; halfBlock.curLen = blockLen; if (halfBlock.readPos > 0) { memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock } // spdlog::info("block num-1: {}", readData.startAddrArr.size()); //spdlog::info("read order: {}, max block len: {}", p.readOrder, maxBlockLen); return readState; } /* FirstPipe step-1 读取文件线程 */ static void *firstPipeReadFile(void *data) { FirstPipeArg &p = *(FirstPipeArg *)data; /* 1. set up */ FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb"); parseSamHeader(fpr, nsgv::gInHdr); size_t fileSize = 0; DataBuffer halfBlock(SINGLE_BLOCK_SIZE); /* 2. do the work */ while (true) { // self dependency yarn::DEPENDENCY_NOT_TO_BE(p.readSig, p.READ_BUF_NUM); PROF_G_BEG(read); size_t readState = doFirstPipeReadFile(p, halfBlock, fpr); PROF_G_END(read); if (readState == 0) { yarn::SIGNAL_FINISH(p.readSig, p.readFinish); break; } // update self status yarn::UPDATE_SIG_ORDER(p.readSig, p.readOrder); fileSize += readState; } spdlog::info("read file order: {}, file size: {}", p.readOrder, fileSize); /* 3. clean up */ fclose(fpr); return nullptr; } static int parseBam(uint8_t* addr, bam1_t* b) { bam1_core_t* c = &b->core; int32_t block_len, ret, i; uint32_t new_l_data; uint8_t tmp[32], *x; b->l_data = 0; memcpy(&block_len, addr, 4); if (nsgv::gIsBigEndian) ed_swap_4p(&block_len); if (block_len < 32) return -4; // block_len includes core data x = addr + 4; c->tid = le_to_u32(x); c->pos = le_to_i32(x + 4); uint32_t x2 = le_to_u32(x + 8); c->bin = x2 >> 16; c->qual = x2 >> 8 & 0xff; c->l_qname = x2 & 0xff; c->l_extranul = (c->l_qname % 4 != 0) ? (4 - c->l_qname % 4) : 0; uint32_t x3 = le_to_u32(x + 12); c->flag = x3 >> 16; c->n_cigar = x3 & 0xffff; c->l_qseq = le_to_u32(x + 16); c->mtid = le_to_u32(x + 20); c->mpos = le_to_i32(x + 24); c->isize = le_to_i32(x + 28); #if 0 new_l_data = block_len - 32 + c->l_extranul; if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1) return -4; if (((uint64_t)c->n_cigar << 2) + c->l_qname + c->l_extranul + (((uint64_t)c->l_qseq + 1) >> 1) + c->l_qseq > (uint64_t)new_l_data) return -4; if (realloc_bam_data(b, new_l_data) < 0) return -4; b->l_data = new_l_data; if (bgzf_read_small(fp, b->data, c->l_qname) != c->l_qname) return -4; if (b->data[c->l_qname - 1] != '\0') { // try to fix missing nul termination if (fixup_missing_qname_nul(b) < 0) return -4; } for (i = 0; i < c->l_extranul; ++i) b->data[c->l_qname + i] = '\0'; c->l_qname += c->l_extranul; if (b->l_data < c->l_qname || bgzf_read_small(fp, b->data + c->l_qname, b->l_data - c->l_qname) != b->l_data - c->l_qname) return -4; if (fp->is_be) swap_data(c, b->l_data, b->data, 0); if (bam_tag2cigar(b, 0, 0) < 0) return -4; // TODO: consider making this conditional if (c->n_cigar > 0) { // recompute "bin" and check CIGAR-qlen consistency hts_pos_t rlen, qlen; bam_cigar2rqlens(c->n_cigar, bam_get_cigar(b), &rlen, &qlen); if ((b->core.flag & BAM_FUNMAP) || rlen == 0) rlen = 1; b->core.bin = hts_reg2bin(b->core.pos, b->core.pos + rlen, 14, 5); // Sanity check for broken CIGAR alignments if (c->l_qseq > 0 && !(c->flag & BAM_FUNMAP) && qlen != c->l_qseq) { hts_log_error("CIGAR and query sequence lengths differ for %s", bam_get_qname(b)); return -4; } } #endif return 4 + block_len; } // multi-thread uncompress bam blocks static void mtUncompressBlock(void *data, long idx, int tid) { PROF_T_BEG(mem_copy); UncompressData& p = *(UncompressData*)data; ReadBuffer &readData = *p.readDataPtr; auto &blockItemArr = p.blockItemArr[tid]; auto &bamItemArr = p.bamItemArr[tid]; auto &blockItem = blockItemArr.add(); uint8_t *block = readData.startAddrArr[idx]; size_t dlen = SINGLE_BLOCK_SIZE; // 65535 int block_length = unpackInt16(&block[16]) + 1; uint32_t crc = le_to_u32(block + block_length - 8); int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); if (ret != 0) { spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret); exit(0); } blockItem.blockId = idx; blockItem.blockLen = dlen; uint32_t nextBamStart = 0; uint32_t bamLen = 0; uint32_t bamNum = 0; /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { OneBam& bam = bamItemArr.add(); bam.blockThread = &blockItemArr; bam.blockIdx = blockItemArr.curIdx - 1; bam.offset = nextBamStart; uint8_t *curAddr = &blockItem.data[nextBamStart]; #if 0 nextBamStart += parseBam(curAddr, &bam.b); ++bamNum; #else memcpy(&bamLen, curAddr, 4); curAddr += 4; if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen); bam.bamLen = bamLen; bam.tid = le_to_u32(curAddr); bam.pos = le_to_i32(curAddr + 4); uint32_t x2 = le_to_u32(curAddr + 8); bam.qnameLen = x2 & 0xff; // spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam)); // spdlog::info("bam name: {}, bam name len: {}", string((char*)(blockItem.data + bam.offset + OneBam::QnameOffset), bam.qnameLen), bam.qnameLen); nextBamStart += 4 + bamLen; ++bamNum; #endif } // spdlog::info("bam num: {}", bamNum); if (nextBamStart != blockItem.blockLen) { spdlog::error("Block content does not contain integer number of bam records!"); exit(0); } blockItem.bamNum = bamNum; blockItemArr.bamNum += bamNum; // 判断是否超过内存阈值,只要有一个超过阈值,就转入下一阶段,进行排序和合并,但是也得把这一轮数据解压处理完 if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE * BLOCK_MEM_FACTOR >= p.father->singleThreadBytes) { p.father->uncompressBufFull = 1; } else { // spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes); } PROF_T_END(tid, mem_copy); } // multi-thread uncompress bam blocks static void mtUncompressBlockBatch(void* data, long idx, int tid) { PROF_T_BEG(mem_copy); UncompressData& p = *(UncompressData*)data; ReadBuffer& readData = *p.readDataPtr; int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size()); int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size()); auto& blockItemArr = p.blockItemArr[tid]; auto& bamItemArr = p.bamItemArr[tid]; blockItemArr.add(stopIdx - startIdx); for (int i = startIdx; i < stopIdx; ++i) { auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; uint8_t* block = readData.startAddrArr[i]; size_t dlen = SINGLE_BLOCK_SIZE; // 65535 int block_length = unpackInt16(&block[16]) + 1; uint32_t crc = le_to_u32(block + block_length - 8); int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); if (ret != 0) { spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret); exit(0); } blockItem.blockId = i; blockItem.blockLen = dlen; uint32_t nextBamStart = 0; uint32_t bamLen = 0; uint32_t bamNum = 0; #if 1 /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { bamItemArr.bamArr.push_back(OneBam()); OneBam& bam = bamItemArr.bamArr.back(); //bam.addr = &blockItem.data[nextBamStart]; uint8_t* curAddr = &blockItem.data[nextBamStart]; memcpy(&bamLen, curAddr, 4); curAddr += 4; if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen); bam.bamLen = bamLen; bam.tid = le_to_u32(curAddr); bam.pos = le_to_i32(curAddr + 4); uint32_t x2 = le_to_u32(curAddr + 8); bam.qnameLen = x2 & 0xff; //bam.qnameAddr = (char*)(curAddr + 32); // spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam)); nextBamStart += 4 + bamLen; ++bamNum; } if (nextBamStart != blockItem.blockLen) { spdlog::error("Block content does not contain integer number of bam records!"); exit(0); } #endif blockItem.bamNum = bamNum; } // 判断是否超过内存阈值 if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE >= p.father->singleThreadBytes) { p.father->uncompressBufFull = 1; } else { // spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes); } PROF_T_END(tid, mem_copy); } // 线程内排序 static void mtInThreadSort(void* data, long idx, int tid) { UncompressData& p = *(UncompressData*)data; auto& bamItemArr = p.bamItemArr[tid]; auto& arr = bamItemArr.bamArr; // 先按照坐标排序 #if 0 if (tid == 0) spdlog::info("Before: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos, arr[7].pos, arr[8].pos, arr[9].pos); #endif if (nsgv::gSortArg.SORT_COORIDINATE) { std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) { return b1.pos < b2.pos; }); } else if (nsgv::gSortArg.QUERY_NAME_TYPE == nsmd::QueryNameType::PICARD) { std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) { int cmp = 0; strncmp((char*)(b1.blockThread->blockArr[b1.blockIdx].data + b1.offset + OneBam::QnameOffset), (char*)(b2.blockThread->blockArr[b2.blockIdx].data + b2.offset + OneBam::QnameOffset), std::min(b1.qnameLen, b2.qnameLen)); if (cmp == 0) return b1.qnameLen < b2.qnameLen; return cmp < 0; }); } #if 0 if (tid == 0) spdlog::info("After: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos, arr[7].pos, arr[8].pos, arr[9].pos); #endif } // 多线程压缩 static void mtCompressBlock(void* data, long idx, int tid) { FirstPipeArg &p = *(FirstPipeArg *)data; auto& barr = p.sortedBamArr; auto& t = p.taskArr[idx]; auto& buf = p.threadBuf[tid]; uint8_t block[SINGLE_BLOCK_SIZE] = {0}; uint8_t compressBlock[SINGLE_BLOCK_SIZE] = {0}; int curAddr = 0; for (int i = t.idx; i < t.idx + t.num; ++i) { // copy data memccpy(&block[curAddr], barr[i]->blockThread->blockArr[barr[i]->blockIdx].data + barr[i]->offset, 1, barr[i]->bamLen); curAddr += barr[i]->bamLen; } size_t dlen = BGZF_MAX_BLOCK_SIZE; // spdlog::info("len: {}", curAddr); bgzfCompress(block, &dlen, compressBlock, curAddr, -1); // 先放到自己线程内部的buf里 // 然后在压缩完成之后,检测当前全局压缩序号,如果当前序号的block在自己线程内,则复制过去 } /* 将gz block进行解压,并进行线程内排序 */ static void doFirstPipeUncompress(FirstPipeArg &p, int finish = 0) { // return; PROF_G_BEG(uncompress); ReadBuffer &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM]; uncompressData.readDataPtr = &readData; #if 1 kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size()); #else kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS); #endif PROF_G_END(uncompress); // 判断是否超过内存阈值,只要有一个超过阈值,就转入下一阶段,进行排序和合并 if (p.uncompressBufFull == 1 || finish) { spdlog::info("buf full - {}", p.mergeOrder); // spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, p.singleThreadBytes); // sort,排序的时候应该线程利用率很高,此时不需要跟其他操作如压缩等进行重叠了 PROF_G_BEG(sort); kt_for(p.numThread, mtInThreadSort, &uncompressData, nsgv::gSortArg.NUM_THREADS); PROF_G_END(sort); // merge #if 0 auto& bamArr = p.sortedBamArr; BamHeap heap; heap.Init(&uncompressData.bamItemArr); bamArr.resize(heap.Size()); auto& taskArr = p.taskArr; taskArr.clear(); const OneBam* bam = nullptr; uint64_t posAll = 0; int i = 0; ArrayInterval intv{0, 0}; uint64_t blockBytes = 0; PROF_G_BEG(merge); while ((bam = heap.Pop()) != nullptr) { // posAll += bam->pos; //spdlog::info("pos: {}", bam->pos); bamArr[i++] = bam; blockBytes += bam->bamLen; if (blockBytes >= SINGLE_BLOCK_SIZE) { intv.num = i - 1 - intv.idx; taskArr.push_back(intv); intv.idx = i - 1; blockBytes = bam->bamLen; } } PROF_G_END(merge); spdlog::info("task size: {} {}", taskArr.size(), bamArr.size()); if (i - 1 > intv.idx) { intv.num = i - 1 - intv.idx; taskArr.push_back(intv); } // spdlog::info("pos all: {}", posAll); #endif // compress,此时并行压缩的多线程利用率应该很高了 PROF_G_BEG(compress); #if 0 kt_for(p.numThread, mtCompressBlock, &p, taskArr.size()); #endif for (auto &blockItemArr : uncompressData.blockItemArr) { p.bamNum += blockItemArr.bamNum; } uncompressData.ResetBlockArr(); uncompressData.ResetBamArr(); p.uncompressBufFull = 0; p.mergeOrder++; for(auto &buf : p.threadBuf) { buf.curLen = 0; } PROF_G_END(compress); // 压缩完之后应该写入中间文件,这个应该可以overlap } // 等处理完这些数据之后,进入新一轮的解压和归并排序压缩输出中间文件。 // auto &bam = uncompressData.bamItemArr[0].bamArr.back(); // string qname(bam.qnameAddr, bam.qnameLen); // spdlog::info("bam name:{}", qname); } /* FirstPipe step-2 并行解压gz blocks*/ static void *firstPipeUncompress(void *data) { FirstPipeArg &p = *(FirstPipeArg *)data; /* 2. do the work */ while (true) { // previous dependency yarn::DEPENDENCY_NOT_TO_BE(p.readSig, 0); // self dependency yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); if (p.readFinish) { while (p.uncompressOrder < p.readOrder) { yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); doFirstPipeUncompress(p, 1); yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder); } yarn::SIGNAL_FINISH(p.uncompressSig, p.uncompressFinish); break; } doFirstPipeUncompress(p); // update status yarn::CONSUME_SIGNAL(p.readSig); yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder); } spdlog::info("uncompress order: {}", p.uncompressOrder); return nullptr; } /* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */ void dofirstPipeWrite(FirstPipeArg &p) { PROF_G_BEG(sort); UncompressData &uncompressData = p.uncompressData[p.writeOrder % p.UNCOMPRESS_BUF_NUM]; size_t blockNum = 0; for (int i = 0; i < p.numThread; ++i) { blockNum += uncompressData.blockItemArr[i].curIdx; } size_t bamNum = blockNum * 256; size_t curBlockBamMemSize = blockNum * SINGLE_BLOCK_SIZE + bamNum * 32; /* 内存使用量达到阈值后,进行排序 */ if (curBlockBamMemSize > nsgv::gSortArg.MAX_MEM / 2) { //uncompressData.ResetBlockArr(); //uncompressData.ResetBamArr(); } //spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockBytes, bamNum * 32); // spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32); PROF_G_END(sort); } /* FirstPipe step-3 串行写入中间文件(已经压缩好) */ static void *firstPipeWrite(void *data) { FirstPipeArg &p = *(FirstPipeArg *)data; while (true) { yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, 0); if (p.uncompressFinish) { spdlog::info("uncompress finish, cur sort order: {}", p.writeOrder); while (p.writeOrder < p.uncompressOrder) { dofirstPipeWrite(p); p.writeOrder += 1; } /* 需要检测一下buf中是否还有数据,如果还有,则需要进行排序,可以不输出到中间文件,直接进行second-pipe的归并排序 */ break; } dofirstPipeWrite(p); p.writeOrder += 1; // update status yarn::CONSUME_SIGNAL(p.uncompressSig); } spdlog::info("merge sort order: {}", p.writeOrder); return nullptr; } /* 对bam文件进行排序第一阶段,线程内排序,线程间merge,输入到中间bam文件 */ static void bamSortFirstPipe() { /* set up*/ FirstPipeArg firstPipeArg; firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS; firstPipeArg.singleThreadBytes = nsgv::gSortArg.MAX_MEM / firstPipeArg.numThread; spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, firstPipeArg.singleThreadBytes); const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread; // 平均每线程1M缓冲区,累加起来,用来读入文件(BAM/SAM)(相对解压之后的缓冲区,大小可以忽略) for (int i = 0; i= 0) { bam_num++; if (max_bam_len < bamp->l_data) max_bam_len = bamp->l_data; if (bamp->l_data > 1000) { spdlog::info("large record len: {}", bamp->l_data); } } sam_close(inBamFp); spdlog::info("max record len: {}", max_bam_len); spdlog::info("bam num: {}", bam_num); #endif return 0; }