From b4b4e8d672025670f17abbec60f32528abbb95bf Mon Sep 17 00:00:00 2001 From: hanna Date: Sun, 21 Mar 2010 23:22:25 +0000 Subject: [PATCH] For Sarah Calvo: initial implementation of read pair traversal, for BAM files sorted by read name. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3052 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GenomeAnalysisEngine.java | 39 ++++--- .../shards/BlockDelimitedReadShard.java | 4 +- .../BlockDelimitedReadShardStrategy.java | 4 + .../BlockDrivenSAMDataSource.java | 75 +++++++++++-- .../IndexDrivenSAMDataSource.java | 8 ++ .../simpleDataSources/SAMDataSource.java | 6 + .../sting/gatk/executive/MicroScheduler.java | 2 + .../gatk/traversals/TraverseReadPairs.java | 104 ++++++++++++++++++ .../sting/gatk/walkers/ReadPairWalker.java | 38 +++++++ .../gatk/walkers/qc/CountPairsWalker.java | 91 +++++++++++++++ 10 files changed, 347 insertions(+), 24 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java create mode 100644 java/src/org/broadinstitute/sting/gatk/walkers/ReadPairWalker.java create mode 100644 java/src/org/broadinstitute/sting/playground/gatk/walkers/qc/CountPairsWalker.java diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index d77ed666a..b8ee04481 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -312,19 +312,13 @@ public class GenomeAnalysisEngine { // the mircoscheduler to return MicroScheduler microScheduler = null; - // we need to verify different parameter based on the walker type - if (my_walker instanceof LocusWalker || my_walker instanceof LocusWindowWalker) { - // create the MicroScheduler - microScheduler = MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads); - } else if (my_walker instanceof ReadWalker || my_walker instanceof DuplicateWalker) { - if (argCollection.referenceFile == null) - Utils.scareUser(String.format("Read-based traversals require a reference file but none was given")); - microScheduler = MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads); - } else { - Utils.scareUser(String.format("Unable to create the appropriate TraversalEngine for analysis type %s", walkerManager.getName(my_walker.getClass()))); + // Temporarily require all walkers to have a reference, even if that reference is not conceptually necessary. + if ((my_walker instanceof ReadWalker || my_walker instanceof DuplicateWalker || my_walker instanceof ReadPairWalker) && + argCollection.referenceFile == null) { + Utils.scareUser(String.format("Read-based traversals require a reference file but none was given")); } - return microScheduler; + return MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads); } /** @@ -666,14 +660,16 @@ public class GenomeAnalysisEngine { GenomeLocSortedSet intervals, Integer maxIterations, ValidationExclusion exclusions) { - if(readsDataSource != null && !readsDataSource.hasIndex()) { + // Use monolithic sharding if no index is present. Monolithic sharding is always required for the original + // sharding system; it's required with the new sharding system only for locus walkers. + if(readsDataSource != null && !readsDataSource.hasIndex() && (argCollection.disableExperimentalSharding || walker instanceof LocusWalker)) { if(!exclusions.contains(ValidationExclusion.TYPE.ALLOW_UNINDEXED_BAM) || intervals != null) throw new StingException("The GATK cannot currently process unindexed BAM files"); Shard.ShardType shardType; if(walker instanceof LocusWalker) shardType = Shard.ShardType.LOCUS; - else if(walker instanceof ReadWalker || walker instanceof DuplicateWalker) + else if(walker instanceof ReadWalker || walker instanceof DuplicateWalker || walker instanceof ReadPairWalker) shardType = Shard.ShardType.READ; else throw new StingException("The GATK cannot currently process unindexed BAM files"); @@ -690,6 +686,9 @@ public class GenomeAnalysisEngine { if (walker instanceof RodWalker) SHARD_SIZE *= 1000; if (intervals != null && !intervals.isEmpty()) { + if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate) + Utils.scareUser("Locus walkers can only walk over coordinate-sorted data. Please resort your input BAM file."); + shardType = (walker.isReduceByInterval()) ? ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR; @@ -726,6 +725,20 @@ public class GenomeAnalysisEngine { drivingDataSource.getSequenceDictionary(), SHARD_SIZE, maxIterations); } + } else if (walker instanceof ReadPairWalker) { + if(argCollection.disableExperimentalSharding) + Utils.scareUser("Pairs traversal cannot be used in conjunction with the old sharding system."); + if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.queryname) + Utils.scareUser("Read pair walkers can only walk over query name-sorted data. Please resort your input BAM file."); + if(intervals != null && !intervals.isEmpty()) + Utils.scareUser("Pairs traversal cannot be used in conjunction with intervals."); + + shardStrategy = ShardStrategyFactory.shatter(readsDataSource, + referenceDataSource, + ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL, + drivingDataSource.getSequenceDictionary(), + SHARD_SIZE, maxIterations); + } else if (walker instanceof LocusWindowWalker) { if ((intervals == null || intervals.isEmpty()) && !exclusions.contains(ValidationExclusion.TYPE.ALLOW_EMPTY_INTERVAL_LIST)) Utils.warnUser("walker is of type LocusWindow (which operates over intervals), but no intervals were provided." + diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java index eec05c936..fb0f3a248 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShard.java @@ -83,8 +83,8 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware * @param read Add a read to the internal shard buffer. */ public void addRead(SAMRecord read) { - if(isBufferFull()) - throw new StingException("Cannot store more reads in buffer: out of space"); + // DO NOT validate that the buffer is full. Paired read sharding will occasionally have to stuff another + // read or two into the buffer. reads.add(read); } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java index 7e92fb7ea..032c93b54 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/BlockDelimitedReadShardStrategy.java @@ -29,6 +29,9 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { */ protected final BlockDrivenSAMDataSource dataSource; + /** + * The cached shard to be returned next. Prefetched in the peekable iterator style. + */ private Shard nextShard = null; /** our storage of the genomic locations they'd like to shard over */ @@ -52,6 +55,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy { /** * Create a new read shard strategy, loading read shards from the given BAM file. * @param dataSource Data source from which to load shards. + * @param locations intervals to use for sharding. */ public BlockDelimitedReadShardStrategy(SAMDataSource dataSource, GenomeLocSortedSet locations) { if(!(dataSource instanceof BlockDrivenSAMDataSource)) diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java index d56e33478..f1e5c48dd 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/BlockDrivenSAMDataSource.java @@ -16,6 +16,8 @@ import net.sf.picard.filter.FilteringIterator; import java.util.*; import java.io.File; +import static net.sf.samtools.SAMFileHeader.SortOrder; + /** * An iterator that's aware of how data is stored on disk in SAM format. * @@ -28,6 +30,12 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { */ private final SAMResourcePool resourcePool; + /** + * The sort order of the BAM files. Files without a sort order tag are assumed to be + * in coordinate order. + */ + private SAMFileHeader.SortOrder sortOrder = null; + /** * The merged header. */ @@ -58,6 +66,20 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { resourcePool = new SAMResourcePool(Integer.MAX_VALUE); SAMReaders readers = resourcePool.getAvailableReaders(); + // Determine the sort order. + for(SAMFileReader reader: readers.values()) { + // Get the sort order, forcing it to coordinate if unsorted. + SAMFileHeader header = reader.getFileHeader(); + SortOrder sortOrder = header.getSortOrder() != SortOrder.unsorted ? header.getSortOrder() : SortOrder.coordinate; + + // Validate that all input files are sorted in the same order. + if(this.sortOrder != null && this.sortOrder != sortOrder) + throw new StingException(String.format("Attempted to process mixed of files sorted as %s and %s.",this.sortOrder,sortOrder)); + + // Update the sort order. + this.sortOrder = sortOrder; + } + initializeReaderPositions(readers); SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true); @@ -95,6 +117,14 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { return true; } + /** + * Retrieves the sort order of the readers. + * @return Sort order. Can be unsorted, coordinate order, or query name order. + */ + public SortOrder getSortOrder() { + return sortOrder; + } + /** * Gets the index for a particular reader. Always preloaded. * @param id Id of the reader. @@ -122,23 +152,42 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { if(!shard.buffersReads()) throw new StingException("Attempting to fill a non-buffering shard."); - // Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't - // know why this is. Please add a comment here if you do. - boolean enableVerification = shard instanceof ReadShard; - SAMReaders readers = resourcePool.getAvailableReaders(); + // Cache the most recently viewed read so that we can check whether we've reached the end of a pair. + SAMRecord read = null; - CloseableIterator iterator = getIterator(readers,shard,enableVerification); + CloseableIterator iterator = getIterator(readers,shard,sortOrder == SortOrder.coordinate); while(!shard.isBufferFull() && iterator.hasNext()) { - SAMRecord read = iterator.next(); - Chunk endChunk = new Chunk(read.getCoordinates().getChunkEnd(),Long.MAX_VALUE); - shard.addRead(read); - readerPositions.put(getReaderID(readers,read),endChunk); + read = iterator.next(); + addReadToBufferingShard(shard,getReaderID(readers,read),read); + } + + // If the reads are sorted in queryname order, ensure that all reads + // having the same queryname become part of the same shard. + if(sortOrder == SortOrder.queryname) { + while(iterator.hasNext()) { + SAMRecord nextRead = iterator.next(); + if(read == null || !read.getReadName().equals(nextRead.getReadName())) + break; + addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead); + } } iterator.close(); } + /** + * Adds this read to the given shard. + * @param shard The shard to which to add the read. + * @param id The id of the given reader. + * @param read The read to add to the shard. + */ + private void addReadToBufferingShard(BAMFormatAwareShard shard,SAMReaderID id,SAMRecord read) { + Chunk endChunk = new Chunk(read.getCoordinates().getChunkEnd(),Long.MAX_VALUE); + shard.addRead(read); + readerPositions.put(id,endChunk); + } + /** * Gets the reader associated with the given read. * @param readers Available readers. @@ -178,6 +227,14 @@ public class BlockDrivenSAMDataSource extends SAMDataSource { } } + /** + * Get an iterator over the data types specified in the shard. + * @param readers Readers from which to load data. + * @param shard The shard specifying the data limits. + * @param enableVerification True to verify. For compatibility with old sharding strategy. + * TODO: Collapse this flag when the two sharding systems are merged. + * @return An iterator over the selected data. + */ private StingSAMIterator getIterator(SAMReaders readers, BAMFormatAwareShard shard, boolean enableVerification) { Map> readerToIteratorMap = new HashMap>(); for(SAMReaderID id: getReaderIDs()) { diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/IndexDrivenSAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/IndexDrivenSAMDataSource.java index b99be3fff..322d83f6e 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/IndexDrivenSAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/IndexDrivenSAMDataSource.java @@ -99,6 +99,14 @@ public class IndexDrivenSAMDataSource extends SAMDataSource { return resourcePool.hasIndex; } + /** + * Retrieves the sort order of the readers. + * @return Sort order. For this datasource, MUST be coordinate. + */ + public SAMFileHeader.SortOrder getSortOrder() { + return SAMFileHeader.SortOrder.coordinate; + } + /** * Gets the (potentially merged) SAM file header. * diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java index 052279c44..f60e52a03 100755 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java @@ -102,6 +102,12 @@ public abstract class SAMDataSource implements SimpleDataSource { */ public abstract boolean hasIndex(); + /** + * Retrieves the sort order of the readers. + * @return Sort order. Can be unsorted, coordinate order, or query name order. + */ + public abstract SAMFileHeader.SortOrder getSortOrder(); + /** * Gets the (potentially merged) SAM file header. * diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 7e3b0d64c..92c7e16b9 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -111,6 +111,8 @@ public abstract class MicroScheduler { traversalEngine = new TraverseLocusWindows(); } else if (walker instanceof DuplicateWalker) { traversalEngine = new TraverseDuplicates(); + } else if (walker instanceof ReadPairWalker) { + traversalEngine = new TraverseReadPairs(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); } diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java new file mode 100644 index 000000000..ae40e80fd --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseReadPairs.java @@ -0,0 +1,104 @@ +package org.broadinstitute.sting.gatk.traversals; + +import org.broadinstitute.sting.gatk.walkers.Requires; +import org.broadinstitute.sting.gatk.walkers.DataSource; +import org.broadinstitute.sting.gatk.walkers.ReadPairWalker; +import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ReadView; +import org.broadinstitute.sting.gatk.datasources.shards.BAMFormatAwareShard; +import org.apache.log4j.Logger; +import net.sf.samtools.SAMRecord; +import net.sf.samtools.SAMRecordCoordinateComparator; + +import java.util.*; + +/** + * Traverse over a collection of read pairs, assuming that a given shard will contain all pairs. + * + * @author mhanna + * @version 0.1 + */ +@Requires({DataSource.REFERENCE}) +public class TraverseReadPairs extends TraversalEngine,ReadShardDataProvider> { + + /** our log, which we want to capture anything from this class */ + protected static Logger logger = Logger.getLogger(TraverseReadPairs.class); + + /** descriptor of the type */ + private static final String PAIRS_STRING = "read pairs"; + + /** + * Traverse by reads, given the data and the walker + * + * @param walker the walker to execute over + * @param sum of type T, the return from the walker + * + * @return the result type T, the product of all the reduce calls + */ + public T traverse(ReadPairWalker walker, + ReadShardDataProvider dataProvider, + T sum) { + logger.debug(String.format("TraverseReads.traverse Covered dataset is %s", dataProvider)); + + if( !dataProvider.hasReads() ) + throw new IllegalArgumentException("Unable to traverse reads; no read data is available."); + + ReadView reads = new ReadView(dataProvider); + List pairs = new ArrayList(); + + for(SAMRecord read: reads) { + TraversalStatistics.nReads++; + + if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) { + // If this read name is the same as the last, accumulate it. + pairs.add(read); + } + else { + // Otherwise, walk over the accumulated list, then start fresh with the new read. + sum = walkOverPairs(walker,pairs,sum); + pairs.clear(); + pairs.add(read); + + printProgress(PAIRS_STRING, null); + } + } + + // If any data was left in the queue, process it. + if(pairs.size() > 0) + sum = walkOverPairs(walker,pairs,sum); + + return sum; + } + + /** + * Filter / map / reduce over a single pair. + * @param walker The walker. + * @param reads The reads in the pair. + * @param sum The accumulator. + * @return The accumulator after application of the given read pairing. + */ + private T walkOverPairs(ReadPairWalker walker, List reads, T sum) { + // update the number of reads we've seen + TraversalStatistics.nRecords++; + + // Sort the reads present in coordinate order. + Collections.sort(reads,new SAMRecordCoordinateComparator()); + + final boolean keepMeP = walker.filter(reads); + if (keepMeP) { + M x = walker.map(reads); + sum = walker.reduce(x, sum); + } + + return sum; + } + + /** + * Temporary override of printOnTraversalDone. + * + * @param sum Result of the computation. + */ + public void printOnTraversalDone(T sum) { + printOnTraversalDone(PAIRS_STRING, sum); + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/ReadPairWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/ReadPairWalker.java new file mode 100644 index 000000000..ef52616b0 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/walkers/ReadPairWalker.java @@ -0,0 +1,38 @@ +package org.broadinstitute.sting.gatk.walkers; + +import net.sf.samtools.SAMRecord; + +import java.util.Collection; + +/** + * Walks over all pairs/collections of reads in a BAM file sorted by + * read name. + * + * @author mhanna + * @version 0.1 + */ +@Requires({DataSource.READS}) +public abstract class ReadPairWalker extends Walker { + + /** + * Optionally filters out read pairs. + * @param reads collections of all reads with the same read name. + * @return True to process the reads with map/reduce; false otherwise. + */ + public boolean filter(Collection reads) { + // Keep all pairs by default. + return true; + } + + /** + * Maps a read pair to a given reduce of type MapType. Semantics determined by subclasser. + * @param reads Collection of reads having the same name. + * @return Semantics defined by implementer. + */ + public abstract MapType map(Collection reads); + + // Given result of map function + public abstract ReduceType reduceInit(); + public abstract ReduceType reduce(MapType value, ReduceType sum); + +} diff --git a/java/src/org/broadinstitute/sting/playground/gatk/walkers/qc/CountPairsWalker.java b/java/src/org/broadinstitute/sting/playground/gatk/walkers/qc/CountPairsWalker.java new file mode 100644 index 000000000..580660c46 --- /dev/null +++ b/java/src/org/broadinstitute/sting/playground/gatk/walkers/qc/CountPairsWalker.java @@ -0,0 +1,91 @@ +package org.broadinstitute.sting.playground.gatk.walkers.qc; + +import org.broadinstitute.sting.gatk.walkers.ReadPairWalker; +import org.broadinstitute.sting.utils.ExpandingArrayList; +import net.sf.samtools.SAMRecord; + +import java.util.Collection; +import java.util.List; + +/** + * Counts the number of read pairs encountered in a file sorted in + * query name order. Breaks counts down by total pairs and number + * of paired reads. + * + * @author mhanna + * @version 0.1 + */ +public class CountPairsWalker extends ReadPairWalker { + /** + * How many reads are the first in a pair, based on flag 0x0040 from the SAM spec. + */ + private long firstOfPair = 0; + + /** + * How many reads are the second in a pair, based on flag 0x0080 from the SAM spec. + */ + private long secondOfPair = 0; + + /** + * A breakdown of the total number of reads seen with exactly the same read name. + */ + private List pairCountsByType = new ExpandingArrayList(); + + /** + * Maps a read pair to a given reduce of type MapType. Semantics determined by subclasser. + * @param reads Collection of reads having the same name. + * @return Semantics defined by implementer. + */ + @Override + public Integer map(Collection reads) { + if(pairCountsByType.get(reads.size()) != null) + pairCountsByType.set(reads.size(),pairCountsByType.get(reads.size())+1); + else + pairCountsByType.set(reads.size(),1L); + + for(SAMRecord read: reads) { + if(read.getFirstOfPairFlag()) firstOfPair++; + if(read.getSecondOfPairFlag()) secondOfPair++; + } + + return 1; + } + + /** + * No pairs at the beginning of a traversal. + * @return 0 always. + */ + @Override + public Long reduceInit() { + return 0L; + } + + /** + * Combine number of pairs seen in this iteration (always 1) with total number of pairs + * seen in previous iterations. + * @param value Pairs in this iteration (1), from the map function. + * @param sum Count of all pairs in prior iterations. + * @return All pairs encountered in previous iterations + all pairs encountered in this iteration (sum + 1). + */ + @Override + public Long reduce(Integer value, Long sum) { + return value + sum; + } + + /** + * Print summary statistics over the entire traversal. + * @param sum A count of all read pairs viewed. + */ + @Override + public void onTraversalDone(Long sum) { + out.printf("Total number of pairs : %d%n",sum); + out.printf("Total number of first reads in pair : %d%n",firstOfPair); + out.printf("Total number of second reads in pair: %d%n",secondOfPair); + for(int i = 1; i < pairCountsByType.size(); i++) { + if(pairCountsByType.get(i) == null) + continue; + out.printf("Pairs of size %d: %d%n",i,pairCountsByType.get(i)); + } + } + +}