两种不同的多线程数据分配策略
This commit is contained in:
parent
dbe57f345c
commit
3560b5a504
|
|
@ -391,19 +391,75 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
|
|||
}
|
||||
blockItem.bamNum = bamNum;
|
||||
PROF_G_END(mem_copy);
|
||||
|
||||
// 判断是否超过内存阈值
|
||||
if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* 将gz block进行解压 */
|
||||
// multi-thread uncompress bam blocks
|
||||
static void mtUncompressBlockBatch(void* data, long idx, int tid) {
|
||||
UncompressData& p = *(UncompressData*)data;
|
||||
ReadData& readData = *p.readDataPtr;
|
||||
|
||||
int startIdx = START_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
|
||||
int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
|
||||
|
||||
auto& blockItemArr = p.blockItemArr[tid];
|
||||
|
||||
if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) {
|
||||
blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx));
|
||||
}
|
||||
|
||||
for (int i = startIdx; i < stopIdx; ++i) {
|
||||
auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
|
||||
|
||||
uint8_t* block = readData.startAddrArr[i];
|
||||
|
||||
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
|
||||
int block_length = unpackInt16(&block[16]) + 1;
|
||||
uint32_t crc = le_to_u32(block + block_length - 8);
|
||||
int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef*)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
|
||||
if (ret != 0) {
|
||||
spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret);
|
||||
exit(0);
|
||||
}
|
||||
blockItem.blockId = i;
|
||||
blockItem.blockLen = dlen;
|
||||
uint32_t nextBamStart = 0;
|
||||
uint32_t bamLen = 0;
|
||||
uint32_t bamNum = 0;
|
||||
PROF_G_BEG(mem_copy);
|
||||
// memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
|
||||
/* 解析每个bam */
|
||||
while (nextBamStart + 4 <= blockItem.blockLen) {
|
||||
memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
|
||||
nextBamStart += 4 + bamLen;
|
||||
// spdlog::info("bam len: {}", bamLen);
|
||||
++bamNum;
|
||||
}
|
||||
blockItem.bamNum = bamNum;
|
||||
PROF_G_END(mem_copy);
|
||||
}
|
||||
}
|
||||
|
||||
/* 将gz block进行解压,并进行线程内排序 */
|
||||
static void doFirstPipeUncompress(FirstPipeArg &p) {
|
||||
// return;
|
||||
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
|
||||
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
|
||||
uncompressData.readDataPtr = &readData;
|
||||
uncompressData.ResetBlockArr();
|
||||
|
||||
#if 1
|
||||
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
|
||||
#else
|
||||
kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS);
|
||||
#endif
|
||||
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
|
||||
// uncompressData.blockItemArr[0].blockArr.size());
|
||||
// 判断是否超过内存阈值
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -418,6 +474,7 @@ static void *firstPipeUncompress(void *data) {
|
|||
// self dependency
|
||||
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
|
||||
|
||||
PROF_G_BEG(uncompress);
|
||||
if (p.readFinish) {
|
||||
while (p.uncompressOrder < p.readOrder) {
|
||||
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
|
||||
|
|
@ -429,6 +486,7 @@ static void *firstPipeUncompress(void *data) {
|
|||
}
|
||||
|
||||
doFirstPipeUncompress(p);
|
||||
PROF_G_END(uncompress);
|
||||
|
||||
// update status
|
||||
yarn::CONSUME_SIGNAL(p.readSig);
|
||||
|
|
@ -495,7 +553,7 @@ static void *firstPipeMergeSort(void *data) {
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
/* 对bam文件进行排序 */
|
||||
/* 对bam文件进行排序第一阶段,线程内排序,线程间merge,输入到中间bam文件 */
|
||||
static void bamSortFirstPipe() {
|
||||
/* set up*/
|
||||
FirstPipeArg firstPipeArg;
|
||||
|
|
@ -549,20 +607,16 @@ static void bamSortSerialFirstPipe() {
|
|||
fclose(fpr);
|
||||
}
|
||||
|
||||
/* 对sam文件进行排序 */
|
||||
static void samSortFirstPipe() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
// entry function
|
||||
int doSort() {
|
||||
|
||||
#if 1
|
||||
nsgv::gIsBigEndian = ed_is_big();
|
||||
bamSortFirstPipe();
|
||||
// //bamSortSerialFirstPipe();
|
||||
return 0;
|
||||
|
||||
#if 1
|
||||
#else
|
||||
/* 打开输入bam文件 */
|
||||
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
|
||||
if (!inBamFp) {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,9 @@
|
|||
using std::priority_queue;
|
||||
using std::vector;
|
||||
|
||||
#define START_IDX(i, nt, nele) ((i) * (nele) / (nt))
|
||||
#define STOP_IDX(i, nt, nele) (((i)+1) * (nele) / (nt))
|
||||
|
||||
/* for step-1 read data from bam file */
|
||||
struct ReadData {
|
||||
uint8_t *dataBuf = nullptr;
|
||||
|
|
@ -36,7 +39,7 @@ struct ReadData {
|
|||
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
||||
}
|
||||
};
|
||||
/* for step-2 parallel uncompress gz blocks */
|
||||
/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/
|
||||
struct OneBlock {
|
||||
uint8_t data[SINGLE_BLOCK_SIZE];
|
||||
uint32_t blockLen = 0; // 解压后的数据长度
|
||||
|
|
@ -174,6 +177,7 @@ struct FirstPipeArg {
|
|||
|
||||
volatile int readFinish = 0;
|
||||
volatile int uncompressFinish = 0;
|
||||
volatile int uncompressBufFull = 0;
|
||||
|
||||
yarn::lock_t *readSig;
|
||||
yarn::lock_t *uncompressSig;
|
||||
|
|
|
|||
Loading…
Reference in New Issue