Fixes GSA-515 Nanoscheduler GSA-577 -nt and -nct together appear to not close resources properly
-- Fixes monster bug in the way that traversal engines interacted with the NanoScheduler via the output tracker. -- ThreadLocalOutputTracker is now a ThreadBasedOutputTracker that associates via a map from a master thread -> the storage map. Lookups occur by walking through threads in the same thread group, not just the thread itself (TBD -- should have a map from ThreadGroup instead) -- Removed unnecessary debug statement in GenomeLocParser -- nt and nct officially work together now
This commit is contained in:
parent
4749fc114f
commit
a6b3497eac
|
|
@ -7,13 +7,13 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
|
|||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
|
||||
import org.broadinstitute.sting.gatk.io.OutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||
|
||||
import java.util.Collection;
|
||||
|
|
@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
/**
|
||||
* A thread local output tracker for managing output per-thread.
|
||||
*/
|
||||
private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker();
|
||||
private ThreadBasedOutputTracker outputTracker = new ThreadBasedOutputTracker();
|
||||
|
||||
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||
|
||||
|
|
@ -93,11 +93,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
|
||||
final int nThreadsToUse = threadAllocation.getNumDataThreads();
|
||||
if ( threadAllocation.monitorThreadEfficiency() ) {
|
||||
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse);
|
||||
setThreadEfficiencyMonitor(monitoringThreadFactory);
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
|
||||
} else {
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||
throw new UserException.BadArgumentValue("nt", "Cannot monitor thread efficiency with -nt, sorry");
|
||||
}
|
||||
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, new UniqueThreadGroupThreadFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates threads for HMS each with a unique thread group. Critical to
|
||||
* track outputs via the ThreadBasedOutputTracker.
|
||||
*/
|
||||
private static class UniqueThreadGroupThreadFactory implements ThreadFactory {
|
||||
int counter = 0;
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
final ThreadGroup group = new ThreadGroup("HMS-group-" + counter++);
|
||||
return new Thread(group, r);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -253,6 +265,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
protected void mergeExistingOutput( boolean wait ) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// logger.warn("MergingExistingOutput");
|
||||
// printOutputMergeTasks();
|
||||
|
||||
// Create a list of the merge tasks that will be performed in this run of the mergeExistingOutput().
|
||||
Queue<ShardTraverser> mergeTasksInSession = new LinkedList<ShardTraverser>();
|
||||
while( !outputMergeTasks.isEmpty() ) {
|
||||
|
|
@ -266,8 +281,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
mergeTasksInSession.add(traverser);
|
||||
}
|
||||
|
||||
// logger.warn("Selected things to merge:");
|
||||
// printOutputMergeTasks(mergeTasksInSession);
|
||||
|
||||
// Actually run through, merging the tasks in the working queue.
|
||||
for( ShardTraverser traverser: mergeTasksInSession ) {
|
||||
//logger.warn("*** Merging " + traverser.getIntervalsString());
|
||||
if( !traverser.isComplete() )
|
||||
traverser.waitForComplete();
|
||||
|
||||
|
|
@ -312,11 +331,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
reduceTree.addEntry(traverseResult);
|
||||
outputMergeTasks.add(traverser);
|
||||
|
||||
// logger.warn("adding merge task");
|
||||
// printOutputMergeTasks();
|
||||
|
||||
// No more data? Let the reduce tree know so it can finish processing what it's got.
|
||||
if (!isShardTraversePending())
|
||||
reduceTree.complete();
|
||||
}
|
||||
|
||||
private synchronized void printOutputMergeTasks() {
|
||||
printOutputMergeTasks(outputMergeTasks);
|
||||
}
|
||||
|
||||
private synchronized void printOutputMergeTasks(final Queue<ShardTraverser> tasks) {
|
||||
logger.info("Output merge tasks " + tasks.size());
|
||||
for ( final ShardTraverser traverser : tasks )
|
||||
logger.info(String.format("\t%s: complete? %b", traverser.getIntervalsString(), traverser.isComplete()));
|
||||
}
|
||||
|
||||
/** Pulls the next reduce from the queue and runs it. */
|
||||
protected void queueNextTreeReduce( Walker walker ) {
|
||||
if (reduceTasks.size() == 0)
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
boolean done = walker.isDone();
|
||||
int counter = 0;
|
||||
|
||||
final TraversalEngine traversalEngine = borrowTraversalEngine();
|
||||
final TraversalEngine traversalEngine = borrowTraversalEngine(this);
|
||||
for (Shard shard : shardStrategy ) {
|
||||
if ( done || shard == null ) // we ran out of shards that aren't owned
|
||||
break;
|
||||
|
|
@ -97,7 +97,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
Object result = accumulator.finishTraversal();
|
||||
|
||||
outputTracker.close();
|
||||
returnTraversalEngine(traversalEngine);
|
||||
returnTraversalEngine(this, traversalEngine);
|
||||
cleanup();
|
||||
executionIsDone();
|
||||
|
||||
|
|
|
|||
|
|
@ -51,10 +51,7 @@ import javax.management.MBeanServer;
|
|||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -94,6 +91,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
*/
|
||||
final LinkedList<TraversalEngine> availableTraversalEngines = new LinkedList<TraversalEngine>();
|
||||
|
||||
/**
|
||||
* Engines that have been allocated to a key already.
|
||||
*/
|
||||
final HashMap<Object, TraversalEngine> allocatedTraversalEngines = new HashMap<Object, TraversalEngine>();
|
||||
|
||||
/**
|
||||
* Counts the number of instances of the class that are currently alive.
|
||||
*/
|
||||
|
|
@ -145,8 +147,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
logger.warn(String.format("Number of requested GATK threads %d is more than the number of " +
|
||||
"available processors on this machine %d", threadAllocation.getTotalNumThreads(),
|
||||
Runtime.getRuntime().availableProcessors()));
|
||||
if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
|
||||
throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
|
||||
// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
|
||||
// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
|
||||
}
|
||||
|
||||
if ( threadAllocation.getNumDataThreads() > 1 ) {
|
||||
|
|
@ -391,21 +393,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a traversal engine suitable for use in this thread.
|
||||
* Returns a traversal engine suitable for use, associated with key
|
||||
*
|
||||
* Pops the next available engine from the available ones maintained by this
|
||||
* Key is an arbitrary object that is used to retrieve the same traversal
|
||||
* engine over and over. This can be important in the case where the
|
||||
* traversal engine has data associated with it in some other context,
|
||||
* and we need to ensure that the context always sees the same traversal
|
||||
* engine. This happens in the HierarchicalMicroScheduler, where you want
|
||||
* a thread executing traversals to retrieve the same engine each time,
|
||||
* as outputs are tracked w.r.t. that engine.
|
||||
*
|
||||
* If no engine is associated with key yet, pops the next available engine
|
||||
* from the available ones maintained by this
|
||||
* microscheduler. Note that it's a runtime error to pop a traversal engine
|
||||
* from this scheduler if there are none available. Callers that
|
||||
* once pop'd an engine for use must return it with returnTraversalEngine
|
||||
*
|
||||
* @param key the key to associate with this engine
|
||||
* @return a non-null TraversalEngine suitable for execution in this scheduler
|
||||
*/
|
||||
@Ensures("result != null")
|
||||
protected synchronized TraversalEngine borrowTraversalEngine() {
|
||||
if ( availableTraversalEngines.isEmpty() )
|
||||
throw new IllegalStateException("no traversal engines were available");
|
||||
else {
|
||||
return availableTraversalEngines.pop();
|
||||
protected synchronized TraversalEngine borrowTraversalEngine(final Object key) {
|
||||
if ( key == null ) throw new IllegalArgumentException("key cannot be null");
|
||||
|
||||
final TraversalEngine engine = allocatedTraversalEngines.get(key);
|
||||
if ( engine == null ) {
|
||||
if ( availableTraversalEngines.isEmpty() )
|
||||
throw new IllegalStateException("no traversal engines were available");
|
||||
allocatedTraversalEngines.put(key, availableTraversalEngines.pop());
|
||||
return allocatedTraversalEngines.get(key);
|
||||
} else {
|
||||
return engine;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -413,14 +431,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
* Return a borrowed traversal engine to this MicroScheduler, for later use
|
||||
* in another traversal execution
|
||||
*
|
||||
* @param key the key used to id the engine, provided to the borrowTraversalEngine function
|
||||
* @param traversalEngine the borrowed traversal engine. Must have been previously borrowed.
|
||||
*/
|
||||
protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) {
|
||||
protected synchronized void returnTraversalEngine(final Object key, final TraversalEngine traversalEngine) {
|
||||
if ( traversalEngine == null )
|
||||
throw new IllegalArgumentException("Attempting to push a null traversal engine");
|
||||
if ( ! allCreatedTraversalEngines.contains(traversalEngine) )
|
||||
throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine);
|
||||
if ( ! allocatedTraversalEngines.containsKey(key) )
|
||||
throw new IllegalArgumentException("No traversal engine was never checked out with key " + key);
|
||||
|
||||
availableTraversalEngines.push(traversalEngine);
|
||||
// note there's nothing to actually do here, but a function implementation
|
||||
// might want to do something
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,10 @@ import org.apache.log4j.Logger;
|
|||
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.Utils;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
|
@ -29,7 +30,7 @@ public class ShardTraverser implements Callable {
|
|||
final private HierarchicalMicroScheduler microScheduler;
|
||||
final private Walker walker;
|
||||
final private Shard shard;
|
||||
final private ThreadLocalOutputTracker outputTracker;
|
||||
final private ThreadBasedOutputTracker outputTracker;
|
||||
private OutputMergeTask outputMergeTask;
|
||||
|
||||
/** our log, which we want to capture anything from this class */
|
||||
|
|
@ -43,7 +44,7 @@ public class ShardTraverser implements Callable {
|
|||
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
|
||||
Walker walker,
|
||||
Shard shard,
|
||||
ThreadLocalOutputTracker outputTracker) {
|
||||
ThreadBasedOutputTracker outputTracker) {
|
||||
this.microScheduler = microScheduler;
|
||||
this.walker = walker;
|
||||
this.shard = shard;
|
||||
|
|
@ -51,13 +52,15 @@ public class ShardTraverser implements Callable {
|
|||
}
|
||||
|
||||
public Object call() {
|
||||
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine();
|
||||
final Object traversalEngineKey = Thread.currentThread();
|
||||
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(traversalEngineKey);
|
||||
|
||||
try {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
// this is CRITICAL -- initializes the thread-local output maps in the parent thread,
|
||||
// so that any subthreads created by the traversal itself are shared...
|
||||
outputTracker.getStorageAndInitializeIfNecessary();
|
||||
// this is CRITICAL -- initializes output maps in this master thread,
|
||||
// so that any subthreads created by the traversal itself can access this map
|
||||
outputTracker.initializeStorage();
|
||||
|
||||
Object accumulator = walker.reduceInit();
|
||||
final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
|
||||
|
|
@ -85,12 +88,20 @@ public class ShardTraverser implements Callable {
|
|||
} finally {
|
||||
synchronized(this) {
|
||||
complete = true;
|
||||
microScheduler.returnTraversalEngine(traversalEngine);
|
||||
microScheduler.returnTraversalEngine(traversalEngineKey, traversalEngine);
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a human readable string describing the intervals this traverser is operating on
|
||||
* @return
|
||||
*/
|
||||
public String getIntervalsString() {
|
||||
return Utils.join(",", shard.getGenomeLocs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Has this traversal completed?
|
||||
* @return True if completed, false otherwise.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.io;
|
||||
|
||||
import org.broadinstitute.sting.gatk.executive.OutputMergeTask;
|
||||
import org.broadinstitute.sting.gatk.io.storage.Storage;
|
||||
import org.broadinstitute.sting.gatk.io.storage.StorageFactory;
|
||||
import org.broadinstitute.sting.gatk.io.stubs.Stub;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An output tracker that can either track its output per-thread or directly.
|
||||
*
|
||||
* This output tracker doesn't use thread local values, but rather looks up the
|
||||
* storage map via the thread's group. This is necessary in the case where
|
||||
* there's a master thread that creates the output map, and spawns subthreads
|
||||
* that actually do work. As long as those subthreads are spawned in the
|
||||
* thread group of the master thread, this tracker will properly find the
|
||||
* storage map associated with the master thread in the group, and return
|
||||
* the map to all subthreads.
|
||||
*
|
||||
* @author mhanna, depristo
|
||||
* @version 0.2
|
||||
*/
|
||||
public class ThreadBasedOutputTracker extends OutputTracker {
|
||||
/**
|
||||
* A map from thread ID of the master thread to the storage map from
|
||||
* Stub to Storage objects
|
||||
*/
|
||||
private Map<Long, Map<Stub, Storage>> threadsToStorage = new HashMap<Long, Map<Stub, Storage>>();
|
||||
|
||||
/**
|
||||
* A total hack. If bypass = true, bypass thread local storage and write directly
|
||||
* to the target file. Used to handle output during initialize() and onTraversalDone().
|
||||
*/
|
||||
private boolean bypass = false;
|
||||
public void bypassThreadLocalStorage(boolean bypass) {
|
||||
this.bypass = bypass;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the storage map for this thread.
|
||||
*
|
||||
* Checks if there's a thread local binding for this thread, and if
|
||||
* not initializes the map for it. This map is then
|
||||
* populated with stub -> storage bindings according to the
|
||||
* superclasses' outputs map.
|
||||
*
|
||||
* Must be called within the master thread to create a map associated with
|
||||
* the master thread ID.
|
||||
*/
|
||||
public synchronized void initializeStorage() {
|
||||
final long threadID = Thread.currentThread().getId();
|
||||
Map<Stub,Storage> threadLocalOutputStreams = threadsToStorage.get(threadID);
|
||||
|
||||
if( threadLocalOutputStreams == null ) {
|
||||
threadLocalOutputStreams = new HashMap<Stub,Storage>();
|
||||
threadsToStorage.put( threadID, threadLocalOutputStreams );
|
||||
}
|
||||
|
||||
for ( final Stub stub : outputs.keySet() ) {
|
||||
final Storage target = StorageFactory.createStorage(stub, createTempFile(stub));
|
||||
threadLocalOutputStreams.put(stub, target);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getStorage( final Stub<T> stub ) {
|
||||
Storage target;
|
||||
|
||||
if (bypass) {
|
||||
target = outputs.get(stub);
|
||||
if( target == null ) {
|
||||
target = StorageFactory.createStorage(stub);
|
||||
outputs.put(stub, target);
|
||||
}
|
||||
}
|
||||
else {
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
|
||||
target = threadLocalOutputStreams.get(stub);
|
||||
|
||||
// make sure something hasn't gone wrong, and we somehow find a map that doesn't include our stub
|
||||
if ( target == null )
|
||||
throw new ReviewedStingException("target isn't supposed to be null for " + Thread.currentThread()
|
||||
+ " id " + Thread.currentThread().getId() + " map is " + threadLocalOutputStreams);
|
||||
}
|
||||
|
||||
return (T)target;
|
||||
}
|
||||
|
||||
|
||||
final Thread[] members = new Thread[1000]; // TODO -- dangerous -- fixme
|
||||
private synchronized Map<Stub,Storage> findStorage(final Thread thread) {
|
||||
final Map<Stub, Storage> map = threadsToStorage.get(thread.getId());
|
||||
if ( map != null ) {
|
||||
return map;
|
||||
} else {
|
||||
final ThreadGroup tg = thread.getThreadGroup();
|
||||
final int nInfo = tg.enumerate(members);
|
||||
if ( nInfo == members.length )
|
||||
throw new ReviewedStingException("too many threads in thread-group " + tg + " to safely get info. " +
|
||||
"Maximum allowed threads is " + members.length);
|
||||
|
||||
for ( int i = 0; i < nInfo; i++ ) {
|
||||
final Map<Stub, Storage> map2 = threadsToStorage.get(members[i].getId());
|
||||
if ( map2 != null )
|
||||
return map2;
|
||||
}
|
||||
|
||||
// something is terribly wrong, we have a storage lookup for a thread that doesn't have
|
||||
// any map data associated with it!
|
||||
throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " id " + thread.getId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close down any existing temporary files which have been opened.
|
||||
*/
|
||||
public synchronized OutputMergeTask closeStorage() {
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
|
||||
|
||||
if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() )
|
||||
return null;
|
||||
|
||||
final OutputMergeTask outputMergeTask = new OutputMergeTask();
|
||||
for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) {
|
||||
final Stub stub = entry.getKey();
|
||||
final Storage storageEntry = entry.getValue();
|
||||
|
||||
storageEntry.close();
|
||||
outputMergeTask.addMergeOperation(getTargetStream(stub), storageEntry);
|
||||
}
|
||||
|
||||
// logger.info("Closing " + Thread.currentThread().getId() + " => " + threadLocalOutputStreams);
|
||||
threadLocalOutputStreams.clear();
|
||||
|
||||
return outputMergeTask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a temporary file for a stub of the given type.
|
||||
* @param stub Stub for which to create a temporary file.
|
||||
* @param <T> Type of the stub to accept.
|
||||
* @return A temp file, or throw an exception if the temp file cannot be created.
|
||||
*/
|
||||
private <T> File createTempFile( Stub<T> stub ) {
|
||||
try {
|
||||
return File.createTempFile( stub.getClass().getName(), null );
|
||||
} catch( IOException ex ) {
|
||||
throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,151 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.io;
|
||||
|
||||
import org.broadinstitute.sting.gatk.executive.OutputMergeTask;
|
||||
import org.broadinstitute.sting.gatk.io.storage.Storage;
|
||||
import org.broadinstitute.sting.gatk.io.storage.StorageFactory;
|
||||
import org.broadinstitute.sting.gatk.io.stubs.Stub;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An output tracker that can either track its output per-thread or directly,
|
||||
*
|
||||
* @author mhanna, depristo
|
||||
* @version 0.2
|
||||
*/
|
||||
public class ThreadLocalOutputTracker extends OutputTracker {
|
||||
/**
|
||||
* Thread-local storage for output streams.
|
||||
*
|
||||
* MUST BE A INHERITABLE THREAD LOCAL
|
||||
* -- NanoScheduler creates subthreads, and these threads must inherit the binding from their parent
|
||||
*/
|
||||
private ThreadLocal<Map<Stub, Storage>> storage = new InheritableThreadLocal<Map<Stub, Storage>>();
|
||||
|
||||
/**
|
||||
* A total hack. If bypass = true, bypass thread local storage and write directly
|
||||
* to the target file. Used to handle output during initialize() and onTraversalDone().
|
||||
*/
|
||||
private boolean bypass = false;
|
||||
public void bypassThreadLocalStorage(boolean bypass) {
|
||||
this.bypass = bypass;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the storage map for this thread, if necessary.
|
||||
*
|
||||
* Checks if there's a thread local binding for this thread, and if
|
||||
* not initializes it.
|
||||
*
|
||||
* Particularly useful in the case where we want to initialize the map in
|
||||
* a parent thread but have it used available to all the children via
|
||||
* the InheritedThreadLocal map.
|
||||
*
|
||||
* @return the storage
|
||||
*/
|
||||
public Map<Stub,Storage> getStorageAndInitializeIfNecessary() {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
|
||||
if( threadLocalOutputStreams == null ) {
|
||||
threadLocalOutputStreams = new HashMap<Stub,Storage>();
|
||||
storage.set( threadLocalOutputStreams );
|
||||
}
|
||||
|
||||
return threadLocalOutputStreams;
|
||||
}
|
||||
|
||||
public <T> T getStorage( Stub<T> stub ) {
|
||||
Storage target;
|
||||
|
||||
if(bypass) {
|
||||
target = outputs.get(stub);
|
||||
if( target == null ) {
|
||||
target = StorageFactory.createStorage(stub);
|
||||
outputs.put(stub, target);
|
||||
}
|
||||
}
|
||||
else {
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = getStorageAndInitializeIfNecessary();
|
||||
|
||||
target = threadLocalOutputStreams.get(stub);
|
||||
if( target == null ) {
|
||||
target = StorageFactory.createStorage(stub, createTempFile(stub));
|
||||
threadLocalOutputStreams.put(stub, target);
|
||||
}
|
||||
}
|
||||
|
||||
return (T)target;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close down any existing temporary files which have been opened.
|
||||
*/
|
||||
public OutputMergeTask closeStorage() {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
|
||||
if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() )
|
||||
return null;
|
||||
|
||||
OutputMergeTask outputMergeTask = new OutputMergeTask();
|
||||
for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) {
|
||||
Stub stub = entry.getKey();
|
||||
Storage storageEntry = entry.getValue();
|
||||
|
||||
storageEntry.close();
|
||||
outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry);
|
||||
}
|
||||
|
||||
threadLocalOutputStreams.clear();
|
||||
|
||||
return outputMergeTask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a temporary file for a stub of the given type.
|
||||
* @param stub Stub for which to create a temporary file.
|
||||
* @param <T> Type of the stub to accept.
|
||||
* @return A temp file, or throw an exception if the temp file cannot be created.
|
||||
*/
|
||||
private <T> File createTempFile( Stub<T> stub ) {
|
||||
File tempFile = null;
|
||||
|
||||
try {
|
||||
tempFile = File.createTempFile( stub.getClass().getName(), null );
|
||||
//tempFile.deleteOnExit();
|
||||
}
|
||||
catch( IOException ex ) {
|
||||
throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() );
|
||||
}
|
||||
|
||||
return tempFile;
|
||||
}
|
||||
}
|
||||
|
|
@ -89,7 +89,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
* @param tempFile File into which to direct the output data.
|
||||
*/
|
||||
public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) {
|
||||
logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
|
||||
//logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
|
||||
this.file = tempFile;
|
||||
this.writer = vcfWriterToFile(stub, file, false);
|
||||
writer.writeHeader(stub.getVCFHeader());
|
||||
|
|
@ -154,6 +154,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
}
|
||||
|
||||
public void add(VariantContext vc) {
|
||||
if ( closed ) throw new ReviewedStingException("Attempting to write to a closed VariantContextWriterStorage " + vc.getStart() + " storage=" + this);
|
||||
writer.add(vc);
|
||||
}
|
||||
|
||||
|
|
@ -170,8 +171,6 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
* Close the VCF storage object.
|
||||
*/
|
||||
public void close() {
|
||||
if(file != null)
|
||||
logger.debug("Closing temporary file " + file.getAbsolutePath());
|
||||
writer.close();
|
||||
closed = true;
|
||||
}
|
||||
|
|
@ -181,7 +180,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
if ( ! closed )
|
||||
throw new ReviewedStingException("Writer not closed, but we are merging into the file!");
|
||||
final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin";
|
||||
logger.debug(String.format("Merging %s into %s",file.getAbsolutePath(),targetFilePath));
|
||||
logger.debug(String.format("Merging VariantContextWriterStorage from %s into %s", file.getAbsolutePath(), targetFilePath));
|
||||
|
||||
// use the feature manager to determine the right codec for the tmp file
|
||||
// that way we don't assume it's a specific type
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@
|
|||
package org.broadinstitute.sting.utils;
|
||||
|
||||
import com.google.java.contract.Ensures;
|
||||
import com.google.java.contract.Invariant;
|
||||
import com.google.java.contract.Requires;
|
||||
import com.google.java.contract.ThrowEnsures;
|
||||
import net.sf.picard.reference.ReferenceSequenceFile;
|
||||
|
|
@ -70,7 +69,6 @@ public final class GenomeLocParser {
|
|||
private CachingSequenceDictionary getContigInfo() {
|
||||
if ( contigInfoPerThread.get() == null ) {
|
||||
// initialize for this thread
|
||||
logger.debug("Creating thread-local caching sequence dictionary for thread " + Thread.currentThread().getName());
|
||||
contigInfoPerThread.set(new CachingSequenceDictionary(SINGLE_MASTER_SEQUENCE_DICTIONARY));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,8 +19,6 @@ public class NanoSchedulerIntegrationTest extends WalkerTest {
|
|||
|
||||
for ( final int nt : Arrays.asList(1, 2) )
|
||||
for ( final int nct : Arrays.asList(1, 2) ) {
|
||||
if ( nt > 1 && nct > 1 )
|
||||
continue; // TODO -- remove me when we support -nct and -nt together
|
||||
// tests.add(new Object[]{ "SNP", "a1c7546f32a8919a3f3a70a04b2e8322", nt, nct });
|
||||
//// tests.add(new Object[]{ "INDEL", "0a6d2be79f4f8a4b0eb788cc4751b31b", nt, nct });
|
||||
tests.add(new Object[]{ "BOTH", "081d077786ac0af24e9f97259a55209c", nt, nct });
|
||||
|
|
|
|||
Loading…
Reference in New Issue