解析header完整了,读取文件并解析gz block的框架正确了

This commit is contained in:
zzh 2025-04-22 00:38:38 +08:00
parent bcb8b420ba
commit 9b2c35eaeb
5 changed files with 149 additions and 123 deletions

44
code.bak 100644
View File

@ -0,0 +1,44 @@
// if (lastBufRemain < BLOCK_HEADER_LENGTH) { // 上次遗留的数据不够解析block 长度
// memcpy(&preBlock[lastBufRemain], curBuf, BLOCK_HEADER_LENGTH);
// curReadPos += BLOCK_HEADER_LENGTH;
// lastBufRemain += BLOCK_HEADER_LENGTH;
// }
// blockLen = unpackInt16(&preBlock[16]) + 1;
// const int lenToRead = blockLen - lastBufRemain;
// memcpy(&preBlock[lastBufRemain], &curBuf[curReadPos], lenToRead);
// curReadPos += lenToRead;
// curStartAddrArr->push_back(&preBlock[0]);
// curStartAddrArr->push_back(&curBuf[0]);
// spdlog::info("first blocklen: {}, remain: {}, toread: {}", blockLen, lastBufRemain, lenToRead);
// uint8_t buf[65535];
// int block_length = unpackInt16(&preBlock[16]) + 1;
// uint32_t crc = le_to_u32(preBlock + block_length - 8);
// size_t dlen = 65535;
// int ret = bgzf_uncompress(buf, &dlen, (Bytef *)preBlock + 18, block_length - 18, crc); // 开头是header得跳过
// int bamLen = *(uint32_t *)(buf);
// spdlog::info("ret: {}, bamLen: {}, dlen: {}, block_length: {}", ret, bamLen, dlen, block_length);
// exit(0);
// spdlog::info("read block len: {}", blockLen);
// spdlog::info("cur block size: {}", curStartAddrArr->size());
PROF_G_BEG(read);
readState = fread(fbuf[4], 1, READ_BUFSIZE, fpr);
readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
PROF_G_BEG(mem_copy);
memcpy(curBuf, fbuf[4], READ_BUFSIZE);
PROF_G_END(mem_copy);
PROF_G_END(read);
// kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, &sortData, curStartAddrArr->size() / BATCH_SIZE);
// if (uncommpressedDataSize(sortData) >= nsgv::gSortArg.MAX_MEM) {
// kt_for(nsgv::gSortArg.NUM_THREADS, mtSortBlocks, &sortData, nsgv::gSortArg.NUM_THREADS);
// mergeSortedBlocks(sortData);
// }
//setvbuf(fpr, (char*)fbuf[4], _IOFBF, 0);
//setvbuf(fpr, (char *)fbuf[4], _IONBF, READ_BUFSIZE);
// const size_t READ_BUFSIZE = 8L * 1024 * 1024;

View File

@ -0,0 +1,16 @@
#pragma once
#include <stdlib.h>
// 单个block gz的最大解压大小
#define SINGLE_BLOCK_SIZE 65535
// bam header字节数量
#ifndef BLOCK_HEADER_LENGTH
#define BLOCK_HEADER_LENGTH 18
#endif
// bam footer字节数量
#ifndef BLOCK_FOOTER_LENGTH
#define BLOCK_FOOTER_LENGTH 8
#endif

View File

@ -7,6 +7,7 @@
#include <zlib.h>
#include <spdlog/spdlog.h>
#include "const_val.h"
namespace nsgv {
extern bool gIsBigEndian;
@ -40,50 +41,59 @@ int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen,
return 0;
}
void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) {
const int kMaxBlockSize = 65535;
// 读取并解压一个gz block
size_t readUncompressOneBlock(FILE *fpr, uint8_t *fBuf, DataBuffer *uDataPtr) {
DataBuffer &uData = *uDataPtr;
size_t readState = 0;
int blockLen = 0; // 每个gz block的大小
size_t dlen = SINGLE_BLOCK_SIZE; // gz block解压之后的真正大小
readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fpr); // 先读取一个gz block的头部
blockLen = unpackInt16(&fBuf[16]) + 1; // block的字节数
readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fpr);
// 解压gz block
uint32_t crc = le_to_u32(fBuf + blockLen - 8);
size_t newDataSize = uData.maxLen;
while (uData.curLen + SINGLE_BLOCK_SIZE > newDataSize) newDataSize *= 2;
uData.reAllocMem(newDataSize); // 需要重新开辟空间
int ret = bgzfUncompress(&uData.data[uData.curLen], &dlen, (Bytef *)fBuf + BLOCK_HEADER_LENGTH,
blockLen - BLOCK_HEADER_LENGTH, crc);
if (ret < 0) {
spdlog::error("Error decompressing gz block");
exit(0);
}
uData.curLen += dlen;
spdlog::info("gz block size: {}, uncompressed size: {}", blockLen, dlen);
return readState;
}
// 解析sam头部信息
void parseSamHeader(FILE *fpr, HeaderBuf &hdrBuf) {
const int kMaxBlockSize = SINGLE_BLOCK_SIZE;
sam_hdr_t *header = NULL;
uint8_t *fBuf = (uint8_t*)malloc(kMaxBlockSize);
DataBuffer uData;
size_t readState = 0;
int blockLen = 0;
size_t dlen = 0x10000; // 65535
DataBuffer uData; // 用来放解压缩后的数据,待解析
int magicLen;
int32_t i, nameLen, numNames = 0;
size_t bufsize;
ssize_t bytes;
uData.allocMem(kMaxBlockSize);
header = sam_hdr_init(); // 初始化header
uData.allocMem(kMaxBlockSize); // 初始化解压数据的buffer
readUncompressOneBlock(fpr, fBuf, &uData); // 读取第一个gz block
readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fp);
blockLen = unpackInt16(&fBuf[16]) + 1;
readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fp);
header = sam_hdr_init();
spdlog::info("Header file size: {}", blockLen);
uint32_t crc = le_to_u32(fBuf + blockLen - 8);
int ret = bgzfUncompress(uData.data, &dlen, (Bytef *)fBuf + 18, blockLen - 18, crc);
uData.curLen = dlen;
// 解析header
magicLen = 4;
if (memcmp(uData.data, "BAM\1", magicLen)) {
spdlog::error("Invalid BAM binary header");
return;
}
uData.readPos += magicLen;
header->l_text = le_to_u32(uData.data + uData.readPos); uData.readPos += 4;
uData.readPos += magicLen; // 在解压数据中读取了magicLen个字节
header->l_text = le_to_u32(uData.data + uData.readPos); uData.readPos += 4; // 解析l_text的长度
header->text = (char *)malloc(header->l_text + 1);
header->text[header->l_text] = 0;
// while (uData.readPos + header->l_text > uData.curLen) { // header内容超过了一个block继续读
// readState = fread(fBuf, 1, BLOCK_HEADER_LENGTH, fp); // 读压缩块头
// blockLen = unpackInt16(&fBuf[16]) + 1; // 压缩块大小
// readState = fread(&fBuf[BLOCK_HEADER_LENGTH], 1, blockLen - BLOCK_HEADER_LENGTH, fp); // 读压缩块剩下的部分
// size_t newDataSize = uData.maxLen;
// while (uData.curLen + kMaxBlockSize > uData.maxLen) newDataSize *= 2;
// uData.reAllocMem(newDataSize);
// ret = bgzfUncompress(&uData.data[uData.curLen], &dlen, (Bytef *)fBuf + 18, blockLen - 18, crc);
// uData.curLen += dlen;
// }
header->text[header->l_text] = 0; // 在text末尾添加'\0'
while (uData.readPos + header->l_text > uData.curLen) { // header内容超过了一个block继续读
readUncompressOneBlock(fpr, fBuf, &uData); // 读取包含header内容的gz block
}
memcpy(header->text, &uData.data[uData.readPos], header->l_text);
uData.readPos += header->l_text;
@ -100,11 +110,13 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) {
}
for (int i = 0; i != header->n_targets; ++i) {
while (uData.readPos + 4 > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData);
memcpy(&nameLen, &uData.data[uData.readPos], 4);
uData.readPos += 4;
if (nsgv::gIsBigEndian) ed_swap_4p(&nameLen);
header->target_name[i] = (char *)malloc(nameLen);
++numNames;
while (uData.readPos + nameLen > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData);
memcpy(header->target_name[i], &uData.data[uData.readPos], nameLen);
uData.readPos += nameLen;
if (header->target_name[i][nameLen - 1] != '\0') {
@ -114,6 +126,7 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) {
header->target_name[i] = newName;
header->target_name[i][nameLen] = '\0';
}
while (uData.readPos + 4 > uData.curLen) readUncompressOneBlock(fpr, fBuf, &uData);
memcpy(&header->target_len[i], &uData.data[uData.readPos], 4);
uData.readPos += 4;
if (nsgv::gIsBigEndian) ed_swap_4p(&header->target_len[i]);
@ -122,9 +135,14 @@ void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf) {
}
// spdlog::info("test res: {} {} {} {}", header->l_text, dlen, header->n_targets, header->text);
spdlog::info("udata: readPos: {}, curLen: {}, maxLen: {}", uData.readPos, uData.curLen, uData.maxLen);
hdrBuf.header = header;
free(fBuf);
if (uData.readPos < uData.curLen) { // 如果还有数据则将其保存在hdrBuf中用以后续的bam解析
hdrBuf.data = (uint8_t *)malloc(uData.curLen - uData.readPos);
memcpy(hdrBuf.data, &uData.data[uData.readPos], uData.curLen - uData.readPos);
hdrBuf.dataLen = uData.curLen - uData.readPos;
}
// exit(0);
free(fBuf);
}

View File

@ -2,13 +2,6 @@
#include <htslib/sam.h>
#include <stdio.h>
#ifndef BLOCK_HEADER_LENGTH
#define BLOCK_HEADER_LENGTH 18
#endif
#ifndef BLOCK_FOOTER_LENGTH
#define BLOCK_FOOTER_LENGTH 8
#endif
#define INCREASE_SIZE (8L * 1024 * 1024)
struct DataBuffer {
@ -41,10 +34,8 @@ struct DataBuffer {
};
struct HeaderBuf {
uint8_t *data;
int bamPos; // 第一条bam的位置
int curLen;
int maxLen;
uint8_t *data = nullptr; // 保留除header之外的解压数据即最开始的bam记录
int dataLen = 0; // 解压数据的字节数
sam_hdr_t *header;
};

View File

@ -16,6 +16,7 @@
#include "sam_io.h"
#include "sort_args.h"
#include "util/profiling.h"
#include "const_val.h"
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
@ -372,27 +373,26 @@ int doSort() {
// threadRead(fpr); exit(0);
parseSamHeader(fpr, nsgv::gInHdr);
// exit(0);
// FILE *fpw = fopen(nsgv::gSortArg.OUTPUT_FILE.c_str(), "rb");
const size_t READ_BUFSIZE = 4L * 1024 * 1024 * nsgv::gSortArg.NUM_THREADS;
const size_t MARGIN_SIZE = READ_BUFSIZE * 4;
// const size_t READ_BUFSIZE = 8L * 1024 * 1024;
const size_t SINGLE_BLOCK_SIZE = 65535;
uint8_t *fbuf[5];
fbuf[0] = (uint8_t *)malloc(READ_BUFSIZE);
fbuf[1] = (uint8_t *)malloc(READ_BUFSIZE);
fbuf[2] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
fbuf[3] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
fbuf[4] = (uint8_t *)malloc(READ_BUFSIZE);
fbuf[4] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
//setvbuf(fpr, (char*)fbuf[4], _IOFBF, 0);
//setvbuf(fpr, (char *)fbuf[4], _IONBF, READ_BUFSIZE);
uint8_t *curBuf = fbuf[0];
uint8_t *preBuf = fbuf[1];
uint8_t *preBlock = fbuf[2];
uint8_t *curBlock = fbuf[3];
uint8_t *halfBlock = fbuf[4];
size_t fsize = 0;
size_t block_num = 0;
@ -415,78 +415,43 @@ int doSort() {
vector<uint8_t *> startAddrArr[2];
vector<uint8_t *> *curStartAddrArr = &startAddrArr[0];
vector<uint8_t *> *preStartAddrArr = &startAddrArr[1];
size_t curBLockPos = 0;
size_t lastBufRemain = 0;
uint8_t *switchBuf;
vector<uint8_t *> *switchAddrArr;
uint8_t *switchBlock;
pthread_t uncompressTid = 0;
pthread_t sortMergeTid = 0;
// 读取一个压缩的block
#define READ_BLOCKS \
curBLockPos = curReadPos; \
while (curReadPos + BLOCK_HEADER_LENGTH < readState) { \
blockLen = unpackInt16(&curBuf[curBLockPos + 16]) + 1; \
curReadPos += blockLen; \
if (curReadPos < readState) { \
/* 这个block完整的包含在curBuf里 */ \
curStartAddrArr->push_back(&curBuf[curBLockPos]); \
curBLockPos = curReadPos; \
} \
} \
lastBufRemain = readState - curBLockPos; \
memcpy(curBlock, &curBuf[curBLockPos], lastBufRemain); // 将不完整的block拷贝到curBlock
// 交换buffer指针
#define SWITCH_POINTER \
switchPointer(&curBuf, &preBuf); \
switchPointer(&curStartAddrArr, &preStartAddrArr); \
switchPointer(&curBlock, &preBlock);
PROF_G_BEG(mid_all);
PROF_G_BEG(read);
readState = fread(fbuf[4], 1, READ_BUFSIZE, fpr);
// readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
PROF_G_BEG(mem_copy);
memcpy(curBuf, fbuf[4], READ_BUFSIZE);
PROF_G_END(mem_copy);
readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
PROF_G_END(read);
while (readState > 0) {
//spdlog::info("readState-0: {}", readState);
// while (readState > 0) {
PROF_G_BEG(parse_block);
curStartAddrArr->clear();
fsize += readState;
curReadPos = 0;
// if (lastBufRemain < BLOCK_HEADER_LENGTH) { // 上次遗留的数据不够解析block 长度
// memcpy(&preBlock[lastBufRemain], curBuf, BLOCK_HEADER_LENGTH);
// curReadPos += BLOCK_HEADER_LENGTH;
// lastBufRemain += BLOCK_HEADER_LENGTH;
// }
// blockLen = unpackInt16(&preBlock[16]) + 1;
// const int lenToRead = blockLen - lastBufRemain;
// memcpy(&preBlock[lastBufRemain], &curBuf[curReadPos], lenToRead);
// curReadPos += lenToRead;
// curStartAddrArr->push_back(&preBlock[0]);
PROF_G_BEG(parse_block);
// curStartAddrArr->push_back(&curBuf[0]);
// spdlog::info("first blocklen: {}, remain: {}, toread: {}", blockLen, lastBufRemain, lenToRead);
// uint8_t buf[65535];
// int block_length = unpackInt16(&preBlock[16]) + 1;
// uint32_t crc = le_to_u32(preBlock + block_length - 8);
// size_t dlen = 65535;
// int ret = bgzf_uncompress(buf, &dlen, (Bytef *)preBlock + 18, block_length - 18, crc); // 开头是header得跳过
// int bamLen = *(uint32_t *)(buf);
// spdlog::info("ret: {}, bamLen: {}, dlen: {}, block_length: {}", ret, bamLen, dlen, block_length);
// exit(0);
curStartAddrArr->clear();
curReadPos = blockLen - lastBufRemain; /* 上一个buf里剩余的block数据 */
/* 处理上一个不完整的block */
if (lastBufRemain > 0) { // 上一轮有剩余
memcpy(curBlock, halfBlock, lastBufRemain);
memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock
curStartAddrArr->push_back(&curBlock[0]);
}
READ_BLOCKS;
// spdlog::info("read block len: {}", blockLen);
// spdlog::info("cur block size: {}", curStartAddrArr->size());
while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */
blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1;
if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */
curStartAddrArr->push_back(&curBuf[curReadPos]);
curReadPos += blockLen;
} else {
break; /* 当前block数据不完整一部分在还没读入的file数据里 */
}
}
lastBufRemain = readState - curReadPos;
// spdlog::info("lastBufRemain: {}", lastBufRemain);
if (lastBufRemain > 0) {
memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock
}
totalBlocks += curStartAddrArr->size();
PROF_G_END(parse_block);
@ -512,23 +477,15 @@ int doSort() {
pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData);
//nonBlockingUncompress(&sortData);
#endif
// kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, &sortData, curStartAddrArr->size() / BATCH_SIZE);
// if (uncommpressedDataSize(sortData) >= nsgv::gSortArg.MAX_MEM) {
// kt_for(nsgv::gSortArg.NUM_THREADS, mtSortBlocks, &sortData, nsgv::gSortArg.NUM_THREADS);
// mergeSortedBlocks(sortData);
// }
SWITCH_POINTER;
// 交换buf指针
switchPointer(&curBuf, &preBuf);
switchPointer(&curStartAddrArr, &preStartAddrArr);
switchPointer(&curBlock, &preBlock);
PROF_G_BEG(read);
readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
//spdlog::info("readState: {}", readState);
PROF_G_BEG(mem_copy);
// memcpy(curBuf, fbuf[4], readState); readState = READ_BUFSIZE;
memcpy(curBuf, fbuf[4], readState);
PROF_G_END(mem_copy);
PROF_G_END(read);
// if (fsize >= 6245369164) break;
}
pthread_join(uncompressTid, NULL);
PROF_G_END(mid_all);