For Sarah Calvo: initial implementation of read pair traversal, for BAM files sorted by read name.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3052 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-03-21 23:22:25 +00:00
parent 169d0c6e8f
commit b4b4e8d672
10 changed files with 347 additions and 24 deletions

View File

@ -312,19 +312,13 @@ public class GenomeAnalysisEngine {
// the microscheduler to return
MicroScheduler microScheduler = null;
// we need to verify different parameter based on the walker type
if (my_walker instanceof LocusWalker || my_walker instanceof LocusWindowWalker) {
// create the MicroScheduler
microScheduler = MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads);
} else if (my_walker instanceof ReadWalker || my_walker instanceof DuplicateWalker) {
if (argCollection.referenceFile == null)
Utils.scareUser(String.format("Read-based traversals require a reference file but none was given"));
microScheduler = MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads);
} else {
Utils.scareUser(String.format("Unable to create the appropriate TraversalEngine for analysis type %s", walkerManager.getName(my_walker.getClass())));
// Temporarily require all walkers to have a reference, even if that reference is not conceptually necessary.
if ((my_walker instanceof ReadWalker || my_walker instanceof DuplicateWalker || my_walker instanceof ReadPairWalker) &&
argCollection.referenceFile == null) {
Utils.scareUser(String.format("Read-based traversals require a reference file but none was given"));
}
return microScheduler;
return MicroScheduler.create(this,my_walker,readsDataSource,referenceDataSource,rodDataSources,argCollection.numberOfThreads);
}
/**
@ -666,14 +660,16 @@ public class GenomeAnalysisEngine {
GenomeLocSortedSet intervals,
Integer maxIterations,
ValidationExclusion exclusions) {
if(readsDataSource != null && !readsDataSource.hasIndex()) {
// Use monolithic sharding if no index is present. Monolithic sharding is always required for the original
// sharding system; it's required with the new sharding system only for locus walkers.
if(readsDataSource != null && !readsDataSource.hasIndex() && (argCollection.disableExperimentalSharding || walker instanceof LocusWalker)) {
if(!exclusions.contains(ValidationExclusion.TYPE.ALLOW_UNINDEXED_BAM) || intervals != null)
throw new StingException("The GATK cannot currently process unindexed BAM files");
Shard.ShardType shardType;
if(walker instanceof LocusWalker)
shardType = Shard.ShardType.LOCUS;
else if(walker instanceof ReadWalker || walker instanceof DuplicateWalker)
else if(walker instanceof ReadWalker || walker instanceof DuplicateWalker || walker instanceof ReadPairWalker)
shardType = Shard.ShardType.READ;
else
throw new StingException("The GATK cannot currently process unindexed BAM files");
@ -690,6 +686,9 @@ public class GenomeAnalysisEngine {
if (walker instanceof RodWalker) SHARD_SIZE *= 1000;
if (intervals != null && !intervals.isEmpty()) {
if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate)
Utils.scareUser("Locus walkers can only walk over coordinate-sorted data. Please resort your input BAM file.");
shardType = (walker.isReduceByInterval()) ?
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL :
ShardStrategyFactory.SHATTER_STRATEGY.LINEAR;
@ -726,6 +725,20 @@ public class GenomeAnalysisEngine {
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
}
} else if (walker instanceof ReadPairWalker) {
if(argCollection.disableExperimentalSharding)
Utils.scareUser("Pairs traversal cannot be used in conjunction with the old sharding system.");
if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.queryname)
Utils.scareUser("Read pair walkers can only walk over query name-sorted data. Please resort your input BAM file.");
if(intervals != null && !intervals.isEmpty())
Utils.scareUser("Pairs traversal cannot be used in conjunction with intervals.");
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
} else if (walker instanceof LocusWindowWalker) {
if ((intervals == null || intervals.isEmpty()) && !exclusions.contains(ValidationExclusion.TYPE.ALLOW_EMPTY_INTERVAL_LIST))
Utils.warnUser("walker is of type LocusWindow (which operates over intervals), but no intervals were provided." +

View File

@ -83,8 +83,8 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
* @param read Add a read to the internal shard buffer.
*/
public void addRead(SAMRecord read) {
if(isBufferFull())
throw new StingException("Cannot store more reads in buffer: out of space");
// NOTE(review): the capacity guard above conflicts with the stated intent of paired-read
// sharding, which occasionally needs to stuff another read or two into the buffer past
// the nominal limit so reads sharing a name stay in one shard. Confirm whether the
// isBufferFull() check should be removed for this code path.
reads.add(read);
}

View File

@ -29,6 +29,9 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
*/
protected final BlockDrivenSAMDataSource dataSource;
/**
* The cached shard to be returned next. Prefetched in the peekable iterator style.
*/
private Shard nextShard = null;
/** our storage of the genomic locations they'd like to shard over */
@ -52,6 +55,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
/**
* Create a new read shard strategy, loading read shards from the given BAM file.
* @param dataSource Data source from which to load shards.
* @param locations intervals to use for sharding.
*/
public BlockDelimitedReadShardStrategy(SAMDataSource dataSource, GenomeLocSortedSet locations) {
if(!(dataSource instanceof BlockDrivenSAMDataSource))

View File

@ -16,6 +16,8 @@ import net.sf.picard.filter.FilteringIterator;
import java.util.*;
import java.io.File;
import static net.sf.samtools.SAMFileHeader.SortOrder;
/**
* An iterator that's aware of how data is stored on disk in SAM format.
*
@ -28,6 +30,12 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
*/
private final SAMResourcePool resourcePool;
/**
* The sort order of the BAM files. Files without a sort order tag are assumed to be
* in coordinate order.
*/
private SAMFileHeader.SortOrder sortOrder = null;
/**
* The merged header.
*/
@ -58,6 +66,20 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
SAMReaders readers = resourcePool.getAvailableReaders();
// Determine the sort order.
for(SAMFileReader reader: readers.values()) {
// Get the sort order, forcing it to coordinate if unsorted.
SAMFileHeader header = reader.getFileHeader();
SortOrder sortOrder = header.getSortOrder() != SortOrder.unsorted ? header.getSortOrder() : SortOrder.coordinate;
// Validate that all input files are sorted in the same order.
if(this.sortOrder != null && this.sortOrder != sortOrder)
throw new StingException(String.format("Attempted to process mixed of files sorted as %s and %s.",this.sortOrder,sortOrder));
// Update the sort order.
this.sortOrder = sortOrder;
}
initializeReaderPositions(readers);
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
@ -95,6 +117,14 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
return true;
}
/**
 * Reports the sort order shared by all underlying readers.
 * @return The common sort order; files tagged 'unsorted' are treated as
 *         coordinate-sorted when the readers are opened.
 */
public SortOrder getSortOrder() {
    return this.sortOrder;
}
/**
* Gets the index for a particular reader. Always preloaded.
* @param id Id of the reader.
@ -122,23 +152,42 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
if(!shard.buffersReads())
throw new StingException("Attempting to fill a non-buffering shard.");
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
// know why this is. Please add a comment here if you do.
boolean enableVerification = shard instanceof ReadShard;
SAMReaders readers = resourcePool.getAvailableReaders();
// Cache the most recently viewed read so that we can check whether we've reached the end of a pair.
SAMRecord read = null;
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,enableVerification);
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,sortOrder == SortOrder.coordinate);
while(!shard.isBufferFull() && iterator.hasNext()) {
SAMRecord read = iterator.next();
Chunk endChunk = new Chunk(read.getCoordinates().getChunkEnd(),Long.MAX_VALUE);
shard.addRead(read);
readerPositions.put(getReaderID(readers,read),endChunk);
read = iterator.next();
addReadToBufferingShard(shard,getReaderID(readers,read),read);
}
// If the reads are sorted in queryname order, ensure that all reads
// having the same queryname become part of the same shard.
if(sortOrder == SortOrder.queryname) {
while(iterator.hasNext()) {
SAMRecord nextRead = iterator.next();
if(read == null || !read.getReadName().equals(nextRead.getReadName()))
break;
addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead);
}
}
iterator.close();
}
/**
 * Pushes a read into the given buffering shard and advances the recorded
 * position of the reader that produced it to just past this read's chunk.
 * @param shard The shard to which to add the read.
 * @param id The id of the reader that produced the read.
 * @param read The read to add to the shard.
 */
private void addReadToBufferingShard(BAMFormatAwareShard shard,SAMReaderID id,SAMRecord read) {
    shard.addRead(read);
    // Resume the next shard immediately after this read's on-disk chunk.
    readerPositions.put(id, new Chunk(read.getCoordinates().getChunkEnd(), Long.MAX_VALUE));
}
/**
* Gets the reader associated with the given read.
* @param readers Available readers.
@ -178,6 +227,14 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
}
}
/**
* Get an iterator over the data types specified in the shard.
* @param readers Readers from which to load data.
* @param shard The shard specifying the data limits.
* @param enableVerification True to verify. For compatibility with old sharding strategy.
* TODO: Collapse this flag when the two sharding systems are merged.
* @return An iterator over the selected data.
*/
private StingSAMIterator getIterator(SAMReaders readers, BAMFormatAwareShard shard, boolean enableVerification) {
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
for(SAMReaderID id: getReaderIDs()) {

View File

@ -99,6 +99,14 @@ public class IndexDrivenSAMDataSource extends SAMDataSource {
return resourcePool.hasIndex;
}
/**
 * Reports the sort order of the readers backing this data source.
 * @return Always coordinate order; index-driven traversal is only defined
 *         over coordinate-sorted data.
 */
public SAMFileHeader.SortOrder getSortOrder() {
    final SAMFileHeader.SortOrder order = SAMFileHeader.SortOrder.coordinate;
    return order;
}
/**
* Gets the (potentially merged) SAM file header.
*

View File

@ -102,6 +102,12 @@ public abstract class SAMDataSource implements SimpleDataSource {
*/
public abstract boolean hasIndex();
/**
 * Retrieves the sort order of the readers. Implementations should report the
 * single order shared by all underlying readers; mixed-order inputs are invalid.
 * @return Sort order. Can be unsorted, coordinate order, or query name order.
 */
public abstract SAMFileHeader.SortOrder getSortOrder();
/**
* Gets the (potentially merged) SAM file header.
*

View File

@ -111,6 +111,8 @@ public abstract class MicroScheduler {
traversalEngine = new TraverseLocusWindows();
} else if (walker instanceof DuplicateWalker) {
traversalEngine = new TraverseDuplicates();
} else if (walker instanceof ReadPairWalker) {
traversalEngine = new TraverseReadPairs();
} else {
throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type.");
}

View File

@ -0,0 +1,104 @@
package org.broadinstitute.sting.gatk.traversals;
import org.broadinstitute.sting.gatk.walkers.Requires;
import org.broadinstitute.sting.gatk.walkers.DataSource;
import org.broadinstitute.sting.gatk.walkers.ReadPairWalker;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadView;
import org.broadinstitute.sting.gatk.datasources.shards.BAMFormatAwareShard;
import org.apache.log4j.Logger;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMRecordCoordinateComparator;
import java.util.*;
/**
* Traverse over a collection of read pairs, assuming that a given shard will contain all pairs.
*
* @author mhanna
* @version 0.1
*/
@Requires({DataSource.REFERENCE})
public class TraverseReadPairs<M,T> extends TraversalEngine<M,T, ReadPairWalker<M,T>,ReadShardDataProvider> {
    /** our log, which we want to capture anything from this class */
    protected static Logger logger = Logger.getLogger(TraverseReadPairs.class);

    /** descriptor of the type */
    private static final String PAIRS_STRING = "read pairs";

    /**
     * Traverse by read pairs, given the data and the walker.
     * Groups consecutive reads sharing the same read name (the shard is assumed
     * to hold all reads of a group) and feeds each completed group through the
     * walker's filter/map/reduce.
     *
     * @param walker the walker to execute over
     * @param dataProvider the source of read data for this shard
     * @param sum of type T, the return from the walker
     *
     * @return the result type T, the product of all the reduce calls
     */
    public T traverse(ReadPairWalker<M, T> walker,
                      ReadShardDataProvider dataProvider,
                      T sum) {
        // Bug fix: the debug message previously named "TraverseReads" (copy-paste error).
        logger.debug(String.format("TraverseReadPairs.traverse Covered dataset is %s", dataProvider));

        if( !dataProvider.hasReads() )
            throw new IllegalArgumentException("Unable to traverse reads; no read data is available.");

        ReadView reads = new ReadView(dataProvider);
        List<SAMRecord> pairs = new ArrayList<SAMRecord>();

        for(SAMRecord read: reads) {
            TraversalStatistics.nReads++;

            if(pairs.isEmpty() || pairs.get(0).getReadName().equals(read.getReadName())) {
                // Same read name as the accumulated group (or nothing accumulated yet): accumulate it.
                pairs.add(read);
            }
            else {
                // A new read name ends the current group: process it, then start fresh.
                sum = walkOverPairs(walker,pairs,sum);
                pairs.clear();
                pairs.add(read);

                printProgress(PAIRS_STRING, null);
            }
        }

        // If any data was left in the buffer, process it as the final group.
        if(!pairs.isEmpty())
            sum = walkOverPairs(walker,pairs,sum);

        return sum;
    }

    /**
     * Filter / map / reduce over a single group of reads sharing a read name.
     * @param walker The walker.
     * @param reads The reads in the pair.
     * @param sum The accumulator.
     * @return The accumulator after application of the given read pairing.
     */
    private T walkOverPairs(ReadPairWalker<M,T> walker, List<SAMRecord> reads, T sum) {
        // Count this group as one record; individual reads are tallied in nReads above.
        // (Fixed comment: previously claimed this counted "reads".)
        TraversalStatistics.nRecords++;

        // Sort the reads present in coordinate order.
        Collections.sort(reads,new SAMRecordCoordinateComparator());

        final boolean keepMeP = walker.filter(reads);
        if (keepMeP) {
            M x = walker.map(reads);
            sum = walker.reduce(x, sum);
        }

        return sum;
    }

    /**
     * Temporary override of printOnTraversalDone.
     *
     * @param sum Result of the computation.
     */
    public void printOnTraversalDone(T sum) {
        printOnTraversalDone(PAIRS_STRING, sum);
    }
}

View File

@ -0,0 +1,38 @@
package org.broadinstitute.sting.gatk.walkers;
import net.sf.samtools.SAMRecord;
import java.util.Collection;
/**
* Walks over all pairs/collections of reads in a BAM file sorted by
* read name.
*
* @author mhanna
* @version 0.1
*/
@Requires({DataSource.READS})
public abstract class ReadPairWalker<MapType,ReduceType> extends Walker<MapType,ReduceType> {
/**
 * Optionally filters out read pairs.
 * @param reads collections of all reads with the same read name.
 * @return True to process the reads with map/reduce; false otherwise.
 */
public boolean filter(Collection<SAMRecord> reads) {
// Keep all pairs by default.
return true;
}
/**
 * Maps a read pair to a given reduce of type MapType. Semantics determined by subclasser.
 * @param reads Collection of reads having the same name.
 * @return Semantics defined by implementer.
 */
public abstract MapType map(Collection<SAMRecord> reads);
/**
 * Provides the initial value for the reduction, before any map results are folded in.
 * @return The starting accumulator value.
 */
public abstract ReduceType reduceInit();
/**
 * Folds a single map result into the running accumulator.
 * @param value Result of the map function for one group of same-named reads.
 * @param sum Accumulator carried over from prior iterations.
 * @return The updated accumulator.
 */
public abstract ReduceType reduce(MapType value, ReduceType sum);
}

View File

@ -0,0 +1,91 @@
package org.broadinstitute.sting.playground.gatk.walkers.qc;
import org.broadinstitute.sting.gatk.walkers.ReadPairWalker;
import org.broadinstitute.sting.utils.ExpandingArrayList;
import net.sf.samtools.SAMRecord;
import java.util.Collection;
import java.util.List;
/**
* Counts the number of read pairs encountered in a file sorted in
* query name order. Breaks counts down by total pairs and number
* of paired reads.
*
* @author mhanna
* @version 0.1
*/
public class CountPairsWalker extends ReadPairWalker<Integer,Long> {
    /**
     * How many reads are the first in a pair, based on flag 0x0040 from the SAM spec.
     */
    private long firstOfPair = 0;

    /**
     * How many reads are the second in a pair, based on flag 0x0080 from the SAM spec.
     */
    private long secondOfPair = 0;

    /**
     * A breakdown of the total number of reads seen with exactly the same read name.
     */
    private List<Long> pairCountsByType = new ExpandingArrayList<Long>();

    /**
     * Tallies one group of same-named reads: bumps the histogram bucket for this
     * group size, and counts first-of-pair / second-of-pair flags across the group.
     * @param reads Collection of reads having the same name.
     * @return Always 1, representing a single group seen.
     */
    @Override
    public Integer map(Collection<SAMRecord> reads) {
        final int groupSize = reads.size();
        final Long previousCount = pairCountsByType.get(groupSize);
        pairCountsByType.set(groupSize, previousCount == null ? 1L : previousCount + 1);

        for(SAMRecord read: reads) {
            if(read.getFirstOfPairFlag()) firstOfPair++;
            if(read.getSecondOfPairFlag()) secondOfPair++;
        }

        return 1;
    }

    /**
     * No pairs at the beginning of a traversal.
     * @return 0 always.
     */
    @Override
    public Long reduceInit() {
        return 0L;
    }

    /**
     * Combine number of pairs seen in this iteration (always 1) with total number of pairs
     * seen in previous iterations.
     * @param value Pairs in this iteration (1), from the map function.
     * @param sum Count of all pairs in prior iterations.
     * @return All pairs encountered so far, including this iteration.
     */
    @Override
    public Long reduce(Integer value, Long sum) {
        return sum + value;
    }

    /**
     * Print summary statistics over the entire traversal.
     * @param sum A count of all read pairs viewed.
     */
    @Override
    public void onTraversalDone(Long sum) {
        out.printf("Total number of pairs : %d%n",sum);
        out.printf("Total number of first reads in pair : %d%n",firstOfPair);
        out.printf("Total number of second reads in pair: %d%n",secondOfPair);
        // Report the histogram, skipping group sizes that were never observed.
        for(int groupSize = 1; groupSize < pairCountsByType.size(); groupSize++) {
            final Long count = pairCountsByType.get(groupSize);
            if(count != null)
                out.printf("Pairs of size %d: %d%n",groupSize,count);
        }
    }
}