/*
 * Copyright (c) 2009 The Broad Institute
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use,
 * copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following
 * conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 * OTHER DEALINGS IN THE SOFTWARE.
 */
package org.broadinstitute.sting.gatk.traversals;

import net.sf.picard.filter.FilteringIterator;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.samtools.SAMRecord;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.LocusContext;
import org.broadinstitute.sting.gatk.datasources.providers.ReadView;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.shards.ReadShard;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.iterators.PushbackIterator;
import org.broadinstitute.sting.gatk.walkers.DuplicateWalker;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.Pair;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * User: aaron
 * Date: Apr 24, 2009
 * Time: 10:35:22 AM
 */

/**
 * @author Mark DePristo
 * @version 0.1
 * <p/>
 * Class TraverseDuplicates
 * <p/>
 * This class handles traversing lists of duplicate reads in the new shardable style
 */
public class TraverseDuplicates extends TraversalEngine {
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger . getLogger ( TraverseDuplicates . class ) ;
private final boolean DEBUG = false ;
2009-06-23 05:11:18 +08:00
private List < SAMRecord > readsAtLoc ( final SAMRecord read , PushbackIterator < SAMRecord > iter ) {
2009-06-22 22:39:41 +08:00
GenomeLoc site = GenomeLocParser . createGenomeLoc ( read ) ;
2009-05-01 06:16:21 +08:00
ArrayList < SAMRecord > l = new ArrayList < SAMRecord > ( ) ;
l . add ( read ) ;
2009-06-23 05:11:18 +08:00
for ( SAMRecord read2 : iter ) {
2009-06-22 22:39:41 +08:00
GenomeLoc site2 = GenomeLocParser . createGenomeLoc ( read2 ) ;
2009-05-01 06:16:21 +08:00
// the next read starts too late
2009-06-23 05:11:18 +08:00
if ( site2 . getStart ( ) ! = site . getStart ( ) ) {
2009-05-01 06:16:21 +08:00
//System.out.printf("site = %s, site2 = %s%n", site, site2);
iter . pushback ( read2 ) ;
break ;
} else {
//System.out.printf("Read is a duplicate: %s%n", read.format());
l . add ( read2 ) ;
}
}
return l ;
}
2009-05-08 02:06:02 +08:00
private Pair < List < SAMRecord > , List < SAMRecord > > splitDuplicates ( List < SAMRecord > reads ) {
List < SAMRecord > uniques = new ArrayList < SAMRecord > ( ) ;
List < SAMRecord > dups = new ArrayList < SAMRecord > ( ) ;
2009-05-01 06:16:21 +08:00
// find the first duplicate
SAMRecord key = null ;
2009-06-23 05:11:18 +08:00
for ( SAMRecord read : reads ) {
if ( read . getDuplicateReadFlag ( ) ) {
2009-05-01 06:16:21 +08:00
// this is our key
key = read ;
if ( DEBUG ) logger . debug ( String . format ( "Key %s is a duplicate" , read . getReadName ( ) ) ) ;
break ;
}
}
// At this point, there are two possibilities, we have found at least one dup or not
2009-05-08 02:06:02 +08:00
// if it's a dup, add it to the dups list, otherwise add it to the uniques list
2009-06-23 05:11:18 +08:00
if ( key ! = null ) {
2009-06-22 22:39:41 +08:00
final GenomeLoc keyLoc = GenomeLocParser . createGenomeLoc ( key ) ;
final GenomeLoc keyMateLoc = GenomeLocParser . createGenomeLoc ( key . getMateReferenceIndex ( ) , key . getMateAlignmentStart ( ) , key . getMateAlignmentStart ( ) ) ;
2009-05-01 06:16:21 +08:00
2009-06-23 05:11:18 +08:00
for ( SAMRecord read : reads ) {
2009-06-22 22:39:41 +08:00
final GenomeLoc readLoc = GenomeLocParser . createGenomeLoc ( read ) ;
final GenomeLoc readMateLoc = GenomeLocParser . createGenomeLoc ( read . getMateReferenceIndex ( ) , read . getMateAlignmentStart ( ) , read . getMateAlignmentStart ( ) ) ;
2009-06-23 05:11:18 +08:00
if ( DEBUG )
logger . debug ( String . format ( "Examining reads at %s vs. %s at %s / %s vs. %s / %s%n" , key . getReadName ( ) , read . getReadName ( ) , keyLoc , keyMateLoc , readLoc , readMateLoc ) ) ;
2009-05-08 02:55:45 +08:00
// read and key start at the same place, and either the this read and the key
// share a mate location or the read is flagged as a duplicate
2009-06-23 05:11:18 +08:00
if ( readLoc . compareTo ( keyLoc ) = = 0 & &
( readMateLoc . compareTo ( keyMateLoc ) = = 0 ) | |
read . getDuplicateReadFlag ( ) ) {
2009-05-01 06:16:21 +08:00
// we are at the same position as the dup and have the same mat pos, it's a dup
if ( DEBUG ) logger . debug ( String . format ( " => Adding read to dups list: %s%n" , read ) ) ;
dups . add ( read ) ;
2009-05-08 02:06:02 +08:00
} else {
uniques . add ( read ) ;
2009-05-01 06:16:21 +08:00
}
}
2009-05-08 02:06:02 +08:00
} else {
uniques = reads ;
2009-05-01 06:16:21 +08:00
}
2009-05-08 02:06:02 +08:00
return new Pair < List < SAMRecord > , List < SAMRecord > > ( uniques , dups ) ;
2009-05-01 06:16:21 +08:00
}
/ * *
* Traverse by reads , given the data and the walker
2009-06-23 05:11:18 +08:00
*
2009-05-01 06:16:21 +08:00
* @param sum of type T , the return from the walker
* @param < M > the generic type
* @param < T > the return type of the reduce function
2009-06-23 05:11:18 +08:00
* @param dupWalker our duplicates walker
* @param readIter our iterator
*
* @return the reduce type , T , the final product of all the reduce calls
2009-05-01 06:16:21 +08:00
* /
2009-06-23 05:11:18 +08:00
private < M , T > T actuallyTraverse ( DuplicateWalker < M , T > dupWalker ,
Iterator < SAMRecord > readIter ,
T sum ) {
/ * *
* while we still have more reads :
* ok , here ' s the idea . We get all the reads that start at the same position in the genome
* We then split the list of reads into sublists of reads :
* - > those with the same mate pair position , for paired reads
* - > those flagged as unpaired and duplicated but having the same start and end and
* /
2009-05-01 06:16:21 +08:00
PushbackIterator < SAMRecord > iter = new PushbackIterator < SAMRecord > ( readIter ) ;
2009-06-23 05:11:18 +08:00
for ( SAMRecord read : iter ) {
2009-05-01 06:16:21 +08:00
// get the genome loc from the read
2009-06-22 22:39:41 +08:00
GenomeLoc site = GenomeLocParser . createGenomeLoc ( read ) ;
2009-06-23 05:11:18 +08:00
2009-05-01 06:16:21 +08:00
List < SAMRecord > reads = readsAtLoc ( read , iter ) ;
2009-05-08 02:06:02 +08:00
Pair < List < SAMRecord > , List < SAMRecord > > split = splitDuplicates ( reads ) ;
2009-06-23 05:11:18 +08:00
List < SAMRecord > uniqueReads = split . getFirst ( ) ;
List < SAMRecord > duplicateReads = split . getSecond ( ) ;
2009-05-08 02:06:02 +08:00
2009-05-08 02:55:45 +08:00
logger . debug ( String . format ( "*** TraverseDuplicates.traverse at %s with %d reads has %d unique and %d duplicate reads" ,
site , reads . size ( ) , uniqueReads . size ( ) , duplicateReads . size ( ) ) ) ;
2009-06-23 05:11:18 +08:00
if ( reads . size ( ) ! = uniqueReads . size ( ) + duplicateReads . size ( ) )
throw new RuntimeException ( String . format ( "Bug occurred spliting reads [N=%d] at loc %s into unique [N=%d] and duplicates [N=%d], sizes don't match" ,
reads . size ( ) , site . toString ( ) , uniqueReads . size ( ) , duplicateReads . size ( ) ) ) ;
2009-05-01 06:16:21 +08:00
// Jump forward in the reference to this locus location
LocusContext locus = new LocusContext ( site , duplicateReads , Arrays . asList ( 0 ) ) ;
// update the number of duplicate sets we've seen
TraversalStatistics . nRecords + + ;
// we still have to fix the locus context provider to take care of this problem with > 1 length contexts
// LocusContext locus = locusProvider.getLocusContext(site);
byte [ ] refBases = new byte [ 0 ] ;
2009-06-23 05:11:18 +08:00
if ( dupWalker . mapUniqueReadsTooP ( ) ) {
2009-05-08 02:06:02 +08:00
// Send each unique read to the map function
2009-06-23 05:11:18 +08:00
for ( SAMRecord unique : uniqueReads ) {
2009-05-08 02:06:02 +08:00
List < SAMRecord > l = Arrays . asList ( unique ) ;
2009-05-22 06:26:57 +08:00
sum = mapOne ( dupWalker , uniqueReads , l , site , refBases , locus , sum ) ;
2009-05-08 02:06:02 +08:00
}
2009-05-01 06:16:21 +08:00
}
2009-06-23 05:11:18 +08:00
if ( duplicateReads . size ( ) > 0 )
2009-05-22 06:26:57 +08:00
sum = mapOne ( dupWalker , uniqueReads , duplicateReads , site , refBases , locus , sum ) ;
2009-05-08 02:06:02 +08:00
2009-05-01 06:16:21 +08:00
printProgress ( "dups" , site ) ;
2009-07-10 06:03:45 +08:00
if ( this . maximumIterations > 0 & & TraversalStatistics . nRecords > this . maximumIterations ) {
2009-05-01 06:16:21 +08:00
logger . warn ( String . format ( ( "Maximum number of duplicate sets encountered, terminating traversal " + TraversalStatistics . nRecords ) ) ) ;
break ;
}
}
return sum ;
}
/ * *
* Class to filter out un - handle - able reads from the stream . We currently are skipping
* unmapped reads , non - primary reads , unaligned reads , and duplicate reads .
* /
public static class duplicateStreamFilterFunc implements SamRecordFilter {
SAMRecord lastRead = null ;
public boolean filterOut ( SAMRecord rec ) {
boolean result = false ;
if ( rec . getReadUnmappedFlag ( ) ) {
TraversalStatistics . nUnmappedReads + + ;
result = true ;
} else if ( rec . getNotPrimaryAlignmentFlag ( ) ) {
TraversalStatistics . nNotPrimary + + ;
result = true ;
} else if ( rec . getAlignmentStart ( ) = = SAMRecord . NO_ALIGNMENT_START ) {
TraversalStatistics . nBadAlignments + + ;
result = true ;
2009-06-23 05:11:18 +08:00
} else {
2009-05-01 06:16:21 +08:00
result = false ;
}
if ( result ) {
TraversalStatistics . nSkippedReads + + ;
//System.out.printf(" [filter] %s => %b %s", rec.getReadName(), result, why);
} else {
TraversalStatistics . nReads + + ;
}
return result ;
}
}
2009-05-08 02:06:02 +08:00
public < M , T > T mapOne ( DuplicateWalker < M , T > dupWalker ,
2009-05-22 06:26:57 +08:00
List < SAMRecord > uniqueReads ,
List < SAMRecord > duplicateReads ,
2009-05-08 02:06:02 +08:00
GenomeLoc site ,
byte [ ] refBases ,
LocusContext locus ,
T sum ) {
2009-05-22 06:26:57 +08:00
final boolean keepMeP = dupWalker . filter ( site , refBases , locus , uniqueReads , duplicateReads ) ;
2009-05-08 02:06:02 +08:00
if ( keepMeP ) {
2009-05-22 06:26:57 +08:00
M x = dupWalker . map ( site , refBases , locus , uniqueReads , duplicateReads ) ;
2009-05-08 02:06:02 +08:00
sum = dupWalker . reduce ( x , sum ) ;
}
return sum ;
}
2009-05-01 06:16:21 +08:00
// --------------------------------------------------------------------------------------------------------------
//
// new style interface to the system
//
// --------------------------------------------------------------------------------------------------------------
/ * *
* Traverse by reads , given the data and the walker
2009-06-23 05:11:18 +08:00
*
2009-05-01 06:16:21 +08:00
* @param walker the walker to execute over
2009-06-23 05:11:18 +08:00
* @param shard the shard of data to feed the walker
* @param sum of type T , the return from the walker
* @param < M > the generic type
* @param < T > the return type of the reduce function
*
* @return the result type T , the product of all the reduce calls
2009-05-01 06:16:21 +08:00
* /
public < M , T > T traverse ( Walker < M , T > walker ,
Shard shard ,
2009-05-14 02:51:16 +08:00
ShardDataProvider dataProvider ,
2009-05-01 06:16:21 +08:00
T sum ) {
2009-06-23 05:11:18 +08:00
logger . debug ( String . format ( "TraverseDuplicates.traverse Genomic interval is %s" , ( ( ReadShard ) shard ) . getSize ( ) ) ) ;
2009-05-01 06:16:21 +08:00
if ( ! ( walker instanceof DuplicateWalker ) )
throw new IllegalArgumentException ( "Walker isn't a duplicate walker!" ) ;
DuplicateWalker < M , T > dupWalker = ( DuplicateWalker < M , T > ) walker ;
// while we still have more reads
// ok, here's the idea. We get all the reads that start at the same position in the genome
// We then split the list of reads into sublists of reads:
// -> those with the same mate pair position, for paired reads
// -> those flagged as unpaired and duplicated but having the same start and end and
2009-05-23 03:12:00 +08:00
FilteringIterator filterIter = new FilteringIterator ( new ReadView ( dataProvider ) . iterator ( ) , new duplicateStreamFilterFunc ( ) ) ;
2009-05-01 06:16:21 +08:00
PushbackIterator < SAMRecord > iter = new PushbackIterator < SAMRecord > ( filterIter ) ;
2009-05-14 02:51:16 +08:00
return actuallyTraverse ( dupWalker , iter , sum ) ;
2009-05-01 06:16:21 +08:00
}
2009-06-23 05:11:18 +08:00
/ * *
* Temporary override of printOnTraversalDone .
* TODO : Add some sort of TE . getName ( ) function once all TraversalEngines are ported .
*
* @param sum Result of the computation .
* @param < T > Type of the result .
* /
public < T > void printOnTraversalDone ( T sum ) {
printOnTraversalDone ( "reads" , sum ) ;
}
2009-05-01 06:16:21 +08:00
}