Cleanup: prepare for better output handling.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@586 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
fd496159a8
commit
5bdf653919
|
|
@ -8,12 +8,12 @@ import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
||||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||||
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
||||||
|
import org.broadinstitute.sting.gatk.OutputTracker;
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||||
import org.broadinstitute.sting.utils.StingException;
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
|
@ -35,6 +35,12 @@ import java.util.concurrent.FutureTask;
|
||||||
* Requires a special walker tagged with a 'TreeReducible' interface.
|
* Requires a special walker tagged with a 'TreeReducible' interface.
|
||||||
*/
|
*/
|
||||||
public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier {
|
public class HierarchicalMicroScheduler extends MicroScheduler implements ReduceTree.TreeReduceNotifier {
|
||||||
|
/**
|
||||||
|
* How many outstanding output merges are allowed before the scheduler stops
|
||||||
|
* allowing new processes and starts merging flat-out.
|
||||||
|
*/
|
||||||
|
private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50;
|
||||||
|
|
||||||
private TraverseLociByReference traversalEngine = null;
|
private TraverseLociByReference traversalEngine = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -44,7 +50,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
|
|
||||||
private Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
private Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
||||||
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||||
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
|
private Queue<ShardOutput> outputMergeTasks = new LinkedList<ShardOutput>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new hierarchical microscheduler to process the given reads and reference.
|
* Create a new hierarchical microscheduler to process the given reads and reference.
|
||||||
|
|
@ -79,26 +85,26 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
for(Shard shard: shardStrategy)
|
for(Shard shard: shardStrategy)
|
||||||
traverseTasks.add(shard);
|
traverseTasks.add(shard);
|
||||||
|
|
||||||
while( isShardTraversePending() || isTreeReducePending() || isOutputMergePending() ) {
|
while( isShardTraversePending() || isTreeReducePending() ) {
|
||||||
|
// Too many files sitting around taking up space? Merge them.
|
||||||
|
if( isMergeLimitExceeded() )
|
||||||
|
mergeExistingOutput();
|
||||||
|
|
||||||
|
// Wait for the next slot in the queue to become free.
|
||||||
waitForFreeQueueSlot();
|
waitForFreeQueueSlot();
|
||||||
|
|
||||||
|
// Pick the next most appropriate task and run it. In the interest of
|
||||||
|
// memory conservation, hierarchical reduces always run before traversals.
|
||||||
if( isTreeReduceReady() )
|
if( isTreeReduceReady() )
|
||||||
queueNextTreeReduce( walker );
|
queueNextTreeReduce( walker );
|
||||||
else if( isShardTraversePending() ) {
|
else if( isShardTraversePending() )
|
||||||
Future traverseResult = queueNextShardTraverse( walker, dataSource );
|
queueNextShardTraverse( walker, dataSource, reduceTree );
|
||||||
|
|
||||||
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
|
||||||
reduceTree.addEntry( traverseResult );
|
|
||||||
|
|
||||||
// No more data? Let the reduce tree know so it can finish processing what it's got.
|
|
||||||
if( !isShardTraversePending() )
|
|
||||||
reduceTree.complete();
|
|
||||||
}
|
|
||||||
else if( isOutputMergeReady() ) {
|
|
||||||
queueNextOutputMerge();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge any lingering output files. If these files aren't ready,
|
||||||
|
// sit around and wait for them, then merge them.
|
||||||
|
mergeRemainingOutput();
|
||||||
|
|
||||||
threadPool.shutdown();
|
threadPool.shutdown();
|
||||||
|
|
||||||
Object result = null;
|
Object result = null;
|
||||||
|
|
@ -142,22 +148,59 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
return reduceTasks.size() > 0;
|
return reduceTasks.size() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the maximum number of files is sitting in the temp directory
|
||||||
|
* waiting to be merged back in.
|
||||||
|
* @return True if the merging needs to take priority. False otherwise.
|
||||||
|
*/
|
||||||
|
protected boolean isMergeLimitExceeded() {
|
||||||
|
if( outputMergeTasks.size() < MAX_OUTSTANDING_OUTPUT_MERGES )
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// If any of the queued outputs aren't ready yet, the merge limit
|
||||||
|
// has not been exceeded.
|
||||||
|
ShardOutput[] outputMergers = outputMergeTasks.toArray( new ShardOutput[0] );
|
||||||
|
for( int i = 0; i < outputMergers.length; i++ ) {
|
||||||
|
if( !outputMergers[i].isComplete() )
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Everything's ready?
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether there is output waiting to be merged into the global output
|
* Returns whether there is output waiting to be merged into the global output
|
||||||
* streams right now.
|
* streams right now.
|
||||||
* @return True if this output is ready to be merged. False otherwise.
|
* @return True if this output is ready to be merged. False otherwise.
|
||||||
*/
|
*/
|
||||||
protected boolean isOutputMergeReady() {
|
protected boolean isOutputMergeReady() {
|
||||||
return !OutputMerger.isMergeQueued() && outputMergeTasks.peek().isComplete();
|
// peek() returns null on an empty queue; guard so callers can loop until the queue drains.
return !outputMergeTasks.isEmpty() && outputMergeTasks.peek().isComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether there is output that will eventually need to be merged into
|
* Merges all output that's sitting ready in the output merge queue into
|
||||||
* the output streams.
|
* the final data streams.
|
||||||
* @return True if output will eventually need to be merged. False otherwise.
|
|
||||||
*/
|
*/
|
||||||
protected boolean isOutputMergePending() {
|
protected void mergeExistingOutput() {
|
||||||
return outputMergeTasks.size() > 0;
|
OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker();
|
||||||
|
while( isOutputMergeReady() )
|
||||||
|
outputMergeTasks.remove().mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge any output that hasn't yet been taken care of by the blocking thread.
|
||||||
|
*/
|
||||||
|
protected void mergeRemainingOutput() {
|
||||||
|
OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker();
|
||||||
|
while( outputMergeTasks.size() > 0 ) {
|
||||||
|
ShardOutput shardOutput = outputMergeTasks.remove();
|
||||||
|
synchronized(shardOutput) {
|
||||||
|
if( !shardOutput.isComplete() )
|
||||||
|
shardOutput.waitForOutputComplete();
|
||||||
|
}
|
||||||
|
shardOutput.mergeInto( outputTracker.getGlobalOutStream(), outputTracker.getGlobalErrStream() );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -165,7 +208,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
* @param walker Walker to apply to the dataset.
|
* @param walker Walker to apply to the dataset.
|
||||||
* @param dataSource Source of the reads
|
* @param dataSource Source of the reads
|
||||||
*/
|
*/
|
||||||
protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) {
|
protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource, ReduceTree reduceTree ) {
|
||||||
if( traverseTasks.size() == 0 )
|
if( traverseTasks.size() == 0 )
|
||||||
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
|
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
|
||||||
|
|
||||||
|
|
@ -178,11 +221,18 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
dataSource,
|
dataSource,
|
||||||
shardOutput );
|
shardOutput );
|
||||||
|
|
||||||
outputMergeTasks.add(new OutputMerger(shardOutput,
|
Future traverseResult = threadPool.submit(traverser);
|
||||||
GenomeAnalysisTK.Instance.getOutputTracker().getGlobalOutStream(),
|
|
||||||
GenomeAnalysisTK.Instance.getOutputTracker().getGlobalErrStream()));
|
|
||||||
|
|
||||||
return threadPool.submit(traverser);
|
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
||||||
|
reduceTree.addEntry( traverseResult );
|
||||||
|
|
||||||
|
// No more data? Let the reduce tree know so it can finish processing what it's got.
|
||||||
|
if( !isShardTraversePending() )
|
||||||
|
reduceTree.complete();
|
||||||
|
|
||||||
|
outputMergeTasks.add(shardOutput);
|
||||||
|
|
||||||
|
return traverseResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -197,19 +247,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
||||||
threadPool.submit( reducer );
|
threadPool.submit( reducer );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Pulls the next output merge and puts it on the queue.
|
|
||||||
*/
|
|
||||||
protected void queueNextOutputMerge() {
|
|
||||||
if( outputMergeTasks.size() == 0 )
|
|
||||||
throw new IllegalStateException( "Cannot merge output; no pending merges exist.");
|
|
||||||
if( OutputMerger.isMergeQueued() )
|
|
||||||
throw new IllegalStateException( "Cannot merge output; another merge has already been queued.");
|
|
||||||
|
|
||||||
OutputMerger.queueMerge();
|
|
||||||
threadPool.submit( outputMergeTasks.remove() );
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Blocks until a free slot appears in the thread queue.
|
* Blocks until a free slot appears in the thread queue.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1,66 +0,0 @@
|
||||||
package org.broadinstitute.sting.gatk.executive;
|
|
||||||
|
|
||||||
import java.io.OutputStream;
|
|
||||||
/**
|
|
||||||
* User: hanna
|
|
||||||
* Date: May 1, 2009
|
|
||||||
* Time: 11:47:44 AM
|
|
||||||
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
|
||||||
* Software and documentation are copyright 2005 by the Broad Institute.
|
|
||||||
* All rights are reserved.
|
|
||||||
*
|
|
||||||
* Users acknowledge that this software is supplied without any warranty or support.
|
|
||||||
* The Broad Institute is not responsible for its use, misuse, or
|
|
||||||
* functionality.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A queueable task to merge two output files. Provides a static 'lock' to
|
|
||||||
* track whether a merge is currently happening.
|
|
||||||
*/
|
|
||||||
public class OutputMerger implements Runnable {
|
|
||||||
public final ShardOutput shardOutput;
|
|
||||||
|
|
||||||
private final OutputStream out;
|
|
||||||
private final OutputStream err;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether a merge has been queued. Keeps merges from stepping on each other.
|
|
||||||
*/
|
|
||||||
private static boolean mergeQueued;
|
|
||||||
|
|
||||||
public OutputMerger( ShardOutput shardOutput, OutputStream out, OutputStream err ) {
|
|
||||||
this.shardOutput = shardOutput;
|
|
||||||
this.out = out;
|
|
||||||
this.err = err;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isComplete() {
|
|
||||||
return shardOutput.isComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Is a merge currently queued?
|
|
||||||
* @return True if a merge is waiting on the queue. False otherwise.
|
|
||||||
*/
|
|
||||||
public static boolean isMergeQueued() {
|
|
||||||
return mergeQueued;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void queueMerge() {
|
|
||||||
if( mergeQueued )
|
|
||||||
throw new IllegalStateException( "Attempted to mark that a merge has been queued when a merge is already queued." );
|
|
||||||
mergeQueued = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
if( !mergeQueued )
|
|
||||||
throw new IllegalStateException( "Starting a file merge, but our state shows no merge has been queued." );
|
|
||||||
try {
|
|
||||||
shardOutput.mergeInto( out, err );
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
mergeQueued = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -10,8 +10,8 @@ import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.Channel;
|
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* User: hanna
|
* User: hanna
|
||||||
* Date: Apr 30, 2009
|
* Date: Apr 30, 2009
|
||||||
|
|
@ -67,6 +67,18 @@ public class ShardOutput {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for this ShardOutput to be ready for merging.
|
||||||
|
*/
|
||||||
|
public synchronized void waitForOutputComplete() {
|
||||||
|
try {
|
||||||
|
// Re-check the condition after every wakeup: guards against spurious wakeups
// and returns immediately when the output has already been completed.
while( !isComplete() )
wait();
|
||||||
|
}
|
||||||
|
catch( InterruptedException ex ) {
|
||||||
|
throw new StingException("Interrupted while waiting for more output to be finalized.",ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this output complete?
|
* Is this output complete?
|
||||||
* @return True if output complete. False otherwise.
|
* @return True if output complete. False otherwise.
|
||||||
|
|
@ -79,6 +91,9 @@ public class ShardOutput {
|
||||||
* Indicate that no more data will be written to these output streams.
|
* Indicate that no more data will be written to these output streams.
|
||||||
*/
|
*/
|
||||||
public synchronized void complete() {
|
public synchronized void complete() {
|
||||||
|
if( isComplete() )
|
||||||
|
throw new IllegalStateException("Tried to complete an output merge twice.");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
out.flush();
|
out.flush();
|
||||||
err.flush();
|
err.flush();
|
||||||
|
|
@ -91,8 +106,11 @@ public class ShardOutput {
|
||||||
|
|
||||||
out = null;
|
out = null;
|
||||||
err = null;
|
err = null;
|
||||||
|
|
||||||
this.complete = true;
|
this.complete = true;
|
||||||
|
|
||||||
|
// Notify waiting listeners that this shard is complete and ready for merging.
|
||||||
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void mergeInto( OutputStream globalOut, OutputStream globalErr ) {
|
public void mergeInto( OutputStream globalOut, OutputStream globalErr ) {
|
||||||
|
|
@ -105,6 +123,11 @@ public class ShardOutput {
|
||||||
transferContentsToTarget( errFile, globalErr );
|
transferContentsToTarget( errFile, globalErr );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy the contents of the given file into the specified output stream.
|
||||||
|
* @param source Source of data to copy.
|
||||||
|
* @param target Target for copied data.
|
||||||
|
*/
|
||||||
private void transferContentsToTarget( File source, OutputStream target ) {
|
private void transferContentsToTarget( File source, OutputStream target ) {
|
||||||
FileInputStream sourceStream = null;
|
FileInputStream sourceStream = null;
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -5,14 +5,7 @@ import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||||
import org.broadinstitute.sting.gatk.refdata.rodDbSNP;
|
import org.broadinstitute.sting.gatk.refdata.rodDbSNP;
|
||||||
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
||||||
import org.broadinstitute.sting.utils.cmdLine.Argument;
|
import org.broadinstitute.sting.utils.cmdLine.Argument;
|
||||||
import org.broadinstitute.sting.utils.Utils;
|
|
||||||
import org.broadinstitute.sting.utils.Pileup;
|
|
||||||
import org.broadinstitute.sting.utils.BasicPileup;
|
|
||||||
import org.broadinstitute.sting.utils.ReadBackedPileup;
|
import org.broadinstitute.sting.utils.ReadBackedPileup;
|
||||||
import net.sf.samtools.SAMRecord;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by IntelliJ IDEA.
|
* Created by IntelliJ IDEA.
|
||||||
|
|
@ -21,7 +14,7 @@ import java.util.ArrayList;
|
||||||
* Time: 3:22:14 PM
|
* Time: 3:22:14 PM
|
||||||
* To change this template use File | Settings | File Templates.
|
* To change this template use File | Settings | File Templates.
|
||||||
*/
|
*/
|
||||||
public class PileupWalker extends LocusWalker<Integer, Integer> {
|
public class PileupWalker extends LocusWalker<Integer, Integer> implements TreeReducible<Integer> {
|
||||||
@Argument(fullName="verbose",required=false,defaultValue="false")
|
@Argument(fullName="verbose",required=false,defaultValue="false")
|
||||||
public boolean VERBOSE;
|
public boolean VERBOSE;
|
||||||
|
|
||||||
|
|
@ -74,7 +67,7 @@ public class PileupWalker extends LocusWalker<Integer, Integer> {
|
||||||
|
|
||||||
if ( EXTENDED ) {
|
if ( EXTENDED ) {
|
||||||
String probDists = pileup.getProbDistPileup();
|
String probDists = pileup.getProbDistPileup();
|
||||||
System.out.println(probDists);
|
out.println(probDists);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
|
@ -83,6 +76,9 @@ public class PileupWalker extends LocusWalker<Integer, Integer> {
|
||||||
// Given result of map function
|
// Given result of map function
|
||||||
public Integer reduceInit() { return 0; }
|
public Integer reduceInit() { return 0; }
|
||||||
public Integer reduce(Integer value, Integer sum) {
|
public Integer reduce(Integer value, Integer sum) {
|
||||||
return value + sum;
|
// Delegate to treeReduce; calling reduce() here would recurse until the stack overflows.
return treeReduce(sum,value);
|
||||||
|
}
|
||||||
|
public Integer treeReduce(Integer lhs, Integer rhs) {
|
||||||
|
return lhs + rhs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue