SamplePartitioner optimizations and bugfixes
-- Use a linked hash map instead of a hash map since we want to iterate through the map fairly often -- Ensure that we call doneSubmittingReads before getting reads for samples. This function call fell out before and since it wasn't enforced I only noticed the problem while writing comments -- Don't make unnecessary calls to contains for map. Just use get() and check whether the result is null -- Use a LinkedList in PassThroughDownsampler, since this is faster for add() than the existing ArrayList, and we weren't using random access to any resulting list anyway
This commit is contained in:
parent
19288b007d
commit
a4334a67e0
|
|
@ -27,8 +27,8 @@ package org.broadinstitute.sting.gatk.downsampling;
|
|||
|
||||
import net.sf.samtools.SAMRecord;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
|
@ -41,7 +41,7 @@ import java.util.List;
|
|||
*/
|
||||
public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsampler<T> {
|
||||
|
||||
private ArrayList<T> selectedReads;
|
||||
private LinkedList<T> selectedReads;
|
||||
|
||||
public PassThroughDownsampler() {
|
||||
clear();
|
||||
|
|
@ -59,9 +59,13 @@ public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsam
|
|||
}
|
||||
|
||||
public boolean hasFinalizedItems() {
|
||||
return selectedReads.size() > 0;
|
||||
return ! selectedReads.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that this list is a linked list and so doesn't support fast random access
|
||||
* @return
|
||||
*/
|
||||
public List<T> consumeFinalizedItems() {
|
||||
// pass by reference rather than make a copy, for speed
|
||||
List<T> downsampledItems = selectedReads;
|
||||
|
|
@ -74,7 +78,7 @@ public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsam
|
|||
}
|
||||
|
||||
public T peekFinalized() {
|
||||
return selectedReads.isEmpty() ? null : selectedReads.get(0);
|
||||
return selectedReads.isEmpty() ? null : selectedReads.getFirst();
|
||||
}
|
||||
|
||||
public T peekPending() {
|
||||
|
|
@ -90,7 +94,7 @@ public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsam
|
|||
}
|
||||
|
||||
public void clear() {
|
||||
selectedReads = new ArrayList<T>();
|
||||
selectedReads = new LinkedList<T>();
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
|
|
|
|||
|
|
@ -193,13 +193,15 @@ final class ReadStateManager implements Iterable<Map.Entry<String, ReadStateMana
|
|||
submitRead(iterator.next());
|
||||
}
|
||||
|
||||
samplePartitioner.doneSubmittingReads();
|
||||
|
||||
for (final String sample : samples) {
|
||||
final Collection<GATKSAMRecord> newReads = samplePartitioner.getReadsForSample(sample);
|
||||
|
||||
// // if we're keeping reads, take the (potentially downsampled) list of new reads for this sample
|
||||
// // and add to the list of reads. Note this may reorder the list of reads somewhat (it groups them
|
||||
// // by sample, but it cannot change their absolute position on the genome as they all must
|
||||
// // start at the current location
|
||||
// if we're keeping reads, take the (potentially downsampled) list of new reads for this sample
|
||||
// and add to the list of reads. Note this may reorder the list of reads somewhat (it groups them
|
||||
// by sample, but it cannot change their absolute position on the genome as they all must
|
||||
// start at the current location
|
||||
if ( keepSubmittedReads )
|
||||
submittedReads.addAll(newReads);
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@
|
|||
|
||||
package org.broadinstitute.sting.utils.locusiterator;
|
||||
|
||||
import com.google.java.contract.Ensures;
|
||||
import com.google.java.contract.Requires;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import org.broadinstitute.sting.gatk.downsampling.Downsampler;
|
||||
import org.broadinstitute.sting.gatk.downsampling.PassThroughDownsampler;
|
||||
|
|
@ -33,49 +35,137 @@ import org.broadinstitute.sting.gatk.downsampling.ReservoirDownsampler;
|
|||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Divides reads by sample and (if requested) does a preliminary downsampling pass with a ReservoirDownsampler.
|
||||
* Divides reads by sample and (if requested) does a preliminary downsampling pass
|
||||
* with a ReservoirDownsampler.
|
||||
*
|
||||
* Note: stores reads by sample ID string, not by sample object
|
||||
*/
|
||||
class SamplePartitioner<T extends SAMRecord> {
|
||||
private Map<String, Downsampler<T>> readsBySample;
|
||||
/**
|
||||
* Map from sample name (as a string) to a downsampler of reads for that sample
|
||||
*/
|
||||
final private Map<String, Downsampler<T>> readsBySample;
|
||||
|
||||
/**
|
||||
* Are we in a state where we're done submitting reads and have semi-finalized the
|
||||
* underlying per sample downsampler?
|
||||
*/
|
||||
boolean doneSubmittingReads = false;
|
||||
|
||||
/**
|
||||
* Create a new SamplePartitioner capable of splitting reads up into buckets of reads for
|
||||
* each sample in samples, and perform a preliminary downsampling of these reads
|
||||
* (separately for each sample) if downsampling is requested in LIBSDownsamplingInfo
|
||||
*
|
||||
* Note that samples must be comprehensive, in that all reads ever submitted to this
|
||||
* partitioner must come from one of the samples provided here. If not, submitRead
|
||||
* will throw an exception. Duplicates in the list of samples will be ignored
|
||||
*
|
||||
* @param LIBSDownsamplingInfo do we want to downsample, and if so to what coverage?
|
||||
* @param samples the complete list of samples we're going to partition reads into
|
||||
*/
|
||||
@Ensures({
|
||||
"readsBySample != null",
|
||||
"! readsBySample.isEmpty()",
|
||||
"readsBySample.size() == new HashSet(samples).size()"
|
||||
})
|
||||
public SamplePartitioner(final LIBSDownsamplingInfo LIBSDownsamplingInfo, final List<String> samples) {
|
||||
readsBySample = new HashMap<String, Downsampler<T>>(samples.size());
|
||||
for ( String sample : samples ) {
|
||||
if ( LIBSDownsamplingInfo == null ) throw new IllegalArgumentException("LIBSDownsamplingInfo cannot be null");
|
||||
if ( samples == null || samples.isEmpty() ) throw new IllegalArgumentException("samples must be a non-null, non-empty list but got " + samples);
|
||||
|
||||
readsBySample = new LinkedHashMap<String, Downsampler<T>>(samples.size());
|
||||
for ( final String sample : samples ) {
|
||||
readsBySample.put(sample, createDownsampler(LIBSDownsamplingInfo));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new, ready to use downsampler based on the parameters in LIBSDownsamplingInfo
|
||||
* @param LIBSDownsamplingInfo the parameters to use in creating the downsampler
|
||||
* @return a downsampler appropriate for LIBSDownsamplingInfo. If no downsampling is requested,
|
||||
* uses the PassThroughDownsampler, which does nothing at all.
|
||||
*/
|
||||
@Requires("LIBSDownsamplingInfo != null")
|
||||
@Ensures("result != null")
|
||||
private Downsampler<T> createDownsampler(final LIBSDownsamplingInfo LIBSDownsamplingInfo) {
|
||||
return LIBSDownsamplingInfo.isPerformDownsampling()
|
||||
? new ReservoirDownsampler<T>(LIBSDownsamplingInfo.getToCoverage())
|
||||
: new PassThroughDownsampler<T>();
|
||||
}
|
||||
|
||||
public void submitRead(T read) {
|
||||
String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null;
|
||||
if (readsBySample.containsKey(sampleName))
|
||||
readsBySample.get(sampleName).submit(read);
|
||||
/**
|
||||
* Offer this read to the partitioner, putting it into the bucket of reads for the sample
|
||||
* of read (obtained via the read's read group).
|
||||
*
|
||||
* If the read group is missing, uses the special "null" read group
|
||||
*
|
||||
* @throws IllegalStateException if the sample of read wasn't present in the original
|
||||
* set of samples provided to this SamplePartitioner at construction
|
||||
*
|
||||
* @param read the read to add to the sample's list of reads
|
||||
*/
|
||||
@Requires("read != null")
|
||||
@Ensures("doneSubmittingReads == false")
|
||||
public void submitRead(final T read) {
|
||||
final String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null;
|
||||
final Downsampler<T> downsampler = readsBySample.get(sampleName);
|
||||
if ( downsampler == null )
|
||||
throw new IllegalStateException("Offered read with sample name " + sampleName + " to SamplePartitioner " +
|
||||
"but this sample wasn't provided as one of possible samples at construction");
|
||||
|
||||
downsampler.submit(read);
|
||||
doneSubmittingReads = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell this partitioner that all reads in this cycle have been submitted, so that we
|
||||
* can finalize whatever downsampling is required by each sample.
|
||||
*
|
||||
* Note that we *must* call this function before getReadsForSample, or else that
|
||||
* function will exception out.
|
||||
*/
|
||||
@Ensures("doneSubmittingReads == true")
|
||||
public void doneSubmittingReads() {
|
||||
for ( Map.Entry<String, Downsampler<T>> perSampleReads : readsBySample.entrySet() ) {
|
||||
perSampleReads.getValue().signalEndOfInput();
|
||||
for ( final Downsampler<T> downsampler : readsBySample.values() ) {
|
||||
downsampler.signalEndOfInput();
|
||||
}
|
||||
doneSubmittingReads = true;
|
||||
}
|
||||
|
||||
public Collection<T> getReadsForSample(String sampleName) {
|
||||
if ( ! readsBySample.containsKey(sampleName) )
|
||||
throw new NoSuchElementException("Sample name not found");
|
||||
/**
|
||||
* Get the final collection of reads for this sample for this cycle
|
||||
*
|
||||
* The cycle is defined as all of the reads that occur between
|
||||
* the first call to submitRead until doneSubmittingReads is called. At that
|
||||
* point additional downsampling may occur (depending on construction arguments)
|
||||
* and that set of reads is returned here.
|
||||
*
|
||||
* Note that this function can only be called once per cycle, as the underlying
|
||||
* collection of reads is cleared.
|
||||
*
|
||||
* @param sampleName the sample we want reads for, must be present in the original samples
|
||||
* @return a non-null collection of reads for sample in this cycle
|
||||
*/
|
||||
@Ensures("result != null")
|
||||
public Collection<T> getReadsForSample(final String sampleName) {
|
||||
if ( ! doneSubmittingReads ) throw new IllegalStateException("getReadsForSample called before doneSubmittingReads was called");
|
||||
|
||||
return readsBySample.get(sampleName).consumeFinalizedItems();
|
||||
final Downsampler<T> downsampler = readsBySample.get(sampleName);
|
||||
if ( downsampler == null ) throw new NoSuchElementException("Sample name not found");
|
||||
|
||||
return downsampler.consumeFinalizedItems();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets this SamplePartitioner, indicating that we're starting a new
|
||||
* cycle of adding reads to each underlying downsampler.
|
||||
*/
|
||||
@Ensures("doneSubmittingReads == false")
|
||||
public void reset() {
|
||||
for ( Map.Entry<String, Downsampler<T>> perSampleReads : readsBySample.entrySet() ) {
|
||||
perSampleReads.getValue().clear();
|
||||
perSampleReads.getValue().reset();
|
||||
for ( final Downsampler<T> downsampler : readsBySample.values() ) {
|
||||
downsampler.clear();
|
||||
downsampler.reset();
|
||||
}
|
||||
doneSubmittingReads = false;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue