Merge branch 'main' of ssh://zzhict.top:6622/zzh/FastSort
This commit is contained in:
commit
ab7e3989d0
|
|
@ -13,6 +13,19 @@ namespace nsgv {
|
||||||
extern bool gIsBigEndian;
|
extern bool gIsBigEndian;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
size_t fileSize(FILE *fp) {
|
||||||
|
spdlog::info("test file start");
|
||||||
|
size_t bufSize = 64L * 1024 * 1024;
|
||||||
|
uint8_t *testBuf = (uint8_t *)malloc(bufSize);
|
||||||
|
size_t readState = 0;
|
||||||
|
size_t fileSize = 0;
|
||||||
|
while ((readState = fread(testBuf, 1, bufSize, fp)) > 0) {
|
||||||
|
fileSize += readState;
|
||||||
|
}
|
||||||
|
spdlog::info("test file size: {}", fileSize);
|
||||||
|
return fileSize;
|
||||||
|
}
|
||||||
|
|
||||||
int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc) {
|
int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc) {
|
||||||
struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
|
struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
|
||||||
if (!z) {
|
if (!z) {
|
||||||
|
|
|
||||||
|
|
@ -44,12 +44,6 @@ struct HeaderBuf {
|
||||||
sam_hdr_t *header;
|
sam_hdr_t *header;
|
||||||
};
|
};
|
||||||
|
|
||||||
// 解压之后的数据结构
|
|
||||||
struct UBlockBuf {
|
|
||||||
uint8_t *data;
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
static int unpackInt16(const uint8_t *buffer) { return buffer[0] | buffer[1] << 8; }
|
static int unpackInt16(const uint8_t *buffer) { return buffer[0] | buffer[1] << 8; }
|
||||||
|
|
||||||
void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf);
|
void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf);
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@
|
||||||
#include <sys/mman.h>
|
#include <sys/mman.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <zlib.h>
|
#include <zlib.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
|
@ -21,6 +22,8 @@
|
||||||
#include "util/profiling.h"
|
#include "util/profiling.h"
|
||||||
#include "util/yarn.h"
|
#include "util/yarn.h"
|
||||||
|
|
||||||
|
using std::string;
|
||||||
|
|
||||||
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
|
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
|
||||||
|
|
||||||
namespace nsgv {
|
namespace nsgv {
|
||||||
|
|
@ -42,224 +45,6 @@ struct BamSortData {
|
||||||
char *qname; // pointer to qname
|
char *qname; // pointer to qname
|
||||||
};
|
};
|
||||||
|
|
||||||
// thread data for sorting
|
|
||||||
struct SortData {
|
|
||||||
/* 用在多线程解压中的数据 */
|
|
||||||
vector<uint8_t *> *blockAddrArr; // 等待解压的gz block
|
|
||||||
vector<ThreadBlockArr> blockItemArr; // 每个thread一个,用来保存解压后的block数据
|
|
||||||
volatile int uncompressFinish = 0; // 是否解压完成
|
|
||||||
|
|
||||||
/* 单线程解析用到的数据 */
|
|
||||||
DataBuffer bamData; // 解压后的数据存放在连续的地址空间中
|
|
||||||
BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int round = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
int sam_realloc_bam_data(bam1_t *b, size_t desired) {
|
|
||||||
uint32_t new_m_data;
|
|
||||||
uint8_t *new_data;
|
|
||||||
new_m_data = desired;
|
|
||||||
kroundup32(new_m_data); // next power of 2
|
|
||||||
new_m_data += 32; // reduces malloc arena migrations?
|
|
||||||
if (new_m_data < desired) {
|
|
||||||
errno = ENOMEM; // Not strictly true but we can't store the size
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
|
|
||||||
if (new_m_data > FUZZ_ALLOC_LIMIT) {
|
|
||||||
errno = ENOMEM;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if ((bam_get_mempolicy(b) & BAM_USER_OWNS_DATA) == 0) {
|
|
||||||
new_data = (uint8_t*)realloc(b->data, new_m_data);
|
|
||||||
} else {
|
|
||||||
if ((new_data = (uint8_t *)malloc(new_m_data)) != NULL) {
|
|
||||||
if (b->l_data > 0)
|
|
||||||
memcpy(new_data, b->data, b->l_data < b->m_data ? b->l_data : b->m_data);
|
|
||||||
bam_set_mempolicy(b, bam_get_mempolicy(b) & (~BAM_USER_OWNS_DATA));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!new_data)
|
|
||||||
return -1;
|
|
||||||
b->data = new_data;
|
|
||||||
b->m_data = new_m_data;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int realloc_bam_data(bam1_t *b, size_t desired) {
|
|
||||||
if (desired <= b->m_data)
|
|
||||||
return 0;
|
|
||||||
return sam_realloc_bam_data(b, desired);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fix bad records where qname is not terminated correctly.
|
|
||||||
static int fixup_missing_qname_nul(bam1_t *b) {
|
|
||||||
bam1_core_t *c = &b->core;
|
|
||||||
|
|
||||||
// Note this is called before c->l_extranul is added to c->l_qname
|
|
||||||
if (c->l_extranul > 0) {
|
|
||||||
b->data[c->l_qname++] = '\0';
|
|
||||||
c->l_extranul--;
|
|
||||||
} else {
|
|
||||||
if (b->l_data > INT_MAX - 4)
|
|
||||||
return -1;
|
|
||||||
if (realloc_bam_data(b, b->l_data + 4) < 0)
|
|
||||||
return -1;
|
|
||||||
b->l_data += 4;
|
|
||||||
b->data[c->l_qname++] = '\0';
|
|
||||||
c->l_extranul = 3;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void bam_cigar2rqlens(int n_cigar, const uint32_t *cigar, hts_pos_t *rlen, hts_pos_t *qlen) {
|
|
||||||
int k;
|
|
||||||
*rlen = *qlen = 0;
|
|
||||||
for (k = 0; k < n_cigar; ++k) {
|
|
||||||
int type = bam_cigar_type(bam_cigar_op(cigar[k]));
|
|
||||||
int len = bam_cigar_oplen(cigar[k]);
|
|
||||||
if (type & 1)
|
|
||||||
*qlen += len;
|
|
||||||
if (type & 2)
|
|
||||||
*rlen += len;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// multi-thread uncompress bam blocks
|
|
||||||
static void mtUncompressBlock1(void *data, long idx, int tid) {
|
|
||||||
auto &p = *(SortData*) data;
|
|
||||||
auto &blockItemArr = p.blockItemArr[tid];
|
|
||||||
|
|
||||||
if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) {
|
|
||||||
blockItemArr.blockArr.push_back(OneBlock());
|
|
||||||
}
|
|
||||||
|
|
||||||
auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
|
|
||||||
|
|
||||||
uint8_t *block = (*p.blockAddrArr)[idx];
|
|
||||||
|
|
||||||
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
|
|
||||||
int block_length = unpackInt16(&block[16]) + 1;
|
|
||||||
uint32_t crc = le_to_u32(block + block_length - 8);
|
|
||||||
int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH, block_length - BLOCK_HEADER_LENGTH, crc);
|
|
||||||
|
|
||||||
blockItem.blockId = idx;
|
|
||||||
blockItem.blockLen = dlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mergeSortedBlocks(const SortData &sortData) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *threadMergeParseBlocks(void *data) {
|
|
||||||
return nullptr;
|
|
||||||
|
|
||||||
auto &p = *(SortData *)data;
|
|
||||||
DataBuffer &buf = p.bamData;
|
|
||||||
|
|
||||||
int idx = 0;
|
|
||||||
int tIdx[nsgv::gSortArg.NUM_THREADS] = {0};
|
|
||||||
while (!p.uncompressFinish) {
|
|
||||||
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
|
|
||||||
auto &blockItemArr = p.blockItemArr[t];
|
|
||||||
if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) {
|
|
||||||
//spdlog::info("1. thread-{} arr idx: {}, block id: {}, size: {}, idx: {}", t, tIdx[t],
|
|
||||||
// blockItemArr.blockArr[tIdx[t]].blockId, blockItemArr.curIdx, idx);
|
|
||||||
++tIdx[t];
|
|
||||||
++idx;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 处理结尾数据
|
|
||||||
int totalBlockNum = 0;
|
|
||||||
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
|
|
||||||
auto &blockItemArr = p.blockItemArr[t];
|
|
||||||
totalBlockNum += blockItemArr.curIdx;
|
|
||||||
}
|
|
||||||
/* spdlog::info("here, idx: {}, total: {}", idx, totalBlockNum);
|
|
||||||
while (idx < totalBlockNum) {
|
|
||||||
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
|
|
||||||
auto &blockItemArr = p.blockItemArr[t];
|
|
||||||
// spdlog::info("t:{}, tidx:{}, block id:{}, idx:{}", t, tIdx[t], blockItemArr.blockArr[tIdx[t]].blockId, idx);
|
|
||||||
if (tIdx[t] < blockItemArr.curIdx && blockItemArr.blockArr[tIdx[t]].blockId == idx) {
|
|
||||||
//spdlog::info("2. thread-{} block id: {}, size: {}", t, tIdx[t], blockItemArr.curIdx);
|
|
||||||
++tIdx[t];
|
|
||||||
++idx;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// break;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
//spdlog::info("threadMergeParseBlocks exit, idx: {}", idx);
|
|
||||||
// exit(0);
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *nonBlockingUncompress(void *data) {
|
|
||||||
PROF_G_BEG(uncompress);
|
|
||||||
pthread_t parseTid = 0;
|
|
||||||
|
|
||||||
if (((SortData *)data)->round % 10 == 0)
|
|
||||||
spdlog::info("round-{} block size: {}", ((SortData *)data)->round, ((SortData *)data)->blockAddrArr->size());
|
|
||||||
|
|
||||||
SortData &sortData = *(SortData *)data;
|
|
||||||
for (auto &arr: sortData.blockItemArr) arr.curIdx = 0;
|
|
||||||
sortData.uncompressFinish = 0;
|
|
||||||
|
|
||||||
// 1. 开启一个线程,用来将每个线程解压的数据放到一起
|
|
||||||
pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData);
|
|
||||||
|
|
||||||
// 2. 调用并行解压的线程函数
|
|
||||||
kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock1, data, ((SortData *)data)->blockAddrArr->size());
|
|
||||||
sortData.uncompressFinish = 1;
|
|
||||||
int totalBlockNum = 0;
|
|
||||||
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
|
|
||||||
auto &blockItemArr = sortData.blockItemArr[t];
|
|
||||||
totalBlockNum += blockItemArr.curIdx;
|
|
||||||
}
|
|
||||||
// spdlog::info("uncompress finish, total block num: {}", totalBlockNum);
|
|
||||||
|
|
||||||
// 3. 等待负责数据合并的线程结束
|
|
||||||
pthread_join(parseTid, NULL);
|
|
||||||
|
|
||||||
// 4. 如果当前读入的数据超过设定的参数,则进行一次排序
|
|
||||||
|
|
||||||
PROF_G_END(uncompress);
|
|
||||||
|
|
||||||
// for (auto &arr : sortData.blockItemArr) spdlog::info("block size: {}", arr.curIdx);
|
|
||||||
|
|
||||||
((SortData *)data)->round++;
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
static void switchPointer(T *p1, T *p2) {
|
|
||||||
const T tmp = *p1;
|
|
||||||
*p1 = *p2;
|
|
||||||
*p2 = tmp;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *threadRead(void *data) {
|
|
||||||
spdlog::info("test file start");
|
|
||||||
FILE *fp = (FILE*) data;
|
|
||||||
size_t bufSize = 64L * 1024 * 1024;
|
|
||||||
uint8_t *testBuf = (uint8_t *)malloc(bufSize);
|
|
||||||
size_t readState = 0;
|
|
||||||
size_t fileSize = 0;
|
|
||||||
while ((readState = fread(testBuf, 1, bufSize, fp)) > 0) {
|
|
||||||
fileSize += readState;
|
|
||||||
}
|
|
||||||
spdlog::info("test file size: {}", fileSize);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */
|
/* 将bam文件内容读取到buf,解析buf中的gz block长度信息 */
|
||||||
static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) {
|
static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) {
|
||||||
|
|
@ -272,10 +57,6 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *
|
||||||
readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr);
|
readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr);
|
||||||
if (readState == 0) { return 0; }
|
if (readState == 0) { return 0; }
|
||||||
|
|
||||||
//if (p.readOrder == 278) {
|
|
||||||
// spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos);
|
|
||||||
//}
|
|
||||||
|
|
||||||
readData.startAddrArr.clear();
|
readData.startAddrArr.clear();
|
||||||
|
|
||||||
/* 处理上一个不完整的block */
|
/* 处理上一个不完整的block */
|
||||||
|
|
@ -283,7 +64,7 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *
|
||||||
if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息
|
if (halfBlock.readPos < BLOCK_HEADER_LENGTH) { // 上一轮剩余数据不满足解析block长度信息
|
||||||
memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos);
|
memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos);
|
||||||
halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度
|
halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1; // 更新一下剩余block的真正长度
|
||||||
spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos);
|
// spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos);
|
||||||
}
|
}
|
||||||
memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos);
|
memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos);
|
||||||
curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度
|
curReadPos = halfBlock.curLen - halfBlock.readPos; // curlen保存上一个block的长度,readPos保存上一个block在上一次读取中的长度
|
||||||
|
|
@ -291,7 +72,7 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *
|
||||||
readData.startAddrArr.push_back(readData.blockBuf);
|
readData.startAddrArr.push_back(readData.blockBuf);
|
||||||
}
|
}
|
||||||
/* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */
|
/* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */
|
||||||
while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */
|
while (curReadPos + BLOCK_HEADER_LENGTH <= readState) { /* 确保能解析block长度 */
|
||||||
blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1;
|
blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1;
|
||||||
if (blockLen > maxBlockLen) { maxBlockLen = blockLen; }
|
if (blockLen > maxBlockLen) { maxBlockLen = blockLen; }
|
||||||
if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */
|
if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */
|
||||||
|
|
@ -307,11 +88,9 @@ static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *
|
||||||
if (halfBlock.readPos > 0) {
|
if (halfBlock.readPos > 0) {
|
||||||
memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock
|
memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos); // 将不完整的block拷贝到halfBlock
|
||||||
}
|
}
|
||||||
//if (p.readOrder == 277) {
|
|
||||||
// spdlog::info("tail - last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos);
|
// spdlog::info("block num-1: {}", readData.startAddrArr.size());
|
||||||
//}
|
//spdlog::info("read order: {}, max block len: {}", p.readOrder, maxBlockLen);
|
||||||
//spdlog::info("block num-1: {}", readData.startAddrArr.size());
|
|
||||||
//spdlog::info("max block len: {}", maxBlockLen);
|
|
||||||
|
|
||||||
return readState;
|
return readState;
|
||||||
}
|
}
|
||||||
|
|
@ -361,6 +140,7 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
|
||||||
if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) {
|
if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) {
|
||||||
blockItemArr.blockArr.push_back(OneBlock());
|
blockItemArr.blockArr.push_back(OneBlock());
|
||||||
}
|
}
|
||||||
|
auto &bamItemArr = p.bamItemArr[tid];
|
||||||
|
|
||||||
auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
|
auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
|
||||||
|
|
||||||
|
|
@ -381,15 +161,33 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
|
||||||
uint32_t bamLen = 0;
|
uint32_t bamLen = 0;
|
||||||
uint32_t bamNum = 0;
|
uint32_t bamNum = 0;
|
||||||
PROF_G_BEG(mem_copy);
|
PROF_G_BEG(mem_copy);
|
||||||
// memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
|
|
||||||
/* 解析每个bam */
|
/* 解析每个bam */
|
||||||
while (nextBamStart + 4 <= blockItem.blockLen) {
|
while (nextBamStart + 4 <= blockItem.blockLen) {
|
||||||
memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
|
bamItemArr.bamArr.push_back(OneBam());
|
||||||
nextBamStart += 4 + bamLen;
|
OneBam &bam = bamItemArr.bamArr.back();
|
||||||
|
|
||||||
|
bam.addr = &blockItem.data[nextBamStart];
|
||||||
|
uint8_t *curAddr = &blockItem.data[nextBamStart];
|
||||||
|
memcpy(&bamLen, curAddr, 4);
|
||||||
|
curAddr += 4;
|
||||||
|
if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen);
|
||||||
|
bam.bamLen = bamLen;
|
||||||
|
bam.tid = le_to_u32(curAddr);
|
||||||
|
bam.pos = le_to_i32(curAddr + 4);
|
||||||
|
uint32_t x2 = le_to_u32(curAddr + 8);
|
||||||
|
bam.qnameLen = x2 & 0xff;
|
||||||
|
bam.qnameAddr = (char *)(curAddr + 32);
|
||||||
|
|
||||||
// spdlog::info("bam len: {}", bamLen);
|
// spdlog::info("bam len: {}", bamLen);
|
||||||
|
nextBamStart += 4 + bamLen;
|
||||||
++bamNum;
|
++bamNum;
|
||||||
}
|
}
|
||||||
|
if (nextBamStart != blockItem.blockLen) {
|
||||||
|
spdlog::error("Block content does not contain integer number of bam records!");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
blockItem.bamNum = bamNum;
|
blockItem.bamNum = bamNum;
|
||||||
|
|
||||||
PROF_G_END(mem_copy);
|
PROF_G_END(mem_copy);
|
||||||
|
|
||||||
// 判断是否超过内存阈值
|
// 判断是否超过内存阈值
|
||||||
|
|
@ -447,10 +245,13 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) {
|
||||||
/* 将gz block进行解压,并进行线程内排序 */
|
/* 将gz block进行解压,并进行线程内排序 */
|
||||||
static void doFirstPipeUncompress(FirstPipeArg &p) {
|
static void doFirstPipeUncompress(FirstPipeArg &p) {
|
||||||
// return;
|
// return;
|
||||||
|
PROF_G_BEG(uncompress);
|
||||||
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
|
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
|
||||||
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
|
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
|
||||||
uncompressData.readDataPtr = &readData;
|
uncompressData.readDataPtr = &readData;
|
||||||
uncompressData.ResetBlockArr();
|
uncompressData.ResetBlockArr();
|
||||||
|
uncompressData.ResetBamArr();
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
|
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
|
||||||
#else
|
#else
|
||||||
|
|
@ -458,9 +259,13 @@ static void doFirstPipeUncompress(FirstPipeArg &p) {
|
||||||
#endif
|
#endif
|
||||||
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
|
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
|
||||||
// uncompressData.blockItemArr[0].blockArr.size());
|
// uncompressData.blockItemArr[0].blockArr.size());
|
||||||
|
|
||||||
// 判断是否超过内存阈值
|
// 判断是否超过内存阈值
|
||||||
|
// auto &bam = uncompressData.bamItemArr[0].bamArr.back();
|
||||||
|
// string qname(bam.qnameAddr, bam.qnameLen);
|
||||||
|
// spdlog::info("bam name:{}", qname);
|
||||||
|
|
||||||
|
PROF_G_END(uncompress);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* FirstPipe step-2 并行解压gz blocks*/
|
/* FirstPipe step-2 并行解压gz blocks*/
|
||||||
|
|
@ -499,29 +304,25 @@ static void *firstPipeUncompress(void *data) {
|
||||||
|
|
||||||
/* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */
|
/* 将并行解压的数据放到一起,解析,到容量阈值后,进行排序并输出到中间文件 */
|
||||||
void doFirstPipeMergeSort(FirstPipeArg &p) {
|
void doFirstPipeMergeSort(FirstPipeArg &p) {
|
||||||
|
PROF_G_BEG(sort);
|
||||||
UncompressData &uncompressData = p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM];
|
UncompressData &uncompressData = p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM];
|
||||||
DataBuffer &buf = p.mergeSortData.bamData;
|
|
||||||
|
|
||||||
BlockHeap blockHeap;
|
|
||||||
blockHeap.Init(&uncompressData.blockItemArr);
|
|
||||||
// spdlog::info("block heap size: {}", blockHeap.Size());
|
|
||||||
// spdlog::info("all block bytes: {}", blockHeap.AllBlockBytes());
|
|
||||||
// size_t allBlockBytes = blockHeap.AllBlockBytes();
|
|
||||||
// if (buf.maxLen < buf.curLen + allBlockBytes) {
|
|
||||||
// // spdlog::info("here ");
|
|
||||||
// buf.reAllocMem(buf.curLen + allBlockBytes);
|
|
||||||
// }
|
|
||||||
// memset(buf.data, 0, buf.maxLen);
|
|
||||||
|
|
||||||
const OneBlock *pB;
|
|
||||||
size_t bamNum = 0;
|
|
||||||
size_t blockNum = 0;
|
size_t blockNum = 0;
|
||||||
while ((pB = blockHeap.Pop()) != nullptr && pB->blockLen > 0) {
|
for (int i = 0; i < p.numThread; ++i) {
|
||||||
++blockNum;
|
blockNum += uncompressData.blockItemArr[i].curIdx;
|
||||||
bamNum += pB->bamNum;
|
|
||||||
}
|
}
|
||||||
buf.curLen = 0;
|
size_t bamNum = blockNum * 256;
|
||||||
spdlog::info("block num: {}, bam num: {}", blockNum, bamNum);
|
|
||||||
|
size_t curBlockBamMemSize = blockNum * SINGLE_BLOCK_SIZE + bamNum * 32;
|
||||||
|
/* 内存使用量达到阈值后,进行排序 */
|
||||||
|
if (curBlockBamMemSize > nsgv::gSortArg.MAX_MEM / 2) {
|
||||||
|
uncompressData.ResetBlockArr();
|
||||||
|
uncompressData.ResetBamArr();
|
||||||
|
}
|
||||||
|
|
||||||
|
//spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockBytes, bamNum * 32);
|
||||||
|
spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32);
|
||||||
|
PROF_G_END(sort);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* FirstPipe step-3 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 */
|
/* FirstPipe step-3 串行合并解压后的blocks,并解析每个bam的长度,达到内存阈值后,并行排序 */
|
||||||
|
|
@ -566,6 +367,7 @@ static void bamSortFirstPipe() {
|
||||||
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128);
|
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PROF_G_BEG(mid_all);
|
||||||
/* create threads */
|
/* create threads */
|
||||||
pthread_t pipeThreadIdArr[3]; // 3-stage pipeline
|
pthread_t pipeThreadIdArr[3]; // 3-stage pipeline
|
||||||
pthread_create(&pipeThreadIdArr[0], NULL, firstPipeReadFile, &firstPipeArg);
|
pthread_create(&pipeThreadIdArr[0], NULL, firstPipeReadFile, &firstPipeArg);
|
||||||
|
|
@ -573,6 +375,7 @@ static void bamSortFirstPipe() {
|
||||||
pthread_create(&pipeThreadIdArr[2], NULL, firstPipeMergeSort, &firstPipeArg);
|
pthread_create(&pipeThreadIdArr[2], NULL, firstPipeMergeSort, &firstPipeArg);
|
||||||
|
|
||||||
for (int i = 0; i < 3; ++i) pthread_join(pipeThreadIdArr[i], NULL);
|
for (int i = 0; i < 3; ++i) pthread_join(pipeThreadIdArr[i], NULL);
|
||||||
|
PROF_G_END(mid_all);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* IO同步的方式进行排序 */
|
/* IO同步的方式进行排序 */
|
||||||
|
|
@ -607,16 +410,23 @@ static void bamSortSerialFirstPipe() {
|
||||||
fclose(fpr);
|
fclose(fpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* 对sam文件进行排序 */
|
||||||
|
static void samSortFirstPipe() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// entry function
|
// entry function
|
||||||
int doSort() {
|
int doSort() {
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
nsgv::gIsBigEndian = ed_is_big();
|
nsgv::gIsBigEndian = ed_is_big();
|
||||||
bamSortFirstPipe();
|
bamSortFirstPipe();
|
||||||
// //bamSortSerialFirstPipe();
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
|
spdlog::info("OneBam size: {}", sizeof(OneBam));
|
||||||
|
//bamSortSerialFirstPipe();
|
||||||
#else
|
#else
|
||||||
|
|
||||||
/* 打开输入bam文件 */
|
/* 打开输入bam文件 */
|
||||||
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
|
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
|
||||||
if (!inBamFp) {
|
if (!inBamFp) {
|
||||||
|
|
@ -647,151 +457,9 @@ int doSort() {
|
||||||
}
|
}
|
||||||
sam_close(inBamFp);
|
sam_close(inBamFp);
|
||||||
spdlog::info("max record len: {}", max_bam_len);
|
spdlog::info("max record len: {}", max_bam_len);
|
||||||
return 0;
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// 初始化算法用到的数据结构
|
|
||||||
SortData sortData;
|
|
||||||
//sortData.bamArr.resize(nsgv::gSortArg.NUM_THREADS);
|
|
||||||
//sortData.uData.resize(nsgv::gSortArg.NUM_THREADS);
|
|
||||||
//for (int i = 0; i < nsgv::gSortArg.NUM_THREADS; ++i) {
|
|
||||||
// sortData.uData[i].allocMem(nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS);
|
|
||||||
//}
|
|
||||||
|
|
||||||
sortData.blockItemArr.resize(nsgv::gSortArg.NUM_THREADS);
|
|
||||||
for(auto &arr: sortData.blockItemArr) {
|
|
||||||
// arr.blockArr.resize(1000);
|
|
||||||
}
|
|
||||||
// sortData.bamPtrArr.resize(nsgv::gSortArg.NUM_THREADS);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
nsgv::gIsBigEndian = ed_is_big();
|
|
||||||
|
|
||||||
// 打开文件
|
|
||||||
FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb");
|
|
||||||
|
|
||||||
// threadRead(fpr); exit(0);
|
|
||||||
|
|
||||||
parseSamHeader(fpr, nsgv::gInHdr);
|
|
||||||
// exit(0);
|
|
||||||
|
|
||||||
// FILE *fpw = fopen(nsgv::gSortArg.OUTPUT_FILE.c_str(), "rb");
|
|
||||||
|
|
||||||
const size_t READ_BUFSIZE = 4L * 1024 * 1024 * nsgv::gSortArg.NUM_THREADS;
|
|
||||||
|
|
||||||
uint8_t *fbuf[5];
|
|
||||||
fbuf[0] = (uint8_t *)malloc(READ_BUFSIZE);
|
|
||||||
fbuf[1] = (uint8_t *)malloc(READ_BUFSIZE);
|
|
||||||
fbuf[2] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
|
||||||
fbuf[3] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
|
||||||
fbuf[4] = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
|
|
||||||
|
|
||||||
|
|
||||||
uint8_t *curBuf = fbuf[0];
|
|
||||||
uint8_t *preBuf = fbuf[1];
|
|
||||||
uint8_t *preBlock = fbuf[2];
|
|
||||||
uint8_t *curBlock = fbuf[3];
|
|
||||||
uint8_t *halfBlock = fbuf[4];
|
|
||||||
|
|
||||||
size_t fsize = 0;
|
|
||||||
size_t block_num = 0;
|
|
||||||
int round = 0;
|
|
||||||
size_t totalBlocks = 0;
|
|
||||||
|
|
||||||
/*
|
|
||||||
有几种情况
|
|
||||||
第一种:上一个buf里,只剩17字节,那么不能解析下一个block的长度
|
|
||||||
第二种:上一个buf里,完全不剩字节,即上一个buf读入完成时正好卡在下一个block开头
|
|
||||||
第三种:上一个buf里,剩余18字节,只能解析下一个block的长度
|
|
||||||
第四种:上一个buf里,剩余超过18字节,但是没有放下所有block的数据
|
|
||||||
|
|
||||||
简而言之:1. 够解析下一个block的header,不够解析完整的block
|
|
||||||
2. 不够解析下一个block的header。
|
|
||||||
*/
|
|
||||||
size_t readState = 0;
|
|
||||||
size_t curReadPos = 0;
|
|
||||||
int blockLen = 0;
|
|
||||||
vector<uint8_t *> startAddrArr[2];
|
|
||||||
vector<uint8_t *> *curStartAddrArr = &startAddrArr[0];
|
|
||||||
vector<uint8_t *> *preStartAddrArr = &startAddrArr[1];
|
|
||||||
size_t lastBufRemain = 0;
|
|
||||||
pthread_t uncompressTid = 0;
|
|
||||||
pthread_t sortMergeTid = 0;
|
|
||||||
|
|
||||||
PROF_G_BEG(mid_all);
|
|
||||||
|
|
||||||
PROF_G_BEG(read);
|
|
||||||
readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
|
|
||||||
PROF_G_END(read);
|
|
||||||
|
|
||||||
while (readState > 0) {
|
|
||||||
fsize += readState;
|
|
||||||
PROF_G_BEG(parse_block);
|
|
||||||
|
|
||||||
curStartAddrArr->clear();
|
|
||||||
curReadPos = blockLen - lastBufRemain; /* 上一个buf里剩余的block数据 */
|
|
||||||
/* 处理上一个不完整的block */
|
|
||||||
if (lastBufRemain > 0) { // 上一轮有剩余
|
|
||||||
memcpy(curBlock, halfBlock, lastBufRemain);
|
|
||||||
memcpy(&curBlock[lastBufRemain], &curBuf[0], curReadPos); // 将不完整的block剩余数据拷贝到curBlock
|
|
||||||
curStartAddrArr->push_back(&curBlock[0]);
|
|
||||||
}
|
|
||||||
/* 解析读入buf中的文件数据,计算包含的每个block的长度信息和起始地址 */
|
|
||||||
while (curReadPos + BLOCK_HEADER_LENGTH < readState) { /* 确保能解析block长度 */
|
|
||||||
blockLen = unpackInt16(&curBuf[curReadPos + 16]) + 1;
|
|
||||||
if (curReadPos + blockLen <= readState) { /* 完整的block数据在buf里 */
|
|
||||||
curStartAddrArr->push_back(&curBuf[curReadPos]);
|
|
||||||
curReadPos += blockLen;
|
|
||||||
} else {
|
|
||||||
break; /* 当前block数据不完整,一部分在还没读入的file数据里 */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* 如果buf中包含不完整的block数据,先保存一下,放到下一轮里去处理 */
|
|
||||||
lastBufRemain = readState - curReadPos;
|
|
||||||
if (lastBufRemain > 0) {
|
|
||||||
memcpy(halfBlock, &curBuf[curReadPos], lastBufRemain); // 将不完整的block拷贝到halfBlock
|
|
||||||
}
|
|
||||||
// spdlog::info("lastBufRemain: {}", lastBufRemain);
|
|
||||||
|
|
||||||
totalBlocks += curStartAddrArr->size();
|
|
||||||
PROF_G_END(parse_block);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
// 并行解压
|
|
||||||
PROF_G_BEG(sort);
|
|
||||||
pthread_join(uncompressTid, NULL);
|
|
||||||
PROF_G_END(sort);
|
|
||||||
|
|
||||||
sortData.blockAddrArr = curStartAddrArr;
|
|
||||||
pthread_create(&uncompressTid, NULL, nonBlockingUncompress, &sortData);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// 交换buf指针
|
|
||||||
switchPointer(&curBuf, &preBuf);
|
|
||||||
switchPointer(&curStartAddrArr, &preStartAddrArr);
|
|
||||||
switchPointer(&curBlock, &preBlock);
|
|
||||||
|
|
||||||
PROF_G_BEG(read);
|
|
||||||
readState = fread(curBuf, 1, READ_BUFSIZE, fpr);
|
|
||||||
PROF_G_END(read);
|
|
||||||
}
|
|
||||||
pthread_join(uncompressTid, NULL);
|
|
||||||
PROF_G_END(mid_all);
|
|
||||||
|
|
||||||
|
|
||||||
spdlog::info("total blocks: {}", totalBlocks);
|
|
||||||
fprintf(stderr, "file size: %ld\n\n", fsize);
|
|
||||||
err:
|
|
||||||
free(fbuf[0]);
|
|
||||||
free(fbuf[1]);
|
|
||||||
free(fbuf[2]);
|
|
||||||
free(fbuf[3]);
|
|
||||||
free(fbuf[4]);
|
|
||||||
fclose(fpr);
|
|
||||||
// fclose(fpw);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,20 @@ struct ReadData {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/
|
/* for step-2 parallel uncompress gz blocks,还有缓存满了之后的线程内排序*/
|
||||||
|
struct OneBam {
|
||||||
|
uint16_t bamLen = 0;
|
||||||
|
uint16_t qnameLen = 0;
|
||||||
|
uint32_t tid = 0;
|
||||||
|
char *qnameAddr = 0; // qname的地址
|
||||||
|
uint64_t pos = 0; // mapping 位置
|
||||||
|
uint8_t *addr = 0; // 地址
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ThreadBamArr {
|
||||||
|
vector<OneBam> bamArr;
|
||||||
|
int curIdx = 0; //
|
||||||
|
};
|
||||||
|
|
||||||
struct OneBlock {
|
struct OneBlock {
|
||||||
uint8_t data[SINGLE_BLOCK_SIZE];
|
uint8_t data[SINGLE_BLOCK_SIZE];
|
||||||
uint32_t blockLen = 0; // 解压后的数据长度
|
uint32_t blockLen = 0; // 解压后的数据长度
|
||||||
|
|
@ -52,6 +66,8 @@ struct ThreadBlockArr {
|
||||||
};
|
};
|
||||||
struct UncompressData {
|
struct UncompressData {
|
||||||
vector<ThreadBlockArr> blockItemArr; // 每个thread一个,用来保存解压后的block数据
|
vector<ThreadBlockArr> blockItemArr; // 每个thread一个,用来保存解压后的block数据
|
||||||
|
vector<ThreadBamArr> bamItemArr; // 每个thread一个,用来保存解压后的bam数据
|
||||||
|
|
||||||
ReadData *readDataPtr = nullptr; // 读取数据的指针
|
ReadData *readDataPtr = nullptr; // 读取数据的指针
|
||||||
|
|
||||||
UncompressData() { }
|
UncompressData() { }
|
||||||
|
|
@ -59,12 +75,14 @@ struct UncompressData {
|
||||||
UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); }
|
UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); }
|
||||||
void Resize(int numThread) {
|
void Resize(int numThread) {
|
||||||
blockItemArr.resize(numThread);
|
blockItemArr.resize(numThread);
|
||||||
|
bamItemArr.resize(numThread);
|
||||||
for (int i = 0; i < numThread; ++i) {
|
for (int i = 0; i < numThread; ++i) {
|
||||||
blockItemArr[i].blockArr.reserve(128);
|
blockItemArr[i].blockArr.reserve(128);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void Resize(int numThread, int vecInitSize) {
|
void Resize(int numThread, int vecInitSize) {
|
||||||
blockItemArr.resize(numThread);
|
blockItemArr.resize(numThread);
|
||||||
|
bamItemArr.resize(numThread);
|
||||||
for (int i = 0; i < numThread; ++i) {
|
for (int i = 0; i < numThread; ++i) {
|
||||||
blockItemArr[i].blockArr.reserve(vecInitSize);
|
blockItemArr[i].blockArr.reserve(vecInitSize);
|
||||||
}
|
}
|
||||||
|
|
@ -74,6 +92,12 @@ struct UncompressData {
|
||||||
blockItemArr[i].curIdx = 0;
|
blockItemArr[i].curIdx = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void ResetBamArr() {
|
||||||
|
for (int i = 0; i < bamItemArr.size(); ++i) {
|
||||||
|
// bamItemArr[i].curIdx = 0;
|
||||||
|
bamItemArr[i].bamArr.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
/* block 排序堆 */
|
/* block 排序堆 */
|
||||||
struct BlockArrIdIdx {
|
struct BlockArrIdIdx {
|
||||||
|
|
@ -146,21 +170,12 @@ struct BlockHeap {
|
||||||
};
|
};
|
||||||
|
|
||||||
/* for step-3 serial merge blocks and sort them */
|
/* for step-3 serial merge blocks and sort them */
|
||||||
struct BamPointer {
|
|
||||||
uint64_t offset = 0; // 地址偏移量
|
|
||||||
uint32_t bamLen = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct BamPtrArr {
|
|
||||||
vector<BamPointer> bamArr;
|
|
||||||
int curIdx = 0; //
|
|
||||||
};
|
|
||||||
|
|
||||||
struct MergeSortData {
|
struct MergeSortData {
|
||||||
DataBuffer bamData; // 用来保存解压后的数据
|
DataBuffer bamData; // 用来保存解压后的数据
|
||||||
BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度
|
// BamPtrArr bamPtrArr; // 每个bam对应的解压数据,起始地址和长度
|
||||||
MergeSortData() {
|
MergeSortData() {
|
||||||
bamPtrArr.bamArr.reserve(128);
|
// bamPtrArr.bamArr.reserve(128);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue