diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index 4914e71f3..cd0198a29 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -33,14 +33,13 @@ import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider import org.broadinstitute.sting.gatk.datasources.providers.ReadView; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; -import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.nanoScheduler.NSMapFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NSProgressFunction; import org.broadinstitute.sting.utils.nanoScheduler.NSReduceFunction; import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; -import java.util.LinkedList; -import java.util.List; +import java.util.Iterator; /** * A nano-scheduling version of TraverseReads. @@ -60,6 +59,13 @@ public class TraverseReadsNano extends TraversalEngine, public TraverseReadsNano(int nThreads) { nanoScheduler = new NanoScheduler(nThreads); + nanoScheduler.setProgressFunction(new NSProgressFunction() { + @Override + public void progress(MapData lastProcessedMap) { + if ( lastProcessedMap.refContext != null ) + printProgress(lastProcessedMap.refContext.getLocus()); + } + }); } @Override @@ -88,14 +94,10 @@ public class TraverseReadsNano extends TraversalEngine, final TraverseReadsMap myMap = new TraverseReadsMap(walker); final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker); - final List aggregatedInputs = aggregateMapData(dataProvider); - final T result = nanoScheduler.execute(aggregatedInputs.iterator(), myMap, sum, myReduce); - - final GATKSAMRecord lastRead = aggregatedInputs.get(aggregatedInputs.size() - 1).read; - final GenomeLoc locus = engine.getGenomeLocParser().createGenomeLoc(lastRead); + final Iterator aggregatedInputs = aggregateMapData(dataProvider); + final T result = nanoScheduler.execute(aggregatedInputs, myMap, sum, myReduce); updateCumulativeMetrics(dataProvider.getShard()); - printProgress(locus); return result; } @@ -108,29 +110,37 @@ public class TraverseReadsNano extends TraversalEngine, * @return a linked list of MapData objects holding the read, ref, and ROD info for every map/reduce * should execute */ - private List aggregateMapData(final ReadShardDataProvider dataProvider) { - final ReadView reads = new ReadView(dataProvider); - final ReadReferenceView reference = new ReadReferenceView(dataProvider); - final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + private Iterator aggregateMapData(final ReadShardDataProvider dataProvider) { + return new Iterator() { + final ReadView reads = new ReadView(dataProvider); + final ReadReferenceView reference = new ReadReferenceView(dataProvider); + final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + final Iterator readIterator = reads.iterator(); - final List mapData = new LinkedList(); - for ( final SAMRecord read : reads ) { - final ReferenceContext refContext = ! read.getReadUnmappedFlag() - ? reference.getReferenceContext(read) - : null; + @Override public boolean hasNext() { return readIterator.hasNext(); } - // if the read is mapped, create a metadata tracker - final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0 - ? rodView.getReferenceOrderedDataForRead(read) - : null; + @Override + public MapData next() { + final SAMRecord read = readIterator.next(); + final ReferenceContext refContext = ! read.getReadUnmappedFlag() + ? reference.getReferenceContext(read) + : null; - // update the number of reads we've seen - dataProvider.getShard().getReadMetrics().incrementNumIterations(); + // if the read is mapped, create a metadata tracker + final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0 + ? rodView.getReferenceOrderedDataForRead(read) + : null; - mapData.add(new MapData((GATKSAMRecord)read, refContext, tracker)); - } + // update the number of reads we've seen + dataProvider.getShard().getReadMetrics().incrementNumIterations(); - return mapData; + return new MapData((GATKSAMRecord)read, refContext, tracker); + } + + @Override public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + }; } @Override