添加了并行流水线框架

This commit is contained in:
zzh 2025-04-25 15:34:49 +08:00
parent 7ee5241029
commit dbe57f345c
6 changed files with 1076 additions and 96 deletions

2
.gitignore vendored
View File

@ -1,6 +1,8 @@
# ---> C++
# Prerequisites
*.d
*.bak
*.log
/test
/.vscode
/build

View File

@ -15,6 +15,11 @@ struct DataBuffer {
maxLen = 0;
data = nullptr;
}
/* Construct an empty buffer backed by initSize bytes of malloc'd storage. */
DataBuffer(size_t initSize) {
    maxLen = initSize;
    curLen = 0;  // nothing stored yet
    data = static_cast<uint8_t *>(malloc(maxLen));
}
~DataBuffer() {
if (data)
free(data);

View File

@ -1,3 +1,5 @@
#include "sort.h"
#include <fcntl.h>
#include <htslib/sam.h>
#include <htslib/thread_pool.h>
@ -15,8 +17,9 @@
#include "sam_io.h"
#include "sort_args.h"
#include "util/profiling.h"
#include "const_val.h"
#include "util/profiling.h"
#include "util/yarn.h"
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
@ -39,27 +42,6 @@ struct BamSortData {
char *qname; // pointer to qname
};
/* Locates one BAM record by offset and length. */
struct BamPointer {
    uint64_t offset{0};  // address offset
    uint32_t bamLen{0};  // presumably the record length in bytes -- confirm against users
};
/* A growable list of BamPointer entries plus a cursor into it. */
struct BamPtrArr {
    vector<BamPointer> bamArr;
    int curIdx{0};
};
/* One decompressed block together with its bookkeeping. */
struct OneBlock {
    uint8_t data[SINGLE_BLOCK_SIZE];
    uint32_t blockLen{0};  // length of the decompressed data
    uint64_t blockId{0};   // block ID, assigned in sequential order
};
/* Per-thread pool of decompressed blocks. */
struct ThreadBlockArr {
    vector<OneBlock> blockArr;  // decompressed data
    int curIdx{0};              // index into blockArr for the data currently being decompressed
};
// thread data for sorting
struct SortData {
/* 用在多线程解压中的数据 */
@ -148,75 +130,9 @@ static void bam_cigar2rqlens(int n_cigar, const uint32_t *cigar, hts_pos_t *rlen
}
}
/*
 * Decode one BAM alignment record starting at `data` into `b`.
 *
 * `data` must point at the 4-byte record length that precedes the record body.
 * The fixed 32-byte core is unpacked field by field, the sizes are sanity checked,
 * and when the record has CIGAR operations the bin is recomputed and the CIGAR
 * query length is checked against l_qseq.
 *
 * Returns 4 + block_len (the bytes consumed from `data`) on success, -4 on any failure.
 *
 * NOTE(review): `ret` and `tmp` are declared but never used in this function.
 * NOTE(review): every call logs the record length at info level.
 */
int parseBam(uint8_t *data, bam1_t *b) {
bam1_core_t *c = &b->core;
int32_t block_len, ret, i;
uint32_t new_l_data;
uint8_t tmp[32], *x;
b->l_data = 0;
// NOTE(review): the length is read through a cast (host byte order, possibly unaligned),
// unlike the core fields below which go through le_to_u32/le_to_i32 -- confirm this is intended.
block_len = *(uint32_t *)data;
spdlog::info("record_len: {}", block_len);
data += 4;
x = data;
// Unpack the fixed-size core fields.
c->tid = le_to_u32(x);
c->pos = le_to_i32(x + 4);
uint32_t x2 = le_to_u32(x + 8);
c->bin = x2 >> 16;
c->qual = x2 >> 8 & 0xff;
c->l_qname = x2 & 0xff;
// Number of extra NUL bytes needed to round l_qname up to a multiple of 4.
c->l_extranul = (c->l_qname % 4 != 0) ? (4 - c->l_qname % 4) : 0;
uint32_t x3 = le_to_u32(x + 12);
c->flag = x3 >> 16;
c->n_cigar = x3 & 0xffff;
c->l_qseq = le_to_u32(x + 16);
c->mtid = le_to_u32(x + 20);
c->mpos = le_to_i32(x + 24);
c->isize = le_to_i32(x + 28);
// Variable-length payload size: record length minus the 32-byte core, plus the qname padding.
new_l_data = block_len - 32 + c->l_extranul;
if (new_l_data > INT_MAX || c->l_qseq < 0 || c->l_qname < 1)
return -4;
// The CIGAR, qname, packed sequence and qualities must all fit inside the payload.
if (((uint64_t)c->n_cigar << 2) + c->l_qname + c->l_extranul + (((uint64_t)c->l_qseq + 1) >> 1) + c->l_qseq >
(uint64_t)new_l_data)
return -4;
if (realloc_bam_data(b, new_l_data) < 0)
return -4;
b->l_data = new_l_data;
// NOTE(review): b->data is pointed at the caller's buffer right after realloc_bam_data()
// succeeded; the pointer b->data held before this assignment is no longer referenced
// here -- confirm who owns it.
b->data = data;
data += c->l_qname;
if (b->data[c->l_qname - 1] != '\0') { // try to fix missing nul termination
if (fixup_missing_qname_nul(b) < 0)
return -4;
}
// Write the padding NULs in place and fold them into l_qname.
for (i = 0; i < c->l_extranul; ++i) b->data[c->l_qname + i] = '\0';
c->l_qname += c->l_extranul;
data += b->l_data - c->l_qname;
// TODO: consider making this conditional
if (c->n_cigar > 0) { // recompute "bin" and check CIGAR-qlen consistency
hts_pos_t rlen, qlen;
bam_cigar2rqlens(c->n_cigar, bam_get_cigar(b), &rlen, &qlen);
if ((b->core.flag & BAM_FUNMAP) || rlen == 0)
rlen = 1;
b->core.bin = hts_reg2bin(b->core.pos, b->core.pos + rlen, 14, 5);
// Sanity check for broken CIGAR alignments
if (c->l_qseq > 0 && !(c->flag & BAM_FUNMAP) && qlen != c->l_qseq) {
hts_log_error("CIGAR and query sequence lengths differ for %s", bam_get_qname(b));
return -4;
}
}
return 4 + block_len;
}
// multi-thread uncompress bam blocks
static void mtUncompressBlock(void *data, long idx, int tid) {
static void mtUncompressBlock1(void *data, long idx, int tid) {
auto &p = *(SortData*) data;
auto &blockItemArr = p.blockItemArr[tid];
@ -243,7 +159,7 @@ static void mergeSortedBlocks(const SortData &sortData) {
static void *threadMergeParseBlocks(void *data) {
return nullptr;
auto &p = *(SortData *)data;
DataBuffer &buf = p.bamData;
@ -302,7 +218,7 @@ static void *nonBlockingUncompress(void *data) {
pthread_create(&parseTid, NULL, threadMergeParseBlocks, &sortData);
// 2. 调用并行解压的线程函数
kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock, data, ((SortData *)data)->blockAddrArr->size());
kt_for(nsgv::gSortArg.NUM_THREADS, mtUncompressBlock1, data, ((SortData *)data)->blockAddrArr->size());
sortData.uncompressFinish = 1;
int totalBlockNum = 0;
for (int t = 0; t < nsgv::gSortArg.NUM_THREADS; ++t) {
@ -345,11 +261,308 @@ void *threadRead(void *data) {
return NULL;
}
/*
 * Read the next chunk of the BAM file into the read buffer selected by p.readOrder and
 * record the start address of every complete gz block found in it.
 *
 * A block cut off at the end of the previous chunk is carried in halfBlock
 * (readPos = bytes already held, curLen = total block length). It is completed into
 * readData.blockBuf and listed first in readData.startAddrArr. An incomplete block at
 * the end of this chunk is saved back into halfBlock for the next call.
 *
 * Returns the number of bytes obtained from fpr. 0 means nothing was read; in that
 * case startAddrArr and halfBlock are left untouched.
 */
static size_t doFirstPipeReadFile(FirstPipeArg &p, DataBuffer &halfBlock, FILE *fpr) {
    ReadData &readData = p.readData[p.readOrder % p.READ_BUF_NUM];
    size_t curReadPos = 0;  // offset in dataBuf of the next unparsed byte
    int blockLen = 0;       // length of the most recently parsed block

    const size_t readState = fread(readData.dataBuf, 1, readData.readBufSize, fpr);
    if (readState == 0) { return 0; }

    readData.startAddrArr.clear();

    /* Complete the block left unfinished by the previous chunk. */
    if (halfBlock.readPos > 0) {
        if (halfBlock.readPos < BLOCK_HEADER_LENGTH) {
            // Too few bytes were carried over to hold the length field: finish the header
            // first, then recompute the real length of the carried-over block.
            memcpy(&halfBlock.data[halfBlock.readPos], readData.dataBuf, BLOCK_HEADER_LENGTH - halfBlock.readPos);
            halfBlock.curLen = unpackInt16(&halfBlock.data[16]) + 1;
            spdlog::info("last remain, blocklen: {}, last load: {}", halfBlock.curLen, halfBlock.readPos);
        }
        // Bytes of the carried-over block that must come from this chunk.
        const size_t tailLen = halfBlock.curLen - halfBlock.readPos;
        if (tailLen > readState) {
            // The block cannot be completed from this chunk. Continuing would copy past the
            // valid data and underflow the remainder computation below, so drop it.
            spdlog::error("incomplete gz block: need {} more bytes but only {} were read "
                          "(truncated input or read buffer smaller than one block)",
                          tailLen, readState);
            halfBlock.readPos = 0;
            halfBlock.curLen = 0;
            return readState;
        }
        memcpy(readData.blockBuf, halfBlock.data, halfBlock.readPos);
        memcpy(&readData.blockBuf[halfBlock.readPos], readData.dataBuf, tailLen);
        readData.startAddrArr.push_back(readData.blockBuf);
        curReadPos = tailLen;
    }

    /*
     * Walk the chunk block by block. The length field sits at bytes 16-17 of the header,
     * so it can be parsed as soon as a whole header is available. This test must be "<="
     * because when exactly BLOCK_HEADER_LENGTH bytes remain the next call will NOT
     * re-parse the length (readPos == BLOCK_HEADER_LENGTH), so it has to be parsed here.
     */
    while (curReadPos + BLOCK_HEADER_LENGTH <= readState) {
        blockLen = unpackInt16(&readData.dataBuf[curReadPos + 16]) + 1;
        if (curReadPos + blockLen > readState) {
            break;  // the rest of this block lies in file data not yet read
        }
        readData.startAddrArr.push_back(&readData.dataBuf[curReadPos]);
        curReadPos += blockLen;
    }

    /* Save the trailing incomplete block (if any) for the next call. */
    halfBlock.readPos = readState - curReadPos;
    halfBlock.curLen = blockLen;
    if (halfBlock.readPos > 0) {
        memcpy(halfBlock.data, &readData.dataBuf[curReadPos], halfBlock.readPos);
    }
    return readState;
}
/* FirstPipe step-1 读取文件线程 */
static void *firstPipeReadFile(void *data) {
FirstPipeArg &p = *(FirstPipeArg *)data;
/* 1. set up */
FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb");
parseSamHeader(fpr, nsgv::gInHdr);
size_t fileSize = 0;
DataBuffer halfBlock(SINGLE_BLOCK_SIZE);
/* 2. do the work */
while (true) {
// self dependency
yarn::DEPENDENCY_NOT_TO_BE(p.readSig, p.READ_BUF_NUM);
PROF_G_BEG(read);
size_t readState = doFirstPipeReadFile(p, halfBlock, fpr);
PROF_G_END(read);
if (readState == 0) {
yarn::SIGNAL_FINISH(p.readSig, p.readFinish);
break;
}
// update self status
yarn::UPDATE_SIG_ORDER(p.readSig, p.readOrder);
fileSize += readState;
}
spdlog::info("read file order: {}, file size: {}", p.readOrder, fileSize);
/* 3. clean up */
fclose(fpr);
return nullptr;
}
/*
 * kt_for worker: decompress gz block number `idx` of the current read buffer into the
 * next free slot of thread `tid`'s block pool, then count the BAM records that start
 * inside the decompressed data.
 *
 * `data` must point to the UncompressData of the batch. A decompression failure is
 * fatal: it is logged and the process exits with a non-zero status.
 */
static void mtUncompressBlock(void *data, long idx, int tid) {
    UncompressData &p = *(UncompressData *)data;
    ReadData &readData = *p.readDataPtr;
    auto &blockItemArr = p.blockItemArr[tid];

    // Grow the pool only when every existing slot is already used by this batch.
    // emplace_back() constructs the slot in place instead of copying a ~64 KB temporary.
    if (static_cast<size_t>(blockItemArr.curIdx) >= blockItemArr.blockArr.size()) {
        blockItemArr.blockArr.emplace_back();
    }
    auto &blockItem = blockItemArr.blockArr[blockItemArr.curIdx++];

    uint8_t *block = readData.startAddrArr[idx];
    size_t dlen = SINGLE_BLOCK_SIZE;  // capacity in, decompressed length out
    int block_length = unpackInt16(&block[16]) + 1;
    uint32_t crc = le_to_u32(block + block_length - 8);
    int ret = bgzfUncompress(blockItem.data, &dlen, (Bytef *)block + BLOCK_HEADER_LENGTH,
                             block_length - BLOCK_HEADER_LENGTH, crc);
    if (ret != 0) {
        spdlog::error("uncompress error, block id: {}, len: {}, ret: {}", idx, block_length, ret);
        exit(1);  // a failure must not report success to the caller
    }
    blockItem.blockId = idx;
    blockItem.blockLen = dlen;

    uint32_t nextBamStart = 0;
    uint32_t bamLen = 0;
    uint32_t bamNum = 0;
    PROF_G_BEG(mem_copy);
    /* Step through the length-prefixed records: 4-byte length, then the record itself. */
    while (nextBamStart + 4 <= blockItem.blockLen) {
        memcpy(&bamLen, &blockItem.data[nextBamStart], 4);
        nextBamStart += 4 + bamLen;
        ++bamNum;
    }
    blockItem.bamNum = bamNum;
    PROF_G_END(mem_copy);
}
/*
 * Decompress, in parallel, every gz block indexed in the read buffer that belongs to
 * the current uncompress order. Results go into the matching UncompressData slot.
 */
static void doFirstPipeUncompress(FirstPipeArg &p) {
    const auto order = p.uncompressOrder;
    UncompressData &target = p.uncompressData[order % p.UNCOMPRESS_BUF_NUM];
    ReadData &source = p.readData[order % p.READ_BUF_NUM];

    target.readDataPtr = &source;
    target.ResetBlockArr();  // reuse the per-thread slots from the start
    kt_for(p.numThread, mtUncompressBlock, &target, source.startAddrArr.size());
}
/*
 * FirstPipe step-2: decompress gz blocks in parallel.
 *
 * Thread entry point; `data` must point to the shared FirstPipeArg. Each loop iteration
 * waits on the read signal and on its own signal, decompresses one read buffer via
 * doFirstPipeUncompress(), then updates both signals. Once p.readFinish is set, the
 * buffers still pending (uncompressOrder < readOrder) are drained and completion is
 * signalled. Always returns nullptr.
 *
 * NOTE(review): the exact semantics of the yarn:: helpers used here are not visible in
 * this file; the comments below describe the apparent intent only -- confirm against yarn.h.
 */
static void *firstPipeUncompress(void *data) {
FirstPipeArg &p = *(FirstPipeArg *)data;
/* 2. do the work */
while (true) {
// previous dependency
// (presumably blocks while the read stage has produced nothing new, i.e. readSig is 0)
yarn::DEPENDENCY_NOT_TO_BE(p.readSig, 0);
// self dependency
// (presumably blocks while all UNCOMPRESS_BUF_NUM output buffers are still unconsumed)
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
if (p.readFinish) {
// The reader is done: process whatever read buffers are still outstanding, then finish.
while (p.uncompressOrder < p.readOrder) {
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, p.UNCOMPRESS_BUF_NUM);
doFirstPipeUncompress(p);
yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder);
}
yarn::SIGNAL_FINISH(p.uncompressSig, p.uncompressFinish);
break;
}
doFirstPipeUncompress(p);
// update status
yarn::CONSUME_SIGNAL(p.readSig);
yarn::UPDATE_SIG_ORDER(p.uncompressSig, p.uncompressOrder);
}
spdlog::info("uncompress order: {}", p.uncompressOrder);
return nullptr;
}
/*
 * Gather the blocks decompressed in parallel for the current merge order and walk them
 * in ascending blockId order. At present this only counts blocks and BAM records and
 * logs the totals. Parsing, sorting once a capacity threshold is reached, and writing
 * to an intermediate file are the stated goal, but they are not done here yet.
 */
void doFirstPipeMergeSort(FirstPipeArg &p) {
    BlockHeap heap;
    heap.Init(&p.uncompressData[p.mergeSortOrder % p.UNCOMPRESS_BUF_NUM].blockItemArr);

    size_t blockNum = 0;
    size_t bamNum = 0;
    // Stop at the first exhausted heap or the first zero-length block.
    for (const OneBlock *blk = heap.Pop(); blk != nullptr && blk->blockLen > 0; blk = heap.Pop()) {
        blockNum += 1;
        bamNum += blk->bamNum;
    }

    p.mergeSortData.bamData.curLen = 0;
    spdlog::info("block num: {}, bam num: {}", blockNum, bamNum);
}
/*
 * FirstPipe step-3: serially merge the decompressed blocks and parse the length of each
 * BAM record; once the memory threshold is reached, sort in parallel.
 *
 * Thread entry point; `data` must point to the shared FirstPipeArg. Each iteration waits
 * on the uncompress signal, processes one uncompress buffer via doFirstPipeMergeSort(),
 * and consumes the signal. Once p.uncompressFinish is set, the buffers still pending
 * (mergeSortOrder < uncompressOrder) are drained and the loop ends. Always returns nullptr.
 *
 * NOTE(review): the exact semantics of the yarn:: helpers are not visible in this file --
 * confirm against yarn.h.
 */
static void *firstPipeMergeSort(void *data) {
FirstPipeArg &p = *(FirstPipeArg *)data;
while (true) {
// (presumably blocks while the uncompress stage has produced nothing new)
yarn::DEPENDENCY_NOT_TO_BE(p.uncompressSig, 0);
if (p.uncompressFinish) {
spdlog::info("uncompress finish, cur sort order: {}", p.mergeSortOrder);
// Drain every buffer the uncompress stage completed before finishing.
while (p.mergeSortOrder < p.uncompressOrder) {
doFirstPipeMergeSort(p);
p.mergeSortOrder += 1;
}
/* TODO: check whether buf still holds data; if so it must be sorted. It need not be written to an intermediate file and can go straight into the second-pipe merge sort. */
break;
}
doFirstPipeMergeSort(p);
p.mergeSortOrder += 1;
// update status
yarn::CONSUME_SIGNAL(p.uncompressSig);
}
spdlog::info("merge sort order: {}", p.mergeSortOrder);
return nullptr;
}
/* 对bam文件进行排序 */
static void bamSortFirstPipe() {
/* set up*/
FirstPipeArg firstPipeArg;
firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS;
const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread;
for (int i = 0; i<firstPipeArg.READ_BUF_NUM; ++i) {
firstPipeArg.readData[i].Resize(kReadBufSize);
}
for (int i = 0; i < firstPipeArg.UNCOMPRESS_BUF_NUM; ++i) {
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128);
}
/* create threads */
pthread_t pipeThreadIdArr[3]; // 3-stage pipeline
pthread_create(&pipeThreadIdArr[0], NULL, firstPipeReadFile, &firstPipeArg);
pthread_create(&pipeThreadIdArr[1], NULL, firstPipeUncompress, &firstPipeArg);
pthread_create(&pipeThreadIdArr[2], NULL, firstPipeMergeSort, &firstPipeArg);
for (int i = 0; i < 3; ++i) pthread_join(pipeThreadIdArr[i], NULL);
}
/* IO同步的方式进行排序 */
static void bamSortSerialFirstPipe() {
/* set up*/
FirstPipeArg firstPipeArg;
firstPipeArg.numThread = nsgv::gSortArg.NUM_THREADS;
const size_t kReadBufSize = 1L * 1024 * 1024 * firstPipeArg.numThread;
for (int i = 0; i < firstPipeArg.READ_BUF_NUM; ++i) {
firstPipeArg.readData[i].Resize(kReadBufSize);
}
for (int i = 0; i < firstPipeArg.UNCOMPRESS_BUF_NUM; ++i) {
firstPipeArg.uncompressData[i].Resize(firstPipeArg.numThread, 128);
}
/* 1. read file */
FILE *fpr = fopen(nsgv::gSortArg.INPUT_FILE.c_str(), "rb");
parseSamHeader(fpr, nsgv::gInHdr);
size_t fileSize = 0;
DataBuffer halfBlock(SINGLE_BLOCK_SIZE);
/* 2. do the work */
while (true) {
size_t readState = doFirstPipeReadFile(firstPipeArg, halfBlock, fpr);
doFirstPipeUncompress(firstPipeArg);
doFirstPipeMergeSort(firstPipeArg);
if (readState == 0) break;
fileSize += readState;
}
/* 3. clean up */
fclose(fpr);
}
/* Sort a SAM file (first stage). Placeholder: intentionally does nothing yet. */
static void samSortFirstPipe() {}
// entry function
int doSort() {
nsgv::gIsBigEndian = ed_is_big();
bamSortFirstPipe();
// //bamSortSerialFirstPipe();
return 0;
#if 0
#if 1
/* 打开输入bam文件 */
samFile *inBamFp = sam_open_format(nsgv::gSortArg.INPUT_FILE.c_str(), "r", nullptr);
if (!inBamFp) {
@ -394,7 +607,7 @@ int doSort() {
sortData.blockItemArr.resize(nsgv::gSortArg.NUM_THREADS);
for(auto &arr: sortData.blockItemArr) {
arr.blockArr.resize(1000);
// arr.blockArr.resize(1000);
}
// sortData.bamPtrArr.resize(nsgv::gSortArg.NUM_THREADS);
@ -413,7 +626,6 @@ int doSort() {
// FILE *fpw = fopen(nsgv::gSortArg.OUTPUT_FILE.c_str(), "rb");
const size_t READ_BUFSIZE = 4L * 1024 * 1024 * nsgv::gSortArg.NUM_THREADS;
const size_t MARGIN_SIZE = READ_BUFSIZE * 4;
uint8_t *fbuf[5];
fbuf[0] = (uint8_t *)malloc(READ_BUFSIZE);
@ -492,7 +704,7 @@ int doSort() {
totalBlocks += curStartAddrArr->size();
PROF_G_END(parse_block);
#if 1
#if 0
// 并行解压
PROF_G_BEG(sort);
pthread_join(uncompressTid, NULL);
@ -527,4 +739,5 @@ err:
// fclose(fpw);
return 0;
}
}

196
src/sort/sort.h 100644
View File

@ -0,0 +1,196 @@
#pragma once
#include <inttypes.h>
#include <queue>
#include <vector>
#include "const_val.h"
#include "sam_io.h"
#include "util/yarn.h"
using std::priority_queue;
using std::vector;
/* for step-1 read data from bam file */
struct ReadData {
uint8_t *dataBuf = nullptr;
uint8_t *blockBuf = nullptr;
int readBufSize = 0; // 读入的buf大小
vector<uint8_t *> startAddrArr; // 存放每个block的起始地址
ReadData() { }
ReadData(int readBufSize_) {
readBufSize = readBufSize_;
dataBuf = (uint8_t *)malloc(readBufSize);
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
}
~ReadData() {
if (dataBuf) free(dataBuf);
if (blockBuf) free(blockBuf);
}
void Resize(int readBufSize_) {
if (dataBuf) free(dataBuf);
if (blockBuf) free(blockBuf);
readBufSize = readBufSize_;
dataBuf = (uint8_t *)malloc(readBufSize);
blockBuf = (uint8_t *)malloc(SINGLE_BLOCK_SIZE);
}
};
/* for step-2 parallel uncompress gz blocks: one decompressed block and its bookkeeping */
struct OneBlock {
    uint8_t data[SINGLE_BLOCK_SIZE];
    uint32_t blockLen{0};  // length of the decompressed data
    uint64_t blockId{0};   // block ID, assigned in sequential order
    uint64_t bamNum{0};    // number of BAM records after decompression
};
/* Per-thread pool of decompressed blocks; slots below curIdx are in use for the current batch. */
struct ThreadBlockArr {
    vector<OneBlock> blockArr;  // decompressed data
    int curIdx{0};              // index into blockArr for the data currently being decompressed
};
struct UncompressData {
vector<ThreadBlockArr> blockItemArr; // 每个thread一个用来保存解压后的block数据
ReadData *readDataPtr = nullptr; // 读取数据的指针
UncompressData() { }
UncompressData(int numThread) { Resize(numThread); }
UncompressData(int numThread, int vecInitSize) { Resize(numThread, vecInitSize); }
void Resize(int numThread) {
blockItemArr.resize(numThread);
for (int i = 0; i < numThread; ++i) {
blockItemArr[i].blockArr.reserve(128);
}
}
void Resize(int numThread, int vecInitSize) {
blockItemArr.resize(numThread);
for (int i = 0; i < numThread; ++i) {
blockItemArr[i].blockArr.reserve(vecInitSize);
}
}
void ResetBlockArr() {
for (int i = 0; i < blockItemArr.size(); ++i) {
blockItemArr[i].curIdx = 0;
}
}
};
/* Entry of the block sorting heap: a block plus where it came from. */
struct BlockArrIdIdx {
    int arrId{0};                     // which per-thread array the block belongs to
    size_t arrIdx{0};                 // idx of the next data to be read from that array
    const OneBlock *block{nullptr};
};
/* Orders heap entries so that the one with the smallest blockId ends up on top. */
struct BlockGreaterThan{
    bool operator()(const BlockArrIdIdx &a, const BlockArrIdIdx &b) const {
        const auto lhsId = a.block->blockId;
        const auto rhsId = b.block->blockId;
        return rhsId < lhsId;
    }
};
/*
 * Used for sorting: a k-way merge over the per-thread block arrays that hands the
 * blocks back in ascending blockId order.
 *
 * The heap holds at most one entry per thread array. Pop() returns the smallest
 * block and pushes the next used block (index < curIdx) from the same array.
 */
struct BlockHeap {
    vector<ThreadBlockArr> *arr2d = nullptr;  // not owned; must outlive the heap
    priority_queue<BlockArrIdIdx, vector<BlockArrIdIdx>, BlockGreaterThan> minHeap;
    size_t popNum = 0;  // blocks handed out so far

    /*
     * Attach to _arr2d and seed the heap with the first used block of every array.
     * Any state from a previous Init() is discarded. Returns -1 when _arr2d is null,
     * 0 otherwise.
     */
    int Init(vector<ThreadBlockArr> *_arr2d) {
        arr2d = _arr2d;
        minHeap = decltype(minHeap)();  // drop entries that point into an older array
        popNum = 0;
        if (arr2d == nullptr) {
            return -1;
        }
        const int arrNum = static_cast<int>(arr2d->size());
        for (int i = 0; i < arrNum; ++i) {
            auto &v = (*arr2d)[i];
            if (v.curIdx > 0) {
                minHeap.push({i, 1, &v.blockArr[0]});
            }
        }
        return 0;
    }

    /* Remove and return the block with the smallest blockId, or nullptr when none is left. */
    const OneBlock *Pop() {
        if (minHeap.empty()) {
            return nullptr;
        }
        const BlockArrIdIdx minVal = minHeap.top();
        minHeap.pop();
        ++popNum;
        auto &v = (*arr2d)[minVal.arrId];
        // arrIdx is the index of the next unread block in the source array.
        if (static_cast<size_t>(v.curIdx) > minVal.arrIdx) {
            minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v.blockArr[minVal.arrIdx]});
        }
        return minVal.block;
    }

    /* Total decompressed bytes over all used blocks (popped or not). */
    size_t AllBlockBytes() {
        size_t bytes = 0;
        if (arr2d != nullptr) {
            for (auto &v : *arr2d) {
                for (int i = 0; i < v.curIdx; ++i) {
                    bytes += v.blockArr[i].blockLen;
                }
            }
        }
        return bytes;
    }

    /* Number of used blocks that have not been popped yet. */
    size_t Size() {
        size_t len = 0;
        if (arr2d != nullptr) {
            for (auto &v : *arr2d) {
                len += v.curIdx;
            }
        }
        return len - popNum;
    }
};
/* for step-3 serial merge blocks and sort them: locates one BAM record */
struct BamPointer {
    uint64_t offset{0};  // address offset
    uint32_t bamLen{0};  // presumably the record length in bytes -- confirm against users
};
/* A growable list of BamPointer entries plus a cursor into it. */
struct BamPtrArr {
    vector<BamPointer> bamArr;
    int curIdx{0};
};
/* Working storage of the merge-sort stage. */
struct MergeSortData {
    DataBuffer bamData;   // holds the decompressed data
    BamPtrArr bamPtrArr;  // start address and length, inside the decompressed data, of each BAM record

    MergeSortData() {
        constexpr int kInitialBamSlots = 128;
        bamPtrArr.bamArr.reserve(kInitialBamSlots);
    }
};
/* 第一阶段的多线程流水线参数 */
struct FirstPipeArg {
static const int READ_BUF_NUM = 2; // 读入的buf数量
static const int UNCOMPRESS_BUF_NUM = 2; // 解压的buf数量
int numThread = 0; // 线程数
uint64_t readOrder = 0; // 读取文件
uint64_t uncompressOrder = 0; // 并行解压gz block
uint64_t mergeSortOrder = 0; // 串行合并解压后的blocks并解析每个bam的长度达到内存阈值后并行排序
volatile int readFinish = 0;
volatile int uncompressFinish = 0;
yarn::lock_t *readSig;
yarn::lock_t *uncompressSig;
ReadData readData[READ_BUF_NUM];
UncompressData uncompressData[UNCOMPRESS_BUF_NUM];
MergeSortData mergeSortData;
FirstPipeArg()
{
readSig = yarn::NEW_LOCK(0);
uncompressSig = yarn::NEW_LOCK(0);
}
};
/* Parameters of the second-stage multi-threaded pipeline (placeholder, no members yet). */
struct PipeSecondArg {
    /* data */
};

395
src/util/yarn.cpp 100644
View File

@ -0,0 +1,395 @@
/* yarn.c -- generic thread operations implemented using pthread functions
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
* For conditions of distribution and use, see copyright notice in yarn.h
*/
/* Basic thread operations implemented using the POSIX pthread library. All
pthread references are isolated within this module to allow alternate
implementations with other thread libraries. See yarn.h for the description
of these operations. */
/* Version history:
1.0 19 Oct 2008 First version
1.1 26 Oct 2008 No need to set the stack size -- remove
Add yarn_abort() function for clean-up on error exit
1.2 19 Dec 2011 (changes reversed in 1.3)
1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
Update thread portability #defines per IEEE 1003.1-2008
Fix documentation in yarn.h for yarn_prefix
1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr
Accept and do nothing for NULL argument to free_lock()
1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel()
Normalize the code style
1.6 3 Apr 2019 Add debugging information to fail() error messages
1.7 12 Apr 2020 Fix use after free bug in ignition()
*/
// For thread portability.
#define _XOPEN_SOURCE 700
#define _POSIX_C_SOURCE 200809L
#define _THREAD_SAFE
// Use large file functions if available.
#define _FILE_OFFSET_BITS 64
// External libraries and entities referenced.
#include <pthread.h> // pthread_t, pthread_create(), pthread_join(),
#include <stdio.h> // fprintf(), stderr
#include <stdlib.h> // exit(), malloc(), free(), NULL
// pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
// PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
// pthread_self(), pthread_equal(),
// pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
// pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
// pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
// pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy()
#include <errno.h> // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN
// Interface definition.
#include "yarn.h"
// Constants.
#define local static // for non-exported functions and globals
namespace yarn {
// Error handling external globals, resettable by application.
char *yarn_prefix = (char *)"yarn";
void (*yarn_abort)(int) = NULL;
// Immediately exit -- use for errors that shouldn't ever happen. Prints
// "<prefix>: <description> (<file>:<line>:<func>)" to stderr, runs the
// application's abort hook if one is set, then exits with err as the status.
local void fail(int err, char const *file, long line, char const *func) {
    // Known error codes map to a fixed description; anything else is reported by number.
    char const *what = NULL;
    if (err == EPERM)
        what = "already unlocked";
    else if (err == ESRCH)
        what = "no such thread";
    else if (err == EDEADLK)
        what = "resource deadlock";
    else if (err == ENOMEM)
        what = "out of memory";
    else if (err == EBUSY)
        what = "can't destroy locked resource";
    else if (err == EINVAL)
        what = "invalid request";
    else if (err == EAGAIN)
        what = "resource unavailable";

    fprintf(stderr, "%s: ", yarn_prefix);
    if (what != NULL)
        fputs(what, stderr);
    else
        fprintf(stderr, "internal error %d", err);
    fprintf(stderr, " (%s:%ld:%s)\n", file, line, func);

    if (yarn_abort != NULL)
        yarn_abort(err);
    exit(err);
}
// Memory handling routines provided by user. If none are provided, malloc()
// and free() are used, which are therefore assumed to be thread-safe.
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
// Install application-provided allocation routines, replacing malloc() and free()
// for every later allocation and release made by this module.
void yarn_mem(malloc_t lease, free_t vacate) {
    my_free = vacate;
    my_malloc_f = lease;
}
// Allocate size bytes through the installed allocator. From the caller's point
// of view this cannot fail: a NULL result is reported through fail() with ENOMEM.
local void *my_malloc(size_t size, char const *file, long line) {
    void *const block = my_malloc_f(size);
    if (block == NULL)
        fail(ENOMEM, file, line, "malloc");
    return block;
}
// -- Lock functions --
// A lock: a mutex for exclusive access, a condition variable that is broadcast
// whenever the value is twisted, and the value itself.
struct lock_s {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    long value;
};
// Allocate a lock, initialize its mutex and condition variable with default
// attributes, and set its value to initial. Any failure goes through fail().
lock_t *new_lock_(long initial, char const *file, long line) {
    lock_t *bolt = (lock_t *)my_malloc(sizeof(struct lock_s), file, line);

    const int mutexErr = pthread_mutex_init(&(bolt->mutex), NULL);
    if (mutexErr != 0)
        fail(mutexErr, file, line, "mutex_init");

    const int condErr = pthread_cond_init(&(bolt->cond), NULL);
    if (condErr != 0)
        fail(condErr, file, line, "cond_init");

    bolt->value = initial;
    return bolt;
}
// Acquire the lock's mutex, blocking until it is available.
void possess_(lock_t *bolt, char const *file, long line) {
    const int err = pthread_mutex_lock(&(bolt->mutex));
    if (err != 0)
        fail(err, file, line, "mutex_lock");
}
// Release the lock's mutex.
void release_(lock_t *bolt, char const *file, long line) {
    const int err = pthread_mutex_unlock(&(bolt->mutex));
    if (err != 0)
        fail(err, file, line, "mutex_unlock");
}
// Change the lock's value (TO sets it to val, BY adds val), wake every waiter,
// and unlock the mutex. Any other op leaves the value unchanged.
void twist_(lock_t *bolt, enum twist_op op, long val, char const *file, long line) {
    switch (op) {
        case TO:
            bolt->value = val;
            break;
        case BY:
            bolt->value += val;
            break;
        default:
            break;
    }

    int err = pthread_cond_broadcast(&(bolt->cond));
    if (err != 0)
        fail(err, file, line, "cond_broadcast");

    err = pthread_mutex_unlock(&(bolt->mutex));
    if (err != 0)
        fail(err, file, line, "mutex_unlock");
}
#define until(a) while (!(a))
// Block on the lock's condition variable until its value satisfies the requested
// relation to val. The condition is re-checked after every wake-up. An op outside
// the four known relations returns immediately without waiting.
void wait_for_(lock_t *bolt, enum wait_op op, long val, char const *file, long line) {
    for (;;) {
        bool satisfied;
        switch (op) {
            case TO_BE:
                satisfied = (bolt->value == val);
                break;
            case NOT_TO_BE:
                satisfied = (bolt->value != val);
                break;
            case TO_BE_MORE_THAN:
                satisfied = (bolt->value > val);
                break;
            case TO_BE_LESS_THAN:
                satisfied = (bolt->value < val);
                break;
            default:
                satisfied = true;
                break;
        }
        if (satisfied)
            return;

        const int err = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
        if (err != 0)
            fail(err, file, line, "cond_wait");
    }
}
// Return the lock's current value; this function itself does no locking.
long peek_lock(lock_t *bolt) {
    const long current = bolt->value;
    return current;
}
// Destroy the lock's condition variable and mutex, then release its memory.
// A NULL lock is accepted and ignored.
void free_lock_(lock_t *bolt, char const *file, long line) {
    if (bolt != NULL) {
        const int condErr = pthread_cond_destroy(&(bolt->cond));
        if (condErr != 0)
            fail(condErr, file, line, "cond_destroy");

        const int mutexErr = pthread_mutex_destroy(&(bolt->mutex));
        if (mutexErr != 0)
            fail(mutexErr, file, line, "mutex_destroy");

        my_free(bolt);
    }
}
// -- Thread functions (uses the lock_t functions above) --
// Bookkeeping record for one launched thread.
struct thread_s {
    pthread_t id;  // pthread identifier of the thread
    int done;      // true if this thread has exited
    thread *next;  // for list of all launched threads
};
// List of threads launched but not joined, count of threads exited but not
// joined (incremented by ignition() just before exiting).
local lock_t threads_lock = {
PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
0 // number of threads exited but not joined
};
local thread *threads = NULL; // list of extant threads
// Everything ignition() needs to run the requested function: the probe, its
// payload, and the launch site (file/line) used for error reports.
struct capsule {
    void (*probe)(void *);
    void *payload;
    char const *file;
    long line;
};
// Cleanup handler run as a thread leaves ignition(): flag the calling thread as
// done, move it to the front of the threads list, bump the exited-but-not-joined
// count (which wakes join_all()), and free the capsule.
local void reenter(void *arg) {
    struct capsule *cap = (struct capsule *)arg;

    // locate the calling thread's record by comparing thread ids
    pthread_t me = pthread_self();
    possess_(&(threads_lock), cap->file, cap->line);
    thread **link = &(threads);
    thread *self = *link;
    for (; self != NULL; self = *link) {
        if (pthread_equal(self->id, me))
            break;
        link = &(self->next);
    }
    if (self == NULL)
        fail(ESRCH, cap->file, cap->line, "reenter lost");

    // flag it as done and, unless it is already first, relink it at the head
    self->done = 1;
    if (threads != self) {
        *link = self->next;
        self->next = threads;
        threads = self;
    }

    // one more thread waiting to be joined; twist_ also releases threads_lock
    twist_(&(threads_lock), BY, +1, cap->file, cap->line);

    // the capsule is no longer needed (this also runs if the thread is cancelled,
    // though yarn itself never cancels threads)
    my_free(cap);
}
// Start routine of every launched thread. It registers reenter() on the cleanup
// stack, runs the requested function, then pops the cleanup with execution so the
// thread is marked done (and the capsule freed) on the way out. Because reenter()
// sits on the cleanup stack, that marking also happens if the thread is cancelled.
local void *ignition(void *arg) {
    struct capsule *job = (struct capsule *)arg;

    // arrange for reenter() to run when this thread leaves
    pthread_cleanup_push(reenter, arg);

    // call the probe with its payload
    job->probe(job->payload);

    // run reenter() now: marks the thread done, alerts join_all(), frees the capsule
    pthread_cleanup_pop(1);

    return NULL;
}
// Start a new joinable thread that runs probe(payload) and register it in the
// threads list. Joinability is requested explicitly because not every POSIX
// implementation creates joinable threads by default.
thread *launch_(void (*probe)(void *), void *payload, char const *file, long line) {
    // The call description is heap-allocated so that it is still there when
    // ignition() starts up; the new thread frees it on its way out.
    struct capsule *job = (struct capsule *)my_malloc(sizeof(struct capsule), file, line);
    job->probe = probe;
    job->payload = payload;
    job->file = file;
    job->line = line;

    // Hold the list lock across creation so the record is in the list before
    // join_all() or the new thread goes looking for it.
    possess_(&(threads_lock), file, line);

    thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line);
    pthread_attr_t attr;
    int err;
    if ((err = pthread_attr_init(&attr)) != 0)
        fail(err, file, line, "attr_init");
    if ((err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) != 0)
        fail(err, file, line, "attr_setdetachstate");
    if ((err = pthread_create(&(th->id), &attr, ignition, job)) != 0)
        fail(err, file, line, "create");
    if ((err = pthread_attr_destroy(&attr)) != 0)
        fail(err, file, line, "attr_destroy");

    // link the new record at the head of the threads list for join_all()
    th->done = 0;
    th->next = threads;
    threads = th;
    release_(&(threads_lock), file, line);
    return th;
}
// Wait for one specific thread to exit, unlink its record from the threads list,
// and free the record.
void join_(thread *ally, char const *file, long line) {
    // block until the thread has exited; this returns its pthread resources
    const int err = pthread_join(ally->id, NULL);
    if (err != 0)
        fail(err, file, line, "join");

    // locate the record in the threads list
    possess_(&(threads_lock), file, line);
    thread **link = &(threads);
    thread *found = *link;
    for (; found != NULL; found = *link) {
        if (found == ally)
            break;
        link = &(found->next);
    }
    if (found == NULL)
        fail(ESRCH, file, line, "join lost");

    // a thread that announced its exit was counted as "exited but not joined"
    if (found->done)
        threads_lock.value--;
    *link = found->next;
    release_(&(threads_lock), file, line);
    my_free(ally);
}
// This implementation of join_all() only attempts to join threads that have
// announced that they have exited (see ignition()). When there are many
// threads, this is faster than waiting for some random thread to exit while a
// bunch of other threads have already exited.
int join_all_(char const *file, long line) {
// grab the threads list and initialize the joined count
int count = 0;
possess_(&(threads_lock), file, line);
// do until threads list is empty
while (threads != NULL) {
// wait until at least one thread has reentered
wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line);
// find the first thread marked done (should be at or near the top)
thread **prior = &(threads);
thread *match;
while ((match = *prior) != NULL) {
if (match->done)
break;
prior = &(match->next);
}
if (match == NULL)
fail(ESRCH, file, line, "join_all lost");
// join the thread (will be almost immediate), remove from the threads
// list, update the reenter count, and free the thread
int ret = pthread_join(match->id, NULL);
if (ret)
fail(ret, file, line, "join");
threads_lock.value--;
*prior = match->next;
my_free(match);
count++;
}
// let go of the threads list and return the number of threads joined
release_(&(threads_lock), file, line);
return count;
}
}; // namespace yarn

169
src/util/yarn.h 100644
View File

@ -0,0 +1,169 @@
/* yarn.h -- generic interface for thread operations
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
*/
/*
This software is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
Mark Adler
madler@alumni.caltech.edu
*/
/* Basic thread operations
This interface isolates the local operating system implementation of threads
from the application in order to facilitate platform independent use of
threads. All of the implementation details are deliberately hidden.
Assuming adequate system resources and proper use, none of these functions
can fail. As a result, any errors encountered will cause an exit() to be
executed, or the execution of your own optionally-provided abort function.
These functions allow the simple launching and joining of threads, and the
locking of objects and synchronization of changes of objects. The latter is
implemented with a single lock_t type that contains an integer value. The
value can be ignored for simple exclusive access to an object, or the value
can be used to signal and wait for changes to an object.
-- Arguments --
thread *thread; identifier for launched thread, used by join
void probe(void *); pointer to function "probe", run when thread starts
void *payload; single argument passed to the probe function
lock_t *lock; a lock with a value -- used for exclusive access to
an object and to synchronize threads waiting for
changes to an object
long val; value to set lock_t, increment lock_t, or wait for
int n; number of threads joined
-- Thread functions --
thread = launch(probe, payload) - launch a thread -- exit via probe() return
join(thread) - join a thread and by joining end it, waiting for the thread
to exit if it hasn't already -- will free the resources allocated by
launch() (don't try to join the same thread more than once)
n = join_all() - join all threads launched by launch() that are not joined
yet and free the resources allocated by the launches, usually to clean
up when the thread processing is done -- join_all() returns an int with
the count of the number of threads joined (join_all() should only be
called from the main thread, and should only be called after any calls
of join() have completed)
-- Lock functions --
lock = new_lock(val) - create a new lock with initial value val (the lock is
created in the released state)
possess(lock) - acquire exclusive possession of a lock, waiting if necessary
twist(lock, [TO | BY], val) - set lock to or increment lock by val, signal
all threads waiting on this lock and then release the lock -- must
possess the lock before calling (twist releases, so don't do a
release() after a twist() on the same lock)
wait_for(lock, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val)
- wait on lock value to be, not to be, be greater than, or be less than
val -- must possess the lock before calling, will possess the lock on
return but the lock is released while waiting to permit other threads
to use twist() to change the value and signal the change (so make sure
that the object is in a usable state when waiting)
release(lock) - release a possessed lock (do not try to release a lock that
the current thread does not possess)
val = peek_lock(lock) - return the value of the lock (assumes that lock is
already possessed, no possess or release is done by peek_lock())
free_lock(lock) - free the resources allocated by new_lock() (application
must assure that the lock is released before calling free_lock())
-- Memory allocation ---
yarn_mem(better_malloc, better_free) - set the memory allocation and free
routines for use by the yarn routines where the supplied routines have
the same interface and operation as malloc() and free(), and may be
provided in order to supply thread-safe memory allocation routines or
for any other reason -- by default malloc() and free() will be used
-- Error control --
yarn_prefix - a char pointer to a string that will be the prefix for any
error messages that these routines generate before exiting -- if not
changed by the application, "yarn" will be used
yarn_abort - an external function that will be executed when there is an
internal yarn error, due to out of memory or misuse -- this function
may exit to abort the application, or if it returns, the yarn error
handler will exit (set to NULL by default for no action)
*/
#pragma once
#include <stdlib.h>
// C++ namespace wrapper around the yarn thread / lock primitives.
//
// Every function whose name ends in '_' takes the caller's source file and
// line number as its last two arguments. The upper-case macros defined next to
// each declaration pass __FILE__ and __LINE__ automatically and are the
// intended way to call them.
namespace yarn {

// ---- Error control / memory allocation -------------------------------------

// Prefix for any error message the yarn routines generate before exiting
// ("yarn" unless the application changes it).
extern char *yarn_prefix;

// Optional hook executed on an internal yarn error (out of memory or misuse);
// NULL by default, meaning no action before the yarn error handler exits.
extern void (*yarn_abort)(int);

// Install replacement allocation and free routines; they must have the same
// interface and operation as malloc() and free().
void yarn_mem(void *(*)(size_t), void (*)(void *));

// ---- Threads ---------------------------------------------------------------

// Opaque identifier for a launched thread (struct thread_s is not defined in
// this header).
typedef struct thread_s thread;

// Launch a thread running probe(payload); the thread ends when probe returns.
// Arguments: probe, payload, caller file, caller line.
thread *launch_(void (*)(void *), void *, char const *, long);
#define LAUNCH(a, b) launch_(a, b, __FILE__, __LINE__)

// Join a thread, waiting for it to exit if it has not already, and free the
// resources allocated by its launch. Do not join the same thread twice.
void join_(thread *, char const *, long);
#define JOIN(a) join_(a, __FILE__, __LINE__)

// Join every launched thread that has not been joined yet and return how many
// were joined. Call from the main thread only, after any JOIN() has completed.
int join_all_(char const *, long);
#define JOIN_ALL() join_all_(__FILE__, __LINE__)

// ---- Locks -----------------------------------------------------------------

// Opaque lock that carries a long value (struct lock_s is not defined in this
// header). The value may be ignored for plain exclusive access, or used to
// signal and wait for changes.
typedef struct lock_s lock_t;

// Create a new lock with the given initial value, in the released state.
lock_t *new_lock_(long, char const *, long);
#define NEW_LOCK(a) new_lock_(a, __FILE__, __LINE__)

// Acquire exclusive possession of a lock, waiting if necessary.
void possess_(lock_t *, char const *, long);
#define POSSESS(a) possess_(a, __FILE__, __LINE__)

// Release a lock possessed by the current thread.
void release_(lock_t *, char const *, long);
// The lower-case spelling of the release macro is left disabled.
// NOTE(review): presumably to avoid clashing with other identifiers named
// `release` -- confirm before re-enabling.
// #define release(a) release_(a, __FILE__, __LINE__)
#define RELEASE(a) release_(a, __FILE__, __LINE__)

// How twist_() applies its value: TO sets the lock value, BY adds to it.
enum twist_op { TO, BY };

// Set (TO) or increment (BY) the value of a possessed lock, signal all threads
// waiting on it, then release it. Do not call release_() after twist_() on the
// same lock.
void twist_(lock_t *, enum twist_op, long, char const *, long);
#define TWIST(a, b, c) twist_(a, b, c, __FILE__, __LINE__)

// Condition that wait_for_() waits on, relative to the supplied value.
enum wait_op {
    TO_BE,            // lock value == val
    NOT_TO_BE,        // lock value != val
    TO_BE_MORE_THAN,  // lock value >  val
    TO_BE_LESS_THAN   // lock value <  val
};

// Wait until the value of a possessed lock satisfies the condition. The lock is
// released while waiting and possessed again on return.
void wait_for_(lock_t *, enum wait_op, long, char const *, long);
#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__)

// Return the value of a lock; the caller is assumed to possess it already, no
// possess or release is done here.
long peek_lock(lock_t *);
#define PEEK_LOCK(a) peek_lock(a)

// Free the resources allocated by new_lock_(); the lock must be released.
void free_lock_(lock_t *, char const *, long);
#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__)

// ---- Pipeline signalling helpers -------------------------------------------
//
// Each helper expands to ONE compound statement so that it stays intact when
// used as the body of an unbraced if / for / while. Plain braces are used
// rather than do { } while (0) on purpose: the previous expansions ended in
// ';', so call sites written without a trailing semicolon must keep compiling.
// Macro parameters are parenthesized so that arbitrary expressions can be
// passed safely.

// Block until the value of `sig` differs from `request`, then let go of `sig`.
#define DEPENDENCY_NOT_TO_BE(sig, request)                                \
    {                                                                     \
        possess_((sig), __FILE__, __LINE__);                              \
        wait_for_((sig), yarn::NOT_TO_BE, (request), __FILE__, __LINE__); \
        release_((sig), __FILE__, __LINE__);                              \
    }

// While possessing `sig`, add 1 to `order`; then add 1 to the value of `sig`
// (twist_ signals the waiters and releases the lock).
#define UPDATE_SIG_ORDER(sig, order)                    \
    {                                                   \
        possess_((sig), __FILE__, __LINE__);            \
        (order) += 1;                                   \
        twist_((sig), yarn::BY, 1, __FILE__, __LINE__); \
    }

// While possessing `sig`, set `finish` to 1; then add 1 to the value of `sig`
// (twist_ signals the waiters and releases the lock).
#define SIGNAL_FINISH(sig, finish)                      \
    {                                                   \
        possess_((sig), __FILE__, __LINE__);            \
        (finish) = 1;                                   \
        twist_((sig), yarn::BY, 1, __FILE__, __LINE__); \
    }

// Subtract 1 from the value of `sig` (possess, then twist BY -1, which
// signals the waiters and releases the lock).
#define CONSUME_SIGNAL(sig)                              \
    {                                                    \
        possess_((sig), __FILE__, __LINE__);             \
        twist_((sig), yarn::BY, -1, __FILE__, __LINE__); \
    }

}  // namespace yarn