diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index d8da5fdf5..7faa8fd21 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -8,12 +8,12 @@ import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; import org.broadinstitute.sting.gatk.GenomeAnalysisTK; +import org.broadinstitute.sting.gatk.OutputTracker; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.io.File; -import java.io.OutputStream; import java.util.List; import java.util.Queue; import java.util.LinkedList; @@ -35,6 +35,12 @@ import java.util.concurrent.FutureTask; * Requires a special walker tagged with a 'TreeReducible' interface. */ public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier { + /** + * How many outstanding output merges are allowed before the scheduler stops + * allowing new processes and starts merging flat-out. + */ + private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50; + private TraverseLociByReference traversalEngine = null; /** @@ -44,7 +50,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce private Queue traverseTasks = new LinkedList(); private Queue reduceTasks = new LinkedList(); - private Queue outputMergeTasks = new LinkedList(); + private Queue outputMergeTasks = new LinkedList(); /** * Create a new hierarchical microscheduler to process the given reads and reference. @@ -79,26 +85,26 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce for(Shard shard: shardStrategy) traverseTasks.add(shard); - while( isShardTraversePending() || isTreeReducePending() || isOutputMergePending() ) { + while( isShardTraversePending() || isTreeReducePending() ) { + // Too many files sitting around taking up space? Merge them. + if( isMergeLimitExceeded() ) + mergeExistingOutput(); + + // Wait for the next slot in the queue to become free. waitForFreeQueueSlot(); + // Pick the next most appropriate task and run it. In the interest of + // memory conservation, hierarchical reduces always run before traversals. if( isTreeReduceReady() ) queueNextTreeReduce( walker ); - else if( isShardTraversePending() ) { - Future traverseResult = queueNextShardTraverse( walker, dataSource ); - - // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. - reduceTree.addEntry( traverseResult ); - - // No more data? Let the reduce tree know so it can finish processing what it's got. - if( !isShardTraversePending() ) - reduceTree.complete(); - } - else if( isOutputMergeReady() ) { - queueNextOutputMerge(); - } + else if( isShardTraversePending() ) + queueNextShardTraverse( walker, dataSource, reduceTree ); } + // Merge any lingering output files. If these files aren't ready, + // sit around and wait for them, then merge them. + mergeRemainingOutput(); + threadPool.shutdown(); Object result = null; @@ -142,22 +148,59 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce return reduceTasks.size() > 0; } + /** + * Returns whether the maximum number of files is sitting in the temp directory + * waiting to be merged back in. + * @return True if the merging needs to take priority. False otherwise. + */ + protected boolean isMergeLimitExceeded() { + if( outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES ) + return false; + + // If any of the first MAX_OUTSTANDING merges aren't ready, the merge limit + // has not been exceeded. + ShardOutput[] outputMergers = outputMergeTasks.toArray( new ShardOutput[0] ); + for( int i = 0; i < outputMergers.length; i++ ) { + if( !outputMergers[i].isComplete() ) + return false; + } + + // Everything's ready? + return true; + } + /** * Returns whether there is output waiting to be merged into the global output * streams right now. * @return True if this output is ready to be merged. False otherwise. */ protected boolean isOutputMergeReady() { - return !OutputMerger.isMergeQueued() && outputMergeTasks.peek().isComplete(); + return outputMergeTasks.peek().isComplete(); } /** - * Returns whether there is output that will eventually need to be merged into - * the output streams. - * @return True if output will eventually need to be merged. False otherwise. + * Merging all output that's sitting ready in the OutputMerger queue into + * the final data streams. */ - protected boolean isOutputMergePending() { - return outputMergeTasks.size() > 0; + protected void mergeExistingOutput() { + OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker(); + while( isOutputMergeReady() ) + outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); + } + + /** + * Merge any output that hasn't yet been taken care of by the blocking thread. + */ + protected void mergeRemainingOutput() { + OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker(); + while( outputMergeTasks.size() > 0 ) { + ShardOutput shardOutput = outputMergeTasks.remove(); + synchronized(shardOutput) { + if( !shardOutput.isComplete() ) + shardOutput.waitForOutputComplete(); + } + shardOutput.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() ); + } } /** @@ -165,7 +208,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce * @param walker Walker to apply to the dataset. * @param dataSource Source of the reads */ - protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) { + protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource, ReduceTree reduceTree ) { if( traverseTasks.size() == 0 ) throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); @@ -178,11 +221,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce dataSource, shardOutput ); - outputMergeTasks.add(new OutputMerger(shardOutput, - GenomeAnalysisTK.Instance.getOutputTracker().getGlobalOutStream(), - GenomeAnalysisTK.Instance.getOutputTracker().getGlobalErrStream())); + Future traverseResult = threadPool.submit(traverser); - return threadPool.submit(traverser); + // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. + reduceTree.addEntry( traverseResult ); + + // No more data? Let the reduce tree know so it can finish processing what it's got. + if( !isShardTraversePending() ) + reduceTree.complete(); + + outputMergeTasks.add(shardOutput); + + return traverseResult; } /** @@ -197,19 +247,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce threadPool.submit( reducer ); } - /** - * Pulls the next output merge and puts it on the queue. - */ - protected void queueNextOutputMerge() { - if( outputMergeTasks.size() == 0 ) - throw new IllegalStateException( "Cannot merge output; no pending merges exist."); - if( OutputMerger.isMergeQueued() ) - throw new IllegalStateException( "Cannot merge output; another merge has already been queued."); - - OutputMerger.queueMerge(); - threadPool.submit( outputMergeTasks.remove() ); - } - /** * Blocks until a free slot appears in the thread queue. */ diff --git a/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java b/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java deleted file mode 100755 index 4426bd1fd..000000000 --- a/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.broadinstitute.sting.gatk.executive; - -import java.io.OutputStream; -/** - * User: hanna - * Date: May 1, 2009 - * Time: 11:47:44 AM - * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT - * Software and documentation are copyright 2005 by the Broad Institute. - * All rights are reserved. - * - * Users acknowledge that this software is supplied without any warranty or support. - * The Broad Institute is not responsible for its use, misuse, or - * functionality. - */ - -/** - * A queueable task to merge two output files. Provides a static 'lock' to - * track whether a merge is currently happening. - */ -public class OutputMerger implements Runnable { - public final ShardOutput shardOutput; - - private final OutputStream out; - private final OutputStream err; - - /** - * Indicates whether a merge has been queued. Keeps merges from stepping on each other. - */ - private static boolean mergeQueued; - - public OutputMerger( ShardOutput shardOutput, OutputStream out, OutputStream err ) { - this.shardOutput = shardOutput; - this.out = out; - this.err = err; - } - - public boolean isComplete() { - return shardOutput.isComplete(); - } - - /** - * Is a merge currently queued? - * @return True if a merge is waiting on the queue. False otherwise. - */ - public static boolean isMergeQueued() { - return mergeQueued; - } - - public static void queueMerge() { - if( mergeQueued ) - throw new IllegalStateException( "Attempted to mark that a merge has been queued when a merge is already queued." ); - mergeQueued = true; - } - - public void run() { - if( !mergeQueued ) - throw new IllegalStateException( "Starting a file merge, but our state shows no merge has been queued." ); - try { - shardOutput.mergeInto( out, err ); - } - finally { - mergeQueued = false; - } - } -} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java index 63ed4e913..ce64b7853 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java @@ -10,8 +10,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.nio.channels.FileChannel; import java.nio.channels.Channels; -import java.nio.channels.Channel; import java.nio.channels.WritableByteChannel; + /** * User: hanna * Date: Apr 30, 2009 @@ -67,6 +67,18 @@ public class ShardOutput { } } + /** + * Waits for any the given ShardOutput to be ready for merging. + */ + public synchronized void waitForOutputComplete() { + try { + wait(); + } + catch( InterruptedException ex ) { + throw new StingException("Interrupted while waiting for more output to be finalized.",ex); + } + } + /** * Is this output complete? * @return True if output complete. False otherwise. @@ -79,6 +91,9 @@ public class ShardOutput { * Indicate that no more data will be written to these output streams. */ public synchronized void complete() { + if( isComplete() ) + throw new IllegalStateException("Tried to complete an output merge twice."); + try { out.flush(); err.flush(); @@ -91,8 +106,11 @@ public class ShardOutput { out = null; err = null; - + this.complete = true; + + // Notify waiting listeners that this shard is complete and ready for merging. + notifyAll(); } public void mergeInto( OutputStream globalOut, OutputStream globalErr ) { @@ -105,6 +123,11 @@ public class ShardOutput { transferContentsToTarget( errFile, globalErr ); } + /** + * Copy the contents of the given file into the specified output stream. + * @param source Source of data to copy. + * @param target Target for copied data. + */ private void transferContentsToTarget( File source, OutputStream target ) { FileInputStream sourceStream = null; try { diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/PileupWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/PileupWalker.java index 2a74e61f1..cbf865c2d 100644 --- a/java/src/org/broadinstitute/sting/gatk/walkers/PileupWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/PileupWalker.java @@ -5,14 +5,7 @@ import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.refdata.rodDbSNP; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.utils.cmdLine.Argument; -import org.broadinstitute.sting.utils.Utils; -import org.broadinstitute.sting.utils.Pileup; -import org.broadinstitute.sting.utils.BasicPileup; import org.broadinstitute.sting.utils.ReadBackedPileup; -import net.sf.samtools.SAMRecord; - -import java.util.List; -import java.util.ArrayList; /** * Created by IntelliJ IDEA. @@ -21,7 +14,7 @@ import java.util.ArrayList; * Time: 3:22:14 PM * To change this template use File | Settings | File Templates. */ -public class PileupWalker extends LocusWalker { +public class PileupWalker extends LocusWalker implements TreeReducible { @Argument(fullName="verbose",required=false,defaultValue="false") public boolean VERBOSE; @@ -74,7 +67,7 @@ public class PileupWalker extends LocusWalker { if ( EXTENDED ) { String probDists = pileup.getProbDistPileup(); - System.out.println(probDists); + out.println(probDists); } return 1; @@ -83,6 +76,9 @@ public class PileupWalker extends LocusWalker { // Given result of map function public Integer reduceInit() { return 0; } public Integer reduce(Integer value, Integer sum) { - return value + sum; + return reduce(sum,value); + } + public Integer treeReduce(Integer lhs, Integer rhs) { + return lhs + rhs; } }