diff --git a/CMakeLists.txt b/CMakeLists.txt index 11472b5..07431d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_BUILD_TYPE Debug) -# set(CMAKE_BUILD_TYPE Release) +set(CMAKE_BUILD_TYPE Release) add_definitions(-DSHOW_PERF=1) add_definitions(-DSPDLOG_COMPILED_LIB) add_subdirectory(src) \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 5316d09..97d417d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -78,11 +78,11 @@ int main(int argc, char *argv[]) { .help("Set compression level, from 0 (uncompressed) to 9 (best).") .nargs(1); - program.add_argument("-m", "--max-mem") - .help("approximate total memory limit for all threads (by default 2GB)") - .default_value(std::string("2G")) + program.add_argument("-m", "--mem") + .help("approximate total memory limit for all threads (by default 4GB)") + .default_value(std::string("4G")) .nargs(1) - .metavar(""); + .metavar(""); program.add_argument("--sort-type") .help("\nUse coordinate sort or query name sort. Possible values: {COORDINATE, NAME}") @@ -144,6 +144,18 @@ int main(int argc, char *argv[]) { {"PICARD", nsmd::QueryNameType::PICARD}}; nsgv::gSortArg.QUERY_NAME_TYPE = query_name_args[program.get("--query-name")]; + string maxMemStr = program.get("--mem"); + { + char* q; + uint64_t maxMem = strtol(maxMemStr.c_str(), &q, 0); + if (*q == 'k' || *q == 'K') + maxMem <<= 10; + else if (*q == 'm' || *q == 'M') + maxMem <<= 20; + else if (*q == 'g' || *q == 'G') + maxMem <<= 30; + nsgv::gSortArg.MAX_MEM = maxMem; + } } catch (const std::exception &err) { spdlog::error(err.what()); @@ -155,7 +167,7 @@ int main(int argc, char *argv[]) { doSort(); spdlog::info("fast bam sort end"); - displayProfiling(1); + displayProfiling(nsgv::gSortArg.NUM_THREADS); return 0; } \ No newline at end of file diff --git a/src/sort/const_val.h b/src/sort/const_val.h index 0118cff..8bb9729 100644 --- a/src/sort/const_val.h +++ b/src/sort/const_val.h @@ -3,7 +3,9 @@ #include // 单个block gz的最大解压大小 +#define BGZF_MAX_BLOCK_SIZE 0x10000 #define SINGLE_BLOCK_SIZE 65535 +#define BLOCK_MEM_FACTOR 1.2 // bam header字节数量 #ifndef BLOCK_HEADER_LENGTH diff --git a/src/sort/sam_io.cpp b/src/sort/sam_io.cpp index 9a807b7..1795256 100644 --- a/src/sort/sam_io.cpp +++ b/src/sort/sam_io.cpp @@ -13,6 +13,8 @@ namespace nsgv { extern bool gIsBigEndian; }; +static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0"; + size_t fileSize(FILE *fp) { spdlog::info("test file start"); size_t bufSize = 64L * 1024 * 1024; @@ -26,6 +28,74 @@ size_t fileSize(FILE *fp) { return fileSize; } +static inline void packInt16(uint8_t* buffer, uint16_t value) { + buffer[0] = value; + buffer[1] = value >> 8; +} + +static inline void packInt32(uint8_t* buffer, uint32_t value) { + buffer[0] = value; + buffer[1] = value >> 8; + buffer[2] = value >> 16; + buffer[3] = value >> 24; +} + +int bgzfCompress(void* _dst, size_t* dlen, const void* src, size_t slen, int level) { + if (slen == 0) { + // EOF block + if (*dlen < 28) + return -1; + memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28); + *dlen = 28; + return 0; + } + + uint8_t* dst = (uint8_t*)_dst; + + if (level == 0) { + // Uncompressed data + if (*dlen < slen + 5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH) + return -1; + dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951 + u16_to_le(slen, &dst[BLOCK_HEADER_LENGTH + 1]); // length + u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH + 3]); // ones-complement length + memcpy(dst + BLOCK_HEADER_LENGTH + 5, src, slen); + *dlen = slen + 5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH; + + } else { + level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default + // NB levels go up to 12 here. + int lvl_map[] = {0, 1, 2, 3, 5, 6, 7, 8, 10, 12}; + level = lvl_map[level > 9 ? 9 : level]; + struct libdeflate_compressor* z = libdeflate_alloc_compressor(level); + if (!z) + return -1; + + // Raw deflate + size_t clen = libdeflate_deflate_compress(z, src, slen, dst + BLOCK_HEADER_LENGTH, *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH); + + if (clen <= 0) { + hts_log_error("Call to libdeflate_deflate_compress failed"); + libdeflate_free_compressor(z); + return -1; + } + + *dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH; + + libdeflate_free_compressor(z); + } + + // write the header + memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block + packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes + + // write the footer + uint32_t crc = libdeflate_crc32(0, src, slen); + packInt32((uint8_t*)&dst[*dlen - 8], crc); + packInt32((uint8_t*)&dst[*dlen - 4], slen); + return 0; +} + int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc) { struct libdeflate_decompressor *z = libdeflate_alloc_decompressor(); if (!z) { diff --git a/src/sort/sam_io.h b/src/sort/sam_io.h index cd83246..b13c10f 100644 --- a/src/sort/sam_io.h +++ b/src/sort/sam_io.h @@ -48,4 +48,5 @@ static int unpackInt16(const uint8_t *buffer) { return buffer[0] | buffer[1] << void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf); -int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc); \ No newline at end of file +int bgzfCompress(void* _dst, size_t* dlen, const void* src, size_t slen, int level); +int bgzfUncompress(uint8_t* dst, size_t* dlen, const uint8_t* src, size_t slen, uint32_t expected_crc); \ No newline at end of file diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index adbbcf2..49754e0 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -12,13 +12,14 @@ #include #include #include -#include #include +#include +#include "const_val.h" #include "sam_io.h" #include "sort_args.h" -#include "const_val.h" +#include "sort_impl.h" #include "util/profiling.h" #include "util/yarn.h" @@ -131,19 +132,87 @@ static void *firstPipeReadFile(void *data) { return nullptr; } +static int parseBam(uint8_t* addr, bam1_t* b) { + bam1_core_t* c = &b->core; + int32_t block_len, ret, i; + uint32_t new_l_data; + uint8_t tmp[32], *x; + + b->l_data = 0; + memcpy(&block_len, addr, 4); + if (nsgv::gIsBigEndian) + ed_swap_4p(&block_len); + if (block_len < 32) + return -4; // block_len includes core data + x = addr + 4; + + c->tid = le_to_u32(x); + c->pos = le_to_i32(x + 4); + uint32_t x2 = le_to_u32(x + 8); + c->bin = x2 >> 16; + c->qual = x2 >> 8 & 0xff; + c->l_qname = x2 & 0xff; + c->l_extranul = (c->l_qname % 4 != 0) ? (4 - c->l_qname % 4) : 0; + uint32_t x3 = le_to_u32(x + 12); + c->flag = x3 >> 16; + c->n_cigar = x3 & 0xffff; + c->l_qseq = le_to_u32(x + 16); + c->mtid = le_to_u32(x + 20); + c->mpos = le_to_i32(x + 24); + c->isize = le_to_i32(x + 28); +/* + new_l_data = block_len - 32 + c->l_extranul; + if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1) + return -4; + if (((uint64_t)c->n_cigar << 2) + c->l_qname + c->l_extranul + (((uint64_t)c->l_qseq + 1) >> 1) + c->l_qseq > (uint64_t)new_l_data) + return -4; + if (realloc_bam_data(b, new_l_data) < 0) + return -4; + b->l_data = new_l_data; + + if (bgzf_read_small(fp, b->data, c->l_qname) != c->l_qname) + return -4; + if (b->data[c->l_qname - 1] != '\0') { // try to fix missing nul termination + if (fixup_missing_qname_nul(b) < 0) + return -4; + } + for (i = 0; i < c->l_extranul; ++i) b->data[c->l_qname + i] = '\0'; + c->l_qname += c->l_extranul; + if (b->l_data < c->l_qname || bgzf_read_small(fp, b->data + c->l_qname, b->l_data - c->l_qname) != b->l_data - c->l_qname) + return -4; + if (fp->is_be) + swap_data(c, b->l_data, b->data, 0); + if (bam_tag2cigar(b, 0, 0) < 0) + return -4; + + // TODO: consider making this conditional + if (c->n_cigar > 0) { // recompute "bin" and check CIGAR-qlen consistency + hts_pos_t rlen, qlen; + bam_cigar2rqlens(c->n_cigar, bam_get_cigar(b), &rlen, &qlen); + if ((b->core.flag & BAM_FUNMAP) || rlen == 0) + rlen = 1; + b->core.bin = hts_reg2bin(b->core.pos, b->core.pos + rlen, 14, 5); + // Sanity check for broken CIGAR alignments + if (c->l_qseq > 0 && !(c->flag & BAM_FUNMAP) && qlen != c->l_qseq) { + hts_log_error("CIGAR and query sequence lengths differ for %s", bam_get_qname(b)); + return -4; + } + } +*/ + return 4 + block_len; +} + // multi-thread uncompress bam blocks static void mtUncompressBlock(void *data, long idx, int tid) { - UncompressData &p = *(UncompressData *)data; + PROF_T_BEG(mem_copy); + + UncompressData& p = *(UncompressData*)data; ReadData &readData = *p.readDataPtr; auto &blockItemArr = p.blockItemArr[tid]; - if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) { - blockItemArr.blockArr.push_back(OneBlock()); - } auto &bamItemArr = p.bamItemArr[tid]; - auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; - + auto &blockItem = blockItemArr.add(); uint8_t *block = readData.startAddrArr[idx]; size_t dlen = SINGLE_BLOCK_SIZE; // 65535 @@ -160,44 +229,53 @@ static void mtUncompressBlock(void *data, long idx, int tid) { uint32_t nextBamStart = 0; uint32_t bamLen = 0; uint32_t bamNum = 0; - PROF_G_BEG(mem_copy); + /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { - bamItemArr.bamArr.push_back(OneBam()); - OneBam &bam = bamItemArr.bamArr.back(); + OneBam& bam = bamItemArr.add(); - bam.addr = &blockItem.data[nextBamStart]; + bam.blockThread = &blockItemArr; + bam.blockIdx = blockItemArr.curIdx - 1; + bam.offset = nextBamStart; uint8_t *curAddr = &blockItem.data[nextBamStart]; +#if 0 + nextBamStart += parseBam(curAddr, &bam.b); + ++bamNum; +#else memcpy(&bamLen, curAddr, 4); curAddr += 4; if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen); - bam.bamLen = bamLen; + bam.bamLen = bamLen; bam.tid = le_to_u32(curAddr); bam.pos = le_to_i32(curAddr + 4); uint32_t x2 = le_to_u32(curAddr + 8); bam.qnameLen = x2 & 0xff; - bam.qnameAddr = (char *)(curAddr + 32); - // spdlog::info("bam len: {}", bamLen); + // spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam)); + // spdlog::info("bam name: {}, bam name len: {}", string((char*)(blockItem.data + bam.offset + OneBam::QnameOffset), bam.qnameLen), bam.qnameLen); nextBamStart += 4 + bamLen; ++bamNum; +#endif } + // spdlog::info("bam num: {}", bamNum); if (nextBamStart != blockItem.blockLen) { spdlog::error("Block content does not contain integer number of bam records!"); exit(0); } blockItem.bamNum = bamNum; - PROF_G_END(mem_copy); - // 判断是否超过内存阈值 - if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) { - + if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE * BLOCK_MEM_FACTOR >= p.father->singleThreadBytes) { + p.father->uncompressBufFull = 1; + } else { + // spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes); } + PROF_T_END(tid, mem_copy); } // multi-thread uncompress bam blocks static void mtUncompressBlockBatch(void* data, long idx, int tid) { + PROF_T_BEG(mem_copy); UncompressData& p = *(UncompressData*)data; ReadData& readData = *p.readDataPtr; @@ -205,10 +283,9 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) { int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size()); auto& blockItemArr = p.blockItemArr[tid]; + auto& bamItemArr = p.bamItemArr[tid]; - if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) { - blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx)); - } + blockItemArr.add(stopIdx - startIdx); for (int i = startIdx; i < stopIdx; ++i) { auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++]; @@ -228,18 +305,98 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) { uint32_t nextBamStart = 0; uint32_t bamLen = 0; uint32_t bamNum = 0; - PROF_G_BEG(mem_copy); - // memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen); + /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { - memcpy(&bamLen, &blockItem.data[nextBamStart], 4); + bamItemArr.bamArr.push_back(OneBam()); + OneBam& bam = bamItemArr.bamArr.back(); + + //bam.addr = &blockItem.data[nextBamStart]; + uint8_t* curAddr = &blockItem.data[nextBamStart]; + memcpy(&bamLen, curAddr, 4); + curAddr += 4; + if (nsgv::gIsBigEndian) + ed_swap_4p(&bamLen); + bam.bamLen = bamLen; + bam.tid = le_to_u32(curAddr); + bam.pos = le_to_i32(curAddr + 4); + uint32_t x2 = le_to_u32(curAddr + 8); + bam.qnameLen = x2 & 0xff; + //bam.qnameAddr = (char*)(curAddr + 32); + + // spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam)); nextBamStart += 4 + bamLen; - // spdlog::info("bam len: {}", bamLen); ++bamNum; } + if (nextBamStart != blockItem.blockLen) { + spdlog::error("Block content does not contain integer number of bam records!"); + exit(0); + } blockItem.bamNum = bamNum; - PROF_G_END(mem_copy); + } + // 判断是否超过内存阈值 + if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE >= p.father->singleThreadBytes) { + p.father->uncompressBufFull = 1; + } else { + // spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes); + } + PROF_T_END(tid, mem_copy); +} + +// 线程内排序 +static void mtInThreadSort(void* data, long idx, int tid) { + UncompressData& p = *(UncompressData*)data; + auto& bamItemArr = p.bamItemArr[tid]; + auto& arr = bamItemArr.bamArr; + // 先按照坐标排序 + +#if 0 + if (tid == 0) + spdlog::info("Before: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos, + arr[7].pos, arr[8].pos, arr[9].pos); +#endif + + if (nsgv::gSortArg.SORT_COORIDINATE) { + std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) { return b1.pos < b2.pos; }); + } else if (nsgv::gSortArg.QUERY_NAME_TYPE == nsmd::QueryNameType::PICARD) { + std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) { + int cmp = 0; + strncmp((char*)(b1.blockThread->blockArr[b1.blockIdx].data + b1.offset + OneBam::QnameOffset), + (char*)(b2.blockThread->blockArr[b2.blockIdx].data + b2.offset + OneBam::QnameOffset), std::min(b1.qnameLen, b2.qnameLen)); + if (cmp == 0) + return b1.qnameLen < b2.qnameLen; + return cmp < 0; + }); + } + +#if 0 + if (tid == 0) + spdlog::info("After: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos, + arr[7].pos, arr[8].pos, arr[9].pos); +#endif + +} + +// 多线程压缩 +static void mtCompressBlock(void* data, long idx, int tid) { + FirstPipeArg &p = *(FirstPipeArg *)data; + auto& barr = p.sortedBamArr; + auto& t = p.taskArr[idx]; + auto& buf = p.threadBuf[tid]; + + uint8_t block[SINGLE_BLOCK_SIZE] = {0}; + uint8_t compressBlock[SINGLE_BLOCK_SIZE] = {0}; + int curAddr = 0; + for (int i = t.idx; i < t.idx + t.num; ++i) { + // copy data + memccpy(&block[curAddr], barr[i]->blockThread->blockArr[barr[i]->blockIdx].data + barr[i]->offset, 1, barr[i]->bamLen); + curAddr += barr[i]->bamLen; + } + + size_t dlen = BGZF_MAX_BLOCK_SIZE; + // spdlog::info("len: {}", curAddr); + bgzfCompress(block, &dlen, compressBlock, curAddr, -1); } /* 将gz block进行解压,并进行线程内排序 */ @@ -249,23 +406,78 @@ static void doFirstPipeUncompress(FirstPipeArg &p) { ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM]; uncompressData.readDataPtr = &readData; - uncompressData.ResetBlockArr(); - uncompressData.ResetBamArr(); #if 1 kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size()); #else kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS); #endif - //spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx, - // uncompressData.blockItemArr[0].blockArr.size()); + + PROF_G_END(uncompress); // 判断是否超过内存阈值 + if (p.uncompressBufFull == 1) { + + spdlog::info("buf full - {}", p.mergOrder); + + // spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, p.singleThreadBytes); + + // sort + PROF_G_BEG(sort); + kt_for(p.numThread, mtInThreadSort, &uncompressData, nsgv::gSortArg.NUM_THREADS); + PROF_G_END(sort); + // merge +#if 1 + auto& bamArr = p.sortedBamArr; + BamHeap heap; + heap.Init(&uncompressData.bamItemArr); + bamArr.resize(heap.Size()); + auto& taskArr = p.taskArr; + taskArr.clear(); + + const OneBam* bam = nullptr; + uint64_t posAll = 0; + int i = 0; + ArrayInterval intv{0, 0}; + uint64_t blockBytes = 0; + PROF_G_BEG(merge); + while ((bam = heap.Pop()) != nullptr) { + // posAll += bam->pos; + //spdlog::info("pos: {}", bam->pos); + bamArr[i++] = bam; + blockBytes += bam->bamLen; + if (blockBytes >= SINGLE_BLOCK_SIZE) { + intv.num = i - 1 - intv.idx; + taskArr.push_back(intv); + intv.idx = i - 1; + blockBytes = bam->bamLen; + } + } + PROF_G_END(merge); + spdlog::info("task size: {} {}", taskArr.size(), bamArr.size()); + if (i - 1 > intv.idx) { + intv.num = i - 1 - intv.idx; + taskArr.push_back(intv); + } + + // spdlog::info("pos all: {}", posAll); +#endif + // compress + PROF_G_BEG(compress); + kt_for(p.numThread, mtCompressBlock, &p, taskArr.size()); + + uncompressData.ResetBlockArr(); + uncompressData.ResetBamArr(); + p.uncompressBufFull = 0; + p.mergOrder++; + for(auto &buf : p.threadBuf) { + buf.curLen = 0; + } + PROF_G_END(compress); + } // auto &bam = uncompressData.bamItemArr[0].bamArr.back(); // string qname(bam.qnameAddr, bam.qnameLen); // spdlog::info("bam name:{}", qname); - - PROF_G_END(uncompress); } /* FirstPipe step-2 并行解压gz blocks*/ @@ -279,7 +491,6 @@ static void *firstPipeUncompress(void *data) { // self dependency yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); - PROF_G_BEG(uncompress); if (p.readFinish) { while (p.uncompressOrder < p.readOrder) { yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); @@ -291,7 +502,6 @@ static void *firstPipeUncompress(void *data) { } doFirstPipeUncompress(p); - PROF_G_END(uncompress); // update status yarn::CONSUME_SIGNAL(p.readSig); @@ -316,12 +526,12 @@ void doFirstPipeMergeSort(FirstPipeArg &p) { size_t curBlockBamMemSize = blockNum * SINGLE_BLOCK_SIZE + bamNum * 32; /* 内存使用量达到阈值后,进行排序 */ if (curBlockBamMemSize > nsgv::gSortArg.MAX_MEM / 2) { - uncompressData.ResetBlockArr(); - uncompressData.ResetBamArr(); + //uncompressData.ResetBlockArr(); + //uncompressData.ResetBamArr(); } //spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockBytes, bamNum * 32); - spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32); + // spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32); PROF_G_END(sort); } @@ -359,12 +569,18 @@ static void bamSortFirstPipe() { /* set up*/ FirstPipeArg firstPipeArg; firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS; + firstPipeArg.singleThreadBytes = nsgv::gSortArg.MAX_MEM / firstPipeArg.numThread; + spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, firstPipeArg.singleThreadBytes); const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread; for (int i = 0; i startAddrArr; // 存放每个block的起始地址 ReadData() { } @@ -39,19 +39,49 @@ struct ReadData { blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); } }; -/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/ + +class ThreadBlockArr; +/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序,用来排序的*/ struct OneBam { + // uint16_t bamLen = 0; + // uint16_t qnameLen = 0; + // uint32_t tid = 0; + // char *qnameAddr = 0; // qname的地址 + // uint64_t pos = 0; // mapping 位置 + // uint8_t *addr = 0; // 地址 + static constexpr int QnameOffset = 36; // 距离该sam记录开头地址的偏移量 + int blockIdx = 0; // 在哪个block里 uint16_t bamLen = 0; - uint16_t qnameLen = 0; - uint32_t tid = 0; - char *qnameAddr = 0; // qname的地址 - uint64_t pos = 0; // mapping 位置 - uint8_t *addr = 0; // 地址 + uint16_t qnameLen; // 序列名字长度 + uint32_t offset = 0; // 距离首地址的偏移量 + uint32_t tid = 0; // 比对到的染色体 + uint64_t pos = 0; // mapping 位置 + ThreadBlockArr* blockThread; + // for test + // bam1_t b; }; + + struct ThreadBamArr { vector bamArr; int curIdx = 0; // + OneBam &add() { + if (curIdx < bamArr.size()) + return bamArr[curIdx++]; + else { +#if 0 + bamArr.resize((bamArr.size() + 1) << 1); + return bamArr[curIdx++]; + +#else + bamArr.push_back(OneBam()); + curIdx++; + return bamArr.back(); +#endif + } + } + void clear() { curIdx = 0; } }; struct OneBlock { @@ -63,39 +93,62 @@ struct OneBlock { struct ThreadBlockArr { vector blockArr; // 解压后的数据 int curIdx = 0; // 当前解压数据对应的vector的索引 + OneBlock& add() { + if (curIdx < blockArr.size()) + return blockArr[curIdx++]; + else { +#if 0 + blockArr.resize((blockArr.size() + 1) << 1); + return blockArr[curIdx++]; + +#else + blockArr.push_back(OneBlock()); + curIdx++; + return blockArr.back(); +#endif + } + } + void add(int num) { + if (curIdx + num > blockArr.size()) { + blockArr.resize(curIdx + num); + } + } + + void clear() { curIdx = 0; } }; + +class FirstPipeArg; + +// 第一阶段的解压、排序、归并、压缩 struct UncompressData { vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 vector bamItemArr; // 每个thread一个,用来保存解压后的bam数据 ReadData *readDataPtr = nullptr; // 读取数据的指针 + FirstPipeArg* father = nullptr; // 保存父参数的指针,可能更新一些状态 UncompressData() { } UncompressData(int numThread) { Resize(numThread); } UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); } void Resize(int numThread) { - blockItemArr.resize(numThread); - bamItemArr.resize(numThread); - for (int i = 0; i < numThread; ++i) { - blockItemArr[i].blockArr.reserve(128); - } + Resize(numThread, 128); } void Resize(int numThread, int vecInitSize) { blockItemArr.resize(numThread); bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(vecInitSize); + bamItemArr[i].bamArr.reserve(vecInitSize * 192); } } void ResetBlockArr() { for (int i = 0; i < blockItemArr.size(); ++i) { - blockItemArr[i].curIdx = 0; + blockItemArr[i].clear(); } } void ResetBamArr() { for (int i = 0; i < bamItemArr.size(); ++i) { - // bamItemArr[i].curIdx = 0; - bamItemArr[i].bamArr.clear(); + bamItemArr[i].clear(); } } }; @@ -111,6 +164,7 @@ struct BlockGreaterThan{ return a.block->blockId > b.block->blockId; } }; + /* 用来排序 */ struct BlockHeap { vector *arr2d; @@ -169,6 +223,69 @@ struct BlockHeap { } }; +/* bam 排序堆 */ +struct BamArrIdIdx { + int arrId = 0; + size_t arrIdx = 0; // 下一个待读入数据的idx + const OneBam* bam = nullptr; +}; + +struct BamGreaterThan { + bool operator()(const BamArrIdIdx& a, const BamArrIdIdx& b) const { return a.bam->pos > b.bam->pos; } +}; + +/* 用来排序 bam*/ +template +struct BamHeap { + vector* arr2d; + priority_queue, GreaterThan> minHeap; + size_t popNum = 0; + + int Init(vector* _arr2d) { + arr2d = _arr2d; + if (arr2d == nullptr) { + return -1; + } + for (int i = 0; i < arr2d->size(); ++i) { + auto& v = (*arr2d)[i]; + if (v.curIdx > 0) { + minHeap.push({i, 1, &v.bamArr[0]}); + } + } + return 0; + } + + const OneBam* Pop() { + const OneBam* ret = nullptr; + if (!minHeap.empty()) { + auto minVal = minHeap.top(); + minHeap.pop(); + ++popNum; + ret = minVal.bam; + auto& v = (*arr2d)[minVal.arrId]; + if (v.curIdx > minVal.arrIdx) { + minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.bamArr[minVal.arrIdx]}); + } + } + return ret; + } + + size_t Size() { + size_t len = 0; + if (arr2d != nullptr) { + for (auto& v : *arr2d) { + len += v.curIdx; + } + } + return len - popNum; + } +}; + +struct ArrayInterval { + int idx; + int num; +}; + /* for step-3 serial merge blocks and sort them */ struct MergeSortData { @@ -182,13 +299,16 @@ struct MergeSortData { /* 第一阶段的多线程流水线参数 */ struct FirstPipeArg { static const int READ_BUF_NUM = 2; // 读入的buf数量 - static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量 + static const int UNCOMPRESS_BUF_NUM = 1; // 解压的buf数量 + static const int COMPRESS_BUF_NUM = 2; // merge之后,每个线程缓冲区的数量 int numThread = 0; // 线程数 + uint64_t singleThreadBytes = 0; // 单线程开辟的内存字节上限 uint64_t readOrder = 0; // 读取文件 uint64_t uncompressOrder = 0; // 并行解压gz block uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 + uint64_t mergOrder = 0; // 用来给中间文件编号 volatile int readFinish = 0; volatile int uncompressFinish = 0; @@ -199,12 +319,18 @@ struct FirstPipeArg { ReadData readData[READ_BUF_NUM]; UncompressData uncompressData[UNCOMPRESS_BUF_NUM]; + vector sortedBamArr; + vector taskArr; + vector threadBuf; MergeSortData mergeSortData; FirstPipeArg() { readSig = yarn::NEW_LOCK(0); uncompressSig = yarn::NEW_LOCK(0); + for (int i = 0; i < UNCOMPRESS_BUF_NUM; ++i) { + uncompressData[i].father = this; + } } }; diff --git a/src/sort/sort_impl.cpp b/src/sort/sort_impl.cpp index ff2a4c1..44c8aaa 100644 --- a/src/sort/sort_impl.cpp +++ b/src/sort/sort_impl.cpp @@ -5,3 +5,37 @@ #include "sort_impl.h" +int ks_radixsort(size_t n, OneBam* buf, const sam_hdr_t* h, int tid) { + if (tid == 0) spdlog::info("sort in thread, {} {}", tid, n); + + int curr = 0, ret = -1; + ssize_t i; + OneBam *buf_ar2[2], *bam_a, *bam_b; + uint64_t max_pos = 1; + uint32_t max_tid = 1, tid_bytes = 0, pos_bytes = 0, byte = 0; + uint32_t tid_shift_l, tid_shift_r; + int nref = sam_hdr_nref(h); + + //if (tid == 0) + //spdlog::info("Before: {} {} {} {} {} {} {} {} {} {}", buf[0].pos, buf[1].pos, buf[2].pos, buf[3].pos, buf[4].pos, buf[5].pos, buf[6].pos, + //buf[7].pos, buf[8].pos, buf[9].pos); + // spdlog::info("name: {}", std::string(buf[0].qnameAddr, buf[0].qnameLen)); + // spdlog::info("name addr: {}", (uint64_t)buf[0].qnameLen); + // std::sort(buf, buf + n, [](const OneBam& b1, const OneBam& b2) { return b1.pos < b2.pos; }); + std::sort(buf, buf + n, [](const OneBam& b1, const OneBam& b2) { + int cmp = 0; + strncmp((char*)(b1.blockThread->blockArr[b1.blockIdx].data + b1.offset + OneBam::QnameOffset), + (char*)(b2.blockThread->blockArr[b2.blockIdx].data + b2.offset + OneBam::QnameOffset), std::min(b1.qnameLen, b2.qnameLen)); + if (cmp == 0) + return b1.qnameLen < b2.qnameLen; + return cmp < 0; + }); + if (tid == 0) { + //std::string s1 = (char*)(buf[0].blockThread->blockArr[buf[0].blockIdx].data + buf[0].offset + OneBam::QnameOffset); + //std::string s2 = (char*)(buf[1].blockThread->blockArr[buf[1].blockIdx].data + buf[1].offset + OneBam::QnameOffset); + //spdlog::info("After: {} {} {} {} {} {} {} {} {} {}", s1, s2, buf[2].pos, buf[3].pos, buf[4].pos, buf[5].pos, buf[6].pos, buf[7].pos, + //buf[8].pos, buf[9].pos); + } + + return 0; +} \ No newline at end of file diff --git a/src/sort/sort_impl.h b/src/sort/sort_impl.h index 6948de2..c5eb89b 100644 --- a/src/sort/sort_impl.h +++ b/src/sort/sort_impl.h @@ -3,6 +3,8 @@ #include #include +#include "sort.h" + // Struct which contains the sorting key for TemplateCoordinate sort. struct TemplateCoordinateKey { int tid1; @@ -27,4 +29,6 @@ struct BamSortTag { uint8_t pos_tid[12]; TemplateCoordinateKey *key; } u; -}; \ No newline at end of file +}; + +int ks_radixsort(size_t n, OneBam* buf, const sam_hdr_t* h, int tid); \ No newline at end of file diff --git a/src/util/profiling.cpp b/src/util/profiling.cpp index 61a3798..76a14fa 100644 --- a/src/util/profiling.cpp +++ b/src/util/profiling.cpp @@ -57,6 +57,8 @@ int displayProfiling(int nthread) { PRINT_GP(parse_block); PRINT_GP(uncompress); PRINT_GP(sort); + PRINT_GP(merge); + PRINT_GP(compress); PRINT_GP(write_mid); PRINT_GP(read_mid); PRINT_GP(write_final); diff --git a/src/util/profiling.h b/src/util/profiling.h index 42e37b9..820f305 100644 --- a/src/util/profiling.h +++ b/src/util/profiling.h @@ -52,6 +52,8 @@ enum { GP_parse_block, GP_uncompress, GP_sort, + GP_merge, + GP_compress, GP_write_mid, GP_read_mid, GP_write_final