From 10a6b57be6f5e56f92a2bf4a1e7775540a2e376c Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 22 Sep 2012 13:21:55 -0400 Subject: [PATCH 01/10] Fix thread name: should be master executor not input --- .../broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index b014695da..d83a23c0f 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -92,7 +92,7 @@ public class NanoScheduler { runningMapJobSlots = new Semaphore(this.bufferSize); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); - this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); + this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d")); } // start timing the time spent outside of the nanoScheduler From 09bbd2c4c3846715fceada347584ca75d058b91a Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 22 Sep 2012 13:22:27 -0400 Subject: [PATCH 02/10] Include exception in VCFWriter when one is found when rethrowing as ReviewedStingException --- .../sting/utils/variantcontext/writer/VCFWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VCFWriter.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VCFWriter.java index f5306b6da..f2d34fe85 100755 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VCFWriter.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/writer/VCFWriter.java @@ -250,7 +250,7 @@ class VCFWriter extends 
IndexingVariantContextWriter { mWriter.write("\n"); mWriter.flush(); // necessary so that writing to an output stream will work } catch (IOException e) { - throw new RuntimeException("Unable to write the VCF object to " + getStreamName()); + throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e); } } From 4749fc114ff3337ae6b9ddc4bfd2ae30390de7d3 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sat, 22 Sep 2012 16:22:42 -0400 Subject: [PATCH 05/10] Temp. disable -nt > 1 and -nct > 1 while bugs are worked out --- .../org/broadinstitute/sting/gatk/executive/MicroScheduler.java | 2 ++ .../sting/utils/nanoScheduler/NanoSchedulerIntegrationTest.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index a256c8a97..1555da494 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -145,6 +145,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { logger.warn(String.format("Number of requested GATK threads %d is more than the number of " + "available processors on this machine %d", threadAllocation.getTotalNumThreads(), Runtime.getRuntime().availableProcessors())); + if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) + throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1"); } if ( threadAllocation.getNumDataThreads() > 1 ) { diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerIntegrationTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerIntegrationTest.java index d19a58b3a..674b0d4de 100755 --- 
a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerIntegrationTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/NanoSchedulerIntegrationTest.java @@ -19,6 +19,8 @@ public class NanoSchedulerIntegrationTest extends WalkerTest { for ( final int nt : Arrays.asList(1, 2) ) for ( final int nct : Arrays.asList(1, 2) ) { + if ( nt > 1 && nct > 1 ) + continue; // TODO -- remove me when we support -nct and -nt together // tests.add(new Object[]{ "SNP", "a1c7546f32a8919a3f3a70a04b2e8322", nt, nct }); //// tests.add(new Object[]{ "INDEL", "0a6d2be79f4f8a4b0eb788cc4751b31b", nt, nct }); tests.add(new Object[]{ "BOTH", "081d077786ac0af24e9f97259a55209c", nt, nct }); From a6b3497eacebb8d7d06684675744761dce9af044 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 23 Sep 2012 18:02:48 -0400 Subject: [PATCH 06/10] Fixes GSA-515 Nanoscheduler GSA-577 -nt and -nct together appear to not close resources properly -- Fixes monster bug in the way that traversal engines interacted with the NanoScheduler via the output tracker. -- ThreadLocalOutputTracker is now a ThreadBasedOutputTracker that associates via a map from a master thread -> the storage map. 
Lookups occur by walking through threads in the same thread group, not just the thread itself (TBD -- should have a map from ThreadGroup instead) -- Removed unnecessary debug statement in GenomeLocParser -- nt and nct officially work together now --- .../executive/HierarchicalMicroScheduler.java | 48 ++++- .../gatk/executive/LinearMicroScheduler.java | 4 +- .../sting/gatk/executive/MicroScheduler.java | 52 +++-- .../sting/gatk/executive/ShardTraverser.java | 27 ++- .../gatk/io/ThreadBasedOutputTracker.java | 182 ++++++++++++++++++ .../gatk/io/ThreadLocalOutputTracker.java | 151 --------------- .../storage/VariantContextWriterStorage.java | 7 +- .../sting/utils/GenomeLocParser.java | 2 - .../NanoSchedulerIntegrationTest.java | 2 - 9 files changed, 283 insertions(+), 192 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 01c4315f2..dca2ecb7b 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -7,13 +7,13 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.OutputTracker; -import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import 
org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; +import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; @@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * A thread local output tracker for managing output per-thread. */ - private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker(); + private ThreadBasedOutputTracker outputTracker = new ThreadBasedOutputTracker(); private final Queue reduceTasks = new LinkedList(); @@ -93,11 +93,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar final int nThreadsToUse = threadAllocation.getNumDataThreads(); if ( threadAllocation.monitorThreadEfficiency() ) { - final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse); - setThreadEfficiencyMonitor(monitoringThreadFactory); - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory); - } else { - this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + throw new UserException.BadArgumentValue("nt", "Cannot monitor thread efficiency with -nt, sorry"); + } + + this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, new UniqueThreadGroupThreadFactory()); + } + + /** + * Creates threads for HMS each with a unique thread group. Critical to + * track outputs via the ThreadBasedOutputTracker. 
+ */ + private static class UniqueThreadGroupThreadFactory implements ThreadFactory { + int counter = 0; + + @Override + public Thread newThread(Runnable r) { + final ThreadGroup group = new ThreadGroup("HMS-group-" + counter++); + return new Thread(group, r); } } @@ -253,6 +265,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar protected void mergeExistingOutput( boolean wait ) { long startTime = System.currentTimeMillis(); +// logger.warn("MergingExistingOutput"); +// printOutputMergeTasks(); + // Create a list of the merge tasks that will be performed in this run of the mergeExistingOutput(). Queue mergeTasksInSession = new LinkedList(); while( !outputMergeTasks.isEmpty() ) { @@ -266,8 +281,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar mergeTasksInSession.add(traverser); } +// logger.warn("Selected things to merge:"); +// printOutputMergeTasks(mergeTasksInSession); + // Actually run through, merging the tasks in the working queue. for( ShardTraverser traverser: mergeTasksInSession ) { + //logger.warn("*** Merging " + traverser.getIntervalsString()); if( !traverser.isComplete() ) traverser.waitForComplete(); @@ -312,11 +331,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar reduceTree.addEntry(traverseResult); outputMergeTasks.add(traverser); +// logger.warn("adding merge task"); +// printOutputMergeTasks(); + // No more data? Let the reduce tree know so it can finish processing what it's got. if (!isShardTraversePending()) reduceTree.complete(); } + private synchronized void printOutputMergeTasks() { + printOutputMergeTasks(outputMergeTasks); + } + + private synchronized void printOutputMergeTasks(final Queue tasks) { + logger.info("Output merge tasks " + tasks.size()); + for ( final ShardTraverser traverser : tasks ) + logger.info(String.format("\t%s: complete? 
%b", traverser.getIntervalsString(), traverser.isComplete())); + } + /** Pulls the next reduce from the queue and runs it. */ protected void queueNextTreeReduce( Walker walker ) { if (reduceTasks.size() == 0) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 09b18bfe1..5b94e0767 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -61,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler { boolean done = walker.isDone(); int counter = 0; - final TraversalEngine traversalEngine = borrowTraversalEngine(); + final TraversalEngine traversalEngine = borrowTraversalEngine(this); for (Shard shard : shardStrategy ) { if ( done || shard == null ) // we ran out of shards that aren't owned break; @@ -97,7 +97,7 @@ public class LinearMicroScheduler extends MicroScheduler { Object result = accumulator.finishTraversal(); outputTracker.close(); - returnTraversalEngine(traversalEngine); + returnTraversalEngine(this, traversalEngine); cleanup(); executionIsDone(); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 1555da494..5b1230c78 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -51,10 +51,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import java.io.File; import java.lang.management.ManagementFactory; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; /** @@ -94,6 +91,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ final 
LinkedList availableTraversalEngines = new LinkedList(); + /** + * Engines that have been allocated to a key already. + */ + final HashMap allocatedTraversalEngines = new HashMap(); + /** * Counts the number of instances of the class that are currently alive. */ @@ -145,8 +147,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { logger.warn(String.format("Number of requested GATK threads %d is more than the number of " + "available processors on this machine %d", threadAllocation.getTotalNumThreads(), Runtime.getRuntime().availableProcessors())); - if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) - throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1"); +// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1) +// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1"); } if ( threadAllocation.getNumDataThreads() > 1 ) { @@ -391,21 +393,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } /** - * Returns a traversal engine suitable for use in this thread. + * Returns a traversal engine suitable for use, associated with key * - * Pops the next available engine from the available ones maintained by this + * Key is an arbitrary object that is used to retrieve the same traversal + * engine over and over. This can be important in the case where the + * traversal engine has data associated with it in some other context, + * and we need to ensure that the context always sees the same traversal + * engine. This happens in the HierarchicalMicroScheduler, where you want + * the a thread executing traversals to retrieve the same engine each time, + * as outputs are tracked w.r.t. that engine. + * + * If no engine is associated with key yet, pops the next available engine + * from the available ones maintained by this * microscheduler. 
Note that it's a runtime error to pop a traversal engine * from this scheduler if there are none available. Callers that * once pop'd an engine for use must return it with returnTraversalEngine * + * @param key the key to associate with this engine * @return a non-null TraversalEngine suitable for execution in this scheduler */ @Ensures("result != null") - protected synchronized TraversalEngine borrowTraversalEngine() { - if ( availableTraversalEngines.isEmpty() ) - throw new IllegalStateException("no traversal engines were available"); - else { - return availableTraversalEngines.pop(); + protected synchronized TraversalEngine borrowTraversalEngine(final Object key) { + if ( key == null ) throw new IllegalArgumentException("key cannot be null"); + + final TraversalEngine engine = allocatedTraversalEngines.get(key); + if ( engine == null ) { + if ( availableTraversalEngines.isEmpty() ) + throw new IllegalStateException("no traversal engines were available"); + allocatedTraversalEngines.put(key, availableTraversalEngines.pop()); + return allocatedTraversalEngines.get(key); + } else { + return engine; } } @@ -413,14 +431,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * Return a borrowed traversal engine to this MicroScheduler, for later use * in another traversal execution * + * @param key the key used to id the engine, provided to the borrowTraversalEngine function * @param traversalEngine the borrowed traversal engine. Must have been previously borrowed. */ - protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) { + protected synchronized void returnTraversalEngine(final Object key, final TraversalEngine traversalEngine) { if ( traversalEngine == null ) throw new IllegalArgumentException("Attempting to push a null traversal engine"); if ( ! 
allCreatedTraversalEngines.contains(traversalEngine) ) throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine); + if ( ! allocatedTraversalEngines.containsKey(key) ) + throw new IllegalArgumentException("No traversal engine was never checked out with key " + key); - availableTraversalEngines.push(traversalEngine); + // note there's nothing to actually do here, but a function implementation + // might want to do something } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index e6f539614..6d165f76a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -4,9 +4,10 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; -import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.utils.Utils; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.concurrent.Callable; @@ -29,7 +30,7 @@ public class ShardTraverser implements Callable { final private HierarchicalMicroScheduler microScheduler; final private Walker walker; final private Shard shard; - final private ThreadLocalOutputTracker outputTracker; + final private ThreadBasedOutputTracker outputTracker; private OutputMergeTask outputMergeTask; /** our log, which we want to capture anything from this class */ @@ -43,7 +44,7 @@ public class ShardTraverser implements 
Callable { public ShardTraverser( HierarchicalMicroScheduler microScheduler, Walker walker, Shard shard, - ThreadLocalOutputTracker outputTracker) { + ThreadBasedOutputTracker outputTracker) { this.microScheduler = microScheduler; this.walker = walker; this.shard = shard; @@ -51,13 +52,15 @@ public class ShardTraverser implements Callable { } public Object call() { - final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(); + final Object traversalEngineKey = Thread.currentThread(); + final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(traversalEngineKey); + try { final long startTime = System.currentTimeMillis(); - // this is CRITICAL -- initializes the thread-local output maps in the parent thread, - // so that any subthreads created by the traversal itself are shared... - outputTracker.getStorageAndInitializeIfNecessary(); + // this is CRITICAL -- initializes output maps in this master thread, + // so that any subthreads created by the traversal itself can access this map + outputTracker.initializeStorage(); Object accumulator = walker.reduceInit(); final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(), @@ -85,12 +88,20 @@ public class ShardTraverser implements Callable { } finally { synchronized(this) { complete = true; - microScheduler.returnTraversalEngine(traversalEngine); + microScheduler.returnTraversalEngine(traversalEngineKey, traversalEngine); notifyAll(); } } } + /** + * Return a human readable string describing the intervals this traverser is operating on + * @return + */ + public String getIntervalsString() { + return Utils.join(",", shard.getGenomeLocs()); + } + /** * Has this traversal completed? * @return True if completed, false otherwise. 
diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java new file mode 100644 index 000000000..f26d0c954 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2009 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package org.broadinstitute.sting.gatk.io; + +import org.broadinstitute.sting.gatk.executive.OutputMergeTask; +import org.broadinstitute.sting.gatk.io.storage.Storage; +import org.broadinstitute.sting.gatk.io.storage.StorageFactory; +import org.broadinstitute.sting.gatk.io.stubs.Stub; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * An output tracker that can either track its output per-thread or directly. + * + * This output tracker doesn't use thread local values, but rather looks up the + * storage map via the thread's group. This is necessary in the case where + * there's a master thread that creates the output map, and spawns subthreads + * that actually do work. As long as those subthreads are spawned in the + * thread group of the master thread, this tracker will properly find the + * storage map associated with the master thread in the group, and return + * the map to all subthreads. + * + * @author mhanna, depristo + * @version 0.2 + */ +public class ThreadBasedOutputTracker extends OutputTracker { + /** + * A map from thread ID of the master thread to the storage map from + * Stub to Storage objects + */ + private Map> threadsToStorage = new HashMap>(); + + /** + * A total hack. If bypass = true, bypass thread local storage and write directly + * to the target file. Used to handle output during initialize() and onTraversalDone(). + */ + private boolean bypass = false; + public void bypassThreadLocalStorage(boolean bypass) { + this.bypass = bypass; + } + + /** + * Initialize the storage map for this thread. + * + * Checks if there's a thread local binding for this thread, and if + * not initializes the map for it. This map is then + * populated with stub -> storage bindings according to the + * superclasses' outputs map. 
+ * + * Must be called within the master thread to create a map associated with + * the master thread ID. + */ + public synchronized void initializeStorage() { + final long threadID = Thread.currentThread().getId(); + Map threadLocalOutputStreams = threadsToStorage.get(threadID); + + if( threadLocalOutputStreams == null ) { + threadLocalOutputStreams = new HashMap(); + threadsToStorage.put( threadID, threadLocalOutputStreams ); + } + + for ( final Stub stub : outputs.keySet() ) { + final Storage target = StorageFactory.createStorage(stub, createTempFile(stub)); + threadLocalOutputStreams.put(stub, target); + } + } + + @Override + public T getStorage( final Stub stub ) { + Storage target; + + if (bypass) { + target = outputs.get(stub); + if( target == null ) { + target = StorageFactory.createStorage(stub); + outputs.put(stub, target); + } + } + else { + final Map threadLocalOutputStreams = findStorage(Thread.currentThread()); + target = threadLocalOutputStreams.get(stub); + + // make sure something hasn't gone wrong, and we somehow find a map that doesn't include our stub + if ( target == null ) + throw new ReviewedStingException("target isn't supposed to be null for " + Thread.currentThread() + + " id " + Thread.currentThread().getId() + " map is " + threadLocalOutputStreams); + } + + return (T)target; + } + + + final Thread[] members = new Thread[1000]; // TODO -- dangerous -- fixme + private synchronized Map findStorage(final Thread thread) { + final Map map = threadsToStorage.get(thread.getId()); + if ( map != null ) { + return map; + } else { + final ThreadGroup tg = thread.getThreadGroup(); + final int nInfo = tg.enumerate(members); + if ( nInfo == members.length ) + throw new ReviewedStingException("too many threads in thread-group " + tg + " to safely get info. 
" + + "Maximum allowed threads is " + members.length); + + for ( int i = 0; i < nInfo; i++ ) { + final Map map2 = threadsToStorage.get(members[i].getId()); + if ( map2 != null ) + return map2; + } + + // something is terribly wrong, we have a storage lookup for a thread that doesn't have + // any map data associated with it! + throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " id " + thread.getId()); + } + } + + /** + * Close down any existing temporary files which have been opened. + */ + public synchronized OutputMergeTask closeStorage() { + final Map threadLocalOutputStreams = findStorage(Thread.currentThread()); + + if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() ) + return null; + + final OutputMergeTask outputMergeTask = new OutputMergeTask(); + for( Map.Entry entry: threadLocalOutputStreams.entrySet() ) { + final Stub stub = entry.getKey(); + final Storage storageEntry = entry.getValue(); + + storageEntry.close(); + outputMergeTask.addMergeOperation(getTargetStream(stub), storageEntry); + } + +// logger.info("Closing " + Thread.currentThread().getId() + " => " + threadLocalOutputStreams); + threadLocalOutputStreams.clear(); + + return outputMergeTask; + } + + /** + * Creates a temporary file for a stub of the given type. + * @param stub Stub for which to create a temporary file. + * @param Type of the stub to accept. + * @return A temp file, or throw an exception if the temp file cannot be created. 
+ */ + private File createTempFile( Stub stub ) { + try { + return File.createTempFile( stub.getClass().getName(), null ); + } catch( IOException ex ) { + throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() ); + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java deleted file mode 100644 index e1e42a9a1..000000000 --- a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadLocalOutputTracker.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright (c) 2009 The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. 
- */ - -package org.broadinstitute.sting.gatk.io; - -import org.broadinstitute.sting.gatk.executive.OutputMergeTask; -import org.broadinstitute.sting.gatk.io.storage.Storage; -import org.broadinstitute.sting.gatk.io.storage.StorageFactory; -import org.broadinstitute.sting.gatk.io.stubs.Stub; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * An output tracker that can either track its output per-thread or directly, - * - * @author mhanna, depristo - * @version 0.2 - */ -public class ThreadLocalOutputTracker extends OutputTracker { - /** - * Thread-local storage for output streams. - * - * MUST BE A INHERITABLE THREAD LOCAL - * -- NanoScheduler creates subthreads, and these threads must inherit the binding from their parent - */ - private ThreadLocal> storage = new InheritableThreadLocal>(); - - /** - * A total hack. If bypass = true, bypass thread local storage and write directly - * to the target file. Used to handle output during initialize() and onTraversalDone(). - */ - private boolean bypass = false; - public void bypassThreadLocalStorage(boolean bypass) { - this.bypass = bypass; - } - - /** - * Initialize the storage map for this thread, if necessary. - * - * Checks if there's a thread local binding for this thread, and if - * not initializes it. - * - * Particularly useful in the case where we want to initialize the map in - * a parent thread but have it used available to all the children via - * the InheritedThreadLocal map. 
- * - * @return the storage - */ - public Map getStorageAndInitializeIfNecessary() { - Map threadLocalOutputStreams = storage.get(); - - if( threadLocalOutputStreams == null ) { - threadLocalOutputStreams = new HashMap(); - storage.set( threadLocalOutputStreams ); - } - - return threadLocalOutputStreams; - } - - public T getStorage( Stub stub ) { - Storage target; - - if(bypass) { - target = outputs.get(stub); - if( target == null ) { - target = StorageFactory.createStorage(stub); - outputs.put(stub, target); - } - } - else { - final Map threadLocalOutputStreams = getStorageAndInitializeIfNecessary(); - - target = threadLocalOutputStreams.get(stub); - if( target == null ) { - target = StorageFactory.createStorage(stub, createTempFile(stub)); - threadLocalOutputStreams.put(stub, target); - } - } - - return (T)target; - } - - /** - * Close down any existing temporary files which have been opened. - */ - public OutputMergeTask closeStorage() { - Map threadLocalOutputStreams = storage.get(); - - if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() ) - return null; - - OutputMergeTask outputMergeTask = new OutputMergeTask(); - for( Map.Entry entry: threadLocalOutputStreams.entrySet() ) { - Stub stub = entry.getKey(); - Storage storageEntry = entry.getValue(); - - storageEntry.close(); - outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry); - } - - threadLocalOutputStreams.clear(); - - return outputMergeTask; - } - - /** - * Creates a temporary file for a stub of the given type. - * @param stub Stub for which to create a temporary file. - * @param Type of the stub to accept. - * @return A temp file, or throw an exception if the temp file cannot be created. 
- */ - private File createTempFile( Stub stub ) { - File tempFile = null; - - try { - tempFile = File.createTempFile( stub.getClass().getName(), null ); - //tempFile.deleteOnExit(); - } - catch( IOException ex ) { - throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() ); - } - - return tempFile; - } -} diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java b/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java index 28ea69f4c..c6438cfdb 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/storage/VariantContextWriterStorage.java @@ -89,7 +89,7 @@ public class VariantContextWriterStorage implements Storage 1 && nct > 1 ) - continue; // TODO -- remove me when we support -nct and -nt together // tests.add(new Object[]{ "SNP", "a1c7546f32a8919a3f3a70a04b2e8322", nt, nct }); //// tests.add(new Object[]{ "INDEL", "0a6d2be79f4f8a4b0eb788cc4751b31b", nt, nct }); tests.add(new Object[]{ "BOTH", "081d077786ac0af24e9f97259a55209c", nt, nct }); From 3e8d9928287b2f7614976a4b539a86baaf5f4c8d Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 23 Sep 2012 18:13:44 -0400 Subject: [PATCH 07/10] Remove bad error test from MicroScheduler, as it's no longer applicable. 
--- .../sting/gatk/executive/MicroScheduler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 5b1230c78..07d9df79a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -319,10 +319,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * pointers to the traversal engines */ public synchronized void shutdownTraversalEngines() { - if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) - throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + - "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() - + " have been returned"); + // no longer applicable because engines are allocated to keys now +// if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) +// throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + +// "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() +// + " have been returned"); for ( final TraversalEngine te : allCreatedTraversalEngines) te.shutdown(); From 9fd30d6f1c326bd0625a5b7fef24751dc1d03f80 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Sun, 23 Sep 2012 18:19:10 -0400 Subject: [PATCH 08/10] When writing the initial commit for nt + nct I realized this class was really just a ThreadGroupOutputTracker -- The code is cleaner and the logic more obvious now.
--- .../executive/HierarchicalMicroScheduler.java | 6 ++-- .../sting/gatk/executive/ShardTraverser.java | 6 ++-- ...ker.java => ThreadGroupOutputTracker.java} | 28 ++++++------------- 3 files changed, 14 insertions(+), 26 deletions(-) rename public/java/src/org/broadinstitute/sting/gatk/io/{ThreadBasedOutputTracker.java => ThreadGroupOutputTracker.java} (86%) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index dca2ecb7b..31f2a469c 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -7,7 +7,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.OutputTracker; -import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; @@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * A thread local output tracker for managing output per-thread. */ - private ThreadBasedOutputTracker outputTracker = new ThreadBasedOutputTracker(); + private ThreadGroupOutputTracker outputTracker = new ThreadGroupOutputTracker(); private final Queue reduceTasks = new LinkedList(); @@ -101,7 +101,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Creates threads for HMS each with a unique thread group. Critical to - * track outputs via the ThreadBasedOutputTracker. 
+ * track outputs via the ThreadGroupOutputTracker. */ private static class UniqueThreadGroupThreadFactory implements ThreadFactory { int counter = 0; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 6d165f76a..d9a694846 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -4,7 +4,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; -import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.Utils; @@ -30,7 +30,7 @@ public class ShardTraverser implements Callable { final private HierarchicalMicroScheduler microScheduler; final private Walker walker; final private Shard shard; - final private ThreadBasedOutputTracker outputTracker; + final private ThreadGroupOutputTracker outputTracker; private OutputMergeTask outputMergeTask; /** our log, which we want to capture anything from this class */ @@ -44,7 +44,7 @@ public class ShardTraverser implements Callable { public ShardTraverser( HierarchicalMicroScheduler microScheduler, Walker walker, Shard shard, - ThreadBasedOutputTracker outputTracker) { + ThreadGroupOutputTracker outputTracker) { this.microScheduler = microScheduler; this.walker = walker; this.shard = shard; diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java similarity 
index 86% rename from public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java rename to public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java index f26d0c954..fdfe494a7 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java @@ -51,12 +51,12 @@ import java.util.Map; * @author mhanna, depristo * @version 0.2 */ -public class ThreadBasedOutputTracker extends OutputTracker { +public class ThreadGroupOutputTracker extends OutputTracker { /** * A map from thread ID of the master thread to the storage map from * Stub to Storage objects */ - private Map> threadsToStorage = new HashMap>(); + private Map> threadsToStorage = new HashMap>(); /** * A total hack. If bypass = true, bypass thread local storage and write directly @@ -79,12 +79,12 @@ public class ThreadBasedOutputTracker extends OutputTracker { * the master thread ID. */ public synchronized void initializeStorage() { - final long threadID = Thread.currentThread().getId(); - Map threadLocalOutputStreams = threadsToStorage.get(threadID); + final ThreadGroup group = Thread.currentThread().getThreadGroup(); + Map threadLocalOutputStreams = threadsToStorage.get(group); if( threadLocalOutputStreams == null ) { threadLocalOutputStreams = new HashMap(); - threadsToStorage.put( threadID, threadLocalOutputStreams ); + threadsToStorage.put( group, threadLocalOutputStreams ); } for ( final Stub stub : outputs.keySet() ) { @@ -118,27 +118,15 @@ public class ThreadBasedOutputTracker extends OutputTracker { } - final Thread[] members = new Thread[1000]; // TODO -- dangerous -- fixme private synchronized Map findStorage(final Thread thread) { - final Map map = threadsToStorage.get(thread.getId()); + final Map map = threadsToStorage.get(thread.getThreadGroup()); + if ( map != null ) { return map; } else { - final ThreadGroup tg = thread.getThreadGroup(); - final int 
nInfo = tg.enumerate(members); - if ( nInfo == members.length ) - throw new ReviewedStingException("too many threads in thread-group " + tg + " to safely get info. " + - "Maximum allowed threads is " + members.length); - - for ( int i = 0; i < nInfo; i++ ) { - final Map map2 = threadsToStorage.get(members[i].getId()); - if ( map2 != null ) - return map2; - } - // something is terribly wrong, we have a storage lookup for a thread that doesn't have // any map data associated with it! - throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " id " + thread.getId()); + throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " in group " + thread.getThreadGroup()); } } From 0b488cce669ac294a9d3212d5d19423ca256dc7a Mon Sep 17 00:00:00 2001 From: David Roazen Date: Mon, 24 Sep 2012 14:45:53 -0400 Subject: [PATCH 09/10] ExperimentalReadShardBalancer: close() exhausted iterators Fixes a truly awful SAMReaders resource leak reported by Eric -- thanks Eric! --- .../gatk/datasources/reads/ExperimentalReadShardBalancer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java index 73719cbb0..4d1d2a533 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/ExperimentalReadShardBalancer.java @@ -89,6 +89,10 @@ public class ExperimentalReadShardBalancer extends ShardBalancer { // If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one): if ( currentFilePointerReadsIterator != null && ! 
currentFilePointerReadsIterator.hasNext() ) { + + // Close the old, exhausted chain of iterators to release resources + currentFilePointerReadsIterator.close(); + do { advanceFilePointer(); } while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers From 3f44b3e01939e2a5f4ca33cdaf05548a64e5efd4 Mon Sep 17 00:00:00 2001 From: David Roazen Date: Mon, 24 Sep 2012 15:38:07 -0400 Subject: [PATCH 10/10] Update DataProcessingPipelineTest MD5s --- .../sting/queue/pipeline/DataProcessingPipelineTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/scala/test/org/broadinstitute/sting/queue/pipeline/DataProcessingPipelineTest.scala b/public/scala/test/org/broadinstitute/sting/queue/pipeline/DataProcessingPipelineTest.scala index 3fb9e0efa..19f00ac62 100644 --- a/public/scala/test/org/broadinstitute/sting/queue/pipeline/DataProcessingPipelineTest.scala +++ b/public/scala/test/org/broadinstitute/sting/queue/pipeline/DataProcessingPipelineTest.scala @@ -41,7 +41,7 @@ class DataProcessingPipelineTest { " -D " + BaseTest.publicTestDir + "exampleDBSNP.vcf", " -test ", " -p " + projectName).mkString - spec.fileMD5s += testOut -> "60d39ae909fdd049920b54e0965b6d3c" + spec.fileMD5s += testOut -> "45d97df6d291695b92668e8a55c54cd0" PipelineTest.executeTest(spec) } @@ -60,7 +60,7 @@ class DataProcessingPipelineTest { " -bwa /home/unix/carneiro/bin/bwa", " -bwape ", " -p " + projectName).mkString - spec.fileMD5s += testOut -> "61ca3237afdfabf78ee27a5bb80dae59" + spec.fileMD5s += testOut -> "6e70efbe6bafc3fedd60bd406bd201db" PipelineTest.executeTest(spec) }