diff --git a/java/src/org/broadinstitute/sting/gatk/OutputTracker.java b/java/src/org/broadinstitute/sting/gatk/OutputTracker.java index 55f263ea6..b48f91bfe 100755 --- a/java/src/org/broadinstitute/sting/gatk/OutputTracker.java +++ b/java/src/org/broadinstitute/sting/gatk/OutputTracker.java @@ -118,7 +118,7 @@ public class OutputTracker { /** * Remove pointers to alternate, local output streams. */ - public void removeLocalStreams() { + public void cleanup() { localOut.remove(); localErr.remove(); } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java b/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java index dde0e376c..a06a02266 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/OutputMerger.java @@ -1,10 +1,10 @@ package org.broadinstitute.sting.gatk.executive; import org.broadinstitute.sting.utils.StingException; +import org.broadinstitute.sting.utils.io.LazyFileOutputStream; import java.io.File; import java.io.IOException; -import java.io.FileOutputStream; import java.io.OutputStream; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -32,41 +32,41 @@ import java.nio.channels.WritableByteChannel; * thread which is doing the writing. */ public class OutputMerger { - /** - * Create a unique identifier - */ - private static int id = 0; - /** * Is writing to these streams complete? */ private boolean complete = false; - /** - * File objects that the output and error data went into. - */ - private File outFile; - private File errFile; - /** * The printstreams which should be written to. */ - private FileOutputStream out = null; - private FileOutputStream err = null; + private LazyFileOutputStream out = null; + private LazyFileOutputStream err = null; public OutputMerger() { - try { - outFile = File.createTempFile("gatkout_" + id, null); - errFile = File.createTempFile("gatkerr_" + id, null); - - out = new FileOutputStream( outFile ); - err = new FileOutputStream( errFile ); - } - catch( IOException ex ) { - throw new StingException("Unable to open temporary file for GATK output",ex); - } + out = new LazyFileOutputStream( outStreamFactory ); + err = new LazyFileOutputStream( errStreamFactory ); } + /** + * Creates a generic output stream for temporary data. + */ + private static class TempFileFactory implements LazyFileOutputStream.FileFactory { + private final String prefix; + public TempFileFactory( String prefix ) { this.prefix = prefix; } + public File create() throws IOException { return File.createTempFile(prefix,null); } + } + + /** + * Creates a stream for temporary out (not err) data. + */ + private static final TempFileFactory outStreamFactory = new TempFileFactory("gatkout_"); + + /** + * Creates a stream for temporary err data. + */ + private static final TempFileFactory errStreamFactory = new TempFileFactory("gatkerr_"); + /** * Waits for any the given OutputMerger to be ready for merging. */ @@ -95,18 +95,19 @@ public class OutputMerger { throw new IllegalStateException("Tried to complete an output merge twice."); try { - out.flush(); - err.flush(); - out.close(); - err.close(); + if( out.isCreated() ) { + out.flush(); + out.close(); + } + if( err.isCreated() ) { + err.flush(); + err.close(); + } } catch( IOException ex ) { throw new StingException( "Unable to close sharding output files", ex ); } - out = null; - err = null; - this.complete = true; // Notify waiting listeners that this shard is complete and ready for merging. @@ -119,8 +120,8 @@ public class OutputMerger { throw new StingException("Tried to merge incomplete stream into output"); } - transferContentsToTarget( outFile, globalOut ); - transferContentsToTarget( errFile, globalErr ); + if( out.isCreated() ) transferContentsToTarget( out.getBackingFile(), globalOut ); + if( err.isCreated() ) transferContentsToTarget( err.getBackingFile(), globalErr ); } /** diff --git a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 93601e267..fe2a8e939 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -53,7 +53,7 @@ public class ShardTraverser implements Callable { finally { dataProvider.close(); output.complete(); - outputTracker.removeLocalStreams(); + outputTracker.cleanup(); } return accumulator; diff --git a/java/src/org/broadinstitute/sting/utils/io/LazyFileOutputStream.java b/java/src/org/broadinstitute/sting/utils/io/LazyFileOutputStream.java new file mode 100755 index 000000000..c1ca09944 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/io/LazyFileOutputStream.java @@ -0,0 +1,115 @@ +package org.broadinstitute.sting.utils.io; + +import org.broadinstitute.sting.utils.StingException; + +import java.io.OutputStream; +import java.io.IOException; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileNotFoundException; +/** + * User: hanna + * Date: May 26, 2009 + * Time: 3:51:49 PM + * BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT + * Software and documentation are copyright 2005 by the Broad Institute. + * All rights are reserved. + * + * Users acknowledge that this software is supplied without any warranty or support. + * The Broad Institute is not responsible for its use, misuse, or + * functionality. + */ + +/** + * An output stream that only initializes itself the first time its used. + * Needs a callback that can create an output stream. + */ +public class LazyFileOutputStream extends OutputStream { + /** + * Generates output files on demand. + */ + private final FileFactory factory; + + private File targetFile = null; + + /** + * The target for any writes performed by the output stream. + */ + private FileOutputStream targetOutputStream = null; + + /** + * Create a new LazyOutputStream, indicating how to create a new stream. + * @param factory Creator of the output stream, when necessary. + */ + public LazyFileOutputStream( FileFactory factory ) { + this.factory = factory; + } + + /** + * Indicates whether the LazyOutputStream had to get off its butt and create + * a new output stream. + * @return + */ + public boolean isCreated() { + return targetOutputStream != null; + } + + /** + * Public method to return the lazily created file. + * @return Stream created by the lazy loader. + * @throw StingException if no stream was created. + */ + public File getBackingFile() { + if( targetFile == null ) + throw new StingException("No lazy-loaded stream was created."); + return targetFile; + } + + @Override + public void close() throws IOException { + getBackingOutputStream().close(); + } + + @Override + public void flush() throws IOException { + getBackingOutputStream().flush(); + } + + @Override + public void write(byte[] b) throws IOException { + getBackingOutputStream().write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + getBackingOutputStream().write(b,off,len); + } + + @Override + public void write(int b) throws IOException { + getBackingOutputStream().write(b); + } + + /** + * Lazy loader for the output stream. + */ + protected OutputStream getBackingOutputStream() { + if( targetOutputStream == null ) { + try { + targetFile = factory.create(); + targetOutputStream = new FileOutputStream( targetFile ); + } + catch( IOException ex ) { + throw new StingException("Unable to open new temp file", ex ); + } + } + return targetOutputStream; + } + + /** + * Teaches the LazyOutputStream how to create a new outputstream when necessary. + */ + public interface FileFactory { + public File create() throws IOException; + } +}