Cleanup for multithreading memory leak during integration tests...unregister MXBean at end

of traversal to avoid holding a reference to the microscheduler, which holds a reference to
the engine, which in turn holds a reference to the walker, which itself holds a reference to
all the data aggregated during the course of the traversal.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4594 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-10-28 18:37:42 +00:00
parent d768c6558d
commit 2f8057bf24
7 changed files with 45 additions and 77 deletions

View File

@ -478,7 +478,7 @@
<property name="queue.test.classes" value="${build.dir}/scala/testclasses"/>
<property name="queue.test.sources" value="scala/test"/>
<!-- provide a ceiling on the memory that unit/integration tests can consume. -->
<property name="test.maxmemory" value="5g"/>
<property name="test.maxmemory" value="4g"/>
<!-- TEST -->
<macrodef name="run-test">

View File

@ -134,19 +134,6 @@ public abstract class CommandLineProgram {
*/
protected String getArgumentSourceName( Class source ) { return source.toString(); }
/**
* The command-line argument system allows free-form String tags to accompany each
* object. However, there's no way for the clp to push these tags into the fields
* themselves, so we just provide a callback so that the clp can push tags into the
* argument system.
* @param key Key to use, created by the command-line argument system.
* @param tags List of freeform tags.
*/
protected void addTags(Object key, List<String> tags) {
// NO-OP by default.
}
/**
* this is the function that the inheriting class can expect to have called
* when all the argument processing is done

View File

@ -41,11 +41,6 @@ import java.util.*;
* A parser for Sting command-line arguments.
*/
public class ParsingEngine {
/**
* The command-line program at the heart of this parsing engine.
*/
CommandLineProgram clp = null;
/**
* A list of defined arguments against which command lines are matched.
* Package protected for testing access.
@ -74,14 +69,17 @@ public class ParsingEngine {
private Set<ArgumentTypeDescriptor> argumentTypeDescriptors = new LinkedHashSet<ArgumentTypeDescriptor>();
/**
* List of tags associated with the given instantiation of the command-line argument.
*/
private final Map<Object,List<String>> tags = new IdentityHashMap<Object,List<String>>();
/**
* our log, which we want to capture anything from org.broadinstitute.sting
*/
protected static Logger logger = Logger.getLogger(ParsingEngine.class);
public ParsingEngine( CommandLineProgram clp ) {
this.clp = clp;
parsingMethods.add( ParsingMethod.FullNameParsingMethod );
parsingMethods.add( ParsingMethod.ShortNameParsingMethod );
@ -294,9 +292,20 @@ public class ParsingEngine {
* @param tags List of tags, or empty list if no tags are present.
*/
public void addTags(Object key, List<String> tags) {
if(clp!=null) clp.addTags(key,tags);
this.tags.put(key,tags);
}
/**
* Gets the tags associated with a given object.
* @param key Key for which to find a tag.
* @return List of tags associated with this key.
*/
public List<String> getTags(Object key) {
if(!tags.containsKey(key))
return Collections.emptyList();
return tags.get(key);
}
/**
* Notify the user that a deprecated command-line argument has been used.
* @param argumentSource Deprecated argument source specified by user.

View File

@ -111,11 +111,6 @@ public abstract class AbstractGenomeAnalysisEngine {
*/
private Collection<Stub<?>> outputs = new ArrayList<Stub<?>>();
/**
* List of tags associated with the given instantiation of the command-line argument.
*/
private final Map<Object,List<String>> tags = new IdentityHashMap<Object,List<String>>();
/**
* Collection of the filters applied to the input data.
*/
@ -273,25 +268,13 @@ public abstract class AbstractGenomeAnalysisEngine {
outputs.add(stub);
}
/**
* Adds an association between a object created by the
* command-line argument system and a freeform list of tags.
* @param key Object created by the command-line argument system.
* @param tags List of tags to use when reading arguments.
*/
public void addTags(Object key, List<String> tags) {
this.tags.put(key,tags);
}
/**
* Gets the tags associated with a given object.
* @param key Key for which to find a tag.
* @return List of tags associated with this key.
*/
public List<String> getTags(Object key) {
if(!tags.containsKey(key))
return Collections.emptyList();
return tags.get(key);
return parsingEngine.getTags(key);
}
/**

View File

@ -182,14 +182,4 @@ public abstract class CommandLineExecutable extends CommandLineProgram {
protected String getArgumentSourceName( Class argumentSource ) {
return engine.getWalkerName((Class<Walker>)argumentSource);
}
/**
* Supply command-line argument tags to the GATK engine.
* @param key Key to use, created by the command-line argument system.
* @param tags List of freeform tags.
*/
@Override
protected void addTags(Object key, List<String> tags) {
engine.addTags(key,tags);
}
}
}

View File

@ -94,6 +94,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
super(engine, walker, reads, reference, rods);
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
}
public Object execute( Walker walker, ShardStrategy shardStrategy ) {
// Fast fail for walkers not supporting TreeReducible interface.
if (!( walker instanceof TreeReducible ))
throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers");
// JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean.
// To get around this limitation and since we have no job identifier at this point, register a simple counter that
@ -101,22 +107,16 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
int thisInstance;
synchronized(HierarchicalMicroScheduler.class) {
thisInstance = instanceNumber++;
}
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler,instanceNumber="+thisInstance);
name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler,instanceNumber="+thisInstance);
mbs.registerMBean(this, name);
}
catch (JMException ex) {
throw new ReviewedStingException("Unable to register microscheduler with JMX", ex);
}
}
public Object execute( Walker walker, ShardStrategy shardStrategy ) {
// Fast fail for walkers not supporting TreeReducible interface.
if (!( walker instanceof TreeReducible ))
throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers");
traversalEngine.startTimers();
ReduceTree reduceTree = new ReduceTree(this);
@ -163,6 +163,13 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
outputTracker.close();
try {
mbs.unregisterMBean(name);
}
catch (JMException ex) {
throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex);
}
return result;
}

View File

@ -30,8 +30,7 @@ public class
for (String tests : testsEnumerations) {
WalkerTestSpec spec = new WalkerTestSpec(withSelect(tests, "DP < 50", "DP50") + " " + extraArgs + " -o %s",
1, Arrays.asList("158ac8e6d32eb2ea1bbeebfa512965de"));
//executeTestParallel("testSelect1", spec);
executeTest("testSelect1", spec);
executeTestParallel("testSelect1", spec);
}
}
@ -40,8 +39,7 @@ public class
String extraArgs = "-L 1:1-10,000,000";
WalkerTestSpec spec = new WalkerTestSpec( withSelect(withSelect(root, "DP < 50", "DP50"), "set==\"Intersection\"", "intersection") + " " + extraArgs + " -o %s",
1, Arrays.asList("cee96f61ffa1d042fe0c63550c508ec9"));
//executeTestParallel("testSelect2", spec);
executeTest("testSelect2", spec);
executeTestParallel("testSelect2", spec);
}
@Test
@ -51,8 +49,7 @@ public class
WalkerTestSpec spec = new WalkerTestSpec(cmdRoot + " -B:eval,VCF " + validationDataLocation + vcfFile + " -B:comp,VCF " + validationDataLocation + "GenotypeConcordanceComp.vcf -noStandard -E GenotypeConcordance -reportType CSV -o %s",
1,
Arrays.asList("7e9ce1b26cdeaa50705f5de163847638"));
//executeTestParallel("testVEGenotypeConcordance" + vcfFile, spec);
executeTest("testVEGenotypeConcordance" + vcfFile, spec);
executeTestParallel("testVEGenotypeConcordance" + vcfFile, spec);
}
}
@ -70,8 +67,7 @@ public class
WalkerTestSpec spec = new WalkerTestSpec( tests + " " + extraArgs + " -o %s",
1, // just one output file
Arrays.asList(md5));
//executeTestParallel("testVESimple", spec);
executeTest("testVESimple", spec);
executeTestParallel("testVESimple", spec);
}
}
}
@ -96,8 +92,7 @@ public class
WalkerTestSpec spec = new WalkerTestSpec(tests + " " + extraArgs1 + extraArgs2 + " -o %s",
1, // just one output file
Arrays.asList(md5));
//executeTestParallel("testVEComplex", spec);
executeTest("testVEComplex", spec);
executeTestParallel("testVEComplex", spec);
}
}
}
@ -114,8 +109,7 @@ public class
String md5 = "d41d8cd98f00b204e9800998ecf8427e";
WalkerTestSpec spec = new WalkerTestSpec(vecmd, 1, Arrays.asList(md5));
//executeTestParallel("testVEGenomicallyAnnotated", spec);
executeTest("testVEGenomicallyAnnotated", spec);
executeTestParallel("testVEGenomicallyAnnotated", spec);
}
@Test
@ -125,8 +119,7 @@ public class
WalkerTestSpec spec = new WalkerTestSpec(tests + " " + extraArgs + " -o %s -outputVCF %s -NO_HEADER",
2,
Arrays.asList("6b97a019402b3984fead9a4e8b7c7c2a", "989bc30dea6c8a4cf771cd1b9fdab488"));
//executeTestParallel("testVEWriteVCF", spec);
executeTest("testVEWriteVCF", spec);
executeTestParallel("testVEWriteVCF", spec);
}
}
@ -134,8 +127,7 @@ public class
public void testCompVsEvalAC() {
String extraArgs = "-T VariantEval -R "+b36KGReference+" -o %s -E GenotypeConcordance -B:evalYRI,VCF /humgen/gsa-hpprojects/GATK/data/Validation_Data/yri.trio.gatk.ug.very.few.lines.vcf -B:compYRI,VCF /humgen/gsa-hpprojects/GATK/data/Validation_Data/yri.trio.gatk.fake.genotypes.ac.test.vcf -reportType CSV";
WalkerTestSpec spec = new WalkerTestSpec(extraArgs,1,Arrays.asList("25a681855cb26e7380fbf1a93de0a41f"));
//executeTestParallel("testACDiscordanceAtAC1EvalAC2Comp",spec);
executeTest("testACDiscordanceAtAC1EvalAC2Comp",spec);
executeTestParallel("testACDiscordanceAtAC1EvalAC2Comp",spec);
}
private static String withSelect(String cmd, String select, String name) {