When writing the initial commit for nt + nct I realized this class was really just a ThreadGroupOutputTracker

-- The code is cleaner and the logical more obvious now.
This commit is contained in:
Mark DePristo 2012-09-23 18:19:10 -04:00
parent 3e8d992828
commit 9fd30d6f1c
3 changed files with 14 additions and 26 deletions

View File

@ -7,7 +7,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* A thread local output tracker for managing output per-thread.
*/
private ThreadBasedOutputTracker outputTracker = new ThreadBasedOutputTracker();
private ThreadGroupOutputTracker outputTracker = new ThreadGroupOutputTracker();
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
@ -101,7 +101,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
/**
* Creates threads for HMS each with a unique thread group. Critical to
* track outputs via the ThreadBasedOutputTracker.
* track outputs via the ThreadGroupOutputTracker.
*/
private static class UniqueThreadGroupThreadFactory implements ThreadFactory {
int counter = 0;

View File

@ -4,7 +4,7 @@ import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.Utils;
@ -30,7 +30,7 @@ public class ShardTraverser implements Callable {
final private HierarchicalMicroScheduler microScheduler;
final private Walker walker;
final private Shard shard;
final private ThreadBasedOutputTracker outputTracker;
final private ThreadGroupOutputTracker outputTracker;
private OutputMergeTask outputMergeTask;
/** our log, which we want to capture anything from this class */
@ -44,7 +44,7 @@ public class ShardTraverser implements Callable {
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
Walker walker,
Shard shard,
ThreadBasedOutputTracker outputTracker) {
ThreadGroupOutputTracker outputTracker) {
this.microScheduler = microScheduler;
this.walker = walker;
this.shard = shard;

View File

@ -51,12 +51,12 @@ import java.util.Map;
* @author mhanna, depristo
* @version 0.2
*/
public class ThreadBasedOutputTracker extends OutputTracker {
public class ThreadGroupOutputTracker extends OutputTracker {
/**
* A map from thread ID of the master thread to the storage map from
* Stub to Storage objects
*/
private Map<Long, Map<Stub, Storage>> threadsToStorage = new HashMap<Long, Map<Stub, Storage>>();
private Map<ThreadGroup, Map<Stub, Storage>> threadsToStorage = new HashMap<ThreadGroup, Map<Stub, Storage>>();
/**
* A total hack. If bypass = true, bypass thread local storage and write directly
@ -79,12 +79,12 @@ public class ThreadBasedOutputTracker extends OutputTracker {
* the master thread ID.
*/
public synchronized void initializeStorage() {
final long threadID = Thread.currentThread().getId();
Map<Stub,Storage> threadLocalOutputStreams = threadsToStorage.get(threadID);
final ThreadGroup group = Thread.currentThread().getThreadGroup();
Map<Stub,Storage> threadLocalOutputStreams = threadsToStorage.get(group);
if( threadLocalOutputStreams == null ) {
threadLocalOutputStreams = new HashMap<Stub,Storage>();
threadsToStorage.put( threadID, threadLocalOutputStreams );
threadsToStorage.put( group, threadLocalOutputStreams );
}
for ( final Stub stub : outputs.keySet() ) {
@ -118,27 +118,15 @@ public class ThreadBasedOutputTracker extends OutputTracker {
}
final Thread[] members = new Thread[1000]; // TODO -- dangerous -- fixme
private synchronized Map<Stub,Storage> findStorage(final Thread thread) {
final Map<Stub, Storage> map = threadsToStorage.get(thread.getId());
final Map<Stub, Storage> map = threadsToStorage.get(thread.getThreadGroup());
if ( map != null ) {
return map;
} else {
final ThreadGroup tg = thread.getThreadGroup();
final int nInfo = tg.enumerate(members);
if ( nInfo == members.length )
throw new ReviewedStingException("too many threads in thread-group " + tg + " to safely get info. " +
"Maximum allowed threads is " + members.length);
for ( int i = 0; i < nInfo; i++ ) {
final Map<Stub, Storage> map2 = threadsToStorage.get(members[i].getId());
if ( map2 != null )
return map2;
}
// something is terribly wrong, we have a storage lookup for a thread that doesn't have
// any map data associated with it!
throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " id " + thread.getId());
throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " in group " + thread.getThreadGroup());
}
}