Merge branch 'master' of ssh://nickel.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable

This commit is contained in:
Eric Banks 2011-12-14 16:24:56 -05:00
commit de5928ac5a
3 changed files with 110 additions and 39 deletions

View File

@ -41,6 +41,7 @@ import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.baq.BAQ;
import org.broadinstitute.sting.utils.baq.BAQSamIterator;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
@ -51,6 +52,7 @@ import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.*;
/**
* User: aaron
@ -199,7 +201,7 @@ public class SAMDataSource {
BAQ.QualityMode.DONT_MODIFY,
null, // no BAQ
(byte) -1);
}
}
/**
* Create a new SAM data source given the supplied read metadata.
@ -253,17 +255,15 @@ public class SAMDataSource {
if(readBufferSize != null)
ReadShard.setReadBufferSize(readBufferSize);
for (SAMReaderID readerID : samFiles) {
if (!readerID.samFile.canRead())
throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " +
"Please check that the file is present and readable and try again.");
}
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
SAMReaders readers = resourcePool.getAvailableReaders();
// Determine the sort order.
for(SAMReaderID readerID: readerIDs) {
if (! readerID.samFile.canRead() )
throw new UserException.CouldNotReadInputFile(readerID.samFile,"file is not present or user does not have appropriate permissions. " +
"Please check that the file is present and readable and try again.");
// Get the sort order, forcing it to coordinate if unsorted.
SAMFileReader reader = readers.getReader(readerID);
SAMFileHeader header = reader.getFileHeader();
@ -711,29 +711,68 @@ public class SAMDataSource {
* @param validationStringency validation stringency.
*/
public SAMReaders(Collection<SAMReaderID> readerIDs, SAMFileReader.ValidationStringency validationStringency) {
final int N_THREADS = 8;
int totalNumberOfFiles = readerIDs.size();
int readerNumber = 1;
for(SAMReaderID readerID: readerIDs) {
File indexFile = findIndexFile(readerID.samFile);
SAMFileReader reader = null;
if(threadAllocation.getNumIOThreads() > 0) {
BlockInputStream blockInputStream = new BlockInputStream(dispatcher,readerID,false);
reader = new SAMFileReader(blockInputStream,indexFile,false);
inputStreams.put(readerID,blockInputStream);
}
else
reader = new SAMFileReader(readerID.samFile,indexFile,false);
reader.setSAMRecordFactory(factory);
reader.enableFileSource(true);
reader.setValidationStringency(validationStringency);
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile));
readers.put(readerID,reader);
ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
final List<ReaderInitializer> inits = new ArrayList<ReaderInitializer>(totalNumberOfFiles);
Queue<Future<ReaderInitializer>> futures = new LinkedList<Future<ReaderInitializer>>();
for (SAMReaderID readerID: readerIDs) {
logger.debug("Enqueuing for initialization: " + readerID.samFile);
final ReaderInitializer init = new ReaderInitializer(readerID);
inits.add(init);
futures.add(executor.submit(init));
}
final SimpleTimer timer = new SimpleTimer();
try {
final int MAX_WAIT = 30 * 1000;
final int MIN_WAIT = 1 * 1000;
timer.start();
while ( ! futures.isEmpty() ) {
final int prevSize = futures.size();
final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file
final int waitTimeInMS = Math.min(MAX_WAIT, Math.max((int) (waitTime * 1000), MIN_WAIT));
Thread.sleep(waitTimeInMS);
Queue<Future<ReaderInitializer>> pending = new LinkedList<Future<ReaderInitializer>>();
for ( final Future<ReaderInitializer> initFuture : futures ) {
if ( initFuture.isDone() ) {
final ReaderInitializer init = initFuture.get();
if (threadAllocation.getNumIOThreads() > 0) {
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
}
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, init.readerID));
readers.put(init.readerID, init.reader);
} else {
pending.add(initFuture);
}
}
final int pendingSize = pending.size();
final int nExecutedInTick = prevSize - pendingSize;
final int nExecutedTotal = totalNumberOfFiles - pendingSize;
final double totalTimeInSeconds = timer.getElapsedTime();
final double nTasksPerSecond = nExecutedTotal / (1.0*totalTimeInSeconds);
final int nRemaining = pendingSize;
final double estTimeToComplete = pendingSize / nTasksPerSecond;
logger.info(String.format("Init %d BAMs in last %d s, %d of %d in %.2f s / %.2f m (%.2f tasks/s). %d remaining with est. completion in %.2f s / %.2f m",
nExecutedInTick, (int)(waitTimeInMS / 1000.0),
nExecutedTotal, totalNumberOfFiles, totalTimeInSeconds, totalTimeInSeconds / 60, nTasksPerSecond,
nRemaining, estTimeToComplete, estTimeToComplete / 60));
futures = pending;
}
} catch ( InterruptedException e ) {
throw new ReviewedStingException("Interrupted SAMReader initialization", e);
} catch ( ExecutionException e ) {
throw new ReviewedStingException("Execution exception during SAMReader initialization", e);
}
logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime()));
executor.shutdown();
}
/**
@ -806,6 +845,30 @@ public class SAMDataSource {
}
}
class ReaderInitializer implements Callable<ReaderInitializer> {
final SAMReaderID readerID;
BlockInputStream blockInputStream = null;
SAMFileReader reader;
public ReaderInitializer(final SAMReaderID readerID) {
this.readerID = readerID;
}
public ReaderInitializer call() {
final File indexFile = findIndexFile(readerID.samFile);
if (threadAllocation.getNumIOThreads() > 0) {
blockInputStream = new BlockInputStream(dispatcher,readerID,false);
reader = new SAMFileReader(blockInputStream,indexFile,false);
}
else
reader = new SAMFileReader(readerID.samFile,indexFile,false);
reader.setSAMRecordFactory(factory);
reader.enableFileSource(true);
reader.setValidationStringency(validationStringency);
return this;
}
}
private class ReleasingIterator implements StingSAMIterator {
/**
* The resource acting as the source of the data.
@ -988,12 +1051,12 @@ public class SAMDataSource {
return
// Read ends on a later contig, or...
read.getReferenceIndex() > intervalContigIndices[currentBound] ||
// Read ends of this contig...
(read.getReferenceIndex() == intervalContigIndices[currentBound] &&
// either after this location, or...
(read.getAlignmentEnd() >= intervalStarts[currentBound] ||
// read is unmapped but positioned and alignment start is on or after this start point.
(read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound])));
// Read ends of this contig...
(read.getReferenceIndex() == intervalContigIndices[currentBound] &&
// either after this location, or...
(read.getAlignmentEnd() >= intervalStarts[currentBound] ||
// read is unmapped but positioned and alignment start is on or after this start point.
(read.getReadUnmappedFlag() && read.getAlignmentStart() >= intervalStarts[currentBound])));
}
/**
@ -1005,8 +1068,8 @@ public class SAMDataSource {
return
// Read starts on a prior contig, or...
read.getReferenceIndex() < intervalContigIndices[currentBound] ||
// Read starts on this contig and the alignment start is registered before this end point.
(read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]);
// Read starts on this contig and the alignment start is registered before this end point.
(read.getReferenceIndex() == intervalContigIndices[currentBound] && read.getAlignmentStart() <= intervalEnds[currentBound]);
}
}

View File

@ -410,6 +410,12 @@ public class GenotypesContext implements List<Genotype> {
return getGenotypes().get(i);
}
/**
* Gets sample associated with this sampleName, or null if none is found
*
* @param sampleName
* @return
*/
public Genotype get(final String sampleName) {
Integer offset = getSampleI(sampleName);
return offset == null ? null : getGenotypes().get(offset);
@ -648,16 +654,15 @@ public class GenotypesContext implements List<Genotype> {
@Ensures("result != null")
public GenotypesContext subsetToSamples( final Set<String> samples ) {
final int nSamples = samples.size();
final int nGenotypes = size();
if ( nSamples == nGenotypes )
return this;
else if ( nSamples == 0 )
if ( nSamples == 0 )
return NO_GENOTYPES;
else { // nGenotypes < nSamples
final GenotypesContext subset = create(samples.size());
for ( final String sample : samples ) {
subset.add(get(sample));
final Genotype g = get(sample);
if ( g != null )
subset.add(g);
}
return subset;
}

View File

@ -704,11 +704,14 @@ public class VariantContextUnitTest extends BaseTest {
public Object[][] MakeSubContextTest() {
for ( boolean updateAlleles : Arrays.asList(true, false)) {
new SubContextTest(Collections.<String>emptySet(), updateAlleles);
new SubContextTest(Collections.singleton("MISSING"), updateAlleles);
new SubContextTest(Collections.singleton("AA"), updateAlleles);
new SubContextTest(Collections.singleton("AT"), updateAlleles);
new SubContextTest(Collections.singleton("TT"), updateAlleles);
new SubContextTest(Arrays.asList("AA", "AT"), updateAlleles);
new SubContextTest(Arrays.asList("AA", "AT", "TT"), updateAlleles);
new SubContextTest(Arrays.asList("AA", "AT", "MISSING"), updateAlleles);
new SubContextTest(Arrays.asList("AA", "AT", "TT", "MISSING"), updateAlleles);
}
return SubContextTest.getTests(SubContextTest.class);