Parallel SAMDataSource initialization

-- Uses 8 threads to load BAM files and indices in parallel, significantly reducing the cost of reading thousands of BAM files
-- Added logger.info message noting progress and cost of reading low-level BAM data.
This commit is contained in:
Mark DePristo 2011-12-14 10:00:21 -05:00
parent 71b4bb12b7
commit 01e547eed3
1 changed file with 97 additions and 34 deletions

View File

@ -41,6 +41,7 @@ import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.baq.BAQ;
import org.broadinstitute.sting.utils.baq.BAQSamIterator;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
@ -51,6 +52,7 @@ import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
/**
* User: aaron
@ -199,7 +201,7 @@ public class SAMDataSource {
BAQ.QualityMode.DONT_MODIFY,
null, // no BAQ
(byte) -1);
}
}
/**
* Create a new SAM data source given the supplied read metadata.
@ -253,17 +255,15 @@ public class SAMDataSource {
if(readBufferSize != null)
ReadShard.setReadBufferSize(readBufferSize);
for (SAMReaderID readerID : samFiles) {
if (!readerID.samFile.canRead())
throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " +
"Please check that the file is present and readable and try again.");
}
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
SAMReaders readers = resourcePool.getAvailableReaders();
// Determine the sort order.
for(SAMReaderID readerID: readerIDs) {
if (! readerID.samFile.canRead() )
throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " +
"Please check that the file is present and readable and try again.");
// Get the sort order, forcing it to coordinate if unsorted.
SAMFileReader reader = readers.getReader(readerID);
SAMFileHeader header = reader.getFileHeader();
@ -711,29 +711,68 @@ public class SAMDataSource {
* @param validationStringency validation stringency.
*/
public SAMReaders(Collection<SAMReaderID> readerIDs, SAMFileReader.ValidationStringency validationStringency) {
    // Opens every requested reader on a fixed pool of N_THREADS worker threads, then polls the
    // outstanding futures, registering each finished reader (and its block input stream, when
    // I/O threads are in use) and logging progress after every polling tick.
    //
    // NOTE(review): the validationStringency argument is not read in this constructor;
    // ReaderInitializer.call() resolves the name `validationStringency` from its own enclosing
    // scope. Confirm that it sees the same value callers pass here.
    final int N_THREADS = 8;
    final int MAX_WAIT = 30 * 1000; // longest pause between polls, in milliseconds
    final int MIN_WAIT = 1 * 1000;  // shortest pause between polls, in milliseconds

    final int totalNumberOfFiles = readerIDs.size();
    int readerNumber = 1;

    final ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
    final SimpleTimer timer = new SimpleTimer();
    try {
        // Queue one initialization task per reader.
        Queue<Future<ReaderInitializer>> futures = new LinkedList<Future<ReaderInitializer>>();
        for (final SAMReaderID readerID : readerIDs) {
            logger.debug("Enqueuing for initialization: " + readerID.samFile);
            futures.add(executor.submit(new ReaderInitializer(readerID)));
        }

        timer.start();
        while (!futures.isEmpty()) {
            final int prevSize = futures.size();
            final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file
            final int waitTimeInMS = Math.min(MAX_WAIT, Math.max((int) (waitTime * 1000), MIN_WAIT));
            Thread.sleep(waitTimeInMS);

            // Harvest everything that finished during the pause; carry the rest to the next tick.
            final Queue<Future<ReaderInitializer>> pending = new LinkedList<Future<ReaderInitializer>>();
            for (final Future<ReaderInitializer> initFuture : futures) {
                if (!initFuture.isDone()) {
                    pending.add(initFuture);
                    continue;
                }

                // get() cannot block here; it rethrows any failure of the task as ExecutionException.
                final ReaderInitializer init = initFuture.get();
                if (threadAllocation.getNumIOThreads() > 0) {
                    inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
                }
                logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, init.readerID));
                readers.put(init.readerID, init.reader);
            }

            final int pendingSize = pending.size();
            final int nExecutedInTick = prevSize - pendingSize;
            final int nExecutedTotal = totalNumberOfFiles - pendingSize;
            final double totalTimeInSeconds = timer.getElapsedTime();
            final double nTasksPerSecond = nExecutedTotal / (1.0 * totalTimeInSeconds);
            final int nRemaining = pendingSize;
            final double estTimeToComplete = pendingSize / nTasksPerSecond;
            logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m",
                    nExecutedInTick, (int) (waitTimeInMS / 1000.0),
                    nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond,
                    nRemaining, estTimeToComplete, estTimeToComplete / 60));

            futures = pending;
        }

        logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime()));
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new ReviewedStingException("Interrupted SAMReader initialization", e);
    } catch (ExecutionException e) {
        throw new ReviewedStingException("Execution exception during SAMReader initialization", e);
    } finally {
        // Always release the worker threads. On the success path every task has already
        // completed; on a failure path this also cancels the tasks that are still queued.
        executor.shutdownNow();
    }
}
/**
@ -806,6 +845,30 @@ public class SAMDataSource {
}
}
/**
 * Task that opens the SAM file reader for a single reader ID.  Instances are
 * submitted to an executor; {@link #call()} returns the initializer itself so the
 * caller can pick up the reader ID, the reader and (if one was created) the block
 * input stream from the completed future.
 */
class ReaderInitializer implements Callable<ReaderInitializer> {
    /** The reader ID this task opens. */
    final SAMReaderID readerID;

    /** Stream backing the reader; stays null unless I/O threads are allocated. */
    BlockInputStream blockInputStream = null;

    /** The opened reader; assigned by {@link #call()}. */
    SAMFileReader reader;

    /**
     * Create an initializer for the given reader ID.  Nothing is opened until
     * {@link #call()} runs.
     * @param readerID the reader ID whose file should be opened.
     */
    public ReaderInitializer(final SAMReaderID readerID) {
        this.readerID = readerID;
    }

    /**
     * Locate the index for this reader's file, open the reader and configure it
     * with the record factory, file-source tracking and validation stringency.
     * @return this initializer, with {@code reader} populated.
     */
    public ReaderInitializer call() {
        final File bamIndex = findIndexFile(readerID.samFile);
        final boolean ioThreadsAllocated = threadAllocation.getNumIOThreads() > 0;

        if (!ioThreadsAllocated) {
            // No I/O threads: open the file directly.
            reader = new SAMFileReader(readerID.samFile, bamIndex, false);
        } else {
            // I/O threads in use: read through a block input stream tied to the dispatcher.
            blockInputStream = new BlockInputStream(dispatcher, readerID, false);
            reader = new SAMFileReader(blockInputStream, bamIndex, false);
        }

        reader.setSAMRecordFactory(factory);
        reader.enableFileSource(true);
        reader.setValidationStringency(validationStringency);

        return this;
    }
}
private class ReleasingIterator implements StingSAMIterator {
/**
* The resource acting as the source of the data.
@ -988,12 +1051,12 @@ public class SAMDataSource {
return
// Read ends on a later contig, or...
read.getReferenceIndex() > intervalContigIndices[currentBound] ||
// Read ends of this contig...
(read.getReferenceIndex() == intervalContigIndices[currentBound] &&
// either after this location, or...
(read.getAlignmentEnd() >= intervalStarts[currentBound] ||
// read is unmapped but positioned and alignment start is on or after this start point.
(read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound])));
// Read ends of this contig...
(read.getReferenceIndex() == intervalContigIndices[currentBound] &&
// either after this location, or...
(read.getAlignmentEnd() >= intervalStarts[currentBound] ||
// read is unmapped but positioned and alignment start is on or after this start point.
(read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound])));
}
/**
@ -1005,8 +1068,8 @@ public class SAMDataSource {
return
// Read starts on a prior contig, or...
read.getReferenceIndex() < intervalContigIndices[currentBound] ||
// Read starts on this contig and the alignment start is registered before this end point.
(read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]);
// Read starts on this contig and the alignment start is registered before this end point.
(read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]);
}
}