#pragma once #include #include #include #include "const_val.h" #include "sam_io.h" #include "util/yarn.h" using std::priority_queue; using std::vector; /* for step-1 read data from bam file */ struct ReadData { uint8_t *dataBuf = nullptr; uint8_t *blockBuf = nullptr; int readBufSize = 0; // 读入的buf大小 vector startAddrArr; // 存放每个block的起始地址 ReadData() { } ReadData(int readBufSize_) { readBufSize = readBufSize_; dataBuf = (uint8_t *)malloc(readBufSize); blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); } ~ReadData() { if (dataBuf) free(dataBuf); if (blockBuf) free(blockBuf); } void Resize(int readBufSize_) { if (dataBuf) free(dataBuf); if (blockBuf) free(blockBuf); readBufSize = readBufSize_; dataBuf = (uint8_t *)malloc(readBufSize); blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); } }; /* for step-2 parallel uncompress gz blocks */ struct OneBam { uint16_t bamLen = 0; uint16_t qnameLen = 0; uint32_t tid = 0; char *qnameAddr = 0; // qname的地址 uint64_t pos = 0; // mapping 位置 uint8_t *addr = 0; // 地址 }; struct ThreadBamArr { vector bamArr; int curIdx = 0; // }; struct OneBlock { uint8_t data[SINGLE_BLOCK_SIZE]; uint32_t blockLen = 0; // 解压后的数据长度 uint64_t blockId = 0; // 按照顺序排列的block ID uint64_t bamNum = 0; // 解压后的bam数量 }; struct ThreadBlockArr { vector blockArr; // 解压后的数据 int curIdx = 0; // 当前解压数据对应的vector的索引 }; struct UncompressData { vector blockItemArr; // 每个thread一个,用来保存解压后的block数据 vector bamItemArr; // 每个thread一个,用来保存解压后的bam数据 ReadData *readDataPtr = nullptr; // 读取数据的指针 UncompressData() { } UncompressData(int numThread) { Resize(numThread); } UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); } void Resize(int numThread) { blockItemArr.resize(numThread); bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(128); } } void Resize(int numThread, int vecInitSize) { blockItemArr.resize(numThread); bamItemArr.resize(numThread); for (int i = 0; i < numThread; ++i) { blockItemArr[i].blockArr.reserve(vecInitSize); } } void ResetBlockArr() { for (int i = 0; i < blockItemArr.size(); ++i) { blockItemArr[i].curIdx = 0; } } void ResetBamArr() { for (int i = 0; i < bamItemArr.size(); ++i) { // bamItemArr[i].curIdx = 0; bamItemArr[i].bamArr.clear(); } } }; /* block 排序堆 */ struct BlockArrIdIdx { int arrId = 0; size_t arrIdx = 0; // 下一个待读入数据的idx const OneBlock *block = nullptr; }; struct BlockGreaterThan{ bool operator()(const BlockArrIdIdx &a, const BlockArrIdIdx &b) const { return a.block->blockId > b.block->blockId; } }; /* 用来排序 */ struct BlockHeap { vector *arr2d; priority_queue, BlockGreaterThan> minHeap; size_t popNum = 0; int Init(vector *_arr2d) { arr2d = _arr2d; if (arr2d == nullptr) { return -1; } for (int i = 0; i < arr2d->size(); ++i) { auto &v = (*arr2d)[i]; if (v.curIdx > 0) { minHeap.push({i, 1, &v.blockArr[0]}); } } return 0; } const OneBlock *Pop() { const OneBlock *ret = nullptr; if (!minHeap.empty()) { auto minVal = minHeap.top(); minHeap.pop(); ++popNum; ret = minVal.block; auto &v = (*arr2d)[minVal.arrId]; if (v.curIdx > minVal.arrIdx) { minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.blockArr[minVal.arrIdx]}); } } return ret; } size_t AllBlockBytes() { size_t bytes = 0; if (arr2d != nullptr) { for (auto &v : *arr2d) { for (int i = 0; i < v.curIdx; ++i) { bytes += v.blockArr[i].blockLen; } } } return bytes; } size_t Size() { size_t len = 0; if (arr2d != nullptr) { for (auto &v : *arr2d) { len += v.curIdx; } } return len - popNum; } }; /* for step-3 serial merge blocks and sort them */ struct MergeSortData { DataBuffer bamData; // 用来保存解压后的数据 // BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度 MergeSortData() { // bamPtrArr.bamArr.reserve(128); } }; /* 第一阶段的多线程流水线参数 */ struct FirstPipeArg { static const int READ_BUF_NUM = 2; // 读入的buf数量 static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量 int numThread = 0; // 线程数 uint64_t readOrder = 0; // 读取文件 uint64_t uncompressOrder = 0; // 并行解压gz block uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 volatile int readFinish = 0; volatile int uncompressFinish = 0; yarn::lock_t *readSig; yarn::lock_t *uncompressSig; ReadData readData[READ_BUF_NUM]; UncompressData uncompressData[UNCOMPRESS_BUF_NUM]; MergeSortData mergeSortData; FirstPipeArg() { readSig = yarn::NEW_LOCK(0); uncompressSig = yarn::NEW_LOCK(0); } }; /* 第二阶段的多线程流水线参数 */ struct PipeSecondArg { /* data */ };