diff --git a/java/src/org/broadinstitute/sting/gatk/executive/WindowMaker.java b/java/src/org/broadinstitute/sting/gatk/executive/WindowMaker.java index 3cbae2783..24964d51d 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/WindowMaker.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/WindowMaker.java @@ -68,13 +68,7 @@ public class WindowMaker implements Iterable, I LocusIterator locusIterator; Iterator wrappedIterator = TraversalEngine.addMandatoryFilteringIterators(iterator, filters); - if(sourceInfo.getDownsamplingMethod() != null && - (sourceInfo.getDownsamplingMethod().type == DownsampleType.EXPERIMENTAL_BY_SAMPLE)) { - if ( discards.size() > 0 ) - throw new StingException("Experimental downsampling iterator doesn't support base discarding at this point; complain to Matt Hanna"); - locusIterator = new DownsamplingLocusIteratorByState(wrappedIterator,sourceInfo); - } else - locusIterator = new LocusIteratorByState(wrappedIterator,sourceInfo, discards); + locusIterator = new LocusIteratorByState(wrappedIterator,sourceInfo,discards); this.locusOverflowTracker = locusIterator.getLocusOverflowTracker(); diff --git a/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java b/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java deleted file mode 100755 index 72a05a34e..000000000 --- a/java/src/org/broadinstitute/sting/gatk/iterators/DownsamplingLocusIteratorByState.java +++ /dev/null @@ -1,1097 +0,0 @@ -/* - * Copyright (c) 2009 The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - -package org.broadinstitute.sting.gatk.iterators; - -import net.sf.samtools.*; -import net.sf.picard.util.PeekableIterator; -import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.Reads; -import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; -import org.broadinstitute.sting.gatk.DownsamplingMethod; -import org.broadinstitute.sting.gatk.DownsampleType; -import org.broadinstitute.sting.gatk.contexts.AlignmentContext; -import org.broadinstitute.sting.utils.*; -import org.broadinstitute.sting.utils.pileup.*; - -import java.util.*; - -/** Iterator that traverses a SAM File, accumulating information on a per-locus basis */ -public class DownsamplingLocusIteratorByState extends LocusIterator { - private static long discarded_bases = 0L; - private static long observed_bases = 0L; - - // - // todo -- eric, add your UG filters here - // - //public enum Discard { ADAPTOR_BASES } - //public static final EnumSet NO_DISCARDS = EnumSet.noneOf(Discard.class); - public static final List NO_FILTERS = Arrays.asList(); - - /** - * the overflow tracker, which makes sure we get a limited number of warnings for locus pile-ups that - * exceed the max depth - */ - private LocusOverflowTracker overflowTracker; - - /** our log, which we want to capture anything from this class */ - private static Logger logger = Logger.getLogger(DownsamplingLocusIteratorByState.class); - - // ----------------------------------------------------------------------------------------------------------------- - // - // member fields - // - // ----------------------------------------------------------------------------------------------------------------- - private boolean hasExtendedEvents = false; // will be set to true if at least one read had an indel right before the current position - - private final Collection sampleNames = new ArrayList(); - private final ReadStateManager readStates; - - private class SAMRecordState { - SAMRecord read; - int readOffset = -1; // how far are we offset from the start of the read bases? - int genomeOffset = -1; // how far are we offset from the alignment start on the genome? - - Cigar cigar = null; - int cigarOffset = -1; - CigarElement curElement = null; - int nCigarElements = 0; - - // how far are we into a single cigarElement - int cigarElementCounter = -1; - - // The logical model for generating extended events is as follows: the "record state" implements the traversal - // along the reference; thus stepForwardOnGenome() returns on every and only on actual reference bases. This - // can be a (mis)match or a deletion (in the latter case, we still return on every individual reference base the - // deletion spans). In the extended events mode, the record state also remembers if there was an insertion, or - // if the deletion just started *right before* the current reference base the record state is pointing to upon the return from - // stepForwardOnGenome(). The next call to stepForwardOnGenome() will clear that memory (as we remember only extended - // events immediately preceding the current reference base). - - boolean generateExtendedEvents = true; // should we generate an additional, special pile for indels between the ref bases? - // the only purpose of this flag is to shield away a few additional lines of code - // when extended piles are not needed, it may not be even worth it... - byte[] insertedBases = null; // remember full inserted sequence if we are generating piles of extended events (indels) - int eventLength = -1; // will be set to the length of insertion/deletion if we are generating piles of extended events - byte eventDelayedFlag = 0; // will be set to non-0 if there was an event (indel) right before the - // current base on the ref. We use a counter-like variable here since clearing the indel event is - // delayed by one base, so we need to remember how long ago we have seen the actual event - int eventStart = -1; // where on the read the extended event starts (i.e. the last position on the read prior to the - // event, or -1 if alignment starts with an insertion); this one is easy to recompute on the fly, - // we cache it here mainly for convenience - - - public SAMRecordState(SAMRecord read, boolean extended) { - this.read = read; - cigar = read.getCigar(); - nCigarElements = cigar.numCigarElements(); - generateExtendedEvents = extended; - - //System.out.printf("Creating a SAMRecordState: %s%n", this); - } - - public SAMRecordState(SAMRecord read) { - this(read,false); - } - - public SAMRecord getRead() { return read; } - - /** - * What is our current offset in the read's bases that aligns us with the reference genome? - * - * @return - */ - public int getReadOffset() { return readOffset; } - - /** - * What is the current offset w.r.t. the alignment state that aligns us to the readOffset? - * - * @return - */ - public int getGenomeOffset() { return genomeOffset; } - - public int getGenomePosition() { return read.getAlignmentStart() + getGenomeOffset(); } - - public GenomeLoc getLocation() { - return GenomeLocParser.createGenomeLoc(read.getReferenceName(), getGenomePosition()); - } - - public CigarOperator getCurrentCigarOperator() { - return curElement.getOperator(); - } - - /** Returns true if we just stepped over insertion/into a deletion prior to the last return from stepForwardOnGenome. - * - * @return - */ - public boolean hadIndel() { - return ( eventLength > 0 ); - } - - public int getEventLength() { return eventLength; } - - public byte[] getEventBases() { return insertedBases; } - - public int getReadEventStartOffset() { return eventStart; } - - public String toString() { - return String.format("%s ro=%d go=%d co=%d cec=%d %s", read.getReadName(), readOffset, genomeOffset, cigarOffset, cigarElementCounter, curElement); - } - - public CigarOperator stepForwardOnGenome() { - // we enter this method with readOffset = index of the last processed base on the read - // (-1 if we did not process a single base yet); this can be last matching base, or last base of an insertion - - - if ( curElement == null || ++cigarElementCounter > curElement.getLength() ) { - cigarOffset++; - if ( cigarOffset < nCigarElements ) { - curElement = cigar.getCigarElement(cigarOffset); - cigarElementCounter = 0; - // next line: guards against cigar elements of length 0; when new cigar element is retrieved, - // we reenter in order to re-check cigarElementCounter against curElement's length - return stepForwardOnGenome(); - } else { - if ( generateExtendedEvents && eventDelayedFlag > 0 ) { - genomeOffset++; // extended events need that. Logically, it's legal to advance the genomic offset here: - // we do step forward on the ref, and by returning null we also indicate that we are past the read end. - - // if we had an indel right before the read ended (i.e. insertion was the last cigar element), - // we keep it until next reference base; then we discard it and this will allow the LocusIterator to - // finally discard this read - eventDelayedFlag--; - if ( eventDelayedFlag == 0 ) { - eventLength = -1; // reset event when we are past it - insertedBases = null; - eventStart = -1; - } - } - return null; - } - } - - - boolean done = false; - switch (curElement.getOperator()) { - case H : // ignore hard clips - case P : // ignore pads - cigarElementCounter = curElement.getLength(); - break; - case I : // insertion w.r.t. the reference - if ( generateExtendedEvents ) { - // we see insertions only once, when we step right onto them; the position on the read is scrolled - // past the insertion right after that - if ( eventDelayedFlag > 1 ) throw new StingException("Adjacent I/D events in read "+read.getReadName()); - insertedBases = Arrays.copyOfRange(read.getReadBases(),readOffset+1,readOffset+1+curElement.getLength()); - eventLength = curElement.getLength() ; - eventStart = readOffset; - eventDelayedFlag = 2; // insertion causes re-entry into stepForwardOnGenome, so we set the delay to 2 -// System.out.println("Inserted "+(new String (insertedBases)) +" after "+readOffset); - } // continue onto the 'S' case ! - case S : // soft clip - cigarElementCounter = curElement.getLength(); - readOffset += curElement.getLength(); - break; - case D : // deletion w.r.t. the reference - if ( generateExtendedEvents ) { - if ( cigarElementCounter == 1) { - // generate an extended event only if we just stepped into the deletion (i.e. don't - // generate the event at every deleted position on the ref, that's what cigarElementCounter==1 is for!) - if ( eventDelayedFlag > 1 ) throw new StingException("Adjacent I/D events in read "+read.getReadName()); - eventLength = curElement.getLength(); - eventDelayedFlag = 2; // deletion on the ref causes an immediate return, so we have to delay by 1 only - eventStart = readOffset; - insertedBases = null; -// System.out.println("Deleted "+eventLength +" bases after "+readOffset); - } - } // continue onto the 'N' case ! - case N : // reference skip (looks and gets processed just like a "deletion", just different logical meaning) - genomeOffset++; - done = true; - break; - case M : - readOffset++; - genomeOffset++; - done = true; - break; - default : throw new IllegalStateException("Case statement didn't deal with cigar op: " + curElement.getOperator()); - } - - if ( generateExtendedEvents ) { - if ( eventDelayedFlag > 0 && done ) { - // if we did make a successful step on the ref, decrement delayed flag. If, upon the decrementthe, - // the flag is 1, we are standing on the reference base right after the indel (so we have to keep it). - // Otherwise, we are away from the previous indel and have to clear our memories... - eventDelayedFlag--; // when we notice an indel, we set delayed flag to 2, so now - // if eventDelayedFlag == 1, an indel occured right before the current base - if ( eventDelayedFlag == 0 ) { - eventLength = -1; // reset event when we are past it - insertedBases = null; - eventStart = -1; - } - } - } - - return done ? curElement.getOperator() : stepForwardOnGenome(); - } - } - - //final boolean DEBUG = false; - //final boolean DEBUG2 = false && DEBUG; - private Reads readInfo; - private AlignmentContext nextAlignmentContext; - private List filters = new ArrayList(); - - // ----------------------------------------------------------------------------------------------------------------- - // - // constructors and other basic operations - // - // ----------------------------------------------------------------------------------------------------------------- - public DownsamplingLocusIteratorByState(final Iterator samIterator, Reads readInformation ) { - this(samIterator, readInformation, NO_FILTERS); - } - - public DownsamplingLocusIteratorByState(final Iterator samIterator, Reads readInformation, List filters ) { - // Aggregate all sample names. - // TODO: Push in header via constructor - if(GenomeAnalysisEngine.instance != null && GenomeAnalysisEngine.instance.getDataSource() != null) { - sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader())); - } - // Add a null sample name as a catch-all for reads without samples - if(!sampleNames.contains(null)) sampleNames.add(null); - readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),readInformation.getMaxReadsAtLocus(),sampleNames); - this.readInfo = readInformation; - this.filters = filters; - overflowTracker = new LocusOverflowTracker(readInformation.getMaxReadsAtLocus()); - } - - public Iterator iterator() { - return this; - } - - public void close() { - //this.it.close(); - } - - public boolean hasNext() { - lazyLoadNextAlignmentContext(); - boolean r = (nextAlignmentContext != null); - //if ( DEBUG ) System.out.printf("hasNext() = %b%n", r); - - // if we don't have a next record, make sure we clean the warning queue - // TODO: Note that this implementation requires that hasNext() always be called before next(). - if (!r) overflowTracker.cleanWarningQueue(); - - return r; - } - - public void printState() { - for(String sampleName: sampleNames) { - Iterator iterator = readStates.iteratorForSample(sampleName); - while(iterator.hasNext()) { - SAMRecordState state = iterator.next(); - logger.debug(String.format("printState():")); - SAMRecord read = state.getRead(); - int offset = state.getReadOffset(); - logger.debug(String.format(" read: %s(%d)=%s, cigar=%s", read.getReadName(), offset, (char)read.getReadBases()[offset], read.getCigarString())); - } - } - } - - private GenomeLoc getLocation() { - return readStates.isEmpty() ? null : readStates.getFirst().getLocation(); - } - - // ----------------------------------------------------------------------------------------------------------------- - // - // next() routine and associated collection operations - // - // ----------------------------------------------------------------------------------------------------------------- - public AlignmentContext next() { - lazyLoadNextAlignmentContext(); - if(!hasNext()) - throw new NoSuchElementException("LocusIteratorByState: out of elements."); - AlignmentContext currentAlignmentContext = nextAlignmentContext; - nextAlignmentContext = null; - return currentAlignmentContext; - } - - /** - * Creates the next alignment context from the given state. Note that this is implemented as a lazy load method. - * nextAlignmentContext MUST BE null in order for this method to advance to the next entry. - */ - private void lazyLoadNextAlignmentContext() { - while(nextAlignmentContext == null && readStates.hasNext()) { - // this call will set hasExtendedEvents to true if it picks up a read with indel right before the current position on the ref: - readStates.collectPendingReads(); - - int size = 0; - int nDeletions = 0; - int nInsertions = 0; - int nMQ0Reads = 0; - - - // if extended events are requested, and if previous traversal step brought us over an indel in - // at least one read, we emit extended pileup (making sure that it is associated with the previous base, - // i.e. the one right *before* the indel) and do NOT shift the current position on the ref. - // In this case, the subsequent call to next() will emit the normal pileup at the current base - // and shift the position. - if (readInfo.generateExtendedEvents() && hasExtendedEvents) { - Map> fullExtendedEventPileup = - new HashMap>(); - - SAMRecordState our1stState = readStates.getFirst(); - // get current location on the reference and decrement it by 1: the indels we just stepped over - // are associated with the *previous* reference base - GenomeLoc loc = GenomeLocParser.incPos(our1stState.getLocation(),-1); - - for(String sampleName: sampleNames) { - Iterator iterator = readStates.iteratorForSample(sampleName); - ArrayList indelPile = new ArrayList(readStates.size()); - - size = 0; - nDeletions = 0; - nInsertions = 0; - nMQ0Reads = 0; - int maxDeletionLength = 0; - - while(iterator.hasNext()) { - SAMRecordState state = iterator.next(); - if ( state.hadIndel() ) { - size++; - if ( state.getEventBases() == null ) { - nDeletions++; - maxDeletionLength = Math.max(maxDeletionLength,state.getEventLength()); - } - else nInsertions++; - indelPile.add ( new ExtendedEventPileupElement(state.getRead(), - state.getReadEventStartOffset(), - state.getEventLength(), - state.getEventBases()) ); - - } else { - if ( state.getCurrentCigarOperator() != CigarOperator.N ) { - // this read has no indel associated with the previous position on the ref; - // we count this read in only if it has actual bases, not N span... - if ( state.getCurrentCigarOperator() != CigarOperator.D || readInfo.includeReadsWithDeletionAtLoci() ) { - - // if cigar operator is D but the read has no extended event reported (that's why we ended - // up in this branch), it means that we are currently inside a deletion that started earlier; - // we count such reads (with a longer deletion spanning over a deletion at the previous base we are - // about to report) only if includeReadsWithDeletionAtLoci is true. - size++; - indelPile.add ( new ExtendedEventPileupElement(state.getRead(), - state.getReadOffset()-1, - -1) // length=-1 --> noevent - ); - } - } - } - if ( state.getRead().getMappingQuality() == 0 ) { - nMQ0Reads++; - } - } - if( indelPile.size() != 0 ) fullExtendedEventPileup.put(sampleName,new ReadBackedExtendedEventPileupImpl(loc,indelPile,size,maxDeletionLength,nInsertions,nDeletions,nMQ0Reads)); - } - hasExtendedEvents = false; // we are done with extended events prior to current ref base -// System.out.println("Indel(s) at "+loc); -// for ( ExtendedEventPileupElement pe : indelPile ) { if ( pe.isIndel() ) System.out.println(" "+pe.toString()); } - nextAlignmentContext = new AlignmentContext(loc, new ReadBackedExtendedEventPileupImpl(loc, fullExtendedEventPileup)); - } else { - GenomeLoc location = getLocation(); - Map> fullPileup = new HashMap>(); - - // todo -- performance problem -- should be lazy, really - for(String sampleName: sampleNames) { - Iterator iterator = readStates.iteratorForSample(sampleName); - ArrayList pile = new ArrayList(readStates.size()); - - size = 0; - nDeletions = 0; - nInsertions = 0; - nMQ0Reads = 0; - - while(iterator.hasNext()) { - SAMRecordState state = iterator.next(); - if ( state.getCurrentCigarOperator() != CigarOperator.D && state.getCurrentCigarOperator() != CigarOperator.N ) { - if ( filterRead(state.getRead(), location.getStart(), filters ) ) { - discarded_bases++; - //printStatus("Adaptor bases", discarded_adaptor_bases); - continue; - } else { - observed_bases++; - pile.add(new PileupElement(state.getRead(), state.getReadOffset())); - size++; - } - } else if ( readInfo.includeReadsWithDeletionAtLoci() && state.getCurrentCigarOperator() != CigarOperator.N ) { - size++; - pile.add(new PileupElement(state.getRead(), -1)); - nDeletions++; - } - - // todo -- this looks like a bug w.r.t. including reads with deletion at loci -- MAD 05/27/10 - if ( state.getRead().getMappingQuality() == 0 ) { - nMQ0Reads++; - } - } - if( pile.size() != 0 ) fullPileup.put(sampleName,new ReadBackedPileupImpl(location,pile,size,nDeletions,nMQ0Reads)); - } - - updateReadStates(); // critical - must be called after we get the current state offsets and location - // if we got reads with non-D/N over the current position, we are done - if ( !fullPileup.isEmpty() ) nextAlignmentContext = new AlignmentContext(location, new ReadBackedPileupImpl(location,fullPileup)); - } - } - } - - // fast testing of position - private boolean readIsPastCurrentPosition(SAMRecord read) { - if ( readStates.isEmpty() ) - return false; - else { - SAMRecordState state = readStates.getFirst(); - SAMRecord ourRead = state.getRead(); -// int offset = 0; -// final CigarElement ce = read.getCigar().getCigarElement(0); - // if read starts with an insertion, we want to get it in at the moment we are standing on the - // reference base the insertion is associated with, not when we reach "alignment start", which is - // first base *after* the insertion -// if ( ce.getOperator() == CigarOperator.I ) offset = ce.getLength(); -// return read.getReferenceIndex() > ourRead.getReferenceIndex() || read.getAlignmentStart() - offset > state.getGenomePosition(); - return read.getReferenceIndex() > ourRead.getReferenceIndex() || read.getAlignmentStart() > state.getGenomePosition(); - } - } - - private static boolean filterRead(SAMRecord rec, long pos, List filters) { - for ( LocusIteratorFilter filter : filters ) { - if ( filter.filterOut(rec, pos) ) { - return true; - } - } - return false; - } - - private void printStatus(final String title, long n) { - if ( n % 10000 == 0 ) - System.out.printf("%s %d / %d = %.2f%n", title, n, observed_bases, 100.0 * n / (observed_bases + 1)); - } - - private void updateReadStates() { - for(String sampleName: sampleNames) { - Iterator it = readStates.iteratorForSample(sampleName); - while ( it.hasNext() ) { - SAMRecordState state = it.next(); - CigarOperator op = state.stepForwardOnGenome(); - if ( state.hadIndel() && readInfo.generateExtendedEvents() ) hasExtendedEvents = true; - else { - // we discard the read only when we are past its end AND indel at the end of the read (if any) was - // already processed. Keeping the read state that retunred null upon stepForwardOnGenome() is safe - // as the next call to stepForwardOnGenome() will return null again AND will clear hadIndel() flag. - if ( op == null ) { // we've stepped off the end of the object - //if (DEBUG) logger.debug(String.format(" removing read %s at %d", state.getRead().getReadName(), state.getRead().getAlignmentStart())); - it.remove(); - } - } - } - } - } - - public void remove() { - throw new UnsupportedOperationException("Can not remove records from a SAM file via an iterator!"); - } - - /** - * a method for setting the overflow tracker, for dependency injection - * @param tracker - */ - protected void setLocusOverflowTracker(LocusOverflowTracker tracker) { - this.overflowTracker = tracker; - } - - /** - * a method for getting the overflow tracker - * @return the overflow tracker, null if none exists - */ - public LocusOverflowTracker getLocusOverflowTracker() { - return this.overflowTracker; - } - - private class ReadStateManager { - private final PeekableIterator iterator; - private final DownsamplingMethod downsamplingMethod; - - private final ReadSelector chainedReadSelector; - private final SamplePartitioner samplePartitioner; - - private final Map readStatesBySample = new HashMap(); - - private final int targetCoverage; - private final int maxReadsAtLocus; - - private int totalReadStates = 0; - - public ReadStateManager(Iterator source, DownsamplingMethod downsamplingMethod, int maxReadsAtLocus, Collection sampleNames) { - this.iterator = new PeekableIterator(source); - this.downsamplingMethod = downsamplingMethod; - switch(downsamplingMethod.type) { - case EXPERIMENTAL_BY_SAMPLE: - if(downsamplingMethod.toCoverage == null) - throw new StingException("Downsampling coverage (-dcov) must be specified when downsampling by sample"); - this.targetCoverage = downsamplingMethod.toCoverage; - break; - default: - this.targetCoverage = Integer.MAX_VALUE; - } - this.maxReadsAtLocus = maxReadsAtLocus; - - samplePartitioner = new SamplePartitioner(sampleNames); - for(String sampleName: sampleNames) - readStatesBySample.put(sampleName,new PerSampleReadStateManager()); - - ReadSelector primaryReadSelector= samplePartitioner; - - chainedReadSelector = maxReadsAtLocus!=Integer.MAX_VALUE ? new FirstNReadSelector(primaryReadSelector,maxReadsAtLocus) : primaryReadSelector; - } - - public Iterator iteratorForSample(final String sampleName) { - return new Iterator() { - private Iterator wrappedIterator = readStatesBySample.get(sampleName).iterator(); - - public boolean hasNext() { - return wrappedIterator.hasNext(); - } - - public SAMRecordState next() { - return wrappedIterator.next(); - } - - public void remove() { - wrappedIterator.remove(); - totalReadStates--; - } - }; - } - - public boolean isEmpty() { - return totalReadStates == 0; - } - - public int size() { - return totalReadStates; - } - - public SAMRecordState getFirst() { - for(String sampleName: sampleNames) { - PerSampleReadStateManager reads = readStatesBySample.get(sampleName); - if(!reads.isEmpty()) - return reads.peek(); - } - return null; - } - - public boolean hasNext() { - return totalReadStates > 0 || iterator.hasNext(); - } - - public void collectPendingReads() { - if(!iterator.hasNext()) - return; - - if(readStates.size() == 0) { - int firstContigIndex = iterator.peek().getReferenceIndex(); - int firstAlignmentStart = iterator.peek().getAlignmentStart(); - while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) { - chainedReadSelector.submitRead(iterator.next()); - } - } - else { - // Fast fail in the case that the read is past the current position. - if(readIsPastCurrentPosition(iterator.peek())) - return; - - while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) { - chainedReadSelector.submitRead(iterator.next()); - } - } - chainedReadSelector.complete(); - - for(String sampleName: sampleNames) { - ReadSelector aggregator = samplePartitioner.getSelectedReads(sampleName); - - Collection newReads = new ArrayList(aggregator.getSelectedReads()); - - PerSampleReadStateManager statesBySample = readStatesBySample.get(sampleName); - int numReads = statesBySample.size(); - - if(numReads+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE) { - long readLimit = aggregator.getNumReadsSeen(); - boolean mrlViolation = false; - if(readLimit > maxReadsAtLocus-totalReadStates) { - readLimit = maxReadsAtLocus-totalReadStates; - mrlViolation = true; - } - addReadsToSample(statesBySample,newReads,readLimit,mrlViolation); - } - else { - int[] counts = statesBySample.getCountsPerAlignmentStart(); - int[] updatedCounts = new int[counts.length]; - System.arraycopy(counts,0,updatedCounts,0,counts.length); - - boolean readPruned = true; - while(numReads+newReads.size()>targetCoverage && readPruned) { - readPruned = false; - for(int alignmentStart=updatedCounts.length-1;numReads+newReads.size()>targetCoverage&&alignmentStart>=0;alignmentStart--) { - if(updatedCounts[alignmentStart] > 1) { - updatedCounts[alignmentStart]--; - numReads--; - readPruned = true; - } - } - } - - if(numReads == targetCoverage) { - updatedCounts[0]--; - numReads--; - } - - BitSet toPurge = new BitSet(readStates.size()); - int readOffset = 0; - - for(int i = 0; i < updatedCounts.length; i++) { - int n = counts[i]; - int k = updatedCounts[i]; - - for(Integer purgedElement: MathUtils.sampleIndicesWithoutReplacement(n,n-k)) - toPurge.set(readOffset+purgedElement); - - readOffset += counts[i]; - } - statesBySample.purge(toPurge); - addReadsToSample(statesBySample,newReads,targetCoverage-numReads,false); - } - } - chainedReadSelector.reset(); - } - - /** - * Add reads with the given sample name to the given hanger entry. - * @param readStates The list of read states to add this collection of reads. - * @param reads Reads to add. Selected reads will be pulled from this source. - * @param maxReads Maximum number of reads to add. - */ - private void addReadsToSample(final PerSampleReadStateManager readStates, final Collection reads, final long maxReads, boolean atMaxReadsAtLocusLimit) { - if(reads.isEmpty()) - return; - - GenomeLoc location = null; - - // the farthest right a read extends - Integer rightMostEnd = -1; - - Collection newReadStates = new LinkedList(); - int readCount = 0; - for(SAMRecord read: reads) { - if(readCount < maxReads) { - SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); - state.stepForwardOnGenome(); - newReadStates.add(state); - // TODO: What if we downsample the extended events away? - if (state.hadIndel()) hasExtendedEvents = true; - readCount++; - } - else if(atMaxReadsAtLocusLimit) { - if (location == null) - location = GenomeLocParser.createGenomeLoc(read); - rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd; - } - } - readStates.addStatesAtNextAlignmentStart(newReadStates); - - if (location != null) - overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd), - totalReadStates); - } - - private class PerSampleReadStateManager implements Iterable { - private final Queue readStates = new LinkedList(); - private final Deque readStateCounter = new LinkedList(); - - public void addStatesAtNextAlignmentStart(Collection states) { - readStates.addAll(states); - readStateCounter.add(new Counter(states.size())); - totalReadStates += states.size(); - } - - public boolean isEmpty() { - return readStates.isEmpty(); - } - - public SAMRecordState peek() { - return readStates.peek(); - } - - public int size() { - return readStates.size(); - } - - public int[] getCountsPerAlignmentStart() { - int[] counts = new int[readStateCounter.size()]; - int index = 0; - for(Counter counter: readStateCounter) - counts[index++] = counter.getCount(); - return counts; - } - - public Iterator iterator() { - return new Iterator() { - private Iterator wrappedIterator = readStates.iterator(); - - public boolean hasNext() { - return wrappedIterator.hasNext(); - } - - public SAMRecordState next() { - return wrappedIterator.next(); - } - - public void remove() { - wrappedIterator.remove(); - Counter counter = readStateCounter.peek(); - counter.decrement(); - if(counter.getCount() == 0) - readStateCounter.remove(); - } - }; - } - - /** - * Purge the given elements from the bitset. If an element in the bitset is true, purge - * the corresponding read state. - * @param elements bits from the set to purge. - */ - public void purge(final BitSet elements) { - if(elements.isEmpty() || readStates.isEmpty()) return; - - Iterator readStateIterator = readStates.iterator(); - - Iterator counterIterator = readStateCounter.iterator(); - Counter currentCounter = counterIterator.next(); - - int readIndex = 0; - long alignmentStartCounter = currentCounter.getCount(); - - int toPurge = elements.nextSetBit(0); - int removedCount = 0; - - while(readStateIterator.hasNext() && toPurge >= 0) { - readStateIterator.next(); - - if(readIndex == toPurge) { - readStateIterator.remove(); - currentCounter.decrement(); - if(currentCounter.getCount() == 0) - counterIterator.remove(); - removedCount++; - toPurge = elements.nextSetBit(toPurge+1); - } - - readIndex++; - alignmentStartCounter--; - if(alignmentStartCounter == 0 && counterIterator.hasNext()) { - currentCounter = counterIterator.next(); - alignmentStartCounter = currentCounter.getCount(); - } - } - - totalReadStates -= removedCount; - } - } - } - - /** - * Note: assuming that, whenever we downsample, we downsample to an integer capacity. - */ - private class Counter { - private int count; - - public Counter(int count) { - this.count = count; - } - - public int getCount() { - return count; - } - - public void decrement() { - count--; - } - } -} - -/** - * Selects reads passed to it based on a criteria decided through inheritance. - * TODO: This is a temporary abstraction until we can get rid of this downsampling implementation and the mrl option. Get rid of this. - */ -interface ReadSelector { - /** - * All previous selectors in the chain have allowed this read. Submit it to this selector for consideration. - * @param read the read to evaluate. - */ - public void submitRead(SAMRecord read); - - /** - * A previous selector has deemed this read unfit. Notify this selector so that this selector's counts are valid. - * @param read the read previously rejected. - */ - public void notifyReadRejected(SAMRecord read); - - /** - * Signal the selector that read additions are complete. - */ - public void complete(); - - /** - * Retrieve the number of reads seen by this selector so far. - * @return number of reads seen. - */ - public long getNumReadsSeen(); - - /** - * Return the number of reads accepted by this selector so far. - * @return number of reads selected. - */ - public long getNumReadsSelected(); - - /** - * Get the reads selected by this selector. - * @return collection of reads selected by this selector. - */ - public Collection getSelectedReads(); - - /** - * Reset this collection to its pre-gathered state. - */ - public void reset(); -} - -/** - * Choose the first N reads from the submitted set. - */ -class FirstNReadSelector implements ReadSelector { - private final ReadSelector chainedSelector; - - private final Collection selectedReads = new LinkedList(); - private final long readLimit; - private long readsSeen = 0; - - public FirstNReadSelector(ReadSelector chainedSelector, long readLimit) { - this.chainedSelector = chainedSelector; - this.readLimit = readLimit; - } - - public void submitRead(SAMRecord read) { - if(readsSeen < readLimit) { - selectedReads.add(read); - if(chainedSelector != null) - chainedSelector.submitRead(read); - } - else - if(chainedSelector != null) - chainedSelector.notifyReadRejected(read); - readsSeen++; - } - - public void notifyReadRejected(SAMRecord read) { - if(chainedSelector != null) - chainedSelector.notifyReadRejected(read); - readsSeen++; - } - - public void complete() { - if(chainedSelector != null) - chainedSelector.complete(); - } - - public long getNumReadsSeen() { - return readsSeen; - } - - public long getNumReadsSelected() { - return selectedReads.size(); - } - - public Collection getSelectedReads() { - return selectedReads; - } - - public void reset() { - selectedReads.clear(); - readsSeen = 0; - if(chainedSelector != null) - chainedSelector.reset(); - } -} - -/** - * Select N reads randomly from the input stream. - */ -class NRandomReadSelector implements ReadSelector { - private final ReservoirDownsampler reservoir; - private final ReadSelector chainedSelector; - private long readsSeen = 0; - - public NRandomReadSelector(ReadSelector chainedSelector, long readLimit) { - this.reservoir = new ReservoirDownsampler((int)readLimit); - this.chainedSelector = chainedSelector; - } - - public void submitRead(SAMRecord read) { - SAMRecord displaced = reservoir.add(read); - if(displaced != null && chainedSelector != null) - chainedSelector.notifyReadRejected(read); - readsSeen++; - } - - public void notifyReadRejected(SAMRecord read) { - readsSeen++; - } - - public void complete() { - for(SAMRecord read: reservoir.getDownsampledContents()) - chainedSelector.submitRead(read); - if(chainedSelector != null) - chainedSelector.complete(); - } - - - public long getNumReadsSeen() { - return readsSeen; - } - - public long getNumReadsSelected() { - return reservoir.size(); - } - - public Collection getSelectedReads() { - return reservoir.getDownsampledContents(); - } - - public void reset() { - reservoir.clear(); - if(chainedSelector != null) - chainedSelector.reset(); - } -} - -class SamplePartitioner implements ReadSelector { - private final Map readsBySample; - private long readsSeen = 0; - - public SamplePartitioner(Collection sampleNames) { - readsBySample = new HashMap(); - for(String sampleName: sampleNames) - readsBySample.put(sampleName,new SampleStorage()); - } - - public void submitRead(SAMRecord read) { - String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null; - if(readsBySample.containsKey(sampleName)) - readsBySample.get(sampleName).submitRead(read); - readsSeen++; - } - - public void notifyReadRejected(SAMRecord read) { - String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null; - if(readsBySample.containsKey(sampleName)) - readsBySample.get(sampleName).notifyReadRejected(read); - readsSeen++; - } - - public void complete() { - // NO-OP. - } - - public long getNumReadsSeen() { - return readsSeen; - } - - public long getNumReadsSelected() { - return readsSeen; - } - - public Collection getSelectedReads() { - throw new UnsupportedOperationException("Cannot directly get selected reads from a read partitioner."); - } - - public ReadSelector getSelectedReads(String sampleName) { - if(!readsBySample.containsKey(sampleName)) - throw new NoSuchElementException("Sample name not found"); - return readsBySample.get(sampleName); - } - - public void reset() { - for(SampleStorage storage: readsBySample.values()) - storage.reset(); - readsSeen = 0; - } - - private class SampleStorage implements ReadSelector { - private Collection reads = new LinkedList(); - private long readsSeen = 0; - - public void submitRead(SAMRecord read) { - reads.add(read); - readsSeen++; - } - - public void notifyReadRejected(SAMRecord read) { - readsSeen++; - } - - public void complete() { - // NO-OP. - } - - public long getNumReadsSeen() { - return readsSeen; - } - - public long getNumReadsSelected() { - return readsSeen; - } - - public Collection getSelectedReads() { - return reads; - } - - public void reset() { - reads.clear(); - readsSeen = 0; - } - } -} - - - diff --git a/java/src/org/broadinstitute/sting/gatk/iterators/LocusIteratorByState.java b/java/src/org/broadinstitute/sting/gatk/iterators/LocusIteratorByState.java index d608efe10..fdf24a127 100755 --- a/java/src/org/broadinstitute/sting/gatk/iterators/LocusIteratorByState.java +++ b/java/src/org/broadinstitute/sting/gatk/iterators/LocusIteratorByState.java @@ -26,13 +26,14 @@ package org.broadinstitute.sting.gatk.iterators; import net.sf.samtools.*; -import net.sf.picard.filter.SamRecordFilter; +import net.sf.picard.util.PeekableIterator; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.Reads; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; +import org.broadinstitute.sting.gatk.DownsamplingMethod; +import org.broadinstitute.sting.gatk.DownsampleType; import org.broadinstitute.sting.gatk.contexts.AlignmentContext; import org.broadinstitute.sting.utils.*; -import org.broadinstitute.sting.utils.sam.ReadUtils; import org.broadinstitute.sting.utils.pileup.*; import java.util.*; @@ -47,7 +48,6 @@ public class LocusIteratorByState extends LocusIterator { // //public enum Discard { ADAPTOR_BASES } //public static final EnumSet NO_DISCARDS = EnumSet.noneOf(Discard.class); - public static final List NO_FILTERS = Arrays.asList(); /** @@ -64,9 +64,11 @@ public class LocusIteratorByState extends LocusIterator { // member fields // // ----------------------------------------------------------------------------------------------------------------- - private final PushbackIterator it; private boolean hasExtendedEvents = false; // will be set to true if at least one read had an indel right before the current position + private final Collection sampleNames = new ArrayList(); + private final ReadStateManager readStates; + private class SAMRecordState { SAMRecord read; int readOffset = -1; // how far are we offset from the start of the read bases? @@ -256,12 +258,11 @@ public class LocusIteratorByState extends LocusIterator { } } - private LinkedList readStates = new LinkedList(); //final boolean DEBUG = false; //final boolean DEBUG2 = false && DEBUG; private Reads readInfo; private AlignmentContext nextAlignmentContext; - private List filters = new ArrayList(); + private List filters = new ArrayList(); // ----------------------------------------------------------------------------------------------------------------- // @@ -271,9 +272,16 @@ public class LocusIteratorByState extends LocusIterator { public LocusIteratorByState(final Iterator samIterator, Reads readInformation ) { this(samIterator, readInformation, NO_FILTERS); } - + public LocusIteratorByState(final Iterator samIterator, Reads readInformation, List filters ) { - this.it = new PushbackIterator(samIterator); + // Aggregate all sample names. + // TODO: Push in header via constructor + if(GenomeAnalysisEngine.instance != null && GenomeAnalysisEngine.instance.getDataSource() != null) { + sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader())); + } + // Add a null sample name as a catch-all for reads without samples + if(!sampleNames.contains(null)) sampleNames.add(null); + readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),readInformation.getMaxReadsAtLocus(),sampleNames); this.readInfo = readInformation; this.filters = filters; overflowTracker = new LocusOverflowTracker(readInformation.getMaxReadsAtLocus()); @@ -293,24 +301,25 @@ public class LocusIteratorByState extends LocusIterator { //if ( DEBUG ) System.out.printf("hasNext() = %b%n", r); // if we don't have a next record, make sure we clean the warning queue + // TODO: Note that this implementation requires that hasNext() always be called before next(). if (!r) overflowTracker.cleanWarningQueue(); + return r; } public void printState() { - for ( SAMRecordState state : readStates ) { - logger.debug(String.format("printState():")); - SAMRecord read = state.getRead(); - int offset = state.getReadOffset(); - logger.debug(String.format(" read: %s(%d)=%s, cigar=%s", read.getReadName(), offset, (char)read.getReadBases()[offset], read.getCigarString())); + for(String sampleName: sampleNames) { + Iterator iterator = readStates.iteratorForSample(sampleName); + while(iterator.hasNext()) { + SAMRecordState state = iterator.next(); + logger.debug(String.format("printState():")); + SAMRecord read = state.getRead(); + int offset = state.getReadOffset(); + logger.debug(String.format(" read: %s(%d)=%s, cigar=%s", read.getReadName(), offset, (char)read.getReadBases()[offset], read.getCigarString())); + } } } - public void clear() { - logger.debug(String.format(("clear() called"))); - readStates.clear(); - } - private GenomeLoc getLocation() { return readStates.isEmpty() ? null : readStates.getFirst().getLocation(); } @@ -334,9 +343,9 @@ public class LocusIteratorByState extends LocusIterator { * nextAlignmentContext MUST BE null in order for this method to advance to the next entry. */ private void lazyLoadNextAlignmentContext() { - while(nextAlignmentContext == null && (!readStates.isEmpty() || it.hasNext())) { + while(nextAlignmentContext == null && readStates.hasNext()) { // this call will set hasExtendedEvents to true if it picks up a read with indel right before the current position on the ref: - collectPendingReads(readInfo.getMaxReadsAtLocus()); + readStates.collectPendingReads(); int size = 0; int nDeletions = 0; @@ -350,144 +359,113 @@ public class LocusIteratorByState extends LocusIterator { // In this case, the subsequent call to next() will emit the normal pileup at the current base // and shift the position. if (readInfo.generateExtendedEvents() && hasExtendedEvents) { - ArrayList indelPile = new ArrayList(readStates.size()); + Map> fullExtendedEventPileup = + new HashMap>(); - int maxDeletionLength = 0; - - for ( SAMRecordState state : readStates ) { - if ( state.hadIndel() ) { - size++; - if ( state.getEventBases() == null ) { - nDeletions++; - maxDeletionLength = Math.max(maxDeletionLength,state.getEventLength()); - } - else nInsertions++; - indelPile.add ( new ExtendedEventPileupElement(state.getRead(), - state.getReadEventStartOffset(), - state.getEventLength(), - state.getEventBases()) ); - - } else { - if ( state.getCurrentCigarOperator() != CigarOperator.N ) { - // this read has no indel associated with the previous position on the ref; - // we count this read in only if it has actual bases, not N span... - if ( state.getCurrentCigarOperator() != CigarOperator.D || readInfo.includeReadsWithDeletionAtLoci() ) { - - // if cigar operator is D but the read has no extended event reported (that's why we ended - // up in this branch), it means that we are currently inside a deletion that started earlier; - // we count such reads (with a longer deletion spanning over a deletion at the previous base we are - // about to report) only if includeReadsWithDeletionAtLoci is true. - size++; - indelPile.add ( new ExtendedEventPileupElement(state.getRead(), - state.getReadOffset()-1, - -1) // length=-1 --> noevent - ); - } - } - } - if ( state.getRead().getMappingQuality() == 0 ) { - nMQ0Reads++; - } - } - hasExtendedEvents = false; // we are done with extended events prior to current ref base SAMRecordState our1stState = readStates.getFirst(); // get current location on the reference and decrement it by 1: the indels we just stepped over // are associated with the *previous* reference base GenomeLoc loc = GenomeLocParser.incPos(our1stState.getLocation(),-1); + + for(String sampleName: sampleNames) { + Iterator iterator = readStates.iteratorForSample(sampleName); + ArrayList indelPile = new ArrayList(readStates.size()); + + size = 0; + nDeletions = 0; + nInsertions = 0; + nMQ0Reads = 0; + int maxDeletionLength = 0; + + while(iterator.hasNext()) { + SAMRecordState state = iterator.next(); + if ( state.hadIndel() ) { + size++; + if ( state.getEventBases() == null ) { + nDeletions++; + maxDeletionLength = Math.max(maxDeletionLength,state.getEventLength()); + } + else nInsertions++; + indelPile.add ( new ExtendedEventPileupElement(state.getRead(), + state.getReadEventStartOffset(), + state.getEventLength(), + state.getEventBases()) ); + + } else { + if ( state.getCurrentCigarOperator() != CigarOperator.N ) { + // this read has no indel associated with the previous position on the ref; + // we count this read in only if it has actual bases, not N span... + if ( state.getCurrentCigarOperator() != CigarOperator.D || readInfo.includeReadsWithDeletionAtLoci() ) { + + // if cigar operator is D but the read has no extended event reported (that's why we ended + // up in this branch), it means that we are currently inside a deletion that started earlier; + // we count such reads (with a longer deletion spanning over a deletion at the previous base we are + // about to report) only if includeReadsWithDeletionAtLoci is true. + size++; + indelPile.add ( new ExtendedEventPileupElement(state.getRead(), + state.getReadOffset()-1, + -1) // length=-1 --> noevent + ); + } + } + } + if ( state.getRead().getMappingQuality() == 0 ) { + nMQ0Reads++; + } + } + if( indelPile.size() != 0 ) fullExtendedEventPileup.put(sampleName,new ReadBackedExtendedEventPileupImpl(loc,indelPile,size,maxDeletionLength,nInsertions,nDeletions,nMQ0Reads)); + } + hasExtendedEvents = false; // we are done with extended events prior to current ref base // System.out.println("Indel(s) at "+loc); // for ( ExtendedEventPileupElement pe : indelPile ) { if ( pe.isIndel() ) System.out.println(" "+pe.toString()); } - nextAlignmentContext = new AlignmentContext(loc, new ReadBackedExtendedEventPileupImpl(loc, indelPile, size, maxDeletionLength, nInsertions, nDeletions, nMQ0Reads)); + nextAlignmentContext = new AlignmentContext(loc, new ReadBackedExtendedEventPileupImpl(loc, fullExtendedEventPileup)); } else { - ArrayList pile = new ArrayList(readStates.size()); + GenomeLoc location = getLocation(); + Map> fullPileup = new HashMap>(); // todo -- performance problem -- should be lazy, really - for ( SAMRecordState state : readStates ) { - if ( state.getCurrentCigarOperator() != CigarOperator.D && state.getCurrentCigarOperator() != CigarOperator.N ) { - if ( filterRead(state.getRead(), getLocation().getStart(), filters ) ) { - discarded_bases++; - //printStatus("Adaptor bases", discarded_adaptor_bases); - continue; - } else { - observed_bases++; - pile.add(new PileupElement(state.getRead(), state.getReadOffset())); - size++; - } - } else if ( readInfo.includeReadsWithDeletionAtLoci() && state.getCurrentCigarOperator() != CigarOperator.N ) { - size++; - pile.add(new PileupElement(state.getRead(), -1)); - nDeletions++; - } + for(String sampleName: sampleNames) { + Iterator iterator = readStates.iteratorForSample(sampleName); + ArrayList pile = new ArrayList(readStates.size()); - // todo -- this looks like a bug w.r.t. including reads with deletion at loci -- MAD 05/27/10 - if ( state.getRead().getMappingQuality() == 0 ) { - nMQ0Reads++; + size = 0; + nDeletions = 0; + nInsertions = 0; + nMQ0Reads = 0; + + while(iterator.hasNext()) { + SAMRecordState state = iterator.next(); + if ( state.getCurrentCigarOperator() != CigarOperator.D && state.getCurrentCigarOperator() != CigarOperator.N ) { + if ( filterRead(state.getRead(), location.getStart(), filters ) ) { + discarded_bases++; + //printStatus("Adaptor bases", discarded_adaptor_bases); + continue; + } else { + observed_bases++; + pile.add(new PileupElement(state.getRead(), state.getReadOffset())); + size++; + } + } else if ( readInfo.includeReadsWithDeletionAtLoci() && state.getCurrentCigarOperator() != CigarOperator.N ) { + size++; + pile.add(new PileupElement(state.getRead(), -1)); + nDeletions++; + } + + // todo -- this looks like a bug w.r.t. including reads with deletion at loci -- MAD 05/27/10 + if ( state.getRead().getMappingQuality() == 0 ) { + nMQ0Reads++; + } } + if( pile.size() != 0 ) fullPileup.put(sampleName,new ReadBackedPileupImpl(location,pile,size,nDeletions,nMQ0Reads)); } - GenomeLoc loc = getLocation(); updateReadStates(); // critical - must be called after we get the current state offsets and location // if we got reads with non-D/N over the current position, we are done - if ( pile.size() != 0 ) nextAlignmentContext = new AlignmentContext(loc, new ReadBackedPileupImpl(loc, pile, size, nDeletions, nMQ0Reads)); + if ( !fullPileup.isEmpty() ) nextAlignmentContext = new AlignmentContext(location, new ReadBackedPileupImpl(location,fullPileup)); } } } - private static boolean filterRead(SAMRecord rec, long pos, List filters) { - for ( LocusIteratorFilter filter : filters ) { - if ( filter.filterOut(rec, pos) ) { - return true; - } - } - - return false; - } - - private void printStatus(final String title, long n) { - if ( n % 10000 == 0 ) - System.out.printf("%s %d / %d = %.2f%n", title, n, observed_bases, 100.0 * n / (observed_bases + 1)); - } - - - - private void collectPendingReads(int maxReads) { - //if (DEBUG) { - // logger.debug(String.format("entering collectPendingReads..., hasNext=%b", it.hasNext())); - // printState(); - //} - GenomeLoc location = null; - - // the farthest right a read extends - Integer rightMostEnd = -1; - - int curSize = readStates.size(); // simple performance improvement -- avoids unnecessary size() operation - while (it.hasNext()) { - SAMRecord read = it.next(); - if (readIsPastCurrentPosition(read)) { - // We've collected up all the reads that span this region - it.pushback(read); - break; - } else { - if (curSize < maxReads) { - SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); - state.stepForwardOnGenome(); - readStates.add(state); - curSize++; - if (state.hadIndel()) hasExtendedEvents = true; - //if (DEBUG) logger.debug(String.format(" ... added read %s", read.getReadName())); - } else { - if (location == null) - location = GenomeLocParser.createGenomeLoc(read); - rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd; - } - } - - } - - if (location != null) - overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd), - curSize); - } - // fast testing of position private boolean readIsPastCurrentPosition(SAMRecord read) { if ( readStates.isEmpty() ) @@ -506,20 +484,35 @@ public class LocusIteratorByState extends LocusIterator { } } + private static boolean filterRead(SAMRecord rec, long pos, List filters) { + for ( LocusIteratorFilter filter : filters ) { + if ( filter.filterOut(rec, pos) ) { + return true; + } + } + return false; + } + + private void printStatus(final String title, long n) { + if ( n % 10000 == 0 ) + System.out.printf("%s %d / %d = %.2f%n", title, n, observed_bases, 100.0 * n / (observed_bases + 1)); + } private void updateReadStates() { - Iterator it = readStates.iterator(); - while ( it.hasNext() ) { - SAMRecordState state = it.next(); - CigarOperator op = state.stepForwardOnGenome(); - if ( state.hadIndel() && readInfo.generateExtendedEvents() ) hasExtendedEvents = true; - else { - // we discard the read only when we are past its end AND indel at the end of the read (if any) was - // already processed. Keeping the read state that retunred null upon stepForwardOnGenome() is safe - // as the next call to stepForwardOnGenome() will return null again AND will clear hadIndel() flag. - if ( op == null ) { // we've stepped off the end of the object - //if (DEBUG) logger.debug(String.format(" removing read %s at %d", state.getRead().getReadName(), state.getRead().getAlignmentStart())); - it.remove(); + for(String sampleName: sampleNames) { + Iterator it = readStates.iteratorForSample(sampleName); + while ( it.hasNext() ) { + SAMRecordState state = it.next(); + CigarOperator op = state.stepForwardOnGenome(); + if ( state.hadIndel() && readInfo.generateExtendedEvents() ) hasExtendedEvents = true; + else { + // we discard the read only when we are past its end AND indel at the end of the read (if any) was + // already processed. Keeping the read state that retunred null upon stepForwardOnGenome() is safe + // as the next call to stepForwardOnGenome() will return null again AND will clear hadIndel() flag. + if ( op == null ) { // we've stepped off the end of the object + //if (DEBUG) logger.debug(String.format(" removing read %s at %d", state.getRead().getReadName(), state.getRead().getAlignmentStart())); + it.remove(); + } } } } @@ -544,5 +537,561 @@ public class LocusIteratorByState extends LocusIterator { public LocusOverflowTracker getLocusOverflowTracker() { return this.overflowTracker; } + + private class ReadStateManager { + private final PeekableIterator iterator; + private final DownsamplingMethod downsamplingMethod; + + private final ReadSelector chainedReadSelector; + private final SamplePartitioner samplePartitioner; + + private final Map readStatesBySample = new HashMap(); + + private final int targetCoverage; + private final int maxReadsAtLocus; + + private int totalReadStates = 0; + + public ReadStateManager(Iterator source, DownsamplingMethod downsamplingMethod, int maxReadsAtLocus, Collection sampleNames) { + this.iterator = new PeekableIterator(source); + this.downsamplingMethod = downsamplingMethod; + switch(downsamplingMethod.type) { + case EXPERIMENTAL_BY_SAMPLE: + if(downsamplingMethod.toCoverage == null) + throw new StingException("Downsampling coverage (-dcov) must be specified when downsampling by sample"); + this.targetCoverage = downsamplingMethod.toCoverage; + break; + default: + this.targetCoverage = Integer.MAX_VALUE; + } + this.maxReadsAtLocus = maxReadsAtLocus; + + samplePartitioner = new SamplePartitioner(sampleNames); + for(String sampleName: sampleNames) + readStatesBySample.put(sampleName,new PerSampleReadStateManager()); + + ReadSelector primaryReadSelector= samplePartitioner; + + chainedReadSelector = maxReadsAtLocus!=Integer.MAX_VALUE ? new FirstNReadSelector(primaryReadSelector,maxReadsAtLocus) : primaryReadSelector; + } + + public Iterator iteratorForSample(final String sampleName) { + return new Iterator() { + private Iterator wrappedIterator = readStatesBySample.get(sampleName).iterator(); + + public boolean hasNext() { + return wrappedIterator.hasNext(); + } + + public SAMRecordState next() { + return wrappedIterator.next(); + } + + public void remove() { + wrappedIterator.remove(); + totalReadStates--; + } + }; + } + + public boolean isEmpty() { + return totalReadStates == 0; + } + + public int size() { + return totalReadStates; + } + + public SAMRecordState getFirst() { + for(String sampleName: sampleNames) { + PerSampleReadStateManager reads = readStatesBySample.get(sampleName); + if(!reads.isEmpty()) + return reads.peek(); + } + return null; + } + + public boolean hasNext() { + return totalReadStates > 0 || iterator.hasNext(); + } + + public void collectPendingReads() { + if(!iterator.hasNext()) + return; + + if(readStates.size() == 0) { + int firstContigIndex = iterator.peek().getReferenceIndex(); + int firstAlignmentStart = iterator.peek().getAlignmentStart(); + while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) { + chainedReadSelector.submitRead(iterator.next()); + } + } + else { + // Fast fail in the case that the read is past the current position. + if(readIsPastCurrentPosition(iterator.peek())) + return; + + while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) { + chainedReadSelector.submitRead(iterator.next()); + } + } + chainedReadSelector.complete(); + + for(String sampleName: sampleNames) { + ReadSelector aggregator = samplePartitioner.getSelectedReads(sampleName); + + Collection newReads = new ArrayList(aggregator.getSelectedReads()); + + PerSampleReadStateManager statesBySample = readStatesBySample.get(sampleName); + int numReads = statesBySample.size(); + + if(numReads+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE) { + long readLimit = aggregator.getNumReadsSeen(); + boolean mrlViolation = false; + if(readLimit > maxReadsAtLocus-totalReadStates) { + readLimit = maxReadsAtLocus-totalReadStates; + mrlViolation = true; + } + addReadsToSample(statesBySample,newReads,readLimit,mrlViolation); + } + else { + int[] counts = statesBySample.getCountsPerAlignmentStart(); + int[] updatedCounts = new int[counts.length]; + System.arraycopy(counts,0,updatedCounts,0,counts.length); + + boolean readPruned = true; + while(numReads+newReads.size()>targetCoverage && readPruned) { + readPruned = false; + for(int alignmentStart=updatedCounts.length-1;numReads+newReads.size()>targetCoverage&&alignmentStart>=0;alignmentStart--) { + if(updatedCounts[alignmentStart] > 1) { + updatedCounts[alignmentStart]--; + numReads--; + readPruned = true; + } + } + } + + if(numReads == targetCoverage) { + updatedCounts[0]--; + numReads--; + } + + BitSet toPurge = new BitSet(readStates.size()); + int readOffset = 0; + + for(int i = 0; i < updatedCounts.length; i++) { + int n = counts[i]; + int k = updatedCounts[i]; + + for(Integer purgedElement: MathUtils.sampleIndicesWithoutReplacement(n,n-k)) + toPurge.set(readOffset+purgedElement); + + readOffset += counts[i]; + } + statesBySample.purge(toPurge); + addReadsToSample(statesBySample,newReads,targetCoverage-numReads,false); + } + } + chainedReadSelector.reset(); + } + + /** + * Add reads with the given sample name to the given hanger entry. + * @param readStates The list of read states to add this collection of reads. + * @param reads Reads to add. Selected reads will be pulled from this source. + * @param maxReads Maximum number of reads to add. + */ + private void addReadsToSample(final PerSampleReadStateManager readStates, final Collection reads, final long maxReads, boolean atMaxReadsAtLocusLimit) { + if(reads.isEmpty()) + return; + + GenomeLoc location = null; + + // the farthest right a read extends + Integer rightMostEnd = -1; + + Collection newReadStates = new LinkedList(); + int readCount = 0; + for(SAMRecord read: reads) { + if(readCount < maxReads) { + SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); + state.stepForwardOnGenome(); + newReadStates.add(state); + // TODO: What if we downsample the extended events away? + if (state.hadIndel()) hasExtendedEvents = true; + readCount++; + } + if(atMaxReadsAtLocusLimit) { + if (location == null) + location = GenomeLocParser.createGenomeLoc(read); + rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd; + } + } + readStates.addStatesAtNextAlignmentStart(newReadStates); + + if (location != null) + overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd), + totalReadStates); + } + + private class PerSampleReadStateManager implements Iterable { + private final Queue readStates = new LinkedList(); + private final Deque readStateCounter = new LinkedList(); + + public void addStatesAtNextAlignmentStart(Collection states) { + readStates.addAll(states); + readStateCounter.add(new Counter(states.size())); + totalReadStates += states.size(); + } + + public boolean isEmpty() { + return readStates.isEmpty(); + } + + public SAMRecordState peek() { + return readStates.peek(); + } + + public int size() { + return readStates.size(); + } + + public int[] getCountsPerAlignmentStart() { + int[] counts = new int[readStateCounter.size()]; + int index = 0; + for(Counter counter: readStateCounter) + counts[index++] = counter.getCount(); + return counts; + } + + public Iterator iterator() { + return new Iterator() { + private Iterator wrappedIterator = readStates.iterator(); + + public boolean hasNext() { + return wrappedIterator.hasNext(); + } + + public SAMRecordState next() { + return wrappedIterator.next(); + } + + public void remove() { + wrappedIterator.remove(); + Counter counter = readStateCounter.peek(); + counter.decrement(); + if(counter.getCount() == 0) + readStateCounter.remove(); + } + }; + } + + /** + * Purge the given elements from the bitset. If an element in the bitset is true, purge + * the corresponding read state. + * @param elements bits from the set to purge. + */ + public void purge(final BitSet elements) { + if(elements.isEmpty() || readStates.isEmpty()) return; + + Iterator readStateIterator = readStates.iterator(); + + Iterator counterIterator = readStateCounter.iterator(); + Counter currentCounter = counterIterator.next(); + + int readIndex = 0; + long alignmentStartCounter = currentCounter.getCount(); + + int toPurge = elements.nextSetBit(0); + int removedCount = 0; + + while(readStateIterator.hasNext() && toPurge >= 0) { + readStateIterator.next(); + + if(readIndex == toPurge) { + readStateIterator.remove(); + currentCounter.decrement(); + if(currentCounter.getCount() == 0) + counterIterator.remove(); + removedCount++; + toPurge = elements.nextSetBit(toPurge+1); + } + + readIndex++; + alignmentStartCounter--; + if(alignmentStartCounter == 0 && counterIterator.hasNext()) { + currentCounter = counterIterator.next(); + alignmentStartCounter = currentCounter.getCount(); + } + } + + totalReadStates -= removedCount; + } + } + } + + /** + * Note: assuming that, whenever we downsample, we downsample to an integer capacity. + */ + private class Counter { + private int count; + + public Counter(int count) { + this.count = count; + } + + public int getCount() { + return count; + } + + public void decrement() { + count--; + } + } } +/** + * Selects reads passed to it based on a criteria decided through inheritance. + * TODO: This is a temporary abstraction until we can get rid of this downsampling implementation and the mrl option. Get rid of this. + */ +interface ReadSelector { + /** + * All previous selectors in the chain have allowed this read. Submit it to this selector for consideration. + * @param read the read to evaluate. + */ + public void submitRead(SAMRecord read); + + /** + * A previous selector has deemed this read unfit. Notify this selector so that this selector's counts are valid. + * @param read the read previously rejected. + */ + public void notifyReadRejected(SAMRecord read); + + /** + * Signal the selector that read additions are complete. + */ + public void complete(); + + /** + * Retrieve the number of reads seen by this selector so far. + * @return number of reads seen. + */ + public long getNumReadsSeen(); + + /** + * Return the number of reads accepted by this selector so far. + * @return number of reads selected. + */ + public long getNumReadsSelected(); + + /** + * Get the reads selected by this selector. + * @return collection of reads selected by this selector. + */ + public Collection getSelectedReads(); + + /** + * Reset this collection to its pre-gathered state. + */ + public void reset(); +} + +/** + * Choose the first N reads from the submitted set. + */ +class FirstNReadSelector implements ReadSelector { + private final ReadSelector chainedSelector; + + private final Collection selectedReads = new LinkedList(); + private final long readLimit; + private long readsSeen = 0; + + public FirstNReadSelector(ReadSelector chainedSelector, long readLimit) { + this.chainedSelector = chainedSelector; + this.readLimit = readLimit; + } + + public void submitRead(SAMRecord read) { + if(readsSeen < readLimit) { + selectedReads.add(read); + if(chainedSelector != null) + chainedSelector.submitRead(read); + } + else + if(chainedSelector != null) + chainedSelector.notifyReadRejected(read); + readsSeen++; + } + + public void notifyReadRejected(SAMRecord read) { + if(chainedSelector != null) + chainedSelector.notifyReadRejected(read); + readsSeen++; + } + + public void complete() { + if(chainedSelector != null) + chainedSelector.complete(); + } + + public long getNumReadsSeen() { + return readsSeen; + } + + public long getNumReadsSelected() { + return selectedReads.size(); + } + + public Collection getSelectedReads() { + return selectedReads; + } + + public void reset() { + selectedReads.clear(); + readsSeen = 0; + if(chainedSelector != null) + chainedSelector.reset(); + } +} + +/** + * Select N reads randomly from the input stream. + */ +class NRandomReadSelector implements ReadSelector { + private final ReservoirDownsampler reservoir; + private final ReadSelector chainedSelector; + private long readsSeen = 0; + + public NRandomReadSelector(ReadSelector chainedSelector, long readLimit) { + this.reservoir = new ReservoirDownsampler((int)readLimit); + this.chainedSelector = chainedSelector; + } + + public void submitRead(SAMRecord read) { + SAMRecord displaced = reservoir.add(read); + if(displaced != null && chainedSelector != null) + chainedSelector.notifyReadRejected(read); + readsSeen++; + } + + public void notifyReadRejected(SAMRecord read) { + readsSeen++; + } + + public void complete() { + for(SAMRecord read: reservoir.getDownsampledContents()) + chainedSelector.submitRead(read); + if(chainedSelector != null) + chainedSelector.complete(); + } + + + public long getNumReadsSeen() { + return readsSeen; + } + + public long getNumReadsSelected() { + return reservoir.size(); + } + + public Collection getSelectedReads() { + return reservoir.getDownsampledContents(); + } + + public void reset() { + reservoir.clear(); + if(chainedSelector != null) + chainedSelector.reset(); + } +} + +class SamplePartitioner implements ReadSelector { + private final Map readsBySample; + private long readsSeen = 0; + + public SamplePartitioner(Collection sampleNames) { + readsBySample = new HashMap(); + for(String sampleName: sampleNames) + readsBySample.put(sampleName,new SampleStorage()); + } + + public void submitRead(SAMRecord read) { + String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null; + if(readsBySample.containsKey(sampleName)) + readsBySample.get(sampleName).submitRead(read); + readsSeen++; + } + + public void notifyReadRejected(SAMRecord read) { + String sampleName = read.getReadGroup()!=null ? read.getReadGroup().getSample() : null; + if(readsBySample.containsKey(sampleName)) + readsBySample.get(sampleName).notifyReadRejected(read); + readsSeen++; + } + + public void complete() { + // NO-OP. + } + + public long getNumReadsSeen() { + return readsSeen; + } + + public long getNumReadsSelected() { + return readsSeen; + } + + public Collection getSelectedReads() { + throw new UnsupportedOperationException("Cannot directly get selected reads from a read partitioner."); + } + + public ReadSelector getSelectedReads(String sampleName) { + if(!readsBySample.containsKey(sampleName)) + throw new NoSuchElementException("Sample name not found"); + return readsBySample.get(sampleName); + } + + public void reset() { + for(SampleStorage storage: readsBySample.values()) + storage.reset(); + readsSeen = 0; + } + + private class SampleStorage implements ReadSelector { + private Collection reads = new LinkedList(); + private long readsSeen = 0; + + public void submitRead(SAMRecord read) { + reads.add(read); + readsSeen++; + } + + public void notifyReadRejected(SAMRecord read) { + readsSeen++; + } + + public void complete() { + // NO-OP. + } + + public long getNumReadsSeen() { + return readsSeen; + } + + public long getNumReadsSelected() { + return readsSeen; + } + + public Collection getSelectedReads() { + return reads; + } + + public void reset() { + reads.clear(); + readsSeen = 0; + } + } +} + + + diff --git a/java/test/org/broadinstitute/sting/gatk/walkers/PileupWalkerIntegrationTest.java b/java/test/org/broadinstitute/sting/gatk/walkers/PileupWalkerIntegrationTest.java index a2b15ef29..517d6563e 100644 --- a/java/test/org/broadinstitute/sting/gatk/walkers/PileupWalkerIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/gatk/walkers/PileupWalkerIntegrationTest.java @@ -19,7 +19,7 @@ public class PileupWalkerIntegrationTest extends WalkerTest { String gatk_args = "-T Pileup -I " + validationDataLocation + "FHS_Pileup_Test.bam " + "-R " + seqLocation + "references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta" + " -L chr15:46,347,148 -o %s"; - String expected_md5 = "d23032d10111755ccb1c1b01e6e097a7"; + String expected_md5 = "052187dd2bf2516a027578c8775856a8"; WalkerTestSpec spec = new WalkerTestSpec(gatk_args, 1, Arrays.asList(expected_md5)); executeTest("Testing the standard (no-indel) pileup on three merged FHS pools with 27 deletions in 969 bases", spec); }