Merge branch 'master' of github.com:broadinstitute/gsa-unstable

This commit is contained in:
Ami Levy-Moonshine 2012-12-27 21:23:59 -05:00
commit f450cbc1a3
44 changed files with 1491 additions and 520 deletions

View File

@ -107,6 +107,12 @@
<!-- To run tests with debugging, use -Dtest.debug=true -Dtest.debug.port=XXXX on the command line -->
<property name="test.debug.port" value="5005" /> <!-- override on the command line if desired -->
<property name="test.default.maxmemory" value="4g"/>
<!-- clover parameters -->
<property name="clover.jar" location="private/resources/clover/lib/clover.jar"/>
<property name="clover.instrument.level" location="method"/>
<taskdef resource="cloverlib.xml" classpath="${clover.jar}"/>
<!-- ******************************************************************************** -->
@ -267,19 +273,19 @@
<!-- Comment out the following lines to build the GATK without a network connection, assuming you have all of the libraries cached already -->
<get src="http://repo1.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/${ivy.jar.file}"
dest="${ivy.jar.dir}/${ivy.jar.file}"
usetimestamp="true"/>
<taskdef resource="org/apache/ivy/ant/antlib.xml"
uri="antlib:org.apache.ivy.ant"
classpath="${ivy.jar.dir}/${ivy.jar.file}"/>
<!-- <get src="http://repo1.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/${ivy.jar.file}" -->
<!-- dest="${ivy.jar.dir}/${ivy.jar.file}" -->
<!-- usetimestamp="true"/> -->
<!-- <taskdef resource="org/apache/ivy/ant/antlib.xml" -->
<!-- uri="antlib:org.apache.ivy.ant" -->
<!-- classpath="${ivy.jar.dir}/${ivy.jar.file}"/> -->
<get src="http://repo1.maven.org/maven2/org/apache/maven/maven-ant-tasks/${maven-ant-tasks.install.version}/${maven-ant-tasks.jar.file}"
dest="${ivy.jar.dir}/${maven-ant-tasks.jar.file}"
usetimestamp="true"/>
<taskdef resource="org/apache/maven/artifact/ant/antlib.xml"
uri="antlib:antlib:org.apache.maven.artifact.ant"
classpath="${ivy.jar.dir}/${maven-ant-tasks.jar.file}"/>
<!-- <get src="http://repo1.maven.org/maven2/org/apache/maven/maven-ant-tasks/${maven-ant-tasks.install.version}/${maven-ant-tasks.jar.file}" -->
<!-- dest="${ivy.jar.dir}/${maven-ant-tasks.jar.file}" -->
<!-- usetimestamp="true"/> -->
<!-- <taskdef resource="org/apache/maven/artifact/ant/antlib.xml" -->
<!-- uri="antlib:antlib:org.apache.maven.artifact.ant" -->
<!-- classpath="${ivy.jar.dir}/${maven-ant-tasks.jar.file}"/> -->
<!-- End network lines -->
@ -596,6 +602,7 @@
<path id="doclet.classpath">
<path refid="external.dependencies" />
<pathelement location="${java.classes}" />
<pathelement location="${clover.jar}" />
</path>
<javadoc doclet="org.broadinstitute.sting.utils.help.ResourceBundleExtractorDoclet"
@ -1090,7 +1097,6 @@
<property name="iwww.report.dir" value="${user.home}/private_html/report"/>
<property name="test.output" value="${dist.dir}/test"/>
<property name="testng.jar" value="${lib.dir}/testng-5.14.1.jar"/>
<property name="test.maxmemory" value="4g"/> <!-- provide a ceiling on the memory that unit/integration tests can consume. -->
<path id="java.test.source.path">
<dirset dir="${basedir}">
@ -1121,6 +1127,7 @@
<path id="testng.default.classpath">
<path refid="build.results" />
<pathelement path="${clover.jar}"/>
<pathelement location="${java.contracts.dir}" />
<pathelement location="${java.test.classes}" />
<pathelement location="${scala.test.classes}" />
@ -1128,6 +1135,21 @@
<!-- Test targets -->
<target name="clover.clean">
<clover-clean/>
</target>
<target name="clover.report">
<clover-html-report outdir="clover_html" title="GATK Clover report"/>
</target>
<target name="with.clover">
<clover-setup fullyQualifyJavaLang="true" instrumentationLevel="${clover.instrument.level}">
</clover-setup>
<property name="compile.scala" value="false" /> <!-- currently doesn't work with scala -->
<property name="test.maxmemory" value="32g"/> <!-- clover requires lots of memory -->
</target>
<target name="test.init.compile">
<mkdir dir="${java.test.classes}"/>
<mkdir dir="${scala.test.classes}"/>
@ -1207,6 +1229,7 @@
<echo message="Test Classpath: ${test.classpath.display.string}" />
<echo message="" />
<echo message="Sting: Running @{testtype} test cases!"/>
<echo message="Test Memory : ${test.maxmemory}" />
<!-- no test is allowed to run for more than 10 hours -->
<taskdef resource="testngtasks" classpath="${testng.jar}"/>
@ -1220,10 +1243,11 @@
listeners="org.testng.reporters.FailedReporter,org.testng.reporters.JUnitXMLReporter,org.broadinstitute.sting.TestNGTestTransformer,org.broadinstitute.sting.StingTextReporter,org.uncommons.reportng.HTMLReporter">
<jvmarg value="-Xmx${test.maxmemory}" />
<jvmarg value="-ea" />
<jvmarg value="-Dclover.pertest.coverage=diff" />
<jvmarg value="-Djava.awt.headless=true" />
<jvmarg value="-Dpipeline.run=${pipeline.run}" />
<jvmarg value="-Djava.io.tmpdir=${java.io.tmpdir}" />
<jvmarg line="${cofoja.jvm.args}"/>
<!-- <jvmarg line="${cofoja.jvm.args}"/> -->
<jvmarg line="${debug.jvm.args}"/>
<!-- NOTE: To run tests with debugging, use -Dtest.debug=true -Dtest.debug.port=XXXX on the command line -->
@ -1262,6 +1286,7 @@
<target name="test.init">
<property name="testng.classpath" value="testng.default.classpath" />
<property name="test.maxmemory" value="${test.default.maxmemory}"/>
</target>
<target name="init.testgatkjar">
@ -1374,6 +1399,7 @@
<!-- Fast test target that cuts major corners for speed. Requires that a full build has been done first. Java-only, single test class only -->
<!-- Usage: ant fasttest -Dsingle=TestClass -->
<target name="fasttest" depends="init.javaonly,init">
<property name="test.maxmemory" value="${test.default.maxmemory}"/>
<condition property="not.clean">
<and>
<available file="${build.dir}" />

View File

@ -25,46 +25,46 @@ package org.broadinstitute.sting.gatk.walkers.bqsr;
* OTHER DEALINGS IN THE SOFTWARE.
*/
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.classloader.ProtectedPackageSource;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import org.broadinstitute.sting.utils.recalibration.EventType;
import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.recalibration.RecalDatum;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.threading.ThreadLocalArray;
import java.util.LinkedList;
import java.util.List;
public class AdvancedRecalibrationEngine extends StandardRecalibrationEngine implements ProtectedPackageSource {
private final static Logger logger = Logger.getLogger(AdvancedRecalibrationEngine.class);
// optimization: only allocate temp arrays once per thread
private final ThreadLocal<byte[]> threadLocalTempQualArray = new ThreadLocalArray<byte[]>(EventType.values().length, byte.class);
private final ThreadLocal<double[]> threadLocalTempFractionalErrorArray = new ThreadLocalArray<double[]>(EventType.values().length, double.class);
public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables) {
super.initialize(covariates, recalibrationTables);
}
final List<NestedIntegerArray<RecalDatum>> allThreadLocalQualityScoreTables = new LinkedList<NestedIntegerArray<RecalDatum>>();
private ThreadLocal<NestedIntegerArray<RecalDatum>> threadLocalQualityScoreTables = new ThreadLocal<NestedIntegerArray<RecalDatum>>() {
@Override
protected synchronized NestedIntegerArray<RecalDatum> initialValue() {
final NestedIntegerArray<RecalDatum> table = recalibrationTables.makeQualityScoreTable();
allThreadLocalQualityScoreTables.add(table);
return table;
}
};
@Override
public void updateDataForRead(final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors ) {
final ReadCovariates readCovariates = covariateKeySetFrom(read);
byte[] tempQualArray = threadLocalTempQualArray.get();
double[] tempFractionalErrorArray = threadLocalTempFractionalErrorArray.get();
public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
final GATKSAMRecord read = recalInfo.getRead();
final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
final NestedIntegerArray<RecalDatum> qualityScoreTable = getThreadLocalQualityScoreTable();
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
if( !skip[offset] ) {
tempQualArray[EventType.BASE_SUBSTITUTION.index] = read.getBaseQualities()[offset];
tempFractionalErrorArray[EventType.BASE_SUBSTITUTION.index] = snpErrors[offset];
tempQualArray[EventType.BASE_INSERTION.index] = read.getBaseInsertionQualities()[offset];
tempFractionalErrorArray[EventType.BASE_INSERTION.index] = insertionErrors[offset];
tempQualArray[EventType.BASE_DELETION.index] = read.getBaseDeletionQualities()[offset];
tempFractionalErrorArray[EventType.BASE_DELETION.index] = deletionErrors[offset];
if( ! recalInfo.skip(offset) ) {
for (final EventType eventType : EventType.values()) {
final int[] keys = readCovariates.getKeySet(offset, eventType);
final int eventIndex = eventType.index;
final byte qual = tempQualArray[eventIndex];
final double isError = tempFractionalErrorArray[eventIndex];
final byte qual = recalInfo.getQual(eventType, offset);
final double isError = recalInfo.getErrorFraction(eventType, offset);
incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventIndex);
incrementDatumOrPutIfNecessary(qualityScoreTable, qual, isError, keys[0], keys[1], eventIndex);
for (int i = 2; i < covariates.length; i++) {
if (keys[i] < 0)
@ -76,4 +76,24 @@ public class AdvancedRecalibrationEngine extends StandardRecalibrationEngine imp
}
}
}
/**
* Get a NestedIntegerArray for a QualityScore table specific to this thread
* @return a non-null NestedIntegerArray ready to be used to collect calibration info for the quality score covariate
*/
private NestedIntegerArray<RecalDatum> getThreadLocalQualityScoreTable() {
return threadLocalQualityScoreTables.get();
}
@Override
public void finalizeData() {
// merge in all of the thread local tables
logger.info("Merging " + allThreadLocalQualityScoreTables.size() + " thread-local quality score tables");
for ( final NestedIntegerArray<RecalDatum> localTable : allThreadLocalQualityScoreTables ) {
recalibrationTables.combineQualityScoreTable(localTable);
}
allThreadLocalQualityScoreTables.clear(); // cleanup after ourselves
super.finalizeData();
}
}

View File

@ -120,6 +120,11 @@ public class LikelihoodCalculationEngine {
for( int jjj = 0; jjj < numHaplotypes; jjj++ ) {
final Haplotype haplotype = haplotypes.get(jjj);
// TODO -- need to test against a reference/position with non-standard bases
if ( !Allele.acceptableAlleleBases(haplotype.getBases(), false) )
continue;
final int haplotypeStart = ( previousHaplotypeSeen == null ? 0 : computeFirstDifferingPosition(haplotype.getBases(), previousHaplotypeSeen.getBases()) );
previousHaplotypeSeen = haplotype;

View File

@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.baq.ReadTransformingIterator;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.sam.GATKSAMReadGroupRecord;
import org.broadinstitute.sting.utils.sam.GATKSamRecordFactory;
import java.io.File;
@ -252,9 +253,10 @@ public class SAMDataSource {
if(readBufferSize != null)
ReadShard.setReadBufferSize(readBufferSize); // TODO: use of non-final static variable here is just awful, especially for parallel tests
else {
// Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively
// will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once.
ReadShard.setReadBufferSize(Math.min(10000*samFiles.size(),250000));
// Choose a sensible default for the read buffer size.
// Previously we picked 10000 reads per BAM per shard with a max cap of 250K reads in memory at once.
// Now we are simply setting it to 100K reads
ReadShard.setReadBufferSize(100000);
}
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
@ -894,9 +896,11 @@ public class SAMDataSource {
long lastTick = timer.currentTime();
for(final SAMReaderID readerID: readerIDs) {
final ReaderInitializer init = new ReaderInitializer(readerID).call();
if (removeProgramRecords) {
init.reader.getFileHeader().setProgramRecords(new ArrayList<SAMProgramRecord>());
}
if (threadAllocation.getNumIOThreads() > 0) {
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
}
@ -916,6 +920,13 @@ public class SAMDataSource {
for(SAMFileReader reader: readers.values())
headers.add(reader.getFileHeader());
headerMerger = new SamFileHeaderMerger(SAMFileHeader.SortOrder.coordinate,headers,true);
// update all read groups to GATKSAMRecordReadGroups
final List<SAMReadGroupRecord> gatkReadGroups = new LinkedList<SAMReadGroupRecord>();
for ( final SAMReadGroupRecord rg : headerMerger.getMergedHeader().getReadGroups() ) {
gatkReadGroups.add(new GATKSAMReadGroupRecord(rg));
}
headerMerger.getMergedHeader().setReadGroups(gatkReadGroups);
}
final private void printReaderPerformance(final int nExecutedTotal,

View File

@ -26,6 +26,7 @@
package org.broadinstitute.sting.gatk.filters;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
/**
@ -37,6 +38,6 @@ import org.broadinstitute.sting.utils.sam.ReadUtils;
public class Platform454Filter extends ReadFilter {
public boolean filterOut(SAMRecord rec) {
return (ReadUtils.is454Read(rec));
return (ReadUtils.is454Read((GATKSAMRecord)rec));
}
}

View File

@ -27,6 +27,7 @@ package org.broadinstitute.sting.gatk.filters;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.commandline.Argument;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
/**
@ -41,7 +42,7 @@ public class PlatformFilter extends ReadFilter {
public boolean filterOut(SAMRecord rec) {
for ( String name : PLFilterNames )
if ( ReadUtils.isPlatformRead(rec, name.toUpperCase() ))
if ( ReadUtils.isPlatformRead((GATKSAMRecord)rec, name.toUpperCase() ))
return true;
return false;
}

View File

@ -131,7 +131,7 @@ public class GATKRunReport {
private String hostName;
@Element(required = true, name = "java")
private String java;
private String javaVersion;
@Element(required = true, name = "machine")
private String machine;
@ -212,7 +212,7 @@ public class GATKRunReport {
hostName = Utils.resolveHostname();
// basic java information
java = Utils.join("-", Arrays.asList(System.getProperty("java.vendor"), System.getProperty("java.version")));
javaVersion = Utils.join("-", Arrays.asList(System.getProperty("java.vendor"), System.getProperty("java.version")));
machine = Utils.join("-", Arrays.asList(System.getProperty("os.name"), System.getProperty("os.arch")));
// if there was an exception, capture it

View File

@ -40,6 +40,7 @@ import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.Iterator;
import java.util.LinkedList;
/**
* A nano-scheduling version of TraverseReads.
@ -53,6 +54,7 @@ import java.util.Iterator;
*/
public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,ReadShardDataProvider> {
/** our log, which we want to capture anything from this class */
private final static boolean PRE_READ_ALL_MAP_DATA = true;
protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);
private static final boolean DEBUG = false;
final NanoScheduler<MapData, MapResult, T> nanoScheduler;
@ -111,7 +113,19 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
* should execute
*/
private Iterator<MapData> aggregateMapData(final ReadShardDataProvider dataProvider) {
return new Iterator<MapData>() {
final Iterator<MapData> it = makeDataIterator(dataProvider);
if ( PRE_READ_ALL_MAP_DATA ) {
final LinkedList<MapData> l = new LinkedList<MapData>();
while ( it.hasNext() ) l.add(it.next());
return l.iterator();
} else {
return it;
}
}
private Iterator<MapData> makeDataIterator(final ReadShardDataProvider dataProvider) {
return new Iterator<MapData> () {
final ReadView reads = new ReadView(dataProvider);
final ReadReferenceView reference = new ReadReferenceView(dataProvider);
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);

View File

@ -38,6 +38,7 @@ import org.broadinstitute.sting.gatk.filters.*;
import org.broadinstitute.sting.gatk.iterators.ReadTransformer;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.variant.utils.BaseUtils;
import org.broadinstitute.sting.utils.baq.BAQ;
import org.broadinstitute.sting.utils.classloader.GATKLiteUtils;
@ -135,6 +136,7 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
private BAQ baq; // BAQ the reads on the fly to generate the alignment uncertainty vector
private IndexedFastaSequenceFile referenceReader; // fasta reference reader for use with BAQ calculation
private final static byte NO_BAQ_UNCERTAINTY = (byte)'@';
/**
* Parse the -cov arguments and create a list of covariates to be used here
@ -225,25 +227,48 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
if (!RecalUtils.isColorSpaceConsistent(RAC.SOLID_NOCALL_STRATEGY, read)) { // parse the solid color space and check for color no-calls
return 0L; // skip this read completely
}
read.setTemporaryAttribute(COVARS_ATTRIBUTE, RecalUtils.computeCovariates(read, requestedCovariates));
final boolean[] skip = calculateSkipArray(read, metaDataTracker); // skip known sites of variation as well as low quality and non-regular bases
final int[] isSNP = calculateIsSNP(read, ref, originalRead);
final int[] isInsertion = calculateIsIndel(read, EventType.BASE_INSERTION);
final int[] isDeletion = calculateIsIndel(read, EventType.BASE_DELETION);
final byte[] baqArray = calculateBAQArray(read);
final int nErrors = nEvents(isSNP, isInsertion, isDeletion);
// note for efficiency regions we don't compute the BAQ array unless we actually have
// some error to marginalize over. For ILMN data ~85% of reads have no error
final byte[] baqArray = nErrors == 0 ? flatBAQArray(read) : calculateBAQArray(read);
if( baqArray != null ) { // some reads just can't be BAQ'ed
final ReadCovariates covariates = RecalUtils.computeCovariates(read, requestedCovariates);
final boolean[] skip = calculateSkipArray(read, metaDataTracker); // skip known sites of variation as well as low quality and non-regular bases
final double[] snpErrors = calculateFractionalErrorArray(isSNP, baqArray);
final double[] insertionErrors = calculateFractionalErrorArray(isInsertion, baqArray);
final double[] deletionErrors = calculateFractionalErrorArray(isDeletion, baqArray);
recalibrationEngine.updateDataForRead(read, skip, snpErrors, insertionErrors, deletionErrors);
// aggregate all of the info into our info object, and update the data
final ReadRecalibrationInfo info = new ReadRecalibrationInfo(read, covariates, skip, snpErrors, insertionErrors, deletionErrors);
recalibrationEngine.updateDataForRead(info);
return 1L;
} else {
return 0L;
}
}
/**
* Compute the number of mutational events across all hasEvent vectors
*
* Simply the sum of entries in hasEvents
*
* @param hasEvents a vector of vectors of 0 (no event) and 1 (has event)
* @return the total number of events across all hasEvent arrays
*/
private int nEvents(final int[]... hasEvents) {
int n = 0;
for ( final int[] hasEvent : hasEvents ) {
n += MathUtils.sum(hasEvent);
}
return n;
}
protected boolean[] calculateSkipArray( final GATKSAMRecord read, final RefMetaDataTracker metaDataTracker ) {
final byte[] bases = read.getReadBases();
final boolean[] skip = new boolean[bases.length];
@ -371,7 +396,6 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
throw new ReviewedStingException("Array length mismatch detected. Malformed read?");
}
final byte NO_BAQ_UNCERTAINTY = (byte)'@';
final int BLOCK_START_UNSET = -1;
final double[] fractionalErrors = new double[baqArray.length];
@ -415,8 +439,24 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
}
}
/**
* Create a BAQ style array that indicates no alignment uncertainty
* @param read the read for which we want a BAQ array
* @return a BAQ-style non-null byte[] containing NO_BAQ_UNCERTAINTY values
* // TODO -- could be optimized by inlining this into the calculation code above, avoiding this function entirely
*/
private byte[] flatBAQArray(final GATKSAMRecord read) {
final byte[] baq = new byte[read.getReadLength()];
Arrays.fill(baq, NO_BAQ_UNCERTAINTY);
return baq;
}
/**
* Compute an actual BAQ array for read, based on its quals and the reference sequence
* @param read the read to BAQ
* @return a non-null BAQ tag array for read
*/
private byte[] calculateBAQArray( final GATKSAMRecord read ) {
// todo -- it would be good to directly use the BAQ qualities rather than encoding and decoding the result and using the special @ value
baq.baqRead(read, referenceReader, BAQ.CalculationMode.RECALCULATE, BAQ.QualityMode.ADD_TAG);
return BAQ.getBAQTag(read);
}

View File

@ -0,0 +1,162 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.walkers.bqsr;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.broadinstitute.sting.utils.QualityUtils;
import org.broadinstitute.sting.utils.recalibration.EventType;
import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
/**
* Created with IntelliJ IDEA.
* User: depristo
* Date: 12/18/12
* Time: 3:50 PM
*
* TODO -- merge in ReadCovariates?
*/
public final class ReadRecalibrationInfo {
    private final GATKSAMRecord read;
    private final int length;                 // number of bases in the read; all per-base arrays must match this
    private final ReadCovariates covariates;  // offset -> covariate key sets for this read
    private final boolean[] skips;            // per-base: should this offset be skipped (e.g. known variation site)?
    private final byte[] baseQuals, insertionQuals, deletionQuals;
    private final double[] snpErrors, insertionErrors, deletionErrors;

    /**
     * Create a recalibration info record bundling a read with its covariate keys,
     * per-base skip flags, and per-base fractional error weights.
     *
     * @param read the read being recalibrated; must not be null
     * @param covariates the covariate key sets computed for this read; must not be null
     * @param skips per-base skip flags, same length as the read's base qualities; must not be null
     * @param snpErrors per-base fractional SNP error weights, same length; must not be null
     * @param insertionErrors per-base fractional insertion error weights, or null
     * @param deletionErrors per-base fractional deletion error weights, or null
     * @throws IllegalArgumentException if a required argument is null or an array length mismatches the read length
     */
    public ReadRecalibrationInfo(final GATKSAMRecord read,
                                 final ReadCovariates covariates,
                                 final boolean[] skips,
                                 final double[] snpErrors,
                                 final double[] insertionErrors,
                                 final double[] deletionErrors) {
        if ( read == null ) throw new IllegalArgumentException("read cannot be null");
        if ( covariates == null ) throw new IllegalArgumentException("covariates cannot be null");
        if ( skips == null ) throw new IllegalArgumentException("skips cannot be null");
        if ( snpErrors == null ) throw new IllegalArgumentException("snpErrors cannot be null");
        // future: may allow insertionErrors && deletionErrors to be null, so don't enforce

        this.read = read;
        this.baseQuals = read.getBaseQualities();
        this.length = baseQuals.length;
        this.covariates = covariates;
        this.skips = skips;
        this.insertionQuals = read.getExistingBaseInsertionQualities();
        this.deletionQuals = read.getExistingBaseDeletionQualities();
        this.snpErrors = snpErrors;
        this.insertionErrors = insertionErrors;
        this.deletionErrors = deletionErrors;

        // bugfix: each message now reports its own array's length (previously all reported snpErrors.length)
        if ( skips.length != length ) throw new IllegalArgumentException("skips.length " + skips.length + " != length " + length);
        if ( snpErrors.length != length ) throw new IllegalArgumentException("snpErrors.length " + snpErrors.length + " != length " + length);
        if ( insertionErrors != null && insertionErrors.length != length ) throw new IllegalArgumentException("insertionErrors.length " + insertionErrors.length + " != length " + length);
        if ( deletionErrors != null && deletionErrors.length != length ) throw new IllegalArgumentException("deletionErrors.length " + deletionErrors.length + " != length " + length);
    }

    /**
     * Get the qual score for event type at offset
     *
     * @param eventType the type of event we want the qual for
     * @param offset the offset into this read for the qual
     * @return a valid quality score for event at offset
     */
    @Requires("validOffset(offset)")
    @Ensures("validQual(result)")
    public byte getQual(final EventType eventType, final int offset) {
        switch ( eventType ) {
            case BASE_SUBSTITUTION: return baseQuals[offset];
            // note optimization here -- if we don't have ins/del quals we just return the default byte directly
            case BASE_INSERTION: return insertionQuals == null ? GATKSAMRecord.DEFAULT_INSERTION_DELETION_QUAL : insertionQuals[offset];
            case BASE_DELETION: return deletionQuals == null ? GATKSAMRecord.DEFAULT_INSERTION_DELETION_QUAL : deletionQuals[offset];
            default: throw new IllegalStateException("Unknown event type " + eventType);
        }
    }

    /**
     * Get the error fraction for event type at offset
     *
     * The error fraction is a value between 0 and 1 that indicates how much certainty we have
     * in the error occurring at offset. A value of 1 means that the error definitely occurs at this
     * site, a value of 0.0 means it definitely doesn't happen here. 0.5 means that half the weight
     * of the error belongs here
     *
     * @param eventType the type of event we want the error fraction for
     * @param offset the offset into this read for the error fraction
     * @return a fractional weight for an error at this offset
     */
    @Requires("validOffset(offset)")
    @Ensures("result >= 0.0 && result <= 1.0")
    public double getErrorFraction(final EventType eventType, final int offset) {
        switch ( eventType ) {
            case BASE_SUBSTITUTION: return snpErrors[offset];
            case BASE_INSERTION: return insertionErrors[offset];
            case BASE_DELETION: return deletionErrors[offset];
            default: throw new IllegalStateException("Unknown event type " + eventType);
        }
    }

    /**
     * Get the read involved in this recalibration info
     * @return a non-null GATKSAMRecord
     */
    @Ensures("result != null")
    public GATKSAMRecord getRead() {
        return read;
    }

    /**
     * Should offset in this read be skipped (because it's covered by a known variation site?)
     * @param offset a valid offset into this info
     * @return true if offset should be skipped, false otherwise
     */
    @Requires("validOffset(offset)")
    public boolean skip(final int offset) {
        return skips[offset];
    }

    /**
     * Get the ReadCovariates object carrying the mapping from offsets -> covariate key sets
     * @return a non-null ReadCovariates object
     */
    @Ensures("result != null")
    public ReadCovariates getCovariatesValues() {
        return covariates;
    }

    /**
     * Ensures an offset is valid. Used in contracts
     * @param offset a proposed offset
     * @return true if offset is valid w.r.t. the data in this object, false otherwise
     */
    private boolean validOffset(final int offset) {
        return offset >= 0 && offset < baseQuals.length;
    }

    /**
     * Is the given qual within the legal range [0, MAX_QUAL_SCORE]? Used in contracts
     */
    private boolean validQual(final byte result) {
        return result >= 0 && result <= QualityUtils.MAX_QUAL_SCORE;
    }
}

View File

@ -1,5 +1,6 @@
package org.broadinstitute.sting.gatk.walkers.bqsr;
import com.google.java.contract.Requires;
import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
@ -29,10 +30,31 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
* OTHER DEALINGS IN THE SOFTWARE.
*/
public interface RecalibrationEngine {
/**
* Initialize the recalibration engine
*
* Called once before any calls to updateDataForRead are made. The engine should prepare itself
* to handle any number of updateDataForRead calls containing ReadRecalibrationInfo containing
* keys for each of the covariates provided.
*
* The engine should collect match and mismatch data into the recalibrationTables data.
*
* @param covariates an array of the covariates we'll be using in this engine, order matters
* @param recalibrationTables the destination recalibrationTables where stats should be collected
*/
public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables);
public void updateDataForRead(final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors);
/**
* Update the recalibration statistics using the information in recalInfo
* @param recalInfo data structure holding information about the recalibration values for a single read
*/
@Requires("recalInfo != null")
public void updateDataForRead(final ReadRecalibrationInfo recalInfo);
/**
* Finalize, if appropriate, all derived data in recalibrationTables.
*
* Called once after all calls to updateDataForRead have been issued.
*/
public void finalizeData();
}

View File

@ -35,34 +35,37 @@ import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
public class StandardRecalibrationEngine implements RecalibrationEngine, PublicPackageSource {
protected Covariate[] covariates;
protected RecalibrationTables recalibrationTables;
@Override
public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables) {
if ( covariates == null ) throw new IllegalArgumentException("Covariates cannot be null");
if ( recalibrationTables == null ) throw new IllegalArgumentException("recalibrationTables cannot be null");
this.covariates = covariates.clone();
this.recalibrationTables = recalibrationTables;
}
@Override
public void updateDataForRead( final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors ) {
public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
final GATKSAMRecord read = recalInfo.getRead();
final EventType eventType = EventType.BASE_SUBSTITUTION;
final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
if( !skip[offset] ) {
final ReadCovariates readCovariates = covariateKeySetFrom(read);
if( ! recalInfo.skip(offset) ) {
final byte qual = recalInfo.getQual(eventType, offset);
final double isError = recalInfo.getErrorFraction(eventType, offset);
final int[] keys = readCovariates.getKeySet(offset, eventType);
final byte qual = read.getBaseQualities()[offset];
final double isError = snpErrors[offset];
final int[] keys = readCovariates.getKeySet(offset, EventType.BASE_SUBSTITUTION);
final int eventIndex = EventType.BASE_SUBSTITUTION.index;
incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventIndex);
incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventType.index);
for (int i = 2; i < covariates.length; i++) {
if (keys[i] < 0)
continue;
incrementDatumOrPutIfNecessary(recalibrationTables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventIndex);
incrementDatumOrPutIfNecessary(recalibrationTables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventType.index);
}
}
}
@ -79,16 +82,6 @@ public class StandardRecalibrationEngine implements RecalibrationEngine, PublicP
return new RecalDatum(1, isError, reportedQual);
}
/**
 * Fetch the covariate key set previously computed and cached on a read.
 *
 * @param read the read whose cached covariate values we want
 * @return the ReadCovariates stored under the covariates temporary attribute
 */
protected ReadCovariates covariateKeySetFrom(GATKSAMRecord read) {
    final Object cachedCovariates = read.getTemporaryAttribute(BaseRecalibrator.COVARS_ATTRIBUTE);
    return (ReadCovariates) cachedCovariates;
}
/**
* Create derived recalibration data tables
*
@ -129,7 +122,10 @@ public class StandardRecalibrationEngine implements RecalibrationEngine, PublicP
* @param isError error value for this event
* @param keys location in table of our item
*/
protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray<RecalDatum> table, final byte qual, final double isError, final int... keys ) {
protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray<RecalDatum> table,
final byte qual,
final double isError,
final int... keys ) {
final RecalDatum existingDatum = table.get(keys);
if ( existingDatum == null ) {

View File

@ -30,6 +30,7 @@ import org.broadinstitute.sting.utils.Haplotype;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.sting.utils.QualityUtils;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileup;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.broadinstitute.variant.variantcontext.Allele;
@ -421,7 +422,7 @@ public class HaplotypeIndelErrorModel {
double[][] haplotypeLikehoodMatrix = new double[haplotypesInVC.size()][haplotypesInVC.size()];
double readLikelihoods[][] = new double[pileup.getReads().size()][haplotypesInVC.size()];
int i=0;
for (SAMRecord read : pileup.getReads()) {
for (GATKSAMRecord read : pileup.getReads()) {
if(ReadUtils.is454Read(read)) {
continue;
}

View File

@ -529,7 +529,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
sawReadInCurrentInterval = false;
}
private boolean doNotTryToClean(SAMRecord read) {
private boolean doNotTryToClean(GATKSAMRecord read) {
return read.getReadUnmappedFlag() ||
read.getNotPrimaryAlignmentFlag() ||
read.getReadFailsVendorQualityCheckFlag() ||
@ -835,7 +835,7 @@ public class IndelRealigner extends ReadWalker<Integer, Integer> {
// TODO -- get rid of this try block when Picard does the right thing for reads aligned off the end of the reference
try {
if ( read.getAttribute(SAMTag.NM.name()) != null )
read.setAttribute(SAMTag.NM.name(), SequenceUtil.calculateSamNmTag(read, reference, leftmostIndex-1));
read.setAttribute(SAMTag.NM.name(), SequenceUtil.calculateSamNmTag(read, reference, leftmostIndex - 1));
if ( read.getAttribute(SAMTag.UQ.name()) != null )
read.setAttribute(SAMTag.UQ.name(), SequenceUtil.sumQualitiesOfMismatches(read, reference, leftmostIndex-1));
} catch (Exception e) {

View File

@ -24,8 +24,7 @@
package org.broadinstitute.sting.utils;
import net.sf.samtools.SAMReadGroupRecord;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
/**
* A canonical, master list of the standard NGS platforms. These values
@ -64,25 +63,15 @@ public enum NGSPlatform {
}
/**
* Convenience constructor -- calculates the NGSPlatform from a SAMRecord.
* Note you should not use this function if you have a GATKSAMRecord -- use the
* accessor method instead.
* Convenience get -- get the NGSPlatform from a SAMRecord.
*
* @param read
* Just gets the platform from the GATKReadGroupRecord associated with this read.
*
* @param read a GATKSAMRecord
* @return an NGSPlatform object matching the PL field of the header, of UNKNOWN if there was no match
*/
public static final NGSPlatform fromRead(SAMRecord read) {
return fromReadGroup(read.getReadGroup());
}
/**
* Returns the NGSPlatform corresponding to the PL tag in the read group
* @param rg
* @return an NGSPlatform object matching the PL field of the header, of UNKNOWN if there was no match
*/
public static final NGSPlatform fromReadGroup(SAMReadGroupRecord rg) {
if ( rg == null ) return UNKNOWN;
return fromReadGroupPL(rg.getPlatform());
public static NGSPlatform fromRead(GATKSAMRecord read) {
return read.getReadGroup().getNGSPlatform();
}
/**
@ -90,7 +79,7 @@ public enum NGSPlatform {
* @param plFromRG -- the PL field (or equivalent) in a ReadGroup object
* @return an NGSPlatform object matching the PL field of the header, or UNKNOWN if there was no match
*/
public static final NGSPlatform fromReadGroupPL(final String plFromRG) {
public static NGSPlatform fromReadGroupPL(final String plFromRG) {
if ( plFromRG == null ) return UNKNOWN;
// todo -- algorithm could be implemented more efficiently, as the list of all
@ -113,7 +102,7 @@ public enum NGSPlatform {
* @param platform the read group string that describes the platform used
* @return true if the platform is known (i.e. it's in the list and is not UNKNOWN)
*/
public static final boolean isKnown (final String platform) {
public static final boolean isKnown(final String platform) {
return fromReadGroupPL(platform) != UNKNOWN;
}
}

View File

@ -54,6 +54,7 @@ public class ExpandingArrayList<E> extends ArrayList<E> {
private void maybeExpand(int index, E value) {
if ( index >= size() ) {
ensureCapacity(index+1); // make sure we have space to hold at least index + 1 elements
// We need to add null items until we can safely set index to element
for ( int i = size(); i <= index; i++ )
add(value);

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;
@ -37,12 +62,6 @@ class InputProducer<InputType> {
int nRead = 0;
int inputID = -1;
/**
* A latch used to block threads that want to start up only when all of the values
* in inputReader have been read by the thread executing run()
*/
final CountDownLatch latch = new CountDownLatch(1);
public InputProducer(final Iterator<InputType> inputReader) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
this.inputReader = inputReader;

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**
@ -25,14 +50,6 @@ class MapResult<MapType> extends EOFMarkedValue<MapType> implements Comparable<M
if ( jobID < 0 ) throw new IllegalArgumentException("JobID must be >= 0");
}
/**
 * Create the EOF marker version of MapResult
 *
 * The EOF marker carries no map value; its jobID of Integer.MAX_VALUE
 * presumably makes it order after every real map result — confirm against
 * this class's compareTo implementation.
 */
MapResult() {
    super();
    this.jobID = Integer.MAX_VALUE;
}
/**
* @return the job ID of the map job that produced this MapResult
*/

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.utils.collections.ExpandingArrayList;
/**
 * A queue of MapResult objects indexed by job ID, supporting strictly in-order
 * consumption of map results (job 0, then 1, then 2, ...).
 *
 * Created with IntelliJ IDEA.
 * User: depristo
 * Date: 12/19/12
 * Time: 3:53 PM
 *
 * This class makes some critical assumptions. First is that the jobID of the first
 * job is 0. If this isn't true the MapResultsQueue will certainly fail.
 */
public class MapResultsQueue<MapType> {
    //private final static boolean DEBUG = false;
    //private final static Logger logger = Logger.getLogger(MapResultsQueue.class);

    /**
     * Although naturally stored as priority blocking queue, this is actually quite expensive
     * due to the O(n log n) sorting calculation. Since we know that the job ids start
     * at 0 and increment by 1 in each successive job, we store an array instead. The
     * array is indexed by jobID, and contains the MapResult for that job id. Because elements
     * can be added to the queue in any order, we need to use an expanding array list to
     * store the elements.
     */
    final ExpandingArrayList<MapResult<MapType>> queue = new ExpandingArrayList<MapResult<MapType>>(10000);

    /**
     * The jobID of the last job we've seen.  -1 means no job has been taken yet,
     * so the first job we expect is jobID 0.
     */
    int prevJobID = -1; // no jobs observed

    /**
     * Put mapResult into this MapResultsQueue, associated with its jobID
     *
     * @param mapResult a non-null map result
     * @throws IllegalArgumentException if mapResult is null
     */
    public synchronized void put(final MapResult<MapType> mapResult) {
        if ( mapResult == null ) throw new IllegalArgumentException("mapResult cannot be null");
        // make sure that nothing is at the job id for map
        // NOTE(review): relies on ExpandingArrayList.get returning null past the end — confirm
        assert queue.size() < mapResult.getJobID() || queue.get(mapResult.getJobID()) == null;
        queue.set(mapResult.getJobID(), mapResult);
    }

    /**
     * Should we reduce the next value in the mapResultQueue?
     *
     * @return true if the result for the next expected job id has already been put
     */
    public synchronized boolean nextValueIsAvailable() {
        final MapResult<MapType> nextMapResult = queue.get(nextJobID());
        if ( nextMapResult == null ) {
            // natural case -- the next job hasn't had a value added yet
            return false;
        } else if ( nextMapResult.getJobID() != nextJobID() ) {
            // sanity check -- the job id at next isn't the one we expect
            throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not == previous job id " + prevJobID + " + 1");
        } else {
            // there's a value at the next job id, so return true
            return true;
        }
    }

    /**
     * Get the next job ID we'd expect to see given our previous job id
     * @return the next job id we'd fetch to reduce
     */
    private int nextJobID() {
        return prevJobID + 1;
    }

    /**
     * Can only be called when nextValueIsAvailable is true
     *
     * @return the MapResult for the next expected job id
     * @throws InterruptedException
     */
    // TODO -- does this have to be synchronized? -- I think the answer is no
    public synchronized MapResult<MapType> take() throws InterruptedException {
        final MapResult<MapType> result = queue.get(nextJobID());
        // make sure the value we've fetched has the right id
        assert result.getJobID() == nextJobID();
        prevJobID = result.getJobID();
        queue.set(prevJobID, null); // drop our reference to the consumed result
        return result;
    }
}

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
/**

View File

@ -1,3 +1,28 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
@ -43,13 +68,20 @@ import java.util.concurrent.*;
public class NanoScheduler<InputType, MapType, ReduceType> {
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
private final static boolean LOG_MAP_TIMES = false;
protected final static int UPDATE_PROGRESS_FREQ = 100;
/**
* Currently not used, but kept because it's conceptual reasonable to have a buffer
*/
final int bufferSize;
/**
* The number of threads we're using to execute the map jobs in this nano scheduler
*/
final int nThreads;
final ExecutorService masterExecutor;
final ExecutorService mapExecutor;
final Semaphore runningMapJobSlots;
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
boolean shutdown = false;
@ -75,11 +107,9 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
if ( nThreads == 1 ) {
this.mapExecutor = this.masterExecutor = null;
runningMapJobSlots = null;
} else {
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d"));
runningMapJobSlots = new Semaphore(this.bufferSize);
}
}
@ -243,8 +273,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// map
final MapType mapValue = map.apply(input);
if ( progressFunction != null )
progressFunction.progress(input);
updateProgress(i++, input);
// reduce
sum = reduce.apply(mapValue, sum);
@ -254,6 +283,16 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
return sum;
}
/**
* Maybe update the progress meter (maybe because we don't want to do so so often that it costs cpu time)
* @param counter increasing counter to use to cut down on updates
* @param input the input we're currently at
*/
private void updateProgress(final int counter, final InputType input) {
if ( progressFunction != null && counter % UPDATE_PROGRESS_FREQ == 0 )
progressFunction.progress(input);
}
/**
* Efficient parallel version of Map/Reduce
*
@ -349,33 +388,23 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
// Create the input producer and start it running
final InputProducer<InputType> inputProducer = new InputProducer<InputType>(inputReader);
// a priority queue that stores up to bufferSize elements
// produced by completed map jobs.
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue =
new PriorityBlockingQueue<MapResult<MapType>>();
// create the MapResultsQueue to store results of map jobs.
final MapResultsQueue<MapType> mapResultQueue = new MapResultsQueue<MapType>();
final Reducer<MapType, ReduceType> reducer
= new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
// create the reducer we'll use for this nano scheduling run
final Reducer<MapType, ReduceType> reducer = new Reducer<MapType, ReduceType>(reduce, errorTracker, initialValue);
final CountDownLatch runningMapJobs = new CountDownLatch(nThreads);
try {
int nSubmittedJobs = 0;
while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
// acquire a slot to run a map job. Blocks if too many jobs are enqueued
runningMapJobSlots.acquire();
mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, map, reducer));
nSubmittedJobs++;
// create and submit the info needed by the read/map/reduce threads to do their work
for ( int i = 0; i < nThreads; i++ ) {
mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, runningMapJobs, map, reducer));
}
// mark the last job id we've submitted so we know the id to wait for
//logger.warn("setting jobs submitted to " + nSubmittedJobs);
reducer.setTotalJobCount(nSubmittedJobs);
// wait for all of the input and map threads to finish
return waitForCompletion(inputProducer, reducer);
return waitForCompletion(mapResultQueue, runningMapJobs, reducer);
} catch (Throwable ex) {
// logger.warn("Reduce job got exception " + ex);
errorTracker.notifyOfError(ex);
return initialValue;
}
@ -384,52 +413,40 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
/**
* Wait until the input thread and all map threads have completed running, and return the final reduce result
*/
private ReduceType waitForCompletion(final InputProducer<InputType> inputProducer,
private ReduceType waitForCompletion(final MapResultsQueue<MapType> mapResultsQueue,
final CountDownLatch runningMapJobs,
final Reducer<MapType, ReduceType> reducer) throws InterruptedException {
// wait until we have a final reduce result
// logger.warn("waiting for final reduce");
final ReduceType finalSum = reducer.waitForFinalReduce();
// wait for all the map threads to finish by waiting on the runningMapJobs latch
runningMapJobs.await();
// wait for all the map threads to finish by acquiring and then releasing all map job semaphores
// logger.warn("waiting on map");
runningMapJobSlots.acquire(bufferSize);
runningMapJobSlots.release(bufferSize);
// do a final reduce here. This is critically important because the InputMapReduce jobs
// no longer block on reducing, so it's possible for all the threads to end with a few
// reduce jobs on the queue still to do. This call ensures that we reduce everything
reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
// wait until we have a final reduce result
final ReduceType finalSum = reducer.getReduceResult();
// everything is finally shutdown, return the final reduce value
return finalSum;
}
/**
 * Should we continue to submit jobs given the number of jobs already submitted and the
 * number of read items in inputProducer?
 *
 * We continue to submit jobs while inputProducer hasn't reached EOF or the number
 * of jobs we've enqueued isn't the number of read elements. This means that in
 * some cases we submit more jobs than total read elements (cannot know because of
 * multi-threading) so map jobs must handle the case where getNext() returns EOF.
 *
 * @param nJobsSubmitted the number of map jobs submitted so far
 * @param inputProducer the producer supplying input values
 * @return true if another map job should be submitted
 */
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
    final int totalReadItems = inputProducer.getNumInputValues();
    // -1 signals the producer hasn't finished reading, so we cannot bound the job count yet
    final boolean stillReadingInput = totalReadItems == -1;
    return stillReadingInput || nJobsSubmitted < totalReadItems;
}
}
private class ReadMapReduceJob implements Runnable {
final InputProducer<InputType> inputProducer;
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue;
final MapResultsQueue<MapType> mapResultQueue;
final NSMapFunction<InputType, MapType> map;
final Reducer<MapType, ReduceType> reducer;
final CountDownLatch runningMapJobs;
private ReadMapReduceJob(final InputProducer<InputType> inputProducer,
final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue,
final MapResultsQueue<MapType> mapResultQueue,
final CountDownLatch runningMapJobs,
final NSMapFunction<InputType, MapType> map,
final Reducer<MapType, ReduceType> reducer) {
this.inputProducer = inputProducer;
this.mapResultQueue = mapResultQueue;
this.runningMapJobs = runningMapJobs;
this.map = map;
this.reducer = reducer;
}
@ -437,39 +454,41 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
@Override
public void run() {
try {
// get the next item from the input producer
final InputProducer<InputType>.InputValue inputWrapper = inputProducer.next();
boolean done = false;
while ( ! done ) {
// get the next item from the input producer
final InputProducer<InputType>.InputValue inputWrapper = inputProducer.next();
// depending on inputWrapper, actually do some work or not, putting result input result object
final MapResult<MapType> result;
if ( ! inputWrapper.isEOFMarker() ) {
// just skip doing anything if we don't have work to do, which is possible
// because we don't necessarily know how much input there is when we queue
// up our jobs
final InputType input = inputWrapper.getValue();
// depending on inputWrapper, actually do some work or not, putting result input result object
final MapResult<MapType> result;
if ( ! inputWrapper.isEOFMarker() ) {
// just skip doing anything if we don't have work to do, which is possible
// because we don't necessarily know how much input there is when we queue
// up our jobs
final InputType input = inputWrapper.getValue();
// map
final MapType mapValue = map.apply(input);
// actually execute the map
final MapType mapValue = map.apply(input);
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, inputWrapper.getId());
// enqueue the result into the mapResultQueue
result = new MapResult<MapType>(mapValue, inputWrapper.getId());
if ( progressFunction != null )
progressFunction.progress(input);
} else {
// if there's no input we push empty MapResults with jobIDs for synchronization with Reducer
result = new MapResult<MapType>(inputWrapper.getId());
mapResultQueue.put(result);
// reduce as much as possible, without blocking, if another thread is already doing reduces
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false);
updateProgress(inputWrapper.getId(), input);
} else {
done = true;
}
}
mapResultQueue.put(result);
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Throwable ex) {
errorTracker.notifyOfError(ex);
} finally {
// we finished a map job, release the job queue semaphore
runningMapJobSlots.release();
runningMapJobs.countDown();
}
}
}
}
}

View File

@ -1,39 +1,68 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Reducer supporting two-threaded reduce of the map/reduce.
* Reducer supporting multi-threaded reduce of the map/reduce.
*
* The first thread, using the reduceAsMuchAsPossible function, actually reduces the data
* as it arrives in the blockingQueue.
* reduceAsMuchAsPossible is the key function. Multiple threads can call into this, providing
* the map results queue, and this class accumulates the result of calling reduce
* on the maps objects. reduceAsMuchAsPossible isn't directly synchronized, but manages multi-threading
* directly with a lock. Threads can request either to block on the reduce call until it can be
* executed, or immediately exit if the lock isn't available. That allows multi-threaded users
* to avoid piling up waiting to reduce while one thread is reducing. They can instead immediately
* leave to go do something else productive
*
* The second thread, using the waitForFinalReduce, can block on this data structure
* until that all jobs have arrived and been reduced.
*
* The key function for communication here is setTotalJobCount(), which the thread that submits
* jobs that enqueue MapResults into the blocking queue must call ONCE to tell the
* Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs
* have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed.
*
* This thread reads from mapResultsQueue until the poison EOF object arrives. At each
* stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the
* queue waits until the mapResultQueue has a value to take. Then, it gets and waits
* until the map result Future has a value.
* @author depristo
* @since 2012
*/
class Reducer<MapType, ReduceType> {
private final static Logger logger = Logger.getLogger(Reducer.class);
private final static int UNSET_NUM_SUBMITTED_JOBS = -2;
final CountDownLatch countDownLatch = new CountDownLatch(1);
final NSReduceFunction<MapType, ReduceType> reduce;
final MultiThreadedErrorTracker errorTracker;
/**
* The reduce function to execute
*/
private final NSReduceFunction<MapType, ReduceType> reduce;
/**
* Used to communicate errors to the outer master thread
*/
private final MultiThreadedErrorTracker errorTracker;
/**
* Lock used to protect the call reduceAsMuchAsPossible from race conditions
*/
private final Lock reduceLock = new ReentrantLock();
/**
* The sum of the reduce function applied to all MapResults. After this Reducer
@ -41,18 +70,6 @@ class Reducer<MapType, ReduceType> {
*/
ReduceType sum;
int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set
/**
* The jobID of the last job we've seen
*/
int prevJobID = -1; // no jobs observed
/**
* A counter keeping track of the number of jobs we're reduced
*/
int numJobsReduced = 0;
/**
* Create a new Reducer that will apply the reduce function with initialSum value
* to values via reduceAsMuchAsPossible, timing the reduce function call costs with
@ -72,26 +89,6 @@ class Reducer<MapType, ReduceType> {
this.sum = initialSum;
}
/**
 * Should we reduce the next value in the mapResultQueue?
 *
 * Reduction must consume map results strictly in job-ID order, so the head of the
 * queue is only ready when its job ID is exactly prevJobID + 1.
 *
 * @param mapResultQueue the queue of map results
 * @return true if the head of the queue is the next job in sequence and should be reduced now
 * @throws IllegalStateException if the head's job ID is &lt;= prevJobID (out-of-order or duplicate job)
 */
@Requires("mapResultQueue != null")
private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
    // peek, don't take: if the next-in-order result hasn't arrived yet we leave the head in place
    final MapResult<MapType> nextMapResult = mapResultQueue.peek();
    if ( nextMapResult == null ) {
        // queue is empty, nothing to reduce right now
        return false;
    } else if ( nextMapResult.getJobID() < prevJobID + 1 ) {
        // a job ID at or below prevJobID should already have been consumed -- invariant violation.
        // NOTE(review): the message text ("is not < previous job id") reads inverted; the violated
        // invariant is that the next job ID must be > prevJobID -- confirm and fix the wording
        throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not < previous job id " + prevJobID);
    } else if ( nextMapResult.getJobID() == prevJobID + 1 ) {
        // exactly the next job in sequence: safe to reduce
        return true;
    } else {
        // a later job arrived early; wait until the missing intermediate job(s) show up
        return false;
    }
}
/**
* Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed
*
@ -104,97 +101,69 @@ class Reducer<MapType, ReduceType> {
* @throws InterruptedException
*/
@Ensures("result >= 0")
public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue<MapResult<MapType>> mapResultQueue) {
public int reduceAsMuchAsPossible(final MapResultsQueue<MapType> mapResultQueue, final boolean waitForLock) {
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReducesNow = 0;
// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
final boolean haveLock = acquireReduceLock(waitForLock);
try {
while ( reduceNextValueInQueue(mapResultQueue) ) {
final MapResult<MapType> result = mapResultQueue.take();
prevJobID = result.getJobID();
if ( haveLock ) {
while ( mapResultQueue.nextValueIsAvailable() ) {
final MapResult<MapType> result = mapResultQueue.take();
if ( ! result.isEOFMarker() ) {
nReducesNow++;
if ( ! result.isEOFMarker() ) {
nReducesNow++;
// apply reduce, keeping track of sum
sum = reduce.apply(result.getValue(), sum);
// apply reduce, keeping track of sum
sum = reduce.apply(result.getValue(), sum);
}
}
numJobsReduced++;
maybeReleaseLatch();
}
} catch (Exception ex) {
errorTracker.notifyOfError(ex);
countDownLatch.countDown();
} finally {
if ( haveLock ) // if we acquired the lock, unlock it
releaseReduceLock();
}
// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
return nReducesNow;
}
/**
* release the latch if appropriate
* Acquire the reduce lock, either returning immediately if not possible or blocking until the lock is available
*
* Appropriate means we've seen the last job, or there's only a single job id
* @param blockUntilAvailable if true, we will block until the lock is available, otherwise we return immediately
* without acquiring the lock
* @return true if the lock has been acquired, false otherwise
*/
private synchronized void maybeReleaseLatch() {
if ( numJobsReduced == numSubmittedJobs ) {
// either we've already seen the last one prevJobID == numSubmittedJobs or
// the last job ID is -1, meaning that no jobs were ever submitted
countDownLatch.countDown();
/**
 * Acquire the reduce lock, either blocking until it is available or returning immediately.
 *
 * @param blockUntilAvailable if true, block until the lock is available; if false, return
 *                            immediately without waiting for the lock
 * @return true if the lock has been acquired, false otherwise
 */
protected boolean acquireReduceLock(final boolean blockUntilAvailable) {
    if ( blockUntilAvailable ) {
        reduceLock.lock(); // blocks; always succeeds once it returns
        return true;
    } else {
        // non-blocking attempt: lets callers go do other work when another thread is already reducing
        return reduceLock.tryLock();
    }
}
/**
* For testing only
* Free the reduce lock.
*
* @return true if latch is released
* Assumes that the invoking thread actually previously acquired the lock (it's a problem if not).
*/
protected synchronized boolean latchIsReleased() {
return countDownLatch.getCount() == 0;
protected void releaseReduceLock() {
reduceLock.unlock();
}
/**
* Key function: tell this class the total number of jobs will provide data in the mapResultsQueue
* Get the current reduce result resulting from applying reduce(...) to all MapResult elements.
*
* The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs
* MapResults from the queue, those threads are released.
*
* Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints.
* First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no
* data coming) so the latch should be opened immediately. If it's >= 1, we will wait until
* we see numOfSubmittedJobs jobs before freeing them.
*
* Note that we throw an IllegalStateException if this function is called twice.
*
* @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will
* enqueue results into our queue
*/
/**
 * Key function: tell this class the total number of jobs that will provide data in the mapResultsQueue.
 *
 * May be called at most once. A count of 0 means no data is ever coming, in which case
 * the latch gating waitForFinalReduce is released immediately by the maybeReleaseLatch()
 * call below.
 *
 * @param numOfSubmittedJobs total number of MapResults that will enqueue results; must be >= 0
 * @throws IllegalArgumentException if numOfSubmittedJobs is negative
 * @throws IllegalStateException if called more than once
 */
public synchronized void setTotalJobCount(final int numOfSubmittedJobs) {
    if ( numOfSubmittedJobs < 0 )
        throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs);
    if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS)
        // NOTE(review): the message mentions "setlastJobID" -- looks like a stale method name; confirm
        throw new IllegalStateException("setlastJobID called multiple times, but should only be called once");
    this.numSubmittedJobs = numOfSubmittedJobs;
    // the required count may already have been reached (or be 0), so check right away
    maybeReleaseLatch();
}
/**
* Block until the last job has submitted its MapResult to our queue, and we've reduced it, and
* return the reduce result resulting from applying reduce(...) to all MapResult elements.
* Note that this method cannot know if future reduce calls are coming in. So it simply gets
* the current reduce result. It is up to the caller to know whether the returned value is
* a partial result, or the full final value
*
* @return the total reduce result across all jobs
* @throws InterruptedException
*/
public ReduceType waitForFinalReduce() throws InterruptedException {
//logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs);
countDownLatch.await();
//logger.warn(" done waitForFinalReduce");
public ReduceType getReduceResult() {
return sum;
}
}

View File

@ -135,14 +135,6 @@ public class RecalDatum {
this.estimatedQReported = estimatedQReported;
}
/**
 * Create a RecalDatum with randomly drawn observation, error, and quality values.
 *
 * Intended for testing only.
 *
 * @param maxObservations exclusive upper bound on the number of observations; must be > 0
 *                        (Random.nextInt throws IllegalArgumentException for a bound <= 0)
 * @param maxErrors exclusive upper bound on the number of errors; must be > 0
 * @return a freshly allocated RecalDatum with random contents
 */
public static RecalDatum createRandomRecalDatum(int maxObservations, int maxErrors) {
    final Random random = new Random();
    final int nObservations = random.nextInt(maxObservations);
    final int nErrors = random.nextInt(maxErrors);
    // quality drawn uniformly in [0, MAX_QUAL_SCORE)
    final int qual = random.nextInt(QualityUtils.MAX_QUAL_SCORE);
    return new RecalDatum(nObservations, nErrors, (byte)qual);
}
public final double getEstimatedQReported() {
return estimatedQReported;
}
@ -212,49 +204,49 @@ public class RecalDatum {
//
//---------------------------------------------------------------------------------------------------------------
public double getNumObservations() {
public final double getNumObservations() {
return numObservations;
}
public synchronized void setNumObservations(final double numObservations) {
public final synchronized void setNumObservations(final double numObservations) {
if ( numObservations < 0 ) throw new IllegalArgumentException("numObservations < 0");
this.numObservations = numObservations;
empiricalQuality = UNINITIALIZED;
}
public double getNumMismatches() {
public final double getNumMismatches() {
return numMismatches;
}
@Requires({"numMismatches >= 0"})
public synchronized void setNumMismatches(final double numMismatches) {
public final synchronized void setNumMismatches(final double numMismatches) {
if ( numMismatches < 0 ) throw new IllegalArgumentException("numMismatches < 0");
this.numMismatches = numMismatches;
empiricalQuality = UNINITIALIZED;
}
@Requires({"by >= 0"})
public synchronized void incrementNumObservations(final double by) {
public final synchronized void incrementNumObservations(final double by) {
numObservations += by;
empiricalQuality = UNINITIALIZED;
}
@Requires({"by >= 0"})
public synchronized void incrementNumMismatches(final double by) {
public final synchronized void incrementNumMismatches(final double by) {
numMismatches += by;
empiricalQuality = UNINITIALIZED;
}
@Requires({"incObservations >= 0", "incMismatches >= 0"})
@Ensures({"numObservations == old(numObservations) + incObservations", "numMismatches == old(numMismatches) + incMismatches"})
public synchronized void increment(final double incObservations, final double incMismatches) {
public final synchronized void increment(final double incObservations, final double incMismatches) {
numObservations += incObservations;
numMismatches += incMismatches;
empiricalQuality = UNINITIALIZED;
}
@Ensures({"numObservations == old(numObservations) + 1", "numMismatches >= old(numMismatches)"})
public synchronized void increment(final boolean isError) {
public final synchronized void increment(final boolean isError) {
increment(1, isError ? 1 : 0.0);
}

View File

@ -769,4 +769,28 @@ public class RecalUtils {
return base;
}
}
/**
 * Combines the recalibration data for table1 and table2 into table1
 *
 * Note that table1 is the destination, so it is modified in place; table2 is only read.
 *
 * @param table1 the destination table to merge table2 into; must have the same dimensions as table2
 * @param table2 the source table to merge into table1
 * @throws IllegalArgumentException if either table is null or their dimensions differ
 */
public static void combineTables(final NestedIntegerArray<RecalDatum> table1, final NestedIntegerArray<RecalDatum> table2) {
    if ( table1 == null ) throw new IllegalArgumentException("table1 cannot be null");
    if ( table2 == null ) throw new IllegalArgumentException("table2 cannot be null");
    if ( ! Arrays.equals(table1.getDimensions(), table2.getDimensions()))
        throw new IllegalArgumentException("Table1 " + Utils.join(",", table1.getDimensions()) + " not equal to " + Utils.join(",", table2.getDimensions()));
    // walk every populated leaf of the source; keys absent from table2 leave table1 untouched
    for (final NestedIntegerArray.Leaf<RecalDatum> row : table2.getAllLeaves()) {
        final RecalDatum myDatum = table1.get(row.keys);
        if (myDatum == null)
            // no existing entry at these keys: adopt table2's datum directly.
            // NOTE(review): the RecalDatum object is shared (not copied) into table1, so later
            // mutation of table1's entry would also be visible through table2 -- confirm intended
            table1.put(row.value, row.keys);
        else
            // an entry exists: merge the source counts into it
            myDatum.combine(row.value);
    }
}
}

View File

@ -69,19 +69,19 @@ public class RecalibrationReport {
}
protected RecalibrationReport(final QuantizationInfo quantizationInfo, final RecalibrationTables recalibrationTables, final GATKReportTable argumentTable, final RecalibrationArgumentCollection RAC) {
protected RecalibrationReport(final QuantizationInfo quantizationInfo, final RecalibrationTables recalibrationTables, final Covariate[] requestedCovariates, final GATKReportTable argumentTable, final RecalibrationArgumentCollection RAC) {
this.quantizationInfo = quantizationInfo;
this.recalibrationTables = recalibrationTables;
this.requestedCovariates = requestedCovariates;
this.argumentTable = argumentTable;
this.RAC = RAC;
this.requestedCovariates = null;
this.optionalCovariateIndexes = null;
}
/**
* Counts the number of unique read groups in the table
*
* @param reportTable the GATKReport table containing data for this table
* @param reportTable the GATKReport table containing data for this table
* @return the number of unique read groups
*/
private int countReadGroups(final GATKReportTable reportTable) {
@ -105,19 +105,10 @@ public class RecalibrationReport {
* @param other the recalibration report to combine with this one
*/
public void combine(final RecalibrationReport other) {
for ( int tableIndex = 0; tableIndex < recalibrationTables.numTables(); tableIndex++ ) {
final NestedIntegerArray<RecalDatum> myTable = recalibrationTables.getTable(tableIndex);
final NestedIntegerArray<RecalDatum> otherTable = other.recalibrationTables.getTable(tableIndex);
for (final NestedIntegerArray.Leaf row : otherTable.getAllLeaves()) {
final RecalDatum myDatum = myTable.get(row.keys);
if (myDatum == null)
myTable.put((RecalDatum)row.value, row.keys);
else
myDatum.combine((RecalDatum)row.value);
}
RecalUtils.combineTables(myTable, otherTable);
}
}

View File

@ -25,11 +25,13 @@
package org.broadinstitute.sting.utils.recalibration;
import com.google.java.contract.Ensures;
import org.broadinstitute.sting.utils.collections.LoggingNestedIntegerArray;
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import java.io.PrintStream;
import java.util.ArrayList;
/**
* Utility class to facilitate on-the-fly base quality score recalibration.
@ -38,8 +40,7 @@ import java.io.PrintStream;
* Date: 6/20/12
*/
public class RecalibrationTables {
public final class RecalibrationTables {
public enum TableType {
READ_GROUP_TABLE(0),
QUALITY_SCORE_TABLE(1),
@ -52,49 +53,82 @@ public class RecalibrationTables {
}
}
private final NestedIntegerArray[] tables;
private final ArrayList<NestedIntegerArray<RecalDatum>> tables;
private final int qualDimension;
private final int eventDimension = EventType.values().length;
private final int numReadGroups;
private final PrintStream log;
public RecalibrationTables(final Covariate[] covariates) {
this(covariates, covariates[TableType.READ_GROUP_TABLE.index].maximumKeyValue() + 1, null);
}
public RecalibrationTables(final Covariate[] covariates, final PrintStream log) {
this(covariates, covariates[TableType.READ_GROUP_TABLE.index].maximumKeyValue() + 1, log);
}
public RecalibrationTables(final Covariate[] covariates, final int numReadGroups) {
this(covariates, numReadGroups, null);
}
public RecalibrationTables(final Covariate[] covariates, final int numReadGroups, final PrintStream log) {
tables = new NestedIntegerArray[covariates.length];
tables = new ArrayList<NestedIntegerArray<RecalDatum>>(covariates.length);
for ( int i = 0; i < covariates.length; i++ )
tables.add(i, null); // initialize so we can set below
final int qualDimension = covariates[TableType.QUALITY_SCORE_TABLE.index].maximumKeyValue() + 1;
final int eventDimension = EventType.values().length;
qualDimension = covariates[TableType.QUALITY_SCORE_TABLE.index].maximumKeyValue() + 1;
this.numReadGroups = numReadGroups;
this.log = log;
tables.set(TableType.READ_GROUP_TABLE.index,
log == null ? new NestedIntegerArray<RecalDatum>(numReadGroups, eventDimension) :
new LoggingNestedIntegerArray<RecalDatum>(log, "READ_GROUP_TABLE", numReadGroups, eventDimension));
tables.set(TableType.QUALITY_SCORE_TABLE.index, makeQualityScoreTable());
tables[TableType.READ_GROUP_TABLE.index] = log == null ? new NestedIntegerArray<RecalDatum>(numReadGroups, eventDimension) :
new LoggingNestedIntegerArray<RecalDatum>(log, "READ_GROUP_TABLE", numReadGroups, eventDimension);
tables[TableType.QUALITY_SCORE_TABLE.index] = log == null ? new NestedIntegerArray<RecalDatum>(numReadGroups, qualDimension, eventDimension) :
new LoggingNestedIntegerArray<RecalDatum>(log, "QUALITY_SCORE_TABLE", numReadGroups, qualDimension, eventDimension);
for (int i = TableType.OPTIONAL_COVARIATE_TABLES_START.index; i < covariates.length; i++)
tables[i] = log == null ? new NestedIntegerArray<RecalDatum>(numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension) :
new LoggingNestedIntegerArray<RecalDatum>(log, String.format("OPTIONAL_COVARIATE_TABLE_%d", i - TableType.OPTIONAL_COVARIATE_TABLES_START.index + 1),
numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension);
tables.set(i,
log == null ? new NestedIntegerArray<RecalDatum>(numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension) :
new LoggingNestedIntegerArray<RecalDatum>(log, String.format("OPTIONAL_COVARIATE_TABLE_%d", i - TableType.OPTIONAL_COVARIATE_TABLES_START.index + 1),
numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension));
}
@Ensures("result != null")
public NestedIntegerArray<RecalDatum> getReadGroupTable() {
return (NestedIntegerArray<RecalDatum>)tables[TableType.READ_GROUP_TABLE.index];
return getTable(TableType.READ_GROUP_TABLE.index);
}
@Ensures("result != null")
public NestedIntegerArray<RecalDatum> getQualityScoreTable() {
return (NestedIntegerArray<RecalDatum>)tables[TableType.QUALITY_SCORE_TABLE.index];
return getTable(TableType.QUALITY_SCORE_TABLE.index);
}
@Ensures("result != null")
public NestedIntegerArray<RecalDatum> getTable(final int index) {
return (NestedIntegerArray<RecalDatum>)tables[index];
return tables.get(index);
}
@Ensures("result >= 0")
public int numTables() {
return tables.length;
return tables.size();
}
/**
* Allocate a new quality score table, based on requested parameters
* in this set of tables, without any data in it. The return result
* of this table is suitable for acting as a thread-local cache
* for quality score values
* @return a newly allocated, empty read group x quality score table
*/
public NestedIntegerArray<RecalDatum> makeQualityScoreTable() {
return log == null
? new NestedIntegerArray<RecalDatum>(numReadGroups, qualDimension, eventDimension)
: new LoggingNestedIntegerArray<RecalDatum>(log, "QUALITY_SCORE_TABLE", numReadGroups, qualDimension, eventDimension);
}
/**
* Merge in the quality score table information from qualityScoreTable into this
* recalibration table's quality score table.
*
* @param qualityScoreTable the quality score table we want to merge in
*/
public void combineQualityScoreTable(final NestedIntegerArray<RecalDatum> qualityScoreTable) {
RecalUtils.combineTables(getQualityScoreTable(), qualityScoreTable);
}
}

View File

@ -50,7 +50,7 @@ import java.util.EnumSet;
public class CycleCovariate implements StandardCovariate {
private int MAXIMUM_CYCLE_VALUE;
private static final int CUSHION_FOR_INDELS = 4;
public static final int CUSHION_FOR_INDELS = 4;
private String default_platform = null;
private static final EnumSet<NGSPlatform> DISCRETE_CYCLE_PLATFORMS = EnumSet.of(NGSPlatform.ILLUMINA, NGSPlatform.SOLID, NGSPlatform.PACBIO, NGSPlatform.COMPLETE_GENOMICS);

View File

@ -188,6 +188,7 @@ public class ArtificialSAMUtils {
GATKSAMRecord rec = createArtificialRead(header, name, refIndex, alignmentStart, bases.length);
rec.setReadBases(bases);
rec.setBaseQualities(qual);
rec.setReadGroup(new GATKSAMReadGroupRecord("x"));
if (refIndex == -1) {
rec.setReadUnmappedFlag(true);
}

View File

@ -12,9 +12,6 @@ import org.broadinstitute.sting.utils.NGSPlatform;
*
*/
public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
public static final String LANE_TAG = "LN";
// the SAMReadGroupRecord data we're caching
private String mSample = null;
private String mPlatform = null;
@ -33,46 +30,14 @@ public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
super(record.getReadGroupId(), record);
}
public GATKSAMReadGroupRecord(SAMReadGroupRecord record, NGSPlatform pl) {
super(record.getReadGroupId(), record);
setPlatform(pl.getDefaultPlatform());
mNGSPlatform = pl;
retrievedPlatform = retrievedNGSPlatform = true;
}
///////////////////////////////////////////////////////////////////////////////
// *** The following methods are overloaded to cache the appropriate data ***//
///////////////////////////////////////////////////////////////////////////////
public String getSample() {
if ( !retrievedSample ) {
mSample = super.getSample();
retrievedSample = true;
}
return mSample;
}
public void setSample(String s) {
super.setSample(s);
mSample = s;
retrievedSample = true;
}
public String getPlatform() {
if ( !retrievedPlatform ) {
mPlatform = super.getPlatform();
retrievedPlatform = true;
}
return mPlatform;
}
public void setPlatform(String s) {
super.setPlatform(s);
mPlatform = s;
retrievedPlatform = true;
retrievedNGSPlatform = false; // recalculate the NGSPlatform
}
/**
* Get the NGSPlatform enum telling us the platform of this read group
*
* This function call is caching, so subsequent calls to it are free, while
* the first time it's called there's a bit of work to resolve the enum
*
* @return an NGSPlatform enum value
*/
public NGSPlatform getNGSPlatform() {
if ( ! retrievedNGSPlatform ) {
mNGSPlatform = NGSPlatform.fromReadGroupPL(getPlatform());
@ -82,11 +47,40 @@ public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
return mNGSPlatform;
}
public String getLane() {
return this.getAttribute(LANE_TAG);
///////////////////////////////////////////////////////////////////////////////
// *** The following methods are overloaded to cache the appropriate data ***//
///////////////////////////////////////////////////////////////////////////////
@Override
public String getSample() {
if ( !retrievedSample ) {
mSample = super.getSample();
retrievedSample = true;
}
return mSample;
}
public void setLane(String lane) {
this.setAttribute(LANE_TAG, lane);
@Override
public void setSample(String s) {
super.setSample(s);
mSample = s;
retrievedSample = true;
}
@Override
public String getPlatform() {
if ( !retrievedPlatform ) {
mPlatform = super.getPlatform();
retrievedPlatform = true;
}
return mPlatform;
}
@Override
public void setPlatform(String s) {
super.setPlatform(s);
mPlatform = s;
retrievedPlatform = true;
retrievedNGSPlatform = false; // recalculate the NGSPlatform
}
}

View File

@ -25,9 +25,9 @@
package org.broadinstitute.sting.utils.sam;
import net.sf.samtools.*;
import org.broadinstitute.sting.utils.recalibration.EventType;
import org.broadinstitute.sting.utils.NGSPlatform;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.recalibration.EventType;
import java.util.Arrays;
import java.util.HashMap;
@ -56,6 +56,12 @@ public class GATKSAMRecord extends BAMRecord {
public static final String BQSR_BASE_INSERTION_QUALITIES = "BI"; // base qualities for insertions
public static final String BQSR_BASE_DELETION_QUALITIES = "BD"; // base qualities for deletions
/**
* The default quality score for an insertion or deletion, if
* none are provided for this read.
*/
public static final byte DEFAULT_INSERTION_DELETION_QUAL = (byte)45;
// the SAMRecord data we're caching
private String mReadString = null;
private GATKSAMReadGroupRecord mReadGroup = null;
@ -141,16 +147,36 @@ public class GATKSAMRecord extends BAMRecord {
mReadString = s;
}
/**
* Get the GATKSAMReadGroupRecord of this read
* @return a non-null GATKSAMReadGroupRecord
*/
@Override
public GATKSAMReadGroupRecord getReadGroup() {
if ( !retrievedReadGroup ) {
SAMReadGroupRecord tempReadGroup = super.getReadGroup();
mReadGroup = (tempReadGroup == null ? null : new GATKSAMReadGroupRecord(tempReadGroup));
if ( ! retrievedReadGroup ) {
final SAMReadGroupRecord rg = super.getReadGroup();
// three cases: rg may be null (no rg, rg may already be a GATKSAMReadGroupRecord, or it may be
// a regular SAMReadGroupRecord in which case we have to make it a GATKSAMReadGroupRecord
if ( rg == null )
mReadGroup = null;
else if ( rg instanceof GATKSAMReadGroupRecord )
mReadGroup = (GATKSAMReadGroupRecord)rg;
else
mReadGroup = new GATKSAMReadGroupRecord(rg);
retrievedReadGroup = true;
}
return mReadGroup;
}
public void setReadGroup( final GATKSAMReadGroupRecord readGroup ) {
mReadGroup = readGroup;
retrievedReadGroup = true;
setAttribute("RG", mReadGroup.getId()); // todo -- this should be standardized, but we don't have access to SAMTagUtils!
}
@Override
public int hashCode() {
return super.hashCode();
@ -229,7 +255,7 @@ public class GATKSAMRecord extends BAMRecord {
byte [] quals = getExistingBaseInsertionQualities();
if( quals == null ) {
quals = new byte[getBaseQualities().length];
Arrays.fill(quals, (byte) 45); // Some day in the future when base insertion and base deletion quals exist the samtools API will
Arrays.fill(quals, DEFAULT_INSERTION_DELETION_QUAL); // Some day in the future when base insertion and base deletion quals exist the samtools API will
// be updated and the original quals will be pulled here, but for now we assume the original quality is a flat Q45
}
return quals;
@ -245,7 +271,7 @@ public class GATKSAMRecord extends BAMRecord {
byte[] quals = getExistingBaseDeletionQualities();
if( quals == null ) {
quals = new byte[getBaseQualities().length];
Arrays.fill(quals, (byte) 45); // Some day in the future when base insertion and base deletion quals exist the samtools API will
Arrays.fill(quals, DEFAULT_INSERTION_DELETION_QUAL); // Some day in the future when base insertion and base deletion quals exist the samtools API will
// be updated and the original quals will be pulled here, but for now we assume the original quality is a flat Q45
}
return quals;
@ -259,12 +285,6 @@ public class GATKSAMRecord extends BAMRecord {
return getReadGroup().getNGSPlatform();
}
public void setReadGroup( final GATKSAMReadGroupRecord readGroup ) {
mReadGroup = readGroup;
retrievedReadGroup = true;
setAttribute("RG", mReadGroup.getId()); // todo -- this should be standardized, but we don't have access to SAMTagUtils!
}
///////////////////////////////////////////////////////////////////////////////
// *** ReduceReads functions ***//
///////////////////////////////////////////////////////////////////////////////

View File

@ -47,6 +47,8 @@ public class MisencodedBaseQualityReadTransformer extends ReadTransformer {
final byte[] quals = read.getBaseQualities();
for ( int i = 0; i < quals.length; i++ ) {
quals[i] -= encodingFixValue;
if ( quals[i] < 0 )
throw new UserException.BadInput("while fixing mis-encoded base qualities we encountered a read that was correctly encoded; we cannot handle such a mixture of reads so unfortunately the BAM must be fixed with some other tool");
}
read.setBaseQualities(quals);
return read;

View File

@ -226,7 +226,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default 454 tag
*/
public static boolean is454Read(SAMRecord read) {
public static boolean is454Read(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.LS454;
}
@ -236,7 +236,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default ion tag
*/
public static boolean isIonRead(SAMRecord read) {
public static boolean isIonRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.ION_TORRENT;
}
@ -246,7 +246,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default SOLiD tag
*/
public static boolean isSOLiDRead(SAMRecord read) {
public static boolean isSOLiDRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.SOLID;
}
@ -256,7 +256,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default SLX tag
*/
public static boolean isIlluminaRead(SAMRecord read) {
public static boolean isIlluminaRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.ILLUMINA;
}
@ -268,7 +268,7 @@ public class ReadUtils {
* @param name the upper-cased platform name to test
* @return whether or not name == PL tag in the read group of read
*/
public static boolean isPlatformRead(SAMRecord read, String name) {
public static boolean isPlatformRead(GATKSAMRecord read, String name) {
SAMReadGroupRecord readGroup = read.getReadGroup();
if (readGroup != null) {

View File

@ -37,6 +37,7 @@ import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileup;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileupImpl;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.utils.sam.GATKSAMReadGroupRecord;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
@ -53,11 +54,11 @@ public class ArtificialReadPileupTestProvider {
final String artificialReadName = "synth";
final int artificialRefStart = 1;
final int artificialMappingQuality = 60;
Map<String, SAMReadGroupRecord> sample2RG = new HashMap<String, SAMReadGroupRecord>();
Map<String, GATKSAMReadGroupRecord> sample2RG = new HashMap<String, GATKSAMReadGroupRecord>();
List<SAMReadGroupRecord> sampleRGs;
List<String> sampleNames = new ArrayList<String>();
private String sampleName(int i) { return sampleNames.get(i); }
private SAMReadGroupRecord sampleRG(String name) { return sample2RG.get(name); }
private GATKSAMReadGroupRecord sampleRG(String name) { return sample2RG.get(name); }
public final int locStart = 105; // start position where we desire artificial variant
private final int readLength = 10; // desired read length in pileup
public final int readOffset = 4;
@ -75,7 +76,7 @@ public class ArtificialReadPileupTestProvider {
for ( int i = 0; i < numSamples; i++ ) {
sampleNames.add(String.format("%s%04d", SAMPLE_PREFIX, i));
SAMReadGroupRecord rg = createRG(sampleName(i));
GATKSAMReadGroupRecord rg = createRG(sampleName(i));
sampleRGs.add(rg);
sample2RG.put(sampleName(i), rg);
}
@ -134,8 +135,8 @@ public class ArtificialReadPileupTestProvider {
return contexts;
}
private SAMReadGroupRecord createRG(String name) {
SAMReadGroupRecord rg = new SAMReadGroupRecord(name);
private GATKSAMReadGroupRecord createRG(String name) {
GATKSAMReadGroupRecord rg = new GATKSAMReadGroupRecord(name);
rg.setPlatform("ILLUMINA");
rg.setSample(name);
return rg;
@ -189,7 +190,7 @@ public class ArtificialReadPileupTestProvider {
read.setMappingQuality(artificialMappingQuality);
read.setReferenceName(loc.getContig());
read.setReadNegativeStrandFlag(false);
read.setAttribute("RG", sampleRG(sample).getReadGroupId());
read.setReadGroup(sampleRG(sample));
pileupElements.add(new PileupElement(read,readOffset,false,isBeforeDeletion, false, isBeforeInsertion,false,false,altBases,Math.abs(eventLength)));

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.nanoScheduler;
import org.broadinstitute.sting.BaseTest;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Unit tests for MapResult, checking that MapResult.compareTo orders results by
 * their job IDs exactly as Integer.compareTo orders the raw IDs.
 *
 * User: depristo
 * Date: 8/24/12
 * Time: 11:25 AM
 */
public class MapResultUnitTest {
    // every (id1, id2) pair in [0, 10) x [0, 10), paired with the expected compareTo result
    @DataProvider(name = "CompareTester")
    public Object[][] createCompareTester() {
        List<Object[]> tests = new ArrayList<Object[]>();
        for ( int id1 = 0; id1 < 10; id1++ ) {
            for ( int id2 = 0; id2 < 10; id2++ ) {
                tests.add(new Object[]{ id1, id2, Integer.valueOf(id1).compareTo(id2)});
            }
        }
        return tests.toArray(new Object[][]{});
    }

    // MapResult.compareTo is expected to agree with Integer.compareTo on the job IDs
    @Test(enabled = true, dataProvider = "CompareTester")
    public void testInputProducer(final int id1, final int id2, final int comp ) throws InterruptedException {
        final MapResult<Integer> mr1 = new MapResult<Integer>(id1, id1);
        final MapResult<Integer> mr2 = new MapResult<Integer>(id2, id2);
        Assert.assertEquals(mr1.compareTo(mr2), comp, "Compare MapResultsUnitTest failed");
    }
}

View File

@ -101,7 +101,7 @@ public class NanoSchedulerUnitTest extends BaseTest {
public int nExpectedCallbacks() {
int nElements = Math.max(end - start, 0);
return nElements / bufferSize;
return nElements / bufferSize / NanoScheduler.UPDATE_PROGRESS_FREQ;
}
public Map2x makeMap() { return addDelays ? new Map2xWithDelays() : new Map2x(); }

View File

@ -11,10 +11,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* UnitTests for Reducer
@ -30,19 +27,17 @@ public class ReducerUnitTest extends BaseTest {
List<Object[]> tests = new ArrayList<Object[]>();
for ( final int groupSize : Arrays.asList(-1, 1, 5, 50, 500, 5000, 50000) ) {
for ( final boolean setJobIDAtStart : Arrays.asList(true, false) ) {
for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) {
if ( groupSize < nElements ) {
for ( final List<MapResult<Integer>> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) {
tests.add(new Object[]{ new ListOfJobs(jobs), setJobIDAtStart, groupSize });
}
for ( final int nElements : Arrays.asList(0, 1, 3, 5) ) {
if ( groupSize < nElements ) {
for ( final List<MapResult<Integer>> jobs : Utils.makePermutations(makeJobs(nElements), nElements, false) ) {
tests.add(new Object[]{ new ListOfJobs(jobs), groupSize });
}
}
}
for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) {
if ( groupSize < nElements ) {
tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), setJobIDAtStart, groupSize });
}
for ( final int nElements : Arrays.asList(10, 100, 1000, 10000, 100000, 1000000) ) {
if ( groupSize < nElements ) {
tests.add(new Object[]{ new ListOfJobs(makeJobs(nElements)), groupSize });
}
}
}
@ -80,15 +75,11 @@ public class ReducerUnitTest extends BaseTest {
}
@Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME)
public void testReducerThread(final List<MapResult<Integer>> jobs, final boolean setJobIDAtStart, final int groupSize) throws Exception {
runTests(jobs, setJobIDAtStart, groupSize);
}
private void runTests( final List<MapResult<Integer>> allJobs, boolean setJobIDAtStart, int groupSize ) throws Exception {
public void testReducerThread(final List<MapResult<Integer>> allJobs, int groupSize) throws Exception {
if ( groupSize == -1 )
groupSize = allJobs.size();
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
final List<List<MapResult<Integer>>> jobGroups = Utils.groupList(allJobs, groupSize);
final ReduceSumTest reduce = new ReduceSumTest();
@ -98,68 +89,93 @@ public class ReducerUnitTest extends BaseTest {
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(waitingThread);
int lastJobID = -1;
int nJobsSubmitted = 0;
int jobGroupCount = 0;
final int lastJobGroupCount = jobGroups.size() - 1;
setJobIDAtStart = setJobIDAtStart && groupSize == 1;
for ( final List<MapResult<Integer>> jobs : jobGroups ) {
//logger.warn("Processing job group " + jobGroupCount + " with " + jobs.size() + " jobs");
for ( final MapResult<Integer> job : jobs ) {
mapResultsQueue.add(job);
lastJobID = Math.max(lastJobID, job.getJobID());
mapResultsQueue.put(job);
nJobsSubmitted++;
}
if ( jobGroupCount == lastJobGroupCount ) {
mapResultsQueue.add(new MapResult<Integer>());
mapResultsQueue.put(new MapResult<Integer>(lastJobID+1));
nJobsSubmitted++;
}
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed at the start");
if ( jobGroupCount == 0 && setJobIDAtStart ) {
// only can do the setJobID if jobs cannot be submitted out of order
reducer.setTotalJobCount(allJobs.size());
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed even after setting last job if we haven't processed anything");
}
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue);
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
Assert.assertTrue(nReduced <= nJobsSubmitted, "Somehow reduced more jobs than submitted");
if ( setJobIDAtStart ) {
final boolean submittedLastJob = jobGroupCount == lastJobGroupCount;
Assert.assertEquals(reducer.latchIsReleased(), submittedLastJob,
"When last job is set, latch should only be released if the last job has been submitted");
} else {
Assert.assertEquals(reducer.latchIsReleased(), false, "When last job isn't set, latch should never be release");
}
jobGroupCount++;
}
if ( setJobIDAtStart )
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing with last job id being set");
else {
Assert.assertFalse(reducer.latchIsReleased(), "Latch should be closed after reducing without last job id being set");
reducer.setTotalJobCount(allJobs.size());
Assert.assertTrue(reducer.latchIsReleased(), "Latch should be released after reducing after setting last job id ");
}
Assert.assertEquals(reduce.nRead, allJobs.size(), "number of read values not all of the values in the reducer queue");
es.shutdown();
es.awaitTermination(1, TimeUnit.HOURS);
}
@Test(expectedExceptions = IllegalStateException.class)
private void runSettingJobIDTwice() throws Exception {
final PriorityBlockingQueue<MapResult<Integer>> mapResultsQueue = new PriorityBlockingQueue<MapResult<Integer>>();
@Test(timeOut = 1000, invocationCount = 100)
private void testNonBlockingReduce() throws Exception {
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
mapResultsQueue.put(new MapResult<Integer>(0, 0));
mapResultsQueue.put(new MapResult<Integer>(1, 1));
reducer.setTotalJobCount(10);
reducer.setTotalJobCount(15);
final CountDownLatch latch = new CountDownLatch(1);
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(new Runnable() {
@Override
public void run() {
reducer.acquireReduceLock(true);
latch.countDown();
}
});
latch.await();
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, false);
Assert.assertEquals(nReduced, 0, "The reducer lock was already held but we did some work");
es.shutdown();
es.awaitTermination(1, TimeUnit.HOURS);
}
@Test(timeOut = 10000, invocationCount = 100)
private void testBlockingReduce() throws Exception {
final Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>(new ReduceSumTest(), new MultiThreadedErrorTracker(), 0);
final MapResultsQueue<Integer> mapResultsQueue = new MapResultsQueue<Integer>();
mapResultsQueue.put(new MapResult<Integer>(0, 0));
mapResultsQueue.put(new MapResult<Integer>(1, 1));
final CountDownLatch latch = new CountDownLatch(1);
final ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(new Runnable() {
@Override
public void run() {
reducer.acquireReduceLock(true);
latch.countDown();
try {
Thread.sleep(100);
} catch ( InterruptedException e ) {
;
} finally {
reducer.releaseReduceLock();
}
}
});
latch.await();
final int nReduced = reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
Assert.assertEquals(nReduced, 2, "The reducer should have blocked until the lock was freed and reduced 2 values");
es.shutdown();
es.awaitTermination(1, TimeUnit.HOURS);
}
public class ReduceSumTest implements NSReduceFunction<Integer, Integer> {
int nRead = 0;
int lastValue = -1;
@ -188,12 +204,8 @@ public class ReducerUnitTest extends BaseTest {
@Override
public void run() {
try {
final int observedSum = reducer.waitForFinalReduce();
Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value");
} catch ( InterruptedException ex ) {
Assert.fail("Got interrupted");
}
final int observedSum = reducer.getReduceResult();
Assert.assertEquals(observedSum, expectedSum, "Reduce didn't sum to expected value");
}
}
}

View File

@ -106,6 +106,11 @@ public class RecalDatumUnitTest extends BaseTest {
Assert.assertEquals(datum.getEstimatedQReportedAsByte(), cfg.getReportedQual());
BaseTest.assertEqualsDoubleSmart(datum.getEmpiricalQuality(), cfg.getErrorRatePhredScaled());
BaseTest.assertEqualsDoubleSmart(datum.getEmpiricalErrorRate(), cfg.getErrorRate());
final double e = datum.getEmpiricalQuality();
Assert.assertTrue(datum.getEmpiricalQualityAsByte() >= Math.floor(e));
Assert.assertTrue(datum.getEmpiricalQualityAsByte() <= Math.ceil(e));
Assert.assertNotNull(datum.toString());
}
@Test(dataProvider = "RecalDatumTestProvider")
@ -145,10 +150,32 @@ public class RecalDatumUnitTest extends BaseTest {
cfg.exTotal++;
assertBasicFeaturesOfRecalDatum(datum, cfg);
datum = cfg.makeRecalDatum();
datum.increment(false);
cfg.exTotal++;
assertBasicFeaturesOfRecalDatum(datum, cfg);
datum = cfg.makeRecalDatum();
datum.incrementNumObservations(2);
cfg.exTotal += 2;
assertBasicFeaturesOfRecalDatum(datum, cfg);
datum = cfg.makeRecalDatum();
datum.incrementNumMismatches(2);
cfg.exError += 2;
assertBasicFeaturesOfRecalDatum(datum, cfg);
datum = cfg.makeRecalDatum();
datum.increment(10, 5);
cfg.exError += 5;
cfg.exTotal += 10;
assertBasicFeaturesOfRecalDatum(datum, cfg);
}
@Test
public void testNoObs() {
final RecalDatum rd = new RecalDatum(0, 0, (byte)10);
Assert.assertEquals(rd.getEmpiricalErrorRate(), 0.0);
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.recalibration;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
public final class RecalUtilsUnitTest extends BaseTest {
    /** Lightweight value holder for one table entry: read group, qual, errors (ne), observations (no). */
    private class Row {
        int rg, qual, ne, no;

        private Row(final Row copy) {
            this(copy.rg, copy.qual, copy.ne, copy.no);
        }

        private Row(int rg, int qual, int ne, int no) {
            this.rg = rg;
            this.qual = qual;
            this.ne = ne;
            this.no = no;
        }

        @Override
        public String toString() {
            return String.format("Row{%d, %d, %d, %d}", rg, qual, ne, no);
        }
    }

    /**
     * Builds pairs of row lists to merge: every table of 1-3 rows (in every order)
     * combined with single rows and with every other such table.
     */
    @DataProvider(name = "CombineTablesProvider")
    public Object[][] createCombineTablesProvider() {
        List<Object[]> cases = new ArrayList<Object[]>();

        // base set: one row for each (read group, qual) combination over {0,1} x {0,1}
        final List<Row> baseRows = new ArrayList<Row>();
        for ( final int rg : Arrays.asList(0, 1) ) {
            for ( final int qual : Arrays.asList(0, 1) ) {
                baseRows.add(new Row(rg, qual, 1, 10));
            }
        }

        logger.warn("Number of rows " + baseRows.size());

        // tables of 1, 2, and 3 rows drawn from the base set, in every order
        List<List<Row>> tables = new LinkedList<List<Row>>();
        for ( int size = 1; size <= 3; size++ )
            tables.addAll(Utils.makePermutations(baseRows, size, false));

        // merging a single-row table into each permuted table
        for ( final List<Row> left : tables )
            for ( final Row single : baseRows )
                cases.add(new Object[]{left, Arrays.asList(single)});

        // merging each permuted table into a single-row table
        for ( final List<Row> left : tables )
            for ( final Row single : baseRows )
                cases.add(new Object[]{Arrays.asList(single), left});

        // all pairwise combinations of the permuted tables
        for ( final List<Row> left : tables )
            for ( final List<Row> right : tables )
                cases.add(new Object[]{left, right});

        return cases.toArray(new Object[][]{});
    }

    /**
     * RecalUtils.combineTables(dest, src) must leave dest equal to the row-wise
     * merge computed independently by makeExpected.
     */
    @Test(dataProvider = "CombineTablesProvider")
    public void testCombineTables(final List<Row> table1, final List<Row> table2) {
        final NestedIntegerArray<RecalDatum> dest = makeTable(table1);
        final NestedIntegerArray<RecalDatum> src = makeTable(table2);

        final NestedIntegerArray<RecalDatum> expected = makeTable(makeExpected(table1, table2));

        // combineTables merges src into dest in place
        RecalUtils.combineTables(dest, src);

        Assert.assertEquals(dest.getDimensions(), expected.getDimensions());
        Assert.assertEquals(dest.getAllValues().size(), expected.getAllValues().size());

        for ( final NestedIntegerArray.Leaf<RecalDatum> leaf : expected.getAllLeaves() ) {
            final RecalDatum actual = dest.get(leaf.keys);
            Assert.assertEquals(actual.getNumMismatches(), leaf.value.getNumMismatches());
            Assert.assertEquals(actual.getNumObservations(), leaf.value.getNumObservations());
        }
    }

    /** Reference implementation of the merge: copy table1, then fold each row of table2 into it. */
    public List<Row> makeExpected(final List<Row> table1, final List<Row> table2) {
        final List<Row> merged = new LinkedList<Row>();
        for ( final Row r : table1 )
            merged.add(new Row(r));
        for ( final Row r : table2 )
            combine(merged, r);
        return merged;
    }

    /** Adds row's counts into the matching (rg, qual) entry, appending a copy if no entry matches. */
    private void combine(final List<Row> combined, final Row row) {
        for ( final Row existing : combined ) {
            if ( existing.rg == row.rg && existing.qual == row.qual ) {
                existing.ne += row.ne;
                existing.no += row.no;
                return;
            }
        }
        combined.add(new Row(row));
    }

    /** Builds a (rg x qual) NestedIntegerArray holding one RecalDatum per input row. */
    public NestedIntegerArray<RecalDatum> makeTable(final List<Row> rows) {
        final NestedIntegerArray<RecalDatum> table = new NestedIntegerArray<RecalDatum>(3, 3);
        for ( final Row r : rows )
            table.put(new RecalDatum(r.no, r.ne, (byte)10), r.rg, r.qual);
        return table;
    }
}

View File

@ -4,16 +4,12 @@ import org.broadinstitute.sting.gatk.walkers.bqsr.RecalibrationArgumentCollectio
import org.broadinstitute.sting.utils.recalibration.covariates.*;
import org.broadinstitute.sting.utils.QualityUtils;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.sam.GATKSAMReadGroupRecord;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.util.*;
/**
@ -21,7 +17,15 @@ import java.util.*;
* @since 4/21/12
*/
public class RecalibrationReportUnitTest {
@Test(enabled = false)
private static RecalDatum createRandomRecalDatum(int maxObservations, int maxErrors) {
final Random random = new Random();
final int nObservations = random.nextInt(maxObservations);
final int nErrors = random.nextInt(maxErrors);
final int qual = random.nextInt(QualityUtils.MAX_QUAL_SCORE);
return new RecalDatum(nObservations, nErrors, (byte)qual);
}
@Test(enabled = true)
public void testOutput() {
final int length = 100;
@ -71,7 +75,7 @@ public class RecalibrationReportUnitTest {
readQuals[i] = 20;
read.setBaseQualities(readQuals);
final int expectedKeys = expectedNumberOfKeys(4, length, RAC.INDELS_CONTEXT_SIZE, RAC.MISMATCHES_CONTEXT_SIZE);
final int expectedKeys = expectedNumberOfKeys(length, RAC.INDELS_CONTEXT_SIZE, RAC.MISMATCHES_CONTEXT_SIZE);
int nKeys = 0; // keep track of how many keys were produced
final ReadCovariates rc = RecalUtils.computeCovariates(read, requestedCovariates);
@ -86,40 +90,30 @@ public class RecalibrationReportUnitTest {
final int[] covariates = rc.getKeySet(offset, errorMode);
final int randomMax = errorMode == EventType.BASE_SUBSTITUTION ? 10000 : 100000;
rgTable.put(RecalDatum.createRandomRecalDatum(randomMax, 10), covariates[0], errorMode.index);
qualTable.put(RecalDatum.createRandomRecalDatum(randomMax, 10), covariates[0], covariates[1], errorMode.index);
rgTable.put(createRandomRecalDatum(randomMax, 10), covariates[0], errorMode.index);
qualTable.put(createRandomRecalDatum(randomMax, 10), covariates[0], covariates[1], errorMode.index);
nKeys += 2;
for (int j = 0; j < optionalCovariates.size(); j++) {
final NestedIntegerArray<RecalDatum> covTable = recalibrationTables.getTable(RecalibrationTables.TableType.OPTIONAL_COVARIATE_TABLES_START.index + j);
covTable.put(RecalDatum.createRandomRecalDatum(randomMax, 10), covariates[0], covariates[1], j, covariates[RecalibrationTables.TableType.OPTIONAL_COVARIATE_TABLES_START.index + j], errorMode.index);
nKeys++;
final int covValue = covariates[RecalibrationTables.TableType.OPTIONAL_COVARIATE_TABLES_START.index + j];
if ( covValue >= 0 ) {
covTable.put(createRandomRecalDatum(randomMax, 10), covariates[0], covariates[1], covValue, errorMode.index);
nKeys++;
}
}
}
}
Assert.assertEquals(nKeys, expectedKeys);
final RecalibrationReport report = new RecalibrationReport(quantizationInfo, recalibrationTables, RAC.generateReportTable("ignore"), RAC);
File output = new File("RecalibrationReportUnitTestOutuput.grp");
PrintStream out;
try {
out = new PrintStream(output);
} catch (FileNotFoundException e) {
throw new ReviewedStingException("couldn't create the file " + output, e);
}
report.output(out);
RecalibrationReport loadedReport = new RecalibrationReport(output);
Assert.assertTrue(report.equals(loadedReport));
if (!output.delete())
throw new ReviewedStingException("File could not be deleted " + output);
}
private static int expectedNumberOfKeys (int nCovariates, int readLength, int indelContextSize, int mismatchesContextSize) {
int nommcs = readLength >= mismatchesContextSize ? mismatchesContextSize-1 : readLength;
int noincs = readLength >= indelContextSize ? 2*(indelContextSize-1) : 2*readLength;
return (nCovariates * readLength * 3) - nommcs - noincs;
private static int expectedNumberOfKeys (int readLength, int indelContextSize, int mismatchesContextSize) {
final int numCovariates = 4;
final int numTables = 3;
final int mismatchContextPadding = mismatchesContextSize - 1;
final int indelContextPadding = 2 * (indelContextSize - 1);
final int indelCyclePadding = 2 * (2 * CycleCovariate.CUSHION_FOR_INDELS);
return (numCovariates * numTables * readLength) - mismatchContextPadding - indelContextPadding - indelCyclePadding;
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.recalibration;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import org.broadinstitute.sting.utils.recalibration.covariates.*;
import org.testng.Assert;
import org.testng.annotations.Test;
public final class RecalibrationTablesUnitTest extends BaseTest {
    /**
     * Checks that a freshly built RecalibrationTables exposes one table per
     * covariate, with the expected shape for each.
     */
    @Test
    public void basicTest() {
        final Covariate[] covs = RecalibrationTestUtils.makeInitializedStandardCovariates();
        final int nReadGroups = 6;
        final RecalibrationTables tables = new RecalibrationTables(covs, nReadGroups);

        final Covariate qual = covs[1];
        final Covariate cycle = covs[2];
        final Covariate context = covs[3];

        // one table per covariate
        Assert.assertEquals(tables.numTables(), covs.length);

        // read group table: indexed by read group only
        Assert.assertNotNull(tables.getReadGroupTable());
        Assert.assertEquals(tables.getReadGroupTable(), tables.getTable(RecalibrationTables.TableType.READ_GROUP_TABLE.index));
        testDimensions(tables.getReadGroupTable(), nReadGroups);

        // quality score table: read group x qual
        Assert.assertNotNull(tables.getQualityScoreTable());
        Assert.assertEquals(tables.getQualityScoreTable(), tables.getTable(RecalibrationTables.TableType.QUALITY_SCORE_TABLE.index));
        testDimensions(tables.getQualityScoreTable(), nReadGroups, qual.maximumKeyValue() + 1);

        // optional covariate tables: read group x qual x covariate value
        Assert.assertNotNull(tables.getTable(2));
        testDimensions(tables.getTable(2), nReadGroups, qual.maximumKeyValue() + 1, cycle.maximumKeyValue() + 1);

        Assert.assertNotNull(tables.getTable(3));
        testDimensions(tables.getTable(3), nReadGroups, qual.maximumKeyValue() + 1, context.maximumKeyValue() + 1);
    }

    /** Asserts the table's dimensions equal the given sizes plus a trailing EventType dimension. */
    private void testDimensions(final NestedIntegerArray<RecalDatum> table, final int ... dimensions) {
        final int[] expected = new int[dimensions.length + 1];
        System.arraycopy(dimensions, 0, expected, 0, dimensions.length);
        // every table carries the event type as its final axis
        expected[dimensions.length] = EventType.values().length;

        Assert.assertEquals(table.getDimensions().length, expected.length);
        for ( int i = 0; i < expected.length; i++ ) {
            Assert.assertEquals(table.getDimensions()[i], expected[i], "Table dimensions not expected at dim " + i);
        }
    }

    /**
     * makeQualityScoreTable must return a table shaped like the quality score
     * table but containing no values.
     */
    @Test
    public void basicMakeQualityScoreTable() {
        final Covariate[] covs = RecalibrationTestUtils.makeInitializedStandardCovariates();
        final int nReadGroups = 6;
        final RecalibrationTables tables = new RecalibrationTables(covs, nReadGroups);

        final NestedIntegerArray<RecalDatum> copy = tables.makeQualityScoreTable();
        testDimensions(copy, nReadGroups, covs[1].maximumKeyValue() + 1);
        Assert.assertEquals(copy.getAllValues().size(), 0);
    }
}

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.recalibration;
import org.broadinstitute.sting.gatk.walkers.bqsr.RecalibrationArgumentCollection;
import org.broadinstitute.sting.utils.recalibration.covariates.*;
/**
* Created with IntelliJ IDEA.
* User: depristo
* Date: 12/23/12
* Time: 1:06 PM
* To change this template use File | Settings | File Templates.
*/
/**
 * Shared helpers for recalibration unit tests.
 *
 * User: depristo
 * Date: 12/23/12
 * Time: 1:06 PM
 */
public class RecalibrationTestUtils {
    // static utility class; prevent instantiation
    private RecalibrationTestUtils() {}

    /**
     * Creates the four standard BQSR covariates (read group, quality score,
     * context, cycle), each initialized against a default
     * RecalibrationArgumentCollection.
     *
     * @return a freshly allocated, fully initialized Covariate[4] in the
     *         standard order: read group, qual, context, cycle
     */
    public static Covariate[] makeInitializedStandardCovariates() {
        final RecalibrationArgumentCollection RAC = new RecalibrationArgumentCollection();
        final Covariate[] covariates = new Covariate[4];
        covariates[0] = new ReadGroupCovariate();
        covariates[1] = new QualityScoreCovariate();
        covariates[2] = new ContextCovariate();
        covariates[3] = new CycleCovariate();
        for ( final Covariate cov : covariates )
            cov.initialize(RAC);
        return covariates;
    }
}