diff --git a/out.bam b/out.bam new file mode 100644 index 0000000..e69de29 diff --git a/run.sh b/run.sh index 46d89c0..475ad64 100755 --- a/run.sh +++ b/run.sh @@ -1,5 +1,5 @@ -input=~/data/bam/zy_normal.bam -#input=~/data/bam/zy_tumor.bam +#input=~/data/bam/zy_normal.bam +input=~/data/bam/zy_tumor.bam #input=~/data/bam/100w.bam time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ diff --git a/src/sam/markdups/md_funcs.cpp b/src/sam/markdups/md_funcs.cpp index eb0eb99..91a5de4 100644 --- a/src/sam/markdups/md_funcs.cpp +++ b/src/sam/markdups/md_funcs.cpp @@ -28,6 +28,8 @@ using std::set; using std::unordered_map; using std::vector; +static int zzhtestnum = 0; + /* 清除key位置的数据 */ void clearIdxAtPos(int64_t key, map> *pmsIdx) { auto &msIdx = *pmsIdx; @@ -300,6 +302,10 @@ static inline bool closeEnough(const ReadEnds *lhs, const ReadEnds *rhs, const i abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance; } +static inline bool closeEnoughShort(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) { + return lhs != rhs && abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance; +} + /** * Finds which reads within the list of duplicates that are likely to be optical/co-localized duplicates of * one another. Within each cluster of optical duplicates that is found, one read remains un-flagged for @@ -319,7 +325,7 @@ static void findOpticalDuplicates(vector &readEndsArr, const R const int DEFAULT_MAX_DUPLICATE_SET_SIZE = 300000; vector &opticalDuplicateFlags = *pOpticalDuplicateFlags; - opticalDuplicateFlags.push_back(true); + // opticalDuplicateFlags.push_back(true); // for test int len = readEndsArr.size(); // If there is only one or zero reads passed in (so there are obviously no optical duplicates), // or if there are too many reads (so we don't want to try to run this expensive n^2 algorithm), @@ -347,8 +353,61 @@ static void findOpticalDuplicates(vector &readEndsArr, const R int key = currentLoc->tile; // 只处理一个样本,所以只有一个read group tileRGmap[key].push_back(i); } + opticalDistanceRelationGraph.addNode(i); + } + // Since because finding adjacent optical duplicates is an O(n^2) operation, we can subdivide the input into its + // readgroups in order to reduce the amount of redundant checks across readgroups between reads. + // fillGraphFromAGroup + for (auto &entry : tileRGmap) { + auto &groupList = entry.second; + if (groupList.size() > 1) { + for (int i = 0; i < groupList.size(); ++i) { + int iIndex = groupList[i]; + const ReadEnds *currentLoc = readEndsArr[iIndex]; + for (int j = i + 1; j < groupList.size(); ++j) { + int jIndex = groupList[j]; + const ReadEnds *other = readEndsArr[jIndex]; + if (closeEnoughShort(currentLoc, other, DEFAULT_OPTICAL_DUPLICATE_DISTANCE)) + opticalDistanceRelationGraph.addEdge(iIndex, jIndex); + } + } + } + } + // Keep a map of the reads and their cluster assignments + unordered_map opticalDuplicateClusterMap; + opticalDistanceRelationGraph.cluster(&opticalDuplicateClusterMap); + unordered_map clusterToRepresentativeRead; + int keeperCluster = -1; + // Specially mark the keeper as specifically not a duplicate if it exists + if (keeperIndex >= 0) { + keeperCluster = opticalDuplicateClusterMap[keeperIndex]; + clusterToRepresentativeRead[keeperCluster] = keeperIndex; } + for (auto &entry: opticalDuplicateClusterMap) { + int recordIndex = entry.first; + int recordAssignedCluster = entry.second; + // If its not the first read we've seen for this cluster, mark it as an optical duplicate + auto repReadItr = clusterToRepresentativeRead.find(recordAssignedCluster); + if (repReadItr != clusterToRepresentativeRead.end() && recordIndex != keeperIndex) { + const ReadEnds *representativeLoc = readEndsArr[repReadItr->second]; + const ReadEnds *currentRecordLoc = readEndsArr[recordIndex]; + // If not in the keeper cluster, then keep the minX -> minY valued duplicate (note the tile must be + // equal for reads to cluster together) + if (!(keeperIndex >= 0 && recordAssignedCluster == keeperCluster) && + (currentRecordLoc->x < representativeLoc->x || + (currentRecordLoc->x == representativeLoc->x && currentRecordLoc->y < representativeLoc->y))) { + // Mark the old min as an optical duplicate, and save the new min + opticalDuplicateFlags[repReadItr->second] = true; + clusterToRepresentativeRead[recordAssignedCluster] = recordIndex; + } else { + // If a smaller read has already been visited, mark the test read as an optical duplicate + opticalDuplicateFlags[recordIndex] = true; + } + } else { + clusterToRepresentativeRead[recordAssignedCluster] = recordIndex; + } + } } else { /** * Compute optical duplicates quickly in the standard case where we know that there won't be any transitive @@ -453,4 +512,6 @@ void trackOpticalDuplicates(vector &readEndsArr, const ReadEnd gMetrics.NonOpticalDuplicateCountHist += readEndsArr.size() - nOpticalDup; if (nOpticalDup) gMetrics.OpticalDuplicatesCountHist += nOpticalDup + 1; + + // cout << "zzh optical:" << (++zzhtestnum) << "\t" << readEndsArr.size() << "\t" << nOpticalDup << endl; } \ No newline at end of file diff --git a/src/sam/markdups/md_funcs.h b/src/sam/markdups/md_funcs.h index b723f65..054c3a6 100644 --- a/src/sam/markdups/md_funcs.h +++ b/src/sam/markdups/md_funcs.h @@ -138,16 +138,75 @@ struct DupIdxQueue { */ template struct Graph { // 用set? - unordered_set nodes; // 图中的结点 - unordered_map> neighbors; // 邻接列表 + vector nodes; // 图中的结点 + unordered_map nodeIdxMap; + unordered_map> neighbors; // 邻接列表 - Node *addNode(const Node &singleton) { - if (nodes.find(singleton) == nodes.end()) { - Node *n = const_cast(&(*nodes.insert(singleton).first)); - neighbors[n].clear(); - return n; + int addNode(const Node &singleton) { + int idx = -1; + if (nodeIdxMap.find(singleton) == nodeIdxMap.end()) { + nodes.push_back(singleton); + idx = nodes.size() - 1; + nodeIdxMap[singleton] = idx; + neighbors[idx].clear(); + } else + idx = nodeIdxMap[singleton]; + + return idx; + } + + /* bidirectional and public */ + void addEdge(Node &left, Node &right) { + int leftIndex = addNode(left); + if (left == right) return; + int rightIndex = addNode(right); + addNeighbor(leftIndex, rightIndex); + addNeighbor(rightIndex, leftIndex); + } + + void addNeighbor(int fromNode, int toNode) { + unordered_set &fromNodesNeighbors = neighbors[fromNode]; + if (fromNodesNeighbors.find(toNode) == fromNodesNeighbors.end()) + fromNodesNeighbors.insert(toNode); + } + + /** + * returns the cluster map of connected components + * + * @return Nodes that point to the same integer are in the same cluster. + */ + void cluster(unordered_map *pClusterMap) { + auto &clusterMap = *pClusterMap; + vector vCluster(nodes.size(), 0); + for (int i = 0; i < vCluster.size(); ++i) vCluster[i] = i; + for (int i = 0; i < neighbors.size(); ++i) { + for (auto &j : neighbors[i]) joinNodes(j, i, &vCluster); } - return const_cast(&(*nodes.find(singleton))); + for (auto & node : nodes) { + clusterMap[node] = findRepNode(vCluster, nodeIdxMap[node]); + } + } + + // Part of Union-Find with Path Compression that joins to nodes to be part of the same cluster. + static void joinNodes(int nodeId1, int nodeId2, vector *pGrouping) { + auto &grouping = *pGrouping; + int repNode1 = findRepNode(grouping, nodeId1); + int repNode2 = findRepNode(grouping, nodeId2); + if (repNode1 == repNode2) + return; + grouping[repNode1] = repNode2; + } + + // Part of Union-Find with Path Compression to determine the duplicate set a particular UMI belongs to. + static int findRepNode(vector &grouping, int nodeId) { + int representativeUmi = nodeId; // All UMIs of a duplicate set will have the same reprsentativeUmi. + while (representativeUmi != grouping[representativeUmi]) representativeUmi = grouping[representativeUmi]; + while (nodeId != representativeUmi) { + int newUmiId = grouping[nodeId]; + grouping[nodeId] = representativeUmi; + nodeId = newUmiId; + } + return representativeUmi; } }; diff --git a/src/sam/markdups/serial_md.cpp b/src/sam/markdups/serial_md.cpp index ed1dcc7..b173c0d 100644 --- a/src/sam/markdups/serial_md.cpp +++ b/src/sam/markdups/serial_md.cpp @@ -155,33 +155,27 @@ static void generateReadEnds(SerailMarkDupArg *arg) { p.unpairedPosArr.clear(); /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - set reSet; + // set reSet; + // ReadEnds lastRe; - ReadEnds lastRe; - - for (int i = 0; i < p.bams.size(); ++i) // 循环处理每个read - { + for (int i = 0; i < p.bams.size(); ++i) { // 循环处理每个read BamWrap *bw = p.bams[i]; const int64_t bamIdx = p.bamStartIdx + i; if (bw->GetReadUnmappedFlag()) { if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason - // to continue (only in coordinate sort). + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). break; - } else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 - { + } else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对 ReadEnds fragEnd; tm_arr[8].acc_start(); buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); tm_arr[8].acc_end(); p.frags.push_back(fragEnd); // 添加进frag集合 - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 - { + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了 string key = bw->query_name(); if (p.unpairedDic.find(key) == p.unpairedDic.end()) { p.unpairedDic[key] = {p.taskSeq, fragEnd}; - } else // 找到了pairend - { + } else { // 找到了pairend auto &pairedEnds = p.unpairedDic.at(key).unpairedRE; modifyPairedEnds(fragEnd, &pairedEnds); p.pairs.push_back(pairedEnds); @@ -191,8 +185,8 @@ static void generateReadEnds(SerailMarkDupArg *arg) { } } tm_arr[9].acc_start(); - sortReadEndsArr(p.frags); - // sort(p.frags.begin(), p.frags.end()); + //sortReadEndsArr(p.frags); + sort(p.frags.begin(), p.frags.end()); tm_arr[9].acc_end(); // cout << "sort pairs" << endl; tm_arr[10].acc_start(); @@ -283,7 +277,7 @@ static inline void getIntersectData(vector &leftArr, vector while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) { leftSpan += 1; - if (leftSpan > leftEndIdx) { + if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了(可能会有bug,上上个也与下一个有交集呢?) leftSpan = leftArr.size() - 1; break; } @@ -291,7 +285,7 @@ static inline void getIntersectData(vector &leftArr, vector while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) { rightSpan += 1; - if (rightSpan == rightArr.size() - 1) + if (rightSpan == rightArr.size() - 1) // 同上,可能会有bug break; } dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end()); @@ -775,6 +769,10 @@ void serialMarkDups() { cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; + cout << "metrics: " << gMetrics.DuplicateCountHist << "\t" + << gMetrics.NonOpticalDuplicateCountHist << "\t" + << gMetrics.OpticalDuplicatesCountHist << "\t" + << gMetrics.OpticalDuplicatesByLibraryId << endl; Timer::log_time("serial end "); diff --git a/src/sam/utils/read_name_parser.h b/src/sam/utils/read_name_parser.h index a7572a7..45013ce 100644 --- a/src/sam/utils/read_name_parser.h +++ b/src/sam/utils/read_name_parser.h @@ -113,8 +113,7 @@ struct ReadNameParser { bool ReadLocationInformation(const string &readName, PhysicalLocation *loc) { try { - // Optimized version if using the default read name regex (== used - // on purpose): + // Optimized version if using the default read name regex (== used on purpose): if (useOptimizedDefaultParsing) { const int fields = getLastThreeFields(readName, ':'); if (!(fields == 5 || fields == 7)) {