New GATK argument interface for data and cpu threads

-- Closes GSA-515 Nanoscheduler GSA-542 Good interface to nanoScheduler
-- Old -nt means dataThreads
-- New -cnt (--num_cpu_threads_per_data_thread) gives you n cpu threads for each data thread in the system
-- Cleanup logic for handling data and cpu threading in HMS, LMS, and MS
-- GATKRunReport reports the total number of threads in use by the GATK, not just the nt value
-- Removed the io,cpu tags for nt.  Stupid system if you ask me.  Cleaned up the GenomeAnalysisEngine and ThreadAllocation handling to be totally straightforward now
This commit is contained in:
Mark DePristo 2012-09-05 15:41:52 -04:00
parent 1e55475adc
commit 9bf1d138d9
8 changed files with 123 additions and 103 deletions

View File

@ -400,28 +400,22 @@ public class GenomeAnalysisEngine {
* Parse out the thread allocation from the given command-line argument.
*/
private void determineThreadAllocation() {
Tags tags = parsingEngine.getTags(argCollection.numberOfThreads);
if ( argCollection.numberOfDataThreads < 1 ) throw new UserException.BadArgumentValue("num_threads", "cannot be less than 1, but saw " + argCollection.numberOfDataThreads);
if ( argCollection.numberOfCPUThreadsPerDataThread < 1 ) throw new UserException.BadArgumentValue("num_cpu_threads", "cannot be less than 1, but saw " + argCollection.numberOfCPUThreadsPerDataThread);
if ( argCollection.numberOfIOThreads < 0 ) throw new UserException.BadArgumentValue("num_io_threads", "cannot be less than 0, but saw " + argCollection.numberOfIOThreads);
// TODO: Kill this complicated logic once Queue supports arbitrary tagged parameters.
Integer numCPUThreads = null;
if(tags.containsKey("cpu") && argCollection.numberOfCPUThreads != null)
throw new UserException("Number of CPU threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other.");
else if(tags.containsKey("cpu"))
numCPUThreads = Integer.parseInt(tags.getValue("cpu"));
else if(argCollection.numberOfCPUThreads != null)
numCPUThreads = argCollection.numberOfCPUThreads;
Integer numIOThreads = null;
if(tags.containsKey("io") && argCollection.numberOfIOThreads != null)
throw new UserException("Number of IO threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other.");
else if(tags.containsKey("io"))
numIOThreads = Integer.parseInt(tags.getValue("io"));
else if(argCollection.numberOfIOThreads != null)
numIOThreads = argCollection.numberOfIOThreads;
this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads, numCPUThreads, numIOThreads, ! argCollection.disableEfficiencyMonitor);
this.threadAllocation = new ThreadAllocation(argCollection.numberOfDataThreads,
argCollection.numberOfCPUThreadsPerDataThread,
argCollection.numberOfIOThreads,
! argCollection.disableEfficiencyMonitor);
}
public int getTotalNumberOfThreads() {
return this.threadAllocation == null ? 1 : threadAllocation.getTotalNumThreads();
}
/**
* Allow subclasses and others within this package direct access to the walker manager.
* @return The walker manager used by this package.

View File

@ -287,9 +287,32 @@ public class GATKArgumentCollection {
@Argument(fullName = "unsafe", shortName = "U", doc = "If set, enables unsafe operations: nothing will be checked at runtime. For expert users only who know what they are doing. We do not support usage of this argument.", required = false)
public ValidationExclusion.TYPE unsafe;
/** How many threads should be allocated to this analysis. */
@Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false)
public Integer numberOfThreads = 1;
// --------------------------------------------------------------------------------------------------------------
//
// Multi-threading arguments
//
// --------------------------------------------------------------------------------------------------------------
/**
* How many data threads should be allocated to this analysis? Data threads contains N cpu threads per
* data thread, and act as completely data parallel processing, increasing the memory usage of GATK
* by M data threads. Data threads generally scale extremely effectively, up to 24 cores
*/
@Argument(fullName = "num_threads", shortName = "nt", doc = "How many data threads should be allocated to running this analysis.", required = false)
public Integer numberOfDataThreads = 1;
/**
* How many CPU threads should be allocated per data thread? Each CPU thread operates the map
* cycle independently, but may run into earlier scaling problems with IO than data threads. Has
* the benefit of not requiring X times as much memory per thread as data threads do, but rather
* only a constant overhead.
*/
@Argument(fullName="num_cpu_threads_per_data_thread", shortName = "cnt", doc="How many CPU threads should be allocated per data thread to running this analysis?", required = false)
public int numberOfCPUThreadsPerDataThread = 1;
@Argument(fullName="num_io_threads", shortName = "nit", doc="How many of the given threads should be allocated to IO", required = false)
@Hidden
public int numberOfIOThreads = 0;
/**
* By default the GATK monitors its own efficiency, but this can have a itsy-bitsy tiny
@ -299,24 +322,9 @@ public class GATKArgumentCollection {
@Argument(fullName = "disableThreadEfficiencyMonitor", shortName = "dtem", doc = "Disable GATK efficiency monitoring", required = false)
public Boolean disableEfficiencyMonitor = false;
/**
* The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types.
* TODO: Kill this when I can do a tagged integer in Queue.
*/
@Argument(fullName="num_cpu_threads", shortName = "nct", doc="How many of the given threads should be allocated to the CPU", required = false)
@Hidden
public Integer numberOfCPUThreads = null;
@Argument(fullName="num_io_threads", shortName = "nit", doc="How many of the given threads should be allocated to IO", required = false)
@Hidden
public Integer numberOfIOThreads = null;
@Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false)
public Integer numberOfBAMFileHandles = null;
@Argument(fullName="nanoThreads", shortName = "nanoThreads", doc="NanoThreading", required = false)
@Hidden
public int nanoThreads = 1;
@Input(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read groups matching <TAG>:<STRING> or a .txt file containing the filter strings one per line.", required = false)
public List<String> readGroupBlackList = null;

View File

@ -8,6 +8,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
@ -76,21 +77,21 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* Create a new hierarchical microscheduler to process the given reads and reference.
*
* @param walker the walker used to process the dataset.
* @param reads Reads file(s) to process.
* @param reference Reference for driving the traversal.
* @param nThreadsToUse maximum number of threads to use to do the work
* @param walker the walker used to process the dataset.
* @param reads Reads file(s) to process.
* @param reference Reference for driving the traversal.
* @param threadAllocation How should we apply multi-threaded execution?
*/
protected HierarchicalMicroScheduler(final GenomeAnalysisEngine engine,
final Walker walker,
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int nThreadsToUse,
final boolean monitorThreadPerformance ) {
super(engine, walker, reads, reference, rods, nThreadsToUse);
final ThreadAllocation threadAllocation) {
super(engine, walker, reads, reference, rods, threadAllocation);
if ( monitorThreadPerformance ) {
final int nThreadsToUse = threadAllocation.getNumDataThreads();
if ( threadAllocation.monitorThreadEfficiency() ) {
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse);
setThreadEfficiencyMonitor(monitoringThreadFactory);
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);

View File

@ -10,6 +10,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.io.DirectOutputTracker;
import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.SampleUtils;
@ -39,13 +40,11 @@ public class LinearMicroScheduler extends MicroScheduler {
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int numThreads, // may be > 1 if are nanoScheduling
final boolean monitorThreadPerformance ) {
super(engine, walker, reads, reference, rods, numThreads);
final ThreadAllocation threadAllocation) {
super(engine, walker, reads, reference, rods, threadAllocation);
if ( monitorThreadPerformance )
if ( threadAllocation.monitorThreadEfficiency() )
setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor());
}
/**

View File

@ -100,27 +100,30 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return The best-fit microscheduler.
*/
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
if (threadAllocation.getNumCPUThreads() > 1) {
if ( threadAllocation.isRunningInParallelMode() )
logger.info(String.format("Running the GATK in parallel mode with %d CPU threads for each of %d data threads",
threadAllocation.getNumCPUThreadsPerDataThread(), threadAllocation.getNumDataThreads()));
if ( threadAllocation.getNumDataThreads() > 1 ) {
if (walker.isReduceByInterval())
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
if ( walker instanceof ReadWalker ) {
if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker);
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
if ( ! (walker instanceof TreeReducible) ) {
throw badNT("nt", engine, walker);
} else {
// TODO -- update test for when nano scheduling only is an option
if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker);
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation);
}
} else {
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
if ( threadAllocation.getNumCPUThreadsPerDataThread() > 1 && ! (walker instanceof ThreadSafeMapReduce) )
throw badNT("cnt", engine, walker);
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation);
}
}
private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) {
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
private static UserException badNT(final String parallelArg, final GenomeAnalysisEngine engine, final Walker walker) {
throw new UserException.BadArgumentValue("nt",
String.format("The analysis %s currently does not support parallel execution with %s. " +
"Please run your analysis without the %s option.", engine.getWalkerName(walker.getClass()), parallelArg, parallelArg));
}
/**
@ -130,24 +133,27 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @param reads The reads.
* @param reference The reference.
* @param rods the rods to include in the traversal
* @param numThreads the number of threads we are using in the underlying traversal
* @param threadAllocation the allocation of threads to use in the underlying traversal
*/
protected MicroScheduler(final GenomeAnalysisEngine engine,
final Walker walker,
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int numThreads) {
final ThreadAllocation threadAllocation) {
this.engine = engine;
this.reads = reads;
this.reference = reference;
this.rods = rods;
if (walker instanceof ReadWalker) {
traversalEngine = numThreads > 1 ? new TraverseReadsNano(numThreads) : new TraverseReads();
traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1
? new TraverseReadsNano(threadAllocation.getNumCPUThreadsPerDataThread())
: new TraverseReads();
} else if (walker instanceof LocusWalker) {
// TODO -- refactor to use better interface
traversalEngine = engine.getArguments().nanoThreads > 1 ? new TraverseLociNano(engine.getArguments().nanoThreads) : new TraverseLociLinear();
traversalEngine = threadAllocation.getNumCPUThreadsPerDataThread() > 1
? new TraverseLociNano(threadAllocation.getNumCPUThreadsPerDataThread())
: new TraverseLociLinear();
} else if (walker instanceof DuplicateWalker) {
traversalEngine = new TraverseDuplicates();
} else if (walker instanceof ReadPairWalker) {

View File

@ -32,9 +32,9 @@ import org.broadinstitute.sting.utils.classloader.JVMUtils;
import org.broadinstitute.sting.utils.codecs.vcf.VCFHeader;
import org.broadinstitute.sting.utils.codecs.vcf.VCFHeaderLine;
import org.broadinstitute.sting.utils.codecs.vcf.VCFUtils;
import org.broadinstitute.sting.utils.variantcontext.VariantContext;
import org.broadinstitute.sting.utils.variantcontext.writer.Options;
import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter;
import org.broadinstitute.sting.utils.variantcontext.VariantContext;
import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriterFactory;
import java.io.File;
@ -269,7 +269,7 @@ public class VariantContextWriterStub implements Stub<VariantContextWriter>, Var
* @return
*/
public boolean alsoWriteBCFForTest() {
return engine.getArguments().numberOfThreads == 1 && // only works single threaded
return engine.getArguments().numberOfDataThreads == 1 && // only works single threaded
! isCompressed() && // for non-compressed outputs
getFile() != null && // that are going to disk
engine.getArguments().generateShadowBCF; // and we actually want to do it

View File

@ -218,7 +218,7 @@ public class GATKRunReport {
// if there was an exception, capture it
this.mException = e == null ? null : new ExceptionToXML(e);
numThreads = engine.getArguments().numberOfThreads;
numThreads = engine.getTotalNumberOfThreads();
percentTimeRunning = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.USER_CPU);
percentTimeBlocking = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.BLOCKING);
percentTimeWaiting = getThreadEfficiencyPercent(engine, ThreadEfficiencyMonitor.State.WAITING);

View File

@ -24,7 +24,7 @@
package org.broadinstitute.sting.gatk.resourcemanagement;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* Models how threads are distributed between various components of the GATK.
@ -33,7 +33,12 @@ public class ThreadAllocation {
/**
* The number of CPU threads to be used by the GATK.
*/
private final int numCPUThreads;
private final int numDataThreads;
/**
* The number of CPU threads per data thread for GATK processing
*/
private final int numCPUThreadsPerDataThread;
/**
* Number of threads to devote exclusively to IO. Default is 0.
@ -45,8 +50,12 @@ public class ThreadAllocation {
*/
private final boolean monitorEfficiency;
public int getNumCPUThreads() {
return numCPUThreads;
public int getNumDataThreads() {
return numDataThreads;
}
public int getNumCPUThreadsPerDataThread() {
return numCPUThreadsPerDataThread;
}
public int getNumIOThreads() {
@ -57,47 +66,50 @@ public class ThreadAllocation {
return monitorEfficiency;
}
/**
* Are we running in parallel mode?
*
* @return true if any parallel processing is enabled
*/
public boolean isRunningInParallelMode() {
return getTotalNumThreads() > 1;
}
/**
* What is the total number of threads in use by the GATK?
*
* @return the sum of all thread allocations in this object
*/
public int getTotalNumThreads() {
return getNumDataThreads() + getNumCPUThreadsPerDataThread() + getNumIOThreads();
}
/**
* Construct the default thread allocation.
*/
public ThreadAllocation() {
this(1, null, null, false);
this(1, 1, 0, false);
}
/**
* Set up the thread allocation. Default allocation is 1 CPU thread, 0 IO threads.
* (0 IO threads means that no threads are devoted exclusively to IO; they're inline on the CPU thread).
* @param totalThreads Complete number of threads to allocate.
* @param numCPUThreads Total number of threads allocated to the traversal.
* @param numDataThreads Total number of threads allocated to the traversal.
* @param numCPUThreadsPerDataThread The number of CPU threads per data thread to allocate
* @param numIOThreads Total number of threads allocated exclusively to IO.
* @param monitorEfficiency should we monitor threading efficiency in the GATK?
*/
public ThreadAllocation(final int totalThreads, final Integer numCPUThreads, final Integer numIOThreads, final boolean monitorEfficiency) {
// If no allocation information is present, allocate all threads to CPU
if(numCPUThreads == null && numIOThreads == null) {
this.numCPUThreads = totalThreads;
this.numIOThreads = 0;
}
// If only CPU threads are specified, allocate remainder to IO (minimum 0 dedicated IO threads).
else if(numIOThreads == null) {
if(numCPUThreads > totalThreads)
throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of cpu threads (%d) is higher than the total threads",totalThreads,numCPUThreads));
this.numCPUThreads = numCPUThreads;
this.numIOThreads = totalThreads - numCPUThreads;
}
// If only IO threads are specified, allocate remainder to CPU (minimum 1 dedicated CPU thread).
else if(numCPUThreads == null) {
if(numIOThreads > totalThreads)
throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of io threads (%d) is higher than the total threads",totalThreads,numIOThreads));
this.numCPUThreads = Math.max(1,totalThreads-numIOThreads);
this.numIOThreads = numIOThreads;
}
else {
if(numCPUThreads + numIOThreads != totalThreads)
throw new UserException(String.format("Invalid thread allocation. User requested %d threads in total, but the count of cpu threads (%d) + the count of io threads (%d) does not match",totalThreads,numCPUThreads,numIOThreads));
this.numCPUThreads = numCPUThreads;
this.numIOThreads = numIOThreads;
}
public ThreadAllocation(final int numDataThreads,
final int numCPUThreadsPerDataThread,
final int numIOThreads,
final boolean monitorEfficiency) {
if ( numDataThreads < 1 ) throw new ReviewedStingException("numDataThreads cannot be less than 1, but saw " + numDataThreads);
if ( numCPUThreadsPerDataThread < 1 ) throw new ReviewedStingException("numCPUThreadsPerDataThread cannot be less than 1, but saw " + numCPUThreadsPerDataThread);
if ( numIOThreads < 0 ) throw new ReviewedStingException("numIOThreads cannot be less than 0, but saw " + numIOThreads);
this.numDataThreads = numDataThreads;
this.numCPUThreadsPerDataThread = numCPUThreadsPerDataThread;
this.numIOThreads = numIOThreads;
this.monitorEfficiency = monitorEfficiency;
}
}