Restored serial version of reader initialization. Parallel mode is default.

-- Serial version can be re-enabled with a static boolean, if we decide to return to the serial version
This commit is contained in:
Mark DePristo 2011-12-16 09:05:28 -05:00
parent 16a563889f
commit fb1c9d2abc
1 changed files with 83 additions and 54 deletions

View File

@ -63,6 +63,7 @@ import java.util.concurrent.*;
*/ */
public class SAMDataSource { public class SAMDataSource {
final private static GATKSamRecordFactory factory = new GATKSamRecordFactory(); final private static GATKSamRecordFactory factory = new GATKSamRecordFactory();
final private static boolean USE_PARALLEL_LOADING = true;
/** Backing support for reads. */ /** Backing support for reads. */
protected final ReadProperties readProperties; protected final ReadProperties readProperties;
@ -714,26 +715,45 @@ public class SAMDataSource {
* @param validationStringency validation stringency. * @param validationStringency validation stringency.
*/ */
public SAMReaders(Collection<SAMReaderID> readerIDs, SAMFileReader.ValidationStringency validationStringency) { public SAMReaders(Collection<SAMReaderID> readerIDs, SAMFileReader.ValidationStringency validationStringency) {
final int N_THREADS = 8; final int totalNumberOfFiles = readerIDs.size();
int totalNumberOfFiles = readerIDs.size();
int readerNumber = 1; int readerNumber = 1;
final SimpleTimer timer = new SimpleTimer().start();
ExecutorService executor = Executors.newFixedThreadPool(N_THREADS); logger.info("Initializing SAMRecords " + (USE_PARALLEL_LOADING ? "in parallel" : "in serial"));
if ( ! USE_PARALLEL_LOADING ) {
final int tickSize = 10;
int nExecutedTotal = 0;
long lastTick = timer.currentTime();
for(final SAMReaderID readerID: readerIDs) {
final ReaderInitializer init = new ReaderInitializer(readerID).call();
if (threadAllocation.getNumIOThreads() > 0) {
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
}
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile));
readers.put(init.readerID,init.reader);
if ( nExecutedTotal++ % tickSize == 0) {
int tickInMS = (int)((timer.currentTime() - lastTick) / 1000.0);
printReaderPerformance(nExecutedTotal, tickSize, totalNumberOfFiles, timer, tickInMS);
}
}
} else {
final int N_THREADS = 8;
final ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
final List<ReaderInitializer> inits = new ArrayList<ReaderInitializer>(totalNumberOfFiles); final List<ReaderInitializer> inits = new ArrayList<ReaderInitializer>(totalNumberOfFiles);
Queue<Future<ReaderInitializer>> futures = new LinkedList<Future<ReaderInitializer>>(); Queue<Future<ReaderInitializer>> futures = new LinkedList<Future<ReaderInitializer>>();
for (SAMReaderID readerID: readerIDs) { for (final SAMReaderID readerID: readerIDs) {
logger.debug("Enqueuing for initialization: " + readerID.samFile); logger.debug("Enqueuing for initialization: " + readerID.samFile);
final ReaderInitializer init = new ReaderInitializer(readerID); final ReaderInitializer init = new ReaderInitializer(readerID);
inits.add(init); inits.add(init);
futures.add(executor.submit(init)); futures.add(executor.submit(init));
} }
final SimpleTimer timer = new SimpleTimer();
try { try {
final int MAX_WAIT = 30 * 1000; final int MAX_WAIT = 30 * 1000;
final int MIN_WAIT = 1 * 1000; final int MIN_WAIT = 1 * 1000;
timer.start();
while ( ! futures.isEmpty() ) { while ( ! futures.isEmpty() ) {
final int prevSize = futures.size(); final int prevSize = futures.size();
final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file
@ -754,18 +774,9 @@ public class SAMDataSource {
} }
} }
final int pendingSize = pending.size(); final int nExecutedTotal = totalNumberOfFiles - pending.size();
final int nExecutedInTick = prevSize - pendingSize; final int nExecutedInTick = prevSize - pending.size();
final int nExecutedTotal = totalNumberOfFiles - pendingSize; printReaderPerformance(nExecutedTotal, nExecutedInTick, totalNumberOfFiles, timer, (int)(waitTimeInMS / 1000));
final double totalTimeInSeconds = timer.getElapsedTime();
final double nTasksPerSecond = nExecutedTotal / (1.0*totalTimeInSeconds);
final int nRemaining = pendingSize;
final double estTimeToComplete = pendingSize / nTasksPerSecond;
logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m",
nExecutedInTick, (int)(waitTimeInMS / 1000.0),
nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond,
nRemaining, estTimeToComplete, estTimeToComplete / 60));
futures = pending; futures = pending;
} }
} catch ( InterruptedException e ) { } catch ( InterruptedException e ) {
@ -774,10 +785,28 @@ public class SAMDataSource {
throw new ReviewedStingException("Execution exception during SAMReader initialization", e); throw new ReviewedStingException("Execution exception during SAMReader initialization", e);
} }
logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime()));
executor.shutdown(); executor.shutdown();
} }
logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime()));
}
final private void printReaderPerformance(final int nExecutedTotal,
final int nExecutedInTick,
final int totalNumberOfFiles,
final SimpleTimer timer, final int tickDuration) {
final int pendingSize = totalNumberOfFiles - nExecutedTotal;
final double totalTimeInSeconds = timer.getElapsedTime();
final double nTasksPerSecond = nExecutedTotal / (1.0*totalTimeInSeconds);
final int nRemaining = pendingSize;
final double estTimeToComplete = pendingSize / nTasksPerSecond;
logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m",
nExecutedInTick, tickDuration,
nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond,
nRemaining, estTimeToComplete, estTimeToComplete / 60));
}
/** /**
* Retrieve the reader from the data structure. * Retrieve the reader from the data structure.
* @param id The ID of the reader to retrieve. * @param id The ID of the reader to retrieve.