diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2f5f705 --- /dev/null +++ b/Makefile @@ -0,0 +1,203 @@ +# CMAKE generated file: DO NOT EDIT! +# Generated by "Unix Makefiles" Generator, CMake Version 3.22 + +# Default target executed when no arguments are given to make. +default_target: all +.PHONY : default_target + +# Allow only one "make -f Makefile2" at a time, but pass parallelism. +.NOTPARALLEL: + +#============================================================================= +# Special targets provided by cmake. + +# Disable implicit rules so canonical targets will work. +.SUFFIXES: + +# Disable VCS-based implicit rules. +% : %,v + +# Disable VCS-based implicit rules. +% : RCS/% + +# Disable VCS-based implicit rules. +% : RCS/%,v + +# Disable VCS-based implicit rules. +% : SCCS/s.% + +# Disable VCS-based implicit rules. +% : s.% + +.SUFFIXES: .hpux_make_needs_suffix_list + +# Command-line flag to silence nested $(MAKE). +$(VERBOSE)MAKESILENT = -s + +#Suppress display of executed commands. +$(VERBOSE).SILENT: + +# A target that is always out of date. +cmake_force: +.PHONY : cmake_force + +#============================================================================= +# Set environment variables for the build. + +# The shell in which to execute make rules. +SHELL = /bin/sh + +# The CMake executable. +CMAKE_COMMAND = /usr/bin/cmake + +# The command to remove a file. +RM = /usr/bin/cmake -E rm -f + +# Escaping for special characters. +EQUALS = = + +# The top-level source directory on which CMake was run. +CMAKE_SOURCE_DIR = /home/zzh/work/ngs/FastDup + +# The top-level build directory on which CMake was run. +CMAKE_BINARY_DIR = /home/zzh/work/ngs/FastDup/build + +#============================================================================= +# Targets provided globally by CMake. + +# Special rule for the target edit_cache +edit_cache: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "No interactive CMake dialog available..." + /usr/bin/cmake -E echo No\ interactive\ CMake\ dialog\ available. +.PHONY : edit_cache + +# Special rule for the target edit_cache +edit_cache/fast: edit_cache +.PHONY : edit_cache/fast + +# Special rule for the target rebuild_cache +rebuild_cache: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Running CMake to regenerate build system..." + /usr/bin/cmake --regenerate-during-build -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) +.PHONY : rebuild_cache + +# Special rule for the target rebuild_cache +rebuild_cache/fast: rebuild_cache +.PHONY : rebuild_cache/fast + +# Special rule for the target list_install_components +list_install_components: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Available install components are: \"Unspecified\"" +.PHONY : list_install_components + +# Special rule for the target list_install_components +list_install_components/fast: list_install_components +.PHONY : list_install_components/fast + +# Special rule for the target install +install: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Install the project..." + /usr/bin/cmake -P cmake_install.cmake +.PHONY : install + +# Special rule for the target install +install/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Install the project..." + /usr/bin/cmake -P cmake_install.cmake +.PHONY : install/fast + +# Special rule for the target install/local +install/local: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing only the local directory..." + /usr/bin/cmake -DCMAKE_INSTALL_LOCAL_ONLY=1 -P cmake_install.cmake +.PHONY : install/local + +# Special rule for the target install/local +install/local/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing only the local directory..." + /usr/bin/cmake -DCMAKE_INSTALL_LOCAL_ONLY=1 -P cmake_install.cmake +.PHONY : install/local/fast + +# Special rule for the target install/strip +install/strip: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing the project stripped..." + /usr/bin/cmake -DCMAKE_INSTALL_DO_STRIP=1 -P cmake_install.cmake +.PHONY : install/strip + +# Special rule for the target install/strip +install/strip/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing the project stripped..." + /usr/bin/cmake -DCMAKE_INSTALL_DO_STRIP=1 -P cmake_install.cmake +.PHONY : install/strip/fast + +# The main all target +all: cmake_check_build_system + $(CMAKE_COMMAND) -E cmake_progress_start /home/zzh/work/ngs/FastDup/build/CMakeFiles /home/zzh/work/ngs/FastDup/build//CMakeFiles/progress.marks + $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 all + $(CMAKE_COMMAND) -E cmake_progress_start /home/zzh/work/ngs/FastDup/build/CMakeFiles 0 +.PHONY : all + +# The main clean target +clean: + $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 clean +.PHONY : clean + +# The main clean target +clean/fast: clean +.PHONY : clean/fast + +# Prepare targets for installation. +preinstall: all + $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 preinstall +.PHONY : preinstall + +# Prepare targets for installation. +preinstall/fast: + $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 preinstall +.PHONY : preinstall/fast + +# clear depends +depend: + $(CMAKE_COMMAND) -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) --check-build-system CMakeFiles/Makefile.cmake 1 +.PHONY : depend + +#============================================================================= +# Target rules for targets named fastdup + +# Build rule for target. +fastdup: cmake_check_build_system + $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 fastdup +.PHONY : fastdup + +# fast build rule for target. +fastdup/fast: + $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/build +.PHONY : fastdup/fast + +# Help Target +help: + @echo "The following are some of the valid targets for this Makefile:" + @echo "... all (the default if no target is provided)" + @echo "... clean" + @echo "... depend" + @echo "... edit_cache" + @echo "... install" + @echo "... install/local" + @echo "... install/strip" + @echo "... list_install_components" + @echo "... rebuild_cache" + @echo "... fastdup" +.PHONY : help + + + +#============================================================================= +# Special targets to cleanup operation of make. + +# Special rule to run CMake to check the build system integrity. +# No rule that depends on this can have commands that come from listfiles +# because they might be regenerated. +cmake_check_build_system: + $(CMAKE_COMMAND) -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) --check-build-system CMakeFiles/Makefile.cmake 0 +.PHONY : cmake_check_build_system + diff --git a/Makefile_src b/Makefile_src new file mode 100644 index 0000000..3897990 --- /dev/null +++ b/Makefile_src @@ -0,0 +1,447 @@ +# CMAKE generated file: DO NOT EDIT! +# Generated by "Unix Makefiles" Generator, CMake Version 3.22 + +# Default target executed when no arguments are given to make. +default_target: all +.PHONY : default_target + +# Allow only one "make -f Makefile2" at a time, but pass parallelism. +.NOTPARALLEL: + +#============================================================================= +# Special targets provided by cmake. + +# Disable implicit rules so canonical targets will work. +.SUFFIXES: + +# Disable VCS-based implicit rules. +% : %,v + +# Disable VCS-based implicit rules. +% : RCS/% + +# Disable VCS-based implicit rules. +% : RCS/%,v + +# Disable VCS-based implicit rules. +% : SCCS/s.% + +# Disable VCS-based implicit rules. +% : s.% + +.SUFFIXES: .hpux_make_needs_suffix_list + +# Command-line flag to silence nested $(MAKE). +$(VERBOSE)MAKESILENT = -s + +#Suppress display of executed commands. +$(VERBOSE).SILENT: + +# A target that is always out of date. +cmake_force: +.PHONY : cmake_force + +#============================================================================= +# Set environment variables for the build. + +# The shell in which to execute make rules. +SHELL = /bin/sh + +# The CMake executable. +CMAKE_COMMAND = /usr/bin/cmake + +# The command to remove a file. +RM = /usr/bin/cmake -E rm -f + +# Escaping for special characters. +EQUALS = = + +# The top-level source directory on which CMake was run. +CMAKE_SOURCE_DIR = /home/zzh/work/ngs/FastDup + +# The top-level build directory on which CMake was run. +CMAKE_BINARY_DIR = /home/zzh/work/ngs/FastDup/build + +#============================================================================= +# Targets provided globally by CMake. + +# Special rule for the target edit_cache +edit_cache: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "No interactive CMake dialog available..." + /usr/bin/cmake -E echo No\ interactive\ CMake\ dialog\ available. +.PHONY : edit_cache + +# Special rule for the target edit_cache +edit_cache/fast: edit_cache +.PHONY : edit_cache/fast + +# Special rule for the target rebuild_cache +rebuild_cache: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Running CMake to regenerate build system..." + /usr/bin/cmake --regenerate-during-build -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) +.PHONY : rebuild_cache + +# Special rule for the target rebuild_cache +rebuild_cache/fast: rebuild_cache +.PHONY : rebuild_cache/fast + +# Special rule for the target list_install_components +list_install_components: + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Available install components are: \"Unspecified\"" +.PHONY : list_install_components + +# Special rule for the target list_install_components +list_install_components/fast: list_install_components +.PHONY : list_install_components/fast + +# Special rule for the target install +install: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Install the project..." + /usr/bin/cmake -P cmake_install.cmake +.PHONY : install + +# Special rule for the target install +install/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Install the project..." + /usr/bin/cmake -P cmake_install.cmake +.PHONY : install/fast + +# Special rule for the target install/local +install/local: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing only the local directory..." + /usr/bin/cmake -DCMAKE_INSTALL_LOCAL_ONLY=1 -P cmake_install.cmake +.PHONY : install/local + +# Special rule for the target install/local +install/local/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing only the local directory..." + /usr/bin/cmake -DCMAKE_INSTALL_LOCAL_ONLY=1 -P cmake_install.cmake +.PHONY : install/local/fast + +# Special rule for the target install/strip +install/strip: preinstall + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing the project stripped..." + /usr/bin/cmake -DCMAKE_INSTALL_DO_STRIP=1 -P cmake_install.cmake +.PHONY : install/strip + +# Special rule for the target install/strip +install/strip/fast: preinstall/fast + @$(CMAKE_COMMAND) -E cmake_echo_color --switch=$(COLOR) --cyan "Installing the project stripped..." + /usr/bin/cmake -DCMAKE_INSTALL_DO_STRIP=1 -P cmake_install.cmake +.PHONY : install/strip/fast + +# The main all target +all: cmake_check_build_system + cd /home/zzh/work/ngs/FastDup/build && $(CMAKE_COMMAND) -E cmake_progress_start /home/zzh/work/ngs/FastDup/build/CMakeFiles /home/zzh/work/ngs/FastDup/build/src//CMakeFiles/progress.marks + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 src/all + $(CMAKE_COMMAND) -E cmake_progress_start /home/zzh/work/ngs/FastDup/build/CMakeFiles 0 +.PHONY : all + +# The main clean target +clean: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 src/clean +.PHONY : clean + +# The main clean target +clean/fast: clean +.PHONY : clean/fast + +# Prepare targets for installation. +preinstall: all + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 src/preinstall +.PHONY : preinstall + +# Prepare targets for installation. +preinstall/fast: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 src/preinstall +.PHONY : preinstall/fast + +# clear depends +depend: + cd /home/zzh/work/ngs/FastDup/build && $(CMAKE_COMMAND) -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) --check-build-system CMakeFiles/Makefile.cmake 1 +.PHONY : depend + +# Convenience name for target. +src/CMakeFiles/fastdup.dir/rule: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 src/CMakeFiles/fastdup.dir/rule +.PHONY : src/CMakeFiles/fastdup.dir/rule + +# Convenience name for target. +fastdup: src/CMakeFiles/fastdup.dir/rule +.PHONY : fastdup + +# fast build rule for target. +fastdup/fast: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/build +.PHONY : fastdup/fast + +__/ext/klib/kthread.o: __/ext/klib/kthread.c.o +.PHONY : __/ext/klib/kthread.o + +# target to build an object file +__/ext/klib/kthread.c.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/__/ext/klib/kthread.c.o +.PHONY : __/ext/klib/kthread.c.o + +__/ext/klib/kthread.i: __/ext/klib/kthread.c.i +.PHONY : __/ext/klib/kthread.i + +# target to preprocess a source file +__/ext/klib/kthread.c.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/__/ext/klib/kthread.c.i +.PHONY : __/ext/klib/kthread.c.i + +__/ext/klib/kthread.s: __/ext/klib/kthread.c.s +.PHONY : __/ext/klib/kthread.s + +# target to generate assembly for a file +__/ext/klib/kthread.c.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/__/ext/klib/kthread.c.s +.PHONY : __/ext/klib/kthread.c.s + +main.o: main.cpp.o +.PHONY : main.o + +# target to build an object file +main.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/main.cpp.o +.PHONY : main.cpp.o + +main.i: main.cpp.i +.PHONY : main.i + +# target to preprocess a source file +main.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/main.cpp.i +.PHONY : main.cpp.i + +main.s: main.cpp.s +.PHONY : main.s + +# target to generate assembly for a file +main.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/main.cpp.s +.PHONY : main.cpp.s + +markdup/class_static_var.o: markdup/class_static_var.cpp.o +.PHONY : markdup/class_static_var.o + +# target to build an object file +markdup/class_static_var.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/class_static_var.cpp.o +.PHONY : markdup/class_static_var.cpp.o + +markdup/class_static_var.i: markdup/class_static_var.cpp.i +.PHONY : markdup/class_static_var.i + +# target to preprocess a source file +markdup/class_static_var.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/class_static_var.cpp.i +.PHONY : markdup/class_static_var.cpp.i + +markdup/class_static_var.s: markdup/class_static_var.cpp.s +.PHONY : markdup/class_static_var.s + +# target to generate assembly for a file +markdup/class_static_var.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/class_static_var.cpp.s +.PHONY : markdup/class_static_var.cpp.s + +markdup/markdup.o: markdup/markdup.cpp.o +.PHONY : markdup/markdup.o + +# target to build an object file +markdup/markdup.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/markdup.cpp.o +.PHONY : markdup/markdup.cpp.o + +markdup/markdup.i: markdup/markdup.cpp.i +.PHONY : markdup/markdup.i + +# target to preprocess a source file +markdup/markdup.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/markdup.cpp.i +.PHONY : markdup/markdup.cpp.i + +markdup/markdup.s: markdup/markdup.cpp.s +.PHONY : markdup/markdup.s + +# target to generate assembly for a file +markdup/markdup.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/markdup.cpp.s +.PHONY : markdup/markdup.cpp.s + +markdup/md_funcs.o: markdup/md_funcs.cpp.o +.PHONY : markdup/md_funcs.o + +# target to build an object file +markdup/md_funcs.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_funcs.cpp.o +.PHONY : markdup/md_funcs.cpp.o + +markdup/md_funcs.i: markdup/md_funcs.cpp.i +.PHONY : markdup/md_funcs.i + +# target to preprocess a source file +markdup/md_funcs.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_funcs.cpp.i +.PHONY : markdup/md_funcs.cpp.i + +markdup/md_funcs.s: markdup/md_funcs.cpp.s +.PHONY : markdup/md_funcs.s + +# target to generate assembly for a file +markdup/md_funcs.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_funcs.cpp.s +.PHONY : markdup/md_funcs.cpp.s + +markdup/md_pipeline.o: markdup/md_pipeline.cpp.o +.PHONY : markdup/md_pipeline.o + +# target to build an object file +markdup/md_pipeline.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_pipeline.cpp.o +.PHONY : markdup/md_pipeline.cpp.o + +markdup/md_pipeline.i: markdup/md_pipeline.cpp.i +.PHONY : markdup/md_pipeline.i + +# target to preprocess a source file +markdup/md_pipeline.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_pipeline.cpp.i +.PHONY : markdup/md_pipeline.cpp.i + +markdup/md_pipeline.s: markdup/md_pipeline.cpp.s +.PHONY : markdup/md_pipeline.s + +# target to generate assembly for a file +markdup/md_pipeline.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/markdup/md_pipeline.cpp.s +.PHONY : markdup/md_pipeline.cpp.s + +util/bam_buf.o: util/bam_buf.cpp.o +.PHONY : util/bam_buf.o + +# target to build an object file +util/bam_buf.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/bam_buf.cpp.o +.PHONY : util/bam_buf.cpp.o + +util/bam_buf.i: util/bam_buf.cpp.i +.PHONY : util/bam_buf.i + +# target to preprocess a source file +util/bam_buf.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/bam_buf.cpp.i +.PHONY : util/bam_buf.cpp.i + +util/bam_buf.s: util/bam_buf.cpp.s +.PHONY : util/bam_buf.s + +# target to generate assembly for a file +util/bam_buf.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/bam_buf.cpp.s +.PHONY : util/bam_buf.cpp.s + +util/profiling.o: util/profiling.cpp.o +.PHONY : util/profiling.o + +# target to build an object file +util/profiling.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/profiling.cpp.o +.PHONY : util/profiling.cpp.o + +util/profiling.i: util/profiling.cpp.i +.PHONY : util/profiling.i + +# target to preprocess a source file +util/profiling.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/profiling.cpp.i +.PHONY : util/profiling.cpp.i + +util/profiling.s: util/profiling.cpp.s +.PHONY : util/profiling.s + +# target to generate assembly for a file +util/profiling.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/profiling.cpp.s +.PHONY : util/profiling.cpp.s + +util/yarn.o: util/yarn.cpp.o +.PHONY : util/yarn.o + +# target to build an object file +util/yarn.cpp.o: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/yarn.cpp.o +.PHONY : util/yarn.cpp.o + +util/yarn.i: util/yarn.cpp.i +.PHONY : util/yarn.i + +# target to preprocess a source file +util/yarn.cpp.i: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/yarn.cpp.i +.PHONY : util/yarn.cpp.i + +util/yarn.s: util/yarn.cpp.s +.PHONY : util/yarn.s + +# target to generate assembly for a file +util/yarn.cpp.s: + cd /home/zzh/work/ngs/FastDup/build && $(MAKE) $(MAKESILENT) -f src/CMakeFiles/fastdup.dir/build.make src/CMakeFiles/fastdup.dir/util/yarn.cpp.s +.PHONY : util/yarn.cpp.s + +# Help Target +help: + @echo "The following are some of the valid targets for this Makefile:" + @echo "... all (the default if no target is provided)" + @echo "... clean" + @echo "... depend" + @echo "... edit_cache" + @echo "... install" + @echo "... install/local" + @echo "... install/strip" + @echo "... list_install_components" + @echo "... rebuild_cache" + @echo "... fastdup" + @echo "... __/ext/klib/kthread.o" + @echo "... __/ext/klib/kthread.i" + @echo "... __/ext/klib/kthread.s" + @echo "... main.o" + @echo "... main.i" + @echo "... main.s" + @echo "... markdup/class_static_var.o" + @echo "... markdup/class_static_var.i" + @echo "... markdup/class_static_var.s" + @echo "... markdup/markdup.o" + @echo "... markdup/markdup.i" + @echo "... markdup/markdup.s" + @echo "... markdup/md_funcs.o" + @echo "... markdup/md_funcs.i" + @echo "... markdup/md_funcs.s" + @echo "... markdup/md_pipeline.o" + @echo "... markdup/md_pipeline.i" + @echo "... markdup/md_pipeline.s" + @echo "... util/bam_buf.o" + @echo "... util/bam_buf.i" + @echo "... util/bam_buf.s" + @echo "... util/profiling.o" + @echo "... util/profiling.i" + @echo "... util/profiling.s" + @echo "... util/yarn.o" + @echo "... util/yarn.i" + @echo "... util/yarn.s" +.PHONY : help + + + +#============================================================================= +# Special targets to cleanup operation of make. + +# Special rule to run CMake to check the build system integrity. +# No rule that depends on this can have commands that come from listfiles +# because they might be regenerated. +cmake_check_build_system: + cd /home/zzh/work/ngs/FastDup/build && $(CMAKE_COMMAND) -S$(CMAKE_SOURCE_DIR) -B$(CMAKE_BINARY_DIR) --check-build-system CMakeFiles/Makefile.cmake 0 +.PHONY : cmake_check_build_system + diff --git a/src/markdup/markdup.cpp b/src/markdup/markdup.cpp index 6ec50ef..3916c19 100644 --- a/src/markdup/markdup.cpp +++ b/src/markdup/markdup.cpp @@ -18,8 +18,7 @@ Date : 2023/10/23 #include "fastdup_version.h" #include "md_args.h" #include "md_funcs.h" -// #include "pipeline_md.h" -#include "new_pipe.h" +#include "md_pipeline.h" #include "read_name_parser.h" #include "util/profiling.h" @@ -99,8 +98,7 @@ int MarkDuplicates() { /* 冗余检查和标记 */ PROF_START(markdup_all); - //pipelineMarkDups(); - NewPipeMarkDups(); + PipelineMarkDups(); PROF_END(gprof[GP_markdup_all], markdup_all); /* 标记冗余, 将处理后的结果写入文件 */ @@ -165,7 +163,7 @@ int MarkDuplicates() { spdlog::info("{} duplicate reads found", dupIdxQue.Size()); spdlog::info("{} optical reads found", opticalIdxQue.Size()); spdlog::info("{} represent reads found", repIdxQue.Size()); - spdlog::info("real dup size: {}", dupIdxQue.RealSize()); + // spdlog::info("real dup size: {}", dupIdxQue.RealSize()); return 0; diff --git a/src/markdup/md_args.h b/src/markdup/md_args.h index e68b11f..675e001 100644 --- a/src/markdup/md_args.h +++ b/src/markdup/md_args.h @@ -64,7 +64,7 @@ struct MarkDupsArg { int NUM_THREADS = 1; - size_t MAX_MEM = ((size_t)1) << 30; // << 31 // 最小2G + size_t MAX_MEM = ((size_t)1) << 28; // << 31 // 最小2G bool DUPLEX_IO = true; // 同时读写 diff --git a/src/markdup/new_pipe.cpp b/src/markdup/md_pipeline.cpp similarity index 79% rename from src/markdup/new_pipe.cpp rename to src/markdup/md_pipeline.cpp index 07b4e3a..6d9dd71 100644 --- a/src/markdup/new_pipe.cpp +++ b/src/markdup/md_pipeline.cpp @@ -1,4 +1,4 @@ -#include "new_pipe.h" +#include "md_pipeline.h" #include #include @@ -110,12 +110,14 @@ static void markDupsForPairs(vector &vpRe, DPSet *dup } for (auto pe : vpRe) { // 对非best read标记冗余 if (pe != pBest) { // 非best - if (pe->read1IndexInFile == 1165334 || pe->read1IndexInFile == 1165335) { - for (auto p : vpRe) { - cout << "zzh find in pairs " << p->read1IndexInFile << " " << p->score << endl; - } - // cout << "zzh find in pairs " << pe->read1IndexInFile << " " << (notOpticalDupIdx == nullptr) << endl; - } + + // if (1555341360 == pe->read1IndexInFile) { + // for (auto p : vpRe) { + // cout << "zzh find: " << p->read1IndexInFile << '\t' << p->read2IndexInFile << endl; + // } + // cout << "split" << endl; + // } + dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 if (pe->read2IndexInFile != pe->read1IndexInFile) dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, @@ -150,9 +152,6 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, if (containsPairs) { for (auto pe : vpRe) { if (!pe->IsPaired()) { - //if (pe->read1IndexInFile == 143876) { - // cout << "zzh find in frags" << endl; - //} dupIdx->insert(pe->read1IndexInFile); } } @@ -170,9 +169,6 @@ static void markDupsForFrags(vector &vpRe, bool containsPairs, } for (auto pe : vpRe) { if (pe != pBest) { - //if (pe->read1IndexInFile == 143876) { - // cout << "zzh find in frags" << endl; - //} dupIdx->insert(pe->read1IndexInFile); } } @@ -373,6 +369,9 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { if (bw->GetReadUnmappedFlag()) { if (bw->b->core.tid == -1) // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + //if (p.genREOrder >= 3719) { + // cout << "inner core.tid check\t" << bw->b->core.tid << '\t' << i << "\t" << start_id << '\t' << end_id << endl; + //} break; } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 ReadEnds fragEnd; @@ -380,6 +379,9 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { frags.push_back(fragEnd); // 添加进frag集合 if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 string key = bw->query_name(); + // if ("ERR194147.780864524" == key) { + // cout << "zzh find: " << key << '\t' << fragEnd.read1IndexInFile << '\t' << fragEnd.read2IndexInFile << endl; + // } if (unpairedDic.find(key) == unpairedDic.end()) { unpairedDic[key] = {taskSeq, fragEnd}; } else { // 找到了pairend @@ -401,10 +403,18 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) { PROF_START(sort_pair); sort(pairs.begin(), pairs.end()); PROF_END(tprof[TP_sort_pair][tid], sort_pair); + + // if (p.genREOrder >= 3719) { + // cout << "zzh genRE size: " << p.genREOrder << '\t' << tid << '\t' << frags.size() << '\t' << pairs.size() + // << '\t' << start_id << '\t' << end_id << '\t' << bams.size() << endl; + // } } static void doGenRE(PipelineArg &pipeArg) { // return; + //if (pipeArg.genREOrder < 895) return; + //if (pipeArg.genREOrder < 440) return; + GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; ReadData &readData = pipeArg.readData; @@ -417,21 +427,43 @@ static void doGenRE(PipelineArg &pipeArg) { genREData.unpairedDic.clear(); vector &pairs = genREData.pairsArr[numThread]; pairs.clear(); + + int testNum = 0; for (int i = 0; i < numThread; ++i) { + testNum += genREData.unpairedDicArr[i].size(); for (auto &val : genREData.unpairedDicArr[i]) { const string &key = val.first; const ReadEnds &fragEnd = val.second.unpairedRE; + + // if ("ERR194147.783448001" == key) { + // cout << "zzh find in doGen: " << key << '\t' << fragEnd.read1IndexInFile << '\t' << fragEnd.read2IndexInFile + // << endl; + // } + if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) { genREData.unpairedDic[key] = {readData.taskSeq, fragEnd}; + // if (fragEnd.read1IndexInFile == 1524046492 || fragEnd.read2IndexInFile == 1524046492) { + // cout << "zzh not find paired: " << key << "\t" << fragEnd.read1IndexInFile << '\t' + // << fragEnd.read2IndexInFile << endl; + // } } else { // 找到了pairend auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); pairs.push_back(pairedEnds); + // if (pairedEnds.read1IndexInFile == 1524046492 || pairedEnds.read2IndexInFile == 1524046492) { + // cout << "zzh find paired: " << key << '\t' << pairedEnds.read1IndexInFile << '\t' + // << pairedEnds.read2IndexInFile << "\t" << fragEnd.read1IndexInFile << '\t' + // << fragEnd.read2IndexInFile << '\t' << endl; + // } genREData.unpairedDic.erase(key); // 删除找到的pairend } } } sort(pairs.begin(), pairs.end()); + // if (pipeArg.genREOrder >= 3719) { + // cout << "zzh genRE size: " << pipeArg.genREOrder << '\t' << pairs.size() << '\t' << genREData.unpairedDic.size() + // << '\t' << readData.bams.size() << '\t' << testNum << endl; + // } } // end for step 2 generate read ends @@ -450,16 +482,26 @@ static void doSort(PipelineArg &pipeArg) { ReadEndsHeap pairsHeap, fragsHeap; PROF_START(sort_pair); pairsHeap.Init(&genREData.pairsArr); + // if (pipeArg.sortOrder >= 3719) { + // cout << "zzh sort pair size: " << pairsHeap.Size() << endl; + // } while ((pRE = pairsHeap.Pop()) != nullptr) { smd.pairs.push_back(*pRE); } PROF_END(gprof[GP_sort_pair], sort_pair); PROF_START(sort_frag); fragsHeap.Init(&genREData.fragsArr); + // if (pipeArg.sortOrder >= 3719) { + // cout << "zzh sort frag size: " << fragsHeap.Size() << endl; + // } while ((pRE = fragsHeap.Pop()) != nullptr) { smd.frags.push_back(*pRE); } PROF_END(gprof[GP_sort_frag], sort_frag); + // if (pipeArg.sortOrder >= 929) { + // cout << "zzh sort size: " << pipeArg.sortOrder << '\t' << smd.pairs.size() << '\t' << smd.frags.size() << '\t' + // << genREData.pairsArr.size() << '\t' << genREData.fragsArr.size() << endl; + // } } // for step-4 sort static void doMarkDup(PipelineArg &pipeArg) { @@ -467,6 +509,7 @@ static void doMarkDup(PipelineArg &pipeArg) { MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; mdData.taskSeq = pipeArg.markDupOrder; + cout << "zzh markdup: " << mdData.taskSeq << '\t' << pipeArg.markDupOrder << '\t' << pipeArg.intersectOrder << endl; mdData.clear(); auto tmpPtr = mdData.dataPtr; @@ -482,6 +525,10 @@ static void doMarkDup(PipelineArg &pipeArg) { PROF_START(markdup_frag); processFrags(smd.frags, &mdData.fragDupIdx); PROF_END(gprof[GP_markdup_frag], markdup_frag); + // if (mdData.taskSeq >= 929) { + // cout << "zzh markdup size: " << mdData.taskSeq << '\t' << smd.pairs.size() << '\t' << smd.frags.size() << '\t' + // << smd.unpairedDic.size() << '\t' << mdData.pairDupIdx.size() << '\t' << mdData.fragDupIdx.size() << endl; + // } } template @@ -561,6 +608,14 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + //if (lp.taskSeq >= 1856) + // return; // + //if (lp.taskSeq >= 1800) { + // cout << "zzh find in findUnpairedInDatas: " << lp.taskSeq << '\t' << cp.taskSeq << '\t' + // << lsm.unpairedDic.size() << '\t' << csm.unpairedDic.size() << '\t' << lp.ckeyReadEndsMap.size() << '\t' + // << cp.ckeyReadEndsMap.size() << endl; + //} + for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // 遍历上一个任务中的每个未匹配的read auto &lastUnpair = *itr; auto &readName = lastUnpair.first; @@ -569,13 +624,24 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) { if (csm.unpairedDic.find(readName) != csm.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read auto &curUnpairInfo = csm.unpairedDic[readName]; auto &curRE = curUnpairInfo.unpairedRE; - // 在某些clip情况下,poskey可能是后面的read - int64_t lastPosKey = lastRE.posKey; - int64_t curPosKey = curRE.posKey; modifyPairedEnds(curRE, &lastRE); // lastRE当做找到匹配后,完整的ReadEnds - CalcKey ck = {min(lastPosKey, curPosKey), max(lastPosKey, curPosKey)}; + CalcKey ck(lastRE); + // if (lp.taskSeq == 1856) { + // cout << "zzh ck: " << ck.Read1Pos() << ' ' << ck.Read2Pos() + // << "\t find: " << (interPairedData.find(ck) != interPairedData.end()) << '\t' << interPairedData.size() << endl; + // } auto &pairArr = interPairedData[ck]; pairArr.push_back(lastRE); + + // if (lastRE.read1IndexInFile == 1555341360 || lastRE.read2IndexInFile == 1555341360 || + // lastRE.read1IndexInFile == 1555341145 || lastRE.read2IndexInFile == 1555341145) { + // cout << "zzh find in lp: " << pairArr.size() << '\t' << readName << '\t' << lastRE.read1IndexInFile + // << '\t' << lastRE.read2IndexInFile << '\t' << ck.Read1Pos() << '\t' << ck.Read2Pos() << endl; + // for (auto &p : pairArr) { + // cout << "reads in arr: " << p.read1IndexInFile << '\t' << p.read2IndexInFile << '\t' << p.score + // << endl; + // } + // } // 从dict中清除配对后的readends csm.unpairedDic.erase(readName); itr = lsm.unpairedDic.erase(itr); @@ -590,6 +656,11 @@ static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { auto &interPairedData = g.ckeyReadEndsMap; SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; + if (lp.taskSeq >= 929) { + cout << "zzh findUnpairedInGlobal size: " << lp.taskSeq << '\t' << lsm.unpairedDic.size() << '\t' + << g.unpairedDic.size() << '\t' << g.ckeyReadEndsMap.size() << endl; + } + for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // 遍历上一个任务中的每个未匹配的read auto &lastUnpair = *itr; auto &readName = lastUnpair.first; @@ -598,13 +669,18 @@ static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) { if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在global里找到了这个未匹配的read auto &gUnpairInfo = g.unpairedDic[readName]; auto &gRE = gUnpairInfo.unpairedRE; - // 在某些clip情况下,poskey可能是后面的read - int64_t lastPosKey = lastRE.posKey; - int64_t gPosKey = gRE.posKey; modifyPairedEnds(lastRE, &gRE); // gRE当做找到匹配后,完整的ReadEnds - CalcKey ck = {min(lastPosKey, gPosKey), max(lastPosKey, gPosKey)}; + CalcKey ck(gRE); auto &pairArr = interPairedData[ck]; pairArr.push_back(gRE); + // if (gRE.read1IndexInFile == 1555341360 || gRE.read2IndexInFile == 1555341360 || + // gRE.read1IndexInFile == 1555341145 || gRE.read2IndexInFile == 1555341145) { + // cout << "zzh find in global: " << pairArr.size() << '\t' << readName << '\t' << gRE.read1IndexInFile << '\t' << gRE.read2IndexInFile + // << '\t' << ck.Read1Pos() << '\t' << ck.Read2Pos() << endl; + // for (auto &p : pairArr) { + // cout << "reads in arr: " << p.read1IndexInFile << '\t' << p.read2IndexInFile << '\t' << p.score << endl; + // } + // } // 从dict中清除配对后的readends g.unpairedDic.erase(readName); itr = lsm.unpairedDic.erase(itr); @@ -650,16 +726,32 @@ static void doIntersect(PipelineArg &pipeArg) { SortMarkData &csm = *(SortMarkData *)cp.dataPtr; SortMarkData &nsm = *(SortMarkData *)np.dataPtr; + // if (pipeArg.sortOrder >= 929) { + // cout << "zzh intersect size: " << pipeArg.intersectOrder << '\t' << lp.taskSeq << '\t' + // << lsm.pairs.size() << '\t' << lsm.frags.size() << '\t' << lsm.unpairedDic.size()<< '\t' + // << csm.pairs.size() << '\t' << csm.frags.size() << '\t' << csm.unpairedDic.size()<< '\t' + // << nsm.pairs.size() << '\t' << nsm.frags.size() << '\t' << nsm.unpairedDic.size()<< '\t' + // << endl; + // } + // 处理相邻数据块之间重叠的部分 if (pipeArg.intersectOrder == kInitIntersectOrder) { // 第一次运行,需要处理lp和cp processIntersectFragPairs(lp, cp); } processIntersectFragPairs(cp, np); + // if (pipeArg.sortOrder >= 929) { + // cout << "zzh intersect lp taskSeq: " << lp.taskSeq << '\t' << pipeArg.markDupData[0].taskSeq << '\t' + // << pipeArg.markDupData[1].taskSeq << '\t' << pipeArg.markDupData[2].taskSeq << '\t' + // << pipeArg.markDupData[3].taskSeq << '\t' << endl; + // } // 检查确保lp和np之间没有数据交叉 - int64_t lastRightPos = 0, curLeftPos = INT64_MAX, nextLeftPos = INT64_MAX; + int64_t lastRightPos = 0, curLeftPos = INT64_MAX, curRightPos = INT64_MAX, nextLeftPos = INT64_MAX; if (lsm.frags.size() > 0) lastRightPos = lsm.frags.back().posKey; - if (csm.frags.size() > 0) curLeftPos = csm.frags[0].posKey; + if (csm.frags.size() > 0) { + curLeftPos = csm.frags[0].posKey; + curRightPos = csm.frags.back().posKey; + } if (nsm.frags.size() > 0) nextLeftPos = nsm.frags[0].posKey; if (lastRightPos >= nextLeftPos) { spdlog::error("previous data can not contain readends included by next data block! {} {}", lastRightPos, nextLeftPos); @@ -676,7 +768,8 @@ static void doIntersect(PipelineArg &pipeArg) { // 处理lp中的新找到的匹配 TaskSeqDupInfo t; - for (auto &ckVal : lp.ckeyReadEndsMap) { + for (auto itr = lp.ckeyReadEndsMap.begin(); itr != lp.ckeyReadEndsMap.end();) { + auto &ckVal = *itr; auto &ck = ckVal.first; auto &pairArr = ckVal.second; getEqualRE(pairArr[0], lsm.pairs, &pairArr); @@ -688,15 +781,20 @@ static void doIntersect(PipelineArg &pipeArg) { pairArr.insert(pairArr.end(), cpPairArr.begin(), cpPairArr.end()); cp.ckeyReadEndsMap.erase(cpKeyItr); } - sort(pairArr.begin(), pairArr.end()); - processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, - &t.notRepIdx); + if (ck.Read2Pos() <= curRightPos) { // 必须在当前数据块的范围内,否则放到global里 + sort(pairArr.begin(), pairArr.end()); + processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, + &t.notRepIdx); + itr = lp.ckeyReadEndsMap.erase(itr); + } else { + ++itr; + } } // 处理找到匹配的global数据 for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) { auto &ckVal = *itr; auto &ck = ckVal.first; - if (ck.read2Pos < curLeftPos) { + if (ck.Read2Pos() < curLeftPos) { // 只有在这个范围内,对应位点的所有reads才完全都包含了 auto &pairArr = ckVal.second; sort(pairArr.begin(), pairArr.end()); processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx); @@ -705,6 +803,9 @@ static void doIntersect(PipelineArg &pipeArg) { ++itr; } } + for (auto &ckVal : lp.ckeyReadEndsMap) { + g.ckeyReadEndsMap.insert(ckVal); + } lp.ckeyReadEndsMap.clear(); // 更新一下冗余结果 refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp); @@ -744,8 +845,12 @@ static void *pipeRead(void *data) { inBamBuf.ClearAll(); // 清理上一轮读入的数据 pipeArg.readOrder += 1; // for next yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 + + if (pipeArg.readOrder >= 3719) { + cout << "zzh read size: " << pipeArg.readData.bams.size() << endl; + } } - spdlog::info("total reads processed {}", readNumSum); + spdlog::info("total reads processed {}, last order {}", readNumSum, pipeArg.readOrder); return 0; } static void *pipeGenRE(void *data) { @@ -767,7 +872,7 @@ static void *pipeGenRE(void *data) { yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑 yarn::RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放 - if (pipeArg.readFinish) { + if (pipeArg.readFinish) { // 读取结束的时候,没有新的数据需要处理了 yarn::POSSESS(pipeArg.genRESig); pipeArg.genREFinish = 1; yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); @@ -784,6 +889,7 @@ static void *pipeGenRE(void *data) { yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据 } + spdlog::info("genRE last order {}", pipeArg.genREOrder); return 0; } static void *pipeSort(void *data) { @@ -802,11 +908,19 @@ static void *pipeSort(void *data) { if (pipeArg.genREFinish) { // 处理完剩余数据 + cout << "zzh pipeSort: " << pipeArg.genREOrder << '\t' << pipeArg.sortOrder << endl; while (pipeArg.sortOrder < pipeArg.genREOrder) { + yarn::POSSESS(pipeArg.sortSig); + yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置 + yarn::RELEASE(pipeArg.sortSig); + PROF_START(sort); doSort(pipeArg); PROF_END(gprof[GP_sort], sort); + + yarn::POSSESS(pipeArg.sortSig); pipeArg.sortOrder += 1; + yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); } yarn::POSSESS(pipeArg.sortSig); pipeArg.sortFinish = 1; @@ -825,6 +939,7 @@ static void *pipeSort(void *data) { pipeArg.sortOrder += 1; yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); } + spdlog::info("sort last order {}", pipeArg.sortOrder); return 0; } static void *pipeMarkDup(void *data) { @@ -842,16 +957,26 @@ static void *pipeMarkDup(void *data) { yarn::RELEASE(pipeArg.markDupSig); if (pipeArg.sortFinish) { + cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl; // 应该还得处理剩余的数据 while (pipeArg.markDupOrder < pipeArg.sortOrder) { + cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl; + yarn::POSSESS(pipeArg.markDupSig); + yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); + yarn::RELEASE(pipeArg.markDupSig); + PROF_START(markdup); doMarkDup(pipeArg); PROF_END(gprof[GP_markdup], markdup); + + yarn::POSSESS(pipeArg.markDupSig); pipeArg.markDupOrder += 1; + yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); } + cout << "zzh pipeMarkDup: " << pipeArg.sortOrder << '\t' << pipeArg.markDupOrder << endl; yarn::POSSESS(pipeArg.markDupSig); pipeArg.markDupFinish = 1; - yarn::TWIST(pipeArg.markDupSig, yarn::BY, 2); + yarn::TWIST(pipeArg.markDupSig, yarn::TO, 3 + pipeArg.MARKBUFNUM); break; } /* 冗余检测 readends */ @@ -865,6 +990,7 @@ static void *pipeMarkDup(void *data) { pipeArg.markDupOrder += 1; yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); } + spdlog::info("markdup last order {}", pipeArg.markDupOrder); return 0; } static void *pipeIntersect(void *data) { @@ -878,12 +1004,14 @@ static void *pipeIntersect(void *data) { PROF_END(gprof[GP_intersect_wait], intersect_wait); if (pipeArg.markDupFinish) { + cout << "zzh pipeIntersect: " << pipeArg.markDupOrder << '\t' << pipeArg.intersectOrder << endl; while (pipeArg.intersectOrder < pipeArg.markDupOrder) { PROF_START(intersect); doIntersect(pipeArg); PROF_END(gprof[GP_intersect], intersect); pipeArg.intersectOrder += 1; } + break; } /* 交叉数据处理 readends */ @@ -896,6 +1024,7 @@ static void *pipeIntersect(void *data) { pipeArg.intersectOrder += 1; } + spdlog::info("intersect last order {}", pipeArg.intersectOrder); return 0; } @@ -908,12 +1037,16 @@ static void processLastData(PipelineArg &pipeArg) { SortMarkData &lsm = *(SortMarkData *)lp.dataPtr; SortMarkData &csm = *(SortMarkData *)cp.dataPtr; + spdlog::info("last taskseq: lp {}, cp {}, {}, {}", lp.taskSeq, cp.taskSeq, lp.pairDupIdx.size(), cp.pairDupIdx.size()); spdlog::info("last data size: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); + spdlog::info("last pair frags: {}, {}, {}, {}", lsm.frags.size(), lsm.pairs.size(), csm.frags.size(), csm.pairs.size()); // 把lp中未匹配的放到global里保存 findUnpairedInGlobal(g, lp); findUnpairedInGlobal(g, cp); + spdlog::info("last data size after: g{}-lp{}-cp{}", g.unpairedDic.size(), lsm.unpairedDic.size(), csm.unpairedDic.size()); + // 处理lp中的新找到的匹配 TaskSeqDupInfo t; for (auto &ckVal : lp.ckeyReadEndsMap) { @@ -945,7 +1078,6 @@ static void processLastData(PipelineArg &pipeArg) { static void parallelPipeline() { PipelineArg pipeArg(&nsgv::gDupRes); - // PipelineArg &pipeArg = nsgv::gPipe; pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; pthread_t tidArr[5]; @@ -959,17 +1091,17 @@ static void parallelPipeline() { processLastData(pipeArg); PROF_END(gprof[GP_merge_result], merge_result); - spdlog::info("pipeArg size : {}", pipeArg.byteSize()); + spdlog::info("pipeArg size : {} GB", pipeArg.byteSize() / 1024.0 / 1024 / 1024); size_t repNum = 0; for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); spdlog::info("rep num : {}", repNum); - spdlog::info("result size : {}", nsgv::gDupRes.byteSize()); + spdlog::info("result size : {} GB", nsgv::gDupRes.byteSize() / 1024.0 / 1024 / 1024); } /* 并行流水线方式处理数据,标记冗余 */ -void NewPipeMarkDups() { +void PipelineMarkDups() { if (nsgv::gMdArg.NUM_THREADS > 1) return parallelPipeline(); diff --git a/src/markdup/new_pipe.h b/src/markdup/md_pipeline.h similarity index 89% rename from src/markdup/new_pipe.h rename to src/markdup/md_pipeline.h index 6e098b3..5dbfb3b 100644 --- a/src/markdup/new_pipe.h +++ b/src/markdup/md_pipeline.h @@ -30,8 +30,9 @@ struct GenREData { for (auto &r : v) bytes += sizeof(r); for (auto &v : fragsArr) for (auto &r : v) bytes += sizeof(r); - for (auto &m : unpairedDicArr) bytes += m.size() * 100; - bytes += unpairedDic.size() * 100; + spdlog::info("genRE readends size : {} GB", bytes / 1024.0 / 1024 / 1024); + for (auto &m : unpairedDicArr) bytes += m.size() * sizeof(*m.begin()); + bytes += unpairedDic.size() * sizeof(*unpairedDic.begin()); return bytes; } }; @@ -44,7 +45,8 @@ struct SortMarkData { size_t bytes = 0; for (auto &r : pairs) bytes += sizeof(r); for (auto &r : frags) bytes += sizeof(r); - bytes += unpairedDic.size() * 100; + spdlog::info("sortmark readends size : {} GB", bytes / 1024.0 / 1024 / 1024); + bytes += unpairedDic.bucket_count() * sizeof(*unpairedDic.begin()); return bytes; } }; @@ -122,14 +124,14 @@ struct IntersectData { size_t byteSize() { size_t bytes = 0; - bytes += unpairedDic.size() * 100; + bytes += unpairedDic.size() * sizeof(*unpairedDic.begin()); for (auto &v : dupIdxArr) for (auto &r : v) bytes += sizeof(r); for (auto &v : opticalDupIdxArr) for (auto &r : v) bytes += sizeof(r); for (auto &v : repIdxArr) for (auto &r : v) bytes += sizeof(r); - spdlog::info("result size : {}", bytes); + spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); return bytes; } @@ -189,16 +191,16 @@ struct PipelineArg { size_t tmp = 0; for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize(); bytes += tmp; - spdlog::info("sortMarkData size : {}", tmp); + spdlog::info("sortMarkData size : {} GB", tmp / 1024.0 / 1024 / 1024); for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize(); bytes += tmp; - spdlog::info("genREData size : {}", tmp); + spdlog::info("genREData size : {} GB", tmp / 1024.0 / 1024 / 1024); for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize(); bytes += tmp; - spdlog::info("markDupData size : {}", tmp); + spdlog::info("markDupData size : {} GB", tmp / 1024.0 / 1024 / 1024); tmp += intersectData.byteSize(); bytes += tmp; - spdlog::info("intersectData size : {}", tmp); + spdlog::info("intersectData size : {} GB", tmp / 1024.0 / 1024 / 1024); return bytes; } @@ -261,4 +263,4 @@ struct ReadEndsHeap { }; // 并行运行mark duplicate -void NewPipeMarkDups(); \ No newline at end of file +void PipelineMarkDups(); \ No newline at end of file diff --git a/src/markdup/md_types.h b/src/markdup/md_types.h index db5d31c..064aead 100644 --- a/src/markdup/md_types.h +++ b/src/markdup/md_types.h @@ -27,24 +27,58 @@ struct UnpairedREInfo { /* 对于一个pair数据,一个完整的计算点,包含read1的比对位置和read2的比对位置 */ struct CalcKey { - int64_t read1Pos; - int64_t read2Pos; + int8_t orientation = -1; + int32_t read1ReferenceIndex = -1; + int32_t read1Coordinate = -1; + int32_t read2ReferenceIndex = -1; + int32_t read2Coordinate = -1; + + CalcKey() {} + CalcKey(const ReadEnds &re) { + orientation = re.orientation; + read1ReferenceIndex = re.read1ReferenceIndex; + read1Coordinate = re.read1Coordinate; + read2ReferenceIndex = re.read2ReferenceIndex; + read2Coordinate = re.read2Coordinate; + } + + int64_t Read1Pos() const { return BamWrap::bam_global_pos(read1ReferenceIndex, read1Coordinate); } + + int64_t Read2Pos() const { return BamWrap::bam_global_pos(read2ReferenceIndex, read2Coordinate); } + bool operator<(const CalcKey &o) const { - int comp = (int)(read1Pos - o.read1Pos); + int comp = read1ReferenceIndex - o.read1ReferenceIndex; if (comp == 0) - comp = (int)(read2Pos - o.read2Pos); + comp = read1Coordinate - o.read1Coordinate; + // 需要orientation,因为要跟排序的比较方式和顺序一致 + if (comp == 0) + comp = orientation - o.orientation; + if (comp == 0) + comp = read2ReferenceIndex - o.read2ReferenceIndex; + if (comp == 0) + comp = read2Coordinate - o.read2Coordinate; return comp < 0; } - bool operator==(const CalcKey &o) const { return read1Pos == o.read1Pos && read2Pos == o.read2Pos; } - std::size_t operator()(const CalcKey &o) const { - return std::hash()(read1Pos) ^ std::hash()(read2Pos); + bool operator==(const CalcKey &o) const { + return read1ReferenceIndex == o.read1ReferenceIndex && read1Coordinate == o.read1Coordinate && + orientation == o.orientation && read2ReferenceIndex == o.read2ReferenceIndex && + read2Coordinate == o.read2Coordinate; + } + std::size_t operator()() const { + size_t h1 = read1ReferenceIndex; + h1 = (h1 << 40) | (read1Coordinate << 8) | orientation; + size_t h2 = read2ReferenceIndex; + h2 = (h2 << 32) | read2Coordinate; + return std::hash()(h1) ^ std::hash()(h2); } }; struct CalcKeyHash { - std::size_t operator()(const CalcKey &o) const { - return std::hash()(o.read1Pos) ^ std::hash()(o.read2Pos); - } + std::size_t operator()(const CalcKey &o) const { return o(); } +}; + +struct CalcKeyEqual { + bool operator()(const CalcKey &o1, const CalcKey &o2) const { return o1 == o2; } }; /* 用来记录冗余索引相关的信息 */ @@ -119,8 +153,13 @@ struct UnpairedPosInfo { // typedef unordered_map UnpairedPositionMap; typedef tsl::robin_map UnpairedNameMap; // 以read name为索引,保存未匹配的pair read +//typedef map UnpairedNameMap; // 以read name为索引,保存未匹配的pair read typedef tsl::robin_map UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量 -typedef map> CkeyReadEndsMap; // 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds +// typedef map> CkeyReadEndsMap; // 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds +typedef unordered_map, CalcKeyHash, CalcKeyEqual> + CkeyReadEndsMap; // 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds +// typedef tsl::robin_map, CalcKeyHash, CalcKeyEqual> CkeyReadEndsMap; // +// 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds /* 单线程处理冗余参数结构体 */ struct MarkDupDataArg { @@ -208,7 +247,8 @@ struct DupIdxQueue { DupInfo nextDup = dupIdx; auto topIdx = minHeap.top(); - ofstream ofs("dupn-noxyz.txt"); + ofstream ofs("na12878.txt"); + ofstream ofs1("na12878-all.txt"); while (dupIdx != -1) { ++len; @@ -224,7 +264,7 @@ struct DupIdxQueue { << endl; } } - //ofs << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; + ofs1 << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl; ofs << topIdx.dupIdx << endl; dupIdx = nextDup; preTop = topIdx; diff --git a/src/markdup/pipeline_md.cpp b/src/markdup/pipeline_md.cpp deleted file mode 100644 index a1e9fd4..0000000 --- a/src/markdup/pipeline_md.cpp +++ /dev/null @@ -1,1120 +0,0 @@ -#include "pipeline_md.h" - -#include -#include -#include - -#include -#include -#include - -#include "dup_metrics.h" -#include "md_args.h" -#include "md_funcs.h" -#include "read_ends.h" -#include "read_name_parser.h" -#include "util/bam_buf.h" -#include "util/profiling.h" -#include "util/yarn.h" - -using std::vector; - -namespace nsgv { - -extern MarkDupsArg gMdArg; // 用来测试性能 -extern samFile *gInBamFp; // 输入的bam文件 -extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息 -extern DuplicationMetrics gMetrics; // 统计信息 -extern vector gNameParsers; -extern DupResult gDupRes; -extern PipelineArg gPipe; -}; // namespace nsgv - -/* 排序 */ -static inline void sortReadEndsArr(vector &arr) { - size_t blockSize = 64 * 1024; - if (arr.size() < blockSize) { - std::sort(arr.begin(), arr.end()); - return; - } - size_t blockNum = (arr.size() + blockSize - 1) / blockSize; - size_t crossNum = 1024; - size_t start, end, i, left, right; - std::sort(arr.begin(), arr.begin() + blockSize); - for (i = 1; i < blockNum; ++i) { - start = i * blockSize; - end = min(start + blockSize, arr.size()); - std::sort(arr.begin() + start, arr.begin() + end); - left = crossNum; - while (!(arr[start - left] < arr[start])) { - left = left << 1; - if (left >= blockSize) { - std::sort(arr.begin(), arr.end()); // 退化到普通排序 - return; - } - } - right = min(crossNum, end - start - 1); - - while (!(arr[start - 1] < arr[start + right])) { - right = min(right << 1, end - start - 1); - if (right == end - start - 1) - break; - } - std::sort(arr.begin() + start - left, arr.begin() + start + right); - } -} - -/* 处理一组pairend的readends,标记冗余, 这个函数需要串行运行,因为需要做一些统计*/ -static void markDupsForPairs(vector &vpRe, - DPSet *dupIdx, - MDSet *opticalDupIdx, - DPSet *repIdx, - MDSet *notDupIdx = nullptr, - MDSet *notOpticalDupIdx = nullptr, - MDSet *notRepIdx = nullptr) { - if (vpRe.size() < 2) { - return; - } - - int maxScore = 0; - const ReadEnds *pBest = nullptr; - /** All read ends should have orientation FF, FR, RF, or RR **/ - for (auto pe : vpRe) { // 找分数最高的readend - if (pe->score > maxScore || pBest == nullptr) { - maxScore = pe->score; - pBest = pe; - } - } - - if (notDupIdx != nullptr) { - notDupIdx->insert(pBest->read1IndexInFile); - notDupIdx->insert(pBest->read2IndexInFile); - } - - if (nsgv::gMdArg.CHECK_OPTICAL_DUP) { // 检查光学冗余 - // trackOpticalDuplicates - vector prevOpticalRe; - if (notOpticalDupIdx != nullptr) { - for (auto pe : vpRe) { - if (pe->isOpticalDuplicate) { - prevOpticalRe.push_back(pe); - } - } - } - trackOpticalDuplicates(vpRe, pBest); - // 由于重叠问题,可能会更新数据 - if (notOpticalDupIdx != nullptr) { - for (auto pe : prevOpticalRe) { - if (!pe->isOpticalDuplicate) { - notOpticalDupIdx->insert(pe->read1IndexInFile); - notOpticalDupIdx->insert(pe->read2IndexInFile); - } - } - } - } - for (auto pe : vpRe) { // 对非best read标记冗余 - if (pe != pBest) { // 非best - dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1 - if (pe->read2IndexInFile != pe->read1IndexInFile) - dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2, - // 注意这里代表是read1的索引 - // 检查是否optical dup - if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) { - opticalDupIdx->insert(pe->read1IndexInFile); - if (pe->read2IndexInFile != pe->read1IndexInFile) - opticalDupIdx->insert(pe->read2IndexInFile); - } - } - } - // 在输出的bam文件中添加tag, best作为dupset的代表 - if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS) { - repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); - repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); - if (notRepIdx != nullptr) { - for (auto pe : vpRe) { - if (pe != pBest) { - notRepIdx->insert(pe->read1IndexInFile); - if (pe->read2IndexInFile != pe->read1IndexInFile) - notRepIdx->insert(pe->read2IndexInFile); - } - } - } - } -} - -/* 处理一组非paired的readends,标记冗余 */ -static void markDupsForFrags(vector &vpRe, bool containsPairs, DPSet *dupIdx, - MDSet *notDupIdx = nullptr) { - if (containsPairs) { - for (auto pe : vpRe) { - if (!pe->IsPaired()) { - dupIdx->insert(pe->read1IndexInFile); - } - } - } else { - int maxScore = 0; - const ReadEnds *pBest = nullptr; - for (auto pe : vpRe) { - if (pe->score > maxScore || pBest == nullptr) { - maxScore = pe->score; - pBest = pe; - } - } - if (notDupIdx != nullptr) { - notDupIdx->insert(pBest->read1IndexInFile); - } - for (auto pe : vpRe) { - if (pe != pBest) { - dupIdx->insert(pe->read1IndexInFile); - } - } - } -} - -/* 找到与readend pos相等的所有readend */ -static void getEqualRE(const ReadEnds &re, vector &src, vector *dst) { - auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::PairsLittleThan); // 只比对位点 - dst->insert(dst->end(), range.first, range.second); -} - -#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread)) -#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread)) - -/* 处理pairs */ -static void processPairs(vector &readEnds, DPSet *dupIdx, MDSet *opticalDupIdx, - DPSet *repIdx, MDSet *notDupIdx = nullptr, - MDSet *notOpticalDupIdx = nullptr, MDSet *notRepIdx = nullptr) { - // return; - vector vpCache; // 有可能是冗余的reads - const ReadEnds *pReadEnd = nullptr; - for (auto &re : readEnds) { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 - vpCache.push_back(&re); // 处理一个潜在的冗余组 - else { - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, - notRepIdx); // 不一样 - vpCache.clear(); - vpCache.push_back(&re); - pReadEnd = &re; - } - } - markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx); -} - -/* 处理frags */ -static void processFrags(vector &readEnds, DPSet *dupIdx, MDSet *notDupIdx = nullptr) { - bool containsPairs = false; - bool containsFrags = false; - vector vpCache; // 有可能是冗余的reads - const ReadEnds *pReadEnd = nullptr; - for (auto &re : readEnds) { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) { - vpCache.push_back(&re); - containsPairs = containsPairs || re.IsPaired(); - containsFrags = containsFrags || !re.IsPaired(); - } else { - if (vpCache.size() > 1 && containsFrags) { - markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); - } - vpCache.clear(); - vpCache.push_back(&re); - pReadEnd = &re; - containsPairs = re.IsPaired(); - containsFrags = !re.IsPaired(); - } - } - if (vpCache.size() > 1 && containsFrags) { - markDupsForFrags(vpCache, containsPairs, dupIdx, notDupIdx); - } -} - -/* 单线程markdup (第二步)*/ -static void markdups(MarkDupDataArg *arg) { - auto &p = *arg; - p.fragDupIdx.clear(); - p.pairDupIdx.clear(); - p.pairOpticalDupIdx.clear(); - p.pairRepIdx.clear(); - p.pairSingletonIdx.clear(); - /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - // 先处理 pair - processPairs(p.pairs, &p.pairDupIdx, &p.pairOpticalDupIdx, &p.pairRepIdx, &p.pairSingletonIdx); - // 再处理frag - processFrags(p.frags, &p.fragDupIdx); -} - -/* 获取交叉部分的数据 */ -static inline void getIntersectData(vector &leftArr, vector &rightArr, vector *dst, - bool isPairCmp = false) { - if (leftArr.empty() || rightArr.empty()) { - return; - } - const size_t leftEndIdx = leftArr.size() - 1; - const size_t rightStartIdx = 0; - size_t leftSpan = 0; - size_t rightSpan = 0; - - while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { - leftSpan += 1; - if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了(可能会有bug,上上个也与下一个有交集呢?) - leftSpan = leftArr.size() - 1; - break; - } - } - - while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { - rightSpan += 1; - if (rightSpan == rightArr.size() - 1) // 同上,可能会有bug - break; - } - dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); - dst->insert(dst->end(), rightArr.begin(), rightArr.begin() + rightSpan); - std::sort(dst->begin(), dst->end()); -} - -/* 将frags重叠部分的dup idx放进数据中 */ -static inline void refreshFragDupIdx(DPSet &dupIdx, MDSet ¬DupIdx, MarkDupDataArg *lastArg, - MarkDupDataArg *curArg) { - auto &lp = *lastArg; - auto &p = *curArg; - for (auto idx : dupIdx) { - lp.fragDupIdx.insert(idx); - p.fragDupIdx.erase(idx); - } - for (auto idx : notDupIdx) { - lp.fragDupIdx.erase(idx); - p.fragDupIdx.erase(idx); - } -} - -// 用来分别处理dup和optical dup -static void refeshTaskDupInfo(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, - MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, - DPSet &latterDupIdx, MDSet &latterOpticalDupIdx, - DPSet &latterRepIdx, MDSet &latterNotDupIdx, - MDSet &latterNotOpticalDupIdx, MDSet &latterNotRepIdx) { - for (auto idx : dupIdx) latterDupIdx.insert(idx); - for (auto idx : opticalDupIdx) latterOpticalDupIdx.insert(idx); - for (auto idx : repIdx) latterRepIdx.insert(idx); - for (auto idx : notDupIdx) latterNotDupIdx.insert(idx); - for (auto idx : notOpticalDupIdx) latterNotOpticalDupIdx.insert(idx); - for (auto idx : notRepIdx) latterNotRepIdx.insert(idx); -} - -/* 最后合并数据并排序 */ -template -static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet ¬DupIdx, vector &dupArr, - vector &cacheDupIdx1, vector &midArr1) { - //midArr.resize(0); - //cacheDupIdx.resize(0); - - vector cacheDupIdx; - vector midArr; - - cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end()); - std::sort(cacheDupIdx.begin(), cacheDupIdx.end()); - - auto ai = dupArr.begin(); - auto ae = dupArr.end(); - auto bi = cacheDupIdx.begin(); - auto be = cacheDupIdx.end(); - - T val = 0; - while (ai != ae && bi != be) { - if (*ai < *bi) { - val = *ai++; - } else if (*bi < *ai) { - val = *bi++; - } else { - val = *bi++; // 相等的时候取后面的作为结果 - ai++; - } - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - while (ai != ae) { - val = *ai++; - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - while (bi != be) { - val = *bi++; - if (notDupIdx.find(val) == notDupIdx.end()) { - midArr.push_back(val); - } - } - // spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); - //dupArr = midArr; - dupArr.clear(); - dupArr.assign(midArr.begin(), midArr.end()); - spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size()); -} - -// for step 2 generate read ends - -// multi-thread generate read ends -static void mtGenerateReadEnds(void *data, long idx, int tid) { - auto &p = *(PipelineArg *)data; - auto &rnParser = nsgv::gNameParsers[idx]; - int nThread = p.numThread; - auto &bams = p.readData.bams; - int64_t bamStartIdx = p.readData.bamStartIdx; - int64_t taskSeq = p.readData.taskSeq; - GenREData &genREData = p.genREData[p.genREOrder % p.GENBUFNUM]; - auto &pairs = genREData.pairsArr[idx]; - auto &frags = genREData.fragsArr[idx]; - auto &unpairedDic = genREData.unpairedDicArr[idx]; - - pairs.clear(); - frags.clear(); - unpairedDic.clear(); - - PROF_START(gen); - size_t start_id = LOWER_BOUND(idx, nThread, bams.size()); - size_t end_id = UPPER_BOUND(idx, nThread, bams.size()); - for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read - BamWrap *bw = bams[i]; - const int64_t bamIdx = bamStartIdx + i; - if (bw->GetReadUnmappedFlag()) { - if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - break; - } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 - ReadEnds fragEnd; - buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); - frags.push_back(fragEnd); // 添加进frag集合 - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 - string key = bw->query_name(); - if (unpairedDic.find(key) == unpairedDic.end()) { - unpairedDic[key] = {taskSeq, fragEnd}; - } else { // 找到了pairend - auto &pairedEnds = unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - pairs.push_back(pairedEnds); - unpairedDic.erase(key); // 删除找到的pairend - } - } - } - } - PROF_END(tprof[TP_gen][tid], gen); - - PROF_START(sort_frag); - // sortReadEndsArr(frags); - sort(frags.begin(), frags.end()); - PROF_END(tprof[TP_sort_frag][tid], sort_frag); - - PROF_START(sort_pair); - sort(pairs.begin(), pairs.end()); - PROF_END(tprof[TP_sort_pair][tid], sort_pair); -} - -static void doGenRE(PipelineArg &pipeArg) { - // return; - GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM]; - ReadData &readData = pipeArg.readData; - - // generate read ends - const int numThread = pipeArg.numThread; - - kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread); - // 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡 - // 轮询每个线程中未找到匹配的read,找到匹配的 - genREData.unpairedDic.clear(); - genREData.unpairedPosArr.clear(); - vector &pairs = genREData.pairsArr[numThread]; - pairs.clear(); - for (int i = 0; i < numThread; ++i) { - for (auto &val : genREData.unpairedDicArr[i]) { - const string &key = val.first; - const ReadEnds &fragEnd = val.second.unpairedRE; - if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) { - genREData.unpairedDic[key] = {readData.taskSeq, fragEnd}; - } else { // 找到了pairend - auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE; - modifyPairedEnds(fragEnd, &pairedEnds); - pairs.push_back(pairedEnds); - genREData.unpairedDic.erase(key); // 删除找到的pairend - } - } - } - sort(pairs.begin(), pairs.end()); - - // create unpaired info - for (auto &e : genREData.unpairedDic) { - auto posKey = e.second.unpairedRE.posKey; - auto &unpairArrInfo = genREData.unpairedPosArr[posKey]; - unpairArrInfo.unpairedNum++; - unpairArrInfo.taskSeq = readData.taskSeq; - unpairArrInfo.readNameSet.insert(e.first); - } -} -// end for step 2 generate read ends - -// for step-3 sort -static void doSort(PipelineArg &pipeArg) { - // return; - GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM]; - SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM]; - SortMarkData &smd = *(SortMarkData *)sortData.dataPtr; - - smd.unpairedDic = genREData.unpairedDic; - smd.unpairedPosArr = genREData.unpairedPosArr; - - smd.pairs.clear(); - smd.frags.clear(); - const ReadEnds *pRE; - ReadEndsHeap pairsHeap, fragsHeap; - PROF_START(sort_pair); - pairsHeap.Init(&genREData.pairsArr); - while ((pRE = pairsHeap.Pop()) != nullptr) { - smd.pairs.push_back(*pRE); - } - PROF_END(gprof[GP_sort_pair], sort_pair); - PROF_START(sort_frag); - fragsHeap.Init(&genREData.fragsArr); - while ((pRE = fragsHeap.Pop()) != nullptr) { - smd.frags.push_back(*pRE); - } - PROF_END(gprof[GP_sort_frag], sort_frag); -} -// for step-4 sort -static void doMarkDup(PipelineArg &pipeArg) { - // return; - MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM]; - SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM]; - mdData.taskSeq = pipeArg.markDupOrder; - mdData.fragDupIdx.clear(); - mdData.pairDupIdx.clear(); - mdData.pairOpticalDupIdx.clear(); - mdData.pairRepIdx.clear(); - - auto tmpPtr = mdData.dataPtr; - mdData.dataPtr = sortData.dataPtr; - sortData.dataPtr = tmpPtr; - - SortMarkData &smd = *(SortMarkData *)mdData.dataPtr; - // 先处理 pair - PROF_START(markdup_pair); - processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx); - PROF_END(gprof[GP_markdup_pair], markdup_pair); - // 再处理frag - PROF_START(markdup_frag); - processFrags(smd.frags, &mdData.fragDupIdx); - PROF_END(gprof[GP_markdup_frag], markdup_frag); -} - -template -static void refreshDupIdx(T &srcArr, T &insArr, T &delArr) { - for (auto dup : srcArr) { - insArr.insert(dup); - delArr.erase(dup); - } -} -template -static void refreshNotDupIdx(T1 &srcArr, T2 &delArr1, T2 &delArr2) { - for (auto dup : srcArr) { - delArr1.erase(dup); - delArr2.erase(dup); - } -} - -static void refreshMarkDupData(DPSet &dupIdx, MDSet &opticalDupIdx, DPSet &repIdx, - MDSet ¬DupIdx, MDSet ¬OpticalDupIdx, MDSet ¬RepIdx, - MarkDupData &lp, MarkDupData &p) { - refreshDupIdx(dupIdx, lp.pairDupIdx, p.pairDupIdx); - refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx); - refreshDupIdx(repIdx, lp.pairRepIdx, p.pairRepIdx); - - refreshNotDupIdx(notDupIdx, lp.pairDupIdx, p.pairDupIdx); - refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx); - refreshNotDupIdx(notRepIdx, lp.pairRepIdx, p.pairRepIdx); -} - -// for step-5 sort -static void doIntersect(PipelineArg &pipeArg) { - // return; - IntersectData &g = pipeArg.intersectData; - MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM]; - MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM]; - SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; - SortMarkData &pSM = *(SortMarkData *)p.dataPtr; - - vector reArr; - DPSet dupIdx; - MDSet opticalDupIdx; - DPSet repIdx; - MDSet notOpticalDupIdx; - MDSet notDupIdx; - MDSet notRepIdx; - - // 先处理重叠的frags - getIntersectData(lpSM.frags, pSM.frags, &reArr); - processFrags(reArr, &dupIdx, ¬DupIdx); - refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx); - refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx); - - // 再处理重叠的pairs - reArr.clear(); - dupIdx.clear(); - notDupIdx.clear(); - getIntersectData(lpSM.pairs, pSM.pairs, &reArr, true); - - processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, ¬DupIdx, ¬OpticalDupIdx, ¬RepIdx); - refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, lp, p); - - // 处理之前未匹配的部分 - map recalcPos; - CalcSet alreadyAdd; // 与该位点相同的pair都添加到数组里了 - MDSet addToGlobal; - int64_t prevLastPos = 0, nextFirstPos = 0; - if (lpSM.frags.size() > 0) - prevLastPos = lpSM.frags.back().posKey; - if (pSM.frags.size() > 0) - nextFirstPos = pSM.frags[0].posKey; - - for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read - auto &readName = prevUnpair.first; - auto &prevPosInfo = prevUnpair.second; - auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - if (pSM.unpairedDic.find(readName) != pSM.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read - auto &nextPosInfo = pSM.unpairedDic[readName]; - auto &nextFragEnd = nextPosInfo.unpairedRE; - int64_t prevPosKey = prevFragEnd.posKey; - modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read - int64_t nextPosKey = max(prevPosKey, nextFragEnd.posKey); - CalcKey ck = {prevPosKey, nextPosKey}; - UnpairedPosInfo *prevUnpairInfoP = nullptr; - UnpairedPosInfo *nextUnpairInfoP = nullptr; - - if (lpSM.unpairedPosArr.find(prevPosKey) != lpSM.unpairedPosArr.end()) - prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey]; - if (pSM.unpairedPosArr.find(prevPosKey) != pSM.unpairedPosArr.end()) - nextUnpairInfoP = &pSM.unpairedPosArr[prevPosKey]; - // pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定 - // 1. - // prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据; - // 2. - // prevpos在交叉部分之前,nextpos在交叉部分,需要获取lp中的相等read pair进行重新计算 - // 复杂情况1. g中包含prevPosKey对应的unpair,p中有对应的pair,此时应该把这些pair考虑进去 - // 3. - // prevpos在交叉部分,nextpos在交叉部分之后,需要获取p中的相等read pair进行重新计算 - // 复杂情况2. p中是否包含prevPosKey对应的unpair - // 4. - // prevpos在交叉部分,nextpos在交叉部分,需要获取lp和p中的相等read pair进行重新计算 - - bool addDataToPos = true; - if (alreadyAdd.find(ck) != alreadyAdd.end()) { - // 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据 - addDataToPos = false; - } else - alreadyAdd.insert(ck); - - if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前 - auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr - prevPairArr.push_back(prevFragEnd); - if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况 - getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr); - } - // 第一种情况,第二种情况下都会出现,复杂情况一 - auto gPosInfo = g.unpairedPosArr.find(prevPosKey); - if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致 - auto &gUnpairInfo = gPosInfo->second; - auto pPosInfo = pSM.unpairedPosArr.find(nextPosKey); - if (pPosInfo != pSM.unpairedPosArr.end()) { - auto &pUnpairInfo = pPosInfo->second; - for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的 - if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) { - auto pe = g.unpairedDic[rn].unpairedRE; - auto fe = pSM.unpairedDic[rn].unpairedRE; - modifyPairedEnds(fe, &pe); - prevPairArr.push_back(pe); - g.unpairedDic.erase(rn); - pSM.unpairedDic.erase(rn); - } - } - } - } - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } else { // prevpos在交叉部分 - if (nextPosKey > prevLastPos) { // nextpos在交叉部分之后 第三种情况 - if (nextUnpairInfoP != nullptr) { // 且在pos点,next task有unpair,这样才把这些数据放到next task里 - auto &nextPairArr = nextUnpairInfoP->pairArr; - nextPairArr.push_back(prevFragEnd); - auto &prevPairArr = prevUnpairInfoP->pairArr; - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) { - getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); - getEqualRE(prevFragEnd, pSM.pairs, &nextPairArr); - } - // 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除) - recalcPos[ck] = nextPosInfo.taskSeq; - - std::sort(prevPairArr.begin(), prevPairArr.end()); - std::sort(nextPairArr.begin(), nextPairArr.end()); - } else { // next task在该位点没有unpair,那就把数据放到prev task里 - auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) // 第二种情况 - getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } - } else { // 第四种情况 - if (prevUnpairInfoP == nullptr) { - prevUnpairInfoP = &lpSM.unpairedPosArr[prevPosKey]; - prevUnpairInfoP->taskSeq = lp.taskSeq; - } - auto &prevPairArr = prevUnpairInfoP->pairArr; - prevPairArr.push_back(prevFragEnd); - if (addDataToPos) { - getEqualRE(prevFragEnd, lpSM.pairs, &prevPairArr); - getEqualRE(prevFragEnd, pSM.pairs, &prevPairArr); - } - recalcPos[ck] = prevPosInfo.taskSeq; - std::sort(prevPairArr.begin(), prevPairArr.end()); - } - } - pSM.unpairedDic.erase(readName); // 在next task里删除该read - } else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read - auto &remainPosInfo = g.unpairedDic[readName]; - auto remainFragEnd = remainPosInfo.unpairedRE; - int64_t remainPosKey = remainFragEnd.posKey; - modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read - auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; - auto &remainPairArr = remainUnpairInfo.pairArr; - remainPairArr.push_back(remainFragEnd); - CalcKey ck = {remainPosKey, prevFragEnd.posKey}; - recalcPos[ck] = remainPosInfo.taskSeq; - std::sort(remainPairArr.begin(), remainPairArr.end()); - - g.unpairedDic.erase(readName); - } else { // 都没找到,那就保存到遗留数据里 - int64_t prevPosKey = prevFragEnd.posKey; - g.unpairedDic.insert(prevUnpair); - addToGlobal.insert(prevPosKey); - } - } - map taskChanged; - MDSet posProcessed; - for (auto &e : recalcPos) { - auto posKey = e.first.read1Pos; - if (posProcessed.find(posKey) != posProcessed.end()) - continue; - posProcessed.insert(posKey); - auto taskSeq = e.second; - auto &t = taskChanged[taskSeq]; - // 在对应的任务包含的dup idx里修改结果数据 - vector *pairArrP = nullptr; - if (taskSeq < lp.taskSeq) - pairArrP = &g.unpairedPosArr[posKey].pairArr; - else - pairArrP = &lpSM.unpairedPosArr[posKey].pairArr; - processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, - &t.notRepIdx); - if (taskSeq < lp.taskSeq) - g.unpairedPosArr.erase(posKey); - } - // 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据 - // 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate) -#if 0 - for (auto posKey : addToGlobal) { - g.unpairedPosArr[posKey] = lpSM.unpairedPosArr[posKey]; - } -#endif - /* 不需要把p中找不到lp的unpair,放到global中,否则最后找到pair后,还要再执行一次冗余检测,造成重复的冗余索引 */ - - // 更新结果 - for (auto &e : taskChanged) { - auto taskSeq = e.first; - auto &t = e.second; - if (taskSeq < lp.taskSeq) { - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, - g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], - g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], - g.latterNotRepIdxArr[taskSeq]); - } else if (taskSeq == lp.taskSeq) { - refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, - p); - } else { - refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, p, - lp); // 把结果放到p中 - } - } - - // 将dupidx放进全局数据 - g.latterDupIdxArr.push_back(DPSet()); - g.latterOpticalDupIdxArr.push_back(MDSet()); - g.latterRepIdxArr.push_back(DPSet()); - g.latterNotDupIdxArr.push_back(MDSet()); - g.latterNotOpticalDupIdxArr.push_back(MDSet()); - g.latterNotRepIdxArr.push_back(MDSet()); - - g.dupIdxArr.push_back(vector()); - auto &vIdx = g.dupIdxArr.back(); - lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); - vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); - std::sort(vIdx.begin(), vIdx.end()); - - g.opticalDupIdxArr.push_back(vector()); - auto &vOpticalIdx = g.opticalDupIdxArr.back(); - vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); - std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); - - g.repIdxArr.push_back(vector()); - auto &vRepIdx = g.repIdxArr.back(); - vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); - std::sort(vRepIdx.begin(), vRepIdx.end()); -} - -static void *pipeRead(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO); - inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); - int64_t readNumSum = 0; - while (1) { - PROF_START(read_wait); - yarn::POSSESS(pipeArg.readSig); - yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 1); // 只有一个坑,因为在bambuf内部支持异步读取 - PROF_END(gprof[GP_read_wait], read_wait); - size_t readNum = 0; - PROF_START(read); - if (inBamBuf.ReadStat() >= 0) - readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 - PROF_END(gprof[GP_read], read); - if (readNum < 1) { - pipeArg.readFinish = 1; - yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 - break; - } - spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); - - pipeArg.readData.bams = inBamBuf.GetBamArr(); - - pipeArg.readData.bamStartIdx = readNumSum; - pipeArg.readData.taskSeq = pipeArg.readOrder; - - readNumSum += readNum; - inBamBuf.ClearAll(); // 清理上一轮读入的数据 - pipeArg.readOrder += 1; // for next - yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据 - } - spdlog::info("total reads processed {}", readNumSum); - return 0; -} -static void *pipeGenRE(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - auto &genREData = pipeArg.genREData; - // init generate read ends data by num_thread - int genREThread = pipeArg.numThread; - // / 4; - for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { - genREData[i].Init(genREThread); - } - - while (1) { - PROF_START(gen_wait); - yarn::POSSESS(pipeArg.readSig); - yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 0); // 等待有数据 - yarn::POSSESS(pipeArg.genRESig); - PROF_END(gprof[GP_gen_wait], gen_wait); - - yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑 - yarn::RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放 - - if (pipeArg.readFinish) { - yarn::POSSESS(pipeArg.genRESig); - pipeArg.genREFinish = 1; - yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); - yarn::TWIST(pipeArg.readSig, yarn::BY, -1); - break; - } - /* 处理bam,生成readends */ - PROF_START(gen); - doGenRE(pipeArg); - // usleep(200000); - PROF_END(gprof[GP_gen], gen); - - yarn::POSSESS(pipeArg.genRESig); - pipeArg.genREOrder += 1; - yarn::TWIST(pipeArg.genRESig, yarn::BY, 1); - yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据 - } - return 0; -} -static void *pipeSort(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - - while (1) { - PROF_START(sort_wait); - yarn::POSSESS(pipeArg.genRESig); - yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, 0); // 等待有数据 - yarn::RELEASE(pipeArg.genRESig); - PROF_END(gprof[GP_sort_wait], sort_wait); - - yarn::POSSESS(pipeArg.sortSig); - yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置 - yarn::RELEASE(pipeArg.sortSig); - - if (pipeArg.genREFinish) { - // 处理完剩余数据 - while (pipeArg.sortOrder < pipeArg.genREOrder) { - PROF_START(sort); - doSort(pipeArg); - PROF_END(gprof[GP_sort], sort); - pipeArg.sortOrder += 1; - } - yarn::POSSESS(pipeArg.sortSig); - pipeArg.sortFinish = 1; - yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); - break; - } - /* 排序 readends */ - PROF_START(sort); - doSort(pipeArg); - PROF_END(gprof[GP_sort], sort); - - yarn::POSSESS(pipeArg.genRESig); - yarn::TWIST(pipeArg.genRESig, yarn::BY, -1); - - yarn::POSSESS(pipeArg.sortSig); - pipeArg.sortOrder += 1; - yarn::TWIST(pipeArg.sortSig, yarn::BY, 1); - } - return 0; -} -static void *pipeMarkDup(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - - while (1) { - PROF_START(markdup_wait); - yarn::POSSESS(pipeArg.sortSig); - yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, 0); // 等待有数据 - yarn::RELEASE(pipeArg.sortSig); - PROF_END(gprof[GP_markdup_wait], markdup_wait); - - yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM); - yarn::RELEASE(pipeArg.markDupSig); - - if (pipeArg.sortFinish) { - // 应该还得处理剩余的数据 - while (pipeArg.markDupOrder < pipeArg.sortOrder) { - PROF_START(markdup); - doMarkDup(pipeArg); - PROF_END(gprof[GP_markdup], markdup); - pipeArg.markDupOrder += 1; - } - yarn::POSSESS(pipeArg.markDupSig); - pipeArg.markDupFinish = 1; - yarn::TWIST(pipeArg.markDupSig, yarn::BY, 2); - break; - } - /* 冗余检测 readends */ - PROF_START(markdup); - doMarkDup(pipeArg); - PROF_END(gprof[GP_markdup], markdup); - yarn::POSSESS(pipeArg.sortSig); - yarn::TWIST(pipeArg.sortSig, yarn::BY, -1); - - yarn::POSSESS(pipeArg.markDupSig); - pipeArg.markDupOrder += 1; - yarn::TWIST(pipeArg.markDupSig, yarn::BY, 1); - } - return 0; -} -static void *pipeIntersect(void *data) { - PipelineArg &pipeArg = *(PipelineArg *)data; - pipeArg.intersectOrder = 1; - while (1) { - PROF_START(intersect_wait); - yarn::POSSESS(pipeArg.markDupSig); - yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, 1); // 等待有数据 - yarn::RELEASE(pipeArg.markDupSig); - PROF_END(gprof[GP_intersect_wait], intersect_wait); - - if (pipeArg.markDupFinish) { - while (pipeArg.intersectOrder < pipeArg.markDupOrder) { - PROF_START(intersect); - doIntersect(pipeArg); - PROF_END(gprof[GP_intersect], intersect); - pipeArg.intersectOrder += 1; - } - break; - } - /* 交叉数据处理 readends */ - PROF_START(intersect); - doIntersect(pipeArg); - PROF_END(gprof[GP_intersect], intersect); - - yarn::POSSESS(pipeArg.markDupSig); - yarn::TWIST(pipeArg.markDupSig, yarn::BY, -1); - - pipeArg.intersectOrder += 1; - } - return 0; -} - -/* 当所有任务结束后,global data里还有未处理的数据 */ -static void mergeAllTask(PipelineArg &pipeArg) { - MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM]; - IntersectData &g = pipeArg.intersectData; - SortMarkData &lpSM = *(SortMarkData *)lp.dataPtr; - // 遗留的未匹配的pair - PROF_START(merge_match); - for (auto &prevUnpair : lpSM.unpairedDic) { // 遍历上一个任务中的每个未匹配的read - auto &readName = prevUnpair.first; - auto &prevPosInfo = prevUnpair.second; - auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end - - if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read - auto &remainPosInfo = g.unpairedDic[readName]; - auto remainFragEnd = remainPosInfo.unpairedRE; - int64_t remainPosKey = remainFragEnd.posKey; - modifyPairedEnds(prevFragEnd, &remainFragEnd); // 在某些clip情况下,poskey可能是后面的read - auto &remainUnpairInfo = g.unpairedPosArr[remainPosKey]; - - remainUnpairInfo.pairArr.push_back(remainFragEnd); - g.unpairedDic.erase(readName); - } else { - g.unpairedDic.insert(prevUnpair); // 用来记录没有匹配的read个数 - } - } - PROF_END(gprof[GP_merge_match], merge_match); - - PROF_START(merge_markdup); - map taskChanged; - for (auto &e : g.unpairedPosArr) { - auto posKey = e.first; - auto taskSeq = e.second.taskSeq; - auto &t = taskChanged[taskSeq]; - auto &arr = g.unpairedPosArr[posKey].pairArr; - - if (arr.size() > 1) { - std::sort(arr.begin(), arr.end()); - processPairs(arr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx); - } - } - // 更新结果 - vector addDup; - map ndPosVal; - for (auto &e : taskChanged) { - auto taskSeq = e.first; - auto &t = e.second; - refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, - g.latterDupIdxArr[taskSeq], g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq], - g.latterNotDupIdxArr[taskSeq], g.latterNotOpticalDupIdxArr[taskSeq], - g.latterNotRepIdxArr[taskSeq]); - } - g.unpairedPosArr.clear(); - PROF_END(gprof[GP_merge_markdup], merge_markdup); - -// // 将dupidx放进全局数据 - PROF_START(merge_update); - vector cacheDupIdx; - vector midArr; - vector intCacheDupIdx; - vector intMidArr; - for (int i = 0; i < (int)g.dupIdxArr.size() - 1; ++i) { - refeshFinalTaskDupInfo(g.latterDupIdxArr[i], g.latterNotDupIdxArr[i], g.dupIdxArr[i], cacheDupIdx, midArr); - } - for (int i = 0; i < (int)g.opticalDupIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterOpticalDupIdxArr[i], g.latterNotOpticalDupIdxArr[i], g.opticalDupIdxArr[i], - intCacheDupIdx, intMidArr); - for (int i = 0; i < (int)g.repIdxArr.size() - 1; ++i) - refeshFinalTaskDupInfo(g.latterRepIdxArr[i], g.latterNotRepIdxArr[i], g.repIdxArr[i], cacheDupIdx, midArr); - PROF_END(gprof[GP_merge_update], merge_update); - - PROF_START(merge_add); - g.dupIdxArr.push_back(vector()); - auto &vIdx = g.dupIdxArr.back(); - lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end()); - vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end()); - std::sort(vIdx.begin(), vIdx.end()); - - g.opticalDupIdxArr.push_back(vector()); - auto &vOpticalIdx = g.opticalDupIdxArr.back(); - vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end()); - std::sort(vOpticalIdx.begin(), vOpticalIdx.end()); - - g.repIdxArr.push_back(vector()); - auto &vRepIdx = g.repIdxArr.back(); - vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end()); - std::sort(vRepIdx.begin(), vRepIdx.end()); - PROF_END(gprof[GP_merge_add], merge_add); -} - -static void parallelPipeline() { - PipelineArg pipeArg(&nsgv::gDupRes); - //PipelineArg &pipeArg = nsgv::gPipe; - pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; - - pthread_t tidArr[5]; - pthread_create(&tidArr[0], 0, pipeRead, &pipeArg); - pthread_create(&tidArr[1], 0, pipeGenRE, &pipeArg); - pthread_create(&tidArr[2], 0, pipeSort, &pipeArg); - pthread_create(&tidArr[3], 0, pipeMarkDup, &pipeArg); - pthread_create(&tidArr[4], 0, pipeIntersect, &pipeArg); - for (int i = 0; i < 5; ++i) pthread_join(tidArr[i], 0); - PROF_START(merge_result); - mergeAllTask(pipeArg); - PROF_END(gprof[GP_merge_result], merge_result); - - spdlog::info("pipeArg size : {}", pipeArg.byteSize()); - - size_t repNum = 0; - for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size(); - spdlog::info("rep num : {}", repNum); - - spdlog::info("result size : {}", nsgv::gDupRes.byteSize()); -} - -/* 并行流水线方式处理数据,标记冗余 */ -void pipelineMarkDups() { - if (nsgv::gMdArg.NUM_THREADS > 1) - return parallelPipeline(); - - PipelineArg pipeArg(&nsgv::gDupRes); - pipeArg.numThread = nsgv::gMdArg.NUM_THREADS; - BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO); - inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM); - int64_t readNumSum = 0; - for (int i = 0; i < pipeArg.GENBUFNUM; ++i) { - pipeArg.genREData[i].Init(pipeArg.numThread); - } - pipeArg.intersectOrder = 1; // do intersect 从1开始 - while (1) { - size_t readNum = 0; - if (inBamBuf.ReadStat() >= 0) - readNum = inBamBuf.ReadBam(); // 读入新一轮的数据 - if (readNum < 1) { - break; - } - spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder); - - pipeArg.readData.bams = inBamBuf.GetBamArr(); - pipeArg.readData.bamStartIdx = readNumSum; - pipeArg.readData.taskSeq = pipeArg.readOrder; - // 1. do generate read ends - doGenRE(pipeArg); - pipeArg.genREOrder += 1; - // 2. do sort - doSort(pipeArg); - pipeArg.sortOrder += 1; - // 3. do markduplicate - doMarkDup(pipeArg); - pipeArg.markDupOrder += 1; - // 4. do intersect data - if (pipeArg.markDupOrder > 1) { - doIntersect(pipeArg); - pipeArg.intersectOrder += 1; - } - - readNumSum += readNum; - inBamBuf.ClearAll(); // 清理上一轮读入的数据 - pipeArg.readOrder += 1; // for next - } - mergeAllTask(pipeArg); -} \ No newline at end of file diff --git a/src/markdup/pipeline_md.h b/src/markdup/pipeline_md.h deleted file mode 100644 index f5700d3..0000000 --- a/src/markdup/pipeline_md.h +++ /dev/null @@ -1,271 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "md_types.h" - -struct ReadData { - vector bams; // read step output - int64_t bamStartIdx = 0; // 每轮读入的bam数组,起始位置在全局bam中的索引 - int64_t taskSeq = 0; // 任务序号 -}; - -struct GenREData { - vector> pairsArr; // 成对的reads - vector> fragsArr; // 暂未找到配对的reads - vector unpairedDicArr; // 用来寻找pair end - void Init(int nThread) { - for (int i = 0; i <= nThread; ++i) { // 比线程多一个,主要是pairs多一个,其他没用 - pairsArr.push_back(vector()); - fragsArr.push_back(vector()); - unpairedDicArr.push_back(UnpairedNameMap()); - } - } - UnpairedNameMap unpairedDic; // 代替sort step中一部分计算 - UnpairedPositionMap unpairedPosArr; // - size_t byteSize() { - size_t bytes = 0; - for (auto &v : pairsArr) for (auto &r : v) bytes += sizeof(r); - for (auto &v : fragsArr) for (auto &r : v) bytes += sizeof(r); - for (auto &m : unpairedDicArr) bytes += m.size() * 100; - bytes += unpairedDic.size() * 100; - bytes += unpairedPosArr.size() * 1000; - return bytes; - } -}; - -struct SortMarkData { - vector pairs; // 成对的reads - vector frags; // 暂未找到配对的reads - UnpairedNameMap unpairedDic; // 用来寻找pair end - UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储 - size_t byteSize() { - size_t bytes = 0; - for (auto &r : pairs) bytes += sizeof(r); - for (auto &r : frags) bytes += sizeof(r); - bytes += unpairedDic.size() * 100; - bytes += unpairedPosArr.size() * 1000; - return bytes; - } -}; - -struct SortData { - volatile void *dataPtr; // SortMarkData pointer -}; - -struct MarkDupData { - int64_t taskSeq = 0; // 任务序号 - DPSet pairDupIdx; // pair的冗余read的索引 - MDSet pairOpticalDupIdx; // optical冗余read的索引 - DPSet fragDupIdx; // frag的冗余read的索引 - DPSet pairRepIdx; // pair的dupset代表read的索引 - - volatile void *dataPtr; // SortMarkData pointer - - size_t byteSize() { - size_t bytes = 0; - bytes += pairDupIdx.size() * 100; - bytes += pairOpticalDupIdx.size() * 100; - bytes += fragDupIdx.size() * 100; - bytes += pairRepIdx.size() * 100; - return bytes; - } -}; - -struct DupResult { - vector> dupIdxArr; - vector> opticalDupIdxArr; - vector> repIdxArr; - size_t byteSize() { - size_t bytes = 0; - size_t tmp = 0; - for (auto &v : dupIdxArr) for (auto &r : v) tmp += sizeof(r); - spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; - tmp = 0; - for (auto &v : opticalDupIdxArr) - for (auto &r : v) tmp += sizeof(r); - spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; - tmp = 0; - for (auto &v : repIdxArr) for (auto &r : v) tmp += sizeof(r); - spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024); - bytes += tmp; - spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024); - - return bytes; - } -}; - -struct IntersectData { - UnpairedNameMap unpairedDic; // 用来寻找pair end - UnpairedPositionMap unpairedPosArr; - - // 每个task对应一个vector - vector> &dupIdxArr; - vector> &opticalDupIdxArr; - vector> &repIdxArr; - - // 用来存放后续计算的数据 - vector> latterDupIdxArr; - vector> latterOpticalDupIdxArr; - vector> latterRepIdxArr; - vector> latterNotDupIdxArr; - vector> latterNotOpticalDupIdxArr; - vector> latterNotRepIdxArr; - - IntersectData(DupResult *resPtr) : - dupIdxArr(resPtr->dupIdxArr), - opticalDupIdxArr(resPtr->opticalDupIdxArr), - repIdxArr(resPtr->repIdxArr) - {} - - size_t byteSize() { - size_t bytes = 0; - bytes += unpairedDic.size() * 100; - bytes += unpairedPosArr.size() * 1000; - for (auto &v : dupIdxArr) for (auto &r : v) bytes += sizeof(r); - for (auto &v : opticalDupIdxArr) for (auto &r : v) bytes += sizeof(r); - for (auto &v : repIdxArr) for (auto &r : v) bytes += sizeof(r); - spdlog::info("result size : {}", bytes); - - for (auto &s : latterDupIdxArr) bytes += s.size() * sizeof(DupInfo); - for (auto &s : latterOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo); - for (auto &s : latterRepIdxArr) bytes += s.size() * sizeof(DupInfo); - for (auto &s : latterNotDupIdxArr) bytes += s.size() * sizeof(DupInfo); - for (auto &s : latterNotOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo); - for (auto &s : latterNotRepIdxArr) bytes += s.size() * sizeof(DupInfo); - - return bytes; - } -}; - -// 记录流水线状态,task的序号,以及某阶段是否结束 -struct PipelineArg { - static const int GENBUFNUM = 2; - static const int SORTBUFNUM = 2; - static const int MARKBUFNUM = 3; - uint64_t readOrder = 0; - uint64_t genREOrder = 0; - uint64_t sortOrder = 0; - uint64_t markDupOrder = 0; - uint64_t intersectOrder = 0; - int numThread = 0; - - volatile int readFinish = 0; - volatile int genREFinish = 0; - volatile int sortFinish = 0; - volatile int markDupFinish = 0; - - yarn::lock_t *readSig; - yarn::lock_t *genRESig; - yarn::lock_t *sortSig; - yarn::lock_t *markDupSig; - - PipelineArg(DupResult *resPtr) : intersectData(resPtr) { - readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明 - genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer - sortSig = yarn::NEW_LOCK(0); - markDupSig = yarn::NEW_LOCK(0); - for (int i = 0; i < SORTBUFNUM; ++i) { - sortData[i].dataPtr = &sortMarkData[i]; - } - for (int i = 0; i < MARKBUFNUM; ++i) { - markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM]; - } - } - - SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM]; - - // for step-1 read - ReadData readData; - // for step-2 generate readends - GenREData genREData[GENBUFNUM]; - // for step-3 sort each thread frags and pairs - SortData sortData[SORTBUFNUM]; - // for step-4 mark duplicate - MarkDupData markDupData[MARKBUFNUM]; - // for step-5 deal with intersect data - IntersectData intersectData; - - size_t byteSize() { - size_t bytes = 0; - - size_t tmp = 0; - for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize(); - bytes += tmp; - spdlog::info("sortMarkData size : {}", tmp); - for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize(); - bytes += tmp; - spdlog::info("genREData size : {}", tmp); - for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize(); - bytes += tmp; - spdlog::info("markDupData size : {}", tmp); - tmp += intersectData.byteSize(); - bytes += tmp; - spdlog::info("intersectData size : {}", tmp); - - return bytes; - } -}; - -struct REArrIdIdx { - int arrId = 0; // 数组索引 - uint64_t arrIdx = 0; // 数组内部当前索引 - const ReadEnds *pE = nullptr; -}; - -struct REGreaterThan { - bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; } -}; - -struct ReadEndsHeap { - // 将冗余索引和他对应的task vector对应起来 - vector> *arr2d; - priority_queue, REGreaterThan> minHeap; - uint64_t popNum = 0; - - int Init(vector> *_arr2d) { - arr2d = _arr2d; - if (arr2d == nullptr) { - return -1; - } - for (int i = 0; i < arr2d->size(); ++i) { - auto &v = (*arr2d)[i]; - if (!v.empty()) { - minHeap.push({i, 1, &v[0]}); - } - } - return 0; - } - - const ReadEnds *Pop() { - const ReadEnds *ret = nullptr; - if (!minHeap.empty()) { - auto minVal = minHeap.top(); - minHeap.pop(); - ++popNum; - ret = minVal.pE; - auto &v = (*arr2d)[minVal.arrId]; - if (v.size() > minVal.arrIdx) { - minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]}); - } - } - return ret; - } - - uint64_t Size() { - uint64_t len = 0; - if (arr2d != nullptr) { - for (auto &v : *arr2d) { - len += v.size(); - } - } - return len - popNum; - } -}; - -// 并行运行mark duplicate -void pipelineMarkDups(); \ No newline at end of file diff --git a/src/markdup/read_ends.h b/src/markdup/read_ends.h index cb6773a..17045ce 100644 --- a/src/markdup/read_ends.h +++ b/src/markdup/read_ends.h @@ -134,8 +134,20 @@ struct ReadEnds : PhysicalLocation { return comp < 0; } - // 找某一个位置的所有readend时需要 - static bool PairsLittleThan(const ReadEnds &lhs, const ReadEnds &rhs) { return ReadLittleThan(lhs, rhs, true); } + // 找某一个位置的所有readend时需要, 只对比位点,不对比orientation + static bool PairsLittleThan(const ReadEnds &a, const ReadEnds &b) { + int comp = a.read1ReferenceIndex - b.read1ReferenceIndex; + if (comp == 0) + comp = a.read1Coordinate - b.read1Coordinate; + // 需要orientation,因为要跟排序的比较方式和顺序一致 + if (comp == 0) + comp = a.orientation - b.orientation; + if (comp == 0) + comp = a.read2ReferenceIndex - b.read2ReferenceIndex; + if (comp == 0) + comp = a.read2Coordinate - b.read2Coordinate; + return comp < 0; + } // 比较函数 bool operator<(const ReadEnds &o) const {