去掉了一些调试信息,把cout等输出都换成了spdlog,可以作为开源代码发布了,README还需要详细写一写

This commit is contained in:
zzh 2024-12-16 02:55:11 +08:00
parent 5d9a52e90b
commit 01d14d539f
10 changed files with 434 additions and 112 deletions

View File

@ -5,6 +5,5 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
# set(CMAKE_BUILD_TYPE Debug)
# set(CMAKE_BUILD_TYPE Release)
# add_definitions(-DSHOW_PERF)
add_definitions(-DSHOW_PERF=1)
add_subdirectory(src)

View File

@ -1,3 +1,10 @@
# markdups
# FastDup
mark duplicate去除冗余
Identifies duplicate reads. This tool locates and tags duplicate reads in a coordinate ordered SAM or BAM file.
Use the same algorithm as picard MarkDuplicates and output identical results.
Use spdlog as log tool and the default level is 'info'.
### Features
* Fast -

111
metrics.txt 100644
View File

@ -0,0 +1,111 @@
## StringHeader
# /home/zzh/work/ngs/FastDup/build/bin/fastdup --input /home/zzh/data/bam/normal_all.sam --output /home/zzh/data1/out.sam --metrics ./metrics.txt --num-threads 1 --create-index --index-format CSI --tag-duplicate-set-members
## StringHeader
# Started on: December 16, 2024 at 02:43:41 AM CST
## METRICS
LIBRARY UNPAIRED_READS_EXAMINED READ_PAIRS_EXAMINED SECONDARY_OR_SUPPLEMENTARY_RDS UNMAPPED_READS UNPAIRED_READ_DUPLICATES READ_PAIR_DUPLICATES READ_PAIR_OPTICAL_DUPLICATES PERCENT_DUPLICATION ESTIMATED_LIBRARY_SIZE
normal 108919 66154888 117508 161395 67414 15058359 876584 0.227945 127582703
## HISTOGRAM Double
BIN CoverageMult
1 1.010249
2 1.611750
3 1.969882
4 2.183113
5 2.310070
6 2.385660
7 2.430666
8 2.457463
9 2.473417
10 2.482917
11 2.488573
12 2.491940
13 2.493945
14 2.495139
15 2.495850
16 2.496273
17 2.496525
18 2.496675
19 2.496764
20 2.496817
21 2.496849
22 2.496868
23 2.496879
24 2.496886
25 2.496890
26 2.496892
27 2.496894
28 2.496894
29 2.496895
30 2.496895
31 2.496895
32 2.496896
33 2.496896
34 2.496896
35 2.496896
36 2.496896
37 2.496896
38 2.496896
39 2.496896
40 2.496896
41 2.496896
42 2.496896
43 2.496896
44 2.496896
45 2.496896
46 2.496896
47 2.496896
48 2.496896
49 2.496896
50 2.496896
51 2.496896
52 2.496896
53 2.496896
54 2.496896
55 2.496896
56 2.496896
57 2.496896
58 2.496896
59 2.496896
60 2.496896
61 2.496896
62 2.496896
63 2.496896
64 2.496896
65 2.496896
66 2.496896
67 2.496896
68 2.496896
69 2.496896
70 2.496896
71 2.496896
72 2.496896
73 2.496896
74 2.496896
75 2.496896
76 2.496896
77 2.496896
78 2.496896
79 2.496896
80 2.496896
81 2.496896
82 2.496896
83 2.496896
84 2.496896
85 2.496896
86 2.496896
87 2.496896
88 2.496896
89 2.496896
90 2.496896
91 2.496896
92 2.496896
93 2.496896
94 2.496896
95 2.496896
96 2.496896
97 2.496896
98 2.496896
99 2.496896
100 2.496896

View File

@ -1,6 +1,7 @@
#include <spdlog/cfg/env.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
#include <time.h>
#include <argparse/argparse.hpp>
#include <iostream>
@ -8,9 +9,9 @@
#include <set>
#include <string>
#include "fastdup_version.h"
#include "markdup/md_args.h"
#include "util/profiling.h"
#include "fastdup_version.h"
namespace nsgv {
extern MarkDupsArg gMdArg;
@ -18,6 +19,16 @@ extern MarkDupsArg gMdArg;
int MarkDuplicates();
string get_current_time_str() {
time_t time_val;
struct tm *at;
char now[80];
time(&time_val);
at = localtime(&time_val);
strftime(now, 79, "%B %d, %Y at %I:%M:%S %p %Z", at);
return string(now);
}
int main(int argc, char *argv[]) {
// init log
spdlog::set_default_logger(spdlog::stderr_color_st("fastdup"));
@ -159,6 +170,12 @@ int main(int argc, char *argv[]) {
// std::cout << program << std::endl;
nsgv::gMdArg.START_TIME = get_current_time_str();
nsgv::gMdArg.CLI_STR = argv[0];
for (int i = 1; i < argc; ++i) {
nsgv::gMdArg.CLI_STR += " " + std::string(argv[i]);
}
try {
program.parse_args(argc, argv);
nsgv::gMdArg.INPUT_FILE = program.get("--input");

View File

@ -11,6 +11,7 @@ Date : 2023/10/23
#include <htslib/thread_pool.h>
#include <spdlog/spdlog.h>
#include <iomanip>
#include <vector>
#include "dup_metrics.h"
@ -36,6 +37,13 @@ PipelineArg gPipe;
};
// 字节缓冲区
struct ByteBuf {
uint8_t *buf = nullptr;
int size = 0; // 当前使用量
int capacity = 0; // 最大容量
};
/*
*
*/
@ -89,25 +97,283 @@ int MarkDuplicates() {
#endif
/* 冗余检查和标记 */
PROF_START(markdup_all);
pipelineMarkDups();
PROF_END(gprof[GP_markdup_all], markdup_all);
/* 初始化输出文件 */
/* 标记冗余, 将处理后的结果写入文件 */
char modeout[12] = "wb";
sam_open_mode(modeout + 1, nsgv::gMdArg.OUTPUT_FILE.c_str(), NULL);
nsgv::gOutBamFp = sam_open(nsgv::gMdArg.OUTPUT_FILE.c_str(), modeout);
if (!nsgv::gOutBamFp) {
spdlog::error("[{}] create output sam/bam file failed.\n", __func__);
return -1;
}
nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader);
// 修改输出文件的header
// sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION,
// "CL",
// (nsgv::gMdArg.PROGRAM_RECORD_ID + " " + nsgv::gMdArg.GetArgValueString() + " " +
// nsgv::gMdArg.GetArgValueString())
// .c_str(),
// NULL);
sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION,
"CL", nsgv::gMdArg.CLI_STR.c_str(), NULL);
// 用同样的线程处理输出文件
// 用同样的线程数量处理输出文件
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite);
sam_close(nsgv::gInBamFp); // 重新打开bam文件
nsgv::gInBamFp = sam_open_format(nsgv::gMdArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp);
if (sam_hdr_write(nsgv::gOutBamFp, nsgv::gOutBamHeader) != 0) {
spdlog::error("failed writing header to \"{}\"", nsgv::gMdArg.OUTPUT_FILE);
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
return -1;
}
// 输出index文件
string indexFn = nsgv::gMdArg.OUTPUT_FILE + ".bai"; // min_shift = 0 是bai格式
if ("sam" == getFileExtension(nsgv::gMdArg.OUTPUT_FILE) || !nsgv::gMdArg.CREATE_INDEX) {
indexFn = "";
}
if (!indexFn.empty()) {
int index_min_shift = 0;
if (nsgv::gMdArg.INDEX_FORMAT == nsmd::IndexFormat::CSI) {
indexFn = nsgv::gMdArg.OUTPUT_FILE + ".csi";
index_min_shift = 14;
}
if (sam_idx_init(nsgv::gOutBamFp, nsgv::gOutBamHeader, 0 /*csi 14*/, indexFn.c_str()) < 0) {
spdlog::error("failed to open index \"{}\" for writing", indexFn);
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
return -1;
}
}
// 读取输入文件,标记冗余并输出
BamBufType inBuf(nsgv::gMdArg.DUPLEX_IO);
inBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM);
DupIdxQueue<DupInfo> dupIdxQue, repIdxQue;
DupIdxQueue<int64_t> opticalIdxQue;
dupIdxQue.Init(&nsgv::gPipe.intersectData.dupIdxArr);
repIdxQue.Init(&nsgv::gPipe.intersectData.repIdxArr);
opticalIdxQue.Init(&nsgv::gPipe.intersectData.opticalDupIdxArr);
spdlog::info("{} duplicate reads found", dupIdxQue.Size());
uint64_t bamIdx = 0;
DupInfo dupIdx = dupIdxQue.Pop();
DupInfo repIdx = repIdxQue.Pop();
uint64_t opticalIdx = opticalIdxQue.Pop();
ByteBuf bytes;
bam1_t *bp = bam_init1();
bool isDup = false;
bool isOpticalDup = false;
bool isInDuplicateSet = false;
uint32_t representativeReadIndexInFile = 0;
uint32_t duplicateSetSize = 0;
int64_t realDupSize = 0;
// exit(0);
PROF_START(write);
while (inBuf.ReadStat() >= 0) {
PROF_START(final_read);
size_t readNum = inBuf.ReadBam();
PROF_END(gprof[GP_final_read], final_read);
PROF_PRINT_START(read_write);
for (size_t i = 0; i < readNum; ++i) {
BamWrap *bw = inBuf[i];
if (bam_copy1(bp, bw->b) == nullptr) {
spdlog::error("Can not copy sam record!");
return -1;
}
bw->b = bp;
isDup = false;
isOpticalDup = false;
isInDuplicateSet = false;
// 删除原来的duplicate tag
if (nsgv::gMdArg.CLEAR_DT) {
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
}
// 统计信息
if (bw->GetReadUnmappedFlag()) {
++nsgv::gMetrics.UNMAPPED_READS;
} else if (bw->IsSecondaryOrSupplementary()) {
++nsgv::gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS;
} else if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) {
++nsgv::gMetrics.UNPAIRED_READS_EXAMINED;
} else {
++nsgv::gMetrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end
}
/* 判断是否冗余 */
if (bamIdx == dupIdx) {
++realDupSize; // for test
isDup = true;
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) {
isInDuplicateSet = true;
representativeReadIndexInFile = dupIdx.repIdx;
duplicateSetSize = dupIdx.dupSet;
}
// 为了防止小内存运行的时候有重复的dupidx这时候dup的repIdx和dupSetSize可能会有不同
while ((dupIdx = dupIdxQue.Pop()) == bamIdx);
if (opticalIdx == bamIdx)
isOpticalDup = true;
else if (opticalIdx < bamIdx) {
while ((opticalIdx = opticalIdxQue.Pop()) < bamIdx);
if (opticalIdx == bamIdx)
isOpticalDup = true;
}
// 添加冗余标识
bw->SetDuplicateReadFlag(true);
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
if (isOpticalDup)
bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z',
nsgv::gMdArg.DUPLICATE_TYPE_SEQUENCING.size() + 1,
(const uint8_t *)nsgv::gMdArg.DUPLICATE_TYPE_SEQUENCING.c_str());
else
bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str(), 'Z',
nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.size() + 1,
(const uint8_t *)nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.c_str());
// 计算统计信息
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
// Update the duplication metrics
if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) {
++nsgv::gMetrics.UNPAIRED_READ_DUPLICATES;
} else {
++nsgv::gMetrics.READ_PAIR_DUPLICATES; // will need to be divided by 2 at the end
}
}
} else {
bw->SetDuplicateReadFlag(false);
}
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent
isInDuplicateSet = true;
representativeReadIndexInFile = repIdx.repIdx;
duplicateSetSize = repIdx.dupSet;
while (repIdx == bamIdx || repIdx.dupSet == 0) {
if (repIdxQue.Size() > 0)
repIdx = repIdxQue.Pop();
else {
repIdx = -1;
break;
}
}
}
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) {
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
// cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str());
if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal);
bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i',
sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile);
oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_SIZE_TAG.c_str());
if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal);
bam_aux_append(bw->b, nsgv::gMdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize),
(const uint8_t *)&duplicateSetSize);
}
}
// 每个read都要写到output只是添加标识或者删掉这些冗余record
++bamIdx;
if (isDup && nsgv::gMdArg.REMOVE_DUPLICATES) {
continue;
}
if (isOpticalDup && nsgv::gMdArg.REMOVE_SEQUENCING_DUPLICATES) {
continue;
}
if (!nsgv::gMdArg.PROGRAM_RECORD_ID.empty() && nsgv::gMdArg.ADD_PG_TAG_TO_READS) {
uint8_t *oldTagVal = bam_aux_get(bw->b, "PG");
if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal);
bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1,
(const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str());
}
#if 0
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str());
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
return -1;
}
#endif
}
inBuf.ClearAll();
PROF_PRINT_END(read_write);
spdlog::info("write {} reads to output", readNum);
}
bam_destroy1(bp);
// 计算统计信息
nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2;
nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2;
for (auto &arr : nsgv::gPipe.intersectData.opticalDupIdxArr)
nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES = nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2;
nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE =
estimateLibrarySize(nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES,
nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_DUPLICATES);
if (nsgv::gMetrics.UNPAIRED_READS_EXAMINED + nsgv::gMetrics.READ_PAIRS_EXAMINED != 0) {
nsgv::gMetrics.PERCENT_DUPLICATION =
(nsgv::gMetrics.UNPAIRED_READ_DUPLICATES + nsgv::gMetrics.READ_PAIR_DUPLICATES * 2) /
(double)(nsgv::gMetrics.UNPAIRED_READS_EXAMINED + nsgv::gMetrics.READ_PAIRS_EXAMINED * 2);
} else {
nsgv::gMetrics.PERCENT_DUPLICATION = 0;
}
calculateRoiHistogram(nsgv::gMetrics);
// 将统计信息写到文件里
if (!nsgv::gMdArg.METRICS_FILE.empty()) {
ofstream ofsM(nsgv::gMdArg.METRICS_FILE);
ofsM << "## StringHeader" << endl;
ofsM << "# " << nsgv::gMdArg.CLI_STR << endl;
ofsM << "## StringHeader" << endl;
ofsM << "# Started on: " << nsgv::gMdArg.START_TIME << endl << endl;
ofsM << "## METRICS"
<< endl;
ofsM << "LIBRARY\tUNPAIRED_READS_EXAMINED\tREAD_PAIRS_EXAMINED\tSECONDARY_OR_SUPPLEMENTARY_RDS\tUNMAPPED_"
"READS\tUNPAIRED_READ_DUPLICATES\tREAD_PAIR_DUPLICATES\tREAD_PAIR_OPTICAL_DUPLICATES\tPERCENT_"
"DUPLICATION\tESTIMATED_LIBRARY_SIZE"
<< endl;
ofsM << nsgv::gMetrics.LIBRARY << "\t" << nsgv::gMetrics.UNPAIRED_READS_EXAMINED << "\t" << nsgv::gMetrics.READ_PAIRS_EXAMINED
<< "\t" << nsgv::gMetrics.SECONDARY_OR_SUPPLEMENTARY_RDS << "\t" << nsgv::gMetrics.UNMAPPED_READS << "\t"
<< nsgv::gMetrics.UNPAIRED_READ_DUPLICATES << "\t" << nsgv::gMetrics.READ_PAIR_DUPLICATES << "\t"
<< nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES << "\t" << nsgv::gMetrics.PERCENT_DUPLICATION << "\t"
<< nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE << endl
<< endl;
ofsM << "## HISTOGRAM\tDouble" << endl;
ofsM << "BIN CoverageMult" << endl;
for (int i = 1; i <= 100; ++i) {
ofsM << i << "\t" << std::fixed << std::setprecision(6) << nsgv::gMetrics.CoverageMult[i] << endl;
}
ofsM.close();
}
PROF_END(gprof[GP_write], write);
if (!indexFn.empty() && sam_idx_save(nsgv::gOutBamFp) < 0) {
spdlog::error("writing index failed");
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
return -1;
}
/* 关闭文件,收尾清理 */
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);
return 0;
}

View File

@ -256,14 +256,9 @@ struct MarkDupsArg {
/* Add PG tag to each read in a SAM or BAM (PGTagArgumentCollection)*/
bool ADD_PG_TAG_TO_READS = true;
// 解析参数
void ParseArgument(int argc, char **argv);
void PrintArgValue();
string GetArgValueString();
static void PrintHelp();
static void PrintVersion();
// 命令行字符串
string CLI_STR;
// 开始运行时间
string START_TIME;
};

View File

@ -2,6 +2,7 @@
#include <klib/kthread.h>
#include <pthread.h>
#include <spdlog/spdlog.h>
#include <algorithm>
#include <iostream>
@ -16,7 +17,6 @@
#include "util/profiling.h"
#include "util/yarn.h"
using std::cout;
using std::vector;
namespace nsgv {
@ -247,7 +247,6 @@ static void markdups(MarkDupDataArg *arg) {
static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds> &rightArr, vector<ReadEnds> *dst,
bool isPairCmp = false) {
if (leftArr.empty() || rightArr.empty()) {
// cout << "bad size: " << leftArr.size() << '\t' << rightArr.size() << endl;
return;
}
const size_t leftEndIdx = leftArr.size() - 1;
@ -782,7 +781,7 @@ static void *pipeRead(void *data) {
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据
break;
}
cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl;
spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder);
pipeArg.readData.bams = inBamBuf.GetBamArr();
@ -794,7 +793,7 @@ static void *pipeRead(void *data) {
pipeArg.readOrder += 1; // for next
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据
}
cout << "total reads: " << readNumSum << endl;
spdlog::info("total reads processed {}", readNumSum);
return 0;
}
static void *pipeGenRE(void *data) {
@ -825,7 +824,6 @@ static void *pipeGenRE(void *data) {
break;
}
/* 处理bam生成readends */
// cout << "genRE order: " << pipeArg.genREOrder << "\t" << pipeArg.readData.bamStartIdx << endl;
PROF_START(gen);
doGenRE(pipeArg);
// usleep(200000);
@ -836,7 +834,6 @@ static void *pipeGenRE(void *data) {
yarn::TWIST(pipeArg.genRESig, yarn::BY, 1);
yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据
}
cout << "pipe gen reads" << endl;
return 0;
}
static void *pipeSort(void *data) {
@ -867,7 +864,6 @@ static void *pipeSort(void *data) {
break;
}
/* 排序 readends */
// cout << "sort order: " << pipeArg.sortOrder << endl;
PROF_START(sort);
doSort(pipeArg);
PROF_END(gprof[GP_sort], sort);
@ -879,7 +875,6 @@ static void *pipeSort(void *data) {
pipeArg.sortOrder += 1;
yarn::TWIST(pipeArg.sortSig, yarn::BY, 1);
}
cout << "end pipe sort" << endl;
return 0;
}
static void *pipeMarkDup(void *data) {
@ -910,7 +905,6 @@ static void *pipeMarkDup(void *data) {
break;
}
/* 冗余检测 readends */
// cout << "markdup order: " << pipeArg.markDupOrder << endl;
PROF_START(markdup);
doMarkDup(pipeArg);
PROF_END(gprof[GP_markdup], markdup);
@ -921,7 +915,6 @@ static void *pipeMarkDup(void *data) {
pipeArg.markDupOrder += 1;
yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1);
}
cout << "end pipe markdup" << endl;
return 0;
}
static void *pipeIntersect(void *data) {
@ -953,7 +946,6 @@ static void *pipeIntersect(void *data) {
pipeArg.intersectOrder += 1;
}
cout << "end pipe intersect" << endl;
return 0;
}
@ -1060,56 +1052,6 @@ static void parallelPipeline() {
PROF_START(merge_result);
mergeAllTask(pipeArg);
PROF_END(gprof[GP_merge_result], merge_result);
int64_t dupNum = 0;
int64_t opticalDupNum = 0;
for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size();
for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size();
map<int64_t, int> dup;
#if 0
int taskSeq = 0;
for (auto &arr : pipeArg.intersectData.dupIdxArr) {
for (auto idx : arr) {
if (dup.find(idx.idx) != dup.end()) {
//if (taskSeq - 1 > dup[idx])
cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl;
}
dup[idx.idx] = taskSeq;
}
// cout << taskSeq << "\t" << arr.size() << endl;
taskSeq++;
}
#endif
cout << "Final read order: " << pipeArg.readOrder << endl;
cout << "Final gen order: " << pipeArg.genREOrder << endl;
cout << "Final sort order: " << pipeArg.sortOrder << endl;
cout << "Final mark order: " << pipeArg.markDupOrder << endl;
cout << "Final inter order: " << pipeArg.intersectOrder << endl;
// cout << "w read time : " << tm_arr[10].acc_seconds_elapsed() << endl;
// cout << "w gen time : " << tm_arr[11].acc_seconds_elapsed() << endl;
// cout << "w sort time : " << tm_arr[12].acc_seconds_elapsed() << endl;
// cout << "w markdup time : " << tm_arr[13].acc_seconds_elapsed() << endl;
// cout << "w intersect time: " << tm_arr[14].acc_seconds_elapsed() << endl;
//
// cout << "w1 gen time : " << tm_arr[21].acc_seconds_elapsed() << endl;
// cout << "w1 sort time : " << tm_arr[22].acc_seconds_elapsed() << endl;
// cout << "w1 markdup time : " << tm_arr[23].acc_seconds_elapsed() << endl;
//
// cout << "read time : " << tm_arr[0].acc_seconds_elapsed() << endl;
// cout << "gen time : " << tm_arr[1].acc_seconds_elapsed() << endl;
// cout << "sort time : " << tm_arr[2].acc_seconds_elapsed() << endl;
// cout << "markdup time : " << tm_arr[3].acc_seconds_elapsed() << endl;
// cout << "intersect time: " << tm_arr[4].acc_seconds_elapsed() << endl;
//
// cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl;
// cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl;
//
cout << "dup num : " << dupNum << "\t" << dup.size() << endl;
cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl;
}
/* 并行流水线方式处理数据,标记冗余 */
@ -1133,7 +1075,7 @@ void pipelineMarkDups() {
if (readNum < 1) {
break;
}
cout << "read num: " << readNum << "\t" << readNumSum << '\t' << pipeArg.readOrder << endl;
spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder);
pipeArg.readData.bams = inBamBuf.GetBamArr();
pipeArg.readData.bamStartIdx = readNumSum;
@ -1158,32 +1100,4 @@ void pipelineMarkDups() {
pipeArg.readOrder += 1; // for next
}
mergeAllTask(pipeArg);
int64_t dupNum = 0;
int64_t opticalDupNum = 0;
for (auto &arr : pipeArg.intersectData.dupIdxArr) dupNum += arr.size();
for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size();
map<int64_t, int> dup;
#if 0
int taskSeq = 0;
for (auto &arr : pipeArg.intersectData.dupIdxArr) {
for (auto idx : arr) {
if (dup.find(idx.idx) != dup.end()) {
cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl;
}
dup[idx.idx] = taskSeq;
}
taskSeq++;
}
#endif
cout << "total reads: " << readNumSum << endl;
cout << "Final read order: " << pipeArg.readOrder << endl;
cout << "Final gen order: " << pipeArg.genREOrder << endl;
cout << "Final sort order: " << pipeArg.sortOrder << endl;
cout << "Final mark order: " << pipeArg.markDupOrder << endl;
cout << "Final inter order: " << pipeArg.intersectOrder << endl;
cout << "dup num : " << dupNum << "\t" << dup.size() << endl;
cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl;
}

View File

@ -32,6 +32,7 @@ struct PhysicalLocation {
int16_t tile = -1;
// int32_t x = -1;
// int32_t y = -1;
// This is a bug in picard Markduplicates, because some tile coordinates exceede the range of int16_t
int16_t x = -1;
int16_t y = -1;
};

View File

@ -68,6 +68,9 @@ int DisplayProfiling(int nthread) {
PRINT_GP(merge_markdup);
PRINT_GP(merge_update);
PRINT_GP(merge_add);
PRINT_GP(markdup_all);
PRINT_GP(final_read);
PRINT_GP(write);
PRINT_TP(gen, nthread);
PRINT_TP(sort_frag, nthread);

View File

@ -25,9 +25,15 @@ extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE];
#define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time
#define PROF_PRINT_START(tmp_time) uint64_t tmp_time = RealtimeMsec()
#define PROF_PRINT_END(tmp_time) \
tmp_time = RealtimeMsec() - tmp_time; \
fprintf(stderr, "time %-15s: %0.2lfs\n", #tmp_time, tmp_time * 1.0 / proc_freq)
#else
#define PROF_START(tmp_time)
#define PROF_END(result, tmp_time)
#define PROF_PRINT_START(tmp_time)
#define PROF_PRINT_END(tmp_time)
#endif
// GLOBAL
@ -51,7 +57,10 @@ enum {
GP_merge_match,
GP_merge_markdup,
GP_merge_update,
GP_merge_add
GP_merge_add,
GP_markdup_all,
GP_final_read,
GP_write
};
// THREAD
enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 };