V3 of the distributed GATK. High-efficiency implementation. Support for status tracking for debugging and display. Still not safe for production use due to NFS filelock problem. V4 will use alternative file locking mechanism

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5063 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2011-01-24 16:45:07 +00:00
parent fd864e8e3a
commit c50f39a147
12 changed files with 361 additions and 162 deletions

View File

@ -1,8 +1,8 @@
# todo -- add replicate number to system
# todo -- add scatter gather comparison
d <- read.table("results.dat", header=T)
d <- read.table("results.new.dat", header=T)
require("lattice")
plot1 <- function(d, name) {
d = subset(d, dataset == name)
subd = data.frame(parallel.type=d$parallel.type, nWaysParallel=d$nWaysParallel, end.to.end.time=d$end.to.end.time,per.1M.sites = d$per.1M.sites, job.run.time = d$job.run.time)
nways = unique(subd$nWaysParallel)
@ -18,5 +18,8 @@ subd = rbind(subd, theo)
print(summary(subd))
print(xyplot(end.to.end.time + per.1M.sites + job.run.time ~ nWaysParallel, data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1)))
print(xyplot(log10(end.to.end.time) + per.1M.sites + log10(job.run.time) ~ log2(nWaysParallel), data=subd[order(subd$nWaysParallel),], group=parallel.type, type="b", outer=T, scale=list(relation="free"), auto.key=T, lwd=c(2,2,1), main=name))
}
plot1(d, "NA12878Trio.WEx")
plot1(d, "NA12878.HiSeq")

View File

@ -13,16 +13,22 @@ class DistributedGATKPerformance extends QScript {
@Argument(shortName="outputDir", doc="output directory", required=false)
var outputDir: String = ""
@Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=false)
@Argument(shortName="dataset", doc="selects the datasets to run. If not provided, all datasets will be used", required=true)
var datasets: List[String] = Nil
@Argument(shortName="waysParallel", doc="selects the datasets to run. If not provided, all datasets will be used", required=false)
var waysParallelArg: List[Int] = Nil
@Argument(shortName="long", doc="runs long calculations", required=false)
var long: Boolean = false
@Argument(shortName="test", doc="runs long calculations", required=false)
var test: Boolean = false
//@Argument(shortName="noBAQ", doc="turns off BAQ calculation", required=false)
var noBAQ: Boolean = false
trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "INFO"; jarFile = gatkJarFile; memoryLimit = Some(2); }
trait UNIVERSAL_GATK_ARGS extends CommandLineGATK { logging_level = "DEBUG"; jarFile = gatkJarFile; memoryLimit = Some(2); }
class Target(
val baseName: String,
@ -102,7 +108,7 @@ class DistributedGATKPerformance extends QScript {
new File("/humgen/gsa-hpprojects/dev/data/AugChr20Calls_v4_3state/ALL.august.v4.chr20.filtered.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
"/humgen/1kg/processing/pipeline_test_bams/whole_genome_chunked.chr20.hg19.intervals", 2.3, lowPass),
"WExTrio" -> new Target("NA12878Trio.WEx", b37, dbSNP_b37, hapmap_b37, indelMask_b37,
new File("/humgen/gsa-scr1/carneiro/prj/trio/NA12878Trio.WEx.hg19.bam"),
new File("/humgen/gsa-scr1/carneiro/prj/trio/data/NA12878Trio.WEx.hg19.recal.bam"),
new File("/humgen/gsa-scr1/delangel/NewUG/calls/AugustRelease.filtered_Q50_QD5.0_SB0.0.allSamples.SNPs_hg19.WEx_UG_newUG_MQC.vcf"), // ** THIS GOLD STANDARD NEEDS TO BE CORRECTED **
"/seq/references/HybSelOligos/whole_exome_agilent_1.1_refseq_plus_3_boosters/whole_exome_agilent_1.1_refseq_plus_3_boosters.Homo_sapiens_assembly19.targets.interval_list", 2.6, !lowPass)
)
@ -111,24 +117,25 @@ class DistributedGATKPerformance extends QScript {
// Selects the datasets in the -dataset argument and adds them to targets.
var targets: List[Target] = List()
if (!datasets.isEmpty)
for (ds <- datasets)
targets ::= targetDataSets(ds) // Could check if ds was misspelled, but this way an exception will be thrown, maybe it's better this way?
else // If -dataset is not specified, all datasets are used.
for (targetDS <- targetDataSets.valuesIterator) // for Scala 2.7 or older, use targetDataSets.values
targets ::= targetDS
for (ds <- datasets)
targets ::= targetDataSets(ds) // Could check if ds was misspelled, but this way an exception will be thrown, maybe it's better this way?
var nWays = if (long) List(1, 2, 4, 8) else List(16, 32, 64, 96)
if ( ! waysParallelArg.isEmpty )
nWays = waysParallelArg
val nWays = if (long) List(1, 2, 4) else List(8, 16, 32, 64, 96)
//val nWays = List(2)
for (target <- targets) {
for ( scatterP <- List(true, false) )
for (nWaysParallel <- nWays) {
for ( scatterP <- if ( test ) List(false) else List(true, false) )
for (nWaysParallel <- if ( test ) List(32) else nWays) {
val aname = "ptype_%s.nways_%d".format(if ( scatterP ) "sg" else "dist", nWaysParallel)
def addUG(ug: UnifiedGenotyper) = {
if ( ! long )
ug.jobLimitSeconds = Some(60 * 60 * 4)
if ( test )
ug.jobLimitSeconds = Some(60 * 30)
add(ug);
}
@ -141,10 +148,14 @@ class DistributedGATKPerformance extends QScript {
} else {
for ( part <- 1 to nWaysParallel) {
var ug: UnifiedGenotyper = new UnifiedGenotyper(target, aname + ".part" + part)
ug.intervalsString ++= List(CHROMOSOME)
if ( target.name.equals("NA12878.HiSeq"))
ug.intervalsString ++= List(CHROMOSOME)
else
ug.intervalsString ++= List(target.intervals)
ug.processingTracker = new File(target.name + "." + aname + ".distributed.txt")
if ( part == 1 )
ug.performanceLog = new File("%s.%s.pf.log".format(target.name, aname))
ug.processingTrackerStatusFile = new File("%s.%s.%d.ptstatus.log".format(target.name, aname, part))
addUG(ug)
}
}

View File

@ -212,6 +212,11 @@ public class GATKArgumentCollection {
@Input(fullName = "read_group_black_list", shortName="rgbl", doc="Filters out read groups matching <TAG>:<STRING> or a .txt file containing the filter strings one per line.", required = false)
public List<String> readGroupBlackList = null;
// --------------------------------------------------------------------------------------------------------------
//
// distributed GATK arguments
//
// --------------------------------------------------------------------------------------------------------------
@Element(required=false)
@Argument(fullName="processingTracker",shortName="C",doc="A lockable, shared file for coordinating distributed GATK runs",required=false)
@Hidden
@ -222,6 +227,17 @@ public class GATKArgumentCollection {
@Hidden
public boolean restartProcessingTracker = false;
@Element(required=false)
@Argument(fullName="processingTrackerStatusFile",shortName="CSF",doc="If provided, a detailed accounting of the state of the process tracker is written to this file. For debugging, only",required=false)
@Hidden
public File processingTrackerStatusFile = null;
// --------------------------------------------------------------------------------------------------------------
//
// methods
//
// --------------------------------------------------------------------------------------------------------------
/**
* marshal the data out to a object
*
@ -387,6 +403,10 @@ public class GATKArgumentCollection {
(other.processingTrackerFile != null && !other.processingTrackerFile.equals(this.processingTrackerFile)))
return false;
if ((other.processingTrackerStatusFile == null && this.processingTrackerStatusFile != null) ||
(other.processingTrackerStatusFile != null && !other.processingTrackerStatusFile.equals(this.processingTrackerStatusFile)))
return false;
if ( restartProcessingTracker != other.restartProcessingTracker )
return false;

View File

@ -38,6 +38,10 @@ import org.broadinstitute.sting.gatk.iterators.NullSAMIterator;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.ReadMetrics;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.util.*;
@ -167,8 +171,17 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile);
}
processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser());
logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile);
PrintStream statusStream = null;
if ( engine.getArguments().processingTrackerStatusFile != null ) {
try {
statusStream = new PrintStream(new FileOutputStream(engine.getArguments().processingTrackerStatusFile));
} catch ( FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().processingTrackerStatusFile, e);
}
}
processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), false, statusStream);
logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName());
} else {
processingTracker = GenomeLocProcessingTracker.createNoOp();
}

View File

@ -11,5 +11,6 @@ import java.util.concurrent.locks.ReentrantLock;
* Simple extension of a ReentrantLock that supports a close method
*/
/**
 * A {@link ReentrantLock} variant that adds an ownership query and a close()
 * hook so lock implementations holding external resources (e.g. file channels)
 * can release them through a common interface.
 */
public class ClosableReentrantLock extends ReentrantLock {
    /** Reports whether the calling thread currently holds this lock. */
    public boolean ownsLock() {
        return isHeldByCurrentThread();
    }

    /** Releases any resources associated with the lock; the base implementation holds none, so this is a no-op. */
    public void close() {
    }
}

View File

@ -6,10 +6,8 @@ import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -20,76 +18,103 @@ import java.util.concurrent.locks.ReentrantLock;
* Keeps a copy of the processing locks in a file, in addition to tracking in memory via the base class
*/
public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private static final Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private static final boolean DEBUG = false;
private File sharedFile = null;
private GenomeLocParser parser;
private RandomAccessFile raFile;
private static final String READ_MODE = "r";
private static final String WRITE_MODE = "rws";
private final File sharedFile;
private final GenomeLocParser parser;
private long lastReadPosition = 0;
protected FileBackedGenomeLocProcessingTracker(File sharedFile, RandomAccessFile raFile, GenomeLocParser parser, ClosableReentrantLock lock) {
super(lock);
protected FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) {
super(lock, status);
this.sharedFile = sharedFile;
this.raFile = raFile;
this.parser = parser;
}
protected void close() {
super.close();
private RandomAccessFile openFile(String mode) {
try {
raFile.close();
return new RandomAccessFile(sharedFile, mode);
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
private void closeFile(RandomAccessFile raFile) {
try {
if ( raFile != null ) raFile.close();
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
// protected void close() {
// super.close();
// }
@Override
protected List<ProcessingLoc> readNewLocs() {
List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>(); // todo -- gratitous object creation
try {
//logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition));
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
int counter = 0;
String line = raFile.readLine(); // Read another line
while ( line != null ) {
String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]);
//logger.warn(" Read " + ploc);
newPLocs.add(ploc);
line = raFile.readLine();
counter++;
if ( sharedFile.exists() ) {
RandomAccessFile raFile = null;
try {
raFile = openFile(READ_MODE);
//logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition));
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
int counter = 0;
String line = raFile.readLine(); // Read another line
while ( line != null ) {
String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]);
//logger.warn(" Read " + ploc);
newPLocs.add(ploc);
line = raFile.readLine();
counter++;
}
lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d",
counter, lastReadPosition, newPLocs.size()));
}
lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d",
counter, lastReadPosition, newPLocs.size()));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
} finally {
closeFile(raFile);
}
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
}
return newPLocs;
}
}
@Override
protected void registerNewLocs(Collection<ProcessingLoc> plocs) {
RandomAccessFile raFile = null;
try {
raFile = openFile(WRITE_MODE);
long startPos = raFile.getFilePointer();
raFile.seek(raFile.length());
StringBuffer bytes = new StringBuffer();
for ( ProcessingLoc ploc : plocs ) {
String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner());
raFile.write(packet.getBytes());
bytes.append(packet);
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer()));
}
raFile.write(bytes.toString().getBytes());
//raFile.getChannel().force(true);
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} finally {
closeFile(raFile);
}
}
}

View File

@ -10,7 +10,9 @@ import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
@ -18,9 +20,17 @@ import java.util.concurrent.locks.ReentrantLock;
*
*/
public abstract class GenomeLocProcessingTracker {
private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private Map<GenomeLoc, ProcessingLoc> processingLocs;
private ClosableReentrantLock lock;
private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS");
private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 100;
private final static String GOING_FOR_LOCK = "going_for_lock";
private final static String HAVE_LOCK = "have_lock";
private final static String RUNNING = "running";
private final Map<GenomeLoc, ProcessingLoc> processingLocs;
private final ClosableReentrantLock lock;
private final PrintStream status;
protected SimpleTimer writeTimer = new SimpleTimer("writeTimer");
protected SimpleTimer readTimer = new SimpleTimer("readTimer");
@ -41,24 +51,18 @@ public abstract class GenomeLocProcessingTracker {
return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock());
}
public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) {
return createFileBacked(sharedFile, parser, false);
public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser, PrintStream status) {
return createFileBacked(sharedFile, parser, false, false, status);
}
public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) {
return createFileBacked(sharedFile, parser, true);
public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser, boolean blockingP, PrintStream status) {
return createFileBacked(sharedFile, parser, blockingP, true, status);
}
private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean useFileLockToo) {
try {
//logger.warn("Creating file backed GLPT at " + sharedFile);
RandomAccessFile raFile = new RandomAccessFile(sharedFile, "rws");
ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(raFile.getChannel()) : new ClosableReentrantLock();
return new FileBackedGenomeLocProcessingTracker(sharedFile, raFile, parser, lock);
}
catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean blockP, boolean useFileLockToo, PrintStream status) {
//logger.warn("Creating file backed GLPT at " + sharedFile);
ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(sharedFile, blockP) : new ClosableReentrantLock();
return new FileBackedGenomeLocProcessingTracker(sharedFile, parser, lock, status);
}
// --------------------------------------------------------------------------------
@ -66,9 +70,11 @@ public abstract class GenomeLocProcessingTracker {
// Creating ProcessingTrackers
//
// --------------------------------------------------------------------------------
public GenomeLocProcessingTracker(ClosableReentrantLock lock) {
processingLocs = new HashMap<GenomeLoc, ProcessingLoc>();
public GenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) {
this.processingLocs = new HashMap<GenomeLoc, ProcessingLoc>();
this.status = status;
this.lock = lock;
printStatusHeader();
}
// --------------------------------------------------------------------------------
@ -84,16 +90,16 @@ public abstract class GenomeLocProcessingTracker {
* @param loc
* @return
*/
public final boolean locIsOwned(GenomeLoc loc) {
return findOwner(loc) != null;
public final boolean locIsOwned(GenomeLoc loc, String id) {
return findOwner(loc, id) != null;
}
public final ProcessingLoc findOwner(GenomeLoc loc) {
protected final ProcessingLoc findOwner(GenomeLoc loc, String id) {
// fast path to check if we already have the existing genome loc in memory for ownership claims
// getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it
// unless necessary
ProcessingLoc x = findOwnerInMap(loc, processingLocs);
return x == null ? findOwnerInMap(loc, updateLocs()) : x;
return x == null ? findOwnerInMap(loc, updateLocs(id)) : x;
}
/**
@ -110,19 +116,19 @@ public abstract class GenomeLocProcessingTracker {
public final ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
// processingLocs is a shared memory synchronized object, and this
// method is synchronized, so we can just do our processing
lock();
lock(myName);
try {
ProcessingLoc owner = findOwner(loc);
ProcessingLoc owner = findOwner(loc, myName);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
registerNewLocsWithTimers(Arrays.asList(owner));
registerNewLocsWithTimers(Arrays.asList(owner), myName);
}
return owner;
//logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner));
} finally {
unlock();
unlock(myName);
}
}
@ -154,7 +160,7 @@ public abstract class GenomeLocProcessingTracker {
private final int cacheSize;
public OwnershipIterator(Iterator<T> subit, String myName) {
this(subit, myName, 10);
this(subit, myName, DEFAULT_OWNERSHIP_ITERATOR_SIZE);
}
public OwnershipIterator(Iterator<T> subit, String myName, int cacheSize) {
@ -185,10 +191,11 @@ public abstract class GenomeLocProcessingTracker {
return elt;
else {
// cache is empty, we need to fill up the cache and return the first element of the queue
lock();
lock(myName);
try {
// read once the database of owners at the start
updateLocs();
updateLocs(myName);
boolean done = false;
Queue<ProcessingLoc> pwns = new LinkedList<ProcessingLoc>(); // ;-)
@ -208,7 +215,7 @@ public abstract class GenomeLocProcessingTracker {
// if not, we continue our search
}
registerNewLocsWithTimers(pwns);
registerNewLocsWithTimers(pwns, myName);
// we've either filled up the cache or run out of elements. Either way we return
// the first element of the cache. If the cache is empty, we return null here.
@ -217,7 +224,7 @@ public abstract class GenomeLocProcessingTracker {
return cache.poll();
} finally {
unlock();
unlock(myName);
}
}
}
@ -240,12 +247,12 @@ public abstract class GenomeLocProcessingTracker {
*
* @return
*/
protected final Collection<ProcessingLoc> getProcessingLocs() {
return updateLocs().values();
protected final Collection<ProcessingLoc> getProcessingLocs(String myName) {
return updateLocs(myName).values();
}
private final Map<GenomeLoc, ProcessingLoc> updateLocs() {
lock();
private final Map<GenomeLoc, ProcessingLoc> updateLocs(String myName) {
lock(myName);
try {
readTimer.restart();
for ( ProcessingLoc p : readNewLocs() )
@ -254,11 +261,11 @@ public abstract class GenomeLocProcessingTracker {
nReads++;
return processingLocs;
} finally {
unlock();
unlock(myName);
}
}
protected final void registerNewLocsWithTimers(Collection<ProcessingLoc> plocs) {
protected final void registerNewLocsWithTimers(Collection<ProcessingLoc> plocs, String myName) {
writeTimer.restart();
registerNewLocs(plocs);
nWrites++;
@ -270,17 +277,37 @@ public abstract class GenomeLocProcessingTracker {
// Low-level accessors / manipulators and utility functions
//
// --------------------------------------------------------------------------------
private final void lock() {
lockWaitTimer.restart();
if ( ! lock.isHeldByCurrentThread() )
nLocks++;
lock.lock();
lockWaitTimer.stop();
private final boolean hasStatus() {
return status != null;
}
private final void unlock() {
private final void printStatusHeader() {
if ( hasStatus() ) status.printf("process.id\thr.time\ttime\tstate%n");
}
private final void printStatus(String id, long machineTime, String state) {
// prints a line like processID human-readable-time machine-time state
if ( hasStatus() ) {
status.printf("%s\t%s\t%d\t%s%n", id, STATUS_FORMAT.format(machineTime), machineTime, state);
status.flush();
}
}
private final void lock(String id) {
lockWaitTimer.restart();
boolean hadLock = lock.ownsLock();
if ( ! hadLock ) {
nLocks++;
printStatus(id, lockWaitTimer.currentTime(), GOING_FOR_LOCK);
}
lock.lock();
lockWaitTimer.stop();
if ( ! hadLock ) printStatus(id, lockWaitTimer.currentTime(), HAVE_LOCK);
}
private final void unlock(String id) {
lock.unlock();
if ( ! lock.ownsLock() ) printStatus(id, lockWaitTimer.currentTime(), RUNNING);
}
protected final static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection<ProcessingLoc> locs) {
@ -312,8 +339,8 @@ public abstract class GenomeLocProcessingTracker {
protected void close() {
lock.close();
logger.warn("Locking events: " + nLocks);
// by default we don't do anything
if ( hasStatus() ) status.close();
//logger.warn("Locking events: " + nLocks);
}
protected abstract void registerNewLocs(Collection<ProcessingLoc> plocs);

View File

@ -15,7 +15,7 @@ import java.util.List;
*/
public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
protected NoOpGenomeLocProcessingTracker() {
super(new ClosableReentrantLock()); // todo -- should be lighter weight
super(new ClosableReentrantLock(), null); // todo -- should be lighter weight
}
// @Override

View File

@ -4,7 +4,10 @@ import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.*;
/**
@ -20,28 +23,136 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock {
private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class);
private static final boolean DEBUG = false;
// 100 seconds of trying -> failure
private static final int DEFAULT_N_TRIES = 1000;
private static final long DEFAULT_MILLISECONDS_PER_TRY = 100;
/** The file we are locking */
private final File file;
/** The file lock itself that guards the file */
FileLock fileLock;
/** the channel object that 'owns' the file lock, and we use to request the lock */
FileChannel channel;
int fileLockReentrantCounter = 0;
/**
* Create a SharedFileThreadSafeLock object locking the file associated with channel
* @param channel
* A counter that indicates the number of 'locks' on this file.
* If locks == 2, then two unlocks are required
* before any resources are freed.
*/
public SharedFileThreadSafeLock(FileChannel channel) {
int fileLockReentrantCounter = 0;
// type of locking
private final boolean blockOnLock;
private final int nRetries;
private final long milliSecPerTry;
/**
* Create a SharedFileThreadSafeLock object locking the file
* @param file
*/
public SharedFileThreadSafeLock(File file, boolean blockOnLock, int nRetries, long milliSecPerTry) {
super();
this.channel = channel;
this.file = file;
this.blockOnLock = blockOnLock;
this.nRetries = nRetries;
this.milliSecPerTry = milliSecPerTry;
}
public SharedFileThreadSafeLock(File file, boolean blockOnLock) {
this(file, blockOnLock, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY);
}
private FileChannel getChannel() {
if ( DEBUG ) logger.warn(" Get channel: " + Thread.currentThread().getName() + " channel = " + channel);
if ( channel == null ) {
try {
if ( DEBUG ) logger.warn(" opening channel: " + Thread.currentThread().getName());
this.channel = new RandomAccessFile(file, "rw").getChannel();
if ( DEBUG ) logger.warn(" opened channel: " + Thread.currentThread().getName());
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(file, e);
}
}
return this.channel;
}
private void closeChannel() {
try {
if ( channel != null ) {
channel.close();
channel = null;
}
}
catch (IOException e) {
throw new UserException("Count not close channel associated with file" + file, e);
}
}
public void close() {
closeChannel();
}
public boolean ownsLock() {
return super.isHeldByCurrentThread() && fileLockReentrantCounter > 0;
}
// ------------------------------------------------------------------------------------------
//
// workhorse routines -- acquiring file locks
//
// ------------------------------------------------------------------------------------------
private void acquireFileLock() {
try {
channel.close();
}
catch (IOException e) {
throw new UserException("Count not close channel " + channel, e);
// Precondition -- lock is always null while we don't have a lock
if ( fileLock != null )
throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
if ( blockOnLock ) {
//
// blocking code
//
fileLock = getChannel().lock();
} else {
//
// polling code
//
int i = 0;
for ( ; fileLock == null && i < nRetries; i++ ) {
fileLock = getChannel().tryLock();
if ( fileLock == null ) {
try {
//logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry");
Thread.sleep(milliSecPerTry);
} catch ( InterruptedException e ) {
throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e);
}
}
}
if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds");
if ( fileLock == null ) {
// filelock == null -> we never managed to acquire the lock!
throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts");
}
}
if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName());
} catch (ClosedChannelException e) {
throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + file, e);
} catch (FileLockInterruptionException e) {
throw new ReviewedStingException("File lock interrupted", e);
} catch (NonWritableChannelException e) {
throw new ReviewedStingException("File channel not writable", e);
} catch (OverlappingFileLockException e) {
// this only happens when multiple threads are running, and one is waiting
// for the lock above and we come here.
throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
} catch (IOException e) {
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
}
@ -61,27 +172,9 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock {
} else {
super.lock();
if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName());
try {
// Precondition -- lock is always null while we don't have a lock
if ( fileLock != null )
throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
if ( fileLockReentrantCounter == 0 )
fileLock = channel.lock();
fileLockReentrantCounter++;
if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName());
} catch (ClosedChannelException e) {
throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + channel, e);
} catch (FileLockInterruptionException e) {
throw new ReviewedStingException("File lock interrupted", e);
} catch (NonWritableChannelException e) {
throw new ReviewedStingException("File channel not writable", e);
} catch (OverlappingFileLockException e) {
// this only happens when multiple threads are running, and one is waiting
// for the lock above and we come here.
throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
} catch (IOException e) {
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
if ( fileLockReentrantCounter == 0 )
acquireFileLock();
fileLockReentrantCounter++;
}
}
@ -97,13 +190,14 @@ public class SharedFileThreadSafeLock extends ClosableReentrantLock {
if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName());
fileLock.release();
closeChannel();
fileLock = null;
if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName());
} else {
if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName());
}
} catch ( IOException e ) {
throw new ReviewedStingException("Could not free lock on file " + channel, e);
throw new ReviewedStingException("Could not free lock on file " + file, e);
} finally {
if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName());
super.unlock();

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -15,7 +16,11 @@ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingT
private List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>();
protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) {
super(lock);
super(lock, null);
}
protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) {
super(lock, status);
}
@Override

View File

@ -96,8 +96,6 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
List<TestTarget> params = new ArrayList<TestTarget>();
int counter = 0;
// for ( int nShard : Arrays.asList(10,100,1000) ) {
// for ( int shardSize : Arrays.asList(10) ) {
for ( int nShard : nShards ) {
for ( int shardSize : shardSizes ) {
// shared mem -- canonical implementation
@ -108,7 +106,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize) {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser);
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser, null);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public void init() {
if ( file1.exists() )
@ -116,15 +114,17 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
}
});
final File file2 = new File(String.format("%s_ThreadSafeFileLockingFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileLockingFileBacked", nShard, shardSize) {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public void init() {
if ( file2.exists() )
file2.delete();
}
});
for ( final boolean blocking : Arrays.asList(true, false) ) {
final File file2 = new File(String.format("%s_ThreadSafeFileLockingFile_blocking%b_%d_%d", FILE_ROOT, blocking, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileLockingFileBackedBlocking" + blocking, nShard, shardSize) {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser, blocking, null);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public void init() {
if ( file2.exists() )
file2.delete();
}
});
}
}
}
@ -149,7 +149,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1);
ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE);
Assert.assertTrue(ploc.isOwnedBy(NAME_ONE));
Assert.assertEquals(tracker.getProcessingLocs().size(), 0);
Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), 0);
}
}
}
@ -164,8 +164,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
for ( GenomeLoc shard : shards ) {
counter++;
Assert.assertNull(tracker.findOwner(shard));
Assert.assertFalse(tracker.locIsOwned(shard));
Assert.assertNull(tracker.findOwner(shard, NAME_ONE));
Assert.assertFalse(tracker.locIsOwned(shard, NAME_ONE));
ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
Assert.assertNotNull(proc);
@ -173,10 +173,10 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertNotNull(proc.getOwner());
Assert.assertEquals(proc.getLocation(), shard);
Assert.assertEquals(proc.getOwner(), NAME_ONE);
Assert.assertEquals(tracker.findOwner(shard), proc);
Assert.assertTrue(tracker.locIsOwned(shard));
Assert.assertNotNull(tracker.getProcessingLocs());
Assert.assertEquals(tracker.getProcessingLocs().size(), counter);
Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc);
Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE));
Assert.assertNotNull(tracker.getProcessingLocs(NAME_ONE));
Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), counter);
ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO);
Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO));
@ -211,7 +211,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards");
} else {
nFound++;
ProcessingLoc proc = tracker.findOwner(shard);
ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE);
Assert.assertTrue(proc.isOwnedBy(NAME_ONE));
Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!");
@ -246,7 +246,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked");
if ( ! markedShards.contains(shard) ) {
Assert.assertEquals(tracker.findOwner(shard), proc);
Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc);
}
}
}
@ -357,12 +357,12 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
assertAllThreadsFinished(results);
// we ran everything
Assert.assertEquals(tracker.getProcessingLocs().size(), shards.size(), "Not all shards were run");
Assert.assertEquals(tracker.getProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run");
for ( GenomeLoc shard : shards ) {
Assert.assertTrue(tracker.locIsOwned(shard), "Unowned shard");
Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE), "Unowned shard");
ProcessingLoc proc = tracker.findOwner(shard);
ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE);
Assert.assertNotNull(proc, "Proc was null");
Assert.assertNotNull(proc.getOwner(), "Owner was null");

View File

@ -1,6 +1,6 @@
#!/bin/tcsh
setenv HERE "java tribble"
setenv HERE "java tribble scala analysis"
setenv THERE \~/dev/GenomeAnalysisTKFromLaptop/trunk
rsync -e ssh -aCvz $HERE depristo@gsa1:$THERE