Switching over DownsamplingLocusIteratorByState -> LocusIteratorByState.

Some operations will not be as fast as they could be because the workflow is currently
merge SAM records (sharding) -> split SAM records (LocusIteratorByState) -> merge records
(LocusIteratorByState) -> split records (StratifiedAlignmentContext), but this will be fixed
when StratifiedAlignmentContext is updated to take advantage of the new functionality in
ReadBackedPileup.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3599 348d0f76-0448-11de-a6fe-93d51630548a
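In practical terms, the new code path boils down to: partition incoming reads by sample, then cap each sample independently at the downsampling target. A minimal self-contained sketch of that idea, written against plain collections rather than the GATK classes touched by this commit (the Read class, sample names and coverage value below are illustrative assumptions):

import java.util.*;

public class PerSampleDownsampleSketch {
    // Illustrative stand-in for a SAMRecord: just a sample name and an alignment start.
    static class Read {
        final String sample;
        final int start;
        Read(String sample, int start) { this.sample = sample; this.start = start; }
    }

    public static void main(String[] args) {
        final int targetCoverage = 2;   // analogous to -dcov
        List<Read> locusReads = Arrays.asList(
                new Read("NA12878", 100), new Read("NA12878", 100), new Read("NA12878", 101),
                new Read("NA12891", 100));

        // 1. Partition by sample (what SamplePartitioner does for real reads).
        Map<String, List<Read>> bySample = new LinkedHashMap<String, List<Read>>();
        for (Read r : locusReads) {
            if (!bySample.containsKey(r.sample)) bySample.put(r.sample, new ArrayList<Read>());
            bySample.get(r.sample).add(r);
        }

        // 2. Cap each sample independently, dropping excess reads at random.
        Random rng = new Random(42);
        for (Map.Entry<String, List<Read>> e : bySample.entrySet()) {
            List<Read> reads = new ArrayList<Read>(e.getValue());
            Collections.shuffle(reads, rng);
            int kept = Math.min(targetCoverage, reads.size());
            System.out.println(e.getKey() + ": kept " + kept + " of " + reads.size() + " reads");
        }
    }
}

The real implementation (ReadStateManager in the diff below) additionally honors the overall -mrl cap and, when it prunes, prefers to keep at least one read per alignment start.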
hanna 2010-06-21 02:11:42 +00:00
parent 1d50fc7087
commit c806ffba5f
4 changed files with 701 additions and 1255 deletions


@ -68,13 +68,7 @@ public class WindowMaker implements Iterable<WindowMaker.WindowMakerIterator>, I
LocusIterator locusIterator;
Iterator<SAMRecord> wrappedIterator = TraversalEngine.addMandatoryFilteringIterators(iterator, filters);
if(sourceInfo.getDownsamplingMethod() != null &&
(sourceInfo.getDownsamplingMethod().type == DownsampleType.EXPERIMENTAL_BY_SAMPLE)) {
if ( discards.size() > 0 )
throw new StingException("Experimental downsampling iterator doesn't support base discarding at this point; complain to Matt Hanna");
locusIterator = new DownsamplingLocusIteratorByState(wrappedIterator,sourceInfo);
} else
locusIterator = new LocusIteratorByState(wrappedIterator,sourceInfo, discards);
locusIterator = new LocusIteratorByState(wrappedIterator,sourceInfo,discards);
this.locusOverflowTracker = locusIterator.getLocusOverflowTracker();


@ -26,13 +26,14 @@
package org.broadinstitute.sting.gatk.iterators;
import net.sf.samtools.*;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.picard.util.PeekableIterator;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.DownsamplingMethod;
import org.broadinstitute.sting.gatk.DownsampleType;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.broadinstitute.sting.utils.pileup.*;
import java.util.*;
@ -47,7 +48,6 @@ public class LocusIteratorByState extends LocusIterator {
//
//public enum Discard { ADAPTOR_BASES }
//public static final EnumSet<Discard> NO_DISCARDS = EnumSet.noneOf(Discard.class);
public static final List<LocusIteratorFilter> NO_FILTERS = Arrays.asList();
/**
@ -64,9 +64,11 @@ public class LocusIteratorByState extends LocusIterator {
// member fields
//
// -----------------------------------------------------------------------------------------------------------------
private final PushbackIterator<SAMRecord> it;
private boolean hasExtendedEvents = false; // will be set to true if at least one read had an indel right before the current position
private final Collection<String> sampleNames = new ArrayList<String>();
private final ReadStateManager readStates;
private class SAMRecordState {
SAMRecord read;
int readOffset = -1; // how far are we offset from the start of the read bases?
@ -256,12 +258,11 @@ public class LocusIteratorByState extends LocusIterator {
}
}
private LinkedList<SAMRecordState> readStates = new LinkedList<SAMRecordState>();
//final boolean DEBUG = false;
//final boolean DEBUG2 = false && DEBUG;
private Reads readInfo;
private AlignmentContext nextAlignmentContext;
private List<LocusIteratorFilter> filters = new ArrayList<LocusIteratorFilter>();
private List<LocusIteratorFilter> filters = new ArrayList<LocusIteratorFilter>();
// -----------------------------------------------------------------------------------------------------------------
//
@ -271,9 +272,16 @@ public class LocusIteratorByState extends LocusIterator {
public LocusIteratorByState(final Iterator<SAMRecord> samIterator, Reads readInformation ) {
this(samIterator, readInformation, NO_FILTERS);
}
public LocusIteratorByState(final Iterator<SAMRecord> samIterator, Reads readInformation, List<LocusIteratorFilter> filters ) {
this.it = new PushbackIterator<SAMRecord>(samIterator);
// Aggregate all sample names.
// TODO: Push in header via constructor
if(GenomeAnalysisEngine.instance != null && GenomeAnalysisEngine.instance.getDataSource() != null) {
sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader()));
}
// Add a null sample name as a catch-all for reads without samples
if(!sampleNames.contains(null)) sampleNames.add(null);
readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),readInformation.getMaxReadsAtLocus(),sampleNames);
this.readInfo = readInformation;
this.filters = filters;
overflowTracker = new LocusOverflowTracker(readInformation.getMaxReadsAtLocus());
@ -293,24 +301,25 @@ public class LocusIteratorByState extends LocusIterator {
//if ( DEBUG ) System.out.printf("hasNext() = %b%n", r);
// if we don't have a next record, make sure we clean the warning queue
// TODO: Note that this implementation requires that hasNext() always be called before next().
if (!r) overflowTracker.cleanWarningQueue();
return r;
}
public void printState() {
for ( SAMRecordState state : readStates ) {
logger.debug(String.format("printState():"));
SAMRecord read = state.getRead();
int offset = state.getReadOffset();
logger.debug(String.format(" read: %s(%d)=%s, cigar=%s", read.getReadName(), offset, (char)read.getReadBases()[offset], read.getCigarString()));
for(String sampleName: sampleNames) {
Iterator<SAMRecordState> iterator = readStates.iteratorForSample(sampleName);
while(iterator.hasNext()) {
SAMRecordState state = iterator.next();
logger.debug(String.format("printState():"));
SAMRecord read = state.getRead();
int offset = state.getReadOffset();
logger.debug(String.format(" read: %s(%d)=%s, cigar=%s", read.getReadName(), offset, (char)read.getReadBases()[offset], read.getCigarString()));
}
}
}
public void clear() {
logger.debug(String.format(("clear() called")));
readStates.clear();
}
private GenomeLoc getLocation() {
return readStates.isEmpty() ? null : readStates.getFirst().getLocation();
}
@ -334,9 +343,9 @@ public class LocusIteratorByState extends LocusIterator {
* nextAlignmentContext MUST BE null in order for this method to advance to the next entry.
*/
private void lazyLoadNextAlignmentContext() {
while(nextAlignmentContext == null && (!readStates.isEmpty() || it.hasNext())) {
while(nextAlignmentContext == null && readStates.hasNext()) {
// this call will set hasExtendedEvents to true if it picks up a read with indel right before the current position on the ref:
collectPendingReads(readInfo.getMaxReadsAtLocus());
readStates.collectPendingReads();
int size = 0;
int nDeletions = 0;
@ -350,144 +359,113 @@ public class LocusIteratorByState extends LocusIterator {
// In this case, the subsequent call to next() will emit the normal pileup at the current base
// and shift the position.
if (readInfo.generateExtendedEvents() && hasExtendedEvents) {
ArrayList<ExtendedEventPileupElement> indelPile = new ArrayList<ExtendedEventPileupElement>(readStates.size());
Map<String,AbstractReadBackedPileup<ReadBackedExtendedEventPileup,ExtendedEventPileupElement>> fullExtendedEventPileup =
new HashMap<String,AbstractReadBackedPileup<ReadBackedExtendedEventPileup,ExtendedEventPileupElement>>();
int maxDeletionLength = 0;
for ( SAMRecordState state : readStates ) {
if ( state.hadIndel() ) {
size++;
if ( state.getEventBases() == null ) {
nDeletions++;
maxDeletionLength = Math.max(maxDeletionLength,state.getEventLength());
}
else nInsertions++;
indelPile.add ( new ExtendedEventPileupElement(state.getRead(),
state.getReadEventStartOffset(),
state.getEventLength(),
state.getEventBases()) );
} else {
if ( state.getCurrentCigarOperator() != CigarOperator.N ) {
// this read has no indel associated with the previous position on the ref;
// we count this read in only if it has actual bases, not N span...
if ( state.getCurrentCigarOperator() != CigarOperator.D || readInfo.includeReadsWithDeletionAtLoci() ) {
// if cigar operator is D but the read has no extended event reported (that's why we ended
// up in this branch), it means that we are currently inside a deletion that started earlier;
// we count such reads (with a longer deletion spanning over a deletion at the previous base we are
// about to report) only if includeReadsWithDeletionAtLoci is true.
size++;
indelPile.add ( new ExtendedEventPileupElement(state.getRead(),
state.getReadOffset()-1,
-1) // length=-1 --> noevent
);
}
}
}
if ( state.getRead().getMappingQuality() == 0 ) {
nMQ0Reads++;
}
}
hasExtendedEvents = false; // we are done with extended events prior to current ref base
SAMRecordState our1stState = readStates.getFirst();
// get current location on the reference and decrement it by 1: the indels we just stepped over
// are associated with the *previous* reference base
GenomeLoc loc = GenomeLocParser.incPos(our1stState.getLocation(),-1);
for(String sampleName: sampleNames) {
Iterator<SAMRecordState> iterator = readStates.iteratorForSample(sampleName);
ArrayList<ExtendedEventPileupElement> indelPile = new ArrayList<ExtendedEventPileupElement>(readStates.size());
size = 0;
nDeletions = 0;
nInsertions = 0;
nMQ0Reads = 0;
int maxDeletionLength = 0;
while(iterator.hasNext()) {
SAMRecordState state = iterator.next();
if ( state.hadIndel() ) {
size++;
if ( state.getEventBases() == null ) {
nDeletions++;
maxDeletionLength = Math.max(maxDeletionLength,state.getEventLength());
}
else nInsertions++;
indelPile.add ( new ExtendedEventPileupElement(state.getRead(),
state.getReadEventStartOffset(),
state.getEventLength(),
state.getEventBases()) );
} else {
if ( state.getCurrentCigarOperator() != CigarOperator.N ) {
// this read has no indel associated with the previous position on the ref;
// we count this read in only if it has actual bases, not N span...
if ( state.getCurrentCigarOperator() != CigarOperator.D || readInfo.includeReadsWithDeletionAtLoci() ) {
// if cigar operator is D but the read has no extended event reported (that's why we ended
// up in this branch), it means that we are currently inside a deletion that started earlier;
// we count such reads (with a longer deletion spanning over a deletion at the previous base we are
// about to report) only if includeReadsWithDeletionAtLoci is true.
size++;
indelPile.add ( new ExtendedEventPileupElement(state.getRead(),
state.getReadOffset()-1,
-1) // length=-1 --> noevent
);
}
}
}
if ( state.getRead().getMappingQuality() == 0 ) {
nMQ0Reads++;
}
}
if( indelPile.size() != 0 ) fullExtendedEventPileup.put(sampleName,new ReadBackedExtendedEventPileupImpl(loc,indelPile,size,maxDeletionLength,nInsertions,nDeletions,nMQ0Reads));
}
hasExtendedEvents = false; // we are done with extended events prior to current ref base
// System.out.println("Indel(s) at "+loc);
// for ( ExtendedEventPileupElement pe : indelPile ) { if ( pe.isIndel() ) System.out.println(" "+pe.toString()); }
nextAlignmentContext = new AlignmentContext(loc, new ReadBackedExtendedEventPileupImpl(loc, indelPile, size, maxDeletionLength, nInsertions, nDeletions, nMQ0Reads));
nextAlignmentContext = new AlignmentContext(loc, new ReadBackedExtendedEventPileupImpl(loc, fullExtendedEventPileup));
} else {
ArrayList<PileupElement> pile = new ArrayList<PileupElement>(readStates.size());
GenomeLoc location = getLocation();
Map<String,AbstractReadBackedPileup<ReadBackedPileup,PileupElement>> fullPileup = new HashMap<String,AbstractReadBackedPileup<ReadBackedPileup,PileupElement>>();
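// One pileup per sample: only samples with at least one qualifying read get an entry here,
// and the per-sample piles are wrapped into a single ReadBackedPileupImpl once all samples
// have been processed.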
// todo -- performance problem -- should be lazy, really
for ( SAMRecordState state : readStates ) {
if ( state.getCurrentCigarOperator() != CigarOperator.D && state.getCurrentCigarOperator() != CigarOperator.N ) {
if ( filterRead(state.getRead(), getLocation().getStart(), filters ) ) {
discarded_bases++;
//printStatus("Adaptor bases", discarded_adaptor_bases);
continue;
} else {
observed_bases++;
pile.add(new PileupElement(state.getRead(), state.getReadOffset()));
size++;
}
} else if ( readInfo.includeReadsWithDeletionAtLoci() && state.getCurrentCigarOperator() != CigarOperator.N ) {
size++;
pile.add(new PileupElement(state.getRead(), -1));
nDeletions++;
}
for(String sampleName: sampleNames) {
Iterator<SAMRecordState> iterator = readStates.iteratorForSample(sampleName);
ArrayList<PileupElement> pile = new ArrayList<PileupElement>(readStates.size());
// todo -- this looks like a bug w.r.t. including reads with deletion at loci -- MAD 05/27/10
if ( state.getRead().getMappingQuality() == 0 ) {
nMQ0Reads++;
size = 0;
nDeletions = 0;
nInsertions = 0;
nMQ0Reads = 0;
while(iterator.hasNext()) {
SAMRecordState state = iterator.next();
if ( state.getCurrentCigarOperator() != CigarOperator.D && state.getCurrentCigarOperator() != CigarOperator.N ) {
if ( filterRead(state.getRead(), location.getStart(), filters ) ) {
discarded_bases++;
//printStatus("Adaptor bases", discarded_adaptor_bases);
continue;
} else {
observed_bases++;
pile.add(new PileupElement(state.getRead(), state.getReadOffset()));
size++;
}
} else if ( readInfo.includeReadsWithDeletionAtLoci() && state.getCurrentCigarOperator() != CigarOperator.N ) {
size++;
pile.add(new PileupElement(state.getRead(), -1));
nDeletions++;
}
// todo -- this looks like a bug w.r.t. including reads with deletion at loci -- MAD 05/27/10
if ( state.getRead().getMappingQuality() == 0 ) {
nMQ0Reads++;
}
}
if( pile.size() != 0 ) fullPileup.put(sampleName,new ReadBackedPileupImpl(location,pile,size,nDeletions,nMQ0Reads));
}
GenomeLoc loc = getLocation();
updateReadStates(); // critical - must be called after we get the current state offsets and location
// if we got reads with non-D/N over the current position, we are done
if ( pile.size() != 0 ) nextAlignmentContext = new AlignmentContext(loc, new ReadBackedPileupImpl(loc, pile, size, nDeletions, nMQ0Reads));
if ( !fullPileup.isEmpty() ) nextAlignmentContext = new AlignmentContext(location, new ReadBackedPileupImpl(location,fullPileup));
}
}
}
private static boolean filterRead(SAMRecord rec, long pos, List<LocusIteratorFilter> filters) {
for ( LocusIteratorFilter filter : filters ) {
if ( filter.filterOut(rec, pos) ) {
return true;
}
}
return false;
}
private void printStatus(final String title, long n) {
if ( n % 10000 == 0 )
System.out.printf("%s %d / %d = %.2f%n", title, n, observed_bases, 100.0 * n / (observed_bases + 1));
}
private void collectPendingReads(int maxReads) {
//if (DEBUG) {
// logger.debug(String.format("entering collectPendingReads..., hasNext=%b", it.hasNext()));
// printState();
//}
GenomeLoc location = null;
// the farthest right a read extends
Integer rightMostEnd = -1;
int curSize = readStates.size(); // simple performance improvement -- avoids unnecessary size() operation
while (it.hasNext()) {
SAMRecord read = it.next();
if (readIsPastCurrentPosition(read)) {
// We've collected up all the reads that span this region
it.pushback(read);
break;
} else {
if (curSize < maxReads) {
SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents());
state.stepForwardOnGenome();
readStates.add(state);
curSize++;
if (state.hadIndel()) hasExtendedEvents = true;
//if (DEBUG) logger.debug(String.format(" ... added read %s", read.getReadName()));
} else {
if (location == null)
location = GenomeLocParser.createGenomeLoc(read);
rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd;
}
}
}
if (location != null)
overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd),
curSize);
}
// fast testing of position
private boolean readIsPastCurrentPosition(SAMRecord read) {
if ( readStates.isEmpty() )
@ -506,20 +484,35 @@ public class LocusIteratorByState extends LocusIterator {
}
}
private static boolean filterRead(SAMRecord rec, long pos, List<LocusIteratorFilter> filters) {
for ( LocusIteratorFilter filter : filters ) {
if ( filter.filterOut(rec, pos) ) {
return true;
}
}
return false;
}
private void printStatus(final String title, long n) {
if ( n % 10000 == 0 )
System.out.printf("%s %d / %d = %.2f%n", title, n, observed_bases, 100.0 * n / (observed_bases + 1));
}
private void updateReadStates() {
Iterator<SAMRecordState> it = readStates.iterator();
while ( it.hasNext() ) {
SAMRecordState state = it.next();
CigarOperator op = state.stepForwardOnGenome();
if ( state.hadIndel() && readInfo.generateExtendedEvents() ) hasExtendedEvents = true;
else {
// we discard the read only when we are past its end AND indel at the end of the read (if any) was
// already processed. Keeping the read state that returned null upon stepForwardOnGenome() is safe
// as the next call to stepForwardOnGenome() will return null again AND will clear hadIndel() flag.
if ( op == null ) { // we've stepped off the end of the object
//if (DEBUG) logger.debug(String.format(" removing read %s at %d", state.getRead().getReadName(), state.getRead().getAlignmentStart()));
it.remove();
for(String sampleName: sampleNames) {
Iterator<SAMRecordState> it = readStates.iteratorForSample(sampleName);
while ( it.hasNext() ) {
SAMRecordState state = it.next();
CigarOperator op = state.stepForwardOnGenome();
if ( state.hadIndel() && readInfo.generateExtendedEvents() ) hasExtendedEvents = true;
else {
// we discard the read only when we are past its end AND indel at the end of the read (if any) was
// already processed. Keeping the read state that returned null upon stepForwardOnGenome() is safe
// as the next call to stepForwardOnGenome() will return null again AND will clear hadIndel() flag.
if ( op == null ) { // we've stepped off the end of the object
//if (DEBUG) logger.debug(String.format(" removing read %s at %d", state.getRead().getReadName(), state.getRead().getAlignmentStart()));
it.remove();
}
}
}
}
@ -544,5 +537,561 @@ public class LocusIteratorByState extends LocusIterator {
public LocusOverflowTracker getLocusOverflowTracker() {
return this.overflowTracker;
}
private class ReadStateManager {
private final PeekableIterator<SAMRecord> iterator;
private final DownsamplingMethod downsamplingMethod;
private final ReadSelector chainedReadSelector;
private final SamplePartitioner samplePartitioner;
private final Map<String,PerSampleReadStateManager> readStatesBySample = new HashMap<String,PerSampleReadStateManager>();
private final int targetCoverage;
private final int maxReadsAtLocus;
private int totalReadStates = 0;
public ReadStateManager(Iterator<SAMRecord> source, DownsamplingMethod downsamplingMethod, int maxReadsAtLocus, Collection<String> sampleNames) {
this.iterator = new PeekableIterator<SAMRecord>(source);
this.downsamplingMethod = downsamplingMethod;
switch(downsamplingMethod.type) {
case EXPERIMENTAL_BY_SAMPLE:
if(downsamplingMethod.toCoverage == null)
throw new StingException("Downsampling coverage (-dcov) must be specified when downsampling by sample");
this.targetCoverage = downsamplingMethod.toCoverage;
break;
default:
this.targetCoverage = Integer.MAX_VALUE;
}
this.maxReadsAtLocus = maxReadsAtLocus;
samplePartitioner = new SamplePartitioner(sampleNames);
for(String sampleName: sampleNames)
readStatesBySample.put(sampleName,new PerSampleReadStateManager());
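// Chain the selectors: when a maximum-reads-at-locus (-mrl) cap is in effect, the
// FirstNReadSelector sees each read first and enforces that overall per-locus limit,
// forwarding accepted reads to the SamplePartitioner, which bins them by sample name.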
ReadSelector primaryReadSelector= samplePartitioner;
chainedReadSelector = maxReadsAtLocus!=Integer.MAX_VALUE ? new FirstNReadSelector(primaryReadSelector,maxReadsAtLocus) : primaryReadSelector;
}
public Iterator<SAMRecordState> iteratorForSample(final String sampleName) {
return new Iterator<SAMRecordState>() {
private Iterator<SAMRecordState> wrappedIterator = readStatesBySample.get(sampleName).iterator();
public boolean hasNext() {
return wrappedIterator.hasNext();
}
public SAMRecordState next() {
return wrappedIterator.next();
}
public void remove() {
wrappedIterator.remove();
totalReadStates--;
}
};
}
public boolean isEmpty() {
return totalReadStates == 0;
}
public int size() {
return totalReadStates;
}
public SAMRecordState getFirst() {
for(String sampleName: sampleNames) {
PerSampleReadStateManager reads = readStatesBySample.get(sampleName);
if(!reads.isEmpty())
return reads.peek();
}
return null;
}
public boolean hasNext() {
return totalReadStates > 0 || iterator.hasNext();
}
public void collectPendingReads() {
if(!iterator.hasNext())
return;
if(readStates.size() == 0) {
int firstContigIndex = iterator.peek().getReferenceIndex();
int firstAlignmentStart = iterator.peek().getAlignmentStart();
while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) {
chainedReadSelector.submitRead(iterator.next());
}
}
else {
// Fast fail in the case that the read is past the current position.
if(readIsPastCurrentPosition(iterator.peek()))
return;
while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) {
chainedReadSelector.submitRead(iterator.next());
}
}
chainedReadSelector.complete();
for(String sampleName: sampleNames) {
ReadSelector aggregator = samplePartitioner.getSelectedReads(sampleName);
Collection<SAMRecord> newReads = new ArrayList<SAMRecord>(aggregator.getSelectedReads());
PerSampleReadStateManager statesBySample = readStatesBySample.get(sampleName);
int numReads = statesBySample.size();
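// If this sample stays at or below the downsampling target (or downsampling is disabled),
// only the overall -mrl cap needs to be enforced. Otherwise the sample's existing read
// states are thinned first (keeping at least one read per alignment start where possible,
// purging the rest at random) and new reads are then added up to the remaining target.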
if(numReads+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE) {
long readLimit = aggregator.getNumReadsSeen();
boolean mrlViolation = false;
if(readLimit > maxReadsAtLocus-totalReadStates) {
readLimit = maxReadsAtLocus-totalReadStates;
mrlViolation = true;
}
addReadsToSample(statesBySample,newReads,readLimit,mrlViolation);
}
else {
int[] counts = statesBySample.getCountsPerAlignmentStart();
int[] updatedCounts = new int[counts.length];
System.arraycopy(counts,0,updatedCounts,0,counts.length);
boolean readPruned = true;
while(numReads+newReads.size()>targetCoverage && readPruned) {
readPruned = false;
for(int alignmentStart=updatedCounts.length-1;numReads+newReads.size()>targetCoverage&&alignmentStart>=0;alignmentStart--) {
if(updatedCounts[alignmentStart] > 1) {
updatedCounts[alignmentStart]--;
numReads--;
readPruned = true;
}
}
}
if(numReads == targetCoverage) {
updatedCounts[0]--;
numReads--;
}
BitSet toPurge = new BitSet(readStates.size());
int readOffset = 0;
for(int i = 0; i < updatedCounts.length; i++) {
int n = counts[i];
int k = updatedCounts[i];
for(Integer purgedElement: MathUtils.sampleIndicesWithoutReplacement(n,n-k))
toPurge.set(readOffset+purgedElement);
readOffset += counts[i];
}
statesBySample.purge(toPurge);
addReadsToSample(statesBySample,newReads,targetCoverage-numReads,false);
}
}
chainedReadSelector.reset();
}
/**
* Add reads with the given sample name to the given hanger entry.
* @param readStates The per-sample read state manager to which the selected reads will be added.
* @param reads Reads to add. Selected reads will be pulled from this source.
* @param maxReads Maximum number of reads to add.
* @param atMaxReadsAtLocusLimit Whether the maximum-reads-at-locus (-mrl) cap has been hit; if so, the overflow tracker is notified.
*/
private void addReadsToSample(final PerSampleReadStateManager readStates, final Collection<SAMRecord> reads, final long maxReads, boolean atMaxReadsAtLocusLimit) {
if(reads.isEmpty())
return;
GenomeLoc location = null;
// the farthest right a read extends
Integer rightMostEnd = -1;
Collection<SAMRecordState> newReadStates = new LinkedList<SAMRecordState>();
int readCount = 0;
for(SAMRecord read: reads) {
if(readCount < maxReads) {
SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents());
state.stepForwardOnGenome();
newReadStates.add(state);
// TODO: What if we downsample the extended events away?
if (state.hadIndel()) hasExtendedEvents = true;
readCount++;
}
if(atMaxReadsAtLocusLimit) {
if (location == null)
location = GenomeLocParser.createGenomeLoc(read);
rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd;
}
}
readStates.addStatesAtNextAlignmentStart(newReadStates);
if (location != null)
overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd),
totalReadStates);
}
private class PerSampleReadStateManager implements Iterable<SAMRecordState> {
private final Queue<SAMRecordState> readStates = new LinkedList<SAMRecordState>();
private final Deque<Counter> readStateCounter = new LinkedList<Counter>();
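// readStateCounter records, in insertion order, how many states each call to
// addStatesAtNextAlignmentStart contributed; purge() walks it in parallel with the flat
// readStates queue to keep the per-alignment-start counts up to date as states are removed.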
public void addStatesAtNextAlignmentStart(Collection<SAMRecordState> states) {
readStates.addAll(states);
readStateCounter.add(new Counter(states.size()));
totalReadStates += states.size();
}
public boolean isEmpty() {
return readStates.isEmpty();
}
public SAMRecordState peek() {
return readStates.peek();
}
public int size() {
return readStates.size();
}
public int[] getCountsPerAlignmentStart() {
int[] counts = new int[readStateCounter.size()];
int index = 0;
for(Counter counter: readStateCounter)
counts[index++] = counter.getCount();
return counts;
}
public Iterator<SAMRecordState> iterator() {
return new Iterator<SAMRecordState>() {
private Iterator<SAMRecordState> wrappedIterator = readStates.iterator();
public boolean hasNext() {
return wrappedIterator.hasNext();
}
public SAMRecordState next() {
return wrappedIterator.next();
}
public void remove() {
wrappedIterator.remove();
Counter counter = readStateCounter.peek();
counter.decrement();
if(counter.getCount() == 0)
readStateCounter.remove();
}
};
}
/**
* Purge read states based on the given bitset: if bit i is set, the read state at
* position i is removed.
* @param elements bitset indicating which read states to purge.
*/
public void purge(final BitSet elements) {
if(elements.isEmpty() || readStates.isEmpty()) return;
Iterator<SAMRecordState> readStateIterator = readStates.iterator();
Iterator<Counter> counterIterator = readStateCounter.iterator();
Counter currentCounter = counterIterator.next();
int readIndex = 0;
long alignmentStartCounter = currentCounter.getCount();
int toPurge = elements.nextSetBit(0);
int removedCount = 0;
while(readStateIterator.hasNext() && toPurge >= 0) {
readStateIterator.next();
if(readIndex == toPurge) {
readStateIterator.remove();
currentCounter.decrement();
if(currentCounter.getCount() == 0)
counterIterator.remove();
removedCount++;
toPurge = elements.nextSetBit(toPurge+1);
}
readIndex++;
alignmentStartCounter--;
if(alignmentStartCounter == 0 && counterIterator.hasNext()) {
currentCounter = counterIterator.next();
alignmentStartCounter = currentCounter.getCount();
}
}
totalReadStates -= removedCount;
}
}
}
/**
* Note: assuming that, whenever we downsample, we downsample to an integer capacity.
*/
private class Counter {
private int count;
public Counter(int count) {
this.count = count;
}
public int getCount() {
return count;
}
public void decrement() {
count--;
}
}
}
/**
* Selects reads passed to it based on criteria decided through inheritance.
* TODO: This is a temporary abstraction until we can get rid of this downsampling implementation and the mrl option. Get rid of this.
*/
interface ReadSelector {
/**
* All previous selectors in the chain have allowed this read. Submit it to this selector for consideration.
* @param read the read to evaluate.
*/
public void submitRead(SAMRecord read);
/**
* A previous selector has deemed this read unfit. Notify this selector so that this selector's counts are valid.
* @param read the read previously rejected.
*/
public void notifyReadRejected(SAMRecord read);
/**
* Signal the selector that read additions are complete.
*/
public void complete();
/**
* Retrieve the number of reads seen by this selector so far.
* @return number of reads seen.
*/
public long getNumReadsSeen();
/**
* Return the number of reads accepted by this selector so far.
* @return number of reads selected.
*/
public long getNumReadsSelected();
/**
* Get the reads selected by this selector.
* @return collection of reads selected by this selector.
*/
public Collection<SAMRecord> getSelectedReads();
/**
* Reset this collection to its pre-gathered state.
*/
public void reset();
}
/**
* Choose the first N reads from the submitted set.
*/
class FirstNReadSelector implements ReadSelector {
private final ReadSelector chainedSelector;
private final Collection<SAMRecord> selectedReads = new LinkedList<SAMRecord>();
private final long readLimit;
private long readsSeen = 0;
public FirstNReadSelector(ReadSelector chainedSelector, long readLimit) {
this.chainedSelector = chainedSelector;
this.readLimit = readLimit;
}
public void submitRead(SAMRecord read) {
if(readsSeen < readLimit) {
selectedReads.add(read);
if(chainedSelector != null)
chainedSelector.submitRead(read);
}
else
if(chainedSelector != null)
chainedSelector.notifyReadRejected(read);
readsSeen++;
}
public void notifyReadRejected(SAMRecord read) {
if(chainedSelector != null)
chainedSelector.notifyReadRejected(read);
readsSeen++;
}
public void complete() {
if(chainedSelector != null)
chainedSelector.complete();
}
public long getNumReadsSeen() {
return readsSeen;
}
public long getNumReadsSelected() {
return selectedReads.size();
}
public Collection<SAMRecord> getSelectedReads() {
return selectedReads;
}
public void reset() {
selectedReads.clear();
readsSeen = 0;
if(chainedSelector != null)
chainedSelector.reset();
}
}
/**
* Select N reads randomly from the input stream.
*/
class NRandomReadSelector implements ReadSelector {
private final ReservoirDownsampler<SAMRecord> reservoir;
private final ReadSelector chainedSelector;
private long readsSeen = 0;
public NRandomReadSelector(ReadSelector chainedSelector, long readLimit) {
this.reservoir = new ReservoirDownsampler<SAMRecord>((int)readLimit);
this.chainedSelector = chainedSelector;
}
public void submitRead(SAMRecord read) {
SAMRecord displaced = reservoir.add(read);
if(displaced != null && chainedSelector != null)
chainedSelector.notifyReadRejected(read);
readsSeen++;
}
public void notifyReadRejected(SAMRecord read) {
readsSeen++;
}
public void complete() {
for(SAMRecord read: reservoir.getDownsampledContents())
chainedSelector.submitRead(read);
if(chainedSelector != null)
chainedSelector.complete();
}
public long getNumReadsSeen() {
return readsSeen;
}
public long getNumReadsSelected() {
return reservoir.size();
}
public Collection<SAMRecord> getSelectedReads() {
return reservoir.getDownsampledContents();
}
public void reset() {
reservoir.clear();
if(chainedSelector != null)
chainedSelector.reset();
}
}
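NRandomReadSelector delegates the actual sampling to ReservoirDownsampler, which is not part of this diff. A minimal self-contained sketch of the standard reservoir-sampling technique a class like that presumably implements (the class and method names below are illustrative, not the GATK API):

import java.util.*;

class ReservoirSketch<T> {
    private final List<T> reservoir;
    private final int capacity;
    private final Random rng = new Random();
    private int seen = 0;

    ReservoirSketch(int capacity) {
        this.capacity = capacity;
        this.reservoir = new ArrayList<T>(capacity);
    }

    // Offer one item; returns the item displaced from the sample, or null if nothing was displaced.
    T add(T item) {
        seen++;
        if (reservoir.size() < capacity) {   // reservoir not yet full: always keep the item
            reservoir.add(item);
            return null;
        }
        int slot = rng.nextInt(seen);        // keep the new item with probability capacity/seen
        if (slot < capacity)
            return reservoir.set(slot, item);
        return item;                         // the new item itself was rejected
    }

    List<T> contents() { return reservoir; }
}

Every item offered ends up in the sample with equal probability capacity/seen, which is what lets N reads be chosen uniformly from a stream of unknown length in a single pass.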
class SamplePartitioner implements ReadSelector {
private final Map<String,SampleStorage> readsBySample;
private long readsSeen = 0;
public SamplePartitioner(Collection<String> sampleNames) {
readsBySample = new HashMap<String,SampleStorage>();
for(String sampleName: sampleNames)
readsBySample.put(sampleName,new SampleStorage());
}
public void submitRead(SAMRecord read) {
String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null;
if(readsBySample.containsKey(sampleName))
readsBySample.get(sampleName).submitRead(read);
readsSeen++;
}
public void notifyReadRejected(SAMRecord read) {
String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null;
if(readsBySample.containsKey(sampleName))
readsBySample.get(sampleName).notifyReadRejected(read);
readsSeen++;
}
public void complete() {
// NO-OP.
}
public long getNumReadsSeen() {
return readsSeen;
}
public long getNumReadsSelected() {
return readsSeen;
}
public Collection<SAMRecord> getSelectedReads() {
throw new UnsupportedOperationException("Cannot directly get selected reads from a read partitioner.");
}
public ReadSelector getSelectedReads(String sampleName) {
if(!readsBySample.containsKey(sampleName))
throw new NoSuchElementException("Sample name not found");
return readsBySample.get(sampleName);
}
public void reset() {
for(SampleStorage storage: readsBySample.values())
storage.reset();
readsSeen = 0;
}
private class SampleStorage implements ReadSelector {
private Collection<SAMRecord> reads = new LinkedList<SAMRecord>();
private long readsSeen = 0;
public void submitRead(SAMRecord read) {
reads.add(read);
readsSeen++;
}
public void notifyReadRejected(SAMRecord read) {
readsSeen++;
}
public void complete() {
// NO-OP.
}
public long getNumReadsSeen() {
return readsSeen;
}
public long getNumReadsSelected() {
return readsSeen;
}
public Collection<SAMRecord> getSelectedReads() {
return reads;
}
public void reset() {
reads.clear();
readsSeen = 0;
}
}
}


@ -19,7 +19,7 @@ public class PileupWalkerIntegrationTest extends WalkerTest {
String gatk_args = "-T Pileup -I " + validationDataLocation + "FHS_Pileup_Test.bam "
+ "-R " + seqLocation + "references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta"
+ " -L chr15:46,347,148 -o %s";
String expected_md5 = "d23032d10111755ccb1c1b01e6e097a7";
String expected_md5 = "052187dd2bf2516a027578c8775856a8";
WalkerTestSpec spec = new WalkerTestSpec(gatk_args, 1, Arrays.asList(expected_md5));
executeTest("Testing the standard (no-indel) pileup on three merged FHS pools with 27 deletions in 969 bases", spec);
}