diff --git a/.vscode/settings.json b/.vscode/settings.json index 6c73617..c4e5c44 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,8 @@ "files.associations": { "cstring": "cpp", "vector": "cpp", - "random": "cpp" + "random": "cpp", + "ostream": "cpp", + "*.tcc": "cpp" } } \ No newline at end of file diff --git a/build.sh b/build.sh index 963fc94..e560501 100755 --- a/build.sh +++ b/build.sh @@ -3,6 +3,6 @@ dir="/home/zzh/work/GeneKit/picard_cpp/build" #[ -d "$dir" ] && rm -rf "$dir" #mkdir "$dir" cd "$dir" -cmake .. -DCMAKE_BUILD_TYPE=Debug -#cmake .. -DCMAKE_BUILD_TYPE=Release +#cmake .. -DCMAKE_BUILD_TYPE=Debug +cmake .. -DCMAKE_BUILD_TYPE=Release make -j 8 diff --git a/out.bam b/out.bam new file mode 100644 index 0000000..a776e8d Binary files /dev/null and b/out.bam differ diff --git a/run.sh b/run.sh index ede0c45..a43168d 100755 --- a/run.sh +++ b/run.sh @@ -1,8 +1,13 @@ /home/zzh/work/GeneKit/picard_cpp/build/bin/picard_cpp \ MarkDuplicates \ - --INPUT test.bam \ + --INPUT /mnt/d/data/zy_normal.bam \ --OUTPUT out.bam \ --num_threads 12 \ --max_mem 4G \ --verbosity DEBUG \ --asyncio true + + +# --INPUT /mnt/d/data/100w.bam \ +# --INPUT /mnt/d/data/zy_normal.bam \ + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5286867..b23d2cf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,8 @@ SET(EXECUTABLE_OUTPUT_PATH "${PROJECT_BINARY_DIR}/bin") # 源码目录 AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src MAIN_SRC) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/common COMMON) +AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/common/utils UTILS) +AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/common/hts HTS) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam SAM_SRC) AUX_SOURCE_DIRECTORY(${PROJECT_SOURCE_DIR}/src/sam/markdups SAM_MARKDUPS_SRC) @@ -19,12 +21,15 @@ LINK_DIRECTORIES("${PROJECT_SOURCE_DIR}/lib/htslib") set(PG_NAME "picard_cpp") # 为程序添加依赖关系 -ADD_EXECUTABLE(${PG_NAME} ${MAIN_SRC} ${COMMON} ${SAM_SRC} ${SAM_MARKDUPS_SRC}) +ADD_EXECUTABLE(${PG_NAME} ${MAIN_SRC} ${COMMON} ${UTILS} ${HTS} + ${SAM_SRC} ${SAM_MARKDUPS_SRC}) # 链接库 TARGET_LINK_LIBRARIES(${PG_NAME} libhts.a) # 检测系统是否包含必需的库 + +# pthread库 find_package(Threads REQUIRED) if(THREADS_HAVE_PTHREAD_ARG) set_property(TARGET ${PG_NAME} PROPERTY COMPILE_OPTIONS "-pthread") @@ -32,4 +37,32 @@ if(THREADS_HAVE_PTHREAD_ARG) endif() if(CMAKE_THREAD_LIBS_INIT) TARGET_LINK_LIBRARIES(${PG_NAME} "${CMAKE_THREAD_LIBS_INIT}") -endif() \ No newline at end of file +endif() + +# bzip2库 +find_package(BZip2 REQUIRED) +if(BZip2_FOUND) + INCLUDE_DIRECTORIES(${BZIP2_INCLUDE_DIR}) + TARGET_LINK_LIBRARIES(${PG_NAME} ${BZIP2_LIBRARIES}) +endif() + +#openmp +find_package(OpenMP) +if(OPENMP_FOUND) + set_target_properties(${PG_NAME} PROPERTIES COMPILE_OPTIONS ${OpenMP_CXX_FLAGS}) + target_link_libraries(${PG_NAME} ${OpenMP_CXX_FLAGS}) +endif() + +# 链接库 +TARGET_LINK_LIBRARIES(${PG_NAME} -lz) +TARGET_LINK_LIBRARIES(${PG_NAME} -lm) +TARGET_LINK_LIBRARIES(${PG_NAME} -llzma) +TARGET_LINK_LIBRARIES(${PG_NAME} -lbz2) +TARGET_LINK_LIBRARIES(${PG_NAME} -lcurl) + +# 安装文件夹设置 +INSTALL(TARGETS ${PG_NAME} + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION libstatic + ) \ No newline at end of file diff --git a/src/common/hts/bam_buf.cpp b/src/common/hts/bam_buf.cpp new file mode 100755 index 0000000..84404c4 --- /dev/null +++ b/src/common/hts/bam_buf.cpp @@ -0,0 +1,322 @@ +/* + Description: 读入sam/bam时,开辟一个大的buf,存放这些数据 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/27 +*/ + +#include "bam_buf.h" + +/* + * BamBuf类 + */ +// 读取数据直到读完,或者缓冲区满 +int BamBuf::ReadBam() +{ + int read_num = 0; + if (handle_last) + { // 处理上次读入的最后一个bam + if (has_enough_space()) + { // 必须调用,在边界处调整memffset + ++read_num; + append_one_bam(); + } + else + { + return read_num; // 还是没空间 + } + } + while (read_stat_ >= 0 && (read_stat_ = sam_read1(fp, hdr, bw->b)) >= 0) + { + bw->end_pos_ = BamWrap::BamEndPos(bw->b); + if (has_enough_space()) + { // 还有空间 + append_one_bam(); + ++read_num; // 放进缓存才算读取到 + } + else + { + break; + } + } + if (read_stat_ >= 0) + { + handle_last = true; + } + else + { + handle_last = false; + } + return read_num; +} + +// 初始化缓存 +void BamBuf::Init(samFile *fp, + sam_hdr_t *hdr, + int64_t mem_size) +{ + this->fp = fp; + this->hdr = hdr; + this->mem_size = mem_size; + this->mem = (uint8_t *)malloc(mem_size); + this->bw = (BamWrap *)malloc(sizeof(BamWrap)); + this->bw->b = bam_init1(); + if (bw == NULL || + this->mem == NULL || + this->bw->b == NULL) + { + fprintf(stderr, "allocate memory failed! Abort\n"); + exit(-1); + } +} + +void BamBuf::ClearBeforeIdx(size_t idxInBv) +{ + if (idxInBv < 1) + return; + int i = 0, j = idxInBv; + for (; j < bv.size(); ++i, ++j) + { + bv[i] = bv[j]; + } + bv.resize(i); + prepare_read(); +} + +void BamBuf::ClearAll() +{ + + bv.clear(); + prepare_read(); +} + +// 为下一次读取做准备, 计算一些边界条件 +inline void BamBuf::prepare_read() +{ + // 计算余留的下次计算可能用到的bam所占的位置 + if (bv.size() > 0) + { + BamWrap *bw = bv[0]; + legacy_start = (int64_t)bw - (int64_t)mem; + bw = bv.back(); + legacy_end = (int64_t)bw + bw->length() - (int64_t)mem; + } + else + { + legacy_start = legacy_end = 0; + mem_offset = 0; // 上次没剩下,那就从头存储 + } +} + +// 检查缓存是否还有空间 +inline bool BamBuf::has_enough_space() +{ + const uint32_t bam_len = bw->length(); + int64_t potential_end = mem_offset + bam_len; + if (legacy_start <= legacy_end) + legacy_start += mem_size; + if (potential_end >= legacy_start) + { + return false; + } + if (potential_end >= mem_size) + { + mem_offset = 0; + } + int64_t virtual_offset = mem_offset; + if (virtual_offset < legacy_end) + virtual_offset += mem_size; + potential_end = virtual_offset + bam_len; + return potential_end < legacy_start; +} + +// 处理一个读取后的bam +inline void BamBuf::append_one_bam() +{ + BamWrap *bwp = (BamWrap *)(mem + mem_offset); + *bwp = *bw; + bwp->b = (bam1_t *)((char *)bwp + sizeof(*bwp)); + bam1_t *bp = bwp->b; + *bp = *bw->b; + bp->data = (uint8_t *)((char *)bwp->b + sizeof(bam1_t)); + memcpy(bp->data, bw->b->data, bw->b->l_data); + // 更新下次存储的位置 + mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1)); + + // cout << "size: " << bv.size() << " " << buf_name << endl; + bv.push_back(bwp); +} + +// 处理上次读入的最后一个read +inline bool BamBuf::handle_last_read() +{ + if (handle_last) + { // 处理上次读入的最后一个bam + if (has_enough_space()) + { // 必须调用,在边界处调整memffset + append_one_bam(); + handle_last = false; + return true; + } + } + return false; +} + +/* + * AsyncIoBamBuf 类 + */ +// 初始化缓存 +void AsyncIoBamBuf::Init(samFile *fp, + sam_hdr_t *hdr, + int64_t mem_size) +{ + if (use_async_io_) + { + buf1_.Init(fp, hdr, mem_size >> 1); + buf2_.Init(fp, hdr, mem_size >> 1); + pi_ = &buf1_; + po_ = &buf2_; + tid_ = (pthread_t *)malloc(sizeof(pthread_t)); + } + else + { + buf1_.Init(fp, hdr, mem_size); + pi_ = &buf1_; + } +} + +// 读取数据 +int AsyncIoBamBuf::ReadBam() +{ + if (use_async_io_) + { + return async_read_bam(); + } + else + { + return sync_read_bam(); + } +} + +int AsyncIoBamBuf::sync_read_bam() +{ + int read_num = 0; + if (clear_all_) + { + clear_all_ = false; + pi_->ClearAll(); + } + else if (clear_before_idx_ > 0) + { + pi_->ClearBeforeIdx(clear_before_idx_); + clear_before_idx_ = 0; + } + read_num = pi_->ReadBam(); + refresh_bam_arr(); + return read_num; +} + +int AsyncIoBamBuf::async_read_bam() +{ + int read_num = 0; + if (first_read_) + { + read_num = pi_->ReadBam(); + first_read_ = false; + refresh_bam_arr(); + } + else + { + // join, 交换缓冲区指针 + pthread_join(*tid_, 0); + resize_buf(); + + if (need_read_) + { // 需要交换指针 + BamBuf *tmp = pi_; + pi_ = po_; + po_ = tmp; + } + read_num = last_read_num_; + refresh_bam_arr(); + } + // 异步读 + pthread_create(tid_, 0, async_read, this); + return read_num; +} + +void *AsyncIoBamBuf::async_read(void *data) +{ + AsyncIoBamBuf *ab = (AsyncIoBamBuf *)data; + if (ab->need_read_ && ab->ReadStat() >= 0) + { // 需要读取 + ab->last_read_num_ = ab->po_->ReadBam(); + } + else + { + ab->last_read_num_ = 0; + } + pthread_exit(0); +} + +// 为下一次读取做准备, 计算一些边界条件,延迟操作,因为此时可能po_对应的buf正在读取 +void AsyncIoBamBuf::ClearBeforeIdx(size_t idxInBv) +{ + clear_before_idx_ = idxInBv; +} + +// 清空上一次所有读入的数据,延迟操作,因为此时可能po_对应的buf正在读取 +void AsyncIoBamBuf::ClearAll() +{ + clear_all_ = true; +} + +inline void AsyncIoBamBuf::resize_buf() +{ + if (clear_all_) + { // 清理上一轮的数据 + clear_all_ = false; + po_->ClearBeforeIdx(legacy_size_); + pi_->ClearAll(); + if (pi_->handle_last_read()) // 上次读取有一个read没放入缓存 + { + last_read_num_ += 1; + legacy_size_ = pi_->Size(); // 应该只有一个read + need_read_ = true; + } + else // 没空间存放,则不交换指针,或者文件已经读取完毕 + { + legacy_size_ = 0; + need_read_ = false; + } + } + else if (clear_before_idx_ > 0) + { + if (clear_before_idx_ < legacy_size_) + { + po_->ClearBeforeIdx(clear_before_idx_); + legacy_size_ -= clear_before_idx_; + // 不需要交换指针,不需要读取 + need_read_ = false; + } + else + { + po_->ClearBeforeIdx(legacy_size_); + pi_->ClearBeforeIdx(clear_before_idx_ - legacy_size_); + if (pi_->handle_last_read()) // 上次读取有一个read没放入缓存 + { + last_read_num_ += 1; + legacy_size_ = pi_->Size(); // 应该只有一个read + need_read_ = true; + } + else // 没空间存放,则不交换指针,或者文件已经读取完毕 + { + legacy_size_ = 0; + need_read_ = false; + } + } + clear_before_idx_ = 0; + } +} \ No newline at end of file diff --git a/src/common/hts/bam_buf.h b/src/common/hts/bam_buf.h new file mode 100644 index 0000000..ae06f77 --- /dev/null +++ b/src/common/hts/bam_buf.h @@ -0,0 +1,169 @@ +/* + Description: 读入sam/bam时,开辟一个大的buf,存放这些数据 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/27 +*/ + +#ifndef BAM_BUF_H_ +#define BAM_BUF_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "interval.h" +#include "bam_wrap.h" + +using std::vector; +using namespace std; + +/* + * 存放读入的bam数据 + */ +struct BamBuf +{ + sam_hdr_t *hdr; // sam文件的header信息 + samFile *fp; // sam文件指针 + BamWrap *bw = nullptr; // 用来循环读入bam + uint8_t *mem = nullptr; // 用来存放bam的数据, 程序结束后自动释放,所以没在析构函数里释放 + int64_t mem_offset = 0; // 下一次要存放的位置 + int64_t mem_size; // 缓存大小 + int read_stat_ = 0; // 读取状态,是否读完 + vector bv; // 方便对bam数据的访问 + int64_t legacy_start = 0; // 没处理完的bam在mem中的起始位置, 闭区间 + int64_t legacy_end = 0; // 没处理完的bam在mem中的结束位置, 开区间 + bool handle_last = false; // 上次最后读入的bam是否需要处理 + + // 初始化缓存 + void Init(samFile *fp, + sam_hdr_t *hdr, + int64_t mem_size); + // 读取数据直到读完,或者缓冲区满 + int ReadBam(); + // 为下一次读取做准备, 计算一些边界条件 + void ClearBeforeIdx(size_t idxInBv); + // 清空上一次所有读入的数据 + void ClearAll(); + inline int64_t Size() { return bv.size(); } // 包含多少个read + inline int ReadStat() { return read_stat_; } // 文件的读取状态,是否可读(读取完全) + ~BamBuf() + { + if (this->mem != nullptr) + { + free(this->mem); + } + if (this->bw != nullptr) + { + bam_destroy1(bw->b); + free(bw); + } + } + void prepare_read(); + // 检查缓存是否还有空间 + bool has_enough_space(); + // 处理一个读取后的bam + void append_one_bam(); + // 处理上次读入的最后一个read + bool handle_last_read(); + + // 针对bv的操作 + inline BamWrap *operator[](int64_t pos) { return bv[pos]; } + inline void push_back(BamWrap *val) { bv.push_back(val); } + inline void clear() { bv.clear(); } + inline void resize(int64_t s) { bv.resize(s); } +}; + +/* + * io异步缓冲区 + */ +struct AsyncIoBamBuf +{ + BamBuf buf1_; + BamBuf buf2_; + BamBuf *pi_; // 当前用的buf + BamBuf *po_; // 后台在读取的buf + pthread_t *tid_ = NULL; + int64_t legacy_size_ = 0; // 上一轮运算之后,缓存中还剩余的上次读取的read数量 + bool first_read_ = true; + int last_read_num_ = 0; // 上一次读取了多少reads + bool need_read_ = true; + bool use_async_io_ = true; + int64_t clear_before_idx_ = 0; // 用户异步读取,下一轮读取之前清理掉clear_before_idx_之前的所有reads + bool clear_all_ = false; // 用于异步读取,下一轮读取之前清理掉之前的所有reads + + vector bam_arr_; // 用来访问buf中的bam + + AsyncIoBamBuf() {} + AsyncIoBamBuf(bool use_async) : use_async_io_(use_async) {} + // 析构 + ~AsyncIoBamBuf() + { + if (tid_ != NULL) + { + pthread_join(*tid_, 0); + free(tid_); + } + // 其他的内存就等程序结束自动释放 + // buf的析构函数会自动调用 + } + + // 初始化缓存 + void Init(samFile *fp, + sam_hdr_t *hdr, + int64_t mem_size); + + // 读取数据 + int ReadBam(); + // 为下一次读取做准备, 计算一些边界条件 + void ClearBeforeIdx(size_t idxInBv); + // 清空上一次所有读入的数据 + void ClearAll(); + // 包含的read数量 + inline int64_t Size() { return legacy_size_ + pi_->Size(); } + inline int ReadStat() { return pi_->read_stat_; } + inline BamWrap *operator[](int64_t pos) + { + return bam_arr_[pos]; + } + + // 同步读取 + int sync_read_bam(); + // 异步读取 + int async_read_bam(); + // 异步读取线程函数 + static void *async_read(void *data); + void resize_buf(); + inline void refresh_bam_arr() + { + if (this->Size() != bam_arr_.size()) + { + bam_arr_.resize(this->Size()); + for (int i = 0; i < bam_arr_.size(); ++i) + { + if (i < legacy_size_) + bam_arr_[i] = (*po_)[i]; + else + bam_arr_[i] = (*pi_)[i - legacy_size_]; + } + } + } +}; + +typedef AsyncIoBamBuf BamBufType; + +typedef vector BamArray; + +#endif diff --git a/src/common/hts/bam_wrap.h b/src/common/hts/bam_wrap.h new file mode 100644 index 0000000..4d61848 --- /dev/null +++ b/src/common/hts/bam_wrap.h @@ -0,0 +1,338 @@ +/* + Description: 读入sam/bam时,开辟一个大的buf,存放这些数据 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/27 +*/ + +#ifndef BAM_WRAP_H_ +#define BAM_WRAP_H_ +#include +#include +#include +#include + +#include +#include + +#include + +using namespace std; + +/* + 这里的成员函数命名有点混乱,特此说明,小写加下划线的函数命名,无论是静态函数,还是普通成员函数,更侧重说明 + 这是类似bam的一个属性,而大写加驼峰命名的函数,更侧重说明这是通过计算得出的。 +*/ +/* + * sam read的封装 + */ +struct BamWrap +{ + // 将contig左移后加上pos作为全局位置 + const static int MAX_CONTIG_LEN_SHIFT = 30; + const static int READ_MAX_LENGTH = 200; + const static int READ_MAX_DEPTH = 1000; // 这只是用来初始化空间用的,深度大于这个值也没关系 + + // 成员变量尽量少,减少占用内存空间 + bam1_t *b; + int64_t end_pos_; // bam的全局结束位置, 闭区间 + + // 全局开始位置 + inline int64_t start_pos() + { + return bam_global_pos(b); + } + // 全局结束位置 + inline int64_t end_pos() + { + return end_pos_; + } + // 和reference对应的序列长度 + inline int16_t read_len() + { + return (end_pos_ - start_pos() + 1); + } + + // 在contig内的开始位置 + inline int32_t contig_pos() + { + return b->core.pos; + } + // 在contig内部的结束位置 + inline int32_t contig_end_pos() + { + return bam_pos(end_pos_); + } + // 序列的长度(AGTC字母个数) + inline int16_t seq_len() + { + return b->core.l_qseq; + } + + // 算上开头的softclip + inline int32_t softclip_start() + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + const char c = bam_cigar_opchr(cigar[0]); + const int len = bam_cigar_oplen(cigar[0]); + if (c == 'S') + return bc.pos - len; + return bc.pos; + } + + // 算上结尾的softclip + inline int32_t softclip_end() + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + const char c = bam_cigar_opchr(cigar[bc.n_cigar - 1]); + const int len = bam_cigar_oplen(cigar[bc.n_cigar - 1]); + if (c == 'S') + return bam_pos(end_pos_) + len; + return bam_pos(end_pos_); + } + + // 算上结尾的softclip + inline int32_t right_softclip_len() + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + const char c = bam_cigar_opchr(cigar[bc.n_cigar - 1]); + const int len = bam_cigar_oplen(cigar[bc.n_cigar - 1]); + if (c == 'S') + return len; + return 0; + } + + // 获取序列 + inline std::string sequence() + { + ostringstream oss; + char *seq = (char *)bam_get_seq(b); + const bam1_core_t &bc = b->core; + const char base_to_char[16] = {'N', 'A', 'C', 'N', 'G', 'N', 'N', 'N', 'T', 'N', 'N', 'N', 'N', 'N', 'N', 'N'}; + for (int i = 0; i < bc.l_qseq; ++i) + { + char base = base_to_char[bam_seqi(seq, i)]; + oss << base; + } + return oss.str(); + } + + // 获取名字 + inline std::string query_name() + { + return bam_get_qname(b); + } + // 获取cigar 字符串 + inline string cigar_str() + { + ostringstream oss; + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + oss << len << c; + } + return oss.str(); + } + + // 占用的内存大小 + inline int16_t length() + { + return sizeof(*this) + + sizeof(bam1_t) + + b->l_data; + } + + // 获取cigar中insert的总长度 + inline int32_t insert_cigar_len() + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + int ret = 0; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'I') + ret += len; + } + return ret; + } + + // 获取cigar中delete的总长度 + inline int32_t del_cigar_len() + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + int ret = 0; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'D') + ret += len; + } + return ret; + } + + // 计算sam read的终点位置 + static inline int64_t BamEndPos(const bam1_t *b) + { + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + int start_offset = -1; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'D' || c == 'N' || c == 'M' || c == '=' || c == 'X') + start_offset += len; + } + return (((int64_t)b->core.tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)(b->core.pos + start_offset)); + }; + + bool HasWellDefinedFragmentSize() + { + const bam1_core_t &bc = b->core; + bool hasWellDefinedFragmentSize = true; + if (bc.isize == 0 || + !(bc.flag & BAM_FPAIRED) || + ((bc.flag & BAM_FUNMAP) || (bc.flag & BAM_FMUNMAP)) || + ((bool)(bc.flag & BAM_FREVERSE) == (bool)(bc.flag & BAM_FMREVERSE))) + { + hasWellDefinedFragmentSize = false; + } + else if (bc.flag & BAM_FREVERSE) + { + hasWellDefinedFragmentSize = contig_end_pos() > bc.mpos ? true : false; + } + else + { + hasWellDefinedFragmentSize = bc.pos <= bc.mpos + bc.isize ? true : false; + } + return hasWellDefinedFragmentSize; + } + + // 计算bam的adapterBoundary + int GetAdapterBoundary() + { + const bam1_core_t &bc = b->core; + int adapterBoundary; + if (!HasWellDefinedFragmentSize()) + adapterBoundary = INT_MIN; + else if (bc.flag & BAM_FREVERSE) + adapterBoundary = bc.mpos - 1; + else + adapterBoundary = bc.pos + abs(bc.isize); // GATK4.0 和 GATK3.5不一样,3.5的这里+1 + return adapterBoundary; + } + + // 获取开头的I的长度 + inline int GetHeadInsertLen() + { + int insLen = 0; + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'I') + { + insLen = len; + break; + } + else if (c != 'H' && c != 'S') + break; + } + return insLen; + } + + // 获取soft clip开始位置(能处理H和S相连的情况,有这种情况么?, 注意开头的I要当做S?) + inline int64_t GetSoftStart() + { + int64_t softStart = b->core.pos; + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'S' || c == 'I') + softStart -= len; + else if (c != 'H') + break; + } + return softStart; + } + + // 获取unclipped开始位置(包括hardclip) + inline int64_t GetUnclippedStart() + { + int64_t start = b->core.pos; + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + for (int i = 0; i < bc.n_cigar; ++i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'S' || c == 'H') + start -= len; + else + break; + } + return start; + } + + // 获取unclipped结束位置(包括hardclip) + inline int64_t GetUnclippedEnd() + { + int64_t end_pos = bam_endpos(b); + const uint32_t *cigar = bam_get_cigar(b); + const bam1_core_t &bc = b->core; + for (int i = bc.n_cigar; i >= 0; --i) + { + const char c = bam_cigar_opchr(cigar[i]); + const int len = bam_cigar_oplen(cigar[i]); + if (c == 'S' || c == 'H') + end_pos += len; + else + break; + } + return end_pos; + } + + // 计算bam的全局位置,算上染色体序号和比对位置 + static inline int64_t bam_global_pos(bam1_t *b) + { + return (((int64_t)b->core.tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)b->core.pos); + } + static inline int64_t bam_global_pos(int tid, int pos) + { + return (((int64_t)tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)pos); + } + // 根据全局位置获取bam的染色体序号 + static inline int32_t bam_tid(int64_t global_pos) + { + const int64_t mask = ~(((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1); + const int64_t high_tid = global_pos & mask; + return (int32_t)(high_tid >> MAX_CONTIG_LEN_SHIFT); + } + // 根据全局位置获取bam的比对位置(染色体内) + static inline int32_t bam_pos(int64_t global_pos) + { + const int64_t mask = ((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1; + return (int32_t)(global_pos & mask); + } +}; + +typedef std::map> SampleBamMap; + +#endif \ No newline at end of file diff --git a/src/common/hts/interval.cpp b/src/common/hts/interval.cpp new file mode 100755 index 0000000..acb5bc6 --- /dev/null +++ b/src/common/hts/interval.cpp @@ -0,0 +1,295 @@ +/* + Description: 处理intervals + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/24 +*/ + +#include "interval.h" + +#include +#include +#include +#include +#include + +#include + +#include "../utils/util.h" +#include "bam_wrap.h" + +using std::min; +using std::max; +using std::string; +using std::ifstream; +using std::stringstream; + +using namespace std; + +// 构造函数 +Interval::Interval() : Interval(0, 0) {} +Interval::Interval(int64_t l, int64_t r) : left(l), right(r) {} + +// 比较函数 +bool Interval::operator<(const Interval& other) { + if (left == other.left) { + return right < other.right; + } + return left < other.left; +} + +// 是否有重叠 +bool Interval::overlaps(const Interval &other) { + return left <= other.right && right >= other.left; +} + +// 两个interval的合并 +Interval& Interval::spanWith(const Interval &other) { + left = min(left, other.left); + right = max(right, other.right); + return *this; +} + +// 返回两个interval的交集,不改变当前interval +Interval Interval::intersect(const Interval &that) const { + Interval val; + val.left = max(left, that.left); + val.right = min(right, that.right); + return val; +} + + +/* + * 合并两个interval arr,取相交区域的交集, interval arr都是排序后的 + */ +void Interval::IntersectIntervals(const IntervalArr &a_arr, + const IntervalArr &b_arr, + IntervalArr *r_arr) { + if (a_arr.size() < 1 || b_arr.size() < 1) return; + int ai=0, bi=0; + const Interval *last, *cur; + if (a_arr[ai].left < b_arr[bi].left) last = &a_arr[ai++]; + else last = &b_arr[bi++]; + while (ai < a_arr.size() && bi < b_arr.size()) { + if (a_arr[ai].left < b_arr[bi].left) cur = &a_arr[ai++]; + else cur = &b_arr[bi++]; + if (last->right < cur->left) { + last = cur; continue; + } else if (last->right > cur->right) { + r_arr->push_back(*cur); + } else { + r_arr->push_back(Interval(cur->left, last->right)); + last = cur; + } + } + const IntervalArr *arrp; + int ii; + if (ai < a_arr.size()) { arrp = &a_arr; ii = ai;} + else { arrp = &b_arr; ii = bi; } + const IntervalArr &arr = *arrp; + while(ii < arr.size()) { + cur = &arr[ii++]; + if (last->right < cur->left) { + break; + } else if (last->right > cur->right) { + r_arr->push_back(*cur); + } else { + r_arr->push_back(Interval(cur->left, last->right)); + break; + } + } +} + +/* + * 合并两个interval arr,取并集 + */ +void Interval::UnionIntervals(const IntervalArr &a_arr, + const IntervalArr &b_arr, + IntervalArr *r_arr) { + Interval tmp; + const Interval *cur; + Interval *last; + int ai=0, bi=0; + if (a_arr.size() < 1) { *r_arr = b_arr; return; } + if (b_arr.size() < 1) { *r_arr = a_arr; return; } + r_arr->clear(); + + if (a_arr[ai].left < b_arr[bi].left) tmp = a_arr[ai++]; + else tmp = b_arr[bi++]; + last = &tmp; + while(ai < a_arr.size() && bi < b_arr.size()) { + if (a_arr[ai].left < b_arr[bi].left) cur = &a_arr[ai++]; + else cur = &b_arr[bi++]; + if (last->right < cur->left) { + r_arr->push_back(*last); + *last = *cur; + } else { + last->right = max(last->right, cur->right); + } + } + const IntervalArr *arrp; + int ii; + if (ai < a_arr.size()) { arrp = &a_arr; ii = ai; } + else { arrp = &b_arr; ii = bi; } + const IntervalArr &arr = *arrp; + + while(ii < arr.size()) { + cur = &arr[ii++]; + if (last->right < cur->left) { + r_arr->push_back(*last); + *last = *cur; + } else { + last->right = max(last->right, cur->right); + } + } + r_arr->push_back(*last); +} + +/* + * 将有read覆盖的区域和参数提供的interval文件中的区域做一个交集 + */ +int64_t Interval::MergeIntervals(const IntervalArr &n_arr, + const IntervalArr &t_arr, + IntervalArr &in_arr, + int64_t start_loc, // 闭区间 + int64_t *end_loc, // 开区间 + IntervalArr *r_arr) { + IntervalArr tmp_arr; + const int64_t end_loc_val = *end_loc; + if (in_arr.size() < 1) { // 如果输入的interval为空,则使用tumor normal覆盖的interval + UnionIntervals(n_arr, t_arr, &tmp_arr); + } else { + IntervalArr mid_arr; + UnionIntervals(n_arr, t_arr, &mid_arr); + IntersectIntervals(mid_arr, in_arr, &tmp_arr); + } + for(int i=tmp_arr.size()-1; i>=0; --i) { + if (tmp_arr[i].left >= end_loc_val) { + tmp_arr.pop_back(); // 删除该元素 + continue; + } + tmp_arr[i].right = min(tmp_arr[i].right, end_loc_val - 1); // end_loc是开区间 + break; + } + for (int i=0; ipush_back(Interval(start_loc, tmp_arr[i].right)); + } else { + r_arr->push_back(tmp_arr[i]); + } + } + + int next_i = 0; + while(next_i < in_arr.size() && in_arr[next_i].right < end_loc_val) ++next_i; + if (next_i < in_arr.size()) { + if (end_loc_val < in_arr[next_i].left) { + *end_loc = in_arr[next_i].left; // 更新本次处理的终点 + } else { + in_arr[next_i].left = end_loc_val; // 更新panel + } + int i=0, j=next_i; + for (; jsize(); ++i) { + locus_num += (*r_arr)[i].right - (*r_arr)[i].left + 1; + } + return locus_num; +} + +/* + * 读取interval文件 + */ +void Interval::ReadInterval(const string &interval_fn, + bam_hdr_t* header, + int interval_padding, + IntervalArr *r_arr) { + ifstream interval_fs(interval_fn); + string one_line; + IntervalArr intervals; + getline(interval_fs, one_line); + while (!interval_fs.eof()) { + if (one_line[0] == '@') { + getline(interval_fs, one_line); + continue; + } + stringstream ss_line(one_line); + string contig_name; + ss_line >> contig_name; + int itid = sam_hdr_name2tid(header, contig_name.c_str()); + if (itid < 0) Error("[%s] interval file has unknown contig name [%s]\n", __func__, contig_name.c_str()); + int64_t tid = (int64_t)itid; + tid <<= CONTIG_SHIFT; + int64_t start, stop; + ss_line >> start >> stop; + // interval文件是1-based,所以这里要减去1 + intervals.push_back(Interval(tid + start - 1, tid + stop -1)); + getline(interval_fs, one_line); + } + sort(intervals.begin(), intervals.end()); + if (intervals.size() > 0) { + Interval new_span(intervals[0].left-interval_padding, intervals[0].right+interval_padding); + for (int i=1; i new_span.right) { + r_arr->push_back(new_span); + new_span.left = intervals[i].left - interval_padding; + new_span.right = intervals[i].right + interval_padding; + } else { + new_span.right = max(new_span.right, intervals[i].right + interval_padding); + } + } + r_arr->push_back(new_span); + } + interval_fs.close(); +} + +/* + * 将interval相连的区域合并 + */ +void Interval::ShrinkInterval(IntervalArr *ivap) { + if (ivap->size() < 1) return; + IntervalArr &iva = *ivap; + IntervalArr tiva = iva; + iva.clear(); + Interval iv; + iv.left = tiva[0].left; + iv.right = tiva[0].right; + for (int i=1; itarget_len[tid]; + result.left = max(BamWrap::bam_global_pos(tid, 0), ext_left); + result.right = min(ext_right, contig_len - 1 + BamWrap::bam_global_pos(tid, 0)); + + return result; +} + diff --git a/src/common/hts/interval.h b/src/common/hts/interval.h new file mode 100755 index 0000000..167fae8 --- /dev/null +++ b/src/common/hts/interval.h @@ -0,0 +1,101 @@ +/* + Description: 处理intervals + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/24 +*/ + +#ifndef INTERVAL_H_ +#define INTERVAL_H_ + +#include +#include +#include +#include + +#include + +#include "bam_wrap.h" + +using namespace std; + +// 前向声明 +class Interval; +typedef std::vector IntervalArr; +/* + * 闭区间 + */ +struct Interval { + // const常量 + const static int CONTIG_SHIFT = 30; + + // 类变量 + int64_t left; + int64_t right; + + // 构造函数 + Interval(); + explicit Interval(int64_t l, int64_t r); + // 比较函数 + bool operator<(const Interval &other); + // 是否有重叠 + bool overlaps(const Interval &other); + // 两个interval的合并, 会改变当前interval + Interval& spanWith(const Interval &other); + // 返回两个interval的交集,不改变当前interval + Interval intersect(const Interval &that) const; + + // for debug + string toString() const { + ostringstream oss; + oss << BamWrap::bam_tid(left) + 1 << ":" + << BamWrap::bam_pos(left) + 1 << "-" + << BamWrap::bam_pos(right) + 1; + + return oss.str(); + } + /* + * 合并两个interval arr,取相交区域的交集, interval arr都是排序后的 + */ + static void IntersectIntervals(const IntervalArr &a_arr, + const IntervalArr &b_arr, + IntervalArr *r_arr); + /* + * 合并两个interval arr,相交的区域取并集 + */ + static void UnionIntervals(const IntervalArr &a_arr, + const IntervalArr &b_arr, + IntervalArr *r_arr); + /* + * 将有read覆盖的区域和参数提供的interval文件中的区域做一个交集 + */ + static int64_t MergeIntervals(const IntervalArr &n_arr, + const IntervalArr &t_arr, + IntervalArr &in_arr, // 会更改 + int64_t start_loc, // 闭区间 + int64_t *end_loc, // 开区间, 会更改 + IntervalArr *r_arr); + /* + * 读取interval文件 + */ + static void ReadInterval(const std::string &interval_fn, + bam_hdr_t* header, + int interval_padding, + IntervalArr *r_arr); + /* + * 将interval相连的区域合并 + */ + static void ShrinkInterval(IntervalArr *iva); + + /* + * 根据header信息,扩展interval + */ + static Interval ExpandInterval(int64_t start, int64_t end, int expandVal, bam_hdr_t* header); +}; + + +#endif + + diff --git a/src/common/hts/read_ends.h b/src/common/hts/read_ends.h new file mode 100644 index 0000000..985efd2 --- /dev/null +++ b/src/common/hts/read_ends.h @@ -0,0 +1,59 @@ +/* +Description: read ends结构体主要用来标记冗余,包含一些序列的测序过程中的物理信息等 + +Copyright : All right reserved by ICT + +Author : Zhang Zhonghai +Date : 2023/11/3 +*/ + +#ifndef READ_ENDS_H_ +#define READ_ENDS_H_ + +#include + +/* 包含了所有read ends信息,如picard里边的 ReadEndsForMarkDuplicates*/ +struct ReadEnds +{ + /* PhysicalLocationInt中的成员变量 */ + /** + * Small class that provides access to the physical location information about a cluster. + * All values should be defaulted to -1 if unavailable. Tile should only allow + * non-zero positive integers, x and y coordinates must be non-negative. + * This is different from PhysicalLocationShort in that the x and y positions are ints, not shorts + * thus, they do not overflow within a HiSeqX tile. + */ + int16_t tile = -1; + int32_t x = -1; + int32_t y = -1; + + /* ReadEnds中的成员变量 */ + /** Little struct-like class to hold read pair (and fragment) end data for duplicate marking. */ + static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5; + int16_t libraryId; + int8_t orientation; + int32_t read1ReferenceIndex = -1; + int32_t read1Coordinate = -1; + int32_t read2ReferenceIndex = -1; + int32_t read2Coordinate = -1; // This field is overloaded for flow based processing as the end coordinate of read 1. (paired reads not supported) + /* Additional information used to detect optical dupes */ + int16_t readGroup = -1; + /** For optical duplicate detection the orientation matters regard to 1st or 2nd end of a mate */ + int8_t orientationForOpticalDuplicates = -1; + /** A *transient* flag marking this read end as being an optical duplicate. */ + bool isOpticalDuplicate = false; + + /* ReadEndsForMarkDuplicates中的成员变量 */ + /** Little struct-like class to hold read pair (and fragment) end data for MarkDuplicatesWithMateCigar **/ + int16_t score = 0; + int64_t read1IndexInFile = -1; + int64_t read2IndexInFile = -1; + int64_t duplicateSetSize = -1; + + /* ReadEndsForMarkDuplicatesWithBarcodes中的成员变量 (好像用不到) */ + int32_t barcode = 0; // primary barcode for this read (and pair) + int32_t readOneBarcode = 0; // read one barcode, 0 if not present + int32_t readTwoBarcode = 0; // read two barcode, 0 if not present or not paired +}; + +#endif \ No newline at end of file diff --git a/src/common/global_arg.cpp b/src/common/utils/global_arg.cpp similarity index 100% rename from src/common/global_arg.cpp rename to src/common/utils/global_arg.cpp diff --git a/src/common/global_arg.h b/src/common/utils/global_arg.h similarity index 100% rename from src/common/global_arg.h rename to src/common/utils/global_arg.h diff --git a/src/common/utils/thpool.cpp b/src/common/utils/thpool.cpp new file mode 100644 index 0000000..407a6ad --- /dev/null +++ b/src/common/utils/thpool.cpp @@ -0,0 +1,554 @@ +/* ******************************** + * Author: Johan Hanssen Seferidis + * License: MIT + * Description: Library providing a threading pool where you can add + * work. For usage, check the thpool.h file or README.md + * + */ +/** @file thpool.h */ /* + * + ********************************/ + +#if defined(__APPLE__) +#include +#else +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200809L +#endif +#endif +#include +#include +#include +#include +#include +#include +#include +#if defined(__linux__) +#include +#endif + +#include "thpool.h" + +#ifdef THPOOL_DEBUG +#define THPOOL_DEBUG 1 +#else +#define THPOOL_DEBUG 0 +#endif + +#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) +#define err(str) fprintf(stderr, str) +#else +#define err(str) +#endif + +static volatile int threads_keepalive; +static volatile int threads_on_hold; + +/* ========================== STRUCTURES ============================ */ + +/* Binary semaphore */ +typedef struct bsem +{ + pthread_mutex_t mutex; + pthread_cond_t cond; + int v; +} bsem; + +/* Job */ +typedef struct job +{ + struct job *prev; /* pointer to previous job */ + void (*function)(void *arg); /* function pointer */ + void *arg; /* function's argument */ +} job; + +/* Job queue */ +typedef struct jobqueue +{ + pthread_mutex_t rwmutex; /* used for queue r/w access */ + job *front; /* pointer to front of queue */ + job *rear; /* pointer to rear of queue */ + bsem *has_jobs; /* flag as binary semaphore */ + int len; /* number of jobs in queue */ +} jobqueue_t; + +/* Thread */ +typedef struct thread +{ + int id; /* friendly id */ + pthread_t pthread; /* pointer to actual thread */ + struct thpool_ *thpool_p; /* access to thpool */ +} thread; + +/* Threadpool */ +typedef struct thpool_ +{ + thread **threads; /* pointer to threads */ + volatile int num_threads_alive; /* threads currently alive */ + volatile int num_threads_working; /* threads currently working */ + pthread_mutex_t thcount_lock; /* used for thread count etc */ + pthread_cond_t threads_all_idle; /* signal to thpool_wait */ + jobqueue_t jobqueue; /* job queue */ +} thpool_; + +/* ========================== PROTOTYPES ============================ */ + +static int thread_init(thpool_ *thpool_p, struct thread **thread_p, int id); +static void *thread_do(struct thread *thread_p); +static void thread_hold(int sig_id); +static void thread_destroy(struct thread *thread_p); + +static int jobqueue_init(jobqueue_t *jobqueue_p); +static void jobqueue_clear(jobqueue_t *jobqueue_p); +static void jobqueue_push(jobqueue_t *jobqueue_p, struct job *newjob_p); +static struct job *jobqueue_pull(jobqueue_t *jobqueue_p); +static void jobqueue_destroy(jobqueue_t *jobqueue_p); + +static void bsem_init(struct bsem *bsem_p, int value); +static void bsem_reset(struct bsem *bsem_p); +static void bsem_post(struct bsem *bsem_p); +static void bsem_post_all(struct bsem *bsem_p); +static void bsem_wait(struct bsem *bsem_p); + +/* ========================== THREADPOOL ============================ */ + +/* Initialise thread pool */ +struct thpool_ *thpool_init(int num_threads) +{ + + threads_on_hold = 0; + threads_keepalive = 1; + + if (num_threads < 0) + { + num_threads = 0; + } + + /* Make new thread pool */ + thpool_ *thpool_p; + thpool_p = (struct thpool_ *)malloc(sizeof(struct thpool_)); + if (thpool_p == NULL) + { + err("thpool_init(): Could not allocate memory for thread pool\n"); + return NULL; + } + thpool_p->num_threads_alive = 0; + thpool_p->num_threads_working = 0; + + /* Initialise the job queue */ + if (jobqueue_init(&thpool_p->jobqueue) == -1) + { + err("thpool_init(): Could not allocate memory for job queue\n"); + free(thpool_p); + return NULL; + } + + /* Make threads in pool */ + thpool_p->threads = (struct thread **)malloc(num_threads * sizeof(struct thread *)); + if (thpool_p->threads == NULL) + { + err("thpool_init(): Could not allocate memory for threads\n"); + jobqueue_destroy(&thpool_p->jobqueue); + free(thpool_p); + return NULL; + } + + pthread_mutex_init(&(thpool_p->thcount_lock), NULL); + pthread_cond_init(&thpool_p->threads_all_idle, NULL); + + /* Thread init */ + int n; + for (n = 0; n < num_threads; n++) + { + thread_init(thpool_p, &thpool_p->threads[n], n); +#if THPOOL_DEBUG + printf("THPOOL_DEBUG: Created thread %d in pool \n", n); +#endif + } + + /* Wait for threads to initialize */ + while (thpool_p->num_threads_alive != num_threads) + { + } + + return thpool_p; +} + +/* Add work to the thread pool */ +int thpool_add_work(thpool_ *thpool_p, void (*function_p)(void *), void *arg_p) +{ + job *newjob; + + newjob = (struct job *)malloc(sizeof(struct job)); + if (newjob == NULL) + { + err("thpool_add_work(): Could not allocate memory for new job\n"); + return -1; + } + + /* add function and argument */ + newjob->function = function_p; + newjob->arg = arg_p; + + /* add job to queue */ + jobqueue_push(&thpool_p->jobqueue, newjob); + + return 0; +} + +/* Wait until all jobs have finished */ +void thpool_wait(thpool_ *thpool_p) +{ + pthread_mutex_lock(&thpool_p->thcount_lock); + while (thpool_p->jobqueue.len || thpool_p->num_threads_working) + { + pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); + } + pthread_mutex_unlock(&thpool_p->thcount_lock); +} + +/* Destroy the threadpool */ +void thpool_destroy(thpool_ *thpool_p) +{ + /* No need to destroy if it's NULL */ + if (thpool_p == NULL) + return; + + volatile int threads_total = thpool_p->num_threads_alive; + + /* End each thread 's infinite loop */ + threads_keepalive = 0; + + /* Give one second to kill idle threads */ + double TIMEOUT = 1.0; + time_t start, end; + double tpassed = 0.0; + time(&start); + while (tpassed < TIMEOUT && thpool_p->num_threads_alive) + { + bsem_post_all(thpool_p->jobqueue.has_jobs); + time(&end); + tpassed = difftime(end, start); + } + + /* Poll remaining threads */ + while (thpool_p->num_threads_alive) + { + bsem_post_all(thpool_p->jobqueue.has_jobs); + sleep(1); + } + + /* Job queue cleanup */ + jobqueue_destroy(&thpool_p->jobqueue); + /* Deallocs */ + int n; + for (n = 0; n < threads_total; n++) + { + thread_destroy(thpool_p->threads[n]); + } + free(thpool_p->threads); + free(thpool_p); +} + +/* Pause all threads in threadpool */ +void thpool_pause(thpool_ *thpool_p) +{ + int n; + for (n = 0; n < thpool_p->num_threads_alive; n++) + { + pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); + } +} + +/* Resume all threads in threadpool */ +void thpool_resume(thpool_ *thpool_p) +{ + // resuming a single threadpool hasn't been + // implemented yet, meanwhile this suppresses + // the warnings + (void)thpool_p; + + threads_on_hold = 0; +} + +int thpool_num_threads_working(thpool_ *thpool_p) +{ + return thpool_p->num_threads_working; +} + +/* ============================ THREAD ============================== */ + +/* Initialize a thread in the thread pool + * + * @param thread address to the pointer of the thread to be created + * @param id id to be given to the thread + * @return 0 on success, -1 otherwise. + */ +static int thread_init(thpool_ *thpool_p, struct thread **thread_p, int id) +{ + + *thread_p = (struct thread *)malloc(sizeof(struct thread)); + if (*thread_p == NULL) + { + err("thread_init(): Could not allocate memory for thread\n"); + return -1; + } + + (*thread_p)->thpool_p = thpool_p; + (*thread_p)->id = id; + + pthread_create(&(*thread_p)->pthread, NULL, (void *(*)(void *))thread_do, (*thread_p)); + pthread_detach((*thread_p)->pthread); + return 0; +} + +/* Sets the calling thread on hold */ +static void thread_hold(int sig_id) +{ + (void)sig_id; + threads_on_hold = 1; + while (threads_on_hold) + { + sleep(1); + } +} + +/* What each thread is doing + * + * In principle this is an endless loop. The only time this loop gets interuppted is once + * thpool_destroy() is invoked or the program exits. + * + * @param thread thread that will run this function + * @return nothing + */ +static void *thread_do(struct thread *thread_p) +{ + + /* Set thread name for profiling and debugging */ + char thread_name[16] = {0}; + snprintf(thread_name, 16, "thpool-%d", thread_p->id); + +#if defined(__linux__) + /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ + prctl(PR_SET_NAME, thread_name); +#elif defined(__APPLE__) && defined(__MACH__) + pthread_setname_np(thread_name); +#else + err("thread_do(): pthread_setname_np is not supported on this system"); +#endif + + /* Assure all threads have been created before starting serving */ + thpool_ *thpool_p = thread_p->thpool_p; + + /* Register signal handler */ + struct sigaction act; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = thread_hold; + if (sigaction(SIGUSR1, &act, NULL) == -1) + { + err("thread_do(): cannot handle SIGUSR1"); + } + + /* Mark thread as alive (initialized) */ + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive += 1; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + while (threads_keepalive) + { + + bsem_wait(thpool_p->jobqueue.has_jobs); + + if (threads_keepalive) + { + + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_working++; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + /* Read job from queue and execute it */ + void (*func_buff)(void *); + void *arg_buff; + job *job_p = jobqueue_pull(&thpool_p->jobqueue); + if (job_p) + { + func_buff = job_p->function; + arg_buff = job_p->arg; + func_buff(arg_buff); + free(job_p); + } + + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_working--; + if (!thpool_p->num_threads_working) + { + pthread_cond_signal(&thpool_p->threads_all_idle); + } + pthread_mutex_unlock(&thpool_p->thcount_lock); + } + } + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive--; + pthread_mutex_unlock(&thpool_p->thcount_lock); + + return NULL; +} + +/* Frees a thread */ +static void thread_destroy(thread *thread_p) +{ + free(thread_p); +} + +/* ============================ JOB QUEUE =========================== */ + +/* Initialize queue */ +static int jobqueue_init(jobqueue_t *jobqueue_p) +{ + jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + + jobqueue_p->has_jobs = (struct bsem *)malloc(sizeof(struct bsem)); + if (jobqueue_p->has_jobs == NULL) + { + return -1; + } + + pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); + bsem_init(jobqueue_p->has_jobs, 0); + + return 0; +} + +/* Clear the queue */ +static void jobqueue_clear(jobqueue_t *jobqueue_p) +{ + + while (jobqueue_p->len) + { + free(jobqueue_pull(jobqueue_p)); + } + + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + bsem_reset(jobqueue_p->has_jobs); + jobqueue_p->len = 0; +} + +/* Add (allocated) job to queue + */ +static void jobqueue_push(jobqueue_t *jobqueue_p, struct job *newjob) +{ + + pthread_mutex_lock(&jobqueue_p->rwmutex); + newjob->prev = NULL; + + switch (jobqueue_p->len) + { + + case 0: /* if no jobs in queue */ + jobqueue_p->front = newjob; + jobqueue_p->rear = newjob; + break; + + default: /* if jobs in queue */ + jobqueue_p->rear->prev = newjob; + jobqueue_p->rear = newjob; + } + jobqueue_p->len++; + + bsem_post(jobqueue_p->has_jobs); + pthread_mutex_unlock(&jobqueue_p->rwmutex); +} + +/* Get first job from queue(removes it from queue) + * Notice: Caller MUST hold a mutex + */ +static struct job *jobqueue_pull(jobqueue_t *jobqueue_p) +{ + + pthread_mutex_lock(&jobqueue_p->rwmutex); + job *job_p = jobqueue_p->front; + + switch (jobqueue_p->len) + { + + case 0: /* if no jobs in queue */ + break; + + case 1: /* if one job in queue */ + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + jobqueue_p->len = 0; + break; + + default: /* if >1 jobs in queue */ + jobqueue_p->front = job_p->prev; + jobqueue_p->len--; + /* more than one job in queue -> post it */ + bsem_post(jobqueue_p->has_jobs); + } + + pthread_mutex_unlock(&jobqueue_p->rwmutex); + return job_p; +} + +/* Free all queue resources back to the system */ +static void jobqueue_destroy(jobqueue_t *jobqueue_p) +{ + jobqueue_clear(jobqueue_p); + free(jobqueue_p->has_jobs); +} + +/* ======================== SYNCHRONISATION ========================= */ + +/* Init semaphore to 1 or 0 */ +static void bsem_init(bsem *bsem_p, int value) +{ + if (value < 0 || value > 1) + { + err("bsem_init(): Binary semaphore can take only values 1 or 0"); + exit(1); + } + pthread_mutex_init(&(bsem_p->mutex), NULL); + pthread_cond_init(&(bsem_p->cond), NULL); + bsem_p->v = value; +} + +/* Reset semaphore to 0 */ +static void bsem_reset(bsem *bsem_p) +{ + bsem_init(bsem_p, 0); +} + +/* Post to at least one thread */ +static void bsem_post(bsem *bsem_p) +{ + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_signal(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); +} + +/* Post to all threads */ +static void bsem_post_all(bsem *bsem_p) +{ + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_broadcast(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); +} + +/* Wait on semaphore until semaphore has value 0 */ +static void bsem_wait(bsem *bsem_p) +{ + pthread_mutex_lock(&bsem_p->mutex); + while (bsem_p->v != 1) + { + pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); + } + bsem_p->v = 0; + pthread_mutex_unlock(&bsem_p->mutex); +} \ No newline at end of file diff --git a/src/common/utils/thpool.h b/src/common/utils/thpool.h new file mode 100644 index 0000000..8581c1a --- /dev/null +++ b/src/common/utils/thpool.h @@ -0,0 +1,179 @@ +/********************************** + * @author Johan Hanssen Seferidis + * License: MIT + * + **********************************/ + +#ifndef _THPOOL_ +#define _THPOOL_ + +#ifdef __cplusplus +extern "C" +{ +#endif + + /* =================================== API ======================================= */ + + typedef struct thpool_ *threadpool; + + /** + * @brief Initialize threadpool + * + * Initializes a threadpool. This function will not return until all + * threads have initialized successfully. + * + * @example + * + * .. + * threadpool thpool; //First we declare a threadpool + * thpool = thpool_init(4); //then we initialize it to 4 threads + * .. + * + * @param num_threads number of threads to be created in the threadpool + * @return threadpool created threadpool on success, + * NULL on error + */ + threadpool thpool_init(int num_threads); + + /** + * @brief Add work to the job queue + * + * Takes an action and its argument and adds it to the threadpool's job queue. + * If you want to add to work a function with more than one arguments then + * a way to implement this is by passing a pointer to a structure. + * + * NOTICE: You have to cast both the function and argument to not get warnings. + * + * @example + * + * void print_num(int num){ + * printf("%d\n", num); + * } + * + * int main() { + * .. + * int a = 10; + * thpool_add_work(thpool, (void*)print_num, (void*)a); + * .. + * } + * + * @param threadpool threadpool to which the work will be added + * @param function_p pointer to function to add as work + * @param arg_p pointer to an argument + * @return 0 on success, -1 otherwise. + */ + int thpool_add_work(threadpool, void (*function_p)(void *), void *arg_p); + + /** + * @brief Wait for all queued jobs to finish + * + * Will wait for all jobs - both queued and currently running to finish. + * Once the queue is empty and all work has completed, the calling thread + * (probably the main program) will continue. + * + * Smart polling is used in wait. The polling is initially 0 - meaning that + * there is virtually no polling at all. If after 1 seconds the threads + * haven't finished, the polling interval starts growing exponentially + * until it reaches max_secs seconds. Then it jumps down to a maximum polling + * interval assuming that heavy processing is being used in the threadpool. + * + * @example + * + * .. + * threadpool thpool = thpool_init(4); + * .. + * // Add a bunch of work + * .. + * thpool_wait(thpool); + * puts("All added work has finished"); + * .. + * + * @param threadpool the threadpool to wait for + * @return nothing + */ + void thpool_wait(threadpool); + + /** + * @brief Pauses all threads immediately + * + * The threads will be paused no matter if they are idle or working. + * The threads return to their previous states once thpool_resume + * is called. + * + * While the thread is being paused, new work can be added. + * + * @example + * + * threadpool thpool = thpool_init(4); + * thpool_pause(thpool); + * .. + * // Add a bunch of work + * .. + * thpool_resume(thpool); // Let the threads start their magic + * + * @param threadpool the threadpool where the threads should be paused + * @return nothing + */ + void thpool_pause(threadpool); + + /** + * @brief Unpauses all threads if they are paused + * + * @example + * .. + * thpool_pause(thpool); + * sleep(10); // Delay execution 10 seconds + * thpool_resume(thpool); + * .. + * + * @param threadpool the threadpool where the threads should be unpaused + * @return nothing + */ + void thpool_resume(threadpool); + + /** + * @brief Destroy the threadpool + * + * This will wait for the currently active threads to finish and then 'kill' + * the whole threadpool to free up memory. + * + * @example + * int main() { + * threadpool thpool1 = thpool_init(2); + * threadpool thpool2 = thpool_init(2); + * .. + * thpool_destroy(thpool1); + * .. + * return 0; + * } + * + * @param threadpool the threadpool to destroy + * @return nothing + */ + void thpool_destroy(threadpool); + + /** + * @brief Show currently working threads + * + * Working threads are the threads that are performing work (not idle). + * + * @example + * int main() { + * threadpool thpool1 = thpool_init(2); + * threadpool thpool2 = thpool_init(2); + * .. + * printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); + * .. + * return 0; + * } + * + * @param threadpool the threadpool of interest + * @return integer number of threads working + */ + int thpool_num_threads_working(threadpool); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/common/utils/timer.cpp b/src/common/utils/timer.cpp new file mode 100755 index 0000000..99c2677 --- /dev/null +++ b/src/common/utils/timer.cpp @@ -0,0 +1,78 @@ +/* + Description: 用来统计程序段执行的时间,测试用 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/01/30 +*/ + +#include "timer.h" + +#include +#include +#include +#include + +/* + * class Timer implementation + */ +double Timer::mseconds_elapsed() { + return (get_mseconds() -start_); +} + +double Timer::seconds_elapsed() { + return (mseconds_elapsed() / 1000); +} + +void Timer::reinit() { + start_ = get_mseconds(); +} + +void Timer::reinit_acc() { + acc_count_ = 0.0; +} + +void Timer::acc_start() { + acc_start_ = get_mseconds(); +} + +void Timer::acc_end() { + acc_count_ += get_mseconds() - acc_start_; +} + +double Timer::acc_mseconds_elapsed() { + return acc_count_; +} + +double Timer::acc_seconds_elapsed() { + return acc_count_ / 1000; +} + +void Timer::log_time(const char *desc) { + printf("[%s] ", desc); + print_current_time(); +} + +void Timer::print_current_time() { + time_t time_val; + struct tm *at; + char now[80]; + time(&time_val); + at = localtime(&time_val); + strftime(now, 79, "%Y-%m-%d %H:%M:%S", at); + puts(now); +} + +double Timer::get_mseconds() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (double) 1000 * (tv.tv_sec + ((1e-6) * tv.tv_usec)); +} + +double Timer::get_seconds() { + return 1000 * get_mseconds(); +} + + +/********** end of Timer implementation *************/ diff --git a/src/common/utils/timer.h b/src/common/utils/timer.h new file mode 100755 index 0000000..88bdcde --- /dev/null +++ b/src/common/utils/timer.h @@ -0,0 +1,39 @@ +/* + Description: 用来统计程序段执行的时间,测试用 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/01/30 +*/ + +#ifndef TIMER_H_ +#define TIMER_H_ + +/* +* @brief: Record the run time of this program +*/ +class Timer { +public: + Timer() { reinit(); reinit_acc();} + double mseconds_elapsed(); + double seconds_elapsed(); + void reinit(); // restart time count + void reinit_acc(); + void acc_start(); + void acc_end(); + double acc_mseconds_elapsed(); + double acc_seconds_elapsed(); + + static void log_time(const char *desc); + static void print_current_time(); + static double get_mseconds(); + static double get_seconds(); + +private: + double start_; + double acc_start_; // used for accumulate time count + double acc_count_; +}; + +#endif diff --git a/src/common/utils/util.cpp b/src/common/utils/util.cpp new file mode 100755 index 0000000..6ffebe6 --- /dev/null +++ b/src/common/utils/util.cpp @@ -0,0 +1,89 @@ +/* + Description: 全局用到的工具 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/26 +*/ + +#include "util.h" + +#include +#include +#include +#include +#include +#include + +using std::vector; +using std::string; +using std::ostringstream; +using std::for_each; + +#define PRINT_INFO(info_type) \ + do \ + { \ + fprintf(stderr, info_type); \ + va_list ap; \ + va_start(ap, format); \ + vfprintf(stderr, format, ap); \ + va_end(ap); \ + } while (0) + +/* + * 打印错误信息 + */ +void Error(const char *format, ...) { + PRINT_INFO("[Error]: "); +} + +/* 警告信息 */ +void Warn(const char *format, ...) +{ + PRINT_INFO("[ Warn]: "); +} + +/* 普通信息 */ +void Info(const char *format, ...) +{ + PRINT_INFO("[ Info]: "); +} + +/* 调试信息 */ +void Debug(const char *format, ...) +{ + PRINT_INFO("[Debug]: "); +} + +/* + * string util类 + */ + +// 合并字符串数组 +void StringUtil::Join(vector& strs, + string& str, + const char sep) { + str = ""; + ostringstream oss; + for_each(strs.begin(), strs.end()-1, [&](const string& s) { + oss << s << sep; + }); + oss << strs[strs.size() - 1]; + str = oss.str(); +} + +// 合并int数组成一个字符串 +void StringUtil::Join(vector& strs, + string& str, + const char sep) { + str = ""; + ostringstream oss; + for_each(strs.begin(), strs.end()-1, [&](const int& s) { + oss << s << sep; + }); + oss << strs[strs.size()-1]; + str = oss.str(); +} + + diff --git a/src/common/utils/util.h b/src/common/utils/util.h new file mode 100755 index 0000000..34c7682 --- /dev/null +++ b/src/common/utils/util.h @@ -0,0 +1,182 @@ +/* + Description: 全局用到的的工具 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2019/11/26 +*/ +#ifndef UTIL_H_ +#define UTIL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +using std::ostringstream; +using std::vector; +using std::string; +using std::ifstream; +using std::ofstream; + +using namespace std; + +/* + * 打印错误信息,退出程序 + */ +void Error(const char *format, ...); +void Warn(const char *format, ...); +void Info(const char *format, ...); +void Debug(const char *format, ...); + +/* + * StringUtil 用来合并字符串等操作 + */ +struct StringUtil { + // 合并字符串数组 + static void Join(vector& strs, + string& str, + const char sep); + // 合并int数组成一个字符串 + static void Join(vector& strs, + string& str, + const char sep); + // format Float + static string Format(float val) { + ostringstream oss; + oss << fixed << setprecision(3); + oss << val; + const string &s = oss.str(); + if (s.back() == '0') + return s.substr(0, s.size()-1); + return s; + }; + + static string Format(int val) { + ostringstream oss; + oss << val; + return oss.str(); + }; + + // 判断string区间字符串是否相等 + static bool EqualRange(const string &left, + const int leftOffset, + const string &right, + const int rightOffset, + const int len) { + for (int i=0; i> 8) & 0xFF) + << (char)((i >> 16) & 0xFF) + << (char)((i >> 24) & 0xFF); + }; + + static void WriteLong(ofstream &out, uint64_t val) { + uint64_t i = val; + out << (char)(i & 0xFF) + << (char)((i >> 8) & 0xFF) + << (char)((i >> 16) & 0xFF) + << (char)((i >> 24) & 0xFF) + << (char)((i >> 32) & 0xFF) + << (char)((i >> 40) & 0xFF) + << (char)((i >> 48) & 0xFF) + << (char)((i >> 56) & 0xFF); + }; + + static void WriteStr(ofstream &out, const string &s) { + for (int i=0; i total) return false; + b1=buf[(*cur)++];b2=buf[(*cur)++];b3=buf[(*cur)++];b4=buf[(*cur)++]; + *res = (((uint32_t)(uint8_t)b4) << 24) + + (((uint32_t)(uint8_t)b3) << 16) + + (((uint32_t)(uint8_t)b2) << 8) + + (((uint32_t)(uint8_t)b1)); + return true; + }; + + static bool ReadInt(ifstream &ifs, int *res) { + //if(ifs.read((char*)res, sizeof(*res))) return true; + char b[4]; + if(!ifs.read(&b[0], 1)) return false; + if(!ifs.read(&b[1], 1)) return false; + if(!ifs.read(&b[2], 1)) return false; + if(!ifs.read(&b[3], 1)) return false; + uint64_t cur = 0; + return ReadInt((char*)b, 4, &cur, res); + } + + static bool ReadLong(char *buf, uint64_t total, uint64_t *cur, uint64_t *res) { + char b1, b2, b3, b4, b5, b6, b7, b8; + if (*cur + 8 > total) return false; + b1=buf[(*cur)++];b2=buf[(*cur)++];b3=buf[(*cur)++];b4=buf[(*cur)++]; + b5=buf[(*cur)++];b6=buf[(*cur)++];b7=buf[(*cur)++];b8=buf[(*cur)++]; + *res = (((uint64_t)(uint8_t)b8) << 56) + + (((uint64_t)(uint8_t)b7) << 48) + + (((uint64_t)(uint8_t)b6) << 40) + + (((uint64_t)(uint8_t)b5) << 32) + + (((uint64_t)(uint8_t)b4) << 24) + + (((uint64_t)(uint8_t)b3) << 16) + + (((uint64_t)(uint8_t)b2) << 8) + + (((uint64_t)(uint8_t)b1)); + return true; + }; + + static bool ReadLong(ifstream &ifs, uint64_t *res) { + //if(ifs.read((char*)res, sizeof(*res))) return true; + char b[8]; + if(!ifs.read(&b[0], 1)) return false; + if(!ifs.read(&b[1], 1)) return false; + if(!ifs.read(&b[2], 1)) return false; + if(!ifs.read(&b[3], 1)) return false; + if(!ifs.read(&b[4], 1)) return false; + if(!ifs.read(&b[5], 1)) return false; + if(!ifs.read(&b[6], 1)) return false; + if(!ifs.read(&b[7], 1)) return false; + uint64_t cur = 0; + return ReadLong((char*)b, 8, &cur, res); + } + static bool ReadStr(ifstream &ifs, string *res) { + char b; + res->clear(); + if (!ifs.read(&b, 1)) return false; + while ((int)b != 0) { + res->push_back(b); + if (!ifs.read(&b, 1)) return false; + } + return true; + } + static bool ReadStr(char *buf, uint64_t total, uint64_t *cur, string *res) { + char b; + res->clear(); + if (*cur >= total) return false; + b = buf[(*cur)++]; + while ((int)b != 0) { + res->push_back(b); + if (*cur >= total) return false; + b = buf[(*cur)++]; + } + return true; + } +}; + +#endif diff --git a/src/common/utils/yarn.cpp b/src/common/utils/yarn.cpp new file mode 100644 index 0000000..45c60cb --- /dev/null +++ b/src/common/utils/yarn.cpp @@ -0,0 +1,398 @@ +/* yarn.c -- generic thread operations implemented using pthread functions + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + * For conditions of distribution and use, see copyright notice in yarn.h + */ + +/* Basic thread operations implemented using the POSIX pthread library. All + pthread references are isolated within this module to allow alternate + implementations with other thread libraries. See yarn.h for the description + of these operations. */ + +/* Version history: + 1.0 19 Oct 2008 First version + 1.1 26 Oct 2008 No need to set the stack size -- remove + Add yarn_abort() function for clean-up on error exit + 1.2 19 Dec 2011 (changes reversed in 1.3) + 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c + Update thread portability #defines per IEEE 1003.1-2008 + Fix documentation in yarn.h for yarn_prefix + 1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr + Accept and do nothing for NULL argument to free_lock() + 1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel() + Normalize the code style + 1.6 3 Apr 2019 Add debugging information to fail() error messages + 1.7 12 Apr 2020 Fix use after free bug in ignition() + */ + +// For thread portability. +#define _XOPEN_SOURCE 700 +#define _POSIX_C_SOURCE 200809L +#define _THREAD_SAFE + +// Use large file functions if available. +#define _FILE_OFFSET_BITS 64 + +// External libraries and entities referenced. +#include // fprintf(), stderr +#include // exit(), malloc(), free(), NULL +#include // pthread_t, pthread_create(), pthread_join(), + // pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), + // PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), + // pthread_self(), pthread_equal(), + // pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), + // pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), + // pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), + // pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() +#include // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN + +// Interface definition. +#include "yarn.h" + +// Constants. +#define local static // for non-exported functions and globals + +// Error handling external globals, resettable by application. +char *yarn_prefix = (char*)"yarn"; +void (*yarn_abort)(int) = NULL; + + +// Immediately exit -- use for errors that shouldn't ever happen. +local void fail(int err, char const *file, long line, char const *func) { + fprintf(stderr, "%s: ", yarn_prefix); + switch (err) { + case EPERM: + fputs("already unlocked", stderr); + break; + case ESRCH: + fputs("no such thread", stderr); + break; + case EDEADLK: + fputs("resource deadlock", stderr); + break; + case ENOMEM: + fputs("out of memory", stderr); + break; + case EBUSY: + fputs("can't destroy locked resource", stderr); + break; + case EINVAL: + fputs("invalid request", stderr); + break; + case EAGAIN: + fputs("resource unavailable", stderr); + break; + default: + fprintf(stderr, "internal error %d", err); + } + fprintf(stderr, " (%s:%ld:%s)\n", file, line, func); + if (yarn_abort != NULL) + yarn_abort(err); + exit(err); +} + +// Memory handling routines provided by user. If none are provided, malloc() +// and free() are used, which are therefore assumed to be thread-safe. +typedef void *(*malloc_t)(size_t); +typedef void (*free_t)(void *); +local malloc_t my_malloc_f = malloc; +local free_t my_free = free; + +// Use user-supplied allocation routines instead of malloc() and free(). +void yarn_mem(malloc_t lease, free_t vacate) { + my_malloc_f = lease; + my_free = vacate; +} + +// Memory allocation that cannot fail (from the point of view of the caller). +local void *my_malloc(size_t size, char const *file, long line) { + void *block; + + if ((block = my_malloc_f(size)) == NULL) + fail(ENOMEM, file, line, "malloc"); + return block; +} + +// -- Lock functions -- + +struct lock_s { + pthread_mutex_t mutex; + pthread_cond_t cond; + long value; +}; + +lock *new_lock_(long initial, char const *file, long line) { + lock *bolt = (lock *)my_malloc(sizeof(struct lock_s), file, line); + int ret = pthread_mutex_init(&(bolt->mutex), NULL); + if (ret) + fail(ret, file, line, "mutex_init"); + ret = pthread_cond_init(&(bolt->cond), NULL); + if (ret) + fail(ret, file, line, "cond_init"); + bolt->value = initial; + return bolt; +} + +void possess_(lock *bolt, char const *file, long line) { + int ret = pthread_mutex_lock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_lock"); +} + +void release_(lock *bolt, char const *file, long line) { + int ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +void twist_(lock *bolt, enum twist_op op, long val, + char const *file, long line) { + if (op == TO) + bolt->value = val; + else if (op == BY) + bolt->value += val; + int ret = pthread_cond_broadcast(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_broadcast"); + ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +#define until(a) while(!(a)) + +void wait_for_(lock *bolt, enum wait_op op, long val, + char const *file, long line) { + switch (op) { + case TO_BE: + until (bolt->value == val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case NOT_TO_BE: + until (bolt->value != val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_MORE_THAN: + until (bolt->value > val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_LESS_THAN: + until (bolt->value < val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + } +} + +long peek_lock(lock *bolt) { + return bolt->value; +} + +void free_lock_(lock *bolt, char const *file, long line) { + if (bolt == NULL) + return; + int ret = pthread_cond_destroy(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_destroy"); + ret = pthread_mutex_destroy(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_destroy"); + my_free(bolt); +} + +// -- Thread functions (uses the lock functions above) -- + +struct thread_s { + pthread_t id; + int done; // true if this thread has exited + thread *next; // for list of all launched threads +}; + +// List of threads launched but not joined, count of threads exited but not +// joined (incremented by ignition() just before exiting). +local lock threads_lock = { + PTHREAD_MUTEX_INITIALIZER, + PTHREAD_COND_INITIALIZER, + 0 // number of threads exited but not joined +}; +local thread *threads = NULL; // list of extant threads + +// Structure in which to pass the probe and its payload to ignition(). +struct capsule { + void (*probe)(void *); + void *payload; + char const *file; + long line; +}; + +// Mark the calling thread as done and alert join_all(). +local void reenter(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // find this thread in the threads list by matching the thread id + pthread_t me = pthread_self(); + possess_(&(threads_lock), capsule->file, capsule->line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (pthread_equal(match->id, me)) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, capsule->file, capsule->line, "reenter lost"); + + // mark this thread as done and move it to the head of the list + match->done = 1; + if (threads != match) { + *prior = match->next; + match->next = threads; + threads = match; + } + + // update the count of threads to be joined and alert join_all() + twist_(&(threads_lock), BY, +1, capsule->file, capsule->line); + + // free the capsule resource, even if the thread is cancelled (though yarn + // doesn't use pthread_cancel() -- you never know) + my_free(capsule); +} + +// All threads go through this routine. Just before a thread exits, it marks +// itself as done in the threads list and alerts join_all() so that the thread +// resources can be released. Use a cleanup stack so that the marking occurs +// even if the thread is cancelled. +local void *ignition(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // run reenter() before leaving + pthread_cleanup_push(reenter, arg); + + // execute the requested function with argument + capsule->probe(capsule->payload); + + // mark this thread as done, letting join_all() know, and free capsule + pthread_cleanup_pop(1); + + // exit thread + return NULL; +} + +// Not all POSIX implementations create threads as joinable by default, so that +// is made explicit here. +thread *launch_(void (*probe)(void *), void *payload, + char const *file, long line) { + // construct the requested call and argument for the ignition() routine + // (allocated instead of automatic so that we're sure this will still be + // there when ignition() actually starts up -- ignition() will free this + // allocation) + struct capsule *capsule = (struct capsule *)my_malloc(sizeof(struct capsule), file, line); + capsule->probe = probe; + capsule->payload = payload; + capsule->file = file; + capsule->line = line; + + // assure this thread is in the list before join_all() or ignition() looks + // for it + possess_(&(threads_lock), file, line); + + // create the thread and call ignition() from that thread + thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line); + pthread_attr_t attr; + int ret = pthread_attr_init(&attr); + if (ret) + fail(ret, file, line, "attr_init"); + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (ret) + fail(ret, file, line, "attr_setdetachstate"); + ret = pthread_create(&(th->id), &attr, ignition, capsule); + if (ret) + fail(ret, file, line, "create"); + ret = pthread_attr_destroy(&attr); + if (ret) + fail(ret, file, line, "attr_destroy"); + + // put the thread in the threads list for join_all() + th->done = 0; + th->next = threads; + threads = th; + release_(&(threads_lock), file, line); + return th; +} + +void join_(thread *ally, char const *file, long line) { + // wait for thread to exit and return its resources + int ret = pthread_join(ally->id, NULL); + if (ret) + fail(ret, file, line, "join"); + + // find the thread in the threads list + possess_(&(threads_lock), file, line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match == ally) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join lost"); + + // remove thread from list and update exited count, free thread + if (match->done) + threads_lock.value--; + *prior = match->next; + release_(&(threads_lock), file, line); + my_free(ally); +} + +// This implementation of join_all() only attempts to join threads that have +// announced that they have exited (see ignition()). When there are many +// threads, this is faster than waiting for some random thread to exit while a +// bunch of other threads have already exited. +int join_all_(char const *file, long line) { + // grab the threads list and initialize the joined count + int count = 0; + possess_(&(threads_lock), file, line); + + // do until threads list is empty + while (threads != NULL) { + // wait until at least one thread has reentered + wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line); + + // find the first thread marked done (should be at or near the top) + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match->done) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join_all lost"); + + // join the thread (will be almost immediate), remove from the threads + // list, update the reenter count, and free the thread + int ret = pthread_join(match->id, NULL); + if (ret) + fail(ret, file, line, "join"); + threads_lock.value--; + *prior = match->next; + my_free(match); + count++; + } + + // let go of the threads list and return the number of threads joined + release_(&(threads_lock), file, line); + return count; +} diff --git a/src/common/utils/yarn.h b/src/common/utils/yarn.h new file mode 100644 index 0000000..3a6b4c3 --- /dev/null +++ b/src/common/utils/yarn.h @@ -0,0 +1,138 @@ +/* yarn.h -- generic interface for thread operations + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + */ + +/* + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution. + + Mark Adler + madler@alumni.caltech.edu + */ + +/* Basic thread operations + + This interface isolates the local operating system implementation of threads + from the application in order to facilitate platform independent use of + threads. All of the implementation details are deliberately hidden. + + Assuming adequate system resources and proper use, none of these functions + can fail. As a result, any errors encountered will cause an exit() to be + executed, or the execution of your own optionally-provided abort function. + + These functions allow the simple launching and joining of threads, and the + locking of objects and synchronization of changes of objects. The latter is + implemented with a single lock type that contains an integer value. The + value can be ignored for simple exclusive access to an object, or the value + can be used to signal and wait for changes to an object. + + -- Arguments -- + + thread *thread; identifier for launched thread, used by join + void probe(void *); pointer to function "probe", run when thread starts + void *payload; single argument passed to the probe function + lock *lock; a lock with a value -- used for exclusive access to + an object and to synchronize threads waiting for + changes to an object + long val; value to set lock, increment lock, or wait for + int n; number of threads joined + + -- Thread functions -- + + thread = launch(probe, payload) - launch a thread -- exit via probe() return + join(thread) - join a thread and by joining end it, waiting for the thread + to exit if it hasn't already -- will free the resources allocated by + launch() (don't try to join the same thread more than once) + n = join_all() - join all threads launched by launch() that are not joined + yet and free the resources allocated by the launches, usually to clean + up when the thread processing is done -- join_all() returns an int with + the count of the number of threads joined (join_all() should only be + called from the main thread, and should only be called after any calls + of join() have completed) + + -- Lock functions -- + + lock = new_lock(val) - create a new lock with initial value val (lock is + created in the released state) + possess(lock) - acquire exclusive possession of a lock, waiting if necessary + twist(lock, [TO | BY], val) - set lock to or increment lock by val, signal + all threads waiting on this lock and then release the lock -- must + possess the lock before calling (twist releases, so don't do a + release() after a twist() on the same lock) + wait_for(lock, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val) + - wait on lock value to be, not to be, be greater than, or be less than + val -- must possess the lock before calling, will possess the lock on + return but the lock is released while waiting to permit other threads + to use twist() to change the value and signal the change (so make sure + that the object is in a usable state when waiting) + release(lock) - release a possessed lock (do not try to release a lock that + the current thread does not possess) + val = peek_lock(lock) - return the value of the lock (assumes that lock is + already possessed, no possess or release is done by peek_lock()) + free_lock(lock) - free the resources allocated by new_lock() (application + must assure that the lock is released before calling free_lock()) + + -- Memory allocation --- + + yarn_mem(better_malloc, better_free) - set the memory allocation and free + routines for use by the yarn routines where the supplied routines have + the same interface and operation as malloc() and free(), and may be + provided in order to supply thread-safe memory allocation routines or + for any other reason -- by default malloc() and free() will be used + + -- Error control -- + + yarn_prefix - a char pointer to a string that will be the prefix for any + error messages that these routines generate before exiting -- if not + changed by the application, "yarn" will be used + yarn_abort - an external function that will be executed when there is an + internal yarn error, due to out of memory or misuse -- this function + may exit to abort the application, or if it returns, the yarn error + handler will exit (set to NULL by default for no action) + */ + +extern char *yarn_prefix; +extern void (*yarn_abort)(int); + +void yarn_mem(void *(*)(size_t), void (*)(void *)); + +typedef struct thread_s thread; +thread *launch_(void (*)(void *), void *, char const *, long); +#define launch(a, b) launch_(a, b, __FILE__, __LINE__) +void join_(thread *, char const *, long); +#define join(a) join_(a, __FILE__, __LINE__) +int join_all_(char const *, long); +#define join_all() join_all_(__FILE__, __LINE__) + +typedef struct lock_s lock; +lock *new_lock_(long, char const *, long); +#define new_lock(a) new_lock_(a, __FILE__, __LINE__) +void possess_(lock *, char const *, long); +#define possess(a) possess_(a, __FILE__, __LINE__) +void release_(lock *, char const *, long); +#define release(a) release_(a, __FILE__, __LINE__) +enum twist_op { TO, BY }; +void twist_(lock *, enum twist_op, long, char const *, long); +#define twist(a, b, c) twist_(a, b, c, __FILE__, __LINE__) +enum wait_op { + TO_BE, /* or */ NOT_TO_BE, /* that is the question */ + TO_BE_MORE_THAN, TO_BE_LESS_THAN }; +void wait_for_(lock *, enum wait_op, long, char const *, long); +#define wait_for(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__) +long peek_lock(lock *); +void free_lock_(lock *, char const *, long); +#define free_lock(a) free_lock_(a, __FILE__, __LINE__) diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index f33e97c..0aacef9 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -7,29 +7,235 @@ Author : Zhang Zhonghai Date : 2023/10/23 */ #include "markdups_arg.h" -#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include "htslib/thread_pool.h" #include +#include +#include +#include using namespace std; +#define BAM_BLOCK_SIZE 2 * 1024 * 1024 + +/* 前向声明 */ +class ThMarkDupArg; +/* 全局本地变量 */ +static queue qpThMarkDupArg; // 存放线程变量的队列 +static lock *queueFirstLock = new_lock(-1); // 队列的第一个任务是否完成 + +/* 多线程处理冗余参数结构体 */ +struct ThMarkDupArg +{ + vector *pvBam; + int startIdx; // 闭区间 + int endIdx; // 开区间 + long seq; // 当前任务在所有任务的排序 + bool more; // 后面还有任务 + volatile bool finish; // 当前任务有没有处理完 + set sDupIdx; // 冗余read的索引 +}; /* - * mark duplicate 入口 + * 多线程查找和标记冗余函数 + */ +void thread_markdups(void *arg) +{ + auto &p = *(ThMarkDupArg *)arg; + + p.sDupIdx.insert(1); + /* 处理数据 */ + + /* 本段数据处理完成,告诉输出线程 */ + possess(queueFirstLock); + p.finish = true; + cout << "process: " << p.seq << endl; + auto front = qpThMarkDupArg.front(); + if (front->finish) + { + twist(queueFirstLock, TO, front->seq); + } else { + release(queueFirstLock); + } +} + +/* + * 多线程将结果写入文件,写之前需要合并相邻线程的未处理的结果 + */ +void thread_write(void *) +{ + bool more = false; + long seq = 0; + possess(queueFirstLock); + wait_for(queueFirstLock, TO_BE, seq++); // 等待首个任务完成 + auto lastP = qpThMarkDupArg.front(); // 取队首的数据 + qpThMarkDupArg.pop(); // 删除队首 + twist(queueFirstLock, TO, seq); + more = lastP->more; + while (more) // 循环处理,将结果写入文件 + { + possess(queueFirstLock); + if (qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列 + { + release(queueFirstLock); + continue; + } + wait_for(queueFirstLock, TO_BE, seq); // 等待任务完成 + auto p = qpThMarkDupArg.front(); + if (!p->finish) // 有可能这个任务没有完成,是下边那个twist导致进到这里,因为这一段代码可能运行比较快 + { + twist(queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里,避免无效获得锁 + continue; + } + qpThMarkDupArg.pop(); + twist(queueFirstLock, TO, seq + 1); + + /* 处理结果数据 */ + cout << "finish: " << seq - 1 << endl; + + /* 准备下一轮循环 */ + delete lastP; + more = p->more; + lastP = p; + seq++; + } + + // 处理最后一个数据 + cout << "finish: " << seq - 1 << endl; + pthread_exit(0); +} + +/* + * Builds a read ends object that represents a single read. + */ +static void buildReadEnds(BamWrap &bw, int64_t index, ReadEnds *pKey) +{ + auto &k = *pKey; + auto &bc = bw.b->core; + k.read1ReferenceIndex = bc.tid; + k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); + k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; + k.read1IndexInFile = index; +} + +/* + * mark duplicate 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 */ int MarkDuplicates(int argc, char *argv[]) { - // cout << argc << endl; - // for (int i = 0; i < argc; ++i) { - // cout << argv[i] << '\t'; - // } - // cout << endl; - + Timer::log_time("程序开始"); + Timer time_all; + /* 初始化参数 */ GlobalArg &gArg = GlobalArg::Instance(); MarkDupsArg mdArg; vector vAuxVar; - mdArg.parseArgument(argc, argv, &vAuxVar, &gArg); + mdArg.parseArgument(argc, argv, &gArg); // 解析命令行参数 - // cout << ns_md::ValidationStringency::DEFAULT_STRINGENCY << '\t' << ns_md::ValidationStringency::SILENT << endl; + // if (gArg.num_threads > 1) // 多线程处理 + if (false) + { + threadpool thpool = thpool_init(gArg.num_threads); // 创建mark dup所需的线程池 + thread *writeth = launch(thread_write, nullptr); // 启动处理结果的的线程 + for (int i = 0; i < 40; ++i) + { + ThMarkDupArg *thArg = new ThMarkDupArg({nullptr, i, i * 10, i, true, false}); + if (i == 39) + thArg->more = false; + possess(queueFirstLock); // 加锁 + qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 + release(queueFirstLock); // 解锁 + thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 + } + /* 同步所有线程 */ + thpool_wait(thpool); + thpool_destroy(thpool); + join(writeth); + } else { // 单线程串行处理 + /* 打开输入bam文件 */ + sam_hdr_t *inBamHeader; + samFile *inBamFp; + inBamFp = sam_open_format(gArg.in_fn.c_str(), "r", nullptr); + if (! inBamFp) { + Error("[%s] load sam/bam file failed.\n", __func__); + return -1; + } + hts_set_opt(inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + inBamHeader = sam_hdr_read(inBamFp); + htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 + htsThreadPool htsPoolWrite = {NULL, 0}; + htsPoolRead.pool = hts_tpool_init(gArg.num_threads); + htsPoolWrite.pool = hts_tpool_init(gArg.num_threads); + if (!htsPoolRead.pool || !htsPoolWrite.pool) + { + Error("[%d] failed to set up thread pool", __LINE__); + return -1; + } + hts_set_opt(inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); + + /* 创建输出文件 */ + samFile *outBamFp; + htsFormat outFormat = {}; + hts_parse_format(&outFormat, "bam"); + outBamFp = sam_open_format(gArg.out_fn.c_str(), "wb", &outFormat); + hts_set_opt(outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + hts_set_opt(outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); // 用同样的线程池处理输出文件 + + // /* 读取缓存初始化 */ + BamBufType inBamBuf(gArg.use_asyncio); + inBamBuf.Init(inBamFp, inBamHeader, gArg.max_mem); + + /* 循环读入信息,并处理 */ + while (inBamBuf.ReadStat() >= 0) + { + int readNum = inBamBuf.ReadBam(); + cout << readNum << endl; + // inBamBuf.ClearAll(); + // cout << inBamBuf.Size() << endl; + inBamBuf.ClearBeforeIdx(inBamBuf.Size()); + // break; + for (int i = 0; i < inBamBuf.Size(); ++i) { + if (sam_write1(outBamFp, inBamHeader, inBamBuf[i]->b) < 0) + { + Error("failed writing to \"%s\"", gArg.out_fn.c_str()); + sam_close(outBamFp); + return -1; + } + } + if (readNum == 0) + break; + } + + // int res = -1; + // bam1_t *b = bam_init1(); + // size_t num = 0; + // while ((res = sam_read1(inBamFp, inBamHeader, b)) >= 0) + // { + // ++num; + // } + // cout << num << endl; + + /* 为每个read创建ReadEnd信息 */ + + /* 标记冗余, 将处理后的结果写入文件 */ + + /* 关闭文件,收尾清理 */ + sam_close(outBamFp); + sam_close(inBamFp); + } + + // cout << "read ends size: " << sizeof(ReadEnds) << endl; + + cout << "总时间: " << time_all.seconds_elapsed() << endl; + Timer::log_time("程序结束"); return 0; } \ No newline at end of file diff --git a/src/sam/markdups/markdups_arg.cpp b/src/sam/markdups/markdups_arg.cpp index 97b7c31..edde02e 100644 --- a/src/sam/markdups/markdups_arg.cpp +++ b/src/sam/markdups/markdups_arg.cpp @@ -8,7 +8,7 @@ Date : 2023/10/27 */ #include "markdups_arg.h" -#include "common/global_arg.h" +#include "common/utils/global_arg.h" #include #include @@ -77,10 +77,8 @@ void setBoolArg(bool *arg) { // 解析参数 void MarkDupsArg::parseArgument(int argc, char **argv, - vector *pvAuxVar, GlobalArg *pGArg) { - auto &vAuxVar = *pvAuxVar; auto &gArg = *pGArg; struct option allOpt[MarkDupsArg::ARG_COUNT + GlobalArg::GLOBAL_ARG_CNT]; @@ -258,12 +256,12 @@ void MarkDupsArg::PrintHelp() "\n" "Required Arguments:\n" "\n" - "--INPUT,-I One or more input SAM, BAM or CRAM files to analyze. Must be coordinate sorted. This\n" + "--INPUT One or more input SAM, BAM or CRAM files to analyze. Must be coordinate sorted. This\n" " argument must be specified at least once.Required.\n" "\n" - "--METRICS_FILE,-M File to write duplication metrics to Required.\n" + "--METRICS_FILE File to write duplication metrics to Required.\n" "\n" - "--OUTPUT,-O The output file to write marked records to Required.\n" + "--OUTPUT The output file to write marked records to Required.\n" "\n" "\n" "Optional Arguments:\n" @@ -275,13 +273,13 @@ void MarkDupsArg::PrintHelp() "--arguments_file read one or more arguments files and add them to the command line This argument may be\n" " specified 0 or more times. Default value: null.\n" "\n" - "--ASSUME_SORT_ORDER,-ASO \n" + "--ASSUME_SORT_ORDER \n" " If not null, assume that the input file has this order even if the header says otherwise.\n" " Default value: null. Possible values: {unsorted, queryname, coordinate, duplicate,\n" " unknown} Cannot be used in conjunction with argument(s) ASSUME_SORTED (AS)\n" "\n" "\n" - "--ASSUME_SORTED,-AS If true, assume that the input file is coordinate sorted even if the header says\n" + "--ASSUME_SORTED If true, assume that the input file is coordinate sorted even if the header says\n" " otherwise. Deprecated, used ASSUME_SORT_ORDER=coordinate instead. Default value: false.\n" " Possible values: {true, false} Cannot be used in conjunction with argument(s)\n" " ASSUME_SORT_ORDER (ASO)\n" @@ -291,7 +289,7 @@ void MarkDupsArg::PrintHelp() "--CLEAR_DT Clear DT tag from input SAM records. Should be set to false if input SAM doesn't have this\n" " tag. Default true Default value: true. Possible values: {true, false}\n" "\n" - "--COMMENT,-CO Comment(s) to include in the output file's header. This argument may be specified 0 or\n" + "--COMMENT Comment(s) to include in the output file's header. This argument may be specified 0 or\n" " more times. Default value: null.\n" "\n" "--COMPRESSION_LEVEL Compression level for all compressed files created (e.g. BAM and VCF). Default value: 5.\n" @@ -313,7 +311,7 @@ void MarkDupsArg::PrintHelp() " BARCODE_TAG hold non-normalized UMIs. Default false. Default value: false. Possible\n" " values: {true, false}\n" "\n" - "--DUPLICATE_SCORING_STRATEGY,-DS \n" + "--DUPLICATE_SCORING_STRATEGY \n" " The scoring strategy for choosing the non-duplicate among candidates. Default value:\n" " SUM_OF_BASE_QUALITIES. Possible values: {SUM_OF_BASE_QUALITIES,\n" " TOTAL_MAPPED_REFERENCE_LENGTH, RANDOM}\n" @@ -343,7 +341,7 @@ void MarkDupsArg::PrintHelp() "\n" "--help,-h display the help message Default value: false. Possible values: {true, false}\n" "\n" - "--MAX_FILE_HANDLES_FOR_READ_ENDS_MAP,-MAX_FILE_HANDLES \n" + "--MAX_FILE_HANDLES_FOR_READ_ENDS_MAP \n" " Maximum number of file handles to keep open when spilling read ends to disk. Set this\n" " number a little lower than the per-process maximum number of file that may be open. This\n" " number can be found by executing the 'ulimit -n' command on a Unix system. Default value:\n" @@ -360,7 +358,7 @@ void MarkDupsArg::PrintHelp() " in RAM before spilling to disk. Increasing this number reduces the number of file handles\n" " needed to sort the file, and increases the amount of RAM needed. Default value: 500000.\n" "\n" - "--MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP,-MAX_SEQS \n" + "--MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP \n" " This option is obsolete. ReadEnds will always be spilled to disk. Default value: 50000.\n" "\n" "--MOLECULAR_IDENTIFIER_TAG \n" @@ -374,18 +372,18 @@ void MarkDupsArg::PrintHelp() " For the patterned flowcell models, 2500 is moreappropriate. For other platforms and\n" " models, users should experiment to find what works best. Default value: 100.\n" "\n" - "--PROGRAM_GROUP_COMMAND_LINE,-PG_COMMAND \n" + "--PROGRAM_GROUP_COMMAND_LINE \n" " Value of CL tag of PG record to be created. If not supplied the command line will be\n" " detected automatically. Default value: null.\n" "\n" - "--PROGRAM_GROUP_NAME,-PG_NAME \n" + "--PROGRAM_GROUP_NAME \n" " Value of PN tag of PG record to be created. Default value: MarkDuplicates.\n" "\n" - "--PROGRAM_GROUP_VERSION,-PG_VERSION \n" + "--PROGRAM_GROUP_VERSION \n" " Value of VN tag of PG record to be created. If not specified, the version will be detected\n" " automatically. Default value: null.\n" "\n" - "--PROGRAM_RECORD_ID,-PG \n" + "--PROGRAM_RECORD_ID \n" " The program record ID for the @PG record(s) created by this program. Set to null to\n" " disable PG record creation. This string may have a suffix appended to avoid collision\n" " with other program record IDs. Default value: MarkDuplicates.\n" @@ -416,7 +414,7 @@ void MarkDupsArg::PrintHelp() "--READ_TWO_BARCODE_TAG \n" " Read two barcode SAM tag (ex. BX for 10X Genomics) Default value: null.\n" "\n" - "--REFERENCE_SEQUENCE,-R Reference sequence file. Default value: null.\n" + "--REFERENCE_SEQUENCE Reference sequence file. Default value: null.\n" "\n" "--REMOVE_DUPLICATES If true do not write duplicates to the output file instead of writing them with\n" " appropriate flags set. Default value: false. Possible values: {true, false}\n" @@ -460,14 +458,6 @@ void MarkDupsArg::PrintHelp() " reads to start andend on the same position to be considered duplicate) (for this argument,\n" " \" read end \" means 3' end). Default value: false. Possible values: {true, false}\n" "\n" - "--USE_JDK_DEFLATER,-use_jdk_deflater \n" - " Use the JDK Deflater instead of the Intel Deflater for writing compressed output Default\n" - " value: false. Possible values: {true, false}\n" - "\n" - "--USE_JDK_INFLATER,-use_jdk_inflater \n" - " Use the JDK Inflater instead of the Intel Inflater for reading compressed input Default\n" - " value: false. Possible values: {true, false}\n" - "\n" "--USE_UNPAIRED_CLIPPED_END \n" " Use position of the clipping as the end position, when considering duplicates (or use the\n" " unclipped end position) (for this argument, \" read end \" means 3' end). Default value:\n" diff --git a/src/sam/markdups/markdups_arg.h b/src/sam/markdups/markdups_arg.h index fa66cca..ef03aa1 100644 --- a/src/sam/markdups/markdups_arg.h +++ b/src/sam/markdups/markdups_arg.h @@ -301,7 +301,6 @@ struct MarkDupsArg // 解析参数 void parseArgument(int argc, char **argv, - vector *pvAuxVar, GlobalArg *pGArg); static void PrintHelp();