package org.broadinstitute.sting.gatk.traversals; import org.broadinstitute.sting.gatk.walkers.Requires; import org.broadinstitute.sting.gatk.walkers.DataSource; import org.broadinstitute.sting.gatk.walkers.ReadPairWalker; import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; import org.broadinstitute.sting.gatk.datasources.providers.ReadView; import org.broadinstitute.sting.gatk.datasources.shards.BAMFormatAwareShard; import org.apache.log4j.Logger; import net.sf.samtools.SAMRecord; import net.sf.samtools.SAMRecordCoordinateComparator; import java.util.*; /** * Traverse over a collection of read pairs, assuming that a given shard will contain all pairs. * * @author mhanna * @version 0.1 */ @Requires({DataSource.REFERENCE}) public class TraverseReadPairs extends TraversalEngine,ReadShardDataProvider> { /** our log, which we want to capture anything from this class */ protected static Logger logger = Logger.getLogger(TraverseReadPairs.class); /** descriptor of the type */ private static final String PAIRS_STRING = "read pairs"; /** * Traverse by reads, given the data and the walker * * @param walker the walker to execute over * @param sum of type T, the return from the walker * * @return the result type T, the product of all the reduce calls */ public T traverse(ReadPairWalker walker, ReadShardDataProvider dataProvider, T sum) { logger.debug(String.format("TraverseReads.traverse Covered dataset is %s", dataProvider)); if( !dataProvider.hasReads() ) throw new IllegalArgumentException("Unable to traverse reads; no read data is available."); ReadView reads = new ReadView(dataProvider); List pairs = new ArrayList(); for(SAMRecord read: reads) { TraversalStatistics.nReads++; if(pairs.size() == 0 || pairs.get(0).getReadName().equals(read.getReadName())) { // If this read name is the same as the last, accumulate it. pairs.add(read); } else { // Otherwise, walk over the accumulated list, then start fresh with the new read. sum = walkOverPairs(walker,pairs,sum); pairs.clear(); pairs.add(read); printProgress(PAIRS_STRING, null); } } // If any data was left in the queue, process it. if(pairs.size() > 0) sum = walkOverPairs(walker,pairs,sum); return sum; } /** * Filter / map / reduce over a single pair. * @param walker The walker. * @param reads The reads in the pair. * @param sum The accumulator. * @return The accumulator after application of the given read pairing. */ private T walkOverPairs(ReadPairWalker walker, List reads, T sum) { // update the number of reads we've seen TraversalStatistics.nRecords++; // Sort the reads present in coordinate order. Collections.sort(reads,new SAMRecordCoordinateComparator()); final boolean keepMeP = walker.filter(reads); if (keepMeP) { M x = walker.map(reads); sum = walker.reduce(x, sum); } return sum; } /** * Temporary override of printOnTraversalDone. * * @param sum Result of the computation. */ public void printOnTraversalDone(T sum) { printOnTraversalDone(PAIRS_STRING, sum); } }