Fix for IndelRealigner memory problem. Now the Constrained mate fixing writer is told whether a read has been modified and, if it wasn't, can dump it when the cache needs to get flushed at places with tons of coverage.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5777 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
d8a761bbbd
commit
15c7bd82a5
|
|
@ -8,6 +8,7 @@ import org.broadinstitute.sting.utils.GenomeLocParser;
|
|||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
|
||||
|
|
@ -77,6 +78,7 @@ import java.util.Queue;
|
|||
* @version 0.2
|
||||
*/
|
||||
public class ConstrainedMateFixingManager {
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(ConstrainedMateFixingManager.class);
|
||||
private static final boolean DEBUG = false;
|
||||
|
||||
|
|
@ -111,9 +113,20 @@ public class ConstrainedMateFixingManager {
|
|||
int counter = 0;
|
||||
|
||||
/** read.name -> records */
|
||||
HashMap<String, SAMRecord> forMateMatching = new HashMap<String, SAMRecord>();
|
||||
HashMap<String, SAMRecordHashObject> forMateMatching = new HashMap<String, SAMRecordHashObject>();
|
||||
Queue<SAMRecord> waitingReads = new PriorityQueue<SAMRecord>(1000, comparer);
|
||||
|
||||
private class SAMRecordHashObject {
|
||||
public SAMRecord record;
|
||||
public boolean wasModified;
|
||||
|
||||
public SAMRecordHashObject(SAMRecord record, boolean wasModified) {
|
||||
this.record = record;
|
||||
this.wasModified = wasModified;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//private SimpleTimer timer = new SimpleTimer("ConstrainedWriter");
|
||||
//private long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in milliseconds
|
||||
//private long lastProgressPrintTime = -1; // When was the last time we printed progress log?
|
||||
|
|
@ -164,7 +177,7 @@ public class ConstrainedMateFixingManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void addRead( SAMRecord newRead ) {
|
||||
public void addRead(SAMRecord newRead, boolean readWasModified) {
|
||||
if ( DEBUG ) logger.info("New read pos " + newRead.getAlignmentStart() + " OP = " + newRead.getAttribute("OP"));
|
||||
|
||||
//final long curTime = timer.currentTime();
|
||||
|
|
@ -189,13 +202,15 @@ public class ConstrainedMateFixingManager {
|
|||
|
||||
if ( !tooManyReads )
|
||||
forMateMatching.clear();
|
||||
else
|
||||
purgeUnmodifiedMates();
|
||||
}
|
||||
|
||||
// fix mates, as needed
|
||||
// Since setMateInfo can move reads, we potentially need to remove the mate, and requeue
|
||||
// it to ensure proper sorting
|
||||
if ( newRead.getReadPairedFlag() ) {
|
||||
SAMRecord mate = forMateMatching.get(newRead.getReadName());
|
||||
SAMRecordHashObject mate = forMateMatching.get(newRead.getReadName());
|
||||
if ( mate != null ) {
|
||||
// 1. Frustratingly, Picard's setMateInfo() method unaligns (by setting the reference contig
|
||||
// to '*') read pairs when both of their flags have the unmapped bit set. This is problematic
|
||||
|
|
@ -209,27 +224,27 @@ public class ConstrainedMateFixingManager {
|
|||
// arbitrarily far away). However, we do still want to move legitimately unmapped reads whose
|
||||
// mates are mapped, so the compromise will be that if the mate is still in the queue then we'll
|
||||
// move the read and otherwise we won't.
|
||||
boolean doNotFixMates = newRead.getReadUnmappedFlag() && (mate.getReadUnmappedFlag() || !waitingReads.contains(mate));
|
||||
boolean doNotFixMates = newRead.getReadUnmappedFlag() && (mate.record.getReadUnmappedFlag() || !waitingReads.contains(mate.record));
|
||||
if ( !doNotFixMates ) {
|
||||
|
||||
boolean reQueueMate = mate.getReadUnmappedFlag() && ! newRead.getReadUnmappedFlag();
|
||||
boolean reQueueMate = mate.record.getReadUnmappedFlag() && ! newRead.getReadUnmappedFlag();
|
||||
if ( reQueueMate ) {
|
||||
// the mate was unmapped, but newRead was mapped, so the mate may have been moved
|
||||
// to be next-to newRead, so needs to be reinserted into the waitingReads queue
|
||||
// note -- this must be called before the setMateInfo call below
|
||||
if ( ! waitingReads.remove(mate) )
|
||||
if ( ! waitingReads.remove(mate.record) )
|
||||
// we must have hit a region with too much depth and flushed the queue
|
||||
reQueueMate = false;
|
||||
}
|
||||
|
||||
// we've already seen our mate -- set the mate info and remove it from the map
|
||||
SamPairUtil.setMateInfo(mate, newRead, null);
|
||||
if ( reQueueMate ) waitingReads.add(mate);
|
||||
SamPairUtil.setMateInfo(mate.record, newRead, null);
|
||||
if ( reQueueMate ) waitingReads.add(mate.record);
|
||||
}
|
||||
|
||||
forMateMatching.remove(newRead.getReadName());
|
||||
} else if ( pairedReadIsMovable(newRead) ) {
|
||||
forMateMatching.put(newRead.getReadName(), newRead);
|
||||
forMateMatching.put(newRead.getReadName(), new SAMRecordHashObject(newRead, readWasModified));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -277,6 +292,17 @@ public class ConstrainedMateFixingManager {
|
|||
|| Math.abs(read.getInferredInsertSize()) > maxInsertSizeForMovingReadPairs; // we won't try to move such a read
|
||||
}
|
||||
|
||||
private void purgeUnmodifiedMates() {
|
||||
HashMap<String, SAMRecordHashObject> forMateMatchingCleaned = new HashMap<String, SAMRecordHashObject>();
|
||||
for ( Map.Entry<String, SAMRecordHashObject> entry : forMateMatching.entrySet() ) {
|
||||
if ( entry.getValue().wasModified )
|
||||
forMateMatchingCleaned.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
forMateMatching.clear(); // explicitly clear the memory
|
||||
forMateMatching = forMateMatchingCleaned;
|
||||
}
|
||||
|
||||
private boolean pairedReadIsMovable(SAMRecord read) {
|
||||
return read.getReadPairedFlag() // we're a paired read
|
||||
&& (!read.getReadUnmappedFlag() || !read.getMateUnmappedFlag()) // at least one read is mapped
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
|
||||
@Argument(fullName="maxReadsInMemory", shortName="maxInMemory", doc="max reads allowed to be kept in memory at a time by the SAMFileWriter. "+
|
||||
"Keep it low to minimize memory consumption (but the tool may skip realignment on regions with too much coverage. If it is too low, it may generate errors during realignment); keep it high to maximize realignment (but make sure to give Java enough memory).", required=false)
|
||||
protected int MAX_RECORDS_IN_MEMORY = 200000;
|
||||
protected int MAX_RECORDS_IN_MEMORY = 150000;
|
||||
|
||||
@Argument(fullName="maxIsizeForMovement", shortName="maxIsize", doc="maximum insert size of read pairs that we attempt to realign", required=false)
|
||||
protected int MAX_ISIZE_FOR_MOVEMENT = 3000;
|
||||
|
|
@ -192,6 +192,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
private final ArrayList<SAMRecord> readsNotToClean = new ArrayList<SAMRecord>();
|
||||
private final ArrayList<VariantContext> knownIndelsToTry = new ArrayList<VariantContext>();
|
||||
private final HashSet<Object> indelRodsSeen = new HashSet<Object>();
|
||||
private final HashSet<SAMRecord> readsActuallyCleaned = new HashSet<SAMRecord>();
|
||||
|
||||
private static final int MAX_QUAL = 99;
|
||||
|
||||
|
|
@ -214,6 +215,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
|
||||
protected Map<SAMReaderID, ConstrainedMateFixingManager> nwayWriters = null;
|
||||
|
||||
|
||||
// debug info for lazy SW evaluation:
|
||||
private long exactMatchesFound = 0; // how many reads exactly matched a consensus we already had
|
||||
private long SWalignmentRuns = 0; // how many times (=for how many reads) we ran SW alignment
|
||||
|
|
@ -388,6 +390,10 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
}
|
||||
|
||||
private void emit(final SAMRecord read) {
|
||||
|
||||
// check to see whether the read was modified by looking at the temporary tag
|
||||
boolean wasModified = readsActuallyCleaned.contains(read);
|
||||
|
||||
try {
|
||||
if ( N_WAY_OUT != null ) {
|
||||
SAMReaderID rid = getToolkit().getReaderIDForRead(read);
|
||||
|
|
@ -397,9 +403,9 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
read.setAttribute("RG",
|
||||
getToolkit().getReadsDataSource().getOriginalReadGroupId((String)read.getAttribute("RG")));
|
||||
}
|
||||
m.addRead(read);
|
||||
m.addRead(read, wasModified);
|
||||
} else {
|
||||
manager.addRead(read);
|
||||
manager.addRead(read, wasModified);
|
||||
}
|
||||
} catch (RuntimeIOException e) {
|
||||
throw new UserException.ErrorWritingBamFile(e.getMessage());
|
||||
|
|
@ -414,6 +420,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
emit(read);
|
||||
readsToClean.clear();
|
||||
readsNotToClean.clear();
|
||||
readsActuallyCleaned.clear();
|
||||
}
|
||||
|
||||
public Integer map(ReferenceContext ref, SAMRecord read, ReadMetaDataTracker metaDataTracker) {
|
||||
|
|
@ -797,6 +804,9 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
|
|||
// TODO -- this is only temporary until Tim adds code to recalculate this value
|
||||
if ( read.getAttribute(SAMTag.MD.name()) != null )
|
||||
read.setAttribute(SAMTag.MD.name(), null);
|
||||
|
||||
// mark that it was actually cleaned
|
||||
readsActuallyCleaned.add(read);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ public class IndelRealignerIntegrationTest extends WalkerTest {
|
|||
@Test
|
||||
public void testMaxReadsInMemory() {
|
||||
HashMap<String, String> e = new HashMap<String, String>();
|
||||
e.put( "--maxReadsInMemory 10000", "f8e4279cba9fb3a2181d1ce28f7a62af" );
|
||||
e.put( "--maxReadsInMemory 10000", "87605e2dea24d3e01efaeec5f44e8671" );
|
||||
e.put( "--maxReadsInMemory 40000", base_md5 );
|
||||
|
||||
for ( Map.Entry<String, String> entry : e.entrySet() ) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue