diff --git a/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java b/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java index 4ee17faad..b72d32d2e 100755 --- a/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java +++ b/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java @@ -53,6 +53,12 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { //public static final EnumSet NO_DISCARDS = EnumSet.noneOf(Discard.class); public static final List NO_FILTERS = Arrays.asList(); + /** + * the overflow tracker, which makes sure we get a limited number of warnings for locus pile-ups that + * exceed the max depth + */ + private LocusOverflowTracker overflowTracker; + /** our log, which we want to capture anything from this class */ private static Logger logger = Logger.getLogger(DownsamplingLocusIteratorByState.class); @@ -275,9 +281,10 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { // TODO: Push in header via constructor if(GenomeAnalysisEngine.instance.getDataSource() != null) sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader())); - readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),sampleNames); + readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),readInformation.getMaxReadsAtLocus(),sampleNames); this.readInfo = readInformation; this.filters = filters; + overflowTracker = new LocusOverflowTracker(readInformation.getMaxReadsAtLocus()); } public Iterator iterator() { @@ -292,6 +299,11 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { lazyLoadNextAlignmentContext(); boolean r = (nextAlignmentContext != null); //if ( DEBUG ) System.out.printf("hasNext() = %b%n", r); + + // if we don't have a next record, make sure we clean the warning queue + // TODO: Note that this implementation requires that hasNext() always be called before next(). + if (!r) overflowTracker.cleanWarningQueue(); + return r; } @@ -489,7 +501,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { * @param tracker */ protected void setLocusOverflowTracker(LocusOverflowTracker tracker) { - // TODO: implement + this.overflowTracker = tracker; } /** @@ -497,16 +509,17 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { * @return the overflow tracker, null if none exists */ public LocusOverflowTracker getLocusOverflowTracker() { - // TODO: implement - return null; + return this.overflowTracker; } private class ReadStateManager implements Iterable { private final PeekableIterator iterator; private final DownsamplingMethod downsamplingMethod; - private final Map> downsamplersBySampleName = new HashMap>(); + private final Map> aggregatorsBySampleName = new HashMap>(); + private final int targetCoverage; + private final int maxReadsAtLocus; private final Deque>> readStatesByAlignmentStart; private int totalReadStatesInHanger = 0; @@ -518,15 +531,18 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { */ private Random downsampleRandomizer = new Random(38148309L); - public ReadStateManager(Iterator source, DownsamplingMethod downsamplingMethod, Collection sampleNames) { + public ReadStateManager(Iterator source, DownsamplingMethod downsamplingMethod, int maxReadsAtLocus, Collection sampleNames) { this.iterator = new PeekableIterator(source); this.downsamplingMethod = downsamplingMethod; this.targetCoverage = downsamplingMethod.toCoverage != null ? downsamplingMethod.toCoverage : 1; + this.maxReadsAtLocus = maxReadsAtLocus; + if(downsamplingMethod.type == DownsampleType.NONE) + aggregatorsBySampleName.put(null,new ArrayList()); if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) - downsamplersBySampleName.put(null,new ReservoirDownsampler(targetCoverage)); + aggregatorsBySampleName.put(null,new ReservoirDownsampler(targetCoverage)); else { for(String sampleName: sampleNames) - downsamplersBySampleName.put(sampleName,new ReservoirDownsampler(targetCoverage)); + aggregatorsBySampleName.put(sampleName,new ReservoirDownsampler(targetCoverage)); } this.readStatesByAlignmentStart = new LinkedList>>(); } @@ -624,7 +640,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { int firstAlignmentStart = iterator.peek().getAlignmentStart(); while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) { SAMRecord read = iterator.next(); - getDownsampler(read.getReadGroup().getSample()).add(read); + getAggregator(read.getReadGroup().getSample()).add(read); } } else { @@ -634,23 +650,30 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) { SAMRecord read = iterator.next(); - getDownsampler(read.getReadGroup().getSample()).add(read); + getAggregator(read.getReadGroup().getSample()).add(read); } } Map> culledReadStatesBySample = new HashMap>(); int readStatesInHangerEntry = 0; - for(Map.Entry> entry: downsamplersBySampleName.entrySet()) { + for(Map.Entry> entry: aggregatorsBySampleName.entrySet()) { String sampleName = entry.getKey(); - ReservoirDownsampler downsampler = entry.getValue(); + Collection aggregator = entry.getValue(); - Collection newReads = downsampler.getDownsampledContents(); - downsampler.clear(); + Collection newReads = new ArrayList(aggregator); + aggregator.clear(); int readsInHanger = countReadsInHanger(sampleName); - if(readsInHanger+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) - readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,newReads.size()); + if(readsInHanger+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) { + int readLimit = newReads.size(); + boolean mrlViolation = false; + if(readLimit > maxReadsAtLocus-totalReadStatesInHanger) { + readLimit = maxReadsAtLocus-totalReadStatesInHanger; + mrlViolation = true; + } + readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,readLimit,mrlViolation); + } else { Iterator>> backIterator = readStatesByAlignmentStart.descendingIterator(); boolean readPruned = true; @@ -672,7 +695,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { firstHangerForSample.clear(); } - readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,targetCoverage-readsInHanger); + readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,targetCoverage-readsInHanger,false); } } if(!culledReadStatesBySample.isEmpty()) { @@ -681,11 +704,11 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { } } - private ReservoirDownsampler getDownsampler(String sampleName) { + private Collection getAggregator(String sampleName) { if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) - return downsamplersBySampleName.get(null); + return aggregatorsBySampleName.get(null); else - return downsamplersBySampleName.get(sampleName); + return aggregatorsBySampleName.get(sampleName); } private int countReadsInHanger(final String sampleName) { @@ -705,22 +728,38 @@ public class DownsamplingLocusIteratorByState extends LocusIterator { * @param maxReads Maximum number of reads to add. * @return Total number of reads added. */ - private int addReadsToHanger(final Map> newHangerEntry, final String sampleName, final Collection reads, final int maxReads) { + private int addReadsToHanger(final Map> newHangerEntry, final String sampleName, final Collection reads, final int maxReads, boolean atMaxReadsAtLocusLimit) { if(reads.isEmpty()) return 0; + + GenomeLoc location = null; + + // the farthest right a read extends + Integer rightMostEnd = -1; + List readStatesBySample = new LinkedList(); int readCount = 0; for(SAMRecord read: reads) { - if(readCount >= maxReads) - break; - SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); - state.stepForwardOnGenome(); - readStatesBySample.add(state); - // TODO: What if we downsample the extended events away? - if (state.hadIndel()) hasExtendedEvents = true; - readCount++; + if(readCount < maxReads) { + SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); + state.stepForwardOnGenome(); + readStatesBySample.add(state); + // TODO: What if we downsample the extended events away? + if (state.hadIndel()) hasExtendedEvents = true; + readCount++; + } + else if(atMaxReadsAtLocusLimit) { + if (location == null) + location = GenomeLocParser.createGenomeLoc(read); + rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd; + } } newHangerEntry.put(sampleName,readStatesBySample); + + if (location != null) + overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd), + readCount); + return readCount; } }