diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index fa9f7a645..bd6e2a2a9 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -102,7 +102,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar ReduceTree reduceTree = new ReduceTree(this); - walker.initialize(); + initializeWalker(walker); for (Shard shard : shardStrategy) traverseTasks.add(shard); @@ -124,29 +124,59 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar queueNextShardTraverse(walker, reduceTree); } + threadPool.shutdown(); + // Merge any lingering output files. If these files aren't ready, // sit around and wait for them, then merge them. mergeExistingOutput(true); - threadPool.shutdown(); - Object result = null; try { result = reduceTree.getResult().get(); + notifyTraversalDone(walker,result); } catch (Exception ex) { throw new StingException("Unable to retrieve result", ex); } - walker.onTraversalDone(result); - - printOnTraversalDone(result); - - getOutputTracker().close(); + outputTracker.close(); return result; } + /** + * Run the initialize method of the walker. Ensure that any calls + * to the output stream will bypass thread local storage and write + * directly to the output file. + * @param walker Walker to initialize. + */ + protected void initializeWalker(Walker walker) { + outputTracker.bypassThreadLocalStorage(true); + try { + walker.initialize(); + } + finally { + outputTracker.bypassThreadLocalStorage(false); + } + } + + /** + * Run the initialize method of the walker. Ensure that any calls + * to the output stream will bypass thread local storage and write + * directly to the output file. + * @param walker Walker to initialize. + */ + protected void notifyTraversalDone(Walker walker, Object result) { + outputTracker.bypassThreadLocalStorage(true); + try { + walker.onTraversalDone(result); + printOnTraversalDone(result); + } + finally { + outputTracker.bypassThreadLocalStorage(false); + } + } + /** * @{inheritDoc} */ diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 246472358..d96d1b745 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -59,7 +59,7 @@ public class LinearMicroScheduler extends MicroScheduler { printOnTraversalDone(result); - getOutputTracker().close(); + outputTracker.close(); return accumulator; } diff --git a/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java b/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java index 1799b71bf..2fdedca9a 100644 --- a/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java +++ b/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java @@ -36,7 +36,7 @@ import java.io.File; import java.io.IOException; /** - * An output tracker that tracks its output per-thread. + * An output tracker that can either track its output per-thread or directly, * * @author mhanna * @version 0.1 @@ -47,18 +47,38 @@ public class ThreadLocalOutputTracker extends OutputTracker { */ private ThreadLocal> storage = new ThreadLocal>(); + /** + * A total hack. If bypass = true, bypass thread local storage and write directly + * to the target file. Used to handle output during initialize() and onTraversalDone(). + */ + private boolean bypass = false; + public void bypassThreadLocalStorage(boolean bypass) { + this.bypass = bypass; + } + public T getStorage( Stub stub ) { - Map threadLocalOutputStreams = storage.get(); + Storage target; - if( threadLocalOutputStreams == null ) { - threadLocalOutputStreams = new HashMap(); - storage.set( threadLocalOutputStreams ); + if(bypass) { + target = outputs.get(stub); + if( target == null ) { + target = StorageFactory.createStorage(stub); + outputs.put(stub, target); + } } + else { + Map threadLocalOutputStreams = storage.get(); - Storage target = threadLocalOutputStreams.get(stub); - if( target == null ) { - target = StorageFactory.createStorage(stub, createTempFile(stub)); - threadLocalOutputStreams.put(stub, target); + if( threadLocalOutputStreams == null ) { + threadLocalOutputStreams = new HashMap(); + storage.set( threadLocalOutputStreams ); + } + + target = threadLocalOutputStreams.get(stub); + if( target == null ) { + target = StorageFactory.createStorage(stub, createTempFile(stub)); + threadLocalOutputStreams.put(stub, target); + } } return (T)target; @@ -68,13 +88,13 @@ public class ThreadLocalOutputTracker extends OutputTracker { * Close down any existing temporary files which have been opened. */ public OutputMergeTask closeStorage() { - Map threadLocalStorage = storage.get(); + Map threadLocalOutputStreams = storage.get(); - if( threadLocalStorage == null || threadLocalStorage.isEmpty() ) + if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() ) return null; OutputMergeTask outputMergeTask = new OutputMergeTask(); - for( Map.Entry entry: threadLocalStorage.entrySet() ) { + for( Map.Entry entry: threadLocalOutputStreams.entrySet() ) { Stub stub = entry.getKey(); Storage storageEntry = entry.getValue(); @@ -82,7 +102,7 @@ public class ThreadLocalOutputTracker extends OutputTracker { outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry); } - threadLocalStorage.clear(); + threadLocalOutputStreams.clear(); return outputMergeTask; }