Add a few more UnitTests for InputProducer

-- Cleaned up function calls for clarity
This commit is contained in:
Mark DePristo 2012-09-13 15:32:28 -04:00
parent 7605c6bcc4
commit 76027d17e6
3 changed files with 117 additions and 21 deletions

View File

@ -30,7 +30,7 @@ class InputProducer<InputType> implements Runnable {
* Have we read the last value from inputReader?
*
* Must be a local variable, as inputReader.hasNext() can actually end up doing a lot
* of work, and the method getNElementsInInputStream() is supposed to be called not in the
* of work, and the method getNumInputValues() is supposed to be called not in the
* thread executing the reading of values but in the thread enqueuing results
*/
boolean readLastValue = false;
@ -61,8 +61,17 @@ class InputProducer<InputType> implements Runnable {
*
* @return the total number of elements in input stream, or -1 if some are still to be read
*/
public synchronized int getNElementsInInputStream() {
return readLastValue ? nRead : -1;
public synchronized int getNumInputValues() {
return allInputsHaveBeenRead() ? nRead : -1;
}
/**
* Returns true if all of the elements have been read from the input stream
*
* @return true if all of the elements have been read from the input stream
*/
public synchronized boolean allInputsHaveBeenRead() {
return readLastValue;
}
/**
@ -100,17 +109,10 @@ class InputProducer<InputType> implements Runnable {
public void run() {
try {
while ( true ) {
final InputType value = readNextItem();
if ( value == null ) {
// add the EOF marker
// add the EOF object so our consumer knows we are done in all inputs
outputQueue.put(new InputValue());
final InputValue inputValue = runOne();
outputQueue.put(inputValue);
if ( inputValue.isEOFMarker() )
break;
} else {
// add the actual value
outputQueue.put(new InputValue(value));
}
}
latch.countDown();
@ -119,6 +121,17 @@ class InputProducer<InputType> implements Runnable {
}
}
protected InputValue runOne() throws InterruptedException {
final InputType value = readNextItem();
if ( value == null ) {
// add the EOF object so our consumer knows we are done in all inputs
return new InputValue();
} else {
// add the actual value
return new InputValue(value);
}
}
/**
* Block until all of the items have been read from inputReader.
*

View File

@ -383,7 +383,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
* @return
*/
private boolean continueToSubmitJobs(final int nJobsSubmitted, final InputProducer<InputType> inputProducer) {
final int nReadItems = inputProducer.getNElementsInInputStream();
final int nReadItems = inputProducer.getNumInputValues();
return nReadItems == -1 || nJobsSubmitted < nReadItems;
}

View File

@ -8,10 +8,12 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
/**
* UnitTests for the InputProducer
@ -47,20 +49,21 @@ public class InputProducerUnitTest extends BaseTest {
final ExecutorService es = Executors.newSingleThreadExecutor();
Assert.assertEquals(ip.getNElementsInInputStream(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet");
Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet");
Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet");
es.submit(ip);
int lastValue = -1;
int nRead = 0;
while ( true ) {
final int nTotalElements = ip.getNElementsInInputStream();
final int nTotalElements = ip.getNumInputValues();
final int observedQueueSize = readQueue.size();
Assert.assertTrue(observedQueueSize <= queueSize,
"Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize);
if ( nRead + observedQueueSize < nElements )
Assert.assertEquals(nTotalElements, -1, "getNElementsInInputStream should have returned -1 with not all elements read");
Assert.assertEquals(nTotalElements, -1, "getNumInputValues should have returned -1 with not all elements read");
// note, cannot test else case because elements input could have emptied between calls
final InputProducer<Integer>.InputValue value = readQueue.take();
@ -77,7 +80,9 @@ public class InputProducerUnitTest extends BaseTest {
}
}
Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "Wrong number of total elements getNElementsInInputStream");
Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all");
Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues");
es.shutdownNow();
}
@Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME)
@ -95,10 +100,88 @@ public class InputProducerUnitTest extends BaseTest {
ip.waitForDone();
Assert.assertEquals(ip.getNElementsInInputStream(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet");
Assert.assertEquals(ip.getNumInputValues(), nElements, "InputProvider told me that the queue was done, but I haven't started reading yet");
Assert.assertEquals(readQueue.size(), nElements + 1, "readQueue should have had all elements read into it");
}
// TODO -- add a test that really tests ip.getNElementsInInputStream
// Create an iterator, containing a semaphore, that allows us to step through the reader
final static class BlockingIterator<T> implements Iterator<T> {
final Semaphore blockNext = new Semaphore(0);
final Semaphore blockOnNext = new Semaphore(0);
final Iterator<T> underlyingIterator;
BlockingIterator(Iterator<T> underlyingIterator) {
this.underlyingIterator = underlyingIterator;
}
public void allowNext() {
blockNext.release(1);
}
public void blockTillNext() throws InterruptedException {
blockOnNext.acquire(1);
}
@Override
public boolean hasNext() {
return underlyingIterator.hasNext();
}
@Override
public T next() {
try {
blockNext.acquire(1);
T value = underlyingIterator.next();
blockOnNext.release(1);
return value;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("x");
}
}
// TODO -- this doesn't work because the synchronization in InputProvider...
// @Test(enabled = false, dataProvider = "InputProducerTest", dependsOnMethods = "testInputProducer", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME)
// public void testInputProducerSingleStepIterator(final int nElements, final int queueSize) throws InterruptedException {
//
// final List<Integer> elements = new ArrayList<Integer>(nElements);
// for ( int i = 0; i < nElements; i++ ) elements.add(i);
//
// //final BlockingIterator<Integer> myIterator = new BlockingIterator<Integer>(elements.iterator());
//
// final LinkedBlockingDeque<InputProducer<Integer>.InputValue> readQueue =
// new LinkedBlockingDeque<InputProducer<Integer>.InputValue>(queueSize);
//
// final InputProducer<Integer> ip = new InputProducer<Integer>(elements.iterator(), new SimpleTimer(), readQueue);
//
// final ExecutorService es = Executors.newSingleThreadExecutor();
//
// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I haven't started reading yet");
// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I haven't started reading yet");
//
// //es.submit(ip);
//
// for ( int nCycles = 0; nCycles < nElements; nCycles++ ) {
// Assert.assertFalse(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs have been read, but I'm not down reading yet");
// Assert.assertEquals(ip.getNumInputValues(), -1, "InputProvider told me that the queue was done, but I'm not down reading yet");
//
//// final int observedQueueSize = readQueue.size();
//// Assert.assertEquals(observedQueueSize, nCycles, "Reader enqueued " + observedQueueSize + " elements but expected expected " + nCycles);
//
// //myIterator.allowNext();
// //myIterator.blockTillNext();
// ip.runOne();
// }
//
// //myIterator.allowNext();
// //Thread.sleep(100);
//
// Assert.assertTrue(ip.allInputsHaveBeenRead(), "InputProvider said that all inputs haven't been read, but I read them all");
// Assert.assertEquals(ip.getNumInputValues(), nElements, "Wrong number of total elements getNumInputValues");
// es.shutdownNow();
// }
}