|
|
|
|
@ -160,6 +160,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
|
|
|
|
|
uint32_t nextBamStart = 0;
|
|
|
|
|
uint32_t bamLen = 0;
|
|
|
|
|
uint32_t bamNum = 0;
|
|
|
|
|
PROF_G_BEG(mem_copy);
|
|
|
|
|
/* 解析每个bam */
|
|
|
|
|
while (nextBamStart + 4 <= blockItem.blockLen) {
|
|
|
|
|
bamItemArr.bamArr.push_back(OneBam());
|
|
|
|
|
@ -186,19 +187,80 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
|
|
|
|
|
exit(0);
|
|
|
|
|
}
|
|
|
|
|
blockItem.bamNum = bamNum;
|
|
|
|
|
|
|
|
|
|
PROF_G_END(mem_copy);
|
|
|
|
|
|
|
|
|
|
// 判断是否超过内存阈值
|
|
|
|
|
if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* 将gz block进行解压 */
|
|
|
|
|
// multi-thread uncompress bam blocks
|
|
|
|
|
static void mtUncompressBlockBatch(void* data, long idx, int tid) {
|
|
|
|
|
UncompressData& p = *(UncompressData*)data;
|
|
|
|
|
ReadData& readData = *p.readDataPtr;
|
|
|
|
|
|
|
|
|
|
int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
|
|
|
|
|
int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
|
|
|
|
|
|
|
|
|
|
auto& blockItemArr = p.blockItemArr[tid];
|
|
|
|
|
|
|
|
|
|
if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) {
|
|
|
|
|
blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int i = startIdx; i < stopIdx; ++i) {
|
|
|
|
|
auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
|
|
|
|
|
|
|
|
|
|
uint8_t* block = readData.startAddrArr[i];
|
|
|
|
|
|
|
|
|
|
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
|
|
|
|
|
int block_length = unpackInt16(&block[16]) + 1;
|
|
|
|
|
uint32_t crc = le_to_u32(block + block_length - 8);
|
|
|
|
|
int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret);
|
|
|
|
|
exit(0);
|
|
|
|
|
}
|
|
|
|
|
blockItem.blockId = i;
|
|
|
|
|
blockItem.blockLen = dlen;
|
|
|
|
|
uint32_t nextBamStart = 0;
|
|
|
|
|
uint32_t bamLen = 0;
|
|
|
|
|
uint32_t bamNum = 0;
|
|
|
|
|
PROF_G_BEG(mem_copy);
|
|
|
|
|
// memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
|
|
|
|
|
/* 解析每个bam */
|
|
|
|
|
while (nextBamStart + 4 <= blockItem.blockLen) {
|
|
|
|
|
memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
|
|
|
|
|
nextBamStart += 4 + bamLen;
|
|
|
|
|
// spdlog::info("bam len: {}", bamLen);
|
|
|
|
|
++bamNum;
|
|
|
|
|
}
|
|
|
|
|
blockItem.bamNum = bamNum;
|
|
|
|
|
PROF_G_END(mem_copy);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* 将gz block进行解压,并进行线程内排序 */
|
|
|
|
|
static void doFirstPipeUncompress(FirstPipeArg &p) {
|
|
|
|
|
// return;
|
|
|
|
|
PROF_G_BEG(uncompress);
|
|
|
|
|
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
|
|
|
|
|
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
|
|
|
|
|
uncompressData.readDataPtr = &readData;
|
|
|
|
|
uncompressData.ResetBlockArr();
|
|
|
|
|
uncompressData.ResetBamArr();
|
|
|
|
|
|
|
|
|
|
#if 1
|
|
|
|
|
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
|
|
|
|
|
#else
|
|
|
|
|
kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS);
|
|
|
|
|
#endif
|
|
|
|
|
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
|
|
|
|
|
// uncompressData.blockItemArr[0].blockArr.size());
|
|
|
|
|
|
|
|
|
|
// 判断是否超过内存阈值
|
|
|
|
|
// auto &bam = uncompressData.bamItemArr[0].bamArr.back();
|
|
|
|
|
// string qname(bam.qnameAddr, bam.qnameLen);
|
|
|
|
|
// spdlog::info("bam name:{}", qname);
|
|
|
|
|
@ -217,6 +279,7 @@ static void *firstPipeUncompress(void *data) {
|
|
|
|
|
// self dependency
|
|
|
|
|
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
|
|
|
|
|
|
|
|
|
|
PROF_G_BEG(uncompress);
|
|
|
|
|
if (p.readFinish) {
|
|
|
|
|
while (p.uncompressOrder < p.readOrder) {
|
|
|
|
|
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
|
|
|
|
|
@ -228,6 +291,7 @@ static void *firstPipeUncompress(void *data) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doFirstPipeUncompress(p);
|
|
|
|
|
PROF_G_END(uncompress);
|
|
|
|
|
|
|
|
|
|
// update status
|
|
|
|
|
yarn::CONSUME_SIGNAL(p.readSig);
|
|
|
|
|
@ -290,7 +354,7 @@ static void *firstPipeMergeSort(void *data) {
|
|
|
|
|
return nullptr;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* 对bam文件进行排序 */
|
|
|
|
|
/* 对bam文件进行排序第一阶段,线程内排序,线程间merge,输入到中间bam文件 */
|
|
|
|
|
static void bamSortFirstPipe() {
|
|
|
|
|
/* set up*/
|
|
|
|
|
FirstPipeArg firstPipeArg;
|
|
|
|
|
@ -351,16 +415,18 @@ static void samSortFirstPipe() {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// entry function
|
|
|
|
|
int doSort() {
|
|
|
|
|
nsgv::gIsBigEndian = ed_is_big();
|
|
|
|
|
|
|
|
|
|
#if 1
|
|
|
|
|
nsgv::gIsBigEndian = ed_is_big();
|
|
|
|
|
bamSortFirstPipe();
|
|
|
|
|
|
|
|
|
|
spdlog::info("OneBam size: {}", sizeof(OneBam));
|
|
|
|
|
//bamSortSerialFirstPipe();
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
|
/* 打开输入bam文件 */
|
|
|
|
|
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
|
|
|
|
|
if (!inBamFp) {
|
|
|
|
|
|