diff --git a/java/src/org/broadinstitute/sting/gatk/iterators/ThreadedIterator.java b/java/src/org/broadinstitute/sting/gatk/iterators/ThreadedIterator.java index 10c74a340..42e52085e 100755 --- a/java/src/org/broadinstitute/sting/gatk/iterators/ThreadedIterator.java +++ b/java/src/org/broadinstitute/sting/gatk/iterators/ThreadedIterator.java @@ -1,5 +1,7 @@ package org.broadinstitute.sting.gatk.iterators; +import org.apache.log4j.Logger; + import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -18,11 +20,21 @@ public class ThreadedIterator implements Iterator, Runnable { private int nOps = 0; private final int printStateFreq = -1; + protected static Logger logger = Logger.getLogger(ThreadedIterator.class); + public void run() { try { while (it.hasNext()) { - queue.put(it.next()); - printState("addNext"); + // Spin loop...if queue has remaining capacity, add more elements. + // Otherwise, yield to the thread(s) waiting for the data. + if(queue.remainingCapacity() > 0 ) { + synchronized(this) { + queue.put(it.next()); + printState("addNext"); + } + } + else + Thread.yield(); } } catch (InterruptedException ex) { // bail out @@ -32,7 +44,7 @@ public class ThreadedIterator implements Iterator, Runnable { public synchronized void printState(final String op) { if ( printStateFreq != -1 && nOps++ % printStateFreq == 0 ) - System.out.printf(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps); + logger.info(String.format(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps)); } public ThreadedIterator(Iterator it, int buffSize) { @@ -41,7 +53,7 @@ public class ThreadedIterator implements Iterator, Runnable { new Thread(this).start(); } - public boolean hasNext() { + public synchronized boolean hasNext() { return queue.peek() != null || it.hasNext(); } @@ -51,7 +63,7 @@ public class ThreadedIterator implements Iterator, Runnable { return queue.poll(10, TimeUnit.SECONDS); } catch (InterruptedException ex) { // bail out - System.out.printf("ThreadedIterator next() timed out...%n"); + logger.info("ThreadedIterator next() timed out..."); printState("getNext"); return null; }