First draft of a test script for I/O performance with the new asynchronous I/O processing.
Also includes convenience parameters for specifying the IO/CPU threading balance outside of a tag. Will be killed when Queue gets better support for tagged arguments (hopefully soon).
This commit is contained in:
parent
91413cf0d9
commit
b65db6a854
|
|
@ -280,8 +280,24 @@ public class GenomeAnalysisEngine {
|
|||
*/
|
||||
private void determineThreadAllocation() {
|
||||
Tags tags = parsingEngine.getTags(argCollection.numberOfThreads);
|
||||
Integer numCPUThreads = tags.containsKey("cpu") ? Integer.parseInt(tags.getValue("cpu")) : null;
|
||||
Integer numIOThreads = tags.containsKey("io") ? Integer.parseInt(tags.getValue("io")) : null;
|
||||
|
||||
// TODO: Kill this complicated logic once Queue supports arbitrary tagged parameters.
|
||||
Integer numCPUThreads = null;
|
||||
if(tags.containsKey("cpu") && argCollection.numberOfCPUThreads != null)
|
||||
throw new UserException("Number of CPU threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other.");
|
||||
else if(tags.containsKey("cpu"))
|
||||
numCPUThreads = Integer.parseInt(tags.getValue("cpu"));
|
||||
else if(argCollection.numberOfCPUThreads != null)
|
||||
numCPUThreads = argCollection.numberOfCPUThreads;
|
||||
|
||||
Integer numIOThreads = null;
|
||||
if(tags.containsKey("io") && argCollection.numberOfIOThreads != null)
|
||||
throw new UserException("Number of IO threads specified both directly on the command-line and as a tag to the nt argument. Please specify only one or the other.");
|
||||
else if(tags.containsKey("io"))
|
||||
numIOThreads = Integer.parseInt(tags.getValue("io"));
|
||||
else if(argCollection.numberOfIOThreads != null)
|
||||
numIOThreads = argCollection.numberOfIOThreads;
|
||||
|
||||
this.threadAllocation = new ThreadAllocation(argCollection.numberOfThreads,numCPUThreads,numIOThreads);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -198,6 +198,17 @@ public class GATKArgumentCollection {
|
|||
@Argument(fullName = "num_threads", shortName = "nt", doc = "How many threads should be allocated to running this analysis.", required = false)
|
||||
public Integer numberOfThreads = 1;
|
||||
|
||||
/**
|
||||
* The following two arguments (num_cpu_threads, num_io_threads are TEMPORARY since Queue cannot currently support arbitrary tagged data types.
|
||||
* TODO: Kill this when I can do a tagged integer in Queue.
|
||||
*/
|
||||
@Argument(fullName="num_cpu_threads", shortName = "nct", doc="How many of the given threads should be allocated to the CPU", required = false)
|
||||
@Hidden
|
||||
public Integer numberOfCPUThreads = null;
|
||||
@Argument(fullName="num_io_threads", shortName = "nit", doc="How many of the given threads should be allocated to IO", required = false)
|
||||
@Hidden
|
||||
public Integer numberOfIOThreads = null;
|
||||
|
||||
@Argument(fullName = "num_bam_file_handles", shortName = "bfh", doc="The total number of BAM file handles to keep open simultaneously", required=false)
|
||||
public Integer numberOfBAMFileHandles = null;
|
||||
|
||||
|
|
@ -369,6 +380,14 @@ public class GATKArgumentCollection {
|
|||
if (!other.numberOfThreads.equals(this.numberOfThreads)) {
|
||||
return false;
|
||||
}
|
||||
if ((this.numberOfCPUThreads == null && other.numberOfCPUThreads != null) ||
|
||||
this.numberOfCPUThreads.equals(other.numberOfCPUThreads) ) {
|
||||
return false;
|
||||
}
|
||||
if ((this.numberOfIOThreads == null && other.numberOfIOThreads != null) ||
|
||||
this.numberOfIOThreads.equals(other.numberOfIOThreads) ) {
|
||||
return false;
|
||||
}
|
||||
if ((other.numberOfBAMFileHandles == null && this.numberOfBAMFileHandles != null) ||
|
||||
(other.numberOfBAMFileHandles != null && !other.numberOfBAMFileHandles.equals(this.numberOfBAMFileHandles))) {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -242,8 +242,10 @@ public class SAMDataSource {
|
|||
|
||||
this.threadAllocation = threadAllocation;
|
||||
// TODO: Consider a borrowed-thread dispatcher implementation.
|
||||
if(this.threadAllocation.getNumIOThreads() > 0)
|
||||
if(this.threadAllocation.getNumIOThreads() > 0) {
|
||||
logger.info("Running in asynchronous I/O mode; number of threads = " + this.threadAllocation.getNumIOThreads());
|
||||
dispatcher = new BGZFBlockLoadingDispatcher(this.threadAllocation.getNumIOThreads(), numFileHandles != null ? numFileHandles : 1);
|
||||
}
|
||||
else
|
||||
dispatcher = null;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue