From 9b2c35eaebd9577cc46696a15699a6f58fd2c2de Mon Sep 17 00:00:00 2001 From: zzh Date: Tue, 22 Apr 2025 00:38:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E6=9E=90header=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E4=BA=86=EF=BC=8C=E8=AF=BB=E5=8F=96=E6=96=87=E4=BB=B6=E5=B9=B6?= =?UTF-8?q?=E8=A7=A3=E6=9E=90gz=20block=E7=9A=84=E6=A1=86=E6=9E=B6?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code.bak | 44 +++++++++++++++++ src/sort/const_val.h | 16 ++++++ src/sort/sam_io.cpp | 84 ++++++++++++++++++------------- src/sort/sam_io.h | 13 +---- src/sort/sort.cpp | 115 ++++++++++++++----------------------------- 5 files changed, 149 insertions(+), 123 deletions(-) create mode 100644 code.bak create mode 100644 src/sort/const_val.h diff --git a/code.bak b/code.bak new file mode 100644 index 0000000..4e92421 --- /dev/null +++ b/code.bak @@ -0,0 +1,44 @@ +// if (lastBufRemain < BLOCK_HEADER_LENGTH) { // 上次遗留的数据不够解析block 长度 +// memcpy(&preBlock[lastBufRemain], curBuf, BLOCK_HEADER_LENGTH); +// curReadPos += BLOCK_HEADER_LENGTH; +// lastBufRemain += BLOCK_HEADER_LENGTH; +// } +// blockLen = unpackInt16(&preBlock[16]) + 1; +// const int lenToRead = blockLen - lastBufRemain; +// memcpy(&preBlock[lastBufRemain], &curBuf[curReadPos], lenToRead); +// curReadPos += lenToRead; +// curStartAddrArr->push_back(&preBlock[0]); + + // curStartAddrArr->push_back(&curBuf[0]); + // spdlog::info("first blocklen: {}, remain: {}, toread: {}", blockLen, lastBufRemain, lenToRead); + // uint8_t buf[65535]; + // int block_length = unpackInt16(&preBlock[16]) + 1; + // uint32_t crc = le_to_u32(preBlock + block_length - 8); + // size_t dlen = 65535; + // int ret = bgzf_uncompress(buf, &dlen, (Bytef *)preBlock + 18, block_length - 18, crc); // 开头是header,得跳过 + // int bamLen = *(uint32_t *)(buf); + // spdlog::info("ret: {}, bamLen: {}, dlen: {}, block_length: {}", ret, bamLen, dlen, block_length); + // exit(0); + + // spdlog::info("read block len: {}", blockLen); + + // spdlog::info("cur block size: {}", curStartAddrArr->size()); + + PROF_G_BEG(read); + readState = fread(fbuf[4], 1, READ_BUFSIZE, fpr); + readState = fread(curBuf, 1, READ_BUFSIZE, fpr); + PROF_G_BEG(mem_copy); + memcpy(curBuf, fbuf[4], READ_BUFSIZE); + PROF_G_END(mem_copy); + PROF_G_END(read); + + // kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, &sortData, curStartAddrArr->size() / BATCH_SIZE); + // if (uncommpressedDataSize(sortData) >= nsgv::gSortArg.MAX_MEM) { + // kt_for(nsgv::gSortArg.NUM_THREADS, mtSortBlocks, &sortData, nsgv::gSortArg.NUM_THREADS); + // mergeSortedBlocks(sortData); + // } + + //setvbuf(fpr, (char*)fbuf[4], _IOFBF, 0); + //setvbuf(fpr, (char *)fbuf[4], _IONBF, READ_BUFSIZE); + + // const size_t READ_BUFSIZE = 8L * 1024 * 1024; \ No newline at end of file diff --git a/src/sort/const_val.h b/src/sort/const_val.h new file mode 100644 index 0000000..0118cff --- /dev/null +++ b/src/sort/const_val.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +// 单个block gz的最大解压大小 +#define SINGLE_BLOCK_SIZE 65535 + +// bam header字节数量 +#ifndef BLOCK_HEADER_LENGTH +#define BLOCK_HEADER_LENGTH 18 +#endif + +// bam footer字节数量 +#ifndef BLOCK_FOOTER_LENGTH +#define BLOCK_FOOTER_LENGTH 8 +#endif \ No newline at end of file diff --git a/src/sort/sam_io.cpp b/src/sort/sam_io.cpp index 775ca72..281d45b 100644 --- a/src/sort/sam_io.cpp +++ b/src/sort/sam_io.cpp @@ -7,6 +7,7 @@ #include #include +#include "const_val.h" namespace nsgv { extern bool gIsBigEndian; @@ -40,50 +41,59 @@ int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, return 0; } -void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) { - const int kMaxBlockSize = 65535; +// 读取并解压一个gz block +size_t readUncompressOneBlock(FILE *fpr, uint8_t *fBuf, DataBuffer *uDataPtr) { + DataBuffer &uData = *uDataPtr; + size_t readState = 0; + int blockLen = 0; // 每个gz block的大小 + size_t dlen = SINGLE_BLOCK_SIZE; // gz block解压之后的真正大小 + readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fpr); // 先读取一个gz block的头部 + blockLen = unpackInt16(&fBuf[16]) + 1; // block的字节数 + readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fpr); + // 解压gz block + uint32_t crc = le_to_u32(fBuf + blockLen - 8); + size_t newDataSize = uData.maxLen; + while (uData.curLen + SINGLE_BLOCK_SIZE > newDataSize) newDataSize *= 2; + uData.reAllocMem(newDataSize); // 需要重新开辟空间 + int ret = bgzfUncompress(&uData.data[uData.curLen], &dlen, (Bytef *)fBuf + BLOCK_HEADER_LENGTH, + blockLen - BLOCK_HEADER_LENGTH, crc); + if (ret < 0) { + spdlog::error("Error decompressing gz block"); + exit(0); + } + uData.curLen += dlen; + spdlog::info("gz block size: {}, uncompressed size: {}", blockLen, dlen); + return readState; +} + +// 解析sam头部信息 +void parseSamHeader(FILE *fpr, HeaderBuf &hdrBuf) { + const int kMaxBlockSize = SINGLE_BLOCK_SIZE; sam_hdr_t *header = NULL; uint8_t *fBuf = (uint8_t*)malloc(kMaxBlockSize); - DataBuffer uData; - size_t readState = 0; - int blockLen = 0; - size_t dlen = 0x10000; // 65535 + DataBuffer uData; // 用来放解压缩后的数据,待解析 int magicLen; int32_t i, nameLen, numNames = 0; - size_t bufsize; - ssize_t bytes; - uData.allocMem(kMaxBlockSize); + header = sam_hdr_init(); // 初始化header + uData.allocMem(kMaxBlockSize); // 初始化解压数据的buffer + readUncompressOneBlock(fpr, fBuf, &uData); // 读取第一个gz block - readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fp); - blockLen = unpackInt16(&fBuf[16]) + 1; - readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fp); - header = sam_hdr_init(); - spdlog::info("Header file size: {}", blockLen); - - uint32_t crc = le_to_u32(fBuf + blockLen - 8); - int ret = bgzfUncompress(uData.data, &dlen, (Bytef *)fBuf + 18, blockLen - 18, crc); - uData.curLen = dlen; // 解析header magicLen = 4; if (memcmp(uData.data, "BAM\1", magicLen)) { spdlog::error("Invalid BAM binary header"); return; } - uData.readPos += magicLen; - header->l_text = le_to_u32(uData.data + uData.readPos); uData.readPos += 4; + uData.readPos += magicLen; // 在解压数据中读取了magicLen个字节 + header->l_text = le_to_u32(uData.data + uData.readPos); uData.readPos += 4; // 解析l_text的长度 header->text = (char *)malloc(header->l_text + 1); - header->text[header->l_text] = 0; -// while (uData.readPos + header->l_text > uData.curLen) { // header内容超过了一个block,继续读 -// readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fp); // 读压缩块头 -// blockLen = unpackInt16(&fBuf[16]) + 1; // 压缩块大小 -// readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fp); // 读压缩块剩下的部分 -// size_t newDataSize = uData.maxLen; -// while (uData.curLen + kMaxBlockSize > uData.maxLen) newDataSize *= 2; -// uData.reAllocMem(newDataSize); -// ret = bgzfUncompress(&uData.data[uData.curLen], &dlen, (Bytef *)fBuf + 18, blockLen - 18, crc); -// uData.curLen += dlen; -// } + header->text[header->l_text] = 0; // 在text末尾添加'\0' + + while (uData.readPos + header->l_text > uData.curLen) { // header内容超过了一个block,继续读 + readUncompressOneBlock(fpr, fBuf, &uData); // 读取包含header内容的gz block + } + memcpy(header->text, &uData.data[uData.readPos], header->l_text); uData.readPos += header->l_text; @@ -100,11 +110,13 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) { } for (int i = 0; i != header->n_targets; ++i) { + while (uData.readPos + 4 > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData); memcpy(&nameLen, &uData.data[uData.readPos], 4); uData.readPos += 4; if (nsgv::gIsBigEndian) ed_swap_4p(&nameLen); header->target_name[i] = (char *)malloc(nameLen); ++numNames; + while (uData.readPos + nameLen > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData); memcpy(header->target_name[i], &uData.data[uData.readPos], nameLen); uData.readPos += nameLen; if (header->target_name[i][nameLen - 1] != '\0') { @@ -114,6 +126,7 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) { header->target_name[i] = newName; header->target_name[i][nameLen] = '\0'; } + while (uData.readPos + 4 > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData); memcpy(&header->target_len[i], &uData.data[uData.readPos], 4); uData.readPos += 4; if (nsgv::gIsBigEndian) ed_swap_4p(&header->target_len[i]); @@ -122,9 +135,14 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) { } // spdlog::info("test res: {} {} {} {}", header->l_text, dlen, header->n_targets, header->text); + spdlog::info("udata: readPos: {}, curLen: {}, maxLen: {}", uData.readPos, uData.curLen, uData.maxLen); hdrBuf.header = header; - free(fBuf); + if (uData.readPos < uData.curLen) { // 如果还有数据,则将其保存在hdrBuf中,用以后续的bam解析 + hdrBuf.data = (uint8_t *)malloc(uData.curLen - uData.readPos); + memcpy(hdrBuf.data, &uData.data[uData.readPos], uData.curLen - uData.readPos); + hdrBuf.dataLen = uData.curLen - uData.readPos; + } - // exit(0); + free(fBuf); } \ No newline at end of file diff --git a/src/sort/sam_io.h b/src/sort/sam_io.h index 26acc70..0896329 100644 --- a/src/sort/sam_io.h +++ b/src/sort/sam_io.h @@ -2,13 +2,6 @@ #include #include -#ifndef BLOCK_HEADER_LENGTH -#define BLOCK_HEADER_LENGTH 18 -#endif -#ifndef BLOCK_FOOTER_LENGTH -#define BLOCK_FOOTER_LENGTH 8 -#endif - #define INCREASE_SIZE (8L * 1024 * 1024) struct DataBuffer { @@ -41,10 +34,8 @@ struct DataBuffer { }; struct HeaderBuf { - uint8_t *data; - int bamPos; // 第一条bam的位置 - int curLen; - int maxLen; + uint8_t *data = nullptr; // 保留除header之外的解压数据(即最开始的bam记录) + int dataLen = 0; // 解压数据的字节数 sam_hdr_t *header; }; diff --git a/src/sort/sort.cpp b/src/sort/sort.cpp index fa5432b..32aa9fc 100644 --- a/src/sort/sort.cpp +++ b/src/sort/sort.cpp @@ -16,6 +16,7 @@ #include "sam_io.h" #include "sort_args.h" #include "util/profiling.h" +#include "const_val.h" #define BAM_BLOCK_SIZE 16L * 1024 * 1024 @@ -247,7 +248,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) { PROF_T_BEG(mem_copy); memcpy(udata.data + udata.curLen, buf, dlen); PROF_T_END(tid, mem_copy); - udata.curLen += dlen * 1; + udata.curLen += dlen * 1; } } @@ -371,28 +372,27 @@ int doSort() { // threadRead(fpr); exit(0); - parseSamHeader(fpr, nsgv::gInHdr); + parseSamHeader(fpr, nsgv::gInHdr); + // exit(0); // FILE *fpw = fopen(nsgv::gSortArg.OUTPUT_FILE.c_str(), "rb"); const size_t READ_BUFSIZE = 4L * 1024 * 1024 * nsgv::gSortArg.NUM_THREADS; const size_t MARGIN_SIZE = READ_BUFSIZE * 4; - // const size_t READ_BUFSIZE = 8L * 1024 * 1024; - const size_t SINGLE_BLOCK_SIZE = 65535; + uint8_t *fbuf[5]; fbuf[0] = (uint8_t *)malloc(READ_BUFSIZE); fbuf[1] = (uint8_t *)malloc(READ_BUFSIZE); fbuf[2] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); fbuf[3] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); - fbuf[4] = (uint8_t *)malloc(READ_BUFSIZE); + fbuf[4] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE); - //setvbuf(fpr, (char*)fbuf[4], _IOFBF, 0); - //setvbuf(fpr, (char *)fbuf[4], _IONBF, READ_BUFSIZE); uint8_t *curBuf = fbuf[0]; uint8_t *preBuf = fbuf[1]; uint8_t *preBlock = fbuf[2]; uint8_t *curBlock = fbuf[3]; + uint8_t *halfBlock = fbuf[4]; size_t fsize = 0; size_t block_num = 0; @@ -415,78 +415,43 @@ int doSort() { vector startAddrArr[2]; vector *curStartAddrArr = &startAddrArr[0]; vector *preStartAddrArr = &startAddrArr[1]; - size_t curBLockPos = 0; size_t lastBufRemain = 0; - uint8_t *switchBuf; - vector *switchAddrArr; - uint8_t *switchBlock; pthread_t uncompressTid = 0; pthread_t sortMergeTid = 0; -// 读取一个压缩的block -#define READ_BLOCKS \ - curBLockPos = curReadPos; \ - while (curReadPos + BLOCK_HEADER_LENGTH < readState) { \ - blockLen = unpackInt16(&curBuf[curBLockPos + 16]) + 1; \ - curReadPos += blockLen; \ - if (curReadPos < readState) { \ - /* 这个block完整的包含在curBuf里 */ \ - curStartAddrArr->push_back(&curBuf[curBLockPos]); \ - curBLockPos = curReadPos; \ - } \ - } \ - lastBufRemain = readState - curBLockPos; \ - memcpy(curBlock, &curBuf[curBLockPos], lastBufRemain); // 将不完整的block拷贝到curBlock - -// 交换buffer指针 -#define SWITCH_POINTER \ - switchPointer(&curBuf, &preBuf); \ - switchPointer(&curStartAddrArr, &preStartAddrArr); \ - switchPointer(&curBlock, &preBlock); - PROF_G_BEG(mid_all); PROF_G_BEG(read); - readState = fread(fbuf[4], 1, READ_BUFSIZE, fpr); - // readState = fread(curBuf, 1, READ_BUFSIZE, fpr); - PROF_G_BEG(mem_copy); - memcpy(curBuf, fbuf[4], READ_BUFSIZE); - PROF_G_END(mem_copy); + readState = fread(curBuf, 1, READ_BUFSIZE, fpr); PROF_G_END(read); while (readState > 0) { - //spdlog::info("readState-0: {}", readState); - // while (readState > 0) { - PROF_G_BEG(parse_block); - curStartAddrArr->clear(); fsize += readState; - curReadPos = 0; -// if (lastBufRemain < BLOCK_HEADER_LENGTH) { // 上次遗留的数据不够解析block 长度 -// memcpy(&preBlock[lastBufRemain], curBuf, BLOCK_HEADER_LENGTH); -// curReadPos += BLOCK_HEADER_LENGTH; -// lastBufRemain += BLOCK_HEADER_LENGTH; -// } -// blockLen = unpackInt16(&preBlock[16]) + 1; -// const int lenToRead = blockLen - lastBufRemain; -// memcpy(&preBlock[lastBufRemain], &curBuf[curReadPos], lenToRead); -// curReadPos += lenToRead; -// curStartAddrArr->push_back(&preBlock[0]); + PROF_G_BEG(parse_block); - // curStartAddrArr->push_back(&curBuf[0]); - // spdlog::info("first blocklen: {}, remain: {}, toread: {}", blockLen, lastBufRemain, lenToRead); - // uint8_t buf[65535]; - // int block_length = unpackInt16(&preBlock[16]) + 1; - // uint32_t crc = le_to_u32(preBlock + block_length - 8); - // size_t dlen = 65535; - // int ret = bgzf_uncompress(buf, &dlen, (Bytef *)preBlock + 18, block_length - 18, crc); // 开头是header,得跳过 - // int bamLen = *(uint32_t *)(buf); - // spdlog::info("ret: {}, bamLen: {}, dlen: {}, block_length: {}", ret, bamLen, dlen, block_length); - // exit(0); - - READ_BLOCKS; - // spdlog::info("read block len: {}", blockLen); - - // spdlog::info("cur block size: {}", curStartAddrArr->size()); + curStartAddrArr->clear(); + curReadPos = blockLen - lastBufRemain; /* 上一个buf里剩余的block数据 */ + /* 处理上一个不完整的block */ + if (lastBufRemain > 0) { // 上一轮有剩余 + memcpy(curBlock, halfBlock, lastBufRemain); + memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock + curStartAddrArr->push_back(&curBlock[0]); + } + + while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */ + blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1; + if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */ + curStartAddrArr->push_back(&curBuf[curReadPos]); + curReadPos += blockLen; + } else { + break; /* 当前block数据不完整,一部分在还没读入的file数据里 */ + } + } + lastBufRemain = readState - curReadPos; + // spdlog::info("lastBufRemain: {}", lastBufRemain); + if (lastBufRemain > 0) { + memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock + } totalBlocks += curStartAddrArr->size(); PROF_G_END(parse_block); @@ -512,23 +477,15 @@ int doSort() { pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData); //nonBlockingUncompress(&sortData); #endif - // kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, &sortData, curStartAddrArr->size() / BATCH_SIZE); - // if (uncommpressedDataSize(sortData) >= nsgv::gSortArg.MAX_MEM) { - // kt_for(nsgv::gSortArg.NUM_THREADS, mtSortBlocks, &sortData, nsgv::gSortArg.NUM_THREADS); - // mergeSortedBlocks(sortData); - // } - SWITCH_POINTER; + // 交换buf指针 + switchPointer(&curBuf, &preBuf); + switchPointer(&curStartAddrArr, &preStartAddrArr); + switchPointer(&curBlock, &preBlock); PROF_G_BEG(read); readState = fread(curBuf, 1, READ_BUFSIZE, fpr); - //spdlog::info("readState: {}", readState); - PROF_G_BEG(mem_copy); - // memcpy(curBuf, fbuf[4], readState); readState = READ_BUFSIZE; - memcpy(curBuf, fbuf[4], readState); - PROF_G_END(mem_copy); PROF_G_END(read); - // if (fsize >= 6245369164) break; } pthread_join(uncompressTid, NULL); PROF_G_END(mid_all);