diff --git a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala index 0d1eb0e19..244898b47 100755 --- a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala +++ b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala @@ -13,7 +13,7 @@ class DistributedGATKPerformance extends QScript { @Argument(shortName="outputDir", doc="output directory", required=false) var outputDir: String = "" - @Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=true) + @Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=false) var datasets: List[String] = Nil @Argument(shortName="waysParallel", doc="selects the datasets to run. If not provided, all datasets will be used", required=false) @@ -25,8 +25,11 @@ class DistributedGATKPerformance extends QScript { @Argument(shortName="test", doc="runs long calculations", required=false) var test: Boolean = false - //@Argument(shortName="noBAQ", doc="turns off BAQ calculation", required=false) - var noBAQ: Boolean = false + @Argument(shortName="limitTo30Min", doc="runs long calculations", required=false) + var limitTo30Min: Boolean = false + + @Argument(shortName="trackerDir", doc="root directory for distributed tracker files", required=false) + var trackerDir: String = "" // "/humgen/gsa-scr1/depristo/tmp/" trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "DEBUG"; jarFile = gatkJarFile; memoryLimit = Some(2); } @@ -40,7 +43,8 @@ class DistributedGATKPerformance extends QScript { val goldStandard_VCF: File, val intervals: String, val titvTarget: Double, - val isLowpass: Boolean) { + val isLowpass: Boolean, + val useBAQ: Boolean) { val name = qscript.outputDir + baseName val clusterFile = new File(name + ".clusters") def rawVCF(part: String) = new File(name + "." + part + ".raw.vcf") @@ -74,67 +78,73 @@ class DistributedGATKPerformance extends QScript { // produce Kiran's Venn plots based on comparison between new VCF and gold standard produced VCF val lowPass: Boolean = true - val CHROMOSOME: String = "chr1" val targetDataSets: Map[String, Target] = Map( "HiSeq" -> new Target("NA12878.HiSeq", hg18, dbSNP_hg18, hapmap_hg18, "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/HiSeq.WGS.cleaned.indels.10.mask", new File("/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.HiSeq.WGS.bwa.cleaned.recal.bam"), new File("/home/radon01/depristo/work/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/HiSeq.WGS.cleaned.ug.snpfiltered.indelfiltered.vcf"), - "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/whole_genome_chunked.hg18.intervals", 2.07, !lowPass), + "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/whole_genome_chunked.hg18.intervals", 2.07, !lowPass, true), "FIN" -> new Target("FIN", b37, dbSNP_b37, hapmap_b37, indelMask_b37, new File("/humgen/1kg/processing/pipeline_test_bams/FIN.79sample.Nov2010.chr20.bam"), new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass), + "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, true), "WEx" -> new Target("NA12878.WEx", hg18, dbSNP_hg18, hapmap_hg18, "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/GA2.WEx.cleaned.indels.10.mask", new File("/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.WEx.cleaned.recal.bam"), new File("/home/radon01/depristo/work/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/GA2.WEx.cleaned.ug.snpfiltered.indelfiltered.vcf"), - "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.targets.interval_list", 2.6, !lowPass), + "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.targets.interval_list", 2.6, !lowPass, true), "TGPWExGdA" -> new Target("1000G.WEx.GdA", b37, dbSNP_b37, hapmap_b37, indelMask_b37, - new File("/humgen/1kg/processing/pipeline_test_bams/Barcoded_1000G_WEx_Reduced_Plate_1.cleaned.list"), // BUGBUG: reduce from 60 to 20 people + new File("/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/Barcoded_1000G_WEx_Reduced_Plate_1.20.cleaned.list"), // BUGBUG: reduce from 60 to 20 people new File("/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass), + "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass, true), "LowPassN60" -> new Target("lowpass.N60", b36, dbSNP_b36, hapmap_b36, indelMask_b36, new File("/humgen/1kg/analysis/bamsForDataProcessingPapers/lowpass_b36/lowpass.chr20.cleaned.matefixed.bam"), // the bam list to call from new File("/home/radon01/depristo/work/oneOffProjects/VQSRCutByNRS/lowpass.N60.chr20.filtered.vcf"), // the gold standard VCF file to run through the VQSR - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.b36.intervals", 2.3, lowPass), // chunked interval list to use with Queue's scatter/gather functionality - "LowPassAugust" -> new Target("ALL.august.v4", b37, dbSNP_b37, hapmap_b37, indelMask_b37, // BUGBUG: kill this, it is too large - new File("/humgen/1kg/processing/allPopulations_chr20_august_release.cleaned.merged.bams/ALL.cleaned.merged.list"), - new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass), + "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.b36.intervals", 2.3, lowPass,true), // chunked interval list to use with Queue's scatter/gather functionality +// "LowPassAugust" -> new Target("ALL.august.v4", b37, dbSNP_b37, hapmap_b37, indelMask_b37, // BUGBUG: kill this, it is too large +// new File("/humgen/1kg/processing/allPopulations_chr20_august_release.cleaned.merged.bams/ALL.cleaned.merged.list"), +// new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), +// "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, false), "LowPassEUR363Nov" -> new Target("EUR.nov2010", b37, dbSNP_b37, hapmap_b37, indelMask_b37, new File("/humgen/1kg/processing/pipeline_test_bams/EUR.363sample.Nov2010.chr20.bam"), new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass), + "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass,false), "WExTrio" -> new Target("NA12878Trio.WEx", b37, dbSNP_b37, hapmap_b37, indelMask_b37, - new File("/humgen/gsa-scr1/carneiro/prj/trio/data/NA12878Trio.WEx.hg19.recal.bam"), + new File("/humgen/gsa-hpprojects/NA12878Collection/bams/CEUTrio.HiSeq.WEx.bwa.cleaned.recal.bams.list"), new File("/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass) + "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass, true) ) + def getTargetInterval(target: Target): List[String] = target.name match { + case "NA12878.HiSeq" => List("chr1") + case "FIN" => List("20") + case _ => List(target.intervals) + } + def script = { // Selects the datasets in the -dataset argument and adds them to targets. var targets: List[Target] = List() - for (ds <- datasets) - targets ::= targetDataSets(ds) // Could check if ds was mispelled, but this way an exception will be thrown, maybe it's better this way? - - var nWays = if (long) List(1, 2, 4, 8) else List(16, 32, 64, 96) - if ( ! waysParallelArg.isEmpty ) - nWays = waysParallelArg + if (!datasets.isEmpty) + for (ds <- datasets) + targets ::= targetDataSets(ds) // Could check if ds was mispelled, but this way an exception will be thrown, maybe it's better this way? + else // If -dataset is not specified, all datasets are used. + for (targetDS <- targetDataSets.valuesIterator) // for Scala 2.7 or older, use targetDataSets.values + targets ::= targetDS + val nWays = if ( test ) List(32) else { if ( long ) List(1,2,4,8) else List(16,32,64,128) } //val nWays = List(2) for (target <- targets) { for ( scatterP <- if ( test ) List(false) else List(true, false) ) - for (nWaysParallel <- if ( test ) List(32) else nWays) { + for (nWaysParallel <- nWays ) { val aname = "ptype_%s.nways_%d".format(if ( scatterP ) "sg" else "dist", nWaysParallel) def addUG(ug: UnifiedGenotyper) = { if ( ! long ) ug.jobLimitSeconds = Some(60 * 60 * 4) - if ( test ) + if ( limitTo30Min ) ug.jobLimitSeconds = Some(60 * 30) add(ug); } @@ -148,11 +158,9 @@ class DistributedGATKPerformance extends QScript { } else { for ( part <- 1 to nWaysParallel) { var ug: UnifiedGenotyper = new UnifiedGenotyper(target, aname + ".part" + part) - if ( target.name.equals("NA12878.HiSeq")) - ug.intervalsString ++= List(CHROMOSOME) - else - ug.intervalsString ++= List(target.intervals) - ug.processingTracker = new File(target.name + "." + aname + ".distributed.txt") + ug.intervalsString ++= getTargetInterval(target) + ug.processingTracker = new File(trackerDir + target.name + "." + aname + ".distributed.txt") + ug.processingTrackerID = Some(part) if ( part == 1 ) ug.performanceLog = new File("%s.%s.pf.log".format(target.name, aname)) ug.processingTrackerStatusFile = new File("%s.%s.%d.ptstatus.log".format(target.name, aname, part)) @@ -172,7 +180,7 @@ class DistributedGATKPerformance extends QScript { this.stand_emit_conf = Some( if ( t.isLowpass ) { 4.0 } else { 30.0 } ) this.input_file :+= t.bamList this.out = t.rawVCF(aname) - this.baq = Some( if (noBAQ) {org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.OFF} else {org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.RECALCULATE}) + this.baq = Some( if (t.useBAQ) {org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.RECALCULATE} else {org.broadinstitute.sting.utils.baq.BAQ.CalculationMode.OFF}) this.analysisName = t.name + "_UG." + aname if (t.dbsnpFile.endsWith(".rod")) this.DBSNP = new File(t.dbsnpFile) diff --git a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index f7c5f7c04..891fe5120 100755 --- a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -232,6 +232,11 @@ public class GATKArgumentCollection { @Hidden public File processingTrackerStatusFile = null; + @Element(required=false) + @Argument(fullName="processingTrackerID",shortName="CID",doc="If provided, an integer ID (starting at 1) indicating a unique id for this process within the distributed GATK group",required=false) + @Hidden + public int processTrackerID = -1; + // -------------------------------------------------------------------------------------------------------------- // // methods @@ -410,6 +415,9 @@ public class GATKArgumentCollection { if ( restartProcessingTracker != other.restartProcessingTracker ) return false; + if ( processTrackerID != other.processTrackerID ) + return false; + return true; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 097adada0..f372dbcc2 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -49,8 +49,7 @@ import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import org.broadinstitute.sting.utils.threading.GenomeLocProcessingTracker; -import org.broadinstitute.sting.utils.threading.ProcessingLoc; +import org.broadinstitute.sting.utils.threading.*; import javax.management.JMException; import javax.management.MBeanServer; @@ -180,10 +179,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { } } - processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), false, statusStream); - logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName()); + ClosableReentrantLock lock = new SharedFileThreadSafeLock(engine.getArguments().processingTrackerFile, engine.getArguments().processTrackerID); + processingTracker = new FileBackedGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), lock, statusStream) ; + logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName() + " CID = " + engine.getArguments().processTrackerID); } else { - processingTracker = GenomeLocProcessingTracker.createNoOp(); + processingTracker = new NoOpGenomeLocProcessingTracker(); } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java index 57ef756d7..93e07b3d7 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java @@ -27,7 +27,7 @@ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTra private final GenomeLocParser parser; private long lastReadPosition = 0; - protected FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) { + public FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) { super(lock, status); this.sharedFile = sharedFile; diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index ba80991ad..9f321d9f7 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -22,9 +22,10 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class GenomeLocProcessingTracker { private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS"); - private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 100; + private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 20; private final static String GOING_FOR_LOCK = "going_for_lock"; + private final static String RELEASING_LOCK = "releasing_lock"; private final static String HAVE_LOCK = "have_lock"; private final static String RUNNING = "running"; @@ -37,33 +38,7 @@ public abstract class GenomeLocProcessingTracker { protected SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); protected long nLocks = 0, nWrites = 0, nReads = 0; - // -------------------------------------------------------------------------------- - // - // Factory methods for creating ProcessingTrackers - // - // -------------------------------------------------------------------------------- - - public static GenomeLocProcessingTracker createNoOp() { - return new NoOpGenomeLocProcessingTracker(); - } - - public static GenomeLocProcessingTracker createSharedMemory() { - return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); - } - - public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser, PrintStream status) { - return createFileBacked(sharedFile, parser, false, false, status); - } - - public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser, boolean blockingP, PrintStream status) { - return createFileBacked(sharedFile, parser, blockingP, true, status); - } - - private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean blockP, boolean useFileLockToo, PrintStream status) { - //logger.warn("Creating file backed GLPT at " + sharedFile); - ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(sharedFile, blockP) : new ClosableReentrantLock(); - return new FileBackedGenomeLocProcessingTracker(sharedFile, parser, lock, status); - } + // TODO -- LOCK / UNLOCK OPERATIONS NEEDS TO HAVE MORE INTELLIGENT TRY / CATCH // -------------------------------------------------------------------------------- // @@ -113,25 +88,28 @@ public abstract class GenomeLocProcessingTracker { * @param myName * @return */ - public final ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + public final ProcessingLoc claimOwnership(final GenomeLoc loc, final String myName) { // processingLocs is a shared memory synchronized object, and this // method is synchronized, so we can just do our processing - lock(myName); - try { - ProcessingLoc owner = findOwner(loc, myName); - - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - registerNewLocsWithTimers(Arrays.asList(owner), myName); + return new WithLock(myName) { + public ProcessingLoc doBody() { + ProcessingLoc owner = findOwner(loc, myName); + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + registerNewLocsWithTimers(Arrays.asList(owner), myName); + } + return owner; } - - return owner; - //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); - } finally { - unlock(myName); - } + }.run(); } + + // -------------------------------------------------------------------------------- + // + // High-level iterator-style interface to claiming ownership + // + // -------------------------------------------------------------------------------- + /** * A higher-level, and more efficient, interface to obtain the next location we own. Takes an * iterator producing objects that support the getLocation() interface, and returns the next @@ -153,7 +131,7 @@ public abstract class GenomeLocProcessingTracker { return new OwnershipIterator(iterator, myName); } - protected final class OwnershipIterator implements Iterator, Iterable { + private final class OwnershipIterator implements Iterator, Iterable { private final Iterator subit; private final String myName; private final Queue cache; @@ -186,46 +164,42 @@ public abstract class GenomeLocProcessingTracker { * @return an object of type T owned by this thread, or null if none of the remaining object could be claimed */ public final T next() { - T elt = cache.poll(); - if ( elt != null) - return elt; + if ( cache.peek() != null) + return cache.poll(); else { // cache is empty, we need to fill up the cache and return the first element of the queue - lock(myName); - try { + return new WithLock(myName) { + public T doBody() { + // read once the database of owners at the start + updateLocs(myName); - // read once the database of owners at the start - updateLocs(myName); + boolean done = false; + Queue pwns = new LinkedList(); // ;-) + while ( !done && cache.size() < cacheSize && subit.hasNext() ) { + final T elt = subit.next(); + //logger.warn("Checking elt for ownership " + elt); + GenomeLoc loc = elt.getLocation(); - boolean done = false; - Queue pwns = new LinkedList(); // ;-) - while ( !done && cache.size() < cacheSize && subit.hasNext() ) { - elt = subit.next(); - //logger.warn("Checking elt for ownership " + elt); - GenomeLoc loc = elt.getLocation(); + ProcessingLoc owner = findOwnerInMap(loc, processingLocs); - ProcessingLoc owner = findOwnerInMap(loc, processingLocs); - - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - pwns.offer(owner); - if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed"); - if ( GenomeLoc.isUnmapped(loc) ) done = true; + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + pwns.offer(owner); + if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed"); + if ( GenomeLoc.isUnmapped(loc) ) done = true; + } + // if not, we continue our search } - // if not, we continue our search + + registerNewLocsWithTimers(pwns, myName); + + // we've either filled up the cache or run out of elements. Either way we return + // the first element of the cache. If the cache is empty, we return null here. + //logger.warn("Cache size is " + cache.size()); + //logger.warn("Cache contains " + cache); + return cache.poll(); } - - registerNewLocsWithTimers(pwns, myName); - - // we've either filled up the cache or run out of elements. Either way we return - // the first element of the cache. If the cache is empty, we return null here. - //logger.warn("Cache size is " + cache.size()); - //logger.warn("Cache contains " + cache); - - return cache.poll(); - } finally { - unlock(myName); - } + }.run(); } } @@ -252,17 +226,16 @@ public abstract class GenomeLocProcessingTracker { } private final Map updateLocs(String myName) { - lock(myName); - try { - readTimer.restart(); - for ( ProcessingLoc p : readNewLocs() ) - processingLocs.put(p.getLocation(), p); - readTimer.stop(); - nReads++; - return processingLocs; - } finally { - unlock(myName); - } + return new WithLock>(myName) { + public Map doBody() { + readTimer.restart(); + for ( ProcessingLoc p : readNewLocs() ) + processingLocs.put(p.getLocation(), p); + readTimer.stop(); + nReads++; + return processingLocs; + } + }.run(); } protected final void registerNewLocsWithTimers(Collection plocs, String myName) { @@ -306,18 +279,11 @@ public abstract class GenomeLocProcessingTracker { } private final void unlock(String id) { + if ( lock.getHoldCount() == 1 ) printStatus(id, lockWaitTimer.currentTime(), RELEASING_LOCK); lock.unlock(); if ( ! lock.ownsLock() ) printStatus(id, lockWaitTimer.currentTime(), RUNNING); } - protected final static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection locs) { - for ( ProcessingLoc l : locs ) { - if ( l.getLocation().equals(loc) ) - return l; - } - - return null; - } protected final static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map locs) { return locs.get(loc); @@ -331,6 +297,33 @@ public abstract class GenomeLocProcessingTracker { public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); } public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); } + // -------------------------------------------------------------------------------- + // + // Java-style functional form for with lock do { x }; + // + // -------------------------------------------------------------------------------- + + public abstract class WithLock { + private final String myName; + + public WithLock(String myName) { + this.myName = myName; + } + + protected abstract T doBody(); + + public T run() { + boolean locked = false; + try { + lock(myName); + locked = true; + return doBody(); + } finally { + if (locked) unlock(myName); + } + } + } + // -------------------------------------------------------------------------------- // // Code to override to change the dynamics of the the GenomeLocProcessingTracker diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java index 45b8593a7..3a1246862 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -14,7 +14,7 @@ import java.util.List; * etc. ReadShards can differ in their contents but have the same "unmapped" genome loc */ public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - protected NoOpGenomeLocProcessingTracker() { + public NoOpGenomeLocProcessingTracker() { super(new ClosableReentrantLock(), null); // todo -- should be lighter weight } diff --git a/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java new file mode 100644 index 000000000..7fb0c118e --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java @@ -0,0 +1,208 @@ +package org.broadinstitute.sting.utils.threading; + +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.*; + +/** + * User: depristo + * Date: 1/19/11 + * Time: 8:24 AM + * + * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common + * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a + * FileChannel FileLock to handle the file integrity + */ +public class OldSharedFileThreadSafeLock extends ClosableReentrantLock { + private static Logger logger = Logger.getLogger(OldSharedFileThreadSafeLock.class); + private static final boolean DEBUG = false; + + // 100 seconds of trying -> failure + private static final int DEFAULT_N_TRIES = 1000; + private static final long DEFAULT_MILLISECONDS_PER_TRY = 100; + + /** The file we are locking */ + private final File file; + + /** The file lock itself that guards the file */ + FileLock fileLock; + + /** the channel object that 'owns' the file lock, and we use to request the lock */ + FileChannel channel; + + /** + * A counter that indicates the number of 'locks' on this file. + * If locks == 2, then two unlocks are required + * before any resources are freed. + */ + int fileLockReentrantCounter = 0; + + // type of locking + private final boolean blockOnLock; + private final int nRetries; + private final long milliSecPerTry; + + /** + * Create a SharedFileThreadSafeLock object locking the file + * @param file + */ + public OldSharedFileThreadSafeLock(File file, boolean blockOnLock, int nRetries, long milliSecPerTry) { + super(); + this.file = file; + this.blockOnLock = blockOnLock; + this.nRetries = nRetries; + this.milliSecPerTry = milliSecPerTry; + } + + public OldSharedFileThreadSafeLock(File file, boolean blockOnLock) { + this(file, blockOnLock, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY); + } + + + private FileChannel getChannel() { + if ( DEBUG ) logger.warn(" Get channel: " + Thread.currentThread().getName() + " channel = " + channel); + if ( channel == null ) { + try { + if ( DEBUG ) logger.warn(" opening channel: " + Thread.currentThread().getName()); + this.channel = new RandomAccessFile(file, "rw").getChannel(); + if ( DEBUG ) logger.warn(" opened channel: " + Thread.currentThread().getName()); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(file, e); + } + } + + return this.channel; + } + + private void closeChannel() { + try { + if ( channel != null ) { + channel.close(); + channel = null; + } + } + catch (IOException e) { + throw new UserException("Count not close channel associated with file" + file, e); + } + } + + public void close() { + super.close(); + closeChannel(); + } + + public boolean ownsLock() { + return super.isHeldByCurrentThread() && fileLockReentrantCounter > 0; + } + + // ------------------------------------------------------------------------------------------ + // + // workhorse routines -- acquiring file locks + // + // ------------------------------------------------------------------------------------------ + + private void acquireFileLock() { + try { + // Precondition -- lock is always null while we don't have a lock + if ( fileLock != null ) + throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); + + if ( blockOnLock ) { + // + // blocking code + // + fileLock = getChannel().lock(); + } else { + // + // polling code + // + int i = 0; + for ( ; fileLock == null && i < nRetries; i++ ) { + fileLock = getChannel().tryLock(); + if ( fileLock == null ) { + try { + //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); + Thread.sleep(milliSecPerTry); + } catch ( InterruptedException e ) { + throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); + } + } + } + if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); + + if ( fileLock == null ) { + // filelock == null -> we never managed to acquire the lock! + throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); + } + } + if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); + } catch (ClosedChannelException e) { + throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + file, e); + } catch (FileLockInterruptionException e) { + throw new ReviewedStingException("File lock interrupted", e); + } catch (NonWritableChannelException e) { + throw new ReviewedStingException("File channel not writable", e); + } catch (OverlappingFileLockException e) { + // this only happens when multiple threads are running, and one is waiting + // for the lock above and we come here. + throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); + } catch (IOException e) { + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); + } + } + + /** + * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be + * unwound appropriately. Uses file channel lock *after* thread locking. + */ + @Override + public void lock() { + if ( DEBUG ) logger.warn("Attempting threadlock: " + Thread.currentThread().getName()); + + if ( super.isHeldByCurrentThread() ) { + if ( DEBUG ) logger.warn(" Already have threadlock, continuing: " + Thread.currentThread().getName()); + super.lock(); // call the lock here so we can call unlock later + fileLockReentrantCounter++; // inc. the file lock counter + return; + } else { + super.lock(); + if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName()); + if ( fileLockReentrantCounter == 0 ) + acquireFileLock(); + fileLockReentrantCounter++; + } + } + + @Override + public void unlock() { + try { + // update for reentrant unlocking + fileLockReentrantCounter--; + if ( fileLockReentrantCounter < 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); + + if ( fileLock != null && fileLockReentrantCounter == 0 ) { + if ( ! fileLock.isValid() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); + + if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName()); + fileLock.release(); + closeChannel(); + fileLock = null; + if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName()); + } else { + if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName()); + } + } catch ( IOException e ) { + throw new ReviewedStingException("Could not free lock on file " + file, e); + } finally { + if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName()); + super.unlock(); + if ( DEBUG ) logger.warn(" released threadlock: " + Thread.currentThread().getName()); + } + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java new file mode 100644 index 000000000..5ff099166 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java @@ -0,0 +1,175 @@ +package org.broadinstitute.sting.utils.threading; + +import org.apache.log4j.Logger; +import org.apache.lucene.store.*; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.*; + +/** + * User: depristo + * Date: 1/19/11 + * Time: 8:24 AM + * + * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common + * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a + * FileChannel FileLock to handle the file integrity + */ +public class SharedFileLock extends ClosableReentrantLock { // todo -- kinda gross inheritance + private static Logger logger = Logger.getLogger(SharedFileLock.class); + + private static final String VERIFY_HOST = System.getProperty("verify.host", "gsa1"); + private static final boolean VERIFY = false; + private static final int VERIFY_PORT = 5050; + + // 100 seconds of trying -> failure + protected static final int DEFAULT_N_TRIES = 1000; + protected static final long DEFAULT_MILLISECONDS_PER_TRY = 100; + + /** The file we are locking */ + private final File file; + + private final LockFactory lockFactory; + private Lock fileLock = null; + + /** + * A counter that indicates the number of 'locks' on this file. + * If locks == 2, then two unlocks are required + * before any resources are freed. + */ + int fileLockReentrantCounter = 0; + + // type of locking + private final int nRetries; + private final long milliSecPerTry; + + /** + * Create a SharedFileThreadSafeLock object locking the file + * @param file + */ + public SharedFileLock(File file, int nRetries, long milliSecPerTry, int ID) { + super(); + this.file = file; + this.nRetries = nRetries; + this.milliSecPerTry = milliSecPerTry; + + File lockDir = new File(file.getParent() == null ? "./" : file.getParent()); + try { + LockFactory factory = new SimpleFSLockFactory(lockDir); + if ( VERIFY ) { // don't forget to start up the VerifyLockServer + this.lockFactory = new VerifyingLockFactory((byte)ID, factory, VERIFY_HOST, VERIFY_PORT); + } else { + this.lockFactory = factory; + } + } catch (IOException e) { + throw new UserException.CouldNotCreateOutputFile(lockDir, "Could not create coordination file locking directory " + lockDir, e); + } + } + + public SharedFileLock(File file, int ID) { + this(file, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY, ID); + } + + @Override + public void close() { + if ( ownsLock() ) throw new ReviewedStingException("closing SharedFileLock while still owned: ownership count " + fileLockReentrantCounter); + } + + @Override + public int getHoldCount() { + return fileLockReentrantCounter; + } + + @Override + public boolean ownsLock() { + return fileLockReentrantCounter > 0; + } + + // ------------------------------------------------------------------------------------------ + // + // workhorse routines -- acquiring file locks + // + // ------------------------------------------------------------------------------------------ + + private boolean obtainFileLock() throws IOException { + // annoying bug work around for verifylockserver + if ( VERIFY ) + try { + return fileLock.obtain(1); + } catch ( LockObtainFailedException e ) { + return false; + } + else + return fileLock.obtain(); + } + + /** + * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be + * unwound appropriately. Uses file channel lock *after* thread locking. + */ + @Override + public void lock() { + if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", fileLockReentrantCounter = " + fileLockReentrantCounter); + if ( fileLockReentrantCounter++ == 0 ) { + // Precondition -- lock is always null while we don't have a lock + if ( fileLock != null ) + throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); + + int i = 1; + fileLock = lockFactory.makeLock(file.getName() + ".lock"); + try { + boolean obtained = obtainFileLock(); // todo -- maybe use intrinsic lock features + for ( ; ! obtained && i < nRetries; i++ ) { + try { + //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); + Thread.sleep(milliSecPerTry); + } catch ( InterruptedException e ) { + throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); + } + obtained = obtainFileLock(); // gross workaround for error in verify server + } + + if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); + + if ( ! obtained ) { + fileLock = null; + // filelock == null -> we never managed to acquire the lock! + throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); + } + + if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", obtained = " + obtained + ", tries = " + i); + } catch (IOException e) { + fileLock = null; + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); + } + } + } + + @Override + public void unlock() { + // update for reentrant unlocking + if ( fileLock == null ) throw new ReviewedStingException("BUG: file lock is null -- file lock was not obtained"); + if ( fileLockReentrantCounter <= 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); + + // this unlock counts as 1 unlock. If this is our last unlock, actually do something + if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", count = " + fileLockReentrantCounter); + if ( --fileLockReentrantCounter == 0 ) { + try { + if ( ! fileLock.isLocked() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); + fileLock.release(); + if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", actually releasing"); + } catch ( IOException e ) { + throw new ReviewedStingException("Could not free file lock on file " + file, e); + } finally { // make sure we null out the filelock, regardless of our state + fileLock = null; + } + } else { + if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", skipping, count = " + fileLockReentrantCounter); + } + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java index ab8a20f02..dec69f7c2 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java @@ -16,144 +16,44 @@ import java.nio.channels.*; * Time: 8:24 AM * * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common - * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a - * FileChannel FileLock to handle the file integrity + * file in the file system. It itself a shared memory reenterant lock to managed thread safety and + * contains a SharedFileLock to handle the file integrity. */ public class SharedFileThreadSafeLock extends ClosableReentrantLock { private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class); - private static final boolean DEBUG = false; + protected static final boolean DEBUG = false; - // 100 seconds of trying -> failure - private static final int DEFAULT_N_TRIES = 1000; - private static final long DEFAULT_MILLISECONDS_PER_TRY = 100; - - /** The file we are locking */ - private final File file; - - /** The file lock itself that guards the file */ - FileLock fileLock; - - /** the channel object that 'owns' the file lock, and we use to request the lock */ - FileChannel channel; - - /** - * A counter that indicates the number of 'locks' on this file. - * If locks == 2, then two unlocks are required - * before any resources are freed. - */ - int fileLockReentrantCounter = 0; - - // type of locking - private final boolean blockOnLock; - private final int nRetries; - private final long milliSecPerTry; + private final SharedFileLock fileLock; /** * Create a SharedFileThreadSafeLock object locking the file * @param file */ - public SharedFileThreadSafeLock(File file, boolean blockOnLock, int nRetries, long milliSecPerTry) { + public SharedFileThreadSafeLock(File file, int nRetries, long milliSecPerTry, int ID) { super(); - this.file = file; - this.blockOnLock = blockOnLock; - this.nRetries = nRetries; - this.milliSecPerTry = milliSecPerTry; + this.fileLock = new SharedFileLock(file, nRetries, milliSecPerTry, ID); } - public SharedFileThreadSafeLock(File file, boolean blockOnLock) { - this(file, blockOnLock, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY); - } - - - private FileChannel getChannel() { - if ( DEBUG ) logger.warn(" Get channel: " + Thread.currentThread().getName() + " channel = " + channel); - if ( channel == null ) { - try { - if ( DEBUG ) logger.warn(" opening channel: " + Thread.currentThread().getName()); - this.channel = new RandomAccessFile(file, "rw").getChannel(); - if ( DEBUG ) logger.warn(" opened channel: " + Thread.currentThread().getName()); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(file, e); - } - } - - return this.channel; - } - - private void closeChannel() { - try { - if ( channel != null ) { - channel.close(); - channel = null; - } - } - catch (IOException e) { - throw new UserException("Count not close channel associated with file" + file, e); - } + public SharedFileThreadSafeLock(File file, int ID) { + this(file, SharedFileLock.DEFAULT_N_TRIES, SharedFileLock.DEFAULT_MILLISECONDS_PER_TRY, ID); } + @Override public void close() { - closeChannel(); + super.close(); + fileLock.close(); } + @Override + public int getHoldCount() { + if ( super.getHoldCount() != fileLock.getHoldCount() ) + throw new ReviewedStingException("BUG: unequal hold counts. threadlock = " + super.getHoldCount() + ", filelock = " + fileLock.getHoldCount()); + return super.getHoldCount(); + } + + @Override public boolean ownsLock() { - return super.isHeldByCurrentThread() && fileLockReentrantCounter > 0; - } - - // ------------------------------------------------------------------------------------------ - // - // workhorse routines -- acquiring file locks - // - // ------------------------------------------------------------------------------------------ - - private void acquireFileLock() { - try { - // Precondition -- lock is always null while we don't have a lock - if ( fileLock != null ) - throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); - - if ( blockOnLock ) { - // - // blocking code - // - fileLock = getChannel().lock(); - } else { - // - // polling code - // - int i = 0; - for ( ; fileLock == null && i < nRetries; i++ ) { - fileLock = getChannel().tryLock(); - if ( fileLock == null ) { - try { - //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); - Thread.sleep(milliSecPerTry); - } catch ( InterruptedException e ) { - throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); - } - } - } - if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); - - if ( fileLock == null ) { - // filelock == null -> we never managed to acquire the lock! - throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); - } - } - if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); - } catch (ClosedChannelException e) { - throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + file, e); - } catch (FileLockInterruptionException e) { - throw new ReviewedStingException("File lock interrupted", e); - } catch (NonWritableChannelException e) { - throw new ReviewedStingException("File channel not writable", e); - } catch (OverlappingFileLockException e) { - // this only happens when multiple threads are running, and one is waiting - // for the lock above and we come here. - throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); - } catch (IOException e) { - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } + return super.isHeldByCurrentThread() && fileLock.ownsLock(); } /** @@ -162,46 +62,19 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock { */ @Override public void lock() { - if ( DEBUG ) logger.warn("Attempting threadlock: " + Thread.currentThread().getName()); - - if ( super.isHeldByCurrentThread() ) { - if ( DEBUG ) logger.warn(" Already have threadlock, continuing: " + Thread.currentThread().getName()); - super.lock(); // call the lock here so we can call unlock later - fileLockReentrantCounter++; // inc. the file lock counter - return; - } else { - super.lock(); - if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName()); - if ( fileLockReentrantCounter == 0 ) - acquireFileLock(); - fileLockReentrantCounter++; - } + if ( DEBUG ) logger.warn("Attempting SharedFileThreadSafe lock: " + Thread.currentThread().getName()); + if ( DEBUG ) logger.warn(" going for thread lock: " + Thread.currentThread().getName()); + super.lock(); + if ( DEBUG ) logger.warn(" going for file lock: " + Thread.currentThread().getName()); + fileLock.lock(); // todo -- should this be in a try? } @Override public void unlock() { - try { - // update for reentrant unlocking - fileLockReentrantCounter--; - if ( fileLockReentrantCounter < 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); - - if ( fileLock != null && fileLockReentrantCounter == 0 ) { - if ( ! fileLock.isValid() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); - - if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName()); - fileLock.release(); - closeChannel(); - fileLock = null; - if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName()); - } else { - if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName()); - } - } catch ( IOException e ) { - throw new ReviewedStingException("Could not free lock on file " + file, e); - } finally { - if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName()); - super.unlock(); - if ( DEBUG ) logger.warn(" released threadlock: " + Thread.currentThread().getName()); - } + if ( DEBUG ) logger.warn(" releasing filelock: " + Thread.currentThread().getName()); + fileLock.unlock(); + if ( DEBUG ) logger.warn(" releasing threadlock: " + Thread.currentThread().getName()); + super.unlock(); + if ( DEBUG ) logger.warn(" unlock() complete: " + Thread.currentThread().getName()); } } diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index 2512dafe0..caeabc125 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -53,21 +53,32 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { @AfterMethod public void afterMethod(Object[] data) { - if ( data.length > 0 ) + if ( data.length > 0 ) { ((TestTarget)data[0]).getTracker().close(); + ((TestTarget)data[0]).cleanup(); + } } abstract private class TestTarget { String name; int nShards; int shardSize; + File file; - public void init() {} + public void init() { cleanup(); } - protected TestTarget(String name, int nShards, int shardSize) { + public void cleanup() { + if ( file != null && file.exists() ) + file.delete(); + } + + public boolean isThreadSafe() { return true; } + + protected TestTarget(String name, int nShards, int shardSize, File file) { this.name = name; this.nShards = nShards; this.shardSize = shardSize; + this.file = file; } public abstract GenomeLocProcessingTracker getTracker(); @@ -96,35 +107,35 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { List params = new ArrayList(); int counter = 0; + String name = null; for ( int nShard : nShards ) { for ( int shardSize : shardSizes ) { // shared mem -- canonical implementation - params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize) { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createSharedMemory(); + params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize, null) { + GenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); public GenomeLocProcessingTracker getTracker() { return tracker; } }); final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); - params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize) { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser, null); + params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize, file1) { + GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file1, genomeLocParser, new ClosableReentrantLock(), null); public GenomeLocProcessingTracker getTracker() { return tracker; } - public void init() { - if ( file1.exists() ) - file1.delete(); - } }); - for ( final boolean blocking : Arrays.asList(true, false) ) { - final File file2 = new File(String.format("%s_ThreadSafeFileLockingFile_blocking%b_%d_%d", FILE_ROOT, blocking, counter++, nShard, shardSize)); - params.add(new TestTarget("ThreadSafeFileLockingFileBackedBlocking" + blocking, nShard, shardSize) { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser, blocking, null); - public GenomeLocProcessingTracker getTracker() { return tracker; } - public void init() { - if ( file2.exists() ) - file2.delete(); - } - }); - } + name = "FileBackedSharedFileThreadSafe"; + final File file2 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize)); + params.add(new TestTarget(name, nShard, shardSize, file2) { + GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file2, genomeLocParser, new SharedFileThreadSafeLock(file2, -1), null); + public GenomeLocProcessingTracker getTracker() { return tracker; } + }); + + name = "FileBackedSharedFile"; + final File file3 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize)); + params.add(new TestTarget(name, nShard, shardSize, file3) { + GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file3, genomeLocParser, new SharedFileLock(file3, -1), null); + public GenomeLocProcessingTracker getTracker() { return tracker; } + public boolean isThreadSafe() { return false; } + }); } } @@ -143,7 +154,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { @Test(enabled = true) public void testNoop() { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createNoOp(); + GenomeLocProcessingTracker tracker = new NoOpGenomeLocProcessingTracker(); for ( int start = 1; start < 100; start++ ) { for ( int n = 0; n < 2; n++ ) { GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1); @@ -336,6 +347,10 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } private void testThreading(TestTarget test, boolean useIterator) { + if ( ! test.isThreadSafe() ) + // skip tests that aren't thread safe + return; + // start up 3 threads logger.warn("ThreadedTesting " + test + " using iterator " + useIterator); List threads = new ArrayList();