Merge branch 'master' of ssh://gsa2.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable

This commit is contained in:
Ryan Poplin 2012-10-26 15:14:41 -04:00
commit b0dcc2c78e
13 changed files with 205 additions and 25 deletions

View File

@ -37,12 +37,6 @@ import java.util.*;
public class AlleleBiasedDownsamplingUtils {
private static final ArrayList<PileupElement>[] alleleStratifiedElements = new ArrayList[4];
static {
for ( int i = 0; i < 4; i++ )
alleleStratifiedElements[i] = new ArrayList<PileupElement>();
}
/**
* Computes an allele biased version of the given pileup
*
@ -51,13 +45,17 @@ public class AlleleBiasedDownsamplingUtils {
* @param log logging output
* @return allele biased pileup
*/
public synchronized static ReadBackedPileup createAlleleBiasedBasePileup(final ReadBackedPileup pileup, final double downsamplingFraction, final PrintStream log) {
public static ReadBackedPileup createAlleleBiasedBasePileup(final ReadBackedPileup pileup, final double downsamplingFraction, final PrintStream log) {
// special case removal of all or no reads
if ( downsamplingFraction <= 0.0 )
return pileup;
if ( downsamplingFraction >= 1.0 )
return new ReadBackedPileupImpl(pileup.getLocation(), new ArrayList<PileupElement>());
final ArrayList<PileupElement>[] alleleStratifiedElements = new ArrayList[4];
for ( int i = 0; i < 4; i++ )
alleleStratifiedElements[i] = new ArrayList<PileupElement>();
// start by stratifying the reads by the alleles they represent at this position
for( final PileupElement pe : pileup ) {
// abort if we have a reduced read - we do not want to remove it!

View File

@ -122,12 +122,12 @@ public class BQSRIntegrationTest extends WalkerTest {
public Object[][] createPRTestData() {
List<Object[]> tests = new ArrayList<Object[]>();
tests.add(new Object[]{1, new PRTest(" -qq -1", "a1d87da5dcbde35170d6ba6bc3ee2812")});
tests.add(new Object[]{1, new PRTest(" -qq 6", "a0fecae6d0e5ab9917862fa306186d10")});
tests.add(new Object[]{1, new PRTest(" -qq -1", "5226c06237b213b9e9b25a32ed92d09a")});
tests.add(new Object[]{1, new PRTest(" -qq 6", "b592a5c62b952a012e18adb898ea9c33")});
tests.add(new Object[]{1, new PRTest(" -DIQ", "8977bea0c57b808e65e9505eb648cdf7")});
for ( final int nct : Arrays.asList(1, 2, 4) ) {
tests.add(new Object[]{nct, new PRTest("", "d1bbb4ce6aa93e866f106f8b11d888ed")});
tests.add(new Object[]{nct, new PRTest("", "ab2f209ab98ad3432e208cbd524a4c4a")});
}
return tests.toArray(new Object[][]{});

View File

@ -64,6 +64,7 @@ import org.broadinstitute.sting.utils.threading.ThreadEfficiencyMonitor;
import java.io.File;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* A GenomeAnalysisEngine that runs a specified walker.
@ -73,6 +74,7 @@ public class GenomeAnalysisEngine {
* our log, which we want to capture anything from this class
*/
private static Logger logger = Logger.getLogger(GenomeAnalysisEngine.class);
public static final long NO_RUNTIME_LIMIT = -1;
/**
* The GATK command-line argument parsing code.
@ -1090,6 +1092,33 @@ public class GenomeAnalysisEngine {
public String createApproximateCommandLineArgumentString(Object... argumentProviders) {
return CommandLineUtils.createApproximateCommandLineArgumentString(parsingEngine,argumentProviders);
}
/**
* Does the current runtime in unit exceed the runtime limit, if one has been provided?
*
* @param runtime the runtime of this GATK instance in minutes
* @param unit the time unit of runtime
* @return false if not limit was requested or if runtime <= the limit, true otherwise
*/
public boolean exceedsRuntimeLimit(final long runtime, final TimeUnit unit) {
if ( runtime < 0 ) throw new IllegalArgumentException("runtime must be >= 0 but got " + runtime);
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return false;
else {
final long actualRuntimeNano = TimeUnit.NANOSECONDS.convert(runtime, unit);
final long maxRuntimeNano = getRuntimeLimitInNanoseconds();
return actualRuntimeNano > maxRuntimeNano;
}
}
/**
* @return the runtime limit in nanoseconds, or -1 if no limit was specified
*/
public long getRuntimeLimitInNanoseconds() {
if ( getArguments().maxRuntime == NO_RUNTIME_LIMIT )
return -1;
else
return TimeUnit.NANOSECONDS.convert(getArguments().maxRuntime, getArguments().maxRuntimeUnits);
}
}

View File

@ -31,6 +31,7 @@ import org.broadinstitute.sting.commandline.Argument;
import org.broadinstitute.sting.commandline.Hidden;
import org.broadinstitute.sting.commandline.Input;
import org.broadinstitute.sting.commandline.IntervalBinding;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.downsampling.DownsampleType;
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
import org.broadinstitute.sting.gatk.phonehome.GATKRunReport;
@ -44,6 +45,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author aaron
@ -91,7 +93,7 @@ public class GATKArgumentCollection {
// --------------------------------------------------------------------------------------------------------------
//
// XXX
// General features
//
// --------------------------------------------------------------------------------------------------------------
@ -143,6 +145,12 @@ public class GATKArgumentCollection {
@Argument(fullName = "disableRandomization",doc="Completely eliminates randomization from nondeterministic methods. To be used mostly in the testing framework where dynamic parallelism can result in differing numbers of calls to the generator.")
public boolean disableRandomization = false;
@Argument(fullName = "maxRuntime", shortName = "maxRuntime", doc="If provided, that GATK will stop execution cleanly as soon after maxRuntime has been exceeded, truncating the run but not exiting with a failure. By default the value is interpreted in minutes, but this can be changed by maxRuntimeUnits", required = false)
public long maxRuntime = GenomeAnalysisEngine.NO_RUNTIME_LIMIT;
@Argument(fullName = "maxRuntimeUnits", shortName = "maxRuntimeUnits", doc="The TimeUnit for maxRuntime", required = false)
public TimeUnit maxRuntimeUnits = TimeUnit.MINUTES;
// --------------------------------------------------------------------------------------------------------------
//
// Downsampling Arguments

View File

@ -123,7 +123,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
final ReduceTree reduceTree = new ReduceTree(this);
initializeWalker(walker);
while (isShardTraversePending() || isTreeReducePending()) {
while (! abortExecution() && (isShardTraversePending() || isTreeReducePending())) {
// Check for errors during execution.
errorTracker.throwErrorIfPending();

View File

@ -63,7 +63,7 @@ public class LinearMicroScheduler extends MicroScheduler {
final TraversalEngine traversalEngine = borrowTraversalEngine(this);
for (Shard shard : shardStrategy ) {
if ( done || shard == null ) // we ran out of shards that aren't owned
if ( abortExecution() || done || shard == null ) // we ran out of shards that aren't owned
break;
if(shard.getShardType() == Shard.ShardType.LOCUS) {

View File

@ -39,6 +39,7 @@ import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.gatk.traversals.*;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.utils.AutoFormattingTime;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -52,6 +53,7 @@ import javax.management.ObjectName;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -269,6 +271,26 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
this.threadEfficiencyMonitor = threadEfficiencyMonitor;
}
/**
* Should we stop all execution work and exit gracefully?
*
* Returns true in the case where some external signal or time limit has been received, indicating
* that this GATK shouldn't continue executing. This isn't a kill signal, it is really a "shutdown
* gracefully at the next opportunity" signal. Concrete implementations of the MicroScheduler
* examine this value as often as reasonable and, if it returns true, stop what they are doing
* at the next available opportunity, shutdown their resources, call notify done, and return.
*
* @return true if we should abort execution, or false otherwise
*/
protected boolean abortExecution() {
final boolean abort = engine.exceedsRuntimeLimit(progressMeter.getRuntimeInNanoseconds(), TimeUnit.NANOSECONDS);
if ( abort ) {
final AutoFormattingTime aft = new AutoFormattingTime(TimeUnit.SECONDS.convert(engine.getRuntimeLimitInNanoseconds(), TimeUnit.NANOSECONDS), 1, 4);
logger.info("Aborting execution (cleanly) because the runtime has exceeded the requested maximum " + aft);
}
return abort;
}
/**
* Walks a walker over the given list of intervals.
*

View File

@ -55,8 +55,8 @@ public class AssessReducedQuals extends LocusWalker<GenomeLoc, GenomeLoc> implem
@Argument(fullName = "qual_epsilon", shortName = "epsilon", doc = "when |Quals_reduced_bam - Quals_original_bam| > epsilon*Quals_original_bam we output this interval", required = false)
public int qual_epsilon = 0;
@Argument(fullName = "debugLevel", shortName = "debug", doc = "debug mode on")
public int debugLevel = 0;
@Argument(fullName = "debugLevel", shortName = "debug", doc = "debug mode on") // TODO -- best to make this optional
public int debugLevel = 0; // TODO -- best to make this an enum or boolean
@Output
protected PrintStream out;
@ -114,7 +114,11 @@ public class AssessReducedQuals extends LocusWalker<GenomeLoc, GenomeLoc> implem
return quals;
}
// TODO -- arguments/variables should be final, not method declaration
private final boolean isGoodRead(PileupElement p, List<String> tags){
// TODO -- this isn't quite right. You don't need the tags here. Instead, you want to check whether the read itself (which
// TODO -- you can get from the PileupElement) is a reduced read (not all reads from the reduced bam are reduced) and only
// TODO -- for them do you want to ignore that min mapping quality cutoff (but you *do* still want the min base cutoff).
return !p.isDeletion() && (tags.contains(reduced) || (tags.contains(original) && (int)p.getQual() >= 20 && p.getMappingQual() >= 20));
}

View File

@ -4,12 +4,21 @@ package org.broadinstitute.sting.utils;
* Simple utility class that makes it convenient to print unit adjusted times
*/
public class AutoFormattingTime {
double timeInSeconds; // in Seconds
int precision; // for format
private final int width; // for format
private final int precision; // for format
public AutoFormattingTime(double timeInSeconds, int precision) {
double timeInSeconds; // in Seconds
private final String formatString;
public AutoFormattingTime(double timeInSeconds, final int width, int precision) {
this.width = width;
this.timeInSeconds = timeInSeconds;
this.precision = precision;
this.formatString = "%" + width + "." + precision + "f %s";
}
public AutoFormattingTime(double timeInSeconds, int precision) {
this(timeInSeconds, 6, precision);
}
public AutoFormattingTime(double timeInSeconds) {
@ -20,6 +29,20 @@ public class AutoFormattingTime {
return timeInSeconds;
}
/**
* @return the precision (a la format's %WIDTH.PERCISIONf)
*/
public int getWidth() {
return width;
}
/**
* @return the precision (a la format's %WIDTH.PERCISIONf)
*/
public int getPrecision() {
return precision;
}
/**
* Instead of 10000 s, returns 2.8 hours
* @return
@ -48,6 +71,6 @@ public class AutoFormattingTime {
}
}
return String.format("%6."+precision+"f %s", unitTime, unit);
return String.format(formatString, unitTime, unit);
}
}

View File

@ -24,6 +24,7 @@
package org.broadinstitute.sting.utils.progressmeter;
import com.google.java.contract.Ensures;
import com.google.java.contract.Invariant;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.*;
@ -200,6 +201,14 @@ public class ProgressMeter {
"Location", processingUnitName, processingUnitName));
}
/**
* @return the current runtime in nanoseconds
*/
@Ensures("result >= 0")
public long getRuntimeInNanoseconds() {
return timer.getElapsedTimeNano();
}
/**
* Utility routine that prints out process information (including timing) every N records or
* every M seconds, for N and M set in global variables.

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk;
import org.broadinstitute.sting.WalkerTest;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class MaxRuntimeIntegrationTest extends WalkerTest {
private static final long STARTUP_TIME = TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS);
private class MaxRuntimeTestProvider extends TestDataProvider {
final long maxRuntime;
final TimeUnit unit;
public MaxRuntimeTestProvider(final long maxRuntime, final TimeUnit unit) {
super(MaxRuntimeTestProvider.class);
this.maxRuntime = maxRuntime;
this.unit = unit;
setName(String.format("Max runtime test : %d of %s", maxRuntime, unit));
}
public long expectedMaxRuntimeNano() {
return TimeUnit.NANOSECONDS.convert(maxRuntime, unit) + STARTUP_TIME;
}
}
@DataProvider(name = "MaxRuntimeProvider")
public Object[][] makeMaxRuntimeProvider() {
for ( final TimeUnit requestedUnits : Arrays.asList(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES) )
new MaxRuntimeTestProvider(requestedUnits.convert(30, TimeUnit.SECONDS), requestedUnits);
return MaxRuntimeTestProvider.getTests(MaxRuntimeTestProvider.class);
}
//
// Loop over errors to throw, make sure they are the errors we get back from the engine, regardless of NT type
//
@Test(enabled = true, dataProvider = "MaxRuntimeProvider", timeOut = 60 * 1000)
public void testMaxRuntime(final MaxRuntimeTestProvider cfg) {
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
"-T UnifiedGenotyper -R " + hg18Reference
+ " -I " + validationDataLocation + "NA12878.WEx.downsampled20x.bam -o /dev/null"
+ " -maxRuntime " + cfg.maxRuntime + " -maxRuntimeUnits " + cfg.unit, 0,
Collections.<String>emptyList());
final SimpleTimer timer = new SimpleTimer().start();
executeTest("Max runtime " + cfg, spec);
final long actualRuntimeNano = timer.getElapsedTimeNano();
Assert.assertTrue(actualRuntimeNano < cfg.expectedMaxRuntimeNano(),
"Actual runtime " + TimeUnit.SECONDS.convert(actualRuntimeNano, TimeUnit.NANOSECONDS)
+ " exceeded max. tolerated runtime " + TimeUnit.SECONDS.convert(cfg.expectedMaxRuntimeNano(), TimeUnit.NANOSECONDS)
+ " given requested runtime " + cfg.maxRuntime + " " + cfg.unit);
}
}

View File

@ -117,24 +117,24 @@ public class UnifiedGenotyperIntegrationTest extends WalkerTest {
// Note that we need to turn off any randomization for this to work, so no downsampling and no annotations
String md5 = "3d25ea660275a455ca443a786bff3d32";
String md5 = "d408b4661b820ed86272415b8ea08780";
WalkerTest.WalkerTestSpec spec1 = new WalkerTest.WalkerTestSpec(
baseCommand + " -dt NONE -G none -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000", 1,
baseCommand + " -dt NONE -G none --contamination_fraction_to_filter 0.0 -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000", 1,
Arrays.asList(md5));
executeTest("test parallelization (single thread)", spec1);
GenomeAnalysisEngine.resetRandomGenerator();
WalkerTest.WalkerTestSpec spec2 = new WalkerTest.WalkerTestSpec(
baseCommand + " -dt NONE -G none -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000 -nt 2", 1,
baseCommand + " -dt NONE -G none --contamination_fraction_to_filter 0.0 -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000 -nt 2", 1,
Arrays.asList(md5));
executeTest("test parallelization (2 threads)", spec2);
GenomeAnalysisEngine.resetRandomGenerator();
WalkerTest.WalkerTestSpec spec3 = new WalkerTest.WalkerTestSpec(
baseCommand + " -dt NONE -G none -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000 -nt 4", 1,
baseCommand + " -dt NONE -G none --contamination_fraction_to_filter 0.0 -I " + validationDataLocation + "NA12878.1kg.p2.chr1_10mb_11_mb.SLX.bam -o %s -L 1:10,000,000-10,075,000 -nt 4", 1,
Arrays.asList(md5));
executeTest("test parallelization (4 threads)", spec3);
}

View File

@ -32,11 +32,12 @@ public class NanoSchedulerIntegrationTest extends WalkerTest {
WalkerTestSpec spec = new WalkerTestSpec(
buildCommandLine(
"-T UnifiedGenotyper -R " + b37KGReference,
"-nosl --no_cmdline_in_header -G",
"--no_cmdline_in_header -G",
//"--dbsnp " + b37dbSNP132,
"-I " + privateTestDir + "NA12878.HiSeq.b37.chr20.10_11mb.bam",
"-L 20:10,000,000-10,100,000",
"-glm " + glm,
"--contamination_fraction_to_filter 0.0",
"-nt " + nt,
"-nct " + nct,
"-o %s"