diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java index 5e5b425d6..589f870d2 100644 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java @@ -346,19 +346,11 @@ public class GenomeAnalysisTK extends CommandLineProgram { } /** - * Get the output stream to be used with this walker. - * @return Output stream used with this walker. + * Gets the output tracker. Tracks data available to a given walker. + * @return The output tracker. */ - public PrintStream getWalkerOutputStream() { - return outputTracker.getOutStream(); - } - - /** - * Get the output stream to be used with this walker. - * @return Output stream used with this walker. - */ - public PrintStream getWalkerErrorStream() { - return outputTracker.getErrStream(); + public OutputTracker getOutputTracker() { + return outputTracker; } diff --git a/java/src/org/broadinstitute/sting/gatk/OutputTracker.java b/java/src/org/broadinstitute/sting/gatk/OutputTracker.java index 412bab677..55f263ea6 100755 --- a/java/src/org/broadinstitute/sting/gatk/OutputTracker.java +++ b/java/src/org/broadinstitute/sting/gatk/OutputTracker.java @@ -1,10 +1,13 @@ package org.broadinstitute.sting.gatk; import org.broadinstitute.sting.utils.StingException; +import org.broadinstitute.sting.utils.io.RedirectingOutputStream; import java.io.FileOutputStream; import java.io.PrintStream; import java.io.FileNotFoundException; +import java.io.OutputStream; +import java.io.PrintWriter; /** * User: hanna * Date: Apr 30, 2009 @@ -27,15 +30,14 @@ public class OutputTracker { /** * The streams to which walker users should be writing directly. */ - private PrintStream out; - private PrintStream err; + protected OutputStream globalOut; + protected OutputStream globalErr; /** - * Cache the file streams so that reassemblers can use nio to do fast file transfers. + * Thread-local versions of the output stream. */ - private FileOutputStream outFileStream; - private FileOutputStream errFileStream; - + protected ThreadLocal localOut = new ThreadLocal(); + protected ThreadLocal localErr = new ThreadLocal(); /** * Create an object to manage output given filenames for the output and error files. @@ -48,23 +50,11 @@ public class OutputTracker { // Otherwise, initialize them separately. if( outFileName != null && outFileName.equals(errFileName) ) { FileOutputStream outputFile = prepareOutputFile( outFileName ); - outFileStream = errFileStream = outputFile; - out = err = new PrintStream( outputFile ); + globalOut = globalErr = outputFile; } else { - if( outFileName != null ) { - outFileStream = prepareOutputFile( outFileName ); - out = new PrintStream( outFileStream ); - } - else - out = System.out; - - if( errFileName != null ) { - errFileStream = prepareOutputFile( errFileName ); - err = new PrintStream( errFileStream ); - } - else - err = System.err; + globalOut = (outFileName != null) ? prepareOutputFile( outFileName ) : System.out; + globalErr = (errFileName != null) ? prepareOutputFile( errFileName ) : System.err; } } @@ -72,32 +62,65 @@ public class OutputTracker { * Gets the output stream for the walker. * @return Output stream; should be either file-backed or System.out. */ - public PrintStream getOutStream() { - return out; + public OutputStream getGlobalOutStream() { + return globalOut; } /** * Gets the error stream for the walker. * @return Error stream; should be either file-backed or System.err. */ - public PrintStream getErrStream() { - return err; + public OutputStream getGlobalErrStream() { + return globalErr; } /** - * Gets the filestream associated with normal output. - * @return FileStream associated with the output; null if not backed by a file. + * Retrieve an output stream that will always return the most appropriate + * writer for this thread. */ - public FileOutputStream getOutFile() { - return outFileStream; + public OutputStream getOutStream() { + // Create an anonymous inner class which will just return the most + // appropriate OutputStream from those streams stored in this class. + return new RedirectingOutputStream( + new RedirectingOutputStream.OutputStreamProvider() { + public OutputStream getOutputStream() { + return localOut.get() != null ? localOut.get() : globalOut; + } + } ); } /** - * Gets the filestream associated with error output. - * @return stream associated with error output; null if not backed by a file. + * Retrieve the most appropriate output stream for this thread. + * Will retrieve thread-local if available; otherwise, it'll read the + * global stream. */ - public FileOutputStream getErrFile() { - return errFileStream; + public OutputStream getErrStream() { + // Create an anonymous inner class which will just return the most + // appropriate OutputStream from those streams stored in this class. + return new RedirectingOutputStream( + new RedirectingOutputStream.OutputStreamProvider() { + public OutputStream getOutputStream() { + return localErr.get() != null ? localErr.get() : globalErr; + } + } ); + } + + /** + * Set the (thread-)local version of the given output and error files. + * @param out output stream to which to write. + * @param err error stream to which to write. + */ + public void setLocalStreams( OutputStream out, OutputStream err ) { + localOut.set( out ); + localErr.set( err ); + } + + /** + * Remove pointers to alternate, local output streams. + */ + public void removeLocalStreams() { + localOut.remove(); + localErr.remove(); } /** diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 5f282bf63..d8da5fdf5 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -7,10 +7,13 @@ import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; +import org.broadinstitute.sting.gatk.GenomeAnalysisTK; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.StingException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.io.File; +import java.io.OutputStream; import java.util.List; import java.util.Queue; import java.util.LinkedList; @@ -41,7 +44,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce private Queue traverseTasks = new LinkedList(); private Queue reduceTasks = new LinkedList(); - + private Queue outputMergeTasks = new LinkedList(); /** * Create a new hierarchical microscheduler to process the given reads and reference. @@ -76,12 +79,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce for(Shard shard: shardStrategy) traverseTasks.add(shard); - while( isShardTraversePending() || isTreeReducePending() ) { + while( isShardTraversePending() || isTreeReducePending() || isOutputMergePending() ) { waitForFreeQueueSlot(); if( isTreeReduceReady() ) queueNextTreeReduce( walker ); - else { + else if( isShardTraversePending() ) { Future traverseResult = queueNextShardTraverse( walker, dataSource ); // Add this traverse result to the reduce tree. The reduce tree will call a callback to throw its entries on the queue. @@ -91,9 +94,20 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce if( !isShardTraversePending() ) reduceTree.complete(); } + else if( isOutputMergeReady() ) { + queueNextOutputMerge(); + } } - Object result = reduceTree.getResult(); + threadPool.shutdown(); + + Object result = null; + try { + result = reduceTree.getResult().get(); + } + catch(Exception ex) { + throw new StingException("Unable to retrieve result", ex ); + } traversalEngine.printOnTraversalDone("loci", result); walker.onTraversalDone(result); @@ -128,6 +142,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce return reduceTasks.size() > 0; } + /** + * Returns whether there is output waiting to be merged into the global output + * streams right now. + * @return True if this output is ready to be merged. False otherwise. + */ + protected boolean isOutputMergeReady() { + return !OutputMerger.isMergeQueued() && outputMergeTasks.peek().isComplete(); + } + + /** + * Returns whether there is output that will eventually need to be merged into + * the output streams. + * @return True if output will eventually need to be merged. False otherwise. + */ + protected boolean isOutputMergePending() { + return outputMergeTasks.size() > 0; + } + /** * Queues the next traversal of a walker from the traversal tasks queue. * @param walker Walker to apply to the dataset. @@ -136,11 +168,20 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce protected Future queueNextShardTraverse( Walker walker, SAMDataSource dataSource ) { if( traverseTasks.size() == 0 ) throw new IllegalStateException( "Cannot traverse; no pending traversals exist."); + + ShardOutput shardOutput = new ShardOutput(); + ShardTraverser traverser = new ShardTraverser( traversalEngine, walker, traverseTasks.remove(), reference, - dataSource ); + dataSource, + shardOutput ); + + outputMergeTasks.add(new OutputMerger(shardOutput, + GenomeAnalysisTK.Instance.getOutputTracker().getGlobalOutStream(), + GenomeAnalysisTK.Instance.getOutputTracker().getGlobalErrStream())); + return threadPool.submit(traverser); } @@ -156,6 +197,19 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce threadPool.submit( reducer ); } + /** + * Pulls the next output merge and puts it on the queue. + */ + protected void queueNextOutputMerge() { + if( outputMergeTasks.size() == 0 ) + throw new IllegalStateException( "Cannot merge output; no pending merges exist."); + if( OutputMerger.isMergeQueued() ) + throw new IllegalStateException( "Cannot merge output; another merge has already been queued."); + + OutputMerger.queueMerge(); + threadPool.submit( outputMergeTasks.remove() ); + } + /** * Blocks until a free slot appears in the thread queue. */ @@ -198,4 +252,5 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Reduce } } + } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java b/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java new file mode 100755 index 000000000..4426bd1fd --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java @@ -0,0 +1,66 @@ +package org.broadinstitute.sting.gatk.executive; + +import java.io.OutputStream; +/** + * User: hanna + * Date: May 1, 2009 + * Time: 11:47:44 AM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * A queueable task to merge two output files. Provides a static 'lock' to + * track whether a merge is currently happening. + */ +public class OutputMerger implements Runnable { + public final ShardOutput shardOutput; + + private final OutputStream out; + private final OutputStream err; + + /** + * Indicates whether a merge has been queued. Keeps merges from stepping on each other. + */ + private static boolean mergeQueued; + + public OutputMerger( ShardOutput shardOutput, OutputStream out, OutputStream err ) { + this.shardOutput = shardOutput; + this.out = out; + this.err = err; + } + + public boolean isComplete() { + return shardOutput.isComplete(); + } + + /** + * Is a merge currently queued? + * @return True if a merge is waiting on the queue. False otherwise. + */ + public static boolean isMergeQueued() { + return mergeQueued; + } + + public static void queueMerge() { + if( mergeQueued ) + throw new IllegalStateException( "Attempted to mark that a merge has been queued when a merge is already queued." ); + mergeQueued = true; + } + + public void run() { + if( !mergeQueued ) + throw new IllegalStateException( "Starting a file merge, but our state shows no merge has been queued." ); + try { + shardOutput.mergeInto( out, err ); + } + finally { + mergeQueued = false; + } + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java new file mode 100755 index 000000000..63ed4e913 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardOutput.java @@ -0,0 +1,143 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.utils.StingException; + +import java.io.File; +import java.io.IOException; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.nio.channels.FileChannel; +import java.nio.channels.Channels; +import java.nio.channels.Channel; +import java.nio.channels.WritableByteChannel; +/** + * User: hanna + * Date: Apr 30, 2009 + * Time: 4:04:38 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * Hold pointers to the output and error streams, and state to indicate whether + * a write is complete. Not generally thread-safe. Calls to isComplete()/complete() + * can be made at any time from any thread, but complete() should be called on the + * thread which is doing the writing. + */ +public class ShardOutput { + /** + * Create a unique identifier + */ + private static int id = 0; + + /** + * Is writing to these streams complete? + */ + private boolean complete = false; + + /** + * File objects that the output and error data went into. + */ + private File outFile; + private File errFile; + + /** + * The printstreams which should be written to. + */ + private FileOutputStream out = null; + private FileOutputStream err = null; + + public ShardOutput() { + try { + outFile = File.createTempFile("gatkout_" + id, null); + errFile = File.createTempFile("gatkerr_" + id, null); + + out = new FileOutputStream( outFile ); + err = new FileOutputStream( errFile ); + } + catch( IOException ex ) { + throw new StingException("Unable to open temporary file for GATK output",ex); + } + } + + /** + * Is this output complete? + * @return True if output complete. False otherwise. + */ + public synchronized boolean isComplete() { + return complete; + } + + /** + * Indicate that no more data will be written to these output streams. + */ + public synchronized void complete() { + try { + out.flush(); + err.flush(); + out.close(); + err.close(); + } + catch( IOException ex ) { + throw new StingException( "Unable to close sharding output files", ex ); + } + + out = null; + err = null; + + this.complete = true; + } + + public void mergeInto( OutputStream globalOut, OutputStream globalErr ) { + synchronized(this) { + if( !isComplete() ) + throw new StingException("Tried to merge incomplete stream into output"); + } + + transferContentsToTarget( outFile, globalOut ); + transferContentsToTarget( errFile, globalErr ); + } + + private void transferContentsToTarget( File source, OutputStream target ) { + FileInputStream sourceStream = null; + try { + sourceStream = new FileInputStream( source ); + FileChannel sourceChannel = sourceStream.getChannel(); + + WritableByteChannel targetChannel = Channels.newChannel( target ); + sourceChannel.transferTo( 0, sourceChannel.size(), targetChannel ); + + sourceStream.close(); + source.delete(); + } + catch( FileNotFoundException ex ) { + throw new StingException("Unable to open input stream for file: " + source,ex); + } + catch( IOException ex ) { + throw new StingException("Unable to transfer contents of file:" + source,ex); + } + } + + /** + * Return the stream where output is sent. + * @return output stream. + */ + public OutputStream getOutStream() { + return out; + } + + /** + * Gets the stream where error info is sent. + * @return error stream. + */ + public OutputStream getErrStream() { + return err; + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 985fdf24f..7637f3b62 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -4,14 +4,17 @@ import org.broadinstitute.sting.gatk.dataSources.providers.LocusContextProvider; import org.broadinstitute.sting.gatk.dataSources.providers.ReferenceProvider; import org.broadinstitute.sting.gatk.dataSources.shards.Shard; import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SAMDataSource; -import org.broadinstitute.sting.gatk.dataSources.simpleDataSources.SimpleDataSourceLoadException; -import org.broadinstitute.sting.gatk.iterators.MergingSamRecordIterator2; import org.broadinstitute.sting.gatk.traversals.TraverseLociByReference; +import org.broadinstitute.sting.gatk.GenomeAnalysisTK; +import org.broadinstitute.sting.gatk.OutputTracker; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile; import java.util.concurrent.Callable; + +import net.sf.samtools.SAMRecord; +import net.sf.samtools.util.CloseableIterator; /** * User: hanna * Date: Apr 29, 2009 @@ -33,36 +36,43 @@ public class ShardTraverser implements Callable { private Shard shard; private IndexedFastaSequenceFile reference; private SAMDataSource reads; + private ShardOutput output; public ShardTraverser( TraverseLociByReference traversalEngine, Walker walker, Shard shard, IndexedFastaSequenceFile reference, - SAMDataSource reads ) { + SAMDataSource reads, + ShardOutput output ) { this.walker = walker; this.traversalEngine = traversalEngine; this.shard = shard; this.reference = reference; this.reads = reads; + this.output = output; } public Object call() { Object accumulator = ((LocusWalker)walker).reduceInit(); - MergingSamRecordIterator2 readShard = null; - try { - readShard = (MergingSamRecordIterator2)reads.seek( shard ); - } - catch( SimpleDataSourceLoadException ex ) { - throw new RuntimeException( ex ); - } + CloseableIterator readShard = null; + readShard = reads.seek( shard ); ReferenceProvider referenceProvider = new ReferenceProvider( reference, shard.getGenomeLoc() ); LocusContextProvider locusProvider = new LocusContextProvider( readShard ); - accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + OutputTracker outputTracker = GenomeAnalysisTK.Instance.getOutputTracker(); - readShard.close(); + outputTracker.setLocalStreams( output.getOutStream(), output.getErrStream() ); + try { + accumulator = traversalEngine.traverse( walker, shard, referenceProvider, locusProvider, accumulator ); + } + finally { + readShard.close(); + + output.complete(); + outputTracker.removeLocalStreams(); + } return accumulator; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java index 693157847..434c196db 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/TreeReducer.java @@ -77,7 +77,7 @@ public class TreeReducer implements Callable { if( lhs == null ) return lhs.get(); else - return walker.reduce( lhs.get(), rhs.get() ); + return walker.treeReduce( lhs.get(), rhs.get() ); } catch( InterruptedException ex ) { throw new RuntimeException("Hierarchical reduce interrupted", ex); diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java index a37c3a347..f2ba77321 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/CountLociWalker.java @@ -20,11 +20,15 @@ public class CountLociWalker extends LocusWalker implements Tr public Integer reduceInit() { return 0; } + public Integer reduce(Integer value, Integer sum) { + return value + sum; + } + /** * Reduces two subtrees together. In this case, the implementation of the tree reduce * is exactly the same as the implementation of the single reduce. */ - public Integer reduce(Integer value, Integer sum) { - return value + sum; + public Integer treeReduce(Integer lhs, Integer rhs) { + return lhs + rhs; } } diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java index 7c2033f7b..12a48f659 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/PrintLocusContextWalker.java @@ -17,7 +17,7 @@ import net.sf.samtools.SAMRecord; * Time: 11:23:14 AM * To change this template use File | Settings | File Templates. */ -public class PrintLocusContextWalker extends LocusWalker { +public class PrintLocusContextWalker extends LocusWalker implements TreeReducible { public LocusContext map(RefMetaDataTracker tracker, char ref, LocusContext context) { out.printf( "In map: ref = %c, loc = %s, reads = %s%n", ref, context.getLocation(), @@ -31,6 +31,10 @@ public class PrintLocusContextWalker extends LocusWalker return sum + 1; } + public Integer treeReduce(Integer lhs, Integer rhs) { + return lhs + rhs; + } + private String[] getReadNames( List reads ) { String[] readNames = new String[ reads.size() ]; for( int i = 0; i < reads.size(); i++ ) { diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java b/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java index 0b76f16a0..c950e07e4 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java @@ -20,5 +20,5 @@ public interface TreeReducible { * @param rhs 'right-most' portion of data in the composite reduce. * @return The composite reduce type. */ - ReduceType reduce(ReduceType lhs, ReduceType rhs); + ReduceType treeReduce(ReduceType lhs, ReduceType rhs); } diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java b/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java index 6f47d3b32..2e7485a65 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java @@ -33,15 +33,16 @@ public abstract class Walker { protected PrintStream err = null; protected Walker() { - if( GenomeAnalysisTK.Instance != null ) { - GenomeAnalysisTK.Instance.loadArgumentsIntoObject(this); - out = GenomeAnalysisTK.Instance.getWalkerOutputStream(); - err = GenomeAnalysisTK.Instance.getWalkerErrorStream(); - } - else { - out = System.out; - err = System.err; - } + if( GenomeAnalysisTK.Instance != null ) { + GenomeAnalysisTK gatk = GenomeAnalysisTK.Instance; + gatk.loadArgumentsIntoObject(this); + out = new PrintStream( gatk.getOutputTracker().getOutStream() ); + err = new PrintStream( gatk.getOutputTracker().getErrStream() ); + } + else { + out = System.out; + err = System.err; + } } /** diff --git a/java/src/org/broadinstitute/sting/utils/io/RedirectingOutputStream.java b/java/src/org/broadinstitute/sting/utils/io/RedirectingOutputStream.java new file mode 100755 index 000000000..9bc09cd01 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/io/RedirectingOutputStream.java @@ -0,0 +1,78 @@ +package org.broadinstitute.sting.utils.io; + +import java.io.OutputStream; +import java.io.IOException; +/** + * User: hanna + * Date: Apr 30, 2009 + * Time: 5:53:32 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * A stream that allows redirection to a variety of sources transparently to the + * user of the class. + */ +public class RedirectingOutputStream extends OutputStream { + + /** + * Informs us which output stream should be used. + */ + private OutputStreamProvider provider; + + /** + * Build a new output stream, given the function telling us where to + * send output. + * @param provider Function which returns an output stream. + */ + public RedirectingOutputStream( OutputStreamProvider provider ) { + this.provider = provider; + } + + /** + * Gets the OutputStream backing this redirector now. + * Note that the backing output stream could change at any time. + * Use sparingly (for testing). + */ + public OutputStream getBackingOutputStream() { + return provider.getOutputStream(); + } + + @Override + public void close() throws IOException { + getBackingOutputStream().close(); + } + + @Override + public void flush() throws IOException { + getBackingOutputStream().flush(); + } + + @Override + public void write(byte[] b) throws IOException { + getBackingOutputStream().write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + getBackingOutputStream().write(b,off,len); + } + + @Override + public void write(int b) throws IOException { + getBackingOutputStream().write(b); + } + + /** + * Provides whatever output stream this data should go to at the moment. + */ + public interface OutputStreamProvider { + public OutputStream getOutputStream(); + } +} diff --git a/java/test/org/broadinstitute/sting/gatk/OutputTrackerTest.java b/java/test/org/broadinstitute/sting/gatk/OutputTrackerTest.java index 3b9bcae7b..4a616d793 100755 --- a/java/test/org/broadinstitute/sting/gatk/OutputTrackerTest.java +++ b/java/test/org/broadinstitute/sting/gatk/OutputTrackerTest.java @@ -3,9 +3,11 @@ package org.broadinstitute.sting.gatk; import org.junit.Test; import org.junit.After; import org.junit.Assert; +import org.broadinstitute.sting.utils.io.RedirectingOutputStream; import java.io.File; import java.io.FileNotFoundException; +import java.io.PrintWriter; import java.util.Scanner; /** * User: hanna * Date: Apr 30, 2009 @@ -39,8 +41,15 @@ public class OutputTrackerTest { @Test public void testNullInputs() { OutputTracker ot = new OutputTracker(null,null); - Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, ot.getOutStream()); - Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, ot.getErrStream()); + + Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream ); + Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream ); + + RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream(); + RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream(); + + Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, outStream.getBackingOutputStream()); + Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, errStream.getBackingOutputStream()); } @Test @@ -48,14 +57,19 @@ public class OutputTrackerTest { OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,null); final String OUTPUT_TEXT = "out stream test"; - ot.getOutStream().append(OUTPUT_TEXT); + PrintWriter outWriter = new PrintWriter(ot.getOutStream()); + outWriter.append(OUTPUT_TEXT); + outWriter.close(); Scanner outScanner = new Scanner(new File(OUTPUT_FILENAME)); String outText = outScanner.nextLine(); Assert.assertFalse("Out stream has too much data", outScanner.hasNext()); Assert.assertEquals("OutputTracker: Written output is incorrect", outText, OUTPUT_TEXT); - Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, ot.getErrStream()); + + Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream ); + RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream(); + Assert.assertSame("OutputTracker: Error stream incorrectly initialized.", System.err, errStream.getBackingOutputStream()); } @Test @@ -63,13 +77,17 @@ public class OutputTrackerTest { OutputTracker ot = new OutputTracker(null,ERROR_FILENAME); final String ERROR_TEXT = "err stream test"; - ot.getErrStream().append(ERROR_TEXT); + PrintWriter errWriter = new PrintWriter(ot.getErrStream()); + errWriter.append(ERROR_TEXT); + errWriter.close(); Scanner errScanner = new Scanner(new File(ERROR_FILENAME)); String errText = errScanner.nextLine(); Assert.assertFalse("Err stream has too much data", errScanner.hasNext()); - Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, ot.getOutStream()); + Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream ); + RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream(); + Assert.assertSame("OutputTracker: Output stream incorrectly initialized.", System.out, outStream.getBackingOutputStream()); Assert.assertEquals("OutputTracker: Written error text is incorrect", errText, ERROR_TEXT); } @@ -78,10 +96,14 @@ public class OutputTrackerTest { OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,ERROR_FILENAME); final String OUTPUT_TEXT = "out stream test"; - ot.getOutStream().append(OUTPUT_TEXT); + PrintWriter outWriter = new PrintWriter(ot.getOutStream()); + outWriter.append(OUTPUT_TEXT); + outWriter.close(); final String ERROR_TEXT = "err stream test"; - ot.getErrStream().append(ERROR_TEXT); + PrintWriter errWriter = new PrintWriter(ot.getErrStream()); + errWriter.append(ERROR_TEXT); + errWriter.close(); Scanner outScanner = new Scanner(new File(OUTPUT_FILENAME)); String outText = outScanner.nextLine(); @@ -98,7 +120,13 @@ public class OutputTrackerTest { @Test public void testIdenticalInputsGetIdenticalResults() { OutputTracker ot = new OutputTracker(OUTPUT_FILENAME,OUTPUT_FILENAME); - Assert.assertSame("OutputTracker: FileOutputStreams don't match", ot.getOutFile(), ot.getErrFile()); - Assert.assertSame("OutputTracker: PrintStreams don't match", ot.getOutStream(), ot.getErrStream()); + + Assert.assertTrue("OutputTracker: Output stream is of wrong type.", ot.getOutStream() instanceof RedirectingOutputStream ); + Assert.assertTrue("OutputTracker: Error stream is of wrong type.", ot.getErrStream() instanceof RedirectingOutputStream ); + + RedirectingOutputStream outStream = (RedirectingOutputStream)ot.getOutStream(); + RedirectingOutputStream errStream = (RedirectingOutputStream)ot.getErrStream(); + + Assert.assertSame("OutputTracker: PrintStreams don't match", outStream.getBackingOutputStream(), errStream.getBackingOutputStream()); } }