添加了串行解析线程

This commit is contained in:
zzh 2025-04-23 22:08:40 +08:00
parent 9b2c35eaeb
commit 0d2c533d59
1 changed file with 143 additions and 118 deletions

View File

@ -25,6 +25,7 @@ namespace nsgv {
SortArg gSortArg; // 参数
HeaderBuf gInHdr; // 输入文件的header
bool gIsBigEndian;
DataBuffer gMarginBuf; // 用于存放跨两个gz block的解压数据
}; // namespace nsgv
@ -38,11 +39,40 @@ struct BamSortData {
char *qname; // pointer to qname
};
// Locates one BAM record inside the contiguous decompressed data buffer.
struct BamPointer {
uint64_t offset = 0; // byte offset of the record within the buffer
uint32_t bamLen = 0; // length of the BAM record in bytes
};
// Ordered list of BAM record pointers plus a cursor for sequential consumption.
struct BamPtrArr {
vector<BamPointer> bamArr;
int curIdx = 0; // index of the next entry to consume
};
// One decompressed gz block, tagged with its sequential position in the input.
struct OneBlock {
uint8_t data[SINGLE_BLOCK_SIZE]; // decompressed payload (at most SINGLE_BLOCK_SIZE bytes)
uint32_t blockLen = 0; // length of the decompressed data
uint64_t blockId = 0; // sequential block ID (input-file order)
};
// Per-thread collection of decompressed blocks.
struct ThreadBlockArr {
vector<OneBlock> blockArr; // blocks this thread has decompressed
int curIdx = 0; // index of the current decompressed block in the vector
};
// Shared state handed to the sorting / decompression threads.
// Fix: the struct declared the member `blockAddrArr` twice, which is a
// compile error (duplicate member declaration); the duplicate is removed.
struct SortData {
vector<vector<BamSortData>> bamArr; // per-thread BAM records to sort
vector<DataBuffer> uData; // per-thread uncompressed data buffers
/* data used by the multi-threaded decompression */
vector<uint8_t *> *blockAddrArr; // gz blocks waiting to be decompressed
vector<ThreadBlockArr> blockItemArr; // one entry per thread, holding its decompressed blocks
// Set to 1 by the decompression driver once all blocks are done.
// NOTE(review): volatile is not a synchronization primitive; prefer
// std::atomic<int> for cross-thread visibility guarantees.
volatile int uncompressFinish = 0;
/* data used by the single-threaded parsing */
DataBuffer bamData; // decompressed data stored in one contiguous address range
BamPtrArr bamPtrArr; // start offset and length of every BAM record
int round = 0; // processing round counter (used for periodic logging)
};
@ -188,101 +218,110 @@ int parseBam(uint8_t *data, bam1_t *b) {
// multi-thread uncompress bam blocks
static void mtUncompressBlock(void *data, long idx, int tid) {
auto &p = *(SortData*) data;
auto &bamArr = p.bamArr[tid];
auto &udata = p.uData[tid];
auto &blockItemArr = p.blockItemArr[tid];
// if (udata.maxLen < nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
// udata.allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS);
// }
// return;
// spdlog::info("start: {} end: {} size: {} idx: {}, tid: {}", start, end, p.blockAddrArr->size(), idx, tid);
long start = idx;
int step = nsgv::gSortArg.NUM_THREADS;
// long stop = p.blockAddrArr->size();
long stop = idx + 1;
// int chunkSize = (p.blockAddrArr->size() + nsgv::gSortArg.NUM_THREADS - 1) / nsgv::gSortArg.NUM_THREADS;
// long start = idx * chunkSize;
// int step = 1;
// long stop = std::min((idx + 1) * chunkSize, (long)p.blockAddrArr->size());
uint8_t buf[65535];
// if (tid == 0 ) spdlog::info("round:{} tid:{} start:{} stop:{}", p.round, tid, start, stop);
for (long i = start; i < stop; i += step) {
uint8_t *block = (*p.blockAddrArr)[i];
size_t dlen = 0x10000; // 65535
if (udata.maxLen - udata.curLen < dlen) {
// spdlog::info("re alloc buf, maxLen: {} M", udata.maxLen / 1024 / 1024);
udata.maxLen += INCREASE_SIZE;
udata.data = (uint8_t *)realloc(udata.data, udata.maxLen);
}
int block_length = unpackInt16(&block[16]) + 1;
uint32_t crc = le_to_u32(block + block_length - 8);
int ret = bgzfUncompress(buf, &dlen, (Bytef *)block + 18, block_length - 18, crc);
// spdlog::info("BamSortData size: {}, bam size: {}", sizeof(BamSortData), *(uint32_t *)(udata.data +
// udata.curLen));
// spdlog::info("i: {} slen: {} dlen: {}", i, block_length, dlen);
// 不用对bam进行完全解析反正只需要排序就行
// int pos = 0;
// uint8_t *ud = udata.data + udata.curLen;
// bam1_t *bp = bam_init1();
//while (pos < dlen) {
// bamArr.push_back(BamSortData());
// BamSortData &b = bamArr.back();
// b.bamLen = *(uint32_t *)(buf + pos);
// // // spdlog::info("bamLen: {}", b.bamLen);
// pos += 4 + b.bamLen;
// // if (b.bamLen > 1000)
// // spdlog::info("tid: {} record len: {}, dlen: {}", tid, b.bamLen, dlen);
// // exit(0);
// // int bamLen = parseBam(buf, bp);
// // exit(0);
// // spdlog::info("bamLen: {}", b.bamLen);
// // pos += bamLen;
//}
// exit(0);
// usleep(1);
PROF_T_BEG(mem_copy);
memcpy(udata.data + udata.curLen, buf, dlen);
PROF_T_END(tid, mem_copy);
udata.curLen += dlen * 1;
if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) {
blockItemArr.blockArr.push_back(OneBlock());
}
}
// multi-thread sort bam blocks
static void mtSortBlocks(void *data, long idx, int tid) {
auto &p = *(SortData *)data;
auto &bamArr = p.bamArr[tid];
auto &uData = p.uData[tid];
auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
uint8_t *block = (*p.blockAddrArr)[idx];
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
int block_length = unpackInt16(&block[16]) + 1;
uint32_t crc = le_to_u32(block + block_length - 8);
int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
blockItem.blockId = idx;
blockItem.blockLen = dlen;
}
// Merge the per-thread sorted blocks into a single ordered result.
// NOTE(review): currently an empty stub -- no merging is performed yet.
static void mergeSortedBlocks(const SortData &sortData) {
}
static void *nonBlockingUncompress(void *data) {
PROF_G_BEG(uncompress);
if (((SortData *)data)->round % 10 == 0)
spdlog::info("round-{} block size: {}", ((SortData *)data)->round++, ((SortData *)data)->blockAddrArr->size());
else
((SortData *)data)->round++;
static void *threadMergeParseBlocks(void *data) {
return nullptr;
// spdlog::info("block arr size: {}", ((SortData *)data)->blockAddrArr->size());
kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size());
// kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, nsgv::gSortArg.NUM_THREADS);
PROF_G_END(uncompress);
auto &p = *(SortData *)data;
DataBuffer &buf = p.bamData;
int idx = 0;
int tIdx[nsgv::gSortArg.NUM_THREADS] = {0};
while (!p.uncompressFinish) {
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
auto &blockItemArr = p.blockItemArr[t];
if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) {
//spdlog::info("1. thread-{} arr idx: {}, block id: {}, size: {}, idx: {}", t, tIdx[t],
// blockItemArr.blockArr[tIdx[t]].blockId, blockItemArr.curIdx, idx);
++tIdx[t];
++idx;
break;
}
}
}
// 处理结尾数据
int totalBlockNum = 0;
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
auto &blockItemArr = p.blockItemArr[t];
totalBlockNum += blockItemArr.curIdx;
}
/* spdlog::info("here, idx: {}, total: {}", idx, totalBlockNum);
while (idx < totalBlockNum) {
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
auto &blockItemArr = p.blockItemArr[t];
// spdlog::info("t:{}, tidx:{}, block id:{}, idx:{}", t, tIdx[t], blockItemArr.blockArr[tIdx[t]].blockId, idx);
if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) {
//spdlog::info("2. thread-{} block id: {}, size: {}", t, tIdx[t], blockItemArr.curIdx);
++tIdx[t];
++idx;
break;
}
}
// break;
}
*/
//spdlog::info("threadMergeParseBlocks exit, idx: {}", idx);
// exit(0);
return nullptr;
}
static size_t uncommpressedDataSize(const SortData &sortData) {
size_t curSize = 0;
for (int i = 0; i < sortData.uData.size(); ++i) {
curSize += sortData.uData[i].curLen;
static void *nonBlockingUncompress(void *data) {
PROF_G_BEG(uncompress);
pthread_t parseTid = 0;
if (((SortData *)data)->round % 10 == 0)
spdlog::info("round-{} block size: {}", ((SortData *)data)->round, ((SortData *)data)->blockAddrArr->size());
SortData &sortData = *(SortData *)data;
for (auto &arr: sortData.blockItemArr) arr.curIdx = 0;
sortData.uncompressFinish = 0;
// 1. 开启一个线程,用来将每个线程解压的数据放到一起
pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData);
// 2. 调用并行解压的线程函数
kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size());
sortData.uncompressFinish = 1;
int totalBlockNum = 0;
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
auto &blockItemArr = sortData.blockItemArr[t];
totalBlockNum += blockItemArr.curIdx;
}
return curSize;
// spdlog::info("uncompress finish, total block num: {}", totalBlockNum);
// 3. 等待负责数据合并的线程结束
pthread_join(parseTid, NULL);
// 4. 如果当前读入的数据超过设定的参数,则进行一次排序
PROF_G_END(uncompress);
// for (auto &arr : sortData.blockItemArr) spdlog::info("block size: {}", arr.curIdx);
((SortData *)data)->round++;
return nullptr;
}
template <typename T>
@ -292,22 +331,6 @@ static void switchPointer(T *p1, T *p2) {
*p2 = tmp;
}
// Total bytes of uncompressed data currently held across all per-thread buffers.
static size_t threadMemSize(SortData &sortData) {
    size_t total = 0;
    for (auto &buf : sortData.uData) {
        total += buf.curLen;
    }
    return total;
}
// Total allocated capacity (in bytes) across all per-thread buffers.
static size_t threadMaxMemSize(SortData &sortData) {
    size_t total = 0;
    for (auto &buf : sortData.uData) {
        total += buf.maxLen;
    }
    return total;
}
void *threadRead(void *data) {
spdlog::info("test file start");
FILE *fp = (FILE*) data;
@ -322,6 +345,8 @@ void *threadRead(void *data) {
return NULL;
}
// entry function
int doSort() {
#if 0
@ -358,13 +383,23 @@ int doSort() {
return 0;
#endif
// 初始化算法用到的数据结构
SortData sortData;
sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS);
sortData.uData.resize(nsgv::gSortArg.NUM_THREADS);
for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) {
sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS);
//sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS);
//sortData.uData.resize(nsgv::gSortArg.NUM_THREADS);
//for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) {
// sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS);
//}
sortData.blockItemArr.resize(nsgv::gSortArg.NUM_THREADS);
for(auto &arr: sortData.blockItemArr) {
arr.blockArr.resize(1000);
}
// sortData.bamPtrArr.resize(nsgv::gSortArg.NUM_THREADS);
nsgv::gIsBigEndian = ed_is_big();
// 打开文件
@ -437,7 +472,7 @@ int doSort() {
memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock
curStartAddrArr->push_back(&curBlock[0]);
}
/* 解析读入buf中的文件数据计算包含的每个block的长度信息和起始地址 */
while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */
blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1;
if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */
@ -447,11 +482,12 @@ int doSort() {
break; /* 当前block数据不完整一部分在还没读入的file数据里 */
}
}
/* 如果buf中包含不完整的block数据先保存一下放到下一轮里去处理 */
lastBufRemain = readState - curReadPos;
// spdlog::info("lastBufRemain: {}", lastBufRemain);
if (lastBufRemain > 0) {
memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock
}
// spdlog::info("lastBufRemain: {}", lastBufRemain);
totalBlocks += curStartAddrArr->size();
PROF_G_END(parse_block);
@ -460,22 +496,10 @@ int doSort() {
// 并行解压
PROF_G_BEG(sort);
pthread_join(uncompressTid, NULL);
{
// 如果sortData的解压buf满了那么进行排序并输出
if (true) {
//if (threadMemSize(sortData) + MARGIN_SIZE >= nsgv::gSortArg.MAX_MEM) {
// spdlog::info("clear buf data, buf size: {} G, {} G", threadMemSize(sortData) / 1024.0 / 1024 / 1024, threadMaxMemSize(sortData) / 1024.0 / 1024 / 1024);
for (int i = 0; i < sortData.uData.size(); ++i) {
sortData.uData[i].curLen = 0; // 清空数据
sortData.bamArr[i].clear();
}
}
}
PROF_G_END(sort);
sortData.blockAddrArr = curStartAddrArr;
pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData);
//nonBlockingUncompress(&sortData);
#endif
// 交换buf指针
@ -498,6 +522,7 @@ err:
free(fbuf[1]);
free(fbuf[2]);
free(fbuf[3]);
free(fbuf[4]);
fclose(fpr);
// fclose(fpw);