diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ShardDataProvider.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ShardDataProvider.java index 803bd885b..4279381d7 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ShardDataProvider.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/providers/ShardDataProvider.java @@ -94,6 +94,13 @@ public abstract class ShardDataProvider { return referenceOrderedData; } + /** + * @return true if reference ordered data will be provided by this shard + */ + public boolean hasReferenceOrderedData() { + return ! getReferenceOrderedData().isEmpty(); + } + /** * Create a data provider for the shard given the reads and reference. * @param shard The chunk of data over which traversals happen. diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 70cdaab22..9198d210d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -88,7 +88,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar final Collection rods, final int nThreadsToUse, final boolean monitorThreadPerformance ) { - super(engine, walker, reads, reference, rods); + super(engine, walker, reads, reference, rods, nThreadsToUse); if ( monitorThreadPerformance ) { final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 7a6902fff..5bcb16c94 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -39,8 +39,9 @@ public class LinearMicroScheduler extends MicroScheduler { final SAMDataSource reads, final IndexedFastaSequenceFile reference, final Collection rods, + final int numThreads, // may be > 1 if are nanoScheduling final boolean monitorThreadPerformance ) { - super(engine, walker, reads, reference, rods); + super(engine, walker, reads, reference, rods, numThreads); if ( monitorThreadPerformance ) setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor()); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 4becc5a78..9b4fe53ed 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -103,14 +103,16 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) { if(walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); - if(walker instanceof ReadWalker) - throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + + if ( walker instanceof ReadWalker ) + return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + else + return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); } else { if(threadAllocation.getNumCPUThreads() > 1) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); - return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.monitorThreadEfficiency()); + return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); } } @@ -121,15 +123,23 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @param reads The reads. * @param reference The reference. * @param rods the rods to include in the traversal + * @param numThreads the number of threads we are using in the underlying traversal */ - protected MicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods) { + protected MicroScheduler(final GenomeAnalysisEngine engine, + final Walker walker, + final SAMDataSource reads, + final IndexedFastaSequenceFile reference, + final Collection rods, + final int numThreads) { this.engine = engine; this.reads = reads; this.reference = reference; this.rods = rods; if (walker instanceof ReadWalker) { - traversalEngine = new TraverseReads(); + traversalEngine = numThreads > 1 ? new TraverseReadsNano(numThreads) : new TraverseReads(); + } else if ( numThreads > 1 ) { + throw new IllegalArgumentException("BUG: numThreads > 1 but this is only allowed for ReadWalkers"); } else if (walker instanceof LocusWalker) { traversalEngine = new TraverseLoci(); } else if (walker instanceof DuplicateWalker) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java new file mode 100755 index 000000000..dc774230b --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2009 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +package org.broadinstitute.sting.gatk.traversals; + +import net.sf.samtools.SAMRecord; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.contexts.ReferenceContext; +import org.broadinstitute.sting.gatk.datasources.providers.*; +import org.broadinstitute.sting.gatk.datasources.reads.ReadShard; +import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker; +import org.broadinstitute.sting.gatk.walkers.ReadWalker; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.nanoScheduler.MapFunction; +import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; +import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction; +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; + +/** + * @author aaron + * @version 1.0 + * @date Apr 24, 2009 + *

+ * Class TraverseReads + *

+ * This class handles traversing by reads in the new shardable style + */ +public class TraverseReadsNano extends TraversalEngine,ReadShardDataProvider> { + /** our log, which we want to capture anything from this class */ + protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class); + private static final boolean DEBUG = false; + final int bufferSize = ReadShard.MAX_READS; + final int mapGroupSize = bufferSize / 10 + 1; + final int nThreads; + + public TraverseReadsNano(int nThreads) { + this.nThreads = nThreads; + } + + @Override + protected String getTraversalType() { + return "reads"; + } + + /** + * Traverse by reads, given the data and the walker + * + * @param walker the walker to traverse with + * @param dataProvider the provider of the reads data + * @param sum the value of type T, specified by the walker, to feed to the walkers reduce function + * @return the reduce variable of the read walker + */ + public T traverse(ReadWalker walker, + ReadShardDataProvider dataProvider, + T sum) { + logger.debug(String.format("TraverseReadsNano.traverse Covered dataset is %s", dataProvider)); + + if( !dataProvider.hasReads() ) + throw new IllegalArgumentException("Unable to traverse reads; no read data is available."); + + if ( dataProvider.hasReferenceOrderedData() ) + throw new ReviewedStingException("Parallel read walkers currently don't support access to reference ordered data"); + + final ReadView reads = new ReadView(dataProvider); + final ReadReferenceView reference = new NotImplementedReadReferenceView(dataProvider); + final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + + final NanoScheduler nanoScheduler = new NanoScheduler(bufferSize, mapGroupSize, nThreads); + nanoScheduler.setDebug(DEBUG); + final TraverseReadsMap myMap = new TraverseReadsMap(reads, reference, rodView, walker); + final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker); + + T result = nanoScheduler.execute(reads.iterator().iterator(), myMap, sum, myReduce); + nanoScheduler.shutdown(); + //printProgress(dataProvider.getShard(), ???); + + return result; + } + + private static class NotImplementedReadReferenceView extends ReadReferenceView { + private NotImplementedReadReferenceView(ShardDataProvider provider) { + super(provider); + } + + @Override + protected byte[] getReferenceBases(SAMRecord read) { + throw new ReviewedStingException("Parallel read walkers don't support accessing reference yet"); + } + + @Override + protected byte[] getReferenceBases(GenomeLoc genomeLoc) { + throw new ReviewedStingException("Parallel read walkers don't support accessing reference yet"); + } + } + + private class TraverseReadsReduce implements ReduceFunction { + final ReadWalker walker; + + private TraverseReadsReduce(ReadWalker walker) { + this.walker = walker; + } + + @Override + public T apply(M one, T sum) { + return walker.reduce(one, sum); + } + } + + private class TraverseReadsMap implements MapFunction { + final ReadView reads; + final ReadReferenceView reference; + final ReadBasedReferenceOrderedView rodView; + final ReadWalker walker; + + private TraverseReadsMap(ReadView reads, ReadReferenceView reference, ReadBasedReferenceOrderedView rodView, ReadWalker walker) { + this.reads = reads; + this.reference = reference; + this.rodView = rodView; + this.walker = walker; + } + + @Override + public M apply(final SAMRecord read) { + if ( ! walker.isDone() ) { + // ReferenceContext -- the reference bases covered by the read + final ReferenceContext refContext = ! read.getReadUnmappedFlag() && reference != null + ? reference.getReferenceContext(read) + : null; + + // update the number of reads we've seen + //dataProvider.getShard().getReadMetrics().incrementNumIterations(); + + // if the read is mapped, create a metadata tracker + final ReadMetaDataTracker tracker = read.getReferenceIndex() >= 0 ? rodView.getReferenceOrderedDataForRead(read) : null; + + final boolean keepMeP = walker.filter(refContext, (GATKSAMRecord) read); + if (keepMeP) { + return walker.map(refContext, (GATKSAMRecord) read, tracker); + } + } + + return null; // TODO -- what should we return in the case where the walker is done or the read is filtered? + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/PrintReads.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/PrintReads.java index 8257794d7..2b05e4dc5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/PrintReads.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/PrintReads.java @@ -93,7 +93,7 @@ import java.util.TreeSet; @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @BAQMode(QualityMode = BAQ.QualityMode.ADD_TAG, ApplicationTime = BAQ.ApplicationTime.ON_OUTPUT) @Requires({DataSource.READS, DataSource.REFERENCE}) -public class PrintReads extends ReadWalker { +public class PrintReads extends ReadWalker implements TreeReducible { @Output(doc="Write output to this BAM filename instead of STDOUT", required = true) SAMFileWriter out; @@ -246,4 +246,8 @@ public class PrintReads extends ReadWalker { return output; } + @Override + public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) { + return lhs; // nothing to do + } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java index 5a9e5e7d2..d33db2925 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java @@ -6,6 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Requires; +import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; @@ -40,15 +41,12 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord; */ @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @Requires({DataSource.READS, DataSource.REFERENCE}) -public class CountReads extends ReadWalker { +public class CountReads extends ReadWalker implements TreeReducible { public Integer map(ReferenceContext ref, GATKSAMRecord read, ReadMetaDataTracker tracker) { - return 1; } - public Integer reduceInit() { return 0; } - - public Integer reduce(Integer value, Integer sum) { - return value + sum; - } + @Override public Integer reduceInit() { return 0; } + @Override public Integer reduce(Integer value, Integer sum) { return value + sum; } + @Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 63ae1958c..c587e44c6 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -50,6 +50,7 @@ public class NanoScheduler { final int nThreads; final ExecutorService executor; boolean shutdown = false; + boolean debug = false; /** * Create a new nanoschedule with the desire characteristics requested by the argument @@ -129,6 +130,20 @@ public class NanoScheduler { return shutdown; } + public boolean isDebug() { + return debug; + } + + private void debugPrint(final String format, Object ... args) { + if ( isDebug() ) + logger.info(String.format(format, args)); + } + + + public void setDebug(boolean debug) { + this.debug = debug; + } + /** * Execute a map/reduce job with this nanoScheduler * @@ -190,6 +205,7 @@ public class NanoScheduler { final MapFunction map, final ReduceType initialValue, final ReduceFunction reduce) { + debugPrint("Executing nanoScheduler with initial reduce value " + initialValue); ReduceType sum = initialValue; while ( inputReader.hasNext() ) { try { @@ -278,6 +294,7 @@ public class NanoScheduler { final List outputs = new LinkedList(); for ( final InputType input : inputs ) outputs.add(map.apply(input)); + debugPrint(" Processed %d elements with map", outputs.size()); return outputs; } } diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java b/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java index ef836a06d..9159f5657 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java +++ b/public/java/src/org/broadinstitute/sting/utils/threading/ThreadEfficiencyMonitor.java @@ -140,6 +140,7 @@ public class ThreadEfficiencyMonitor { logger.log(priority, String.format("CPU efficiency : %6.2f%% of time spent %s", getStatePercent(State.USER_CPU), State.USER_CPU.getUserFriendlyName())); logger.log(priority, String.format("Walker inefficiency : %6.2f%% of time spent %s", getStatePercent(State.BLOCKING), State.BLOCKING.getUserFriendlyName())); logger.log(priority, String.format("I/O inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING_FOR_IO), State.WAITING_FOR_IO.getUserFriendlyName())); + logger.log(priority, String.format("Thread inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING), State.WAITING.getUserFriendlyName())); } /**