线程内排序,线程间merge,线程内压缩,还没输出

This commit is contained in:
zzh 2025-12-16 23:44:49 +08:00
parent ab7e3989d0
commit afe6abc857
11 changed files with 533 additions and 64 deletions

View File

@ -4,7 +4,7 @@ set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
# set(CMAKE_BUILD_TYPE Debug)
# set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Release)
add_definitions(-DSHOW_PERF=1)
add_definitions(-DSPDLOG_COMPILED_LIB)
add_subdirectory(src)

View File

@ -78,11 +78,11 @@ int main(int argc, char *argv[]) {
.help("Set compression level, from 0 (uncompressed) to 9 (best).")
.nargs(1);
program.add_argument("-m", "--max-mem")
.help("approximate total memory limit for all threads (by default 2GB)")
.default_value(std::string("2G"))
program.add_argument("-m", "--mem")
.help("approximate total memory limit for all threads (by default 4GB)")
.default_value(std::string("4G"))
.nargs(1)
.metavar("<MAX_MEM>");
.metavar("<MEM>");
program.add_argument("--sort-type")
.help("\nUse coordinate sort or query name sort. Possible values: {COORDINATE, NAME}")
@ -144,6 +144,18 @@ int main(int argc, char *argv[]) {
{"PICARD", nsmd::QueryNameType::PICARD}};
nsgv::gSortArg.QUERY_NAME_TYPE = query_name_args[program.get("--query-name")];
string maxMemStr = program.get("--mem");
{
    // Parse the --mem size string: a number with an optional k/K, m/M or g/G
    // binary suffix (e.g. "4G" -> 4 GiB). Result feeds the global sort args.
    char* suffix;
    // BUG FIX: strtol returns (signed) long, which is 32-bit on some
    // platforms and overflows for values meant for a uint64_t; strtoull
    // parses the full unsigned 64-bit range.
    uint64_t maxMem = strtoull(maxMemStr.c_str(), &suffix, 0);
    if (*suffix == 'k' || *suffix == 'K')
        maxMem <<= 10;
    else if (*suffix == 'm' || *suffix == 'M')
        maxMem <<= 20;
    else if (*suffix == 'g' || *suffix == 'G')
        maxMem <<= 30;
    nsgv::gSortArg.MAX_MEM = maxMem;
}
} catch (const std::exception &err) {
spdlog::error(err.what());
@ -155,7 +167,7 @@ int main(int argc, char *argv[]) {
doSort();
spdlog::info("fast bam sort end");
displayProfiling(1);
displayProfiling(nsgv::gSortArg.NUM_THREADS);
return 0;
}

View File

@ -3,7 +3,9 @@
#include <stdlib.h>
// 单个block gz的最大解压大小
#define BGZF_MAX_BLOCK_SIZE 0x10000
#define SINGLE_BLOCK_SIZE 65535
#define BLOCK_MEM_FACTOR 1.2
// bam header字节数量
#ifndef BLOCK_HEADER_LENGTH

View File

@ -13,6 +13,8 @@ namespace nsgv {
extern bool gIsBigEndian;
};
static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
size_t fileSize(FILE *fp) {
spdlog::info("test file start");
size_t bufSize = 64L * 1024 * 1024;
@ -26,6 +28,74 @@ size_t fileSize(FILE *fp) {
return fileSize;
}
// Store a 16-bit value into `buffer` in little-endian byte order.
static inline void packInt16(uint8_t* buffer, uint16_t value) {
    buffer[0] = (uint8_t)(value & 0xffu);
    buffer[1] = (uint8_t)((value >> 8) & 0xffu);
}
// Store a 32-bit value into `buffer` in little-endian byte order.
static inline void packInt32(uint8_t* buffer, uint32_t value) {
    for (int i = 0; i < 4; ++i)
        buffer[i] = (uint8_t)((value >> (8 * i)) & 0xffu);
}
/* Compress one BGZF block (a self-contained gzip member: BGZF header +
 * deflate payload + CRC32/ISIZE footer).
 *   _dst  : output buffer
 *   dlen  : in: capacity of _dst; out: total bytes written on success
 *   src   : uncompressed input; slen == 0 emits the fixed 28-byte BGZF EOF block
 *   level : 0 = stored (uncompressed deflate), 1..9 mapped onto libdeflate
 *           levels, negative = default (6)
 * Returns 0 on success, -1 on error (output too small or compressor failure).
 */
int bgzfCompress(void* _dst, size_t* dlen, const void* src, size_t slen, int level) {
if (slen == 0) {
// EOF block
if (*dlen < 28)
return -1;
memcpy(_dst, "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", 28);
*dlen = 28;
return 0;
}
uint8_t* dst = (uint8_t*)_dst;
if (level == 0) {
// Uncompressed data: emit a single stored deflate block (BTYPE=00)
if (*dlen < slen + 5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH)
return -1;
dst[BLOCK_HEADER_LENGTH] = 1; // BFINAL=1, BTYPE=00; see RFC1951
u16_to_le(slen, &dst[BLOCK_HEADER_LENGTH + 1]); // length
u16_to_le(~slen, &dst[BLOCK_HEADER_LENGTH + 3]); // ones-complement length
memcpy(dst + BLOCK_HEADER_LENGTH + 5, src, slen);
*dlen = slen + 5 + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
} else {
level = level > 0 ? level : 6; // libdeflate doesn't honour -1 as default
// NB levels go up to 12 here.
int lvl_map[] = {0, 1, 2, 3, 5, 6, 7, 8, 10, 12};
level = lvl_map[level > 9 ? 9 : level];
struct libdeflate_compressor* z = libdeflate_alloc_compressor(level);
if (!z)
return -1;
// Raw deflate (headerless) into the payload region between header and footer
size_t clen = libdeflate_deflate_compress(z, src, slen, dst + BLOCK_HEADER_LENGTH, *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH);
// libdeflate returns 0 when the output buffer is too small; clen is
// unsigned, so this test only ever fires on 0
if (clen <= 0) {
hts_log_error("Call to libdeflate_deflate_compress failed");
libdeflate_free_compressor(z);
return -1;
}
*dlen = clen + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
libdeflate_free_compressor(z);
}
// write the header
memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
// write the footer: CRC32 of the UNCOMPRESSED data, then its size (ISIZE)
uint32_t crc = libdeflate_crc32(0, src, slen);
packInt32((uint8_t*)&dst[*dlen - 8], crc);
packInt32((uint8_t*)&dst[*dlen - 4], slen);
return 0;
}
int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc) {
struct libdeflate_decompressor *z = libdeflate_alloc_decompressor();
if (!z) {

View File

@ -48,4 +48,5 @@ static int unpackInt16(const uint8_t *buffer) { return buffer[0] | buffer[1] <<
void parseSamHeader(FILE *fp, HeaderBuf &hdrBuf);
int bgzfUncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen, uint32_t expected_crc);
int bgzfCompress(void* _dst, size_t* dlen, const void* src, size_t slen, int level);
int bgzfUncompress(uint8_t* dst, size_t* dlen, const uint8_t* src, size_t slen, uint32_t expected_crc);

View File

@ -12,13 +12,14 @@
#include <sys/mman.h>
#include <unistd.h>
#include <zlib.h>
#include <string>
#include <algorithm>
#include <string>
#include "const_val.h"
#include "sam_io.h"
#include "sort_args.h"
#include "const_val.h"
#include "sort_impl.h"
#include "util/profiling.h"
#include "util/yarn.h"
@ -131,19 +132,87 @@ static void *firstPipeReadFile(void *data) {
return nullptr;
}
/* Decode the fixed 32-byte core of one raw in-memory bam record at `addr`
 * into b->core. Only the core fields are parsed; the variable-length part
 * (qname fixup, cigar/seq validation) is currently disabled below — that
 * code was lifted from a file-based reader (bgzf_read_small etc.) and does
 * not apply to this memory-mapped path yet.
 * Returns the total on-disk size of the record (4-byte length prefix +
 * block_len), or -4 if block_len cannot even hold the core.
 */
static int parseBam(uint8_t* addr, bam1_t* b) {
bam1_core_t* c = &b->core;
int32_t block_len, ret, i; // ret/i only used by the disabled section below
uint32_t new_l_data; // ditto
uint8_t tmp[32], *x; // tmp currently unused; x walks the core fields
b->l_data = 0;
memcpy(&block_len, addr, 4);
if (nsgv::gIsBigEndian)
ed_swap_4p(&block_len);
if (block_len < 32)
return -4; // block_len includes core data
x = addr + 4;
c->tid = le_to_u32(x);
c->pos = le_to_i32(x + 4);
uint32_t x2 = le_to_u32(x + 8);
c->bin = x2 >> 16;
c->qual = x2 >> 8 & 0xff;
c->l_qname = x2 & 0xff;
// extra NULs pad qname up to a 4-byte multiple (htslib convention)
c->l_extranul = (c->l_qname % 4 != 0) ? (4 - c->l_qname % 4) : 0;
uint32_t x3 = le_to_u32(x + 12);
c->flag = x3 >> 16;
c->n_cigar = x3 & 0xffff;
c->l_qseq = le_to_u32(x + 16);
c->mtid = le_to_u32(x + 20);
c->mpos = le_to_i32(x + 24);
c->isize = le_to_i32(x + 28);
/*
new_l_data = block_len - 32 + c->l_extranul;
if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1)
return -4;
if (((uint64_t)c->n_cigar << 2) + c->l_qname + c->l_extranul + (((uint64_t)c->l_qseq + 1) >> 1) + c->l_qseq > (uint64_t)new_l_data)
return -4;
if (realloc_bam_data(b, new_l_data) < 0)
return -4;
b->l_data = new_l_data;
if (bgzf_read_small(fp, b->data, c->l_qname) != c->l_qname)
return -4;
if (b->data[c->l_qname - 1] != '\0') { // try to fix missing nul termination
if (fixup_missing_qname_nul(b) < 0)
return -4;
}
for (i = 0; i < c->l_extranul; ++i) b->data[c->l_qname + i] = '\0';
c->l_qname += c->l_extranul;
if (b->l_data < c->l_qname || bgzf_read_small(fp, b->data + c->l_qname, b->l_data - c->l_qname) != b->l_data - c->l_qname)
return -4;
if (fp->is_be)
swap_data(c, b->l_data, b->data, 0);
if (bam_tag2cigar(b, 0, 0) < 0)
return -4;
// TODO: consider making this conditional
if (c->n_cigar > 0) { // recompute "bin" and check CIGAR-qlen consistency
hts_pos_t rlen, qlen;
bam_cigar2rqlens(c->n_cigar, bam_get_cigar(b), &rlen, &qlen);
if ((b->core.flag & BAM_FUNMAP) || rlen == 0)
rlen = 1;
b->core.bin = hts_reg2bin(b->core.pos, b->core.pos + rlen, 14, 5);
// Sanity check for broken CIGAR alignments
if (c->l_qseq > 0 && !(c->flag & BAM_FUNMAP) && qlen != c->l_qseq) {
hts_log_error("CIGAR and query sequence lengths differ for %s", bam_get_qname(b));
return -4;
}
}
*/
return 4 + block_len;
}
// multi-thread uncompress bam blocks
static void mtUncompressBlock(void *data, long idx, int tid) {
UncompressData &p = *(UncompressData *)data;
PROF_T_BEG(mem_copy);
UncompressData& p = *(UncompressData*)data;
ReadData &readData = *p.readDataPtr;
auto &blockItemArr = p.blockItemArr[tid];
if (blockItemArr.curIdx >= blockItemArr.blockArr.size()) {
blockItemArr.blockArr.push_back(OneBlock());
}
auto &bamItemArr = p.bamItemArr[tid];
auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
auto &blockItem = blockItemArr.add();
uint8_t *block = readData.startAddrArr[idx];
size_t dlen = SINGLE_BLOCK_SIZE; // 65535
@ -160,44 +229,53 @@ static void mtUncompressBlock(void *data, long idx, int tid) {
uint32_t nextBamStart = 0;
uint32_t bamLen = 0;
uint32_t bamNum = 0;
PROF_G_BEG(mem_copy);
/* 解析每个bam */
while (nextBamStart + 4 <= blockItem.blockLen) {
bamItemArr.bamArr.push_back(OneBam());
OneBam &bam = bamItemArr.bamArr.back();
OneBam& bam = bamItemArr.add();
bam.addr = &blockItem.data[nextBamStart];
bam.blockThread = &blockItemArr;
bam.blockIdx = blockItemArr.curIdx - 1;
bam.offset = nextBamStart;
uint8_t *curAddr = &blockItem.data[nextBamStart];
#if 0
nextBamStart += parseBam(curAddr, &bam.b);
++bamNum;
#else
memcpy(&bamLen, curAddr, 4);
curAddr += 4;
if (nsgv::gIsBigEndian) ed_swap_4p(&bamLen);
bam.bamLen = bamLen;
bam.bamLen = bamLen;
bam.tid = le_to_u32(curAddr);
bam.pos = le_to_i32(curAddr + 4);
uint32_t x2 = le_to_u32(curAddr + 8);
bam.qnameLen = x2 & 0xff;
bam.qnameAddr = (char *)(curAddr + 32);
// spdlog::info("bam len: {}", bamLen);
// spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam));
// spdlog::info("bam name: {}, bam name len: {}", string((char*)(blockItem.data + bam.offset + OneBam::QnameOffset), bam.qnameLen), bam.qnameLen);
nextBamStart += 4 + bamLen;
++bamNum;
#endif
}
// spdlog::info("bam num: {}", bamNum);
if (nextBamStart != blockItem.blockLen) {
spdlog::error("Block content does not contain integer number of bam records!");
exit(0);
}
blockItem.bamNum = bamNum;
PROF_G_END(mem_copy);
// 判断是否超过内存阈值
if (blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE >= nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS) {
if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE * BLOCK_MEM_FACTOR >= p.father->singleThreadBytes) {
p.father->uncompressBufFull = 1;
} else {
// spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes);
}
PROF_T_END(tid, mem_copy);
}
// multi-thread uncompress bam blocks
static void mtUncompressBlockBatch(void* data, long idx, int tid) {
PROF_T_BEG(mem_copy);
UncompressData& p = *(UncompressData*)data;
ReadData& readData = *p.readDataPtr;
@ -205,10 +283,9 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) {
int stopIdx = STOP_IDX(idx, nsgv::gSortArg.NUM_THREADS, readData.startAddrArr.size());
auto& blockItemArr = p.blockItemArr[tid];
auto& bamItemArr = p.bamItemArr[tid];
if (blockItemArr.curIdx + (stopIdx - startIdx) >= blockItemArr.blockArr.size()) {
blockItemArr.blockArr.resize(blockItemArr.curIdx + (stopIdx - startIdx));
}
blockItemArr.add(stopIdx - startIdx);
for (int i = startIdx; i < stopIdx; ++i) {
auto& blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];
@ -228,18 +305,98 @@ static void mtUncompressBlockBatch(void* data, long idx, int tid) {
uint32_t nextBamStart = 0;
uint32_t bamLen = 0;
uint32_t bamNum = 0;
PROF_G_BEG(mem_copy);
// memcpy(&buf.data[buf.curLen], pB->data, pB->blockLen);
/* 解析每个bam */
while (nextBamStart + 4 <= blockItem.blockLen) {
memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
bamItemArr.bamArr.push_back(OneBam());
OneBam& bam = bamItemArr.bamArr.back();
//bam.addr = &blockItem.data[nextBamStart];
uint8_t* curAddr = &blockItem.data[nextBamStart];
memcpy(&bamLen, curAddr, 4);
curAddr += 4;
if (nsgv::gIsBigEndian)
ed_swap_4p(&bamLen);
bam.bamLen = bamLen;
bam.tid = le_to_u32(curAddr);
bam.pos = le_to_i32(curAddr + 4);
uint32_t x2 = le_to_u32(curAddr + 8);
bam.qnameLen = x2 & 0xff;
//bam.qnameAddr = (char*)(curAddr + 32);
// spdlog::info("bam len: {}, bam struct len: {}", bamLen, sizeof(OneBam));
nextBamStart += 4 + bamLen;
// spdlog::info("bam len: {}", bamLen);
++bamNum;
}
if (nextBamStart != blockItem.blockLen) {
spdlog::error("Block content does not contain integer number of bam records!");
exit(0);
}
blockItem.bamNum = bamNum;
PROF_G_END(mem_copy);
}
// 判断是否超过内存阈值
if (blockItemArr.curIdx * SINGLE_BLOCK_SIZE >= p.father->singleThreadBytes) {
p.father->uncompressBufFull = 1;
} else {
// spdlog::info("block arr: {}, single thread: {}", blockItemArr.blockArr.size() * SINGLE_BLOCK_SIZE, p.father->singleThreadBytes);
}
PROF_T_END(tid, mem_copy);
}
// 线程内排序
static void mtInThreadSort(void* data, long idx, int tid) {
UncompressData& p = *(UncompressData*)data;
auto& bamItemArr = p.bamItemArr[tid];
auto& arr = bamItemArr.bamArr;
// 先按照坐标排序
#if 0
if (tid == 0)
spdlog::info("Before: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos,
arr[7].pos, arr[8].pos, arr[9].pos);
#endif
if (nsgv::gSortArg.SORT_COORIDINATE) {
std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) { return b1.pos < b2.pos; });
} else if (nsgv::gSortArg.QUERY_NAME_TYPE == nsmd::QueryNameType::PICARD) {
std::sort(arr.begin(), arr.begin() + bamItemArr.curIdx, [](const OneBam& b1, const OneBam& b2) {
int cmp = 0;
strncmp((char*)(b1.blockThread->blockArr[b1.blockIdx].data + b1.offset + OneBam::QnameOffset),
(char*)(b2.blockThread->blockArr[b2.blockIdx].data + b2.offset + OneBam::QnameOffset), std::min(b1.qnameLen, b2.qnameLen));
if (cmp == 0)
return b1.qnameLen < b2.qnameLen;
return cmp < 0;
});
}
#if 0
if (tid == 0)
spdlog::info("After: {} {} {} {} {} {} {} {} {} {}", arr[0].pos, arr[1].pos, arr[2].pos, arr[3].pos, arr[4].pos, arr[5].pos, arr[6].pos,
arr[7].pos, arr[8].pos, arr[9].pos);
#endif
}
// 多线程压缩
static void mtCompressBlock(void* data, long idx, int tid) {
FirstPipeArg &p = *(FirstPipeArg *)data;
auto& barr = p.sortedBamArr;
auto& t = p.taskArr[idx];
auto& buf = p.threadBuf[tid];
uint8_t block[SINGLE_BLOCK_SIZE] = {0};
uint8_t compressBlock[SINGLE_BLOCK_SIZE] = {0};
int curAddr = 0;
for (int i = t.idx; i < t.idx + t.num; ++i) {
// copy data
memccpy(&block[curAddr], barr[i]->blockThread->blockArr[barr[i]->blockIdx].data + barr[i]->offset, 1, barr[i]->bamLen);
curAddr += barr[i]->bamLen;
}
size_t dlen = BGZF_MAX_BLOCK_SIZE;
// spdlog::info("len: {}", curAddr);
bgzfCompress(block, &dlen, compressBlock, curAddr, -1);
}
/* 将gz block进行解压并进行线程内排序 */
@ -249,23 +406,78 @@ static void doFirstPipeUncompress(FirstPipeArg &p) {
ReadData &readData = p.readData[p.uncompressOrder % p.READ_BUF_NUM];
UncompressData &uncompressData = p.uncompressData[p.uncompressOrder % p.UNCOMPRESS_BUF_NUM];
uncompressData.readDataPtr = &readData;
uncompressData.ResetBlockArr();
uncompressData.ResetBamArr();
#if 1
kt_for(p.numThread, mtUncompressBlock, &uncompressData, readData.startAddrArr.size());
#else
kt_for(p.numThread, mtUncompressBlockBatch, &uncompressData, nsgv::gSortArg.NUM_THREADS);
#endif
//spdlog::info("thread-0 idx:{}, blocks: {}", uncompressData.blockItemArr[0].curIdx,
// uncompressData.blockItemArr[0].blockArr.size());
PROF_G_END(uncompress);
// 判断是否超过内存阈值
if (p.uncompressBufFull == 1) {
spdlog::info("buf full - {}", p.mergOrder);
// spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, p.singleThreadBytes);
// sort
PROF_G_BEG(sort);
kt_for(p.numThread, mtInThreadSort, &uncompressData, nsgv::gSortArg.NUM_THREADS);
PROF_G_END(sort);
// merge
#if 1
auto& bamArr = p.sortedBamArr;
BamHeap<BamGreaterThan> heap;
heap.Init(&uncompressData.bamItemArr);
bamArr.resize(heap.Size());
auto& taskArr = p.taskArr;
taskArr.clear();
const OneBam* bam = nullptr;
uint64_t posAll = 0;
int i = 0;
ArrayInterval intv{0, 0};
uint64_t blockBytes = 0;
PROF_G_BEG(merge);
while ((bam = heap.Pop()) != nullptr) {
// posAll += bam->pos;
//spdlog::info("pos: {}", bam->pos);
bamArr[i++] = bam;
blockBytes += bam->bamLen;
if (blockBytes >= SINGLE_BLOCK_SIZE) {
intv.num = i - 1 - intv.idx;
taskArr.push_back(intv);
intv.idx = i - 1;
blockBytes = bam->bamLen;
}
}
PROF_G_END(merge);
spdlog::info("task size: {} {}", taskArr.size(), bamArr.size());
if (i - 1 > intv.idx) {
intv.num = i - 1 - intv.idx;
taskArr.push_back(intv);
}
// spdlog::info("pos all: {}", posAll);
#endif
// compress
PROF_G_BEG(compress);
kt_for(p.numThread, mtCompressBlock, &p, taskArr.size());
uncompressData.ResetBlockArr();
uncompressData.ResetBamArr();
p.uncompressBufFull = 0;
p.mergOrder++;
for(auto &buf : p.threadBuf) {
buf.curLen = 0;
}
PROF_G_END(compress);
}
// auto &bam = uncompressData.bamItemArr[0].bamArr.back();
// string qname(bam.qnameAddr, bam.qnameLen);
// spdlog::info("bam name:{}", qname);
PROF_G_END(uncompress);
}
/* FirstPipe step-2 并行解压gz blocks*/
@ -279,7 +491,6 @@ static void *firstPipeUncompress(void *data) {
// self dependency
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
PROF_G_BEG(uncompress);
if (p.readFinish) {
while (p.uncompressOrder < p.readOrder) {
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
@ -291,7 +502,6 @@ static void *firstPipeUncompress(void *data) {
}
doFirstPipeUncompress(p);
PROF_G_END(uncompress);
// update status
yarn::CONSUME_SIGNAL(p.readSig);
@ -316,12 +526,12 @@ void doFirstPipeMergeSort(FirstPipeArg &p) {
size_t curBlockBamMemSize = blockNum * SINGLE_BLOCK_SIZE + bamNum * 32;
/* 内存使用量达到阈值后,进行排序 */
if (curBlockBamMemSize > nsgv::gSortArg.MAX_MEM / 2) {
uncompressData.ResetBlockArr();
uncompressData.ResetBamArr();
//uncompressData.ResetBlockArr();
//uncompressData.ResetBamArr();
}
//spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockBytes, bamNum * 32);
spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32);
// spdlog::info("block num: {}, bam num: {}, block size: {}, bam size: {}", blockNum, bamNum, blockNum * SINGLE_BLOCK_SIZE, bamNum * 32);
PROF_G_END(sort);
}
@ -359,12 +569,18 @@ static void bamSortFirstPipe() {
/* set up*/
FirstPipeArg firstPipeArg;
firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS;
firstPipeArg.singleThreadBytes = nsgv::gSortArg.MAX_MEM / firstPipeArg.numThread;
spdlog::info("max mem: {}, single thread mem: {}", nsgv::gSortArg.MAX_MEM, firstPipeArg.singleThreadBytes);
const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread;
for (int i = 0; i<firstPipeArg.READ_BUF_NUM; ++i) {
firstPipeArg.readData[i].Resize(kReadBufSize);
}
// 根据最大内存参数,计算初始化开辟的空间
int blockInitNum = nsgv::gSortArg.MAX_MEM / nsgv::gSortArg.NUM_THREADS / SINGLE_BLOCK_SIZE / BLOCK_MEM_FACTOR;
spdlog::info("Init block num: {}", blockInitNum);
for (int i = 0; i < firstPipeArg.UNCOMPRESS_BUF_NUM; ++i) {
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128);
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, blockInitNum);
}
PROF_G_BEG(mid_all);
@ -424,7 +640,7 @@ int doSort() {
bamSortFirstPipe();
spdlog::info("OneBam size: {}", sizeof(OneBam));
//bamSortSerialFirstPipe();
// bamSortSerialFirstPipe();
#else
/* 打开输入bam文件 */

View File

@ -18,7 +18,7 @@ using std::vector;
/* for step-1 read data from bam file */
struct ReadData {
uint8_t *dataBuf = nullptr;
uint8_t *blockBuf = nullptr;
uint8_t *blockBuf = nullptr; // 用来保存上一轮没能完整读取的block
int readBufSize = 0; // 读入的buf大小
vector<uint8_t *> startAddrArr; // 存放每个block的起始地址
ReadData() { }
@ -39,19 +39,49 @@ struct ReadData {
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
}
};
/* for step-2 parallel uncompress gz blocks还有缓存满了之后的线程内排序*/
class ThreadBlockArr;
/* for step-2 parallel uncompress gz blocks还有缓存满了之后的线程内排序用来排序的*/
struct OneBam {
// uint16_t bamLen = 0;
// uint16_t qnameLen = 0;
// uint32_t tid = 0;
// char *qnameAddr = 0; // qname的地址
// uint64_t pos = 0; // mapping 位置
// uint8_t *addr = 0; // 地址
static constexpr int QnameOffset = 36; // 距离该sam记录开头地址的偏移量
int blockIdx = 0; // 在哪个block里
uint16_t bamLen = 0;
uint16_t qnameLen = 0;
uint32_t tid = 0;
char *qnameAddr = 0; // qname的地址
uint64_t pos = 0; // mapping 位置
uint8_t *addr = 0; // 地址
uint16_t qnameLen; // 序列名字长度
uint32_t offset = 0; // 距离首地址的偏移量
uint32_t tid = 0; // 比对到的染色体
uint64_t pos = 0; // mapping 位置
ThreadBlockArr* blockThread;
// for test
// bam1_t b;
};
/* Per-thread growable pool of OneBam records. Slots below curIdx are live;
 * clear() only rewinds the cursor so the allocated capacity is reused on
 * the next round. */
struct ThreadBamArr {
    vector<OneBam> bamArr;
    int curIdx = 0; // number of live records == index of the next free slot
    // Hand out the next record slot, reusing an already-constructed element
    // whenever one is available below the vector's current size.
    OneBam& add() {
        if (curIdx < (int)bamArr.size())
            return bamArr[curIdx++];
        ++curIdx;
        bamArr.push_back(OneBam());
        return bamArr.back();
    }
    // Rewind the cursor; element storage is kept for reuse.
    void clear() { curIdx = 0; }
};
struct OneBlock {
@ -63,39 +93,62 @@ struct OneBlock {
/* Per-thread pool of decompressed BGZF blocks. Slots below curIdx are live;
 * clear() only rewinds the cursor so block buffers are reused across rounds. */
struct ThreadBlockArr {
vector<OneBlock> blockArr; // decompressed block data
int curIdx = 0; // index of the next free slot (one past the last live block)
// Hand out one block slot, reusing an already-allocated element when possible.
OneBlock& add() {
if (curIdx < blockArr.size())
return blockArr[curIdx++];
else {
#if 0
blockArr.resize((blockArr.size() + 1) << 1);
return blockArr[curIdx++];
#else
blockArr.push_back(OneBlock());
curIdx++;
return blockArr.back();
#endif
}
}
// Batch variant: only ensures capacity for `num` more slots. NOTE: unlike
// add() above, this does NOT advance curIdx — callers are expected to index
// blockArr[curIdx++] themselves for each of the `num` slots.
void add(int num) {
if (curIdx + num > blockArr.size()) {
blockArr.resize(curIdx + num);
}
}
// Rewind the cursor; capacity is kept for reuse.
void clear() { curIdx = 0; }
};
class FirstPipeArg;
// 第一阶段的解压、排序、归并、压缩
/* First-stage pipeline state: per-thread decompressed blocks plus the
 * per-thread parsed bam index built from them. */
struct UncompressData {
vector<ThreadBlockArr> blockItemArr; // one per thread: decompressed block data
vector<ThreadBamArr> bamItemArr; // one per thread: parsed bam records
ReadData *readDataPtr = nullptr; // raw input buffer for the current round
FirstPipeArg* father = nullptr; // back-pointer to pipeline state (e.g. to raise uncompressBufFull)
UncompressData() { }
UncompressData(int numThread) { Resize(numThread); }
UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); }
void Resize(int numThread) {
blockItemArr.resize(numThread);
bamItemArr.resize(numThread);
for (int i = 0; i < numThread; ++i) {
blockItemArr[i].blockArr.reserve(128);
}
// NOTE(review): the resize/reserve work above is repeated by this
// delegated call — confirm one of the two copies can be removed.
Resize(numThread, 128);
}
void Resize(int numThread, int vecInitSize) {
blockItemArr.resize(numThread);
bamItemArr.resize(numThread);
for (int i = 0; i < numThread; ++i) {
blockItemArr[i].blockArr.reserve(vecInitSize);
// heuristic: roughly 192 bam records expected per 64 KiB block
bamItemArr[i].bamArr.reserve(vecInitSize * 192);
}
}
// Rewind every thread's block pool (keeps the allocated buffers).
void ResetBlockArr() {
for (int i = 0; i < blockItemArr.size(); ++i) {
blockItemArr[i].curIdx = 0;
blockItemArr[i].clear();
}
}
// Rewind every thread's bam pool.
void ResetBamArr() {
for (int i = 0; i < bamItemArr.size(); ++i) {
// bamItemArr[i].curIdx = 0;
// NOTE(review): bamArr.clear() destroys the elements and defeats the
// slot-reuse that ThreadBamArr::clear() provides — confirm whether
// only the cursor rewind below is intended.
bamItemArr[i].bamArr.clear();
bamItemArr[i].clear();
}
}
};
@ -111,6 +164,7 @@ struct BlockGreaterThan{
return a.block->blockId > b.block->blockId;
}
};
/* 用来排序 */
struct BlockHeap {
vector<ThreadBlockArr> *arr2d;
@ -169,6 +223,69 @@ struct BlockHeap {
}
};
/* bam 排序堆 */
/* Heap node for the k-way merge: a cursor into one thread's ThreadBamArr. */
struct BamArrIdIdx {
int arrId = 0; // which per-thread array this cursor walks
size_t arrIdx = 0; // index of the NEXT record to read from that array
const OneBam* bam = nullptr; // record currently exposed on the heap
};
/* Comparator for BamHeap: std::priority_queue is a max-heap, so ordering by
 * "greater pos" puts the record with the SMALLEST position on top. */
struct BamGreaterThan {
bool operator()(const BamArrIdIdx& a, const BamArrIdIdx& b) const { return a.bam->pos > b.bam->pos; }
};
/* K-way merge heap over the per-thread sorted bam arrays. Pop() streams the
 * globally smallest remaining record (per GreaterThan's ordering).
 * NOTE(review): the heap stores raw pointers into each bamArr — assumes the
 * arrays are not resized/reallocated while a merge is in progress; confirm. */
template<class GreaterThan>
struct BamHeap {
vector<ThreadBamArr>* arr2d;
priority_queue<BamArrIdIdx, vector<BamArrIdIdx>, GreaterThan> minHeap;
size_t popNum = 0; // records popped so far (used by Size())
// Seed the heap with the first live record of each non-empty array.
// arrIdx is set to 1 because it names the NEXT index to read.
int Init(vector<ThreadBamArr>* _arr2d) {
arr2d = _arr2d;
if (arr2d == nullptr) {
return -1;
}
for (int i = 0; i < arr2d->size(); ++i) {
auto& v = (*arr2d)[i];
if (v.curIdx > 0) {
minHeap.push({i, 1, &v.bamArr[0]});
}
}
return 0;
}
// Remove and return the current minimum, then advance that array's cursor
// (pushing its next live record, if any). Returns nullptr when exhausted.
const OneBam* Pop() {
const OneBam* ret = nullptr;
if (!minHeap.empty()) {
auto minVal = minHeap.top();
minHeap.pop();
++popNum;
ret = minVal.bam;
auto& v = (*arr2d)[minVal.arrId];
if (v.curIdx > minVal.arrIdx) {
minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.bamArr[minVal.arrIdx]});
}
}
return ret;
}
// Number of records not yet popped.
// NOTE(review): if arr2d is null (len stays 0) after pops, len - popNum
// would wrap the unsigned result — confirm Init() is always called first.
size_t Size() {
size_t len = 0;
if (arr2d != nullptr) {
for (auto& v : *arr2d) {
len += v.curIdx;
}
}
return len - popNum;
}
};
/* A run [idx, idx+num) of indices into sortedBamArr; one compression task. */
struct ArrayInterval {
int idx; // start index into the merged array
int num; // number of records in the run
};
/* for step-3 serial merge blocks and sort them */
struct MergeSortData {
@ -182,13 +299,16 @@ struct MergeSortData {
/* 第一阶段的多线程流水线参数 */
struct FirstPipeArg {
static const int READ_BUF_NUM = 2; // 读入的buf数量
static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量
static const int UNCOMPRESS_BUF_NUM = 1; // 解压的buf数量
static const int COMPRESS_BUF_NUM = 2; // merge之后每个线程缓冲区的数量
int numThread = 0; // 线程数
uint64_t singleThreadBytes = 0; // 单线程开辟的内存字节上限
uint64_t readOrder = 0; // 读取文件
uint64_t uncompressOrder = 0; // 并行解压gz block
uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks并解析每个bam的长度达到内存阈值后并行排序
uint64_t mergOrder = 0; // 用来给中间文件编号
volatile int readFinish = 0;
volatile int uncompressFinish = 0;
@ -199,12 +319,18 @@ struct FirstPipeArg {
ReadData readData[READ_BUF_NUM];
UncompressData uncompressData[UNCOMPRESS_BUF_NUM];
vector<const OneBam*> sortedBamArr;
vector<ArrayInterval> taskArr;
vector<DataBuffer> threadBuf;
MergeSortData mergeSortData;
FirstPipeArg()
{
readSig = yarn::NEW_LOCK(0);
uncompressSig = yarn::NEW_LOCK(0);
for (int i = 0; i < UNCOMPRESS_BUF_NUM; ++i) {
uncompressData[i].father = this;
}
}
};

View File

@ -5,3 +5,37 @@
#include "sort_impl.h"
// Sort one thread's OneBam records in place by Picard-style query name.
// NOTE: despite the name, this currently performs a comparison sort
// (std::sort); the radix-sort path has not been implemented yet.
//   n   : number of records in buf
//   buf : records to sort in place
//   h   : SAM header (only its reference count is queried at present)
//   tid : worker thread id (thread 0 logs progress)
// Always returns 0.
int ks_radixsort(size_t n, OneBam* buf, const sam_hdr_t* h, int tid) {
    if (tid == 0) spdlog::info("sort in thread, {} {}", tid, n);
    // Kept for the future radix implementation (tid/pos byte widths will
    // depend on the reference count).
    [[maybe_unused]] const int nref = sam_hdr_nref(h);
    // Byte-wise qname comparison over the shared prefix; on a tie the
    // shorter name sorts first (Picard ordering).
    std::sort(buf, buf + n, [](const OneBam& b1, const OneBam& b2) {
        // BUG FIX: the strncmp result was previously discarded
        // (`int cmp = 0; strncmp(...)`), so the sort keyed purely on
        // qname length.
        int cmp = strncmp((char*)(b1.blockThread->blockArr[b1.blockIdx].data + b1.offset + OneBam::QnameOffset),
                          (char*)(b2.blockThread->blockArr[b2.blockIdx].data + b2.offset + OneBam::QnameOffset),
                          std::min(b1.qnameLen, b2.qnameLen));
        if (cmp == 0)
            return b1.qnameLen < b2.qnameLen;
        return cmp < 0;
    });
    return 0;
}

View File

@ -3,6 +3,8 @@
#include <htslib/sam.h>
#include <stdio.h>
#include "sort.h"
// Struct which contains the sorting key for TemplateCoordinate sort.
struct TemplateCoordinateKey {
int tid1;
@ -27,4 +29,6 @@ struct BamSortTag {
uint8_t pos_tid[12];
TemplateCoordinateKey *key;
} u;
};
};
int ks_radixsort(size_t n, OneBam* buf, const sam_hdr_t* h, int tid);

View File

@ -57,6 +57,8 @@ int displayProfiling(int nthread) {
PRINT_GP(parse_block);
PRINT_GP(uncompress);
PRINT_GP(sort);
PRINT_GP(merge);
PRINT_GP(compress);
PRINT_GP(write_mid);
PRINT_GP(read_mid);
PRINT_GP(write_final);

View File

@ -52,6 +52,8 @@ enum {
GP_parse_block,
GP_uncompress,
GP_sort,
GP_merge,
GP_compress,
GP_write_mid,
GP_read_mid,
GP_write_final