diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java index d70c63bd2..aacdf9b95 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java @@ -41,6 +41,7 @@ import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocSortedSet; +import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.baq.BAQ; import org.broadinstitute.sting.utils.baq.BAQSamIterator; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -51,6 +52,7 @@ import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.*; +import java.util.concurrent.*; /** * User: aaron @@ -199,7 +201,7 @@ public class SAMDataSource { BAQ.QualityMode.DONT_MODIFY, null, // no BAQ (byte) -1); - } + } /** * Create a new SAM data source given the supplied read metadata. @@ -253,17 +255,15 @@ public class SAMDataSource { if(readBufferSize != null) ReadShard.setReadBufferSize(readBufferSize); - for (SAMReaderID readerID : samFiles) { - if (!readerID.samFile.canRead()) - throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " + - "Please check that the file is present and readable and try again."); - } - resourcePool = new SAMResourcePool(Integer.MAX_VALUE); SAMReaders readers = resourcePool.getAvailableReaders(); // Determine the sort order. for(SAMReaderID readerID: readerIDs) { + if (! readerID.samFile.canRead() ) + throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " + + "Please check that the file is present and readable and try again."); + // Get the sort order, forcing it to coordinate if unsorted. SAMFileReader reader = readers.getReader(readerID); SAMFileHeader header = reader.getFileHeader(); @@ -711,29 +711,68 @@ public class SAMDataSource { * @param validationStringency validation stringency. */ public SAMReaders(Collection readerIDs, SAMFileReader.ValidationStringency validationStringency) { + final int N_THREADS = 8; int totalNumberOfFiles = readerIDs.size(); int readerNumber = 1; - for(SAMReaderID readerID: readerIDs) { - File indexFile = findIndexFile(readerID.samFile); - SAMFileReader reader = null; - - if(threadAllocation.getNumIOThreads() > 0) { - BlockInputStream blockInputStream = new BlockInputStream(dispatcher,readerID,false); - reader = new SAMFileReader(blockInputStream,indexFile,false); - inputStreams.put(readerID,blockInputStream); - } - else - reader = new SAMFileReader(readerID.samFile,indexFile,false); - reader.setSAMRecordFactory(factory); - - reader.enableFileSource(true); - reader.setValidationStringency(validationStringency); - - logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile)); - - readers.put(readerID,reader); + ExecutorService executor = Executors.newFixedThreadPool(N_THREADS); + final List inits = new ArrayList(totalNumberOfFiles); + Queue> futures = new LinkedList>(); + for (SAMReaderID readerID: readerIDs) { + logger.debug("Enqueuing for initialization: " + readerID.samFile); + final ReaderInitializer init = new ReaderInitializer(readerID); + inits.add(init); + futures.add(executor.submit(init)); } + + final SimpleTimer timer = new SimpleTimer(); + try { + final int MAX_WAIT = 30 * 1000; + final int MIN_WAIT = 1 * 1000; + + timer.start(); + while ( ! futures.isEmpty() ) { + final int prevSize = futures.size(); + final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file + final int waitTimeInMS = Math.min(MAX_WAIT, Math.max((int) (waitTime * 1000), MIN_WAIT)); + Thread.sleep(waitTimeInMS); + + Queue> pending = new LinkedList>(); + for ( final Future initFuture : futures ) { + if ( initFuture.isDone() ) { + final ReaderInitializer init = initFuture.get(); + if (threadAllocation.getNumIOThreads() > 0) { + inputStreams.put(init.readerID, init.blockInputStream); // get from initializer + } + logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, init.readerID)); + readers.put(init.readerID, init.reader); + } else { + pending.add(initFuture); + } + } + + final int pendingSize = pending.size(); + final int nExecutedInTick = prevSize - pendingSize; + final int nExecutedTotal = totalNumberOfFiles - pendingSize; + final double totalTimeInSeconds = timer.getElapsedTime(); + final double nTasksPerSecond = nExecutedTotal / (1.0*totalTimeInSeconds); + final int nRemaining = pendingSize; + final double estTimeToComplete = pendingSize / nTasksPerSecond; + logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m", + nExecutedInTick, (int)(waitTimeInMS / 1000.0), + nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond, + nRemaining, estTimeToComplete, estTimeToComplete / 60)); + + futures = pending; + } + } catch ( InterruptedException e ) { + throw new ReviewedStingException("Interrupted SAMReader initialization", e); + } catch ( ExecutionException e ) { + throw new ReviewedStingException("Execution exception during SAMReader initialization", e); + } + + logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime())); + executor.shutdown(); } /** @@ -806,6 +845,30 @@ public class SAMDataSource { } } + class ReaderInitializer implements Callable { + final SAMReaderID readerID; + BlockInputStream blockInputStream = null; + SAMFileReader reader; + + public ReaderInitializer(final SAMReaderID readerID) { + this.readerID = readerID; + } + + public ReaderInitializer call() { + final File indexFile = findIndexFile(readerID.samFile); + if (threadAllocation.getNumIOThreads() > 0) { + blockInputStream = new BlockInputStream(dispatcher,readerID,false); + reader = new SAMFileReader(blockInputStream,indexFile,false); + } + else + reader = new SAMFileReader(readerID.samFile,indexFile,false); + reader.setSAMRecordFactory(factory); + reader.enableFileSource(true); + reader.setValidationStringency(validationStringency); + return this; + } + } + private class ReleasingIterator implements StingSAMIterator { /** * The resource acting as the source of the data. @@ -988,12 +1051,12 @@ public class SAMDataSource { return // Read ends on a later contig, or... read.getReferenceIndex() > intervalContigIndices[currentBound] || - // Read ends of this contig... - (read.getReferenceIndex() == intervalContigIndices[currentBound] && - // either after this location, or... - (read.getAlignmentEnd() >= intervalStarts[currentBound] || - // read is unmapped but positioned and alignment start is on or after this start point. - (read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound]))); + // Read ends of this contig... + (read.getReferenceIndex() == intervalContigIndices[currentBound] && + // either after this location, or... + (read.getAlignmentEnd() >= intervalStarts[currentBound] || + // read is unmapped but positioned and alignment start is on or after this start point. + (read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound]))); } /** @@ -1005,8 +1068,8 @@ public class SAMDataSource { return // Read starts on a prior contig, or... read.getReferenceIndex() < intervalContigIndices[currentBound] || - // Read starts on this contig and the alignment start is registered before this end point. - (read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]); + // Read starts on this contig and the alignment start is registered before this end point. + (read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]); } } diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java index 845c65c9c..85f7cc078 100644 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java @@ -410,6 +410,12 @@ public class GenotypesContext implements List { return getGenotypes().get(i); } + /** + * Gets sample associated with this sampleName, or null if none is found + * + * @param sampleName + * @return + */ public Genotype get(final String sampleName) { Integer offset = getSampleI(sampleName); return offset == null ? null : getGenotypes().get(offset); @@ -648,16 +654,15 @@ public class GenotypesContext implements List { @Ensures("result != null") public GenotypesContext subsetToSamples( final Set samples ) { final int nSamples = samples.size(); - final int nGenotypes = size(); - if ( nSamples == nGenotypes ) - return this; - else if ( nSamples == 0 ) + if ( nSamples == 0 ) return NO_GENOTYPES; else { // nGenotypes < nSamples final GenotypesContext subset = create(samples.size()); for ( final String sample : samples ) { - subset.add(get(sample)); + final Genotype g = get(sample); + if ( g != null ) + subset.add(g); } return subset; } diff --git a/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java index fca7440e4..0e75eee14 100755 --- a/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java @@ -704,11 +704,14 @@ public class VariantContextUnitTest extends BaseTest { public Object[][] MakeSubContextTest() { for ( boolean updateAlleles : Arrays.asList(true, false)) { new SubContextTest(Collections.emptySet(), updateAlleles); + new SubContextTest(Collections.singleton("MISSING"), updateAlleles); new SubContextTest(Collections.singleton("AA"), updateAlleles); new SubContextTest(Collections.singleton("AT"), updateAlleles); new SubContextTest(Collections.singleton("TT"), updateAlleles); new SubContextTest(Arrays.asList("AA", "AT"), updateAlleles); new SubContextTest(Arrays.asList("AA", "AT", "TT"), updateAlleles); + new SubContextTest(Arrays.asList("AA", "AT", "MISSING"), updateAlleles); + new SubContextTest(Arrays.asList("AA", "AT", "TT", "MISSING"), updateAlleles); } return SubContextTest.getTests(SubContextTest.class);