diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java index cfa48ed57..6c14df22e 100644 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisTK.java @@ -149,8 +149,8 @@ public class GenomeAnalysisTK extends CommandLineProgram { } engine.setSafetyChecking(!UNSAFE); engine.setSortOnFly(ENABLED_SORT_ON_FLY); - - engine.initialize(ENABLED_THREADED_IO); + engine.setThreadedIO(ENABLED_THREADED_IO); + engine.initialize(); //engine.testReference(); //LocusWalker walker = new PileupWalker(); diff --git a/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java b/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java index 1f7126960..5cd169740 100755 --- a/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/TraversalEngine.java @@ -86,6 +86,7 @@ public class TraversalEngine { public boolean FILTER_UNSORTED_READS = false; public int MAX_ON_FLY_SORTS = 100000; public long N_RECORDS_TO_PRINT = 100000; + public boolean THREADED_IO = false; public int THREADED_IO_BUFFER_SIZE = 10000; /** @@ -131,6 +132,10 @@ public class TraversalEngine { this.maxReads = maxReads; } + public void setThreadedIO(final boolean threadedIO) { + this.THREADED_IO = threadedIO; + } + public void setDebugging(final boolean d) { DEBUGGING = d; } @@ -353,7 +358,7 @@ public class TraversalEngine { * * @return true on success */ - public boolean initialize(final boolean THREADED_IO) { + public boolean initialize() { lastProgressPrintTime = startTime = System.currentTimeMillis(); initializeReference(); // Initial the reference ordered data iterators @@ -362,57 +367,76 @@ public class TraversalEngine { return true; } - private void initializeReads(final boolean THREADED_IO) { + private Iterator initializeReads() { + samReader = initializeSAMFile(readsFile); + return WrapReadsIterator(getReadsIterator(samReader), true); + } - Iterator samIterator; - try { - samReadIter = loadSAMFile(readsFile, THREADED_IO); - } - catch (IOException ex) { - // TODO: IOException should be a checked exception in this case. - throw new RuntimeIOException(ex); - } - - if (SORT_ON_FLY) - samReadIter = new SortSamIterator(samReadIter, MAX_ON_FLY_SORTS); - if (beSafeP) - samReadIter = new VerifyingSamIterator(samReadIter); - - if (THREADED_IO) { - logger.info(String.format("Enabling threaded I/O with buffer of %d reads", THREADED_IO_BUFFER_SIZE)); - samReadIter = new ThreadedIterator(samReadIter, THREADED_IO_BUFFER_SIZE); + private Iterator getReadsIterator(final SAMFileReader samReader) { + // If the file has an index, querying functions are available. Use them if possible... + if (samReader.hasIndex()) { + return new SamQueryIterator(samReader, locs); + } else { + return samReader.iterator(); } } - protected Iterator loadSAMFile(final File samFile, final boolean threadedIO) - throws IOException { - Iterator iterator = null; + private Iterator WrapReadsIterator( final Iterator rawIterator, final boolean enableVerification ) { + Iterator wrappedIterator = rawIterator; - samReader = new SAMFileReader(readsFile, true); + if (SORT_ON_FLY) + wrappedIterator = new SortSamIterator(wrappedIterator, MAX_ON_FLY_SORTS); + + if (beSafeP && enableVerification) + wrappedIterator = new VerifyingSamIterator(wrappedIterator); + + if (THREADED_IO) { + logger.info(String.format("Enabling threaded I/O with buffer of %d reads", THREADED_IO_BUFFER_SIZE)); + wrappedIterator = new ThreadedIterator(wrappedIterator, THREADED_IO_BUFFER_SIZE); + } + + return wrappedIterator; + } + + private SAMFileReader initializeSAMFile(final File samFile) { + SAMFileReader samReader = new SAMFileReader(samFile, true); samReader.setValidationStringency(strictness); final SAMFileHeader header = samReader.getFileHeader(); logger.info(String.format("Sort order is: " + header.getSortOrder())); - // If the file has an index, querying functions are available. Use them if possible... - if (samReader.hasIndex()) { - iterator = new SamQueryIterator(samReader, locs); - } else { - // Ugh. Close and reopen the file so that the file progress decorator can be assigned to the input stream. - samReader.close(); - - final FileInputStream samFileStream = new FileInputStream(readsFile); - final InputStream bufferedStream = new BufferedInputStream(samFileStream); - samReader = new SAMFileReader(readsFile, true); - samReader.setValidationStringency(strictness); - - samReadingTracker = new FileProgressTracker(readsFile, samReader.iterator(), samFileStream.getChannel(), 1000); - iterator = samReadingTracker; - } - - return iterator; + return samReader; } + // cleaning up past mistakes +// private Iterator loadSAMFile(final File samFile) +// throws IOException { +// Iterator iterator = null; +// +// samReader = new SAMFileReader(samFile, true); +// samReader.setValidationStringency(strictness); +// +// final SAMFileHeader header = samReader.getFileHeader(); +// logger.info(String.format("Sort order is: " + header.getSortOrder())); +// +// // If the file has an index, querying functions are available. Use them if possible... +// if (samReader.hasIndex()) { +// iterator = new SamQueryIterator(samReader, locs); +// } else { +// // Ugh. Close and reopen the file so that the file progress decorator can be assigned to the input stream. +// samReader.close(); +// +// final FileInputStream samFileStream = new FileInputStream(readsFile); +// //final InputStream bufferedStream = new BufferedInputStream(samFileStream); +// samReader = new SAMFileReader(readsFile, true); +// samReader.setValidationStringency(strictness); +// +// samReadingTracker = new FileProgressTracker(readsFile, samReader.iterator(), samFileStream.getChannel(), 1000); +// iterator = samReadingTracker; +// } +// +// return iterator; +// } /** * Prepare the reference for stream processing @@ -544,30 +568,36 @@ public class TraversalEngine { * @return 0 on success */ protected int traverseByLoci(LocusWalker walker) { - initializeReads(false); + samReadIter = initializeReads(); verifySortOrder(true); - // prepare the read filtering read iterator and provide it to a new locus iterator - FilteringIterator filterIter = new FilteringIterator(samReadIter, new locusStreamFilterFunc()); - //LocusIterator iter = new SingleLocusIterator(filterIter); - LocusIterator iter = new LocusIteratorByHanger(filterIter); - - // initialize the walker object walker.initialize(); - // Initialize the T sum using the walker - T sum = walker.reduceInit(); - boolean done = false; - GenomeLoc prevLoc = null; + + // We aren't locus oriented + T sum = carryWalkerOverInterval(walker, samReadIter, walker.reduceInit(), null); + printOnTraversalDone("loci", sum); + walker.onTraversalDone(); + return 0; + } + + private T carryWalkerOverInterval( LocusWalker walker, Iterator readIter, T sum, GenomeLoc interval ) { + // prepare the read filtering read iterator and provide it to a new locus iterator + FilteringIterator filterIter = new FilteringIterator(readIter, new locusStreamFilterFunc()); + + boolean done = false; + LocusIterator iter = new LocusIteratorByHanger(filterIter); while (iter.hasNext() && !done) { this.nRecords++; // actually get the read and hand it to the walker LocusContext locus = iter.next(); - if ( inLocations(locus.getLocation()) ) { + // if we don't have a particular interval we're processing, check them all, otherwise only operate at this + // location + if ( ( interval == null && inLocations(locus.getLocation()) ) || interval.overlapsP(locus.getLocation()) ) { //System.out.format("Working at %s\n", locus.getLocation().toString()); @@ -601,13 +631,17 @@ public class TraversalEngine { done = true; } } - - printOnTraversalDone("loci", sum); - walker.onTraversalDone(); - return 0; + return sum; } - + /** + * Same as the normal locus traverser, but oriented by locus, rather than implicitly + * + * @param walker A locus walker object + * @param MapType -- the result of calling map() on walker + * @param ReduceType -- the result of calling reduce() on the walker + * @return 0 on success + */ protected int traverseByLociByInterval(LocusWalker walker) { //verifySortOrder(true); @@ -616,62 +650,17 @@ public class TraversalEngine { // Initialize the T sum using the walker T sum = walker.reduceInit(); + samReader = initializeSAMFile(readsFile); for ( GenomeLoc interval : locs ) { System.out.printf("Processing locus %s%n", interval.toString()); - GenomeLoc[] intervalArray = { interval }; - samReader = new SAMFileReader(readsFile, true); - samReader.setValidationStringency(strictness); - CloseableIterator readIters = samReader.queryOverlapping( interval.getContig(), - (int)interval.getStart(), - (int)interval.getStop() ); + CloseableIterator readIter = samReader.queryOverlapping( interval.getContig(), + (int)interval.getStart(), + (int)interval.getStop() ); - // prepare the read filtering read iterator and provide it to a new locus iterator - FilteringIterator filterIter = new FilteringIterator(readIters, new locusStreamFilterFunc()); - - boolean done = false; - LocusIterator iter = new LocusIteratorByHanger(filterIter); - while (iter.hasNext() && !done) { - this.nRecords++; - - // actually get the read and hand it to the walker - LocusContext locus = iter.next(); - - if ( interval.overlapsP(locus.getLocation()) ) { - - //System.out.format("Working at %s\n", locus.getLocation().toString()); - - // Jump forward in the reference to this locus location - final ReferenceIterator refSite; - refSite = refIter.seekForward(locus.getLocation()); - final char refBase = refSite.getBaseAsChar(); - locus.setReferenceContig(refSite.getCurrentContig()); - - // Iterate forward to get all reference ordered data covering this locus - final List rodData = getReferenceOrderedDataAtLocus(rodIters, locus.getLocation()); - - logger.debug(String.format(" Reference: %s:%d %c", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase)); - - // - // Execute our contract with the walker. Call filter, map, and reduce - // - final boolean keepMeP = walker.filter(rodData, refBase, locus); - if (keepMeP) { - M x = walker.map(rodData, refBase, locus); - sum = walker.reduce(x, sum); - } - - if (this.maxReads > 0 && this.nRecords > this.maxReads) { - logger.warn(String.format("Maximum number of reads encountered, terminating traversal " + this.nRecords)); - done = true; - } - - printProgress("loci", locus.getLocation()); - if (pastFinalLocation(locus.getLocation())) - done = true; - } - } - readIters.close(); + Iterator wrappedIter = WrapReadsIterator( readIter, false ); + sum = carryWalkerOverInterval(walker, wrappedIter, sum, interval); + readIter.close(); } printOnTraversalDone("loci", sum); @@ -691,7 +680,7 @@ public class TraversalEngine { * @return 0 on success */ protected int traverseByRead(ReadWalker walker) { - initializeReads(false); + samReadIter = initializeReads(); if (refFileName == null && !walker.requiresOrderedReads() && verifyingSamReadIter != null) { logger.warn(String.format("STATUS: No reference file provided and unordered reads are tolerated, enabling out of order read processing."));