diff --git a/CMakeLists.txt b/CMakeLists.txt index 07431d6..11472b5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_BUILD_TYPE Debug) -set(CMAKE_BUILD_TYPE Release) +# set(CMAKE_BUILD_TYPE Release) add_definitions(-DSHOW_PERF=1) add_definitions(-DSPDLOG_COMPILED_LIB) add_subdirectory(src) \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 97d417d..cfa5b54 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,8 +17,9 @@ namespace nsgv { extern SortArg gSortArg; }; -int doSort(); +extern int doSort(); // sort 函数声明 +// 获取当前时间戳 string getCurrentTimeStr() { time_t time_val; struct tm *at; @@ -29,9 +30,10 @@ string getCurrentTimeStr() { return string(now); } +// 入口函数 int main(int argc, char *argv[]) { // init log - spdlog::set_default_logger(spdlog::stderr_color_st("fastdup")); + spdlog::set_default_logger(spdlog::stderr_color_st("fastsort")); spdlog::cfg::load_env_levels(); // init arg parser @@ -138,12 +140,15 @@ int main(int argc, char *argv[]) { program.get("--index-format") == "BAI" ? nsmd::IndexFormat::BAI : nsmd::IndexFormat::CSI; nsgv::gSortArg.SORT_COORIDINATE = program.get("--sort-type") == "COORDINATE"; + // 三种排序方式 std::map query_name_args = { {"NATURAL,", nsmd::QueryNameType::NATURAL}, {"ASCII", nsmd::QueryNameType::ASCII}, {"PICARD", nsmd::QueryNameType::PICARD}}; nsgv::gSortArg.QUERY_NAME_TYPE = query_name_args[program.get("--query-name")]; + + // 设置缓存大小 string maxMemStr = program.get("--mem"); { char* q; @@ -163,8 +168,25 @@ int main(int argc, char *argv[]) { return 1; } +/** + * sort 流程 + * 第一阶段(phase-1): + * 1. 读入bam文件header(要注意如果是sam或者管道输入则不需要解压,如果是bam,需要考虑samtools和gatk的压缩方式不同,gatk不会block对齐) + * 2. phase-1-stage-1: 读入待解压的数据块,直到填满phase-1-stage-1的buffer,读入数据期间分割压缩的block + * 3. phase-1-stage-2: 多线程解压数据块,每个线程解压后放在自己的buf里,乱序解压,线程buf满后,进行线程内的排序 + * 4. phase-1-stage-3: 线程间进行归并排序,然后压缩写入中间文件,清空线程buf,继续读入下一批数据块,直到文件读完 + * 第二阶段(phase-2): + * 1. 打开所有中间文件,并关联上解压线程池 + * 2. phase-2-stage-1:解压中间文件,并进行归并排序,放入buf + * 3. phase-2-stage-2:多线程进行压缩,放入buf + * 4. phase-2-stage-3:将buf写入最终的bam文件 + * 5. 要注意计算index并写入index文件 + */ + spdlog::info("fast bam sort start"); + doSort(); + spdlog::info("fast bam sort end"); displayProfiling(nsgv::gSortArg.NUM_THREADS); diff --git a/src/sort/common_data.h b/src/sort/common_data.h new file mode 100644 index 0000000..cd22fe5 --- /dev/null +++ b/src/sort/common_data.h @@ -0,0 +1,25 @@ +/* + Description: 共用的一些宏定义,全局变量等 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/23 +*/ + +#pragma once + +#include "sam_io.h" +#include "sort_args.h" + +#define PROGRAM_NAME "FastSort" +#define BAM_COMPRESS_RATIAO 5 // 大概5倍压缩比,用来粗略估计解压后需要多少内存空间 + + +namespace nsgv { + +// 全局变量 for bamsort +extern SortArg gSortArg; // 参数 +extern HeaderBuf gInHdr; // 输入文件的header +extern bool gIsBigEndian; +}; // namespace nsgv \ No newline at end of file diff --git a/src/sort/compress.cpp b/src/sort/compress.cpp new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/compress.h b/src/sort/compress.h new file mode 100644 index 0000000..bf11f4a --- /dev/null +++ b/src/sort/compress.h @@ -0,0 +1,10 @@ +/* + Description: 压缩相关的函数 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once \ No newline at end of file diff --git a/src/sort/decompress.cpp b/src/sort/decompress.cpp new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/decompress.h b/src/sort/decompress.h new file mode 100644 index 0000000..dcf5379 --- /dev/null +++ b/src/sort/decompress.h @@ -0,0 +1,10 @@ +/* + Description: 解压相关的函数 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once \ No newline at end of file diff --git a/src/sort/global_vars.cpp b/src/sort/global_vars.cpp new file mode 100644 index 0000000..2bfb8dc --- /dev/null +++ b/src/sort/global_vars.cpp @@ -0,0 +1,5 @@ +#include "global_vars.h" + +namespace nsgv { + +}; \ No newline at end of file diff --git a/src/sort/global_vars.h b/src/sort/global_vars.h new file mode 100644 index 0000000..3561aec --- /dev/null +++ b/src/sort/global_vars.h @@ -0,0 +1,19 @@ +/* + Description: 全局变量 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once + +#include "sort_args.h" +#include "sam_io.h" + +namespace nsgv { +extern SortArg gSortArg; // 参数 +extern HeaderBuf gInHdr; // 输入文件的header +extern bool gIsBigEndian; +}; \ No newline at end of file diff --git a/src/sort/phase_1.cpp b/src/sort/phase_1.cpp new file mode 100644 index 0000000..d4c155e --- /dev/null +++ b/src/sort/phase_1.cpp @@ -0,0 +1,52 @@ +/* + Description: 排序第一阶段的并行流水线实现代码 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/23 +*/ + +#include "phase_1.h" + +#include + +#include "common_data.h" +#include "phase_1_read.h" +#include "phase_1_uncompress.h" +#include "phase_1_write.h" +#include "util/profiling.h" + +void phase1Pipeline() { + +#if 1 + /* set up*/ + Phase1PipelineArg phase1Arg; + phase1Arg.numThread = nsgv::gSortArg.NUM_THREADS; + const size_t kReadBufSize = 1L * 1024 * 1024 * phase1Arg.numThread; // 平均每线程1M缓冲区,累加起来,用来读入文件(BAM/SAM)(相对解压之后的缓冲区,大小可以忽略) + + phase1Arg.uncompressBufBytes = nsgv::gSortArg.MAX_MEM * 0.9; // 比最大内存参数小点 + phase1Arg.threadBlocksWrap.Resize(phase1Arg.numThread); // 每个线程的解压block数组初始大小,后续如果不够用会自动扩容 + phase1Arg.singleThreadMemBytes = kReadBufSize * BAM_COMPRESS_RATIAO; + spdlog::info("max mem: {}, uncompress mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, phase1Arg.uncompressBufBytes, phase1Arg.singleThreadMemBytes); + for (int i = 0; i < phase1Arg.READ_BUF_NUM; ++i) { + phase1Arg.readData[i].Resize(kReadBufSize); + }; + + // 根据最大内存参数,计算初始化开辟的空间 + phase1Arg.uncompressData.Resize(phase1Arg.uncompressBufBytes); + + PROF_G_BEG(mid_all); + /* create threads */ + pthread_t tidArr[2]; // 2-stage pipeline + pthread_create(&tidArr[0], NULL, phase1ReadFile, &phase1Arg); + pthread_create(&tidArr[1], NULL, phase1Uncompress, &phase1Arg); + + for (int i = 0; i < 2; ++i) pthread_join(tidArr[i], NULL); + + spdlog::info("all bams num: {}", phase1Arg.bamNum); + + PROF_G_END(mid_all); + +#endif +} \ No newline at end of file diff --git a/src/sort/phase_1.h b/src/sort/phase_1.h new file mode 100644 index 0000000..764eb12 --- /dev/null +++ b/src/sort/phase_1.h @@ -0,0 +1,115 @@ +/* + Description: bam是个外排序过程,整个过程包含两个阶段,这是第一个阶段 + * 第一阶段(phase-1): + * 1. 读入bam文件header(要注意如果是sam或者管道输入则不需要解压) + * 2. phase-1-stage-1: 读入待解压的数据块,直到填满phase-1-stage-1的buffer,读入数据期间分割压缩的block + * 3. phase-1-stage-2: 多线程解压数据块,每个线程解压后放在自己的buf里,乱序解压,线程buf满后,进行线程内的排序 + * 4. phase-1-stage-3: 线程间进行归并排序,然后压缩写入中间文件 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once + +#include + +#include "const_val.h" +#include "sam_io.h" +#include "util/yarn.h" +#include "sort.h" + +#define START_IDX(i, nt, nele) ((i) * (nele) / (nt)) +#define STOP_IDX(i, nt, nele) (((i) + 1) * (nele) / (nt)) + +// 第一阶段用到的数据结构 +// 第一阶段的解压、排序、归并、压缩 +struct UncompressBlocksWrap { + vector threadBlocks; // 每个thread一个,用来保存解压后的block数据 + + vector threadBlockBuf; // 每个线程一个,用来保存解压后的block数据,和上边的threadBlocks重复了,暂时保留,后续优化 + + UncompressBlocksWrap() {} + UncompressBlocksWrap(int numThread) { Resize(numThread); } + UncompressBlocksWrap(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); } + void Resize(int numThread) { Resize(numThread, 128); } + void Resize(int numThread, int vecInitSize) { + threadBlocks.resize(numThread); + threadBlockBuf.resize(numThread); + for (int i = 0; i < numThread; ++i) { threadBlocks[i].blockArr.reserve(vecInitSize); } + for (int i = 0; i < numThread; ++i) { + threadBlocks[i].blockArr.reserve(vecInitSize); + threadBlockBuf[i].allocMem(vecInitSize * SINGLE_BLOCK_SIZE); // 每个线程的解压block数组初始大小,后续如果不够用会自动扩容 + } + } + void ResetBlockArr() { + for (int i = 0; i < threadBlocks.size(); ++i) { + threadBlocks[i].clear(); + threadBlockBuf[i].clear(); + } + } + + uint64_t GetTotalBlockNum() { + uint64_t totalBlockNum = 0; + for (int i = 0; i < threadBlocks.size(); ++i) { + totalBlockNum += threadBlocks[i].curIdx; + } + return totalBlockNum; + } + + uint64_t GetHeapBlockNum() { + uint64_t heapBlockNum = 0; + for (int i = 0; i < threadBlocks.size(); ++i) { + heapBlockNum += threadBlocks[i].blockHeap.size(); + } + return heapBlockNum; + } +}; + +/* 第一阶段的多线程流水线参数 */ +struct Phase1PipelineArg { + static const int READ_BUF_NUM = 2; // 读入的buf数量 + static const int UNCOMPRESS_BUF_NUM = 1; // 解压的buf数量, 只有一个 + static const int COMPRESS_BUF_NUM = 1; // 压缩的buf数量, 只有一个 + static const int WRITE_BUF_NUM = 2; // 写入文件buf数量 + + // common parameters + int numThread = 0; // 线程数 + uint64_t bamNum = 0; // 解压后的bam数量 + uint64_t singleThreadMemBytes = 0; // 单线程开辟的内存字节上限 + uint64_t uncompressBufBytes = 0; // 总的解压缓冲区大小 + uint64_t startBlockId = 0; // 当前轮次起始block id + + // for read-uncompress-parse + uint64_t readOrder = 0; // 读取文件轮次编号,与下边的uncompressOrder对应 + uint64_t uncompressOrder = 0; // 并行解压gz block, 包含排序(缓冲区满之后排序),以及合并之后的压缩 + + volatile int readFinish = 0; + yarn::lock_t* readSig; + yarn::lock_t* uncompressSig; + ReadBuffer readData[READ_BUF_NUM]; // 用来读如数据,双缓冲 + UncompressBlocksWrap threadBlocksWrap; // 每个thread一个,用来保存解压后的block数据 + UncompressBlockBuffer uncompressData; // 所有线程共用一个,串行往这里添加解压后的block数据 + + + // for merge-compress-write + uint64_t compressOrder = 0; // 排序后压缩,这个和下边的writeOder对应,跟上边的order不相关 + uint64_t writeOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 + + volatile int compressFinish = 0; + yarn::lock_t* compressSig; + yarn::lock_t* writeSig; + + Phase1PipelineArg() { + readSig = yarn::NEW_LOCK(0); + uncompressSig = yarn::NEW_LOCK(0); + } +}; + + + + +// 排序第一阶段,并行流水线执行程序 +void phase1Pipeline(); \ No newline at end of file diff --git a/src/sort/phase_1_read.cpp b/src/sort/phase_1_read.cpp new file mode 100644 index 0000000..937d27a --- /dev/null +++ b/src/sort/phase_1_read.cpp @@ -0,0 +1,111 @@ +/* + Description: 第一阶段的读入线程 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/25 +*/ + +#include "phase_1_read.h" + +#include +#include +#include + +#include "common_data.h" +#include "const_val.h" +#include "phase_1.h" +#include "sam_io.h" +#include "sort.h" +#include "util/profiling.h" +#include "util/yarn.h" + +/* 具体执行读取文件操作 */ +/* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */ +static size_t doPhase1ReadFile(Phase1PipelineArg& p, DataBuffer& halfBlock, FILE* fpr) { + ReadBuffer& readData = p.readData[p.readOrder % p.READ_BUF_NUM]; + size_t readState = 0; + size_t curReadPos = 0; + int blockLen = 0; + int maxBlockLen = 0; + + readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr); + if (readState == 0) { + return 0; + } + + readData.startAddrArr.clear(); // 先清空上一次的block起始地址信息,后续重新计算并保存当前buf里每个block的起始地址和长度信息 + + /* 处理上一个不完整的block */ // 需要一个额外的空间来保存上一个不完整的block数据,因为readbuffer里的data和block都会用来解析,而且解析之后都会清空。 + if (halfBlock.readPos > 0) { // 上一轮有剩余 + if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息 + memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos); + halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度 + // spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos); + } + memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos); + curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度 + memcpy(&readData.blockBuf[halfBlock.readPos], readData.dataBuf, curReadPos); // 将不完整的block剩余数据拷贝到curBlock + readData.startAddrArr.push_back(readData.blockBuf); + } + /* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */ + while (curReadPos + BLOCK_HEADER_LENGTH <= readState) { /* 确保能解析block长度 */ + blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1; + if (blockLen > maxBlockLen) { + maxBlockLen = blockLen; + } + if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ + readData.startAddrArr.push_back(&readData.dataBuf[curReadPos]); + curReadPos += blockLen; + } else { + break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ + } + } + /* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */ + halfBlock.readPos = readState - curReadPos; + halfBlock.curLen = blockLen; + if (halfBlock.readPos > 0) { + memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock + } + + // spdlog::info("block num-1: {}", readData.startAddrArr.size()); + // spdlog::info("read order: {}, max block len: {}", p.readOrder, maxBlockLen); + + return readState; +} + +/* phase1ReadFile step-1 读取文件线程 */ +void* phase1ReadFile(void* data) { + Phase1PipelineArg& p = *(Phase1PipelineArg*)data; + /* 1. set up */ + FILE* fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb"); + parseSamHeader(fpr, nsgv::gInHdr); // 解析header,后续需要用到header里的信息来解析bam数据 + size_t fileSize = 0; + DataBuffer halfBlock(SINGLE_BLOCK_SIZE); + + /* 2. do the work */ + while (true) { + // self dependency + yarn::DEPENDENCY_NOT_TO_BE(p.readSig, p.READ_BUF_NUM); + + PROF_G_BEG(read); + size_t readState = doPhase1ReadFile(p, halfBlock, fpr); + PROF_G_END(read); + + if (readState == 0) { + yarn::SIGNAL_FINISH(p.readSig, p.readFinish); + break; + } + + // update self status + yarn::UPDATE_SIG_ORDER(p.readSig, p.readOrder); + + fileSize += readState; + } + + spdlog::info("read file order: {}, file size: {}", p.readOrder, fileSize); + /* 3. clean up */ + fclose(fpr); + return nullptr; +} \ No newline at end of file diff --git a/src/sort/phase_1_read.h b/src/sort/phase_1_read.h new file mode 100644 index 0000000..cb2ebce --- /dev/null +++ b/src/sort/phase_1_read.h @@ -0,0 +1,13 @@ +/* + Description: 第一阶段的读入线程 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/25 +*/ + +#pragma once + +/* phase1ReadFile step-1 读取文件线程 */ +void* phase1ReadFile(void* data); \ No newline at end of file diff --git a/src/sort/phase_1_uncompress.cpp b/src/sort/phase_1_uncompress.cpp new file mode 100644 index 0000000..a26f7d8 --- /dev/null +++ b/src/sort/phase_1_uncompress.cpp @@ -0,0 +1,184 @@ +/* + Description: 第一阶段的解压线程 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/25 +*/ + +#include "phase_1_uncompress.h" + +#include +#include +#include +#include + +#include "common_data.h" +#include "const_val.h" +#include "phase_1.h" +#include "sam_io.h" +#include "sort.h" +#include "util/profiling.h" +#include "util/yarn.h" + +/* 多线程解压 */ +static void mtUncompressBlock(void* data, long idx, int tid) { + PROF_T_BEG(mem_copy); + + Phase1PipelineArg& p = *(Phase1PipelineArg*)data; + ReadBuffer & readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; + + auto& blockArr = p.threadBlocksWrap.threadBlocks[tid]; + auto& blockItem = blockArr.add(); + uint8_t* block = readData.startAddrArr[idx]; + + size_t dlen = SINGLE_BLOCK_SIZE; // 65535 + int block_length = unpackInt16(&block[16]) + 1; + uint32_t crc = le_to_u32(block + block_length - 8); + int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); + if (ret != 0) { + spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret); + exit(0); + } + blockItem.blockId = idx + p.startBlockId; + blockItem.blockLen = dlen; + blockArr.blockHeap.push({blockItem.blockId, blockArr.curIdx - 1}); // 解压完成后,将block的id和在block数组里的索引加入堆中,方便后续排序和合并 + +#if 0 + // 放入全局缓冲区 + // spdlog::info("top id: {}, block id: {}", blockArr.blockHeap.top().blockId, p.uncompressData.nextBlockId); + while (blockArr.blockHeap.top().blockId == p.uncompressData.nextBlockId) { + auto& top = blockArr.blockHeap.top(); + // auto& topBlock = blockArr.blockArr[top.blockArrIdx]; + // memcpy(p.uncompressData.dataBuf + p.uncompressData.usedBufSize, topBlock.data, topBlock.blockLen); + // p.uncompressData.startAddrArr.push_back(p.uncompressData.dataBuf + p.uncompressData.usedBufSize); + // p.uncompressData.usedBufSize += topBlock.blockLen; + // p.bamNum += topBlock.bamNum; + + blockArr.blockHeap.pop(); + p.uncompressData.nextBlockId += 1; + p.uncompressData.blockNum += 1; + } +#endif + + PROF_T_END(tid, mem_copy); +} + +// 多线程解压,静态分配任务,此时用idx代替tid,multi-thread uncompress bam blocks +static void mtUncompressBlockBatch(void* data, long idx, int tid) { + PROF_T_BEG(mem_copy); + + Phase1PipelineArg& p = *(Phase1PipelineArg*)data; + ReadBuffer& readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; + + tid = idx; // 静态分配任务,此时用idx代替tid + int startIdx = START_IDX(idx, p.numThread, readData.startAddrArr.size()); + int stopIdx = STOP_IDX(idx, p.numThread, readData.startAddrArr.size()); + + auto &blockBuf = p.threadBlocksWrap.threadBlockBuf[tid]; + + if (stopIdx - startIdx > blockBuf.maxLen / SINGLE_BLOCK_SIZE) { + blockBuf.reAllocMem((stopIdx - startIdx) * SINGLE_BLOCK_SIZE); + } + + for (int i = startIdx; i < stopIdx; ++i) { + uint8_t* block = readData.startAddrArr[i]; + size_t dlen = SINGLE_BLOCK_SIZE; // 65535 + int block_length = unpackInt16(&block[16]) + 1; + uint32_t crc = le_to_u32(block + block_length - 8); + int ret = bgzfUncompress(blockBuf.data + blockBuf.curLen, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc); + if (ret != 0) { + spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret); + exit(0); + } + blockBuf.curLen += dlen; + } + PROF_T_END(tid, mem_copy); +} + +static void mtMemCopy(void* data, long idx, int tid) { + Phase1PipelineArg& p = *(Phase1PipelineArg*)data; + tid = idx; // 静态分配任务,此时用idx代替tid + uint64_t offset = 0; + for (int i = 0; i < tid; ++i) { + offset += p.threadBlocksWrap.threadBlockBuf[i].curLen; + } + memcpy(p.uncompressData.dataBuf + p.uncompressData.usedBufSize + offset, p.threadBlocksWrap.threadBlockBuf[tid].data, + p.threadBlocksWrap.threadBlockBuf[tid].curLen); +} + +/* 将gz block进行解压,并进行线程内排序 */ +static void doPhase1Uncompress(Phase1PipelineArg& p, int finish = 0) { + PROF_G_BEG(uncompress); + uint64_t blockNum = p.readData[p.uncompressOrder % p.READ_BUF_NUM].startAddrArr.size(); + // kt_for(p.numThread, mtUncompressBlock, &p, blockNum); + kt_for(p.numThread, mtUncompressBlockBatch, &p, p.numThread); + // 串行拷贝所有blocks + PROF_G_BEG(mem_copy); +#if 1 + kt_for(p.numThread, mtMemCopy, &p, p.numThread); +#else + for (int i = 0; i < p.numThread; ++i) { + memcpy(p.uncompressData.dataBuf + p.uncompressData.usedBufSize, p.threadBlocksWrap.threadBlockBuf[i].data, p.threadBlocksWrap.threadBlockBuf[i].curLen); + // p.uncompressData.startAddrArr.push_back(p.uncompressData.dataBuf + p.uncompressData.usedBufSize); + p.uncompressData.usedBufSize += p.threadBlocksWrap.threadBlockBuf[i].curLen; + } +#endif + +#if 0 + for (int i = 0; i < 1; ++i) { + auto& blockArr = p.threadBlocksWrap.threadBlocks[i]; + for (int j = 0; j < blockArr.curIdx; ++j) { + auto& blockItem = blockArr.blockArr[j]; + memcpy(p.uncompressData.dataBuf + p.uncompressData.usedBufSize, blockItem.data, blockItem.blockLen); + p.uncompressData.startAddrArr.push_back(p.uncompressData.dataBuf + p.uncompressData.usedBufSize); + p.uncompressData.usedBufSize += blockItem.blockLen; + p.uncompressData.blockNum += 1; + } + } +#endif + + PROF_G_END(mem_copy); + + p.startBlockId += blockNum; + + if (true) { // 缓冲区满了 + spdlog::info("blocks num: {}, left: {}, uncompressed: {}", p.threadBlocksWrap.GetTotalBlockNum(), p.threadBlocksWrap.GetHeapBlockNum(), + p.uncompressData.blockNum); + p.uncompressData.Clear(); + p.threadBlocksWrap.ResetBlockArr(); + p.uncompressData.nextBlockId = p.startBlockId; + p.uncompressData.blockNum = 0; + } + + PROF_G_END(uncompress); +} + +/* phase1Uncompress step-2 解压线程 */ +void* phase1Uncompress(void* data) { + Phase1PipelineArg& p = *(Phase1PipelineArg*)data; + + /* 2. do the work */ + while (true) { + // previous dependency + yarn::DEPENDENCY_NOT_TO_BE(p.readSig, 0); + + if (p.readFinish) { + while (p.uncompressOrder < p.readOrder) { + doPhase1Uncompress(p, 1); + p.uncompressOrder += 1; + } + break; + } + + doPhase1Uncompress(p); + + // update status + yarn::CONSUME_SIGNAL(p.readSig); + p.uncompressOrder += 1; + } + + spdlog::info("uncompress order: {}", p.uncompressOrder); + return nullptr; +} \ No newline at end of file diff --git a/src/sort/phase_1_uncompress.h b/src/sort/phase_1_uncompress.h new file mode 100644 index 0000000..2e608ef --- /dev/null +++ b/src/sort/phase_1_uncompress.h @@ -0,0 +1,13 @@ +/* + Description: 第一阶段的解压线程 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/05/25 +*/ + +#pragma once + +/* phase1Uncompress step-2 解压线程 */ +void* phase1Uncompress(void* data); \ No newline at end of file diff --git a/src/sort/phase_1_write.cpp b/src/sort/phase_1_write.cpp new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/phase_1_write.h b/src/sort/phase_1_write.h new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/phase_2.cpp b/src/sort/phase_2.cpp new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/phase_2.h b/src/sort/phase_2.h new file mode 100644 index 0000000..f562f55 --- /dev/null +++ b/src/sort/phase_2.h @@ -0,0 +1,15 @@ +/* + Description: 排序过程的第二阶段 + * 第二阶段(phase-2): + * 1. 打开所有中间文件,并关联上解压线程池 + * 2. phase-2-stage-1:解压中间文件,并进行归并排序,放入buf + * 3. phase-2-stage-2:多线程进行压缩,放入buf + * 4. phase-2-stage-3:将buf写入最终的bam文件 + * 5. 要注意计算index并写入index文件 + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once \ No newline at end of file diff --git a/src/sort/process_header.cpp b/src/sort/process_header.cpp new file mode 100644 index 0000000..e69de29 diff --git a/src/sort/process_header.h b/src/sort/process_header.h new file mode 100644 index 0000000..ac1a89e --- /dev/null +++ b/src/sort/process_header.h @@ -0,0 +1,10 @@ +/* + Description: 处理bam/sam文件的header,可能需要解压 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/02/08 +*/ + +#pragma once \ No newline at end of file diff --git a/src/sort/sam_io.h b/src/sort/sam_io.h index b13c10f..1063a12 100644 --- a/src/sort/sam_io.h +++ b/src/sort/sam_io.h @@ -4,6 +4,7 @@ #define INCREASE_SIZE (8L * 1024 * 1024) +// 一般用途的数据缓冲区 struct DataBuffer { uint8_t *data; size_t readPos = 0; // 当前读取的位置 @@ -36,6 +37,7 @@ struct DataBuffer { data = (uint8_t *)realloc(data, maxLen); } } + void clear() { curLen = 0; readPos = 0; } }; struct HeaderBuf { diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index 49754e0..3f5c7ec 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -23,6 +23,12 @@ #include "util/profiling.h" #include "util/yarn.h" +#include "process_header.h" +#include "phase_1.h" +#include "phase_2.h" +#include "compress.h" +#include "decompress.h" + using std::string; #define BAM_BLOCK_SIZE 16L * 1024 * 1024 @@ -49,7 +55,7 @@ struct BamSortData { /* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) { - ReadData &readData = p.readData[p.readOrder % p.READ_BUF_NUM]; + ReadBuffer &readData = p.readData[p.readOrder % p.READ_BUF_NUM]; size_t readState = 0; size_t curReadPos = 0; int blockLen = 0; @@ -60,7 +66,7 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE * readData.startAddrArr.clear(); - /* 处理上一个不完整的block */ + /* 处理上一个不完整的block */ // 需要一个额外的空间来保存上一个不完整的block数据,因为readbuffer里的data和block都会用来解析,而且解析之后都会清空。 if (halfBlock.readPos > 0) { // 上一轮有剩余 if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息 memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos); @@ -160,7 +166,7 @@ static int parseBam(uint8_t* addr, bam1_t* b) { c->mtid = le_to_u32(x + 20); c->mpos = le_to_i32(x + 24); c->isize = le_to_i32(x + 28); -/* +#if 0 new_l_data = block_len - 32 + c->l_extranul; if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1) return -4; @@ -198,7 +204,7 @@ static int parseBam(uint8_t* addr, bam1_t* b) { return -4; } } -*/ +#endif return 4 + block_len; } @@ -207,7 +213,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) { PROF_T_BEG(mem_copy); UncompressData& p = *(UncompressData*)data; - ReadData &readData = *p.readDataPtr; + ReadBuffer &readData = *p.readDataPtr; auto &blockItemArr = p.blockItemArr[tid]; auto &bamItemArr = p.bamItemArr[tid]; @@ -263,8 +269,9 @@ static void mtUncompressBlock(void *data, long idx, int tid) { exit(0); } blockItem.bamNum = bamNum; + blockItemArr.bamNum += bamNum; - // 判断是否超过内存阈值 + // 判断是否超过内存阈值,只要有一个超过阈值,就转入下一阶段,进行排序和合并,但是也得把这一轮数据解压处理完 if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE * BLOCK_MEM_FACTOR >= p.father->singleThreadBytes) { p.father->uncompressBufFull = 1; } else { @@ -277,7 +284,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) { static void mtUncompressBlockBatch(void* data, long idx, int tid) { PROF_T_BEG(mem_copy); UncompressData& p = *(UncompressData*)data; - ReadData& readData = *p.readDataPtr; + ReadBuffer& readData = *p.readDataPtr; int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size()); int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size()); @@ -306,6 +313,7 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) { uint32_t bamLen = 0; uint32_t bamNum = 0; +#if 1 /* 解析每个bam */ while (nextBamStart + 4 <= blockItem.blockLen) { bamItemArr.bamArr.push_back(OneBam()); @@ -332,6 +340,7 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) { spdlog::error("Block content does not contain integer number of bam records!"); exit(0); } +#endif blockItem.bamNum = bamNum; } @@ -397,13 +406,17 @@ static void mtCompressBlock(void* data, long idx, int tid) { size_t dlen = BGZF_MAX_BLOCK_SIZE; // spdlog::info("len: {}", curAddr); bgzfCompress(block, &dlen, compressBlock, curAddr, -1); + // 先放到自己线程内部的buf里 + + // 然后在压缩完成之后,检测当前全局压缩序号,如果当前序号的block在自己线程内,则复制过去 + } /* 将gz block进行解压,并进行线程内排序 */ -static void doFirstPipeUncompress(FirstPipeArg &p) { +static void doFirstPipeUncompress(FirstPipeArg &p, int finish = 0) { // return; PROF_G_BEG(uncompress); - ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; + ReadBuffer &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM]; UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM]; uncompressData.readDataPtr = &readData; @@ -415,19 +428,19 @@ static void doFirstPipeUncompress(FirstPipeArg &p) { PROF_G_END(uncompress); - // 判断是否超过内存阈值 - if (p.uncompressBufFull == 1) { + // 判断是否超过内存阈值,只要有一个超过阈值,就转入下一阶段,进行排序和合并 + if (p.uncompressBufFull == 1 || finish) { - spdlog::info("buf full - {}", p.mergOrder); + spdlog::info("buf full - {}", p.mergeOrder); // spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, p.singleThreadBytes); - // sort + // sort,排序的时候应该线程利用率很高,此时不需要跟其他操作如压缩等进行重叠了 PROF_G_BEG(sort); kt_for(p.numThread, mtInThreadSort, &uncompressData, nsgv::gSortArg.NUM_THREADS); PROF_G_END(sort); // merge -#if 1 +#if 0 auto& bamArr = p.sortedBamArr; BamHeap heap; heap.Init(&uncompressData.bamItemArr); @@ -462,19 +475,29 @@ static void doFirstPipeUncompress(FirstPipeArg &p) { // spdlog::info("pos all: {}", posAll); #endif - // compress + // compress,此时并行压缩的多线程利用率应该很高了 PROF_G_BEG(compress); +#if 0 kt_for(p.numThread, mtCompressBlock, &p, taskArr.size()); +#endif + + for (auto &blockItemArr : uncompressData.blockItemArr) { + p.bamNum += blockItemArr.bamNum; + } uncompressData.ResetBlockArr(); uncompressData.ResetBamArr(); p.uncompressBufFull = 0; - p.mergOrder++; + p.mergeOrder++; for(auto &buf : p.threadBuf) { buf.curLen = 0; } PROF_G_END(compress); + + // 压缩完之后应该写入中间文件,这个应该可以overlap } + // 等处理完这些数据之后,进入新一轮的解压和归并排序压缩输出中间文件。 + // auto &bam = uncompressData.bamItemArr[0].bamArr.back(); // string qname(bam.qnameAddr, bam.qnameLen); // spdlog::info("bam name:{}", qname); @@ -494,7 +517,7 @@ static void *firstPipeUncompress(void *data) { if (p.readFinish) { while (p.uncompressOrder < p.readOrder) { yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM); - doFirstPipeUncompress(p); + doFirstPipeUncompress(p, 1); yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder); } yarn::SIGNAL_FINISH(p.uncompressSig, p.uncompressFinish); @@ -513,9 +536,9 @@ static void *firstPipeUncompress(void *data) { } /* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */ -void doFirstPipeMergeSort(FirstPipeArg &p) { +void dofirstPipeWrite(FirstPipeArg &p) { PROF_G_BEG(sort); - UncompressData &uncompressData = p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM]; + UncompressData &uncompressData = p.uncompressData[p.writeOrder % p.UNCOMPRESS_BUF_NUM]; size_t blockNum = 0; for (int i = 0; i < p.numThread; ++i) { @@ -535,32 +558,32 @@ void doFirstPipeMergeSort(FirstPipeArg &p) { PROF_G_END(sort); } -/* FirstPipe step-3 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 */ -static void *firstPipeMergeSort(void *data) { +/* FirstPipe step-3 串行写入中间文件(已经压缩好) */ +static void *firstPipeWrite(void *data) { FirstPipeArg &p = *(FirstPipeArg *)data; while (true) { yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, 0); if (p.uncompressFinish) { - spdlog::info("uncompress finish, cur sort order: {}", p.mergeSortOrder); - while (p.mergeSortOrder < p.uncompressOrder) { - doFirstPipeMergeSort(p); - p.mergeSortOrder += 1; + spdlog::info("uncompress finish, cur sort order: {}", p.writeOrder); + while (p.writeOrder < p.uncompressOrder) { + dofirstPipeWrite(p); + p.writeOrder += 1; } /* 需要检测一下buf中是否还有数据,如果还有,则需要进行排序,可以不输出到中间文件,直接进行second-pipe的归并排序 */ break; } - doFirstPipeMergeSort(p); + dofirstPipeWrite(p); - p.mergeSortOrder += 1; + p.writeOrder += 1; // update status yarn::CONSUME_SIGNAL(p.uncompressSig); } - spdlog::info("merge sort order: {}", p.mergeSortOrder); + spdlog::info("merge sort order: {}", p.writeOrder); return nullptr; } @@ -571,7 +594,7 @@ static void bamSortFirstPipe() { firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS; firstPipeArg.singleThreadBytes = nsgv::gSortArg.MAX_MEM / firstPipeArg.numThread; spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, firstPipeArg.singleThreadBytes); - const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread; + const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread; // 平均每线程1M缓冲区,累加起来,用来读入文件(BAM/SAM)(相对解压之后的缓冲区,大小可以忽略) for (int i = 0; i= 0) { + bam_num++; if (max_bam_len < bamp->l_data) max_bam_len = bamp->l_data; if (bamp->l_data > 1000) { spdlog::info("large record len: {}", bamp->l_data); @@ -673,6 +704,7 @@ int doSort() { } sam_close(inBamFp); spdlog::info("max record len: {}", max_bam_len); + spdlog::info("bam num: {}", bam_num); #endif diff --git a/src/sort/sort.h b/src/sort/sort.h index df09542..69a822b 100644 --- a/src/sort/sort.h +++ b/src/sort/sort.h @@ -12,22 +12,22 @@ using std::priority_queue; using std::vector; -#define START_IDX(i, nt, nele) ((i) * (nele) / (nt)) -#define STOP_IDX(i, nt, nele) (((i)+1) * (nele) / (nt)) +//#define START_IDX(i, nt, nele) ((i) * (nele) / (nt)) +//#define STOP_IDX(i, nt, nele) (((i)+1) * (nele) / (nt)) /* for step-1 read data from bam file */ -struct ReadData { +struct ReadBuffer { uint8_t *dataBuf = nullptr; uint8_t *blockBuf = nullptr; // 用来保存上一轮没能完整读取的block int readBufSize = 0; // 读入的buf大小 vector startAddrArr; // 存放每个block的起始地址 - ReadData() { } - ReadData(int readBufSize_) { + ReadBuffer() { } + ReadBuffer(int readBufSize_) { readBufSize = readBufSize_; dataBuf = (uint8_t *)malloc(readBufSize); blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); } - ~ReadData() { + ~ReadBuffer() { if (dataBuf) free(dataBuf); if (blockBuf) free(blockBuf); } @@ -40,6 +40,46 @@ struct ReadData { } }; +/* 用来串行保存解压后的block数据, 跟ReadBuffer差不多*/ +struct UncompressBlockBuffer { + uint8_t *dataBuf = nullptr; // 用来保存解压后的block数据,串行往这里添加解压后的block数据 + uint8_t *unCompleteBuf = nullptr; // 用来保存上一轮没能完整解析的bam数据(前一部分,需要后续拷贝到dataBuf里) + uint64_t dataBufSize = 0; // 存放的解压之后的block的buf大小 + uint64_t usedBufSize = 0; // 已经使用的buf大小 + uint64_t usedUnCompleteBufSize = 0; // 已经使用的unCompleteBuf大小 + int unCompleteBufSize = 0; // unCompleteBuf里数据的字节数,即上一轮不完整的bam的前半部分大小 + vector startAddrArr; // 存放每个bam的起始地址 + uint64_t nextBlockId = 0; // 下一个需要放入的block id,按照顺序放入dataBuf里 + + uint64_t blockNum = 0; // 解压后的block数量 + + UncompressBlockBuffer() { } + UncompressBlockBuffer(uint64_t dataBufSize_) { + dataBufSize = dataBufSize_; + dataBuf = (uint8_t *)malloc(dataBufSize); + unCompleteBufSize = SINGLE_BLOCK_SIZE; + unCompleteBuf = (uint8_t*)malloc(SINGLE_BLOCK_SIZE); + } + ~UncompressBlockBuffer() { + if (dataBuf) free(dataBuf); + if (unCompleteBuf) free(unCompleteBuf); + } + void Resize(uint64_t dataBufSize_) { + if (dataBuf) free(dataBuf); + if (unCompleteBuf) free(unCompleteBuf); + dataBufSize = dataBufSize_; + unCompleteBufSize = SINGLE_BLOCK_SIZE; + dataBuf = (uint8_t *)malloc(dataBufSize); + unCompleteBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); + } + void Clear() { usedBufSize = 0; usedUnCompleteBufSize = 0; startAddrArr.clear(); } +}; + + +/* */ + + + class ThreadBlockArr; /* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序,用来排序的*/ struct OneBam { @@ -84,15 +124,30 @@ struct ThreadBamArr { void clear() { curIdx = 0; } }; +// 解压后的一个block数据 struct OneBlock { uint8_t data[SINGLE_BLOCK_SIZE]; uint32_t blockLen = 0; // 解压后的数据长度 uint64_t blockId = 0; // 按照顺序排列的block ID uint64_t bamNum = 0; // 解压后的bam数量 }; + +struct BlockIdIdx { + uint64_t blockId = 0; + int blockArrIdx = 0; // 在block数组里的索引 + // 方法一:重载 operator> 供 std::greater 使用 + bool operator>(const BlockIdIdx& other) const { + return blockId > other.blockId; // blockId 小的优先级高(最小堆) + } +}; +// 每个线程一个解压block数组 struct ThreadBlockArr { vector blockArr; // 解压后的数据 int curIdx = 0; // 当前解压数据对应的vector的索引 + uint64_t bamNum = 0; // 解压后的bam数量 + // 最小堆 + std::priority_queue, std::greater> blockHeap; + OneBlock& add() { if (curIdx < blockArr.size()) return blockArr[curIdx++]; @@ -114,7 +169,11 @@ struct ThreadBlockArr { } } - void clear() { curIdx = 0; } + void clear() { + curIdx = 0; + bamNum = 0; + blockHeap = {}; + } }; class FirstPipeArg; @@ -124,7 +183,7 @@ struct UncompressData { vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 vector bamItemArr; // 每个thread一个,用来保存解压后的bam数据 - ReadData *readDataPtr = nullptr; // 读取数据的指针 + ReadBuffer *readDataPtr = nullptr; // 读取数据的指针 FirstPipeArg* father = nullptr; // 保存父参数的指针,可能更新一些状态 UncompressData() { } @@ -138,7 +197,7 @@ struct UncompressData { bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(vecInitSize); - bamItemArr[i].bamArr.reserve(vecInitSize * 192); + bamItemArr[i].bamArr.reserve(vecInitSize * 192); // 192是每个block平均包含的bam数量,单线程解压一个block的数据,平均会得到192条bam记录,应该作为一个参数或者宏定义 } } void ResetBlockArr() { @@ -305,10 +364,13 @@ struct FirstPipeArg { int numThread = 0; // 线程数 uint64_t singleThreadBytes = 0; // 单线程开辟的内存字节上限 + // for test + uint64_t bamNum = 0; // 解压后的bam数量 + uint64_t readOrder = 0; // 读取文件 - uint64_t uncompressOrder = 0; // 并行解压gz block - uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 - uint64_t mergOrder = 0; // 用来给中间文件编号 + uint64_t uncompressOrder = 0; // 并行解压gz block, 包含排序(缓冲区满之后排序),以及合并之后的压缩 + uint64_t writeOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 + uint64_t mergeOrder = 0; // 用来给中间文件编号 volatile int readFinish = 0; volatile int uncompressFinish = 0; @@ -317,8 +379,9 @@ struct FirstPipeArg { yarn::lock_t *readSig; yarn::lock_t *uncompressSig; - ReadData readData[READ_BUF_NUM]; - UncompressData uncompressData[UNCOMPRESS_BUF_NUM]; + ReadBuffer readData[READ_BUF_NUM]; // 用来读如数据,双缓冲 + UncompressData uncompressData[UNCOMPRESS_BUF_NUM]; // 每个线程内保留一些自己的数据,比如解压缩的数据 + UncompressBlockBuffer unCompblockDataBuf; // 所有线程共用一个,串行往这里添加解压后的block数据 vector sortedBamArr; vector taskArr; vector threadBuf; diff --git a/src/sort/sort_impl.h b/src/sort/sort_impl.h index c5eb89b..c9da2d6 100644 --- a/src/sort/sort_impl.h +++ b/src/sort/sort_impl.h @@ -5,6 +5,8 @@ #include "sort.h" +// 好像不需要用到这些了? + // Struct which contains the sorting key for TemplateCoordinate sort. struct TemplateCoordinateKey { int tid1; diff --git a/src/util/profiling.h b/src/util/profiling.h index 820f305..a0fc35c 100644 --- a/src/util/profiling.h +++ b/src/util/profiling.h @@ -29,9 +29,9 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE]; #define PROF_T_BEG_AGAIN(time_name) prof_T_tmp_##time_name = realtimeMsec() #define PROF_T_END(tid, time_name) tprof[TP_##time_name][tid] += realtimeMsec() - prof_T_tmp_##time_name #define PROF_PRINT_BEG(tmp_time) uint64_t tmp_time = realtimeMsec() -#define PROF_PRINT_END(tmp_time) \ - tmp_time = realtimeMsec() - tmp_time; \ - fprintf(stderr, "time %-15s: %0.2lfs\n", #tmp_time, tmp_time * 1.0 / proc_freq) +//#define PROF_PRINT_END(tmp_time) \ +// tmp_time = realtimeMsec() - tmp_time; \ +// fprintf(stderr, "time %-15s: %0.2lfs\n", #tmp_time, tmp_time * 1.0 / proc_freq) #else #define PROF_G_BEG(time_name) #define PROF_G_BEG_AGAIN(time_name) @@ -41,7 +41,24 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE]; #define PROF_T_END(tid, time_name) #endif - +#ifdef SHOW_PERF +#define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec() +#define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec() +#define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time +#define PROF_GP_END(tmp_time) gprof[tmp_time] += RealtimeMsec() - prof_tmp_##tmp_time +#define PROF_TP_END(tmp_time) tprof[tmp_time][thid] += RealtimeMsec() - prof_tmp_##tmp_time +#define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec() +#define PROF_PRINT_END(tmp_time) \ + tmp_time = RealtimeMsec() - tmp_time; \ + fprintf(stderr, "time %-15s: %0.2lfs\n", #tmp_time, tmp_time * 1.0 / proc_freq) +#else +#define PROF_START(tmp_time) +#define PROF_END(result, tmp_time) +#define PROF_GP_END(tmp_time) +#define PROF_TP_END(tmp_time) +#define PROF_PRINT_START(tmp_time) +#define PROF_PRINT_END(tmp_time) +#endif // GLOBAL enum { GP_0 = 0, GP_1, GP_2, GP_3, GP_4, GP_5, GP_6, GP_7, GP_8, GP_9, GP_10 };