Fix GSA-515 Nanoscheduler GSA-573 -nt and -nct interact badly w.r.t. output
-- See https://jira.broadinstitute.org/browse/GSA-573 -- Uses InheritableThreadLocal storage so that child threads created by the NanoScheduler see the stubs bound by their parent (the main thread). -- Added an explicit integration test that checks that -nt 1, 2 and -nct 1, 2 give the same results for GLM BOTH with the UG over a 1 Mb interval.
This commit is contained in:
parent
90b7df46cf
commit
b5fa848255
|
|
@ -55,6 +55,10 @@ public class ShardTraverser implements Callable {
|
|||
try {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
// this is CRITICAL -- initializes the thread-local output maps in the parent thread,
|
||||
// so that any subthreads created by the traversal itself inherit and share them.
|
||||
outputTracker.getStorageAndInitializeIfNecessary();
|
||||
|
||||
Object accumulator = walker.reduceInit();
|
||||
final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
|
||||
microScheduler.getReadIterator(shard),
|
||||
|
|
|
|||
|
|
@ -39,14 +39,17 @@ import java.util.Map;
|
|||
/**
|
||||
* An output tracker that can either track its output per-thread or directly.
|
||||
*
|
||||
* @author mhanna
|
||||
* @version 0.1
|
||||
* @author mhanna, depristo
|
||||
* @version 0.2
|
||||
*/
|
||||
public class ThreadLocalOutputTracker extends OutputTracker {
|
||||
/**
|
||||
* Thread-local storage for output streams.
|
||||
*
|
||||
* MUST BE AN INHERITABLE THREAD LOCAL
|
||||
* -- NanoScheduler creates subthreads, and these threads must inherit the binding from their parent
|
||||
*/
|
||||
private ThreadLocal<Map<Stub, Storage>> storage = new ThreadLocal<Map<Stub,Storage>>();
|
||||
private ThreadLocal<Map<Stub, Storage>> storage = new InheritableThreadLocal<Map<Stub, Storage>>();
|
||||
|
||||
/**
|
||||
* A total hack. If bypass = true, bypass thread local storage and write directly
|
||||
|
|
@ -57,6 +60,29 @@ public class ThreadLocalOutputTracker extends OutputTracker {
|
|||
this.bypass = bypass;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the storage map for this thread, if necessary.
|
||||
*
|
||||
* Checks if there's a thread local binding for this thread, and if
|
||||
* not initializes it.
|
||||
*
|
||||
* Particularly useful in the case where we want to initialize the map in
|
||||
* a parent thread but have it used available to all the children via
|
||||
* the InheritedThreadLocal map.
|
||||
*
|
||||
* @return the storage
|
||||
*/
|
||||
public Map<Stub,Storage> getStorageAndInitializeIfNecessary() {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
|
||||
if( threadLocalOutputStreams == null ) {
|
||||
threadLocalOutputStreams = new HashMap<Stub,Storage>();
|
||||
storage.set( threadLocalOutputStreams );
|
||||
}
|
||||
|
||||
return threadLocalOutputStreams;
|
||||
}
|
||||
|
||||
public <T> T getStorage( Stub<T> stub ) {
|
||||
Storage target;
|
||||
|
||||
|
|
@ -68,12 +94,7 @@ public class ThreadLocalOutputTracker extends OutputTracker {
|
|||
}
|
||||
}
|
||||
else {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
|
||||
if( threadLocalOutputStreams == null ) {
|
||||
threadLocalOutputStreams = new HashMap<Stub,Storage>();
|
||||
storage.set( threadLocalOutputStreams );
|
||||
}
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = getStorageAndInitializeIfNecessary();
|
||||
|
||||
target = threadLocalOutputStreams.get(stub);
|
||||
if( target == null ) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
package org.broadinstitute.sting.utils.nanoScheduler;
|
||||
|
||||
import org.broadinstitute.sting.WalkerTest;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
// ********************************************************************************** //
|
||||
// Note that this class also serves as an integration test for the VariantAnnotator! //
|
||||
// ********************************************************************************** //
|
||||
|
||||
public class NanoSchedulerIntegrationTest extends WalkerTest {
|
||||
@DataProvider(name = "NanoSchedulerUGTest")
|
||||
public Object[][] createNanoSchedulerUGTest() {
|
||||
List<Object[]> tests = new ArrayList<Object[]>();
|
||||
|
||||
for ( final int nt : Arrays.asList(1, 2) )
|
||||
for ( final int nct : Arrays.asList(1, 2) ) {
|
||||
// tests.add(new Object[]{ "SNP", "a1c7546f32a8919a3f3a70a04b2e8322", nt, nct });
|
||||
// tests.add(new Object[]{ "INDEL", "0a6d2be79f4f8a4b0eb788cc4751b31b", nt, nct });
|
||||
tests.add(new Object[]{ "BOTH", "1eaf8ac30cdefd573850e58c1ec38790", nt, nct });
|
||||
}
|
||||
|
||||
return tests.toArray(new Object[][]{});
|
||||
}
|
||||
|
||||
@Test(enabled = true, dataProvider = "NanoSchedulerUGTest")
|
||||
private void testNanoSchedulerUGTest(final String glm, final String md5, final int nt, final int nct ) {
|
||||
WalkerTestSpec spec = new WalkerTestSpec(
|
||||
buildCommandLine(
|
||||
"-T UnifiedGenotyper -R " + b37KGReference,
|
||||
"-nosl --no_cmdline_in_header -G",
|
||||
//"--dbsnp " + b37dbSNP132,
|
||||
"-I " + privateTestDir + "NA12878.HiSeq.b37.chr20.10_11mb.bam",
|
||||
"-L 20:10,000,000-11,000,000",
|
||||
"-glm " + glm,
|
||||
"-nt " + nt,
|
||||
"-nct " + nct,
|
||||
"-o %s"
|
||||
),
|
||||
1,
|
||||
Arrays.asList(md5)
|
||||
);
|
||||
executeTest(String.format("testUG-glm:%s-nt%d-nct%d", glm, nt, nct), spec);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue