From bf87de8a252bc566d820cf85cfe7dcc745d8e679 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 7 Sep 2012 09:51:32 -0400 Subject: [PATCH] UnitTests for ReducerThread and InputProducer -- Uncovered bug in ReducerThread in detecting abnormal case where jobs are coming in out of order --- .../utils/nanoScheduler/ReducerThread.java | 1 + .../nanoScheduler/InputProducerUnitTest.java | 6 +- .../nanoScheduler/ReducerThreadUnitTest.java | 94 +++++++++++++++++++ 3 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java index bd29799b6..506e45453 100644 --- a/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java +++ b/public/java/src/org/broadinstitute/sting/utils/nanoScheduler/ReducerThread.java @@ -49,6 +49,7 @@ class ReducerThread implements Callable { // make sure the map results are coming in order throw new IllegalStateException("BUG: last jobID " + lastJobID + " > current jobID " + result.getJobID()); } else { + lastJobID = result.getJobID(); // apply reduce, keeping track of sum if ( reduceTimer != null ) reduceTimer.restart(); sum = reduce.apply(result.getValue(), sum); diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java index 0973db8a3..b3365c13c 100644 --- a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/InputProducerUnitTest.java @@ -13,7 +13,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; /** - * UnitTests for the NanoScheduler + * UnitTests for the InputProducer * * User: depristo * Date: 8/24/12 @@ -35,7 +35,7 @@ public class InputProducerUnitTest extends BaseTest { } @Test(enabled = true, dataProvider = "InputProducerTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) - public void testSingleThreadedNanoScheduler(final int nElements, final int queueSize) throws InterruptedException { + public void testInputProducer(final int nElements, final int queueSize) throws InterruptedException { final List elements = new ArrayList(nElements); for ( int i = 0; i < nElements; i++ ) elements.add(i); @@ -52,7 +52,7 @@ public class InputProducerUnitTest extends BaseTest { while ( true ) { final int observedQueueSize = readQueue.size(); Assert.assertTrue(observedQueueSize <= queueSize, - "Reader is enqueuing more elements " + queueSize + " than allowed " + queueSize); + "Reader is enqueuing more elements " + observedQueueSize + " than allowed " + queueSize); final InputProducer.InputValue value = readQueue.take(); if ( value.isLast() ) { diff --git a/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java new file mode 100644 index 000000000..61d1330bc --- /dev/null +++ b/public/java/test/org/broadinstitute/sting/utils/nanoScheduler/ReducerThreadUnitTest.java @@ -0,0 +1,94 @@ +package org.broadinstitute.sting.utils.nanoScheduler; + +import org.broadinstitute.sting.BaseTest; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.*; + +/** + * UnitTests for the InputProducer + * + * User: depristo + * Date: 8/24/12 + * Time: 11:25 AM + * To change this template use File | Settings | File Templates. + */ +public class ReducerThreadUnitTest extends BaseTest { + @DataProvider(name = "ReducerThreadTest") + public Object[][] createReducerThreadTest() { + List tests = new ArrayList(); + + for ( final int nElements : Arrays.asList(0, 1, 10, 100, 1000, 10000, 100000) ) { + tests.add(new Object[]{ nElements }); + } + + return tests.toArray(new Object[][]{}); + } + + @Test(enabled = true, dataProvider = "ReducerThreadTest", timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME) + public void testReducerThreadTest(final int nElements) throws Exception { + List values = new ArrayList(nElements); + List jobIDs = new ArrayList(nElements); + for ( int i = 0; i < nElements; i++ ) { + values.add(i); + jobIDs.add(i); + } + + runTests(values, jobIDs); + } + + @Test(enabled = true, timeOut = NanoSchedulerUnitTest.NANO_SCHEDULE_MAX_RUNTIME, expectedExceptions = ExecutionException.class) + public void testReducerThreadTestByJobOrder() throws Exception { + runTests(Arrays.asList(0, 1, 2), Arrays.asList(1, 3, 2)); + } + + private void runTests( final List mapValues, final List jobIDs) throws Exception { + final LinkedBlockingDeque>> mapResultsQueue = + new LinkedBlockingDeque>>(mapValues.size()+1); + + for ( int i = 0; i < mapValues.size(); i++ ) { + final int value = mapValues.get(i); + final int jobID = jobIDs.get(i); + final MapResult mapResult = new MapResult(value, jobID); + mapResultsQueue.add(new FutureValue>(mapResult)); + } + mapResultsQueue.add(new FutureValue>(new MapResult())); + + final ReduceSumTest reduce = new ReduceSumTest(mapResultsQueue); + final ReducerThread thread + = new ReducerThread(reduce, null, 0, mapResultsQueue); + + final ExecutorService es = Executors.newSingleThreadExecutor(); + final Future value = es.submit(thread); + value.get(); + + Assert.assertEquals(reduce.nRead, mapValues.size()); + } + + public class ReduceSumTest implements NSReduceFunction { + final LinkedBlockingDeque>> mapResultsQueue; + int nRead = 0; + int lastValue = -1; + + public ReduceSumTest(LinkedBlockingDeque>> mapResultsQueue) { + this.mapResultsQueue = mapResultsQueue; + } + + @Override public Integer apply(Integer one, Integer sum) { + Assert.assertTrue(lastValue < one, "Reduce came in out of order. Prev " + lastValue + " cur " + one); + + Assert.assertTrue(lastValue < one, "Read values coming out of order!"); + final int expected = lastValue + 1; + Assert.assertEquals((int)one, expected, "Value observed " + one + " not equal to the expected value " + expected); + nRead++; + lastValue = expected; + + return one + sum; + } + } +}