From cfee59e0e6c26b8b313e103e01d009ea70d50e16 Mon Sep 17 00:00:00 2001 From: depristo Date: Fri, 27 Mar 2009 15:40:45 +0000 Subject: [PATCH] New type hierarchy for Traversals. There's a new package to hold them (traversals) and an easy system to create new ones. We are now one step closer to supporting the execution manager (a totally non-functional version is included here) that actually executes walkers in parallel using N threads. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@214 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GenomeAnalysisTK.java | 47 +-- .../sting/gatk/executive/MicroManager.java | 22 ++ .../executive/TraversalEngineExecutive.java | 17 + .../{ => traversals}/TraversalEngine.java | 313 ++++-------------- .../sting/gatk/traversals/TraverseByLoci.java | 139 ++++++++ .../gatk/traversals/TraverseByReads.java | 113 +++++++ .../sting/gatk/walkers/LocusWalker.java | 2 +- .../sting/gatk/walkers/ReadWalker.java | 2 +- .../sting/gatk/walkers/Walker.java | 2 +- 9 files changed, 391 insertions(+), 266 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java create mode 100644 java/src/org/broadinstitute/sting/gatk/executive/TraversalEngineExecutive.java rename java/src/org/broadinstitute/sting/gatk/{ => traversals}/TraversalEngine.java (67%) create mode 100644 java/src/org/broadinstitute/sting/gatk/traversals/TraverseByLoci.java create mode 100644 java/src/org/broadinstitute/sting/gatk/traversals/TraverseByReads.java diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java index 82299eefe..194270c7b 100644 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java @@ -13,6 +13,9 @@ import org.broadinstitute.sting.gatk.refdata.rodGFF; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.traversals.TraversalEngine; +import org.broadinstitute.sting.gatk.traversals.TraverseByReads; +import org.broadinstitute.sting.gatk.traversals.TraverseByLoci; import org.broadinstitute.sting.utils.FastaSequenceFile2; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.Utils; @@ -170,7 +173,27 @@ public class GenomeAnalysisTK extends CommandLineProgram { initializeOutputStreams(); - this.engine = new TraversalEngine(INPUT_FILE, REF_FILE_ARG, rods); + Walker my_walker = null; + try { + my_walker = walkerManager.createWalkerByName( Analysis_Name ); + } + catch( InstantiationException ex ) { + throw new RuntimeException( "Unable to instantiate walker.", ex ); + } + catch( IllegalAccessException ex ) { + throw new RuntimeException( "Unable to access walker", ex ); + } + + // Try to get the walker specified + try { + LocusWalker walker = (LocusWalker) my_walker; + this.engine = new TraverseByLoci(INPUT_FILE, REF_FILE_ARG, rods); + } + catch (java.lang.ClassCastException e) { + // I guess we're a read walker LOL + ReadWalker walker = (ReadWalker) my_walker; + this.engine = new TraverseByReads(INPUT_FILE, REF_FILE_ARG, rods); + } // Prepare the sort ordering w.r.t. the sequence dictionary if (REF_FILE_ARG != null) { @@ -215,27 +238,7 @@ public class GenomeAnalysisTK extends CommandLineProgram { engine.setWalkOverAllSites(WALK_ALL_LOCI); engine.initialize(); - Walker my_walker = null; - try { - my_walker = walkerManager.createWalkerByName( Analysis_Name ); - } - catch( InstantiationException ex ) { - throw new RuntimeException( "Unable to instantiate walker.", ex ); - } - catch( IllegalAccessException ex ) { - throw new RuntimeException( "Unable to access walker", ex ); - } - - // Try to get the walker specified - try { - LocusWalker walker = (LocusWalker) my_walker; - engine.traverseByLoci(walker); - } - catch (java.lang.ClassCastException e) { - // I guess we're a read walker LOL - ReadWalker walker = (ReadWalker) my_walker; - engine.traverseByRead(walker); - } + engine.traverse(my_walker); return 0; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java new file mode 100644 index 000000000..47b316baa --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroManager.java @@ -0,0 +1,22 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.utils.GenomeLoc; + +/** + * A micro-scheduling manager for N-way threaded execution of a traversal + * + */ +public class MicroManager { + + public MicroManager( TraversalEngineExecutive TEfactory, // makes worker units + GenomeLoc[] locations, // list of work to do + int nThreadsToUse, // maximum number of threads to use to do the work + int initialChunkSize ) { // the initial chunk size for dividing up the work + // do a lot of work here to organize the computation + } + + public void execute() { + // actually divide up the work, create worker units, and send them off to do work + + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/executive/TraversalEngineExecutive.java b/java/src/org/broadinstitute/sting/gatk/executive/TraversalEngineExecutive.java new file mode 100644 index 000000000..dd1f795d1 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/TraversalEngineExecutive.java @@ -0,0 +1,17 @@ +package org.broadinstitute.sting.gatk.executive; + +import org.broadinstitute.sting.utils.GenomeLoc; + +import java.util.List; + +/** + * Created by IntelliJ IDEA. + * User: mdepristo + * Date: Mar 27, 2009 + * Time: 10:00:05 AM + * To change this template use File | Settings | File Templates. + */ +public interface TraversalEngineExecutive { + + public ReduceType processIntervals(List locations); +} diff --git a/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java similarity index 67% rename from java/src/org/broadinstitute/sting/gatk/TraversalEngine.java rename to java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index e77471aee..eb14050cf 100755 --- a/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.gatk; +package org.broadinstitute.sting.gatk.traversals; import edu.mit.broad.picard.filter.FilteringIterator; import edu.mit.broad.picard.filter.SamRecordFilter; @@ -9,7 +9,6 @@ import net.sf.functionalj.FunctionN; import net.sf.functionalj.Functions; import net.sf.functionalj.reflect.JdkStdReflect; import net.sf.functionalj.reflect.StdReflect; -import net.sf.functionalj.util.Operators; import net.sf.samtools.SAMFileHeader; import net.sf.samtools.SAMFileReader; import net.sf.samtools.SAMFileReader.ValidationStringency; @@ -22,80 +21,79 @@ import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.LocusContext; import org.broadinstitute.sting.utils.*; import java.io.*; import java.util.*; -public class TraversalEngine { +public abstract class TraversalEngine { // list of reference ordered data objects - private List rods = null; + protected List rods = null; // Iterator over rods List rodIters; - //private String regionStr = null; // String dec - //private String traversalType = null; // String describing this traversal type - // How strict should we be with SAM/BAM parsing? - private ValidationStringency strictness = ValidationStringency.STRICT; + protected ValidationStringency strictness = ValidationStringency.STRICT; // Time in milliseconds since we initialized this engine - private long startTime = -1; - private long lastProgressPrintTime = -1; // When was the last time we printed our progress? + protected long startTime = -1; + protected long lastProgressPrintTime = -1; // When was the last time we printed our progress? // How long can we go without printing some progress info? - private long MAX_PROGRESS_PRINT_TIME = 10 * 1000; // 10 seconds in millisecs + protected long MAX_PROGRESS_PRINT_TIME = 10 * 1000; // 10 seconds in millisecs // Maximum number of reads to process before finishing - private long maxReads = -1; + protected long maxReads = -1; // Name of the reads file, in BAM/SAM format - private File readsFile = null; // the name of the reads file - private SAMFileReader samReader = null; + protected File readsFile = null; // the name of the reads file + protected SAMFileReader samReader = null; // iterator over the sam records in the readsFile - private Iterator samReadIter = null; + protected Iterator samReadIter = null; // The verifying iterator, it does checking - VerifyingSamIterator verifyingSamReadIter = null; + protected VerifyingSamIterator verifyingSamReadIter = null; // The reference data -- filename, refSeqFile, and iterator - private File refFileName = null; // the name of the reference file + protected File refFileName = null; // the name of the reference file //private ReferenceSequenceFile refFile = null; - private FastaSequenceFile2 refFile = null; // todo: merge FastaSequenceFile2 into picard! - private ReferenceIterator refIter = null; + protected FastaSequenceFile2 refFile = null; // todo: merge FastaSequenceFile2 into picard! + protected ReferenceIterator refIter = null; // Number of records (loci, reads) we've processed - private long nRecords = 0; + protected long nRecords = 0; // How many reads have we processed, along with those skipped for various reasons - private int nReads = 0; - private int nSkippedReads = 0; - private int nUnmappedReads = 0; - private int nNotPrimary = 0; - private int nBadAlignments = 0; - private int nSkippedIndels = 0; + protected int nReads = 0; + protected int nSkippedReads = 0; + protected int nUnmappedReads = 0; + protected int nNotPrimary = 0; + protected int nBadAlignments = 0; + protected int nSkippedIndels = 0; // Progress tracker for the sam file - private FileProgressTracker samReadingTracker = null; + protected FileProgressTracker samReadingTracker = null; - public boolean DEBUGGING = false; - public boolean beSafeP = true; - public boolean SORT_ON_FLY = false; - public boolean FILTER_UNSORTED_READS = false; - public boolean walkOverAllSites = false; - public int MAX_ON_FLY_SORTS = 100000; - public long N_RECORDS_TO_PRINT = 100000; - public boolean THREADED_IO = false; - public int THREADED_IO_BUFFER_SIZE = 10000; + protected boolean DEBUGGING = false; + protected boolean beSafeP = true; + protected boolean SORT_ON_FLY = false; + protected boolean FILTER_UNSORTED_READS = false; + protected boolean walkOverAllSites = false; + protected int MAX_ON_FLY_SORTS = 100000; + protected long N_RECORDS_TO_PRINT = 100000; + protected boolean THREADED_IO = false; + protected int THREADED_IO_BUFFER_SIZE = 10000; /** * our log, which we want to capture anything from this class */ - private static Logger logger = Logger.getLogger(TraversalEngine.class); + protected static Logger logger = Logger.getLogger(TraversalEngine.class); // Locations we are going to process during the traversal - private GenomeLoc[] locs = null; + protected GenomeLoc[] locs = null; // -------------------------------------------------------------------------------------------------------------- // @@ -261,7 +259,7 @@ public class TraversalEngine { * @param curr Current genome Location * @return true if we are past the last location to process */ - private boolean pastFinalLocation(GenomeLoc curr) { + protected boolean pastFinalLocation(GenomeLoc curr) { boolean r = locs != null && locs[locs.length - 1].compareTo(curr) == -1 && !locs[locs.length - 1].overlapsP(curr); //System.out.printf(" pastFinalLocation %s vs. %s => %d => %b%n", locs[locs.length-1], curr, locs[locs.length-1].compareTo( curr ), r); return r; @@ -277,7 +275,7 @@ public class TraversalEngine { * @param curTime (current runtime, in millisecs) * @return true if the maximum interval (in millisecs) has passed since the last printing */ - private boolean maxElapsedIntervalForPrinting(final long curTime) { + protected boolean maxElapsedIntervalForPrinting(final long curTime) { return (curTime - this.lastProgressPrintTime) > MAX_PROGRESS_PRINT_TIME; } @@ -363,12 +361,12 @@ public class TraversalEngine { return true; } - private Iterator initializeReads() { + protected Iterator initializeReads() { samReader = initializeSAMFile(readsFile); return WrapReadsIterator(getReadsIterator(samReader), true); } - private Iterator getReadsIterator(final SAMFileReader samReader) { + protected Iterator getReadsIterator(final SAMFileReader samReader) { // If the file has an index, querying functions are available. Use them if possible... if ( samReader == null && readsFile.toString().endsWith(".list") ) { SAMFileHeader.SortOrder SORT_ORDER = SAMFileHeader.SortOrder.coordinate; @@ -394,7 +392,7 @@ public class TraversalEngine { } } - private Iterator WrapReadsIterator( final Iterator rawIterator, final boolean enableVerification ) { + protected Iterator WrapReadsIterator( final Iterator rawIterator, final boolean enableVerification ) { Iterator wrappedIterator = rawIterator; if (SORT_ON_FLY) @@ -411,7 +409,7 @@ public class TraversalEngine { return wrappedIterator; } - private SAMFileReader initializeSAMFile(final File samFile) { + protected SAMFileReader initializeSAMFile(final File samFile) { // todo: fixme, this is a hack to try out dynamic merging if ( samFile.toString().endsWith(".list") ) { return null; @@ -427,36 +425,6 @@ public class TraversalEngine { } } - // cleaning up past mistakes -// private Iterator loadSAMFile(final File samFile) -// throws IOException { -// Iterator iterator = null; -// -// samReader = new SAMFileReader(samFile, true); -// samReader.setValidationStringency(strictness); -// -// final SAMFileHeader header = samReader.getFileHeader(); -// logger.info(String.format("Sort order is: " + header.getSortOrder())); -// -// // If the file has an index, querying functions are available. Use them if possible... -// if (samReader.hasIndex()) { -// iterator = new SamQueryIterator(samReader, locs); -// } else { -// // Ugh. Close and reopen the file so that the file progress decorator can be assigned to the input stream. -// samReader.close(); -// -// final FileInputStream samFileStream = new FileInputStream(readsFile); -// //final InputStream bufferedStream = new BufferedInputStream(samFileStream); -// samReader = new SAMFileReader(readsFile, true); -// samReader.setValidationStringency(strictness); -// -// samReadingTracker = new FileProgressTracker(readsFile, samReader.iterator(), samFileStream.getChannel(), 1000); -// iterator = samReadingTracker; -// } -// -// return iterator; -// } - /** * Prepare the reference for stream processing */ @@ -498,6 +466,19 @@ public class TraversalEngine { } } + // -------------------------------------------------------------------------------------------------------------- + // + // shutdown + // + // -------------------------------------------------------------------------------------------------------------- + + public boolean shutdown() { + // todo: actually shutdown the resources + + return true; + } + + // -------------------------------------------------------------------------------------------------------------- // // dealing with reference ordered data @@ -522,7 +503,24 @@ public class TraversalEngine { return data; } + // -------------------------------------------------------------------------------------------------------------- + // + // processing + // + // -------------------------------------------------------------------------------------------------------------- + public T traverse(Walker walker) { + List l = new ArrayList(); + if ( hasLocations() ) + l = Arrays.asList(locs); + + return traverse(walker, l); + } + + public T traverse(Walker walker, List locations) { + return null; + } + // -------------------------------------------------------------------------------------------------------------- // // traversal by loci functions @@ -575,171 +573,4 @@ public class TraversalEngine { logger.warn(msg); } } - - /** - * Traverse by loci -- the key driver of linearly ordered traversal of loci. Provides reads, RODs, and - * the reference base for each locus in the reference to the LocusWalker walker. Supports all of the - * interaction contract implied by the locus walker - * - * @param walker A locus walker object - * @param MapType -- the result of calling map() on walker - * @param ReduceType -- the result of calling reduce() on the walker - * @return 0 on success - */ - protected int traverseByLoci(LocusWalker walker) { - samReader = initializeSAMFile(readsFile); - - verifySortOrder(true); - - // initialize the walker object - walker.initialize(); - - T sum = walker.reduceInit(); - if ( samReader.hasIndex() && hasLocations() ) { - // we are doing interval-based traversals - for ( GenomeLoc interval : locs ) { - logger.debug(String.format("Processing locus %s", interval.toString())); - - CloseableIterator readIter = samReader.queryOverlapping( interval.getContig(), - (int)interval.getStart(), - (int)interval.getStop() ); - - Iterator wrappedIter = WrapReadsIterator( readIter, false ); - sum = carryWalkerOverInterval(walker, wrappedIter, sum, interval); - readIter.close(); - } - } - else { - // We aren't locus oriented - samReadIter = WrapReadsIterator(getReadsIterator(samReader), true); - sum = carryWalkerOverInterval(walker, samReadIter, sum, null); - } - - printOnTraversalDone("loci", sum); - walker.onTraversalDone(sum); - return 0; - } - - private T carryWalkerOverInterval( LocusWalker walker, Iterator readIter, T sum, GenomeLoc interval ) { - // prepare the read filtering read iterator and provide it to a new locus iterator - FilteringIterator filterIter = new FilteringIterator(readIter, new locusStreamFilterFunc()); - - boolean done = false; - LocusIterator iter = new LocusIteratorByHanger(filterIter); - while (iter.hasNext() && !done) { - this.nRecords++; - - // actually get the read and hand it to the walker - LocusContext locus = iter.next(); - - // if we don't have a particular interval we're processing, check them all, otherwise only operate at this - // location - if ( ( interval == null && inLocations(locus.getLocation()) ) || (interval != null && interval.overlapsP(locus.getLocation())) ) { - - //System.out.format("Working at %s\n", locus.getLocation().toString()); - - ReferenceIterator refSite = refIter.seekForward(locus.getLocation()); - final char refBase = refSite.getBaseAsChar(); - locus.setReferenceContig(refSite.getCurrentContig()); - - // Iterate forward to get all reference ordered data covering this locus - final List rodData = getReferenceOrderedDataAtLocus(rodIters, locus.getLocation()); - - logger.debug(String.format(" Reference: %s:%d %c", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase)); - - // - // Execute our contract with the walker. Call filter, map, and reduce - // - final boolean keepMeP = walker.filter(rodData, refBase, locus); - if (keepMeP) { - M x = walker.map(rodData, refBase, locus); - sum = walker.reduce(x, sum); - } - - if (this.maxReads > 0 && this.nRecords > this.maxReads) { - logger.warn(String.format("Maximum number of reads encountered, terminating traversal " + this.nRecords)); - done = true; - } - - printProgress("loci", locus.getLocation()); - if (pastFinalLocation(locus.getLocation())) - done = true; - } - } - return sum; - } - - /** - * Traverse by read -- the key driver of linearly ordered traversal of reads. Provides a single read to - * the walker object, in coordinate order. Supports all of the - * interaction contract implied by the read walker - * sor - * - * @param walker A read walker object - * @param MapType -- the result of calling map() on walker - * @param ReduceType -- the result of calling reduce() on the walker - * @return 0 on success - */ - protected int traverseByRead(ReadWalker walker) { - samReadIter = initializeReads(); - - if (refFileName == null && !walker.requiresOrderedReads() && verifyingSamReadIter != null) { - logger.warn(String.format("STATUS: No reference file provided and unordered reads are tolerated, enabling out of order read processing.")); - if (verifyingSamReadIter != null) - verifyingSamReadIter.setCheckOrderP(false); - } - - verifySortOrder(refFileName != null || walker.requiresOrderedReads()); - - // Initialize the walker - walker.initialize(); - - // Initialize the sum - R sum = walker.reduceInit(); - List offsets = Arrays.asList(0); // Offset of a single read is always 0 - - boolean done = false; - while (samReadIter.hasNext() && !done) { - this.nRecords++; - - // get the next read - final SAMRecord read = samReadIter.next(); - final List reads = Arrays.asList(read); - GenomeLoc loc = Utils.genomicLocationOf(read); - - // Jump forward in the reference to this locus location - LocusContext locus = new LocusContext(loc, reads, offsets); - if (!loc.isUnmapped() && refIter != null) { - final ReferenceIterator refSite = refIter.seekForward(loc); - locus.setReferenceContig(refSite.getCurrentContig()); - } - - if (inLocations(loc)) { - - // - // execute the walker contact - // - final boolean keepMeP = walker.filter(locus, read); - if (keepMeP) { - M x = walker.map(locus, read); - sum = walker.reduce(x, sum); - } - - if (this.maxReads > 0 && this.nRecords > this.maxReads) { - logger.warn(String.format(("Maximum number of reads encountered, terminating traversal " + this.nRecords))); - done = true; - } - } - printProgress("reads", loc); - - if (pastFinalLocation(loc)) - done = true; - //System.out.printf("Done? %b%n", done); - } - - printOnTraversalDone("reads", sum); - walker.onTraversalDone(sum); - return 0; - } } - diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByLoci.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByLoci.java new file mode 100644 index 000000000..2f52c2eef --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByLoci.java @@ -0,0 +1,139 @@ +package org.broadinstitute.sting.gatk.traversals; + +import org.broadinstitute.sting.gatk.walkers.ReadWalker; +import org.broadinstitute.sting.gatk.walkers.LocusWalker; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.LocusContext; +import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; +import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum; +import org.broadinstitute.sting.gatk.iterators.ReferenceIterator; +import org.broadinstitute.sting.gatk.iterators.LocusIterator; +import org.broadinstitute.sting.gatk.iterators.LocusIteratorByHanger; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.Utils; + +import java.util.List; +import java.util.Arrays; +import java.util.Iterator; +import java.io.File; + +import net.sf.samtools.SAMRecord; +import net.sf.samtools.util.CloseableIterator; +import edu.mit.broad.picard.filter.FilteringIterator; + +/** + * Created by IntelliJ IDEA. + * User: mdepristo + * Date: Mar 27, 2009 + * Time: 10:26:03 AM + * To change this template use File | Settings | File Templates. + */ +public class TraverseByLoci extends TraversalEngine { + + public TraverseByLoci(File reads, File ref, List rods) { + super(reads, ref, rods); + } + + public T traverse(Walker walker, List locations) { + if ( walker instanceof LocusWalker ) { + Walker x = walker; + LocusWalker locusWalker = (LocusWalker)x; + return (T)this.traverseByLoci(locusWalker, locations); + } else { + throw new IllegalArgumentException("Walker isn't a loci walker!"); + } + } + + /** + * Traverse by loci -- the key driver of linearly ordered traversal of loci. Provides reads, RODs, and + * the reference base for each locus in the reference to the LocusWalker walker. Supports all of the + * interaction contract implied by the locus walker + * + * @param walker A locus walker object + * @param MapType -- the result of calling map() on walker + * @param ReduceType -- the result of calling reduce() on the walker + * @return 0 on success + */ + protected T traverseByLoci(LocusWalker walker, List locations) { + samReader = initializeSAMFile(readsFile); + + verifySortOrder(true); + + // initialize the walker object + walker.initialize(); + + T sum = walker.reduceInit(); + if ( samReader.hasIndex() && hasLocations() ) { + // we are doing interval-based traversals + for ( GenomeLoc interval : locs ) { + logger.debug(String.format("Processing locus %s", interval.toString())); + + CloseableIterator readIter = samReader.queryOverlapping( interval.getContig(), + (int)interval.getStart(), + (int)interval.getStop() ); + + Iterator wrappedIter = WrapReadsIterator( readIter, false ); + sum = carryWalkerOverInterval(walker, wrappedIter, sum, interval); + readIter.close(); + } + } + else { + // We aren't locus oriented + samReadIter = WrapReadsIterator(getReadsIterator(samReader), true); + sum = carryWalkerOverInterval(walker, samReadIter, sum, null); + } + + printOnTraversalDone("loci", sum); + walker.onTraversalDone(sum); + return sum; + } + + protected T carryWalkerOverInterval( LocusWalker walker, Iterator readIter, T sum, GenomeLoc interval ) { + // prepare the read filtering read iterator and provide it to a new locus iterator + FilteringIterator filterIter = new FilteringIterator(readIter, new locusStreamFilterFunc()); + + boolean done = false; + LocusIterator iter = new LocusIteratorByHanger(filterIter); + while (iter.hasNext() && !done) { + this.nRecords++; + + // actually get the read and hand it to the walker + LocusContext locus = iter.next(); + + // if we don't have a particular interval we're processing, check them all, otherwise only operate at this + // location + if ( ( interval == null && inLocations(locus.getLocation()) ) || (interval != null && interval.overlapsP(locus.getLocation())) ) { + + //System.out.format("Working at %s\n", locus.getLocation().toString()); + + ReferenceIterator refSite = refIter.seekForward(locus.getLocation()); + final char refBase = refSite.getBaseAsChar(); + locus.setReferenceContig(refSite.getCurrentContig()); + + // Iterate forward to get all reference ordered data covering this locus + final List rodData = getReferenceOrderedDataAtLocus(rodIters, locus.getLocation()); + + logger.debug(String.format(" Reference: %s:%d %c", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase)); + + // + // Execute our contract with the walker. Call filter, map, and reduce + // + final boolean keepMeP = walker.filter(rodData, refBase, locus); + if (keepMeP) { + M x = walker.map(rodData, refBase, locus); + sum = walker.reduce(x, sum); + } + + if (this.maxReads > 0 && this.nRecords > this.maxReads) { + logger.warn(String.format("Maximum number of reads encountered, terminating traversal " + this.nRecords)); + done = true; + } + + printProgress("loci", locus.getLocation()); + if (pastFinalLocation(locus.getLocation())) + done = true; + } + } + return sum; + } +} \ No newline at end of file diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByReads.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByReads.java new file mode 100644 index 000000000..b9297670c --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraverseByReads.java @@ -0,0 +1,113 @@ +package org.broadinstitute.sting.gatk.traversals; + +import org.broadinstitute.sting.gatk.walkers.ReadWalker; +import org.broadinstitute.sting.gatk.walkers.LocusWalker; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.gatk.LocusContext; +import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData; +import org.broadinstitute.sting.gatk.iterators.ReferenceIterator; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.Utils; + +import java.util.List; +import java.util.Arrays; +import java.io.File; + +import net.sf.samtools.SAMRecord; + +/** + * Created by IntelliJ IDEA. + * User: mdepristo + * Date: Mar 27, 2009 + * Time: 10:26:03 AM + * To change this template use File | Settings | File Templates. + */ +public class TraverseByReads extends TraversalEngine { + + public TraverseByReads(File reads, File ref, List rods) { + super(reads, ref, rods); + } + + public T traverse(Walker walker, List locations) { + if ( walker instanceof ReadWalker ) { + Walker x = walker; + ReadWalker readWalker = (ReadWalker)x; + return (T)this.traverseByRead(readWalker, locations); + } else { + throw new IllegalArgumentException("Walker isn't a read walker!"); + } + } + + /** + * Traverse by read -- the key driver of linearly ordered traversal of reads. Provides a single read to + * the walker object, in coordinate order. Supports all of the + * interaction contract implied by the read walker + * sor + * + * @param walker A read walker object + * @param MapType -- the result of calling map() on walker + * @param ReduceType -- the result of calling reduce() on the walker + * @return 0 on success + */ + public Object traverseByRead(ReadWalker walker, List locations) { + samReadIter = initializeReads(); + + if (refFileName == null && !walker.requiresOrderedReads() && verifyingSamReadIter != null) { + logger.warn(String.format("STATUS: No reference file provided and unordered reads are tolerated, enabling out of order read processing.")); + if (verifyingSamReadIter != null) + verifyingSamReadIter.setCheckOrderP(false); + } + + verifySortOrder(refFileName != null || walker.requiresOrderedReads()); + + // Initialize the walker + walker.initialize(); + + // Initialize the sum + T sum = walker.reduceInit(); + List offsets = Arrays.asList(0); // Offset of a single read is always 0 + + boolean done = false; + while (samReadIter.hasNext() && !done) { + this.nRecords++; + + // get the next read + final SAMRecord read = samReadIter.next(); + final List reads = Arrays.asList(read); + GenomeLoc loc = Utils.genomicLocationOf(read); + + // Jump forward in the reference to this locus location + LocusContext locus = new LocusContext(loc, reads, offsets); + if (!loc.isUnmapped() && refIter != null) { + final ReferenceIterator refSite = refIter.seekForward(loc); + locus.setReferenceContig(refSite.getCurrentContig()); + } + + if (inLocations(loc)) { + + // + // execute the walker contact + // + final boolean keepMeP = walker.filter(locus, read); + if (keepMeP) { + M x = walker.map(locus, read); + sum = walker.reduce(x, sum); + } + + if (this.maxReads > 0 && this.nRecords > this.maxReads) { + logger.warn(String.format(("Maximum number of reads encountered, terminating traversal " + this.nRecords))); + done = true; + } + } + printProgress("reads", loc); + + if (pastFinalLocation(loc)) + done = true; + //System.out.printf("Done? %b%n", done); + } + + printOnTraversalDone("reads", sum); + walker.onTraversalDone(sum); + return sum; + } +} diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/LocusWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/LocusWalker.java index a7e371cbb..67abd1b0a 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/LocusWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/LocusWalker.java @@ -12,7 +12,7 @@ import java.util.List; * Time: 2:52:28 PM * To change this template use File | Settings | File Templates. */ -public abstract class LocusWalker extends Walker { +public abstract class LocusWalker extends Walker { // Do we actually want to operate on the context? public boolean filter(List rodData, char ref, LocusContext context) { return true; // We are keeping all the reads diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/ReadWalker.java b/java/src/org/broadinstitute/sting/gatk/walkers/ReadWalker.java index 093d55916..098a30e48 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/ReadWalker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/ReadWalker.java @@ -10,7 +10,7 @@ import org.broadinstitute.sting.gatk.LocusContext; * Time: 2:52:28 PM * To change this template use File | Settings | File Templates. */ -public abstract class ReadWalker extends Walker { +public abstract class ReadWalker extends Walker { public boolean requiresOrderedReads() { return false; } // Do we actually want to operate on the context? diff --git a/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java b/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java index e895d84d6..4dc0e55f4 100755 --- a/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java +++ b/java/src/org/broadinstitute/sting/gatk/walkers/Walker.java @@ -11,7 +11,7 @@ import org.broadinstitute.sting.gatk.GenomeAnalysisTK; * Time: 1:53:31 PM * To change this template use File | Settings | File Templates. */ -public abstract class Walker { +public abstract class Walker { // TODO: Can a walker be templatized so that map and reduce live here? /**