diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 70201a6cc..417a0982f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -100,22 +100,29 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return The best-fit microscheduler. */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { - if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) { - if(walker.isReduceByInterval()) + if (threadAllocation.getNumCPUThreads() > 1) { + if (walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); + logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - if ( walker instanceof ReadWalker ) + if ( walker instanceof ReadWalker ) { + if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker); return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); - else + } else { + // TODO -- update test for when nano scheduling only is an option + if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker); return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + } } else { - if(threadAllocation.getNumCPUThreads() > 1) - throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); } } + private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) { + throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); + } + /** * Create a microscheduler given the reads and reference. * diff --git a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java index 2763bca7c..3ffe95e8b 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java @@ -1,7 +1,6 @@ package org.broadinstitute.sting.gatk.iterators; import net.sf.samtools.SAMRecord; -import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -48,7 +47,9 @@ public class VerifyingSamIterator implements StingSAMIterator { if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START) throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format())); - return last.getAlignmentStart() > cur.getAlignmentStart(); + return (last.getReferenceIndex() > cur.getReferenceIndex()) || + (last.getReferenceIndex().equals(cur.getReferenceIndex()) && + last.getAlignmentStart() > cur.getAlignmentStart()); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java index 6f28e8726..14d14aca5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java @@ -45,7 +45,7 @@ import java.text.NumberFormat; */ @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @Requires({DataSource.READS}) -public class FlagStat extends ReadWalker implements TreeReducible { +public class FlagStat extends ReadWalker implements ThreadSafeMapReduce { @Output PrintStream out; @@ -193,11 +193,6 @@ public class FlagStat extends ReadWalker implements TreeReducible { +public class PrintReads extends ReadWalker implements ThreadSafeMapReduce { @Output(doc="Write output to this BAM filename instead of STDOUT", required = true) SAMFileWriter out; @@ -245,9 +245,4 @@ public class PrintReads extends ReadWalker impleme output.addAlignment(read); return output; } - - @Override - public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) { - return lhs; // nothing to do - } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java new file mode 100755 index 000000000..1ce469f8c --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2010. The Broad Institute + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +package org.broadinstitute.sting.gatk.walkers; + +/** + * Root parallelism interface. Walkers that implement this + * declare that their map function is thread-safe and so multiple + * map calls can be run in parallel in the same JVM instance. + */ +public interface ThreadSafeMapReduce { +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java index c950e07e4..8621c0e9d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java @@ -13,7 +13,7 @@ package org.broadinstitute.sting.gatk.walkers; * shards of the data can reduce with each other, and the composite result * can be reduced with other composite results. */ -public interface TreeReducible { +public interface TreeReducible extends ThreadSafeMapReduce { /** * A composite, 'reduce of reduces' function. * @param lhs 'left-most' portion of data in the composite reduce. diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java index 72bda03e9..856ea77f5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java @@ -6,7 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Requires; -import org.broadinstitute.sting.gatk.walkers.TreeReducible; +import org.broadinstitute.sting.gatk.walkers.ThreadSafeMapReduce; import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; @@ -41,12 +41,11 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord; */ @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @Requires({DataSource.READS, DataSource.REFERENCE}) -public class CountReads extends ReadWalker implements TreeReducible { +public class CountReads extends ReadWalker implements ThreadSafeMapReduce { public Integer map(ReferenceContext ref, GATKSAMRecord read, RefMetaDataTracker tracker) { return 1; } @Override public Integer reduceInit() { return 0; } @Override public Integer reduce(Integer value, Integer sum) { return value + sum; } - @Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; } }