package org.broadinstitute.sting.gatk.executive;

import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.LocusIterator;
import org.broadinstitute.sting.gatk.iterators.LocusIteratorByState;
import org.broadinstitute.sting.gatk.iterators.LocusOverflowTracker;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.traversals.TraversalStatistics;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;

import java.util.*;

import net.sf.samtools.SAMRecord;
import net.sf.picard.util.PeekableIterator;
import net.sf.picard.filter.FilteringIterator;
import net.sf.picard.filter.SamRecordFilter;

/**
 * Buffer shards of data which may or may not contain multiple loci into
 * iterators of all data which cover an interval.  Its existence is an homage
 * to Mark's stillborn WindowMaker, RIP 2009.
 *
 * An empty interval list puts the window maker into "monolithic sharding"
 * mode: exactly one window is produced, covering all incoming data.
 *
 * NOTE(review): raw types ({@code Iterable}, {@code Iterator},
 * {@code PeekableIterator}, {@code List}) are preserved as-is; genericizing
 * them could break callers compiled against the raw signatures.
 *
 * @author mhanna
 * @version 0.1
 */
public class WindowMaker implements Iterable, Iterator {
    /**
     * Source information for iteration.
     */
    private final Reads sourceInfo;

    /**
     * Hold the read iterator so that it can be closed later.
     */
    private final StingSAMIterator readIterator;

    /**
     * The locus overflow tracker, taken from the wrapped locus iterator.
     */
    private final LocusOverflowTracker locusOverflowTracker;

    /**
     * The data source for reads, converted to per-locus AlignmentContexts.
     * Will probably come directly from the BAM file.
     */
    private final PeekableIterator sourceIterator;

    /**
     * Stores the sequence of intervals that the window maker should be
     * tracking.  Null when doing monolithic sharding (no intervals supplied).
     */
    private final PeekableIterator intervalIterator;

    /**
     * In the case of monolithic sharding, records whether the single shard
     * has already been handed out.
     */
    private boolean shardGenerated = false;

    /**
     * Create a new window maker with the given iterator as a data source,
     * covering the given intervals.
     *
     * @param iterator  The data source for this window.
     * @param intervals The set of intervals over which to traverse; an empty
     *                  list triggers monolithic sharding (a single window).
     */
    public WindowMaker(StingSAMIterator iterator, List intervals) {
        this.sourceInfo = iterator.getSourceInfo();
        this.readIterator = iterator;
        // Filter out unusable reads (unmapped, non-primary, unaligned,
        // duplicates) before piling them up into per-locus contexts.
        LocusIterator locusIterator =
            new LocusIteratorByState(new FilteringIterator(iterator, new LocusStreamFilterFunc()), sourceInfo);
        this.locusOverflowTracker = locusIterator.getLocusOverflowTracker();
        this.sourceIterator = new PeekableIterator(locusIterator);
        // No intervals => monolithic sharding, signalled by a null iterator.
        this.intervalIterator = !intervals.isEmpty() ? new PeekableIterator(intervals.iterator()) : null;
    }

    public Iterator iterator() {
        return this;
    }

    /**
     * @return true if another window remains: either an interval is pending,
     *         or (monolithic mode) the single shard has not yet been produced.
     */
    public boolean hasNext() {
        return (intervalIterator != null && intervalIterator.hasNext()) || !shardGenerated;
    }

    /**
     * Produce the iterator over the next window.
     *
     * @return an iterator over all AlignmentContexts in the next interval
     *         (or over everything, in monolithic mode).
     * @throws NoSuchElementException if no windows remain.
     */
    public WindowMakerIterator next() {
        // Honor the Iterator contract: fail loudly when exhausted rather than
        // silently handing out extra monolithic shards.
        if (!hasNext())
            throw new NoSuchElementException("WindowMaker is out of windows.");
        shardGenerated = true;
        return new WindowMakerIterator(intervalIterator != null ? (GenomeLoc) intervalIterator.next() : null);
    }

    public void remove() {
        throw new UnsupportedOperationException("Cannot remove from a window maker.");
    }

    /**
     * Close the underlying read iterator.
     */
    public void close() {
        this.readIterator.close();
    }

    /**
     * Iterates over the AlignmentContexts that fall within a single interval.
     * Shares the outer class's sourceIterator, so windows must be consumed in
     * order, one at a time.
     */
    public class WindowMakerIterator extends LocusIterator {
        /**
         * The locus for which this iterator is currently returning reads.
         * Null in monolithic-sharding mode (all data belongs to this window).
         */
        private final GenomeLoc locus;

        public WindowMakerIterator(GenomeLoc locus) {
            this.locus = locus;
            seedNextLocus();
        }

        public Reads getSourceInfo() {
            return sourceInfo;
        }

        public GenomeLoc getLocus() {
            return locus;
        }

        public WindowMakerIterator iterator() {
            return this;
        }

        public boolean hasNext() {
            // locus == null when doing monolithic sharding.
            // TODO: Move the monolithic sharding iterator so that we don't have to special case here.
            return sourceIterator.hasNext() &&
                   (locus == null || ((AlignmentContext) sourceIterator.peek()).getLocation().overlapsP(locus));
        }

        public AlignmentContext next() {
            if (!hasNext())
                throw new NoSuchElementException("WindowMakerIterator is out of elements for this interval.");
            return (AlignmentContext) sourceIterator.next();
        }

        public LocusOverflowTracker getLocusOverflowTracker() {
            return locusOverflowTracker;
        }

        /**
         * Advance the shared source iterator past any contexts that precede
         * this iterator's interval, so iteration starts at the first overlap.
         */
        public void seedNextLocus() {
            // locus == null when doing monolithic sharding.
            // TODO: Move the monolithic sharding iterator so that we don't have to special case here.
            if (locus == null)
                return;
            while (sourceIterator.hasNext() &&
                   ((AlignmentContext) sourceIterator.peek()).getLocation().isBefore(locus))
                sourceIterator.next();
        }
    }

    /**
     * Class to filter out un-handle-able reads from the stream.  We currently
     * are skipping unmapped reads, non-primary reads, unaligned reads, and
     * duplicate reads.  Tallies each rejection category into
     * TraversalStatistics as a side effect.
     */
    private static class LocusStreamFilterFunc implements SamRecordFilter {
        /**
         * Decide whether to drop the given read, updating traversal counters.
         *
         * @param rec the read under consideration.
         * @return true if the read should be filtered out of the stream.
         */
        public boolean filterOut(SAMRecord rec) {
            boolean result = false;
            String why = "";
            if (rec.getReadUnmappedFlag()) {
                TraversalStatistics.nUnmappedReads++;
                result = true;
                why = "Unmapped";
            } else if (rec.getNotPrimaryAlignmentFlag()) {
                TraversalStatistics.nNotPrimary++;
                result = true;
                why = "Not Primary";
            } else if (rec.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START) {
                TraversalStatistics.nBadAlignments++;
                result = true;
                why = "No alignment start";
            } else if (rec.getDuplicateReadFlag()) {
                TraversalStatistics.nDuplicates++;
                result = true;
                why = "Duplicate reads";
            }

            if (result) {
                TraversalStatistics.nSkippedReads++;
                //System.out.printf("  [filter] %s => %b %s", rec.getReadName(), result, why);
            } else {
                TraversalStatistics.nReads++;
            }
            return result;
        }
    }
}