Bugfix for incorrect error messages for missing BAMs and VCFs
-- Missing BAMs were appearing as StingExceptions -- Missing VCFs were showing up as CommandLineErrors, but it's clearer for them to be CouldNotReadInputFile exceptions -- Added integration tests to ensure missing BAMs, VCFs, and -L files are properly thrown as CouldNotReadInputFile exceptions -- Added path to standard b37 BAM to BaseTest -- Cleaned up code in SAMDataSource, removing my parallel loading code as this just didn't prove to be useful.
This commit is contained in:
parent
d5199db8ec
commit
80a4ce0edf
|
|
@ -436,9 +436,12 @@ class RodBindingArgumentTypeDescriptor extends ArgumentTypeDescriptor {
|
|||
String.format("Failed to parse value %s for argument %s.",
|
||||
value, source.field.getName()));
|
||||
} catch (Exception e) {
|
||||
throw new UserException.CommandLineException(
|
||||
String.format("Failed to parse value %s for argument %s. Message: %s",
|
||||
value, source.field.getName(), e.getMessage()));
|
||||
if ( e instanceof UserException )
|
||||
throw ((UserException)e);
|
||||
else
|
||||
throw new UserException.CommandLineException(
|
||||
String.format("Failed to parse value %s for argument %s. Message: %s",
|
||||
value, source.field.getName(), e.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import net.sf.picard.sam.MergingSamRecordIterator;
|
|||
import net.sf.picard.sam.SamFileHeaderMerger;
|
||||
import net.sf.samtools.*;
|
||||
import net.sf.samtools.util.CloseableIterator;
|
||||
import net.sf.samtools.util.RuntimeIOException;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.broadinstitute.sting.gatk.DownsamplingMethod;
|
||||
import org.broadinstitute.sting.gatk.ReadMetrics;
|
||||
|
|
@ -49,6 +50,7 @@ import org.broadinstitute.sting.utils.exceptions.UserException;
|
|||
import org.broadinstitute.sting.utils.sam.GATKSamRecordFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.*;
|
||||
|
|
@ -64,9 +66,6 @@ import java.util.concurrent.*;
|
|||
public class SAMDataSource {
|
||||
final private static GATKSamRecordFactory factory = new GATKSamRecordFactory();
|
||||
|
||||
/** If true, we will load SAMReaders in parallel */
|
||||
final private static boolean USE_PARALLEL_LOADING = false;
|
||||
|
||||
/** Backing support for reads. */
|
||||
protected final ReadProperties readProperties;
|
||||
|
||||
|
|
@ -726,74 +725,23 @@ public class SAMDataSource {
|
|||
int readerNumber = 1;
|
||||
final SimpleTimer timer = new SimpleTimer().start();
|
||||
|
||||
if ( totalNumberOfFiles > 0 ) logger.info("Initializing SAMRecords " + (USE_PARALLEL_LOADING ? "in parallel" : "in serial"));
|
||||
if ( ! USE_PARALLEL_LOADING ) {
|
||||
final int tickSize = 50;
|
||||
int nExecutedTotal = 0;
|
||||
long lastTick = timer.currentTime();
|
||||
for(final SAMReaderID readerID: readerIDs) {
|
||||
final ReaderInitializer init = new ReaderInitializer(readerID).call();
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
|
||||
}
|
||||
|
||||
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile));
|
||||
readers.put(init.readerID,init.reader);
|
||||
if ( ++nExecutedTotal % tickSize == 0) {
|
||||
double tickInSec = (timer.currentTime() - lastTick) / 1000.0;
|
||||
printReaderPerformance(nExecutedTotal, tickSize, totalNumberOfFiles, timer, tickInSec);
|
||||
lastTick = timer.currentTime();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final int N_THREADS = 8;
|
||||
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
|
||||
final List<ReaderInitializer> inits = new ArrayList<ReaderInitializer>(totalNumberOfFiles);
|
||||
Queue<Future<ReaderInitializer>> futures = new LinkedList<Future<ReaderInitializer>>();
|
||||
for (final SAMReaderID readerID: readerIDs) {
|
||||
logger.debug("Enqueuing for initialization: " + readerID.samFile);
|
||||
final ReaderInitializer init = new ReaderInitializer(readerID);
|
||||
inits.add(init);
|
||||
futures.add(executor.submit(init));
|
||||
if ( totalNumberOfFiles > 0 ) logger.info("Initializing SAMRecords in serial");
|
||||
final int tickSize = 50;
|
||||
int nExecutedTotal = 0;
|
||||
long lastTick = timer.currentTime();
|
||||
for(final SAMReaderID readerID: readerIDs) {
|
||||
final ReaderInitializer init = new ReaderInitializer(readerID).call();
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
|
||||
}
|
||||
|
||||
try {
|
||||
final int MAX_WAIT = 30 * 1000;
|
||||
final int MIN_WAIT = 1 * 1000;
|
||||
|
||||
while ( ! futures.isEmpty() ) {
|
||||
final int prevSize = futures.size();
|
||||
final double waitTime = prevSize * (0.5 / N_THREADS); // about 0.5 seconds to load each file
|
||||
final int waitTimeInMS = Math.min(MAX_WAIT, Math.max((int) (waitTime * 1000), MIN_WAIT));
|
||||
Thread.sleep(waitTimeInMS);
|
||||
|
||||
Queue<Future<ReaderInitializer>> pending = new LinkedList<Future<ReaderInitializer>>();
|
||||
for ( final Future<ReaderInitializer> initFuture : futures ) {
|
||||
if ( initFuture.isDone() ) {
|
||||
final ReaderInitializer init = initFuture.get();
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
inputStreams.put(init.readerID, init.blockInputStream); // get from initializer
|
||||
}
|
||||
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, init.readerID));
|
||||
readers.put(init.readerID, init.reader);
|
||||
} else {
|
||||
pending.add(initFuture);
|
||||
}
|
||||
}
|
||||
|
||||
final int nExecutedTotal = totalNumberOfFiles - pending.size();
|
||||
final int nExecutedInTick = prevSize - pending.size();
|
||||
printReaderPerformance(nExecutedTotal, nExecutedInTick, totalNumberOfFiles, timer, waitTimeInMS / 1000.0);
|
||||
futures = pending;
|
||||
}
|
||||
} catch ( InterruptedException e ) {
|
||||
throw new ReviewedStingException("Interrupted SAMReader initialization", e);
|
||||
} catch ( ExecutionException e ) {
|
||||
throw new ReviewedStingException("Execution exception during SAMReader initialization", e);
|
||||
logger.debug(String.format("Processing file (%d of %d) %s...", readerNumber++, totalNumberOfFiles, readerID.samFile));
|
||||
readers.put(init.readerID,init.reader);
|
||||
if ( ++nExecutedTotal % tickSize == 0) {
|
||||
double tickInSec = (timer.currentTime() - lastTick) / 1000.0;
|
||||
printReaderPerformance(nExecutedTotal, tickSize, totalNumberOfFiles, timer, tickInSec);
|
||||
lastTick = timer.currentTime();
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
if ( totalNumberOfFiles > 0 ) logger.info(String.format("Done initializing BAM readers: total time %.2f", timer.getElapsedTime()));
|
||||
|
|
@ -913,12 +861,19 @@ public class SAMDataSource {
|
|||
|
||||
public ReaderInitializer call() {
|
||||
final File indexFile = findIndexFile(readerID.samFile);
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
blockInputStream = new BlockInputStream(dispatcher,readerID,false);
|
||||
reader = new SAMFileReader(blockInputStream,indexFile,false);
|
||||
try {
|
||||
if (threadAllocation.getNumIOThreads() > 0) {
|
||||
blockInputStream = new BlockInputStream(dispatcher,readerID,false);
|
||||
reader = new SAMFileReader(blockInputStream,indexFile,false);
|
||||
}
|
||||
else
|
||||
reader = new SAMFileReader(readerID.samFile,indexFile,false);
|
||||
} catch ( RuntimeIOException e ) {
|
||||
if ( e.getCause() != null && e.getCause() instanceof FileNotFoundException )
|
||||
throw new UserException.CouldNotReadInputFile(readerID.samFile, e);
|
||||
else
|
||||
throw e;
|
||||
}
|
||||
else
|
||||
reader = new SAMFileReader(readerID.samFile,indexFile,false);
|
||||
reader.setSAMRecordFactory(factory);
|
||||
reader.enableFileSource(true);
|
||||
reader.setValidationStringency(validationStringency);
|
||||
|
|
|
|||
|
|
@ -52,6 +52,8 @@ public abstract class BaseTest {
|
|||
public static final String comparisonDataLocation = GATKDataLocation + "Comparisons/";
|
||||
public static final String annotationDataLocation = GATKDataLocation + "Annotations/";
|
||||
|
||||
public static final String b37GoodBAM = validationDataLocation + "/CEUTrio.HiSeq.b37.chr20.10_11mb.bam";
|
||||
|
||||
public static final String refseqAnnotationLocation = annotationDataLocation + "refseq/";
|
||||
public static final String hg18Refseq = refseqAnnotationLocation + "refGene-big-table-hg18.txt";
|
||||
public static final String hg19Refseq = refseqAnnotationLocation + "refGene-big-table-hg19.txt";
|
||||
|
|
|
|||
|
|
@ -54,4 +54,23 @@ public class EngineFeaturesIntegrationTest extends WalkerTest {
|
|||
@Test() private void testBadRODBindingInputTypeUnknownType() {
|
||||
testBadRODBindingInput("bedXXX", "Unknown input to VCF expecting walker", UserException.UnknownTribbleType.class);
|
||||
}
|
||||
|
||||
private void testMissingFile(String name, String missingBinding) {
|
||||
WalkerTestSpec spec = new WalkerTestSpec(missingBinding + " -R " + b37KGReference + " -o %s",
|
||||
1, UserException.CouldNotReadInputFile.class);
|
||||
executeTest(name, spec);
|
||||
}
|
||||
|
||||
@Test() private void testMissingBAMnt1() {
|
||||
testMissingFile("missing BAM", "-T UnifiedGenotyper -I missing.bam -nt 1");
|
||||
}
|
||||
@Test() private void testMissingBAMnt4() {
|
||||
testMissingFile("missing BAM", "-T UnifiedGenotyper -I missing.bam -nt 4");
|
||||
}
|
||||
@Test() private void testMissingVCF() {
|
||||
testMissingFile("missing VCF", "-T SelectVariants -V missing.vcf");
|
||||
}
|
||||
@Test() private void testMissingInterval() {
|
||||
testMissingFile("missing interval", "-T UnifiedGenotyper -L missing.interval_list -I " + b37GoodBAM);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue