Compare commits

...

2 Commits

2 changed files with 75 additions and 5 deletions

View File

@ -160,6 +160,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
uint32_t nextBamStart = 0;
uint32_t bamLen = 0;
uint32_t bamNum = 0;
PROF_G_BEG(mem_copy);
/* 解析每个bam */
while (nextBamStart + 4 <= blockItem.blockLen) {
bamItemArr.bamArr.push_back(OneBam());
@ -186,19 +187,80 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
exit(0);
}
blockItem.bamNum = bamNum;
PROF_G_END(mem_copy);
// 判断是否超过内存阈值
if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
}
}
/* 将gz block进行解压 */
// multi-thread uncompress bam blocks
static void mtUncompressBlockBatch(void* data, long idx, int tid) {
UncompressData& p = *(UncompressData*)data;
ReadData& readData = *p.readDataPtr;
int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
auto& blockItemArr = p.blockItemArr[tid];
if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) {
blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx));
}
for (int i = startIdx; i < stopIdx; ++i) {
auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
uint8_t* block = readData.startAddrArr[i];
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
int block_length = unpackInt16(&block[16]) + 1;
uint32_t crc = le_to_u32(block + block_length - 8);
int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
if (ret != 0) {
spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret);
exit(0);
}
blockItem.blockId = i;
blockItem.blockLen = dlen;
uint32_t nextBamStart = 0;
uint32_t bamLen = 0;
uint32_t bamNum = 0;
PROF_G_BEG(mem_copy);
// memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
/* 解析每个bam */
while (nextBamStart + 4 <= blockItem.blockLen) {
memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
nextBamStart += 4 + bamLen;
// spdlog::info("bam len: {}", bamLen);
++bamNum;
}
blockItem.bamNum = bamNum;
PROF_G_END(mem_copy);
}
}
/* 将gz block进行解压并进行线程内排序 */
static void doFirstPipeUncompress(FirstPipeArg &p) {
// return;
PROF_G_BEG(uncompress);
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
uncompressData.readDataPtr = &readData;
uncompressData.ResetBlockArr();
uncompressData.ResetBamArr();
#if 1
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
#else
kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS);
#endif
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
// uncompressData.blockItemArr[0].blockArr.size());
// 判断是否超过内存阈值
// auto &bam = uncompressData.bamItemArr[0].bamArr.back();
// string qname(bam.qnameAddr, bam.qnameLen);
// spdlog::info("bam name:{}", qname);
@ -217,6 +279,7 @@ static void *firstPipeUncompress(void *data) {
// self dependency
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
PROF_G_BEG(uncompress);
if (p.readFinish) {
while (p.uncompressOrder < p.readOrder) {
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
@ -228,6 +291,7 @@ static void *firstPipeUncompress(void *data) {
}
doFirstPipeUncompress(p);
PROF_G_END(uncompress);
// update status
yarn::CONSUME_SIGNAL(p.readSig);
@ -290,7 +354,7 @@ static void *firstPipeMergeSort(void *data) {
return nullptr;
}
/* 对bam文件进行排序 */
/* 对bam文件进行排序第一阶段线程内排序线程间merge输入到中间bam文件 */
static void bamSortFirstPipe() {
/* set up*/
FirstPipeArg firstPipeArg;
@ -351,16 +415,18 @@ static void samSortFirstPipe() {
}
// entry function
int doSort() {
nsgv::gIsBigEndian = ed_is_big();
#if 1
nsgv::gIsBigEndian = ed_is_big();
bamSortFirstPipe();
spdlog::info("OneBam size: {}", sizeof(OneBam));
//bamSortSerialFirstPipe();
#else
#if 0
/* 打开输入bam文件 */
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
if (!inBamFp) {

View File

@ -12,6 +12,9 @@
using std::priority_queue;
using std::vector;
#define START_IDX(i, nt, nele) ((i) * (nele) / (nt))
#define STOP_IDX(i, nt, nele) (((i)+1) * (nele) / (nt))
/* for step-1 read data from bam file */
struct ReadData {
uint8_t *dataBuf = nullptr;
@ -36,7 +39,7 @@ struct ReadData {
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
}
};
/* for step-2 parallel uncompress gz blocks */
/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/
struct OneBam {
uint16_t bamLen = 0;
uint16_t qnameLen = 0;
@ -189,6 +192,7 @@ struct FirstPipeArg {
volatile int readFinish = 0;
volatile int uncompressFinish = 0;
volatile int uncompressBufFull = 0;
yarn::lock_t *readSig;
yarn::lock_t *uncompressSig;