Added ThreadSafeMapReduce interface, super of TreeReducible

-- A higher level interface to declare parallelism capability of a walker.  This interface means that the walker can be multi-threaded, but doesn't necessarily support TreeReducible interface, which forces you to have a combine ReduceType operation that isn't appropriate for parallel read walkers
-- Updated ReadWalkers to implement ThreadSafeMapReduce not TreeReducible
This commit is contained in:
Mark DePristo 2012-08-30 16:21:17 -04:00
parent 544740d45d
commit 2f749b5e52
7 changed files with 52 additions and 24 deletions

View File

@ -100,22 +100,29 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return The best-fit microscheduler.
*/
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) {
if(walker.isReduceByInterval())
if (threadAllocation.getNumCPUThreads() > 1) {
if (walker.isReduceByInterval())
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
if ( walker instanceof ReadWalker )
if ( walker instanceof ReadWalker ) {
if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker);
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
else
} else {
// TODO -- update test for when nano scheduling only is an option
if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker);
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
}
} else {
if(threadAllocation.getNumCPUThreads() > 1)
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
}
}
private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) {
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
}
/**
* Create a microscheduler given the reads and reference.
*

View File

@ -1,7 +1,6 @@
package org.broadinstitute.sting.gatk.iterators;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -48,7 +47,9 @@ public class VerifyingSamIterator implements StingSAMIterator {
if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START)
throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format()));
return last.getAlignmentStart() > cur.getAlignmentStart();
return (last.getReferenceIndex() > cur.getReferenceIndex()) ||
(last.getReferenceIndex().equals(cur.getReferenceIndex()) &&
last.getAlignmentStart() > cur.getAlignmentStart());
}
}

View File

@ -45,7 +45,7 @@ import java.text.NumberFormat;
*/
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
@Requires({DataSource.READS})
public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatus> implements TreeReducible<FlagStat.FlagStatus> {
public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatus> implements ThreadSafeMapReduce {
@Output
PrintStream out;
@ -193,11 +193,6 @@ public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatu
return sum.add(value);
}
@Override
public FlagStatus treeReduce(final FlagStatus value, final FlagStatus sum) {
return sum.add(value);
}
@Override
public void onTraversalDone(final FlagStatus result) {
out.println(result.toString());

View File

@ -93,7 +93,7 @@ import java.util.TreeSet;
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
@BAQMode(QualityMode = BAQ.QualityMode.ADD_TAG, ApplicationTime = BAQ.ApplicationTime.ON_OUTPUT)
@Requires({DataSource.READS, DataSource.REFERENCE})
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> implements TreeReducible<SAMFileWriter> {
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> implements ThreadSafeMapReduce {
@Output(doc="Write output to this BAM filename instead of STDOUT", required = true)
SAMFileWriter out;
@ -245,9 +245,4 @@ public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> impleme
output.addAlignment(read);
return output;
}
@Override
public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) {
return lhs; // nothing to do
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright (c) 2010. The Broad Institute
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.walkers;
/**
* Root parallelism interface. Walkers that implement this
* declare that their map function is thread-safe and so multiple
* map calls can be run in parallel in the same JVM instance.
*/
public interface ThreadSafeMapReduce {
}

View File

@ -13,7 +13,7 @@ package org.broadinstitute.sting.gatk.walkers;
* shards of the data can reduce with each other, and the composite result
* can be reduced with other composite results.
*/
public interface TreeReducible<ReduceType> {
public interface TreeReducible<ReduceType> extends ThreadSafeMapReduce {
/**
* A composite, 'reduce of reduces' function.
* @param lhs 'left-most' portion of data in the composite reduce.

View File

@ -6,7 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.gatk.walkers.Requires;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.ThreadSafeMapReduce;
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
@ -41,12 +41,11 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
*/
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
@Requires({DataSource.READS, DataSource.REFERENCE})
public class CountReads extends ReadWalker<Integer, Integer> implements TreeReducible<Integer> {
public class CountReads extends ReadWalker<Integer, Integer> implements ThreadSafeMapReduce {
public Integer map(ReferenceContext ref, GATKSAMRecord read, RefMetaDataTracker tracker) {
return 1;
}
@Override public Integer reduceInit() { return 0; }
@Override public Integer reduce(Integer value, Integer sum) { return value + sum; }
@Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; }
}