diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 1b4333ce2..fa28b02cd 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -400,28 +400,22 @@ public class GenomeAnalysisEngine { * Parse out the thread allocation from the given command-line argument. */ private void determineThreadAllocation() { - Tags tags = parsingEngine.getTags(argCollection.numberOfThreads); + if ( argCollection.numberOfDataThreads < 1 ) throw new UserException.BadArgumentValue("num_threads", "cannot be less than 1, but saw " + argCollection.numberOfDataThreads); + if ( argCollection.numberOfCPUThreadsPerDataThread < 1 ) throw new UserException.BadArgumentValue("num_cpu_threads", "cannot be less than 1, but saw " + argCollection.numberOfCPUThreadsPerDataThread); + if ( argCollection.numberOfIOThreads < 0 ) throw new UserException.BadArgumentValue("num_io_threads", "cannot be less than 0, but saw " + argCollection.numberOfIOThreads); - // TODO: Kill this complicated logic once Queue supports arbitrary tagged parameters. - Integer numCPUThreads = null; - if(tags.containsKey("cpu") && argCollection.numberOfCPUThreads != null) - throw new UserException("Number of CPU threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other."); - else if(tags.containsKey("cpu")) - numCPUThreads = Integer.parseInt(tags.getValue("cpu")); - else if(argCollection.numberOfCPUThreads != null) - numCPUThreads = argCollection.numberOfCPUThreads; - - Integer numIOThreads = null; - if(tags.containsKey("io") && argCollection.numberOfIOThreads != null) - throw new UserException("Number of IO threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other."); - else if(tags.containsKey("io")) - numIOThreads = Integer.parseInt(tags.getValue("io")); - else if(argCollection.numberOfIOThreads != null) - numIOThreads = argCollection.numberOfIOThreads; - - this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, ! argCollection.disableEfficiencyMonitor); + this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads, + argCollection.numberOfCPUThreadsPerDataThread, + argCollection.numberOfIOThreads, + ! argCollection.disableEfficiencyMonitor); } + public int getTotalNumberOfThreads() { + return this.threadAllocation == null ? 1 : threadAllocation.getTotalNumThreads(); + } + + + /** * Allow subclasses and others within this package direct access to the walker manager. * @return The walker manager used by this package. diff --git a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index 33400bd9e..b9e44d87b 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/public/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -287,9 +287,32 @@ public class GATKArgumentCollection { @Argument(fullName = "unsafe", shortName = "U", doc = "If set, enables unsafe operations: nothing will be checked at runtime. For expert users only who know what they are doing. We do not support usage of this argument.", required = false) public ValidationExclusion.TYPE unsafe; - /** How many threads should be allocated to this analysis. */ - @Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false) - public Integer numberOfThreads = 1; + // -------------------------------------------------------------------------------------------------------------- + // + // Multi-threading arguments + // + // -------------------------------------------------------------------------------------------------------------- + + /** + * How many data threads should be allocated to this analysis? Data threads contains N cpu threads per + * data thread, and act as completely data parallel processing, increasing the memory usage of GATK + * by M data threads. Data threads generally scale extremely effectively, up to 24 cores + */ + @Argument(fullName = "num_threads", shortName = "nt", doc = "How many data threads should be allocated to running this analysis.", required = false) + public Integer numberOfDataThreads = 1; + + /** + * How many CPU threads should be allocated per data thread? Each CPU thread operates the map + * cycle independently, but may run into earlier scaling problems with IO than data threads. Has + * the benefit of not requiring X times as much memory per thread as data threads do, but rather + * only a constant overhead. + */ + @Argument(fullName="num_cpu_threads_per_data_thread", shortName = "cnt", doc="How many CPU threads should be allocated per data thread to running this analysis?", required = false) + public int numberOfCPUThreadsPerDataThread = 1; + + @Argument(fullName="num_io_threads", shortName = "nit", doc="How many of the given threads should be allocated to IO", required = false) + @Hidden + public int numberOfIOThreads = 0; /** * By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny @@ -299,24 +322,9 @@ public class GATKArgumentCollection { @Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false) public Boolean disableEfficiencyMonitor = false; - /** - * The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types. - * TODO: Kill this when I can do a tagged integer in Queue. - */ - @Argument(fullName="num_cpu_threads", shortName = "nct", doc="How many of the given threads should be allocated to the CPU", required = false) - @Hidden - public Integer numberOfCPUThreads = null; - @Argument(fullName="num_io_threads", shortName = "nit", doc="How many of the given threads should be allocated to IO", required = false) - @Hidden - public Integer numberOfIOThreads = null; - @Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false) public Integer numberOfBAMFileHandles = null; - @Argument(fullName="nanoThreads", shortName = "nanoThreads", doc="NanoThreading", required = false) - @Hidden - public int nanoThreads = 1; - @Input(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read groups matching : or a .txt file containing the filter strings one per line.", required = false) public List readGroupBlackList = null; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 9198d210d..f1d2f7b5b 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -8,6 +8,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; +import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -76,21 +77,21 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Create a new hierarchical microscheduler to process the given reads and reference. * - * @param walker the walker used to process the dataset. - * @param reads Reads file(s) to process. - * @param reference Reference for driving the traversal. - * @param nThreadsToUse maximum number of threads to use to do the work + * @param walker the walker used to process the dataset. + * @param reads Reads file(s) to process. + * @param reference Reference for driving the traversal. + * @param threadAllocation How should we apply multi-threaded execution? */ protected HierarchicalMicroScheduler(final GenomeAnalysisEngine engine, final Walker walker, final SAMDataSource reads, final IndexedFastaSequenceFile reference, final Collection rods, - final int nThreadsToUse, - final boolean monitorThreadPerformance ) { - super(engine, walker, reads, reference, rods, nThreadsToUse); + final ThreadAllocation threadAllocation) { + super(engine, walker, reads, reference, rods, threadAllocation); - if ( monitorThreadPerformance ) { + final int nThreadsToUse = threadAllocation.getNumDataThreads(); + if ( threadAllocation.monitorThreadEfficiency() ) { final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse); setThreadEfficiencyMonitor(monitoringThreadFactory); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 740bcb566..ceb4a6f9b 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -10,6 +10,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.DirectOutputTracker; import org.broadinstitute.sting.gatk.io.OutputTracker; +import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.SampleUtils; @@ -39,13 +40,11 @@ public class LinearMicroScheduler extends MicroScheduler { final SAMDataSource reads, final IndexedFastaSequenceFile reference, final Collection rods, - final int numThreads, // may be > 1 if are nanoScheduling - final boolean monitorThreadPerformance ) { - super(engine, walker, reads, reference, rods, numThreads); + final ThreadAllocation threadAllocation) { + super(engine, walker, reads, reference, rods, threadAllocation); - if ( monitorThreadPerformance ) + if ( threadAllocation.monitorThreadEfficiency() ) setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor()); - } /** diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 073a46ee3..bc0d5da96 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -100,27 +100,30 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return The best-fit microscheduler. */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { - if (threadAllocation.getNumCPUThreads() > 1) { + if ( threadAllocation.isRunningInParallelMode() ) + logger.info(String.format("Running the GATK in parallel mode with %d CPU threads for each of %d data threads", + threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads())); + + if ( threadAllocation.getNumDataThreads() > 1 ) { if (walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); - logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - - if ( walker instanceof ReadWalker ) { - if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker); - return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + if ( ! (walker instanceof TreeReducible) ) { + throw badNT("nt", engine, walker); } else { - // TODO -- update test for when nano scheduling only is an option - if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker); - return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation); } } else { - return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + if ( threadAllocation.getNumCPUThreadsPerDataThread() > 1 && ! (walker instanceof ThreadSafeMapReduce) ) + throw badNT("cnt", engine, walker); + return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation); } } - private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) { - throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); + private static UserException badNT(final String parallelArg, final GenomeAnalysisEngine engine, final Walker walker) { + throw new UserException.BadArgumentValue("nt", + String.format("The analysis %s currently does not support parallel execution with %s. " + + "Please run your analysis without the %s option.", engine.getWalkerName(walker.getClass()), parallelArg, parallelArg)); } /** @@ -130,24 +133,27 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @param reads The reads. * @param reference The reference. * @param rods the rods to include in the traversal - * @param numThreads the number of threads we are using in the underlying traversal + * @param threadAllocation the allocation of threads to use in the underlying traversal */ protected MicroScheduler(final GenomeAnalysisEngine engine, final Walker walker, final SAMDataSource reads, final IndexedFastaSequenceFile reference, final Collection rods, - final int numThreads) { + final ThreadAllocation threadAllocation) { this.engine = engine; this.reads = reads; this.reference = reference; this.rods = rods; if (walker instanceof ReadWalker) { - traversalEngine = numThreads > 1 ? new TraverseReadsNano(numThreads) : new TraverseReads(); + traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1 + ? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread()) + : new TraverseReads(); } else if (walker instanceof LocusWalker) { - // TODO -- refactor to use better interface - traversalEngine = engine.getArguments().nanoThreads > 1 ? new TraverseLociNano(engine.getArguments().nanoThreads) : new TraverseLociLinear(); + traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1 + ? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread()) + : new TraverseLociLinear(); } else if (walker instanceof DuplicateWalker) { traversalEngine = new TraverseDuplicates(); } else if (walker instanceof ReadPairWalker) { diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java b/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java index 260a7efda..ee1dc63e6 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/stubs/VariantContextWriterStub.java @@ -32,9 +32,9 @@ import org.broadinstitute.sting.utils.classloader.JVMUtils; import org.broadinstitute.sting.utils.codecs.vcf.VCFHeader; import org.broadinstitute.sting.utils.codecs.vcf.VCFHeaderLine; import org.broadinstitute.sting.utils.codecs.vcf.VCFUtils; +import org.broadinstitute.sting.utils.variantcontext.VariantContext; import org.broadinstitute.sting.utils.variantcontext.writer.Options; import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter; -import org.broadinstitute.sting.utils.variantcontext.VariantContext; import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriterFactory; import java.io.File; @@ -269,7 +269,7 @@ public class VariantContextWriterStub implements Stub, Var * @return */ public boolean alsoWriteBCFForTest() { - return engine.getArguments().numberOfThreads == 1 && // only works single threaded + return engine.getArguments().numberOfDataThreads == 1 && // only works single threaded ! isCompressed() && // for non-compressed outputs getFile() != null && // that are going to disk engine.getArguments().generateShadowBCF; // and we actually want to do it diff --git a/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java b/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java index 6f3f175a2..51fed470f 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java +++ b/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java @@ -218,7 +218,7 @@ public class GATKRunReport { // if there was an exception, capture it this.mException = e == null ? null : new ExceptionToXML(e); - numThreads = engine.getArguments().numberOfThreads; + numThreads = engine.getTotalNumberOfThreads(); percentTimeRunning = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.USER_CPU); percentTimeBlocking = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.BLOCKING); percentTimeWaiting = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.WAITING); diff --git a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java index caae55ac5..f958c9db8 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java +++ b/public/java/src/org/broadinstitute/sting/gatk/resourcemanagement/ThreadAllocation.java @@ -24,7 +24,7 @@ package org.broadinstitute.sting.gatk.resourcemanagement; -import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; /** * Models how threads are distributed between various components of the GATK. @@ -33,7 +33,12 @@ public class ThreadAllocation { /** * The number of CPU threads to be used by the GATK. */ - private final int numCPUThreads; + private final int numDataThreads; + + /** + * The number of CPU threads per data thread for GATK processing + */ + private final int numCPUThreadsPerDataThread; /** * Number of threads to devote exclusively to IO. Default is 0. @@ -45,8 +50,12 @@ public class ThreadAllocation { */ private final boolean monitorEfficiency; - public int getNumCPUThreads() { - return numCPUThreads; + public int getNumDataThreads() { + return numDataThreads; + } + + public int getNumCPUThreadsPerDataThread() { + return numCPUThreadsPerDataThread; } public int getNumIOThreads() { @@ -57,47 +66,50 @@ public class ThreadAllocation { return monitorEfficiency; } + /** + * Are we running in parallel mode? + * + * @return true if any parallel processing is enabled + */ + public boolean isRunningInParallelMode() { + return getTotalNumThreads() > 1; + } + + /** + * What is the total number of threads in use by the GATK? + * + * @return the sum of all thread allocations in this object + */ + public int getTotalNumThreads() { + return getNumDataThreads() + getNumCPUThreadsPerDataThread() + getNumIOThreads(); + } + /** * Construct the default thread allocation. */ public ThreadAllocation() { - this(1, null, null, false); + this(1, 1, 0, false); } /** * Set up the thread allocation. Default allocation is 1 CPU thread, 0 IO threads. * (0 IO threads means that no threads are devoted exclusively to IO; they're inline on the CPU thread). - * @param totalThreads Complete number of threads to allocate. - * @param numCPUThreads Total number of threads allocated to the traversal. + * @param numDataThreads Total number of threads allocated to the traversal. + * @param numCPUThreadsPerDataThread The number of CPU threads per data thread to allocate * @param numIOThreads Total number of threads allocated exclusively to IO. + * @param monitorEfficiency should we monitor threading efficiency in the GATK? */ - public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorEfficiency) { - // If no allocation information is present, allocate all threads to CPU - if(numCPUThreads == null && numIOThreads == null) { - this.numCPUThreads = totalThreads; - this.numIOThreads = 0; - } - // If only CPU threads are specified, allocate remainder to IO (minimum 0 dedicated IO threads). - else if(numIOThreads == null) { - if(numCPUThreads > totalThreads) - throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of cpu threads (%d) is higher than the total threads",totalThreads,numCPUThreads)); - this.numCPUThreads = numCPUThreads; - this.numIOThreads = totalThreads - numCPUThreads; - } - // If only IO threads are specified, allocate remainder to CPU (minimum 1 dedicated CPU thread). - else if(numCPUThreads == null) { - if(numIOThreads > totalThreads) - throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of io threads (%d) is higher than the total threads",totalThreads,numIOThreads)); - this.numCPUThreads = Math.max(1,totalThreads-numIOThreads); - this.numIOThreads = numIOThreads; - } - else { - if(numCPUThreads + numIOThreads != totalThreads) - throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of cpu threads (%d) + the count of io threads (%d) does not match",totalThreads,numCPUThreads,numIOThreads)); - this.numCPUThreads = numCPUThreads; - this.numIOThreads = numIOThreads; - } + public ThreadAllocation(final int numDataThreads, + final int numCPUThreadsPerDataThread, + final int numIOThreads, + final boolean monitorEfficiency) { + if ( numDataThreads < 1 ) throw new ReviewedStingException("numDataThreads cannot be less than 1, but saw " + numDataThreads); + if ( numCPUThreadsPerDataThread < 1 ) throw new ReviewedStingException("numCPUThreadsPerDataThread cannot be less than 1, but saw " + numCPUThreadsPerDataThread); + if ( numIOThreads < 0 ) throw new ReviewedStingException("numIOThreads cannot be less than 0, but saw " + numIOThreads); + this.numDataThreads = numDataThreads; + this.numCPUThreadsPerDataThread = numCPUThreadsPerDataThread; + this.numIOThreads = numIOThreads; this.monitorEfficiency = monitorEfficiency; } }