From 792092b8917128868aedfbc4d5c86327dedb0371 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 10:39:16 -0400 Subject: [PATCH 1/7] ReadShards now default to 10K (up from 1K) reads per samFile up to 250K -- This should help make the inputs for parallel read walkers a little meater, and avoid spinning the shard creation infrastructure so often --- .../sting/gatk/datasources/reads/SAMDataSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java index c8b654f81..2b88775b1 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java @@ -262,7 +262,7 @@ public class SAMDataSource { else { // Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively // will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once. - ReadShard.setReadBufferSize(Math.min(1000*samFiles.size(),250000)); + ReadShard.setReadBufferSize(Math.min(10000*samFiles.size(),250000)); } resourcePool = new SAMResourcePool(Integer.MAX_VALUE); From 7f166c319844de475aa2813b15fb8edcef9e30a9 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 15:07:02 -0400 Subject: [PATCH 2/7] Bugfix to compareTo and equals in GenomeLoc -- Yes, GenomeLoc.compareTo was broken. The compareTo function only considered the contig and start position, but not the stop, when comparing genome locs. -- Updated GenomeLoc.compareTo function to account for stop. Updated GATK code where necessary to fix resulting problems that depended on this. -- Added unit tests to ensure that hashcode, equals, and compareTo are all correct for GenomeLocs --- .../gatk/iterators/VerifyingSamIterator.java | 4 +- .../broadinstitute/sting/utils/GenomeLoc.java | 5 +- .../sting/utils/GenomeLocUnitTest.java | 56 +++++++++++++++++++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java index f33dd414b..2763bca7c 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java @@ -48,9 +48,7 @@ public class VerifyingSamIterator implements StingSAMIterator { if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START) throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format())); - GenomeLoc lastLoc = genomeLocParser.createGenomeLoc( last ); - GenomeLoc curLoc = genomeLocParser.createGenomeLoc( cur ); - return curLoc.compareTo(lastLoc) == -1; + return last.getAlignmentStart() > cur.getAlignmentStart(); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java index 0b35dd599..6df9c9f1d 100644 --- a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java +++ b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java @@ -427,7 +427,10 @@ public class GenomeLoc implements Comparable, Serializable, HasGenome result = cmpContig; } else { if ( this.getStart() < that.getStart() ) result = -1; - if ( this.getStart() > that.getStart() ) result = 1; + else if ( this.getStart() > that.getStart() ) result = 1; + // these have the same start, so check the ends + else if ( this.getStop() < that.getStop() ) result = -1; + else if ( this.getStop() > that.getStop() ) result = 1; } } diff --git a/public/java/test/org/broadinstitute/sting/utils/GenomeLocUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/GenomeLocUnitTest.java index 49778a4d8..122e0265f 100644 --- a/public/java/test/org/broadinstitute/sting/utils/GenomeLocUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/GenomeLocUnitTest.java @@ -16,6 +16,7 @@ import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile; import java.io.File; import java.io.FileNotFoundException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -211,4 +212,59 @@ public class GenomeLocUnitTest extends BaseTest { Assert.assertEquals(cfg.gl1.reciprocialOverlapFraction(cfg.gl2), cfg.overlapFraction); } } + + // ------------------------------------------------------------------------------------- + // + // testing comparison, hashcode, and equals + // + // ------------------------------------------------------------------------------------- + + @DataProvider(name = "GenomeLocComparisons") + public Object[][] createGenomeLocComparisons() { + List tests = new ArrayList(); + + final int start = 10; + for ( int stop = start; stop < start + 3; stop++ ) { + final GenomeLoc g1 = genomeLocParser.createGenomeLoc("chr2", start, stop); + for ( final String contig : Arrays.asList("chr1", "chr2", "chr3")) { + for ( int start2 = start - 1; start2 <= stop + 1; start2++ ) { + for ( int stop2 = start2; stop2 < stop + 2; stop2++ ) { + final GenomeLoc g2 = genomeLocParser.createGenomeLoc(contig, start2, stop2); + + ComparisonResult cmp = ComparisonResult.EQUALS; + if ( contig.equals("chr3") ) cmp = ComparisonResult.LESS_THAN; + else if ( contig.equals("chr1") ) cmp = ComparisonResult.GREATER_THAN; + else if ( start < start2 ) cmp = ComparisonResult.LESS_THAN; + else if ( start > start2 ) cmp = ComparisonResult.GREATER_THAN; + else if ( stop < stop2 ) cmp = ComparisonResult.LESS_THAN; + else if ( stop > stop2 ) cmp = ComparisonResult.GREATER_THAN; + + tests.add(new Object[]{g1, g2, cmp}); + } + } + } + } + + return tests.toArray(new Object[][]{}); + } + + private enum ComparisonResult { + LESS_THAN(-1), + EQUALS(0), + GREATER_THAN(1); + + final int cmp; + + private ComparisonResult(int cmp) { + this.cmp = cmp; + } + } + + @Test(dataProvider = "GenomeLocComparisons") + public void testGenomeLocComparisons(GenomeLoc g1, GenomeLoc g2, ComparisonResult expected) { + Assert.assertEquals(g1.compareTo(g2), expected.cmp, "Comparing genome locs failed"); + Assert.assertEquals(g1.equals(g2), expected == ComparisonResult.EQUALS); + if ( expected == ComparisonResult.EQUALS ) + Assert.assertEquals(g1.hashCode(), g2.hashCode(), "Equal genome locs don't have the same hash code"); + } } From 72cf6bdd9f7d675797d0a76902907e3af05cea56 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 15:10:58 -0400 Subject: [PATCH 3/7] Fix GSA-529: Fix RODs for parallel read walkers -- TraverseReadsNano modified to read in all input data before invoking maps, so the input to TraverseReadsNano is a MapData object holding the sam record, the ref context, and the refmetadatatracker. -- Update ValidateRODForReads to be tree reducible, using synchronized map and explicitly sort the output map from locations -> counts in onTraversalDone -- Expanded integration tests to test nt 1, 2, 4. --- .../gatk/traversals/TraverseReadsNano.java | 91 +++++++++++-------- .../utils/nanoScheduler/NanoScheduler.java | 5 +- 2 files changed, 58 insertions(+), 38 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java index 081c6b8fc..b397cb8c0 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadsNano.java @@ -27,16 +27,21 @@ package org.broadinstitute.sting.gatk.traversals; import net.sf.samtools.SAMRecord; import org.apache.log4j.Logger; import org.broadinstitute.sting.gatk.contexts.ReferenceContext; -import org.broadinstitute.sting.gatk.datasources.providers.*; +import org.broadinstitute.sting.gatk.datasources.providers.ReadBasedReferenceOrderedView; +import org.broadinstitute.sting.gatk.datasources.providers.ReadReferenceView; +import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ReadView; import org.broadinstitute.sting.gatk.datasources.reads.ReadShard; import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.nanoScheduler.MapFunction; import org.broadinstitute.sting.utils.nanoScheduler.NanoScheduler; import org.broadinstitute.sting.utils.nanoScheduler.ReduceFunction; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; +import java.util.ArrayList; +import java.util.List; + /** * @author aaron * @version 1.0 @@ -50,12 +55,13 @@ public class TraverseReadsNano extends TraversalEngine, /** our log, which we want to capture anything from this class */ protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class); private static final boolean DEBUG = false; - final NanoScheduler nanoScheduler; + private static final int MIN_GROUP_SIZE = 100; + final NanoScheduler nanoScheduler; public TraverseReadsNano(int nThreads) { final int bufferSize = ReadShard.getReadBufferSize() + 1; // actually has 1 more than max - final int mapGroupSize = bufferSize / 10 + 1; - nanoScheduler = new NanoScheduler(bufferSize, mapGroupSize, nThreads); + final int mapGroupSize = (int)Math.max(Math.ceil(bufferSize / 50.0 + 1), MIN_GROUP_SIZE); + nanoScheduler = new NanoScheduler(bufferSize, mapGroupSize, nThreads); } @Override @@ -79,24 +85,42 @@ public class TraverseReadsNano extends TraversalEngine, if( !dataProvider.hasReads() ) throw new IllegalArgumentException("Unable to traverse reads; no read data is available."); - if ( dataProvider.hasReferenceOrderedData() ) - throw new ReviewedStingException("Parallel read walkers currently don't support access to reference ordered data"); - - final ReadView reads = new ReadView(dataProvider); - final ReadReferenceView reference = new ReadReferenceView(dataProvider); - final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); - nanoScheduler.setDebug(DEBUG); - final TraverseReadsMap myMap = new TraverseReadsMap(reads, reference, rodView, walker); + final TraverseReadsMap myMap = new TraverseReadsMap(walker); final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker); - T result = nanoScheduler.execute(reads.iterator().iterator(), myMap, sum, myReduce); + T result = nanoScheduler.execute(aggregateMapData(dataProvider).iterator(), myMap, sum, myReduce); // TODO -- how do we print progress? //printProgress(dataProvider.getShard(), ???); return result; } + private List aggregateMapData(final ReadShardDataProvider dataProvider) { + final ReadView reads = new ReadView(dataProvider); + final ReadReferenceView reference = new ReadReferenceView(dataProvider); + final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider); + + final List mapData = new ArrayList(); // TODO -- need size of reads + for ( final SAMRecord read : reads ) { + final ReferenceContext refContext = ! read.getReadUnmappedFlag() + ? reference.getReferenceContext(read) + : null; + + // if the read is mapped, create a metadata tracker + final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0 + ? rodView.getReferenceOrderedDataForRead(read) + : null; + + // update the number of reads we've seen + dataProvider.getShard().getReadMetrics().incrementNumIterations(); + + mapData.add(new MapData((GATKSAMRecord)read, refContext, tracker)); + } + + return mapData; + } + @Override public void printOnTraversalDone() { nanoScheduler.shutdown(); @@ -116,36 +140,31 @@ public class TraverseReadsNano extends TraversalEngine, } } - private class TraverseReadsMap implements MapFunction { - final ReadView reads; - final ReadReferenceView reference; - final ReadBasedReferenceOrderedView rodView; + private class MapData { + final GATKSAMRecord read; + final ReferenceContext refContext; + final RefMetaDataTracker tracker; + + private MapData(GATKSAMRecord read, ReferenceContext refContext, RefMetaDataTracker tracker) { + this.read = read; + this.refContext = refContext; + this.tracker = tracker; + } + } + + private class TraverseReadsMap implements MapFunction { final ReadWalker walker; - private TraverseReadsMap(ReadView reads, ReadReferenceView reference, ReadBasedReferenceOrderedView rodView, ReadWalker walker) { - this.reads = reads; - this.reference = reference; - this.rodView = rodView; + private TraverseReadsMap(ReadWalker walker) { this.walker = walker; } @Override - public M apply(final SAMRecord read) { + public M apply(final MapData data) { if ( ! walker.isDone() ) { - // ReferenceContext -- the reference bases covered by the read - final ReferenceContext refContext = ! read.getReadUnmappedFlag() && reference != null - ? reference.getReferenceContext(read) - : null; - - // update the number of reads we've seen - //dataProvider.getShard().getReadMetrics().incrementNumIterations(); - - // if the read is mapped, create a metadata tracker - final RefMetaDataTracker tracker = read.getReferenceIndex() >= 0 ? rodView.getReferenceOrderedDataForRead(read) : null; - - final boolean keepMeP = walker.filter(refContext, (GATKSAMRecord) read); + final boolean keepMeP = walker.filter(data.refContext, data.read); if (keepMeP) { - return walker.map(refContext, (GATKSAMRecord) read, tracker); + return walker.map(data.refContext, data.read, data.tracker); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 4bca3728f..25ed0766d 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -43,7 +43,8 @@ import java.util.concurrent.*; * Time: 9:47 AM */ public class NanoScheduler { - private static Logger logger = Logger.getLogger(NanoScheduler.class); + private final static Logger logger = Logger.getLogger(NanoScheduler.class); + private final static boolean ALLOW_SINGLE_THREAD_FASTPATH = true; final int bufferSize; final int mapGroupSize; @@ -172,7 +173,7 @@ public class NanoScheduler { if ( map == null ) throw new IllegalArgumentException("map function cannot be null"); if ( reduce == null ) throw new IllegalArgumentException("reduce function cannot be null"); - if ( getnThreads() == 1 ) { + if ( ALLOW_SINGLE_THREAD_FASTPATH && getnThreads() == 1 ) { return executeSingleThreaded(inputReader, map, initialValue, reduce); } else { return executeMultiThreaded(inputReader, map, initialValue, reduce); From 27d1c63448384d0d6b6bf74949608c7a92c42ccf Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 15:56:58 -0400 Subject: [PATCH 4/7] Reduce the number of test combinations in ReadBasedREferenceOrderedView --- .../ReadBasedReferenceOrderedViewUnitTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/public/java/test/org/broadinstitute/sting/gatk/datasources/providers/ReadBasedReferenceOrderedViewUnitTest.java b/public/java/test/org/broadinstitute/sting/gatk/datasources/providers/ReadBasedReferenceOrderedViewUnitTest.java index d55c48054..eaa098793 100644 --- a/public/java/test/org/broadinstitute/sting/gatk/datasources/providers/ReadBasedReferenceOrderedViewUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/gatk/datasources/providers/ReadBasedReferenceOrderedViewUnitTest.java @@ -121,7 +121,7 @@ public class ReadBasedReferenceOrderedViewUnitTest extends BaseTest { // test in the present of a large spanning element { List oneLargeSpan = new ArrayList(handPickedFeatures); - oneLargeSpan.add(new BasicFeature(contig, 1, 100)); + oneLargeSpan.add(new BasicFeature(contig, 1, 30)); createTestsForFeatures(oneLargeSpan); } @@ -135,7 +135,7 @@ public class ReadBasedReferenceOrderedViewUnitTest extends BaseTest { // test in the presence of a partially spanning element at the end { List partialSpanEnd = new ArrayList(handPickedFeatures); - partialSpanEnd.add(new BasicFeature(contig, 10, 100)); + partialSpanEnd.add(new BasicFeature(contig, 10, 30)); createTestsForFeatures(partialSpanEnd); } @@ -165,7 +165,7 @@ public class ReadBasedReferenceOrderedViewUnitTest extends BaseTest { int featuresStart = 1; for ( final Feature f : features ) featuresStart = Math.min(featuresStart, f.getStart()); int featuresStop = 1; for ( final Feature f : features ) featuresStop = Math.max(featuresStop, f.getEnd()); - for ( final int size : Arrays.asList(1, 5, 10, 100, 1000) ) { + for ( final int size : Arrays.asList(1, 5, 10, 100) ) { final List allIntervals = new ArrayList(); // regularly spaced for ( int start = featuresStart; start < featuresStop; start++) { @@ -256,11 +256,12 @@ public class ReadBasedReferenceOrderedViewUnitTest extends BaseTest { } // all 3 way pairwise tests - for ( List singleTest : Utils.makePermutations(multiSiteTests, 3, false)) { - tests.add(new Object[]{singleTest, testStateless}); - } + //for ( List singleTest : Utils.makePermutations(multiSiteTests, 3, false)) { + // tests.add(new Object[]{singleTest, testStateless}); + //} } + logger.warn("Creating " + tests.size() + " tests for ReadMetaDataTrackerTests"); return tests.toArray(new Object[][]{}); } From 59508f82663ce27637c4a968b831cc6796537f1d Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 15:57:29 -0400 Subject: [PATCH 5/7] tasking for n threads should give you n threads in NanoScheduler, not n - 1 --- .../broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 25ed0766d..668c82524 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -80,7 +80,7 @@ public class NanoScheduler { this.mapGroupSize = mapGroupSize; } - this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads - 1); + this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads); } /** From 863a3d73b8796510ca1461d759115cf1ed4e2f11 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 16:21:17 -0400 Subject: [PATCH 6/7] Added ThreadSafeMapReduce interface, super of TreeReducible -- A higher level interface to declare parallelism capability of a walker. This interface means that the walker can be multi-threaded, but doesn't necessarily support TreeReducible interface, which forces you to have a combine ReduceType operation that isn't appropriate for parallel read walkers -- Updated ReadWalkers to implement ThreadSafeMapReduce not TreeReducible --- .../sting/gatk/executive/MicroScheduler.java | 19 ++++++++---- .../gatk/iterators/VerifyingSamIterator.java | 5 +-- .../sting/gatk/walkers/FlagStat.java | 7 +---- .../sting/gatk/walkers/PrintReads.java | 7 +---- .../gatk/walkers/ThreadSafeMapReduce.java | 31 +++++++++++++++++++ .../sting/gatk/walkers/TreeReducible.java | 2 +- .../sting/gatk/walkers/qc/CountReads.java | 5 ++- 7 files changed, 52 insertions(+), 24 deletions(-) create mode 100755 public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 70201a6cc..417a0982f 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -100,22 +100,29 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return The best-fit microscheduler. */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, ThreadAllocation threadAllocation) { - if (walker instanceof TreeReducible && threadAllocation.getNumCPUThreads() > 1) { - if(walker.isReduceByInterval()) + if (threadAllocation.getNumCPUThreads() > 1) { + if (walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); + logger.info(String.format("Running the GATK in parallel mode with %d concurrent threads",threadAllocation.getNumCPUThreads())); - if ( walker instanceof ReadWalker ) + if ( walker instanceof ReadWalker ) { + if ( ! (walker instanceof ThreadSafeMapReduce) ) badNT(engine, walker); return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); - else + } else { + // TODO -- update test for when nano scheduling only is an option + if ( ! (walker instanceof TreeReducible) ) badNT(engine, walker); return new HierarchicalMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); + } } else { - if(threadAllocation.getNumCPUThreads() > 1) - throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); return new LinearMicroScheduler(engine, walker, reads, reference, rods, threadAllocation.getNumCPUThreads(), threadAllocation.monitorThreadEfficiency()); } } + private static void badNT(final GenomeAnalysisEngine engine, final Walker walker) { + throw new UserException.BadArgumentValue("nt", String.format("The analysis %s currently does not support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); + } + /** * Create a microscheduler given the reads and reference. * diff --git a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java index 2763bca7c..3ffe95e8b 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java +++ b/public/java/src/org/broadinstitute/sting/gatk/iterators/VerifyingSamIterator.java @@ -1,7 +1,6 @@ package org.broadinstitute.sting.gatk.iterators; import net.sf.samtools.SAMRecord; -import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -48,7 +47,9 @@ public class VerifyingSamIterator implements StingSAMIterator { if(cur.getReferenceIndex() == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX || cur.getAlignmentStart() == SAMRecord.NO_ALIGNMENT_START) throw new UserException.MalformedBAM(last,String.format("read %s has inconsistent mapping information.",cur.format())); - return last.getAlignmentStart() > cur.getAlignmentStart(); + return (last.getReferenceIndex() > cur.getReferenceIndex()) || + (last.getReferenceIndex().equals(cur.getReferenceIndex()) && + last.getAlignmentStart() > cur.getAlignmentStart()); } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java index 6f28e8726..14d14aca5 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/FlagStat.java @@ -45,7 +45,7 @@ import java.text.NumberFormat; */ @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @Requires({DataSource.READS}) -public class FlagStat extends ReadWalker implements TreeReducible { +public class FlagStat extends ReadWalker implements ThreadSafeMapReduce { @Output PrintStream out; @@ -193,11 +193,6 @@ public class FlagStat extends ReadWalker implements TreeReducible { +public class PrintReads extends ReadWalker implements ThreadSafeMapReduce { @Output(doc="Write output to this BAM filename instead of STDOUT", required = true) SAMFileWriter out; @@ -245,9 +245,4 @@ public class PrintReads extends ReadWalker impleme output.addAlignment(read); return output; } - - @Override - public SAMFileWriter treeReduce(SAMFileWriter lhs, SAMFileWriter rhs) { - return lhs; // nothing to do - } } diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java new file mode 100755 index 000000000..1ce469f8c --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/ThreadSafeMapReduce.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2010. The Broad Institute + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +package org.broadinstitute.sting.gatk.walkers; + +/** + * Root parallelism interface. Walkers that implement this + * declare that their map function is thread-safe and so multiple + * map calls can be run in parallel in the same JVM instance. + */ +public interface ThreadSafeMapReduce { +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java index c950e07e4..8621c0e9d 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/TreeReducible.java @@ -13,7 +13,7 @@ package org.broadinstitute.sting.gatk.walkers; * shards of the data can reduce with each other, and the composite result * can be reduced with other composite results. */ -public interface TreeReducible { +public interface TreeReducible extends ThreadSafeMapReduce { /** * A composite, 'reduce of reduces' function. * @param lhs 'left-most' portion of data in the composite reduce. diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java index 72bda03e9..856ea77f5 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/qc/CountReads.java @@ -6,7 +6,7 @@ import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Requires; -import org.broadinstitute.sting.gatk.walkers.TreeReducible; +import org.broadinstitute.sting.gatk.walkers.ThreadSafeMapReduce; import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.utils.sam.GATKSAMRecord; @@ -41,12 +41,11 @@ import org.broadinstitute.sting.utils.sam.GATKSAMRecord; */ @DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} ) @Requires({DataSource.READS, DataSource.REFERENCE}) -public class CountReads extends ReadWalker implements TreeReducible { +public class CountReads extends ReadWalker implements ThreadSafeMapReduce { public Integer map(ReferenceContext ref, GATKSAMRecord read, RefMetaDataTracker tracker) { return 1; } @Override public Integer reduceInit() { return 0; } @Override public Integer reduce(Integer value, Integer sum) { return value + sum; } - @Override public Integer treeReduce(Integer lhs, Integer rhs) { return lhs + rhs; } } From 7b4caec8cb45504fbeaf5df2c685dcb131f72c83 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 30 Aug 2012 16:56:36 -0400 Subject: [PATCH 7/7] Fix: GSA-531 ApplyRecalibration writing to BCF: java.lang.String cannot be cast to java.lang.Double -- LOD must be added a double to attributes, not as string, so that it can be written out as BCF --- .../walkers/variantrecalibration/ApplyRecalibration.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantrecalibration/ApplyRecalibration.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantrecalibration/ApplyRecalibration.java index 011f3471c..158d1e78a 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/walkers/variantrecalibration/ApplyRecalibration.java +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/variantrecalibration/ApplyRecalibration.java @@ -39,11 +39,11 @@ import org.broadinstitute.sting.gatk.walkers.RodWalker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.utils.SampleUtils; import org.broadinstitute.sting.utils.codecs.vcf.*; -import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; -import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.help.DocumentedGATKFeature; import org.broadinstitute.sting.utils.variantcontext.VariantContext; import org.broadinstitute.sting.utils.variantcontext.VariantContextBuilder; +import org.broadinstitute.sting.utils.variantcontext.writer.VariantContextWriter; import java.io.File; import java.util.*; @@ -218,7 +218,7 @@ public class ApplyRecalibration extends RodWalker implements T String filterString = null; // Annotate the new record with its VQSLOD and the worst performing annotation - builder.attribute(VariantRecalibrator.VQS_LOD_KEY, lodString); // use the String representation so that we don't lose precision on output + builder.attribute(VariantRecalibrator.VQS_LOD_KEY, lod); builder.attribute(VariantRecalibrator.CULPRIT_KEY, recalDatum.getAttribute(VariantRecalibrator.CULPRIT_KEY)); for( int i = tranches.size() - 1; i >= 0; i-- ) {