From 76027d17e654e7a11d383d6f0e56914c02541216 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Thu, 13 Sep 2012 15:32:28 -0400 Subject: [PATCH] Add a few more UnitTests for InputProducer -- Cleaned up function calls for clarity --- .../utils/nanoScheduler/InputProducer.java | 39 +++++--- .../utils/nanoScheduler/NanoScheduler.java | 2 +- .../nanoScheduler/InputProducerUnitTest.java | 97 +++++++++++++++++-- 3 files changed, 117 insertions(+), 21 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java index 2e5003ff0..d0d25e886 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/InputProducer.java @@ -30,7 +30,7 @@ class InputProducer implements Runnable { * Have we read the last value from inputReader? * * Must be a local variable, as inputReader.hasNext() can actually end up doing a lot - * of work, and the method getNElementsInInputStream() is supposed to be called not in the + * of work, and the method getNumInputValues() is supposed to be called not in the * thread executing the reading of values but in the thread enqueuing results */ boolean readLastValue = false; @@ -61,8 +61,17 @@ class InputProducer implements Runnable { * * @return the total number of elements in input stream, or -1 if some are still to be read */ - public synchronized int getNElementsInInputStream() { - return readLastValue ? nRead : -1; + public synchronized int getNumInputValues() { + return allInputsHaveBeenRead() ? nRead : -1; + } + + /** + * Returns true if all of the elements have been read from the input stream + * + * @return true if all of the elements have been read from the input stream + */ + public synchronized boolean allInputsHaveBeenRead() { + return readLastValue; } /** @@ -100,17 +109,10 @@ class InputProducer implements Runnable { public void run() { try { while ( true ) { - final InputType value = readNextItem(); - if ( value == null ) { - // add the EOF marker - // add the EOF object so our consumer knows we are done in all inputs - outputQueue.put(new InputValue()); - + final InputValue inputValue = runOne(); + outputQueue.put(inputValue); + if ( inputValue.isEOFMarker() ) break; - } else { - // add the actual value - outputQueue.put(new InputValue(value)); - } } latch.countDown(); @@ -119,6 +121,17 @@ class InputProducer implements Runnable { } } + protected InputValue runOne() throws InterruptedException { + final InputType value = readNextItem(); + if ( value == null ) { + // add the EOF object so our consumer knows we are done in all inputs + return new InputValue(); + } else { + // add the actual value + return new InputValue(value); + } + } + /** * Block until all of the items have been read from inputReader. * diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java index 08f29d155..0aa27f662 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/NanoScheduler.java @@ -383,7 +383,7 @@ public class NanoScheduler { * @return */ private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer inputProducer) { - final int nReadItems = inputProducer.getNElementsInInputStream(); + final int nReadItems = inputProducer.getNumInputValues(); return nReadItems == -1 || nJobsSubmitted < nReadItems; } diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 829fc2f12..3baca66ef 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -8,10 +8,12 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Semaphore; /** * UnitTests for the InputProducer @@ -47,20 +49,21 @@ public class InputProducerUnitTest extends BaseTest { final ExecutorService es = Executors.newSingleThreadExecutor(); - Assert.assertEquals(ip.getNElementsInInputStream(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); + Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet"); + Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); es.submit(ip); int lastValue = -1; int nRead = 0; while ( true ) { - final int nTotalElements = ip.getNElementsInInputStream(); + final int nTotalElements = ip.getNumInputValues(); final int observedQueueSize = readQueue.size(); Assert.assertTrue(observedQueueSize <= queueSize, "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); if ( nRead + observedQueueSize < nElements ) - Assert.assertEquals(nTotalElements, -1, "getNElementsInInputStream should have returned -1 with not all elements read"); + Assert.assertEquals(nTotalElements, -1, "getNumInputValues should have returned -1 with not all elements read"); // note, cannot test else case because elements input could have emptied between calls final InputProducer.InputValue value = readQueue.take(); @@ -77,7 +80,9 @@ public class InputProducerUnitTest extends BaseTest { } } - Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "Wrong number of total elements getNElementsInInputStream"); + Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all"); + Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues"); + es.shutdownNow(); } @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) @@ -95,10 +100,88 @@ public class InputProducerUnitTest extends BaseTest { ip.waitForDone(); - Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet"); + Assert.assertEquals(ip.getNumInputValues(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet"); Assert.assertEquals(readQueue.size(), nElements + 1, "readQueue should have had all elements read into it"); } - // TODO -- add a test that really tests ip.getNElementsInInputStream - // Create an iterator, containing a semaphore, that allows us to step through the reader + final static class BlockingIterator implements Iterator { + final Semaphore blockNext = new Semaphore(0); + final Semaphore blockOnNext = new Semaphore(0); + final Iterator underlyingIterator; + + BlockingIterator(Iterator underlyingIterator) { + this.underlyingIterator = underlyingIterator; + } + + public void allowNext() { + blockNext.release(1); + } + + public void blockTillNext() throws InterruptedException { + blockOnNext.acquire(1); + } + + @Override + public boolean hasNext() { + return underlyingIterator.hasNext(); + } + + @Override + public T next() { + try { + blockNext.acquire(1); + T value = underlyingIterator.next(); + blockOnNext.release(1); + return value; + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("x"); + } + } + + // TODO -- this doesn't work because the synchronization in InputProvider... +// @Test(enabled = false, dataProvider = "InputProducerTest", dependsOnMethods = "testInputProducer", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) +// public void testInputProducerSingleStepIterator(final int nElements, final int queueSize) throws InterruptedException { +// +// final List elements = new ArrayList(nElements); +// for ( int i = 0; i < nElements; i++ ) elements.add(i); +// +// //final BlockingIterator myIterator = new BlockingIterator(elements.iterator()); +// +// final LinkedBlockingDeque.InputValue> readQueue = +// new LinkedBlockingDeque.InputValue>(queueSize); +// +// final InputProducer ip = new InputProducer(elements.iterator(), new SimpleTimer(), readQueue); +// +// final ExecutorService es = Executors.newSingleThreadExecutor(); +// +// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet"); +// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet"); +// +// //es.submit(ip); +// +// for ( int nCycles = 0; nCycles < nElements; nCycles++ ) { +// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I'm not down reading yet"); +// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I'm not down reading yet"); +// +//// final int observedQueueSize = readQueue.size(); +//// Assert.assertEquals(observedQueueSize, nCycles, "Reader enqueued " + observedQueueSize + " elements but expected expected " + nCycles); +// +// //myIterator.allowNext(); +// //myIterator.blockTillNext(); +// ip.runOne(); +// } +// +// //myIterator.allowNext(); +// //Thread.sleep(100); +// +// Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all"); +// Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues"); +// es.shutdownNow(); +// } }