BQSR cleanup: merge Advanced and Standard recalibration engine into just the RecalibrationEngine
-- As we are no longer maintaining a public/protected system we need only have one RecalibrationEngine. -- Misc. code cleanup and docs along the way
This commit is contained in:
parent
7df47418d8
commit
bbdf9ee91b
|
|
@ -1,65 +0,0 @@
|
|||
package org.broadinstitute.sting.gatk.walkers.bqsr;
|
||||
|
||||
/*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
import org.broadinstitute.sting.utils.classloader.ProtectedPackageSource;
|
||||
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
|
||||
import org.broadinstitute.sting.utils.recalibration.EventType;
|
||||
import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
|
||||
import org.broadinstitute.sting.utils.recalibration.RecalDatum;
|
||||
import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
|
||||
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||
|
||||
public class AdvancedRecalibrationEngine extends StandardRecalibrationEngine implements ProtectedPackageSource {
|
||||
@Override
|
||||
public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
|
||||
final GATKSAMRecord read = recalInfo.getRead();
|
||||
final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
|
||||
final RecalibrationTables tables = getRecalibrationTables();
|
||||
final NestedIntegerArray<RecalDatum> qualityScoreTable = tables.getQualityScoreTable();
|
||||
|
||||
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
|
||||
if( ! recalInfo.skip(offset) ) {
|
||||
|
||||
for (final EventType eventType : EventType.values()) {
|
||||
final int[] keys = readCovariates.getKeySet(offset, eventType);
|
||||
final int eventIndex = eventType.index;
|
||||
final byte qual = recalInfo.getQual(eventType, offset);
|
||||
final double isError = recalInfo.getErrorFraction(eventType, offset);
|
||||
|
||||
incrementDatumOrPutIfNecessary(qualityScoreTable, qual, isError, keys[0], keys[1], eventIndex);
|
||||
|
||||
for (int i = 2; i < covariates.length; i++) {
|
||||
if (keys[i] < 0)
|
||||
continue;
|
||||
|
||||
incrementDatumOrPutIfNecessary(tables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -113,12 +113,11 @@ import java.util.List;
|
|||
@ReadFilters({MappingQualityZeroFilter.class, MappingQualityUnavailableFilter.class, UnmappedReadFilter.class, NotPrimaryAlignmentFilter.class, DuplicateReadFilter.class, FailsVendorQualityCheckFilter.class})
|
||||
@PartitionBy(PartitionType.READ)
|
||||
public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSchedulable {
|
||||
/**
|
||||
* all the command line arguments for BQSR and it's covariates
|
||||
*/
|
||||
@ArgumentCollection
|
||||
private final RecalibrationArgumentCollection RAC = new RecalibrationArgumentCollection(); // all the command line arguments for BQSR and it's covariates
|
||||
|
||||
@Advanced
|
||||
@Argument(fullName = "bqsrBAQGapOpenPenalty", shortName="bqsrBAQGOP", doc="BQSR BAQ gap open penalty (Phred Scaled). Default value is 40. 30 is perhaps better for whole genome call sets", required = false)
|
||||
public double BAQGOP = BAQ.DEFAULT_GOP;
|
||||
private final RecalibrationArgumentCollection RAC = new RecalibrationArgumentCollection();
|
||||
|
||||
/**
|
||||
* When you have nct > 1, BQSR uses nct times more memory to compute its recalibration tables, for efficiency
|
||||
|
|
@ -129,9 +128,19 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
|
|||
@Argument(fullName = "lowMemoryMode", shortName="lowMemoryMode", doc="Reduce memory usage in multi-threaded code at the expense of threading efficiency", required = false)
|
||||
public boolean lowMemoryMode = false;
|
||||
|
||||
private QuantizationInfo quantizationInfo; // an object that keeps track of the information necessary for quality score quantization
|
||||
@Advanced
|
||||
@Argument(fullName = "bqsrBAQGapOpenPenalty", shortName="bqsrBAQGOP", doc="BQSR BAQ gap open penalty (Phred Scaled). Default value is 40. 30 is perhaps better for whole genome call sets", required = false)
|
||||
public double BAQGOP = BAQ.DEFAULT_GOP;
|
||||
|
||||
private Covariate[] requestedCovariates; // list to hold the all the covariate objects that were requested (required + standard + experimental)
|
||||
/**
|
||||
* an object that keeps track of the information necessary for quality score quantization
|
||||
*/
|
||||
private QuantizationInfo quantizationInfo;
|
||||
|
||||
/**
|
||||
* list to hold the all the covariate objects that were requested (required + standard + experimental)
|
||||
*/
|
||||
private Covariate[] requestedCovariates;
|
||||
|
||||
private RecalibrationEngine recalibrationEngine;
|
||||
|
||||
|
|
@ -189,30 +198,20 @@ public class BaseRecalibrator extends ReadWalker<Long, Long> implements NanoSche
|
|||
throw new UserException.CouldNotCreateOutputFile(RAC.RECAL_TABLE_FILE, e);
|
||||
}
|
||||
|
||||
int numReadGroups = 0;
|
||||
for ( final SAMFileHeader header : getToolkit().getSAMFileHeaders() )
|
||||
numReadGroups += header.getReadGroups().size();
|
||||
|
||||
recalibrationEngine = initializeRecalibrationEngine();
|
||||
recalibrationEngine.initialize(requestedCovariates, numReadGroups, RAC.RECAL_TABLE_UPDATE_LOG);
|
||||
if ( lowMemoryMode )
|
||||
recalibrationEngine.enableLowMemoryMode();
|
||||
|
||||
initializeRecalibrationEngine();
|
||||
minimumQToUse = getToolkit().getArguments().PRESERVE_QSCORES_LESS_THAN;
|
||||
referenceReader = getToolkit().getReferenceDataSource().getReference();
|
||||
}
|
||||
|
||||
private RecalibrationEngine initializeRecalibrationEngine() {
|
||||
/**
|
||||
* Initialize the recalibration engine
|
||||
*/
|
||||
private void initializeRecalibrationEngine() {
|
||||
int numReadGroups = 0;
|
||||
for ( final SAMFileHeader header : getToolkit().getSAMFileHeaders() )
|
||||
numReadGroups += header.getReadGroups().size();
|
||||
|
||||
final Class recalibrationEngineClass = GATKLiteUtils.getProtectedClassIfAvailable(RecalibrationEngine.class);
|
||||
try {
|
||||
final Constructor constructor = recalibrationEngineClass.getDeclaredConstructor((Class[])null);
|
||||
constructor.setAccessible(true);
|
||||
return (RecalibrationEngine)constructor.newInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ReviewedStingException("Unable to create RecalibrationEngine class instance " + recalibrationEngineClass.getSimpleName());
|
||||
}
|
||||
recalibrationEngine = new RecalibrationEngine(requestedCovariates, numReadGroups, RAC.RECAL_TABLE_UPDATE_LOG, lowMemoryMode);
|
||||
}
|
||||
|
||||
private boolean isLowQualityBase( final GATKSAMRecord read, final int offset ) {
|
||||
|
|
|
|||
|
|
@ -1,37 +1,90 @@
|
|||
/*
|
||||
* Copyright (c) 2012 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.walkers.bqsr;
|
||||
|
||||
import com.google.java.contract.Requires;
|
||||
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
|
||||
import org.broadinstitute.sting.utils.recalibration.EventType;
|
||||
import org.broadinstitute.sting.utils.recalibration.ReadCovariates;
|
||||
import org.broadinstitute.sting.utils.recalibration.RecalDatum;
|
||||
import org.broadinstitute.sting.utils.recalibration.RecalibrationTables;
|
||||
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
|
||||
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class RecalibrationEngine {
|
||||
final protected Covariate[] covariates;
|
||||
final private int numReadGroups;
|
||||
final private PrintStream maybeLogStream;
|
||||
final private boolean lowMemoryMode;
|
||||
|
||||
/**
|
||||
* Has finalizeData() been called?
|
||||
*/
|
||||
private boolean finalized = false;
|
||||
|
||||
/**
|
||||
* The final (merged, etc) recalibration tables, suitable for downstream analysis.
|
||||
*/
|
||||
private RecalibrationTables finalRecalibrationTables = null;
|
||||
|
||||
private final List<RecalibrationTables> recalibrationTablesList = new LinkedList<RecalibrationTables>();
|
||||
|
||||
private final ThreadLocal<RecalibrationTables> threadLocalTables = new ThreadLocal<RecalibrationTables>() {
|
||||
private synchronized RecalibrationTables makeAndCaptureTable() {
|
||||
final RecalibrationTables newTable = new RecalibrationTables(covariates, numReadGroups, maybeLogStream);
|
||||
recalibrationTablesList.add(newTable);
|
||||
return newTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized RecalibrationTables initialValue() {
|
||||
if ( lowMemoryMode ) {
|
||||
return recalibrationTablesList.isEmpty() ? makeAndCaptureTable() : recalibrationTablesList.get(0);
|
||||
} else {
|
||||
return makeAndCaptureTable();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get a recalibration table suitable for updating the underlying RecalDatums
|
||||
*
|
||||
* May return a thread-local version, or a single version, depending on the initialization
|
||||
* arguments of this instance.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
protected RecalibrationTables getUpdatableRecalibrationTables() {
|
||||
return threadLocalTables.get();
|
||||
}
|
||||
|
||||
/*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
public interface RecalibrationEngine {
|
||||
/**
|
||||
* Initialize the recalibration engine
|
||||
*
|
||||
|
|
@ -45,23 +98,164 @@ public interface RecalibrationEngine {
|
|||
* @param numReadGroups the number of read groups we should use for the recalibration tables
|
||||
* @param maybeLogStream an optional print stream for logging calls to the nestedhashmap in the recalibration tables
|
||||
*/
|
||||
public void initialize(final Covariate[] covariates, final int numReadGroups, final PrintStream maybeLogStream);
|
||||
public RecalibrationEngine(final Covariate[] covariates, final int numReadGroups, final PrintStream maybeLogStream, final boolean enableLowMemoryMode) {
|
||||
if ( covariates == null ) throw new IllegalArgumentException("Covariates cannot be null");
|
||||
if ( numReadGroups < 1 ) throw new IllegalArgumentException("numReadGroups must be >= 1 but got " + numReadGroups);
|
||||
|
||||
this.covariates = covariates.clone();
|
||||
this.numReadGroups = numReadGroups;
|
||||
this.maybeLogStream = maybeLogStream;
|
||||
this.lowMemoryMode = enableLowMemoryMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the recalibration statistics using the information in recalInfo
|
||||
* @param recalInfo data structure holding information about the recalibration values for a single read
|
||||
*/
|
||||
@Requires("recalInfo != null")
|
||||
public void updateDataForRead(final ReadRecalibrationInfo recalInfo);
|
||||
public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
|
||||
final GATKSAMRecord read = recalInfo.getRead();
|
||||
final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
|
||||
final RecalibrationTables tables = getUpdatableRecalibrationTables();
|
||||
final NestedIntegerArray<RecalDatum> qualityScoreTable = tables.getQualityScoreTable();
|
||||
|
||||
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
|
||||
if( ! recalInfo.skip(offset) ) {
|
||||
|
||||
for (final EventType eventType : EventType.values()) {
|
||||
final int[] keys = readCovariates.getKeySet(offset, eventType);
|
||||
final int eventIndex = eventType.index;
|
||||
final byte qual = recalInfo.getQual(eventType, offset);
|
||||
final double isError = recalInfo.getErrorFraction(eventType, offset);
|
||||
|
||||
incrementDatumOrPutIfNecessary(qualityScoreTable, qual, isError, keys[0], keys[1], eventIndex);
|
||||
|
||||
for (int i = 2; i < covariates.length; i++) {
|
||||
if (keys[i] < 0)
|
||||
continue;
|
||||
|
||||
incrementDatumOrPutIfNecessary(tables.getTable(i), qual, isError, keys[0], keys[1], keys[i], eventIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a datum object with one observation and one or zero error
|
||||
*
|
||||
* @param reportedQual the quality score reported by the instrument for this base
|
||||
* @param isError whether or not the observation is an error
|
||||
* @return a new RecalDatum object with the observation and the error
|
||||
*/
|
||||
protected RecalDatum createDatumObject(final byte reportedQual, final double isError) {
|
||||
return new RecalDatum(1, isError, reportedQual);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize, if appropriate, all derived data in recalibrationTables.
|
||||
*
|
||||
* Called once after all calls to updateDataForRead have been issued.
|
||||
*
|
||||
* Assumes that all of the principal tables (by quality score) have been completely updated,
|
||||
* and walks over this data to create summary data tables like by read group table.
|
||||
*/
|
||||
public void finalizeData();
|
||||
public void finalizeData() {
|
||||
if ( finalized ) throw new IllegalStateException("FinalizeData() has already been called");
|
||||
|
||||
public void enableLowMemoryMode();
|
||||
// merge all of the thread-local tables
|
||||
finalRecalibrationTables = mergeThreadLocalRecalibrationTables();
|
||||
|
||||
public RecalibrationTables getFinalRecalibrationTables();
|
||||
final NestedIntegerArray<RecalDatum> byReadGroupTable = finalRecalibrationTables.getReadGroupTable();
|
||||
final NestedIntegerArray<RecalDatum> byQualTable = finalRecalibrationTables.getQualityScoreTable();
|
||||
|
||||
// iterate over all values in the qual table
|
||||
for ( NestedIntegerArray.Leaf<RecalDatum> leaf : byQualTable.getAllLeaves() ) {
|
||||
final int rgKey = leaf.keys[0];
|
||||
final int eventIndex = leaf.keys[2];
|
||||
final RecalDatum rgDatum = byReadGroupTable.get(rgKey, eventIndex);
|
||||
final RecalDatum qualDatum = leaf.value;
|
||||
|
||||
if ( rgDatum == null ) {
|
||||
// create a copy of qualDatum, and initialize byReadGroup table with it
|
||||
byReadGroupTable.put(new RecalDatum(qualDatum), rgKey, eventIndex);
|
||||
} else {
|
||||
// combine the qual datum with the existing datum in the byReadGroup table
|
||||
rgDatum.combine(qualDatum);
|
||||
}
|
||||
}
|
||||
|
||||
finalized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge all of the thread local recalibration tables into a single one.
|
||||
*
|
||||
* Reuses one of the recalibration tables to hold the merged table, so this function can only be
|
||||
* called once in the engine.
|
||||
*
|
||||
* @return the merged recalibration table
|
||||
*/
|
||||
@Requires("! finalized")
|
||||
private RecalibrationTables mergeThreadLocalRecalibrationTables() {
|
||||
if ( recalibrationTablesList.isEmpty() ) throw new IllegalStateException("recalibration tables list is empty");
|
||||
|
||||
RecalibrationTables merged = null;
|
||||
for ( final RecalibrationTables table : recalibrationTablesList ) {
|
||||
if ( merged == null )
|
||||
// fast path -- if there's only only one table, so just make it the merged one
|
||||
merged = table;
|
||||
else {
|
||||
merged.combine(table);
|
||||
}
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the final recalibration tables, after finalizeData() has been called
|
||||
*
|
||||
* This returns the finalized recalibration table collected by this engine.
|
||||
*
|
||||
* It is an error to call this function before finalizeData has been called
|
||||
*
|
||||
* @return the finalized recalibration table collected by this engine
|
||||
*/
|
||||
public RecalibrationTables getFinalRecalibrationTables() {
|
||||
if ( ! finalized ) throw new IllegalStateException("Cannot get final recalibration tables until finalizeData() has been called");
|
||||
return finalRecalibrationTables;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments the RecalDatum at the specified position in the specified table, or put a new item there
|
||||
* if there isn't already one.
|
||||
*
|
||||
* Does this in a thread-safe way WITHOUT being synchronized: relies on the behavior of NestedIntegerArray.put()
|
||||
* to return false if another thread inserts a new item at our position in the middle of our put operation.
|
||||
*
|
||||
* @param table the table that holds/will hold our item
|
||||
* @param qual qual for this event
|
||||
* @param isError error value for this event
|
||||
* @param keys location in table of our item
|
||||
*/
|
||||
protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray<RecalDatum> table,
|
||||
final byte qual,
|
||||
final double isError,
|
||||
final int... keys ) {
|
||||
final RecalDatum existingDatum = table.get(keys);
|
||||
|
||||
if ( existingDatum == null ) {
|
||||
// No existing item, try to put a new one
|
||||
if ( ! table.put(createDatumObject(qual, isError), keys) ) {
|
||||
// Failed to put a new item because another thread came along and put an item here first.
|
||||
// Get the newly-put item and increment it (item is guaranteed to exist at this point)
|
||||
table.get(keys).increment(1.0, isError);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Easy case: already an item here, so increment it
|
||||
existingDatum.increment(1.0, isError);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,219 +0,0 @@
|
|||
package org.broadinstitute.sting.gatk.walkers.bqsr;
|
||||
|
||||
/*
|
||||
* Copyright (c) 2009 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
import com.google.java.contract.Requires;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.utils.classloader.PublicPackageSource;
|
||||
import org.broadinstitute.sting.utils.collections.NestedIntegerArray;
|
||||
import org.broadinstitute.sting.utils.recalibration.*;
|
||||
import org.broadinstitute.sting.utils.recalibration.covariates.Covariate;
|
||||
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class StandardRecalibrationEngine implements RecalibrationEngine, PublicPackageSource {
|
||||
private static final Logger logger = Logger.getLogger(StandardRecalibrationEngine.class);
|
||||
protected Covariate[] covariates;
|
||||
private int numReadGroups;
|
||||
private PrintStream maybeLogStream;
|
||||
private boolean lowMemoryMode = false;
|
||||
|
||||
private boolean finalized = false;
|
||||
private RecalibrationTables mergedRecalibrationTables = null;
|
||||
|
||||
private final List<RecalibrationTables> recalibrationTablesList = new LinkedList<RecalibrationTables>();
|
||||
|
||||
private final ThreadLocal<RecalibrationTables> threadLocalTables = new ThreadLocal<RecalibrationTables>() {
|
||||
private synchronized RecalibrationTables makeAndCaptureTable() {
|
||||
logger.info("Creating RecalibrationTable for " + Thread.currentThread());
|
||||
final RecalibrationTables newTable = new RecalibrationTables(covariates, numReadGroups, maybeLogStream);
|
||||
recalibrationTablesList.add(newTable);
|
||||
return newTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized RecalibrationTables initialValue() {
|
||||
if ( lowMemoryMode ) {
|
||||
return recalibrationTablesList.isEmpty() ? makeAndCaptureTable() : recalibrationTablesList.get(0);
|
||||
} else {
|
||||
return makeAndCaptureTable();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
protected RecalibrationTables getRecalibrationTables() {
|
||||
return threadLocalTables.get();
|
||||
}
|
||||
|
||||
public void enableLowMemoryMode() {
|
||||
this.lowMemoryMode = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(final Covariate[] covariates, final int numReadGroups, final PrintStream maybeLogStream) {
|
||||
if ( covariates == null ) throw new IllegalArgumentException("Covariates cannot be null");
|
||||
if ( numReadGroups < 1 ) throw new IllegalArgumentException("numReadGroups must be >= 1 but got " + numReadGroups);
|
||||
|
||||
this.covariates = covariates.clone();
|
||||
this.numReadGroups = numReadGroups;
|
||||
this.maybeLogStream = maybeLogStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateDataForRead( final ReadRecalibrationInfo recalInfo ) {
|
||||
final GATKSAMRecord read = recalInfo.getRead();
|
||||
final EventType eventType = EventType.BASE_SUBSTITUTION;
|
||||
final ReadCovariates readCovariates = recalInfo.getCovariatesValues();
|
||||
|
||||
for( int offset = 0; offset < read.getReadBases().length; offset++ ) {
|
||||
if( ! recalInfo.skip(offset) ) {
|
||||
final byte qual = recalInfo.getQual(eventType, offset);
|
||||
final double isError = recalInfo.getErrorFraction(eventType, offset);
|
||||
final int[] keys = readCovariates.getKeySet(offset, eventType);
|
||||
|
||||
incrementDatumOrPutIfNecessary(getRecalibrationTables().getQualityScoreTable(), qual, isError, keys[0], keys[1], eventType.index);
|
||||
|
||||
for (int i = 2; i < covariates.length; i++) {
|
||||
if (keys[i] < 0)
|
||||
continue;
|
||||
|
||||
incrementDatumOrPutIfNecessary(getRecalibrationTables().getTable(i), qual, isError, keys[0], keys[1], keys[i], eventType.index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a datum object with one observation and one or zero error
|
||||
*
|
||||
* @param reportedQual the quality score reported by the instrument for this base
|
||||
* @param isError whether or not the observation is an error
|
||||
* @return a new RecalDatum object with the observation and the error
|
||||
*/
|
||||
protected RecalDatum createDatumObject(final byte reportedQual, final double isError) {
|
||||
return new RecalDatum(1, isError, reportedQual);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create derived recalibration data tables
|
||||
*
|
||||
* Assumes that all of the principal tables (by quality score) have been completely updated,
|
||||
* and walks over this data to create summary data tables like by read group table.
|
||||
*/
|
||||
@Override
|
||||
public void finalizeData() {
|
||||
if ( finalized ) throw new IllegalStateException("FinalizeData() has already been called");
|
||||
|
||||
// merge all of the thread-local tables
|
||||
mergedRecalibrationTables = mergeThreadLocalRecalibrationTables();
|
||||
|
||||
final NestedIntegerArray<RecalDatum> byReadGroupTable = mergedRecalibrationTables.getReadGroupTable();
|
||||
final NestedIntegerArray<RecalDatum> byQualTable = mergedRecalibrationTables.getQualityScoreTable();
|
||||
|
||||
// iterate over all values in the qual table
|
||||
for ( NestedIntegerArray.Leaf<RecalDatum> leaf : byQualTable.getAllLeaves() ) {
|
||||
final int rgKey = leaf.keys[0];
|
||||
final int eventIndex = leaf.keys[2];
|
||||
final RecalDatum rgDatum = byReadGroupTable.get(rgKey, eventIndex);
|
||||
final RecalDatum qualDatum = leaf.value;
|
||||
|
||||
if ( rgDatum == null ) {
|
||||
// create a copy of qualDatum, and initialize byReadGroup table with it
|
||||
byReadGroupTable.put(new RecalDatum(qualDatum), rgKey, eventIndex);
|
||||
} else {
|
||||
// combine the qual datum with the existing datum in the byReadGroup table
|
||||
rgDatum.combine(qualDatum);
|
||||
}
|
||||
}
|
||||
|
||||
finalized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge all of the thread local recalibration tables into a single one.
|
||||
*
|
||||
* Reuses one of the recalibration tables to hold the merged table, so this function can only be
|
||||
* called once in the engine.
|
||||
*
|
||||
* @return the merged recalibration table
|
||||
*/
|
||||
@Requires("! finalized")
|
||||
private RecalibrationTables mergeThreadLocalRecalibrationTables() {
|
||||
if ( recalibrationTablesList.isEmpty() ) throw new IllegalStateException("recalibration tables list is empty");
|
||||
|
||||
RecalibrationTables merged = null;
|
||||
for ( final RecalibrationTables table : recalibrationTablesList ) {
|
||||
if ( merged == null )
|
||||
// fast path -- if there's only only one table, so just make it the merged one
|
||||
merged = table;
|
||||
else {
|
||||
merged.combine(table);
|
||||
}
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
public RecalibrationTables getFinalRecalibrationTables() {
|
||||
if ( ! finalized ) throw new IllegalStateException("Cannot get final recalibration tables until finalizeData() has been called");
|
||||
return mergedRecalibrationTables;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments the RecalDatum at the specified position in the specified table, or put a new item there
|
||||
* if there isn't already one.
|
||||
*
|
||||
* Does this in a thread-safe way WITHOUT being synchronized: relies on the behavior of NestedIntegerArray.put()
|
||||
* to return false if another thread inserts a new item at our position in the middle of our put operation.
|
||||
*
|
||||
* @param table the table that holds/will hold our item
|
||||
* @param qual qual for this event
|
||||
* @param isError error value for this event
|
||||
* @param keys location in table of our item
|
||||
*/
|
||||
protected void incrementDatumOrPutIfNecessary( final NestedIntegerArray<RecalDatum> table,
|
||||
final byte qual,
|
||||
final double isError,
|
||||
final int... keys ) {
|
||||
final RecalDatum existingDatum = table.get(keys);
|
||||
|
||||
if ( existingDatum == null ) {
|
||||
// No existing item, try to put a new one
|
||||
if ( ! table.put(createDatumObject(qual, isError), keys) ) {
|
||||
// Failed to put a new item because another thread came along and put an item here first.
|
||||
// Get the newly-put item and increment it (item is guaranteed to exist at this point)
|
||||
table.get(keys).increment(1.0, isError);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Easy case: already an item here, so increment it
|
||||
existingDatum.increment(1.0, isError);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue