删除了一些调试信息,可以发布的版本了

This commit is contained in:
zzh 2025-03-04 01:29:19 +08:00
parent 3606562de9
commit f13c875a5e
5 changed files with 25 additions and 75 deletions

View File

@ -79,8 +79,6 @@ int MarkDuplicates() {
htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池
htsPoolRead.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS); htsPoolRead.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
htsPoolWrite.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS); htsPoolWrite.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
// htsPoolRead.pool = hts_tpool_init(8);
// htsPoolWrite.pool = hts_tpool_init(32);
if (!htsPoolRead.pool || !htsPoolWrite.pool) { if (!htsPoolRead.pool || !htsPoolWrite.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__); spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp); sam_close(nsgv::gInBamFp);
@ -88,14 +86,6 @@ int MarkDuplicates() {
} }
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
// 测试读写速度
#if 0
bam1_t *bamp = bam_init1();
while (sam_read1(nsgv::gInBamFp, nsgv::gInBamHeader, bamp) >= 0);
DisplayProfiling(nsgv::gMdArg.NUM_THREADS);
exit(0);
#endif
/* 冗余检查和标记 */ /* 冗余检查和标记 */
PROF_START(markdup_all); PROF_START(markdup_all);
PipelineMarkDups(); PipelineMarkDups();
@ -162,11 +152,9 @@ int MarkDuplicates() {
opticalIdxQue.Init(&nsgv::gDupRes.opticalDupIdxArr); opticalIdxQue.Init(&nsgv::gDupRes.opticalDupIdxArr);
spdlog::info("{} duplicate reads found", dupIdxQue.Size()); spdlog::info("{} duplicate reads found", dupIdxQue.Size());
spdlog::info("{} optical reads found", opticalIdxQue.Size()); spdlog::info("{} optical reads found", opticalIdxQue.Size());
spdlog::info("{} represent reads found", repIdxQue.Size()); // spdlog::info("{} represent reads found", repIdxQue.Size());
// spdlog::info("real dup size: {}", dupIdxQue.RealSize());
// spdlog::info("real optical dup size: {}", opticalIdxQue.RealSize());
return 0; // return 0;
uint64_t bamIdx = 0; uint64_t bamIdx = 0;
DupInfo dupIdx = dupIdxQue.Pop(); DupInfo dupIdx = dupIdxQue.Pop();
@ -183,19 +171,12 @@ int MarkDuplicates() {
int64_t realDupSize = 0; int64_t realDupSize = 0;
ofstream ofs("newdup.txt");
// return 0;
// for debug
int64_t maxDiff = 0;
int64_t minDiff = 0;
PROF_START(write); PROF_START(write);
while (inBuf.ReadStat() >= 0) { while (inBuf.ReadStat() >= 0) {
PROF_START(final_read); PROF_START(final_read);
size_t readNum = inBuf.ReadBam(); size_t readNum = inBuf.ReadBam();
PROF_END(gprof[GP_final_read], final_read); PROF_END(gprof[GP_final_read], final_read);
PROF_PRINT_START(read_write); // PROF_PRINT_START(read_write);
for (size_t i = 0; i < readNum; ++i) { for (size_t i = 0; i < readNum; ++i) {
BamWrap *bw = inBuf[i]; BamWrap *bw = inBuf[i];
if (bam_copy1(bp, bw->b) == nullptr) { if (bam_copy1(bp, bw->b) == nullptr) {
@ -225,9 +206,7 @@ int MarkDuplicates() {
} }
/* 判断是否冗余 */ /* 判断是否冗余 */
// cout << dupIdx << endl;
if (bamIdx == dupIdx) { if (bamIdx == dupIdx) {
// ofs << bamIdx << endl;
++realDupSize; // for test ++realDupSize; // for test
isDup = true; isDup = true;
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) { if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) {
@ -236,14 +215,6 @@ int MarkDuplicates() {
duplicateSetSize = dupIdx.dupSet; duplicateSetSize = dupIdx.dupSet;
} }
#if 1
// spdlog::info("diff: {}", dupIdx.idx - dupIdx.repIdx);
//maxDiff = std::max(maxDiff, dupIdx.idx - dupIdx.repIdx);
//minDiff = std::min(minDiff, dupIdx.idx - dupIdx.repIdx);
//spdlog::info("min: {}, max: {}", minDiff, maxDiff);
#endif
// 为了防止小内存运行的时候有重复的dupidx这时候dup的repIdx和dupSetSize可能会有不同 // 为了防止小内存运行的时候有重复的dupidx这时候dup的repIdx和dupSetSize可能会有不同
while ((dupIdx = dupIdxQue.Pop()) == bamIdx); while ((dupIdx = dupIdxQue.Pop()) == bamIdx);
if (opticalIdx == bamIdx) if (opticalIdx == bamIdx)
@ -297,8 +268,6 @@ int MarkDuplicates() {
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) { if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) {
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
// cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
// ofs << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str()); uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str());
if (oldTagVal != NULL) if (oldTagVal != NULL)
bam_aux_del(bw->b, oldTagVal); bam_aux_del(bw->b, oldTagVal);
@ -326,7 +295,7 @@ int MarkDuplicates() {
bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1, bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1,
(const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str()); (const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str());
} }
#if 0 #if 1
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) { if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str()); spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str());
sam_close(nsgv::gOutBamFp); sam_close(nsgv::gOutBamFp);
@ -336,7 +305,7 @@ int MarkDuplicates() {
#endif #endif
} }
inBuf.ClearAll(); inBuf.ClearAll();
PROF_PRINT_END(read_write); // PROF_PRINT_END(read_write);
spdlog::info("write {} reads to output", readNum); spdlog::info("write {} reads to output", readNum);
} }
bam_destroy1(bp); bam_destroy1(bp);
@ -397,7 +366,5 @@ int MarkDuplicates() {
sam_close(nsgv::gOutBamFp); sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp); sam_close(nsgv::gInBamFp);
ofs.close();
return 0; return 0;
} }

View File

@ -644,7 +644,7 @@ static void *pipeRead(void *data) {
pipeArg.readOrder += 1; // for next pipeArg.readOrder += 1; // for next
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据
} }
spdlog::info("total reads processed {}, last order {}", readNumSum, pipeArg.readOrder); spdlog::info("{} total reads processed", readNumSum);
return 0; return 0;
} }
static void *pipeGenRE(void *data) { static void *pipeGenRE(void *data) {
@ -683,7 +683,6 @@ static void *pipeGenRE(void *data) {
yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); yarn::TWIST(pipeArg.genRESig, yarn::BY, 1);
yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据 yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据
} }
spdlog::info("genRE last order {}", pipeArg.genREOrder);
return 0; return 0;
} }
static void *pipeSort(void *data) { static void *pipeSort(void *data) {
@ -702,7 +701,6 @@ static void *pipeSort(void *data) {
if (pipeArg.genREFinish) { if (pipeArg.genREFinish) {
// 处理完剩余数据 // 处理完剩余数据
cout << "zzh pipeSort: " << pipeArg.genREOrder << '\t' << pipeArg.sortOrder << endl;
while (pipeArg.sortOrder < pipeArg.genREOrder) { while (pipeArg.sortOrder < pipeArg.genREOrder) {
yarn::POSSESS(pipeArg.sortSig); yarn::POSSESS(pipeArg.sortSig);
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置 yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置
@ -733,7 +731,6 @@ static void *pipeSort(void *data) {
pipeArg.sortOrder += 1; pipeArg.sortOrder += 1;
yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); yarn::TWIST(pipeArg.sortSig, yarn::BY, 1);
} }
spdlog::info("sort last order {}", pipeArg.sortOrder);
return 0; return 0;
} }
static void *pipeMarkDup(void *data) { static void *pipeMarkDup(void *data) {
@ -751,10 +748,8 @@ static void *pipeMarkDup(void *data) {
yarn::RELEASE(pipeArg.markDupSig); yarn::RELEASE(pipeArg.markDupSig);
if (pipeArg.sortFinish) { if (pipeArg.sortFinish) {
cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl;
// 应该还得处理剩余的数据 // 应该还得处理剩余的数据
while (pipeArg.markDupOrder < pipeArg.sortOrder) { while (pipeArg.markDupOrder < pipeArg.sortOrder) {
cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl;
yarn::POSSESS(pipeArg.markDupSig); yarn::POSSESS(pipeArg.markDupSig);
yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM);
yarn::RELEASE(pipeArg.markDupSig); yarn::RELEASE(pipeArg.markDupSig);
@ -767,7 +762,6 @@ static void *pipeMarkDup(void *data) {
pipeArg.markDupOrder += 1; pipeArg.markDupOrder += 1;
yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1);
} }
cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl;
yarn::POSSESS(pipeArg.markDupSig); yarn::POSSESS(pipeArg.markDupSig);
pipeArg.markDupFinish = 1; pipeArg.markDupFinish = 1;
yarn::TWIST(pipeArg.markDupSig, yarn::TO, 2 + pipeArg.MARKBUFNUM); yarn::TWIST(pipeArg.markDupSig, yarn::TO, 2 + pipeArg.MARKBUFNUM);
@ -784,7 +778,6 @@ static void *pipeMarkDup(void *data) {
pipeArg.markDupOrder += 1; pipeArg.markDupOrder += 1;
yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1);
} }
spdlog::info("markdup last order {}", pipeArg.markDupOrder);
return 0; return 0;
} }
static void *pipeIntersect(void *data) { static void *pipeIntersect(void *data) {
@ -799,7 +792,6 @@ static void *pipeIntersect(void *data) {
PROF_END(gprof[GP_intersect_wait], intersect_wait); PROF_END(gprof[GP_intersect_wait], intersect_wait);
if (pipeArg.markDupFinish) { if (pipeArg.markDupFinish) {
cout << "zzh pipeIntersect: " << pipeArg.markDupOrder << '\t' << pipeArg.intersectOrder << endl;
while (pipeArg.intersectOrder < pipeArg.markDupOrder) { while (pipeArg.intersectOrder < pipeArg.markDupOrder) {
PROF_START(intersect); PROF_START(intersect);
doIntersect(pipeArg); doIntersect(pipeArg);
@ -819,7 +811,6 @@ static void *pipeIntersect(void *data) {
pipeArg.intersectOrder += 1; pipeArg.intersectOrder += 1;
} }
spdlog::info("intersect last order {}", pipeArg.intersectOrder);
return 0; return 0;
} }
@ -866,13 +857,13 @@ static void parallelPipeline() {
processLastData(pipeArg); processLastData(pipeArg);
PROF_END(gprof[GP_merge_result], merge_result); PROF_END(gprof[GP_merge_result], merge_result);
spdlog::info("pipeArg size : {} GB", pipeArg.byteSize() / 1024.0 / 1024 / 1024); // spdlog::info("pipeArg size : {} GB", pipeArg.byteSize() / 1024.0 / 1024 / 1024);
size_t repNum = 0; // size_t repNum = 0;
for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); // for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size();
spdlog::info("rep num : {}", repNum); // spdlog::info("rep num : {}", repNum);
spdlog::info("result size : {} GB", nsgv::gDupRes.byteSize() / 1024.0 / 1024 / 1024); // spdlog::info("result size : {} GB", nsgv::gDupRes.byteSize() / 1024.0 / 1024 / 1024);
} }
/* 并行流水线方式处理数据,标记冗余 */ /* 并行流水线方式处理数据,标记冗余 */

View File

@ -302,7 +302,7 @@ struct DupIdxQueue {
preTop = topIdx; preTop = topIdx;
} }
// ofs.close(); ofs1.close(); // ofs.close(); ofs1.close();
cout << "RealSize: " << len << endl; // cout << "RealSize: " << len << endl;
return len; return len;
} }
}; };

View File

@ -14,8 +14,7 @@ Date : 2023/11/3
#include <util/bam_wrap.h> #include <util/bam_wrap.h>
#include <algorithm> #include <algorithm>
#include <iostream>
using namespace std;
/** /**
* Small interface that provides access to the physical location information * Small interface that provides access to the physical location information
* about a cluster. All values should be defaulted to -1 if unavailable. * about a cluster. All values should be defaulted to -1 if unavailable.
@ -148,12 +147,12 @@ struct ReadEnds : PhysicalLocation {
comp = a.read2ReferenceIndex - b.read2ReferenceIndex; comp = a.read2ReferenceIndex - b.read2ReferenceIndex;
if (comp == 0) if (comp == 0)
comp = a.read2Coordinate - b.read2Coordinate; comp = a.read2Coordinate - b.read2Coordinate;
// if (comp == 0) if (comp == 0)
// comp = a.tile - b.tile; comp = a.tile - b.tile;
// if (comp == 0) if (comp == 0)
// comp = a.x - b.x; // 由于picard的bug用short类型来表示xy导致其可能为负数 comp = a.x - b.x; // 由于picard的bug用short类型来表示xy导致其可能为负数
// if (comp == 0) if (comp == 0)
// comp - a.y - b.y; comp - a.y - b.y;
if (comp == 0) if (comp == 0)
comp = (int)(a.read1IndexInFile - b.read1IndexInFile); comp = (int)(a.read1IndexInFile - b.read1IndexInFile);
if (comp == 0) if (comp == 0)
@ -161,11 +160,6 @@ struct ReadEnds : PhysicalLocation {
return comp < 0; return comp < 0;
} }
void Print() {
std::cout << read1ReferenceIndex << '\t' << read1Coordinate << '\t' << read2ReferenceIndex << '\t' << read2Coordinate
<< '\t' << orientation << '\t' << read1IndexInFile << '\t' << read2IndexInFile << std::endl;
}
static bool PairLittleThan(const ReadEnds &a, const ReadEnds &b) { static bool PairLittleThan(const ReadEnds &a, const ReadEnds &b) {
int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; int comp = a.read1ReferenceIndex - b.read1ReferenceIndex;
if (comp == 0) if (comp == 0)
@ -176,12 +170,12 @@ struct ReadEnds : PhysicalLocation {
comp = a.read2Coordinate - b.read2Coordinate; comp = a.read2Coordinate - b.read2Coordinate;
if (comp == 0) // 这个放在坐标比较了之后,把坐标范围的放在之前,这样对分段数据块比较好处理 if (comp == 0) // 这个放在坐标比较了之后,把坐标范围的放在之前,这样对分段数据块比较好处理
comp = a.orientation - b.orientation; comp = a.orientation - b.orientation;
// if (comp == 0) if (comp == 0)
// comp = a.tile - b.tile; comp = a.tile - b.tile;
// if (comp == 0) if (comp == 0)
// comp = a.x - b.x; // 由于picard的bug用short类型来表示xy导致其可能为负数 comp = a.x - b.x; // 由于picard的bug用short类型来表示xy导致其可能为负数
// if (comp == 0) if (comp == 0)
// comp - a.y - b.y; comp - a.y - b.y;
if (comp == 0) if (comp == 0)
comp = (int)(a.read1IndexInFile - b.read1IndexInFile); comp = (int)(a.read1IndexInFile - b.read1IndexInFile);
if (comp == 0) if (comp == 0)

View File

@ -138,7 +138,6 @@ struct ReadNameParser {
} else { } else {
// Standard version that will use the regex // Standard version that will use the regex
cmatch m; cmatch m;
// cout << "here1" << endl;
if (std::regex_match(readName.c_str(), m, readNamePattern)) { if (std::regex_match(readName.c_str(), m, readNamePattern)) {
loc->tile = std::stoi(m[1].str()); loc->tile = std::stoi(m[1].str());
loc->x = std::stoi(m[2].str()); loc->x = std::stoi(m[2].str());
@ -202,7 +201,6 @@ struct ReadNameParser {
if (readName.at(i) == delim || 0 == i) { if (readName.at(i) == delim || 0 == i) {
numFields++; numFields++;
const int startIdx = (0 == i) ? 0 : (i + 1); const int startIdx = (0 == i) ? 0 : (i + 1);
// cout << readName << endl;
tmpLocationFields[tokensIdx] = std::stoi(readName.substr(startIdx, endIdx - startIdx)); tmpLocationFields[tokensIdx] = std::stoi(readName.substr(startIdx, endIdx - startIdx));
tokensIdx--; tokensIdx--;
endIdx = i; endIdx = i;