N-way parallelism. Works for small test cases. Untested for large test cases.
-Needs more comprehensive unit testing. -Needs some basic refactoring. -Needs rethink of interface boundaries. -Needs to play more nicely in the /tmp sandbox. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@583 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
df88c4d6b0
commit
9f5f6f9bc7
|
|
@ -346,19 +346,11 @@ public class GenomeAnalysisTK extends CommandLineProgram {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the output stream to be used with this walker.
|
||||
* @return Output stream used with this walker.
|
||||
* Gets the output tracker. Tracks data available to a given walker.
|
||||
* @return The output tracker.
|
||||
*/
|
||||
public PrintStream getWalkerOutputStream() {
|
||||
return outputTracker.getOutStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the output stream to be used with this walker.
|
||||
* @return Output stream used with this walker.
|
||||
*/
|
||||
public PrintStream getWalkerErrorStream() {
|
||||
return outputTracker.getErrStream();
|
||||
public OutputTracker getOutputTracker() {
|
||||
return outputTracker;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
package org.broadinstitute.sting.gatk;
|
||||
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.utils.io.RedirectingOutputStream;
|
||||
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: Apr 30, 2009
|
||||
|
|
@ -27,15 +30,14 @@ public class OutputTracker {
|
|||
/**
|
||||
* The streams to which walker users should be writing directly.
|
||||
*/
|
||||
private PrintStream out;
|
||||
private PrintStream err;
|
||||
protected OutputStream globalOut;
|
||||
protected OutputStream globalErr;
|
||||
|
||||
/**
|
||||
* Cache the file streams so that reassemblers can use nio to do fast file transfers.
|
||||
* Thread-local versions of the output stream.
|
||||
*/
|
||||
private FileOutputStream outFileStream;
|
||||
private FileOutputStream errFileStream;
|
||||
|
||||
protected ThreadLocal<OutputStream> localOut = new ThreadLocal<OutputStream>();
|
||||
protected ThreadLocal<OutputStream> localErr = new ThreadLocal<OutputStream>();
|
||||
|
||||
/**
|
||||
* Create an object to manage output given filenames for the output and error files.
|
||||
|
|
@ -48,23 +50,11 @@ public class OutputTracker {
|
|||
// Otherwise, initialize them separately.
|
||||
if( outFileName != null && outFileName.equals(errFileName) ) {
|
||||
FileOutputStream outputFile = prepareOutputFile( outFileName );
|
||||
outFileStream = errFileStream = outputFile;
|
||||
out = err = new PrintStream( outputFile );
|
||||
globalOut = globalErr = outputFile;
|
||||
}
|
||||
else {
|
||||
if( outFileName != null ) {
|
||||
outFileStream = prepareOutputFile( outFileName );
|
||||
out = new PrintStream( outFileStream );
|
||||
}
|
||||
else
|
||||
out = System.out;
|
||||
|
||||
if( errFileName != null ) {
|
||||
errFileStream = prepareOutputFile( errFileName );
|
||||
err = new PrintStream( errFileStream );
|
||||
}
|
||||
else
|
||||
err = System.err;
|
||||
globalOut = (outFileName != null) ? prepareOutputFile( outFileName ) : System.out;
|
||||
globalErr = (errFileName != null) ? prepareOutputFile( errFileName ) : System.err;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -72,32 +62,65 @@ public class OutputTracker {
|
|||
* Gets the output stream for the walker.
|
||||
* @return Output stream; should be either file-backed or System.out.
|
||||
*/
|
||||
public PrintStream getOutStream() {
|
||||
return out;
|
||||
public OutputStream getGlobalOutStream() {
|
||||
return globalOut;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the error stream for the walker.
|
||||
* @return Error stream; should be either file-backed or System.err.
|
||||
*/
|
||||
public PrintStream getErrStream() {
|
||||
return err;
|
||||
public OutputStream getGlobalErrStream() {
|
||||
return globalErr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the filestream associated with normal output.
|
||||
* @return FileStream associated with the output; null if not backed by a file.
|
||||
* Retrieve an output stream that will always return the most appropriate
|
||||
* writer for this thread.
|
||||
*/
|
||||
public FileOutputStream getOutFile() {
|
||||
return outFileStream;
|
||||
public OutputStream getOutStream() {
|
||||
// Create an anonymous inner class which will just return the most
|
||||
// appropriate OutputStream from those streams stored in this class.
|
||||
return new RedirectingOutputStream(
|
||||
new RedirectingOutputStream.OutputStreamProvider() {
|
||||
public OutputStream getOutputStream() {
|
||||
return localOut.get() != null ? localOut.get() : globalOut;
|
||||
}
|
||||
} );
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the filestream associated with error output.
|
||||
* @return stream associated with error output; null if not backed by a file.
|
||||
* Retrieve the most appropriate output stream for this thread.
|
||||
* Will retrieve thread-local if available; otherwise, it'll read the
|
||||
* global stream.
|
||||
*/
|
||||
public FileOutputStream getErrFile() {
|
||||
return errFileStream;
|
||||
public OutputStream getErrStream() {
|
||||
// Create an anonymous inner class which will just return the most
|
||||
// appropriate OutputStream from those streams stored in this class.
|
||||
return new RedirectingOutputStream(
|
||||
new RedirectingOutputStream.OutputStreamProvider() {
|
||||
public OutputStream getOutputStream() {
|
||||
return localErr.get() != null ? localErr.get() : globalErr;
|
||||
}
|
||||
} );
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the (thread-)local version of the given output and error files.
|
||||
* @param out output stream to which to write.
|
||||
* @param err error stream to which to write.
|
||||
*/
|
||||
public void setLocalStreams( OutputStream out, OutputStream err ) {
|
||||
localOut.set( out );
|
||||
localErr.set( err );
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove pointers to alternate, local output streams.
|
||||
*/
|
||||
public void removeLocalStreams() {
|
||||
localOut.remove();
|
||||
localErr.remove();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -7,10 +7,13 @@ import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
|||
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.LinkedList;
|
||||
|
|
@ -41,7 +44,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
|
||||
private Queue<Shard> traverseTasks = new LinkedList<Shard>();
|
||||
private Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||
|
||||
private Queue<OutputMerger> outputMergeTasks = new LinkedList<OutputMerger>();
|
||||
|
||||
/**
|
||||
* Create a new hierarchical microscheduler to process the given reads and reference.
|
||||
|
|
@ -76,12 +79,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
for(Shard shard: shardStrategy)
|
||||
traverseTasks.add(shard);
|
||||
|
||||
while( isShardTraversePending() || isTreeReducePending() ) {
|
||||
while( isShardTraversePending() || isTreeReducePending() || isOutputMergePending() ) {
|
||||
waitForFreeQueueSlot();
|
||||
|
||||
if( isTreeReduceReady() )
|
||||
queueNextTreeReduce( walker );
|
||||
else {
|
||||
else if( isShardTraversePending() ) {
|
||||
Future traverseResult = queueNextShardTraverse( walker, dataSource );
|
||||
|
||||
// Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue.
|
||||
|
|
@ -91,9 +94,20 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
if( !isShardTraversePending() )
|
||||
reduceTree.complete();
|
||||
}
|
||||
else if( isOutputMergeReady() ) {
|
||||
queueNextOutputMerge();
|
||||
}
|
||||
}
|
||||
|
||||
Object result = reduceTree.getResult();
|
||||
threadPool.shutdown();
|
||||
|
||||
Object result = null;
|
||||
try {
|
||||
result = reduceTree.getResult().get();
|
||||
}
|
||||
catch(Exception ex) {
|
||||
throw new StingException("Unable to retrieve result", ex );
|
||||
}
|
||||
|
||||
traversalEngine.printOnTraversalDone("loci", result);
|
||||
walker.onTraversalDone(result);
|
||||
|
|
@ -128,6 +142,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
return reduceTasks.size() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether there is output waiting to be merged into the global output
|
||||
* streams right now.
|
||||
* @return True if this output is ready to be merged. False otherwise.
|
||||
*/
|
||||
protected boolean isOutputMergeReady() {
|
||||
return !OutputMerger.isMergeQueued() && outputMergeTasks.peek().isComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether there is output that will eventually need to be merged into
|
||||
* the output streams.
|
||||
* @return True if output will eventually need to be merged. False otherwise.
|
||||
*/
|
||||
protected boolean isOutputMergePending() {
|
||||
return outputMergeTasks.size() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues the next traversal of a walker from the traversal tasks queue.
|
||||
* @param walker Walker to apply to the dataset.
|
||||
|
|
@ -136,11 +168,20 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) {
|
||||
if( traverseTasks.size() == 0 )
|
||||
throw new IllegalStateException( "Cannot traverse; no pending traversals exist.");
|
||||
|
||||
ShardOutput shardOutput = new ShardOutput();
|
||||
|
||||
ShardTraverser traverser = new ShardTraverser( traversalEngine,
|
||||
walker,
|
||||
traverseTasks.remove(),
|
||||
reference,
|
||||
dataSource );
|
||||
dataSource,
|
||||
shardOutput );
|
||||
|
||||
outputMergeTasks.add(new OutputMerger(shardOutput,
|
||||
GenomeAnalysisTK.Instance.getOutputTracker().getGlobalOutStream(),
|
||||
GenomeAnalysisTK.Instance.getOutputTracker().getGlobalErrStream()));
|
||||
|
||||
return threadPool.submit(traverser);
|
||||
}
|
||||
|
||||
|
|
@ -156,6 +197,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
threadPool.submit( reducer );
|
||||
}
|
||||
|
||||
/**
|
||||
* Pulls the next output merge and puts it on the queue.
|
||||
*/
|
||||
protected void queueNextOutputMerge() {
|
||||
if( outputMergeTasks.size() == 0 )
|
||||
throw new IllegalStateException( "Cannot merge output; no pending merges exist.");
|
||||
if( OutputMerger.isMergeQueued() )
|
||||
throw new IllegalStateException( "Cannot merge output; another merge has already been queued.");
|
||||
|
||||
OutputMerger.queueMerge();
|
||||
threadPool.submit( outputMergeTasks.remove() );
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until a free slot appears in the thread queue.
|
||||
*/
|
||||
|
|
@ -198,4 +252,5 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
package org.broadinstitute.sting.gatk.executive;
|
||||
|
||||
import java.io.OutputStream;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: May 1, 2009
|
||||
* Time: 11:47:44 AM
|
||||
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||
* All rights are reserved.
|
||||
*
|
||||
* Users acknowledge that this software is supplied without any warranty or support.
|
||||
* The Broad Institute is not responsible for its use, misuse, or
|
||||
* functionality.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A queueable task to merge two output files. Provides a static 'lock' to
|
||||
* track whether a merge is currently happening.
|
||||
*/
|
||||
public class OutputMerger implements Runnable {
|
||||
public final ShardOutput shardOutput;
|
||||
|
||||
private final OutputStream out;
|
||||
private final OutputStream err;
|
||||
|
||||
/**
|
||||
* Indicates whether a merge has been queued. Keeps merges from stepping on each other.
|
||||
*/
|
||||
private static boolean mergeQueued;
|
||||
|
||||
public OutputMerger( ShardOutput shardOutput, OutputStream out, OutputStream err ) {
|
||||
this.shardOutput = shardOutput;
|
||||
this.out = out;
|
||||
this.err = err;
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return shardOutput.isComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Is a merge currently queued?
|
||||
* @return True if a merge is waiting on the queue. False otherwise.
|
||||
*/
|
||||
public static boolean isMergeQueued() {
|
||||
return mergeQueued;
|
||||
}
|
||||
|
||||
public static void queueMerge() {
|
||||
if( mergeQueued )
|
||||
throw new IllegalStateException( "Attempted to mark that a merge has been queued when a merge is already queued." );
|
||||
mergeQueued = true;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
if( !mergeQueued )
|
||||
throw new IllegalStateException( "Starting a file merge, but our state shows no merge has been queued." );
|
||||
try {
|
||||
shardOutput.mergeInto( out, err );
|
||||
}
|
||||
finally {
|
||||
mergeQueued = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,143 @@
|
|||
package org.broadinstitute.sting.gatk.executive;
|
||||
|
||||
import org.broadinstitute.sting.utils.StingException;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.Channel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: Apr 30, 2009
|
||||
* Time: 4:04:38 PM
|
||||
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||
* All rights are reserved.
|
||||
*
|
||||
* Users acknowledge that this software is supplied without any warranty or support.
|
||||
* The Broad Institute is not responsible for its use, misuse, or
|
||||
* functionality.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Hold pointers to the output and error streams, and state to indicate whether
|
||||
* a write is complete. Not generally thread-safe. Calls to isComplete()/complete()
|
||||
* can be made at any time from any thread, but complete() should be called on the
|
||||
* thread which is doing the writing.
|
||||
*/
|
||||
public class ShardOutput {
|
||||
/**
|
||||
* Create a unique identifier
|
||||
*/
|
||||
private static int id = 0;
|
||||
|
||||
/**
|
||||
* Is writing to these streams complete?
|
||||
*/
|
||||
private boolean complete = false;
|
||||
|
||||
/**
|
||||
* File objects that the output and error data went into.
|
||||
*/
|
||||
private File outFile;
|
||||
private File errFile;
|
||||
|
||||
/**
|
||||
* The printstreams which should be written to.
|
||||
*/
|
||||
private FileOutputStream out = null;
|
||||
private FileOutputStream err = null;
|
||||
|
||||
public ShardOutput() {
|
||||
try {
|
||||
outFile = File.createTempFile("gatkout_" + id, null);
|
||||
errFile = File.createTempFile("gatkerr_" + id, null);
|
||||
|
||||
out = new FileOutputStream( outFile );
|
||||
err = new FileOutputStream( errFile );
|
||||
}
|
||||
catch( IOException ex ) {
|
||||
throw new StingException("Unable to open temporary file for GATK output",ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this output complete?
|
||||
* @return True if output complete. False otherwise.
|
||||
*/
|
||||
public synchronized boolean isComplete() {
|
||||
return complete;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicate that no more data will be written to these output streams.
|
||||
*/
|
||||
public synchronized void complete() {
|
||||
try {
|
||||
out.flush();
|
||||
err.flush();
|
||||
out.close();
|
||||
err.close();
|
||||
}
|
||||
catch( IOException ex ) {
|
||||
throw new StingException( "Unable to close sharding output files", ex );
|
||||
}
|
||||
|
||||
out = null;
|
||||
err = null;
|
||||
|
||||
this.complete = true;
|
||||
}
|
||||
|
||||
public void mergeInto( OutputStream globalOut, OutputStream globalErr ) {
|
||||
synchronized(this) {
|
||||
if( !isComplete() )
|
||||
throw new StingException("Tried to merge incomplete stream into output");
|
||||
}
|
||||
|
||||
transferContentsToTarget( outFile, globalOut );
|
||||
transferContentsToTarget( errFile, globalErr );
|
||||
}
|
||||
|
||||
private void transferContentsToTarget( File source, OutputStream target ) {
|
||||
FileInputStream sourceStream = null;
|
||||
try {
|
||||
sourceStream = new FileInputStream( source );
|
||||
FileChannel sourceChannel = sourceStream.getChannel();
|
||||
|
||||
WritableByteChannel targetChannel = Channels.newChannel( target );
|
||||
sourceChannel.transferTo( 0, sourceChannel.size(), targetChannel );
|
||||
|
||||
sourceStream.close();
|
||||
source.delete();
|
||||
}
|
||||
catch( FileNotFoundException ex ) {
|
||||
throw new StingException("Unable to open input stream for file: " + source,ex);
|
||||
}
|
||||
catch( IOException ex ) {
|
||||
throw new StingException("Unable to transfer contents of file:" + source,ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the stream where output is sent.
|
||||
* @return output stream.
|
||||
*/
|
||||
public OutputStream getOutStream() {
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the stream where error info is sent.
|
||||
* @return error stream.
|
||||
*/
|
||||
public OutputStream getErrStream() {
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
|
@ -4,14 +4,17 @@ import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider;
|
|||
import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider;
|
||||
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource;
|
||||
import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException;
|
||||
import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference;
|
||||
import org.broadinstitute.sting.gatk.GenomeAnalysisTK;
|
||||
import org.broadinstitute.sting.gatk.OutputTracker;
|
||||
import org.broadinstitute.sting.gatk.walkers.LocusWalker;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import net.sf.samtools.util.CloseableIterator;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: Apr 29, 2009
|
||||
|
|
@ -33,36 +36,43 @@ public class ShardTraverser implements Callable {
|
|||
private Shard shard;
|
||||
private IndexedFastaSequenceFile reference;
|
||||
private SAMDataSource reads;
|
||||
private ShardOutput output;
|
||||
|
||||
public ShardTraverser( TraverseLociByReference traversalEngine,
|
||||
Walker walker,
|
||||
Shard shard,
|
||||
IndexedFastaSequenceFile reference,
|
||||
SAMDataSource reads ) {
|
||||
SAMDataSource reads,
|
||||
ShardOutput output ) {
|
||||
this.walker = walker;
|
||||
this.traversalEngine = traversalEngine;
|
||||
this.shard = shard;
|
||||
this.reference = reference;
|
||||
this.reads = reads;
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public Object call() {
|
||||
Object accumulator = ((LocusWalker<?,?>)walker).reduceInit();
|
||||
|
||||
MergingSamRecordIterator2 readShard = null;
|
||||
try {
|
||||
readShard = (MergingSamRecordIterator2)reads.seek( shard );
|
||||
}
|
||||
catch( SimpleDataSourceLoadException ex ) {
|
||||
throw new RuntimeException( ex );
|
||||
}
|
||||
CloseableIterator<SAMRecord> readShard = null;
|
||||
readShard = reads.seek( shard );
|
||||
|
||||
ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard.getGenomeLoc() );
|
||||
LocusContextProvider locusProvider = new LocusContextProvider( readShard );
|
||||
|
||||
accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator );
|
||||
OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker();
|
||||
|
||||
readShard.close();
|
||||
outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() );
|
||||
try {
|
||||
accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator );
|
||||
}
|
||||
finally {
|
||||
readShard.close();
|
||||
|
||||
output.complete();
|
||||
outputTracker.removeLocalStreams();
|
||||
}
|
||||
|
||||
return accumulator;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ public class TreeReducer implements Callable {
|
|||
if( lhs == null )
|
||||
return lhs.get();
|
||||
else
|
||||
return walker.reduce( lhs.get(), rhs.get() );
|
||||
return walker.treeReduce( lhs.get(), rhs.get() );
|
||||
}
|
||||
catch( InterruptedException ex ) {
|
||||
throw new RuntimeException("Hierarchical reduce interrupted", ex);
|
||||
|
|
|
|||
|
|
@ -20,11 +20,15 @@ public class CountLociWalker extends LocusWalker<Integer, Integer> implements Tr
|
|||
|
||||
public Integer reduceInit() { return 0; }
|
||||
|
||||
public Integer reduce(Integer value, Integer sum) {
|
||||
return value + sum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces two subtrees together. In this case, the implementation of the tree reduce
|
||||
* is exactly the same as the implementation of the single reduce.
|
||||
*/
|
||||
public Integer reduce(Integer value, Integer sum) {
|
||||
return value + sum;
|
||||
public Integer treeReduce(Integer lhs, Integer rhs) {
|
||||
return lhs + rhs;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import net.sf.samtools.SAMRecord;
|
|||
* Time: 11:23:14 AM
|
||||
* To change this template use File | Settings | File Templates.
|
||||
*/
|
||||
public class PrintLocusContextWalker extends LocusWalker<LocusContext, Integer> {
|
||||
public class PrintLocusContextWalker extends LocusWalker<LocusContext, Integer> implements TreeReducible<Integer> {
|
||||
public LocusContext map(RefMetaDataTracker tracker, char ref, LocusContext context) {
|
||||
out.printf( "In map: ref = %c, loc = %s, reads = %s%n", ref,
|
||||
context.getLocation(),
|
||||
|
|
@ -31,6 +31,10 @@ public class PrintLocusContextWalker extends LocusWalker<LocusContext, Integer>
|
|||
return sum + 1;
|
||||
}
|
||||
|
||||
public Integer treeReduce(Integer lhs, Integer rhs) {
|
||||
return lhs + rhs;
|
||||
}
|
||||
|
||||
private String[] getReadNames( List<SAMRecord> reads ) {
|
||||
String[] readNames = new String[ reads.size() ];
|
||||
for( int i = 0; i < reads.size(); i++ ) {
|
||||
|
|
|
|||
|
|
@ -20,5 +20,5 @@ public interface TreeReducible<ReduceType> {
|
|||
* @param rhs 'right-most' portion of data in the composite reduce.
|
||||
* @return The composite reduce type.
|
||||
*/
|
||||
ReduceType reduce(ReduceType lhs, ReduceType rhs);
|
||||
ReduceType treeReduce(ReduceType lhs, ReduceType rhs);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,15 +33,16 @@ public abstract class Walker<MapType, ReduceType> {
|
|||
protected PrintStream err = null;
|
||||
|
||||
protected Walker() {
|
||||
if( GenomeAnalysisTK.Instance != null ) {
|
||||
GenomeAnalysisTK.Instance.loadArgumentsIntoObject(this);
|
||||
out = GenomeAnalysisTK.Instance.getWalkerOutputStream();
|
||||
err = GenomeAnalysisTK.Instance.getWalkerErrorStream();
|
||||
}
|
||||
else {
|
||||
out = System.out;
|
||||
err = System.err;
|
||||
}
|
||||
if( GenomeAnalysisTK.Instance != null ) {
|
||||
GenomeAnalysisTK gatk = GenomeAnalysisTK.Instance;
|
||||
gatk.loadArgumentsIntoObject(this);
|
||||
out = new PrintStream( gatk.getOutputTracker().getOutStream() );
|
||||
err = new PrintStream( gatk.getOutputTracker().getErrStream() );
|
||||
}
|
||||
else {
|
||||
out = System.out;
|
||||
err = System.err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,78 @@
|
|||
package org.broadinstitute.sting.utils.io;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
/**
|
||||
* User: hanna
|
||||
* Date: Apr 30, 2009
|
||||
* Time: 5:53:32 PM
|
||||
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||
* All rights are reserved.
|
||||
*
|
||||
* Users acknowledge that this software is supplied without any warranty or support.
|
||||
* The Broad Institute is not responsible for its use, misuse, or
|
||||
* functionality.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A stream that allows redirection to a variety of sources transparently to the
|
||||
* user of the class.
|
||||
*/
|
||||
public class RedirectingOutputStream extends OutputStream {
|
||||
|
||||
/**
|
||||
* Informs us which output stream should be used.
|
||||
*/
|
||||
private OutputStreamProvider provider;
|
||||
|
||||
/**
|
||||
* Build a new output stream, given the function telling us where to
|
||||
* send output.
|
||||
* @param provider Function which returns an output stream.
|
||||
*/
|
||||
public RedirectingOutputStream( OutputStreamProvider provider ) {
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the OutputStream backing this redirector now.
|
||||
* Note that the backing output stream could change at any time.
|
||||
* Use sparingly (for testing).
|
||||
*/
|
||||
public OutputStream getBackingOutputStream() {
|
||||
return provider.getOutputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
getBackingOutputStream().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
getBackingOutputStream().flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
getBackingOutputStream().write(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
getBackingOutputStream().write(b,off,len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
getBackingOutputStream().write(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides whatever output stream this data should go to at the moment.
|
||||
*/
|
||||
public interface OutputStreamProvider {
|
||||
public OutputStream getOutputStream();
|
||||
}
|
||||
}
|
||||
|
|
@ -3,9 +3,11 @@ package org.broadinstitute.sting.gatk;
|
|||
import org.junit.Test;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.broadinstitute.sting.utils.io.RedirectingOutputStream;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Scanner; /**
|
||||
* User: hanna
|
||||
* Date: Apr 30, 2009
|
||||
|
|
@ -39,8 +41,15 @@ public class OutputTrackerTest {
|
|||
@Test
|
||||
public void testNullInputs() {
|
||||
OutputTracker ot = new OutputTracker(null,null);
|
||||
Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, ot.getOutStream());
|
||||
Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, ot.getErrStream());
|
||||
|
||||
Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream );
|
||||
Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream );
|
||||
|
||||
RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream();
|
||||
RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream();
|
||||
|
||||
Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, outStream.getBackingOutputStream());
|
||||
Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, errStream.getBackingOutputStream());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -48,14 +57,19 @@ public class OutputTrackerTest {
|
|||
OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,null);
|
||||
|
||||
final String OUTPUT_TEXT = "out stream test";
|
||||
ot.getOutStream().append(OUTPUT_TEXT);
|
||||
PrintWriter outWriter = new PrintWriter(ot.getOutStream());
|
||||
outWriter.append(OUTPUT_TEXT);
|
||||
outWriter.close();
|
||||
|
||||
Scanner outScanner = new Scanner(new File(OUTPUT_FILENAME));
|
||||
String outText = outScanner.nextLine();
|
||||
Assert.assertFalse("Out stream has too much data", outScanner.hasNext());
|
||||
|
||||
Assert.assertEquals("OutputTracker: Written output is incorrect", outText, OUTPUT_TEXT);
|
||||
Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, ot.getErrStream());
|
||||
|
||||
Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream );
|
||||
RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream();
|
||||
Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, errStream.getBackingOutputStream());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -63,13 +77,17 @@ public class OutputTrackerTest {
|
|||
OutputTracker ot = new OutputTracker(null,ERROR_FILENAME);
|
||||
|
||||
final String ERROR_TEXT = "err stream test";
|
||||
ot.getErrStream().append(ERROR_TEXT);
|
||||
PrintWriter errWriter = new PrintWriter(ot.getErrStream());
|
||||
errWriter.append(ERROR_TEXT);
|
||||
errWriter.close();
|
||||
|
||||
Scanner errScanner = new Scanner(new File(ERROR_FILENAME));
|
||||
String errText = errScanner.nextLine();
|
||||
Assert.assertFalse("Err stream has too much data", errScanner.hasNext());
|
||||
|
||||
Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, ot.getOutStream());
|
||||
Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream );
|
||||
RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream();
|
||||
Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, outStream.getBackingOutputStream());
|
||||
Assert.assertEquals("OutputTracker: Written error text is incorrect", errText, ERROR_TEXT);
|
||||
}
|
||||
|
||||
|
|
@ -78,10 +96,14 @@ public class OutputTrackerTest {
|
|||
OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,ERROR_FILENAME);
|
||||
|
||||
final String OUTPUT_TEXT = "out stream test";
|
||||
ot.getOutStream().append(OUTPUT_TEXT);
|
||||
PrintWriter outWriter = new PrintWriter(ot.getOutStream());
|
||||
outWriter.append(OUTPUT_TEXT);
|
||||
outWriter.close();
|
||||
|
||||
final String ERROR_TEXT = "err stream test";
|
||||
ot.getErrStream().append(ERROR_TEXT);
|
||||
PrintWriter errWriter = new PrintWriter(ot.getErrStream());
|
||||
errWriter.append(ERROR_TEXT);
|
||||
errWriter.close();
|
||||
|
||||
Scanner outScanner = new Scanner(new File(OUTPUT_FILENAME));
|
||||
String outText = outScanner.nextLine();
|
||||
|
|
@ -98,7 +120,13 @@ public class OutputTrackerTest {
|
|||
@Test
|
||||
public void testIdenticalInputsGetIdenticalResults() {
|
||||
OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,OUTPUT_FILENAME);
|
||||
Assert.assertSame("OutputTracker: FileOutputStreams don't match", ot.getOutFile(), ot.getErrFile());
|
||||
Assert.assertSame("OutputTracker: PrintStreams don't match", ot.getOutStream(), ot.getErrStream());
|
||||
|
||||
Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream );
|
||||
Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream );
|
||||
|
||||
RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream();
|
||||
RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream();
|
||||
|
||||
Assert.assertSame("OutputTracker: PrintStreams don't match", outStream.getBackingOutputStream(), errStream.getBackingOutputStream());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue