Merge branch 'master' of ssh://gsa2/humgen/gsa-scr1/gsa-engineering/git/unstable

This commit is contained in:
Eric Banks 2012-09-24 21:46:36 -04:00
commit caa431c367
11 changed files with 174 additions and 86 deletions

View File

@ -89,6 +89,10 @@ public class ExperimentalReadShardBalancer extends ShardBalancer {
// If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one): // If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one):
if ( currentFilePointerReadsIterator != null && ! currentFilePointerReadsIterator.hasNext() ) { if ( currentFilePointerReadsIterator != null && ! currentFilePointerReadsIterator.hasNext() ) {
// Close the old, exhausted chain of iterators to release resources
currentFilePointerReadsIterator.close();
do { do {
advanceFilePointer(); advanceFilePointer();
} while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers } while ( currentFilePointer != null && isEmpty(currentFilePointer.fileSpans) ); // skip empty file pointers

View File

@ -7,13 +7,13 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker; import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory; import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
import java.util.Collection; import java.util.Collection;
@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/** /**
* A thread local output tracker for managing output per-thread. * A thread local output tracker for managing output per-thread.
*/ */
private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker(); private ThreadGroupOutputTracker outputTracker = new ThreadGroupOutputTracker();
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>(); private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
@ -93,11 +93,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
final int nThreadsToUse = threadAllocation.getNumDataThreads(); final int nThreadsToUse = threadAllocation.getNumDataThreads();
if ( threadAllocation.monitorThreadEfficiency() ) { if ( threadAllocation.monitorThreadEfficiency() ) {
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse); throw new UserException.BadArgumentValue("nt", "Cannot monitor thread efficiency with -nt, sorry");
setThreadEfficiencyMonitor(monitoringThreadFactory); }
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
} else { this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, new UniqueThreadGroupThreadFactory());
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); }
/**
* Creates threads for HMS each with a unique thread group. Critical to
* track outputs via the ThreadGroupOutputTracker.
*/
private static class UniqueThreadGroupThreadFactory implements ThreadFactory {
int counter = 0;
@Override
public Thread newThread(Runnable r) {
final ThreadGroup group = new ThreadGroup("HMS-group-" + counter++);
return new Thread(group, r);
} }
} }
@ -253,6 +265,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
protected void mergeExistingOutput( boolean wait ) { protected void mergeExistingOutput( boolean wait ) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// logger.warn("MergingExistingOutput");
// printOutputMergeTasks();
// Create a list of the merge tasks that will be performed in this run of the mergeExistingOutput(). // Create a list of the merge tasks that will be performed in this run of the mergeExistingOutput().
Queue<ShardTraverser> mergeTasksInSession = new LinkedList<ShardTraverser>(); Queue<ShardTraverser> mergeTasksInSession = new LinkedList<ShardTraverser>();
while( !outputMergeTasks.isEmpty() ) { while( !outputMergeTasks.isEmpty() ) {
@ -266,8 +281,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
mergeTasksInSession.add(traverser); mergeTasksInSession.add(traverser);
} }
// logger.warn("Selected things to merge:");
// printOutputMergeTasks(mergeTasksInSession);
// Actually run through, merging the tasks in the working queue. // Actually run through, merging the tasks in the working queue.
for( ShardTraverser traverser: mergeTasksInSession ) { for( ShardTraverser traverser: mergeTasksInSession ) {
//logger.warn("*** Merging " + traverser.getIntervalsString());
if( !traverser.isComplete() ) if( !traverser.isComplete() )
traverser.waitForComplete(); traverser.waitForComplete();
@ -312,11 +331,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
reduceTree.addEntry(traverseResult); reduceTree.addEntry(traverseResult);
outputMergeTasks.add(traverser); outputMergeTasks.add(traverser);
// logger.warn("adding merge task");
// printOutputMergeTasks();
// No more data? Let the reduce tree know so it can finish processing what it's got. // No more data? Let the reduce tree know so it can finish processing what it's got.
if (!isShardTraversePending()) if (!isShardTraversePending())
reduceTree.complete(); reduceTree.complete();
} }
private synchronized void printOutputMergeTasks() {
printOutputMergeTasks(outputMergeTasks);
}
private synchronized void printOutputMergeTasks(final Queue<ShardTraverser> tasks) {
logger.info("Output merge tasks " + tasks.size());
for ( final ShardTraverser traverser : tasks )
logger.info(String.format("\t%s: complete? %b", traverser.getIntervalsString(), traverser.isComplete()));
}
/** Pulls the next reduce from the queue and runs it. */ /** Pulls the next reduce from the queue and runs it. */
protected void queueNextTreeReduce( Walker walker ) { protected void queueNextTreeReduce( Walker walker ) {
if (reduceTasks.size() == 0) if (reduceTasks.size() == 0)

View File

@ -61,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler {
boolean done = walker.isDone(); boolean done = walker.isDone();
int counter = 0; int counter = 0;
final TraversalEngine traversalEngine = borrowTraversalEngine(); final TraversalEngine traversalEngine = borrowTraversalEngine(this);
for (Shard shard : shardStrategy ) { for (Shard shard : shardStrategy ) {
if ( done || shard == null ) // we ran out of shards that aren't owned if ( done || shard == null ) // we ran out of shards that aren't owned
break; break;
@ -97,7 +97,7 @@ public class LinearMicroScheduler extends MicroScheduler {
Object result = accumulator.finishTraversal(); Object result = accumulator.finishTraversal();
outputTracker.close(); outputTracker.close();
returnTraversalEngine(traversalEngine); returnTraversalEngine(this, traversalEngine);
cleanup(); cleanup();
executionIsDone(); executionIsDone();

View File

@ -51,10 +51,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.File; import java.io.File;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.Collection; import java.util.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/** /**
@ -94,6 +91,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/ */
final LinkedList<TraversalEngine> availableTraversalEngines = new LinkedList<TraversalEngine>(); final LinkedList<TraversalEngine> availableTraversalEngines = new LinkedList<TraversalEngine>();
/**
* Engines that have been allocated to a key already.
*/
final HashMap<Object, TraversalEngine> allocatedTraversalEngines = new HashMap<Object, TraversalEngine>();
/** /**
* Counts the number of instances of the class that are currently alive. * Counts the number of instances of the class that are currently alive.
*/ */
@ -145,6 +147,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
logger.warn(String.format("Number of requested GATK threads %d is more than the number of " + logger.warn(String.format("Number of requested GATK threads %d is more than the number of " +
"available processors on this machine %d", threadAllocation.getTotalNumThreads(), "available processors on this machine %d", threadAllocation.getTotalNumThreads(),
Runtime.getRuntime().availableProcessors())); Runtime.getRuntime().availableProcessors()));
// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
} }
if ( threadAllocation.getNumDataThreads() > 1 ) { if ( threadAllocation.getNumDataThreads() > 1 ) {
@ -315,10 +319,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* pointers to the traversal engines * pointers to the traversal engines
*/ */
public synchronized void shutdownTraversalEngines() { public synchronized void shutdownTraversalEngines() {
if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() ) // no longer applicable because engines are allocated to keys now
throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " + // if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
"have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size() // throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
+ " have been returned"); // "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
// + " have been returned");
for ( final TraversalEngine te : allCreatedTraversalEngines) for ( final TraversalEngine te : allCreatedTraversalEngines)
te.shutdown(); te.shutdown();
@ -389,21 +394,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
} }
/** /**
* Returns a traversal engine suitable for use in this thread. * Returns a traversal engine suitable for use, associated with key
* *
* Pops the next available engine from the available ones maintained by this * Key is an arbitrary object that is used to retrieve the same traversal
* engine over and over. This can be important in the case where the
* traversal engine has data associated with it in some other context,
* and we need to ensure that the context always sees the same traversal
* engine. This happens in the HierarchicalMicroScheduler, where you want
* a thread executing traversals to retrieve the same engine each time,
* as outputs are tracked w.r.t. that engine.
*
* If no engine is associated with key yet, pops the next available engine
* from the available ones maintained by this
* microscheduler. Note that it's a runtime error to pop a traversal engine * microscheduler. Note that it's a runtime error to pop a traversal engine
* from this scheduler if there are none available. Callers that * from this scheduler if there are none available. Callers that
* once pop'd an engine for use must return it with returnTraversalEngine * once pop'd an engine for use must return it with returnTraversalEngine
* *
* @param key the key to associate with this engine
* @return a non-null TraversalEngine suitable for execution in this scheduler * @return a non-null TraversalEngine suitable for execution in this scheduler
*/ */
@Ensures("result != null") @Ensures("result != null")
protected synchronized TraversalEngine borrowTraversalEngine() { protected synchronized TraversalEngine borrowTraversalEngine(final Object key) {
if ( availableTraversalEngines.isEmpty() ) if ( key == null ) throw new IllegalArgumentException("key cannot be null");
throw new IllegalStateException("no traversal engines were available");
else { final TraversalEngine engine = allocatedTraversalEngines.get(key);
return availableTraversalEngines.pop(); if ( engine == null ) {
if ( availableTraversalEngines.isEmpty() )
throw new IllegalStateException("no traversal engines were available");
allocatedTraversalEngines.put(key, availableTraversalEngines.pop());
return allocatedTraversalEngines.get(key);
} else {
return engine;
} }
} }
@ -411,14 +432,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* Return a borrowed traversal engine to this MicroScheduler, for later use * Return a borrowed traversal engine to this MicroScheduler, for later use
* in another traversal execution * in another traversal execution
* *
* @param key the key used to id the engine, provided to the borrowTraversalEngine function
* @param traversalEngine the borrowed traversal engine. Must have been previously borrowed. * @param traversalEngine the borrowed traversal engine. Must have been previously borrowed.
*/ */
protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) { protected synchronized void returnTraversalEngine(final Object key, final TraversalEngine traversalEngine) {
if ( traversalEngine == null ) if ( traversalEngine == null )
throw new IllegalArgumentException("Attempting to push a null traversal engine"); throw new IllegalArgumentException("Attempting to push a null traversal engine");
if ( ! allCreatedTraversalEngines.contains(traversalEngine) ) if ( ! allCreatedTraversalEngines.contains(traversalEngine) )
throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine); throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine);
if ( ! allocatedTraversalEngines.containsKey(key) )
throw new IllegalArgumentException("No traversal engine was never checked out with key " + key);
availableTraversalEngines.push(traversalEngine); // note there's nothing to actually do here, but a function implementation
// might want to do something
} }
} }

View File

@ -4,9 +4,10 @@ import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -29,7 +30,7 @@ public class ShardTraverser implements Callable {
final private HierarchicalMicroScheduler microScheduler; final private HierarchicalMicroScheduler microScheduler;
final private Walker walker; final private Walker walker;
final private Shard shard; final private Shard shard;
final private ThreadLocalOutputTracker outputTracker; final private ThreadGroupOutputTracker outputTracker;
private OutputMergeTask outputMergeTask; private OutputMergeTask outputMergeTask;
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
@ -43,7 +44,7 @@ public class ShardTraverser implements Callable {
public ShardTraverser( HierarchicalMicroScheduler microScheduler, public ShardTraverser( HierarchicalMicroScheduler microScheduler,
Walker walker, Walker walker,
Shard shard, Shard shard,
ThreadLocalOutputTracker outputTracker) { ThreadGroupOutputTracker outputTracker) {
this.microScheduler = microScheduler; this.microScheduler = microScheduler;
this.walker = walker; this.walker = walker;
this.shard = shard; this.shard = shard;
@ -51,13 +52,15 @@ public class ShardTraverser implements Callable {
} }
public Object call() { public Object call() {
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(); final Object traversalEngineKey = Thread.currentThread();
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(traversalEngineKey);
try { try {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
// this is CRITICAL -- initializes the thread-local output maps in the parent thread, // this is CRITICAL -- initializes output maps in this master thread,
// so that any subthreads created by the traversal itself are shared... // so that any subthreads created by the traversal itself can access this map
outputTracker.getStorageAndInitializeIfNecessary(); outputTracker.initializeStorage();
Object accumulator = walker.reduceInit(); Object accumulator = walker.reduceInit();
final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(), final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
@ -85,12 +88,20 @@ public class ShardTraverser implements Callable {
} finally { } finally {
synchronized(this) { synchronized(this) {
complete = true; complete = true;
microScheduler.returnTraversalEngine(traversalEngine); microScheduler.returnTraversalEngine(traversalEngineKey, traversalEngine);
notifyAll(); notifyAll();
} }
} }
} }
/**
* Return a human readable string describing the intervals this traverser is operating on
* @return a comma-separated string of this shard's genome locations
*/
public String getIntervalsString() {
return Utils.join(",", shard.getGenomeLocs());
}
/** /**
* Has this traversal completed? * Has this traversal completed?
* @return True if completed, false otherwise. * @return True if completed, false otherwise.

View File

@ -29,6 +29,7 @@ import org.broadinstitute.sting.gatk.executive.OutputMergeTask;
import org.broadinstitute.sting.gatk.io.storage.Storage; import org.broadinstitute.sting.gatk.io.storage.Storage;
import org.broadinstitute.sting.gatk.io.storage.StorageFactory; import org.broadinstitute.sting.gatk.io.storage.StorageFactory;
import org.broadinstitute.sting.gatk.io.stubs.Stub; import org.broadinstitute.sting.gatk.io.stubs.Stub;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File; import java.io.File;
@ -37,19 +38,25 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* An output tracker that can either track its output per-thread or directly, * An output tracker that can either track its output per-thread or directly.
*
* This output tracker doesn't use thread local values, but rather looks up the
* storage map via the thread's group. This is necessary in the case where
* there's a master thread that creates the output map, and spawns subthreads
* that actually do work. As long as those subthreads are spawned in the
* thread group of the master thread, this tracker will properly find the
* storage map associated with the master thread in the group, and return
* the map to all subthreads.
* *
* @author mhanna, depristo * @author mhanna, depristo
* @version 0.2 * @version 0.2
*/ */
public class ThreadLocalOutputTracker extends OutputTracker { public class ThreadGroupOutputTracker extends OutputTracker {
/** /**
* Thread-local storage for output streams. * A map from thread ID of the master thread to the storage map from
* * Stub to Storage objects
* MUST BE A INHERITABLE THREAD LOCAL
* -- NanoScheduler creates subthreads, and these threads must inherit the binding from their parent
*/ */
private ThreadLocal<Map<Stub, Storage>> storage = new InheritableThreadLocal<Map<Stub, Storage>>(); private Map<ThreadGroup, Map<Stub, Storage>> threadsToStorage = new HashMap<ThreadGroup, Map<Stub, Storage>>();
/** /**
* A total hack. If bypass = true, bypass thread local storage and write directly * A total hack. If bypass = true, bypass thread local storage and write directly
@ -61,32 +68,36 @@ public class ThreadLocalOutputTracker extends OutputTracker {
} }
/** /**
* Initialize the storage map for this thread, if necessary. * Initialize the storage map for this thread.
* *
* Checks if there's a thread local binding for this thread, and if * Checks if there's a thread local binding for this thread, and if
* not initializes it. * not initializes the map for it. This map is then
* populated with stub -> storage bindings according to the
* superclasses' outputs map.
* *
* Particularly useful in the case where we want to initialize the map in * Must be called within the master thread to create a map associated with
* a parent thread but have it used available to all the children via * the master thread ID.
* the InheritedThreadLocal map.
*
* @return the storage
*/ */
public Map<Stub,Storage> getStorageAndInitializeIfNecessary() { public synchronized void initializeStorage() {
Map<Stub,Storage> threadLocalOutputStreams = storage.get(); final ThreadGroup group = Thread.currentThread().getThreadGroup();
Map<Stub,Storage> threadLocalOutputStreams = threadsToStorage.get(group);
if( threadLocalOutputStreams == null ) { if( threadLocalOutputStreams == null ) {
threadLocalOutputStreams = new HashMap<Stub,Storage>(); threadLocalOutputStreams = new HashMap<Stub,Storage>();
storage.set( threadLocalOutputStreams ); threadsToStorage.put( group, threadLocalOutputStreams );
} }
return threadLocalOutputStreams; for ( final Stub stub : outputs.keySet() ) {
final Storage target = StorageFactory.createStorage(stub, createTempFile(stub));
threadLocalOutputStreams.put(stub, target);
}
} }
public <T> T getStorage( Stub<T> stub ) { @Override
public <T> T getStorage( final Stub<T> stub ) {
Storage target; Storage target;
if(bypass) { if (bypass) {
target = outputs.get(stub); target = outputs.get(stub);
if( target == null ) { if( target == null ) {
target = StorageFactory.createStorage(stub); target = StorageFactory.createStorage(stub);
@ -94,36 +105,50 @@ public class ThreadLocalOutputTracker extends OutputTracker {
} }
} }
else { else {
final Map<Stub,Storage> threadLocalOutputStreams = getStorageAndInitializeIfNecessary(); final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
target = threadLocalOutputStreams.get(stub); target = threadLocalOutputStreams.get(stub);
if( target == null ) {
target = StorageFactory.createStorage(stub, createTempFile(stub)); // make sure something hasn't gone wrong, and we somehow find a map that doesn't include our stub
threadLocalOutputStreams.put(stub, target); if ( target == null )
} throw new ReviewedStingException("target isn't supposed to be null for " + Thread.currentThread()
+ " id " + Thread.currentThread().getId() + " map is " + threadLocalOutputStreams);
} }
return (T)target; return (T)target;
} }
private synchronized Map<Stub,Storage> findStorage(final Thread thread) {
final Map<Stub, Storage> map = threadsToStorage.get(thread.getThreadGroup());
if ( map != null ) {
return map;
} else {
// something is terribly wrong, we have a storage lookup for a thread that doesn't have
// any map data associated with it!
throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " in group " + thread.getThreadGroup());
}
}
/** /**
* Close down any existing temporary files which have been opened. * Close down any existing temporary files which have been opened.
*/ */
public OutputMergeTask closeStorage() { public synchronized OutputMergeTask closeStorage() {
Map<Stub,Storage> threadLocalOutputStreams = storage.get(); final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() ) if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() )
return null; return null;
OutputMergeTask outputMergeTask = new OutputMergeTask(); final OutputMergeTask outputMergeTask = new OutputMergeTask();
for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) { for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) {
Stub stub = entry.getKey(); final Stub stub = entry.getKey();
Storage storageEntry = entry.getValue(); final Storage storageEntry = entry.getValue();
storageEntry.close(); storageEntry.close();
outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry); outputMergeTask.addMergeOperation(getTargetStream(stub), storageEntry);
} }
// logger.info("Closing " + Thread.currentThread().getId() + " => " + threadLocalOutputStreams);
threadLocalOutputStreams.clear(); threadLocalOutputStreams.clear();
return outputMergeTask; return outputMergeTask;
@ -136,16 +161,10 @@ public class ThreadLocalOutputTracker extends OutputTracker {
* @return A temp file, or throw an exception if the temp file cannot be created. * @return A temp file, or throw an exception if the temp file cannot be created.
*/ */
private <T> File createTempFile( Stub<T> stub ) { private <T> File createTempFile( Stub<T> stub ) {
File tempFile = null;
try { try {
tempFile = File.createTempFile( stub.getClass().getName(), null ); return File.createTempFile( stub.getClass().getName(), null );
//tempFile.deleteOnExit(); } catch( IOException ex ) {
}
catch( IOException ex ) {
throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() ); throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() );
} }
return tempFile;
} }
} }

View File

@ -89,7 +89,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
* @param tempFile File into which to direct the output data. * @param tempFile File into which to direct the output data.
*/ */
public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) { public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) {
logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output."); //logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
this.file = tempFile; this.file = tempFile;
this.writer = vcfWriterToFile(stub, file, false); this.writer = vcfWriterToFile(stub, file, false);
writer.writeHeader(stub.getVCFHeader()); writer.writeHeader(stub.getVCFHeader());
@ -154,6 +154,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
} }
public void add(VariantContext vc) { public void add(VariantContext vc) {
if ( closed ) throw new ReviewedStingException("Attempting to write to a closed VariantContextWriterStorage " + vc.getStart() + " storage=" + this);
writer.add(vc); writer.add(vc);
} }
@ -170,8 +171,6 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
* Close the VCF storage object. * Close the VCF storage object.
*/ */
public void close() { public void close() {
if(file != null)
logger.debug("Closing temporary file " + file.getAbsolutePath());
writer.close(); writer.close();
closed = true; closed = true;
} }
@ -181,7 +180,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
if ( ! closed ) if ( ! closed )
throw new ReviewedStingException("Writer not closed, but we are merging into the file!"); throw new ReviewedStingException("Writer not closed, but we are merging into the file!");
final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin"; final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin";
logger.debug(String.format("Merging %s into %s",file.getAbsolutePath(),targetFilePath)); logger.debug(String.format("Merging VariantContextWriterStorage from %s into %s", file.getAbsolutePath(), targetFilePath));
// use the feature manager to determine the right codec for the tmp file // use the feature manager to determine the right codec for the tmp file
// that way we don't assume it's a specific type // that way we don't assume it's a specific type

View File

@ -26,7 +26,6 @@
package org.broadinstitute.sting.utils; package org.broadinstitute.sting.utils;
import com.google.java.contract.Ensures; import com.google.java.contract.Ensures;
import com.google.java.contract.Invariant;
import com.google.java.contract.Requires; import com.google.java.contract.Requires;
import com.google.java.contract.ThrowEnsures; import com.google.java.contract.ThrowEnsures;
import net.sf.picard.reference.ReferenceSequenceFile; import net.sf.picard.reference.ReferenceSequenceFile;
@ -70,7 +69,6 @@ public final class GenomeLocParser {
private CachingSequenceDictionary getContigInfo() { private CachingSequenceDictionary getContigInfo() {
if ( contigInfoPerThread.get() == null ) { if ( contigInfoPerThread.get() == null ) {
// initialize for this thread // initialize for this thread
logger.debug("Creating thread-local caching sequence dictionary for thread " + Thread.currentThread().getName());
contigInfoPerThread.set(new CachingSequenceDictionary(SINGLE_MASTER_SEQUENCE_DICTIONARY)); contigInfoPerThread.set(new CachingSequenceDictionary(SINGLE_MASTER_SEQUENCE_DICTIONARY));
} }

View File

@ -92,7 +92,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
runningMapJobSlots = new Semaphore(this.bufferSize); runningMapJobSlots = new Semaphore(this.bufferSize);
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d")); this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
} }
// start timing the time spent outside of the nanoScheduler // start timing the time spent outside of the nanoScheduler

View File

@ -250,7 +250,7 @@ class VCFWriter extends IndexingVariantContextWriter {
mWriter.write("\n"); mWriter.write("\n");
mWriter.flush(); // necessary so that writing to an output stream will work mWriter.flush(); // necessary so that writing to an output stream will work
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Unable to write the VCF object to " + getStreamName()); throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e);
} }
} }

View File

@ -41,7 +41,7 @@ class DataProcessingPipelineTest {
" -D " + BaseTest.publicTestDir + "exampleDBSNP.vcf", " -D " + BaseTest.publicTestDir + "exampleDBSNP.vcf",
" -test ", " -test ",
" -p " + projectName).mkString " -p " + projectName).mkString
spec.fileMD5s += testOut -> "60d39ae909fdd049920b54e0965b6d3c" spec.fileMD5s += testOut -> "45d97df6d291695b92668e8a55c54cd0"
PipelineTest.executeTest(spec) PipelineTest.executeTest(spec)
} }
@ -60,7 +60,7 @@ class DataProcessingPipelineTest {
" -bwa /home/unix/carneiro/bin/bwa", " -bwa /home/unix/carneiro/bin/bwa",
" -bwape ", " -bwape ",
" -p " + projectName).mkString " -p " + projectName).mkString
spec.fileMD5s += testOut -> "61ca3237afdfabf78ee27a5bb80dae59" spec.fileMD5s += testOut -> "6e70efbe6bafc3fedd60bd406bd201db"
PipelineTest.executeTest(spec) PipelineTest.executeTest(spec)
} }