Merge pull request #191 from broadinstitute/dr_fix_rod_system_locking

Detect stuck lock-acquisition calls, and disable file locking for tests
This commit is contained in:
Mark DePristo 2013-04-25 09:32:54 -07:00
commit 528c3d083a
15 changed files with 451 additions and 310 deletions

1
.gitignore vendored
View File

@@ -1,7 +1,6 @@
/*.bam
/*.bai
/*.bed
*.idx
*~
/*.vcf
/*.txt

View File

@@ -865,7 +865,8 @@ public class GenomeAnalysisEngine {
SAMSequenceDictionary sequenceDictionary,
GenomeLocParser genomeLocParser,
ValidationExclusion.TYPE validationExclusionType) {
final RMDTrackBuilder builder = new RMDTrackBuilder(sequenceDictionary,genomeLocParser, validationExclusionType);
final RMDTrackBuilder builder = new RMDTrackBuilder(sequenceDictionary,genomeLocParser, validationExclusionType,
getArguments().disableAutoIndexCreationAndLockingWhenReadingRods);
final List<ReferenceOrderedDataSource> dataSources = new ArrayList<ReferenceOrderedDataSource>();
for (RMDTriplet fileDescriptor : referenceMetaDataFiles)

View File

@@ -274,6 +274,17 @@ public class GATKArgumentCollection {
@Argument(fullName = "unsafe", shortName = "U", doc = "If set, enables unsafe operations: nothing will be checked at runtime. For expert users only who know what they are doing. We do not support usage of this argument.", required = false)
public ValidationExclusion.TYPE unsafe;
@Hidden
@Advanced
@Argument(fullName = "disable_auto_index_creation_and_locking_when_reading_rods", shortName = "disable_auto_index_creation_and_locking_when_reading_rods",
doc = "UNSAFE FOR GENERAL USE (FOR TEST SUITE USE ONLY). Disable both auto-generation of index files and index file locking " +
"when reading VCFs and other rods and an index isn't present or is out-of-date. The file locking necessary for auto index " +
"generation to work safely is prone to random failures/hangs on certain platforms, which makes it desirable to disable it " +
"for situations like test suite runs where the indices are already known to exist, however this option is unsafe in general " +
"because it allows reading from index files without first acquiring a lock.",
required = false)
public boolean disableAutoIndexCreationAndLockingWhenReadingRods = false;
// --------------------------------------------------------------------------------------------------------------
//
// Multi-threading arguments

View File

@@ -44,7 +44,6 @@ import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.file.FSLockWithShared;
import org.broadinstitute.sting.utils.file.FileSystemInabilityToLockException;
import org.broadinstitute.sting.utils.instrumentation.Sizeof;
import java.io.File;
@@ -83,6 +82,10 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
private final FeatureManager featureManager;
// If true, do not attempt to create index files if they don't exist or are outdated, and don't
// make any file lock acquisition calls on the index files.
private final boolean disableAutoIndexCreation;
/**
* Construct an RMDTrackerBuilder, allowing the user to define tracks to build after-the-fact. This is generally
* used when walkers want to directly manage the ROD system for whatever reason. Before using this constructor,
@@ -90,14 +93,19 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @param dict Sequence dictionary to use.
* @param genomeLocParser Location parser to use.
* @param validationExclusionType Types of validations to exclude, for sequence dictionary verification.
* @param disableAutoIndexCreation Do not auto-create index files, and do not use file locking when accessing index files.
* UNSAFE in general (because it causes us not to lock index files before reading them) --
* suitable only for test suite use.
*/
public RMDTrackBuilder(final SAMSequenceDictionary dict,
final GenomeLocParser genomeLocParser,
ValidationExclusion.TYPE validationExclusionType) {
final ValidationExclusion.TYPE validationExclusionType,
final boolean disableAutoIndexCreation) {
this.dict = dict;
this.validationExclusionType = validationExclusionType;
this.genomeLocParser = genomeLocParser;
this.featureManager = new FeatureManager(GenomeAnalysisEngine.lenientVCFProcessing(validationExclusionType));
this.disableAutoIndexCreation = disableAutoIndexCreation;
}
/**
@@ -208,12 +216,15 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
// if we don't have a dictionary in the Tribble file, and we've set a dictionary for this builder, set it in the file if they match
if (sequenceDictionary.size() == 0 && dict != null) {
File indexFile = Tribble.indexFile(inputFile);
validateAndUpdateIndexSequenceDictionary(inputFile, index, dict);
try { // re-write the index
writeIndexToDisk(index,indexFile,new FSLockWithShared(indexFile));
} catch (IOException e) {
logger.warn("Unable to update index with the sequence dictionary for file " + indexFile + "; this will not effect your run of the GATK");
if ( ! disableAutoIndexCreation ) {
File indexFile = Tribble.indexFile(inputFile);
try { // re-write the index
writeIndexToDisk(index,indexFile,new FSLockWithShared(indexFile));
} catch (IOException e) {
logger.warn("Unable to update index with the sequence dictionary for file " + indexFile + "; this will not affect your run of the GATK");
}
}
sequenceDictionary = IndexDictionaryUtils.getSequenceDictionaryFromProperties(index);
@@ -225,7 +236,7 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
throw new UserException(e.getMessage());
}
catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(inputFile, "unable to write Tribble index", e);
throw new UserException("I/O error loading or writing tribble index file for " + inputFile.getAbsolutePath(), e);
}
}
else {
@@ -242,25 +253,36 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @return a linear index for the specified type
* @throws IOException if we cannot write the index file
*/
public synchronized Index loadIndex(File inputFile, FeatureCodec codec) throws IOException {
// create the index file name, locking on the index file name
File indexFile = Tribble.indexFile(inputFile);
FSLockWithShared lock = new FSLockWithShared(indexFile);
// acquire a lock on the file
public synchronized Index loadIndex( final File inputFile, final FeatureCodec codec) throws IOException {
final File indexFile = Tribble.indexFile(inputFile);
final FSLockWithShared lock = new FSLockWithShared(indexFile);
Index idx = null;
if (indexFile.canRead())
idx = attemptIndexFromDisk(inputFile, codec, indexFile, lock);
// if we managed to make an index, return
// If the index file exists and is readable, attempt to load it from disk. We'll get null back
// if a problem was discovered with the index file when it was inspected, and we'll get an
// in-memory index back in the case where the index file could not be locked.
if (indexFile.canRead()) {
idx = disableAutoIndexCreation ? loadFromDisk(inputFile, indexFile) // load without locking if we're in disableAutoIndexCreation mode
: attemptToLockAndLoadIndexFromDisk(inputFile, codec, indexFile, lock);
}
// If we have an index, it means we either loaded it from disk without issue or we created an in-memory
// index due to not being able to acquire a lock.
if (idx != null) return idx;
// we couldn't read the file, or we fell out of the conditions above, continue on to making a new index
return writeIndexToDisk(createIndexInMemory(inputFile, codec), indexFile, lock);
// We couldn't read the file, or we discovered a problem with the index file, so continue on to making a new index
idx = createIndexInMemory(inputFile, codec);
if ( ! disableAutoIndexCreation ) {
writeIndexToDisk(idx, indexFile, lock);
}
return idx;
}
/**
* attempt to read the index from disk
* Attempt to acquire a shared lock and then load the index from disk. Returns an in-memory index if
* a lock could not be obtained. Returns null if a problem was discovered with the index file when it
* was examined (eg., it was out-of-date).
*
* @param inputFile the input file
* @param codec the codec to read from
* @param indexFile the index file itself
@@ -268,20 +290,21 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @return an index, or null if we couldn't load one
* @throws IOException if we fail for FS issues
*/
protected Index attemptIndexFromDisk(File inputFile, FeatureCodec codec, File indexFile, FSLockWithShared lock) throws IOException {
boolean locked;
protected Index attemptToLockAndLoadIndexFromDisk( final File inputFile, final FeatureCodec codec, final File indexFile, final FSLockWithShared lock ) throws IOException {
boolean locked = false;
Index idx = null;
try {
locked = lock.sharedLock();
}
catch(FileSystemInabilityToLockException ex) {
throw new UserException.MissortedFile(inputFile, "Unexpected inability to lock exception", ex);
}
Index idx;
try {
if (!locked) // can't lock file
if ( ! locked ) { // can't lock file
logger.info(String.format("Could not acquire a shared lock on index file %s, falling back to using an in-memory index for this GATK run.",
indexFile.getAbsolutePath()));
idx = createIndexInMemory(inputFile, codec);
else
}
else {
idx = loadFromDisk(inputFile, indexFile);
}
} finally {
if (locked) lock.unlock();
}
@@ -294,7 +317,7 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @param indexFile the input file, plus the index extension
* @return an Index, or null if we're unable to load
*/
public static Index loadFromDisk(File inputFile, File indexFile) {
protected Index loadFromDisk( final File inputFile, final File indexFile ) {
logger.info("Loading Tribble index from disk for file " + inputFile);
Index index = IndexFactory.loadIndex(indexFile.getAbsolutePath());
@@ -302,14 +325,17 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
if (index.isCurrentVersion() && indexFile.lastModified() >= inputFile.lastModified())
return index;
else if (indexFile.lastModified() < inputFile.lastModified())
logger.warn("Index file " + indexFile + " is out of date (index older than input file), deleting and updating the index file");
logger.warn("Index file " + indexFile + " is out of date (index older than input file), " +
(disableAutoIndexCreation ? "falling back to an in-memory index" : "deleting and updating the index file"));
else // we've loaded an old version of the index, we want to remove it <-- currently not used, but may re-enable
logger.warn("Index file " + indexFile + " is out of date (old version), deleting and updating the index file");
logger.warn("Index file " + indexFile + " is out of date (old version), " +
(disableAutoIndexCreation ? "falling back to an in-memory index" : "deleting and updating the index file"));
// however we got here, remove the index and return null
boolean deleted = indexFile.delete();
if ( ! disableAutoIndexCreation ) {
boolean deleted = indexFile.delete();
if (!deleted) logger.warn("Index file " + indexFile + " is out of date, but could not be removed; it will not be trusted (we'll try to rebuild an in-memory copy)");
}
if (!deleted) logger.warn("Index file " + indexFile + " is out of date, but could not be removed; it will not be trusted (we'll try to rebuild an in-memory copy)");
return null;
}
@@ -319,13 +345,18 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @param index the index to write to disk
* @param indexFile the index file location
* @param lock the locking object
* @return the index object
* @throws IOException when unable to create the new index
*/
private static Index writeIndexToDisk(Index index, File indexFile, FSLockWithShared lock) throws IOException {
boolean locked = false; // could we exclusive lock the file?
private void writeIndexToDisk( final Index index, final File indexFile, final FSLockWithShared lock ) throws IOException {
if ( disableAutoIndexCreation ) {
return;
}
boolean locked = false;
try {
locked = lock.exclusiveLock(); // handle the case where we aren't locking anything
locked = lock.exclusiveLock();
if (locked) {
logger.info("Writing Tribble index to disk for file " + indexFile);
LittleEndianOutputStream stream = new LittleEndianOutputStream(new FileOutputStream(indexFile));
@@ -337,11 +368,6 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
try { logger.info(String.format(" Index for %s has size in bytes %d", indexFile, Sizeof.getObjectGraphSize(index))); }
catch ( ReviewedStingException e) { }
return index;
}
catch(FileSystemInabilityToLockException ex) {
throw new UserException.CouldNotCreateOutputFile(indexFile,"Unexpected inability to lock exception", ex);
}
finally {
if (locked) lock.unlock();
@@ -356,7 +382,7 @@ public class RMDTrackBuilder { // extends PluginManager<FeatureCodec> {
* @return a LinearIndex, given the file location
* @throws IOException when unable to create the index in memory
*/
private Index createIndexInMemory(File inputFile, FeatureCodec codec) {
protected Index createIndexInMemory(File inputFile, FeatureCodec codec) {
// this can take a while, let them know what we're doing
logger.info("Creating Tribble index in memory for file " + inputFile);
Index idx = IndexFactory.createDynamicIndex(inputFile, codec, IndexFactory.IndexBalanceApproach.FOR_SEEK_TIME);

View File

@@ -577,7 +577,8 @@ public class DepthOfCoverage extends LocusWalker<Map<DoCOutputType.Partition,Map
private LocationAwareSeekableRODIterator initializeRefSeq() {
RMDTrackBuilder builder = new RMDTrackBuilder(getToolkit().getReferenceDataSource().getReference().getSequenceDictionary(),
getToolkit().getGenomeLocParser(),
getToolkit().getArguments().unsafe);
getToolkit().getArguments().unsafe,
getToolkit().getArguments().disableAutoIndexCreationAndLockingWhenReadingRods);
RMDTrack refseq = builder.createInstanceOfTrack(RefSeqCodec.class,refSeqGeneList);
return new SeekableRODIterator(refseq.getHeader(),refseq.getSequenceDictionary(),getToolkit().getReferenceDataSource().getReference().getSequenceDictionary(),
getToolkit().getGenomeLocParser(),refseq.getIterator());

View File

@@ -193,7 +193,10 @@ public class VariantsToVCF extends RodWalker<Integer, Integer> {
if ( dbsnp == null )
throw new UserException.BadInput("No dbSNP rod was provided, but one is needed to decipher the correct indel alleles from the HapMap records");
RMDTrackBuilder builder = new RMDTrackBuilder(getToolkit().getReferenceDataSource().getReference().getSequenceDictionary(),getToolkit().getGenomeLocParser(),getToolkit().getArguments().unsafe);
RMDTrackBuilder builder = new RMDTrackBuilder(getToolkit().getReferenceDataSource().getReference().getSequenceDictionary(),
getToolkit().getGenomeLocParser(),
getToolkit().getArguments().unsafe,
getToolkit().getArguments().disableAutoIndexCreationAndLockingWhenReadingRods);
dbsnpIterator = builder.createInstanceOfTrack(VCFCodec.class, new File(dbsnp.dbsnp.getSource())).getIterator();
// Note that we should really use some sort of seekable iterator here so that the search doesn't take forever
// (but it's complicated because the hapmap location doesn't match the dbsnp location, so we don't know where to seek to)

View File

@@ -440,4 +440,21 @@ public class UserException extends ReviewedStingException {
f.getAbsolutePath(), PHONE_HOME_DOCS_URL));
}
}
/**
* A special exception that happens only in the case where
* the filesystem, by design or configuration, is completely unable
* to handle locking. This exception will specifically NOT be thrown
* in the case where the filesystem handles locking but is unable to
* acquire a lock due to concurrency.
*/
public static class FileSystemInabilityToLockException extends UserException {
public FileSystemInabilityToLockException( String message ) {
super(message);
}
public FileSystemInabilityToLockException( String message, Exception innerException ) {
super(message,innerException);
}
}
}

View File

@@ -26,15 +26,13 @@
package org.broadinstitute.sting.utils.file;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.channels.*;
import java.util.concurrent.*;
/**
* a quick implementation of a file based lock, using the Java NIO classes
@@ -52,125 +50,244 @@ public class FSLockWithShared {
// the file channel we open
private FileChannel channel = null;
/**
* A bit of experimental code for Siva at Partners. Conditionally throw an
* exception in the case where an unknown failure occurs, in an effort to stave
* off disabled nfs file locks.
*/
private boolean throwExceptionOnUnknownFailure = false;
// Timeout (in milliseconds) before we give up during non-blocking lock-acquisition calls.
// Necessary because these "non-blocking" calls can hang if there's a problem with the
// OS file locking support.
private int lockAcquisitionTimeout;
// Default value for lockAcquisitionTimeout when none is explicitly provided
public static final int DEFAULT_LOCK_ACQUISITION_TIMEOUT_IN_MILLISECONDS = 30 * 1000;
// Amount of time to wait when trying to shut down the lock-acquisition thread before giving up
public static final int THREAD_TERMINATION_TIMEOUT_IN_MILLISECONDS = 30 * 1000;
/**
* create a file system, given a base file to which a lock string gets appended.
* @param baseFile File descriptor of file to lock
* Create a lock associated with the specified File. Use the default lock
* acquisition timeout of 30 seconds.
*
* @param file file to lock
*/
public FSLockWithShared(File baseFile) {
file = baseFile;
}
public FSLockWithShared(File baseFile,boolean throwExceptionOnUnknownFailure) {
this(baseFile);
this.throwExceptionOnUnknownFailure = throwExceptionOnUnknownFailure;
public FSLockWithShared( final File file ) {
this.file = file;
lockAcquisitionTimeout = DEFAULT_LOCK_ACQUISITION_TIMEOUT_IN_MILLISECONDS;
}
/**
* Get a shared (read) lock on a file
* Cannot get shared lock if it does not exist
* @return boolean true if we obtained a lock
* @throws FileSystemInabilityToLockException in cases of unexpected failure to capture lock.
* Create a lock associated with the specified File, and set a custom lock
* acquisition timeout.
*
* @param file file to lock
* @param lockAcquisitionTimeout maximum number of milliseconds to wait during non-blocking
* lock acquisition calls before concluding that there's a
* problem with the OS file locking support and throwing an error.
*/
public boolean sharedLock() throws FileSystemInabilityToLockException {
public FSLockWithShared( final File file, final int lockAcquisitionTimeout ) {
this.file = file;
this.lockAcquisitionTimeout = lockAcquisitionTimeout;
}
/**
* Get a shared (read) lock on a file. Does not block, and returns immediately
* under normal conditions with the result of the lock acquisition attempt. Will
* throw an exception if there's a problem with the OS file locking support.
*
* @return boolean true if we obtained a lock, false if we failed to obtain one
*/
public boolean sharedLock() {
return acquireLockWithTimeout(true);
}
/**
* Get an exclusive (read-write) lock on a file. Does not block, and returns immediately
* under normal conditions with the result of the lock acquisition attempt. Will
* throw an exception if there's a problem with the OS file locking support.
*
* @return boolean true if we obtained a lock, false if we failed to obtain one
*/
public boolean exclusiveLock() {
return acquireLockWithTimeout(false);
}
/**
* Attempt to acquire a lock of the specified type on the file in a background thread.
* Uses non-blocking lock-acquisition calls that should return immediately, but may
* get stuck if there's a problem with the OS file locking support. If the call gets
* stuck and the timeout elapses, throws a UserException, since it's not safe to
* proceed with a stuck lock acquisition thread (and there's no way to reliably
* interrupt it once the underlying system call hangs).
*
* @param acquireSharedLock if true, request a shared lock rather than an exclusive lock
* @return true if a lock was acquired, false if we failed
*/
private boolean acquireLockWithTimeout( final boolean acquireSharedLock ) {
// Use daemon threads so that hopelessly stuck lock acquisition threads won't prevent the JVM from exiting
final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread( Runnable r ) {
Thread lockAcquisitionThread = new Thread(r);
lockAcquisitionThread.setDaemon(true);
return lockAcquisitionThread;
}
});
final FutureTask<Boolean> lockAcquisitionTask = new FutureTask<Boolean>(new LockAcquisitionTask(acquireSharedLock));
boolean lockAcquired = false;
// get read-only file channel
try {
channel = new RandomAccessFile(file, "r").getChannel();
executor.execute(lockAcquisitionTask);
// Wait at most lockAcquisitionTimeout milliseconds for the lock acquisition task to finish.
lockAcquired = lockAcquisitionTask.get(lockAcquisitionTimeout, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
logger.warn(String.format("WARNING: Unable to lock file %s (could not open read only file channel)",file.getAbsolutePath()));
return false;
// Lock acquisition timeout elapsed. Since we're using NON-BLOCKING lock-acquisition calls,
// this implies that there's a problem with the OS locking daemon, or locks are not supported.
// Since it's not safe to proceed with a potentially stuck lock acquisition thread, we need to
// shut down the JVM in order to kill it.
catch ( TimeoutException e ) {
throw new UserException.FileSystemInabilityToLockException(
String.format("Timeout of %d milliseconds was reached while trying to acquire a lock on file %s. " +
"Since the GATK uses non-blocking lock acquisition calls that are not supposed to wait, " +
"this implies a problem with the file locking support in your operating system.",
lockAcquisitionTimeout, file.getAbsolutePath()));
}
// get shared lock (third argument is true)
// Lock acquisition thread threw an exception. Need to unpack it via e.getCause()
catch ( ExecutionException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because exception %s occurred with error message %s",
file.getAbsolutePath(),
e.getCause() != null ? e.getCause().getClass().getSimpleName() : "unknown",
e.getCause() != null ? e.getCause().getMessage() : "none"));
lockAcquired = false;
}
// Interrupted while waiting for the lock acquisition thread -- not likely to happen
catch ( InterruptedException e ) {
logger.warn(String.format("WARNING: interrupted while attempting to acquire a lock for file %s", file.getAbsolutePath()));
lockAcquired = false;
}
catch ( Exception e ) {
logger.warn(String.format("WARNING: error while attempting to acquire a lock for file %s. Error message: %s",
file.getAbsolutePath(), e.getMessage()));
lockAcquired = false;
}
shutdownLockAcquisitionTask(executor);
// Upon failure to acquire a lock, we always call unlock() to close the FileChannel if it was opened
// and to deal with very hypothetical edge cases where a lock might actually have been acquired despite the
// lock acquisition thread returning false.
if ( ! lockAcquired ) {
unlock();
}
return lockAcquired;
}
/**
* Ensures that the lock acquisition task running in the provided executor has cleanly terminated.
* Throws a UserException if unable to shut it down within the period defined by the THREAD_TERMINATION_TIMEOUT.
*
* @param executor ExecutorService executing the lock-acquisition thread
*/
private void shutdownLockAcquisitionTask( final ExecutorService executor ) {
boolean shutdownAttemptSucceeded;
try {
lock = channel.tryLock(0, Long.MAX_VALUE, true);
if (lock == null) {
logger.warn(String.format("WARNING: Unable to lock file %s because there is already a lock active.",file.getAbsolutePath()));
executor.shutdownNow();
shutdownAttemptSucceeded = executor.awaitTermination(THREAD_TERMINATION_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e ) {
shutdownAttemptSucceeded = false;
}
if ( ! shutdownAttemptSucceeded ) {
throw new UserException(String.format("Failed to terminate lock acquisition thread while trying to lock file %s. " +
"Exiting because it's not safe to proceed with this run of the GATK.",
file.getAbsolutePath()));
}
}
/**
* Background task that attempts to acquire a lock of the specified type, and returns a boolean
* indicating success/failure. Uses a non-blocking tryLock() call that should return immediately
* (but may get stuck if there's a problem with the OS locking daemon).
*/
private class LockAcquisitionTask implements Callable<Boolean> {
private final boolean acquireSharedLock;
public LockAcquisitionTask( final boolean acquireSharedLock ) {
this.acquireSharedLock = acquireSharedLock;
}
public Boolean call() {
// Get a read-only or read-write file channel, depending on the type of lock
try {
channel = new RandomAccessFile(file, acquireSharedLock ? "r" : "rw").getChannel();
}
catch ( IOException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because we could not open a file channel", file.getAbsolutePath()));
return false;
}
}
catch (ClosedChannelException e) {
logger.warn(String.format("WARNING: Unable to lock file %s because the file channel is closed.",file.getAbsolutePath()));
return false;
}
catch (OverlappingFileLockException e) {
logger.warn(String.format("WARNING: Unable to lock file %s because you already have a lock on this file.",file.getAbsolutePath()));
return false;
}
catch (IOException e) {
logger.warn(String.format("WARNING: Unable to lock file %s: %s.",file.getAbsolutePath(),e.getMessage()));
if(throwExceptionOnUnknownFailure)
throw new FileSystemInabilityToLockException(e.getMessage(),e);
else
return false;
}
return true;
}
/**
* Get an exclusive lock on a file
* @return boolean true if we obtained a lock
* @throws FileSystemInabilityToLockException in cases of unexpected failure to capture lock.
*/
public boolean exclusiveLock() throws FileSystemInabilityToLockException {
boolean lockAcquired = false;
// read/write file channel is necessary for exclusive lock
try {
channel = new RandomAccessFile(file, "rw").getChannel();
}
catch (Exception e) {
logger.warn(String.format("WARNING: Unable to lock file %s (could not open read/write file channel)",file.getAbsolutePath()));
// do we need to worry about deleting file here? Does RandomAccessFile will only create file if successful?
return false;
}
// get exclusive lock (third argument is false)
try {
lock = channel.tryLock(0, Long.MAX_VALUE, false);
if (lock == null) {
logger.warn(String.format("WARNING: Unable to lock file %s because there is already a lock active.",file.getAbsolutePath()));
return false;
try {
// Non-blocking lock-acquisition call, should return right away. If it doesn't return immediately
// due to problems with the OS locking daemon, it will potentially be timed-out and interrupted.
lock = channel.tryLock(0, Long.MAX_VALUE, acquireSharedLock);
lockAcquired = lock != null;
}
else return true;
}
catch (ClosedChannelException e) {
logger.warn(String.format("WARNING: Unable to lock file %s because the file channel is closed.",file.getAbsolutePath()));
return false;
}
catch (OverlappingFileLockException e) {
logger.warn(String.format("WARNING: Unable to lock file %s because you already have a lock on this file.",file.getAbsolutePath()));
return false;
}
catch (IOException e) {
logger.warn(String.format("WARNING: Unable to lock file %s: %s.",file.getAbsolutePath(),e.getMessage()));
if(throwExceptionOnUnknownFailure)
throw new FileSystemInabilityToLockException(e.getMessage(),e);
else
return false;
catch ( AsynchronousCloseException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because the file channel was closed by another thread", file.getAbsolutePath()));
lockAcquired = false;
}
catch ( ClosedChannelException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because the file channel is closed.", file.getAbsolutePath()));
lockAcquired = false;
}
catch ( OverlappingFileLockException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because you already have a lock on this file.", file.getAbsolutePath()));
lockAcquired = false;
}
catch ( FileLockInterruptionException e ) {
logger.warn(String.format("WARNING: Interrupted while attempting to lock file %s", file.getAbsolutePath()));
lockAcquired = false;
}
catch ( IOException e ) {
logger.warn(String.format("WARNING: Unable to lock file %s because an IOException occurred with message: %s.", file.getAbsolutePath(), e.getMessage()));
lockAcquired = false;
}
return lockAcquired;
}
}
/**
* unlock the file
* Unlock the file
*
* note: this allows unlocking a file that failed to lock (no required user checks on null locks).
*/
public void unlock() {
releaseLock();
closeChannel();
}
private void releaseLock() {
try {
if (lock != null)
if ( lock != null )
lock.release();
if (channel != null)
}
catch ( ClosedChannelException e ) {
// if the channel was already closed we don't have to worry
}
catch ( IOException e ) {
throw new UserException(String.format("An error occurred while releasing the lock for file %s", file.getAbsolutePath()), e);
}
}
private void closeChannel() {
try {
if ( channel != null )
channel.close();
}
catch (Exception e) {
throw new ReviewedStingException("An error occurred while unlocking file", e);
catch ( IOException e ) {
throw new UserException(String.format("An error occurred while closing channel for file %s", file.getAbsolutePath()), e);
}
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.file;
/**
* A special checked exception that happens only in the case where
* the filesystem, by design or configuration, is completely unable
* to handle locking. This exception will specifically NOT be thrown
* in the case where the filesystem handles locking but is unable to
* acquire a lock due to concurrency.
*
* @author hanna
* @version 0.1
*/
public class FileSystemInabilityToLockException extends Exception {
/**
* Force user to create this exception with a nested inner stack trace.
* @param message Exception message.
* @param innerException Caused-by exception.
*/
public FileSystemInabilityToLockException(String message,Exception innerException) {
super(message,innerException);
}
}

View File

@@ -53,6 +53,7 @@ public class WalkerTest extends BaseTest {
private static final boolean GENERATE_SHADOW_BCF = true;
private static final boolean ENABLE_PHONE_HOME_FOR_TESTS = false;
private static final boolean ENABLE_ON_THE_FLY_CHECK_FOR_VCF_INDEX = false;
private static final boolean ENABLE_AUTO_INDEX_CREATION_AND_LOCKING_FOR_TESTS = false;
private static MD5DB md5DB = new MD5DB();
@ -209,6 +210,8 @@ public class WalkerTest extends BaseTest {
String.format(" -et %s -K %s ", GATKRunReport.PhoneHomeOption.NO_ET, gatkKeyFile));
if ( includeShadowBCF && GENERATE_SHADOW_BCF )
args = args + " --generateShadowBCF ";
if ( ! ENABLE_AUTO_INDEX_CREATION_AND_LOCKING_FOR_TESTS )
args = args + " --disable_auto_index_creation_and_locking_when_reading_rods ";
}
return args;

View File

@ -84,7 +84,8 @@ public class ReferenceOrderedViewUnitTest extends BaseTest {
// sequence
seq = new CachingIndexedFastaSequenceFile(new File(hg18Reference));
genomeLocParser = new GenomeLocParser(seq);
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null);
// disable auto-index creation/locking in the RMDTrackBuilder for tests
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null,true);
}
/**

View File

@ -95,12 +95,9 @@ public class ReferenceOrderedDataPoolUnitTest extends BaseTest {
public void setUp() {
String fileName = privateTestDir + "TabularDataTest.dat";
// check to see if we have an index, if so, delete it
File indexFileName = new File(privateTestDir + "TabularDataTest.dat.idx");
if (indexFileName.exists()) indexFileName.delete();
triplet = new RMDTriplet("tableTest","Table",fileName,RMDStorageType.FILE,new Tags());
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null);
// disable auto-index creation/locking in the RMDTrackBuilder for tests
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null,true);
}
@Test

View File

@ -27,17 +27,15 @@ package org.broadinstitute.sting.gatk.refdata.tracks;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import net.sf.samtools.SAMSequenceDictionary;
import org.broad.tribble.Tribble;
import org.broad.tribble.index.Index;
import org.broadinstitute.variant.vcf.VCF3Codec;
import org.broad.tribble.util.LittleEndianOutputStream;
import org.broadinstitute.variant.vcf.VCFCodec;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.testng.Assert;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.file.FSLockWithShared;
import org.testng.annotations.BeforeMethod;
@ -61,7 +59,7 @@ public class RMDTrackBuilderUnitTest extends BaseTest {
@BeforeMethod
public void setup() {
File referenceFile = new File(b36KGReference);
File referenceFile = new File(b37KGReference);
try {
seq = new CachingIndexedFastaSequenceFile(referenceFile);
}
@ -69,7 +67,11 @@ public class RMDTrackBuilderUnitTest extends BaseTest {
throw new UserException.CouldNotReadInputFile(referenceFile,ex);
}
genomeLocParser = new GenomeLocParser(seq);
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null);
// We have to disable auto-index creation/locking in the RMDTrackBuilder for tests,
// as the lock acquisition calls were intermittently hanging on our farm. This unfortunately
// means that we can't include tests for the auto-index creation feature.
builder = new RMDTrackBuilder(seq.getSequenceDictionary(),genomeLocParser,null,true);
}
@Test
@ -78,134 +80,83 @@ public class RMDTrackBuilderUnitTest extends BaseTest {
}
@Test
// in this test, the index exists, but is out of date.
public void testBuilderIndexUnwriteable() {
File vcfFile = new File(validationDataLocation + "/ROD_validation/read_only/relic.vcf");
try {
builder.loadIndex(vcfFile, new VCF3Codec());
} catch (IOException e) {
e.printStackTrace();
Assert.fail("IO exception unexpected" + e.getMessage());
}
        // make sure we didn't write the file (check that its timestamp is within bounds)
//System.err.println(new File(vcfFile + RMDTrackBuilder.indexExtension).lastModified());
Assert.assertTrue(Math.abs(1279591752000l - Tribble.indexFile(vcfFile).lastModified()) < 100);
public void testDisableAutoIndexGeneration() throws IOException {
final File unindexedVCF = new File(privateTestDir + "unindexed.vcf");
final File unindexedVCFIndex = Tribble.indexFile(unindexedVCF);
Index index = builder.loadIndex(unindexedVCF, new VCFCodec());
Assert.assertFalse(unindexedVCFIndex.exists());
Assert.assertNotNull(index);
}
// we have a good index file, in a read-only dir. This would cause the previous version to remake the index; make
// sure we don't do this
@Test
public void testDirIsLockedIndexFromDisk() {
File vcfFile = new File(validationDataLocation + "/ROD_validation/read_only/good_index.vcf");
File vcfFileIndex = Tribble.indexFile(vcfFile);
Index ind = null;
try {
ind = builder.attemptIndexFromDisk(vcfFile,new VCFCodec(),vcfFileIndex,new FSLockWithShared(vcfFile));
} catch (IOException e) {
Assert.fail("We weren't expecting an exception -> " + e.getMessage());
}
// make sure we get back a null index; i.e. we can't load the index from disk
Assert.assertTrue(ind == null);
public void testLoadOnDiskIndex() {
final File originalVCF = new File(privateTestDir + "vcf4.1.example.vcf");
final File tempVCFWithCorrectIndex = createTempVCFFileAndIndex(originalVCF, false);
final File tempVCFIndexFile = Tribble.indexFile(tempVCFWithCorrectIndex);
final Index index = builder.loadFromDisk(tempVCFWithCorrectIndex, tempVCFIndexFile);
Assert.assertNotNull(index);
Assert.assertTrue(tempVCFIndexFile.exists());
final Index inMemoryIndex = builder.createIndexInMemory(tempVCFWithCorrectIndex, new VCFCodec());
Assert.assertTrue(index.equalsIgnoreProperties(inMemoryIndex));
}
@Test
public void testBuilderIndexDirectoryUnwritable() {
File vcfFile = new File(validationDataLocation + "/ROD_validation/read_only/no_index.vcf");
File vcfFileIndex = Tribble.indexFile(vcfFile);
public void testLoadOnDiskOutdatedIndex() {
final File originalVCF = new File(privateTestDir + "vcf4.1.example.vcf");
final File tempVCFWithOutdatedIndex = createTempVCFFileAndIndex(originalVCF, true);
final File tempVCFIndexFile = Tribble.indexFile(tempVCFWithOutdatedIndex);
Index ind = null;
try {
ind = builder.loadIndex(vcfFile, new VCF3Codec());
} catch (IOException e) {
e.printStackTrace();
Assert.fail("IO exception unexpected" + e.getMessage());
}
        // make sure we didn't write the file (check that its timestamp is within bounds)
Assert.assertTrue(!vcfFileIndex.exists());
Assert.assertTrue(ind != null);
final Index index = builder.loadFromDisk(tempVCFWithOutdatedIndex, tempVCFIndexFile);
}
@Test
public void testGenerateIndexForUnindexedFile() {
File vcfFile = new File(privateTestDir + "always_reindex.vcf");
File vcfFileIndex = Tribble.indexFile(vcfFile);
// if we can't write to the directory, don't fault the tester, just pass
if (!vcfFileIndex.getParentFile().canWrite()) {
logger.warn("Unable to run test testGenerateIndexForUnindexedFile: unable to write to dir " + vcfFileIndex.getParentFile());
return;
}
// clean-up our test, and previous tests that may have written the file
vcfFileIndex.deleteOnExit();
if (vcfFileIndex.exists())
vcfFileIndex.delete();
try {
builder.loadIndex(vcfFile, new VCFCodec());
} catch (IOException e) {
e.printStackTrace();
Assert.fail("IO exception unexpected" + e.getMessage());
}
// make sure we wrote the file
Assert.assertTrue(vcfFileIndex.exists());
}
// test to make sure we get a full sequence dictionary from the VCF (when we set the dictionary in the builder)
@Test
public void testBuilderIndexSequenceDictionary() {
File vcfFile = createCorrectDateIndexFile(new File(validationDataLocation + "/ROD_validation/newerTribbleTrack.vcf"));
Long indexTimeStamp = Tribble.indexFile(vcfFile).lastModified();
try {
Index idx = builder.loadIndex(vcfFile, new VCFCodec());
// catch any exception; this call should pass correctly
SAMSequenceDictionary dict = IndexDictionaryUtils.getSequenceDictionaryFromProperties(idx);
} catch (IOException e) {
e.printStackTrace();
Assert.fail("IO exception unexpected" + e.getMessage());
}
// make sure that we removed and updated the index
Assert.assertTrue(Tribble.indexFile(vcfFile).lastModified() >= indexTimeStamp,"Fail: index file was modified");
// loadFromDisk() should return null to indicate that the index is outdated and should not be used,
// but should not delete the index since our builder has disableAutoIndexCreation set to true
Assert.assertNull(index);
Assert.assertTrue(tempVCFIndexFile.exists());
}
/**
* create a temporary file and an associated out of date index file
* Create a temporary vcf file and an associated index file, which may be set to be out-of-date
* relative to the vcf
*
* @param tribbleFile the tribble file
* @return a file pointing to the new tmp location, with out of date index
* @param vcfFile the vcf file
* @param createOutOfDateIndex if true, ensure that the temporary vcf file is modified after the index
* @return a file pointing to the new tmp location, with accompanying index
*/
private File createCorrectDateIndexFile(File tribbleFile) {
private File createTempVCFFileAndIndex( final File vcfFile, final boolean createOutOfDateIndex ) {
try {
            // first copy the tribble file to a temporary file
File tmpFile = File.createTempFile("TribbleUnitTestFile", "");
final File tmpFile = File.createTempFile("RMDTrackBuilderUnitTest", "");
final File tmpIndex = Tribble.indexFile(tmpFile);
tmpFile.deleteOnExit();
logger.info("creating temp file " + tmpFile);
// copy the vcf (tribble) file to the tmp file location
copyFile(tribbleFile, tmpFile);
            // sleep again, to make sure the timestamps are different (vcf vs. updated index file)
Thread.sleep(2000);
// create a fake index, before we copy so it's out of date
File tmpIndex = Tribble.indexFile(tmpFile);
tmpIndex.deleteOnExit();
// copy the vcf (tribble) file to the tmp file location
copyFile(Tribble.indexFile(tribbleFile), tmpIndex);
copyFile(vcfFile, tmpFile);
final Index inMemoryIndex = builder.createIndexInMemory(tmpFile, new VCFCodec());
final LittleEndianOutputStream indexOutputStream = new LittleEndianOutputStream(new FileOutputStream(tmpIndex));
// If requested, modify the tribble file after the index. Otherwise, modify the index last.
if ( createOutOfDateIndex ) {
inMemoryIndex.write(indexOutputStream);
indexOutputStream.close();
Thread.sleep(2000);
copyFile(vcfFile, tmpFile);
}
else {
copyFile(vcfFile, tmpFile);
Thread.sleep(2000);
inMemoryIndex.write(indexOutputStream);
indexOutputStream.close();
}
return tmpFile;
} catch (IOException e) {
Assert.fail("Unable to create temperary file");
} catch (InterruptedException e) {
Assert.fail("Somehow our thread got interupted");
Assert.fail("Somehow our thread got interrupted");
}
return null;
}

View File

@ -45,7 +45,8 @@ public class TestRMDTrackBuilder extends RMDTrackBuilder {
private GenomeLocParser genomeLocParser;
public TestRMDTrackBuilder(SAMSequenceDictionary dict, GenomeLocParser genomeLocParser) {
super(dict, genomeLocParser, null);
// disable auto-index creation/locking in the RMDTrackBuilder for tests
super(dict, genomeLocParser, null, true);
this.genomeLocParser = genomeLocParser;
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2012 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.utils.file;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.testng.annotations.Test;
import java.io.File;
public class FSLockWithSharedUnitTest extends BaseTest {
    // Upper bound on the time a single lock-acquisition call should ever take:
    // the configured acquisition timeout plus the allowance for the worker
    // thread to shut down afterwards.
    private static final int MAX_EXPECTED_LOCK_ACQUISITION_TIME =
            FSLockWithShared.DEFAULT_LOCK_ACQUISITION_TIMEOUT_IN_MILLISECONDS +
            FSLockWithShared.THREAD_TERMINATION_TIMEOUT_IN_MILLISECONDS;

    /**
     * Verify that a shared-lock acquisition call returns (successfully or via a
     * UserException on timeout) within the maximum configured amount of time;
     * the TestNG timeOut adds a 10-second cushion before declaring a hang.
     */
    @Test( timeOut = MAX_EXPECTED_LOCK_ACQUISITION_TIME + 10 * 1000 )
    public void testLockAcquisitionTimeout() {
        final File tempLockFile = createTempFile("FSLockWithSharedUnitTest", ".lock");
        final FSLockWithShared fileLock = new FSLockWithShared(tempLockFile);

        boolean acquired = false;
        try {
            acquired = fileLock.sharedLock();
        }
        catch ( UserException e ) {
            logger.info("Caught UserException from lock acquisition call: lock acquisition must have timed out. Message: " + e.getMessage());
        }
        finally {
            // Release only if we actually obtained the lock, so unlock() is never
            // invoked on a lock we failed to acquire.
            if ( acquired ) {
                fileLock.unlock();
            }
        }
    }
}