diff --git a/build.xml b/build.xml
index 49efd616d..47e4eeb47 100644
--- a/build.xml
+++ b/build.xml
@@ -107,6 +107,12 @@
+
+
+
+
+
+
@@ -267,19 +273,19 @@
-
-
+
+
+
+
+
+
-
-
+
+
+
+
+
+
@@ -596,6 +602,7 @@
+
-
@@ -1121,6 +1127,7 @@
+
@@ -1128,6 +1135,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1207,6 +1229,7 @@
+
@@ -1220,10 +1243,11 @@
listeners="org.testng.reporters.FailedReporter,org.testng.reporters.JUnitXMLReporter,org.broadinstitute.sting.TestNGTestTransformer,org.broadinstitute.sting.StingTextReporter,org.uncommons.reportng.HTMLReporter">
+
-
+
@@ -1262,6 +1286,7 @@
+
@@ -1374,6 +1399,7 @@
+
diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/AdvancedRecalibrationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/AdvancedRecalibrationEngine.java
index ac280b70e..255f1fd05 100644
--- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/AdvancedRecalibrationEngine.java
+++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/AdvancedRecalibrationEngine.java
@@ -25,46 +25,46 @@ package org.broadinstitute.sting.gatk.walkers.bqsr;
* OTHER DEALINGS IN THE SOFTWARE.
*/
+import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.classloader.ProtectedPackageSource;
+import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import org.broadinstitute.sting.utils.recalibration.EventType;
import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
-import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
-import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
+import org.broadinstitute.sting.utils.recalibration.RecalDatum;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
-import org.broadinstitute.sting.utils.threading.ThreadLocalArray;
+
+import java.util.LinkedList;
+import java.util.List;
public class AdvancedRecalibrationEngine extends StandardRecalibrationEngine implements ProtectedPackageSource {
+ private final static Logger logger = Logger.getLogger(AdvancedRecalibrationEngine.class);
- // optimization: only allocate temp arrays once per thread
- private final ThreadLocal threadLocalTempQualArray = new ThreadLocalArray(EventType.values().length, byte.class);
- private final ThreadLocal threadLocalTempFractionalErrorArray = new ThreadLocalArray(EventType.values().length, double.class);
-
- public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables) {
- super.initialize(covariates, recalibrationTables);
- }
+ final List> allThreadLocalQualityScoreTables = new LinkedList>();
+ private ThreadLocal> threadLocalQualityScoreTables = new ThreadLocal>() {
+ @Override
+ protected synchronized NestedIntegerArray initialValue() {
+ final NestedIntegerArray table = recalibrationTables.makeQualityScoreTable();
+ allThreadLocalQualityScoreTables.add(table);
+ return table;
+ }
+ };
@Override
- public void updateDataForRead(final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors ) {
- final ReadCovariates readCovariates = covariateKeySetFrom(read);
- byte[] tempQualArray = threadLocalTempQualArray.get();
- double[] tempFractionalErrorArray = threadLocalTempFractionalErrorArray.get();
+ public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
+ final GATKSAMRecord read = recalInfo.getRead();
+ final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
+ final NestedIntegerArray qualityScoreTable = getThreadLocalQualityScoreTable();
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
- if( !skip[offset] ) {
- tempQualArray[EventType.BASE_SUBSTITUTION.index] = read.getBaseQualities()[offset];
- tempFractionalErrorArray[EventType.BASE_SUBSTITUTION.index] = snpErrors[offset];
- tempQualArray[EventType.BASE_INSERTION.index] = read.getBaseInsertionQualities()[offset];
- tempFractionalErrorArray[EventType.BASE_INSERTION.index] = insertionErrors[offset];
- tempQualArray[EventType.BASE_DELETION.index] = read.getBaseDeletionQualities()[offset];
- tempFractionalErrorArray[EventType.BASE_DELETION.index] = deletionErrors[offset];
+ if( ! recalInfo.skip(offset) ) {
for (final EventType eventType : EventType.values()) {
final int[] keys = readCovariates.getKeySet(offset, eventType);
final int eventIndex = eventType.index;
- final byte qual = tempQualArray[eventIndex];
- final double isError = tempFractionalErrorArray[eventIndex];
+ final byte qual = recalInfo.getQual(eventType, offset);
+ final double isError = recalInfo.getErrorFraction(eventType, offset);
- incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventIndex);
+ incrementDatumOrPutIfNecessary(qualityScoreTable, qual, isError, keys[0], keys[1], eventIndex);
for (int i = 2; i < covariates.length; i++) {
if (keys[i] < 0)
@@ -76,4 +76,24 @@ public class AdvancedRecalibrationEngine extends StandardRecalibrationEngine imp
}
}
}
+
+ /**
+ * Get a NestedIntegerArray for a QualityScore table specific to this thread
+ * @return a non-null NestedIntegerArray ready to be used to collect calibration info for the quality score covariate
+ */
+ private NestedIntegerArray getThreadLocalQualityScoreTable() {
+ return threadLocalQualityScoreTables.get();
+ }
+
+ @Override
+ public void finalizeData() {
+ // merge in all of the thread local tables
+ logger.info("Merging " + allThreadLocalQualityScoreTables.size() + " thread-local quality score tables");
+ for ( final NestedIntegerArray localTable : allThreadLocalQualityScoreTables ) {
+ recalibrationTables.combineQualityScoreTable(localTable);
+ }
+ allThreadLocalQualityScoreTables.clear(); // cleanup after ourselves
+
+ super.finalizeData();
+ }
}
diff --git a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java
index 59f241cdb..e1a1ed6d2 100644
--- a/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java
+++ b/protected/java/src/org/broadinstitute/sting/gatk/walkers/haplotypecaller/LikelihoodCalculationEngine.java
@@ -120,6 +120,11 @@ public class LikelihoodCalculationEngine {
for( int jjj = 0; jjj < numHaplotypes; jjj++ ) {
final Haplotype haplotype = haplotypes.get(jjj);
+
+ // TODO -- need to test against a reference/position with non-standard bases
+ if ( !Allele.acceptableAlleleBases(haplotype.getBases(), false) )
+ continue;
+
final int haplotypeStart = ( previousHaplotypeSeen == null ? 0 : computeFirstDifferingPosition(haplotype.getBases(), previousHaplotypeSeen.getBases()) );
previousHaplotypeSeen = haplotype;
diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java
index e99814278..5c932fdce 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java
@@ -44,6 +44,7 @@ import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.baq.ReadTransformingIterator;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
+import org.broadinstitute.sting.utils.sam.GATKSAMReadGroupRecord;
import org.broadinstitute.sting.utils.sam.GATKSamRecordFactory;
import java.io.File;
@@ -252,9 +253,10 @@ public class SAMDataSource {
if(readBufferSize != null)
ReadShard.setReadBufferSize(readBufferSize); // TODO: use of non-final static variable here is just awful, especially for parallel tests
else {
- // Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively
- // will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once.
- ReadShard.setReadBufferSize(Math.min(10000*samFiles.size(),250000));
+ // Choose a sensible default for the read buffer size.
+ // Previously we we're picked 100000 reads per BAM per shard with a max cap of 250K reads in memory at once.
+ // Now we are simply setting it to 100K reads
+ ReadShard.setReadBufferSize(100000);
}
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
@@ -894,9 +896,11 @@ public class SAMDataSource {
long lastTick = timer.currentTime();
for(final SAMReaderID readerID: readerIDs) {
final ReaderInitializer init = new ReaderInitializer(readerID).call();
+
if (removeProgramRecords) {
init.reader.getFileHeader().setProgramRecords(new ArrayList());
}
+
if (threadAllocation.getNumIOThreads() > 0) {
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
}
@@ -916,6 +920,13 @@ public class SAMDataSource {
for(SAMFileReader reader: readers.values())
headers.add(reader.getFileHeader());
headerMerger = new SamFileHeaderMerger(SAMFileHeader.SortOrder.coordinate,headers,true);
+
+ // update all read groups to GATKSAMRecordReadGroups
+ final List gatkReadGroups = new LinkedList();
+ for ( final SAMReadGroupRecord rg : headerMerger.getMergedHeader().getReadGroups() ) {
+ gatkReadGroups.add(new GATKSAMReadGroupRecord(rg));
+ }
+ headerMerger.getMergedHeader().setReadGroups(gatkReadGroups);
}
final private void printReaderPerformance(final int nExecutedTotal,
diff --git a/public/java/src/org/broadinstitute/sting/gatk/filters/Platform454Filter.java b/public/java/src/org/broadinstitute/sting/gatk/filters/Platform454Filter.java
index 8ad91ac1c..a1f2a877b 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/filters/Platform454Filter.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/filters/Platform454Filter.java
@@ -26,6 +26,7 @@
package org.broadinstitute.sting.gatk.filters;
import net.sf.samtools.SAMRecord;
+import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
/**
@@ -37,6 +38,6 @@ import org.broadinstitute.sting.utils.sam.ReadUtils;
public class Platform454Filter extends ReadFilter {
public boolean filterOut(SAMRecord rec) {
- return (ReadUtils.is454Read(rec));
+ return (ReadUtils.is454Read((GATKSAMRecord)rec));
}
}
\ No newline at end of file
diff --git a/public/java/src/org/broadinstitute/sting/gatk/filters/PlatformFilter.java b/public/java/src/org/broadinstitute/sting/gatk/filters/PlatformFilter.java
index 8e241bb2c..de5be94bc 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/filters/PlatformFilter.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/filters/PlatformFilter.java
@@ -27,6 +27,7 @@ package org.broadinstitute.sting.gatk.filters;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.commandline.Argument;
+import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
/**
@@ -41,7 +42,7 @@ public class PlatformFilter extends ReadFilter {
public boolean filterOut(SAMRecord rec) {
for ( String name : PLFilterNames )
- if ( ReadUtils.isPlatformRead(rec, name.toUpperCase() ))
+ if ( ReadUtils.isPlatformRead((GATKSAMRecord)rec, name.toUpperCase() ))
return true;
return false;
}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java b/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java
index 51fed470f..2bc14aa69 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/phonehome/GATKRunReport.java
@@ -131,7 +131,7 @@ public class GATKRunReport {
private String hostName;
@Element(required = true, name = "java")
- private String java;
+ private String javaVersion;
@Element(required = true, name = "machine")
private String machine;
@@ -212,7 +212,7 @@ public class GATKRunReport {
hostName = Utils.resolveHostname();
// basic java information
- java = Utils.join("-", Arrays.asList(System.getProperty("java.vendor"), System.getProperty("java.version")));
+ javaVersion = Utils.join("-", Arrays.asList(System.getProperty("java.vendor"), System.getProperty("java.version")));
machine = Utils.join("-", Arrays.asList(System.getProperty("os.name"), System.getProperty("os.arch")));
// if there was an exception, capture it
diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java
index cd0198a29..ee71d82bb 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java
@@ -40,6 +40,7 @@ import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.Iterator;
+import java.util.LinkedList;
/**
* A nano-scheduling version of TraverseReads.
@@ -53,6 +54,7 @@ import java.util.Iterator;
*/
public class TraverseReadsNano extends TraversalEngine,ReadShardDataProvider> {
/** our log, which we want to capture anything from this class */
+ private final static boolean PRE_READ_ALL_MAP_DATA = true;
protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);
private static final boolean DEBUG = false;
final NanoScheduler nanoScheduler;
@@ -111,7 +113,19 @@ public class TraverseReadsNano extends TraversalEngine,
* should execute
*/
private Iterator aggregateMapData(final ReadShardDataProvider dataProvider) {
- return new Iterator() {
+ final Iterator it = makeDataIterator(dataProvider);
+ if ( PRE_READ_ALL_MAP_DATA ) {
+ final LinkedList l = new LinkedList();
+ while ( it.hasNext() ) l.add(it.next());
+ return l.iterator();
+ } else {
+ return it;
+ }
+ }
+
+
+ private Iterator makeDataIterator(final ReadShardDataProvider dataProvider) {
+ return new Iterator () {
final ReadView reads = new ReadView(dataProvider);
final ReadReferenceView reference = new ReadReferenceView(dataProvider);
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java
index 6931235b8..7692c58e2 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/BaseRecalibrator.java
@@ -38,6 +38,7 @@ import org.broadinstitute.sting.gatk.filters.*;
import org.broadinstitute.sting.gatk.iterators.ReadTransformer;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.*;
+import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.variant.utils.BaseUtils;
import org.broadinstitute.sting.utils.baq.BAQ;
import org.broadinstitute.sting.utils.classloader.GATKLiteUtils;
@@ -135,6 +136,7 @@ public class BaseRecalibrator extends ReadWalker implements NanoSche
private BAQ baq; // BAQ the reads on the fly to generate the alignment uncertainty vector
private IndexedFastaSequenceFile referenceReader; // fasta reference reader for use with BAQ calculation
+ private final static byte NO_BAQ_UNCERTAINTY = (byte)'@';
/**
* Parse the -cov arguments and create a list of covariates to be used here
@@ -225,25 +227,48 @@ public class BaseRecalibrator extends ReadWalker implements NanoSche
if (!RecalUtils.isColorSpaceConsistent(RAC.SOLID_NOCALL_STRATEGY, read)) { // parse the solid color space and check for color no-calls
return 0L; // skip this read completely
}
- read.setTemporaryAttribute(COVARS_ATTRIBUTE, RecalUtils.computeCovariates(read, requestedCovariates));
- final boolean[] skip = calculateSkipArray(read, metaDataTracker); // skip known sites of variation as well as low quality and non-regular bases
final int[] isSNP = calculateIsSNP(read, ref, originalRead);
final int[] isInsertion = calculateIsIndel(read, EventType.BASE_INSERTION);
final int[] isDeletion = calculateIsIndel(read, EventType.BASE_DELETION);
- final byte[] baqArray = calculateBAQArray(read);
+ final int nErrors = nEvents(isSNP, isInsertion, isDeletion);
+
+ // note for efficiency regions we don't compute the BAQ array unless we actually have
+ // some error to marginalize over. For ILMN data ~85% of reads have no error
+ final byte[] baqArray = nErrors == 0 ? flatBAQArray(read) : calculateBAQArray(read);
if( baqArray != null ) { // some reads just can't be BAQ'ed
+ final ReadCovariates covariates = RecalUtils.computeCovariates(read, requestedCovariates);
+ final boolean[] skip = calculateSkipArray(read, metaDataTracker); // skip known sites of variation as well as low quality and non-regular bases
final double[] snpErrors = calculateFractionalErrorArray(isSNP, baqArray);
final double[] insertionErrors = calculateFractionalErrorArray(isInsertion, baqArray);
final double[] deletionErrors = calculateFractionalErrorArray(isDeletion, baqArray);
- recalibrationEngine.updateDataForRead(read, skip, snpErrors, insertionErrors, deletionErrors);
+
+ // aggregate all of the info into our info object, and update the data
+ final ReadRecalibrationInfo info = new ReadRecalibrationInfo(read, covariates, skip, snpErrors, insertionErrors, deletionErrors);
+ recalibrationEngine.updateDataForRead(info);
return 1L;
} else {
return 0L;
}
}
+ /**
+ * Compute the number of mutational events across all hasEvent vectors
+ *
+ * Simply the sum of entries in hasEvents
+ *
+ * @param hasEvents a vector a vectors of 0 (no event) and 1 (has event)
+ * @return the total number of events across all hasEvent arrays
+ */
+ private int nEvents(final int[]... hasEvents) {
+ int n = 0;
+ for ( final int[] hasEvent : hasEvents ) {
+ n += MathUtils.sum(hasEvent);
+ }
+ return n;
+ }
+
protected boolean[] calculateSkipArray( final GATKSAMRecord read, final RefMetaDataTracker metaDataTracker ) {
final byte[] bases = read.getReadBases();
final boolean[] skip = new boolean[bases.length];
@@ -371,7 +396,6 @@ public class BaseRecalibrator extends ReadWalker implements NanoSche
throw new ReviewedStingException("Array length mismatch detected. Malformed read?");
}
- final byte NO_BAQ_UNCERTAINTY = (byte)'@';
final int BLOCK_START_UNSET = -1;
final double[] fractionalErrors = new double[baqArray.length];
@@ -415,8 +439,24 @@ public class BaseRecalibrator extends ReadWalker implements NanoSche
}
}
+ /**
+ * Create a BAQ style array that indicates no alignment uncertainty
+ * @param read the read for which we want a BAQ array
+ * @return a BAQ-style non-null byte[] counting NO_BAQ_UNCERTAINTY values
+ * // TODO -- could be optimized avoiding this function entirely by using this inline if the calculation code above
+ */
+ private byte[] flatBAQArray(final GATKSAMRecord read) {
+ final byte[] baq = new byte[read.getReadLength()];
+ Arrays.fill(baq, NO_BAQ_UNCERTAINTY);
+ return baq;
+ }
+
+ /**
+ * Compute an actual BAQ array for read, based on its quals and the reference sequence
+ * @param read the read to BAQ
+ * @return a non-null BAQ tag array for read
+ */
private byte[] calculateBAQArray( final GATKSAMRecord read ) {
- // todo -- it would be good to directly use the BAQ qualities rather than encoding and decoding the result and using the special @ value
baq.baqRead(read, referenceReader, BAQ.CalculationMode.RECALCULATE, BAQ.QualityMode.ADD_TAG);
return BAQ.getBAQTag(read);
}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ReadRecalibrationInfo.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ReadRecalibrationInfo.java
new file mode 100644
index 000000000..121e3449b
--- /dev/null
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/ReadRecalibrationInfo.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.gatk.walkers.bqsr;
+
+import com.google.java.contract.Ensures;
+import com.google.java.contract.Requires;
+import org.broadinstitute.sting.utils.QualityUtils;
+import org.broadinstitute.sting.utils.recalibration.EventType;
+import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
+import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: depristo
+ * Date: 12/18/12
+ * Time: 3:50 PM
+ *
+ * TODO -- merge in ReadCovariates?
+ */
+public final class ReadRecalibrationInfo {
+ private final GATKSAMRecord read;
+ private final int length;
+ private final ReadCovariates covariates;
+ private final boolean[] skips;
+ private final byte[] baseQuals, insertionQuals, deletionQuals;
+ private final double[] snpErrors, insertionErrors, deletionErrors;
+
+ public ReadRecalibrationInfo(final GATKSAMRecord read,
+ final ReadCovariates covariates,
+ final boolean[] skips,
+ final double[] snpErrors,
+ final double[] insertionErrors,
+ final double[] deletionErrors) {
+ if ( read == null ) throw new IllegalArgumentException("read cannot be null");
+ if ( covariates == null ) throw new IllegalArgumentException("covariates cannot be null");
+ if ( skips == null ) throw new IllegalArgumentException("skips cannot be null");
+ if ( snpErrors == null ) throw new IllegalArgumentException("snpErrors cannot be null");
+ // future: may allow insertionErrors && deletionErrors to be null, so don't enforce
+
+ this.read = read;
+ this.baseQuals = read.getBaseQualities();
+ this.length = baseQuals.length;
+ this.covariates = covariates;
+ this.skips = skips;
+ this.insertionQuals = read.getExistingBaseInsertionQualities();
+ this.deletionQuals = read.getExistingBaseDeletionQualities();
+ this.snpErrors = snpErrors;
+ this.insertionErrors = insertionErrors;
+ this.deletionErrors = deletionErrors;
+
+ if ( skips.length != length ) throw new IllegalArgumentException("skips.length " + snpErrors.length + " != length " + length);
+ if ( snpErrors.length != length ) throw new IllegalArgumentException("snpErrors.length " + snpErrors.length + " != length " + length);
+ if ( insertionErrors != null && insertionErrors.length != length ) throw new IllegalArgumentException("insertionErrors.length " + snpErrors.length + " != length " + length);
+ if ( deletionErrors != null && deletionErrors.length != length ) throw new IllegalArgumentException("deletionErrors.length " + snpErrors.length + " != length " + length);
+ }
+
+ /**
+ * Get the qual score for event type at offset
+ *
+ * @param eventType the type of event we want the qual for
+ * @param offset the offset into this read for the qual
+ * @return a valid quality score for event at offset
+ */
+ @Requires("validOffset(offset)")
+ @Ensures("validQual(result)")
+ public byte getQual(final EventType eventType, final int offset) {
+ switch ( eventType ) {
+ case BASE_SUBSTITUTION: return baseQuals[offset];
+ // note optimization here -- if we don't have ins/del quals we just return the default byte directly
+ case BASE_INSERTION: return insertionQuals == null ? GATKSAMRecord.DEFAULT_INSERTION_DELETION_QUAL : insertionQuals[offset];
+ case BASE_DELETION: return deletionQuals == null ? GATKSAMRecord.DEFAULT_INSERTION_DELETION_QUAL : deletionQuals[offset];
+ default: throw new IllegalStateException("Unknown event type " + eventType);
+ }
+ }
+
+ /**
+ * Get the error fraction for event type at offset
+ *
+ * The error fraction is a value between 0 and 1 that indicates how much certainty we have
+ * in the error occurring at offset. A value of 1 means that the error definitely occurs at this
+ * site, a value of 0.0 means it definitely doesn't happen here. 0.5 means that half the weight
+ * of the error belongs here
+ *
+ * @param eventType the type of event we want the qual for
+ * @param offset the offset into this read for the qual
+ * @return a fractional weight for an error at this offset
+ */
+ @Requires("validOffset(offset)")
+ @Ensures("result >= 0.0 && result <= 1.0")
+ public double getErrorFraction(final EventType eventType, final int offset) {
+ switch ( eventType ) {
+ case BASE_SUBSTITUTION: return snpErrors[offset];
+ case BASE_INSERTION: return insertionErrors[offset];
+ case BASE_DELETION: return deletionErrors[offset];
+ default: throw new IllegalStateException("Unknown event type " + eventType);
+ }
+ }
+
+ /**
+ * Get the read involved in this recalibration info
+ * @return a non-null GATKSAMRecord
+ */
+ @Ensures("result != null")
+ public GATKSAMRecord getRead() {
+ return read;
+ }
+
+ /**
+ * Should offset in this read be skipped (because it's covered by a known variation site?)
+ * @param offset a valid offset into this info
+ * @return true if offset should be skipped, false otherwise
+ */
+ @Requires("validOffset(offset)")
+ public boolean skip(final int offset) {
+ return skips[offset];
+ }
+
+ /**
+ * Get the ReadCovariates object carrying the mapping from offsets -> covariate key sets
+ * @return a non-null ReadCovariates object
+ */
+ @Ensures("result != null")
+ public ReadCovariates getCovariatesValues() {
+ return covariates;
+ }
+
+ /**
+ * Ensures an offset is valid. Used in contracts
+ * @param offset a proposed offset
+ * @return true if offset is valid w.r.t. the data in this object, false otherwise
+ */
+ private boolean validOffset(final int offset) {
+ return offset >= 0 && offset < baseQuals.length;
+ }
+
+ private boolean validQual(final byte result) {
+ return result >= 0 && result <= QualityUtils.MAX_QUAL_SCORE;
+ }
+}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/RecalibrationEngine.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/RecalibrationEngine.java
index 35375eb1d..5c002b7e5 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/RecalibrationEngine.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/RecalibrationEngine.java
@@ -1,5 +1,6 @@
package org.broadinstitute.sting.gatk.walkers.bqsr;
+import com.google.java.contract.Requires;
import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
@@ -29,10 +30,31 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
* OTHER DEALINGS IN THE SOFTWARE.
*/
public interface RecalibrationEngine {
-
+ /**
+ * Initialize the recalibration engine
+ *
+ * Called once before any calls to updateDataForRead are made. The engine should prepare itself
+ * to handle any number of updateDataForRead calls containing ReadRecalibrationInfo containing
+ * keys for each of the covariates provided.
+ *
+ * The engine should collect match and mismatch data into the recalibrationTables data.
+ *
+ * @param covariates an array of the covariates we'll be using in this engine, order matters
+ * @param recalibrationTables the destination recalibrationTables where stats should be collected
+ */
public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables);
- public void updateDataForRead(final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors);
+ /**
+ * Update the recalibration statistics using the information in recalInfo
+ * @param recalInfo data structure holding information about the recalibration values for a single read
+ */
+ @Requires("recalInfo != null")
+ public void updateDataForRead(final ReadRecalibrationInfo recalInfo);
+ /**
+ * Finalize, if appropriate, all derived data in recalibrationTables.
+ *
+ * Called once after all calls to updateDataForRead have been issued.
+ */
public void finalizeData();
}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/StandardRecalibrationEngine.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/StandardRecalibrationEngine.java
index 1e166dfd0..a6ab98e8b 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/StandardRecalibrationEngine.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/bqsr/StandardRecalibrationEngine.java
@@ -35,34 +35,37 @@ import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
public class StandardRecalibrationEngine implements RecalibrationEngine, PublicPackageSource {
-
protected Covariate[] covariates;
protected RecalibrationTables recalibrationTables;
+ @Override
public void initialize(final Covariate[] covariates, final RecalibrationTables recalibrationTables) {
+ if ( covariates == null ) throw new IllegalArgumentException("Covariates cannot be null");
+ if ( recalibrationTables == null ) throw new IllegalArgumentException("recalibrationTables cannot be null");
+
this.covariates = covariates.clone();
this.recalibrationTables = recalibrationTables;
}
@Override
- public void updateDataForRead( final GATKSAMRecord read, final boolean[] skip, final double[] snpErrors, final double[] insertionErrors, final double[] deletionErrors ) {
+ public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
+ final GATKSAMRecord read = recalInfo.getRead();
+ final EventType eventType = EventType.BASE_SUBSTITUTION;
+ final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
+
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
- if( !skip[offset] ) {
- final ReadCovariates readCovariates = covariateKeySetFrom(read);
+ if( ! recalInfo.skip(offset) ) {
+ final byte qual = recalInfo.getQual(eventType, offset);
+ final double isError = recalInfo.getErrorFraction(eventType, offset);
+ final int[] keys = readCovariates.getKeySet(offset, eventType);
- final byte qual = read.getBaseQualities()[offset];
- final double isError = snpErrors[offset];
-
- final int[] keys = readCovariates.getKeySet(offset, EventType.BASE_SUBSTITUTION);
- final int eventIndex = EventType.BASE_SUBSTITUTION.index;
-
- incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventIndex);
+ incrementDatumOrPutIfNecessary(recalibrationTables.getQualityScoreTable(), qual, isError, keys[0], keys[1], eventType.index);
for (int i = 2; i < covariates.length; i++) {
if (keys[i] < 0)
continue;
- incrementDatumOrPutIfNecessary(recalibrationTables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventIndex);
+ incrementDatumOrPutIfNecessary(recalibrationTables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventType.index);
}
}
}
@@ -79,16 +82,6 @@ public class StandardRecalibrationEngine implements RecalibrationEngine, PublicP
return new RecalDatum(1, isError, reportedQual);
}
- /**
- * Get the covariate key set from a read
- *
- * @param read the read
- * @return the covariate keysets for this read
- */
- protected ReadCovariates covariateKeySetFrom(GATKSAMRecord read) {
- return (ReadCovariates) read.getTemporaryAttribute(BaseRecalibrator.COVARS_ATTRIBUTE);
- }
-
/**
* Create derived recalibration data tables
*
@@ -129,7 +122,10 @@ public class StandardRecalibrationEngine implements RecalibrationEngine, PublicP
* @param isError error value for this event
* @param keys location in table of our item
*/
- protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray table, final byte qual, final double isError, final int... keys ) {
+ protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray table,
+ final byte qual,
+ final double isError,
+ final int... keys ) {
final RecalDatum existingDatum = table.get(keys);
if ( existingDatum == null ) {
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/HaplotypeIndelErrorModel.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/HaplotypeIndelErrorModel.java
index 795fc76ed..3c1bc338a 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/HaplotypeIndelErrorModel.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/HaplotypeIndelErrorModel.java
@@ -30,6 +30,7 @@ import org.broadinstitute.sting.utils.Haplotype;
import org.broadinstitute.sting.utils.MathUtils;
import org.broadinstitute.sting.utils.QualityUtils;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileup;
+import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import org.broadinstitute.variant.variantcontext.Allele;
@@ -421,7 +422,7 @@ public class HaplotypeIndelErrorModel {
double[][] haplotypeLikehoodMatrix = new double[haplotypesInVC.size()][haplotypesInVC.size()];
double readLikelihoods[][] = new double[pileup.getReads().size()][haplotypesInVC.size()];
int i=0;
- for (SAMRecord read : pileup.getReads()) {
+ for (GATKSAMRecord read : pileup.getReads()) {
if(ReadUtils.is454Read(read)) {
continue;
}
diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java
index 5d19ac5e8..15d3f43fd 100755
--- a/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/indels/IndelRealigner.java
@@ -529,7 +529,7 @@ public class IndelRealigner extends ReadWalker {
sawReadInCurrentInterval = false;
}
- private boolean doNotTryToClean(SAMRecord read) {
+ private boolean doNotTryToClean(GATKSAMRecord read) {
return read.getReadUnmappedFlag() ||
read.getNotPrimaryAlignmentFlag() ||
read.getReadFailsVendorQualityCheckFlag() ||
@@ -835,7 +835,7 @@ public class IndelRealigner extends ReadWalker {
// TODO -- get rid of this try block when Picard does the right thing for reads aligned off the end of the reference
try {
if ( read.getAttribute(SAMTag.NM.name()) != null )
- read.setAttribute(SAMTag.NM.name(), SequenceUtil.calculateSamNmTag(read, reference, leftmostIndex-1));
+ read.setAttribute(SAMTag.NM.name(), SequenceUtil.calculateSamNmTag(read, reference, leftmostIndex - 1));
if ( read.getAttribute(SAMTag.UQ.name()) != null )
read.setAttribute(SAMTag.UQ.name(), SequenceUtil.sumQualitiesOfMismatches(read, reference, leftmostIndex-1));
} catch (Exception e) {
diff --git a/public/java/src/org/broadinstitute/sting/utils/NGSPlatform.java b/public/java/src/org/broadinstitute/sting/utils/NGSPlatform.java
index 504704e55..847d8067c 100644
--- a/public/java/src/org/broadinstitute/sting/utils/NGSPlatform.java
+++ b/public/java/src/org/broadinstitute/sting/utils/NGSPlatform.java
@@ -24,8 +24,7 @@
package org.broadinstitute.sting.utils;
-import net.sf.samtools.SAMReadGroupRecord;
-import net.sf.samtools.SAMRecord;
+import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
/**
* A canonical, master list of the standard NGS platforms. These values
@@ -64,25 +63,15 @@ public enum NGSPlatform {
}
/**
- * Convenience constructor -- calculates the NGSPlatfrom from a SAMRecord.
- * Note you should not use this function if you have a GATKSAMRecord -- use the
- * accessor method instead.
+ * Convenience get -- get the NGSPlatfrom from a SAMRecord.
*
- * @param read
+ * Just gets the platform from the GATKReadGroupRecord associated with this read.
+ *
+ * @param read a GATKSAMRecord
* @return an NGSPlatform object matching the PL field of the header, of UNKNOWN if there was no match
*/
- public static final NGSPlatform fromRead(SAMRecord read) {
- return fromReadGroup(read.getReadGroup());
- }
-
- /**
- * Returns the NGSPlatform corresponding to the PL tag in the read group
- * @param rg
- * @return an NGSPlatform object matching the PL field of the header, of UNKNOWN if there was no match
- */
- public static final NGSPlatform fromReadGroup(SAMReadGroupRecord rg) {
- if ( rg == null ) return UNKNOWN;
- return fromReadGroupPL(rg.getPlatform());
+ public static NGSPlatform fromRead(GATKSAMRecord read) {
+ return read.getReadGroup().getNGSPlatform();
}
/**
@@ -90,7 +79,7 @@ public enum NGSPlatform {
* @param plFromRG -- the PL field (or equivalent) in a ReadGroup object
* @return an NGSPlatform object matching the PL field of the header, or UNKNOWN if there was no match
*/
- public static final NGSPlatform fromReadGroupPL(final String plFromRG) {
+ public static NGSPlatform fromReadGroupPL(final String plFromRG) {
if ( plFromRG == null ) return UNKNOWN;
// todo -- algorithm could be implemented more efficiently, as the list of all
@@ -113,7 +102,7 @@ public enum NGSPlatform {
* @param platform the read group string that describes the platform used
* @return true if the platform is known (i.e. it's in the list and is not UNKNOWN)
*/
- public static final boolean isKnown (final String platform) {
+ public static final boolean isKnown(final String platform) {
return fromReadGroupPL(platform) != UNKNOWN;
}
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java b/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java
index 04ef8ece3..abd4eeaba 100755
--- a/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java
+++ b/public/java/src/org/broadinstitute/sting/utils/collections/ExpandingArrayList.java
@@ -54,6 +54,7 @@ public class ExpandingArrayList extends ArrayList {
private void maybeExpand(int index, E value) {
if ( index >= size() ) {
+ ensureCapacity(index+1); // make sure we have space to hold at least index + 1 elements
// We need to add null items until we can safely set index to element
for ( int i = size(); i <= index; i++ )
add(value);
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java
index d0ad51cb0..464ebfcd5 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/EOFMarkedValue.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
/**
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
index 84bb8d45f..0ccb2b8cc 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
import org.apache.log4j.Logger;
@@ -37,12 +62,6 @@ class InputProducer {
int nRead = 0;
int inputID = -1;
- /**
- * A latch used to block threads that want to start up only when all of the values
- * in inputReader have been read by the thread executing run()
- */
- final CountDownLatch latch = new CountDownLatch(1);
-
public InputProducer(final Iterator inputReader) {
if ( inputReader == null ) throw new IllegalArgumentException("inputReader cannot be null");
this.inputReader = inputReader;
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java
index 83d671560..fd23b11d8 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResult.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
/**
@@ -25,14 +50,6 @@ class MapResult extends EOFMarkedValue implements Comparable= 0");
}
- /**
- * Create the EOF marker version of MapResult
- */
- MapResult() {
- super();
- this.jobID = Integer.MAX_VALUE;
- }
-
/**
* @return the job ID of the map job that produced this MapResult
*/
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java
new file mode 100644
index 000000000..ef74d669d
--- /dev/null
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/MapResultsQueue.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.utils.nanoScheduler;
+
+import org.broadinstitute.sting.utils.collections.ExpandingArrayList;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: depristo
+ * Date: 12/19/12
+ * Time: 3:53 PM
+ *
+ * This class makes some critical assumptions. First is that the jobID of the first
+ * job is 0. If this isn't true the MapResultsQueue will certainly fail.
+ */
+public class MapResultsQueue {
+ //private final static boolean DEBUG = false;
+ //private final static Logger logger = Logger.getLogger(MapResultsQueue.class);
+
+ /**
+ * Although naturally stored as priority blocking queue, this is actually quite expensive
+ * due to the O(n log n) sorting calculation. Since we know that the job ids start
+ * at 0 and increment by 1 in each successive job, we store an array instead. The
+ * array is indexed by jobID, and contains the MapResult for that job id. Because elements
+ * can be added to the queue in any order, we need to use an expanding array list to
+ * store the elements.
+ */
+ final ExpandingArrayList> queue = new ExpandingArrayList>(10000);
+
+ /**
+ * The jobID of the last job we've seen
+ */
+ int prevJobID = -1; // no jobs observed
+
+ /**
+ * Put mapResult into this MapResultsQueue, associated with its jobID
+ * @param mapResult a non-null map result
+ */
+ public synchronized void put(final MapResult mapResult) {
+ if ( mapResult == null ) throw new IllegalArgumentException("mapResult cannot be null");
+
+ // make sure that nothing is at the job id for map
+ assert queue.size() < mapResult.getJobID() || queue.get(mapResult.getJobID()) == null;
+
+ queue.set(mapResult.getJobID(), mapResult);
+ }
+
+ /**
+ * Should we reduce the next value in the mapResultQueue?
+ *
+ * @return true if we should reduce
+ */
+ public synchronized boolean nextValueIsAvailable() {
+ final MapResult nextMapResult = queue.get(nextJobID());
+
+ if ( nextMapResult == null ) {
+ // natural case -- the next job hasn't had a value added yet
+ return false;
+ } else if ( nextMapResult.getJobID() != nextJobID() ) {
+ // sanity check -- the job id at next isn't the one we expect
+ throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not == previous job id " + prevJobID + " + 1");
+ } else {
+ // there's a value at the next job id, so return true
+ return true;
+ }
+ }
+
+ /**
+ * Get the next job ID'd be expect to see given our previous job id
+ * @return the next job id we'd fetch to reduce
+ */
+ private int nextJobID() {
+ return prevJobID + 1;
+ }
+
+ /**
+ * Can only be called when nextValueIsAvailable is true
+ * @return
+ * @throws InterruptedException
+ */
+ // TODO -- does this have to be synchronized? -- I think the answer is no
+ public synchronized MapResult take() throws InterruptedException {
+ final MapResult result = queue.get(nextJobID());
+
+ // make sure the value we've fetched has the right id
+ assert result.getJobID() == nextJobID();
+
+ prevJobID = result.getJobID();
+ queue.set(prevJobID, null);
+
+ return result;
+ }
+}
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java
index cc5335051..1311126d0 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSMapFunction.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
/**
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java
index 8b12c62c4..785a7f4fd 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSProgressFunction.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
/**
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java
index 879a33a1d..8191b16c9 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NSReduceFunction.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
/**
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
index 38a1d7b8f..c3854eef2 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java
@@ -1,3 +1,28 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
@@ -43,13 +68,20 @@ import java.util.concurrent.*;
public class NanoScheduler {
private final static Logger logger = Logger.getLogger(NanoScheduler.class);
private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true;
- private final static boolean LOG_MAP_TIMES = false;
+ protected final static int UPDATE_PROGRESS_FREQ = 100;
+ /**
+ * Currently not used, but kept because it's conceptual reasonable to have a buffer
+ */
final int bufferSize;
+
+ /**
+ * The number of threads we're using to execute the map jobs in this nano scheduler
+ */
final int nThreads;
+
final ExecutorService masterExecutor;
final ExecutorService mapExecutor;
- final Semaphore runningMapJobSlots;
final MultiThreadedErrorTracker errorTracker = new MultiThreadedErrorTracker();
boolean shutdown = false;
@@ -75,11 +107,9 @@ public class NanoScheduler {
if ( nThreads == 1 ) {
this.mapExecutor = this.masterExecutor = null;
- runningMapJobSlots = null;
} else {
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
this.mapExecutor = Executors.newFixedThreadPool(nThreads, new NamedThreadFactory("NS-map-thread-%d"));
- runningMapJobSlots = new Semaphore(this.bufferSize);
}
}
@@ -243,8 +273,7 @@ public class NanoScheduler {
// map
final MapType mapValue = map.apply(input);
- if ( progressFunction != null )
- progressFunction.progress(input);
+ updateProgress(i++, input);
// reduce
sum = reduce.apply(mapValue, sum);
@@ -254,6 +283,16 @@ public class NanoScheduler {
return sum;
}
+ /**
+ * Maybe update the progress meter (maybe because we don't want to do so so often that it costs cpu time)
+ * @param counter increasing counter to use to cut down on updates
+ * @param input the input we're currently at
+ */
+ private void updateProgress(final int counter, final InputType input) {
+ if ( progressFunction != null && counter % UPDATE_PROGRESS_FREQ == 0 )
+ progressFunction.progress(input);
+ }
+
/**
* Efficient parallel version of Map/Reduce
*
@@ -349,33 +388,23 @@ public class NanoScheduler {
// Create the input producer and start it running
final InputProducer inputProducer = new InputProducer(inputReader);
- // a priority queue that stores up to bufferSize elements
- // produced by completed map jobs.
- final PriorityBlockingQueue> mapResultQueue =
- new PriorityBlockingQueue>();
+ // create the MapResultsQueue to store results of map jobs.
+ final MapResultsQueue mapResultQueue = new MapResultsQueue();
- final Reducer reducer
- = new Reducer(reduce, errorTracker, initialValue);
+ // create the reducer we'll use for this nano scheduling run
+ final Reducer reducer = new Reducer(reduce, errorTracker, initialValue);
+
+ final CountDownLatch runningMapJobs = new CountDownLatch(nThreads);
try {
- int nSubmittedJobs = 0;
-
- while ( continueToSubmitJobs(nSubmittedJobs, inputProducer) ) {
- // acquire a slot to run a map job. Blocks if too many jobs are enqueued
- runningMapJobSlots.acquire();
-
- mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, map, reducer));
- nSubmittedJobs++;
+ // create and submit the info needed by the read/map/reduce threads to do their work
+ for ( int i = 0; i < nThreads; i++ ) {
+ mapExecutor.submit(new ReadMapReduceJob(inputProducer, mapResultQueue, runningMapJobs, map, reducer));
}
- // mark the last job id we've submitted so we now the id to wait for
- //logger.warn("setting jobs submitted to " + nSubmittedJobs);
- reducer.setTotalJobCount(nSubmittedJobs);
-
// wait for all of the input and map threads to finish
- return waitForCompletion(inputProducer, reducer);
+ return waitForCompletion(mapResultQueue, runningMapJobs, reducer);
} catch (Throwable ex) {
-// logger.warn("Reduce job got exception " + ex);
errorTracker.notifyOfError(ex);
return initialValue;
}
@@ -384,52 +413,40 @@ public class NanoScheduler {
/**
* Wait until the input thread and all map threads have completed running, and return the final reduce result
*/
- private ReduceType waitForCompletion(final InputProducer inputProducer,
+ private ReduceType waitForCompletion(final MapResultsQueue mapResultsQueue,
+ final CountDownLatch runningMapJobs,
final Reducer reducer) throws InterruptedException {
- // wait until we have a final reduce result
-// logger.warn("waiting for final reduce");
- final ReduceType finalSum = reducer.waitForFinalReduce();
+ // wait for all the map threads to finish by waiting on the runningMapJobs latch
+ runningMapJobs.await();
- // wait for all the map threads to finish by acquiring and then releasing all map job semaphores
-// logger.warn("waiting on map");
- runningMapJobSlots.acquire(bufferSize);
- runningMapJobSlots.release(bufferSize);
+ // do a final reduce here. This is critically important because the InputMapReduce jobs
+ // no longer block on reducing, so it's possible for all the threads to end with a few
+ // reduce jobs on the queue still to do. This call ensures that we reduce everything
+ reducer.reduceAsMuchAsPossible(mapResultsQueue, true);
+
+ // wait until we have a final reduce result
+ final ReduceType finalSum = reducer.getReduceResult();
// everything is finally shutdown, return the final reduce value
return finalSum;
}
-
- /**
- * Should we continue to submit jobs given the number of jobs already submitted and the
- * number of read items in inputProducer?
- *
- * We continue to submit jobs while inputProducer hasn't reached EOF or the number
- * of jobs we've enqueued isn't the number of read elements. This means that in
- * some cases we submit more jobs than total read elements (cannot know because of
- * multi-threading) so map jobs must handle the case where getNext() returns EOF.
- *
- * @param nJobsSubmitted
- * @param inputProducer
- * @return
- */
- private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) {
- final int nReadItems = inputProducer.getNumInputValues();
- return nReadItems == -1 || nJobsSubmitted < nReadItems;
- }
}
private class ReadMapReduceJob implements Runnable {
final InputProducer inputProducer;
- final PriorityBlockingQueue> mapResultQueue;
+ final MapResultsQueue mapResultQueue;
final NSMapFunction map;
final Reducer reducer;
+ final CountDownLatch runningMapJobs;
private ReadMapReduceJob(final InputProducer inputProducer,
- final PriorityBlockingQueue> mapResultQueue,
+ final MapResultsQueue mapResultQueue,
+ final CountDownLatch runningMapJobs,
final NSMapFunction map,
final Reducer reducer) {
this.inputProducer = inputProducer;
this.mapResultQueue = mapResultQueue;
+ this.runningMapJobs = runningMapJobs;
this.map = map;
this.reducer = reducer;
}
@@ -437,39 +454,41 @@ public class NanoScheduler {
@Override
public void run() {
try {
- // get the next item from the input producer
- final InputProducer.InputValue inputWrapper = inputProducer.next();
+ boolean done = false;
+ while ( ! done ) {
+ // get the next item from the input producer
+ final InputProducer.InputValue inputWrapper = inputProducer.next();
- // depending on inputWrapper, actually do some work or not, putting result input result object
- final MapResult result;
- if ( ! inputWrapper.isEOFMarker() ) {
- // just skip doing anything if we don't have work to do, which is possible
- // because we don't necessarily know how much input there is when we queue
- // up our jobs
- final InputType input = inputWrapper.getValue();
+ // depending on inputWrapper, actually do some work or not, putting result input result object
+ final MapResult result;
+ if ( ! inputWrapper.isEOFMarker() ) {
+ // just skip doing anything if we don't have work to do, which is possible
+ // because we don't necessarily know how much input there is when we queue
+ // up our jobs
+ final InputType input = inputWrapper.getValue();
- // map
- final MapType mapValue = map.apply(input);
+ // actually execute the map
+ final MapType mapValue = map.apply(input);
- // enqueue the result into the mapResultQueue
- result = new MapResult(mapValue, inputWrapper.getId());
+ // enqueue the result into the mapResultQueue
+ result = new MapResult(mapValue, inputWrapper.getId());
- if ( progressFunction != null )
- progressFunction.progress(input);
- } else {
- // if there's no input we push empty MapResults with jobIDs for synchronization with Reducer
- result = new MapResult(inputWrapper.getId());
+ mapResultQueue.put(result);
+
+ // reduce as much as possible, without blocking, if another thread is already doing reduces
+ final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue, false);
+
+ updateProgress(inputWrapper.getId(), input);
+ } else {
+ done = true;
+ }
}
-
- mapResultQueue.put(result);
-
- final int nReduced = reducer.reduceAsMuchAsPossible(mapResultQueue);
} catch (Throwable ex) {
errorTracker.notifyOfError(ex);
} finally {
// we finished a map job, release the job queue semaphore
- runningMapJobSlots.release();
+ runningMapJobs.countDown();
}
}
}
-}
+}
\ No newline at end of file
diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
index a7b94e323..294065838 100644
--- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
+++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/Reducer.java
@@ -1,39 +1,68 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
package org.broadinstitute.sting.utils.nanoScheduler;
import com.google.java.contract.Ensures;
-import com.google.java.contract.Requires;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
- * Reducer supporting two-threaded reduce of the map/reduce.
+ * Reducer supporting multi-threaded reduce of the map/reduce.
*
- * The first thread, using the reduceAsMuchAsPossible function, actually reduces the data
- * as it arrives in the blockingQueue.
+ * reduceAsMuchAsPossible is the key function. Multiple threads can call into this, providing
+ * the map results queue, and this class accumulates the result of calling reduce
+ * on the maps objects. reduceAsMuchAsPossible isn't directly synchronized, but manages multi-threading
+ * directly with a lock. Threads can request either to block on the reduce call until it can be
+ * executed, or immediately exit if the lock isn't available. That allows multi-threaded users
+ * to avoid piling up waiting to reduce while one thread is reducing. They can instead immediately
+ * leave to go do something else productive
*
- * The second thread, using the waitForFinalReduce, can block on this data structure
- * until that all jobs have arrived and been reduced.
- *
- * The key function for communication here is setTotalJobCount(), which the thread that submits
- * jobs that enqueue MapResults into the blocking queue must call ONCE to tell the
- * Reducer the total number of jobs that have been submitted for map. When numOfSubmittedJobs
- * have been processed, this class frees a latch that allows thread blocked on waitForFinalReduce to proceed.
- *
- * This thread reads from mapResultsQueue until the poison EOF object arrives. At each
- * stage is calls reduce(value, sum). The blocking mapResultQueue ensures that the
- * queue waits until the mapResultQueue has a value to take. Then, it gets and waits
- * until the map result Future has a value.
+ * @author depristo
+ * @since 2012
*/
class Reducer {
private final static Logger logger = Logger.getLogger(Reducer.class);
- private final static int UNSET_NUM_SUBMITTED_JOBS = -2;
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- final NSReduceFunction reduce;
- final MultiThreadedErrorTracker errorTracker;
+ /**
+ * The reduce function to execute
+ */
+ private final NSReduceFunction reduce;
+
+ /**
+ * Used to communicate errors to the outer master thread
+ */
+ private final MultiThreadedErrorTracker errorTracker;
+
+ /**
+ * Lock used to protect the call reduceAsMuchAsPossible from race conditions
+ */
+ private final Lock reduceLock = new ReentrantLock();
/**
* The sum of the reduce function applied to all MapResults. After this Reducer
@@ -41,18 +70,6 @@ class Reducer {
*/
ReduceType sum;
- int numSubmittedJobs = UNSET_NUM_SUBMITTED_JOBS; // not yet set
-
- /**
- * The jobID of the last job we've seen
- */
- int prevJobID = -1; // no jobs observed
-
- /**
- * A counter keeping track of the number of jobs we're reduced
- */
- int numJobsReduced = 0;
-
/**
* Create a new Reducer that will apply the reduce function with initialSum value
* to values via reduceAsMuchAsPossible, timing the reduce function call costs with
@@ -72,26 +89,6 @@ class Reducer {
this.sum = initialSum;
}
- /**
- * Should we reduce the next value in the mapResultQueue?
- *
- * @param mapResultQueue the queue of map results
- * @return true if we should reduce
- */
- @Requires("mapResultQueue != null")
- private synchronized boolean reduceNextValueInQueue(final PriorityBlockingQueue> mapResultQueue) {
- final MapResult nextMapResult = mapResultQueue.peek();
- if ( nextMapResult == null ) {
- return false;
- } else if ( nextMapResult.getJobID() < prevJobID + 1 ) {
- throw new IllegalStateException("Next job ID " + nextMapResult.getJobID() + " is not < previous job id " + prevJobID);
- } else if ( nextMapResult.getJobID() == prevJobID + 1 ) {
- return true;
- } else {
- return false;
- }
- }
-
/**
* Reduce as much data as possible in mapResultQueue, returning the number of reduce calls completed
*
@@ -104,97 +101,69 @@ class Reducer {
* @throws InterruptedException
*/
@Ensures("result >= 0")
- public synchronized int reduceAsMuchAsPossible(final PriorityBlockingQueue> mapResultQueue) {
+ public int reduceAsMuchAsPossible(final MapResultsQueue mapResultQueue, final boolean waitForLock) {
if ( mapResultQueue == null ) throw new IllegalArgumentException("mapResultQueue cannot be null");
int nReducesNow = 0;
-// if ( numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS )
-// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
+ final boolean haveLock = acquireReduceLock(waitForLock);
try {
- while ( reduceNextValueInQueue(mapResultQueue) ) {
- final MapResult result = mapResultQueue.take();
- prevJobID = result.getJobID();
+ if ( haveLock ) {
+ while ( mapResultQueue.nextValueIsAvailable() ) {
+ final MapResult result = mapResultQueue.take();
- if ( ! result.isEOFMarker() ) {
- nReducesNow++;
+ if ( ! result.isEOFMarker() ) {
+ nReducesNow++;
- // apply reduce, keeping track of sum
- sum = reduce.apply(result.getValue(), sum);
+ // apply reduce, keeping track of sum
+ sum = reduce.apply(result.getValue(), sum);
+ }
}
-
- numJobsReduced++;
- maybeReleaseLatch();
}
} catch (Exception ex) {
errorTracker.notifyOfError(ex);
- countDownLatch.countDown();
+ } finally {
+ if ( haveLock ) // if we acquired the lock, unlock it
+ releaseReduceLock();
}
-// if ( numSubmittedJobs == UNSET_NUM_SUBMITTED_JOBS )
-// logger.warn(" maybeReleaseLatch " + numJobsReduced + " numSubmittedJobs " + numSubmittedJobs + " queue " + mapResultQueue.size());
return nReducesNow;
}
/**
- * release the latch if appropriate
+ * Acquire the reduce lock, either returning immediately if not possible or blocking until the lock is available
*
- * Appropriate means we've seen the last job, or there's only a single job id
+ * @param blockUntilAvailable if true, we will block until the lock is available, otherwise we return immediately
+ * without acquiring the lock
+ * @return true if the lock has been acquired, false otherwise
*/
- private synchronized void maybeReleaseLatch() {
- if ( numJobsReduced == numSubmittedJobs ) {
- // either we've already seen the last one prevJobID == numSubmittedJobs or
- // the last job ID is -1, meaning that no jobs were ever submitted
- countDownLatch.countDown();
+ protected boolean acquireReduceLock(final boolean blockUntilAvailable) {
+ if ( blockUntilAvailable ) {
+ reduceLock.lock();
+ return true;
+ } else {
+ return reduceLock.tryLock();
}
}
/**
- * For testing only
+ * Free the reduce lock.
*
- * @return true if latch is released
+ * Assumes that the invoking thread actually previously acquired the lock (it's a problem if not).
*/
- protected synchronized boolean latchIsReleased() {
- return countDownLatch.getCount() == 0;
+ protected void releaseReduceLock() {
+ reduceLock.unlock();
}
/**
- * Key function: tell this class the total number of jobs will provide data in the mapResultsQueue
+ * Get the current reduce result resulting from applying reduce(...) to all MapResult elements.
*
- * The total job count when we free threads blocked on waitForFinalReduce. When we see numOfSubmittedJobs
- * MapResults from the queue, those threads are released.
- *
- * Until this function is called, those thread will block forever. The numOfSubmittedJobs has a few constraints.
- * First, it must be >= 0. 0 indicates that in fact no jobs will ever be submitted (i.e., there's no
- * data coming) so the latch should be opened immediately. If it's >= 1, we will wait until
- * we see numOfSubmittedJobs jobs before freeing them.
- *
- * Note that we throw an IllegalStateException if this function is called twice.
- *
- * @param numOfSubmittedJobs int >= 0 indicating the total number of MapResults that will
- * enqueue results into our queue
- */
- public synchronized void setTotalJobCount(final int numOfSubmittedJobs) {
- if ( numOfSubmittedJobs < 0 )
- throw new IllegalArgumentException("numOfSubmittedJobs must be >= 0, but saw " + numOfSubmittedJobs);
- if ( this.numSubmittedJobs != UNSET_NUM_SUBMITTED_JOBS)
- throw new IllegalStateException("setlastJobID called multiple times, but should only be called once");
-
- //logger.warn("setTotalJobCount " + numJobsReduced + " numSubmitted " + numOfSubmittedJobs);
- this.numSubmittedJobs = numOfSubmittedJobs;
- maybeReleaseLatch();
- }
-
- /**
- * Block until the last job has submitted its MapResult to our queue, and we've reduced it, and
- * return the reduce result resulting from applying reduce(...) to all MapResult elements.
+ * Note that this method cannot know if future reduce calls are coming in. So it simply gets
+ * the current reduce result. It is up to the caller to know whether the returned value is
+ * a partial result, or the full final value
*
* @return the total reduce result across all jobs
- * @throws InterruptedException
*/
- public ReduceType waitForFinalReduce() throws InterruptedException {
- //logger.warn("waitForFinalReduce() " + numJobsReduced + " " + numSubmittedJobs);
- countDownLatch.await();
- //logger.warn(" done waitForFinalReduce");
+ public ReduceType getReduceResult() {
return sum;
}
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalDatum.java b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalDatum.java
index 4cacc26c4..1ab3b10c4 100755
--- a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalDatum.java
+++ b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalDatum.java
@@ -135,14 +135,6 @@ public class RecalDatum {
this.estimatedQReported = estimatedQReported;
}
- public static RecalDatum createRandomRecalDatum(int maxObservations, int maxErrors) {
- final Random random = new Random();
- final int nObservations = random.nextInt(maxObservations);
- final int nErrors = random.nextInt(maxErrors);
- final int qual = random.nextInt(QualityUtils.MAX_QUAL_SCORE);
- return new RecalDatum(nObservations, nErrors, (byte)qual);
- }
-
public final double getEstimatedQReported() {
return estimatedQReported;
}
@@ -212,49 +204,49 @@ public class RecalDatum {
//
//---------------------------------------------------------------------------------------------------------------
- public double getNumObservations() {
+ public final double getNumObservations() {
return numObservations;
}
- public synchronized void setNumObservations(final double numObservations) {
+ public final synchronized void setNumObservations(final double numObservations) {
if ( numObservations < 0 ) throw new IllegalArgumentException("numObservations < 0");
this.numObservations = numObservations;
empiricalQuality = UNINITIALIZED;
}
- public double getNumMismatches() {
+ public final double getNumMismatches() {
return numMismatches;
}
@Requires({"numMismatches >= 0"})
- public synchronized void setNumMismatches(final double numMismatches) {
+ public final synchronized void setNumMismatches(final double numMismatches) {
if ( numMismatches < 0 ) throw new IllegalArgumentException("numMismatches < 0");
this.numMismatches = numMismatches;
empiricalQuality = UNINITIALIZED;
}
@Requires({"by >= 0"})
- public synchronized void incrementNumObservations(final double by) {
+ public final synchronized void incrementNumObservations(final double by) {
numObservations += by;
empiricalQuality = UNINITIALIZED;
}
@Requires({"by >= 0"})
- public synchronized void incrementNumMismatches(final double by) {
+ public final synchronized void incrementNumMismatches(final double by) {
numMismatches += by;
empiricalQuality = UNINITIALIZED;
}
@Requires({"incObservations >= 0", "incMismatches >= 0"})
@Ensures({"numObservations == old(numObservations) + incObservations", "numMismatches == old(numMismatches) + incMismatches"})
- public synchronized void increment(final double incObservations, final double incMismatches) {
+ public final synchronized void increment(final double incObservations, final double incMismatches) {
numObservations += incObservations;
numMismatches += incMismatches;
empiricalQuality = UNINITIALIZED;
}
@Ensures({"numObservations == old(numObservations) + 1", "numMismatches >= old(numMismatches)"})
- public synchronized void increment(final boolean isError) {
+ public final synchronized void increment(final boolean isError) {
increment(1, isError ? 1 : 0.0);
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalUtils.java b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalUtils.java
index e9fe316a8..d4e781fdd 100644
--- a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalUtils.java
+++ b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalUtils.java
@@ -769,4 +769,28 @@ public class RecalUtils {
return base;
}
}
+
+ /**
+ * Combines the recalibration data for table1 and table2 into table1
+ *
+ * Note that table1 is the destination, so it is modified
+ *
+ * @param table1 the destination table to merge table2 into
+ * @param table2 the source table to merge into table1
+ */
+ public static void combineTables(final NestedIntegerArray table1, final NestedIntegerArray table2) {
+ if ( table1 == null ) throw new IllegalArgumentException("table1 cannot be null");
+ if ( table2 == null ) throw new IllegalArgumentException("table2 cannot be null");
+ if ( ! Arrays.equals(table1.getDimensions(), table2.getDimensions()))
+ throw new IllegalArgumentException("Table1 " + Utils.join(",", table1.getDimensions()) + " not equal to " + Utils.join(",", table2.getDimensions()));
+
+ for (final NestedIntegerArray.Leaf row : table2.getAllLeaves()) {
+ final RecalDatum myDatum = table1.get(row.keys);
+
+ if (myDatum == null)
+ table1.put(row.value, row.keys);
+ else
+ myDatum.combine(row.value);
+ }
+ }
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationReport.java b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationReport.java
index 871c1eff6..081221da4 100644
--- a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationReport.java
+++ b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationReport.java
@@ -69,19 +69,19 @@ public class RecalibrationReport {
}
- protected RecalibrationReport(final QuantizationInfo quantizationInfo, final RecalibrationTables recalibrationTables, final GATKReportTable argumentTable, final RecalibrationArgumentCollection RAC) {
+ protected RecalibrationReport(final QuantizationInfo quantizationInfo, final RecalibrationTables recalibrationTables, final Covariate[] requestedCovariates, final GATKReportTable argumentTable, final RecalibrationArgumentCollection RAC) {
this.quantizationInfo = quantizationInfo;
this.recalibrationTables = recalibrationTables;
+ this.requestedCovariates = requestedCovariates;
this.argumentTable = argumentTable;
this.RAC = RAC;
- this.requestedCovariates = null;
this.optionalCovariateIndexes = null;
}
/**
* Counts the number of unique read groups in the table
*
- * @param reportTable the GATKReport table containing data for this table
+ * @param reportTable the GATKReport table containing data for this table
* @return the number of unique read groups
*/
private int countReadGroups(final GATKReportTable reportTable) {
@@ -105,19 +105,10 @@ public class RecalibrationReport {
* @param other the recalibration report to combine with this one
*/
public void combine(final RecalibrationReport other) {
-
for ( int tableIndex = 0; tableIndex < recalibrationTables.numTables(); tableIndex++ ) {
final NestedIntegerArray myTable = recalibrationTables.getTable(tableIndex);
final NestedIntegerArray otherTable = other.recalibrationTables.getTable(tableIndex);
-
- for (final NestedIntegerArray.Leaf row : otherTable.getAllLeaves()) {
- final RecalDatum myDatum = myTable.get(row.keys);
-
- if (myDatum == null)
- myTable.put((RecalDatum)row.value, row.keys);
- else
- myDatum.combine((RecalDatum)row.value);
- }
+ RecalUtils.combineTables(myTable, otherTable);
}
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationTables.java b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationTables.java
index 0dd510245..3f968d7f6 100644
--- a/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationTables.java
+++ b/public/java/src/org/broadinstitute/sting/utils/recalibration/RecalibrationTables.java
@@ -25,11 +25,13 @@
package org.broadinstitute.sting.utils.recalibration;
+import com.google.java.contract.Ensures;
import org.broadinstitute.sting.utils.collections.LoggingNestedIntegerArray;
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
import java.io.PrintStream;
+import java.util.ArrayList;
/**
* Utility class to facilitate on-the-fly base quality score recalibration.
@@ -38,8 +40,7 @@ import java.io.PrintStream;
* Date: 6/20/12
*/
-public class RecalibrationTables {
-
+public final class RecalibrationTables {
public enum TableType {
READ_GROUP_TABLE(0),
QUALITY_SCORE_TABLE(1),
@@ -52,49 +53,82 @@ public class RecalibrationTables {
}
}
- private final NestedIntegerArray[] tables;
+ private final ArrayList> tables;
+ private final int qualDimension;
+ private final int eventDimension = EventType.values().length;
+ private final int numReadGroups;
+ private final PrintStream log;
public RecalibrationTables(final Covariate[] covariates) {
this(covariates, covariates[TableType.READ_GROUP_TABLE.index].maximumKeyValue() + 1, null);
}
- public RecalibrationTables(final Covariate[] covariates, final PrintStream log) {
- this(covariates, covariates[TableType.READ_GROUP_TABLE.index].maximumKeyValue() + 1, log);
- }
-
public RecalibrationTables(final Covariate[] covariates, final int numReadGroups) {
this(covariates, numReadGroups, null);
}
public RecalibrationTables(final Covariate[] covariates, final int numReadGroups, final PrintStream log) {
- tables = new NestedIntegerArray[covariates.length];
+ tables = new ArrayList>(covariates.length);
+ for ( int i = 0; i < covariates.length; i++ )
+ tables.add(i, null); // initialize so we can set below
- final int qualDimension = covariates[TableType.QUALITY_SCORE_TABLE.index].maximumKeyValue() + 1;
- final int eventDimension = EventType.values().length;
+ qualDimension = covariates[TableType.QUALITY_SCORE_TABLE.index].maximumKeyValue() + 1;
+ this.numReadGroups = numReadGroups;
+ this.log = log;
+
+ tables.set(TableType.READ_GROUP_TABLE.index,
+ log == null ? new NestedIntegerArray(numReadGroups, eventDimension) :
+ new LoggingNestedIntegerArray(log, "READ_GROUP_TABLE", numReadGroups, eventDimension));
+
+ tables.set(TableType.QUALITY_SCORE_TABLE.index, makeQualityScoreTable());
- tables[TableType.READ_GROUP_TABLE.index] = log == null ? new NestedIntegerArray(numReadGroups, eventDimension) :
- new LoggingNestedIntegerArray(log, "READ_GROUP_TABLE", numReadGroups, eventDimension);
- tables[TableType.QUALITY_SCORE_TABLE.index] = log == null ? new NestedIntegerArray(numReadGroups, qualDimension, eventDimension) :
- new LoggingNestedIntegerArray(log, "QUALITY_SCORE_TABLE", numReadGroups, qualDimension, eventDimension);
for (int i = TableType.OPTIONAL_COVARIATE_TABLES_START.index; i < covariates.length; i++)
- tables[i] = log == null ? new NestedIntegerArray(numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension) :
- new LoggingNestedIntegerArray(log, String.format("OPTIONAL_COVARIATE_TABLE_%d", i - TableType.OPTIONAL_COVARIATE_TABLES_START.index + 1),
- numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension);
+ tables.set(i,
+ log == null ? new NestedIntegerArray(numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension) :
+ new LoggingNestedIntegerArray(log, String.format("OPTIONAL_COVARIATE_TABLE_%d", i - TableType.OPTIONAL_COVARIATE_TABLES_START.index + 1),
+ numReadGroups, qualDimension, covariates[i].maximumKeyValue()+1, eventDimension));
}
+ @Ensures("result != null")
public NestedIntegerArray getReadGroupTable() {
- return (NestedIntegerArray)tables[TableType.READ_GROUP_TABLE.index];
+ return getTable(TableType.READ_GROUP_TABLE.index);
}
+ @Ensures("result != null")
public NestedIntegerArray getQualityScoreTable() {
- return (NestedIntegerArray)tables[TableType.QUALITY_SCORE_TABLE.index];
+ return getTable(TableType.QUALITY_SCORE_TABLE.index);
}
+ @Ensures("result != null")
public NestedIntegerArray getTable(final int index) {
- return (NestedIntegerArray)tables[index];
+ return tables.get(index);
}
+ @Ensures("result >= 0")
public int numTables() {
- return tables.length;
+ return tables.size();
+ }
+
+ /**
+ * Allocate a new quality score table, based on requested parameters
+ * in this set of tables, without any data in it. The return result
+ * of this table is suitable for acting as a thread-local cache
+ * for quality score values
+ * @return a newly allocated, empty read group x quality score table
+ */
+ public NestedIntegerArray makeQualityScoreTable() {
+ return log == null
+ ? new NestedIntegerArray(numReadGroups, qualDimension, eventDimension)
+ : new LoggingNestedIntegerArray(log, "QUALITY_SCORE_TABLE", numReadGroups, qualDimension, eventDimension);
+ }
+
+ /**
+ * Merge in the quality score table information from qualityScoreTable into this
+ * recalibration table's quality score table.
+ *
+ * @param qualityScoreTable the quality score table we want to merge in
+ */
+ public void combineQualityScoreTable(final NestedIntegerArray qualityScoreTable) {
+ RecalUtils.combineTables(getQualityScoreTable(), qualityScoreTable);
}
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/recalibration/covariates/CycleCovariate.java b/public/java/src/org/broadinstitute/sting/utils/recalibration/covariates/CycleCovariate.java
index 6bff833e4..e3c822821 100755
--- a/public/java/src/org/broadinstitute/sting/utils/recalibration/covariates/CycleCovariate.java
+++ b/public/java/src/org/broadinstitute/sting/utils/recalibration/covariates/CycleCovariate.java
@@ -50,7 +50,7 @@ import java.util.EnumSet;
public class CycleCovariate implements StandardCovariate {
private int MAXIMUM_CYCLE_VALUE;
- private static final int CUSHION_FOR_INDELS = 4;
+ public static final int CUSHION_FOR_INDELS = 4;
private String default_platform = null;
private static final EnumSet DISCRETE_CYCLE_PLATFORMS = EnumSet.of(NGSPlatform.ILLUMINA, NGSPlatform.SOLID, NGSPlatform.PACBIO, NGSPlatform.COMPLETE_GENOMICS);
diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialSAMUtils.java b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialSAMUtils.java
index 0859957a3..77cf500f2 100755
--- a/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialSAMUtils.java
+++ b/public/java/src/org/broadinstitute/sting/utils/sam/ArtificialSAMUtils.java
@@ -188,6 +188,7 @@ public class ArtificialSAMUtils {
GATKSAMRecord rec = createArtificialRead(header, name, refIndex, alignmentStart, bases.length);
rec.setReadBases(bases);
rec.setBaseQualities(qual);
+ rec.setReadGroup(new GATKSAMReadGroupRecord("x"));
if (refIndex == -1) {
rec.setReadUnmappedFlag(true);
}
diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMReadGroupRecord.java b/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMReadGroupRecord.java
index bb99156fe..5f70ced92 100755
--- a/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMReadGroupRecord.java
+++ b/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMReadGroupRecord.java
@@ -12,9 +12,6 @@ import org.broadinstitute.sting.utils.NGSPlatform;
*
*/
public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
-
- public static final String LANE_TAG = "LN";
-
// the SAMReadGroupRecord data we're caching
private String mSample = null;
private String mPlatform = null;
@@ -33,46 +30,14 @@ public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
super(record.getReadGroupId(), record);
}
- public GATKSAMReadGroupRecord(SAMReadGroupRecord record, NGSPlatform pl) {
- super(record.getReadGroupId(), record);
- setPlatform(pl.getDefaultPlatform());
- mNGSPlatform = pl;
- retrievedPlatform = retrievedNGSPlatform = true;
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- // *** The following methods are overloaded to cache the appropriate data ***//
- ///////////////////////////////////////////////////////////////////////////////
-
- public String getSample() {
- if ( !retrievedSample ) {
- mSample = super.getSample();
- retrievedSample = true;
- }
- return mSample;
- }
-
- public void setSample(String s) {
- super.setSample(s);
- mSample = s;
- retrievedSample = true;
- }
-
- public String getPlatform() {
- if ( !retrievedPlatform ) {
- mPlatform = super.getPlatform();
- retrievedPlatform = true;
- }
- return mPlatform;
- }
-
- public void setPlatform(String s) {
- super.setPlatform(s);
- mPlatform = s;
- retrievedPlatform = true;
- retrievedNGSPlatform = false; // recalculate the NGSPlatform
- }
-
+ /**
+ * Get the NGSPlatform enum telling us the platform of this read group
+ *
+ * This function call is caching, so subsequent calls to it are free, while
+ * the first time it's called there's a bit of work to resolve the enum
+ *
+ * @return an NGSPlatform enum value
+ */
public NGSPlatform getNGSPlatform() {
if ( ! retrievedNGSPlatform ) {
mNGSPlatform = NGSPlatform.fromReadGroupPL(getPlatform());
@@ -82,11 +47,40 @@ public class GATKSAMReadGroupRecord extends SAMReadGroupRecord {
return mNGSPlatform;
}
- public String getLane() {
- return this.getAttribute(LANE_TAG);
+ ///////////////////////////////////////////////////////////////////////////////
+ // *** The following methods are overloaded to cache the appropriate data ***//
+ ///////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public String getSample() {
+ if ( !retrievedSample ) {
+ mSample = super.getSample();
+ retrievedSample = true;
+ }
+ return mSample;
}
-
- public void setLane(String lane) {
- this.setAttribute(LANE_TAG, lane);
+
+ @Override
+ public void setSample(String s) {
+ super.setSample(s);
+ mSample = s;
+ retrievedSample = true;
+ }
+
+ @Override
+ public String getPlatform() {
+ if ( !retrievedPlatform ) {
+ mPlatform = super.getPlatform();
+ retrievedPlatform = true;
+ }
+ return mPlatform;
+ }
+
+ @Override
+ public void setPlatform(String s) {
+ super.setPlatform(s);
+ mPlatform = s;
+ retrievedPlatform = true;
+ retrievedNGSPlatform = false; // recalculate the NGSPlatform
}
}
\ No newline at end of file
diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMRecord.java b/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMRecord.java
index 9fdb48b34..beadead0a 100755
--- a/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMRecord.java
+++ b/public/java/src/org/broadinstitute/sting/utils/sam/GATKSAMRecord.java
@@ -25,9 +25,9 @@
package org.broadinstitute.sting.utils.sam;
import net.sf.samtools.*;
-import org.broadinstitute.sting.utils.recalibration.EventType;
import org.broadinstitute.sting.utils.NGSPlatform;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
+import org.broadinstitute.sting.utils.recalibration.EventType;
import java.util.Arrays;
import java.util.HashMap;
@@ -56,6 +56,12 @@ public class GATKSAMRecord extends BAMRecord {
public static final String BQSR_BASE_INSERTION_QUALITIES = "BI"; // base qualities for insertions
public static final String BQSR_BASE_DELETION_QUALITIES = "BD"; // base qualities for deletions
+ /**
+ * The default quality score for an insertion or deletion, if
+ * none are provided for this read.
+ */
+ public static final byte DEFAULT_INSERTION_DELETION_QUAL = (byte)45;
+
// the SAMRecord data we're caching
private String mReadString = null;
private GATKSAMReadGroupRecord mReadGroup = null;
@@ -141,16 +147,36 @@ public class GATKSAMRecord extends BAMRecord {
mReadString = s;
}
+ /**
+ * Get the GATKSAMReadGroupRecord of this read
+ * @return a non-null GATKSAMReadGroupRecord
+ */
@Override
public GATKSAMReadGroupRecord getReadGroup() {
- if ( !retrievedReadGroup ) {
- SAMReadGroupRecord tempReadGroup = super.getReadGroup();
- mReadGroup = (tempReadGroup == null ? null : new GATKSAMReadGroupRecord(tempReadGroup));
+ if ( ! retrievedReadGroup ) {
+ final SAMReadGroupRecord rg = super.getReadGroup();
+
+ // three cases: rg may be null (no rg, rg may already be a GATKSAMReadGroupRecord, or it may be
+ // a regular SAMReadGroupRecord in which case we have to make it a GATKSAMReadGroupRecord
+ if ( rg == null )
+ mReadGroup = null;
+ else if ( rg instanceof GATKSAMReadGroupRecord )
+ mReadGroup = (GATKSAMReadGroupRecord)rg;
+ else
+ mReadGroup = new GATKSAMReadGroupRecord(rg);
+
retrievedReadGroup = true;
}
return mReadGroup;
}
+ public void setReadGroup( final GATKSAMReadGroupRecord readGroup ) {
+ mReadGroup = readGroup;
+ retrievedReadGroup = true;
+ setAttribute("RG", mReadGroup.getId()); // todo -- this should be standardized, but we don't have access to SAMTagUtils!
+ }
+
+
@Override
public int hashCode() {
return super.hashCode();
@@ -229,7 +255,7 @@ public class GATKSAMRecord extends BAMRecord {
byte [] quals = getExistingBaseInsertionQualities();
if( quals == null ) {
quals = new byte[getBaseQualities().length];
- Arrays.fill(quals, (byte) 45); // Some day in the future when base insertion and base deletion quals exist the samtools API will
+ Arrays.fill(quals, DEFAULT_INSERTION_DELETION_QUAL); // Some day in the future when base insertion and base deletion quals exist the samtools API will
// be updated and the original quals will be pulled here, but for now we assume the original quality is a flat Q45
}
return quals;
@@ -245,7 +271,7 @@ public class GATKSAMRecord extends BAMRecord {
byte[] quals = getExistingBaseDeletionQualities();
if( quals == null ) {
quals = new byte[getBaseQualities().length];
- Arrays.fill(quals, (byte) 45); // Some day in the future when base insertion and base deletion quals exist the samtools API will
+ Arrays.fill(quals, DEFAULT_INSERTION_DELETION_QUAL); // Some day in the future when base insertion and base deletion quals exist the samtools API will
// be updated and the original quals will be pulled here, but for now we assume the original quality is a flat Q45
}
return quals;
@@ -259,12 +285,6 @@ public class GATKSAMRecord extends BAMRecord {
return getReadGroup().getNGSPlatform();
}
- public void setReadGroup( final GATKSAMReadGroupRecord readGroup ) {
- mReadGroup = readGroup;
- retrievedReadGroup = true;
- setAttribute("RG", mReadGroup.getId()); // todo -- this should be standardized, but we don't have access to SAMTagUtils!
- }
-
///////////////////////////////////////////////////////////////////////////////
// *** ReduceReads functions ***//
///////////////////////////////////////////////////////////////////////////////
diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/MisencodedBaseQualityReadTransformer.java b/public/java/src/org/broadinstitute/sting/utils/sam/MisencodedBaseQualityReadTransformer.java
index cac51239a..17d40e530 100644
--- a/public/java/src/org/broadinstitute/sting/utils/sam/MisencodedBaseQualityReadTransformer.java
+++ b/public/java/src/org/broadinstitute/sting/utils/sam/MisencodedBaseQualityReadTransformer.java
@@ -47,6 +47,8 @@ public class MisencodedBaseQualityReadTransformer extends ReadTransformer {
final byte[] quals = read.getBaseQualities();
for ( int i = 0; i < quals.length; i++ ) {
quals[i] -= encodingFixValue;
+ if ( quals[i] < 0 )
+ throw new UserException.BadInput("while fixing mis-encoded base qualities we encountered a read that was correctly encoded; we cannot handle such a mixture of reads so unfortunately the BAM must be fixed with some other tool");
}
read.setBaseQualities(quals);
return read;
diff --git a/public/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java b/public/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java
index bd908727f..263cd9bd1 100755
--- a/public/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java
+++ b/public/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java
@@ -226,7 +226,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default 454 tag
*/
- public static boolean is454Read(SAMRecord read) {
+ public static boolean is454Read(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.LS454;
}
@@ -236,7 +236,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default ion tag
*/
- public static boolean isIonRead(SAMRecord read) {
+ public static boolean isIonRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.ION_TORRENT;
}
@@ -246,7 +246,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default SOLiD tag
*/
- public static boolean isSOLiDRead(SAMRecord read) {
+ public static boolean isSOLiDRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.SOLID;
}
@@ -256,7 +256,7 @@ public class ReadUtils {
* @param read the read to test
* @return checks the read group tag PL for the default SLX tag
*/
- public static boolean isIlluminaRead(SAMRecord read) {
+ public static boolean isIlluminaRead(GATKSAMRecord read) {
return NGSPlatform.fromRead(read) == NGSPlatform.ILLUMINA;
}
@@ -268,7 +268,7 @@ public class ReadUtils {
* @param name the upper-cased platform name to test
* @return whether or not name == PL tag in the read group of read
*/
- public static boolean isPlatformRead(SAMRecord read, String name) {
+ public static boolean isPlatformRead(GATKSAMRecord read, String name) {
SAMReadGroupRecord readGroup = read.getReadGroup();
if (readGroup != null) {
diff --git a/public/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/ArtificialReadPileupTestProvider.java b/public/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/ArtificialReadPileupTestProvider.java
index e48c4d197..39b81f4a2 100644
--- a/public/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/ArtificialReadPileupTestProvider.java
+++ b/public/java/test/org/broadinstitute/sting/gatk/walkers/genotyper/ArtificialReadPileupTestProvider.java
@@ -37,6 +37,7 @@ import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileup;
import org.broadinstitute.sting.utils.pileup.ReadBackedPileupImpl;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
+import org.broadinstitute.sting.utils.sam.GATKSAMReadGroupRecord;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
import java.util.*;
@@ -53,11 +54,11 @@ public class ArtificialReadPileupTestProvider {
final String artificialReadName = "synth";
final int artificialRefStart = 1;
final int artificialMappingQuality = 60;
- Map sample2RG = new HashMap();
+ Map sample2RG = new HashMap();
List sampleRGs;
List sampleNames = new ArrayList();
private String sampleName(int i) { return sampleNames.get(i); }
- private SAMReadGroupRecord sampleRG(String name) { return sample2RG.get(name); }
+ private GATKSAMReadGroupRecord sampleRG(String name) { return sample2RG.get(name); }
public final int locStart = 105; // start position where we desire artificial variant
private final int readLength = 10; // desired read length in pileup
public final int readOffset = 4;
@@ -75,7 +76,7 @@ public class ArtificialReadPileupTestProvider {
for ( int i = 0; i < numSamples; i++ ) {
sampleNames.add(String.format("%s%04d", SAMPLE_PREFIX, i));
- SAMReadGroupRecord rg = createRG(sampleName(i));
+ GATKSAMReadGroupRecord rg = createRG(sampleName(i));
sampleRGs.add(rg);
sample2RG.put(sampleName(i), rg);
}
@@ -134,8 +135,8 @@ public class ArtificialReadPileupTestProvider {
return contexts;
}
- private SAMReadGroupRecord createRG(String name) {
- SAMReadGroupRecord rg = new SAMReadGroupRecord(name);
+ private GATKSAMReadGroupRecord createRG(String name) {
+ GATKSAMReadGroupRecord rg = new GATKSAMReadGroupRecord(name);
rg.setPlatform("ILLUMINA");
rg.setSample(name);
return rg;
@@ -189,7 +190,7 @@ public class ArtificialReadPileupTestProvider {
read.setMappingQuality(artificialMappingQuality);
read.setReferenceName(loc.getContig());
read.setReadNegativeStrandFlag(false);
- read.setAttribute("RG", sampleRG(sample).getReadGroupId());
+ read.setReadGroup(sampleRG(sample));
pileupElements.add(new PileupElement(read,readOffset,false,isBeforeDeletion, false, isBeforeInsertion,false,false,altBases,Math.abs(eventLength)));
diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/MapResultUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/MapResultUnitTest.java
new file mode 100644
index 000000000..93fe9578f
--- /dev/null
+++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/MapResultUnitTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2012 The Broad Institute
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+ * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.broadinstitute.sting.utils.nanoScheduler;
+
+import org.broadinstitute.sting.BaseTest;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+* UnitTests for the InputProducer
+*
+* User: depristo
+* Date: 8/24/12
+* Time: 11:25 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class MapResultUnitTest {
+ @DataProvider(name = "CompareTester")
+ public Object[][] createCompareTester() {
+ List