diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 4f9fedce3..89e44ce93 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -51,7 +51,8 @@ public class NanoScheduler { final int bufferSize; final int nThreads; - final ExecutorService executor; + final ExecutorService inputExecutor; + final ExecutorService mapExecutor; boolean shutdown = false; boolean debug = false; @@ -75,7 +76,8 @@ public class NanoScheduler { this.bufferSize = bufferSize; this.nThreads = nThreads; - this.executor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1); + this.mapExecutor = nThreads == 1 ? null : Executors.newFixedThreadPool(nThreads-1); + this.inputExecutor = Executors.newSingleThreadExecutor(); // start timing the time spent outside of the nanoScheduler outsideSchedulerTimer.start(); @@ -107,10 +109,10 @@ public class NanoScheduler { public void shutdown() { outsideSchedulerTimer.stop(); - if ( executor != null ) { - final List remaining = executor.shutdownNow(); + if ( mapExecutor != null ) { + final List remaining = mapExecutor.shutdownNow(); if ( ! remaining.isEmpty() ) - throw new IllegalStateException("Remaining tasks found in the executor, unexpected behavior!"); + throw new IllegalStateException("Remaining tasks found in the mapExecutor, unexpected behavior!"); } shutdown = true; @@ -236,8 +238,8 @@ public class NanoScheduler { boolean done = false; final BlockingQueue inputQueue = new LinkedBlockingDeque(bufferSize); - final InputProducer inputProducer = new InputProducer(inputReader, inputQueue); - inputProducer.start(); + + inputExecutor.submit(new InputProducer(inputReader, inputQueue)); while ( ! done ) { try { @@ -247,7 +249,7 @@ public class NanoScheduler { if ( ! inputs.isEmpty() ) { // send jobs for map - final Queue> mapQueue = submitMapJobs(map, executor, inputs); + final Queue> mapQueue = submitMapJobs(map, mapExecutor, inputs); // send off the reduce job, and block until we get at least one reduce result sum = reduceSerial(reduce, mapQueue, sum); @@ -311,7 +313,7 @@ public class NanoScheduler { return new Pair, Boolean>(inputs, done); } - private class InputProducer extends Thread { + private class InputProducer implements Runnable { final Iterator inputReader; final BlockingQueue outputQueue;