diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index dca2ecb7b..31f2a469c 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -7,7 +7,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource; import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.OutputTracker; -import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker; import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; @@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * A thread local output tracker for managing output per-thread. */ - private ThreadBasedOutputTracker outputTracker = new ThreadBasedOutputTracker(); + private ThreadGroupOutputTracker outputTracker = new ThreadGroupOutputTracker(); private final Queue reduceTasks = new LinkedList(); @@ -101,7 +101,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar /** * Creates threads for HMS each with a unique thread group. Critical to - * track outputs via the ThreadBasedOutputTracker. + * track outputs via the ThreadGroupOutputTracker. */ private static class UniqueThreadGroupThreadFactory implements ThreadFactory { int counter = 0; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java index 6d165f76a..d9a694846 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/ShardTraverser.java @@ -4,7 +4,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.reads.Shard; -import org.broadinstitute.sting.gatk.io.ThreadBasedOutputTracker; +import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker; import org.broadinstitute.sting.gatk.traversals.TraversalEngine; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.Utils; @@ -30,7 +30,7 @@ public class ShardTraverser implements Callable { final private HierarchicalMicroScheduler microScheduler; final private Walker walker; final private Shard shard; - final private ThreadBasedOutputTracker outputTracker; + final private ThreadGroupOutputTracker outputTracker; private OutputMergeTask outputMergeTask; /** our log, which we want to capture anything from this class */ @@ -44,7 +44,7 @@ public class ShardTraverser implements Callable { public ShardTraverser( HierarchicalMicroScheduler microScheduler, Walker walker, Shard shard, - ThreadBasedOutputTracker outputTracker) { + ThreadGroupOutputTracker outputTracker) { this.microScheduler = microScheduler; this.walker = walker; this.shard = shard; diff --git a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java similarity index 86% rename from public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java rename to public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java index f26d0c954..fdfe494a7 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/io/ThreadBasedOutputTracker.java +++ b/public/java/src/org/broadinstitute/sting/gatk/io/ThreadGroupOutputTracker.java @@ -51,12 +51,12 @@ import java.util.Map; * @author mhanna, depristo * @version 0.2 */ -public class ThreadBasedOutputTracker extends OutputTracker { +public class ThreadGroupOutputTracker extends OutputTracker { /** * A map from thread ID of the master thread to the storage map from * Stub to Storage objects */ - private Map> threadsToStorage = new HashMap>(); + private Map> threadsToStorage = new HashMap>(); /** * A total hack. If bypass = true, bypass thread local storage and write directly @@ -79,12 +79,12 @@ public class ThreadBasedOutputTracker extends OutputTracker { * the master thread ID. */ public synchronized void initializeStorage() { - final long threadID = Thread.currentThread().getId(); - Map threadLocalOutputStreams = threadsToStorage.get(threadID); + final ThreadGroup group = Thread.currentThread().getThreadGroup(); + Map threadLocalOutputStreams = threadsToStorage.get(group); if( threadLocalOutputStreams == null ) { threadLocalOutputStreams = new HashMap(); - threadsToStorage.put( threadID, threadLocalOutputStreams ); + threadsToStorage.put( group, threadLocalOutputStreams ); } for ( final Stub stub : outputs.keySet() ) { @@ -118,27 +118,15 @@ public class ThreadBasedOutputTracker extends OutputTracker { } - final Thread[] members = new Thread[1000]; // TODO -- dangerous -- fixme private synchronized Map findStorage(final Thread thread) { - final Map map = threadsToStorage.get(thread.getId()); + final Map map = threadsToStorage.get(thread.getThreadGroup()); + if ( map != null ) { return map; } else { - final ThreadGroup tg = thread.getThreadGroup(); - final int nInfo = tg.enumerate(members); - if ( nInfo == members.length ) - throw new ReviewedStingException("too many threads in thread-group " + tg + " to safely get info. " + - "Maximum allowed threads is " + members.length); - - for ( int i = 0; i < nInfo; i++ ) { - final Map map2 = threadsToStorage.get(members[i].getId()); - if ( map2 != null ) - return map2; - } - // something is terribly wrong, we have a storage lookup for a thread that doesn't have // any map data associated with it! - throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " id " + thread.getId()); + throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " in group " + thread.getThreadGroup()); } }