Limited version of parallel read walkers

-- Currently doesn't support accessing reference or ROD data
-- Parallel versions of PrintReads and CountReads
This commit is contained in:
Mark DePristo 2012-08-25 16:48:22 -04:00
parent e060b148e2
commit af540888f1
9 changed files with 221 additions and 16 deletions

View File

@ -94,6 +94,13 @@ public abstract class ShardDataProvider {
return referenceOrderedData;
}
/**
 * Checks whether this shard will supply any reference ordered data.
 *
 * @return true if at least one reference ordered data source is attached to this shard
 */
public boolean hasReferenceOrderedData() {
    final boolean noRodsAttached = getReferenceOrderedData().isEmpty();
    return !noRodsAttached;
}
/**
* Create a data provider for the shard given the reads and reference.
* @param shard The chunk of data over which traversals happen.

View File

@ -88,7 +88,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
final Collection<ReferenceOrderedDataSource> rods,
final int nThreadsToUse,
final boolean monitorThreadPerformance ) {
super(engine, walker, reads, reference, rods);
super(engine, walker, reads, reference, rods, nThreadsToUse);
if ( monitorThreadPerformance ) {
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse);

View File

@ -39,8 +39,9 @@ public class LinearMicroScheduler extends MicroScheduler {
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int numThreads, // may be > 1 if are nanoScheduling
final boolean monitorThreadPerformance ) {
super(engine, walker, reads, reference, rods);
super(engine, walker, reads, reference, rods, numThreads);
if ( monitorThreadPerformance )
setThreadEfficiencyMonitor(new ThreadEfficiencyMonitor());

View File

@ -103,14 +103,16 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) {
if(walker.isReduceByInterval())
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
if(walker instanceof ReadWalker)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s is a read walker. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
if ( walker instanceof ReadWalker )
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
else
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
} else {
if(threadAllocation.getNumCPUThreads() > 1)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.monitorThreadEfficiency());
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
}
}
@ -121,15 +123,23 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @param reads The reads.
* @param reference The reference.
* @param rods the rods to include in the traversal
* @param numThreads the number of threads we are using in the underlying traversal
*/
protected MicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods) {
protected MicroScheduler(final GenomeAnalysisEngine engine,
final Walker walker,
final SAMDataSource reads,
final IndexedFastaSequenceFile reference,
final Collection<ReferenceOrderedDataSource> rods,
final int numThreads) {
this.engine = engine;
this.reads = reads;
this.reference = reference;
this.rods = rods;
if (walker instanceof ReadWalker) {
traversalEngine = new TraverseReads();
traversalEngine = numThreads > 1 ? new TraverseReadsNano(numThreads) : new TraverseReads();
} else if ( numThreads > 1 ) {
throw new IllegalArgumentException("BUG: numThreads > 1 but this is only allowed for ReadWalkers");
} else if (walker instanceof LocusWalker) {
traversalEngine = new TraverseLoci();
} else if (walker instanceof DuplicateWalker) {

View File

@ -0,0 +1,167 @@
/*
* Copyright (c) 2009 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.traversals;
import net.sf.samtools.SAMRecord;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.datasources.providers.*;
import org.broadinstitute.sting.gatk.datasources.reads.ReadShard;
import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.nanoScheduler.MapFunction;
import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler;
import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
/**
 * Parallel (nano-scheduled) read traversal engine.
 *
 * Feeds the reads of a shard through a {@link NanoScheduler}, running the
 * walker's map calls on {@code nThreads} worker threads and reducing the
 * results sequentially.
 *
 * Current limitations, enforced at traversal time:
 * - reference ordered data (ROD) is not supported;
 * - reference bases cannot be accessed from the walker.
 */
public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,ReadShardDataProvider> {
    /** our log, which we want to capture anything from this class */
    protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);

    /** When true, the NanoScheduler logs verbose per-batch progress. */
    private static final boolean DEBUG = false;

    /** Number of reads buffered by the scheduler before dispatching map work. */
    final int bufferSize = ReadShard.MAX_READS;

    /** Reads handed to each map job: ~1/10th of the buffer, and never zero. */
    final int mapGroupSize = bufferSize / 10 + 1;

    /** Number of worker threads the NanoScheduler uses for map calls. */
    final int nThreads;

    /**
     * Create a parallel read traversal engine.
     *
     * @param nThreads number of threads to use for parallel map execution
     */
    public TraverseReadsNano(int nThreads) {
        this.nThreads = nThreads;
    }

    @Override
    protected String getTraversalType() {
        return "reads";
    }

    /**
     * Traverse by reads, given the data and the walker
     *
     * @param walker the walker to traverse with
     * @param dataProvider the provider of the reads data
     * @param sum the value of type T, specified by the walker, to feed to the walkers reduce function
     * @return the reduce variable of the read walker
     * @throws IllegalArgumentException if the provider has no read data
     * @throws ReviewedStingException if the provider carries reference ordered data,
     *         which parallel read walkers do not yet support
     */
    public T traverse(ReadWalker<M,T> walker,
                      ReadShardDataProvider dataProvider,
                      T sum) {
        logger.debug(String.format("TraverseReadsNano.traverse Covered dataset is %s", dataProvider));

        if ( !dataProvider.hasReads() )
            throw new IllegalArgumentException("Unable to traverse reads; no read data is available.");

        // RODs are rejected up front rather than silently ignored
        if ( dataProvider.hasReferenceOrderedData() )
            throw new ReviewedStingException("Parallel read walkers currently don't support access to reference ordered data");

        final ReadView reads = new ReadView(dataProvider);
        // reference access throws on use; parallel read walkers can't see the reference yet
        final ReadReferenceView reference = new NotImplementedReadReferenceView(dataProvider);
        final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);

        final NanoScheduler<SAMRecord, M, T> nanoScheduler = new NanoScheduler<SAMRecord, M, T>(bufferSize, mapGroupSize, nThreads);
        nanoScheduler.setDebug(DEBUG);

        final TraverseReadsMap<M,T> myMap = new TraverseReadsMap<M,T>(reads, reference, rodView, walker);
        final TraverseReadsReduce<M,T> myReduce = new TraverseReadsReduce<M,T>(walker);

        // NOTE(review): ReadView.iterator() appears to return an object that is itself
        // Iterable, hence the double iterator() call — confirm against ReadView's API
        T result = nanoScheduler.execute(reads.iterator().iterator(), myMap, sum, myReduce);
        nanoScheduler.shutdown();

        // TODO -- progress reporting is not yet wired up for the parallel traversal
        //printProgress(dataProvider.getShard(), ???);

        return result;
    }

    /**
     * A ReadReferenceView whose every accessor throws, making accidental
     * reference access from a parallel read walker fail loudly.
     */
    private static class NotImplementedReadReferenceView extends ReadReferenceView {
        private NotImplementedReadReferenceView(ShardDataProvider provider) {
            super(provider);
        }

        @Override
        protected byte[] getReferenceBases(SAMRecord read) {
            throw new ReviewedStingException("Parallel read walkers don't support accessing reference yet");
        }

        @Override
        protected byte[] getReferenceBases(GenomeLoc genomeLoc) {
            throw new ReviewedStingException("Parallel read walkers don't support accessing reference yet");
        }
    }

    /**
     * Bridges the NanoScheduler's reduce step to the walker's reduce.
     * Static nested (with its own type parameters) so it carries no hidden
     * reference to the enclosing traversal engine.
     */
    private static class TraverseReadsReduce<M,T> implements ReduceFunction<M, T> {
        final ReadWalker<M,T> walker;

        private TraverseReadsReduce(ReadWalker<M, T> walker) {
            this.walker = walker;
        }

        @Override
        public T apply(M one, T sum) {
            // NOTE(review): `one` is null for reads the walker filtered out (see
            // TraverseReadsMap.apply) — confirm every walker's reduce tolerates null
            return walker.reduce(one, sum);
        }
    }

    /**
     * Bridges the NanoScheduler's map step to the walker's filter + map.
     * Static nested (with its own type parameters) so it carries no hidden
     * reference to the enclosing traversal engine.
     */
    private static class TraverseReadsMap<M,T> implements MapFunction<SAMRecord, M> {
        final ReadView reads;
        final ReadReferenceView reference;
        final ReadBasedReferenceOrderedView rodView;
        final ReadWalker<M,T> walker;

        private TraverseReadsMap(ReadView reads, ReadReferenceView reference, ReadBasedReferenceOrderedView rodView, ReadWalker<M, T> walker) {
            this.reads = reads;
            this.reference = reference;
            this.rodView = rodView;
            this.walker = walker;
        }

        @Override
        public M apply(final SAMRecord read) {
            if ( ! walker.isDone() ) {
                // ReferenceContext -- the reference bases covered by the read
                // (null for unmapped reads; the view itself throws if actually used)
                final ReferenceContext refContext = ! read.getReadUnmappedFlag() && reference != null
                        ? reference.getReferenceContext(read)
                        : null;

                // TODO -- read-count metrics are not yet updated in the parallel traversal
                //dataProvider.getShard().getReadMetrics().incrementNumIterations();

                // if the read is mapped (has a reference index), create a metadata tracker
                final ReadMetaDataTracker tracker = read.getReferenceIndex() >= 0 ? rodView.getReferenceOrderedDataForRead(read) : null;

                final boolean keepMeP = walker.filter(refContext, (GATKSAMRecord) read);
                if (keepMeP) {
                    return walker.map(refContext, (GATKSAMRecord) read, tracker);
                }
            }

            // TODO -- what should we return in the case where the walker is done or the
            // read is filtered?  null is currently passed on to reduce unchanged.
            return null;
        }
    }
}

View File

@ -93,7 +93,7 @@ import java.util.TreeSet;
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
@BAQMode(QualityMode = BAQ.QualityMode.ADD_TAG, ApplicationTime = BAQ.ApplicationTime.ON_OUTPUT)
@Requires({DataSource.READS, DataSource.REFERENCE})
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> {
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> implements TreeReducible<SAMFileWriter> {
@Output(doc="Write output to this BAM filename instead of STDOUT", required = true)
SAMFileWriter out;
@ -246,4 +246,8 @@ public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> {
return output;
}
@Override
public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) {
return lhs; // nothing to do
}
}

View File

@ -6,6 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.Requires;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
@ -40,15 +41,12 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
*/
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
@Requires({DataSource.READS, DataSource.REFERENCE})
public class CountReads extends ReadWalker<Integer, Integer> {
public class CountReads extends ReadWalker<Integer, Integer> implements TreeReducible<Integer> {
public Integer map(ReferenceContext ref, GATKSAMRecord read, ReadMetaDataTracker tracker) {
return 1;
}
public Integer reduceInit() { return 0; }
public Integer reduce(Integer value, Integer sum) {
return value + sum;
}
@Override public Integer reduceInit() { return 0; }
@Override public Integer reduce(Integer value, Integer sum) { return value + sum; }
@Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; }
}

View File

@ -50,6 +50,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final int nThreads;
final ExecutorService executor;
boolean shutdown = false;
boolean debug = false;
/**
* Create a new nanoschedule with the desire characteristics requested by the argument
@ -129,6 +130,20 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
return shutdown;
}
public boolean isDebug() {
return debug;
}
private void debugPrint(final String format, Object ... args) {
if ( isDebug() )
logger.info(String.format(format, args));
}
public void setDebug(boolean debug) {
this.debug = debug;
}
/**
* Execute a map/reduce job with this nanoScheduler
*
@ -190,6 +205,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final MapFunction<InputType, MapType> map,
final ReduceType initialValue,
final ReduceFunction<MapType, ReduceType> reduce) {
debugPrint("Executing nanoScheduler with initial reduce value " + initialValue);
ReduceType sum = initialValue;
while ( inputReader.hasNext() ) {
try {
@ -278,6 +294,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
final List<MapType> outputs = new LinkedList<MapType>();
for ( final InputType input : inputs )
outputs.add(map.apply(input));
debugPrint(" Processed %d elements with map", outputs.size());
return outputs;
}
}

View File

@ -140,6 +140,7 @@ public class ThreadEfficiencyMonitor {
logger.log(priority, String.format("CPU efficiency : %6.2f%% of time spent %s", getStatePercent(State.USER_CPU), State.USER_CPU.getUserFriendlyName()));
logger.log(priority, String.format("Walker inefficiency : %6.2f%% of time spent %s", getStatePercent(State.BLOCKING), State.BLOCKING.getUserFriendlyName()));
logger.log(priority, String.format("I/O inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING_FOR_IO), State.WAITING_FOR_IO.getUserFriendlyName()));
logger.log(priority, String.format("Thread inefficiency : %6.2f%% of time spent %s", getStatePercent(State.WAITING), State.WAITING.getUserFriendlyName()));
}
/**