diff --git a/java/src/edu/mit/broad/sting/atk/TraversalEngine.java b/java/src/edu/mit/broad/sting/atk/TraversalEngine.java index 794097300..f58aee30f 100755 --- a/java/src/edu/mit/broad/sting/atk/TraversalEngine.java +++ b/java/src/edu/mit/broad/sting/atk/TraversalEngine.java @@ -37,6 +37,7 @@ public class TraversalEngine { private ReferenceSequenceFile refFile = null; private ReferenceIterator refIter = null; private SAMFileReader readStream; + private Iterator samReadIter = null; private int nReads = 0; private int nSkippedReads = 0; @@ -44,6 +45,7 @@ public class TraversalEngine { private int nNotPrimary = 0; private int nBadAlignments = 0; private int nSkippedIndels = 0; + private FileProgressTracker samReadingTracker = null; public boolean DEBUGGING = false; @@ -59,6 +61,31 @@ public class TraversalEngine { refFileName = ref; this.rods = Arrays.asList(rods); } + + protected int initialize() { + startTime = System.currentTimeMillis(); + loadReference(); + //testReference(); + //loadReference(); + try { + final FileInputStream samFileStream = new FileInputStream(readsFile); + final InputStream bufferedStream= new BufferedInputStream(samFileStream); + //final InputStream bufferedStream= new BufferedInputStream(samInputStream, 10000000); + final SAMFileReader samReader = new SAMFileReader(bufferedStream, true); + samReader.setValidationStringency(strictness); + + final SAMFileHeader header = samReader.getFileHeader(); + System.err.println("Sort order is: " + header.getSortOrder()); + + samReadingTracker = new FileProgressTracker( readsFile, samReader.iterator(), samFileStream.getChannel(), 1000 ); + samReadIter = samReadingTracker; + } + catch (IOException e) { + throw new RuntimeIOException(e); + } + + return 0; + } public void setRegion(final String reg) { regionStr = regionStr; } public void setTraversalType(final String type) { traversalType = type; } @@ -127,7 +154,10 @@ public class TraversalEngine { if ( mustPrint || nRecords % 100000 == 0 ) { final double elapsed = (System.currentTimeMillis() - startTime) / 1000.0; final double secsPer1MReads = (elapsed * 1000000.0) / nRecords; + System.out.printf("Traversed %d %s %.2f secs (%.2f secs per 1M %s)%n", nRecords, type, elapsed, secsPer1MReads, type); + + System.out.printf(" -> %s%n", samReadingTracker.progressMeter()); } } @@ -192,15 +222,6 @@ public class TraversalEngine { // traversal by loci functions // // -------------------------------------------------------------------------------------------------------------- - protected int initialize() { - startTime = System.currentTimeMillis(); - loadReference(); - //testReference(); - //loadReference(); - readStream = initializeReadStreams(); - return 0; - } - class locusStreamFilterFunc implements SamRecordFilter { public boolean filterOut(SAMRecord rec) { boolean result = false; @@ -243,7 +264,7 @@ public class TraversalEngine { protected int traverseByLoci(LocusWalker walker) { walker.initialize(); - FilteringIterator filterIter = new FilteringIterator(readStream.iterator(), new locusStreamFilterFunc()); + FilteringIterator filterIter = new FilteringIterator(samReadIter, new locusStreamFilterFunc()); CloseableIterator iter = new LocusIterator(filterIter); List rodIters = initializeRODs(); @@ -301,14 +322,14 @@ public class TraversalEngine { // -------------------------------------------------------------------------------------------------------------- protected int traverseByRead(ReadWalker walker) { walker.initialize(); - CloseableIterator iter = readStream.iterator(); + R sum = walker.reduceInit(); boolean done = false; - while ( iter.hasNext() && ! done ) { + while ( samReadIter.hasNext() && ! done ) { this.nRecords++; // actually get the read and hand it to the walker - final SAMRecord read = iter.next(); + final SAMRecord read = samReadIter.next(); GenomeLoc loc = new GenomeLoc(read.getReferenceName(), read.getAlignmentStart()); if ( inLocations(loc) ) { @@ -337,32 +358,4 @@ public class TraversalEngine { walker.onTraveralDone(); return 0; } - - // - // - // Prepare the input streams - // - // - private SAMFileReader initializeReadStreams() { - SAMFileReader reader = getSamReader(readsFile); - return reader; - } - - private SAMFileReader getSamReader(final File samFile) { - try { - final InputStream samInputStream = new FileInputStream(samFile); - final InputStream bufferedStream= new BufferedInputStream(samInputStream); - //final InputStream bufferedStream= new BufferedInputStream(samInputStream, 10000000); - final SAMFileReader samReader = new SAMFileReader(bufferedStream, true); - samReader.setValidationStringency(strictness); - - final SAMFileHeader header = samReader.getFileHeader(); - System.err.println("Sort order is: " + header.getSortOrder()); - - return samReader; - } - catch (IOException e) { - throw new RuntimeIOException(e); - } - } } \ No newline at end of file diff --git a/java/src/edu/mit/broad/sting/utils/FileProgressTracker.java b/java/src/edu/mit/broad/sting/utils/FileProgressTracker.java new file mode 100644 index 000000000..6d5e8a15a --- /dev/null +++ b/java/src/edu/mit/broad/sting/utils/FileProgressTracker.java @@ -0,0 +1,176 @@ +package edu.mit.broad.sting.utils; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.nio.channels.FileChannel; + +/** + * Created by IntelliJ IDEA. + * User: mdepristo + * Date: Mar 2, 2009 + * Time: 2:25:18 PM + * + * This class is intended to track the reading of files composed of records of approximately equivalent + * size. It can be used to estimate time to completion, complete read, performance of io, etc. + * + */ +public class FileProgressTracker implements Iterator { + private static int DEFAULT_HISTORY_SIZE = 1000; + + private int historySize = DEFAULT_HISTORY_SIZE; + private int samplingFrequency = 1000; + private File file; + private FileChannel channel; + private ArrayList history; + private long nNexts = 0; + private Iterator it = null; + private long startTime = -1; + private int historyI = 0; + + public FileProgressTracker( File file, Iterator it, FileChannel channel, int historySize ) { + this.file = file; + this.channel = channel; + this.it = it; + this.historySize = historySize; + this.history = new ArrayList(Collections.nCopies(historySize, 0L)); + startTime = System.currentTimeMillis(); + } + + public FileProgressTracker( File file, Iterator it, FileChannel channel ) { + this(file, it, channel, DEFAULT_HISTORY_SIZE); + } + + // ----------------------------------------------------------------- + // + // iterator support + // + // ----------------------------------------------------------------- + public boolean hasNext() { return it.hasNext(); } + + public T next() { + T x = it.next(); + if ( nNexts % samplingFrequency == 0 ) { + inc(); + //printStatus(); + } + nNexts++; + return x; + } + + public void remove () { + it.remove(); + } + + /** + * Fundamental operation -- must be called ever time a record is read from the file + * Enables the system to track the relationship between file byte offsets and record + * sizes. + */ + public void inc() { + int i = historyIndex(); + long pos = getPosition(); + history.set(i, pos); + historyI++; + +// for ( long x : history ) { +// System.out.printf("%d ", x); +// } +// System.out.printf("%n"); +// +// for ( long x : recordSizes() ) { +// System.out.printf("%d ", x); +// } +// System.out.printf("%n"); + } + + public long nRecordsProcessed() { + return nNexts; + } + + public double elapsedTimeInSecs() { + return (System.currentTimeMillis() - startTime) / 1000.0; + } + + public int historyIndex() { + return historyIndex(historyI); + } + public int historyIndex(long index) { + return (int)((index + historySize) % historySize); + } + + public int averageRecordSize() { + return Math.round((int)Utils.average(recordSizes(), Math.min(historyI - 1, history.size())) / samplingFrequency); + } + + public double processingRate() { + return nRecordsProcessed() / elapsedTimeInSecs(); + } + + public long estRecordsInFile() { + return (long)(getFileSize() / averageRecordSize()); + } + + public double estFractionProgressThroughFile() { + return (1.0 * nRecordsProcessed()) / estRecordsInFile(); + } + + public double estTimeTotal() { + return estRecordsInFile() / processingRate(); + } + + public double estTimeRemaining() { + return estTimeTotal() * ( 1 - estFractionProgressThroughFile() ); + } + + public void printStatus() { + System.out.printf("FileProgressTracker:%n"); + System.out.printf(" -> File size is: %d%n", getFileSize()); + System.out.printf(" -> File position: %d%n", getPosition()); + System.out.printf(" -> Number of records processed: %d%n", nRecordsProcessed()); + System.out.printf(" -> Average record size is %d%n", averageRecordSize()); + System.out.printf(" -> Elapsed time in secs is %.2f%n", elapsedTimeInSecs()); + System.out.printf(" -> Processing rate (records per second) %.2f%n", processingRate()); + System.out.printf(" -> Estimated number of records in file %d%n", estRecordsInFile()); + System.out.printf(" -> Estimated percent progress through file %.2f%n", estFractionProgressThroughFile() * 100.0); + System.out.printf(" -> Estimated time for entire processing %.2f hrs / %.2f min / %.2f sec%n", estTimeTotal() / (60*60), estTimeTotal() / (60), estTimeTotal()); + System.out.printf(" -> Estimated time remaining %.2f hrs / %.2f min / %.2f sec%n", estTimeRemaining() / (60*60), estTimeRemaining() / 60, estTimeRemaining()); + } + + public String progressMeter() { + return String.format("Est. %.2f%% completed, time remaining (%.2f hrs / %.2f min) of (%.2f hrs / %.2f min) total", + estFractionProgressThroughFile() * 100.0, + estTimeTotal() / (60*60), estTimeTotal() / (60), + estTimeRemaining() / (60*60), estTimeRemaining() / 60); + } + + public ArrayList recordSizes() { + ArrayList sizes = new ArrayList(history); + for ( int i = 0; i < historySize; i++ ) { + sizes.set(i, history.get(historyIndex(i)) - history.get(historyIndex(i-1))); + } + +// for ( long size : sizes ) { +// System.out.printf("%d ", size); +// } +// System.out.printf("%n"); + + return sizes; + } + + private final long getPosition() { + try { + return channel.position(); + } catch ( IOException e ) { + return 0; + } + } + + private final long getFileSize() { + return file.length(); + } + + +} diff --git a/java/src/edu/mit/broad/sting/utils/Utils.java b/java/src/edu/mit/broad/sting/utils/Utils.java index a9e2150a0..b85793cce 100755 --- a/java/src/edu/mit/broad/sting/utils/Utils.java +++ b/java/src/edu/mit/broad/sting/utils/Utils.java @@ -72,6 +72,26 @@ public class Utils { return join( separator, strings.toArray(new String[0]) ); } + public static double average(List vals, int maxI) { + long sum = 0L; + + int i = 0; + for ( long x : vals ) { + if ( i > maxI ) + break; + sum += x; + i++; + } + + //System.out.printf("Sum = %d, n = %d, avg = %f%n", sum, i, (1.0 * sum) / i); + + return (1.0 * sum) / i; + } + + public static double average(List vals) { + return average(vals, vals.size()); + } + public static void setupRefContigOrdering(final ReferenceSequenceFile refFile) { List refContigs = refFile.getSequenceDictionary(); HashMap refContigOrdering = new HashMap();