代码重构基本完成,还差markdup里的一些调用和处理代码

This commit is contained in:
zzh 2024-12-15 03:20:35 +08:00
parent 27e0af955e
commit e8b5b4405a
27 changed files with 4430 additions and 17 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@
/.vscode /.vscode
/build /build
build.sh build.sh
run.sh
# Compiled Object files # Compiled Object files
*.slo *.slo

View File

@ -1,8 +1,9 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
project(FastDup) project(FastDup)
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
# set(CMAKE_BUILD_TYPE Debug) # set(CMAKE_BUILD_TYPE Debug)
# set(CMAKE_BUILD_TYPE Release) # set(CMAKE_BUILD_TYPE Release)
ADD_SUBDIRECTORY(src) add_definitions(-DSHOW_PERF)
add_subdirectory(src)

View File

@ -45,7 +45,7 @@ else()
endif() endif()
# lib deflate # lib deflate
find_library(DeflateLib dlibdeflate) find_library(DeflateLib deflate)
if(DeflateLib) if(DeflateLib)
target_link_libraries(${PG_NAME} ${DeflateLib}) target_link_libraries(${PG_NAME} ${DeflateLib})
else() else()
@ -61,6 +61,13 @@ else()
message(FATAL_ERROR "lzma is not found") message(FATAL_ERROR "lzma is not found")
endif() endif()
# lib curl
find_package(CURL REQUIRED)
if (CURL_FOUND)
include_directories(${CURL_INCLUDE_DIR})
target_link_libraries(${PG_NAME} ${CURL_LIBRARY})
endif()
# lz # lz
find_library(Z_LIBRARY z) find_library(Z_LIBRARY z)
if(Z_LIBRARY) if(Z_LIBRARY)

View File

@ -3,23 +3,207 @@
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <argparse/argparse.hpp> #include <argparse/argparse.hpp>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include "version.h"
#include "markdup/md_args.h" #include "markdup/md_args.h"
#include "util/profiling.h"
#include "version.h"
using namespace std; namespace nsgv {
extern MarkDupsArg gMdArg; extern MarkDupsArg gMdArg;
};
int MarkDuplicates();
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
// init log // init log
spdlog::set_default_logger(spdlog::stderr_color_st("fastdup")); spdlog::set_default_logger(spdlog::stderr_color_st("fastdup"));
spdlog::cfg::load_env_levels(); spdlog::cfg::load_env_levels();
// init arg parser
argparse::ArgumentParser cliArgs()
// init arg parser
argparse::ArgumentParser program(nsgv::gMdArg.PROGRAM_RECORD_ID, FASTDUP_VERSION, argparse::default_arguments::none);
program.add_description(
"Identifies duplicate reads. This tool locates and tags duplicate reads in a coordinate ordered SAM or BAM "
"file.\nUse the same algorithm as picard MarkDuplicates and output identical results.\n"
"Use spdlog as log tool and the default level is 'info'.");
program.add_argument("--input")
.help("Input file. One coordinate ordered SAM or BAM file.")
.metavar("<INPUT>")
.required();
program.add_argument("--metrics")
.help("Metrics file. File to write duplication metrics to.")
.metavar("<METRICS>")
.required();
program.add_argument("--output")
.help("Output file. SAM or BAM file to write marked records to.")
.metavar("<OUTPUT>")
.required();
program.add_argument("--num-threads")
.help("Number of threads to use.")
.scan<'i', int>()
.default_value(1)
.nargs(1)
.metavar("<NUM_THREADS>");
program.add_argument("--none-duplex-io")
.help("Do not use writing-while-reading mode.")
.default_value(false)
.implicit_value(true);
program.add_argument("--create-index")
.help("Whether to create an index when writing coordinate sorted BAM output.")
.default_value(false)
.implicit_value(true);
program.add_argument("--index-format")
.help("Format for bam index file. Possible values: {BAI, CSI}")
.default_value(std::string("BAI"))
.choices("BAI", "CSI")
.nargs(1)
.metavar("<IndexFormat>");
program.add_argument("--remove-duplicates")
.help("If true do not write duplicates to the output file instead of writing them with appropriate flags set.")
.default_value(false)
.implicit_value(true);
program.add_argument("--duplicate-scoring-strategy")
.help(
"The scoring strategy for choosing the non-duplicate among candidates. "
"Possible values: {SUM_OF_BASE_QUALITIES, TOTAL_MAPPED_REFERENCE_LENGTH, RANDOM}")
.default_value(std::string("SUM_OF_BASE_QUALITIES"))
.choices("SUM_OF_BASE_QUALITIES", "TOTAL_MAPPED_REFERENCE_LENGTH", "RANDOM")
.nargs(1)
.metavar("<ScoringStrategy>");
program.add_argument("--optical-duplicate-pixel-distance")
.help(
"\nThe maximum offset between two duplicate clusters in order to consider them optical"
"duplicates. The default is appropriate for unpatterned versions of the Illumina platform."
"For the patterned flowcell models, 2500 is moreappropriate. For other platforms and"
"models, users should experiment to find what works best.")
.scan<'i', int>()
.default_value(100)
.nargs(1)
.metavar("<Integer>");
program.add_argument("--read-name-regex")
.help(
"\nMarkDuplicates can use the tile and cluster positions to estimate the rate of optical "
"duplication in addition to the dominant source of duplication, PCR, to provide a more "
"accurate estimation of library size. By default (with no READ_NAME_REGEX specified), "
"MarkDuplicates will attempt to extract coordinates using a split on ':' (see Note below). "
"Set READ_NAME_REGEX to 'null' to disable optical duplicate detection. Note that without "
"optical duplicate counts, library size estimation will be less accurate. If the read name "
"does not follow a standard Illumina colon-separation convention, but does contain tile and "
"x,y coordinates, a regular expression can be specified to extract three variables: "
"tile/region, x coordinate and y coordinate from a read name. The regular expression must "
"contain three capture groups for the three variables, in order. It must match the entire "
"read name. e.g. if field names were separated by semi-colon (';') this example regex "
"could be specified (?:.*;)?([0-9]+)[^;]*;([0-9]+)[^;]*;([0-9]+)[^;]*$ Note that if no "
"READ_NAME_REGEX is specified, the read name is split on ':'. For 5 element names, the "
"3rd, 4th and 5th elements are assumed to be tile, x and y values. For 7 element names "
"(CASAVA 1.8), the 5th, 6th, and 7th elements are assumed to be tile, x and y values. "
"Default value: <optimized capture of last three ':' separated fields as numeric values>.")
.default_value(std::string("(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$"))
.nargs(1)
.metavar("<ReadNameRegex>");
program.add_argument("--tag-duplicate-set-members")
.help(
"\nIf a read appears in a duplicate set, add two tags. The first tag, DUPLICATE_SET_SIZE_TAG"
"(DS), indicates the size of the duplicate set. The smallest possible DS value is 2 which"
"occurs when two reads map to the same portion of the reference only one of which is marked"
"as duplicate. The second tag, DUPLICATE_SET_INDEX_TAG (DI), represents a unique identifier"
"for the duplicate set to which the record belongs. This identifier is the index-in-file of"
"the representative read that was selected out of the duplicate set.")
.default_value(false)
.implicit_value(true);
program.add_argument("--tagging-policy")
.help(
"Determines how duplicate types are recorded in the DT optional attribute. Possible values: {DontTag, "
"OpticalOnly, All}.")
.default_value(std::string("DontTag"))
.choices("DontTag", "OpticalOnly", "All")
.nargs(1)
.metavar("<DuplicateTaggingPolicy>");
// add help and version args
program.add_argument("-h", "--help")
.action([&](const auto & /*unused*/) {
std::cout << program.help().str();
std::exit(0);
})
.default_value(false)
.help("shows help message and exits")
.implicit_value(true)
.nargs(0);
program.add_argument("-v", "--version")
.action([&](const auto & /*unused*/) {
std::cout << FASTDUP_VERSION << std::endl;
std::exit(0);
})
.default_value(false)
.help("prints version information and exits")
.implicit_value(true)
.nargs(0);
// std::cout << program << std::endl;
try {
program.parse_args(argc, argv);
nsgv::gMdArg.INPUT_FILE = program.get("--input");
nsgv::gMdArg.OUTPUT_FILE = program.get("--output");
nsgv::gMdArg.METRICS_FILE = program.get("--metrics");
nsgv::gMdArg.NUM_THREADS = program.get<int>("--num-threads");
nsgv::gMdArg.DUPLEX_IO = !program.get<bool>("--none-duplex-io");
nsgv::gMdArg.CREATE_INDEX = program.get<bool>("--create-index");
nsgv::gMdArg.INDEX_FORMAT =
program.get("--index-format") == "BAI" ? nsmd::IndexFormat::BAI : nsmd::IndexFormat::CSI;
nsgv::gMdArg.REMOVE_DUPLICATES = program.get<bool>("--remove-duplicates");
std::map<std::string, nsmd::ScoringStrategy> scoring_strategy_args = {
{"SUM_OF_BASE_QUALITIES", nsmd::ScoringStrategy::SUM_OF_BASE_QUALITIES},
{"TOTAL_MAPPED_REFERENCE_LENGTH", nsmd::ScoringStrategy::TOTAL_MAPPED_REFERENCE_LENGTH},
{"RANDOM", nsmd::ScoringStrategy::RANDOM}};
nsgv::gMdArg.DUPLICATE_SCORING_STRATEGY = scoring_strategy_args[program.get("--duplicate-scoring-strategy")];
nsgv::gMdArg.OPTICAL_DUPLICATE_PIXEL_DISTANCE = program.get<int>("--optical-duplicate-pixel-distance");
std::set<string> all_nulls = {"null", "Null", "NULL"};
nsgv::gMdArg.READ_NAME_REGEX =
all_nulls.find(program.get("--read-name-regex")) != all_nulls.end() ? "" : program.get("--read-name-regex");
if (nsgv::gMdArg.READ_NAME_REGEX.empty())
nsgv::gMdArg.CHECK_OPTICAL_DUP = false;
nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS = program.get<bool>("--tag-duplicate-set-members");
std::map<std::string, nsmd::DuplicateTaggingPolicy> tagging_policy_args = {
{"DontTag", nsmd::DuplicateTaggingPolicy::DontTag},
{"OpticalOnly", nsmd::DuplicateTaggingPolicy::OpticalOnly},
{"All", nsmd::DuplicateTaggingPolicy::All}};
nsgv::gMdArg.TAGGING_POLICY = tagging_policy_args[program.get("--tagging-policy")];
} catch (const std::exception &err) {
spdlog::error(err.what());
return 1;
}
spdlog::info("fast markduplicates start"); spdlog::info("fast markduplicates start");
MarkDuplicates();
spdlog::info("fast markduplicates end");
DisplayProfiling(1);
return 0; return 0;
} }

View File

@ -0,0 +1,2 @@
#include "read_name_parser.h"
// Definition of the class-wide flag. It is set when a read name does not match
// the expected format; callers (see buildReadEnds in md_funcs.cpp) then stop
// attempting to parse tile/x/y location information for optical duplicates.
bool ReadNameParser::sWrongNameFormat = false;

View File

@ -0,0 +1,72 @@
#pragma once
#include <stdint.h>
#include <string>
#include <vector>
#include "md_types.h"
using std::string;
using std::vector;
/*
 * Per-library duplication statistics. The upper-case fields mirror the columns
 * of picard MarkDuplicates' DuplicationMetrics output file; the remaining
 * members are extra histograms and bookkeeping used internally and in tests.
 */
struct DuplicationMetrics {
/**
 * The library on which the duplicate marking was performed.
 */
string LIBRARY = "";
/**
 * The number of mapped reads examined which did not have a mapped mate pair,
 * either because the read is unpaired, or the read is paired to an unmapped mate.
 */
uint64_t UNPAIRED_READS_EXAMINED = 0;
/**
 * The number of mapped read pairs examined. (Primary, non-supplemental)
 */
uint64_t READ_PAIRS_EXAMINED = 0;
/**
 * The number of reads that were either secondary or supplementary
 */
uint64_t SECONDARY_OR_SUPPLEMENTARY_RDS = 0;
/**
 * The total number of unmapped reads examined. (Primary, non-supplemental)
 */
uint64_t UNMAPPED_READS = 0;
/**
 * The number of fragments that were marked as duplicates.
 */
uint64_t UNPAIRED_READ_DUPLICATES = 0;
/**
 * The number of read pairs that were marked as duplicates.
 */
uint64_t READ_PAIR_DUPLICATES = 0;
/**
 * The number of read pairs duplicates that were caused by optical duplication.
 * Value is always < READ_PAIR_DUPLICATES, which counts all duplicates regardless of source.
 */
uint64_t READ_PAIR_OPTICAL_DUPLICATES = 0;
/**
 * The fraction of mapped sequence that is marked as duplicate.
 */
double PERCENT_DUPLICATION = 0.0;
/**
 * The estimated number of unique molecules in the library based on PE duplication.
 */
uint64_t ESTIMATED_LIBRARY_SIZE = 0;
// Additional statistics beyond the picard metric columns.
vector<double> CoverageMult;
// Duplicate-set-size histograms: for example, if a site carries 4 duplicates,
// DuplicateCountHist counts how many sites have exactly 4 duplicates.
MDMap DuplicateCountHist;
MDMap NonOpticalDuplicateCountHist;
MDMap OpticalDuplicatesCountHist;
// Indices of reads at positions with no duplicates (for testing).
MDSet<int64_t> singletonReads;
MDSet<int64_t> dupReads; // for testing
MDSet<int64_t> bestReads;
};

View File

@ -1 +0,0 @@
#pragma once

View File

@ -7,4 +7,88 @@ Copyright : All right reserved by ICT
Author : Zhang Zhonghai Author : Zhang Zhonghai
Date : 2023/10/23 Date : 2023/10/23
*/ */
#include <htslib/sam.h>
#include <htslib/thread_pool.h>
#include <spdlog/spdlog.h>
#include <vector>
#include "dup_metrics.h"
#include "md_args.h"
#include "md_funcs.h"
#include "pipeline_md.h"
#include "read_name_parser.h"
#include "util/profiling.h"
#include "version.h"
#define BAM_BLOCK_SIZE 16L * 1024 * 1024
namespace nsgv {
MarkDupsArg gMdArg; // 用来测试性能
std::vector<ReadNameParser> gNameParsers; // 每个线程一个read name parser
samFile *gInBamFp; // 输入的bam文件
sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
samFile *gOutBamFp; // 输出文件, sam或者bam格式
sam_hdr_t *gOutBamHeader; // 输出文件的header
DuplicationMetrics gMetrics; // 统计信息
PipelineArg gPipe;
};
/*
 * Returns the extension of `filename` — the substring after the last '.' —
 * or an empty string when the name contains no dot.
 */
static std::string getFileExtension(const std::string &filename) {
    const auto dotPos = filename.rfind('.');
    return dotPos == std::string::npos ? std::string{} : filename.substr(dotPos + 1);
}
// entrance of mark duplicates
int MarkDuplicates() {
/* 初始化一些参数和变量*/
nsgv::gNameParsers.resize(nsgv::gMdArg.NUM_THREADS);
for (auto &parser : nsgv::gNameParsers)
parser.SetReadNameRegex(nsgv::gMdArg.READ_NAME_REGEX); // 用来解析read name中的tilexy信息
/* 打开输入bam文件 */
nsgv::gInBamFp = sam_open_format(nsgv::gMdArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // 读取header
// 获取样本名称(libraryId)
nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0);
/* 利用线程池对输入输出文件进行读写 */
htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池
htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池
htsPoolRead.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
htsPoolWrite.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
// htsPoolRead.pool = hts_tpool_init(8);
// htsPoolWrite.pool = hts_tpool_init(32);
if (!htsPoolRead.pool || !htsPoolWrite.pool) {
spdlog::error("[{}] failed to set up thread pool", __LINE__);
sam_close(nsgv::gInBamFp);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
// 测试读写速度
#if 1
bam1_t *bamp = bam_init1();
while (sam_read1(nsgv::gInBamFp, nsgv::gInBamHeader, bamp) >= 0);
DisplayProfiling(nsgv::gMdArg.NUM_THREADS);
exit(0);
#endif
return 0;
}

View File

@ -23,4 +23,4 @@ using std::stol;
using std::string; using std::string;
using std::vector; using std::vector;
using namespace ns_md; using namespace nsmd;

View File

@ -14,7 +14,7 @@ Date : 2023/10/23
using std::string; using std::string;
using std::vector; using std::vector;
namespace ns_md { namespace nsmd {
/* How strict to be when reading a SAM or BAM, beyond bare minimum validation. /* How strict to be when reading a SAM or BAM, beyond bare minimum validation.
*/ */
enum ValidationStringency { enum ValidationStringency {
@ -54,10 +54,20 @@ enum ScoringStrategy { SUM_OF_BASE_QUALITIES, TOTAL_MAPPED_REFERENCE_LENGTH, RAN
/* 索引文件的格式 bai或者csi */ /* 索引文件的格式 bai或者csi */
enum IndexFormat { BAI, CSI }; enum IndexFormat { BAI, CSI };
} // namespace ns_md } // namespace nsmd
/* markduplicate 需要的参数*/ /* markduplicate 需要的参数*/
struct MarkDupsArg { struct MarkDupsArg {
string INPUT_FILE; // input bam filename
string OUTPUT_FILE; // output bam filename
int NUM_THREADS = 1;
size_t MAX_MEM = ((size_t)1) << 31; // 最小2G
bool DUPLEX_IO = true; // 同时读写
/** /**
* The optional attribute in SAM/BAM/CRAM files used to store the duplicate type. * The optional attribute in SAM/BAM/CRAM files used to store the duplicate type.
*/ */
@ -125,7 +135,7 @@ struct MarkDupsArg {
bool REMOVE_SEQUENCING_DUPLICATES = false; bool REMOVE_SEQUENCING_DUPLICATES = false;
/* "Determines how duplicate types are recorded in the DT optional attribute.") */ /* "Determines how duplicate types are recorded in the DT optional attribute.") */
ns_md::DuplicateTaggingPolicy TAGGING_POLICY = ns_md::DuplicateTaggingPolicy::DontTag; nsmd::DuplicateTaggingPolicy TAGGING_POLICY = nsmd::DuplicateTaggingPolicy::DontTag;
/* "Clear DT tag from input SAM records. Should be set to false if input SAM doesn't have this tag. Default true") /* "Clear DT tag from input SAM records. Should be set to false if input SAM doesn't have this tag. Default true")
*/ */
@ -159,11 +169,11 @@ struct MarkDupsArg {
/* "If not null, assume that the input file has this order even if the header says otherwise.", /* "If not null, assume that the input file has this order even if the header says otherwise.",
optional = true, mutex = {"ASSUME_SORTED"} */ optional = true, mutex = {"ASSUME_SORTED"} */
ns_md::SortOrder ASSUME_SORT_ORDER = ns_md::SortOrder::unsorted; nsmd::SortOrder ASSUME_SORT_ORDER = nsmd::SortOrder::unsorted;
/* "The scoring strategy for choosing the non-duplicate among candidates." */ /* "The scoring strategy for choosing the non-duplicate among candidates." */
ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::SUM_OF_BASE_QUALITIES; nsmd::ScoringStrategy DUPLICATE_SCORING_STRATEGY = nsmd::ScoringStrategy::SUM_OF_BASE_QUALITIES;
// ns_md::ScoringStrategy DUPLICATE_SCORING_STRATEGY = ns_md::ScoringStrategy::TOTAL_MAPPED_REFERENCE_LENGTH; // nsmd::ScoringStrategy DUPLICATE_SCORING_STRATEGY = nsmd::ScoringStrategy::TOTAL_MAPPED_REFERENCE_LENGTH;
/* "The program record ID for the @PG record(s) created by this program. Set to null to disable " + /* "The program record ID for the @PG record(s) created by this program. Set to null to disable " +
"PG record creation. This string may have a suffix appended to avoid collision with other " + "PG record creation. This string may have a suffix appended to avoid collision with other " +
@ -203,6 +213,7 @@ struct MarkDupsArg {
3rd, 4th and 5th elements are assumed to be tile, x and y values. " + " For 7 element names (CASAVA 1.8), the 3rd, 4th and 5th elements are assumed to be tile, x and y values. " + " For 7 element names (CASAVA 1.8), the
5th, 6th, and 7th elements are assumed to be tile, x and y values.", optional = true */ 5th, 6th, and 7th elements are assumed to be tile, x and y values.", optional = true */
string READ_NAME_REGEX = "(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$"; string READ_NAME_REGEX = "(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$";
bool CHECK_OPTICAL_DUP = true;
/* "The maximum offset between two duplicate clusters in order to consider them optical duplicates. The default " + /* "The maximum offset between two duplicate clusters in order to consider them optical duplicates. The default " +
"is appropriate for unpatterned versions of the Illumina platform. For the patterned flowcell "is appropriate for unpatterned versions of the Illumina platform. For the patterned flowcell
@ -224,7 +235,7 @@ struct MarkDupsArg {
/* "Validation stringency for all SAM files read by this program. Setting stringency to SILENT " + /* "Validation stringency for all SAM files read by this program. Setting stringency to SILENT " +
"can improve performance when processing a BAM file in which variable-length data (read, qualities, tags) " "can improve performance when processing a BAM file in which variable-length data (read, qualities, tags) "
+ "do not otherwise need to be decoded.", common=true */ + "do not otherwise need to be decoded.", common=true */
ns_md::ValidationStringency VALIDATION_STRINGENCY = ns_md::ValidationStringency::DEFAULT_STRINGENCY; nsmd::ValidationStringency VALIDATION_STRINGENCY = nsmd::ValidationStringency::DEFAULT_STRINGENCY;
/* "Compression level for all compressed files created (e.g. BAM and VCF).", common = true */ /* "Compression level for all compressed files created (e.g. BAM and VCF).", common = true */
int COMPRESSION_LEVEL = 5; int COMPRESSION_LEVEL = 5;
@ -237,7 +248,7 @@ struct MarkDupsArg {
/* "Whether to create an index when writing VCF or coordinate sorted BAM output.", common = true */ /* "Whether to create an index when writing VCF or coordinate sorted BAM output.", common = true */
bool CREATE_INDEX = false; bool CREATE_INDEX = false;
ns_md::IndexFormat INDEX_FORMAT = ns_md::IndexFormat::BAI; nsmd::IndexFormat INDEX_FORMAT = nsmd::IndexFormat::BAI;
/* "Whether to create an MD5 digest for any BAM or FASTQ files created. ", common = true */ /* "Whether to create an MD5 digest for any BAM or FASTQ files created. ", common = true */
bool CREATE_MD5_FILE = false; bool CREATE_MD5_FILE = false;

View File

@ -0,0 +1,437 @@
#include "md_funcs.h"
#include <math.h>
#include <stdint.h>
#include <util/bam_buf.h>
#include <util/murmur3.h>
#include <util/profiling.h>
#include <iostream>
#include <map>
#include <set>
#include <unordered_map>
#include <vector>
#include "dup_metrics.h"
#include "md_args.h"
#include "read_ends.h"
#include "read_name_parser.h"
using std::cerr;
using std::endl;
using std::map;
using std::set;
using std::unordered_map;
using std::vector;
namespace nsgv {
extern MarkDupsArg gMdArg;
extern DuplicationMetrics gMetrics;
};
/*
 * Computes the score used to choose the representative (non-duplicate) read
 * among duplicate candidates, following the configured picard-style strategy
 * (nsgv::gMdArg.DUPLICATE_SCORING_STRATEGY). Filter-failing reads receive a
 * large discount so that they lose to passing reads even after the scores of
 * both mates are summed.
 */
int16_t computeDuplicateScore(BamWrap &bw) {
    int16_t score = 0;
    switch (nsgv::gMdArg.DUPLICATE_SCORING_STRATEGY) {
        case nsmd::SUM_OF_BASE_QUALITIES:
            // two (very) long reads worth of high-quality bases can go over
            // Short.MAX_VALUE/2 and risk overflow.
            score += (int16_t)min(bw.GetSumOfBaseQualities(), INT16_MAX / 2);
            break;
        case nsmd::TOTAL_MAPPED_REFERENCE_LENGTH:
            if (!bw.GetReadUnmappedFlag())
                // no need to remember the score since this scoring mechanism is
                // symmetric
                score = (int16_t)min(bw.GetReferenceLength(), INT16_MAX / 2);
            break;
        case nsmd::RANDOM:
            // The RANDOM score gives the same score to both reads so that they get
            // filtered together. It's not critical to use the readName since the
            // scores from both ends get added, but it seems to be clearer this way.
            score += (short)(Murmur3::Instance().HashUnencodedChars(bw.query_name()) & 0b11111111111111);
            // subtract Short.MIN_VALUE/4 from it to end up with a number between
            // 0 and Short.MAX_VALUE/2. This number can be then discounted in case
            // the read is not passing filters. We need to stay far from overflow so
            // that when we add the two scores from the two read mates we do not
            // overflow since that could cause us to chose a failing read-pair
            // instead of a passing one.
            score -= INT16_MIN / 4;
            break;  // BUGFIX: previously fell through into default (benign but fragile)
        default:
            break;
    }
    // make sure that filter-failing records are heavily discounted. (the
    // discount can happen twice, once for each mate, so need to make sure we do
    // not subtract more than Short.MIN_VALUE overall.)
    score += bw.GetReadFailsVendorQualityCheckFlag() ? (int16_t)(INT16_MIN / 2) : 0;
    return score;
}
/*
 * Builds a ReadEnds object that represents a single read: position, strand
 * orientation, score, and (when the read name parses) the tile/x/y physical
 * location used for optical-duplicate detection.
 */
void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) {
    ReadEnds &ends = *pKey;
    const auto &core = bw.b->core;
    const bool isReverse = (core.flag & BAM_FREVERSE) != 0;
    ends.read1FirstOfPair = bw.GetFirstOfPairFlag();
    ends.read1ReferenceIndex = core.tid;
    if (isReverse) {
        ends.read1Coordinate = bw.GetUnclippedEnd();
        ends.orientation = ReadEnds::R;
    } else {
        ends.read1Coordinate = bw.GetUnclippedStart();
        ends.orientation = ReadEnds::F;
    }
    ends.read1IndexInFile = index;
    ends.score = computeDuplicateScore(bw);
    // Recording the mate's reference lets the ends object know it's part of a pair.
    if (bw.GetReadPairedFlag() && !bw.GetMateUnmappedFlag()) {
        ends.read2ReferenceIndex = core.mtid;
    }
    // Fill in the physical location for optical duplicates; once any read name
    // has failed to parse, optical-duplicate checking is switched off globally.
    if (ReadNameParser::sWrongNameFormat)
        nsgv::gMdArg.CHECK_OPTICAL_DUP = false;
    else
        rnParser.AddLocationInformation(bw.query_name(), pKey);
    // Global position key used for grouping reads.
    ends.posKey = BamWrap::bam_global_pos(ends.read1ReferenceIndex, ends.read1Coordinate);
}
/* Folds a fragment end (the mate) into an already-built paired ReadEnds:
 * records the optical-duplicate orientation, fills in the read2 fields (or
 * swaps read1/read2 so that read1 is always the earlier coordinate), and
 * accumulates the pair score. Statement order matters — see the NB below. */
void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds) {
auto &pairedEnds = *pPairedEnds;
int64_t bamIdx = fragEnd.read1IndexInFile;
const int matesRefIndex = fragEnd.read1ReferenceIndex;
const int matesCoordinate = fragEnd.read1Coordinate;
// Set orientationForOpticalDuplicates, which always goes by the first then
// the second end for the strands. NB: must do this before updating the
// orientation later.
if (fragEnd.read1FirstOfPair) {
pairedEnds.orientationForOpticalDuplicates =
ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), pairedEnds.orientation == ReadEnds::R);
} else {
pairedEnds.orientationForOpticalDuplicates =
ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, fragEnd.IsNegativeStrand());
}
// If the other read is actually later, simply add the other read's data as
// read2, else flip the reads
if (matesRefIndex > pairedEnds.read1ReferenceIndex ||
(matesRefIndex == pairedEnds.read1ReferenceIndex && matesCoordinate >= pairedEnds.read1Coordinate)) {
pairedEnds.read2ReferenceIndex = matesRefIndex;
pairedEnds.read2Coordinate = matesCoordinate;
pairedEnds.read2IndexInFile = bamIdx;
pairedEnds.orientation =
ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, fragEnd.IsNegativeStrand());
// if the two read ends are in the same position, pointing in opposite
// directions, the orientation is undefined and the procedure above will
// depend on the order of the reads in the file. To avoid this, we set
// it explicitly (to FR):
if (pairedEnds.read2ReferenceIndex == pairedEnds.read1ReferenceIndex &&
pairedEnds.read2Coordinate == pairedEnds.read1Coordinate && pairedEnds.orientation == ReadEnds::RF) {
pairedEnds.orientation = ReadEnds::FR;
}
} else {
// The mate sorts earlier: shift current read1 data into read2 and store the
// mate's data as read1 (including its position key).
pairedEnds.read2ReferenceIndex = pairedEnds.read1ReferenceIndex;
pairedEnds.read2Coordinate = pairedEnds.read1Coordinate;
pairedEnds.read2IndexInFile = pairedEnds.read1IndexInFile;
pairedEnds.read1ReferenceIndex = matesRefIndex;
pairedEnds.read1Coordinate = matesCoordinate;
pairedEnds.read1IndexInFile = bamIdx;
pairedEnds.orientation =
ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), pairedEnds.orientation == ReadEnds::R);
pairedEnds.posKey = fragEnd.posKey;
}
// The pair's score is the sum of both ends' scores.
pairedEnds.score += fragEnd.score;
}
/* True when lhs and rhs are distinct reads, both carry location info, sit on
 * the same tile, and their pixel coordinates differ by at most `distance` in
 * both x and y. */
static inline bool closeEnough(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) {
    if (lhs == rhs) return false;  // never compare an object with itself
    // reads without parsed locations can never be optical duplicates
    if (lhs->tile == ReadEnds::NO_VALUE || rhs->tile == ReadEnds::NO_VALUE) return false;
    if (lhs->tile != rhs->tile) return false;  // must share a tile
    return abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance;
}
/* Like closeEnough but without the tile/NO_VALUE checks: distinct reads whose
 * pixel coordinates differ by at most `distance` in both x and y. */
static inline bool closeEnoughShort(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) {
    if (lhs == rhs) return false;
    const bool xClose = abs(lhs->x - rhs->x) <= distance;
    const bool yClose = abs(lhs->y - rhs->y) <= distance;
    return xClose && yClose;
}
/**
 * Finds which reads within the list of duplicates are likely to be optical/co-localized duplicates of
 * one another. Within each cluster of optical duplicates that is found, one read remains un-flagged for
 * optical duplication and the rest are flagged as optical duplicates. The set of reads that are considered
 * optical duplicates are indicated by setting "true" at the same index in *pOpticalDuplicateFlags as the
 * read appears in the input list of physical locations.
 *
 * @param readEndsArr a list of reads that are determined to be duplicates of one another
 * @param pBestRe the single read being kept as non-duplicate, and thus never annotated as an
 *                optical duplicate. It may be a location not contained within the list; in this
 *                code path it is expected to be non-null.
 * @param pOpticalDuplicateFlags flag vector pre-sized to readEndsArr.size(), filled in place
 */
static void findOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe,
                                  vector<bool> *pOpticalDuplicateFlags) {
    // BUGFIX: honor the CLI-configurable pixel distance (--optical-duplicate-pixel-distance,
    // default 100) instead of a hard-coded constant, matching picard's behavior.
    const int opticalDuplicateDistance = nsgv::gMdArg.OPTICAL_DUPLICATE_PIXEL_DISTANCE;
    const int DEFAULT_MAX_DUPLICATE_SET_SIZE = 300000;
    vector<bool> &opticalDuplicateFlags = *pOpticalDuplicateFlags;
    int len = readEndsArr.size();
    // If there is only one or zero reads passed in (so there are obviously no optical duplicates),
    // or if there are too many reads (so we don't want to try to run this expensive n^2 algorithm),
    // then just leave all flags false
    if (len < 2 || len > DEFAULT_MAX_DUPLICATE_SET_SIZE) {
        return;
    }
    if (len >= 4) {
        /**
         * Compute the optical duplicates correctly in the case where the duplicate group could end up with
         * transitive optical duplicates (picard: getOpticalDuplicatesFlagWithGraph).
         */
        // Make a graph where the edges are reads that lie within the optical duplicate pixel distance from
        // each other; we will then use the union-find algorithm to cluster the graph and find optical
        // duplicate groups
        Graph<int> opticalDistanceRelationGraph;
        unordered_map<int, vector<int>> tileRGmap;
        int keeperIndex = -1;
        for (int i = 0; i < readEndsArr.size(); ++i) {
            const ReadEnds *currentLoc = readEndsArr[i];
            if (currentLoc == pBestRe)
                keeperIndex = i;
            if (currentLoc->tile != ReadEnds::NO_VALUE) {
                // only one sample is processed, so the tile alone identifies the group
                int key = currentLoc->tile;
                tileRGmap[key].push_back(i);
            }
            opticalDistanceRelationGraph.addNode(i);
        }
        // Since finding adjacent optical duplicates is an O(n^2) operation, we subdivide the input by
        // readgroup/tile to reduce the amount of redundant checks across readgroups between reads.
        // (picard: fillGraphFromAGroup)
        for (auto &entry : tileRGmap) {
            auto &groupList = entry.second;
            if (groupList.size() > 1) {
                for (int i = 0; i < groupList.size(); ++i) {
                    int iIndex = groupList[i];
                    const ReadEnds *currentLoc = readEndsArr[iIndex];
                    for (int j = i + 1; j < groupList.size(); ++j) {
                        int jIndex = groupList[j];
                        const ReadEnds *other = readEndsArr[jIndex];
                        if (closeEnoughShort(currentLoc, other, opticalDuplicateDistance))
                            opticalDistanceRelationGraph.addEdge(iIndex, jIndex);
                    }
                }
            }
        }
        // Keep a map of the reads and their cluster assignments
        unordered_map<int, int> opticalDuplicateClusterMap;
        opticalDistanceRelationGraph.cluster(&opticalDuplicateClusterMap);
        unordered_map<int, int> clusterToRepresentativeRead;
        int keeperCluster = -1;
        // Specially mark the keeper as specifically not a duplicate if it exists
        if (keeperIndex >= 0) {
            keeperCluster = opticalDuplicateClusterMap[keeperIndex];
            clusterToRepresentativeRead[keeperCluster] = keeperIndex;
        }
        for (auto &entry : opticalDuplicateClusterMap) {
            int recordIndex = entry.first;
            int recordAssignedCluster = entry.second;
            // If it's not the first read we've seen for this cluster, mark it as an optical duplicate
            auto repReadItr = clusterToRepresentativeRead.find(recordAssignedCluster);
            if (repReadItr != clusterToRepresentativeRead.end() && recordIndex != keeperIndex) {
                const ReadEnds *representativeLoc = readEndsArr[repReadItr->second];
                const ReadEnds *currentRecordLoc = readEndsArr[recordIndex];
                // If not in the keeper cluster, then keep the minX -> minY valued duplicate (note the tile
                // must be equal for reads to cluster together)
                if (!(keeperIndex >= 0 && recordAssignedCluster == keeperCluster) &&
                    (currentRecordLoc->x < representativeLoc->x ||
                     (currentRecordLoc->x == representativeLoc->x && currentRecordLoc->y < representativeLoc->y))) {
                    // Mark the old min as an optical duplicate, and save the new min
                    opticalDuplicateFlags[repReadItr->second] = true;
                    clusterToRepresentativeRead[recordAssignedCluster] = recordIndex;
                } else {
                    // If a smaller read has already been visited, mark the test read as an optical duplicate
                    opticalDuplicateFlags[recordIndex] = true;
                }
            } else {
                clusterToRepresentativeRead[recordAssignedCluster] = recordIndex;
            }
        }
    } else {
        /**
         * Compute optical duplicates quickly in the standard case where we know that there won't be any
         * transitive distances to worry about. Note, this is guaranteed to be correct when there are at most
         * 2x reads from a readgroup or 3x with the keeper present (picard: getOpticalDuplicatesFlagFast).
         */
        // First go through and compare all the reads to the keeper
        for (int i = 0; i < len; ++i) {
            const ReadEnds *other = readEndsArr[i];
            opticalDuplicateFlags[i] = closeEnough(pBestRe, other, opticalDuplicateDistance);
        }
        // Now go through and do each pairwise comparison not involving the actualKeeper
        for (int i = 0; i < len; ++i) {
            const ReadEnds *lhs = readEndsArr[i];
            if (lhs == pBestRe)  // no comparisons to actualKeeper since those are all handled above
                continue;
            for (int j = i + 1; j < len; ++j) {
                const ReadEnds *rhs = readEndsArr[j];
                if (rhs == pBestRe)  // no comparisons to actualKeeper since those are all handled above
                    continue;
                if (opticalDuplicateFlags[i] && opticalDuplicateFlags[j])
                    continue;  // both already marked, no need to check
                if (closeEnough(lhs, rhs, opticalDuplicateDistance)) {
                    // At this point we want to mark either lhs or rhs as duplicate. Either could have been
                    // marked as a duplicate of the keeper (but not both - that's checked above), so be
                    // careful about which one to now mark as a duplicate.
                    int index = opticalDuplicateFlags[j] ? i : j;
                    opticalDuplicateFlags[index] = true;
                }
            }
        }
    }
}
/**
* Looks through the set of reads and identifies how many of the duplicates are
* in fact optical duplicates, and stores the data in the instance level histogram.
*
* We expect only reads with FR or RF orientations, not a mixture of both.
*
 * In PCR duplicate detection, duplicates can have both FR and RF orientations when the orientation
 * order is fixed to the first end of the mate. In optical duplicate detection, we do not consider them
 * duplicates if one read is FR and the other RF when we order orientation by the first mate sequenced (read #1 of the pair).
*/
static int checkOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe) {
    // One flag per entry of readEndsArr, filled in by findOpticalDuplicates().
    vector<bool> opticalDuplicateFlags(readEndsArr.size(), false);
    // find OpticalDuplicates
    findOpticalDuplicates(readEndsArr, pBestRe, &opticalDuplicateFlags);
    // Copy each flag onto the (transient) isOpticalDuplicate field and count the positives.
    int opticalDuplicates = 0;
    for (size_t i = 0; i < opticalDuplicateFlags.size(); ++i) {
        // readEndsArr holds const pointers, but isOpticalDuplicate is documented as a
        // transient, mutable flag, so the const_cast is intentional.
        ReadEnds *pRe = const_cast<ReadEnds *>(readEndsArr[i]);
        pRe->isOpticalDuplicate = opticalDuplicateFlags[i];
        if (opticalDuplicateFlags[i])
            ++opticalDuplicates;
    }
    return opticalDuplicates;
}
/**
 * Splits the duplicate set by optical-duplicate orientation (FR vs RF) when both are
 * present, counts optical duplicates in each partition, and updates the global
 * duplicate-count histograms in nsgv::gMetrics.
 */
void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe) {
    bool hasFR = false, hasRF = false;
    // Check to see if we have a mixture of FR/RF
    for (auto pRe : readEndsArr) {
        if (ReadEnds::FR == pRe->orientationForOpticalDuplicates)
            hasFR = true;
        else if (ReadEnds::RF == pRe->orientationForOpticalDuplicates)
            hasRF = true;
    }
    // Check if we need to partition since the orientations could have changed
    int nOpticalDup;
    if (hasFR && hasRF) {  // need to track them independently
        vector<const ReadEnds *> trackOpticalDuplicatesF;
        vector<const ReadEnds *> trackOpticalDuplicatesR;
        // Split into two lists: first of pairs and second of pairs,
        // since they must have orientation and same starting end
        for (auto pRe : readEndsArr) {
            if (ReadEnds::FR == pRe->orientationForOpticalDuplicates)
                trackOpticalDuplicatesF.push_back(pRe);
            else if (ReadEnds::RF == pRe->orientationForOpticalDuplicates)
                trackOpticalDuplicatesR.push_back(pRe);
            else
                cerr << "Found an unexpected orientation: " << pRe->orientation << endl;
        }
        // track the duplicates
        int nOpticalDupF = checkOpticalDuplicates(trackOpticalDuplicatesF, pBestRe);
        int nOpticalDupR = checkOpticalDuplicates(trackOpticalDuplicatesR, pBestRe);
        nOpticalDup = nOpticalDupF + nOpticalDupR;
    } else {  // No need to partition
        nOpticalDup = checkOpticalDuplicates(readEndsArr, pBestRe);
    }
    // Update the duplicate-count statistics (cf. Picard's trackDuplicateCounts).
    ++nsgv::gMetrics.DuplicateCountHist[readEndsArr.size()];
    if (readEndsArr.size() > nOpticalDup)
        ++nsgv::gMetrics.NonOpticalDuplicateCountHist[readEndsArr.size() - nOpticalDup];
    if (nOpticalDup)
        // NOTE(review): keyed at nOpticalDup + 1, presumably to count the kept
        // representative read as part of the optical-duplicate set -- confirm.
        ++nsgv::gMetrics.OpticalDuplicatesCountHist[nOpticalDup + 1];
}
/**
* Estimates the size of a library based on the number of paired end molecules observed
* and the number of unique pairs observed.
* <p>
* Based on the Lander-Waterman equation that states:
* C/X = 1 - exp( -N/X )
* where
* X = number of distinct molecules in library
* N = number of read pairs
* C = number of distinct fragments observed in read pairs
*/
int64_t estimateLibrarySize(int64_t readPairs, int64_t uniqueReadPairs) {
    // f(x) is the Lander-Waterman residual: it is zero when x equals the library size.
    auto f = [](double x, double c, double n) { return c / x - 1 + std::exp(-n / x); };
    const int64_t readPairDuplicates = readPairs - uniqueReadPairs;
    // Nothing observed, or no duplicates observed: no estimate is possible.
    if (readPairs <= 0 || readPairDuplicates <= 0)
        return 0;
    double m = 1.0;    // lower bisection bound, as a multiple of uniqueReadPairs
    double M = 100.0;  // upper bisection bound
    if (uniqueReadPairs >= readPairs || f(m * uniqueReadPairs, uniqueReadPairs, readPairs) < 0) {
        std::cerr << "Invalid values for pairs and unique pairs: " << readPairs << ", " << uniqueReadPairs
                  << std::endl;
        return 0;
    }
    // Grow M until it brackets the root from the other side.
    while (f(M * uniqueReadPairs, uniqueReadPairs, readPairs) > 0) {
        M *= 10.0;
    }
    // Bisection (no more than 40 iterations) to locate the root.
    for (int i = 0; i < 40; ++i) {
        const double r = (m + M) / 2.0;
        const double u = f(r * uniqueReadPairs, uniqueReadPairs, readPairs);
        if (u == 0)
            break;
        if (u > 0)
            m = r;
        else
            M = r;
    }
    // Truncating conversion matches the reference implementation's (long) cast.
    return (int64_t)(uniqueReadPairs * (m + M) / 2.0);
}
/**
* Estimates the ROI (return on investment) that one would see if a library was sequenced to
* x higher coverage than the observed coverage.
*
* @param estimatedLibrarySize the estimated number of molecules in the library
* @param x the multiple of sequencing to be simulated (i.e. how many X sequencing)
* @param pairs the number of pairs observed in the actual sequencing
* @param uniquePairs the number of unique pairs observed in the actual sequencing
* @return a number z <= x that estimates if you had pairs*x as your sequencing then you
* would observe uniquePairs*z unique pairs.
*/
double estimateRoi(long estimatedLibrarySize, double x, long pairs, long uniquePairs) {
    // Expected number of unique pairs after sequencing x-fold more pairs
    // (Lander-Waterman), normalised by the unique pairs actually observed.
    const double sequencedFragments = x * pairs;
    const double expectedUnique = estimatedLibrarySize * (1 - std::exp(-sequencedFragments / estimatedLibrarySize));
    return expectedUnique / uniquePairs;
}
/**
 * Calculates a histogram using the estimateRoi method to estimate the effective yield
 * doing x sequencing for x=1..100.
 */
void calculateRoiHistogram(DuplicationMetrics &metrics) {
    int64_t uniquePairs = metrics.READ_PAIRS_EXAMINED - metrics.READ_PAIR_DUPLICATES;
    metrics.CoverageMult.resize(101);  // slot 0 unused; slots 1..100 hold the x-fold estimates
    // Guard: estimateRoi divides by both the library size and uniquePairs; a zero
    // (e.g. when estimateLibrarySize() could not produce an estimate) would fill the
    // histogram with NaN/inf, so leave the freshly-resized values at zero instead.
    if (metrics.ESTIMATED_LIBRARY_SIZE <= 0 || uniquePairs <= 0)
        return;
    for (int x = 1; x <= 100; ++x) {
        metrics.CoverageMult[x] =
            estimateRoi(metrics.ESTIMATED_LIBRARY_SIZE, x, metrics.READ_PAIRS_EXAMINED, uniquePairs);
    }
}

View File

@ -0,0 +1,141 @@
#pragma once
#include <robin-map/robin_map.h>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "dup_metrics.h"
using std::priority_queue;
using std::unordered_map;
using std::unordered_set;
using std::vector;
/* 前向声明 */
class BamWrap;
class ReadEnds;
class ReadNameParser;
/*
* optical duplicationgraph
*/
template <class Node>
struct Graph {
    // Undirected graph over arbitrary node values; used to cluster optical duplicates
    // by computing connected components with union-find.
    std::vector<Node> nodes;                                     // all nodes, in insertion order
    std::unordered_map<Node, int> nodeIdxMap;                    // node value -> index into `nodes`
    std::unordered_map<int, std::unordered_set<int>> neighbors;  // adjacency list, keyed by node index

    /// Inserts the node if unseen and returns its index (single lookup, no double find).
    int addNode(const Node &singleton) {
        auto it = nodeIdxMap.find(singleton);
        if (it != nodeIdxMap.end())
            return it->second;
        nodes.push_back(singleton);
        const int idx = (int)nodes.size() - 1;
        nodeIdxMap[singleton] = idx;
        neighbors[idx];  // ensure an (empty) adjacency entry exists
        return idx;
    }
    /// Adds a bidirectional edge; a self-edge only registers the node.
    /// (Parameters are const references now; the old non-const refs rejected temporaries.)
    void addEdge(const Node &left, const Node &right) {
        int leftIndex = addNode(left);
        if (left == right)
            return;
        int rightIndex = addNode(right);
        addNeighbor(leftIndex, rightIndex);
        addNeighbor(rightIndex, leftIndex);
    }
    void addNeighbor(int fromNode, int toNode) {
        // unordered_set::insert is already a no-op on duplicates; no find() needed first.
        neighbors[fromNode].insert(toNode);
    }
    /**
     * Returns the cluster map of connected components.
     *
     * @param pClusterMap nodes mapped to the same integer are in the same cluster.
     */
    void cluster(std::unordered_map<Node, int> *pClusterMap) {
        auto &clusterMap = *pClusterMap;
        std::vector<int> vCluster(nodes.size());
        for (size_t i = 0; i < vCluster.size(); ++i) vCluster[i] = (int)i;
        for (size_t i = 0; i < neighbors.size(); ++i) {
            for (auto &j : neighbors[(int)i]) joinNodes(j, (int)i, &vCluster);
        }
        for (auto &node : nodes) {
            clusterMap[node] = findRepNode(vCluster, nodeIdxMap[node]);
        }
    }
    // Union step of union-find: merge the clusters containing the two nodes.
    static void joinNodes(int nodeId1, int nodeId2, std::vector<int> *pGrouping) {
        auto &grouping = *pGrouping;
        int repNode1 = findRepNode(grouping, nodeId1);
        int repNode2 = findRepNode(grouping, nodeId2);
        if (repNode1 != repNode2)
            grouping[repNode1] = repNode2;
    }
    // Find step with path compression: returns the representative of nodeId's cluster.
    static int findRepNode(std::vector<int> &grouping, int nodeId) {
        int rep = nodeId;
        while (rep != grouping[rep]) rep = grouping[rep];
        while (nodeId != rep) {  // compress the path walked above
            int next = grouping[nodeId];
            grouping[nodeId] = rep;
            nodeId = next;
        }
        return rep;
    }
};
/*
 * Computes the duplicate-marking score of a single read.
 */
int16_t computeDuplicateScore(BamWrap &bw);
/*
 * Builds a read ends object that represents a single read.
 */
void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey);
/*
 * Updates a paired-end ReadEnds from the corresponding fragment end of its mate.
 */
void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds);
/**
 * Looks through the set of reads and identifies how many of the duplicates are
 * in fact optical duplicates, and stores the data in the instance level histogram.
 * Additionally sets the transient isOpticalDuplicate flag on each read end that is
 * identified as an optical duplicate.
 *
 */
void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe);
/*
 * Estimates the size of a library based on the number of paired end molecules observed
 * and the number of unique pairs observed.
 */
int64_t estimateLibrarySize(int64_t readPairs, int64_t uniqueReadPairs);
/**
 * Estimates the ROI (return on investment) that one would see if a library was sequenced to
 * x higher coverage than the observed coverage.
 **/
double estimateRoi(long estimatedLibrarySize, double x, long pairs, long uniquePairs);
/**
 * Calculates a histogram using the estimateRoi method to estimate the effective yield
 * doing x sequencing for x=1..100 (see implementation loop).
 */
void calculateRoiHistogram(DuplicationMetrics &metrics);

View File

@ -0,0 +1,194 @@
#pragma once
#include <robin-map/robin_map.h>
#include <robin-map/robin_set.h>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "read_ends.h"
#include "util/bam_buf.h"
using std::priority_queue;
using std::set;
using std::string;
using std::unordered_set;
using std::vector;
/* Holds a read end that has not yet found its mate at the same position. */
struct UnpairedREInfo {
    int64_t taskSeq;  // which computation round this entry belongs to
    ReadEnds unpairedRE;
};
/* For paired data, a complete calculation point is the pair of alignment positions of read1 and read2. */
struct CalcKey {
    int64_t read1Pos;
    int64_t read2Pos;
    // Order by read1Pos, then read2Pos. Compare directly instead of subtracting:
    // the old `(int)(read1Pos - o.read1Pos)` truncated the 64-bit difference, so two
    // keys whose positions differ by a multiple of 2^32 compared as equal.
    bool operator<(const CalcKey &o) const {
        if (read1Pos != o.read1Pos)
            return read1Pos < o.read1Pos;
        return read2Pos < o.read2Pos;
    }
    bool operator==(const CalcKey &o) const { return read1Pos == o.read1Pos && read2Pos == o.read2Pos; }
    // Hash functor form; must hash the *argument* -- the old version hashed this
    // instance's own (possibly uninitialized) members, collapsing all keys to one hash.
    std::size_t operator()(const CalcKey &o) const {
        return std::hash<int64_t>()(o.read1Pos) ^ std::hash<int64_t>()(o.read2Pos);
    }
};
struct CalcKeyHash {
    // Combines the hashes of the two coordinates by XOR.
    std::size_t operator()(const CalcKey &o) const {
        const std::size_t h1 = std::hash<int64_t>{}(o.read1Pos);
        const std::size_t h2 = std::hash<int64_t>{}(o.read2Pos);
        return h1 ^ h2;
    }
};
/* Records the information related to one duplicate read index. */
struct DupInfo {
    int64_t idx;         // index of the duplicate read
    int64_t repIdx = 0;  // index of the representative (non-duplicate) read of this batch
    int16_t dupSet = 0;  // dup set size
    DupInfo() : DupInfo(-1, 0, 0) {}
    DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) {}
    DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {}
    // Ordered, and implicitly convertible to int64_t, by the read index alone.
    bool operator<(const DupInfo &o) const { return idx < o.idx; }
    bool operator>(const DupInfo &o) const { return idx > o.idx; }
    operator int64_t() const { return idx; }
};
/* Hash/equality functors for DupInfo, keyed on the read index only. */
struct DupInfoHash {
    std::size_t operator()(const DupInfo &o) const { return std::hash<int64_t>()(o.idx); }
};
struct DupInfoEqual {
    bool operator()(const DupInfo &o1, const DupInfo &o2) const { return o1.idx == o2.idx; }
    bool operator()(const DupInfo &o1, const int64_t &o2) const { return o1.idx == o2; }
    bool operator()(const int64_t &o1, const DupInfo &o2) const { return o1 == o2.idx; }
};
using MDMap = tsl::robin_map<int64_t, int64_t>;
template <typename T>
// using MDSet = set<T>;
// using MDSet = unordered_set<T>;
using MDSet = tsl::robin_set<T>;
template <typename T>
// using DPSet = set<T>;
// using DPSet = unordered_set<T, DupInfoHash, DupInfoEqual>;
using DPSet = tsl::robin_set<T, DupInfoHash, DupInfoEqual>;
template <typename T>
using CalcSet = set<T>;
// using CalcSet = tsl::robin_set<T, CalcKeyHash>;
using ReadEndsSet = tsl::robin_set<ReadEnds, ReadEndsHash, ReadEndsEqual>;
/* Result container used when carried-over (legacy) data finds its pair read in the
 * current task and duplicate marking is recomputed for it. */
struct TaskSeqDupInfo {
    DPSet<DupInfo> dupIdx;
    MDSet<int64_t> opticalDupIdx;
    DPSet<DupInfo> repIdx;
    MDSet<int64_t> notDupIdx;
    MDSet<int64_t> notOpticalDupIdx;
    MDSet<int64_t> notRepIdx;
};
/* Per-position record for positions with unpaired read ends: the read-end array
 * plus how many of them are still unpaired. */
struct UnpairedPosInfo {
    int unpairedNum = 0;  // how many reads at this position still lack a mate
    int64_t taskSeq;      // computation round this entry belongs to
    vector<ReadEnds> pairArr;
    MDSet<string> readNameSet;
};
// typedef unordered_map<string, UnpairedREInfo> UnpairedNameMap;
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap;  // unmatched pair reads, keyed by read name
typedef tsl::robin_map<int64_t, UnpairedPosInfo>
    UnpairedPositionMap;  // keyed by position: all reads at that position plus the count of still-unmatched reads
/* Arguments for single-threaded duplicate marking. */
struct MarkDupDataArg {
    int64_t taskSeq;                     // task sequence number
    int64_t bamStartIdx;                 // global index of the first bam record of the current batch
    vector<BamWrap *> bams;              // bam reads to process
    vector<ReadEnds> pairs;              // paired reads
    vector<ReadEnds> frags;              // reads whose mate has not been found yet
    DPSet<DupInfo> pairDupIdx;           // indices of duplicate pair reads
    MDSet<int64_t> pairOpticalDupIdx;    // indices of optical-duplicate reads
    DPSet<DupInfo> fragDupIdx;           // indices of duplicate fragment reads
    DPSet<DupInfo> pairRepIdx;           // indices of each pair dup-set's representative read
    MDSet<int64_t> pairSingletonIdx;     // positions that hold exactly one read pair
    UnpairedNameMap unpairedDic;         // used to look up the mate end
    UnpairedPositionMap unpairedPosArr;  // all ReadEnds at positions with unmatched ends (avoids duplicate storage)
};
/*
 * Heap entry: (task array id, index within that array, duplicate index value).
 */
template <typename T>
struct PairArrIdIdx {
    int arrId = 0;
    uint64_t arrIdx = 0;
    T dupIdx = 0;
};
/* Min-heap comparator over the duplicate index value. */
template <typename T>
struct IdxGreaterThan {
    bool operator()(const PairArrIdIdx<T> &a, const PairArrIdIdx<T> &b) { return a.dupIdx > b.dupIdx; }
};
template <typename T>
struct DupIdxQueue {
// 将冗余索引和他对应的task vector对应起来
// 由于是多个task来查找冗余所以每次找到的冗余index都放在一个独立的vector中vector之间可能有重叠所以需要用一个最小堆来维护
vector<vector<T>> *dupIdx2DArr;
priority_queue<PairArrIdIdx<T>, vector<PairArrIdIdx<T>>, IdxGreaterThan<T>> minHeap;
uint64_t popNum = 0;
int Init(vector<vector<T>> *_dupIdx2DArr) {
dupIdx2DArr = _dupIdx2DArr;
if (dupIdx2DArr == nullptr) {
return -1;
}
for (int i = 0; i < dupIdx2DArr->size(); ++i) {
auto &v = (*dupIdx2DArr)[i];
if (!v.empty()) {
minHeap.push({i, 1, v[0]});
}
}
return 0;
}
T Pop() {
T ret = -1;
if (!minHeap.empty()) {
auto idx = minHeap.top();
minHeap.pop();
++popNum;
ret = idx.dupIdx;
auto &v = (*dupIdx2DArr)[idx.arrId];
if (v.size() > idx.arrIdx) {
minHeap.push({idx.arrId, idx.arrIdx + 1, v[idx.arrIdx]});
}
}
return ret;
}
uint64_t Size() {
uint64_t len = 0;
if (dupIdx2DArr != nullptr) {
for (auto &v : *dupIdx2DArr) {
len += v.size();
}
}
return len - popNum;
}
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,174 @@
#pragma once
#include <inttypes.h>
#include <util/yarn.h>
#include "md_types.h"
/* Output of the read step. */
struct ReadData {
    vector<BamWrap *> bams;   // read step output
    int64_t bamStartIdx = 0;  // global index of the first bam read in this round's batch
    int64_t taskSeq = 0;      // task sequence number
};
/* Output of the generate-read-ends step, with one slot per worker thread. */
struct GenREData {
    vector<vector<ReadEnds>> pairsArr;       // paired reads
    vector<vector<ReadEnds>> fragsArr;       // reads whose mate has not been found yet
    vector<UnpairedNameMap> unpairedDicArr;  // used to look up mate ends
    void Init(int nThread) {
        // One slot more than the thread count: mainly pairsArr needs the extra one.
        for (int i = 0; i <= nThread; ++i) {
            pairsArr.push_back(vector<ReadEnds>());
            fragsArr.push_back(vector<ReadEnds>());
            unpairedDicArr.push_back(UnpairedNameMap());
        }
    }
    UnpairedNameMap unpairedDic;  // takes over part of the sort step's computation
    UnpairedPositionMap unpairedPosArr;
};
/* Data shared between the sort and mark-duplicate steps. */
struct SortMarkData {
    vector<ReadEnds> pairs;              // paired reads
    vector<ReadEnds> frags;              // reads whose mate has not been found yet
    UnpairedNameMap unpairedDic;         // used to look up the mate end
    UnpairedPositionMap unpairedPosArr;  // all ReadEnds at positions with unmatched ends (avoids duplicate storage)
};
struct SortData {
    volatile void *dataPtr;  // SortMarkData pointer
};
struct MarkDupData {
    int64_t taskSeq = 0;               // task sequence number
    DPSet<DupInfo> pairDupIdx;         // indices of duplicate pair reads
    MDSet<int64_t> pairOpticalDupIdx;  // indices of optical-duplicate reads
    DPSet<DupInfo> fragDupIdx;         // indices of duplicate fragment reads
    DPSet<DupInfo> pairRepIdx;         // indices of each pair dup-set's representative read
    volatile void *dataPtr;            // SortMarkData pointer
};
struct IntersectData {
    UnpairedNameMap unpairedDic;  // used to look up the mate end
    UnpairedPositionMap unpairedPosArr;
    // One vector per task.
    vector<vector<DupInfo>> dupIdxArr;
    vector<vector<int64_t>> opticalDupIdxArr;
    vector<vector<DupInfo>> repIdxArr;
    // Storage for results of the deferred (later) computation.
    vector<DPSet<DupInfo>> latterDupIdxArr;
    vector<MDSet<int64_t>> latterOpticalDupIdxArr;
    vector<DPSet<DupInfo>> latterRepIdxArr;
    vector<MDSet<int64_t>> latterNotDupIdxArr;
    vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
    vector<MDSet<int64_t>> latterNotRepIdxArr;
};
// Pipeline state: per-stage task counters and whether each stage has finished.
struct PipelineArg {
    static const int GENBUFNUM = 2;
    static const int SORTBUFNUM = 2;
    static const int MARKBUFNUM = 3;
    uint64_t readOrder = 0;
    uint64_t genREOrder = 0;
    uint64_t sortOrder = 0;
    uint64_t markDupOrder = 0;
    uint64_t intersectOrder = 0;
    int numThread = 0;
    volatile int readFinish = 0;
    volatile int genREFinish = 0;
    volatile int sortFinish = 0;
    volatile int markDupFinish = 0;
    yarn::lock_t *readSig;
    yarn::lock_t *genRESig;
    yarn::lock_t *sortSig;
    yarn::lock_t *markDupSig;
    // NOTE(review): the yarn locks allocated below are never released (no destructor);
    // presumably acceptable for an object living for the whole process -- confirm.
    PipelineArg() {
        readSig = yarn::NEW_LOCK(0);   // max value 1; double buffering is handled transparently inside BamBuf
        genRESig = yarn::NEW_LOCK(0);  // max value 2; double buffer
        sortSig = yarn::NEW_LOCK(0);
        markDupSig = yarn::NEW_LOCK(0);
        for (int i = 0; i < SORTBUFNUM; ++i) {
            sortData[i].dataPtr = &sortMarkData[i];
        }
        for (int i = 0; i < MARKBUFNUM; ++i) {
            markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM];
        }
    }
    SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM];
    // for step-1 read
    ReadData readData;
    // for step-2 generate readends
    GenREData genREData[GENBUFNUM];
    // for step-3 sort each thread frags and pairs
    SortData sortData[SORTBUFNUM];
    // for step-4 mark duplicate
    MarkDupData markDupData[MARKBUFNUM];
    // for step-5 deal with intersect data
    IntersectData intersectData;
};
/* Heap entry for merging per-task ReadEnds vectors. */
struct REArrIdIdx {
    int arrId = 0;        // which array
    uint64_t arrIdx = 0;  // current index inside that array
    const ReadEnds *pE = nullptr;
};
/* Min-heap comparator over the pointed-to ReadEnds. */
struct REGreaterThan {
    bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; }
};
struct ReadEndsHeap {
// 将冗余索引和他对应的task vector对应起来
vector<vector<ReadEnds>> *arr2d;
priority_queue<REArrIdIdx, vector<REArrIdIdx>, REGreaterThan> minHeap;
uint64_t popNum = 0;
int Init(vector<vector<ReadEnds>> *_arr2d) {
arr2d = _arr2d;
if (arr2d == nullptr) {
return -1;
}
for (int i = 0; i < arr2d->size(); ++i) {
auto &v = (*arr2d)[i];
if (!v.empty()) {
minHeap.push({i, 1, &v[0]});
}
}
return 0;
}
const ReadEnds *Pop() {
const ReadEnds *ret = nullptr;
if (!minHeap.empty()) {
auto minVal = minHeap.top();
minHeap.pop();
++popNum;
ret = minVal.pE;
auto &v = (*arr2d)[minVal.arrId];
if (v.size() > minVal.arrIdx) {
minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]});
}
}
return ret;
}
uint64_t Size() {
uint64_t len = 0;
if (arr2d != nullptr) {
for (auto &v : *arr2d) {
len += v.size();
}
}
return len - popNum;
}
};
// 并行运行mark duplicate
void pipelineMarkDups();

View File

@ -0,0 +1,170 @@
/*
Description: read
ends
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/11/3
*/
#pragma once
#include <stdint.h>
#include <stdlib.h>
#include <algorithm>
/**
* Small interface that provides access to the physical location information
* about a cluster. All values should be defaulted to -1 if unavailable.
* ReadGroup and Tile should only allow non-zero positive integers, x and y
* coordinates may be negative.
*/
struct PhysicalLocation {
    static const int NO_VALUE = -1;
    /**
     * Physical location of a cluster on the flowcell. All values default to -1 if
     * unavailable. Tile should only allow non-zero positive integers; x and y may
     * be negative.
     * NOTE(review): x/y are kept as int16_t here (unlike a 32-bit variant) to save
     * space; they can overflow within a HiSeqX tile -- confirm acceptable.
     */
    int16_t tile = -1;
    // int32_t x = -1;
    // int32_t y = -1;
    int16_t x = -1;
    int16_t y = -1;
};
/* Holds all read-ends information, like Picard's ReadEndsForMarkDuplicates. */
struct ReadEnds : PhysicalLocation {
    static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5;
    /* Data preserved from the bam record. */
    bool read1FirstOfPair = true;
    /** Little struct-like class to hold read pair (and fragment) end data for
     * duplicate marking. */
    // int16_t libraryId; // unused: multiple samples are not considered
    int8_t orientation = -1;
    int32_t read1ReferenceIndex = -1;
    int32_t read1Coordinate = -1;
    int32_t read2ReferenceIndex = -1;
    // This field is overloaded for flow based processing as the end coordinate of read 1. (paired reads not supported)
    int32_t read2Coordinate = -1;
    /* Additional information used to detect optical dupes */
    // int16_t readGroup = -1;  // aligned bams here carry a single read group (normal or tumor)
    /** For optical duplicate detection the orientation matters regard to 1st or
     * 2nd end of a mate */
    int8_t orientationForOpticalDuplicates = -1;
    /** A *transient* flag marking this read end as being an optical duplicate. */
    bool isOpticalDuplicate = false;
    /* Members from ReadEndsForMarkDuplicates. */
    int16_t score = 0;
    int64_t read1IndexInFile = -1;
    int64_t read2IndexInFile = -1;
    int64_t duplicateSetSize = -1;
    /* Members from ReadEndsForMarkDuplicatesWithBarcodes (apparently unused). */
    // int32_t barcode = 0; // primary barcode for this read (and pair)
    // int32_t readOneBarcode = 0; // read one barcode, 0 if not present
    // int32_t readTwoBarcode = 0; // read two barcode, 0 if not present or not
    // paired
    /* Extra member (zzh): position key, i.e.
     * (int64_t)tid << MAX_CONTIG_LEN_SHIFT | (int64_t)pos */
    int64_t posKey = -1;
    /* Guard counter: some read ends are processed several times (tasks may overlap). */
    int oprateTime = 0;
    /* Derives the combined orientation byte from the two mates' strands. */
    static int8_t GetOrientationByte(bool read1NegativeStrand, bool read2NegativeStrand) {
        if (read1NegativeStrand) {
            if (read2NegativeStrand)
                return RR;
            else
                return RF;
        } else {
            if (read2NegativeStrand)
                return FR;
            else
                return FF;
        }
    }
    /* Returns whether two read ends are comparable for duplicate detection. */
    static bool AreComparableForDuplicates(const ReadEnds &lhs, const ReadEnds &rhs, bool compareRead2) {
        // (Removed a dead store: areComparable was assigned true and then immediately overwritten.)
        bool areComparable = lhs.read1ReferenceIndex == rhs.read1ReferenceIndex &&
                             lhs.read1Coordinate == rhs.read1Coordinate && lhs.orientation == rhs.orientation;
        if (areComparable && compareRead2) {
            areComparable =
                lhs.read2ReferenceIndex == rhs.read2ReferenceIndex && lhs.read2Coordinate == rhs.read2Coordinate;
        }
        return areComparable;
    }
    /* Is the alignment on the forward strand? */
    bool IsPositiveStrand() const { return orientation == F; }
    /* Is this a properly paired end? */
    bool IsPaired() const { return read2ReferenceIndex != -1; }
    bool IsNegativeStrand() const { return orientation == R; }
    // Strict weak ordering for intersecting data; mirrors AreComparableForDuplicates.
    static inline bool ReadLittleThan(const ReadEnds &a, const ReadEnds &b, bool compareRead2 = false) {
        // Direct comparisons instead of subtraction, which could overflow/mis-order.
        if (a.read1ReferenceIndex != b.read1ReferenceIndex) return a.read1ReferenceIndex < b.read1ReferenceIndex;
        if (a.read1Coordinate != b.read1Coordinate) return a.read1Coordinate < b.read1Coordinate;
        if (a.orientation != b.orientation) return a.orientation < b.orientation;
        if (compareRead2) {
            if (a.read2ReferenceIndex != b.read2ReferenceIndex) return a.read2ReferenceIndex < b.read2ReferenceIndex;
            if (a.read2Coordinate != b.read2Coordinate) return a.read2Coordinate < b.read2Coordinate;
        }
        return false;
    }
    // Needed when collecting all read ends at one position.
    static bool PairsLittleThan(const ReadEnds &lhs, const ReadEnds &rhs) { return ReadLittleThan(lhs, rhs, true); }
    // Full comparison: alignment fields, then physical location, then file indices.
    bool operator<(const ReadEnds &o) const {
        if (read1ReferenceIndex != o.read1ReferenceIndex) return read1ReferenceIndex < o.read1ReferenceIndex;
        if (read1Coordinate != o.read1Coordinate) return read1Coordinate < o.read1Coordinate;
        if (orientation != o.orientation) return orientation < o.orientation;
        if (read2ReferenceIndex != o.read2ReferenceIndex) return read2ReferenceIndex < o.read2ReferenceIndex;
        if (read2Coordinate != o.read2Coordinate) return read2Coordinate < o.read2Coordinate;
        if (tile != o.tile) return tile < o.tile;
        if (x != o.x) return x < o.x;
        // BUGFIX: this tier was `comp - y - o.y;` (a no-op expression statement), so the
        // y coordinate never participated in the ordering.
        if (y != o.y) return y < o.y;
        // Compare the 64-bit file indices directly; the old `(int)` cast of the
        // difference could truncate and mis-order indices differing by >= 2^31.
        if (read1IndexInFile != o.read1IndexInFile) return read1IndexInFile < o.read1IndexInFile;
        return read2IndexInFile < o.read2IndexInFile;
    }
};
/* Hash/equality for ReadEnds keyed solely on read1IndexInFile (unique per record). */
struct ReadEndsHash {
    std::size_t operator()(const ReadEnds &o) const { return std::hash<int64_t>()(o.read1IndexInFile); }
};
struct ReadEndsEqual {
    bool operator()(const ReadEnds &o1, const ReadEnds &o2) const { return o1.read1IndexInFile == o2.read1IndexInFile; }
};

View File

@ -0,0 +1,226 @@
/*
Description: readnametile, x, y
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/11/6
*/
#ifndef READ_NAME_PARSER_H_
#define READ_NAME_PARSER_H_
#include <spdlog/spdlog.h>
#include <stdint.h>
#include <regex>
#include <string>
#include "read_ends.h"
// using std::regex;
using std::cmatch;
using std::regex;
using std::string;
/**
* Provides access to the physical location information about a cluster.
* All values should be defaulted to -1 if unavailable. ReadGroup and Tile
* should only allow non-zero positive integers, x and y coordinates may be
* negative. 线
*/
struct ReadNameParser {
    /**
     * The read name regular expression (regex) is used to extract three pieces
     * of information from the read name: tile, x location, and y location. Any
     * read name regex should parse the read name to produce these and only
     * these values. An example regex is:
     * (?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$
     * which assumes that fields in the read name are delimited by ':' and the
     * last three fields correspond to the tile, x and y locations, ignoring any
     * trailing non-digit characters.
     *
     * The default regex is optimized for fast parsing (see {@link
     * #getLastThreeFields(String, char, int[])}) by searching for the last
     * three fields, ignoring any trailing non-digit characters, assuming the
     * delimiter ':'. This should consider correctly read names where we have 5
     * or 7 field with the last three fields being tile/x/y, as is the case for
     * the majority of read names produced by Illumina technology.
     */
    const string DEFAULT_READ_NAME_REGEX = "(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$";
    bool warnAboutRegexNotMatching = true;    // warn only once when a name fails to parse
    static bool sWrongNameFormat;             // set once any read name has failed to parse
    string readNameStored = "";               // last parsed name (memoization) ...
    PhysicalLocation physicalLocationStored;  // ... and its parsed physical location
    int tmpLocationFields[3];                 // for optimization of addLocationInformation
    bool useOptimizedDefaultParsing = true;   // was the regex default?
    string readNameRegex = DEFAULT_READ_NAME_REGEX;
    regex readNamePattern;
    ReadNameParser() : ReadNameParser(DEFAULT_READ_NAME_REGEX) {}
    ReadNameParser(const string &strReadNameRegex) : ReadNameParser(strReadNameRegex, true) {}
    ReadNameParser(const string &strReadNameRegex, bool isWarn) {
        readNameRegex = strReadNameRegex;
        useOptimizedDefaultParsing = (strReadNameRegex == DEFAULT_READ_NAME_REGEX);
        readNamePattern = std::regex(strReadNameRegex, std::regex_constants::optimize);
        warnAboutRegexNotMatching = isWarn;
    }
    /* Replaces the regex used for parsing read names. */
    void SetReadNameRegex(const string &strReadNameRegex) {
        readNameRegex = strReadNameRegex;
        useOptimizedDefaultParsing = (strReadNameRegex == DEFAULT_READ_NAME_REGEX);
        readNamePattern = std::regex(strReadNameRegex, std::regex_constants::optimize);
    }
    /* Adds the sequencing tile/x/y information to *loc, memoizing the last name parsed. */
    bool AddLocationInformation(const string &readName, PhysicalLocation *loc) {
        if (!(readName == readNameStored)) {
            if (ReadLocationInformation(readName, loc)) {
                readNameStored = readName;
                physicalLocationStored = *loc;
                return true;
            }
            // return false if read name cannot be parsed
            return false;
        } else {
            *loc = physicalLocationStored;
            return true;
        }
    }
    /**
     * Method used to extract tile/x/y from the read name and add it to the
     * PhysicalLocationShort so that it can be used later to determine optical
     * duplication
     *
     * @param readName the name of the read/cluster
     * @param loc the object to add tile/x/y to
     * @return true if the read name contained the information in parsable form,
     * false otherwise
     */
    bool ReadLocationInformation(const string &readName, PhysicalLocation *loc) {
        try {
            // Optimized version if using the default read name regex (== used on purpose):
            if (useOptimizedDefaultParsing) {
                const int fields = getLastThreeFields(readName, ':');
                if (!(fields == 5 || fields == 7)) {
                    if (warnAboutRegexNotMatching) {
                        spdlog::warn(
                            "Default READ_NAME_REGEX '{}' did not match read "
                            "name '{}'."
                            "You may need to specify a READ_NAME_REGEX in "
                            "order to correctly identify optical duplicates. "
                            "Note that this message will not be emitted again "
                            "even if other read names do not match the regex.",
                            readNameRegex.c_str(), readName.c_str());
                        warnAboutRegexNotMatching = false;
                        sWrongNameFormat = true;
                    }
                    return false;
                }
                loc->tile = (int16_t)tmpLocationFields[0];
                loc->x = tmpLocationFields[1];
                loc->y = tmpLocationFields[2];
                return true;
            } else if (readNameRegex.empty()) {
                return false;
            } else {
                // Standard version that will use the regex
                cmatch m;
                if (std::regex_match(readName.c_str(), m, readNamePattern)) {
                    loc->tile = std::stoi(m[1].str());
                    loc->x = std::stoi(m[2].str());
                    loc->y = std::stoi(m[3].str());
                    return true;
                } else {
                    if (warnAboutRegexNotMatching) {
                        spdlog::warn(
                            "READ_NAME_REGEX '{}' did not match read name '{}'."
                            "Your regex may not be correct. "
                            "Note that this message will not be emitted again "
                            "even if other read names do not match the regex.",
                            readNameRegex.c_str(), readName.c_str());
                        warnAboutRegexNotMatching = false;
                        sWrongNameFormat = true;
                    }
                    return false;
                }
            }
        } catch (const std::exception &e) {
            // BUGFIX: std::stoi throws std::invalid_argument / std::out_of_range, which
            // derive from std::logic_error, so the previous handler for
            // std::runtime_error never fired and the message (with e.what()) was lost
            // to the catch-all below. Catch std::exception to keep the error message.
            if (warnAboutRegexNotMatching) {
                spdlog::warn(
                    "A field parsed out of a read name was expected to contain "
                    "an integer and did not. READ_NAME_REGEX: {}; Read name: "
                    "{}; Error Msg: {}",
                    readNameRegex.c_str(), readName.c_str(), e.what());
                warnAboutRegexNotMatching = false;
                sWrongNameFormat = true;
            }
        } catch (...) {
            if (warnAboutRegexNotMatching) {
                spdlog::warn(
                    "A field parsed out of a read name was expected to contain "
                    "an integer and did not. READ_NAME_REGEX: {}; Read name: "
                    "{}",
                    readNameRegex.c_str(), readName.c_str());
                warnAboutRegexNotMatching = false;
                sWrongNameFormat = true;
            }
        }
        // NOTE(review): reached only after an exception was swallowed above; returning
        // true claims success although *loc may not have been populated -- verify this
        // is intended (the reference implementation reports parse failure).
        return true;
    }
    /**
     * Given a string, splits the string by the delimiter, and returns the the
     * last three fields parsed as integers. Parsing a field considers only a
     * sequence of digits up until the first non-digit character. The three
     * values are stored in the passed-in array.
     *
     * @throws NumberFormatException if any of the tokens that should contain
     * numbers do not start with parsable numbers
     */
    int getLastThreeFields(const string &readName, char delim) {
        int tokensIdx = 2;  // start at the last token
        int numFields = 0;
        int i, endIdx;
        endIdx = readName.size();
        // find the last three tokens only
        for (i = (int)readName.size() - 1; 0 <= i && 0 <= tokensIdx; i--) {
            if (readName.at(i) == delim || 0 == i) {
                numFields++;
                const int startIdx = (0 == i) ? 0 : (i + 1);
                tmpLocationFields[tokensIdx] = std::stoi(readName.substr(startIdx, endIdx - startIdx));
                tokensIdx--;
                endIdx = i;
            }
        }
        // continue to find the # of fields
        while (0 <= i) {
            if (readName.at(i) == delim || 0 == i)
                numFields++;
            i--;
        }
        if (numFields < 3) {
            tmpLocationFields[0] = tmpLocationFields[1] = tmpLocationFields[2] = -1;
            numFields = -1;
        }
        return numFields;
    }
};
#endif

View File

@ -0,0 +1,248 @@
/*
Description: sam/bambuf
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2019/11/27
*/
#include "bam_buf.h"
/*
* BamBuf
*/
// Reads records until EOF or until the buffer is full; returns how many were buffered.
int BamBuf::ReadBam() {
    int read_num = 0;
    if (handle_last) {  // first place the record left over from the previous call
        if (has_enough_space()) {  // must be called so mem_offset is adjusted at the wrap boundary
            ++read_num;
            append_one_bam();
        } else {
            return read_num;  // still no room
        }
    }
    while (read_stat_ >= 0 && (read_stat_ = sam_read1(fp, hdr, bw->b)) >= 0) {
        bw->end_pos_ = BamWrap::BamEndPos(bw->b);
        if (has_enough_space()) {  // room left
            // if (true) { // room left
            append_one_bam();
            ++read_num;  // only counted once it is actually in the buffer
        } else {
            break;
        }
    }
    // A non-negative status here means a record was read but not buffered;
    // keep it staged for the next call.
    if (read_stat_ >= 0) {
        handle_last = true;
    } else {
        handle_last = false;
    }
    return read_num;
}
// Initialize the buffer: borrow the open SAM/BAM handle and its header, and
// allocate mem_size bytes of ring-buffer storage plus the scratch BamWrap
// used by the read loop.
//
// fp/hdr are borrowed, not owned; mem/bw are released by the destructor or
// FreeMemory().
void BamBuf::Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size) {
    this->fp = fp;
    this->hdr = hdr;
    this->mem_size = mem_size;
    this->mem = (uint8_t *)malloc(mem_size);
    this->bw = (BamWrap *)malloc(sizeof(BamWrap));
    // check the allocations BEFORE touching bw->b: the original dereferenced
    // bw prior to the NULL check, crashing instead of reporting the failure
    if (this->mem == NULL || this->bw == NULL) {
        fprintf(stderr, "allocate memory failed! Abort\n");
        exit(-1);
    }
    this->bw->b = bam_init1();
    if (this->bw->b == NULL) {
        fprintf(stderr, "allocate memory failed! Abort\n");
        exit(-1);
    }
}
// Drop the first idxInBv records, keeping [idxInBv, bv.size()) for the next
// round, then recompute the ring-buffer boundary bookkeeping.
void BamBuf::ClearBeforeIdx(size_t idxInBv) {
    if (idxInBv < 1)
        return;
    // use vector::erase instead of the original hand-rolled element shift;
    // clamp to size() so an out-of-range index clears everything (matching
    // the old resize behavior) instead of risking undefined behavior
    if (idxInBv >= bv.size()) {
        bv.clear();
    } else {
        bv.erase(bv.begin(), bv.begin() + idxInBv);
    }
    prepare_read();
}
// Discard every buffered record and reset the ring-buffer bookkeeping.
void BamBuf::ClearAll() {
    bv.clear();
    prepare_read();
}
// Prepare for the next read round: compute the byte range
// [legacy_start, legacy_end) inside mem still occupied by leftover records.
inline void BamBuf::prepare_read() {
    // compute the span occupied by records that the next round may still need
    if (bv.size() > 0) {
        // bv holds pointers into mem, so pointer arithmetic against mem
        // yields the occupied byte offsets
        BamWrap *bw = bv[0];
        legacy_start = (int64_t)bw - (int64_t)mem;
        bw = bv.back();
        legacy_end = (int64_t)bw + bw->length() - (int64_t)mem;
    } else {
        legacy_start = legacy_end = 0;
        mem_offset = 0; // nothing left over, so start storing from the beginning
    }
}
// Check whether the ring buffer has room for the record currently held in bw.
// Side effect: when the record does not fit before the physical end of mem,
// mem_offset is wrapped back to 0 — so this must run before append_one_bam().
inline bool BamBuf::has_enough_space() {
    const uint32_t bam_len = bw->length();
    int64_t potential_end = mem_offset + bam_len;
    // unwrap the leftover region so the comparisons below work across the
    // wrap point. NOTE(review): this mutates the member; legacy_start stays
    // unwrapped until prepare_read() recomputes it — confirm this is intended
    if (legacy_start <= legacy_end)
        legacy_start += mem_size;
    if (potential_end >= legacy_start) {
        return false;
    }
    // no room before the physical end of mem: wrap the write position
    if (potential_end >= mem_size) {
        mem_offset = 0;
    }
    // re-check against the leftover region using an unwrapped offset
    int64_t virtual_offset = mem_offset;
    if (virtual_offset < legacy_end)
        virtual_offset += mem_size;
    potential_end = virtual_offset + bam_len;
    return potential_end < legacy_start;
}
// Copy the record currently in bw into the ring buffer at mem_offset and
// register it in bv. Per-record layout: [BamWrap][bam1_t][data bytes].
inline void BamBuf::append_one_bam() {
    BamWrap *bwp = (BamWrap *)(mem + mem_offset);
    *bwp = *bw;
    bwp->b = (bam1_t *)((char *)bwp + sizeof(*bwp)); // bam1_t lives right after the wrapper
    bam1_t *bp = bwp->b;
    *bp = *bw->b;
    bp->data = (uint8_t *)((char *)bwp->b + sizeof(bam1_t)); // variable-length payload follows
    memcpy(bp->data, bw->b->data, bw->b->l_data);
    // advance the write position, rounded up to 8-byte alignment
    mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1));
    bv.push_back(bwp);
}
// Try to buffer the record left over from the previous round (read by
// sam_read1 but not yet stored). Returns true when it was appended.
inline bool BamBuf::handle_last_read() {
    if (handle_last) { // a record from the previous round is pending
        if (has_enough_space()) { // must be called first: it adjusts mem_offset at the wrap boundary
            append_one_bam();
            handle_last = false;
            return true;
        }
    }
    return false;
}
/*
* AsyncIoBamBuf
*/
// Initialize the double-buffered reader. In async mode the memory budget is
// split in half between the foreground (pi_) and background (po_) buffers;
// in sync mode a single buffer gets the whole budget.
void AsyncIoBamBuf::Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size) {
    if (use_async_io_) {
        buf1_.Init(fp, hdr, mem_size >> 1);
        buf2_.Init(fp, hdr, mem_size >> 1);
        pi_ = &buf1_;
        po_ = &buf2_;
        // NOTE(review): malloc result is unchecked; also po_ is never set in
        // the sync branch below, so it must not be dereferenced in that mode
        tid_ = (pthread_t *)malloc(sizeof(pthread_t));
    } else {
        buf1_.Init(fp, hdr, mem_size);
        pi_ = &buf1_;
    }
}
// Read the next batch of records, dispatching to the synchronous or the
// asynchronous (double-buffered) implementation.
int AsyncIoBamBuf::ReadBam() {
    if (!use_async_io_)
        return sync_read_bam();
    hasThread = true;
    return async_read_bam();
}
// Synchronous path: apply any deferred clear request, read into the single
// buffer, and rebuild the flat access array.
int AsyncIoBamBuf::sync_read_bam() {
    int read_num = 0;
    if (clear_all_) {
        clear_all_ = false;
        pi_->ClearAll();
    } else if (clear_before_idx_ > 0) {
        pi_->ClearBeforeIdx(clear_before_idx_);
        clear_before_idx_ = 0;
    }
    read_num = pi_->ReadBam();
    refresh_bam_arr();
    return read_num;
}
// Asynchronous path: on the first call read in the foreground; afterwards
// join the background reader, swap the buffers when required, publish its
// result, and immediately kick off the next background read.
int AsyncIoBamBuf::async_read_bam() {
    int read_num = 0;
    if (first_read_) {
        read_num = pi_->ReadBam();
        first_read_ = false;
        refresh_bam_arr();
    } else {
        // join the reader thread, then swap the buffer pointers
        pthread_join(*tid_, 0);
        resize_buf();
        if (need_read_) { // the pointers need to be swapped
            BamBuf *tmp = pi_;
            pi_ = po_;
            po_ = tmp;
        }
        read_num = last_read_num_;
        refresh_bam_arr();
    }
    // start the next read in the background
    pthread_create(tid_, 0, async_read, this);
    return read_num;
}
// Background thread entry point: fills the back buffer (po_) when a read is
// required and the file still has data, recording how many records it got.
void *AsyncIoBamBuf::async_read(void *data) {
    AsyncIoBamBuf *ab = (AsyncIoBamBuf *)data;
    if (ab->need_read_ && ab->ReadStat() >= 0) { // a read is required
        ab->last_read_num_ = ab->po_->ReadBam();
    } else {
        ab->last_read_num_ = 0;
    }
    pthread_exit(0);
}
// Prepare for the next read round. The clear is deferred until resize_buf()
// because the back buffer (po_) may currently be in use by the reader thread.
void AsyncIoBamBuf::ClearBeforeIdx(size_t idxInBv) { clear_before_idx_ = idxInBv; }
// Discard all previously read records. Deferred until resize_buf() because
// the reader thread may still be filling the back buffer (po_).
void AsyncIoBamBuf::ClearAll() { clear_all_ = true; }
// Apply the deferred clear requests after the background reader has been
// joined, and decide whether the buffers must be swapped for the next round.
inline void AsyncIoBamBuf::resize_buf() {
    if (clear_all_) { // drop everything from the previous round
        clear_all_ = false;
        po_->ClearBeforeIdx(legacy_size_);
        pi_->ClearAll();
        if (pi_->handle_last_read()) { // the previous read left one record unbuffered
            last_read_num_ += 1;
            legacy_size_ = pi_->Size(); // should now hold exactly that one record
            need_read_ = true;
        } else { // no room to store it, so do not swap pointers (or EOF was reached)
            legacy_size_ = 0;
            need_read_ = false;
        }
    } else if (clear_before_idx_ > 0) {
        if (clear_before_idx_ < legacy_size_) {
            // the cut point falls inside the leftover region of the back buffer
            po_->ClearBeforeIdx(clear_before_idx_);
            legacy_size_ -= clear_before_idx_;
            // no pointer swap and no new read needed
            need_read_ = false;
        } else {
            // the cut point spans both buffers: drop the back buffer's
            // leftovers entirely and trim the front buffer by the remainder
            po_->ClearBeforeIdx(legacy_size_);
            pi_->ClearBeforeIdx(clear_before_idx_ - legacy_size_);
            if (pi_->handle_last_read()) { // the previous read left one record unbuffered
                last_read_num_ += 1;
                legacy_size_ = pi_->Size(); // should now hold exactly that one record
                need_read_ = true;
            } else { // no room to store it, so do not swap pointers (or EOF was reached)
                legacy_size_ = 0;
                need_read_ = false;
            }
        }
        clear_before_idx_ = 0;
    }
}

View File

@ -0,0 +1,172 @@
/*
Description: sam/bambuf
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2019/11/27
*/
#pragma once
#include <htslib/sam.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <fstream>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>
#include "bam_wrap.h"
using std::vector;
using namespace std;
/*
 * Ring buffer of BAM records read from a SAM/BAM file. Records are stored
 * back-to-back inside one allocation (mem) and exposed via bv.
 */
struct BamBuf {
    sam_hdr_t *hdr; // header of the SAM/BAM file (borrowed, not owned)
    samFile *fp; // open SAM/BAM file handle (borrowed, not owned)
    BamWrap *bw = nullptr; // scratch record reused by the read loop
    uint8_t *mem = nullptr; // storage for the buffered records;
    // reclaimed automatically at program exit, hence not freed in the destructor
    int64_t mem_offset = 0; // byte offset where the next record will be stored
    int64_t mem_size; // capacity of mem in bytes
    int read_stat_ = 0; // last read status; < 0 once the file is exhausted
    vector<BamWrap *> bv; // per-record pointers into mem for convenient access
    int64_t legacy_start = 0; // start byte of unprocessed records in mem, inclusive
    int64_t legacy_end = 0; // end byte of unprocessed records in mem, exclusive
    bool handle_last = false; // whether the last record read still awaits buffering
    // initialize the buffer
    void Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size);
    // read records until EOF or until the buffer is full
    int ReadBam();
    // drop the records before idxInBv in preparation for the next round
    void ClearBeforeIdx(size_t idxInBv);
    // drop every record read so far
    void ClearAll();
    inline int64_t Size() { return bv.size(); } // number of buffered reads
    inline int ReadStat() { return read_stat_; } // file read status (>= 0 while more data may come)
    ~BamBuf() {
        if (this->mem != nullptr) {
            free(this->mem);
        }
        if (this->bw != nullptr) {
            bam_destroy1(bw->b);
            free(bw);
        }
    }
    void FreeMemory() // release the allocated memory early (destructor-safe: pointers are nulled)
    {
        if (this->mem != nullptr) {
            free(this->mem);
        }
        if (this->bw != nullptr) {
            bam_destroy1(bw->b);
            free(bw);
        }
        this->mem = nullptr;
        this->bw = nullptr;
    }
    void prepare_read();
    // check whether the buffer still has room (may wrap mem_offset)
    bool has_enough_space();
    // copy the freshly read record into the buffer
    void append_one_bam();
    // buffer the record left over from the previous round, if any
    bool handle_last_read();
    // vector-style access to the buffered records
    inline BamWrap *operator[](int64_t pos) { return bv[pos]; }
    inline void push_back(BamWrap *val) { bv.push_back(val); }
    inline void clear() { bv.clear(); }
    inline void resize(int64_t s) { bv.resize(s); }
};
/*
 * Double-buffered, optionally asynchronous BAM reader built on two BamBuf
 * instances: the foreground buffer (pi_) serves the consumer while the
 * background buffer (po_) is filled by a dedicated reader thread.
 */
struct AsyncIoBamBuf {
    BamBuf buf1_;
    BamBuf buf2_;
    BamBuf *pi_; // buffer currently consumed
    BamBuf *po_; // buffer being filled in the background
    pthread_t *tid_ = NULL; // reader thread handle (allocated in Init for async mode)
    bool hasThread = false; // whether a reader thread has been launched
    int64_t legacy_size_ = 0; // reads left over from the previous round still held in po_
    bool first_read_ = true; // the first ReadBam call reads synchronously
    int last_read_num_ = 0; // number of reads fetched by the previous background read
    bool need_read_ = true; // whether the next background read must run (and pointers swap)
    bool use_async_io_ = true; // async double-buffering vs plain synchronous reads
    int64_t clear_before_idx_ = 0; // deferred request: drop reads before this index next round
    bool clear_all_ = false; // deferred request: drop all reads next round
    vector<BamWrap *> bam_arr_; // flat view over the reads of both buffers
    AsyncIoBamBuf() {}
    AsyncIoBamBuf(bool use_async) : use_async_io_(use_async) {}
    // join the reader thread (if any) and release its handle; the two BamBuf
    // members clean up after themselves in their own destructors
    ~AsyncIoBamBuf() {
        if (tid_ != NULL) {
            if (hasThread)
                pthread_join(*tid_, 0);
            free(tid_);
        }
    }
    // initialize the buffers (async mode splits mem_size between the two)
    void Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size);
    // read the next batch of records
    int ReadBam();
    // deferred: drop the records before idxInBv at the start of the next round
    void ClearBeforeIdx(size_t idxInBv);
    vector<BamWrap *> &GetBamArr() { return bam_arr_; } // flat record array
    // deferred: drop every record at the start of the next round
    void ClearAll();
    // total number of reads currently visible
    inline int64_t Size() { return legacy_size_ + pi_->Size(); }
    inline int ReadStat() { return pi_->read_stat_; }
    inline BamWrap *operator[](int64_t pos) { return bam_arr_[pos]; }
    // copy out the half-open range [startIdx, endIdx) of reads; returning the
    // prvalue directly lets RVO elide the copy (the original wrapped the
    // return in std::move, which disables that optimization)
    inline vector<BamWrap *> Slice(size_t startIdx, size_t endIdx) {
        if (endIdx > startIdx) {
            auto begItr = bam_arr_.begin();
            return vector<BamWrap *>(begItr + startIdx, begItr + endIdx);
        }
        return vector<BamWrap *>();
    }
    void FreeMemory() {
        buf1_.FreeMemory();
        buf2_.FreeMemory();
    }
    // synchronous read path
    int sync_read_bam();
    // asynchronous read path
    int async_read_bam();
    // reader thread entry point
    static void *async_read(void *data);
    // apply deferred clears and decide whether the buffers must swap
    void resize_buf();
    // rebuild the flat view: leftover reads from po_ first, then pi_'s reads
    inline void refresh_bam_arr() {
        bam_arr_.resize(this->Size());
        for (size_t i = 0; i < bam_arr_.size(); ++i) { // size_t avoids signed/unsigned mismatch
            if ((int64_t)i < legacy_size_)
                bam_arr_[i] = (*po_)[i];
            else
                bam_arr_[i] = (*pi_)[i - legacy_size_];
        }
    }
};
typedef AsyncIoBamBuf BamBufType;
typedef vector<BamWrap *> BamArray;

View File

@ -0,0 +1,369 @@
/*
Description: sam/bambuf
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2019/11/27
*/
#pragma once
#include <htslib/sam.h>
#include <limits.h>
#include <math.h>
#include <map>
#include <sstream>
#include <string>
#include <vector>
using namespace std;
/*
线
bam
*/
/*
* sam read
*/
struct BamWrap {
    // global position = (contig id << MAX_CONTIG_LEN_SHIFT) | in-contig position
    const static int MAX_CONTIG_LEN_SHIFT = 40; // how far the chromosome id is shifted before fusing with the position
    const static int READ_MAX_LENGTH = 200;
    const static int READ_MAX_DEPTH = 1000; // only used to pre-size buffers; larger depths are still handled
    // keep the members minimal to reduce the per-record memory footprint
    bam1_t *b;
    int64_t end_pos_; // global end position of the record, inclusive
    // global start position
    inline int64_t start_pos() { return bam_global_pos(b); }
    // global end position
    inline int64_t end_pos() { return end_pos_; }
    // length of the span aligned to the reference
    inline int16_t read_len() { return (end_pos_ - start_pos() + 1); }
    // start position within the contig
    inline int32_t contig_pos() { return b->core.pos; }
    // end position within the contig
    inline int32_t contig_end_pos() { return bam_pos(end_pos_); }
    // sequence length (number of A/C/G/T letters)
    inline int16_t seq_len() { return b->core.l_qseq; }
    // start position including a leading softclip
    inline int32_t softclip_start() {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        const char c = bam_cigar_opchr(cigar[0]);
        const int len = bam_cigar_oplen(cigar[0]);
        if (c == 'S')
            return bc.pos - len;
        return bc.pos;
    }
    // end position including a trailing softclip
    inline int32_t softclip_end() {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        const char c = bam_cigar_opchr(cigar[bc.n_cigar - 1]);
        const int len = bam_cigar_oplen(cigar[bc.n_cigar - 1]);
        if (c == 'S')
            return bam_pos(end_pos_) + len;
        return bam_pos(end_pos_);
    }
    // length of the trailing softclip (0 if the record ends with another op)
    inline int32_t right_softclip_len() {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        const char c = bam_cigar_opchr(cigar[bc.n_cigar - 1]);
        const int len = bam_cigar_oplen(cigar[bc.n_cigar - 1]);
        if (c == 'S')
            return len;
        return 0;
    }
    // decode the 4-bit packed sequence into an ACGT string
    inline std::string sequence() {
        ostringstream oss;
        char *seq = (char *)bam_get_seq(b);
        const bam1_core_t &bc = b->core;
        const char base_to_char[16] = {'N', 'A', 'C', 'N', 'G', 'N', 'N', 'N', 'T', 'N', 'N', 'N', 'N', 'N', 'N', 'N'};
        for (int i = 0; i < bc.l_qseq; ++i) {
            char base = base_to_char[bam_seqi(seq, i)];
            oss << base;
        }
        return std::move(oss.str());
    }
    // query (read) name
    inline const char *query_name() { return bam_get_qname(b); }
    // render the CIGAR as a string like "76M2S"
    inline string cigar_str() {
        ostringstream oss;
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            oss << len << c;
        }
        return std::move(oss.str());
    }
    // total memory footprint: wrapper + bam1_t + variable-length data
    inline int16_t length() { return sizeof(*this) + sizeof(bam1_t) + b->l_data; }
    // total length of insertion ('I') CIGAR ops
    inline int32_t insert_cigar_len() {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        int ret = 0;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'I')
                ret += len;
        }
        return ret;
    }
    // total length of deletion ('D') CIGAR ops
    inline int32_t del_cigar_len() {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        int ret = 0;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'D')
                ret += len;
        }
        return ret;
    }
    // compute the global (inclusive) end position of a record from its CIGAR
    static inline int64_t BamEndPos(const bam1_t *b) {
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        int start_offset = -1; // -1 so the result is the closed-interval end
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'D' || c == 'N' || c == 'M' || c == '=' || c == 'X')
                start_offset += len;
        }
        return (((int64_t)b->core.tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)(b->core.pos + start_offset));
    };
    // whether the template length (isize) of this pair can be trusted
    bool HasWellDefinedFragmentSize() {
        const bam1_core_t &bc = b->core;
        bool hasWellDefinedFragmentSize = true;
        if (bc.isize == 0 || !(bc.flag & BAM_FPAIRED) || ((bc.flag & BAM_FUNMAP) || (bc.flag & BAM_FMUNMAP)) ||
            ((bool)(bc.flag & BAM_FREVERSE) == (bool)(bc.flag & BAM_FMREVERSE))) {
            hasWellDefinedFragmentSize = false;
        } else if (bc.flag & BAM_FREVERSE) {
            hasWellDefinedFragmentSize = contig_end_pos() > bc.mpos ? true : false;
        } else {
            hasWellDefinedFragmentSize = bc.pos <= bc.mpos + bc.isize ? true : false;
        }
        return hasWellDefinedFragmentSize;
    }
    // compute the adapter boundary of the record (INT_MIN when undefined)
    int GetAdapterBoundary() {
        const bam1_core_t &bc = b->core;
        int adapterBoundary;
        if (!HasWellDefinedFragmentSize())
            adapterBoundary = INT_MIN;
        else if (bc.flag & BAM_FREVERSE)
            adapterBoundary = bc.mpos - 1;
        else
            adapterBoundary = bc.pos + abs(bc.isize); // differs from GATK 3.5, which adds 1 here (matches GATK 4.0)
        return adapterBoundary;
    }
    // length of the leading 'I' op (only if it precedes any aligned op)
    inline int GetHeadInsertLen() {
        int insLen = 0;
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'I') {
                insLen = len;
                break;
            } else if (c != 'H' && c != 'S')
                break;
        }
        return insLen;
    }
    // soft-clip-aware start position (handles H adjacent to S — if such
    // records exist — and treats a leading I like an S)
    inline int64_t GetSoftStart() {
        int64_t softStart = b->core.pos;
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'S' || c == 'I')
                softStart -= len;
            else if (c != 'H')
                break;
        }
        return softStart;
    }
    // unclipped start position (including hardclips)
    inline int64_t GetUnclippedStart() {
        int64_t start = b->core.pos;
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'S' || c == 'H')
                start -= len;
            else
                break;
        }
        return start;
    }
    // unclipped end position (including hardclips)
    inline int64_t GetUnclippedEnd() {
        int64_t end_pos = bam_endpos(b);
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = bc.n_cigar - 1; i >= 0; --i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            if (c == 'S' || c == 'H')
                end_pos += len;
            else
                break;
        }
        return end_pos - 1;
    }
    /* sum of the base quality scores */
    /** Calculates a score for the read which is the sum of scores over Q15. */
    inline int GetSumOfBaseQualities() {
        int score = 0;
        uint8_t *qual = bam_get_qual(b);
        for (int i = 0; i < b->core.l_qseq; ++i) {
            if (qual[i] >= 15)
                score += qual[i];
        }
        return score;
    }
    /* flag-related checks */
    /* the read is unmapped */
    inline bool GetReadUnmappedFlag() { return b->core.flag & BAM_FUNMAP; }
    /* Template having multiple segments in sequencing */
    inline bool GetReadPairedFlag() { return b->core.flag & BAM_FPAIRED; }
    /**
     * the read fails platform/vendor quality checks.
     */
    inline bool GetReadFailsVendorQualityCheckFlag() { return b->core.flag & BAM_FQCFAIL; }
    /**
     * the mate is unmapped.
     */
    bool GetMateUnmappedFlag() { return b->core.flag & BAM_FMUNMAP; }
    /**
     * @return whether the alignment is secondary (an alternative alignment of
     * the read).
     */
    bool IsSecondaryAlignment() { return b->core.flag & BAM_FSECONDARY; }
    /**
     * @return whether the alignment is supplementary (a split alignment such as
     * a chimeric alignment).
     */
    bool GetSupplementaryAlignmentFlag() { return b->core.flag & BAM_FSUPPLEMENTARY; }
    /*
     * Tests if this record is either a secondary and/or supplementary
     * alignment;
     */
    bool IsSecondaryOrSupplementary() { return IsSecondaryAlignment() || GetSupplementaryAlignmentFlag(); }
    /**
     * the read is the first read in a pair.
     */
    bool GetFirstOfPairFlag() { return b->core.flag & BAM_FREAD1; }
    /**
     * strand of the query (false for forward; true for reverse strand).
     */
    bool GetReadNegativeStrandFlag() { return b->core.flag & BAM_FREVERSE; }
    /**
     * strand of the mate (false for forward; true for reverse strand).
     */
    bool GetMateNegativeStrandFlag() { return b->core.flag & BAM_FMREVERSE; }
    /* miscellaneous information */
    // number of reference bases consumed by the alignment
    inline int GetReferenceLength() {
        int length = 0;
        const uint32_t *cigar = bam_get_cigar(b);
        const bam1_core_t &bc = b->core;
        for (int i = 0; i < bc.n_cigar; ++i) {
            const char c = bam_cigar_opchr(cigar[i]);
            const int len = bam_cigar_oplen(cigar[i]);
            switch (c) {
                case 'M':
                case 'D':
                case 'N':
                case '=':
                case 'X':
                    length += len;
                    break;
                default:
                    break;
            }
        }
        return length;
    }
    // global position of a record (chromosome id fused with alignment position)
    static inline int64_t bam_global_pos(bam1_t *b) {
        return (((int64_t)b->core.tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)b->core.pos);
    }
    static inline int64_t bam_global_pos(int tid, int pos) {
        return (((int64_t)tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)pos);
    }
    // chromosome id recovered from a global position
    static inline int32_t bam_tid(int64_t global_pos) {
        const int64_t mask = ~(((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1);
        const int64_t high_tid = global_pos & mask;
        return (int32_t)(high_tid >> MAX_CONTIG_LEN_SHIFT);
    }
    // in-contig alignment position recovered from a global position
    static inline int32_t bam_pos(int64_t global_pos) {
        const int64_t mask = ((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1;
        return (int32_t)(global_pos & mask);
    }
    // set or clear the PCR/optical-duplicate flag
    void SetDuplicateReadFlag(bool flag) { setFlag(flag, BAM_FDUP); }
    void setFlag(bool flag, int bit) {
        if (flag)
            this->b->core.flag |= bit;
        else
            this->b->core.flag &= ~bit;
    }
};
typedef std::map<const std::string, std::vector<BamWrap *>> SampleBamMap;

View File

@ -0,0 +1,83 @@
/*
Description: Murmur
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2023/11/6
*/
#include <random>
#include <string>
using std::string;
/**
 * Provides an implementation of the Murmur3_32 hash algorithm (operating on
 * the string's bytes as 16-bit "chars", Guava hashUnencodedChars style) that
 * has desirable properties in terms of randomness and uniformity of the
 * distribution of output values, which makes it useful for downsampling.
 *
 * Fixes vs the previous version:
 *  - mixK1/mixH1 now ROTATE left (by 15 / 13 bits) as the Murmur3_32
 *    reference requires; the old code used a plain left shift, discarding the
 *    high bits and weakening the hash.
 *  - all mixing arithmetic is done in unsigned ints, avoiding signed-integer
 *    overflow, which is undefined behavior in C++.
 *  - input bytes are widened through unsigned char so values >= 0x80 do not
 *    sign-extend into the 32-bit lane.
 */
struct Murmur3 {
    int seed_ = 0; // hash seed; randomized once at construction
    /** Hashes a character stream to an int using Murmur3. */
    int HashUnencodedChars(const std::string &input) {
        int h1 = this->seed_;
        // step through the CharSequence 2 chars at a time
        const int length = input.size();
        for (int i = 1; i < length; i += 2) {
            int k1 = (unsigned char)input.at(i - 1) | ((unsigned char)input.at(i) << 16);
            k1 = mixK1(k1);
            h1 = mixH1(h1, k1);
        }
        // deal with any remaining characters
        if ((length & 1) == 1) {
            int k1 = (unsigned char)input.at(length - 1);
            k1 = mixK1(k1);
            h1 ^= k1;
        }
        return fmix(h1, 2 * length);
    }
    static Murmur3 &Instance() {
        static Murmur3 instance;
        return instance;
    }
    // mix one 32-bit block: multiply, rotl 15, multiply
    static int mixK1(int k1) {
        unsigned int k = (unsigned int)k1;
        k *= 0xcc9e2d51u;
        k = (k << 15) | (k >> 17); // rotl(k, 15)
        k *= 0x1b873593u;
        return (int)k;
    }
    // fold a mixed block into the running hash: xor, rotl 13, scale
    static int mixH1(int h1, int k1) {
        unsigned int h = (unsigned int)h1 ^ (unsigned int)k1;
        h = (h << 13) | (h >> 19); // rotl(h, 13)
        h = h * 5u + 0xe6546b64u;
        return (int)h;
    }
    // Finalization mix - force all bits of a hash block to avalanche
    static int fmix(int h1, int length) {
        unsigned int h = (unsigned int)h1;
        h ^= (unsigned int)length;
        h ^= h >> 16;
        h *= 0x85ebca6bu;
        h ^= h >> 13;
        h *= 0xc2b2ae35u;
        h ^= h >> 16;
        return (int)h;
    }

private:
    Murmur3() {
        auto &&rd = std::random_device{};
        seed_ = rd();
    }
};

View File

@ -0,0 +1,70 @@
#include "profiling.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#ifdef SHOW_PERF
uint64_t tprof[LIM_THREAD_PROF_TYPE][LIM_THREAD] = {0};
uint64_t proc_freq = 1000;
uint64_t gprof[LIM_GLOBAL_PROF_TYPE] = {0};
#endif
// Wall-clock time in milliseconds since the Unix epoch.
// Uses pure integer arithmetic: the original computed
// 1000 * (sec + 1e-6 * usec) through a double, which is both slower and
// subject to floating-point rounding; sec * 1000 + usec / 1000 is exact.
uint64_t RealtimeMsec(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
}
// Reduce one per-thread counter row (a[0..len)) of milliseconds into
// max/min/avg seconds (divided by proc_freq).
// The outputs are always written: the original left *max/*min/*avg
// uninitialized when SHOW_PERF was off, and divided by zero when len == 0.
static int CalcThreadTime(uint64_t *a, int len, double *max, double *min, double *avg) {
    *max = *min = *avg = 0.0; // defined results even when profiling is disabled
#ifdef SHOW_PERF
    if (len <= 0)
        return 1; // nothing to aggregate; avoid division by zero
    int i = 0;
    uint64_t umax = 0, umin = UINT64_MAX, uavg = 0;
    for (i = 0; i < len; i++) {
        if (a[i] > umax)
            umax = a[i];
        if (a[i] < umin)
            umin = a[i];
        uavg += a[i];
    }
    *avg = uavg * 1.0 / len / proc_freq;
    *max = umax * 1.0 / proc_freq;
    *min = umin * 1.0 / proc_freq;
#endif
    return 1;
}
/* Print one global timer (gprof slot) in seconds. */
#define PRINT_GP(gpname) \
    fprintf(stderr, "time G %-15s: %0.2lfs\n", #gpname, gprof[GP_##gpname] * 1.0 / proc_freq);
/* Print avg/min/max of one per-thread timer (tprof row) across nthread threads. */
#define PRINT_TP(tpname, nthread) \
    { \
        double maxTime, minTime, avgTime; \
        CalcThreadTime(tprof[TP_##tpname], nthread, &maxTime, &minTime, &avgTime); \
        fprintf(stderr, "time T %-15s: avg %0.2lfs min %0.2lfs max %0.2lfs\n", #tpname, avgTime, minTime, maxTime); \
    }
/* Dump all profiling counters to stderr. No-op unless built with SHOW_PERF. */
int DisplayProfiling(int nthread) {
#ifdef SHOW_PERF
    fprintf(stderr, "\n");
    PRINT_GP(read_wait);
    PRINT_GP(gen_wait);
    PRINT_GP(sort_wait);
    PRINT_GP(markdup_wait);
    PRINT_GP(intersect_wait);
    PRINT_GP(read);
    PRINT_GP(gen);
    PRINT_GP(sort);
    PRINT_GP(markdup);
    PRINT_GP(intersect);
    PRINT_GP(merge_result);
    PRINT_TP(gen, nthread);
    PRINT_TP(sort, nthread);
    fprintf(stderr, "\n");
#endif
    return 0;
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
// #define SHOW_PERF
#ifdef __cplusplus
extern "C" {
#endif
/* Table sizes for the profiling counters. */
#define LIM_THREAD 128
#define LIM_THREAD_PROF_TYPE 128
#define LIM_GLOBAL_PROF_TYPE 128
#define LIM_THREAD_DATA_TYPE 128
#define LIM_GLOBAL_DATA_TYPE 128
#ifdef SHOW_PERF
/* proc_freq: ticks per second used to convert counters to seconds.
 * tprof[type][thread]: per-thread accumulated milliseconds.
 * gprof[type]: global accumulated milliseconds. */
extern uint64_t proc_freq;
extern uint64_t tprof[LIM_THREAD_PROF_TYPE][LIM_THREAD];
extern uint64_t gprof[LIM_GLOBAL_PROF_TYPE];
#endif
#ifdef SHOW_PERF
/* Scoped timing helpers: PROF_START(x) records the current time in a local,
 * PROF_END(acc, x) adds the elapsed milliseconds to the accumulator acc.
 * They compile to nothing when SHOW_PERF is off. */
#define PROF_START(tmp_time) uint64_t prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_START_AGAIN(tmp_time) prof_tmp_##tmp_time = RealtimeMsec()
#define PROF_END(result, tmp_time) result += RealtimeMsec() - prof_tmp_##tmp_time
#else
#define PROF_START(tmp_time)
#define PROF_END(result, tmp_time)
#endif
// GLOBAL
/* Global timer ids (indexes into gprof); GP_0..GP_10 are scratch slots. */
enum { GP_0 = 0, GP_1, GP_2, GP_3, GP_4, GP_5, GP_6, GP_7, GP_8, GP_9, GP_10 };
enum {
    GP_read_wait = 11,
    GP_gen_wait,
    GP_sort_wait,
    GP_markdup_wait,
    GP_intersect_wait,
    GP_read,
    GP_gen,
    GP_sort,
    GP_markdup,
    GP_intersect,
    GP_merge_result
};
// THREAD
/* Per-thread timer ids (first index into tprof); TP_0..TP_10 are scratch. */
enum { TP_0 = 0, TP_1, TP_2, TP_3, TP_4, TP_5, TP_6, TP_7, TP_8, TP_9, TP_10 };
enum { TP_gen = 11, TP_sort };
/* Wall-clock milliseconds since the epoch. */
uint64_t RealtimeMsec(void);
/* Dump all counters to stderr (no-op without SHOW_PERF). */
int DisplayProfiling(int);
#ifdef __cplusplus
}
#endif

View File

View File

View File

@ -0,0 +1,395 @@
/* yarn.c -- generic thread operations implemented using pthread functions
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
* For conditions of distribution and use, see copyright notice in yarn.h
*/
/* Basic thread operations implemented using the POSIX pthread library. All
pthread references are isolated within this module to allow alternate
implementations with other thread libraries. See yarn.h for the description
of these operations. */
/* Version history:
1.0 19 Oct 2008 First version
1.1 26 Oct 2008 No need to set the stack size -- remove
Add yarn_abort() function for clean-up on error exit
1.2 19 Dec 2011 (changes reversed in 1.3)
1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
Update thread portability #defines per IEEE 1003.1-2008
Fix documentation in yarn.h for yarn_prefix
1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr
Accept and do nothing for NULL argument to free_lock()
1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel()
Normalize the code style
1.6 3 Apr 2019 Add debugging information to fail() error messages
1.7 12 Apr 2020 Fix use after free bug in ignition()
*/
// For thread portability.
#define _XOPEN_SOURCE 700
#define _POSIX_C_SOURCE 200809L
#define _THREAD_SAFE
// Use large file functions if available.
#define _FILE_OFFSET_BITS 64
// External libraries and entities referenced.
#include <pthread.h> // pthread_t, pthread_create(), pthread_join(),
#include <stdio.h> // fprintf(), stderr
#include <stdlib.h> // exit(), malloc(), free(), NULL
// pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
// PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
// pthread_self(), pthread_equal(),
// pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
// pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
// pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
// pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy()
#include <errno.h> // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN
// Interface definition.
#include "yarn.h"
// Constants.
#define local static // for non-exported functions and globals
namespace yarn {
// Error handling external globals, resettable by application.
char *yarn_prefix = (char *)"yarn";
void (*yarn_abort)(int) = NULL;
// Immediately exit -- use for errors that shouldn't ever happen.
local void fail(int err, char const *file, long line, char const *func) {
fprintf(stderr, "%s: ", yarn_prefix);
switch (err) {
case EPERM:
fputs("already unlocked", stderr);
break;
case ESRCH:
fputs("no such thread", stderr);
break;
case EDEADLK:
fputs("resource deadlock", stderr);
break;
case ENOMEM:
fputs("out of memory", stderr);
break;
case EBUSY:
fputs("can't destroy locked resource", stderr);
break;
case EINVAL:
fputs("invalid request", stderr);
break;
case EAGAIN:
fputs("resource unavailable", stderr);
break;
default:
fprintf(stderr, "internal error %d", err);
}
fprintf(stderr, " (%s:%ld:%s)\n", file, line, func);
if (yarn_abort != NULL)
yarn_abort(err);
exit(err);
}
// Memory handling routines provided by user. If none are provided, malloc()
// and free() are used, which are therefore assumed to be thread-safe.
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
// Use user-supplied allocation routines instead of malloc() and free().
void yarn_mem(malloc_t lease, free_t vacate) {
my_malloc_f = lease;
my_free = vacate;
}
// Memory allocation that cannot fail (from the point of view of the caller).
local void *my_malloc(size_t size, char const *file, long line) {
void *block;
if ((block = my_malloc_f(size)) == NULL)
fail(ENOMEM, file, line, "malloc");
return block;
}
// -- Lock functions --
struct lock_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
long value;
};
lock_t *new_lock_(long initial, char const *file, long line) {
lock_t *bolt = (lock_t *)my_malloc(sizeof(struct lock_s), file, line);
int ret = pthread_mutex_init(&(bolt->mutex), NULL);
if (ret)
fail(ret, file, line, "mutex_init");
ret = pthread_cond_init(&(bolt->cond), NULL);
if (ret)
fail(ret, file, line, "cond_init");
bolt->value = initial;
return bolt;
}
void possess_(lock_t *bolt, char const *file, long line) {
int ret = pthread_mutex_lock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_lock");
}
void release_(lock_t *bolt, char const *file, long line) {
int ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
}
void twist_(lock_t *bolt, enum twist_op op, long val, char const *file, long line) {
if (op == TO)
bolt->value = val;
else if (op == BY)
bolt->value += val;
int ret = pthread_cond_broadcast(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_broadcast");
ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
}
#define until(a) while (!(a))
void wait_for_(lock_t *bolt, enum wait_op op, long val, char const *file, long line) {
switch (op) {
case TO_BE:
until(bolt->value == val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case NOT_TO_BE:
until(bolt->value != val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_MORE_THAN:
until(bolt->value > val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_LESS_THAN:
until(bolt->value < val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
}
}
long peek_lock(lock_t *bolt) { return bolt->value; }
void free_lock_(lock_t *bolt, char const *file, long line) {
if (bolt == NULL)
return;
int ret = pthread_cond_destroy(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_destroy");
ret = pthread_mutex_destroy(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_destroy");
my_free(bolt);
}
// -- Thread functions (uses the lock_t functions above) --
struct thread_s {
pthread_t id;
int done; // true if this thread has exited
thread *next; // for list of all launched threads
};
// List of threads launched but not joined, count of threads exited but not
// joined (incremented by ignition() just before exiting).
local lock_t threads_lock = {
PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
0 // number of threads exited but not joined
};
local thread *threads = NULL; // list of extant threads
// Structure in which to pass the probe and its payload to ignition().
struct capsule {
void (*probe)(void *);
void *payload;
char const *file;
long line;
};
// Mark the calling thread as done and alert join_all().
local void reenter(void *arg) {
struct capsule *capsule = (struct capsule *)arg;
// find this thread in the threads list by matching the thread id
pthread_t me = pthread_self();
possess_(&(threads_lock), capsule->file, capsule->line);
thread **prior = &(threads);
thread *match;
while ((match = *prior) != NULL) {
if (pthread_equal(match->id, me))
break;
prior = &(match->next);
}
if (match == NULL)
fail(ESRCH, capsule->file, capsule->line, "reenter lost");
// mark this thread as done and move it to the head of the list
match->done = 1;
if (threads != match) {
*prior = match->next;
match->next = threads;
threads = match;
}
// update the count of threads to be joined and alert join_all()
twist_(&(threads_lock), BY, +1, capsule->file, capsule->line);
// free the capsule resource, even if the thread is cancelled (though yarn
// doesn't use pthread_cancel() -- you never know)
my_free(capsule);
}
// All threads go through this routine. Just before a thread exits, it marks
// itself as done in the threads list and alerts join_all() so that the thread
// resources can be released. Use a cleanup stack so that the marking occurs
// even if the thread is cancelled.
local void *ignition(void *arg) {
struct capsule *capsule = (struct capsule *)arg;
// run reenter() before leaving
pthread_cleanup_push(reenter, arg);
// execute the requested function with argument
capsule->probe(capsule->payload);
// mark this thread as done, letting join_all() know, and free capsule
pthread_cleanup_pop(1);
// exit thread
return NULL;
}
// Not all POSIX implementations create threads as joinable by default, so that
// is made explicit here.
thread *launch_(void (*probe)(void *), void *payload, char const *file, long line) {
// construct the requested call and argument for the ignition() routine
// (allocated instead of automatic so that we're sure this will still be
// there when ignition() actually starts up -- ignition() will free this
// allocation)
struct capsule *capsule = (struct capsule *)my_malloc(sizeof(struct capsule), file, line);
capsule->probe = probe;
capsule->payload = payload;
capsule->file = file;
capsule->line = line;
// assure this thread is in the list before join_all() or ignition() looks
// for it
possess_(&(threads_lock), file, line);
// create the thread and call ignition() from that thread
thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line);
pthread_attr_t attr;
int ret = pthread_attr_init(&attr);
if (ret)
fail(ret, file, line, "attr_init");
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (ret)
fail(ret, file, line, "attr_setdetachstate");
ret = pthread_create(&(th->id), &attr, ignition, capsule);
if (ret)
fail(ret, file, line, "create");
ret = pthread_attr_destroy(&attr);
if (ret)
fail(ret, file, line, "attr_destroy");
// put the thread in the threads list for join_all()
th->done = 0;
th->next = threads;
threads = th;
release_(&(threads_lock), file, line);
return th;
}
// Wait for thread ally to exit, then unlink it from the global threads list
// and free the handle allocated by launch_(). A given thread must not be
// joined more than once.
void join_(thread *ally, char const *file, long line) {
    // wait for thread to exit and return its resources
    int ret = pthread_join(ally->id, NULL);
    if (ret)
        fail(ret, file, line, "join");
    // find the thread in the threads list
    possess_(&(threads_lock), file, line);
    thread **prior = &(threads);
    thread *match;
    while ((match = *prior) != NULL) {
        if (match == ally)
            break;
        prior = &(match->next);
    }
    // a thread we just joined must still be on the list -- otherwise misuse
    if (match == NULL)
        fail(ESRCH, file, line, "join lost");
    // remove thread from list and update exited count, free thread
    // (threads_lock.value counts threads that marked themselves done but are
    // not yet joined, so only decrement it if this thread had done so)
    if (match->done)
        threads_lock.value--;
    *prior = match->next;
    release_(&(threads_lock), file, line);
    my_free(ally);
}
// This implementation of join_all() only attempts to join threads that have
// announced that they have exited (see ignition()). When there are many
// threads, this is faster than waiting for some random thread to exit while a
// bunch of other threads have already exited.
// Returns the number of threads joined and freed.
int join_all_(char const *file, long line) {
    // grab the threads list and initialize the joined count
    int count = 0;
    possess_(&(threads_lock), file, line);
    // do until threads list is empty
    while (threads != NULL) {
        // wait until at least one thread has reentered
        // (threads_lock.value is the count of done-but-unjoined threads)
        wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line);
        // find the first thread marked done (should be at or near the top)
        thread **prior = &(threads);
        thread *match;
        while ((match = *prior) != NULL) {
            if (match->done)
                break;
            prior = &(match->next);
        }
        // the nonzero lock value promised a done thread -- otherwise misuse
        if (match == NULL)
            fail(ESRCH, file, line, "join_all lost");
        // join the thread (will be almost immediate), remove from the threads
        // list, update the reenter count, and free the thread
        int ret = pthread_join(match->id, NULL);
        if (ret)
            fail(ret, file, line, "join");
        threads_lock.value--;
        *prior = match->next;
        my_free(match);
        count++;
    }
    // let go of the threads list and return the number of threads joined
    release_(&(threads_lock), file, line);
    return count;
}
}; // namespace yarn

View File

@ -0,0 +1,150 @@
/* yarn.h -- generic interface for thread operations
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
*/
/*
This software is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
Mark Adler
madler@alumni.caltech.edu
*/
/* Basic thread operations
This interface isolates the local operating system implementation of threads
from the application in order to facilitate platform independent use of
threads. All of the implementation details are deliberately hidden.
Assuming adequate system resources and proper use, none of these functions
can fail. As a result, any errors encountered will cause an exit() to be
executed, or the execution of your own optionally-provided abort function.
These functions allow the simple launching and joining of threads, and the
locking of objects and synchronization of changes of objects. The latter is
implemented with a single lock_t type that contains an integer value. The
value can be ignored for simple exclusive access to an object, or the value
can be used to signal and wait for changes to an object.
-- Arguments --
thread *thread; identifier for launched thread, used by join
void probe(void *); pointer to function "probe", run when thread starts
void *payload; single argument passed to the probe function
lock_t *lock; a lock_t with a value -- used for exclusive access to
an object and to synchronize threads waiting for
changes to an object
long val; value to set lock_t, increment lock_t, or wait for
int n; number of threads joined
-- Thread functions --
thread = launch(probe, payload) - launch a thread -- exit via probe() return
join(thread) - join a thread and by joining end it, waiting for the thread
to exit if it hasn't already -- will free the resources allocated by
launch() (don't try to join the same thread more than once)
n = join_all() - join all threads launched by launch() that are not joined
yet and free the resources allocated by the launches, usually to clean
up when the thread processing is done -- join_all() returns an int with
the count of the number of threads joined (join_all() should only be
called from the main thread, and should only be called after any calls
of join() have completed)
-- Lock functions --
lock_t = new_lock(val) - create a new lock_t with initial value val (lock_t is
created in the released state)
possess(lock_t) - acquire exclusive possession of a lock_t, waiting if necessary
twist(lock_t, [TO | BY], val) - set lock_t to or increment lock_t by val, signal
all threads waiting on this lock_t and then release the lock_t -- must
possess the lock_t before calling (twist releases, so don't do a
release() after a twist() on the same lock_t)
wait_for(lock_t, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val)
- wait on lock_t value to be, not to be, be greater than, or be less than
val -- must possess the lock_t before calling, will possess the lock_t on
return but the lock_t is released while waiting to permit other threads
to use twist() to change the value and signal the change (so make sure
that the object is in a usable state when waiting)
release(lock_t) - release a possessed lock_t (do not try to release a lock_t that
the current thread does not possess)
val = peek_lock(lock_t) - return the value of the lock_t (assumes that lock_t is
already possessed, no possess or release is done by peek_lock())
free_lock(lock_t) - free the resources allocated by new_lock() (application
must assure that the lock_t is released before calling free_lock())
-- Memory allocation ---
yarn_mem(better_malloc, better_free) - set the memory allocation and free
routines for use by the yarn routines where the supplied routines have
the same interface and operation as malloc() and free(), and may be
provided in order to supply thread-safe memory allocation routines or
for any other reason -- by default malloc() and free() will be used
-- Error control --
yarn_prefix - a char pointer to a string that will be the prefix for any
error messages that these routines generate before exiting -- if not
changed by the application, "yarn" will be used
yarn_abort - an external function that will be executed when there is an
internal yarn error, due to out of memory or misuse -- this function
may exit to abort the application, or if it returns, the yarn error
handler will exit (set to NULL by default for no action)
*/
#pragma once
#include <stdlib.h>  // size_t, used by the allocator hook signatures
namespace yarn {
// -- Error control --
// Prefix for error messages printed before an internal yarn error exits.
extern char *yarn_prefix;
// Optional hook run on internal error; if it returns, yarn exits anyway.
extern void (*yarn_abort)(int);
// Install malloc()/free()-compatible replacement allocation routines.
void yarn_mem(void *(*)(size_t), void (*)(void *));
// -- Thread functions --
// Opaque thread handle; contents are private to the implementation.
typedef struct thread_s thread;
// The uppercase macros supply __FILE__/__LINE__ for error reporting; callers
// are expected to use the macros rather than the trailing-underscore functions.
thread *launch_(void (*)(void *), void *, char const *, long);
#define LAUNCH(a, b) launch_(a, b, __FILE__, __LINE__)
void join_(thread *, char const *, long);
#define JOIN(a) join_(a, __FILE__, __LINE__)
int join_all_(char const *, long);
#define JOIN_ALL() join_all_(__FILE__, __LINE__)
// -- Lock functions --
// Opaque lock with an associated long value; see usage notes above.
typedef struct lock_s lock_t;
lock_t *new_lock_(long, char const *, long);
#define NEW_LOCK(a) new_lock_(a, __FILE__, __LINE__)
void possess_(lock_t *, char const *, long);
#define POSSESS(a) possess_(a, __FILE__, __LINE__)
void release_(lock_t *, char const *, long);
// #define release(a) release_(a, __FILE__, __LINE__)
#define RELEASE(a) release_(a, __FILE__, __LINE__)
// TWIST: set (TO) or increment (BY) the value, signal waiters, then release.
enum twist_op { TO, BY };
void twist_(lock_t *, enum twist_op, long, char const *, long);
#define TWIST(a, b, c) twist_(a, b, c, __FILE__, __LINE__)
enum wait_op {
TO_BE,
/* or */ NOT_TO_BE, /* that is the question */
TO_BE_MORE_THAN,
TO_BE_LESS_THAN
};
// WAIT_FOR: wait until the value meets the relation; the lock must be
// possessed on entry, is released while waiting, and is possessed on return.
void wait_for_(lock_t *, enum wait_op, long, char const *, long);
#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__)
// PEEK_LOCK: read the value; caller must already possess the lock.
long peek_lock(lock_t *);
#define PEEK_LOCK(a) peek_lock(a)
void free_lock_(lock_t *, char const *, long);
#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__)
}; // namespace yarn