正确性没有问题了

This commit is contained in:
zzh 2024-11-26 23:51:16 +08:00
parent 7352eb9070
commit 0ee22ca331
9 changed files with 59 additions and 94 deletions

7
.vscode/launch.json vendored
View File

@ -13,14 +13,13 @@
"program": "${workspaceRoot}/build/bin/picard_cpp",
"args": [
"MarkDuplicates",
"--INPUT", "/home/zzh/data/wgs_na12878.bam",
"--INPUT", "/home/zzh/data/bam/100w.bam",
"--OUTPUT", "./out.sam",
"--METRICS_FILE", "metrics.txt",
"--num_threads", "32",
"--num_threads", "1",
"--max_mem", "2G",
"--verbosity", "DEBUG",
"--asyncio", "true",
"--READ_NAME_REGEX", ""
"--asyncio", "true"
],
"cwd": "${workspaceFolder}", //
}

16
run.sh
View File

@ -1,20 +1,24 @@
nthread=1
#nthread=1
#nthread=2
#nthread=4
#nthread=8
#nthread=16
#nthread=32
#nthread=32
nthread=32
#nthread=64
#nthread=128
#input=/home/zzh/data/bam/zy_normal.bam
#input=/home/zzh/data/bam/normal_all.sam
#input=/home/zzh/data/bam/zy_tumor.bam
input=/home/zzh/data/wgs_na12878.bam
#input=~/data/bam/100w.bam
#input=/home/zzh/data/wgs_na12878.bam
input=~/data/bam/100w.bam
#input=~/data/bam/t100w.sam
#input=~/data/bam/1k.sam
#input=~/data/bam/1kw.sam
#input=~/data/bam/1kw.bam
#input=~/data/bam/n1kw.sam
#input=~/data/bam/tumor_small.bam
#input=~/data/bam/normal_small.bam
#input=~/data/bam/normal_30.bam
output=/home/zzh/data1/na12878_out.bam
cd ./build/ && make -j 8 && cd ..
@ -27,7 +31,7 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \
--num_threads $nthread \
--max_mem 2G \
--verbosity DEBUG \
--asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null #
--asyncio true -TAG_DUPLICATE_SET_MEMBERS true --READ_NAME_REGEX null #
#--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX ""
#--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
#--READ_NAME_REGEX ""

View File

@ -54,10 +54,10 @@ sam_hdr_t *g_outBamHeader; // 输出文件的header
GlobalArg &g_gArg = GlobalArg::Instance();
static MarkDupsArg g_mdArg_;
MarkDupsArg &g_mdArg = g_mdArg_;
static GlobalDataArg gData_;
GlobalDataArg &gData = gData_;
DuplicationMetrics gMetrics_;
DuplicationMetrics &gMetrics = gMetrics_; // 记录统计信息
PipelineArg gPipe_;
PipelineArg &gPipe = gPipe_;
/* 程序版本等信息 */
const char *PROGRAM_GROUP_VERSION = "3.2.0";
@ -152,7 +152,7 @@ int MarkDuplicates(int argc, char *argv[]) {
htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池
// htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads);
// htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads);
htsPoolRead.pool = hts_tpool_init(16);
htsPoolRead.pool = hts_tpool_init(32);
htsPoolWrite.pool = hts_tpool_init(32);
if (!htsPoolRead.pool || !htsPoolWrite.pool) {
Error("[%d] failed to set up thread pool", __LINE__);
@ -219,9 +219,9 @@ int MarkDuplicates(int argc, char *argv[]) {
inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
DupIdxQueue<DupInfo> dupIdxQue, repIdxQue;
DupIdxQueue<int64_t> opticalIdxQue;
dupIdxQue.Init(&gData.dupIdxArr);
repIdxQue.Init(&gData.repIdxArr);
opticalIdxQue.Init(&gData.opticalDupIdxArr);
dupIdxQue.Init(&gPipe.intersectData.dupIdxArr);
repIdxQue.Init(&gPipe.intersectData.repIdxArr);
opticalIdxQue.Init(&gPipe.intersectData.opticalDupIdxArr);
Timer tw;
cout << "dupsize: " << dupIdxQue.Size() << endl;
uint64_t bamIdx = 0;
@ -237,7 +237,7 @@ int MarkDuplicates(int argc, char *argv[]) {
uint32_t duplicateSetSize = 0;
int64_t realDupSize = 0;
exit(0);
// exit(0);
while (inBuf.ReadStat() >= 0) {
Timer tw1;
size_t readNum = inBuf.ReadBam();
@ -292,6 +292,9 @@ int MarkDuplicates(int argc, char *argv[]) {
}
// 添加冗余标识
bw->SetDuplicateReadFlag(true);
if (isOpticalDup) {
// cerr << bamIdx << endl;
}
uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
@ -331,16 +334,20 @@ int MarkDuplicates(int argc, char *argv[]) {
}
if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) {
// cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal);
bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i',
sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile);
oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal);
bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize),
(const uint8_t *)&duplicateSetSize);
}
}
// 每个read都要写到output只是添加标识或者删掉这些冗余record
++bamIdx;
if (isDup && g_mdArg.REMOVE_DUPLICATES) {
@ -355,12 +362,14 @@ int MarkDuplicates(int argc, char *argv[]) {
bam_aux_append(bw->b, "PG", 'Z', g_mdArg.PROGRAM_RECORD_ID.size() + 1,
(const uint8_t *)g_mdArg.PROGRAM_RECORD_ID.c_str());
}
#if 1
if (sam_write1(g_outBamFp, g_outBamHeader, bw->b) < 0) {
Error("failed writing sam record to \"%s\"", g_gArg.out_fn.c_str());
sam_close(g_outBamFp);
sam_close(g_inBamFp);
return -1;
}
#endif
}
inBuf.ClearAll();
cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl;
@ -371,7 +380,7 @@ int MarkDuplicates(int argc, char *argv[]) {
// 计算统计信息
gMetrics.READ_PAIRS_EXAMINED /= 2;
gMetrics.READ_PAIR_DUPLICATES /= 2;
for (auto &arr : gData.opticalDupIdxArr) gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
for (auto &arr : gPipe.intersectData.opticalDupIdxArr) gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
gMetrics.READ_PAIR_OPTICAL_DUPLICATES = gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2;
gMetrics.ESTIMATED_LIBRARY_SIZE =
estimateLibrarySize(gMetrics.READ_PAIRS_EXAMINED - gMetrics.READ_PAIR_OPTICAL_DUPLICATES,

View File

@ -87,7 +87,7 @@ void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnd
k.read2ReferenceIndex = bc.mtid;
}
// Fill in the location information for optical duplicates
rnParser.AddLocationInformation(bw.query_name(), pKey);
if (!rnParser.wrong_name_format) rnParser.AddLocationInformation(bw.query_name(), pKey);
// cout << k.tile << ' ' << k.x << ' ' << k.y << endl;
// 计算位置key
k.posKey =

View File

@ -93,11 +93,9 @@ struct TaskSeqDupInfo {
DPSet<DupInfo> dupIdx;
MDSet<int64_t> opticalDupIdx;
DPSet<DupInfo> repIdx;
MDSet<int64_t> singletonIdx;
MDSet<int64_t> notDupIdx;
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notRepIdx;
MDSet<int64_t> notSingletonIdx;
};
/* Stores the information for a position that has unmatched pairs: the array of read ends and the number of read ends that are still unmatched */
@ -172,58 +170,3 @@ struct GlobalDataArg {
vector<MDSet<int64_t>> latterNotRepIdxArr;
vector<MDSet<int64_t>> latterNotSingletonIdxArr;
};
/*
 * Heap entry used by ReadEndsQueue: identifies one pending element of one of
 * the arrays being merged.
 */
struct ReadEndsArrIdIdx {
    // Which array (position inside ReadEndsQueue::arr2d) the element came from.
    int arrId{0};
    // Index of the NEXT element to take from that array, i.e. one past the
    // element that pE points to (ReadEndsQueue pushes element k with arrIdx k+1).
    uint64_t arrIdx{0};
    // The pending element itself; not owned.
    const ReadEnds *pE{nullptr};
};
/*
 * "Greater than" comparator over heap entries: returns true when the ReadEnds
 * referenced by `a` compares greater than the one referenced by `b` (using
 * ReadEnds::operator<). Passing it to std::priority_queue therefore keeps the
 * smallest ReadEnds on top.
 *
 * The call operator is const so the comparator can also be invoked through a
 * const object or reference; it holds no state.
 */
struct ReadEndsGreaterThan {
    bool operator()(const ReadEndsArrIdIdx &a, const ReadEndsArrIdIdx &b) const { return *b.pE < *a.pE; }
};
/*
 * Merges several ReadEnds arrays through a min-heap: every call to Pop() returns
 * the smallest element (per ReadEnds::operator<) among the current heads of all
 * arrays. NOTE(review): the overall output is only globally ordered if each
 * individual array is already sorted -- confirm with the callers.
 */
struct ReadEndsQueue {
    // Arrays being merged; not owned. Stays nullptr until Init() is given a
    // valid pointer, which is what Size() relies on.
    vector<vector<ReadEnds> *> *arr2d = nullptr;
    // Holds at most one pending element per array.
    priority_queue<ReadEndsArrIdIdx, vector<ReadEndsArrIdIdx>, ReadEndsGreaterThan> minHeap;
    // Number of elements handed out by Pop() since the last Init().
    uint64_t popNum = 0;

    /*
     * Binds the queue to `_arr2d` and seeds the heap with the first element of
     * every non-empty array. State left over from a previous Init() is dropped.
     * Returns 0 on success, -1 if `_arr2d` is nullptr.
     */
    int Init(vector<vector<ReadEnds> *> *_arr2d) {
        // Reset first, so a re-initialised queue never keeps pointers into the
        // previous arrays or a stale pop counter.
        minHeap = decltype(minHeap)();
        popNum = 0;
        arr2d = _arr2d;
        if (arr2d == nullptr) {
            return -1;
        }
        for (size_t i = 0; i < arr2d->size(); ++i) {
            auto v = (*arr2d)[i];
            if (!v->empty()) {
                // arrIdx is the index of the next element to fetch, hence 1.
                minHeap.push({static_cast<int>(i), 1, &(*v)[0]});
            }
        }
        return 0;
    }

    /*
     * Removes and returns the smallest pending element, or nullptr when the
     * queue is exhausted. The returned pointer refers into the source array.
     */
    const ReadEnds *Pop() {
        if (minHeap.empty()) {
            return nullptr;
        }
        const ReadEndsArrIdIdx top = minHeap.top();
        minHeap.pop();
        ++popNum;
        auto v = (*arr2d)[top.arrId];
        // Refill the heap with the successor from the same array, if any.
        if (v->size() > top.arrIdx) {
            minHeap.push({top.arrId, top.arrIdx + 1, &(*v)[top.arrIdx]});
        }
        return top.pE;
    }

    /* Number of elements not yet popped (total over all arrays minus popNum). */
    uint64_t Size() const {
        uint64_t len = 0;
        if (arr2d != nullptr) {
            for (auto v : *arr2d) {
                len += v->size();
            }
        }
        return len - popNum;
    }
};

View File

@ -807,6 +807,7 @@ static void *pipeGenRE(void *data) {
TWIST(pipeArg.genRESig, BY, 1);
TWIST(pipeArg.readSig, BY, -1); // 使用了一次读入的数据
}
cout << "pipe gen reads" << endl;
return 0;
}
static void *pipeSort(void *data) {
@ -851,6 +852,7 @@ static void *pipeSort(void *data) {
pipeArg.sortOrder += 1;
TWIST(pipeArg.sortSig, BY, 1);
}
cout << "end pipe sort" << endl;
return 0;
}
static void *pipeMarkDup(void *data) {
@ -879,11 +881,11 @@ static void *pipeMarkDup(void *data) {
}
POSSESS(pipeArg.markDupSig);
pipeArg.markDupFinish = 1;
TWIST(pipeArg.markDupSig, BY, 1);
TWIST(pipeArg.markDupSig, BY, 2);
break;
}
/* 冗余检测 readends */
cout << "markdup order: " << pipeArg.markDupOrder << endl;
// cout << "markdup order: " << pipeArg.markDupOrder << endl;
tm_arr[3].acc_start();
doMarkDup(pipeArg);
tm_arr[3].acc_end();
@ -894,6 +896,7 @@ static void *pipeMarkDup(void *data) {
pipeArg.markDupOrder += 1;
TWIST(pipeArg.markDupSig, BY, 1);
}
cout << "end pipe markdup" << endl;
return 0;
}
static void *pipeIntersect(void *data) {
@ -925,6 +928,7 @@ static void *pipeIntersect(void *data) {
pipeArg.intersectOrder += 1;
}
cout << "end pipe intersect" << endl;
return 0;
}
@ -1011,7 +1015,7 @@ static void mergeAllTask(PipelineArg &pipeArg) {
static void parallelPipeline() {
Timer::log_time("pipeline start");
PipelineArg pipeArg;
PipelineArg &pipeArg = gPipe;
pipeArg.numThread = g_gArg.num_threads;
pthread_t tidArr[5];
@ -1082,7 +1086,7 @@ void pipelineMarkDups() {
if (g_gArg.num_threads > 1) return parallelPipeline();
Timer::log_time("pipeline start");
PipelineArg pipeArg;
PipelineArg &pipeArg = gPipe;
pipeArg.numThread = g_gArg.num_threads;
BamBufType inBamBuf(g_gArg.use_asyncio);
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);

View File

@ -26,8 +26,8 @@ class MarkDupsArg;
extern MarkDupsArg &g_mdArg; // markduplicate程序相关的参数
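// The globals above whose names contain an underscore (g_...) are shared by all modules; those without an underscore are used internally during the markduplicate computation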
class GlobalDataArg;
extern GlobalDataArg &gData; // 计算冗余过程中保存在全局的数据
class PipelineArg;
extern PipelineArg &gPipe; // 计算冗余过程中保存在全局的数据
class DuplicationMetrics;
extern DuplicationMetrics &gMetrics; // 用来计算统计信息

View File

@ -32,6 +32,8 @@ struct PhysicalLocation {
int16_t tile = -1;
int32_t x = -1;
int32_t y = -1;
//int16_t x = -1;
//int16_t y = -1;
};
/* 包含了所有read ends信息如picard里边的 ReadEndsForMarkDuplicates*/

View File

@ -50,6 +50,8 @@ struct ReadNameParser {
const string DEFAULT_READ_NAME_REGEX =
"(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$";
bool wrong_name_format = false;
string readNameStored = "";
PhysicalLocation physicalLocationStored;
int tmpLocationFields[3]; // for optimization of addLocationInformation
@ -166,6 +168,7 @@ struct ReadNameParser {
"%s; Error Msg: %s",
readNameRegex.c_str(), readName.c_str(), e.what());
warnedAboutRegexNotMatching = false;
wrong_name_format = true;
}
} catch (...) {
if (warnedAboutRegexNotMatching) {
@ -175,6 +178,7 @@ struct ReadNameParser {
"%s",
readNameRegex.c_str(), readName.c_str());
warnedAboutRegexNotMatching = false;
wrong_name_format = true;
}
}