From a51061fd96480cea53425d337c19fdf120ed141e Mon Sep 17 00:00:00 2001 From: depristo Date: Sun, 23 Jan 2011 16:17:25 +0000 Subject: [PATCH] Improved distributed processing analytics. Still not 100% ready for prime-time. More improvements incoming. Iterator claim now supports requests to obtain in a single atomic claim (one lock) multiple sequential shards, which radically reduces overhead. However, deadlocking is still possible... git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5061 348d0f76-0448-11de-a6fe-93d51630548a --- .../gatk/executive/LinearMicroScheduler.java | 8 +- .../FileBackedGenomeLocProcessingTracker.java | 12 +- .../threading/GenomeLocProcessingTracker.java | 110 ++++++++++++------ .../NoOpGenomeLocProcessingTracker.java | 40 +++++++ ...haredMemoryGenomeLocProcessingTracker.java | 5 +- .../interval/IntervalIntegrationTest.java | 6 +- 6 files changed, 132 insertions(+), 49 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 2cc744893..b81cc7c7e 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -74,9 +74,11 @@ public class LinearMicroScheduler extends MicroScheduler { counter++; - logger.debug(String.format("Processing shard %s, used %d locks for %d shards processed, %.2e sec / lock, %.2e sec / read, %.2f sec / write", - shard.getLocation(), processingTracker.getNLocks(), counter, - processingTracker.getTimePerLock(), processingTracker.getTimePerRead(), processingTracker.getTimePerWrite())); + logger.debug(String.format("At %s: processed %d shards. %.2e s / lock (n=%d), %.2e s / read (n=%d), %.2e s / write (n=%d)", + shard.getLocation(), counter, + processingTracker.getTimePerLock(), processingTracker.getNLocks(), + processingTracker.getTimePerRead(), processingTracker.getNReads(), + processingTracker.getTimePerWrite(), processingTracker.getNWrites())); } Object result = accumulator.finishTraversal(); diff --git a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java index 70d2d5d5e..3c77eaece 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java @@ -11,6 +11,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -75,13 +77,15 @@ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTra } @Override - protected void registerNewLoc(ProcessingLoc proc) { + protected void registerNewLocs(Collection plocs) { try { - String packet = String.format("%s %s%n", proc.getLocation(), proc.getOwner()); long startPos = raFile.getFilePointer(); raFile.seek(raFile.length()); - raFile.write(packet.getBytes()); - if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer())); + for ( ProcessingLoc ploc : plocs ) { + String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner()); + raFile.write(packet.getBytes()); + if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer())); + } } catch (FileNotFoundException e) { throw new UserException.CouldNotCreateOutputFile(sharedFile, e); } catch (IOException e) { diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index 4360fe6ab..759ceeee3 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -5,6 +5,7 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.HasGenomeLocation; import org.broadinstitute.sting.utils.SimpleTimer; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; import java.io.File; @@ -21,10 +22,10 @@ public abstract class GenomeLocProcessingTracker { private Map processingLocs; private ClosableReentrantLock lock; - SimpleTimer writeTimer = new SimpleTimer("writeTimer"); - SimpleTimer readTimer = new SimpleTimer("readTimer"); - SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); - private long nLocks = 0, nWrites = 0, nReads = 0; + protected SimpleTimer writeTimer = new SimpleTimer("writeTimer"); + protected SimpleTimer readTimer = new SimpleTimer("readTimer"); + protected SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); + protected long nLocks = 0, nWrites = 0, nReads = 0; // -------------------------------------------------------------------------------- // @@ -33,7 +34,7 @@ public abstract class GenomeLocProcessingTracker { // -------------------------------------------------------------------------------- public static GenomeLocProcessingTracker createNoOp() { - return createSharedMemory(); + return new NoOpGenomeLocProcessingTracker(); } public static GenomeLocProcessingTracker createSharedMemory() { @@ -83,11 +84,11 @@ public abstract class GenomeLocProcessingTracker { * @param loc * @return */ - public boolean locIsOwned(GenomeLoc loc) { + public final boolean locIsOwned(GenomeLoc loc) { return findOwner(loc) != null; } - public ProcessingLoc findOwner(GenomeLoc loc) { + public final ProcessingLoc findOwner(GenomeLoc loc) { // fast path to check if we already have the existing genome loc in memory for ownership claims // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it // unless necessary @@ -115,10 +116,7 @@ public abstract class GenomeLocProcessingTracker { if ( owner == null ) { // we are unowned owner = new ProcessingLoc(loc, myName); - writeTimer.restart(); - registerNewLoc(owner); - writeTimer.stop(); - nWrites++; + registerNewLocsWithTimers(Arrays.asList(owner)); } return owner; @@ -141,7 +139,7 @@ public abstract class GenomeLocProcessingTracker { * @return */ public final T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { - OwnershipIterator myIt = new OwnershipIterator(iterator, myName); + OwnershipIterator myIt = new OwnershipIterator(iterator, myName, 1); return myIt.next(); } @@ -150,12 +148,20 @@ public abstract class GenomeLocProcessingTracker { } protected final class OwnershipIterator implements Iterator, Iterable { - Iterator subit; - String myName; + private final Iterator subit; + private final String myName; + private final Queue cache; + private final int cacheSize; public OwnershipIterator(Iterator subit, String myName) { + this(subit, myName, 10); + } + + public OwnershipIterator(Iterator subit, String myName, int cacheSize) { this.subit = subit; this.myName = myName; + cache = new LinkedList(); + this.cacheSize = cacheSize; } /** @@ -163,8 +169,8 @@ public abstract class GenomeLocProcessingTracker { * elements and so will return null there * @return */ - public boolean hasNext() { - return subit.hasNext(); + public final boolean hasNext() { + return cache.peek() != null || subit.hasNext(); } /** @@ -173,32 +179,54 @@ public abstract class GenomeLocProcessingTracker { * * @return an object of type T owned by this thread, or null if none of the remaining object could be claimed */ - public T next() { - lock(); - try { - while ( subit.hasNext() ) { - T elt = subit.next(); - //logger.warn("Checking elt for ownership " + elt); - GenomeLoc loc = elt.getLocation(); - ProcessingLoc proc = claimOwnership(loc, myName); + public final T next() { + T elt = cache.poll(); + if ( elt != null) + return elt; + else { + // cache is empty, we need to fill up the cache and return the first element of the queue + lock(); + try { + // read once the database of owners at the start + updateLocs(); - if ( proc.isOwnedBy(myName) ) - return elt; - // if not, we continue our search + boolean done = false; + Queue pwns = new LinkedList(); // ;-) + while ( !done && cache.size() < cacheSize && subit.hasNext() ) { + elt = subit.next(); + //logger.warn("Checking elt for ownership " + elt); + GenomeLoc loc = elt.getLocation(); + + ProcessingLoc owner = findOwnerInMap(loc, processingLocs); + + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + pwns.offer(owner); + if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed"); + if ( GenomeLoc.isUnmapped(loc) ) done = true; + } + // if not, we continue our search + } + + registerNewLocsWithTimers(pwns); + + // we've either filled up the cache or run out of elements. Either way we return + // the first element of the cache. If the cache is empty, we return null here. + //logger.warn("Cache size is " + cache.size()); + //logger.warn("Cache contains " + cache); + + return cache.poll(); + } finally { + unlock(); } - - // we never found an object, just return it. - return null; - } finally { - unlock(); } } - public void remove() { + public final void remove() { throw new UnsupportedOperationException(); } - public Iterator iterator() { + public final Iterator iterator() { return this; } } @@ -212,11 +240,11 @@ public abstract class GenomeLocProcessingTracker { * * @return */ - protected Collection getProcessingLocs() { + protected final Collection getProcessingLocs() { return updateLocs().values(); } - private Map updateLocs() { + private final Map updateLocs() { lock(); try { readTimer.restart(); @@ -230,6 +258,12 @@ public abstract class GenomeLocProcessingTracker { } } + protected final void registerNewLocsWithTimers(Collection plocs) { + writeTimer.restart(); + registerNewLocs(plocs); + nWrites++; + writeTimer.stop(); + } // -------------------------------------------------------------------------------- // @@ -264,6 +298,8 @@ public abstract class GenomeLocProcessingTracker { // useful code for getting public final long getNLocks() { return nLocks; } + public final long getNReads() { return nReads; } + public final long getNWrites() { return nWrites; } public final double getTimePerLock() { return lockWaitTimer.getElapsedTime() / Math.max(nLocks, 1); } public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); } public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); } @@ -280,6 +316,6 @@ public abstract class GenomeLocProcessingTracker { // by default we don't do anything } - protected abstract void registerNewLoc(ProcessingLoc loc); + protected abstract void registerNewLocs(Collection plocs); protected abstract Collection readNewLocs(); } diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java new file mode 100644 index 000000000..eb5759e95 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -0,0 +1,40 @@ +package org.broadinstitute.sting.utils.threading; + +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Base class, and null tracker. Always says that a GenomeLoc is ready for processing. It is + * critical that this class already return that a loc is owned, no matter if it's been seen before, + * etc. ReadShards can differ in their contents but have the same "unmapped" genome loc + */ +public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { + protected NoOpGenomeLocProcessingTracker() { + super(new ClosableReentrantLock()); // todo -- should be lighter weight + } + +// @Override +// public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { +// return new ProcessingLoc(loc, myName); +// } + +// @Override +// protected List getProcessingLocs() { +// return Collections.emptyList(); +// } + + @Override + protected void registerNewLocs(Collection loc) { + ; + } + + @Override + protected List readNewLocs() { + return Collections.emptyList(); + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index 46b13d163..5a67eb7b0 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -4,6 +4,7 @@ import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -18,8 +19,8 @@ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingT } @Override - protected void registerNewLoc(ProcessingLoc loc) { - newPLocs.add(loc); + protected void registerNewLocs(Collection plocs) { + newPLocs.addAll(plocs); } @Override diff --git a/java/test/org/broadinstitute/sting/utils/interval/IntervalIntegrationTest.java b/java/test/org/broadinstitute/sting/utils/interval/IntervalIntegrationTest.java index 98df83254..638710ad4 100644 --- a/java/test/org/broadinstitute/sting/utils/interval/IntervalIntegrationTest.java +++ b/java/test/org/broadinstitute/sting/utils/interval/IntervalIntegrationTest.java @@ -34,7 +34,7 @@ import java.util.Collections; * Test the GATK core interval parsing mechanism. */ public class IntervalIntegrationTest extends WalkerTest { - @Test + @Test(enabled = true) public void testAllImplicitIntervalParsing() { String md5 = "7821db9e14d4f8e07029ff1959cd5a99"; WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec( @@ -47,7 +47,7 @@ public class IntervalIntegrationTest extends WalkerTest { executeTest("testAllIntervalsImplicit",spec); } - @Test + @Test(enabled = true) public void testAllExplicitIntervalParsing() { String md5 = "7821db9e14d4f8e07029ff1959cd5a99"; WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec( @@ -76,7 +76,7 @@ public class IntervalIntegrationTest extends WalkerTest { executeTest("testUnmappedReadInclusion",spec); } - @Test + @Test(enabled = true) public void testUnmappedReadExclusion() { String md5 = "3153593c9f9ff80a8551fff5655e65ec"; WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(