From a8a2d0eab9a3e80f7879515e765ac581ecfcf328 Mon Sep 17 00:00:00 2001 From: aaron Date: Mon, 8 Jun 2009 15:12:24 +0000 Subject: [PATCH] added support for the -M option in traversals. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@935 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GATKArgumentCollection.java | 6 +- .../sting/gatk/GenomeAnalysisEngine.java | 4 +- .../shards/ExpGrowthLocusShardStrategy.java | 6 +- .../shards/LinearLocusShardStrategy.java | 6 +- .../shards/LocusShardStrategy.java | 35 ++- .../dataSources/shards/ReadShardStrategy.java | 23 +- .../shards/ShardStrategyFactory.java | 37 +++- .../executive/HierarchicalMicroScheduler.java | 208 ++++++++---------- .../gatk/executive/LinearMicroScheduler.java | 5 +- .../sting/gatk/executive/MicroScheduler.java | 16 +- .../gatk/GATKArgumentCollectionTest.java | 2 +- .../shards/LinearLocusShardStrategyTest.java | 21 +- .../SAMBAMDataSourceTest.java | 2 +- 13 files changed, 214 insertions(+), 157 deletions(-) diff --git a/java/src/org/broadinstitute/sting/gatk/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/GATKArgumentCollection.java index c87ae5828..90091e829 100755 --- a/java/src/org/broadinstitute/sting/gatk/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/GATKArgumentCollection.java @@ -105,8 +105,8 @@ public class GATKArgumentCollection { public Boolean walkAllLoci = false; @Element(required=false) - @Argument(fullName = "maximum_reads", shortName = "M", doc = "Maximum number of reads to process before exiting", required = false) - public String maximumReads = "-1"; + @Argument(fullName = "maximum_reads", shortName = "M", doc = "Maximum number of iterations to process before exiting, the lower bound is zero. 
Intended only for testing", required = false) + public Integer maximumEngineIterations = -1; @Element(required=false) @Argument(fullName = "sort_on_the_fly", shortName = "sort", doc = "Maximum number of reads to sort on the fly", required = false) @@ -215,7 +215,7 @@ public class GATKArgumentCollection { if (!other.samFiles.equals(this.samFiles)) { return false; } - if (!other.maximumReads.equals(this.maximumReads)) { + if (!other.maximumEngineIterations.equals(this.maximumEngineIterations)) { return false; } if (!other.strictnessLevel.equals(this.strictnessLevel)) { diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index 5c10ad654..089f963d4 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -122,7 +122,7 @@ public class GenomeAnalysisEngine { locs = GenomeLocSortedSet.createSetFromList(locationsList); // excute the microscheduler, storing the results - walkerReturn = microScheduler.execute(my_walker, locs); + walkerReturn = microScheduler.execute(my_walker, locs, argCollection.maximumEngineIterations); } @@ -199,7 +199,7 @@ public class GenomeAnalysisEngine { private void genericEngineSetup(ValidationStringency strictness) { engine.setStrictness(strictness); - engine.setMaxReads(Integer.parseInt(argCollection.maximumReads)); + engine.setMaxReads(argCollection.maximumEngineIterations); engine.setFilterZeroMappingQualityReads(argCollection.filterZeroMappingQualityReads); // we default interval files over the genome region string diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ExpGrowthLocusShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ExpGrowthLocusShardStrategy.java index f80142ce5..611659181 100755 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ExpGrowthLocusShardStrategy.java +++ 
b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ExpGrowthLocusShardStrategy.java @@ -52,8 +52,9 @@ public class ExpGrowthLocusShardStrategy extends LocusShardStrategy { * * @param dic the seq dictionary */ - ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize) { + ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, long limitByCount) { super(dic); + this.limitingFactor = limitByCount; this.baseSize = startSize; currentExp = 0; } @@ -76,8 +77,9 @@ public class ExpGrowthLocusShardStrategy extends LocusShardStrategy { * @param startSize the starting size of the shard * @param lst locations to iterate from */ - ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst) { + ExpGrowthLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst, long limitByCount) { super(dic, lst); + this.limitingFactor = limitByCount; this.baseSize = startSize; this.currentExp = 0; } diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategy.java index a11791439..f84c89d8b 100755 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategy.java +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategy.java @@ -51,8 +51,9 @@ class LinearLocusShardStrategy extends LocusShardStrategy { * * @param dic the seq dictionary */ - LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize) { + LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, long limitByCount) { super(dic); + this.limitingFactor = limitByCount; this.nextShardSize = startSize; } @@ -73,8 +74,9 @@ class LinearLocusShardStrategy extends LocusShardStrategy { * @param startSize the starting size of the shard * @param lst locations to iterate from */ - LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, 
GenomeLocSortedSet lst) { + LinearLocusShardStrategy(SAMSequenceDictionary dic, long startSize, GenomeLocSortedSet lst, long limitByCount) { super(dic, lst); + this.limitingFactor = limitByCount; this.nextShardSize = startSize; } diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LocusShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LocusShardStrategy.java index c663da43e..851ad172b 100755 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LocusShardStrategy.java +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/LocusShardStrategy.java @@ -51,14 +51,18 @@ public abstract class LocusShardStrategy implements ShardStrategy { /** our log, which we want to capture anything from this class */ private static Logger logger = Logger.getLogger(LocusShardStrategy.class); + /** the number of iterations before we stop */ + protected long limitingFactor = -1; + private boolean stopDueToLimitingFactor = false; /** * the constructor, taking a seq dictionary to parse out contigs * * @param dic the seq dictionary */ - LocusShardStrategy(SAMSequenceDictionary dic) { + LocusShardStrategy( SAMSequenceDictionary dic ) { this.dic = dic; + limitingFactor = -1; mLoc = new GenomeLoc(0, 0, 0); if (dic.getSequences().size() > 0) { nextContig = true; @@ -70,12 +74,13 @@ public abstract class LocusShardStrategy implements ShardStrategy { * * @param old the old strategy */ - LocusShardStrategy(LocusShardStrategy old) { + LocusShardStrategy( LocusShardStrategy old ) { this.dic = old.dic; this.mLoc = old.mLoc; this.seqLoc = old.seqLoc; this.lastGenomeLocSize = old.lastGenomeLocSize; this.nextContig = old.nextContig; + this.limitingFactor = old.limitingFactor; } @@ -85,7 +90,7 @@ public abstract class LocusShardStrategy implements ShardStrategy { * @param dic the seq dictionary * @param intervals file */ - LocusShardStrategy(SAMSequenceDictionary dic, GenomeLocSortedSet intervals) { + LocusShardStrategy( 
SAMSequenceDictionary dic, GenomeLocSortedSet intervals ) { this.dic = dic; this.intervals = intervals.clone(); // set the starting point to the beginning interval @@ -132,6 +137,15 @@ public abstract class LocusShardStrategy implements ShardStrategy { long proposedSize = nextShardSize(); long nextStart = mLoc.getStop() + 1; + if (this.limitingFactor > 0) { + if (proposedSize < limitingFactor) { + limitingFactor = limitingFactor - proposedSize; + } else { + proposedSize = limitingFactor; + this.stopDueToLimitingFactor = true; + } + } + // if we don't have an interval set, use the non interval based approach. Simple, eh? if (this.intervals == null) { return nonIntervaledNext(length, proposedSize, nextStart); @@ -148,8 +162,8 @@ public abstract class LocusShardStrategy implements ShardStrategy { * * @return the shard that represents this data */ - private Shard intervaledNext(long proposedSize) { - if ((this.intervals == null) || (intervals.isEmpty())) { + private Shard intervaledNext( long proposedSize ) { + if (( this.intervals == null ) || ( intervals.isEmpty() )) { throw new StingException("LocusShardStrategy: genomic regions list is empty in next() function."); } @@ -175,12 +189,12 @@ public abstract class LocusShardStrategy implements ShardStrategy { * * @return the shard to return to the user */ - private Shard nonIntervaledNext(long length, long proposedSize, long nextStart) { + private Shard nonIntervaledNext( long length, long proposedSize, long nextStart ) { // can we fit it into the current seq size? 
if (nextStart + proposedSize - 1 < length) { lastGenomeLocSize = proposedSize; mLoc = new GenomeLoc(dic.getSequence(seqLoc).getSequenceIndex(), nextStart, nextStart + proposedSize - 1); - return LocusShard.toShard(new GenomeLoc(dic.getSequence(seqLoc).getSequenceIndex(), nextStart, nextStart + proposedSize - 1)); + return LocusShard.toShard(mLoc); } // else we can't make it in the current location, we have to stitch one together else { @@ -207,7 +221,7 @@ public abstract class LocusShardStrategy implements ShardStrategy { private void jumpContig() { ++seqLoc; - if (!(seqLoc < dic.getSequences().size())) { + if (!( seqLoc < dic.getSequences().size() )) { nextContig = false; return; } @@ -223,11 +237,14 @@ public abstract class LocusShardStrategy implements ShardStrategy { * @return */ public boolean hasNext() { + if (this.stopDueToLimitingFactor) { + return false; + } // if we don't have an interval file, use the non interval based approach. if (this.intervals == null) { return nextContig; } else { - return (this.intervals.size() > 0); + return ( this.intervals.size() > 0 ); } } diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ReadShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ReadShardStrategy.java index ffa2f2685..b29e26c8e 100755 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ReadShardStrategy.java +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ReadShardStrategy.java @@ -54,14 +54,19 @@ public class ReadShardStrategy implements ShardStrategy { // our hasnext flag boolean hasNext = true; + // our limiting factor + long limitedSize = -1; + boolean stopDueToLimitingFactor = false; + /** * the default constructor * @param dic the sequence dictionary to use * @param size the read count to iterate over */ - ReadShardStrategy(SAMSequenceDictionary dic, long size) { + ReadShardStrategy(SAMSequenceDictionary dic, long size, long limitedSize) { this.dic = dic; - readCount = size; + 
readCount = size; + this.limitedSize = limitedSize; } /** @@ -69,10 +74,24 @@ public class ReadShardStrategy implements ShardStrategy { * @return */ public boolean hasNext() { + if (stopDueToLimitingFactor) { + return false; + } return hasNext; } public Shard next() { + if (limitedSize > 0) { + if (limitedSize > readCount) { + limitedSize = limitedSize - readCount; + } + else { + readCount = limitedSize; + limitedSize = 0; + stopDueToLimitingFactor = true; + } + } + return new ReadShard((int)readCount, this); } diff --git a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ShardStrategyFactory.java b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ShardStrategyFactory.java index f9b4b3364..5fadafb0e 100644 --- a/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ShardStrategyFactory.java +++ b/java/src/org/broadinstitute/sting/gatk/dataSources/shards/ShardStrategyFactory.java @@ -50,13 +50,27 @@ public class ShardStrategyFactory { * @return */ static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) { + return ShardStrategyFactory.shatter(strat, dic, startingSize, -1L); + } + + /** + * get a new shatter strategy + * + * @param strat what's our strategy - SHATTER_STRATEGY type + * @param dic the seq dictionary + * @param startingSize the starting size + * @return + */ + static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) { switch (strat) { case LINEAR: - return new LinearLocusShardStrategy(dic, startingSize); + return new LinearLocusShardStrategy(dic, startingSize, limitByCount); case EXPONENTIAL: - return new ExpGrowthLocusShardStrategy(dic, startingSize); + return new ExpGrowthLocusShardStrategy(dic, startingSize, limitByCount); case READS: - return new ReadShardStrategy(dic, startingSize); + return new ReadShardStrategy(dic, startingSize, limitByCount); + case INTERVAL: + throw new StingException("Requested strategy: " 
+ strat + " doesn't work with the limiting count (-M) command line option"); default: throw new StingException("Strategy: " + strat + " isn't implemented for this type of shatter request"); } @@ -73,11 +87,24 @@ public class ShardStrategyFactory { * @return */ static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) { + return ShardStrategyFactory.shatter(strat, dic, startingSize, lst, -1l); + + } + + /** + * get a new shatter strategy + * + * @param strat what's our strategy - SHATTER_STRATEGY type + * @param dic the seq dictionary + * @param startingSize the starting size + * @return + */ + static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) { switch (strat) { case LINEAR: - return new LinearLocusShardStrategy(dic, startingSize, lst); + return new LinearLocusShardStrategy(dic, startingSize, lst, limitDataCount); case EXPONENTIAL: - return new ExpGrowthLocusShardStrategy(dic, startingSize, lst); + return new ExpGrowthLocusShardStrategy(dic, startingSize, lst, limitDataCount); case INTERVAL: case READS: return new IntervalShardStrategy(startingSize, lst); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 7338e6369..c2ad28011 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -45,82 +45,69 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar */ private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50; - /** - * Manage currently running threads. - */ + /** Manage currently running threads. 
*/ private ExecutorService threadPool; private Queue traverseTasks = new LinkedList(); private Queue reduceTasks = new LinkedList(); private Queue outputMergeTasks = new LinkedList(); - /** - * How many total tasks were in the queue at the start of run. - */ - private int totalTraversals = 0; + /** How many total tasks were in the queue at the start of run. */ + private int totalTraversals = 0; - /** - * How many shard traversals have run to date? - */ + /** How many shard traversals have run to date? */ private int totalCompletedTraversals = 0; - /** - * What is the total time spent traversing shards? - */ + /** What is the total time spent traversing shards? */ private long totalShardTraverseTime = 0; - /** - * What is the total time spent tree reducing shard output? - */ + /** What is the total time spent tree reducing shard output? */ private long totalTreeReduceTime = 0; - /** - * How many tree reduces have been completed? - */ + /** How many tree reduces have been completed? */ private long totalCompletedTreeReduces = 0; - /** - * What is the total time spent merging output? - */ + /** What is the total time spent merging output? */ private long totalOutputMergeTime = 0; /** * Create a new hierarchical microscheduler to process the given reads and reference. - * @param reads Reads file(s) to process. - * @param refFile Reference for driving the traversal. + * + * @param reads Reads file(s) to process. + * @param refFile Reference for driving the traversal. 
* @param nThreadsToUse maximum number of threads to use to do the work */ protected HierarchicalMicroScheduler( Walker walker, Reads reads, File refFile, List> rods, int nThreadsToUse ) { - super( walker, reads, refFile, rods ); + super(walker, reads, refFile, rods); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler"); - mbs.registerMBean(this,name); + mbs.registerMBean(this, name); } - catch( JMException ex ) { + catch (JMException ex) { throw new StingException("Unable to register microscheduler with JMX", ex); } } - public Object execute( Walker walker, GenomeLocSortedSet intervals ) { + public Object execute( Walker walker, GenomeLocSortedSet intervals, Integer maxIterations ) { // Fast fail for walkers not supporting TreeReducible interface. - if( !(walker instanceof TreeReducible) ) + if (!( walker instanceof TreeReducible )) throw new IllegalArgumentException("Hierarchical microscheduler only works with TreeReducible walkers"); - ShardStrategy shardStrategy = getShardStrategy( walker, reference, intervals ); - ReduceTree reduceTree = new ReduceTree( this ); + ShardStrategy shardStrategy = getShardStrategy(walker, reference, intervals, maxIterations); + ReduceTree reduceTree = new ReduceTree(this); walker.initialize(); - - for(Shard shard: shardStrategy) + + for (Shard shard : shardStrategy) traverseTasks.add(shard); totalTraversals = traverseTasks.size(); - while( isShardTraversePending() || isTreeReducePending() ) { + while (isShardTraversePending() || isTreeReducePending()) { // Too many files sitting around taking up space? Merge them. - if( isMergeLimitExceeded() ) + if (isMergeLimitExceeded()) mergeExistingOutput(); // Wait for the next slot in the queue to become free. 
@@ -128,10 +115,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar // Pick the next most appropriate task and run it. In the interest of // memory conservation, hierarchical reduces always run before traversals. - if( isTreeReduceReady() ) - queueNextTreeReduce( walker ); - else if( isShardTraversePending() ) - queueNextShardTraverse( walker, reduceTree ); + if (isTreeReduceReady()) + queueNextTreeReduce(walker); + else if (isShardTraversePending()) + queueNextShardTraverse(walker, reduceTree); } // Merge any lingering output files. If these files aren't ready, @@ -144,10 +131,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar try { result = reduceTree.getResult().get(); } - catch(Exception ex) { - throw new StingException("Unable to retrieve result", ex ); + catch (Exception ex) { + throw new StingException("Unable to retrieve result", ex); } - + traversalEngine.printOnTraversalDone(result); walker.onTraversalDone(result); @@ -156,6 +143,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Returns true if there are unscheduled shard traversal waiting to run. + * * @return true if a shard traversal is waiting; false otherwise. */ protected boolean isShardTraversePending() { @@ -165,10 +153,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Returns true if there are tree reduces that can be run without * blocking. + * * @return true if a tree reduce is ready; false otherwise. */ protected boolean isTreeReduceReady() { - if( reduceTasks.size() == 0 ) + if (reduceTasks.size() == 0) return false; return reduceTasks.peek().isReadyForReduce(); } @@ -177,6 +166,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar * Returns true if there are tree reduces that need to be run before * the computation is complete. Returns true if any entries are in the queue, * blocked or otherwise. 
+ * * @return true if a tree reduce is pending; false otherwise. */ protected boolean isTreeReducePending() { @@ -186,17 +176,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Returns whether the maximum number of files is sitting in the temp directory * waiting to be merged back in. + * * @return True if the merging needs to take priority. False otherwise. */ protected boolean isMergeLimitExceeded() { - if( outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES ) + if (outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES) return false; // If any of the first MAX_OUTSTANDING merges aren't ready, the merge limit // has not been exceeded. - OutputMerger[] outputMergers = outputMergeTasks.toArray( new OutputMerger[0] ); - for( int i = 0; i < MAX_OUTSTANDING_OUTPUT_MERGES; i++ ) { - if( !outputMergers[i].isComplete() ) + OutputMerger[] outputMergers = outputMergeTasks.toArray(new OutputMerger[0]); + for (int i = 0; i < MAX_OUTSTANDING_OUTPUT_MERGES; i++) { + if (!outputMergers[i].isComplete()) return false; } @@ -207,10 +198,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Returns whether there is output waiting to be merged into the global output * streams right now. + * * @return True if this output is ready to be merged. False otherwise. 
*/ protected boolean isOutputMergeReady() { - if( outputMergeTasks.size() > 0 ) + if (outputMergeTasks.size() > 0) return outputMergeTasks.peek().isComplete(); else return false; @@ -224,105 +216,99 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar long startTime = System.currentTimeMillis(); OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); - while( isOutputMergeReady() ) - outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); + while (isOutputMergeReady()) + outputMergeTasks.remove().mergeInto(outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream()); long endTime = System.currentTimeMillis(); - totalOutputMergeTime += (endTime - startTime); + totalOutputMergeTime += ( endTime - startTime ); } - /** - * Merge any output that hasn't yet been taken care of by the blocking thread. - */ + /** Merge any output that hasn't yet been taken care of by the blocking thread. */ protected void mergeRemainingOutput() { long startTime = System.currentTimeMillis(); OutputTracker outputTracker = GenomeAnalysisEngine.instance.getOutputTracker(); - while( outputMergeTasks.size() > 0 ) { + while (outputMergeTasks.size() > 0) { OutputMerger outputMerger = outputMergeTasks.remove(); - synchronized(outputMerger) { - if( !outputMerger.isComplete() ) + synchronized (outputMerger) { + if (!outputMerger.isComplete()) outputMerger.waitForOutputComplete(); } - outputMerger.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); + outputMerger.mergeInto(outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream()); } long endTime = System.currentTimeMillis(); - totalOutputMergeTime += (endTime - startTime); + totalOutputMergeTime += ( endTime - startTime ); } /** * Queues the next traversal of a walker from the traversal tasks queue. - * @param walker Walker to apply to the dataset. 
+ * + * @param walker Walker to apply to the dataset. * @param reduceTree Tree of reduces to which to add this shard traverse. */ protected Future queueNextShardTraverse( Walker walker, ReduceTree reduceTree ) { - if( traverseTasks.size() == 0 ) - throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); + if (traverseTasks.size() == 0) + throw new IllegalStateException("Cannot traverse; no pending traversals exist."); Shard shard = traverseTasks.remove(); OutputMerger outputMerger = new OutputMerger(); - ShardTraverser traverser = new ShardTraverser( this, - getTraversalEngine(), - walker, - shard, - getShardDataProvider(shard), - outputMerger ); + ShardTraverser traverser = new ShardTraverser(this, + getTraversalEngine(), + walker, + shard, + getShardDataProvider(shard), + outputMerger); Future traverseResult = threadPool.submit(traverser); // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. - reduceTree.addEntry( traverseResult ); + reduceTree.addEntry(traverseResult); // No more data? Let the reduce tree know so it can finish processing what it's got. - if( !isShardTraversePending() ) + if (!isShardTraversePending()) reduceTree.complete(); - outputMergeTasks.add(outputMerger); + outputMergeTasks.add(outputMerger); return traverseResult; } - /** - * Pulls the next reduce from the queue and runs it. - */ + /** Pulls the next reduce from the queue and runs it. 
*/ protected void queueNextTreeReduce( Walker walker ) { - if( reduceTasks.size() == 0 ) - throw new IllegalStateException( "Cannot reduce; no pending reduces exist."); + if (reduceTasks.size() == 0) + throw new IllegalStateException("Cannot reduce; no pending reduces exist."); TreeReduceTask reducer = reduceTasks.remove(); - reducer.setWalker( (TreeReducible)walker ); + reducer.setWalker((TreeReducible) walker); - threadPool.submit( reducer ); + threadPool.submit(reducer); } - /** - * Blocks until a free slot appears in the thread queue. - */ + /** Blocks until a free slot appears in the thread queue. */ protected void waitForFreeQueueSlot() { ThreadPoolMonitor monitor = new ThreadPoolMonitor(); - synchronized(monitor) { - threadPool.submit( monitor ); + synchronized (monitor) { + threadPool.submit(monitor); monitor.watch(); } } /** * Callback for adding reduce tasks to the run queue. + * * @return A new, composite future of the result of this reduce. */ public Future notifyReduce( Future lhs, Future rhs ) { - TreeReduceTask reducer = new TreeReduceTask( new TreeReducer( this, lhs, rhs ) ); + TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs)); reduceTasks.add(reducer); - return reducer; + return reducer; } - /** - * A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. - */ + /** A small wrapper class that provides the TreeReducer interface along with the FutureTask semantics. 
*/ private class TreeReduceTask extends FutureTask { private TreeReducer treeReducer = null; @@ -332,7 +318,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } public void setWalker( TreeReducible walker ) { - treeReducer.setWalker( walker ); + treeReducer.setWalker(walker); } public boolean isReadyForReduce() { @@ -342,6 +328,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Used by the ShardTraverser to report time consumed traversing a given shard. + * * @param shardTraversalTime Elapsed time traversing a given shard. */ synchronized void reportShardTraverseTime( long shardTraversalTime ) { @@ -351,6 +338,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Used by the TreeReducer to report time consumed reducing two shards. + * * @param treeReduceTime Elapsed time reducing two shards. */ synchronized void reportTreeReduceTime( long treeReduceTime ) { @@ -359,69 +347,51 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int getTotalNumberOfShards() { return totalTraversals; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int getRemainingNumberOfShards() { return traverseTasks.size(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int getNumberOfTasksInReduceQueue() { - return reduceTasks.size(); + return reduceTasks.size(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int getNumberOfTasksInIOQueue() { return outputMergeTasks.size(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getTotalShardTraverseTimeMillis() { return totalShardTraverseTime; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getAvgShardTraverseTimeMillis() { - if( totalCompletedTraversals == 0 ) + if (totalCompletedTraversals == 0) return 0; return totalShardTraverseTime / totalCompletedTraversals; } - /** - * 
{@inheritDoc} - */ + /** {@inheritDoc} */ public long getTotalTreeReduceTimeMillis() { return totalTreeReduceTime; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getAvgTreeReduceTimeMillis() { - if( totalCompletedTreeReduces == 0 ) + if (totalCompletedTreeReduces == 0) return 0; return totalTreeReduceTime / totalCompletedTreeReduces; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getTotalOutputMergeTimeMillis() { return totalOutputMergeTime; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 5a579698a..5032d5a52 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -31,9 +31,10 @@ public class LinearMicroScheduler extends MicroScheduler { * * @param walker Computation to perform over dataset. * @param locations Subset of the dataset over which to walk. + * @param maxIterations the maximum number of iterations we're to perform */ - public Object execute(Walker walker, GenomeLocSortedSet locations) { - ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations); + public Object execute(Walker walker, GenomeLocSortedSet locations, Integer maxIterations) { + ShardStrategy shardStrategy = getShardStrategy(walker, reference, locations, maxIterations); walker.initialize(); Accumulator accumulator = Accumulator.create(walker); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index eb6674105..414ec123c 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -99,9 +99,10 @@ public abstract class MicroScheduler { * Walks a walker over the given list of intervals. 
* @param walker Computation to perform over dataset. * @param intervals A list of intervals over which to walk. Null for whole dataset. + * @param maxIterations the maximum number of iterations we're to perform * @return the return type of the walker */ - public abstract Object execute(Walker walker, GenomeLocSortedSet intervals); + public abstract Object execute(Walker walker, GenomeLocSortedSet intervals, Integer maxIterations); /** * Get the sharding strategy given a driving data source. @@ -110,7 +111,10 @@ public abstract class MicroScheduler { * @param intervals Intervals to use when limiting sharding. * @return Sharding strategy for this driving data source. */ - protected ShardStrategy getShardStrategy(Walker walker, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) { + protected ShardStrategy getShardStrategy(Walker walker, + ReferenceSequenceFile drivingDataSource, + GenomeLocSortedSet intervals, + Integer maxIterations) { ShardStrategy shardStrategy = null; ShardStrategyFactory.SHATTER_STRATEGY shardType; if (walker instanceof LocusWalker) { @@ -122,11 +126,11 @@ public abstract class MicroScheduler { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, - intervals); + intervals, maxIterations); } else shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, drivingDataSource.getSequenceDictionary(), - SHARD_SIZE); + SHARD_SIZE, maxIterations); } else if (walker instanceof ReadWalker) { @@ -136,11 +140,11 @@ public abstract class MicroScheduler { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), SHARD_SIZE, - intervals); + intervals, maxIterations); } else { shardStrategy = ShardStrategyFactory.shatter(shardType, drivingDataSource.getSequenceDictionary(), - SHARD_SIZE); + SHARD_SIZE, maxIterations); } } else throw new StingException("Unable to support walker of type" + 
walker.getClass().getName()); diff --git a/java/test/org/broadinstitute/sting/gatk/GATKArgumentCollectionTest.java b/java/test/org/broadinstitute/sting/gatk/GATKArgumentCollectionTest.java index dfd6a8527..c9849490c 100755 --- a/java/test/org/broadinstitute/sting/gatk/GATKArgumentCollectionTest.java +++ b/java/test/org/broadinstitute/sting/gatk/GATKArgumentCollectionTest.java @@ -73,7 +73,7 @@ public class GATKArgumentCollectionTest extends BaseTest { List input = new ArrayList(); input.add(new File("test.file")); collect.samFiles = input; - collect.maximumReads = "-1"; + collect.maximumEngineIterations = -1; collect.strictnessLevel = "strict"; collect.referenceFile = new File("referenceFile".toLowerCase()); collect.analysisName = "analysisName".toLowerCase(); diff --git a/java/test/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategyTest.java b/java/test/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategyTest.java index 00b45e731..6af3fd925 100755 --- a/java/test/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategyTest.java +++ b/java/test/org/broadinstitute/sting/gatk/dataSources/shards/LinearLocusShardStrategyTest.java @@ -58,7 +58,7 @@ public class LinearLocusShardStrategyTest extends BaseTest { @Test public void testSetup() { - LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500); + LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500, -1); int counter = 0; while(strat.hasNext()) { Shard d = strat.next(); @@ -71,7 +71,7 @@ public class LinearLocusShardStrategyTest extends BaseTest { @Test public void testAdjustSize() { - LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500); + LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 500, -1); strat.adjustNextShardSize(1000); int counter = 0; while(strat.hasNext()) { @@ -86,7 
+86,7 @@ public class LinearLocusShardStrategyTest extends BaseTest { @Test public void testUnevenSplit() { - LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600); + LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600, -1); int counter = 0; while(strat.hasNext()) { Shard d = strat.next(); @@ -100,4 +100,19 @@ public class LinearLocusShardStrategyTest extends BaseTest { } assertTrue(counter == 10); } + + + @Test + public void testDashMOption() { + LinearLocusShardStrategy strat = new LinearLocusShardStrategy(header.getSequenceDictionary(), 600, 200); + int counter = 0; + while(strat.hasNext()) { + Shard d = strat.next(); + assertTrue(d instanceof LocusShard); + assertTrue((d.getGenomeLoc().getStop() - d.getGenomeLoc().getStart()) == 199); + ++counter; + } + assertTrue(counter == 1); + } + } diff --git a/java/test/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMBAMDataSourceTest.java b/java/test/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMBAMDataSourceTest.java index aff387c0b..e0d44c9c3 100755 --- a/java/test/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMBAMDataSourceTest.java +++ b/java/test/org/broadinstitute/sting/gatk/dataSources/simpleDataSources/SAMBAMDataSourceTest.java @@ -129,7 +129,7 @@ public class SAMBAMDataSourceTest extends BaseTest { // setup the test files fl.add(new File(seqLocation + "/dirseq/analysis/cancer_exome/twoflowcell_sams/TCGA-06-0188.aligned.duplicates_marked.bam")); - Reads reads = new Reads(fl); + Reads reads = new Reads(fl); ArrayList readcountPerShard = new ArrayList(); ArrayList readcountPerShard2 = new ArrayList();