diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 652dda55e..445884459 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -535,6 +535,7 @@ public class GenomeAnalysisEngine { return new Reads(argCollection.samFiles, argCollection.strictnessLevel, + argCollection.readBufferSize, method, new ValidationExclusion(Arrays.asList(argCollection.unsafe)), filters, diff --git a/java/src/org/broadinstitute/sting/gatk/Reads.java b/java/src/org/broadinstitute/sting/gatk/Reads.java index 0135cfb20..8affdf3c9 100755 --- a/java/src/org/broadinstitute/sting/gatk/Reads.java +++ b/java/src/org/broadinstitute/sting/gatk/Reads.java @@ -28,6 +28,7 @@ import java.util.Collection; public class Reads { private List readsFiles = null; private SAMFileReader.ValidationStringency validationStringency = SAMFileReader.ValidationStringency.STRICT; + private Integer readBufferSize = null; private DownsamplingMethod downsamplingMethod = null; private ValidationExclusion exclusionList = null; private Collection supplementalFilters = null; @@ -74,6 +75,14 @@ public class Reads { return validationStringency; } + /** + * Gets a list of the total number of reads that the sharding system should buffer per BAM file. + * @return + */ + public Integer getReadBufferSize() { + return readBufferSize; + } + /** * Gets the method and parameters used when downsampling reads. * @return Downsample fraction. @@ -119,6 +128,7 @@ public class Reads { * is package protected. * @param samFiles list of reads files. * @param strictness Stringency of reads file parsing. + * @param readBufferSize Number of reads to hold in memory per BAM. * @param exclusionList what safety checks we're willing to let slide * @param supplementalFilters additional filters to dynamically apply. * @param generateExtendedEvents if true, the engine will issue an extra call to walker's map() with @@ -130,6 +140,7 @@ public class Reads { */ Reads( List samFiles, SAMFileReader.ValidationStringency strictness, + Integer readBufferSize, DownsamplingMethod downsamplingMethod, ValidationExclusion exclusionList, Collection supplementalFilters, @@ -137,6 +148,7 @@ public class Reads { boolean includeReadsWithDeletionAtLoci, boolean generateExtendedEvents) { this.readsFiles = samFiles; + this.readBufferSize = readBufferSize; this.validationStringency = strictness; this.downsamplingMethod = downsamplingMethod; this.exclusionList = exclusionList == null ? new ValidationExclusion() : exclusionList; diff --git a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index 7c94ec272..576f9be3a 100755 --- a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -66,6 +66,10 @@ public class GATKArgumentCollection { @Argument(fullName = "input_file", shortName = "I", doc = "SAM or BAM file(s)", required = false) public List samFiles = new ArrayList(); + @Element(required = false) + @Argument(fullName = "read_buffer_size", shortName = "rbs", doc="Number of reads per SAM file to buffer in memory", required = false) + public Integer readBufferSize = null; + @ElementList(required = false) @Argument(fullName = "read_filter", shortName = "rf", doc = "Specify filtration criteria to apply to each read individually.", required = false) public List readFilters = new ArrayList(); @@ -280,6 +284,18 @@ public class GATKArgumentCollection { if (!other.samFiles.equals(this.samFiles)) { return false; } + if(other.readBufferSize == null || this.readBufferSize == null) { + // If either is null, return false if they're both null, otherwise keep going... + if(other.readBufferSize != null || this.readBufferSize != null) + return false; + } + else { + if(!other.readBufferSize.equals(this.readBufferSize)) + return false; + } + if (!(other.readBufferSize == null && this.readBufferSize == null) && (other.readBufferSize == null || this.readBufferSize == null)) { + return false; + } if (!other.maximumEngineIterations.equals(this.maximumEngineIterations)) { return false; } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java index 52dcfe5c7..987cac90e 100755 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java @@ -361,6 +361,8 @@ public class SAMDataSource implements SimpleDataSource { if(shard.getFileSpans().get(id) == null) continue; CloseableIterator iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id)); + if(reads.getReadBufferSize() != null) + iterator = new BufferingReadIterator(iterator,reads.getReadBufferSize()); if(shard.getFilter() != null) iterator = new FilteringIterator(iterator,shard.getFilter()); // not a counting iterator because we don't want to show the filtering of reads mergingIterator.addIterator(readers.getReader(id),iterator); diff --git a/java/src/org/broadinstitute/sting/gatk/iterators/BufferingReadIterator.java b/java/src/org/broadinstitute/sting/gatk/iterators/BufferingReadIterator.java new file mode 100644 index 000000000..9cb35233a --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/iterators/BufferingReadIterator.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2010, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.broadinstitute.sting.gatk.iterators; + +import net.sf.samtools.SAMRecord; +import net.sf.samtools.util.CloseableIterator; + +import java.util.Queue; +import java.util.LinkedList; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.broadinstitute.sting.utils.StingException; + +/** + * Buffers access to a large stream of reads, replenishing the buffer only when the reads + * + * @author mhanna + * @version 0.1 + */ +public class BufferingReadIterator implements CloseableIterator { + private final CloseableIterator wrappedIterator; + private final Queue buffer; + private final int bufferSize; + + public BufferingReadIterator(final CloseableIterator readIterator, final int bufferSize) { + this.wrappedIterator = readIterator; + this.buffer = new LinkedList(); + this.bufferSize = bufferSize; + } + + public boolean hasNext() { + assureBufferFull(); + return !buffer.isEmpty(); + } + + public SAMRecord next() { + assureBufferFull(); + if(!hasNext()) throw new NoSuchElementException("No next element available"); + return buffer.remove(); + } + + public void close() { + wrappedIterator.close(); + } + + public void remove() { + throw new StingException("Unable to remove from a BufferingReadIterator"); + } + + /** + * If the buffer is empty but there are more elements in the iterator, + */ + private void assureBufferFull() { + if(!buffer.isEmpty()) + return; + while(buffer.size() < bufferSize && wrappedIterator.hasNext()) + buffer.add(wrappedIterator.next()); + } +}