diff --git a/analysis/depristo/distributedGATK/commands.R b/analysis/depristo/distributedGATK/commands.R index ee8f001eb..ad5759d62 100644 --- a/analysis/depristo/distributedGATK/commands.R +++ b/analysis/depristo/distributedGATK/commands.R @@ -1,8 +1,8 @@ -# todo -- add replicate number to system -# tood -- add scatter gather comparison -d <- read.table("results.dat", header=T) +d <- read.table("results.new.dat", header=T) require("lattice") +plot1 <- function(d, name) { + d = subset(d, dataset == name) subd = data.frame(parallel.type=d$parallel.type, nWaysParallel=d$nWaysParallel, end.to.end.time=d$end.to.end.time,per.1M.sites = d$per.1M.sites, job.run.time = d$job.run.time) nways = unique(subd$nWaysParallel) @@ -18,5 +18,8 @@ subd = rbind(subd, theo) print(summary(subd)) -print(xyplot(end.to.end.time + per.1M.sites + job.run.time ~ nWaysParallel, data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1))) +print(xyplot(log10(end.to.end.time) + per.1M.sites + log10(job.run.time) ~ log2(nWaysParallel), data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1), main=name)) +} +plot1(d, "NA12878Trio.WEx") +plot1(d, "NA12878.HiSeq") diff --git a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala index 1631823eb..0d1eb0e19 100755 --- a/analysis/depristo/distributedGATK/distributedGATKPerformance.scala +++ b/analysis/depristo/distributedGATK/distributedGATKPerformance.scala @@ -13,16 +13,22 @@ class DistributedGATKPerformance extends QScript { @Argument(shortName="outputDir", doc="output directory", required=false) var outputDir: String = "" - @Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=false) + @Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=true) var datasets: List[String] = Nil + @Argument(shortName="waysParallel", doc="selects the datasets to run. If not provided, all datasets will be used", required=false) + var waysParallelArg: List[Int] = Nil + @Argument(shortName="long", doc="runs long calculations", required=false) var long: Boolean = false + @Argument(shortName="test", doc="runs long calculations", required=false) + var test: Boolean = false + //@Argument(shortName="noBAQ", doc="turns off BAQ calculation", required=false) var noBAQ: Boolean = false - trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "INFO"; jarFile = gatkJarFile; memoryLimit = Some(2); } + trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "DEBUG"; jarFile = gatkJarFile; memoryLimit = Some(2); } class Target( val baseName: String, @@ -102,7 +108,7 @@ class DistributedGATKPerformance extends QScript { new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** "/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass), "WExTrio" -> new Target("NA12878Trio.WEx", b37, dbSNP_b37, hapmap_b37, indelMask_b37, - new File("/humgen/gsa-scr1/carneiro/prj/trio/NA12878Trio.WEx.hg19.bam"), + new File("/humgen/gsa-scr1/carneiro/prj/trio/data/NA12878Trio.WEx.hg19.recal.bam"), new File("/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED ** "/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass) ) @@ -111,24 +117,25 @@ class DistributedGATKPerformance extends QScript { // Selects the datasets in the -dataset argument and adds them to targets. var targets: List[Target] = List() - if (!datasets.isEmpty) - for (ds <- datasets) - targets ::= targetDataSets(ds) // Could check if ds was mispelled, but this way an exception will be thrown, maybe it's better this way? - else // If -dataset is not specified, all datasets are used. - for (targetDS <- targetDataSets.valuesIterator) // for Scala 2.7 or older, use targetDataSets.values - targets ::= targetDS + for (ds <- datasets) + targets ::= targetDataSets(ds) // Could check if ds was mispelled, but this way an exception will be thrown, maybe it's better this way? + + var nWays = if (long) List(1, 2, 4, 8) else List(16, 32, 64, 96) + if ( ! waysParallelArg.isEmpty ) + nWays = waysParallelArg - val nWays = if (long) List(1, 2, 4) else List(8, 16, 32, 64, 96) //val nWays = List(2) for (target <- targets) { - for ( scatterP <- List(true, false) ) - for (nWaysParallel <- nWays) { + for ( scatterP <- if ( test ) List(false) else List(true, false) ) + for (nWaysParallel <- if ( test ) List(32) else nWays) { val aname = "ptype_%s.nways_%d".format(if ( scatterP ) "sg" else "dist", nWaysParallel) def addUG(ug: UnifiedGenotyper) = { if ( ! long ) ug.jobLimitSeconds = Some(60 * 60 * 4) + if ( test ) + ug.jobLimitSeconds = Some(60 * 30) add(ug); } @@ -141,10 +148,14 @@ class DistributedGATKPerformance extends QScript { } else { for ( part <- 1 to nWaysParallel) { var ug: UnifiedGenotyper = new UnifiedGenotyper(target, aname + ".part" + part) - ug.intervalsString ++= List(CHROMOSOME) + if ( target.name.equals("NA12878.HiSeq")) + ug.intervalsString ++= List(CHROMOSOME) + else + ug.intervalsString ++= List(target.intervals) ug.processingTracker = new File(target.name + "." + aname + ".distributed.txt") if ( part == 1 ) ug.performanceLog = new File("%s.%s.pf.log".format(target.name, aname)) + ug.processingTrackerStatusFile = new File("%s.%s.%d.ptstatus.log".format(target.name, aname, part)) addUG(ug) } } diff --git a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index b46ac2f7b..f7c5f7c04 100755 --- a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -212,6 +212,11 @@ public class GATKArgumentCollection { @Input(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read groups matching : or a .txt file containing the filter strings one per line.", required = false) public List readGroupBlackList = null; + // -------------------------------------------------------------------------------------------------------------- + // + // distributed GATK arguments + // + // -------------------------------------------------------------------------------------------------------------- @Element(required=false) @Argument(fullName="processingTracker",shortName="C",doc="A lockable, shared file for coordinating distributed GATK runs",required=false) @Hidden @@ -222,6 +227,17 @@ public class GATKArgumentCollection { @Hidden public boolean restartProcessingTracker = false; + @Element(required=false) + @Argument(fullName="processingTrackerStatusFile",shortName="CSF",doc="If provided, a detailed accounting of the state of the process tracker is written to this file. For debugging, only",required=false) + @Hidden + public File processingTrackerStatusFile = null; + + // -------------------------------------------------------------------------------------------------------------- + // + // methods + // + // -------------------------------------------------------------------------------------------------------------- + /** * marshal the data out to a object * @@ -387,6 +403,10 @@ public class GATKArgumentCollection { (other.processingTrackerFile != null && !other.processingTrackerFile.equals(this.processingTrackerFile))) return false; + if ((other.processingTrackerStatusFile == null && this.processingTrackerStatusFile != null) || + (other.processingTrackerStatusFile != null && !other.processingTrackerStatusFile.equals(this.processingTrackerStatusFile))) + return false; + if ( restartProcessingTracker != other.restartProcessingTracker ) return false; diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 69d74a364..097adada0 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -38,6 +38,10 @@ import org.broadinstitute.sting.gatk.iterators.NullSAMIterator; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.ReadMetrics; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.PrintStream; import java.lang.management.ManagementFactory; import java.util.*; @@ -167,8 +171,17 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile); } - processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser()); - logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile); + PrintStream statusStream = null; + if ( engine.getArguments().processingTrackerStatusFile != null ) { + try { + statusStream = new PrintStream(new FileOutputStream(engine.getArguments().processingTrackerStatusFile)); + } catch ( FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(engine.getArguments().processingTrackerStatusFile, e); + } + } + + processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), false, statusStream); + logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName()); } else { processingTracker = GenomeLocProcessingTracker.createNoOp(); } diff --git a/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java index b305ffff7..365760c6d 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java @@ -11,5 +11,6 @@ import java.util.concurrent.locks.ReentrantLock; * Simple extension of a ReentrantLock that supports a close method */ public class ClosableReentrantLock extends ReentrantLock { + public boolean ownsLock() { return super.isHeldByCurrentThread(); } public void close() {} } diff --git a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java index 3c77eaece..57ef756d7 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java @@ -6,10 +6,8 @@ import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; +import java.io.*; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -20,76 +18,103 @@ import java.util.concurrent.locks.ReentrantLock; * Keeps a copy of the processing locks in a file, in addition to tracking in memory via the base class */ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); + private static final Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); private static final boolean DEBUG = false; - private File sharedFile = null; - private GenomeLocParser parser; - private RandomAccessFile raFile; + private static final String READ_MODE = "r"; + private static final String WRITE_MODE = "rws"; + + private final File sharedFile; + private final GenomeLocParser parser; private long lastReadPosition = 0; - protected FileBackedGenomeLocProcessingTracker(File sharedFile, RandomAccessFile raFile, GenomeLocParser parser, ClosableReentrantLock lock) { - super(lock); + protected FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) { + super(lock, status); this.sharedFile = sharedFile; - this.raFile = raFile; this.parser = parser; } - protected void close() { - super.close(); + private RandomAccessFile openFile(String mode) { try { - raFile.close(); + return new RandomAccessFile(sharedFile, mode); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } + + private void closeFile(RandomAccessFile raFile) { + try { + if ( raFile != null ) raFile.close(); } catch (IOException e) { throw new UserException.CouldNotCreateOutputFile(sharedFile, e); } } +// protected void close() { +// super.close(); +// } + @Override protected List readNewLocs() { List newPLocs = new ArrayList(); // todo -- gratitous object creation - try { - //logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition)); - if ( raFile.length() > lastReadPosition ) { - raFile.seek(lastReadPosition); - int counter = 0; - String line = raFile.readLine(); // Read another line - while ( line != null ) { - String[] parts = line.split(" "); - if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); - ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]); - //logger.warn(" Read " + ploc); - newPLocs.add(ploc); - line = raFile.readLine(); - counter++; + if ( sharedFile.exists() ) { + RandomAccessFile raFile = null; + try { + raFile = openFile(READ_MODE); + //logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition)); + if ( raFile.length() > lastReadPosition ) { + raFile.seek(lastReadPosition); + + int counter = 0; + String line = raFile.readLine(); // Read another line + while ( line != null ) { + String[] parts = line.split(" "); + if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); + ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]); + //logger.warn(" Read " + ploc); + newPLocs.add(ploc); + line = raFile.readLine(); + counter++; + } + lastReadPosition = raFile.getFilePointer(); + if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d", + counter, lastReadPosition, newPLocs.size())); } - lastReadPosition = raFile.getFilePointer(); - if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d", - counter, lastReadPosition, newPLocs.size())); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotReadInputFile(sharedFile, e); + } catch (IOException e) { + throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); + } finally { + closeFile(raFile); } - } catch (FileNotFoundException e) { - throw new UserException.CouldNotReadInputFile(sharedFile, e); - } catch (IOException e) { - throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); } return newPLocs; - } + } @Override protected void registerNewLocs(Collection plocs) { + RandomAccessFile raFile = null; + try { + raFile = openFile(WRITE_MODE); long startPos = raFile.getFilePointer(); raFile.seek(raFile.length()); + StringBuffer bytes = new StringBuffer(); for ( ProcessingLoc ploc : plocs ) { String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner()); - raFile.write(packet.getBytes()); + bytes.append(packet); if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer())); } + raFile.write(bytes.toString().getBytes()); + //raFile.getChannel().force(true); } catch (FileNotFoundException e) { throw new UserException.CouldNotCreateOutputFile(sharedFile, e); } catch (IOException e) { throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } finally { + closeFile(raFile); } } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index 759ceeee3..ba80991ad 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -10,7 +10,9 @@ import org.broadinstitute.sting.utils.exceptions.UserException; import java.io.File; import java.io.FileNotFoundException; +import java.io.PrintStream; import java.io.RandomAccessFile; +import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.locks.ReentrantLock; @@ -18,9 +20,17 @@ import java.util.concurrent.locks.ReentrantLock; * */ public abstract class GenomeLocProcessingTracker { - private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); - private Map processingLocs; - private ClosableReentrantLock lock; + private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); + private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS"); + private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 100; + + private final static String GOING_FOR_LOCK = "going_for_lock"; + private final static String HAVE_LOCK = "have_lock"; + private final static String RUNNING = "running"; + + private final Map processingLocs; + private final ClosableReentrantLock lock; + private final PrintStream status; protected SimpleTimer writeTimer = new SimpleTimer("writeTimer"); protected SimpleTimer readTimer = new SimpleTimer("readTimer"); @@ -41,24 +51,18 @@ public abstract class GenomeLocProcessingTracker { return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); } - public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) { - return createFileBacked(sharedFile, parser, false); + public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser, PrintStream status) { + return createFileBacked(sharedFile, parser, false, false, status); } - public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) { - return createFileBacked(sharedFile, parser, true); + public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser, boolean blockingP, PrintStream status) { + return createFileBacked(sharedFile, parser, blockingP, true, status); } - private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean useFileLockToo) { - try { - //logger.warn("Creating file backed GLPT at " + sharedFile); - RandomAccessFile raFile = new RandomAccessFile(sharedFile, "rws"); - ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(raFile.getChannel()) : new ClosableReentrantLock(); - return new FileBackedGenomeLocProcessingTracker(sharedFile, raFile, parser, lock); - } - catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } + private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean blockP, boolean useFileLockToo, PrintStream status) { + //logger.warn("Creating file backed GLPT at " + sharedFile); + ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(sharedFile, blockP) : new ClosableReentrantLock(); + return new FileBackedGenomeLocProcessingTracker(sharedFile, parser, lock, status); } // -------------------------------------------------------------------------------- @@ -66,9 +70,11 @@ public abstract class GenomeLocProcessingTracker { // Creating ProcessingTrackers // // -------------------------------------------------------------------------------- - public GenomeLocProcessingTracker(ClosableReentrantLock lock) { - processingLocs = new HashMap(); + public GenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) { + this.processingLocs = new HashMap(); + this.status = status; this.lock = lock; + printStatusHeader(); } // -------------------------------------------------------------------------------- @@ -84,16 +90,16 @@ public abstract class GenomeLocProcessingTracker { * @param loc * @return */ - public final boolean locIsOwned(GenomeLoc loc) { - return findOwner(loc) != null; + public final boolean locIsOwned(GenomeLoc loc, String id) { + return findOwner(loc, id) != null; } - public final ProcessingLoc findOwner(GenomeLoc loc) { + protected final ProcessingLoc findOwner(GenomeLoc loc, String id) { // fast path to check if we already have the existing genome loc in memory for ownership claims // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it // unless necessary ProcessingLoc x = findOwnerInMap(loc, processingLocs); - return x == null ? findOwnerInMap(loc, updateLocs()) : x; + return x == null ? findOwnerInMap(loc, updateLocs(id)) : x; } /** @@ -110,19 +116,19 @@ public abstract class GenomeLocProcessingTracker { public final ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { // processingLocs is a shared memory synchronized object, and this // method is synchronized, so we can just do our processing - lock(); + lock(myName); try { - ProcessingLoc owner = findOwner(loc); + ProcessingLoc owner = findOwner(loc, myName); if ( owner == null ) { // we are unowned owner = new ProcessingLoc(loc, myName); - registerNewLocsWithTimers(Arrays.asList(owner)); + registerNewLocsWithTimers(Arrays.asList(owner), myName); } return owner; //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); } finally { - unlock(); + unlock(myName); } } @@ -154,7 +160,7 @@ public abstract class GenomeLocProcessingTracker { private final int cacheSize; public OwnershipIterator(Iterator subit, String myName) { - this(subit, myName, 10); + this(subit, myName, DEFAULT_OWNERSHIP_ITERATOR_SIZE); } public OwnershipIterator(Iterator subit, String myName, int cacheSize) { @@ -185,10 +191,11 @@ public abstract class GenomeLocProcessingTracker { return elt; else { // cache is empty, we need to fill up the cache and return the first element of the queue - lock(); + lock(myName); try { + // read once the database of owners at the start - updateLocs(); + updateLocs(myName); boolean done = false; Queue pwns = new LinkedList(); // ;-) @@ -208,7 +215,7 @@ public abstract class GenomeLocProcessingTracker { // if not, we continue our search } - registerNewLocsWithTimers(pwns); + registerNewLocsWithTimers(pwns, myName); // we've either filled up the cache or run out of elements. Either way we return // the first element of the cache. If the cache is empty, we return null here. @@ -217,7 +224,7 @@ public abstract class GenomeLocProcessingTracker { return cache.poll(); } finally { - unlock(); + unlock(myName); } } } @@ -240,12 +247,12 @@ public abstract class GenomeLocProcessingTracker { * * @return */ - protected final Collection getProcessingLocs() { - return updateLocs().values(); + protected final Collection getProcessingLocs(String myName) { + return updateLocs(myName).values(); } - private final Map updateLocs() { - lock(); + private final Map updateLocs(String myName) { + lock(myName); try { readTimer.restart(); for ( ProcessingLoc p : readNewLocs() ) @@ -254,11 +261,11 @@ public abstract class GenomeLocProcessingTracker { nReads++; return processingLocs; } finally { - unlock(); + unlock(myName); } } - protected final void registerNewLocsWithTimers(Collection plocs) { + protected final void registerNewLocsWithTimers(Collection plocs, String myName) { writeTimer.restart(); registerNewLocs(plocs); nWrites++; @@ -270,17 +277,37 @@ public abstract class GenomeLocProcessingTracker { // Low-level accessors / manipulators and utility functions // // -------------------------------------------------------------------------------- - - private final void lock() { - lockWaitTimer.restart(); - if ( ! lock.isHeldByCurrentThread() ) - nLocks++; - lock.lock(); - lockWaitTimer.stop(); + private final boolean hasStatus() { + return status != null; } - private final void unlock() { + private final void printStatusHeader() { + if ( hasStatus() ) status.printf("process.id\thr.time\ttime\tstate%n"); + } + + private final void printStatus(String id, long machineTime, String state) { + // prints a line like processID human-readable-time machine-time state + if ( hasStatus() ) { + status.printf("%s\t%s\t%d\t%s%n", id, STATUS_FORMAT.format(machineTime), machineTime, state); + status.flush(); + } + } + + private final void lock(String id) { + lockWaitTimer.restart(); + boolean hadLock = lock.ownsLock(); + if ( ! hadLock ) { + nLocks++; + printStatus(id, lockWaitTimer.currentTime(), GOING_FOR_LOCK); + } + lock.lock(); + lockWaitTimer.stop(); + if ( ! hadLock ) printStatus(id, lockWaitTimer.currentTime(), HAVE_LOCK); + } + + private final void unlock(String id) { lock.unlock(); + if ( ! lock.ownsLock() ) printStatus(id, lockWaitTimer.currentTime(), RUNNING); } protected final static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection locs) { @@ -312,8 +339,8 @@ public abstract class GenomeLocProcessingTracker { protected void close() { lock.close(); - logger.warn("Locking events: " + nLocks); - // by default we don't do anything + if ( hasStatus() ) status.close(); + //logger.warn("Locking events: " + nLocks); } protected abstract void registerNewLocs(Collection plocs); diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java index eb5759e95..45b8593a7 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -15,7 +15,7 @@ import java.util.List; */ public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { protected NoOpGenomeLocProcessingTracker() { - super(new ClosableReentrantLock()); // todo -- should be lighter weight + super(new ClosableReentrantLock(), null); // todo -- should be lighter weight } // @Override diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java index b4f2e761f..ab8a20f02 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java @@ -4,7 +4,10 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.channels.*; /** @@ -20,28 +23,136 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock { private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class); private static final boolean DEBUG = false; + // 100 seconds of trying -> failure + private static final int DEFAULT_N_TRIES = 1000; + private static final long DEFAULT_MILLISECONDS_PER_TRY = 100; + + /** The file we are locking */ + private final File file; + /** The file lock itself that guards the file */ FileLock fileLock; /** the channel object that 'owns' the file lock, and we use to request the lock */ FileChannel channel; - int fileLockReentrantCounter = 0; /** - * Create a SharedFileThreadSafeLock object locking the file associated with channel - * @param channel + * A counter that indicates the number of 'locks' on this file. + * If locks == 2, then two unlocks are required + * before any resources are freed. */ - public SharedFileThreadSafeLock(FileChannel channel) { + int fileLockReentrantCounter = 0; + + // type of locking + private final boolean blockOnLock; + private final int nRetries; + private final long milliSecPerTry; + + /** + * Create a SharedFileThreadSafeLock object locking the file + * @param file + */ + public SharedFileThreadSafeLock(File file, boolean blockOnLock, int nRetries, long milliSecPerTry) { super(); - this.channel = channel; + this.file = file; + this.blockOnLock = blockOnLock; + this.nRetries = nRetries; + this.milliSecPerTry = milliSecPerTry; + } + + public SharedFileThreadSafeLock(File file, boolean blockOnLock) { + this(file, blockOnLock, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY); + } + + + private FileChannel getChannel() { + if ( DEBUG ) logger.warn(" Get channel: " + Thread.currentThread().getName() + " channel = " + channel); + if ( channel == null ) { + try { + if ( DEBUG ) logger.warn(" opening channel: " + Thread.currentThread().getName()); + this.channel = new RandomAccessFile(file, "rw").getChannel(); + if ( DEBUG ) logger.warn(" opened channel: " + Thread.currentThread().getName()); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(file, e); + } + } + + return this.channel; + } + + private void closeChannel() { + try { + if ( channel != null ) { + channel.close(); + channel = null; + } + } + catch (IOException e) { + throw new UserException("Count not close channel associated with file" + file, e); + } } public void close() { + closeChannel(); + } + + public boolean ownsLock() { + return super.isHeldByCurrentThread() && fileLockReentrantCounter > 0; + } + + // ------------------------------------------------------------------------------------------ + // + // workhorse routines -- acquiring file locks + // + // ------------------------------------------------------------------------------------------ + + private void acquireFileLock() { try { - channel.close(); - } - catch (IOException e) { - throw new UserException("Count not close channel " + channel, e); + // Precondition -- lock is always null while we don't have a lock + if ( fileLock != null ) + throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); + + if ( blockOnLock ) { + // + // blocking code + // + fileLock = getChannel().lock(); + } else { + // + // polling code + // + int i = 0; + for ( ; fileLock == null && i < nRetries; i++ ) { + fileLock = getChannel().tryLock(); + if ( fileLock == null ) { + try { + //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); + Thread.sleep(milliSecPerTry); + } catch ( InterruptedException e ) { + throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); + } + } + } + if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); + + if ( fileLock == null ) { + // filelock == null -> we never managed to acquire the lock! + throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); + } + } + if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); + } catch (ClosedChannelException e) { + throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + file, e); + } catch (FileLockInterruptionException e) { + throw new ReviewedStingException("File lock interrupted", e); + } catch (NonWritableChannelException e) { + throw new ReviewedStingException("File channel not writable", e); + } catch (OverlappingFileLockException e) { + // this only happens when multiple threads are running, and one is waiting + // for the lock above and we come here. + throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); + } catch (IOException e) { + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); } } @@ -61,27 +172,9 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock { } else { super.lock(); if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName()); - try { - // Precondition -- lock is always null while we don't have a lock - if ( fileLock != null ) - throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); - if ( fileLockReentrantCounter == 0 ) - fileLock = channel.lock(); - fileLockReentrantCounter++; - if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); - } catch (ClosedChannelException e) { - throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + channel, e); - } catch (FileLockInterruptionException e) { - throw new ReviewedStingException("File lock interrupted", e); - } catch (NonWritableChannelException e) { - throw new ReviewedStingException("File channel not writable", e); - } catch (OverlappingFileLockException e) { - // this only happens when multiple threads are running, and one is waiting - // for the lock above and we come here. - throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); - } catch (IOException e) { - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } + if ( fileLockReentrantCounter == 0 ) + acquireFileLock(); + fileLockReentrantCounter++; } } @@ -97,13 +190,14 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock { if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName()); fileLock.release(); + closeChannel(); fileLock = null; if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName()); } else { if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName()); } } catch ( IOException e ) { - throw new ReviewedStingException("Could not free lock on file " + channel, e); + throw new ReviewedStingException("Could not free lock on file " + file, e); } finally { if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName()); super.unlock(); diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index 5a67eb7b0..cff41b708 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.threading; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -15,7 +16,11 @@ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingT private List newPLocs = new ArrayList(); protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) { - super(lock); + super(lock, null); + } + + protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) { + super(lock, status); } @Override diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index 355e3dd92..2512dafe0 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -96,8 +96,6 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { List params = new ArrayList(); int counter = 0; -// for ( int nShard : Arrays.asList(10,100,1000) ) { -// for ( int shardSize : Arrays.asList(10) ) { for ( int nShard : nShards ) { for ( int shardSize : shardSizes ) { // shared mem -- canonical implementation @@ -108,7 +106,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize) { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser); + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser, null); public GenomeLocProcessingTracker getTracker() { return tracker; } public void init() { if ( file1.exists() ) @@ -116,15 +114,17 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } }); - final File file2 = new File(String.format("%s_ThreadSafeFileLockingFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); - params.add(new TestTarget("ThreadSafeFileLockingFileBacked", nShard, shardSize) { - GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser); - public GenomeLocProcessingTracker getTracker() { return tracker; } - public void init() { - if ( file2.exists() ) - file2.delete(); - } - }); + for ( final boolean blocking : Arrays.asList(true, false) ) { + final File file2 = new File(String.format("%s_ThreadSafeFileLockingFile_blocking%b_%d_%d", FILE_ROOT, blocking, counter++, nShard, shardSize)); + params.add(new TestTarget("ThreadSafeFileLockingFileBackedBlocking" + blocking, nShard, shardSize) { + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser, blocking, null); + public GenomeLocProcessingTracker getTracker() { return tracker; } + public void init() { + if ( file2.exists() ) + file2.delete(); + } + }); + } } } @@ -149,7 +149,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1); ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE); Assert.assertTrue(ploc.isOwnedBy(NAME_ONE)); - Assert.assertEquals(tracker.getProcessingLocs().size(), 0); + Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), 0); } } } @@ -164,8 +164,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { for ( GenomeLoc shard : shards ) { counter++; - Assert.assertNull(tracker.findOwner(shard)); - Assert.assertFalse(tracker.locIsOwned(shard)); + Assert.assertNull(tracker.findOwner(shard, NAME_ONE)); + Assert.assertFalse(tracker.locIsOwned(shard, NAME_ONE)); ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); Assert.assertNotNull(proc); @@ -173,10 +173,10 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertNotNull(proc.getOwner()); Assert.assertEquals(proc.getLocation(), shard); Assert.assertEquals(proc.getOwner(), NAME_ONE); - Assert.assertEquals(tracker.findOwner(shard), proc); - Assert.assertTrue(tracker.locIsOwned(shard)); - Assert.assertNotNull(tracker.getProcessingLocs()); - Assert.assertEquals(tracker.getProcessingLocs().size(), counter); + Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc); + Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE)); + Assert.assertNotNull(tracker.getProcessingLocs(NAME_ONE)); + Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), counter); ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO)); @@ -211,7 +211,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards"); } else { nFound++; - ProcessingLoc proc = tracker.findOwner(shard); + ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE); Assert.assertTrue(proc.isOwnedBy(NAME_ONE)); Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!"); @@ -246,7 +246,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked"); if ( ! markedShards.contains(shard) ) { - Assert.assertEquals(tracker.findOwner(shard), proc); + Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc); } } } @@ -357,12 +357,12 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { assertAllThreadsFinished(results); // we ran everything - Assert.assertEquals(tracker.getProcessingLocs().size(), shards.size(), "Not all shards were run"); + Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run"); for ( GenomeLoc shard : shards ) { - Assert.assertTrue(tracker.locIsOwned(shard), "Unowned shard"); + Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE), "Unowned shard"); - ProcessingLoc proc = tracker.findOwner(shard); + ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE); Assert.assertNotNull(proc, "Proc was null"); Assert.assertNotNull(proc.getOwner(), "Owner was null"); diff --git a/shell/syncWithDevOnGSA2.csh b/shell/syncWithDevOnGSA2.csh index 045ac49b6..4f2445ee2 100755 --- a/shell/syncWithDevOnGSA2.csh +++ b/shell/syncWithDevOnGSA2.csh @@ -1,6 +1,6 @@ #!/bin/tcsh -setenv HERE "java tribble" +setenv HERE "java tribble scala analysis" setenv THERE \~/dev/GenomeAnalysisTKFromLaptop/trunk rsync -e ssh -aCvz $HERE depristo@gsa1:$THERE