Make the operation 'read from the iterator and place on the queue' atomic with respect to hasNext(), next().
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@513 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
998763950c
commit
282f1d88b8
|
|
@ -1,5 +1,7 @@
|
||||||
package org.broadinstitute.sting.gatk.iterators;
|
package org.broadinstitute.sting.gatk.iterators;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
@ -18,11 +20,21 @@ public class ThreadedIterator<T> implements Iterator<T>, Runnable {
|
||||||
private int nOps = 0;
|
private int nOps = 0;
|
||||||
private final int printStateFreq = -1;
|
private final int printStateFreq = -1;
|
||||||
|
|
||||||
|
protected static Logger logger = Logger.getLogger(ThreadedIterator.class);
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
queue.put(it.next());
|
// Spin loop...if queue has remaining capacity, add more elements.
|
||||||
printState("addNext");
|
// Otherwise, yield to the thread(s) waiting for the data.
|
||||||
|
if(queue.remainingCapacity() > 0 ) {
|
||||||
|
synchronized(this) {
|
||||||
|
queue.put(it.next());
|
||||||
|
printState("addNext");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
Thread.yield();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
// bail out
|
// bail out
|
||||||
|
|
@ -32,7 +44,7 @@ public class ThreadedIterator<T> implements Iterator<T>, Runnable {
|
||||||
|
|
||||||
public synchronized void printState(final String op) {
|
public synchronized void printState(final String op) {
|
||||||
if ( printStateFreq != -1 && nOps++ % printStateFreq == 0 )
|
if ( printStateFreq != -1 && nOps++ % printStateFreq == 0 )
|
||||||
System.out.printf(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps);
|
logger.info(String.format(" [%s] Queue has %d elements %d ops%n", op, queue.size(), nOps));
|
||||||
}
|
}
|
||||||
|
|
||||||
public ThreadedIterator(Iterator<T> it, int buffSize) {
|
public ThreadedIterator(Iterator<T> it, int buffSize) {
|
||||||
|
|
@ -41,7 +53,7 @@ public class ThreadedIterator<T> implements Iterator<T>, Runnable {
|
||||||
new Thread(this).start();
|
new Thread(this).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasNext() {
|
public synchronized boolean hasNext() {
|
||||||
return queue.peek() != null || it.hasNext();
|
return queue.peek() != null || it.hasNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -51,7 +63,7 @@ public class ThreadedIterator<T> implements Iterator<T>, Runnable {
|
||||||
return queue.poll(10, TimeUnit.SECONDS);
|
return queue.poll(10, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
// bail out
|
// bail out
|
||||||
System.out.printf("ThreadedIterator next() timed out...%n");
|
logger.info("ThreadedIterator next() timed out...");
|
||||||
printState("getNext");
|
printState("getNext");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue