完成了optical duplication检测,接下来处理task之间optical数据冲突的问题

This commit is contained in:
zzh 2024-08-25 00:08:38 +08:00
parent 022be611cd
commit 949f6302da
6 changed files with 147 additions and 30 deletions

0
out.bam 100644
View File

4
run.sh
View File

@ -1,5 +1,5 @@
input=~/data/bam/zy_normal.bam
#input=~/data/bam/zy_tumor.bam
#input=~/data/bam/zy_normal.bam
input=~/data/bam/zy_tumor.bam
#input=~/data/bam/100w.bam
time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \

View File

@ -28,6 +28,8 @@ using std::set;
using std::unordered_map;
using std::vector;
static int zzhtestnum = 0;
/* 清除key位置的数据 */
void clearIdxAtPos(int64_t key, map<int64_t, set<int64_t>> *pmsIdx) {
auto &msIdx = *pmsIdx;
@ -300,6 +302,10 @@ static inline bool closeEnough(const ReadEnds *lhs, const ReadEnds *rhs, const i
abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance;
}
static inline bool closeEnoughShort(const ReadEnds *lhs, const ReadEnds *rhs, const int distance) {
return lhs != rhs && abs(lhs->x - rhs->x) <= distance && abs(lhs->y - rhs->y) <= distance;
}
/**
* Finds which reads within the list of duplicates that are likely to be optical/co-localized duplicates of
* one another. Within each cluster of optical duplicates that is found, one read remains un-flagged for
@ -319,7 +325,7 @@ static void findOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const R
const int DEFAULT_MAX_DUPLICATE_SET_SIZE = 300000;
vector<bool> &opticalDuplicateFlags = *pOpticalDuplicateFlags;
opticalDuplicateFlags.push_back(true);
// opticalDuplicateFlags.push_back(true); // for test
int len = readEndsArr.size();
// If there is only one or zero reads passed in (so there are obviously no optical duplicates),
// or if there are too many reads (so we don't want to try to run this expensive n^2 algorithm),
@ -347,8 +353,61 @@ static void findOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const R
int key = currentLoc->tile; // 只处理一个样本所以只有一个read group
tileRGmap[key].push_back(i);
}
opticalDistanceRelationGraph.addNode(i);
}
// Since because finding adjacent optical duplicates is an O(n^2) operation, we can subdivide the input into its
// readgroups in order to reduce the amount of redundant checks across readgroups between reads.
// fillGraphFromAGroup
for (auto &entry : tileRGmap) {
auto &groupList = entry.second;
if (groupList.size() > 1) {
for (int i = 0; i < groupList.size(); ++i) {
int iIndex = groupList[i];
const ReadEnds *currentLoc = readEndsArr[iIndex];
for (int j = i + 1; j < groupList.size(); ++j) {
int jIndex = groupList[j];
const ReadEnds *other = readEndsArr[jIndex];
if (closeEnoughShort(currentLoc, other, DEFAULT_OPTICAL_DUPLICATE_DISTANCE))
opticalDistanceRelationGraph.addEdge(iIndex, jIndex);
}
}
}
}
// Keep a map of the reads and their cluster assignments
unordered_map<int, int> opticalDuplicateClusterMap;
opticalDistanceRelationGraph.cluster(&opticalDuplicateClusterMap);
unordered_map<int, int> clusterToRepresentativeRead;
int keeperCluster = -1;
// Specially mark the keeper as specifically not a duplicate if it exists
if (keeperIndex >= 0) {
keeperCluster = opticalDuplicateClusterMap[keeperIndex];
clusterToRepresentativeRead[keeperCluster] = keeperIndex;
}
for (auto &entry: opticalDuplicateClusterMap) {
int recordIndex = entry.first;
int recordAssignedCluster = entry.second;
// If its not the first read we've seen for this cluster, mark it as an optical duplicate
auto repReadItr = clusterToRepresentativeRead.find(recordAssignedCluster);
if (repReadItr != clusterToRepresentativeRead.end() && recordIndex != keeperIndex) {
const ReadEnds *representativeLoc = readEndsArr[repReadItr->second];
const ReadEnds *currentRecordLoc = readEndsArr[recordIndex];
// If not in the keeper cluster, then keep the minX -> minY valued duplicate (note the tile must be
// equal for reads to cluster together)
if (!(keeperIndex >= 0 && recordAssignedCluster == keeperCluster) &&
(currentRecordLoc->x < representativeLoc->x ||
(currentRecordLoc->x == representativeLoc->x && currentRecordLoc->y < representativeLoc->y))) {
// Mark the old min as an optical duplicate, and save the new min
opticalDuplicateFlags[repReadItr->second] = true;
clusterToRepresentativeRead[recordAssignedCluster] = recordIndex;
} else {
// If a smaller read has already been visited, mark the test read as an optical duplicate
opticalDuplicateFlags[recordIndex] = true;
}
} else {
clusterToRepresentativeRead[recordAssignedCluster] = recordIndex;
}
}
} else {
/**
* Compute optical duplicates quickly in the standard case where we know that there won't be any transitive
@ -453,4 +512,6 @@ void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnd
gMetrics.NonOpticalDuplicateCountHist += readEndsArr.size() - nOpticalDup;
if (nOpticalDup)
gMetrics.OpticalDuplicatesCountHist += nOpticalDup + 1;
// cout << "zzh optical:" << (++zzhtestnum) << "\t" << readEndsArr.size() << "\t" << nOpticalDup << endl;
}

View File

@ -138,16 +138,75 @@ struct DupIdxQueue {
*/
template <class Node>
struct Graph { // 用set
unordered_set<Node> nodes; // 图中的结点
unordered_map<Node*, unordered_set<Node*>> neighbors; // 邻接列表
vector<Node> nodes; // 图中的结点
unordered_map<Node, int> nodeIdxMap;
unordered_map<int, unordered_set<int>> neighbors; // 邻接列表
Node *addNode(const Node &singleton) {
if (nodes.find(singleton) == nodes.end()) {
Node *n = const_cast<Node *>(&(*nodes.insert(singleton).first));
neighbors[n].clear();
return n;
int addNode(const Node &singleton) {
int idx = -1;
if (nodeIdxMap.find(singleton) == nodeIdxMap.end()) {
nodes.push_back(singleton);
idx = nodes.size() - 1;
nodeIdxMap[singleton] = idx;
neighbors[idx].clear();
} else
idx = nodeIdxMap[singleton];
return idx;
}
return const_cast<Node *>(&(*nodes.find(singleton)));
/* bidirectional and public */
void addEdge(Node &left, Node &right) {
int leftIndex = addNode(left);
if (left == right) return;
int rightIndex = addNode(right);
addNeighbor(leftIndex, rightIndex);
addNeighbor(rightIndex, leftIndex);
}
void addNeighbor(int fromNode, int toNode) {
unordered_set<int> &fromNodesNeighbors = neighbors[fromNode];
if (fromNodesNeighbors.find(toNode) == fromNodesNeighbors.end())
fromNodesNeighbors.insert(toNode);
}
/**
* returns the cluster map of connected components
*
* @return Nodes that point to the same integer are in the same cluster.
*/
void cluster(unordered_map<Node, int> *pClusterMap) {
auto &clusterMap = *pClusterMap;
vector<int> vCluster(nodes.size(), 0);
for (int i = 0; i < vCluster.size(); ++i) vCluster[i] = i;
for (int i = 0; i < neighbors.size(); ++i) {
for (auto &j : neighbors[i]) joinNodes(j, i, &vCluster);
}
for (auto & node : nodes) {
clusterMap[node] = findRepNode(vCluster, nodeIdxMap[node]);
}
}
// Part of Union-Find with Path Compression that joins to nodes to be part of the same cluster.
static void joinNodes(int nodeId1, int nodeId2, vector<int> *pGrouping) {
auto &grouping = *pGrouping;
int repNode1 = findRepNode(grouping, nodeId1);
int repNode2 = findRepNode(grouping, nodeId2);
if (repNode1 == repNode2)
return;
grouping[repNode1] = repNode2;
}
// Part of Union-Find with Path Compression to determine the duplicate set a particular UMI belongs to.
static int findRepNode(vector<int> &grouping, int nodeId) {
int representativeUmi = nodeId; // All UMIs of a duplicate set will have the same reprsentativeUmi.
while (representativeUmi != grouping[representativeUmi]) representativeUmi = grouping[representativeUmi];
while (nodeId != representativeUmi) {
int newUmiId = grouping[nodeId];
grouping[nodeId] = representativeUmi;
nodeId = newUmiId;
}
return representativeUmi;
}
};

View File

@ -155,33 +155,27 @@ static void generateReadEnds(SerailMarkDupArg *arg) {
p.unpairedPosArr.clear();
/* 处理每个read创建ReadEnd并放入frag和pair中 */
set<ReadEnds> reSet;
// set<ReadEnds> reSet;
// ReadEnds lastRe;
ReadEnds lastRe;
for (int i = 0; i < p.bams.size(); ++i) // 循环处理每个read
{
for (int i = 0; i < p.bams.size(); ++i) { // 循环处理每个read
BamWrap *bw = p.bams[i];
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag()) {
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason
// to continue (only in coordinate sort).
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
} else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对
{
} else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对
ReadEnds fragEnd;
tm_arr[8].acc_start();
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
tm_arr[8].acc_end();
p.frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了
{
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // 是pairend而且互补的read也比对上了
string key = bw->query_name();
if (p.unpairedDic.find(key) == p.unpairedDic.end()) {
p.unpairedDic[key] = {p.taskSeq, fragEnd};
} else // 找到了pairend
{
} else { // 找到了pairend
auto &pairedEnds = p.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
p.pairs.push_back(pairedEnds);
@ -191,8 +185,8 @@ static void generateReadEnds(SerailMarkDupArg *arg) {
}
}
tm_arr[9].acc_start();
sortReadEndsArr(p.frags);
// sort(p.frags.begin(), p.frags.end());
//sortReadEndsArr(p.frags);
sort(p.frags.begin(), p.frags.end());
tm_arr[9].acc_end();
// cout << "sort pairs" << endl;
tm_arr[10].acc_start();
@ -283,7 +277,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) {
leftSpan += 1;
if (leftSpan > leftEndIdx) {
if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了可能会有bug上上个也与下一个有交集呢
leftSpan = leftArr.size() - 1;
break;
}
@ -291,7 +285,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) {
rightSpan += 1;
if (rightSpan == rightArr.size() - 1)
if (rightSpan == rightArr.size() - 1) // 同上可能会有bug
break;
}
dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end());
@ -775,6 +769,10 @@ void serialMarkDups() {
cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl;
cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl;
cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl;
cout << "metrics: " << gMetrics.DuplicateCountHist << "\t"
<< gMetrics.NonOpticalDuplicateCountHist << "\t"
<< gMetrics.OpticalDuplicatesCountHist << "\t"
<< gMetrics.OpticalDuplicatesByLibraryId << endl;
Timer::log_time("serial end ");

View File

@ -113,8 +113,7 @@ struct ReadNameParser {
bool ReadLocationInformation(const string &readName,
PhysicalLocation *loc) {
try {
// Optimized version if using the default read name regex (== used
// on purpose):
// Optimized version if using the default read name regex (== used on purpose):
if (useOptimizedDefaultParsing) {
const int fields = getLastThreeFields(readName, ':');
if (!(fields == 5 || fields == 7)) {