diff --git a/playground/java/src/org/broadinstitute/sting/atk/AnalysisTK.java b/playground/java/src/org/broadinstitute/sting/atk/AnalysisTK.java index bf69f1069..4b323632c 100644 --- a/playground/java/src/org/broadinstitute/sting/atk/AnalysisTK.java +++ b/playground/java/src/org/broadinstitute/sting/atk/AnalysisTK.java @@ -24,7 +24,8 @@ public class AnalysisTK extends CommandLineProgram { @Option(shortName="L", doc="Genome region to operation on: from chr:start-end", optional=true) public String REGION_STR = null; @Option(shortName="T", doc="Type of analysis to run") public String Analysis_Name = null; @Option(shortName="DBSNP", doc="DBSNP file", optional=true) public String DBSNP_FILE = null; - + @Option(shortName="THREADED_IO", doc="If true, enables threaded I/O operations", optional=true) public String ENABLED_THREADED_IO = "false"; + public static HashMap MODULES = new HashMap(); public static void addModule(final String name, final Object walker) { System.out.printf("* Adding module %s%n", name); @@ -74,8 +75,9 @@ public class AnalysisTK extends CommandLineProgram { } this.engine = new TraversalEngine(INPUT_FILE, REF_FILE_ARG, rods); - engine.initialize(); - + engine.initialize(ENABLED_THREADED_IO.toLowerCase().equals("true")); + //engine.testReference(); + ValidationStringency strictness; if ( STRICTNESS_ARG == null ) { strictness = ValidationStringency.STRICT; @@ -105,7 +107,7 @@ public class AnalysisTK extends CommandLineProgram { Object my_module; if (MODULES.containsKey(Analysis_Name)) { my_module = MODULES.get(Analysis_Name); - }else{ + } else { System.out.println("Could not find module "+Analysis_Name); return 0; } diff --git a/playground/java/src/org/broadinstitute/sting/atk/TraversalEngine.java b/playground/java/src/org/broadinstitute/sting/atk/TraversalEngine.java index 7ca448fe1..cccca14d8 100755 --- a/playground/java/src/org/broadinstitute/sting/atk/TraversalEngine.java +++ b/playground/java/src/org/broadinstitute/sting/atk/TraversalEngine.java @@ -8,6 +8,7 @@ import edu.mit.broad.picard.filter.SamRecordFilter; import edu.mit.broad.picard.filter.FilteringIterator; import edu.mit.broad.picard.reference.ReferenceSequenceFile; import edu.mit.broad.picard.reference.ReferenceSequenceFileFactory; +import edu.mit.broad.picard.reference.ReferenceSequence; import org.broadinstitute.sting.utils.*; import java.io.*; @@ -65,6 +66,7 @@ public class TraversalEngine { public boolean DEBUGGING = false; public long N_RECORDS_TO_PRINT = 100000; + public int THREADED_IO_BUFFER_SIZE = 10000; // Locations we are going to process during the traversal private GenomeLoc[] locs = null; @@ -258,7 +260,7 @@ public class TraversalEngine { * * @return true on success */ - public boolean initialize() { + public boolean initialize(final boolean THREADED_IO) { lastProgressPrintTime = startTime = System.currentTimeMillis(); loadReference(); //testReference(); @@ -275,6 +277,12 @@ public class TraversalEngine { samReadingTracker = new FileProgressTracker( readsFile, samReader.iterator(), samFileStream.getChannel(), 1000 ); samReadIter = samReadingTracker; + + if ( THREADED_IO ) { + System.out.printf("Enabling threaded I/O with buffer of %d reads%n", THREADED_IO_BUFFER_SIZE); + samReadIter = new ThreadedIterator(samReadIter, THREADED_IO_BUFFER_SIZE); + } + } catch (IOException e) { throw new RuntimeIOException(e); @@ -310,22 +318,13 @@ public class TraversalEngine { return rodIters; } -// protected void testReference() { -// String line = ""; -// refIter.seekForward("chr20", 79); -// for ( int i = 0; i < this.maxReads && refIter.hasNext(); i++ ) { -// final ReferenceIterator refSite = refIter.next(); -// final char refBase = refSite.getBaseAsChar(); -// line += refBase; -// if ( (i + 1) % 80 == 0 ) { -// System.out.println(line); -// line = ""; -// } -// //System.out.printf(" Reference: %s:%d %c%n", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase); -// } -// System.out.println(line); -// System.exit(1); -// } + protected void testReference() { + while (true) { + ReferenceSequence ref = refFile.nextSequence(); + System.out.printf("%s %d %d%n", ref.getName(), ref.length(), System.currentTimeMillis()); + printProgress(true, "loci", new GenomeLoc("foo", 1)); + } + } // -------------------------------------------------------------------------------------------------------------- // diff --git a/playground/java/src/org/broadinstitute/sting/utils/ThreadedIterator.java b/playground/java/src/org/broadinstitute/sting/utils/ThreadedIterator.java new file mode 100755 index 000000000..e88393f47 --- /dev/null +++ b/playground/java/src/org/broadinstitute/sting/utils/ThreadedIterator.java @@ -0,0 +1,62 @@ +package org.broadinstitute.sting.utils; + +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Created by IntelliJ IDEA. + * User: depristo + * Date: Feb 24, 2009 + * Time: 10:24:38 AM + * To change this template use File | Settings | File Templates. + */ +public class ThreadedIterator implements Iterator, Runnable { + private Iterator it; + private final BlockingQueue queue; + private int nOps = 0; + + public void run() { + try { + while (it.hasNext()) { + queue.put(it.next()); + printState("addNext"); + } + } catch (InterruptedException ex) { + // bail out + ; + } + } + + public synchronized void printState(final String op) { + if ( nOps++ % 100000 == 0 ) + System.out.printf(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps); + } + + public ThreadedIterator(Iterator it, int buffSize) { + this.it = it; + queue = new LinkedBlockingQueue(buffSize); + new Thread(this).start(); + } + + public boolean hasNext() { + return queue.peek() != null || it.hasNext(); + } + + public T next() { + printState("getNext"); + try { + return queue.poll(10, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + // bail out + System.out.printf("ThreadedIterator next() timed out...%n"); + printState("getNext"); + return null; + } + } + + public void remove () { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file