diff --git a/analysis/depristo/distributedGATK/commands.R b/analysis/depristo/distributedGATK/commands.R index ad5759d62..c3d97277e 100644 --- a/analysis/depristo/distributedGATK/commands.R +++ b/analysis/depristo/distributedGATK/commands.R @@ -1,25 +1,27 @@ -d <- read.table("results.new.dat", header=T) -require("lattice") - plot1 <- function(d, name) { d = subset(d, dataset == name) -subd = data.frame(parallel.type=d$parallel.type, nWaysParallel=d$nWaysParallel, end.to.end.time=d$end.to.end.time,per.1M.sites = d$per.1M.sites, job.run.time = d$job.run.time) + subd = data.frame(parallel.type=d$parallel.type, nWaysParallel=d$nWaysParallel, end.to.end.time=d$end.to.end.time,per.1M.sites = d$per.1M.sites, job.run.time = d$job.run.time) -nways = unique(subd$nWaysParallel) -m = max(subd$end.to.end.time) -nNW = subset(subd, end.to.end.time == m)$nWaysParallel[1] -timeAt1 = m * nNW -my.runtime = subset(subd, end.to.end.time == m)$job.run.time[1] * nNW -my.pms = subset(subd, end.to.end.time == m)$per.1M.sites[1] + nways = unique(subd$nWaysParallel) + m = max(subset(subd, nWaysParallel == min(nways))$end.to.end.time) + nNW = subset(subd, end.to.end.time == m)$nWaysParallel[1] + timeAt1 = m * nNW + my.runtime = subset(subd, end.to.end.time == m)$job.run.time[1] * nNW + my.pms = subset(subd, end.to.end.time == m)$per.1M.sites[1] -theo = data.frame(parallel.type="theoretic", end.to.end.time=timeAt1/nways, nWaysParallel=nways, per.1M.sites = my.pms, job.run.time = my.runtime / nways) + theo = data.frame(parallel.type="theoretic", end.to.end.time=timeAt1/nways, nWaysParallel=nways, per.1M.sites = my.pms, job.run.time = my.runtime / nways) -subd = rbind(subd, theo) + subd = rbind(subd, theo) -print(summary(subd)) + print(summary(subd)) -print(xyplot(log10(end.to.end.time) + per.1M.sites + log10(job.run.time) ~ log2(nWaysParallel), data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1), main=name)) + print(xyplot(log10(end.to.end.time) + per.1M.sites + log10(job.run.time) ~ log2(nWaysParallel), data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1), main=name)) + + return(subd) } -plot1(d, "NA12878Trio.WEx") -plot1(d, "NA12878.HiSeq") +myData <- read.table("results.new.dat", header=T) +require("lattice") + +for (name in unique(d$dataset)) + plot1(myData, name) diff --git a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala index 244898b47..17a95b4bb 100755 --- a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala +++ b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala @@ -28,6 +28,15 @@ class DistributedGATKPerformance extends QScript { @Argument(shortName="limitTo30Min", doc="runs long calculations", required=false) var limitTo30Min: Boolean = false + @Argument(shortName="huge", doc="runs long calculations", required=false) + var huge: Int = -1 + + @Argument(shortName="justDist", doc="runs long calculations", required=false) + var justDist: Boolean = false + + @Argument(shortName="justSG", doc="runs long calculations", required=false) + var justSG: Boolean = false + @Argument(shortName="trackerDir", doc="root directory for distributed tracker files", required=false) var trackerDir: String = "" // "/humgen/gsa-scr1/depristo/tmp/" @@ -88,7 +97,7 @@ class DistributedGATKPerformance extends QScript { "FIN" -> new Target("FIN", b37, dbSNP_b37, hapmap_b37, indelMask_b37, new File("/humgen/1kg/processing/pipeline_test_bams/FIN.79sample.Nov2010.chr20.bam"), new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, true), + "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, true), "WEx" -> new Target("NA12878.WEx", hg18, dbSNP_hg18, hapmap_hg18, "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/1000GenomesProcessingPaper/wgs.v13/GA2.WEx.cleaned.indels.10.mask", new File("/humgen/gsa-hpprojects/NA12878Collection/bams/NA12878.WEx.cleaned.recal.bam"), @@ -102,14 +111,14 @@ class DistributedGATKPerformance extends QScript { new File("/humgen/1kg/analysis/bamsForDataProcessingPapers/lowpass_b36/lowpass.chr20.cleaned.matefixed.bam"), // the bam list to call from new File("/home/radon01/depristo/work/oneOffProjects/VQSRCutByNRS/lowpass.N60.chr20.filtered.vcf"), // the gold standard VCF file to run through the VQSR "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.b36.intervals", 2.3, lowPass,true), // chunked interval list to use with Queue's scatter/gather functionality -// "LowPassAugust" -> new Target("ALL.august.v4", b37, dbSNP_b37, hapmap_b37, indelMask_b37, // BUGBUG: kill this, it is too large -// new File("/humgen/1kg/processing/allPopulations_chr20_august_release.cleaned.merged.bams/ALL.cleaned.merged.list"), -// new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), -// "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, false), + "LowPassAugust" -> new Target("ALL.august.v4", b37, dbSNP_b37, hapmap_b37, indelMask_b37, // BUGBUG: kill this, it is too large + new File("/humgen/1kg/processing/allPopulations_chr20_august_release.cleaned.merged.bams/ALL.cleaned.merged.list"), + new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), + "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass, true), "LowPassEUR363Nov" -> new Target("EUR.nov2010", b37, dbSNP_b37, hapmap_b37, indelMask_b37, new File("/humgen/1kg/processing/pipeline_test_bams/EUR.363sample.Nov2010.chr20.bam"), new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** - "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass,false), + "/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/distributedGATK/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass,false), "WExTrio" -> new Target("NA12878Trio.WEx", b37, dbSNP_b37, hapmap_b37, indelMask_b37, new File("/humgen/gsa-hpprojects/NA12878Collection/bams/CEUTrio.HiSeq.WEx.bwa.cleaned.recal.bams.list"), new File("/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** @@ -119,6 +128,8 @@ class DistributedGATKPerformance extends QScript { def getTargetInterval(target: Target): List[String] = target.name match { case "NA12878.HiSeq" => List("chr1") case "FIN" => List("20") + case "ALL.august.v4" => List("20") + case "EUR.nov2010" => List("20") case _ => List(target.intervals) } @@ -133,11 +144,11 @@ class DistributedGATKPerformance extends QScript { for (targetDS <- targetDataSets.valuesIterator) // for Scala 2.7 or older, use targetDataSets.values targets ::= targetDS - val nWays = if ( test ) List(32) else { if ( long ) List(1,2,4,8) else List(16,32,64,128) } + val nWays = if ( test ) List(32) else { if ( long ) List(1,2,4,8) else if ( huge != -1 ) List(huge) else List(16,32,64,128) } //val nWays = List(2) for (target <- targets) { - for ( scatterP <- if ( test ) List(false) else List(true, false) ) + for ( scatterP <- if ( test ) List(false) else if ( justSG ) List(true) else if ( justDist ) List(false) else List(true, false) ) for (nWaysParallel <- nWays ) { val aname = "ptype_%s.nways_%d".format(if ( scatterP ) "sg" else "dist", nWaysParallel) diff --git a/analysis/depristo/distributedGATK/fileBackedGLPperformance.R b/analysis/depristo/distributedGATK/fileBackedGLPperformance.R new file mode 100644 index 000000000..cf50b1df1 --- /dev/null +++ b/analysis/depristo/distributedGATK/fileBackedGLPperformance.R @@ -0,0 +1,3 @@ +#d <- read.table("../GATK/trunk/timer.dat", header=T) +require("lattice") +print(xyplot(elapsed.time + delta ~ cycle | name, data=d, scales=list(relation="free"), auto.key=T, type="b", outer=T)) \ No newline at end of file diff --git a/analysis/depristo/distributedGATK/makeChunks.csh b/analysis/depristo/distributedGATK/makeChunks.csh new file mode 100644 index 000000000..1493ce0d9 --- /dev/null +++ b/analysis/depristo/distributedGATK/makeChunks.csh @@ -0,0 +1 @@ +echo "63025520" | awk '{ for(i = 0; i < $1; i += 100000) {print "20:" i+1 "-" (i+100000 < $1 ? i+100000 : $1)}}' > whole_genome_chunked.chr20.hg19.intervals diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 2ab248924..b1e2c9082 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -9,6 +9,7 @@ import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrde import org.broadinstitute.sting.gatk.io.*; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import javax.management.MBeanServer; @@ -89,6 +90,10 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar super(engine, walker, reads, reference, rods); this.threadPool = Executors.newFixedThreadPool(nThreadsToUse); + + if (engine.getArguments().processingTrackerFile != null) { + throw new UserException.BadArgumentValue("-C", "Distributed GATK calculations currently not supported in multi-threaded mode. Complain to Mark depristo@broadinstitute.org to implement and test this code path"); + } } public Object execute( Walker walker, ShardStrategy shardStrategy ) { diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index b81cc7c7e..1e21b6542 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -72,13 +72,14 @@ public class LinearMicroScheduler extends MicroScheduler { dataProvider.close(); } - - counter++; - logger.debug(String.format("At %s: processed %d shards. %.2e s / lock (n=%d), %.2e s / read (n=%d), %.2e s / write (n=%d)", - shard.getLocation(), counter, - processingTracker.getTimePerLock(), processingTracker.getNLocks(), - processingTracker.getTimePerRead(), processingTracker.getNReads(), - processingTracker.getTimePerWrite(), processingTracker.getNWrites())); + if ( logger.isDebugEnabled() ) { + counter++; + logger.debug(String.format("At %s: processed %d shards. %.2e s / lock (n=%d), %.2e s / read (n=%d), %.2e s / write (n=%d)", + shard.getLocation(), counter, + processingTracker.getTimePerLock(), processingTracker.getNLocks(), + processingTracker.getTimePerRead(), processingTracker.getNReads(), + processingTracker.getTimePerWrite(), processingTracker.getNWrites())); + } } Object result = accumulator.finishTraversal(); diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index f372dbcc2..418220b9e 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -163,7 +163,9 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); } + // // create the processing tracker + // if ( engine.getArguments().processingTrackerFile != null ) { if ( engine.getArguments().restartProcessingTracker && engine.getArguments().processingTrackerFile.exists() ) { engine.getArguments().processingTrackerFile.delete(); @@ -183,6 +185,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { processingTracker = new FileBackedGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), lock, statusStream) ; logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName() + " CID = " + engine.getArguments().processTrackerID); } else { + // create a NoOp version that doesn't do anything but say "yes" processingTracker = new NoOpGenomeLocProcessingTracker(); } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java index 365760c6d..d16c19130 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java @@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock; * Date: 1/19/11 * Time: 9:50 AM * - * Simple extension of a ReentrantLock that supports a close method + * Simple extension of a ReentrantLock that supports a close method. */ public class ClosableReentrantLock extends ReentrantLock { public boolean ownsLock() { return super.isHeldByCurrentThread(); } diff --git a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java index 93e07b3d7..cae099eeb 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java @@ -15,7 +15,7 @@ import java.util.List; import java.util.concurrent.locks.ReentrantLock; /** - * Keeps a copy of the processing locks in a file, in addition to tracking in memory via the base class + * Keeps a copy of the processing locks in a file */ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker { private static final Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); @@ -50,10 +50,6 @@ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTra } } -// protected void close() { -// super.close(); -// } - @Override protected List readNewLocs() { List newPLocs = new ArrayList(); // todo -- gratitous object creation diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index 56ed49740..942886326 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -1,7 +1,6 @@ package org.broadinstitute.sting.utils.threading; import net.sf.picard.reference.IndexedFastaSequenceFile; -import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; @@ -17,29 +16,78 @@ import java.text.SimpleDateFormat; import java.util.*; /** + * Abstract base class to coordinating data processing by a collecting for processes / threads. * + * Conceptually, the genome is viewed as a collection of non-overlapping genome location: + * + * chr1:1-10 + * chr1:11-20 + * chr1:21-30 + * etc. + * + * This class, and it's concrete derived classes, provide the ability to claim individual locations + * as "mine", and exclude other processes / threads from processing them. At the lowest-level this + * is implemented by the claimOwnership(loc, name) function, that returns true if loc free (unclaimed) + * and makes name the owner of loc. High-level, and more efficient operations provide claiming + * iterators over streams of objects implementing the HasGenomeLocation interface, so that you can + * write code that looks like: + * + * for ( GenomeLoc ownedLoc : onlyOwned(allLocsToProcess.iterator) ) { + * doSomeWork(ownedLoc) + * + * Much of the code in this class is actually surrounding debugging and performance metrics code. + * The actual synchronization code is separated out into the ClosableReentrantLock() system + * and the two abstract functions: + * + * protected abstract void registerNewLocs(Collection plocs); + * protected abstract Collection readNewLocs(); + * + * That maintain the state of the tracker. + * + * That is, the ProcessingTracker is made of two components: a thread / process locking system and + * a subclass that implements the methods to record new claimed state changes and to read out updates + * that may have occurred by another thread or process. + * + * NOTE: this class assumes that all threads / processes are working with the same set of potential + * GenomeLocs to own. Claiming chr1:1-10 and then chr1:5-6 is allowed by the system. Basically, + * you only can stake claim to GenomeLocs that are .equal(). */ public abstract class GenomeLocProcessingTracker { private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS"); private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 20; + /** + * Useful state strings for printing status + */ private final static String GOING_FOR_LOCK = "going_for_lock"; private final static String RELEASING_LOCK = "releasing_lock"; private final static String HAVE_LOCK = "have_lock"; private final static String RUNNING = "running"; + /** + * A map, for efficiency, that allows quick lookup of the processing loc for a + * given GenomeLoc. The map points from loc -> loc / owner as a ProcessingLoc + */ private final Map processingLocs; + + /** + * The locking object used to protect data from simulatanous access by multiple + * threads or processes. + */ private final ClosableReentrantLock lock; + + /** A stream for writing status messages. Can be null if we aren't writing status */ private final PrintStream status; - protected SimpleTimer writeTimer = new SimpleTimer("writeTimer"); - protected SimpleTimer readTimer = new SimpleTimer("readTimer"); - protected SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); + // + // Timers for recording performance information + // + protected final SimpleTimer writeTimer = new SimpleTimer("writeTimer"); + protected final SimpleTimer readTimer = new SimpleTimer("readTimer"); + protected final SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); protected long nLocks = 0, nWrites = 0, nReads = 0; - // TODO -- LOCK / UNLOCK OPERATIONS NEEDS TO HAVE MORE INTELLIGENT TRY / CATCH - // -------------------------------------------------------------------------------- // // Creating ProcessingTrackers @@ -52,6 +100,45 @@ public abstract class GenomeLocProcessingTracker { printStatusHeader(); } + // -------------------------------------------------------------------------------- + // + // Code to override to change the dynamics of the the GenomeLocProcessingTracker + // + // -------------------------------------------------------------------------------- + + protected void close() { + lock.close(); + if ( status != null ) status.close(); + } + + /** + * Takes a collection of newly claimed (i.e., previous unclaimed) genome locs + * and the name of their owner and "registers" this data in some persistent way that's + * visible to all threads / processes communicating via this GenomeLocProcessingTracker. + * + * Could be a in-memory data structure (a list) if we are restricting ourselves to intra-memory + * parallelism, a locked file on a shared file system, or a server we communicate with. + * + * @param plocs + */ + protected abstract void registerNewLocs(Collection plocs); + + /** + * The inverse of the registerNewLocs() function. Looks at the persistent data store + * shared by all threads / processes and returns the ones that have appeared since the last + * call to readNewLocs(). Note that we expect the pair of registerNewLocs and readNewLocs to + * include everything, even locs registered by this thread / process. For example: + * + * readNewLocs() => List() + * registerNewLocs(List(x, y,)) => void + * readNewLocs() => List(x,y)) + * + * even for this thread or process. + * @return + */ + protected abstract Collection readNewLocs(); + + // -------------------------------------------------------------------------------- // // Code to claim intervals for processing and query for their ownership @@ -69,14 +156,6 @@ public abstract class GenomeLocProcessingTracker { return findOwner(loc, id) != null; } - protected final ProcessingLoc findOwner(GenomeLoc loc, String id) { - // fast path to check if we already have the existing genome loc in memory for ownership claims - // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it - // unless necessary - ProcessingLoc x = findOwnerInMap(loc, processingLocs); - return x == null ? findOwnerInMap(loc, updateLocs(id)) : x; - } - /** * The workhorse routine. Attempt to claim processing ownership of loc, with my name. * This is an atomic operation -- other threads / processes will wait until this function @@ -171,16 +250,15 @@ public abstract class GenomeLocProcessingTracker { return new WithLock(myName) { public T doBody() { // read once the database of owners at the start - updateLocs(myName); + updateAndGetProcessingLocs(myName); boolean done = false; Queue pwns = new LinkedList(); // ;-) while ( !done && cache.size() < cacheSize && subit.hasNext() ) { final T elt = subit.next(); - //logger.warn("Checking elt for ownership " + elt); GenomeLoc loc = elt.getLocation(); - ProcessingLoc owner = findOwnerInMap(loc, processingLocs); + ProcessingLoc owner = processingLocs.get(loc); if ( owner == null ) { // we are unowned owner = new ProcessingLoc(loc, myName); @@ -195,8 +273,6 @@ public abstract class GenomeLocProcessingTracker { // we've either filled up the cache or run out of elements. Either way we return // the first element of the cache. If the cache is empty, we return null here. - //logger.warn("Cache size is " + cache.size()); - //logger.warn("Cache contains " + cache); return cache.poll(); } }.run(); @@ -212,20 +288,34 @@ public abstract class GenomeLocProcessingTracker { } } + // -------------------------------------------------------------------------------- + // + // private / protected low-level accessors / manipulators and utility functions + // + // -------------------------------------------------------------------------------- + /** - * Returns the list of currently owned locations, updating the database as necessary. - * DO NOT MODIFY THIS LIST! As with all parallelizing data structures, the list may be - * out of date immediately after the call returns, or may be updating on the fly. - * - * This is really useful for printing, counting, etc. operations that aren't mission critical - * + * Useful debugging function that returns the ProcessingLoc who owns loc. ID + * is provided for debugging purposes + * @param loc + * @param id * @return */ - protected final Collection getProcessingLocs(String myName) { - return updateLocs(myName).values(); + protected final ProcessingLoc findOwner(GenomeLoc loc, String id) { + // fast path to check if we already have the existing genome loc in memory for ownership claims + // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it + // unless necessary + ProcessingLoc x = processingLocs.get(loc); + return x == null ? updateAndGetProcessingLocs(id).get(loc) : x; } - private final Map updateLocs(String myName) { + /** + * Returns the list of currently owned locations, updating the database as necessary. + * DO NOT MODIFY THIS MAP! As with all parallelizing data structures, the list may be + * out of date immediately after the call returns, or may be updating on the fly. + * @return + */ + protected final Map updateAndGetProcessingLocs(String myName) { return new WithLock>(myName) { public Map doBody() { readTimer.restart(); @@ -238,6 +328,12 @@ public abstract class GenomeLocProcessingTracker { }.run(); } + /** + * Wrapper around registerNewLocs that also times the operation + * + * @param plocs + * @param myName + */ protected final void registerNewLocsWithTimers(Collection plocs, String myName) { writeTimer.restart(); registerNewLocs(plocs); @@ -245,27 +341,24 @@ public abstract class GenomeLocProcessingTracker { writeTimer.stop(); } - // -------------------------------------------------------------------------------- - // - // Low-level accessors / manipulators and utility functions - // - // -------------------------------------------------------------------------------- - private final boolean hasStatus() { - return status != null; - } - private final void printStatusHeader() { - if ( hasStatus() ) status.printf("process.id\thr.time\ttime\tstate%n"); + if ( status != null ) status.printf("process.id\thr.time\ttime\tstate%n"); } private final void printStatus(String id, long machineTime, String state) { // prints a line like processID human-readable-time machine-time state - if ( hasStatus() ) { + if ( status != null ) { status.printf("%s\t%s\t%d\t%s%n", id, STATUS_FORMAT.format(machineTime), machineTime, state); status.flush(); } } + + /** + * Lock the data structure, preventing other threads / processes from reading and writing to the + * common store + * @param id the name of the process doing the locking + */ private final void lock(String id) { lockWaitTimer.restart(); boolean hadLock = lock.ownsLock(); @@ -278,17 +371,16 @@ public abstract class GenomeLocProcessingTracker { if ( ! hadLock ) printStatus(id, lockWaitTimer.currentTime(), HAVE_LOCK); } + /** + * Unlock the data structure, allowing other threads / processes to read and write to the common store + * @param id the name of the process doing the unlocking + */ private final void unlock(String id) { if ( lock.getHoldCount() == 1 ) printStatus(id, lockWaitTimer.currentTime(), RELEASING_LOCK); lock.unlock(); if ( ! lock.ownsLock() ) printStatus(id, lockWaitTimer.currentTime(), RUNNING); } - - protected final static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map locs) { - return locs.get(loc); - } - // useful code for getting public final long getNLocks() { return nLocks; } public final long getNReads() { return nReads; } @@ -303,7 +395,18 @@ public abstract class GenomeLocProcessingTracker { // // -------------------------------------------------------------------------------- - public abstract class WithLock { + /** + * Private utility class that executes doBody() method with the lock() acquired and + * handles property unlock()ing the system, even if an error occurs. Allows one to write + * clean code like: + * + * new WithLock(name) { + * public Integer doBody() { doSomething(); return 1; } + * }.run() + * + * @param the return type of the doBody() method + */ + private abstract class WithLock { private final String myName; public WithLock(String myName) { @@ -324,21 +427,6 @@ public abstract class GenomeLocProcessingTracker { } } - // -------------------------------------------------------------------------------- - // - // Code to override to change the dynamics of the the GenomeLocProcessingTracker - // - // -------------------------------------------------------------------------------- - - protected void close() { - lock.close(); - if ( hasStatus() ) status.close(); - //logger.warn("Locking events: " + nLocks); - } - - protected abstract void registerNewLocs(Collection plocs); - protected abstract Collection readNewLocs(); - // -------------------------------------------------------------------------------- // // main function for testing performance diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java index 3a1246862..4e61ef9e1 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -15,19 +15,9 @@ import java.util.List; */ public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { public NoOpGenomeLocProcessingTracker() { - super(new ClosableReentrantLock(), null); // todo -- should be lighter weight + super(new ClosableReentrantLock(), null); } -// @Override -// public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { -// return new ProcessingLoc(loc, myName); -// } - -// @Override -// protected List getProcessingLocs() { -// return Collections.emptyList(); -// } - @Override protected void registerNewLocs(Collection loc) { ; diff --git a/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java deleted file mode 100644 index 7fb0c118e..000000000 --- a/java/src/org/broadinstitute/sting/utils/threading/OldSharedFileThreadSafeLock.java +++ /dev/null @@ -1,208 +0,0 @@ -package org.broadinstitute.sting.utils.threading; - -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.*; - -/** - * User: depristo - * Date: 1/19/11 - * Time: 8:24 AM - * - * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common - * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a - * FileChannel FileLock to handle the file integrity - */ -public class OldSharedFileThreadSafeLock extends ClosableReentrantLock { - private static Logger logger = Logger.getLogger(OldSharedFileThreadSafeLock.class); - private static final boolean DEBUG = false; - - // 100 seconds of trying -> failure - private static final int DEFAULT_N_TRIES = 1000; - private static final long DEFAULT_MILLISECONDS_PER_TRY = 100; - - /** The file we are locking */ - private final File file; - - /** The file lock itself that guards the file */ - FileLock fileLock; - - /** the channel object that 'owns' the file lock, and we use to request the lock */ - FileChannel channel; - - /** - * A counter that indicates the number of 'locks' on this file. - * If locks == 2, then two unlocks are required - * before any resources are freed. - */ - int fileLockReentrantCounter = 0; - - // type of locking - private final boolean blockOnLock; - private final int nRetries; - private final long milliSecPerTry; - - /** - * Create a SharedFileThreadSafeLock object locking the file - * @param file - */ - public OldSharedFileThreadSafeLock(File file, boolean blockOnLock, int nRetries, long milliSecPerTry) { - super(); - this.file = file; - this.blockOnLock = blockOnLock; - this.nRetries = nRetries; - this.milliSecPerTry = milliSecPerTry; - } - - public OldSharedFileThreadSafeLock(File file, boolean blockOnLock) { - this(file, blockOnLock, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY); - } - - - private FileChannel getChannel() { - if ( DEBUG ) logger.warn(" Get channel: " + Thread.currentThread().getName() + " channel = " + channel); - if ( channel == null ) { - try { - if ( DEBUG ) logger.warn(" opening channel: " + Thread.currentThread().getName()); - this.channel = new RandomAccessFile(file, "rw").getChannel(); - if ( DEBUG ) logger.warn(" opened channel: " + Thread.currentThread().getName()); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(file, e); - } - } - - return this.channel; - } - - private void closeChannel() { - try { - if ( channel != null ) { - channel.close(); - channel = null; - } - } - catch (IOException e) { - throw new UserException("Count not close channel associated with file" + file, e); - } - } - - public void close() { - super.close(); - closeChannel(); - } - - public boolean ownsLock() { - return super.isHeldByCurrentThread() && fileLockReentrantCounter > 0; - } - - // ------------------------------------------------------------------------------------------ - // - // workhorse routines -- acquiring file locks - // - // ------------------------------------------------------------------------------------------ - - private void acquireFileLock() { - try { - // Precondition -- lock is always null while we don't have a lock - if ( fileLock != null ) - throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); - - if ( blockOnLock ) { - // - // blocking code - // - fileLock = getChannel().lock(); - } else { - // - // polling code - // - int i = 0; - for ( ; fileLock == null && i < nRetries; i++ ) { - fileLock = getChannel().tryLock(); - if ( fileLock == null ) { - try { - //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); - Thread.sleep(milliSecPerTry); - } catch ( InterruptedException e ) { - throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); - } - } - } - if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); - - if ( fileLock == null ) { - // filelock == null -> we never managed to acquire the lock! - throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); - } - } - if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); - } catch (ClosedChannelException e) { - throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + file, e); - } catch (FileLockInterruptionException e) { - throw new ReviewedStingException("File lock interrupted", e); - } catch (NonWritableChannelException e) { - throw new ReviewedStingException("File channel not writable", e); - } catch (OverlappingFileLockException e) { - // this only happens when multiple threads are running, and one is waiting - // for the lock above and we come here. - throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); - } catch (IOException e) { - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } - } - - /** - * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be - * unwound appropriately. Uses file channel lock *after* thread locking. - */ - @Override - public void lock() { - if ( DEBUG ) logger.warn("Attempting threadlock: " + Thread.currentThread().getName()); - - if ( super.isHeldByCurrentThread() ) { - if ( DEBUG ) logger.warn(" Already have threadlock, continuing: " + Thread.currentThread().getName()); - super.lock(); // call the lock here so we can call unlock later - fileLockReentrantCounter++; // inc. the file lock counter - return; - } else { - super.lock(); - if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName()); - if ( fileLockReentrantCounter == 0 ) - acquireFileLock(); - fileLockReentrantCounter++; - } - } - - @Override - public void unlock() { - try { - // update for reentrant unlocking - fileLockReentrantCounter--; - if ( fileLockReentrantCounter < 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); - - if ( fileLock != null && fileLockReentrantCounter == 0 ) { - if ( ! fileLock.isValid() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); - - if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName()); - fileLock.release(); - closeChannel(); - fileLock = null; - if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName()); - } else { - if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName()); - } - } catch ( IOException e ) { - throw new ReviewedStingException("Could not free lock on file " + file, e); - } finally { - if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName()); - super.unlock(); - if ( DEBUG ) logger.warn(" released threadlock: " + Thread.currentThread().getName()); - } - } -} diff --git a/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java b/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java index d2ec1fe9a..ee2283dcf 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java +++ b/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java @@ -10,7 +10,14 @@ import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; * Date: 1/19/11 * Time: 8:06 AM * - * Information about processing locations and their owners + * Information about processing locations and their owners. Contains two basic data, associated + * together. The first is a genome loc, and the second is the name of the owner, as a string. + * + * chr1:1-10 Mark + * chr2:11-20 DePristo + * + * would be two ProcessingLocs that first indicate that the first 10 bp of chr1 are owned by Mark, + * and the second is owned by DePristo. */ public class ProcessingLoc implements HasGenomeLocation { private final GenomeLoc loc; @@ -27,7 +34,7 @@ public class ProcessingLoc implements HasGenomeLocation { } this.loc = loc; - this.owner = owner; + this.owner = owner.intern(); // reduce memory consumption by interning the string } public GenomeLoc getLocation() { @@ -38,6 +45,13 @@ public class ProcessingLoc implements HasGenomeLocation { return owner; } + /** + * Returns true iff the owner of this processing loc is name. Can be used to determine + * the owner of this processing location. + * + * @param name + * @return + */ public boolean isOwnedBy(String name) { return getOwner().equals(name); } diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java index eb708a9c7..3eb2be96b 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java @@ -16,11 +16,10 @@ import java.nio.channels.*; * Date: 1/19/11 * Time: 8:24 AM * - * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common - * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a - * FileChannel FileLock to handle the file integrity + * A reentrant lock for a shared file common file in the file system. Relies on a a Lucene SimpleFSLock + * to manage on disk file locking. */ -public class SharedFileLock extends ClosableReentrantLock { // todo -- kinda gross inheritance +public class SharedFileLock extends ClosableReentrantLock { // todo -- kinda gross inheritance. The super lock is never used private static Logger logger = Logger.getLogger(SharedFileLock.class); private static final String VERIFY_HOST = System.getProperty("verify.host", "gsa1"); diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index cff41b708..9bf8b58b1 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -1,16 +1,13 @@ package org.broadinstitute.sting.utils.threading; -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.GenomeLoc; - import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; /** - * Thread-safe shared memory only implementation + * Thread-safe shared memory only implementation. Uses a simple list to manage the newly + * added processing locations. */ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { private List newPLocs = new ArrayList(); diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index 6a3d6f56e..df4e1660d 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -162,7 +162,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1); ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE); Assert.assertTrue(ploc.isOwnedBy(NAME_ONE)); - Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), 0); + Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), 0); } } } @@ -188,8 +188,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertEquals(proc.getOwner(), NAME_ONE); Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc); Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE)); - Assert.assertNotNull(tracker.getProcessingLocs(NAME_ONE)); - Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), counter); + Assert.assertNotNull(tracker.updateAndGetProcessingLocs(NAME_ONE)); + Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), counter); ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO)); @@ -374,7 +374,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { assertAllThreadsFinished(results); // we ran everything - Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run"); + Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run"); for ( GenomeLoc shard : shards ) { Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE), "Unowned shard");