2009-03-16 06:21:48 +08:00
package org.broadinstitute.sting.gatk ;
2009-02-27 05:50:29 +08:00
import edu.mit.broad.picard.filter.FilteringIterator ;
2009-03-24 04:27:21 +08:00
import edu.mit.broad.picard.filter.SamRecordFilter ;
2009-03-13 22:50:45 +08:00
import edu.mit.broad.picard.reference.ReferenceSequence ;
2009-03-24 04:27:21 +08:00
import net.sf.functionalj.Function1 ;
import net.sf.functionalj.FunctionN ;
import net.sf.functionalj.Functions ;
import net.sf.functionalj.reflect.JdkStdReflect ;
import net.sf.functionalj.reflect.StdReflect ;
import net.sf.functionalj.util.Operators ;
import net.sf.samtools.SAMFileHeader ;
import net.sf.samtools.SAMFileReader ;
import net.sf.samtools.SAMFileReader.ValidationStringency ;
import net.sf.samtools.SAMRecord ;
import net.sf.samtools.util.RuntimeIOException ;
2009-03-25 04:55:34 +08:00
import net.sf.samtools.util.CloseableIterator ;
2009-03-24 04:27:21 +08:00
import org.apache.log4j.Logger ;
2009-03-16 06:21:48 +08:00
import org.broadinstitute.sting.gatk.iterators.* ;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData ;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum ;
2009-03-24 04:27:21 +08:00
import org.broadinstitute.sting.gatk.walkers.LocusWalker ;
import org.broadinstitute.sting.gatk.walkers.ReadWalker ;
import org.broadinstitute.sting.utils.FastaSequenceFile2 ;
import org.broadinstitute.sting.utils.FileProgressTracker ;
import org.broadinstitute.sting.utils.GenomeLoc ;
import org.broadinstitute.sting.utils.Utils ;
2009-02-27 05:50:29 +08:00
import java.io.* ;
2009-03-03 02:18:48 +08:00
import java.util.* ;
2009-02-27 05:50:29 +08:00
public class TraversalEngine {
2009-03-16 06:21:48 +08:00
// list of reference ordered data objects
private List < ReferenceOrderedData > rods = null ;
// Iterator over rods
List < ReferenceOrderedData . RODIterator > rodIters ;
2009-03-01 04:47:48 +08:00
2009-03-12 05:43:31 +08:00
//private String regionStr = null; // String dec
//private String traversalType = null; // String describing this traversal type
// How strict should we be with SAM/BAM parsing?
2009-02-27 05:50:29 +08:00
private ValidationStringency strictness = ValidationStringency . STRICT ;
2009-03-12 05:43:31 +08:00
// Time in milliseconds since we initialized this engine
2009-02-27 05:50:29 +08:00
private long startTime = - 1 ;
2009-03-12 05:43:31 +08:00
private long lastProgressPrintTime = - 1 ; // When was the last time we printed our progress?
// How long can we go without printing some progress info?
private long MAX_PROGRESS_PRINT_TIME = 10 * 1000 ; // 10 seconds in millisecs
// Maximum number of reads to process before finishing
2009-02-27 05:50:29 +08:00
private long maxReads = - 1 ;
2009-03-12 05:43:31 +08:00
// Name of the reads file, in BAM/SAM format
private File readsFile = null ; // the name of the reads file
2009-03-17 07:22:04 +08:00
private SAMFileReader samReader = null ;
2009-03-12 05:43:31 +08:00
// iterator over the sam records in the readsFile
private Iterator < SAMRecord > samReadIter = null ;
2009-03-17 07:22:04 +08:00
// The verifying iterator, it does checking
VerifyingSamIterator verifyingSamReadIter = null ;
2009-03-12 05:43:31 +08:00
// The reference data -- filename, refSeqFile, and iterator
private File refFileName = null ; // the name of the reference file
2009-03-23 03:56:54 +08:00
//private ReferenceSequenceFile refFile = null;
private FastaSequenceFile2 refFile = null ; // todo: merge FastaSequenceFile2 into picard!
2009-03-24 04:27:21 +08:00
private ReferenceIterator refIter = null ;
2009-02-27 05:50:29 +08:00
2009-03-12 05:43:31 +08:00
// Number of records (loci, reads) we've processed
private long nRecords = 0 ;
// How many reads have we processed, along with those skipped for various reasons
2009-02-27 05:50:29 +08:00
private int nReads = 0 ;
private int nSkippedReads = 0 ;
private int nUnmappedReads = 0 ;
private int nNotPrimary = 0 ;
private int nBadAlignments = 0 ;
private int nSkippedIndels = 0 ;
2009-03-12 05:43:31 +08:00
// Progress tracker for the sam file
2009-03-03 05:49:08 +08:00
private FileProgressTracker samReadingTracker = null ;
2009-02-27 05:50:29 +08:00
// Debugging switch; assigned by setDebugging(). It is not read in the part of the class visible here.
public boolean DEBUGGING = false ;
2009-03-17 07:22:04 +08:00
// Safety checking: when true, reads are wrapped in a VerifyingSamIterator (if the caller asks for
// verification) and verifySortOrder() inspects the SAM header's sort order.
public boolean beSafeP = true ;
2009-03-18 04:29:09 +08:00
// When true, the reads iterator is wrapped in a SortSamIterator and the header sort-order check is skipped.
public boolean SORT_ON_FLY = false ;
2009-03-25 04:55:34 +08:00
// Assigned by setFilterUnsortedReads(). It is not read in the part of the class visible here.
public boolean FILTER_UNSORTED_READS = false ;
2009-03-18 04:29:09 +08:00
// Limit handed to SortSamIterator when SORT_ON_FLY is enabled (logged as "max reads allowed").
public int MAX_ON_FLY_SORTS = 100000 ;
2009-03-12 05:43:31 +08:00
// printProgress() reports whenever the record count is a multiple of this value.
public long N_RECORDS_TO_PRINT = 100000 ;
2009-03-25 06:32:45 +08:00
// When true, the reads iterator is wrapped in a ThreadedIterator.
public boolean THREADED_IO = false ;
2009-03-13 22:50:45 +08:00
// Buffer size (in reads) handed to ThreadedIterator when THREADED_IO is enabled.
public int THREADED_IO_BUFFER_SIZE = 10000 ;
2009-02-27 05:50:29 +08:00
2009-03-24 04:27:21 +08:00
/ * *
* our log , which we want to capture anything from this class
* /
2009-03-24 04:51:01 +08:00
private static Logger logger = Logger . getLogger ( TraversalEngine . class ) ;
2009-03-24 04:27:21 +08:00
2009-03-12 05:43:31 +08:00
// Locations we are going to process during the traversal
2009-03-03 02:18:48 +08:00
private GenomeLoc [ ] locs = null ;
2009-02-27 05:50:29 +08:00
// --------------------------------------------------------------------------------------------------------------
//
// Setting up the engine
//
// --------------------------------------------------------------------------------------------------------------
2009-03-12 05:43:31 +08:00
/**
 * Creates a new, uninitialized TraversalEngine.  The constructor only records its
 * arguments; nothing is opened until initialize() or a traversal is invoked.
 *
 * @param reads SAM/BAM file of reads
 * @param ref   Reference file in FASTA format, assumes a .dict file is also available
 * @param rods  List of reference ordered data sets
 */
public TraversalEngine(File reads, File ref, List<ReferenceOrderedData> rods) {
    this.rods = rods;
    this.readsFile = reads;
    this.refFileName = ref;
}
2009-03-03 05:49:08 +08:00
2009-03-12 05:43:31 +08:00
// --------------------------------------------------------------------------------------------------------------
//
// Manipulating the underlying engine parameters
//
// --------------------------------------------------------------------------------------------------------------
//public void setRegion(final String reg) { regionStr = regionStr; }
//public void setTraversalType(final String type) { traversalType = type; }
2009-03-24 04:27:21 +08:00
/**
 * Sets how strictly SAM/BAM records are validated.  The value is applied to the
 * SAMFileReader when the reads file is opened and is consulted by verifySortOrder().
 *
 * @param s validation stringency to use
 */
public void setStrictness(final ValidationStringency s) {
    this.strictness = s;
}
/**
 * Sets the maximum number of records to process before the traversal stops.
 * The limit is only enforced when it is greater than zero.
 *
 * @param maxReads record limit; zero or negative disables the limit
 */
public void setMaxReads(final int maxReads) {
    final long limit = maxReads;
    this.maxReads = limit;
}
2009-03-25 06:32:45 +08:00
/**
 * Enables or disables wrapping of the reads iterator in a ThreadedIterator.
 *
 * @param threadedIO true to read through a ThreadedIterator
 */
public void setThreadedIO(final boolean threadedIO) {
    THREADED_IO = threadedIO;
}
2009-03-24 04:27:21 +08:00
/**
 * Sets the DEBUGGING flag of this engine.
 *
 * @param d new value of the flag
 */
public void setDebugging(final boolean d) {
    this.DEBUGGING = d;
}
public void setSafetyChecking ( final boolean beSafeP ) {
if ( ! beSafeP )
2009-03-24 04:46:55 +08:00
logger . warn ( "*** Turning off safety checking, I hope you know what you are doing. Errors will result in debugging assert failures and other inscrutable messages..." ) ;
2009-03-17 07:22:04 +08:00
this . beSafeP = beSafeP ;
}
2009-03-24 04:27:21 +08:00
2009-03-25 04:55:34 +08:00
/**
 * Enables or disables filtering of out-of-order reads.  Because enabling the filter
 * removes data, a warning is logged when it is switched on.
 *
 * @param filterUnsorted true to filter out reads that arrive out of order
 */
public void setFilterUnsortedReads(final boolean filterUnsorted) {
    // Warn when the filter is being ENABLED; the message announces "Turning on filtering".
    if (filterUnsorted) {
        logger.warn("*** Turning on filtering of out of order reads, I *really* hope you know what you are doing, as you are removing data...");
    }
    this.FILTER_UNSORTED_READS = filterUnsorted;
}
2009-03-24 04:27:21 +08:00
public void setSortOnFly ( final boolean SORT_ON_FLY ) {
if ( SORT_ON_FLY )
2009-03-24 04:46:55 +08:00
logger . info ( "Sorting read file on the fly: max reads allowed is " + MAX_ON_FLY_SORTS ) ;
2009-03-18 04:29:09 +08:00
this . SORT_ON_FLY = SORT_ON_FLY ;
}
2009-02-27 05:50:29 +08:00
2009-03-03 02:18:48 +08:00
// --------------------------------------------------------------------------------------------------------------
//
// functions for dealing locations (areas of the genome we're traversing over)
//
// --------------------------------------------------------------------------------------------------------------
2009-03-12 05:43:31 +08:00
/ * *
* Parses the location string locStr and sets the traversal engine to only process
* regions specified by the location string . The string is of the form :
2009-03-24 04:27:21 +08:00
* Of the form : loc1 ; loc2 ; . . .
* Where each locN can be :
* <EFBFBD> chr2 <EFBFBD> , <EFBFBD> chr2 : 1000000 <EFBFBD> or <EFBFBD> chr2 : 1 , 000 , 000 - 2 , 000 , 000 <EFBFBD>
2009-03-12 05:43:31 +08:00
*
* @param locStr
* /
2009-03-24 04:27:21 +08:00
public void setLocation ( final String locStr ) {
2009-03-03 02:18:48 +08:00
this . locs = parseGenomeLocs ( locStr ) ;
}
2009-03-22 00:07:32 +08:00
/ * *
* Read a file of genome locations to process .
* regions specified by the location string . The string is of the form :
2009-03-24 04:27:21 +08:00
* Of the form : loc1 ; loc2 ; . . .
* Where each locN can be :
* <EFBFBD> chr2 <EFBFBD> , <EFBFBD> chr2 : 1000000 <EFBFBD> or <EFBFBD> chr2 : 1 , 000 , 000 - 2 , 000 , 000 <EFBFBD>
2009-03-22 00:07:32 +08:00
*
* @param file_name
* /
2009-03-24 04:27:21 +08:00
public void setLocationFromFile ( final String file_name ) {
2009-03-25 10:17:48 +08:00
StringBuilder locStr = new StringBuilder ( ) ;
2009-03-22 00:07:32 +08:00
2009-03-24 04:27:21 +08:00
Scanner scanner = null ;
try {
2009-03-22 00:07:32 +08:00
scanner = new Scanner ( new File ( file_name ) ) ;
2009-03-24 04:27:21 +08:00
while ( scanner . hasNextLine ( ) ) {
String line = scanner . nextLine ( ) ;
2009-03-22 00:07:32 +08:00
line . replaceAll ( "\n" , "" ) ;
2009-03-25 10:17:48 +08:00
locStr . append ( line ) ;
2009-03-24 04:27:21 +08:00
if ( scanner . hasNextLine ( ) ) {
2009-03-25 10:17:48 +08:00
locStr . append ( ";" ) ;
2009-03-24 04:27:21 +08:00
}
}
2009-03-22 00:07:32 +08:00
}
2009-03-24 04:27:21 +08:00
catch ( Exception e ) {
2009-03-22 00:07:32 +08:00
e . printStackTrace ( ) ;
2009-03-24 04:27:21 +08:00
System . exit ( - 1 ) ;
2009-03-22 00:07:32 +08:00
}
2009-03-24 04:27:21 +08:00
finally {
//ensure the underlying stream is always closed
scanner . close ( ) ;
2009-03-22 00:07:32 +08:00
}
2009-03-25 10:17:48 +08:00
logger . debug ( "DEBUG: locStr: " + locStr . toString ( ) ) ;
2009-03-22 00:07:32 +08:00
2009-03-25 10:17:48 +08:00
this . locs = parseGenomeLocs ( locStr . toString ( ) ) ;
2009-03-22 00:07:32 +08:00
}
2009-03-12 05:43:31 +08:00
/ * *
* Useful utility function that parses a location string into a coordinate - order sorted
* array of GenomeLoc objects
*
* @param str
* @return Array of GenomeLoc objects corresponding to the locations in the string , sorted by coordinate order
* /
2009-03-24 04:27:21 +08:00
public static GenomeLoc [ ] parseGenomeLocs ( final String str ) {
2009-03-03 02:18:48 +08:00
// Of the form: loc1;loc2;...
// Where each locN can be:
// <20> chr2<72> , <20> chr2:1000000<30> or <20> chr2:1,000,000-2,000,000<30>
StdReflect reflect = new JdkStdReflect ( ) ;
FunctionN < GenomeLoc > parseOne = reflect . staticFunction ( GenomeLoc . class , "parseGenomeLoc" , String . class ) ;
Function1 < GenomeLoc , String > f1 = parseOne . f1 ( ) ;
2009-03-23 03:56:54 +08:00
try {
Collection < GenomeLoc > result = Functions . map ( f1 , Arrays . asList ( str . split ( ";" ) ) ) ;
2009-03-24 04:27:21 +08:00
GenomeLoc [ ] locs = ( GenomeLoc [ ] ) result . toArray ( new GenomeLoc [ 0 ] ) ;
2009-03-23 03:56:54 +08:00
Arrays . sort ( locs ) ;
2009-03-25 04:55:34 +08:00
System . out . println ( " Locations are: " + Utils . join ( "\n" , Functions . map ( Operators . toString , Arrays . asList ( locs ) ) ) ) ;
2009-03-23 03:56:54 +08:00
return locs ;
2009-03-24 04:27:21 +08:00
} catch ( Exception e ) {
logger . fatal ( String . format ( "Invalid locations string: %s, format is loc1;loc2; where each locN can be 'chr2', 'chr2:1000000' or 'chr2:1,000,000-2,000,000'" , str ) ) ;
throw new IllegalArgumentException ( "Invalid locations string: " + str + ", format is loc1;loc2; where each locN can be 'chr2', 'chr2:1000000' or 'chr2:1,000,000-2,000,000'" ) ;
2009-03-23 03:56:54 +08:00
}
2009-03-24 04:27:21 +08:00
}
2009-03-03 02:18:48 +08:00
2009-03-12 05:43:31 +08:00
/ * *
* A key function that returns true if the proposed GenomeLoc curr is within the list of
* locations we are processing in this TraversalEngine
*
* @param curr
* @return true if we should process GenomeLoc curr , otherwise false
* /
2009-03-24 04:27:21 +08:00
public boolean inLocations ( GenomeLoc curr ) {
if ( this . locs = = null ) {
2009-03-03 02:18:48 +08:00
return true ;
2009-03-24 04:27:21 +08:00
} else {
2009-03-25 04:55:34 +08:00
for ( GenomeLoc loc : this . locs ) {
2009-03-03 02:18:48 +08:00
//System.out.printf(" Overlap %s vs. %s => %b%n", loc, curr, loc.overlapsP(curr));
2009-03-24 04:27:21 +08:00
if ( loc . overlapsP ( curr ) )
2009-03-03 02:18:48 +08:00
return true ;
}
return false ;
}
}
2009-03-12 05:43:31 +08:00
/ * *
* Returns true iff we have a specified series of locations to process AND we are past the last
* location in the list . It means that , in a serial processing of the genome , that we are done .
*
* @param curr Current genome Location
* @return true if we are past the last location to process
* /
2009-03-24 04:27:21 +08:00
private boolean pastFinalLocation ( GenomeLoc curr ) {
boolean r = locs ! = null & & locs [ locs . length - 1 ] . compareTo ( curr ) = = - 1 & & ! locs [ locs . length - 1 ] . overlapsP ( curr ) ;
2009-03-03 02:18:48 +08:00
//System.out.printf(" pastFinalLocation %s vs. %s => %d => %b%n", locs[locs.length-1], curr, locs[locs.length-1].compareTo( curr ), r);
return r ;
}
2009-02-27 05:50:29 +08:00
// --------------------------------------------------------------------------------------------------------------
//
2009-03-12 05:43:31 +08:00
// printing
2009-02-27 05:50:29 +08:00
//
// --------------------------------------------------------------------------------------------------------------
2009-03-24 04:27:21 +08:00
2009-03-10 22:59:42 +08:00
/ * *
* @param curTime ( current runtime , in millisecs )
* @return true if the maximum interval ( in millisecs ) has passed since the last printing
* /
private boolean maxElapsedIntervalForPrinting ( final long curTime ) {
return ( curTime - this . lastProgressPrintTime ) > MAX_PROGRESS_PRINT_TIME ;
}
2009-03-12 05:43:31 +08:00
/ * *
* Forward request to printProgress
*
* @param type
* @param loc
* /
public void printProgress ( final String type , GenomeLoc loc ) {
2009-03-24 04:27:21 +08:00
printProgress ( false , type , loc ) ;
2009-03-12 05:43:31 +08:00
}
2009-02-27 05:50:29 +08:00
2009-03-12 05:43:31 +08:00
/ * *
* Utility routine that prints out process information ( including timing ) every N records or
* every M seconds , for N and M set in global variables .
*
* @param mustPrint If true , will print out info , regardless of nRecords or time interval
2009-03-24 04:27:21 +08:00
* @param type String to print out describing our atomic traversal type ( "read" , "locus" , etc )
* @param loc Current location
2009-03-12 05:43:31 +08:00
* /
2009-03-24 04:27:21 +08:00
public void printProgress ( boolean mustPrint , final String type , GenomeLoc loc ) {
2009-02-27 05:50:29 +08:00
final long nRecords = this . nRecords ;
2009-03-10 22:59:42 +08:00
final long curTime = System . currentTimeMillis ( ) ;
final double elapsed = ( curTime - startTime ) / 1000.0 ;
//System.out.printf("Cur = %d, last print = %d%n", curTime, lastProgressPrintTime);
2009-03-24 04:27:21 +08:00
if ( mustPrint | | nRecords % N_RECORDS_TO_PRINT = = 0 | | maxElapsedIntervalForPrinting ( curTime ) ) {
2009-03-10 22:59:42 +08:00
this . lastProgressPrintTime = curTime ;
2009-02-27 05:50:29 +08:00
final double secsPer1MReads = ( elapsed * 1000000.0 ) / nRecords ;
2009-03-24 04:27:21 +08:00
if ( loc ! = null )
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "[PROGRESS] Traversed to %s, processing %,d %s in %.2f secs (%.2f secs per 1M %s)" , loc , nRecords , type , elapsed , secsPer1MReads , type ) ) ;
2009-03-03 05:51:25 +08:00
else
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "[PROGRESS] Traversed %,d %s in %.2f secs (%.2f secs per 1M %s)" , nRecords , type , elapsed , secsPer1MReads , type ) ) ;
2009-03-10 22:59:42 +08:00
2009-03-12 05:43:31 +08:00
// Currently samReadingTracker will print misleading info if we're not processing the whole file
2009-03-20 07:02:49 +08:00
// If an index is enabled, file read progress is meaningless because a linear
// traversal is not being performed. For now, don't bother printing progress.
// TODO: Create a sam indexed read tracker that tracks based on percentage through the query.
2009-03-24 04:27:21 +08:00
if ( samReadingTracker ! = null & & this . locs = = null )
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "[PROGRESS] -> %s" , samReadingTracker . progressMeter ( ) ) ) ;
2009-02-27 05:50:29 +08:00
}
}
2009-03-12 05:43:31 +08:00
/ * *
* Called after a traversal to print out information about the traversal process
*
* @param type String describing this type of traversal ( "loci" , "read" )
2009-03-24 04:27:21 +08:00
* @param sum The reduce result of the traversal
* @param < T > ReduceType of the traversal
2009-03-12 05:43:31 +08:00
* /
2009-03-24 04:27:21 +08:00
protected < T > void printOnTraversalDone ( final String type , T sum ) {
printProgress ( true , type , null ) ;
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "Traversal reduce result is " + sum ) ) ;
logger . info ( String . format ( "Traversal skipped %d reads out of %d total (%.2f%%)" , nSkippedReads , nReads , ( nSkippedReads * 100.0 ) / nReads ) ) ;
logger . info ( String . format ( " -> %d unmapped reads" , nUnmappedReads ) ) ;
logger . info ( String . format ( " -> %d non-primary reads" , nNotPrimary ) ) ;
logger . info ( String . format ( " -> %d reads with bad alignments" , nBadAlignments ) ) ;
logger . info ( String . format ( " -> %d reads with indels" , nSkippedIndels ) ) ;
2009-03-12 05:43:31 +08:00
}
2009-02-27 05:50:29 +08:00
// --------------------------------------------------------------------------------------------------------------
//
2009-03-12 05:43:31 +08:00
// Initialization
2009-02-27 05:50:29 +08:00
//
// --------------------------------------------------------------------------------------------------------------
2009-03-12 05:43:31 +08:00
/ * *
* Initialize the traversal engine . After this point traversals can be run over the data
*
* @return true on success
* /
2009-03-25 06:32:45 +08:00
public boolean initialize ( ) {
2009-03-12 05:43:31 +08:00
lastProgressPrintTime = startTime = System . currentTimeMillis ( ) ;
2009-03-16 06:21:48 +08:00
initializeReference ( ) ;
// Initial the reference ordered data iterators
initializeRODs ( ) ;
return true ;
}
2009-03-25 06:32:45 +08:00
/**
 * Opens the reads file, remembers the reader in samReader, and returns an iterator over
 * its records wrapped by WrapReadsIterator() with verification enabled.
 *
 * @return the wrapped iterator over the reads
 */
private Iterator<SAMRecord> initializeReads() {
    samReader = initializeSAMFile(readsFile);
    final Iterator<SAMRecord> rawReads = getReadsIterator(samReader);
    return WrapReadsIterator(rawReads, true);
}
2009-03-18 00:39:03 +08:00
2009-03-25 06:32:45 +08:00
private Iterator < SAMRecord > getReadsIterator ( final SAMFileReader samReader ) {
// If the file has an index, querying functions are available. Use them if possible...
if ( samReader . hasIndex ( ) ) {
return new SamQueryIterator ( samReader , locs ) ;
} else {
return samReader . iterator ( ) ;
2009-03-18 00:39:03 +08:00
}
2009-03-25 06:32:45 +08:00
}
private Iterator < SAMRecord > WrapReadsIterator ( final Iterator < SAMRecord > rawIterator , final boolean enableVerification ) {
Iterator < SAMRecord > wrappedIterator = rawIterator ;
2009-03-18 00:39:03 +08:00
2009-03-24 04:27:21 +08:00
if ( SORT_ON_FLY )
2009-03-25 06:32:45 +08:00
wrappedIterator = new SortSamIterator ( wrappedIterator , MAX_ON_FLY_SORTS ) ;
if ( beSafeP & & enableVerification )
wrappedIterator = new VerifyingSamIterator ( wrappedIterator ) ;
2009-03-24 04:27:21 +08:00
if ( THREADED_IO ) {
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "Enabling threaded I/O with buffer of %d reads" , THREADED_IO_BUFFER_SIZE ) ) ;
2009-03-25 06:32:45 +08:00
wrappedIterator = new ThreadedIterator < SAMRecord > ( wrappedIterator , THREADED_IO_BUFFER_SIZE ) ;
2009-03-18 00:39:03 +08:00
}
2009-03-25 06:32:45 +08:00
return wrappedIterator ;
}
2009-03-18 00:39:03 +08:00
2009-03-25 06:32:45 +08:00
private SAMFileReader initializeSAMFile ( final File samFile ) {
SAMFileReader samReader = new SAMFileReader ( samFile , true ) ;
2009-03-18 00:39:03 +08:00
samReader . setValidationStringency ( strictness ) ;
final SAMFileHeader header = samReader . getFileHeader ( ) ;
2009-03-24 04:46:55 +08:00
logger . info ( String . format ( "Sort order is: " + header . getSortOrder ( ) ) ) ;
2009-03-18 00:39:03 +08:00
2009-03-25 06:32:45 +08:00
return samReader ;
2009-03-12 05:43:31 +08:00
}
2009-03-25 06:32:45 +08:00
// cleaning up past mistakes
// private Iterator<SAMRecord> loadSAMFile(final File samFile)
// throws IOException {
// Iterator<SAMRecord> iterator = null;
//
// samReader = new SAMFileReader(samFile, true);
// samReader.setValidationStringency(strictness);
//
// final SAMFileHeader header = samReader.getFileHeader();
// logger.info(String.format("Sort order is: " + header.getSortOrder()));
//
// // If the file has an index, querying functions are available. Use them if possible...
// if (samReader.hasIndex()) {
// iterator = new SamQueryIterator(samReader, locs);
// } else {
// // Ugh. Close and reopen the file so that the file progress decorator can be assigned to the input stream.
// samReader.close();
//
// final FileInputStream samFileStream = new FileInputStream(readsFile);
// //final InputStream bufferedStream = new BufferedInputStream(samFileStream);
// samReader = new SAMFileReader(readsFile, true);
// samReader.setValidationStringency(strictness);
//
// samReadingTracker = new FileProgressTracker<SAMRecord>(readsFile, samReader.iterator(), samFileStream.getChannel(), 1000);
// iterator = samReadingTracker;
// }
//
// return iterator;
// }
2009-03-12 05:43:31 +08:00
/ * *
* Prepare the reference for stream processing
* /
2009-03-16 06:21:48 +08:00
protected void initializeReference ( ) {
2009-03-24 04:27:21 +08:00
if ( refFileName ! = null ) {
2009-03-23 03:56:54 +08:00
//this.refFile = ReferenceSequenceFileFactory.getReferenceSequenceFile(refFileName);
this . refFile = new FastaSequenceFile2 ( refFileName ) ; // todo: replace when FastaSequenceFile2 is in picard
2009-02-27 05:50:29 +08:00
this . refIter = new ReferenceIterator ( this . refFile ) ;
2009-03-24 04:27:21 +08:00
if ( ! Utils . setupRefContigOrdering ( this . refFile ) ) {
2009-03-16 06:21:48 +08:00
// We couldn't process the reference contig ordering, fail since we need it
2009-03-24 04:27:21 +08:00
logger . fatal ( String . format ( "We couldn't load the contig dictionary associated with %s. At the current time we require this dictionary file to efficiently access the FASTA file. In the near future this program will automatically construct the dictionary for you and save it down." , refFileName ) ) ;
throw new RuntimeException ( "We couldn't load the contig dictionary associated with " + refFileName + ". At the current time we require this dictionary file to efficiently access the FASTA file. In the near future this program will automatically construct the dictionary for you and save it down." ) ;
2009-03-16 06:21:48 +08:00
}
2009-03-24 04:27:21 +08:00
}
2009-02-27 05:50:29 +08:00
}
2009-03-12 05:43:31 +08:00
/ * *
* Prepare the list of reference ordered data iterators for each of the rods
2009-03-24 04:27:21 +08:00
*
2009-03-12 05:43:31 +08:00
* @return A list of ROD iterators for getting data from each ROD
* /
2009-03-12 04:58:01 +08:00
protected List < ReferenceOrderedData . RODIterator > initializeRODs ( ) {
// set up reference ordered data
2009-03-16 06:21:48 +08:00
rodIters = new ArrayList < ReferenceOrderedData . RODIterator > ( ) ;
2009-03-24 04:27:21 +08:00
for ( ReferenceOrderedData data : rods ) {
2009-03-12 04:58:01 +08:00
rodIters . add ( data . iterator ( ) ) ;
}
return rodIters ;
}
2009-03-23 03:56:54 +08:00
/ * *
* An inappropriately placed testing of reading the reference
* /
2009-03-13 22:50:45 +08:00
protected void testReference ( ) {
while ( true ) {
ReferenceSequence ref = refFile . nextSequence ( ) ;
2009-03-24 04:46:55 +08:00
logger . debug ( String . format ( "%s %d %d" , ref . getName ( ) , ref . length ( ) , System . currentTimeMillis ( ) ) ) ;
2009-03-13 22:50:45 +08:00
printProgress ( true , "loci" , new GenomeLoc ( "foo" , 1 ) ) ;
}
}
2009-03-12 05:43:31 +08:00
// --------------------------------------------------------------------------------------------------------------
//
// dealing with reference ordered data
//
// --------------------------------------------------------------------------------------------------------------
/ * *
* Builds a list of the reference ordered datum at loc from each of the iterators . This function
* assumes you are accessing the data in order . You can ' t use this function for random access . Each
* successive call moves you along the file , consuming all data before loc .
*
* @param rodIters Iterators to access the RODs
2009-03-24 04:27:21 +08:00
* @param loc The location to get the rods at
2009-03-12 05:43:31 +08:00
* @return A list of ReferenceOrderDatum at loc . ROD without a datum at loc will be null in the list
* /
2009-03-01 04:47:48 +08:00
protected List < ReferenceOrderedDatum > getReferenceOrderedDataAtLocus ( List < ReferenceOrderedData . RODIterator > rodIters ,
2009-03-24 04:27:21 +08:00
final GenomeLoc loc ) {
2009-03-01 04:47:48 +08:00
List < ReferenceOrderedDatum > data = new ArrayList < ReferenceOrderedDatum > ( ) ;
2009-03-24 04:27:21 +08:00
for ( ReferenceOrderedData . RODIterator iter : rodIters ) {
2009-03-03 02:18:48 +08:00
data . add ( iter . seekForward ( loc ) ) ;
2009-03-01 04:47:48 +08:00
}
return data ;
}
2009-03-03 02:18:48 +08:00
2009-02-27 05:50:29 +08:00
// --------------------------------------------------------------------------------------------------------------
//
2009-03-03 02:18:48 +08:00
// traversal by loci functions
2009-02-27 05:50:29 +08:00
//
// --------------------------------------------------------------------------------------------------------------
2009-03-12 05:43:31 +08:00
/ * *
* Class to filter out un - handle - able reads from the stream . We currently are skipping
* unmapped reads , non - primary reads , unaligned reads , and those with indels . We should
* really change this to handle indel containing reads .
* /
2009-02-27 05:50:29 +08:00
class locusStreamFilterFunc implements SamRecordFilter {
2009-03-25 04:55:34 +08:00
SAMRecord lastRead = null ;
2009-02-27 05:50:29 +08:00
public boolean filterOut ( SAMRecord rec ) {
boolean result = false ;
String why = "" ;
2009-03-24 04:27:21 +08:00
if ( rec . getReadUnmappedFlag ( ) ) {
2009-02-27 05:50:29 +08:00
nUnmappedReads + + ;
result = true ;
why = "Unmapped" ;
2009-03-24 04:27:21 +08:00
} else if ( rec . getNotPrimaryAlignmentFlag ( ) ) {
2009-02-27 05:50:29 +08:00
nNotPrimary + + ;
result = true ;
why = "Not Primary" ;
2009-03-24 04:27:21 +08:00
} else if ( rec . getAlignmentStart ( ) = = SAMRecord . NO_ALIGNMENT_START ) {
2009-02-27 05:50:29 +08:00
nBadAlignments + + ;
result = true ;
why = "No alignment start" ;
2009-03-25 04:55:34 +08:00
}
else {
2009-02-27 05:50:29 +08:00
result = false ;
}
2009-03-24 04:27:21 +08:00
if ( result ) {
2009-02-27 05:50:29 +08:00
nSkippedReads + + ;
2009-03-24 04:46:55 +08:00
//System.out.printf(" [filter] %s => %b %s", rec.getReadName(), result, why);
2009-03-24 04:27:21 +08:00
} else {
2009-02-27 05:50:29 +08:00
nReads + + ;
}
2009-03-24 04:27:21 +08:00
return result ;
2009-02-27 05:50:29 +08:00
}
}
2009-03-17 07:22:04 +08:00
public void verifySortOrder ( final boolean requiresSortedOrder ) {
2009-03-24 04:27:21 +08:00
if ( beSafeP & & ! SORT_ON_FLY & & samReader . getFileHeader ( ) . getSortOrder ( ) ! = SAMFileHeader . SortOrder . coordinate ) {
2009-03-17 07:22:04 +08:00
final String msg = "SAM file is not sorted in coordinate order (according to header) Walker type with given arguments requires a sorted file for correct processing" ;
2009-03-24 04:27:21 +08:00
if ( requiresSortedOrder | | strictness = = SAMFileReader . ValidationStringency . STRICT )
2009-03-17 07:22:04 +08:00
throw new RuntimeIOException ( msg ) ;
2009-03-24 04:27:21 +08:00
else if ( strictness = = SAMFileReader . ValidationStringency . LENIENT )
logger . warn ( msg ) ;
2009-03-17 07:22:04 +08:00
}
}
2009-03-12 05:43:31 +08:00
/ * *
* Traverse by loci - - the key driver of linearly ordered traversal of loci . Provides reads , RODs , and
* the reference base for each locus in the reference to the LocusWalker walker . Supports all of the
* interaction contract implied by the locus walker
*
* @param walker A locus walker object
2009-03-24 04:27:21 +08:00
* @param < M > MapType - - the result of calling map ( ) on walker
* @param < T > ReduceType - - the result of calling reduce ( ) on the walker
2009-03-12 05:43:31 +08:00
* @return 0 on success
* /
2009-03-24 04:27:21 +08:00
protected < M , T > int traverseByLoci ( LocusWalker < M , T > walker ) {
2009-03-25 06:32:45 +08:00
samReadIter = initializeReads ( ) ;
2009-03-25 04:55:34 +08:00
2009-03-17 07:22:04 +08:00
verifySortOrder ( true ) ;
2009-03-12 05:43:31 +08:00
// initialize the walker object
walker . initialize ( ) ;
2009-03-25 06:32:45 +08:00
// We aren't locus oriented
T sum = carryWalkerOverInterval ( walker , samReadIter , walker . reduceInit ( ) , null ) ;
printOnTraversalDone ( "loci" , sum ) ;
walker . onTraversalDone ( ) ;
return 0 ;
}
private < M , T > T carryWalkerOverInterval ( LocusWalker < M , T > walker , Iterator < SAMRecord > readIter , T sum , GenomeLoc interval ) {
// prepare the read filtering read iterator and provide it to a new locus iterator
FilteringIterator filterIter = new FilteringIterator ( readIter , new locusStreamFilterFunc ( ) ) ;
2009-03-12 05:43:31 +08:00
2009-03-25 06:32:45 +08:00
boolean done = false ;
LocusIterator iter = new LocusIteratorByHanger ( filterIter ) ;
2009-03-24 04:27:21 +08:00
while ( iter . hasNext ( ) & & ! done ) {
2009-02-27 05:50:29 +08:00
this . nRecords + + ;
// actually get the read and hand it to the walker
2009-03-22 20:04:11 +08:00
LocusContext locus = iter . next ( ) ;
2009-02-27 05:50:29 +08:00
2009-03-25 06:32:45 +08:00
// if we don't have a particular interval we're processing, check them all, otherwise only operate at this
// location
2009-03-25 09:12:05 +08:00
if ( ( interval = = null & & inLocations ( locus . getLocation ( ) ) ) | | ( interval ! = null & & interval . overlapsP ( locus . getLocation ( ) ) ) ) {
2009-03-22 20:04:11 +08:00
//System.out.format("Working at %s\n", locus.getLocation().toString());
2009-03-12 05:43:31 +08:00
// Jump forward in the reference to this locus location
2009-03-22 20:04:11 +08:00
final ReferenceIterator refSite ;
refSite = refIter . seekForward ( locus . getLocation ( ) ) ;
2009-03-03 02:18:48 +08:00
final char refBase = refSite . getBaseAsChar ( ) ;
2009-03-16 22:46:19 +08:00
locus . setReferenceContig ( refSite . getCurrentContig ( ) ) ;
2009-03-12 05:43:31 +08:00
// Iterate forward to get all reference ordered data covering this locus
2009-03-03 02:18:48 +08:00
final List < ReferenceOrderedDatum > rodData = getReferenceOrderedDataAtLocus ( rodIters , locus . getLocation ( ) ) ;
2009-02-27 05:50:29 +08:00
2009-03-25 04:55:34 +08:00
logger . debug ( String . format ( " Reference: %s:%d %c" , refSite . getCurrentContig ( ) . getName ( ) , refSite . getPosition ( ) , refBase ) ) ;
2009-02-27 05:50:29 +08:00
2009-03-12 05:43:31 +08:00
//
// Execute our contract with the walker. Call filter, map, and reduce
//
2009-03-03 02:18:48 +08:00
final boolean keepMeP = walker . filter ( rodData , refBase , locus ) ;
2009-03-24 04:27:21 +08:00
if ( keepMeP ) {
2009-03-03 02:18:48 +08:00
M x = walker . map ( rodData , refBase , locus ) ;
sum = walker . reduce ( x , sum ) ;
}
2009-03-24 04:27:21 +08:00
if ( this . maxReads > 0 & & this . nRecords > this . maxReads ) {
2009-03-24 04:46:55 +08:00
logger . warn ( String . format ( "Maximum number of reads encountered, terminating traversal " + this . nRecords ) ) ;
2009-03-03 02:18:48 +08:00
done = true ;
}
2009-03-25 04:55:34 +08:00
printProgress ( "loci" , locus . getLocation ( ) ) ;
if ( pastFinalLocation ( locus . getLocation ( ) ) )
done = true ;
2009-02-27 05:50:29 +08:00
}
2009-03-25 04:55:34 +08:00
}
2009-03-25 06:32:45 +08:00
return sum ;
2009-03-25 04:55:34 +08:00
}
2009-02-27 05:50:29 +08:00
2009-03-25 06:32:45 +08:00
/ * *
* Same as the normal locus traverser , but oriented by locus , rather than implicitly
*
* @param walker A locus walker object
* @param < M > MapType - - the result of calling map ( ) on walker
* @param < T > ReduceType - - the result of calling reduce ( ) on the walker
* @return 0 on success
* /
2009-03-25 04:55:34 +08:00
protected < M , T > int traverseByLociByInterval ( LocusWalker < M , T > walker ) {
//verifySortOrder(true);
// initialize the walker object
walker . initialize ( ) ;
// Initialize the T sum using the walker
T sum = walker . reduceInit ( ) ;
2009-03-25 06:32:45 +08:00
samReader = initializeSAMFile ( readsFile ) ;
2009-03-25 04:55:34 +08:00
for ( GenomeLoc interval : locs ) {
System . out . printf ( "Processing locus %s%n" , interval . toString ( ) ) ;
2009-03-25 06:32:45 +08:00
CloseableIterator < SAMRecord > readIter = samReader . queryOverlapping ( interval . getContig ( ) ,
( int ) interval . getStart ( ) ,
( int ) interval . getStop ( ) ) ;
2009-03-25 04:55:34 +08:00
2009-03-25 06:32:45 +08:00
Iterator < SAMRecord > wrappedIter = WrapReadsIterator ( readIter , false ) ;
sum = carryWalkerOverInterval ( walker , wrappedIter , sum , interval ) ;
readIter . close ( ) ;
2009-02-27 05:50:29 +08:00
}
2009-03-12 05:43:31 +08:00
printOnTraversalDone ( "loci" , sum ) ;
2009-03-14 02:50:29 +08:00
walker . onTraversalDone ( ) ;
2009-02-27 05:50:29 +08:00
return 0 ;
}
2009-03-12 05:43:31 +08:00
/ * *
* Traverse by read - - the key driver of linearly ordered traversal of reads . Provides a single read to
* the walker object , in coordinate order . Supports all of the
* interaction contract implied by the read walker
2009-03-24 04:27:21 +08:00
* sor
*
2009-03-12 05:43:31 +08:00
* @param walker A read walker object
2009-03-24 04:27:21 +08:00
* @param < M > MapType - - the result of calling map ( ) on walker
* @param < R > ReduceType - - the result of calling reduce ( ) on the walker
2009-03-12 05:43:31 +08:00
* @return 0 on success
* /
2009-03-24 04:27:21 +08:00
protected < M , R > int traverseByRead ( ReadWalker < M , R > walker ) {
2009-03-25 06:32:45 +08:00
samReadIter = initializeReads ( ) ;
2009-03-25 04:55:34 +08:00
2009-03-24 04:27:21 +08:00
if ( refFileName = = null & & ! walker . requiresOrderedReads ( ) & & verifyingSamReadIter ! = null ) {
2009-03-24 04:46:55 +08:00
logger . warn ( String . format ( "STATUS: No reference file provided and unordered reads are tolerated, enabling out of order read processing." ) ) ;
2009-03-24 04:27:21 +08:00
if ( verifyingSamReadIter ! = null )
2009-03-17 23:28:04 +08:00
verifyingSamReadIter . setCheckOrderP ( false ) ;
2009-03-17 07:22:04 +08:00
}
2009-03-24 04:27:21 +08:00
verifySortOrder ( refFileName ! = null | | walker . requiresOrderedReads ( ) ) ;
2009-03-12 05:43:31 +08:00
// Initialize the walker
2009-02-27 05:50:29 +08:00
walker . initialize ( ) ;
2009-03-03 05:49:08 +08:00
2009-03-12 05:43:31 +08:00
// Initialize the sum
2009-02-27 05:50:29 +08:00
R sum = walker . reduceInit ( ) ;
2009-03-16 22:46:19 +08:00
List < Integer > offsets = Arrays . asList ( 0 ) ; // Offset of a single read is always 0
2009-03-12 05:43:31 +08:00
2009-03-03 02:18:48 +08:00
boolean done = false ;
2009-03-24 04:27:21 +08:00
while ( samReadIter . hasNext ( ) & & ! done ) {
2009-02-27 05:50:29 +08:00
this . nRecords + + ;
2009-03-12 05:43:31 +08:00
// get the next read
2009-03-03 05:49:08 +08:00
final SAMRecord read = samReadIter . next ( ) ;
2009-03-16 22:46:19 +08:00
final List < SAMRecord > reads = Arrays . asList ( read ) ;
GenomeLoc loc = Utils . genomicLocationOf ( read ) ;
// Jump forward in the reference to this locus location
LocusContext locus = new LocusContext ( loc , reads , offsets ) ;
2009-03-24 04:27:21 +08:00
if ( ! loc . isUnmapped ( ) & & refIter ! = null ) {
2009-03-17 07:22:04 +08:00
final ReferenceIterator refSite = refIter . seekForward ( loc ) ;
locus . setReferenceContig ( refSite . getCurrentContig ( ) ) ;
}
2009-02-27 05:50:29 +08:00
2009-03-24 04:27:21 +08:00
if ( inLocations ( loc ) ) {
2009-03-12 04:58:01 +08:00
2009-03-12 05:43:31 +08:00
//
// execute the walker contact
//
2009-03-16 22:46:19 +08:00
final boolean keepMeP = walker . filter ( locus , read ) ;
2009-03-24 04:27:21 +08:00
if ( keepMeP ) {
2009-03-16 22:46:19 +08:00
M x = walker . map ( locus , read ) ;
2009-03-03 02:18:48 +08:00
sum = walker . reduce ( x , sum ) ;
}
2009-03-24 04:27:21 +08:00
if ( this . maxReads > 0 & & this . nRecords > this . maxReads ) {
2009-03-24 04:46:55 +08:00
logger . warn ( String . format ( ( "Maximum number of reads encountered, terminating traversal " + this . nRecords ) ) ) ;
2009-03-12 05:43:31 +08:00
done = true ;
2009-03-03 02:18:48 +08:00
}
2009-03-12 04:58:01 +08:00
}
2009-03-12 05:43:31 +08:00
printProgress ( "reads" , loc ) ;
2009-03-24 04:27:21 +08:00
if ( pastFinalLocation ( loc ) )
2009-03-03 02:18:48 +08:00
done = true ;
//System.out.printf("Done? %b%n", done);
2009-03-24 04:27:21 +08:00
}
2009-02-27 05:50:29 +08:00
2009-03-12 05:43:31 +08:00
printOnTraversalDone ( "reads" , sum ) ;
2009-03-14 02:50:29 +08:00
walker . onTraversalDone ( ) ;
2009-02-27 05:50:29 +08:00
return 0 ;
}
2009-03-03 05:51:25 +08:00
}
2009-03-16 06:21:48 +08:00