Lazy creation of output streams. Only create output streams when absolutely necessary.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@824 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
ab7bb5800a
commit
a488d2dbb2
|
|
@ -118,7 +118,7 @@ public class OutputTracker {
|
||||||
/**
|
/**
|
||||||
* Remove pointers to alternate, local output streams.
|
* Remove pointers to alternate, local output streams.
|
||||||
*/
|
*/
|
||||||
public void removeLocalStreams() {
|
public void cleanup() {
|
||||||
localOut.remove();
|
localOut.remove();
|
||||||
localErr.remove();
|
localErr.remove();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
package org.broadinstitute.sting.gatk.executive;
|
package org.broadinstitute.sting.gatk.executive;
|
||||||
|
|
||||||
import org.broadinstitute.sting.utils.StingException;
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
|
import org.broadinstitute.sting.utils.io.LazyFileOutputStream;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
|
|
@ -32,41 +32,41 @@ import java.nio.channels.WritableByteChannel;
|
||||||
* thread which is doing the writing.
|
* thread which is doing the writing.
|
||||||
*/
|
*/
|
||||||
public class OutputMerger {
|
public class OutputMerger {
|
||||||
/**
|
|
||||||
* Create a unique identifier
|
|
||||||
*/
|
|
||||||
private static int id = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is writing to these streams complete?
|
* Is writing to these streams complete?
|
||||||
*/
|
*/
|
||||||
private boolean complete = false;
|
private boolean complete = false;
|
||||||
|
|
||||||
/**
|
|
||||||
* File objects that the output and error data went into.
|
|
||||||
*/
|
|
||||||
private File outFile;
|
|
||||||
private File errFile;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The printstreams which should be written to.
|
* The printstreams which should be written to.
|
||||||
*/
|
*/
|
||||||
private FileOutputStream out = null;
|
private LazyFileOutputStream out = null;
|
||||||
private FileOutputStream err = null;
|
private LazyFileOutputStream err = null;
|
||||||
|
|
||||||
public OutputMerger() {
|
public OutputMerger() {
|
||||||
try {
|
out = new LazyFileOutputStream( outStreamFactory );
|
||||||
outFile = File.createTempFile("gatkout_" + id, null);
|
err = new LazyFileOutputStream( errStreamFactory );
|
||||||
errFile = File.createTempFile("gatkerr_" + id, null);
|
}
|
||||||
|
|
||||||
out = new FileOutputStream( outFile );
|
/**
|
||||||
err = new FileOutputStream( errFile );
|
* Creates a generic output stream for temporary data.
|
||||||
}
|
*/
|
||||||
catch( IOException ex ) {
|
private static class TempFileFactory implements LazyFileOutputStream.FileFactory {
|
||||||
throw new StingException("Unable to open temporary file for GATK output",ex);
|
private final String prefix;
|
||||||
}
|
public TempFileFactory( String prefix ) { this.prefix = prefix; }
|
||||||
|
public File create() throws IOException { return File.createTempFile(prefix,null); }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a stream for temporary out (not err) data.
|
||||||
|
*/
|
||||||
|
private static final TempFileFactory outStreamFactory = new TempFileFactory("gatkout_");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a stream for temporary err data.
|
||||||
|
*/
|
||||||
|
private static final TempFileFactory errStreamFactory = new TempFileFactory("gatkerr_");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits for any the given OutputMerger to be ready for merging.
|
* Waits for any the given OutputMerger to be ready for merging.
|
||||||
*/
|
*/
|
||||||
|
|
@ -95,18 +95,19 @@ public class OutputMerger {
|
||||||
throw new IllegalStateException("Tried to complete an output merge twice.");
|
throw new IllegalStateException("Tried to complete an output merge twice.");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if( out.isCreated() ) {
|
||||||
out.flush();
|
out.flush();
|
||||||
err.flush();
|
|
||||||
out.close();
|
out.close();
|
||||||
|
}
|
||||||
|
if( err.isCreated() ) {
|
||||||
|
err.flush();
|
||||||
err.close();
|
err.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
catch( IOException ex ) {
|
catch( IOException ex ) {
|
||||||
throw new StingException( "Unable to close sharding output files", ex );
|
throw new StingException( "Unable to close sharding output files", ex );
|
||||||
}
|
}
|
||||||
|
|
||||||
out = null;
|
|
||||||
err = null;
|
|
||||||
|
|
||||||
this.complete = true;
|
this.complete = true;
|
||||||
|
|
||||||
// Notify waiting listeners that this shard is complete and ready for merging.
|
// Notify waiting listeners that this shard is complete and ready for merging.
|
||||||
|
|
@ -119,8 +120,8 @@ public class OutputMerger {
|
||||||
throw new StingException("Tried to merge incomplete stream into output");
|
throw new StingException("Tried to merge incomplete stream into output");
|
||||||
}
|
}
|
||||||
|
|
||||||
transferContentsToTarget( outFile, globalOut );
|
if( out.isCreated() ) transferContentsToTarget( out.getBackingFile(), globalOut );
|
||||||
transferContentsToTarget( errFile, globalErr );
|
if( err.isCreated() ) transferContentsToTarget( err.getBackingFile(), globalErr );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ public class ShardTraverser implements Callable {
|
||||||
finally {
|
finally {
|
||||||
dataProvider.close();
|
dataProvider.close();
|
||||||
output.complete();
|
output.complete();
|
||||||
outputTracker.removeLocalStreams();
|
outputTracker.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
return accumulator;
|
return accumulator;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,115 @@
|
||||||
|
package org.broadinstitute.sting.utils.io;
|
||||||
|
|
||||||
|
import org.broadinstitute.sting.utils.StingException;
|
||||||
|
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
/**
|
||||||
|
* User: hanna
|
||||||
|
* Date: May 26, 2009
|
||||||
|
* Time: 3:51:49 PM
|
||||||
|
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
|
||||||
|
* Software and documentation are copyright 2005 by the Broad Institute.
|
||||||
|
* All rights are reserved.
|
||||||
|
*
|
||||||
|
* Users acknowledge that this software is supplied without any warranty or support.
|
||||||
|
* The Broad Institute is not responsible for its use, misuse, or
|
||||||
|
* functionality.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An output stream that only initializes itself the first time its used.
|
||||||
|
* Needs a callback that can create an output stream.
|
||||||
|
*/
|
||||||
|
public class LazyFileOutputStream extends OutputStream {
|
||||||
|
/**
|
||||||
|
* Generates output files on demand.
|
||||||
|
*/
|
||||||
|
private final FileFactory factory;
|
||||||
|
|
||||||
|
private File targetFile = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The target for any writes performed by the output stream.
|
||||||
|
*/
|
||||||
|
private FileOutputStream targetOutputStream = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new LazyOutputStream, indicating how to create a new stream.
|
||||||
|
* @param factory Creator of the output stream, when necessary.
|
||||||
|
*/
|
||||||
|
public LazyFileOutputStream( FileFactory factory ) {
|
||||||
|
this.factory = factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether the LazyOutputStream had to get off its butt and create
|
||||||
|
* a new output stream.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean isCreated() {
|
||||||
|
return targetOutputStream != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Public method to return the lazily created file.
|
||||||
|
* @return Stream created by the lazy loader.
|
||||||
|
* @throw StingException if no stream was created.
|
||||||
|
*/
|
||||||
|
public File getBackingFile() {
|
||||||
|
if( targetFile == null )
|
||||||
|
throw new StingException("No lazy-loaded stream was created.");
|
||||||
|
return targetFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
getBackingOutputStream().close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() throws IOException {
|
||||||
|
getBackingOutputStream().flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(byte[] b) throws IOException {
|
||||||
|
getBackingOutputStream().write(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(byte[] b, int off, int len) throws IOException {
|
||||||
|
getBackingOutputStream().write(b,off,len);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(int b) throws IOException {
|
||||||
|
getBackingOutputStream().write(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lazy loader for the output stream.
|
||||||
|
*/
|
||||||
|
protected OutputStream getBackingOutputStream() {
|
||||||
|
if( targetOutputStream == null ) {
|
||||||
|
try {
|
||||||
|
targetFile = factory.create();
|
||||||
|
targetOutputStream = new FileOutputStream( targetFile );
|
||||||
|
}
|
||||||
|
catch( IOException ex ) {
|
||||||
|
throw new StingException("Unable to open new temp file", ex );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return targetOutputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Teaches the LazyOutputStream how to create a new outputstream when necessary.
|
||||||
|
*/
|
||||||
|
public interface FileFactory {
|
||||||
|
public File create() throws IOException;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue