added support for the -M option in traversals.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@935 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
aaron 2009-06-08 15:12:24 +00:00
parent e2ed56dc96
commit a8a2d0eab9
13 changed files with 214 additions and 157 deletions

View File

@ -105,8 +105,8 @@ public class GATKArgumentCollection {
public Boolean walkAllLoci = false; public Boolean walkAllLoci = false;
@Element(required=false) @Element(required=false)
@Argument(fullName = "maximum_reads", shortName = "M", doc = "Maximum number of reads to process before exiting", required = false) @Argument(fullName = "maximum_reads", shortName = "M", doc = "Maximum number of iterations to process before exiting, the lower bound is zero. Intended only for testing", required = false)
public String maximumReads = "-1"; public Integer maximumEngineIterations = -1;
@Element(required=false) @Element(required=false)
@Argument(fullName = "sort_on_the_fly", shortName = "sort", doc = "Maximum number of reads to sort on the fly", required = false) @Argument(fullName = "sort_on_the_fly", shortName = "sort", doc = "Maximum number of reads to sort on the fly", required = false)
@ -215,7 +215,7 @@ public class GATKArgumentCollection {
if (!other.samFiles.equals(this.samFiles)) { if (!other.samFiles.equals(this.samFiles)) {
return false; return false;
} }
if (!other.maximumReads.equals(this.maximumReads)) { if (!other.maximumEngineIterations.equals(this.maximumEngineIterations)) {
return false; return false;
} }
if (!other.strictnessLevel.equals(this.strictnessLevel)) { if (!other.strictnessLevel.equals(this.strictnessLevel)) {

View File

@ -122,7 +122,7 @@ public class GenomeAnalysisEngine {
locs = GenomeLocSortedSet.createSetFromList(locationsList); locs = GenomeLocSortedSet.createSetFromList(locationsList);
// execute the microscheduler, storing the results // execute the microscheduler, storing the results
walkerReturn = microScheduler.execute(my_walker, locs); walkerReturn = microScheduler.execute(my_walker, locs, argCollection.maximumEngineIterations);
} }
@ -199,7 +199,7 @@ public class GenomeAnalysisEngine {
private void genericEngineSetup(ValidationStringency strictness) { private void genericEngineSetup(ValidationStringency strictness) {
engine.setStrictness(strictness); engine.setStrictness(strictness);
engine.setMaxReads(Integer.parseInt(argCollection.maximumReads)); engine.setMaxReads(argCollection.maximumEngineIterations);
engine.setFilterZeroMappingQualityReads(argCollection.filterZeroMappingQualityReads); engine.setFilterZeroMappingQualityReads(argCollection.filterZeroMappingQualityReads);
// we default interval files over the genome region string // we default interval files over the genome region string

View File

@ -52,8 +52,9 @@ public class ExpGrowthLocusShardStrategy extends LocusShardStrategy {
* *
* @param dic the seq dictionary * @param dic the seq dictionary
*/ */
ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize) { ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, long limitByCount) {
super(dic); super(dic);
this.limitingFactor = limitByCount;
this.baseSize = startSize; this.baseSize = startSize;
currentExp = 0; currentExp = 0;
} }
@ -76,8 +77,9 @@ public class ExpGrowthLocusShardStrategy extends LocusShardStrategy {
* @param startSize the starting size of the shard * @param startSize the starting size of the shard
* @param lst locations to iterate from * @param lst locations to iterate from
*/ */
ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst) { ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst, long limitByCount) {
super(dic, lst); super(dic, lst);
this.limitingFactor = limitByCount;
this.baseSize = startSize; this.baseSize = startSize;
this.currentExp = 0; this.currentExp = 0;
} }

View File

@ -51,8 +51,9 @@ class LinearLocusShardStrategy extends LocusShardStrategy {
* *
* @param dic the seq dictionary * @param dic the seq dictionary
*/ */
LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize) { LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, long limitByCount) {
super(dic); super(dic);
this.limitingFactor = limitByCount;
this.nextShardSize = startSize; this.nextShardSize = startSize;
} }
@ -73,8 +74,9 @@ class LinearLocusShardStrategy extends LocusShardStrategy {
* @param startSize the starting size of the shard * @param startSize the starting size of the shard
* @param lst locations to iterate from * @param lst locations to iterate from
*/ */
LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst) { LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst, long limitByCount) {
super(dic, lst); super(dic, lst);
this.limitingFactor = limitByCount;
this.nextShardSize = startSize; this.nextShardSize = startSize;
} }

View File

@ -51,14 +51,18 @@ public abstract class LocusShardStrategy implements ShardStrategy {
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
private static Logger logger = Logger.getLogger(LocusShardStrategy.class); private static Logger logger = Logger.getLogger(LocusShardStrategy.class);
/** the number of iterations before we stop */
protected long limitingFactor = -1;
private boolean stopDueToLimitingFactor = false;
/** /**
* the constructor, taking a seq dictionary to parse out contigs * the constructor, taking a seq dictionary to parse out contigs
* *
* @param dic the seq dictionary * @param dic the seq dictionary
*/ */
LocusShardStrategy(SAMSequenceDictionary dic) { LocusShardStrategy( SAMSequenceDictionary dic ) {
this.dic = dic; this.dic = dic;
limitingFactor = -1;
mLoc = new GenomeLoc(0, 0, 0); mLoc = new GenomeLoc(0, 0, 0);
if (dic.getSequences().size() > 0) { if (dic.getSequences().size() > 0) {
nextContig = true; nextContig = true;
@ -70,12 +74,13 @@ public abstract class LocusShardStrategy implements ShardStrategy {
* *
* @param old the old strategy * @param old the old strategy
*/ */
LocusShardStrategy(LocusShardStrategy old) { LocusShardStrategy( LocusShardStrategy old ) {
this.dic = old.dic; this.dic = old.dic;
this.mLoc = old.mLoc; this.mLoc = old.mLoc;
this.seqLoc = old.seqLoc; this.seqLoc = old.seqLoc;
this.lastGenomeLocSize = old.lastGenomeLocSize; this.lastGenomeLocSize = old.lastGenomeLocSize;
this.nextContig = old.nextContig; this.nextContig = old.nextContig;
this.limitingFactor = old.limitingFactor;
} }
@ -85,7 +90,7 @@ public abstract class LocusShardStrategy implements ShardStrategy {
* @param dic the seq dictionary * @param dic the seq dictionary
* @param intervals file * @param intervals file
*/ */
LocusShardStrategy(SAMSequenceDictionary dic, GenomeLocSortedSet intervals) { LocusShardStrategy( SAMSequenceDictionary dic, GenomeLocSortedSet intervals ) {
this.dic = dic; this.dic = dic;
this.intervals = intervals.clone(); this.intervals = intervals.clone();
// set the starting point to the beginning interval // set the starting point to the beginning interval
@ -132,6 +137,15 @@ public abstract class LocusShardStrategy implements ShardStrategy {
long proposedSize = nextShardSize(); long proposedSize = nextShardSize();
long nextStart = mLoc.getStop() + 1; long nextStart = mLoc.getStop() + 1;
if (this.limitingFactor > 0) {
if (proposedSize < limitingFactor) {
limitingFactor = limitingFactor - proposedSize;
} else {
proposedSize = limitingFactor;
this.stopDueToLimitingFactor = true;
}
}
// if we don't have an interval set, use the non interval based approach. Simple, eh? // if we don't have an interval set, use the non interval based approach. Simple, eh?
if (this.intervals == null) { if (this.intervals == null) {
return nonIntervaledNext(length, proposedSize, nextStart); return nonIntervaledNext(length, proposedSize, nextStart);
@ -148,8 +162,8 @@ public abstract class LocusShardStrategy implements ShardStrategy {
* *
* @return the shard that represents this data * @return the shard that represents this data
*/ */
private Shard intervaledNext(long proposedSize) { private Shard intervaledNext( long proposedSize ) {
if ((this.intervals == null) || (intervals.isEmpty())) { if (( this.intervals == null ) || ( intervals.isEmpty() )) {
throw new StingException("LocusShardStrategy: genomic regions list is empty in next() function."); throw new StingException("LocusShardStrategy: genomic regions list is empty in next() function.");
} }
@ -175,12 +189,12 @@ public abstract class LocusShardStrategy implements ShardStrategy {
* *
* @return the shard to return to the user * @return the shard to return to the user
*/ */
private Shard nonIntervaledNext(long length, long proposedSize, long nextStart) { private Shard nonIntervaledNext( long length, long proposedSize, long nextStart ) {
// can we fit it into the current seq size? // can we fit it into the current seq size?
if (nextStart + proposedSize - 1 < length) { if (nextStart + proposedSize - 1 < length) {
lastGenomeLocSize = proposedSize; lastGenomeLocSize = proposedSize;
mLoc = new GenomeLoc(dic.getSequence(seqLoc).getSequenceIndex(), nextStart, nextStart + proposedSize - 1); mLoc = new GenomeLoc(dic.getSequence(seqLoc).getSequenceIndex(), nextStart, nextStart + proposedSize - 1);
return LocusShard.toShard(new GenomeLoc(dic.getSequence(seqLoc).getSequenceIndex(), nextStart, nextStart + proposedSize - 1)); return LocusShard.toShard(mLoc);
} }
// else we can't make it in the current location, we have to stitch one together // else we can't make it in the current location, we have to stitch one together
else { else {
@ -207,7 +221,7 @@ public abstract class LocusShardStrategy implements ShardStrategy {
private void jumpContig() { private void jumpContig() {
++seqLoc; ++seqLoc;
if (!(seqLoc < dic.getSequences().size())) { if (!( seqLoc < dic.getSequences().size() )) {
nextContig = false; nextContig = false;
return; return;
} }
@ -223,11 +237,14 @@ public abstract class LocusShardStrategy implements ShardStrategy {
* @return * @return
*/ */
public boolean hasNext() { public boolean hasNext() {
if (this.stopDueToLimitingFactor) {
return false;
}
// if we don't have an interval file, use the non interval based approach. // if we don't have an interval file, use the non interval based approach.
if (this.intervals == null) { if (this.intervals == null) {
return nextContig; return nextContig;
} else { } else {
return (this.intervals.size() > 0); return ( this.intervals.size() > 0 );
} }
} }

View File

@ -54,14 +54,19 @@ public class ReadShardStrategy implements ShardStrategy {
// our hasNext flag // our hasNext flag
boolean hasNext = true; boolean hasNext = true;
// our limiting factor
long limitedSize = -1;
boolean stopDueToLimitingFactor = false;
/** /**
* the default constructor * the default constructor
* @param dic the sequence dictionary to use * @param dic the sequence dictionary to use
* @param size the read count to iterate over * @param size the read count to iterate over
*/ */
ReadShardStrategy(SAMSequenceDictionary dic, long size) { ReadShardStrategy(SAMSequenceDictionary dic, long size, long limitedSize) {
this.dic = dic; this.dic = dic;
readCount = size; readCount = size;
this.limitedSize = limitedSize;
} }
/** /**
@ -69,10 +74,24 @@ public class ReadShardStrategy implements ShardStrategy {
* @return * @return
*/ */
public boolean hasNext() { public boolean hasNext() {
if (stopDueToLimitingFactor) {
return false;
}
return hasNext; return hasNext;
} }
public Shard next() { public Shard next() {
if (limitedSize > 0) {
if (limitedSize > readCount) {
limitedSize = limitedSize - readCount;
}
else {
readCount = limitedSize;
limitedSize = 0;
stopDueToLimitingFactor = true;
}
}
return new ReadShard((int)readCount, this); return new ReadShard((int)readCount, this);
} }

View File

@ -50,13 +50,27 @@ public class ShardStrategyFactory {
* @return * @return
*/ */
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) { static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) {
return ShardStrategyFactory.shatter(strat, dic, startingSize, -1L);
}
/**
* get a new shatter strategy
*
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
switch (strat) { switch (strat) {
case LINEAR: case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize); return new LinearLocusShardStrategy(dic, startingSize, limitByCount);
case EXPONENTIAL: case EXPONENTIAL:
return new ExpGrowthLocusShardStrategy(dic, startingSize); return new ExpGrowthLocusShardStrategy(dic, startingSize, limitByCount);
case READS: case READS:
return new ReadShardStrategy(dic, startingSize); return new ReadShardStrategy(dic, startingSize, limitByCount);
case INTERVAL:
throw new StingException("Requested strategy: " + strat + " doesn't work with the limiting count (-M) command line option");
default: default:
throw new StingException("Strategy: " + strat + " isn't implemented for this type of shatter request"); throw new StingException("Strategy: " + strat + " isn't implemented for this type of shatter request");
} }
@ -73,11 +87,24 @@ public class ShardStrategyFactory {
* @return * @return
*/ */
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) { static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) {
return ShardStrategyFactory.shatter(strat, dic, startingSize, lst, -1l);
}
/**
* get a new shatter strategy
*
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
switch (strat) { switch (strat) {
case LINEAR: case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize, lst); return new LinearLocusShardStrategy(dic, startingSize, lst, limitDataCount);
case EXPONENTIAL: case EXPONENTIAL:
return new ExpGrowthLocusShardStrategy(dic, startingSize, lst); return new ExpGrowthLocusShardStrategy(dic, startingSize, lst, limitDataCount);
case INTERVAL: case INTERVAL:
case READS: case READS:
return new IntervalShardStrategy(startingSize, lst); return new IntervalShardStrategy(startingSize, lst);

View File

@ -45,82 +45,69 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
*/ */
private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50; private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50;
/** /** Manage currently running threads. */
* Manage currently running threads.
*/
private ExecutorService threadPool; private ExecutorService threadPool;
private Queue<Shard> traverseTasks = new LinkedList<Shard>(); private Queue<Shard> traverseTasks = new LinkedList<Shard>();
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>(); private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>(); private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
/** /** How many total tasks were in the queue at the start of run. */
* How many total tasks were in the queue at the start of run. private int totalTraversals = 0;
*/
private int totalTraversals = 0;
/** /** How many shard traversals have run to date? */
* How many shard traversals have run to date?
*/
private int totalCompletedTraversals = 0; private int totalCompletedTraversals = 0;
/** /** What is the total time spent traversing shards? */
* What is the total time spent traversing shards?
*/
private long totalShardTraverseTime = 0; private long totalShardTraverseTime = 0;
/** /** What is the total time spent tree reducing shard output? */
* What is the total time spent tree reducing shard output?
*/
private long totalTreeReduceTime = 0; private long totalTreeReduceTime = 0;
/** /** How many tree reduces have been completed? */
* How many tree reduces have been completed?
*/
private long totalCompletedTreeReduces = 0; private long totalCompletedTreeReduces = 0;
/** /** What is the total time spent merging output? */
* What is the total time spent merging output?
*/
private long totalOutputMergeTime = 0; private long totalOutputMergeTime = 0;
/** /**
* Create a new hierarchical microscheduler to process the given reads and reference. * Create a new hierarchical microscheduler to process the given reads and reference.
* @param reads Reads file(s) to process. *
* @param refFile Reference for driving the traversal. * @param reads Reads file(s) to process.
* @param refFile Reference for driving the traversal.
* @param nThreadsToUse maximum number of threads to use to do the work * @param nThreadsToUse maximum number of threads to use to do the work
*/ */
protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) { protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List<ReferenceOrderedData<? extends ReferenceOrderedDatum>> rods, int nThreadsToUse ) {
super( walker, reads, refFile, rods ); super(walker, reads, refFile, rods);
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
try { try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler"); ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler");
mbs.registerMBean(this,name); mbs.registerMBean(this, name);
} }
catch( JMException ex ) { catch (JMException ex) {
throw new StingException("Unable to register microscheduler with JMX", ex); throw new StingException("Unable to register microscheduler with JMX", ex);
} }
} }
public Object execute( Walker walker, GenomeLocSortedSet intervals ) { public Object execute( Walker walker, GenomeLocSortedSet intervals, Integer maxIterations ) {
// Fast fail for walkers not supporting TreeReducible interface. // Fast fail for walkers not supporting TreeReducible interface.
if( !(walker instanceof TreeReducible) ) if (!( walker instanceof TreeReducible ))
throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers"); throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers");
ShardStrategy shardStrategy = getShardStrategy( walker, reference, intervals ); ShardStrategy shardStrategy = getShardStrategy(walker, reference, intervals, maxIterations);
ReduceTree reduceTree = new ReduceTree( this ); ReduceTree reduceTree = new ReduceTree(this);
walker.initialize(); walker.initialize();
for(Shard shard: shardStrategy) for (Shard shard : shardStrategy)
traverseTasks.add(shard); traverseTasks.add(shard);
totalTraversals = traverseTasks.size(); totalTraversals = traverseTasks.size();
while( isShardTraversePending() || isTreeReducePending() ) { while (isShardTraversePending() || isTreeReducePending()) {
// Too many files sitting around taking up space? Merge them. // Too many files sitting around taking up space? Merge them.
if( isMergeLimitExceeded() ) if (isMergeLimitExceeded())
mergeExistingOutput(); mergeExistingOutput();
// Wait for the next slot in the queue to become free. // Wait for the next slot in the queue to become free.
@ -128,10 +115,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
// Pick the next most appropriate task and run it. In the interest of // Pick the next most appropriate task and run it. In the interest of
// memory conservation, hierarchical reduces always run before traversals. // memory conservation, hierarchical reduces always run before traversals.
if( isTreeReduceReady() ) if (isTreeReduceReady())
queueNextTreeReduce( walker ); queueNextTreeReduce(walker);
else if( isShardTraversePending() ) else if (isShardTraversePending())
queueNextShardTraverse( walker, reduceTree ); queueNextShardTraverse(walker, reduceTree);
} }
// Merge any lingering output files. If these files aren't ready, // Merge any lingering output files. If these files aren't ready,
@ -144,10 +131,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
try { try {
result = reduceTree.getResult().get(); result = reduceTree.getResult().get();
} }
catch(Exception ex) { catch (Exception ex) {
throw new StingException("Unable to retrieve result", ex ); throw new StingException("Unable to retrieve result", ex);
} }
traversalEngine.printOnTraversalDone(result); traversalEngine.printOnTraversalDone(result);
walker.onTraversalDone(result); walker.onTraversalDone(result);
@ -156,6 +143,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Returns true if there are unscheduled shard traversal waiting to run. * Returns true if there are unscheduled shard traversal waiting to run.
*
* @return true if a shard traversal is waiting; false otherwise. * @return true if a shard traversal is waiting; false otherwise.
*/ */
protected boolean isShardTraversePending() { protected boolean isShardTraversePending() {
@ -165,10 +153,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Returns true if there are tree reduces that can be run without * Returns true if there are tree reduces that can be run without
* blocking. * blocking.
*
* @return true if a tree reduce is ready; false otherwise. * @return true if a tree reduce is ready; false otherwise.
*/ */
protected boolean isTreeReduceReady() { protected boolean isTreeReduceReady() {
if( reduceTasks.size() == 0 ) if (reduceTasks.size() == 0)
return false; return false;
return reduceTasks.peek().isReadyForReduce(); return reduceTasks.peek().isReadyForReduce();
} }
@ -177,6 +166,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
* Returns true if there are tree reduces that need to be run before * Returns true if there are tree reduces that need to be run before
* the computation is complete. Returns true if any entries are in the queue, * the computation is complete. Returns true if any entries are in the queue,
* blocked or otherwise. * blocked or otherwise.
*
* @return true if a tree reduce is pending; false otherwise. * @return true if a tree reduce is pending; false otherwise.
*/ */
protected boolean isTreeReducePending() { protected boolean isTreeReducePending() {
@ -186,17 +176,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Returns whether the maximum number of files is sitting in the temp directory * Returns whether the maximum number of files is sitting in the temp directory
* waiting to be merged back in. * waiting to be merged back in.
*
* @return True if the merging needs to take priority. False otherwise. * @return True if the merging needs to take priority. False otherwise.
*/ */
protected boolean isMergeLimitExceeded() { protected boolean isMergeLimitExceeded() {
if( outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES ) if (outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES)
return false; return false;
// If any of the first MAX_OUTSTANDING merges aren't ready, the merge limit // If any of the first MAX_OUTSTANDING merges aren't ready, the merge limit
// has not been exceeded. // has not been exceeded.
OutputMerger[] outputMergers = outputMergeTasks.toArray( new OutputMerger[0] ); OutputMerger[] outputMergers = outputMergeTasks.toArray(new OutputMerger[0]);
for( int i = 0; i < MAX_OUTSTANDING_OUTPUT_MERGES; i++ ) { for (int i = 0; i < MAX_OUTSTANDING_OUTPUT_MERGES; i++) {
if( !outputMergers[i].isComplete() ) if (!outputMergers[i].isComplete())
return false; return false;
} }
@ -207,10 +198,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Returns whether there is output waiting to be merged into the global output * Returns whether there is output waiting to be merged into the global output
* streams right now. * streams right now.
*
* @return True if this output is ready to be merged. False otherwise. * @return True if this output is ready to be merged. False otherwise.
*/ */
protected boolean isOutputMergeReady() { protected boolean isOutputMergeReady() {
if( outputMergeTasks.size() > 0 ) if (outputMergeTasks.size() > 0)
return outputMergeTasks.peek().isComplete(); return outputMergeTasks.peek().isComplete();
else else
return false; return false;
@ -224,105 +216,99 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
while( isOutputMergeReady() ) while (isOutputMergeReady())
outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); outputMergeTasks.remove().mergeInto(outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream());
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
totalOutputMergeTime += (endTime - startTime); totalOutputMergeTime += ( endTime - startTime );
} }
/** /** Merge any output that hasn't yet been taken care of by the blocking thread. */
* Merge any output that hasn't yet been taken care of by the blocking thread.
*/
protected void mergeRemainingOutput() { protected void mergeRemainingOutput() {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker();
while( outputMergeTasks.size() > 0 ) { while (outputMergeTasks.size() > 0) {
OutputMerger outputMerger = outputMergeTasks.remove(); OutputMerger outputMerger = outputMergeTasks.remove();
synchronized(outputMerger) { synchronized (outputMerger) {
if( !outputMerger.isComplete() ) if (!outputMerger.isComplete())
outputMerger.waitForOutputComplete(); outputMerger.waitForOutputComplete();
} }
outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); outputMerger.mergeInto(outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream());
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
totalOutputMergeTime += (endTime - startTime); totalOutputMergeTime += ( endTime - startTime );
} }
/** /**
* Queues the next traversal of a walker from the traversal tasks queue. * Queues the next traversal of a walker from the traversal tasks queue.
* @param walker Walker to apply to the dataset. *
* @param walker Walker to apply to the dataset.
* @param reduceTree Tree of reduces to which to add this shard traverse. * @param reduceTree Tree of reduces to which to add this shard traverse.
*/ */
protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) { protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) {
if( traverseTasks.size() == 0 ) if (traverseTasks.size() == 0)
throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); throw new IllegalStateException("Cannot traverse; no pending traversals exist.");
Shard shard = traverseTasks.remove(); Shard shard = traverseTasks.remove();
OutputMerger outputMerger = new OutputMerger(); OutputMerger outputMerger = new OutputMerger();
ShardTraverser traverser = new ShardTraverser( this, ShardTraverser traverser = new ShardTraverser(this,
getTraversalEngine(), getTraversalEngine(),
walker, walker,
shard, shard,
getShardDataProvider(shard), getShardDataProvider(shard),
outputMerger ); outputMerger);
Future traverseResult = threadPool.submit(traverser); Future traverseResult = threadPool.submit(traverser);
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
reduceTree.addEntry( traverseResult ); reduceTree.addEntry(traverseResult);
// No more data? Let the reduce tree know so it can finish processing what it's got. // No more data? Let the reduce tree know so it can finish processing what it's got.
if( !isShardTraversePending() ) if (!isShardTraversePending())
reduceTree.complete(); reduceTree.complete();
outputMergeTasks.add(outputMerger); outputMergeTasks.add(outputMerger);
return traverseResult; return traverseResult;
} }
/** /** Pulls the next reduce from the queue and runs it. */
* Pulls the next reduce from the queue and runs it.
*/
protected void queueNextTreeReduce( Walker walker ) { protected void queueNextTreeReduce( Walker walker ) {
if( reduceTasks.size() == 0 ) if (reduceTasks.size() == 0)
throw new IllegalStateException( "Cannot reduce; no pending reduces exist."); throw new IllegalStateException("Cannot reduce; no pending reduces exist.");
TreeReduceTask reducer = reduceTasks.remove(); TreeReduceTask reducer = reduceTasks.remove();
reducer.setWalker( (TreeReducible)walker ); reducer.setWalker((TreeReducible) walker);
threadPool.submit( reducer ); threadPool.submit(reducer);
} }
/** /** Blocks until a free slot appears in the thread queue. */
* Blocks until a free slot appears in the thread queue.
*/
protected void waitForFreeQueueSlot() { protected void waitForFreeQueueSlot() {
ThreadPoolMonitor monitor = new ThreadPoolMonitor(); ThreadPoolMonitor monitor = new ThreadPoolMonitor();
synchronized(monitor) { synchronized (monitor) {
threadPool.submit( monitor ); threadPool.submit(monitor);
monitor.watch(); monitor.watch();
} }
} }
/** /**
* Callback for adding reduce tasks to the run queue. * Callback for adding reduce tasks to the run queue.
*
* @return A new, composite future of the result of this reduce. * @return A new, composite future of the result of this reduce.
*/ */
public Future notifyReduce( Future lhs, Future rhs ) { public Future notifyReduce( Future lhs, Future rhs ) {
TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( this, lhs, rhs ) ); TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs));
reduceTasks.add(reducer); reduceTasks.add(reducer);
return reducer; return reducer;
} }
/** /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. */
* A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics.
*/
private class TreeReduceTask extends FutureTask { private class TreeReduceTask extends FutureTask {
private TreeReducer treeReducer = null; private TreeReducer treeReducer = null;
@ -332,7 +318,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
} }
public void setWalker( TreeReducible walker ) { public void setWalker( TreeReducible walker ) {
treeReducer.setWalker( walker ); treeReducer.setWalker(walker);
} }
public boolean isReadyForReduce() { public boolean isReadyForReduce() {
@ -342,6 +328,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Used by the ShardTraverser to report time consumed traversing a given shard. * Used by the ShardTraverser to report time consumed traversing a given shard.
*
* @param shardTraversalTime Elapsed time traversing a given shard. * @param shardTraversalTime Elapsed time traversing a given shard.
*/ */
synchronized void reportShardTraverseTime( long shardTraversalTime ) { synchronized void reportShardTraverseTime( long shardTraversalTime ) {
@ -351,6 +338,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* Used by the TreeReducer to report time consumed reducing two shards. * Used by the TreeReducer to report time consumed reducing two shards.
*
* @param treeReduceTime Elapsed time reducing two shards. * @param treeReduceTime Elapsed time reducing two shards.
*/ */
synchronized void reportTreeReduceTime( long treeReduceTime ) { synchronized void reportTreeReduceTime( long treeReduceTime ) {
@ -359,69 +347,51 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public int getTotalNumberOfShards() { public int getTotalNumberOfShards() {
return totalTraversals; return totalTraversals;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public int getRemainingNumberOfShards() { public int getRemainingNumberOfShards() {
return traverseTasks.size(); return traverseTasks.size();
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public int getNumberOfTasksInReduceQueue() { public int getNumberOfTasksInReduceQueue() {
return reduceTasks.size(); return reduceTasks.size();
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public int getNumberOfTasksInIOQueue() { public int getNumberOfTasksInIOQueue() {
return outputMergeTasks.size(); return outputMergeTasks.size();
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public long getTotalShardTraverseTimeMillis() { public long getTotalShardTraverseTimeMillis() {
return totalShardTraverseTime; return totalShardTraverseTime;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public long getAvgShardTraverseTimeMillis() { public long getAvgShardTraverseTimeMillis() {
if( totalCompletedTraversals == 0 ) if (totalCompletedTraversals == 0)
return 0; return 0;
return totalShardTraverseTime / totalCompletedTraversals; return totalShardTraverseTime / totalCompletedTraversals;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public long getTotalTreeReduceTimeMillis() { public long getTotalTreeReduceTimeMillis() {
return totalTreeReduceTime; return totalTreeReduceTime;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public long getAvgTreeReduceTimeMillis() { public long getAvgTreeReduceTimeMillis() {
if( totalCompletedTreeReduces == 0 ) if (totalCompletedTreeReduces == 0)
return 0; return 0;
return totalTreeReduceTime / totalCompletedTreeReduces; return totalTreeReduceTime / totalCompletedTreeReduces;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
public long getTotalOutputMergeTimeMillis() { public long getTotalOutputMergeTimeMillis() {
return totalOutputMergeTime; return totalOutputMergeTime;
} }

View File

@ -31,9 +31,10 @@ public class LinearMicroScheduler extends MicroScheduler {
* *
* @param walker Computation to perform over dataset. * @param walker Computation to perform over dataset.
* @param locations Subset of the dataset over which to walk. * @param locations Subset of the dataset over which to walk.
* @param maxIterations the maximum number of iterations we're to perform
*/ */
public Object execute(Walker walker, GenomeLocSortedSet locations) { public Object execute(Walker walker, GenomeLocSortedSet locations, Integer maxIterations) {
ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations); ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations, maxIterations);
walker.initialize(); walker.initialize();
Accumulator accumulator = Accumulator.create(walker); Accumulator accumulator = Accumulator.create(walker);

View File

@ -99,9 +99,10 @@ public abstract class MicroScheduler {
* Walks a walker over the given list of intervals. * Walks a walker over the given list of intervals.
* @param walker Computation to perform over dataset. * @param walker Computation to perform over dataset.
* @param intervals A list of intervals over which to walk. Null for whole dataset. * @param intervals A list of intervals over which to walk. Null for whole dataset.
* @param maxIterations the maximum number of iterations we're to perform
* @return the return type of the walker * @return the return type of the walker
*/ */
public abstract Object execute(Walker walker, GenomeLocSortedSet intervals); public abstract Object execute(Walker walker, GenomeLocSortedSet intervals, Integer maxIterations);
/** /**
* Get the sharding strategy given a driving data source. * Get the sharding strategy given a driving data source.
@ -110,7 +111,10 @@ public abstract class MicroScheduler {
* @param intervals Intervals to use when limiting sharding. * @param intervals Intervals to use when limiting sharding.
* @return Sharding strategy for this driving data source. * @return Sharding strategy for this driving data source.
*/ */
protected ShardStrategy getShardStrategy(Walker walker, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) { protected ShardStrategy getShardStrategy(Walker walker,
ReferenceSequenceFile drivingDataSource,
GenomeLocSortedSet intervals,
Integer maxIterations) {
ShardStrategy shardStrategy = null; ShardStrategy shardStrategy = null;
ShardStrategyFactory.SHATTER_STRATEGY shardType; ShardStrategyFactory.SHATTER_STRATEGY shardType;
if (walker instanceof LocusWalker) { if (walker instanceof LocusWalker) {
@ -122,11 +126,11 @@ public abstract class MicroScheduler {
shardStrategy = ShardStrategyFactory.shatter(shardType, shardStrategy = ShardStrategyFactory.shatter(shardType,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, SHARD_SIZE,
intervals); intervals, maxIterations);
} else } else
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE); SHARD_SIZE, maxIterations);
} else if (walker instanceof ReadWalker) { } else if (walker instanceof ReadWalker) {
@ -136,11 +140,11 @@ public abstract class MicroScheduler {
shardStrategy = ShardStrategyFactory.shatter(shardType, shardStrategy = ShardStrategyFactory.shatter(shardType,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, SHARD_SIZE,
intervals); intervals, maxIterations);
} else { } else {
shardStrategy = ShardStrategyFactory.shatter(shardType, shardStrategy = ShardStrategyFactory.shatter(shardType,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE); SHARD_SIZE, maxIterations);
} }
} else } else
throw new StingException("Unable to support walker of type" + walker.getClass().getName()); throw new StingException("Unable to support walker of type" + walker.getClass().getName());

View File

@ -73,7 +73,7 @@ public class GATKArgumentCollectionTest extends BaseTest {
List<File> input = new ArrayList<File>(); List<File> input = new ArrayList<File>();
input.add(new File("test.file")); input.add(new File("test.file"));
collect.samFiles = input; collect.samFiles = input;
collect.maximumReads = "-1"; collect.maximumEngineIterations = -1;
collect.strictnessLevel = "strict"; collect.strictnessLevel = "strict";
collect.referenceFile = new File("referenceFile".toLowerCase()); collect.referenceFile = new File("referenceFile".toLowerCase());
collect.analysisName = "analysisName".toLowerCase(); collect.analysisName = "analysisName".toLowerCase();

View File

@ -58,7 +58,7 @@ public class LinearLocusShardStrategyTest extends BaseTest {
@Test @Test
public void testSetup() { public void testSetup() {
LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500); LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500, -1);
int counter = 0; int counter = 0;
while(strat.hasNext()) { while(strat.hasNext()) {
Shard d = strat.next(); Shard d = strat.next();
@ -71,7 +71,7 @@ public class LinearLocusShardStrategyTest extends BaseTest {
@Test @Test
public void testAdjustSize() { public void testAdjustSize() {
LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500); LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500, -1);
strat.adjustNextShardSize(1000); strat.adjustNextShardSize(1000);
int counter = 0; int counter = 0;
while(strat.hasNext()) { while(strat.hasNext()) {
@ -86,7 +86,7 @@ public class LinearLocusShardStrategyTest extends BaseTest {
@Test @Test
public void testUnevenSplit() { public void testUnevenSplit() {
LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600); LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600, -1);
int counter = 0; int counter = 0;
while(strat.hasNext()) { while(strat.hasNext()) {
Shard d = strat.next(); Shard d = strat.next();
@ -100,4 +100,19 @@ public class LinearLocusShardStrategyTest extends BaseTest {
} }
assertTrue(counter == 10); assertTrue(counter == 10);
} }
@Test
public void testDashMOption() {
LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600, 200);
int counter = 0;
while(strat.hasNext()) {
Shard d = strat.next();
assertTrue(d instanceof LocusShard);
assertTrue((d.getGenomeLoc().getStop() - d.getGenomeLoc().getStart()) == 199);
++counter;
}
assertTrue(counter == 1);
}
} }

View File

@ -129,7 +129,7 @@ public class SAMBAMDataSourceTest extends BaseTest {
// setup the test files // setup the test files
fl.add(new File(seqLocation + "/dirseq/analysis/cancer_exome/twoflowcell_sams/TCGA-06-0188.aligned.duplicates_marked.bam")); fl.add(new File(seqLocation + "/dirseq/analysis/cancer_exome/twoflowcell_sams/TCGA-06-0188.aligned.duplicates_marked.bam"));
Reads reads = new Reads(fl); Reads reads = new Reads(fl);
ArrayList<Integer> readcountPerShard = new ArrayList<Integer>(); ArrayList<Integer> readcountPerShard = new ArrayList<Integer>();
ArrayList<Integer> readcountPerShard2 = new ArrayList<Integer>(); ArrayList<Integer> readcountPerShard2 = new ArrayList<Integer>();