The sharding system now buffers reads, with the buffer size determined by a command-line argument. Will investigate whether/how this impacts performance on low-pass data and, if it works well, will create a more automatic version of the tool.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3709 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-07-01 22:28:55 +00:00
parent f967cae1aa
commit cab8394103
5 changed files with 113 additions and 0 deletions

View File

@ -535,6 +535,7 @@ public class GenomeAnalysisEngine {
return new Reads(argCollection.samFiles,
argCollection.strictnessLevel,
argCollection.readBufferSize,
method,
new ValidationExclusion(Arrays.asList(argCollection.unsafe)),
filters,

View File

@ -28,6 +28,7 @@ import java.util.Collection;
public class Reads {
private List<File> readsFiles = null;
private SAMFileReader.ValidationStringency validationStringency = SAMFileReader.ValidationStringency.STRICT;
private Integer readBufferSize = null;
private DownsamplingMethod downsamplingMethod = null;
private ValidationExclusion exclusionList = null;
private Collection<SamRecordFilter> supplementalFilters = null;
@ -74,6 +75,14 @@ public class Reads {
return validationStringency;
}
/**
* Gets the number of reads that the sharding system should buffer per BAM file.
* @return The per-BAM read buffer size, or null if no buffer size was specified.
*/
public Integer getReadBufferSize() {
return readBufferSize;
}
/**
* Gets the method and parameters used when downsampling reads.
* @return Downsample fraction.
@ -119,6 +128,7 @@ public class Reads {
* is package protected.
* @param samFiles list of reads files.
* @param strictness Stringency of reads file parsing.
* @param readBufferSize Number of reads to hold in memory per BAM.
* @param exclusionList what safety checks we're willing to let slide
* @param supplementalFilters additional filters to dynamically apply.
* @param generateExtendedEvents if true, the engine will issue an extra call to walker's map() with
@ -130,6 +140,7 @@ public class Reads {
*/
Reads( List<File> samFiles,
SAMFileReader.ValidationStringency strictness,
Integer readBufferSize,
DownsamplingMethod downsamplingMethod,
ValidationExclusion exclusionList,
Collection<SamRecordFilter> supplementalFilters,
@ -137,6 +148,7 @@ public class Reads {
boolean includeReadsWithDeletionAtLoci,
boolean generateExtendedEvents) {
this.readsFiles = samFiles;
this.readBufferSize = readBufferSize;
this.validationStringency = strictness;
this.downsamplingMethod = downsamplingMethod;
this.exclusionList = exclusionList == null ? new ValidationExclusion() : exclusionList;

View File

@ -66,6 +66,10 @@ public class GATKArgumentCollection {
@Argument(fullName = "input_file", shortName = "I", doc = "SAM or BAM file(s)", required = false)
public List<File> samFiles = new ArrayList<File>();
@Element(required = false)
@Argument(fullName = "read_buffer_size", shortName = "rbs", doc="Number of reads per SAM file to buffer in memory", required = false)
public Integer readBufferSize = null;
@ElementList(required = false)
@Argument(fullName = "read_filter", shortName = "rf", doc = "Specify filtration criteria to apply to each read individually.", required = false)
public List<String> readFilters = new ArrayList<String>();
@ -280,6 +284,18 @@ public class GATKArgumentCollection {
if (!other.samFiles.equals(this.samFiles)) {
return false;
}
if(other.readBufferSize == null || this.readBufferSize == null) {
// If exactly one of the two is null they differ, so return false; if both are null, keep going...
if(other.readBufferSize != null || this.readBufferSize != null)
return false;
}
else {
if(!other.readBufferSize.equals(this.readBufferSize))
return false;
}
if (!(other.readBufferSize == null && this.readBufferSize == null) && (other.readBufferSize == null || this.readBufferSize == null)) {
return false;
}
if (!other.maximumEngineIterations.equals(this.maximumEngineIterations)) {
return false;
}

View File

@ -361,6 +361,8 @@ public class SAMDataSource implements SimpleDataSource {
if(shard.getFileSpans().get(id) == null)
continue;
CloseableIterator<SAMRecord> iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id));
if(reads.getReadBufferSize() != null)
iterator = new BufferingReadIterator(iterator,reads.getReadBufferSize());
if(shard.getFilter() != null)
iterator = new FilteringIterator(iterator,shard.getFilter()); // not a counting iterator because we don't want to show the filtering of reads
mergingIterator.addIterator(readers.getReader(id),iterator);

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.iterators;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.util.CloseableIterator;
import java.util.Queue;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.broadinstitute.sting.utils.StingException;
/**
 * Buffers access to a large stream of reads, replenishing the buffer only when the reads
 * previously loaded into it have all been consumed.  Each replenishment pulls up to
 * <code>bufferSize</code> reads from the wrapped iterator in a single pass.
 *
 * @author mhanna
 * @version 0.1
 */
public class BufferingReadIterator implements CloseableIterator<SAMRecord> {
    /** Source of reads; drained in batches of at most bufferSize. */
    private final CloseableIterator<SAMRecord> wrappedIterator;

    /** Reads already pulled from the wrapped iterator but not yet handed to the caller. */
    private final Queue<SAMRecord> buffer;

    /** Maximum number of reads loaded into the buffer per replenishment. */
    private final int bufferSize;

    /**
     * Creates a buffering view over the given iterator.
     * @param readIterator Iterator to buffer.  Must not be null.
     * @param bufferSize Maximum number of reads to hold in memory at once.  Must be at least 1;
     *                   a smaller value could never load a read and would silently hide every
     *                   read in the wrapped iterator.
     * @throws IllegalArgumentException if readIterator is null or bufferSize is less than 1.
     */
    public BufferingReadIterator(final CloseableIterator<SAMRecord> readIterator, final int bufferSize) {
        if(readIterator == null)
            throw new IllegalArgumentException("Iterator to buffer must not be null");
        if(bufferSize < 1)
            throw new IllegalArgumentException("Read buffer size must be at least 1; got " + bufferSize);
        this.wrappedIterator = readIterator;
        this.buffer = new LinkedList<SAMRecord>();
        this.bufferSize = bufferSize;
    }

    /**
     * Checks whether another read is available, replenishing the buffer first if it is empty.
     * @return True if at least one more read can be returned by next().
     */
    public boolean hasNext() {
        assureBufferFull();
        return !buffer.isEmpty();
    }

    /**
     * Returns the next buffered read.
     * @return The next read, in the order supplied by the wrapped iterator.
     * @throws NoSuchElementException if no reads remain.
     */
    public SAMRecord next() {
        // hasNext() replenishes the buffer when needed, so no separate refill is required here.
        if(!hasNext()) throw new NoSuchElementException("No next element available");
        return buffer.remove();
    }

    /**
     * Closes the wrapped iterator.
     */
    public void close() {
        wrappedIterator.close();
    }

    /**
     * Unsupported operation.
     * @throws StingException always.
     */
    public void remove() {
        throw new StingException("Unable to remove from a BufferingReadIterator");
    }

    /**
     * If the buffer is empty but there are more elements in the iterator,
     * loads up to bufferSize of them into the buffer.  Does nothing while the
     * buffer still holds at least one read.
     */
    private void assureBufferFull() {
        if(!buffer.isEmpty())
            return;
        while(buffer.size() < bufferSize && wrappedIterator.hasNext())
            buffer.add(wrappedIterator.next());
    }
}