diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 0199901a1..2ab248924 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -284,6 +284,8 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar Shard shard = traverseTasks.remove(); + // todo -- add ownership claim here + ShardTraverser traverser = new ShardTraverser(this, traversalEngine, walker, diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 6f4415bc5..2cc744893 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -49,6 +49,7 @@ public class LinearMicroScheduler extends MicroScheduler { walker.initialize(); Accumulator accumulator = Accumulator.create(engine,walker); + int counter = 0; for (Shard shard : processingTracker.onlyOwned(shardStrategy, engine.getName())) { if ( shard == null ) // we ran out of shards that aren't owned break; @@ -70,6 +71,12 @@ public class LinearMicroScheduler extends MicroScheduler { accumulator.accumulate(dataProvider,result); dataProvider.close(); } + + + counter++; + logger.debug(String.format("Processing shard %s, used %d locks for %d shards processed, %.2e sec / lock, %.2e sec / read, %.2f sec / write", + shard.getLocation(), processingTracker.getNLocks(), counter, + processingTracker.getTimePerLock(), processingTracker.getTimePerRead(), processingTracker.getTimePerWrite())); } Object result = accumulator.finishTraversal(); diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index ba8e7429b..4360fe6ab 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -4,6 +4,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.HasGenomeLocation; +import org.broadinstitute.sting.utils.SimpleTimer; import org.broadinstitute.sting.utils.exceptions.UserException; import java.io.File; @@ -19,7 +20,11 @@ public abstract class GenomeLocProcessingTracker { private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); private Map processingLocs; private ClosableReentrantLock lock; - private long nLockingEvents = 0; + + SimpleTimer writeTimer = new SimpleTimer("writeTimer"); + SimpleTimer readTimer = new SimpleTimer("readTimer"); + SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); + private long nLocks = 0, nWrites = 0, nReads = 0; // -------------------------------------------------------------------------------- // @@ -27,19 +32,19 @@ public abstract class GenomeLocProcessingTracker { // // -------------------------------------------------------------------------------- - public static NoOpGenomeLocProcessingTracker createNoOp() { - return new NoOpGenomeLocProcessingTracker(); + public static GenomeLocProcessingTracker createNoOp() { + return createSharedMemory(); } - public static SharedMemoryGenomeLocProcessingTracker createSharedMemory() { + public static GenomeLocProcessingTracker createSharedMemory() { return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); } - public static FileBackedGenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) { + public static GenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) { return createFileBacked(sharedFile, parser, false); } - public static FileBackedGenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) { + public static GenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) { return createFileBacked(sharedFile, parser, true); } @@ -101,7 +106,7 @@ public abstract class GenomeLocProcessingTracker { * @param myName * @return */ - public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + public final ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { // processingLocs is a shared memory synchronized object, and this // method is synchronized, so we can just do our processing lock(); @@ -110,7 +115,10 @@ public abstract class GenomeLocProcessingTracker { if ( owner == null ) { // we are unowned owner = new ProcessingLoc(loc, myName); + writeTimer.restart(); registerNewLoc(owner); + writeTimer.stop(); + nWrites++; } return owner; @@ -132,16 +140,16 @@ public abstract class GenomeLocProcessingTracker { * @param myName * @return */ - public T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { + public final T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { OwnershipIterator myIt = new OwnershipIterator(iterator, myName); return myIt.next(); } - public Iterable onlyOwned(Iterator iterator, String myName) { + public final Iterable onlyOwned(Iterator iterator, String myName) { return new OwnershipIterator(iterator, myName); } - protected class OwnershipIterator implements Iterator, Iterable { + protected final class OwnershipIterator implements Iterator, Iterable { Iterator subit; String myName; @@ -211,8 +219,11 @@ public abstract class GenomeLocProcessingTracker { private Map updateLocs() { lock(); try { + readTimer.restart(); for ( ProcessingLoc p : readNewLocs() ) processingLocs.put(p.getLocation(), p); + readTimer.stop(); + nReads++; return processingLocs; } finally { unlock(); @@ -227,16 +238,18 @@ public abstract class GenomeLocProcessingTracker { // -------------------------------------------------------------------------------- private final void lock() { + lockWaitTimer.restart(); if ( ! lock.isHeldByCurrentThread() ) - nLockingEvents++; + nLocks++; lock.lock(); + lockWaitTimer.stop(); } private final void unlock() { lock.unlock(); } - protected static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection locs) { + protected final static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection locs) { for ( ProcessingLoc l : locs ) { if ( l.getLocation().equals(loc) ) return l; @@ -245,10 +258,15 @@ public abstract class GenomeLocProcessingTracker { return null; } - protected static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map locs) { + protected final static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map locs) { return locs.get(loc); } + // useful code for getting + public final long getNLocks() { return nLocks; } + public final double getTimePerLock() { return lockWaitTimer.getElapsedTime() / Math.max(nLocks, 1); } + public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); } + public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); } // -------------------------------------------------------------------------------- // @@ -258,7 +276,7 @@ public abstract class GenomeLocProcessingTracker { protected void close() { lock.close(); - logger.warn("Locking events: " + nLockingEvents); + logger.warn("Locking events: " + nLocks); // by default we don't do anything } diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java deleted file mode 100644 index d5743a3c7..000000000 --- a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.broadinstitute.sting.utils.threading; - -import org.broadinstitute.sting.utils.GenomeLoc; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Base class, and null tracker. Always says that a GenomeLoc is ready for processing - */ -public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - protected NoOpGenomeLocProcessingTracker() { - super(new ClosableReentrantLock()); // todo -- should be lighter weight - } - - @Override - public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { - return new ProcessingLoc(loc, myName); - } - - @Override - protected List getProcessingLocs() { - return Collections.emptyList(); - } - - @Override - protected void registerNewLoc(ProcessingLoc loc) { - ; - } - - @Override - protected List readNewLocs() { - return Collections.emptyList(); - } -} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java index 20f36bdb9..b4f2e761f 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java @@ -6,7 +6,6 @@ import org.broadinstitute.sting.utils.exceptions.UserException; import java.io.IOException; import java.nio.channels.*; -import java.util.concurrent.locks.ReentrantLock; /** * User: depristo diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index 9ee8a1fc1..355e3dd92 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -102,7 +102,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { for ( int shardSize : shardSizes ) { // shared mem -- canonical implementation params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize) { - SharedMemoryGenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createSharedMemory(); + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createSharedMemory(); public GenomeLocProcessingTracker getTracker() { return tracker; } });