Added ThreadSafeMapReduce interface, super of TreeReducible
-- A higher level interface to declare parallelism capability of a walker. This interface means that the walker can be multi-threaded, but doesn't necessarily support TreeReducible interface, which forces you to have a combine ReduceType operation that isn't appropriate for parallel read walkers -- Updated ReadWalkers to implement ThreadSafeMapReduce not TreeReducible
This commit is contained in:
parent
59508f8266
commit
863a3d73b8
|
|
@ -100,22 +100,29 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
||||||
* @return The best-fit microscheduler.
|
* @return The best-fit microscheduler.
|
||||||
*/
|
*/
|
||||||
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
|
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, ThreadAllocation threadAllocation) {
|
||||||
if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) {
|
if (threadAllocation.getNumCPUThreads() > 1) {
|
||||||
if(walker.isReduceByInterval())
|
if (walker.isReduceByInterval())
|
||||||
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
||||||
|
|
||||||
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
|
logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads()));
|
||||||
|
|
||||||
if ( walker instanceof ReadWalker )
|
if ( walker instanceof ReadWalker ) {
|
||||||
|
if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker);
|
||||||
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
||||||
else
|
|
||||||
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
|
||||||
} else {
|
} else {
|
||||||
if(threadAllocation.getNumCPUThreads() > 1)
|
// TODO -- update test for when nano scheduling only is an option
|
||||||
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker);
|
||||||
|
return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) {
|
||||||
|
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a microscheduler given the reads and reference.
|
* Create a microscheduler given the reads and reference.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package org.broadinstitute.sting.gatk.iterators;
|
package org.broadinstitute.sting.gatk.iterators;
|
||||||
|
|
||||||
import net.sf.samtools.SAMRecord;
|
import net.sf.samtools.SAMRecord;
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
|
||||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||||
|
|
||||||
|
|
@ -48,7 +47,9 @@ public class VerifyingSamIterator implements StingSAMIterator {
|
||||||
if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START)
|
if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START)
|
||||||
throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format()));
|
throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format()));
|
||||||
|
|
||||||
return last.getAlignmentStart() > cur.getAlignmentStart();
|
return (last.getReferenceIndex() > cur.getReferenceIndex()) ||
|
||||||
|
(last.getReferenceIndex().equals(cur.getReferenceIndex()) &&
|
||||||
|
last.getAlignmentStart() > cur.getAlignmentStart());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ import java.text.NumberFormat;
|
||||||
*/
|
*/
|
||||||
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
||||||
@Requires({DataSource.READS})
|
@Requires({DataSource.READS})
|
||||||
public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatus> implements TreeReducible<FlagStat.FlagStatus> {
|
public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatus> implements ThreadSafeMapReduce {
|
||||||
@Output
|
@Output
|
||||||
PrintStream out;
|
PrintStream out;
|
||||||
|
|
||||||
|
|
@ -193,11 +193,6 @@ public class FlagStat extends ReadWalker<FlagStat.FlagStatus, FlagStat.FlagStatu
|
||||||
return sum.add(value);
|
return sum.add(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public FlagStatus treeReduce(final FlagStatus value, final FlagStatus sum) {
|
|
||||||
return sum.add(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTraversalDone(final FlagStatus result) {
|
public void onTraversalDone(final FlagStatus result) {
|
||||||
out.println(result.toString());
|
out.println(result.toString());
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,7 @@ import java.util.TreeSet;
|
||||||
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
||||||
@BAQMode(QualityMode = BAQ.QualityMode.ADD_TAG, ApplicationTime = BAQ.ApplicationTime.ON_OUTPUT)
|
@BAQMode(QualityMode = BAQ.QualityMode.ADD_TAG, ApplicationTime = BAQ.ApplicationTime.ON_OUTPUT)
|
||||||
@Requires({DataSource.READS, DataSource.REFERENCE})
|
@Requires({DataSource.READS, DataSource.REFERENCE})
|
||||||
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> implements TreeReducible<SAMFileWriter> {
|
public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> implements ThreadSafeMapReduce {
|
||||||
|
|
||||||
@Output(doc="Write output to this BAM filename instead of STDOUT", required = true)
|
@Output(doc="Write output to this BAM filename instead of STDOUT", required = true)
|
||||||
SAMFileWriter out;
|
SAMFileWriter out;
|
||||||
|
|
@ -245,9 +245,4 @@ public class PrintReads extends ReadWalker<GATKSAMRecord, SAMFileWriter> impleme
|
||||||
output.addAlignment(read);
|
output.addAlignment(read);
|
||||||
return output;
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) {
|
|
||||||
return lhs; // nothing to do
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2010. The Broad Institute
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
package org.broadinstitute.sting.gatk.walkers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Root parallelism interface. Walkers that implement this
|
||||||
|
* declare that their map function is thread-safe and so multiple
|
||||||
|
* map calls can be run in parallel in the same JVM instance.
|
||||||
|
*/
|
||||||
|
public interface ThreadSafeMapReduce {
|
||||||
|
}
|
||||||
|
|
@ -13,7 +13,7 @@ package org.broadinstitute.sting.gatk.walkers;
|
||||||
* shards of the data can reduce with each other, and the composite result
|
* shards of the data can reduce with each other, and the composite result
|
||||||
* can be reduced with other composite results.
|
* can be reduced with other composite results.
|
||||||
*/
|
*/
|
||||||
public interface TreeReducible<ReduceType> {
|
public interface TreeReducible<ReduceType> extends ThreadSafeMapReduce {
|
||||||
/**
|
/**
|
||||||
* A composite, 'reduce of reduces' function.
|
* A composite, 'reduce of reduces' function.
|
||||||
* @param lhs 'left-most' portion of data in the composite reduce.
|
* @param lhs 'left-most' portion of data in the composite reduce.
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
||||||
import org.broadinstitute.sting.gatk.walkers.DataSource;
|
import org.broadinstitute.sting.gatk.walkers.DataSource;
|
||||||
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
|
||||||
import org.broadinstitute.sting.gatk.walkers.Requires;
|
import org.broadinstitute.sting.gatk.walkers.Requires;
|
||||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
import org.broadinstitute.sting.gatk.walkers.ThreadSafeMapReduce;
|
||||||
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
|
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
|
||||||
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||||
|
|
||||||
|
|
@ -41,12 +41,11 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord;
|
||||||
*/
|
*/
|
||||||
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
||||||
@Requires({DataSource.READS, DataSource.REFERENCE})
|
@Requires({DataSource.READS, DataSource.REFERENCE})
|
||||||
public class CountReads extends ReadWalker<Integer, Integer> implements TreeReducible<Integer> {
|
public class CountReads extends ReadWalker<Integer, Integer> implements ThreadSafeMapReduce {
|
||||||
public Integer map(ReferenceContext ref, GATKSAMRecord read, RefMetaDataTracker tracker) {
|
public Integer map(ReferenceContext ref, GATKSAMRecord read, RefMetaDataTracker tracker) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public Integer reduceInit() { return 0; }
|
@Override public Integer reduceInit() { return 0; }
|
||||||
@Override public Integer reduce(Integer value, Integer sum) { return value + sum; }
|
@Override public Integer reduce(Integer value, Integer sum) { return value + sum; }
|
||||||
@Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; }
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue