Support for threaded IO!

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@45 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2009-03-13 14:50:45 +00:00
parent 851254970c
commit 2a8dc05f2e
3 changed files with 84 additions and 21 deletions

View File

@ -24,7 +24,8 @@ public class AnalysisTK extends CommandLineProgram {
@Option(shortName="L", doc="Genome region to operation on: from chr:start-end", optional=true) public String REGION_STR = null;
@Option(shortName="T", doc="Type of analysis to run") public String Analysis_Name = null;
@Option(shortName="DBSNP", doc="DBSNP file", optional=true) public String DBSNP_FILE = null;
@Option(shortName="THREADED_IO", doc="If true, enables threaded I/O operations", optional=true) public String ENABLED_THREADED_IO = "false";
public static HashMap<String, Object> MODULES = new HashMap<String,Object>();
public static void addModule(final String name, final Object walker) {
System.out.printf("* Adding module %s%n", name);
@ -74,8 +75,9 @@ public class AnalysisTK extends CommandLineProgram {
}
this.engine = new TraversalEngine(INPUT_FILE, REF_FILE_ARG, rods);
engine.initialize();
engine.initialize(ENABLED_THREADED_IO.toLowerCase().equals("true"));
//engine.testReference();
ValidationStringency strictness;
if ( STRICTNESS_ARG == null ) {
strictness = ValidationStringency.STRICT;
@ -105,7 +107,7 @@ public class AnalysisTK extends CommandLineProgram {
Object my_module;
if (MODULES.containsKey(Analysis_Name)) {
my_module = MODULES.get(Analysis_Name);
}else{
} else {
System.out.println("Could not find module "+Analysis_Name);
return 0;
}

View File

@ -8,6 +8,7 @@ import edu.mit.broad.picard.filter.SamRecordFilter;
import edu.mit.broad.picard.filter.FilteringIterator;
import edu.mit.broad.picard.reference.ReferenceSequenceFile;
import edu.mit.broad.picard.reference.ReferenceSequenceFileFactory;
import edu.mit.broad.picard.reference.ReferenceSequence;
import org.broadinstitute.sting.utils.*;
import java.io.*;
@ -65,6 +66,7 @@ public class TraversalEngine {
public boolean DEBUGGING = false;
public long N_RECORDS_TO_PRINT = 100000;
public int THREADED_IO_BUFFER_SIZE = 10000;
// Locations we are going to process during the traversal
private GenomeLoc[] locs = null;
@ -258,7 +260,7 @@ public class TraversalEngine {
*
* @return true on success
*/
public boolean initialize() {
public boolean initialize(final boolean THREADED_IO) {
lastProgressPrintTime = startTime = System.currentTimeMillis();
loadReference();
//testReference();
@ -275,6 +277,12 @@ public class TraversalEngine {
samReadingTracker = new FileProgressTracker<SAMRecord>( readsFile, samReader.iterator(), samFileStream.getChannel(), 1000 );
samReadIter = samReadingTracker;
if ( THREADED_IO ) {
System.out.printf("Enabling threaded I/O with buffer of %d reads%n", THREADED_IO_BUFFER_SIZE);
samReadIter = new ThreadedIterator<SAMRecord>(samReadIter, THREADED_IO_BUFFER_SIZE);
}
}
catch (IOException e) {
throw new RuntimeIOException(e);
@ -310,22 +318,13 @@ public class TraversalEngine {
return rodIters;
}
// protected void testReference() {
// String line = "";
// refIter.seekForward("chr20", 79);
// for ( int i = 0; i < this.maxReads && refIter.hasNext(); i++ ) {
// final ReferenceIterator refSite = refIter.next();
// final char refBase = refSite.getBaseAsChar();
// line += refBase;
// if ( (i + 1) % 80 == 0 ) {
// System.out.println(line);
// line = "";
// }
// //System.out.printf(" Reference: %s:%d %c%n", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase);
// }
// System.out.println(line);
// System.exit(1);
// }
protected void testReference() {
while (true) {
ReferenceSequence ref = refFile.nextSequence();
System.out.printf("%s %d %d%n", ref.getName(), ref.length(), System.currentTimeMillis());
printProgress(true, "loci", new GenomeLoc("foo", 1));
}
}
// --------------------------------------------------------------------------------------------------------------
//

View File

@ -0,0 +1,62 @@
package org.broadinstitute.sting.utils;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by IntelliJ IDEA.
* User: depristo
* Date: Feb 24, 2009
* Time: 10:24:38 AM
* To change this template use File | Settings | File Templates.
*/
public class ThreadedIterator<T> implements Iterator<T>, Runnable {
private Iterator<T> it;
private final BlockingQueue<T> queue;
private int nOps = 0;
public void run() {
try {
while (it.hasNext()) {
queue.put(it.next());
printState("addNext");
}
} catch (InterruptedException ex) {
// bail out
;
}
}
public synchronized void printState(final String op) {
if ( nOps++ % 100000 == 0 )
System.out.printf(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps);
}
public ThreadedIterator(Iterator<T> it, int buffSize) {
this.it = it;
queue = new LinkedBlockingQueue<T>(buffSize);
new Thread(this).start();
}
public boolean hasNext() {
return queue.peek() != null || it.hasNext();
}
public T next() {
printState("getNext");
try {
return queue.poll(10, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
// bail out
System.out.printf("ThreadedIterator next() timed out...%n");
printState("getNext");
return null;
}
}
public void remove () {
throw new UnsupportedOperationException();
}
}