diff --git a/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java b/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java index 5275c471e..b4161b06e 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java @@ -104,6 +104,56 @@ public class PerSampleDownsamplingReadsIterator implements StingSAMIterator { readComparator.compare(orderedDownsampledReadsCache.peek(), earliestPendingRead) <= 0; } + private boolean fillDownsampledReadsCache() { + SAMRecord prevRead = null; + int numPositionalChanges = 0; + + // Continue submitting reads to the per-sample downsamplers until the read at the top of the priority queue + // can be released without violating global sort order + while ( nestedSAMIterator.hasNext() && ! readyToReleaseReads() ) { + SAMRecord read = nestedSAMIterator.next(); + String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null; + + ReadsDownsampler thisSampleDownsampler = perSampleDownsamplers.get(sampleName); + if ( thisSampleDownsampler == null ) { + thisSampleDownsampler = downsamplerFactory.newInstance(); + perSampleDownsamplers.put(sampleName, thisSampleDownsampler); + } + + thisSampleDownsampler.submit(read); + processFinalizedAndPendingItems(thisSampleDownsampler); + + if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) { + numPositionalChanges++; + } + + // Periodically inform all downsamplers of the current position in the read stream. This is + // to prevent downsamplers for samples with sparser reads than others from getting stuck too + // long in a pending state. + if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) { + for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) { + perSampleDownsampler.signalNoMoreReadsBefore(read); + processFinalizedAndPendingItems(perSampleDownsampler); + } + } + + prevRead = read; + } + + if ( ! nestedSAMIterator.hasNext() ) { + for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) { + perSampleDownsampler.signalEndOfInput(); + if ( perSampleDownsampler.hasFinalizedItems() ) { + orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems()); + } + } + earliestPendingRead = null; + earliestPendingDownsampler = null; + } + + return readyToReleaseReads(); + } + private void updateEarliestPendingRead( ReadsDownsampler currentDownsampler ) { // If there is no recorded earliest pending read and this downsampler has pending items, // then this downsampler's first pending item becomes the new earliest pending read: @@ -135,57 +185,11 @@ public class PerSampleDownsamplingReadsIterator implements StingSAMIterator { } } - private boolean fillDownsampledReadsCache() { - SAMRecord prevRead = null; - int numPositionalChanges = 0; - - // Continue submitting reads to the per-sample downsamplers until the read at the top of the priority queue - // can be released without violating global sort order - while ( nestedSAMIterator.hasNext() && ! readyToReleaseReads() ) { - SAMRecord read = nestedSAMIterator.next(); - String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null; - - ReadsDownsampler thisSampleDownsampler = perSampleDownsamplers.get(sampleName); - if ( thisSampleDownsampler == null ) { - thisSampleDownsampler = downsamplerFactory.newInstance(); - perSampleDownsamplers.put(sampleName, thisSampleDownsampler); - } - - thisSampleDownsampler.submit(read); - updateEarliestPendingRead(thisSampleDownsampler); - - if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) { - numPositionalChanges++; - } - - // Periodically inform all downsamplers of the current position in the read stream. This is - // to prevent downsamplers for samples with sparser reads than others from getting stuck too - // long in a pending state. - if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) { - for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) { - perSampleDownsampler.signalNoMoreReadsBefore(read); - updateEarliestPendingRead(perSampleDownsampler); - } - } - - prevRead = read; + private void processFinalizedAndPendingItems( ReadsDownsampler currentDownsampler ) { + if ( currentDownsampler.hasFinalizedItems() ) { + orderedDownsampledReadsCache.addAll(currentDownsampler.consumeFinalizedItems()); } - - if ( ! nestedSAMIterator.hasNext() ) { - for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) { - perSampleDownsampler.signalEndOfInput(); - } - earliestPendingRead = null; - earliestPendingDownsampler = null; - } - - for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) { - if ( perSampleDownsampler.hasFinalizedItems() ) { - orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems()); - } - } - - return readyToReleaseReads(); + updateEarliestPendingRead(currentDownsampler); } public void remove() {