Merge pull request #267 from broadinstitute/dr_reducereads_downsampling_fix

Exclude reduced reads from elimination during downsampling
This commit is contained in:
Mark DePristo 2013-06-11 13:52:28 -07:00
commit b2dc7095ab
18 changed files with 545 additions and 205 deletions

View File

@ -125,7 +125,14 @@ public class GATKArgumentCollection {
@Argument(fullName = "downsample_to_fraction", shortName = "dfrac", doc = "Fraction [0.0-1.0] of reads to downsample to", required = false) @Argument(fullName = "downsample_to_fraction", shortName = "dfrac", doc = "Fraction [0.0-1.0] of reads to downsample to", required = false)
public Double downsampleFraction = null; public Double downsampleFraction = null;
@Argument(fullName = "downsample_to_coverage", shortName = "dcov", doc = "Coverage [integer] to downsample to at any given locus; note that downsampled reads are randomly selected from all possible reads at a locus. For non-locus-based traversals (eg., ReadWalkers), this sets the maximum number of reads at each alignment start position.", required = false) @Argument(fullName = "downsample_to_coverage", shortName = "dcov",
doc = "Coverage [integer] to downsample to. For locus-based traversals (eg., LocusWalkers and ActiveRegionWalkers)," +
"this controls the maximum depth of coverage at each locus. For non-locus-based traversals (eg., ReadWalkers), " +
"this controls the maximum number of reads sharing the same alignment start position. Note that the " +
"coverage target is an approximate goal that is not guaranteed to be met exactly: the GATK's approach " +
"to downsampling is based on even representation of reads from all alignment start positions, and the " +
"downsampling algorithm will under some circumstances retain slightly more coverage than requested.",
required = false)
public Integer downsampleCoverage = null; public Integer downsampleCoverage = null;
/** /**

View File

@ -25,19 +25,27 @@
package org.broadinstitute.sting.gatk.downsampling; package org.broadinstitute.sting.gatk.downsampling;
import org.broadinstitute.sting.utils.locusiterator.AlignmentStateMachine;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
* The basic downsampler API, with no reads-specific operations. * The basic downsampler API, with no reads-specific operations.
* *
* Downsamplers that extend this interface rather than the ReadsDownsampler interface can handle * Downsamplers that extend this class rather than the ReadsDownsampler class can handle
* any kind of item, however they cannot be wrapped within a DownsamplingReadsIterator or a * any kind of item, however they cannot be wrapped within a DownsamplingReadsIterator or a
* PerSampleDownsamplingReadsIterator. * PerSampleDownsamplingReadsIterator.
* *
* @author David Roazen * @author David Roazen
*/ */
public interface Downsampler<T> { public abstract class Downsampler<T> {
/**
* Number of items discarded by this downsampler since the last call to resetStats()
*/
protected int numDiscardedItems = 0;
/** /**
* Submit one item to the downsampler for consideration. Some downsamplers will be able to determine * Submit one item to the downsampler for consideration. Some downsamplers will be able to determine
@ -46,7 +54,7 @@ public interface Downsampler<T> {
* *
* @param item the individual item to submit to the downsampler for consideration * @param item the individual item to submit to the downsampler for consideration
*/ */
public void submit( T item ); public abstract void submit( final T item );
/** /**
* Submit a collection of items to the downsampler for consideration. Should be equivalent to calling * Submit a collection of items to the downsampler for consideration. Should be equivalent to calling
@ -54,21 +62,29 @@ public interface Downsampler<T> {
* *
* @param items the collection of items to submit to the downsampler for consideration * @param items the collection of items to submit to the downsampler for consideration
*/ */
public void submit( Collection<T> items ); public void submit( final Collection<T> items ) {
if ( items == null ) {
throw new IllegalArgumentException("submitted items must not be null");
}
for ( final T item : items ) {
submit(item);
}
}
/** /**
* Are there items that have survived the downsampling process waiting to be retrieved? * Are there items that have survived the downsampling process waiting to be retrieved?
* *
* @return true if this downsampler has > 0 finalized items, otherwise false * @return true if this downsampler has > 0 finalized items, otherwise false
*/ */
public boolean hasFinalizedItems(); public abstract boolean hasFinalizedItems();
/** /**
* Return (and *remove*) all items that have survived downsampling and are waiting to be retrieved. * Return (and *remove*) all items that have survived downsampling and are waiting to be retrieved.
* *
* @return a list of all finalized items this downsampler contains, or an empty list if there are none * @return a list of all finalized items this downsampler contains, or an empty list if there are none
*/ */
public List<T> consumeFinalizedItems(); public abstract List<T> consumeFinalizedItems();
/** /**
* Are there items stored in this downsampler that it doesn't yet know whether they will * Are there items stored in this downsampler that it doesn't yet know whether they will
@ -76,7 +92,7 @@ public interface Downsampler<T> {
* *
* @return true if this downsampler has > 0 pending items, otherwise false * @return true if this downsampler has > 0 pending items, otherwise false
*/ */
public boolean hasPendingItems(); public abstract boolean hasPendingItems();
/** /**
* Peek at the first finalized item stored in this downsampler (or null if there are no finalized items) * Peek at the first finalized item stored in this downsampler (or null if there are no finalized items)
@ -84,7 +100,7 @@ public interface Downsampler<T> {
* @return the first finalized item in this downsampler (the item is not removed from the downsampler by this call), * @return the first finalized item in this downsampler (the item is not removed from the downsampler by this call),
* or null if there are none * or null if there are none
*/ */
public T peekFinalized(); public abstract T peekFinalized();
/** /**
* Peek at the first pending item stored in this downsampler (or null if there are no pending items) * Peek at the first pending item stored in this downsampler (or null if there are no pending items)
@ -92,7 +108,7 @@ public interface Downsampler<T> {
* @return the first pending item stored in this downsampler (the item is not removed from the downsampler by this call), * @return the first pending item stored in this downsampler (the item is not removed from the downsampler by this call),
* or null if there are none * or null if there are none
*/ */
public T peekPending(); public abstract T peekPending();
/** /**
* Get the current number of items in this downsampler * Get the current number of items in this downsampler
@ -103,7 +119,7 @@ public interface Downsampler<T> {
* *
* @return a positive integer * @return a positive integer
*/ */
public int size(); public abstract int size();
/** /**
* Returns the number of items discarded (so far) during the downsampling process * Returns the number of items discarded (so far) during the downsampling process
@ -111,21 +127,46 @@ public interface Downsampler<T> {
* @return the number of items that have been submitted to this downsampler and discarded in the process of * @return the number of items that have been submitted to this downsampler and discarded in the process of
* downsampling * downsampling
*/ */
public int getNumberOfDiscardedItems(); public int getNumberOfDiscardedItems() {
return numDiscardedItems;
}
/** /**
* Used to tell the downsampler that no more items will be submitted to it, and that it should * Used to tell the downsampler that no more items will be submitted to it, and that it should
* finalize any pending items. * finalize any pending items.
*/ */
public void signalEndOfInput(); public abstract void signalEndOfInput();
/** /**
* Empty the downsampler of all finalized/pending items * Empty the downsampler of all finalized/pending items
*/ */
public void clear(); public abstract void clearItems();
/** /**
* Reset stats in the downsampler such as the number of discarded items *without* clearing the downsampler of items * Reset stats in the downsampler such as the number of discarded items *without* clearing the downsampler of items
*/ */
public void reset(); public void resetStats() {
numDiscardedItems = 0;
}
/**
* Indicates whether an item should be excluded from elimination during downsampling. By default,
* all items representing reduced reads are excluded from downsampling, but individual downsamplers
* may override if they are able to handle reduced reads correctly. Downsamplers should check
* the return value of this method before discarding an item.
*
* @param item The item to test
* @return true if the item should not be subject to elimination during downsampling, otherwise false
*/
protected boolean doNotDiscardItem( final Object item ) {
// Use getClass() rather than instanceof for performance reasons. Ugly but fast.
if ( item.getClass() == GATKSAMRecord.class ) {
return ((GATKSAMRecord)item).isReducedRead();
}
else if ( item.getClass() == AlignmentStateMachine.class ) {
return ((AlignmentStateMachine)item).isReducedRead();
}
return false;
}
} }

View File

@ -30,7 +30,6 @@ import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -41,13 +40,11 @@ import java.util.List;
* *
* @author David Roazen * @author David Roazen
*/ */
public class FractionalDownsampler<T extends SAMRecord> implements ReadsDownsampler<T> { public class FractionalDownsampler<T extends SAMRecord> extends ReadsDownsampler<T> {
private ArrayList<T> selectedReads; private ArrayList<T> selectedReads;
private int cutoffForInclusion; private final int cutoffForInclusion;
private int numDiscardedItems;
private static final int RANDOM_POOL_SIZE = 10000; private static final int RANDOM_POOL_SIZE = 10000;
@ -57,18 +54,19 @@ public class FractionalDownsampler<T extends SAMRecord> implements ReadsDownsamp
* @param fraction Fraction of reads to preserve, between 0.0 (inclusive) and 1.0 (inclusive). * @param fraction Fraction of reads to preserve, between 0.0 (inclusive) and 1.0 (inclusive).
* Actual number of reads preserved may differ randomly. * Actual number of reads preserved may differ randomly.
*/ */
public FractionalDownsampler( double fraction ) { public FractionalDownsampler( final double fraction ) {
if ( fraction < 0.0 || fraction > 1.0 ) { if ( fraction < 0.0 || fraction > 1.0 ) {
throw new ReviewedStingException("Fraction of reads to include must be between 0.0 and 1.0, inclusive"); throw new ReviewedStingException("Fraction of reads to include must be between 0.0 and 1.0, inclusive");
} }
cutoffForInclusion = (int)(fraction * RANDOM_POOL_SIZE); cutoffForInclusion = (int)(fraction * RANDOM_POOL_SIZE);
clear(); clearItems();
reset(); resetStats();
} }
public void submit( T newRead ) { @Override
if ( GenomeAnalysisEngine.getRandomGenerator().nextInt(10000) < cutoffForInclusion ) { public void submit( final T newRead ) {
if ( GenomeAnalysisEngine.getRandomGenerator().nextInt(10000) < cutoffForInclusion || doNotDiscardItem(newRead) ) {
selectedReads.add(newRead); selectedReads.add(newRead);
} }
else { else {
@ -76,61 +74,56 @@ public class FractionalDownsampler<T extends SAMRecord> implements ReadsDownsamp
} }
} }
public void submit( Collection<T> newReads ) { @Override
for ( T read : newReads ) {
submit(read);
}
}
public boolean hasFinalizedItems() { public boolean hasFinalizedItems() {
return selectedReads.size() > 0; return selectedReads.size() > 0;
} }
@Override
public List<T> consumeFinalizedItems() { public List<T> consumeFinalizedItems() {
// pass by reference rather than make a copy, for speed // pass by reference rather than make a copy, for speed
List<T> downsampledItems = selectedReads; List<T> downsampledItems = selectedReads;
clear(); clearItems();
return downsampledItems; return downsampledItems;
} }
@Override
public boolean hasPendingItems() { public boolean hasPendingItems() {
return false; return false;
} }
@Override
public T peekFinalized() { public T peekFinalized() {
return selectedReads.isEmpty() ? null : selectedReads.get(0); return selectedReads.isEmpty() ? null : selectedReads.get(0);
} }
@Override
public T peekPending() { public T peekPending() {
return null; return null;
} }
public int getNumberOfDiscardedItems() {
return numDiscardedItems;
}
@Override @Override
public int size() { public int size() {
return selectedReads.size(); return selectedReads.size();
} }
@Override
public void signalEndOfInput() { public void signalEndOfInput() {
// NO-OP // NO-OP
} }
public void clear() { @Override
public void clearItems() {
selectedReads = new ArrayList<T>(); selectedReads = new ArrayList<T>();
} }
public void reset() { @Override
numDiscardedItems = 0;
}
public boolean requiresCoordinateSortOrder() { public boolean requiresCoordinateSortOrder() {
return false; return false;
} }
public void signalNoMoreReadsBefore( T read ) { @Override
public void signalNoMoreReadsBefore( final T read ) {
// NO-OP // NO-OP
} }
} }

View File

@ -46,16 +46,15 @@ import java.util.*;
* *
* @author David Roazen * @author David Roazen
*/ */
public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T> { public class LevelingDownsampler<T extends List<E>, E> extends Downsampler<T> {
private final int minElementsPerStack; private final int minElementsPerStack;
private final int targetSize; private final int targetSize;
private List<T> groups; private List<T> groups;
private boolean groupsAreFinalized; private boolean groupsAreFinalized;
private int numDiscardedItems;
/** /**
* Construct a LevelingDownsampler * Construct a LevelingDownsampler
* *
@ -65,7 +64,7 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
* this value -- if it does, items are removed from Lists evenly until the total size * this value -- if it does, items are removed from Lists evenly until the total size
* is <= this value * is <= this value
*/ */
public LevelingDownsampler( int targetSize ) { public LevelingDownsampler( final int targetSize ) {
this(targetSize, 1); this(targetSize, 1);
} }
@ -79,55 +78,58 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
* if a stack has only 3 elements and minElementsPerStack is 3, no matter what * if a stack has only 3 elements and minElementsPerStack is 3, no matter what
* we'll not reduce this stack below 3. * we'll not reduce this stack below 3.
*/ */
public LevelingDownsampler(final int targetSize, final int minElementsPerStack) { public LevelingDownsampler( final int targetSize, final int minElementsPerStack ) {
if ( targetSize < 0 ) throw new IllegalArgumentException("targetSize must be >= 0 but got " + targetSize); if ( targetSize < 0 ) throw new IllegalArgumentException("targetSize must be >= 0 but got " + targetSize);
if ( minElementsPerStack < 0 ) throw new IllegalArgumentException("minElementsPerStack must be >= 0 but got " + minElementsPerStack); if ( minElementsPerStack < 0 ) throw new IllegalArgumentException("minElementsPerStack must be >= 0 but got " + minElementsPerStack);
this.targetSize = targetSize; this.targetSize = targetSize;
this.minElementsPerStack = minElementsPerStack; this.minElementsPerStack = minElementsPerStack;
clear(); clearItems();
reset(); resetStats();
} }
public void submit( T item ) { @Override
public void submit( final T item ) {
groups.add(item); groups.add(item);
} }
public void submit( Collection<T> items ){ @Override
public void submit( final Collection<T> items ){
groups.addAll(items); groups.addAll(items);
} }
@Override
public boolean hasFinalizedItems() { public boolean hasFinalizedItems() {
return groupsAreFinalized && groups.size() > 0; return groupsAreFinalized && groups.size() > 0;
} }
@Override
public List<T> consumeFinalizedItems() { public List<T> consumeFinalizedItems() {
if ( ! hasFinalizedItems() ) { if ( ! hasFinalizedItems() ) {
return new ArrayList<T>(); return new ArrayList<T>();
} }
// pass by reference rather than make a copy, for speed // pass by reference rather than make a copy, for speed
List<T> toReturn = groups; final List<T> toReturn = groups;
clear(); clearItems();
return toReturn; return toReturn;
} }
@Override
public boolean hasPendingItems() { public boolean hasPendingItems() {
return ! groupsAreFinalized && groups.size() > 0; return ! groupsAreFinalized && groups.size() > 0;
} }
@Override
public T peekFinalized() { public T peekFinalized() {
return hasFinalizedItems() ? groups.get(0) : null; return hasFinalizedItems() ? groups.get(0) : null;
} }
@Override
public T peekPending() { public T peekPending() {
return hasPendingItems() ? groups.get(0) : null; return hasPendingItems() ? groups.get(0) : null;
} }
public int getNumberOfDiscardedItems() {
return numDiscardedItems;
}
@Override @Override
public int size() { public int size() {
int s = 0; int s = 0;
@ -137,26 +139,24 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
return s; return s;
} }
@Override
public void signalEndOfInput() { public void signalEndOfInput() {
levelGroups(); levelGroups();
groupsAreFinalized = true; groupsAreFinalized = true;
} }
public void clear() { @Override
public void clearItems() {
groups = new ArrayList<T>(); groups = new ArrayList<T>();
groupsAreFinalized = false; groupsAreFinalized = false;
} }
public void reset() {
numDiscardedItems = 0;
}
private void levelGroups() { private void levelGroups() {
final int[] groupSizes = new int[groups.size()];
int totalSize = 0; int totalSize = 0;
int[] groupSizes = new int[groups.size()];
int currentGroupIndex = 0; int currentGroupIndex = 0;
for ( T group : groups ) { for ( final T group : groups ) {
groupSizes[currentGroupIndex] = group.size(); groupSizes[currentGroupIndex] = group.size();
totalSize += groupSizes[currentGroupIndex]; totalSize += groupSizes[currentGroupIndex];
currentGroupIndex++; currentGroupIndex++;
@ -191,20 +191,18 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
// Now we actually go through and reduce each group to its new count as specified in groupSizes // Now we actually go through and reduce each group to its new count as specified in groupSizes
currentGroupIndex = 0; currentGroupIndex = 0;
for ( T group : groups ) { for ( final T group : groups ) {
downsampleOneGroup(group, groupSizes[currentGroupIndex]); downsampleOneGroup(group, groupSizes[currentGroupIndex]);
currentGroupIndex++; currentGroupIndex++;
} }
} }
private void downsampleOneGroup( T group, int numItemsToKeep ) { private void downsampleOneGroup( final T group, final int numItemsToKeep ) {
if ( numItemsToKeep >= group.size() ) { if ( numItemsToKeep >= group.size() ) {
return; return;
} }
numDiscardedItems += group.size() - numItemsToKeep; final BitSet itemsToKeep = new BitSet(group.size());
BitSet itemsToKeep = new BitSet(group.size());
for ( Integer selectedIndex : MathUtils.sampleIndicesWithoutReplacement(group.size(), numItemsToKeep) ) { for ( Integer selectedIndex : MathUtils.sampleIndicesWithoutReplacement(group.size(), numItemsToKeep) ) {
itemsToKeep.set(selectedIndex); itemsToKeep.set(selectedIndex);
} }
@ -213,12 +211,13 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
// If our group is a linked list, we can remove the desired items in a single O(n) pass with an iterator // If our group is a linked list, we can remove the desired items in a single O(n) pass with an iterator
if ( group instanceof LinkedList ) { if ( group instanceof LinkedList ) {
Iterator iter = group.iterator(); final Iterator<E> iter = group.iterator();
while ( iter.hasNext() ) { while ( iter.hasNext() ) {
iter.next(); final E item = iter.next();
if ( ! itemsToKeep.get(currentIndex) ) { if ( ! itemsToKeep.get(currentIndex) && ! doNotDiscardItem(item) ) {
iter.remove(); iter.remove();
numDiscardedItems++;
} }
currentIndex++; currentIndex++;
@ -227,14 +226,15 @@ public class LevelingDownsampler<T extends List<E>, E> implements Downsampler<T>
// If it's not a linked list, it's more efficient to copy the desired items into a new list and back rather // If it's not a linked list, it's more efficient to copy the desired items into a new list and back rather
// than suffer O(n^2) of item shifting // than suffer O(n^2) of item shifting
else { else {
List<E> keptItems = new ArrayList<E>(numItemsToKeep); final List<E> keptItems = new ArrayList<E>(group.size());
for ( E item : group ) { for ( final E item : group ) {
if ( itemsToKeep.get(currentIndex) ) { if ( itemsToKeep.get(currentIndex) || doNotDiscardItem(item) ) {
keptItems.add(item); keptItems.add(item);
} }
currentIndex++; currentIndex++;
} }
numDiscardedItems += group.size() - keptItems.size();
group.clear(); group.clear();
group.addAll(keptItems); group.addAll(keptItems);
} }

View File

@ -27,7 +27,6 @@ package org.broadinstitute.sting.gatk.downsampling;
import net.sf.samtools.SAMRecord; import net.sf.samtools.SAMRecord;
import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -39,25 +38,21 @@ import java.util.List;
* *
* @author David Roazen * @author David Roazen
*/ */
public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsampler<T> { public class PassThroughDownsampler<T extends SAMRecord> extends ReadsDownsampler<T> {
private LinkedList<T> selectedReads; private LinkedList<T> selectedReads;
public PassThroughDownsampler() { public PassThroughDownsampler() {
clear(); clearItems();
} }
@Override
public void submit( T newRead ) { public void submit( T newRead ) {
// All reads pass-through, no reads get downsampled // All reads pass-through, no reads get downsampled
selectedReads.add(newRead); selectedReads.add(newRead);
} }
public void submit( Collection<T> newReads ) { @Override
for ( T read : newReads ) {
submit(read);
}
}
public boolean hasFinalizedItems() { public boolean hasFinalizedItems() {
return ! selectedReads.isEmpty(); return ! selectedReads.isEmpty();
} }
@ -66,50 +61,50 @@ public class PassThroughDownsampler<T extends SAMRecord> implements ReadsDownsam
* Note that this list is a linked list and so doesn't support fast random access * Note that this list is a linked list and so doesn't support fast random access
* @return * @return
*/ */
@Override
public List<T> consumeFinalizedItems() { public List<T> consumeFinalizedItems() {
// pass by reference rather than make a copy, for speed // pass by reference rather than make a copy, for speed
List<T> downsampledItems = selectedReads; final List<T> downsampledItems = selectedReads;
clear(); clearItems();
return downsampledItems; return downsampledItems;
} }
@Override
public boolean hasPendingItems() { public boolean hasPendingItems() {
return false; return false;
} }
@Override
public T peekFinalized() { public T peekFinalized() {
return selectedReads.isEmpty() ? null : selectedReads.getFirst(); return selectedReads.isEmpty() ? null : selectedReads.getFirst();
} }
@Override
public T peekPending() { public T peekPending() {
return null; return null;
} }
public int getNumberOfDiscardedItems() {
return 0;
}
@Override @Override
public int size() { public int size() {
return selectedReads.size(); return selectedReads.size();
} }
@Override
public void signalEndOfInput() { public void signalEndOfInput() {
// NO-OP // NO-OP
} }
public void clear() { @Override
public void clearItems() {
selectedReads = new LinkedList<T>(); selectedReads = new LinkedList<T>();
} }
public void reset() { @Override
// NO-OP
}
public boolean requiresCoordinateSortOrder() { public boolean requiresCoordinateSortOrder() {
return false; return false;
} }
@Override
public void signalNoMoreReadsBefore( T read ) { public void signalNoMoreReadsBefore( T read ) {
// NO-OP // NO-OP
} }

View File

@ -32,14 +32,14 @@ import net.sf.samtools.SAMRecord;
* *
* @author David Roazen * @author David Roazen
*/ */
public interface ReadsDownsampler<T extends SAMRecord> extends Downsampler<T> { public abstract class ReadsDownsampler<T extends SAMRecord> extends Downsampler<T> {
/** /**
* Does this downsampler require that reads be fed to it in coordinate order? * Does this downsampler require that reads be fed to it in coordinate order?
* *
* @return true if reads must be submitted to this downsampler in coordinate order, otherwise false * @return true if reads must be submitted to this downsampler in coordinate order, otherwise false
*/ */
public boolean requiresCoordinateSortOrder(); public abstract boolean requiresCoordinateSortOrder();
/** /**
* Tell this downsampler that no more reads located before the provided read (according to * Tell this downsampler that no more reads located before the provided read (according to
@ -52,5 +52,5 @@ public interface ReadsDownsampler<T extends SAMRecord> extends Downsampler<T> {
* @param read the downsampler will assume that no reads located before this read will ever * @param read the downsampler will assume that no reads located before this read will ever
* be submitted to it in the future * be submitted to it in the future
*/ */
public void signalNoMoreReadsBefore( T read ); public abstract void signalNoMoreReadsBefore( final T read );
} }

View File

@ -39,7 +39,12 @@ import java.util.*;
* *
* @author David Roazen * @author David Roazen
*/ */
public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampler<T> { public class ReservoirDownsampler<T extends SAMRecord> extends ReadsDownsampler<T> {
/**
* size of our reservoir -- ie., the maximum number of reads from the stream that will be retained
* (not including any undiscardable items)
*/
private final int targetSampleSize; private final int targetSampleSize;
/** /**
@ -58,17 +63,33 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
*/ */
private List<T> reservoir; private List<T> reservoir;
/**
* Certain items (eg., reduced reads) cannot be discarded at all during downsampling. We store
* these items separately so as not to impact the fair selection of items for inclusion in the
* reservoir. These items are returned (and cleared) along with any items in the reservoir in
* calls to consumeFinalizedItems().
*/
private List<T> undiscardableItems;
/**
* Are we currently using a linked list for the reservoir?
*/
private boolean isLinkedList; private boolean isLinkedList;
private int totalReadsSeen; /**
* Count of the number of reads seen that were actually eligible for discarding. Used by the reservoir downsampling
* algorithm to ensure that all discardable reads have an equal chance of making it into the reservoir.
*/
private int totalDiscardableReadsSeen;
private int numDiscardedItems;
/** /**
* Construct a ReservoirDownsampler * Construct a ReservoirDownsampler
* *
* @param targetSampleSize Size of the reservoir used by this downsampler. Number of items retained * @param targetSampleSize Size of the reservoir used by this downsampler. Number of items retained
* after downsampling will be min(totalReads, targetSampleSize) * after downsampling will be min(totalDiscardableReads, targetSampleSize) + any
* undiscardable reads (eg., reduced reads).
*
* @param expectFewOverflows if true, this downsampler will be optimized for the case * @param expectFewOverflows if true, this downsampler will be optimized for the case
* where most of the time we won't fill up anything like the * where most of the time we won't fill up anything like the
* targetSampleSize elements. If this is false, we will allocate * targetSampleSize elements. If this is false, we will allocate
@ -76,15 +97,15 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
* the cost of allocation if we often use targetSampleSize or more * the cost of allocation if we often use targetSampleSize or more
* elements. * elements.
*/ */
public ReservoirDownsampler ( final int targetSampleSize, final boolean expectFewOverflows) { public ReservoirDownsampler ( final int targetSampleSize, final boolean expectFewOverflows ) {
if ( targetSampleSize <= 0 ) { if ( targetSampleSize <= 0 ) {
throw new ReviewedStingException("Cannot do reservoir downsampling with a sample size <= 0"); throw new ReviewedStingException("Cannot do reservoir downsampling with a sample size <= 0");
} }
this.targetSampleSize = targetSampleSize; this.targetSampleSize = targetSampleSize;
this.expectFewOverflows = expectFewOverflows; this.expectFewOverflows = expectFewOverflows;
clear(); clearItems();
reset(); resetStats();
} }
/** /**
@ -93,15 +114,21 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
* @param targetSampleSize Size of the reservoir used by this downsampler. Number of items retained * @param targetSampleSize Size of the reservoir used by this downsampler. Number of items retained
* after downsampling will be min(totalReads, targetSampleSize) * after downsampling will be min(totalReads, targetSampleSize)
*/ */
public ReservoirDownsampler ( int targetSampleSize ) { public ReservoirDownsampler ( final int targetSampleSize ) {
this(targetSampleSize, false); this(targetSampleSize, false);
} }
@Override
public void submit ( final T newRead ) {
if ( doNotDiscardItem(newRead) ) {
undiscardableItems.add(newRead);
return;
}
public void submit ( T newRead ) { // Only count reads that are actually eligible for discarding for the purposes of the reservoir downsampling algorithm
totalReadsSeen++; totalDiscardableReadsSeen++;
if ( totalReadsSeen <= targetSampleSize ) { if ( totalDiscardableReadsSeen <= targetSampleSize ) {
reservoir.add(newRead); reservoir.add(newRead);
} }
else { else {
@ -110,7 +137,7 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
isLinkedList = false; isLinkedList = false;
} }
final int randomSlot = GenomeAnalysisEngine.getRandomGenerator().nextInt(totalReadsSeen); final int randomSlot = GenomeAnalysisEngine.getRandomGenerator().nextInt(totalDiscardableReadsSeen);
if ( randomSlot < targetSampleSize ) { if ( randomSlot < targetSampleSize ) {
reservoir.set(randomSlot, newRead); reservoir.set(randomSlot, newRead);
} }
@ -118,49 +145,46 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
} }
} }
public void submit ( Collection<T> newReads ) { @Override
for ( T read : newReads ) {
submit(read);
}
}
public boolean hasFinalizedItems() { public boolean hasFinalizedItems() {
return reservoir.size() > 0; return ! reservoir.isEmpty() || ! undiscardableItems.isEmpty();
} }
@Override
public List<T> consumeFinalizedItems() { public List<T> consumeFinalizedItems() {
if ( reservoir.isEmpty() ) { if ( ! hasFinalizedItems() ) {
// if there's nothing here, don't both allocating a new list completely // if there's nothing here, don't bother allocating a new list
return Collections.emptyList(); return Collections.emptyList();
} else { } else {
// pass by reference rather than make a copy, for speed // pass reservoir by reference rather than make a copy, for speed
List<T> downsampledItems = reservoir; final List<T> downsampledItems = reservoir;
clear(); downsampledItems.addAll(undiscardableItems);
clearItems();
return downsampledItems; return downsampledItems;
} }
} }
@Override
public boolean hasPendingItems() { public boolean hasPendingItems() {
return false; return false;
} }
@Override
public T peekFinalized() { public T peekFinalized() {
return reservoir.isEmpty() ? null : reservoir.get(0); return ! reservoir.isEmpty() ? reservoir.get(0) : (! undiscardableItems.isEmpty() ? undiscardableItems.get(0) : null);
} }
@Override
public T peekPending() { public T peekPending() {
return null; return null;
} }
public int getNumberOfDiscardedItems() { @Override
return numDiscardedItems; public int size() {
return reservoir.size() + undiscardableItems.size();
} }
@Override @Override
public int size() {
return reservoir.size();
}
public void signalEndOfInput() { public void signalEndOfInput() {
// NO-OP // NO-OP
} }
@ -168,25 +192,27 @@ public class ReservoirDownsampler<T extends SAMRecord> implements ReadsDownsampl
/** /**
* Clear the data structures used to hold information * Clear the data structures used to hold information
*/ */
public void clear() { @Override
public void clearItems() {
// if we aren't expecting many overflows, allocate a linked list not an arraylist // if we aren't expecting many overflows, allocate a linked list not an arraylist
reservoir = expectFewOverflows ? new LinkedList<T>() : new ArrayList<T>(targetSampleSize); reservoir = expectFewOverflows ? new LinkedList<T>() : new ArrayList<T>(targetSampleSize);
// there's no possibility of overflow with the undiscardable items, so we always use a linked list for them
undiscardableItems = new LinkedList<>();
// it's a linked list if we allocate one // it's a linked list if we allocate one
isLinkedList = expectFewOverflows; isLinkedList = expectFewOverflows;
// an internal stat used by the downsampling process, so not cleared by reset() below // an internal stat used by the downsampling process, so not cleared by resetStats() below
totalReadsSeen = 0; totalDiscardableReadsSeen = 0;
}
public void reset() {
numDiscardedItems = 0;
} }
@Override
public boolean requiresCoordinateSortOrder() { public boolean requiresCoordinateSortOrder() {
return false; return false;
} }
@Override
public void signalNoMoreReadsBefore( T read ) { public void signalNoMoreReadsBefore( T read ) {
// NO-OP // NO-OP
} }

View File

@ -35,11 +35,11 @@ import java.util.*;
* *
* @author David Roazen * @author David Roazen
*/ */
public class SimplePositionalDownsampler<T extends SAMRecord> implements ReadsDownsampler<T> { public class SimplePositionalDownsampler<T extends SAMRecord> extends ReadsDownsampler<T> {
private int targetCoverage; private final int targetCoverage;
private ReservoirDownsampler<T> reservoir; private final ReservoirDownsampler<T> reservoir;
private int currentContigIndex; private int currentContigIndex;
@ -51,97 +51,93 @@ public class SimplePositionalDownsampler<T extends SAMRecord> implements ReadsDo
private ArrayList<T> finalizedReads; private ArrayList<T> finalizedReads;
private int numDiscardedItems;
/** /**
* Construct a SimplePositionalDownsampler * Construct a SimplePositionalDownsampler
* *
* @param targetCoverage Maximum number of reads that may share any given alignment start position * @param targetCoverage Maximum number of reads that may share any given alignment start position
*/ */
public SimplePositionalDownsampler( int targetCoverage ) { public SimplePositionalDownsampler( final int targetCoverage ) {
this.targetCoverage = targetCoverage; this.targetCoverage = targetCoverage;
reservoir = new ReservoirDownsampler<T>(targetCoverage); reservoir = new ReservoirDownsampler<T>(targetCoverage);
finalizedReads = new ArrayList<T>(); finalizedReads = new ArrayList<T>();
clear(); clearItems();
reset(); resetStats();
} }
public void submit( T newRead ) { @Override
public void submit( final T newRead ) {
updatePositionalState(newRead); updatePositionalState(newRead);
if ( unmappedReadsReached ) { // don't downsample the unmapped reads at the end of the stream if ( unmappedReadsReached ) { // don't downsample the unmapped reads at the end of the stream
finalizedReads.add(newRead); finalizedReads.add(newRead);
} }
else { else {
int reservoirPreviouslyDiscardedItems = reservoir.getNumberOfDiscardedItems(); final int reservoirPreviouslyDiscardedItems = reservoir.getNumberOfDiscardedItems();
// our reservoir downsampler will call doNotDiscardItem() for us to exclude items from elimination as appropriate
reservoir.submit(newRead); reservoir.submit(newRead);
numDiscardedItems += reservoir.getNumberOfDiscardedItems() - reservoirPreviouslyDiscardedItems; numDiscardedItems += reservoir.getNumberOfDiscardedItems() - reservoirPreviouslyDiscardedItems;
} }
} }
public void submit( Collection<T> newReads ) { @Override
for ( T read : newReads ) {
submit(read);
}
}
public boolean hasFinalizedItems() { public boolean hasFinalizedItems() {
return finalizedReads.size() > 0; return finalizedReads.size() > 0;
} }
@Override
public List<T> consumeFinalizedItems() { public List<T> consumeFinalizedItems() {
// pass by reference rather than make a copy, for speed // pass by reference rather than make a copy, for speed
List<T> toReturn = finalizedReads; final List<T> toReturn = finalizedReads;
finalizedReads = new ArrayList<T>(); finalizedReads = new ArrayList<T>();
return toReturn; return toReturn;
} }
@Override
public boolean hasPendingItems() { public boolean hasPendingItems() {
return reservoir.hasFinalizedItems(); return reservoir.hasFinalizedItems();
} }
@Override
public T peekFinalized() { public T peekFinalized() {
return finalizedReads.isEmpty() ? null : finalizedReads.get(0); return finalizedReads.isEmpty() ? null : finalizedReads.get(0);
} }
@Override
public T peekPending() { public T peekPending() {
return reservoir.peekFinalized(); return reservoir.peekFinalized();
} }
public int getNumberOfDiscardedItems() {
return numDiscardedItems;
}
@Override @Override
public int size() { public int size() {
return finalizedReads.size() + reservoir.size(); return finalizedReads.size() + reservoir.size();
} }
@Override
public void signalEndOfInput() { public void signalEndOfInput() {
finalizeReservoir(); finalizeReservoir();
} }
public void clear() { @Override
reservoir.clear(); public void clearItems() {
reservoir.reset(); reservoir.clearItems();
reservoir.resetStats();
finalizedReads.clear(); finalizedReads.clear();
positionEstablished = false; positionEstablished = false;
unmappedReadsReached = false; unmappedReadsReached = false;
} }
public void reset() { @Override
numDiscardedItems = 0;
}
public boolean requiresCoordinateSortOrder() { public boolean requiresCoordinateSortOrder() {
return true; return true;
} }
public void signalNoMoreReadsBefore( T read ) { @Override
public void signalNoMoreReadsBefore( final T read ) {
updatePositionalState(read); updatePositionalState(read);
} }
private void updatePositionalState( T newRead ) { private void updatePositionalState( final T newRead ) {
if ( readIsPastCurrentPosition(newRead) ) { if ( readIsPastCurrentPosition(newRead) ) {
if ( reservoir.hasFinalizedItems() ) { if ( reservoir.hasFinalizedItems() ) {
finalizeReservoir(); finalizeReservoir();
@ -155,13 +151,13 @@ public class SimplePositionalDownsampler<T extends SAMRecord> implements ReadsDo
} }
} }
private void setCurrentPosition( T read ) { private void setCurrentPosition( final T read ) {
currentContigIndex = read.getReferenceIndex(); currentContigIndex = read.getReferenceIndex();
currentAlignmentStart = read.getAlignmentStart(); currentAlignmentStart = read.getAlignmentStart();
positionEstablished = true; positionEstablished = true;
} }
private boolean readIsPastCurrentPosition( T read ) { private boolean readIsPastCurrentPosition( final T read ) {
return ! positionEstablished || return ! positionEstablished ||
read.getReferenceIndex() > currentContigIndex || read.getReferenceIndex() > currentContigIndex ||
read.getAlignmentStart() > currentAlignmentStart || read.getAlignmentStart() > currentAlignmentStart ||
@ -170,6 +166,6 @@ public class SimplePositionalDownsampler<T extends SAMRecord> implements ReadsDo
private void finalizeReservoir() { private void finalizeReservoir() {
finalizedReads.addAll(reservoir.consumeFinalizedItems()); finalizedReads.addAll(reservoir.consumeFinalizedItems());
reservoir.reset(); reservoir.resetStats();
} }
} }

View File

@ -43,17 +43,42 @@ import java.util.List;
* Time: 11:23 AM * Time: 11:23 AM
*/ */
public class TAROrderedReadCache { public class TAROrderedReadCache {
final int maxCapacity; private final int maxCapacity;
final Downsampler<GATKSAMRecord> downsampler; private ArrayList<GATKSAMRecord> undownsampledCache;
private Downsampler<GATKSAMRecord> downsampler;
private static final int UNDOWNSAMPLED_CACHE_MAX_INITIAL_SIZE = 10000;
/** /**
* Create a new empty ReadCache * Create a new empty ReadCache
* @param maxCapacity the max capacity of the read cache. * @param maxCapacity the max capacity of the read cache.
*/ */
public TAROrderedReadCache(int maxCapacity) { public TAROrderedReadCache( final int maxCapacity ) {
if ( maxCapacity < 0 ) throw new IllegalArgumentException("maxCapacity must be >= 0 but got " + maxCapacity); if ( maxCapacity < 0 ) throw new IllegalArgumentException("maxCapacity must be >= 0 but got " + maxCapacity);
this.maxCapacity = maxCapacity; this.maxCapacity = maxCapacity;
this.downsampler = new ReservoirDownsampler<GATKSAMRecord>(maxCapacity);
// The one we're not currently using will always be null:
initializeUndownsampledCache();
this.downsampler = null;
}
/**
* Moves all reads over to the downsampler, causing it to be used from this point on. Should be called
* when the undownsampledCache fills up and we need to start discarding reads. Since the
* ReservoirDownsampler doesn't preserve relative ordering, pop operations become expensive
* after this point, as they require a O(n log n) sort.
*/
private void activateDownsampler() {
downsampler = new ReservoirDownsampler<>(maxCapacity, false);
downsampler.submit(undownsampledCache);
undownsampledCache = null; // preferable to the O(n) clear() method
}
/**
* Allocate the undownsampled cache used when we have fewer than maxCapacity items
*/
private void initializeUndownsampledCache() {
undownsampledCache = new ArrayList<>(Math.min(maxCapacity + 1, UNDOWNSAMPLED_CACHE_MAX_INITIAL_SIZE));
} }
/** /**
@ -68,18 +93,31 @@ public class TAROrderedReadCache {
* Add a single read to this cache. Assumed to be in sorted order w.r.t. the previously added reads * Add a single read to this cache. Assumed to be in sorted order w.r.t. the previously added reads
* @param read a read to add * @param read a read to add
*/ */
public void add(final GATKSAMRecord read) { public void add( final GATKSAMRecord read ) {
if ( read == null ) throw new IllegalArgumentException("Read cannot be null"); if ( read == null ) throw new IllegalArgumentException("Read cannot be null");
downsampler.submit(read);
if ( downsampler != null ) {
downsampler.submit(read);
}
else {
undownsampledCache.add(read);
// No more room in the undownsampledCache? Time to start downsampling
if ( undownsampledCache.size() > maxCapacity ) {
activateDownsampler();
}
}
} }
/** /**
* Add a collection of reads to this cache. Assumed to be in sorted order w.r.t. the previously added reads and each other * Add a collection of reads to this cache. Assumed to be in sorted order w.r.t. the previously added reads and each other
* @param reads a collection of reads to add * @param reads a collection of reads to add
*/ */
public void addAll(final List<GATKSAMRecord> reads) { public void addAll( final List<GATKSAMRecord> reads ) {
if ( reads == null ) throw new IllegalArgumentException("Reads cannot be null"); if ( reads == null ) throw new IllegalArgumentException("Reads cannot be null");
downsampler.submit(reads); for ( final GATKSAMRecord read : reads ) {
add(read);
}
} }
/** /**
@ -87,40 +125,44 @@ public class TAROrderedReadCache {
* @return a positive integer * @return a positive integer
*/ */
public int size() { public int size() {
return downsampler.size(); return downsampler != null ? downsampler.size() : undownsampledCache.size();
} }
/** /**
* How many reads were discarded since the last call to popCurrentReads * How many reads were discarded since the last call to popCurrentReads
* @return *
* @return number of items discarded during downsampling since last pop operation
*/ */
public int getNumDiscarded() { public int getNumDiscarded() {
return downsampler.getNumberOfDiscardedItems(); return downsampler != null ? downsampler.getNumberOfDiscardedItems() : 0;
} }
/** /**
* Removes all reads currently in the cache, and returns them in sorted order (w.r.t. alignmentStart) * Removes all reads currently in the cache, and returns them in sorted order (w.r.t. alignmentStart)
* *
* Flushes this cache, so after this call the cache will contain no reads and all downsampling stats will * Flushes this cache, so after this call the cache will contain no reads, and we'll be in the same
* be reset. * initial state as the constructor would put us in, with a non-null undownsampledCache and a null
* downsampler.
* *
* @return a list of GATKSAMRecords in this cache * @return a list of GATKSAMRecords in this cache
*/ */
public List<GATKSAMRecord> popCurrentReads() { public List<GATKSAMRecord> popCurrentReads() {
final List<GATKSAMRecord> maybeUnordered = downsampler.consumeFinalizedItems(); final List<GATKSAMRecord> poppedReads;
final List<GATKSAMRecord> ordered; if ( downsampler == null ) {
if ( downsampler.getNumberOfDiscardedItems() == 0 ) { poppedReads = undownsampledCache; // avoid making a copy here, since we're going to allocate a new cache
// haven't discarded anything, so the reads are ordered properly }
ordered = maybeUnordered; else {
} else { // If we triggered the downsampler, we need to sort the reads before returning them,
// we need to sort these damn things: O(n log n) // since the ReservoirDownsampler is not guaranteed to preserve relative ordering of items.
ordered = new ArrayList<GATKSAMRecord>(maybeUnordered); // After consuming the downsampled items in this call to popCurrentReads(), we switch back
Collections.sort(ordered, new AlignmentStartComparator()); // to using the undownsampledCache until we fill up again.
poppedReads = downsampler.consumeFinalizedItems(); // avoid making a copy here
Collections.sort(poppedReads, new AlignmentStartComparator());
downsampler = null;
} }
// reset the downsampler stats so getNumberOfDiscardedItems is 0 initializeUndownsampledCache();
downsampler.reset(); return poppedReads;
return ordered;
} }
} }

View File

@ -123,6 +123,15 @@ public class AlignmentStateMachine {
return getRead().getReferenceIndex(); return getRead().getReferenceIndex();
} }
/**
* Is our read a reduced read?
*
* @return true if the read we encapsulate is a reduced read, otherwise false
*/
public boolean isReducedRead() {
return read.isReducedRead();
}
/** /**
* Is this the left edge state? I.e., one that is before or after the current read? * Is this the left edge state? I.e., one that is before or after the current read?
* @return true if this state is an edge state, false otherwise * @return true if this state is an edge state, false otherwise

View File

@ -167,7 +167,7 @@ final class PerSampleReadStateManager implements Iterable<AlignmentStateMachine>
// use returned List directly rather than make a copy, for efficiency's sake // use returned List directly rather than make a copy, for efficiency's sake
readStatesByAlignmentStart = flattenByAlignmentStart(levelingDownsampler.consumeFinalizedItems()); readStatesByAlignmentStart = flattenByAlignmentStart(levelingDownsampler.consumeFinalizedItems());
levelingDownsampler.reset(); levelingDownsampler.resetStats();
} }
return nStatesAdded; return nStatesAdded;

View File

@ -164,8 +164,8 @@ class SamplePartitioner<T extends SAMRecord> {
@Ensures("doneSubmittingReads == false") @Ensures("doneSubmittingReads == false")
public void reset() { public void reset() {
for ( final Downsampler<T> downsampler : readsBySample.values() ) { for ( final Downsampler<T> downsampler : readsBySample.values() ) {
downsampler.clear(); downsampler.clearItems();
downsampler.reset(); downsampler.resetStats();
} }
doneSubmittingReads = false; doneSubmittingReads = false;
} }

View File

@ -324,6 +324,31 @@ public class ArtificialSAMUtils {
return Arrays.asList(left, right); return Arrays.asList(left, right);
} }
/**
* Create an artificial reduced read based on the parameters. The cigar string will be *M, where * is the
* length of the read. The base counts specified in the baseCounts array will be stored fully encoded in
* the RR attribute.
*
* @param header the SAM header to associate the read with
* @param name the name of the read
* @param refIndex the reference index, i.e. what chromosome to associate it with
* @param alignmentStart where to start the alignment
* @param length the length of the read
* @param baseCounts reduced base counts to encode in the RR attribute; length must match the read length
* @return the artificial reduced read
*/
public static GATKSAMRecord createArtificialReducedRead( final SAMFileHeader header,
final String name,
final int refIndex,
final int alignmentStart,
final int length,
final int[] baseCounts ) {
final GATKSAMRecord read = createArtificialRead(header, name, refIndex, alignmentStart, length);
read.setReducedReadCounts(baseCounts);
read.setReducedReadCountsTag();
return read;
}
/** /**
* Create a collection of identical artificial reads based on the parameters. The cigar string for each * Create a collection of identical artificial reads based on the parameters. The cigar string for each
* read will be *M, where * is the length of the read. * read will be *M, where * is the length of the read.

View File

@ -30,6 +30,7 @@ import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils; import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.testng.Assert; import org.testng.Assert;
@ -152,7 +153,39 @@ public class FractionalDownsamplerUnitTest extends BaseTest {
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), test.totalReads - downsampledReads.size()); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), test.totalReads - downsampledReads.size());
downsampler.reset(); downsampler.resetStats();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0);
} }
@Test
public void testDoNotDiscardReducedReads() {
GenomeAnalysisEngine.resetRandomGenerator();
final ReadsDownsampler<GATKSAMRecord> downsampler = new FractionalDownsampler<GATKSAMRecord>(0.0);
final Collection<GATKSAMRecord> reads = new ArrayList<GATKSAMRecord>();
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(1, 1, 1000000);
final int[] baseCounts = { 10, 10, 10, 10, 10 };
for ( int i = 1; i <= 10; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialReducedRead(header, "foo", 0, 1, 5, baseCounts));
}
for ( int i = 1; i <= 5; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialRead(header, "foo", 0, 1, 5));
}
downsampler.submit(reads);
downsampler.signalEndOfInput();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 5, "wrong number of items discarded by the downsampler");
Assert.assertTrue(downsampler.hasFinalizedItems(), "downsampler should have finalized items but doesn't");
Assert.assertEquals(downsampler.size(), 10, "downsampler size() reports wrong number of items");
final Collection<GATKSAMRecord> readsReturned = downsampler.consumeFinalizedItems();
Assert.assertEquals(readsReturned.size(), 10, "wrong number of items returned by the downsampler");
for ( GATKSAMRecord readReturned : readsReturned ) {
Assert.assertTrue(readReturned.isReducedRead(), "non-reduced read survived the downsampling process, but shouldn't have");
}
}
} }

View File

@ -25,16 +25,17 @@
package org.broadinstitute.sting.gatk.downsampling; package org.broadinstitute.sting.gatk.downsampling;
import net.sf.samtools.SAMFileHeader;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.locusiterator.AlignmentStateMachine;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.Assert; import org.testng.Assert;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
public class LevelingDownsamplerUnitTest extends BaseTest { public class LevelingDownsamplerUnitTest extends BaseTest {
@ -158,9 +159,46 @@ public class LevelingDownsamplerUnitTest extends BaseTest {
Assert.assertEquals(numItemsReportedDiscarded, numItemsActuallyDiscarded); Assert.assertEquals(numItemsReportedDiscarded, numItemsActuallyDiscarded);
downsampler.reset(); downsampler.resetStats();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0);
Assert.assertTrue(totalRemainingItems <= Math.max(test.targetSize, test.numStacks)); Assert.assertTrue(totalRemainingItems <= Math.max(test.targetSize, test.numStacks));
} }
@Test
public void testDoNotDiscardReducedReads() {
GenomeAnalysisEngine.resetRandomGenerator();
final Downsampler<LinkedList<AlignmentStateMachine>> downsampler = new LevelingDownsampler<LinkedList<AlignmentStateMachine>, AlignmentStateMachine>(1);
final Collection<LinkedList<AlignmentStateMachine>> groups = new LinkedList<LinkedList<AlignmentStateMachine>>();
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(1, 1, 1000000);
final int[] baseCounts = { 10, 10, 10, 10, 10 };
for ( int alignmentStart : Arrays.asList(1, 2, 3) ) {
final LinkedList<AlignmentStateMachine> group = new LinkedList<AlignmentStateMachine>();
for ( int i = 1; i <= 10; i++ ) {
group.add(new AlignmentStateMachine(ArtificialSAMUtils.createArtificialReducedRead(header, "foo", 0, alignmentStart, 5, baseCounts)));
}
groups.add(group);
}
downsampler.submit(groups);
downsampler.signalEndOfInput();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0, "wrong number of items discarded by the downsampler");
Assert.assertTrue(downsampler.hasFinalizedItems(), "downsampler should have finalized items but doesn't");
Assert.assertEquals(downsampler.size(), 30, "downsampler size() reports wrong number of items");
final Collection<LinkedList<AlignmentStateMachine>> groupsReturned = downsampler.consumeFinalizedItems();
Assert.assertEquals(groupsReturned.size(), 3, "wrong number of groups returned by the downsampler");
for ( LinkedList<AlignmentStateMachine> group : groupsReturned ) {
Assert.assertEquals(group.size(), 10, "group has wrong size after downsampling");
for ( AlignmentStateMachine state : group ) {
Assert.assertTrue(state.isReducedRead());
}
}
}
} }

View File

@ -30,6 +30,7 @@ import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils; import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.testng.Assert; import org.testng.Assert;
@ -125,7 +126,49 @@ public class ReservoirDownsamplerUnitTest extends BaseTest {
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), test.expectedNumDiscardedItems); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), test.expectedNumDiscardedItems);
Assert.assertEquals(test.totalReads - downsampledReads.size(), test.expectedNumDiscardedItems); Assert.assertEquals(test.totalReads - downsampledReads.size(), test.expectedNumDiscardedItems);
downsampler.reset(); downsampler.resetStats();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0);
} }
@Test
public void testDoNotDiscardReducedReads() {
GenomeAnalysisEngine.resetRandomGenerator();
final ReadsDownsampler<GATKSAMRecord> downsampler = new ReservoirDownsampler<GATKSAMRecord>(1);
final Collection<GATKSAMRecord> reads = new ArrayList<GATKSAMRecord>();
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(1, 1, 1000000);
final int[] baseCounts = { 10, 10, 10, 10, 10 };
for ( int i = 1; i <= 10; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialReducedRead(header, "foo", 0, 1, 5, baseCounts));
}
for ( int i = 1; i <= 5; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialRead(header, "foo", 0, 1, 5));
}
downsampler.submit(reads);
downsampler.signalEndOfInput();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 4, "wrong number of items discarded by the downsampler");
Assert.assertTrue(downsampler.hasFinalizedItems(), "downsampler should have finalized items but doesn't");
Assert.assertEquals(downsampler.size(), 11, "downsampler size() reports wrong number of items");
final Collection<GATKSAMRecord> readsReturned = downsampler.consumeFinalizedItems();
Assert.assertEquals(readsReturned.size(), 11, "wrong number of items returned by the downsampler");
int numReducedReadsReturned = 0;
int numNormalReadsReturned = 0;
for ( GATKSAMRecord readReturned : readsReturned ) {
if ( readReturned.isReducedRead() ) {
numReducedReadsReturned++;
}
else {
numNormalReadsReturned++;
}
}
Assert.assertEquals(numReducedReadsReturned, 10, "wrong number of reduced reads returned by the downsampler");
Assert.assertEquals(numNormalReadsReturned, 1, "wrong number of non-reduced reads returned by the downsampler");
}
} }

View File

@ -177,7 +177,7 @@ public class SimplePositionalDownsamplerUnitTest extends BaseTest {
Assert.assertEquals(numReadsActuallyEliminated, numReadsReportedEliminated); Assert.assertEquals(numReadsActuallyEliminated, numReadsReportedEliminated);
} }
downsampler.reset(); downsampler.resetStats();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0); Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0);
} }
@ -328,4 +328,48 @@ public class SimplePositionalDownsamplerUnitTest extends BaseTest {
Assert.assertEquals(downsampledReads.size(), 10); Assert.assertEquals(downsampledReads.size(), 10);
} }
@Test
public void testDoNotDiscardReducedReads() {
GenomeAnalysisEngine.resetRandomGenerator();
final ReadsDownsampler<GATKSAMRecord> downsampler = new SimplePositionalDownsampler<GATKSAMRecord>(1);
final Collection<GATKSAMRecord> reads = new ArrayList<GATKSAMRecord>();
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(1, 1, 1000000);
final int[] baseCounts = { 10, 10, 10, 10, 10 };
for ( int alignmentStart : Arrays.asList(1, 2, 3) ) {
for ( int i = 1; i <= 10; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialReducedRead(header, "foo", 0, alignmentStart, 5, baseCounts));
}
for ( int i = 1; i <= 5; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialRead(header, "foo", 0, alignmentStart, 5));
}
}
downsampler.submit(reads);
downsampler.signalEndOfInput();
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 12, "wrong number of items discarded by the downsampler");
Assert.assertTrue(downsampler.hasFinalizedItems(), "downsampler should have finalized items but doesn't");
Assert.assertEquals(downsampler.size(), 33, "downsampler size() reports wrong number of items");
final Collection<GATKSAMRecord> readsReturned = downsampler.consumeFinalizedItems();
Assert.assertEquals(readsReturned.size(), 33, "wrong number of items returned by the downsampler");
int numReducedReadsReturned = 0;
int numNormalReadsReturned = 0;
for ( GATKSAMRecord readReturned : readsReturned ) {
if ( readReturned.isReducedRead() ) {
numReducedReadsReturned++;
}
else {
numNormalReadsReturned++;
}
}
Assert.assertEquals(numReducedReadsReturned, 30, "wrong number of reduced reads returned by the downsampler");
Assert.assertEquals(numNormalReadsReturned, 3, "wrong number of non-reduced reads returned by the downsampler");
}
} }

View File

@ -26,9 +26,11 @@
package org.broadinstitute.sting.gatk.traversals; package org.broadinstitute.sting.gatk.traversals;
import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.IndexedFastaSequenceFile;
import net.sf.samtools.SAMFileHeader;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile; import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.sam.ArtificialBAMBuilder; import org.broadinstitute.sting.utils.sam.ArtificialBAMBuilder;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord; import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
@ -39,6 +41,7 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
public class TAROrderedReadCacheUnitTest extends BaseTest { public class TAROrderedReadCacheUnitTest extends BaseTest {
@ -98,8 +101,53 @@ public class TAROrderedReadCacheUnitTest extends BaseTest {
Assert.assertEquals(cache.getNumDiscarded(), 0, "should have reset stats"); Assert.assertEquals(cache.getNumDiscarded(), 0, "should have reset stats");
Assert.assertEquals(cacheReads.size(), nExpectedToKeep, "should have 1 read for every read we expected to keep"); Assert.assertEquals(cacheReads.size(), nExpectedToKeep, "should have 1 read for every read we expected to keep");
verifySortednessOfReads(cacheReads);
}
@Test
public void testReadCacheWithReducedReads() {
final List<GATKSAMRecord> reads = new ArrayList<GATKSAMRecord>();
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(1, 1, 1000000);
final int[] baseCounts = { 10, 10, 10, 10, 10 };
for ( int i = 1; i <= 100; i++ ) {
reads.add(ArtificialSAMUtils.createArtificialReducedRead(header, "foo", 0, i, 5, baseCounts));
reads.add(ArtificialSAMUtils.createArtificialRead(header, "foo", 0, i, 5));
}
final TAROrderedReadCache cache = new TAROrderedReadCache(50);
cache.addAll(reads);
// Our cache should have kept all of the reduced reads (which are retained unconditionally and do not count
// towards the capacity limit), and discarded half of the 100 non-reduced reads due to the cache capacity
// limit of 50.
Assert.assertEquals(cache.size(), 150, "wrong number of reads in the cache at the end");
Assert.assertEquals(cache.getNumDiscarded(), 50, "wrong number of reads discarded from the cache");
final List<GATKSAMRecord> cacheReads = cache.popCurrentReads();
int numReducedReadsRetained = 0;
int numNormalReadsRetained = 0;
for ( GATKSAMRecord read : cacheReads ) {
if ( read.isReducedRead() ) {
numReducedReadsRetained++;
}
else {
numNormalReadsRetained++;
}
}
Assert.assertEquals(numReducedReadsRetained, 100, "wrong number of reduced reads retained in the cache");
Assert.assertEquals(numNormalReadsRetained, 50, "wrong number of non-reduced reads retained in the cache");
verifySortednessOfReads(cacheReads);
}
private void verifySortednessOfReads( final List<GATKSAMRecord> reads) {
int lastStart = -1; int lastStart = -1;
for ( final GATKSAMRecord read : cacheReads ) { for ( GATKSAMRecord read : reads ) {
Assert.assertTrue(lastStart <= read.getAlignmentStart(), "Reads should be sorted but weren't. Found read with start " + read.getAlignmentStart() + " while last was " + lastStart); Assert.assertTrue(lastStart <= read.getAlignmentStart(), "Reads should be sorted but weren't. Found read with start " + read.getAlignmentStart() + " while last was " + lastStart);
lastStart = read.getAlignmentStart(); lastStart = read.getAlignmentStart();
} }