196 lines
5.4 KiB
C
196 lines
5.4 KiB
C
|
|
#pragma once
|
|||
|
|
|
|||
|
|
#include <inttypes.h>
|
|||
|
|
|
|||
|
|
#include <queue>
|
|||
|
|
#include <vector>
|
|||
|
|
|
|||
|
|
#include "const_val.h"
|
|||
|
|
#include "sam_io.h"
|
|||
|
|
#include "util/yarn.h"
|
|||
|
|
|
|||
|
|
using std::priority_queue;
|
|||
|
|
using std::vector;
|
|||
|
|
|
|||
|
|
/* for step-1 read data from bam file */
|
|||
|
|
struct ReadData {
|
|||
|
|
uint8_t *dataBuf = nullptr;
|
|||
|
|
uint8_t *blockBuf = nullptr;
|
|||
|
|
int readBufSize = 0; // 读入的buf大小
|
|||
|
|
vector<uint8_t *> startAddrArr; // 存放每个block的起始地址
|
|||
|
|
ReadData() { }
|
|||
|
|
ReadData(int readBufSize_) {
|
|||
|
|
readBufSize = readBufSize_;
|
|||
|
|
dataBuf = (uint8_t *)malloc(readBufSize);
|
|||
|
|
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
|||
|
|
}
|
|||
|
|
~ReadData() {
|
|||
|
|
if (dataBuf) free(dataBuf);
|
|||
|
|
if (blockBuf) free(blockBuf);
|
|||
|
|
}
|
|||
|
|
void Resize(int readBufSize_) {
|
|||
|
|
if (dataBuf) free(dataBuf);
|
|||
|
|
if (blockBuf) free(blockBuf);
|
|||
|
|
readBufSize = readBufSize_;
|
|||
|
|
dataBuf = (uint8_t *)malloc(readBufSize);
|
|||
|
|
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
/* for step-2 parallel uncompress gz blocks */
|
|||
|
|
struct OneBlock {
|
|||
|
|
uint8_t data[SINGLE_BLOCK_SIZE];
|
|||
|
|
uint32_t blockLen = 0; // 解压后的数据长度
|
|||
|
|
uint64_t blockId = 0; // 按照顺序排列的block ID
|
|||
|
|
uint64_t bamNum = 0; // 解压后的bam数量
|
|||
|
|
};
|
|||
|
|
struct ThreadBlockArr {
|
|||
|
|
vector<OneBlock> blockArr; // 解压后的数据
|
|||
|
|
int curIdx = 0; // 当前解压数据对应的vector的索引
|
|||
|
|
};
|
|||
|
|
struct UncompressData {
|
|||
|
|
vector<ThreadBlockArr> blockItemArr; // 每个thread一个,用来保存解压后的block数据
|
|||
|
|
ReadData *readDataPtr = nullptr; // 读取数据的指针
|
|||
|
|
|
|||
|
|
UncompressData() { }
|
|||
|
|
UncompressData(int numThread) { Resize(numThread); }
|
|||
|
|
UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); }
|
|||
|
|
void Resize(int numThread) {
|
|||
|
|
blockItemArr.resize(numThread);
|
|||
|
|
for (int i = 0; i < numThread; ++i) {
|
|||
|
|
blockItemArr[i].blockArr.reserve(128);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
void Resize(int numThread, int vecInitSize) {
|
|||
|
|
blockItemArr.resize(numThread);
|
|||
|
|
for (int i = 0; i < numThread; ++i) {
|
|||
|
|
blockItemArr[i].blockArr.reserve(vecInitSize);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
void ResetBlockArr() {
|
|||
|
|
for (int i = 0; i < blockItemArr.size(); ++i) {
|
|||
|
|
blockItemArr[i].curIdx = 0;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
/* block 排序堆 */
|
|||
|
|
struct BlockArrIdIdx {
|
|||
|
|
int arrId = 0;
|
|||
|
|
size_t arrIdx = 0; // 下一个待读入数据的idx
|
|||
|
|
const OneBlock *block = nullptr;
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
struct BlockGreaterThan{
|
|||
|
|
bool operator()(const BlockArrIdIdx &a, const BlockArrIdIdx &b) const {
|
|||
|
|
return a.block->blockId > b.block->blockId;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
/* 用来排序 */
|
|||
|
|
struct BlockHeap {
|
|||
|
|
vector<ThreadBlockArr> *arr2d;
|
|||
|
|
priority_queue<BlockArrIdIdx, vector<BlockArrIdIdx>, BlockGreaterThan> minHeap;
|
|||
|
|
size_t popNum = 0;
|
|||
|
|
|
|||
|
|
int Init(vector<ThreadBlockArr> *_arr2d) {
|
|||
|
|
arr2d = _arr2d;
|
|||
|
|
if (arr2d == nullptr) {
|
|||
|
|
return -1;
|
|||
|
|
}
|
|||
|
|
for (int i = 0; i < arr2d->size(); ++i) {
|
|||
|
|
auto &v = (*arr2d)[i];
|
|||
|
|
if (v.curIdx > 0) {
|
|||
|
|
minHeap.push({i, 1, &v.blockArr[0]});
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
const OneBlock *Pop() {
|
|||
|
|
const OneBlock *ret = nullptr;
|
|||
|
|
if (!minHeap.empty()) {
|
|||
|
|
auto minVal = minHeap.top();
|
|||
|
|
minHeap.pop();
|
|||
|
|
++popNum;
|
|||
|
|
ret = minVal.block;
|
|||
|
|
auto &v = (*arr2d)[minVal.arrId];
|
|||
|
|
if (v.curIdx > minVal.arrIdx) {
|
|||
|
|
minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.blockArr[minVal.arrIdx]});
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return ret;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
size_t AllBlockBytes() {
|
|||
|
|
size_t bytes = 0;
|
|||
|
|
if (arr2d != nullptr) {
|
|||
|
|
for (auto &v : *arr2d) {
|
|||
|
|
for (int i = 0; i < v.curIdx; ++i) {
|
|||
|
|
bytes += v.blockArr[i].blockLen;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return bytes;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
size_t Size() {
|
|||
|
|
size_t len = 0;
|
|||
|
|
if (arr2d != nullptr) {
|
|||
|
|
for (auto &v : *arr2d) {
|
|||
|
|
len += v.curIdx;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return len - popNum;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
/* for step-3 serial merge blocks and sort them */
|
|||
|
|
/** Locates one BAM record inside the decompressed data. */
struct BamPointer {
    uint64_t offset{0};  // address offset
    uint32_t bamLen{0};  // length of the record (presumably in bytes -- TODO confirm)
};
|
|||
|
|
|
|||
|
|
struct BamPtrArr {
|
|||
|
|
vector<BamPointer> bamArr;
|
|||
|
|
int curIdx = 0; //
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
struct MergeSortData {
|
|||
|
|
DataBuffer bamData; // 用来保存解压后的数据
|
|||
|
|
BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度
|
|||
|
|
MergeSortData() {
|
|||
|
|
bamPtrArr.bamArr.reserve(128);
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
/* 第一阶段的多线程流水线参数 */
|
|||
|
|
struct FirstPipeArg {
|
|||
|
|
static const int READ_BUF_NUM = 2; // 读入的buf数量
|
|||
|
|
static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量
|
|||
|
|
|
|||
|
|
int numThread = 0; // 线程数
|
|||
|
|
|
|||
|
|
uint64_t readOrder = 0; // 读取文件
|
|||
|
|
uint64_t uncompressOrder = 0; // 并行解压gz block
|
|||
|
|
uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序
|
|||
|
|
|
|||
|
|
volatile int readFinish = 0;
|
|||
|
|
volatile int uncompressFinish = 0;
|
|||
|
|
|
|||
|
|
yarn::lock_t *readSig;
|
|||
|
|
yarn::lock_t *uncompressSig;
|
|||
|
|
|
|||
|
|
ReadData readData[READ_BUF_NUM];
|
|||
|
|
UncompressData uncompressData[UNCOMPRESS_BUF_NUM];
|
|||
|
|
MergeSortData mergeSortData;
|
|||
|
|
|
|||
|
|
FirstPipeArg()
|
|||
|
|
{
|
|||
|
|
readSig = yarn::NEW_LOCK(0);
|
|||
|
|
uncompressSig = yarn::NEW_LOCK(0);
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
/* 第二阶段的多线程流水线参数 */
|
|||
|
|
/** Arguments for the second-stage multithreaded pipeline; currently an empty placeholder. */
struct PipeSecondArg {
    /* data */
};
|