From 71b4bb12b7903e1542334b80a96ea3345a486967 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 14 Dec 2011 09:58:23 -0500 Subject: [PATCH 2/4] Bug fix for incorrect logic in subsetSamples -- Now properly handles the case where a sample isn't present (no longer adds a null to the genotypes list) -- Fix for logic failure where if the number of requested samples equals the number of known genotypes then all of the records were returned, which isn't correct when there are missing samples. -- Unit tests added to handle these cases --- .../utils/variantcontext/GenotypesContext.java | 15 ++++++++++----- .../variantcontext/VariantContextUnitTest.java | 3 +++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java b/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java index 845c65c9c..85f7cc078 100644 --- a/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java +++ b/public/java/src/org/broadinstitute/sting/utils/variantcontext/GenotypesContext.java @@ -410,6 +410,12 @@ public class GenotypesContext implements List { return getGenotypes().get(i); } + /** + * Gets sample associated with this sampleName, or null if none is found + * + * @param sampleName + * @return + */ public Genotype get(final String sampleName) { Integer offset = getSampleI(sampleName); return offset == null ? null : getGenotypes().get(offset); @@ -648,16 +654,15 @@ public class GenotypesContext implements List { @Ensures("result != null") public GenotypesContext subsetToSamples( final Set samples ) { final int nSamples = samples.size(); - final int nGenotypes = size(); - if ( nSamples == nGenotypes ) - return this; - else if ( nSamples == 0 ) + if ( nSamples == 0 ) return NO_GENOTYPES; else { // nGenotypes < nSamples final GenotypesContext subset = create(samples.size()); for ( final String sample : samples ) { - subset.add(get(sample)); + final Genotype g = get(sample); + if ( g != null ) + subset.add(g); } return subset; } diff --git a/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java index fca7440e4..0e75eee14 100755 --- a/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/variantcontext/VariantContextUnitTest.java @@ -704,11 +704,14 @@ public class VariantContextUnitTest extends BaseTest { public Object[][] MakeSubContextTest() { for ( boolean updateAlleles : Arrays.asList(true, false)) { new SubContextTest(Collections.emptySet(), updateAlleles); + new SubContextTest(Collections.singleton("MISSING"), updateAlleles); new SubContextTest(Collections.singleton("AA"), updateAlleles); new SubContextTest(Collections.singleton("AT"), updateAlleles); new SubContextTest(Collections.singleton("TT"), updateAlleles); new SubContextTest(Arrays.asList("AA", "AT"), updateAlleles); new SubContextTest(Arrays.asList("AA", "AT", "TT"), updateAlleles); + new SubContextTest(Arrays.asList("AA", "AT", "MISSING"), updateAlleles); + new SubContextTest(Arrays.asList("AA", "AT", "TT", "MISSING"), updateAlleles); } return SubContextTest.getTests(SubContextTest.class); From 01e547eed3b70f8c1c5fac507136264fe9f08200 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 14 Dec 2011 10:00:21 -0500 Subject: [PATCH 3/4] Parallel SAMDataSource initialization -- Uses 8 threads to load BAM files and indices in parallel, decreasing costs to read thousands of BAM files by a significant amount -- Added logger.info message noting progress and cost of reading low-level BAM data. --- .../gatk/datasources/reads/SAMDataSource.java | 131 +++++++++++++----- 1 file changed, 97 insertions(+), 34 deletions(-) diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java index d70c63bd2..aacdf9b95 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java @@ -41,6 +41,7 @@ import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocSortedSet; +import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.baq.BAQ; import org.broadinstitute.sting.utils.baq.BAQSamIterator; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; @@ -51,6 +52,7 @@ import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.*; +import java.util.concurrent.*; /** * User: aaron @@ -199,7 +201,7 @@ public class SAMDataSource { BAQ.QualityMode.DONT_MODIFY, null, // no BAQ (byte) -1); - } + } /** * Create a new SAM data source given the supplied read metadata. @@ -253,17 +255,15 @@ public class SAMDataSource { if(readBufferSize != null) ReadShard.setReadBufferSize(readBufferSize); - for (SAMReaderID readerID : samFiles) { - if (!readerID.samFile.canRead()) - throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " + - "Please check that the file is present and readable and try again."); - } - resourcePool = new SAMResourcePool(Integer.MAX_VALUE); SAMReaders readers = resourcePool.getAvailableReaders(); // Determine the sort order. for(SAMReaderID readerID: readerIDs) { + if (! readerID.samFile.canRead() ) + throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " + + "Please check that the file is present and readable and try again."); + // Get the sort order, forcing it to coordinate if unsorted. SAMFileReader reader = readers.getReader(readerID); SAMFileHeader header = reader.getFileHeader(); @@ -711,29 +711,68 @@ public class SAMDataSource { * @param validationStringency validation stringency. */ public SAMReaders(Collection readerIDs, SAMFileReader.ValidationStringency validationStringency) { + final int N_THREADS = 8; int totalNumberOfFiles = readerIDs.size(); int readerNumber = 1; - for(SAMReaderID readerID: readerIDs) { - File indexFile = findIndexFile(readerID.samFile); - SAMFileReader reader = null; - - if(threadAllocation.getNumIOThreads() > 0) { - BlockInputStream blockInputStream = new BlockInputStream(dispatcher,readerID,false); - reader = new SAMFileReader(blockInputStream,indexFile,false); - inputStreams.put(readerID,blockInputStream); - } - else - reader = new SAMFileReader(readerID.samFile,indexFile,false); - reader.setSAMRecordFactory(factory); - - reader.enableFileSource(true); - reader.setValidationStringency(validationStringency); - - logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile)); - - readers.put(readerID,reader); + ExecutorService executor = Executors.newFixedThreadPool(N_THREADS); + final List inits = new ArrayList(totalNumberOfFiles); + Queue> futures = new LinkedList>(); + for (SAMReaderID readerID: readerIDs) { + logger.debug("Enqueuing for initialization: " + readerID.samFile); + final ReaderInitializer init = new ReaderInitializer(readerID); + inits.add(init); + futures.add(executor.submit(init)); } + + final SimpleTimer timer = new SimpleTimer(); + try { + final int MAX_WAIT = 30 * 1000; + final int MIN_WAIT = 1 * 1000; + + timer.start(); + while ( ! futures.isEmpty() ) { + final int prevSize = futures.size(); + final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file + final int waitTimeInMS = Math.min(MAX_WAIT, Math.max((int) (waitTime * 1000), MIN_WAIT)); + Thread.sleep(waitTimeInMS); + + Queue> pending = new LinkedList>(); + for ( final Future initFuture : futures ) { + if ( initFuture.isDone() ) { + final ReaderInitializer init = initFuture.get(); + if (threadAllocation.getNumIOThreads() > 0) { + inputStreams.put(init.readerID, init.blockInputStream); // get from initializer + } + logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, init.readerID)); + readers.put(init.readerID, init.reader); + } else { + pending.add(initFuture); + } + } + + final int pendingSize = pending.size(); + final int nExecutedInTick = prevSize - pendingSize; + final int nExecutedTotal = totalNumberOfFiles - pendingSize; + final double totalTimeInSeconds = timer.getElapsedTime(); + final double nTasksPerSecond = nExecutedTotal / (1.0*totalTimeInSeconds); + final int nRemaining = pendingSize; + final double estTimeToComplete = pendingSize / nTasksPerSecond; + logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m", + nExecutedInTick, (int)(waitTimeInMS / 1000.0), + nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond, + nRemaining, estTimeToComplete, estTimeToComplete / 60)); + + futures = pending; + } + } catch ( InterruptedException e ) { + throw new ReviewedStingException("Interrupted SAMReader initialization", e); + } catch ( ExecutionException e ) { + throw new ReviewedStingException("Execution exception during SAMReader initialization", e); + } + + logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime())); + executor.shutdown(); } /** @@ -806,6 +845,30 @@ public class SAMDataSource { } } + class ReaderInitializer implements Callable { + final SAMReaderID readerID; + BlockInputStream blockInputStream = null; + SAMFileReader reader; + + public ReaderInitializer(final SAMReaderID readerID) { + this.readerID = readerID; + } + + public ReaderInitializer call() { + final File indexFile = findIndexFile(readerID.samFile); + if (threadAllocation.getNumIOThreads() > 0) { + blockInputStream = new BlockInputStream(dispatcher,readerID,false); + reader = new SAMFileReader(blockInputStream,indexFile,false); + } + else + reader = new SAMFileReader(readerID.samFile,indexFile,false); + reader.setSAMRecordFactory(factory); + reader.enableFileSource(true); + reader.setValidationStringency(validationStringency); + return this; + } + } + private class ReleasingIterator implements StingSAMIterator { /** * The resource acting as the source of the data. @@ -988,12 +1051,12 @@ public class SAMDataSource { return // Read ends on a later contig, or... read.getReferenceIndex() > intervalContigIndices[currentBound] || - // Read ends of this contig... - (read.getReferenceIndex() == intervalContigIndices[currentBound] && - // either after this location, or... - (read.getAlignmentEnd() >= intervalStarts[currentBound] || - // read is unmapped but positioned and alignment start is on or after this start point. - (read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound]))); + // Read ends of this contig... + (read.getReferenceIndex() == intervalContigIndices[currentBound] && + // either after this location, or... + (read.getAlignmentEnd() >= intervalStarts[currentBound] || + // read is unmapped but positioned and alignment start is on or after this start point. + (read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound]))); } /** @@ -1005,8 +1068,8 @@ public class SAMDataSource { return // Read starts on a prior contig, or... read.getReferenceIndex() < intervalContigIndices[currentBound] || - // Read starts on this contig and the alignment start is registered before this end point. - (read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]); + // Read starts on this contig and the alignment start is registered before this end point. + (read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]); } }