Improved distributed processing analytics. Still not 100% ready for prime-time. More improvements incoming. Iterator claim now supports requests to obtain in a single atomic claim (one lock) multiple sequential shards, which radically reduces overhead. However, deadlocking is still possible...
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5061 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
2d4bcb60a1
commit
a51061fd96
|
|
@ -74,9 +74,11 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
|
||||
|
||||
counter++;
|
||||
logger.debug(String.format("Processing shard %s, used %d locks for %d shards processed, %.2e sec / lock, %.2e sec / read, %.2f sec / write",
|
||||
shard.getLocation(), processingTracker.getNLocks(), counter,
|
||||
processingTracker.getTimePerLock(), processingTracker.getTimePerRead(), processingTracker.getTimePerWrite()));
|
||||
logger.debug(String.format("At %s: processed %d shards. %.2e s / lock (n=%d), %.2e s / read (n=%d), %.2e s / write (n=%d)",
|
||||
shard.getLocation(), counter,
|
||||
processingTracker.getTimePerLock(), processingTracker.getNLocks(),
|
||||
processingTracker.getTimePerRead(), processingTracker.getNReads(),
|
||||
processingTracker.getTimePerWrite(), processingTracker.getNWrites()));
|
||||
}
|
||||
|
||||
Object result = accumulator.finishTraversal();
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
|
@ -75,13 +77,15 @@ public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTra
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void registerNewLoc(ProcessingLoc proc) {
|
||||
protected void registerNewLocs(Collection<ProcessingLoc> plocs) {
|
||||
try {
|
||||
String packet = String.format("%s %s%n", proc.getLocation(), proc.getOwner());
|
||||
long startPos = raFile.getFilePointer();
|
||||
raFile.seek(raFile.length());
|
||||
raFile.write(packet.getBytes());
|
||||
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer()));
|
||||
for ( ProcessingLoc ploc : plocs ) {
|
||||
String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner());
|
||||
raFile.write(packet.getBytes());
|
||||
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer()));
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
|
||||
} catch (IOException e) {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.broadinstitute.sting.utils.GenomeLoc;
|
|||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.HasGenomeLocation;
|
||||
import org.broadinstitute.sting.utils.SimpleTimer;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
||||
import java.io.File;
|
||||
|
|
@ -21,10 +22,10 @@ public abstract class GenomeLocProcessingTracker {
|
|||
private Map<GenomeLoc, ProcessingLoc> processingLocs;
|
||||
private ClosableReentrantLock lock;
|
||||
|
||||
SimpleTimer writeTimer = new SimpleTimer("writeTimer");
|
||||
SimpleTimer readTimer = new SimpleTimer("readTimer");
|
||||
SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer");
|
||||
private long nLocks = 0, nWrites = 0, nReads = 0;
|
||||
protected SimpleTimer writeTimer = new SimpleTimer("writeTimer");
|
||||
protected SimpleTimer readTimer = new SimpleTimer("readTimer");
|
||||
protected SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer");
|
||||
protected long nLocks = 0, nWrites = 0, nReads = 0;
|
||||
|
||||
// --------------------------------------------------------------------------------
|
||||
//
|
||||
|
|
@ -33,7 +34,7 @@ public abstract class GenomeLocProcessingTracker {
|
|||
// --------------------------------------------------------------------------------
|
||||
|
||||
public static GenomeLocProcessingTracker createNoOp() {
|
||||
return createSharedMemory();
|
||||
return new NoOpGenomeLocProcessingTracker();
|
||||
}
|
||||
|
||||
public static GenomeLocProcessingTracker createSharedMemory() {
|
||||
|
|
@ -83,11 +84,11 @@ public abstract class GenomeLocProcessingTracker {
|
|||
* @param loc
|
||||
* @return
|
||||
*/
|
||||
public boolean locIsOwned(GenomeLoc loc) {
|
||||
public final boolean locIsOwned(GenomeLoc loc) {
|
||||
return findOwner(loc) != null;
|
||||
}
|
||||
|
||||
public ProcessingLoc findOwner(GenomeLoc loc) {
|
||||
public final ProcessingLoc findOwner(GenomeLoc loc) {
|
||||
// fast path to check if we already have the existing genome loc in memory for ownership claims
|
||||
// getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it
|
||||
// unless necessary
|
||||
|
|
@ -115,10 +116,7 @@ public abstract class GenomeLocProcessingTracker {
|
|||
|
||||
if ( owner == null ) { // we are unowned
|
||||
owner = new ProcessingLoc(loc, myName);
|
||||
writeTimer.restart();
|
||||
registerNewLoc(owner);
|
||||
writeTimer.stop();
|
||||
nWrites++;
|
||||
registerNewLocsWithTimers(Arrays.asList(owner));
|
||||
}
|
||||
|
||||
return owner;
|
||||
|
|
@ -141,7 +139,7 @@ public abstract class GenomeLocProcessingTracker {
|
|||
* @return
|
||||
*/
|
||||
public final <T extends HasGenomeLocation> T claimOwnershipOfNextAvailable(Iterator<T> iterator, String myName) {
|
||||
OwnershipIterator<T> myIt = new OwnershipIterator<T>(iterator, myName);
|
||||
OwnershipIterator<T> myIt = new OwnershipIterator<T>(iterator, myName, 1);
|
||||
return myIt.next();
|
||||
}
|
||||
|
||||
|
|
@ -150,12 +148,20 @@ public abstract class GenomeLocProcessingTracker {
|
|||
}
|
||||
|
||||
protected final class OwnershipIterator<T extends HasGenomeLocation> implements Iterator<T>, Iterable<T> {
|
||||
Iterator<T> subit;
|
||||
String myName;
|
||||
private final Iterator<T> subit;
|
||||
private final String myName;
|
||||
private final Queue<T> cache;
|
||||
private final int cacheSize;
|
||||
|
||||
public OwnershipIterator(Iterator<T> subit, String myName) {
|
||||
this(subit, myName, 10);
|
||||
}
|
||||
|
||||
public OwnershipIterator(Iterator<T> subit, String myName, int cacheSize) {
|
||||
this.subit = subit;
|
||||
this.myName = myName;
|
||||
cache = new LinkedList<T>();
|
||||
this.cacheSize = cacheSize;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -163,8 +169,8 @@ public abstract class GenomeLocProcessingTracker {
|
|||
* elements and so will return null there
|
||||
* @return
|
||||
*/
|
||||
public boolean hasNext() {
|
||||
return subit.hasNext();
|
||||
public final boolean hasNext() {
|
||||
return cache.peek() != null || subit.hasNext();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -173,32 +179,54 @@ public abstract class GenomeLocProcessingTracker {
|
|||
*
|
||||
* @return an object of type T owned by this thread, or null if none of the remaining object could be claimed
|
||||
*/
|
||||
public T next() {
|
||||
lock();
|
||||
try {
|
||||
while ( subit.hasNext() ) {
|
||||
T elt = subit.next();
|
||||
//logger.warn("Checking elt for ownership " + elt);
|
||||
GenomeLoc loc = elt.getLocation();
|
||||
ProcessingLoc proc = claimOwnership(loc, myName);
|
||||
public final T next() {
|
||||
T elt = cache.poll();
|
||||
if ( elt != null)
|
||||
return elt;
|
||||
else {
|
||||
// cache is empty, we need to fill up the cache and return the first element of the queue
|
||||
lock();
|
||||
try {
|
||||
// read once the database of owners at the start
|
||||
updateLocs();
|
||||
|
||||
if ( proc.isOwnedBy(myName) )
|
||||
return elt;
|
||||
// if not, we continue our search
|
||||
boolean done = false;
|
||||
Queue<ProcessingLoc> pwns = new LinkedList<ProcessingLoc>(); // ;-)
|
||||
while ( !done && cache.size() < cacheSize && subit.hasNext() ) {
|
||||
elt = subit.next();
|
||||
//logger.warn("Checking elt for ownership " + elt);
|
||||
GenomeLoc loc = elt.getLocation();
|
||||
|
||||
ProcessingLoc owner = findOwnerInMap(loc, processingLocs);
|
||||
|
||||
if ( owner == null ) { // we are unowned
|
||||
owner = new ProcessingLoc(loc, myName);
|
||||
pwns.offer(owner);
|
||||
if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed");
|
||||
if ( GenomeLoc.isUnmapped(loc) ) done = true;
|
||||
}
|
||||
// if not, we continue our search
|
||||
}
|
||||
|
||||
registerNewLocsWithTimers(pwns);
|
||||
|
||||
// we've either filled up the cache or run out of elements. Either way we return
|
||||
// the first element of the cache. If the cache is empty, we return null here.
|
||||
//logger.warn("Cache size is " + cache.size());
|
||||
//logger.warn("Cache contains " + cache);
|
||||
|
||||
return cache.poll();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
// we never found an object, just return it.
|
||||
return null;
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
public final void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Iterator<T> iterator() {
|
||||
public final Iterator<T> iterator() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -212,11 +240,11 @@ public abstract class GenomeLocProcessingTracker {
|
|||
*
|
||||
* @return
|
||||
*/
|
||||
protected Collection<ProcessingLoc> getProcessingLocs() {
|
||||
protected final Collection<ProcessingLoc> getProcessingLocs() {
|
||||
return updateLocs().values();
|
||||
}
|
||||
|
||||
private Map<GenomeLoc, ProcessingLoc> updateLocs() {
|
||||
private final Map<GenomeLoc, ProcessingLoc> updateLocs() {
|
||||
lock();
|
||||
try {
|
||||
readTimer.restart();
|
||||
|
|
@ -230,6 +258,12 @@ public abstract class GenomeLocProcessingTracker {
|
|||
}
|
||||
}
|
||||
|
||||
protected final void registerNewLocsWithTimers(Collection<ProcessingLoc> plocs) {
|
||||
writeTimer.restart();
|
||||
registerNewLocs(plocs);
|
||||
nWrites++;
|
||||
writeTimer.stop();
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------------
|
||||
//
|
||||
|
|
@ -264,6 +298,8 @@ public abstract class GenomeLocProcessingTracker {
|
|||
|
||||
// useful code for getting
|
||||
public final long getNLocks() { return nLocks; }
|
||||
public final long getNReads() { return nReads; }
|
||||
public final long getNWrites() { return nWrites; }
|
||||
public final double getTimePerLock() { return lockWaitTimer.getElapsedTime() / Math.max(nLocks, 1); }
|
||||
public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); }
|
||||
public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); }
|
||||
|
|
@ -280,6 +316,6 @@ public abstract class GenomeLocProcessingTracker {
|
|||
// by default we don't do anything
|
||||
}
|
||||
|
||||
protected abstract void registerNewLoc(ProcessingLoc loc);
|
||||
protected abstract void registerNewLocs(Collection<ProcessingLoc> plocs);
|
||||
protected abstract Collection<ProcessingLoc> readNewLocs();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
package org.broadinstitute.sting.utils.threading;
|
||||
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Base class, and null tracker. Always says that a GenomeLoc is ready for processing. It is
|
||||
* critical that this class already return that a loc is owned, no matter if it's been seen before,
|
||||
* etc. ReadShards can differ in their contents but have the same "unmapped" genome loc
|
||||
*/
|
||||
public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
|
||||
protected NoOpGenomeLocProcessingTracker() {
|
||||
super(new ClosableReentrantLock()); // todo -- should be lighter weight
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
|
||||
// return new ProcessingLoc(loc, myName);
|
||||
// }
|
||||
|
||||
// @Override
|
||||
// protected List<ProcessingLoc> getProcessingLocs() {
|
||||
// return Collections.emptyList();
|
||||
// }
|
||||
|
||||
@Override
|
||||
protected void registerNewLocs(Collection<ProcessingLoc> loc) {
|
||||
;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<ProcessingLoc> readNewLocs() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
|
@ -4,6 +4,7 @@ import org.apache.log4j.Logger;
|
|||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
|
@ -18,8 +19,8 @@ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingT
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void registerNewLoc(ProcessingLoc loc) {
|
||||
newPLocs.add(loc);
|
||||
protected void registerNewLocs(Collection<ProcessingLoc> plocs) {
|
||||
newPLocs.addAll(plocs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import java.util.Collections;
|
|||
* Test the GATK core interval parsing mechanism.
|
||||
*/
|
||||
public class IntervalIntegrationTest extends WalkerTest {
|
||||
@Test
|
||||
@Test(enabled = true)
|
||||
public void testAllImplicitIntervalParsing() {
|
||||
String md5 = "7821db9e14d4f8e07029ff1959cd5a99";
|
||||
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
|
||||
|
|
@ -47,7 +47,7 @@ public class IntervalIntegrationTest extends WalkerTest {
|
|||
executeTest("testAllIntervalsImplicit",spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(enabled = true)
|
||||
public void testAllExplicitIntervalParsing() {
|
||||
String md5 = "7821db9e14d4f8e07029ff1959cd5a99";
|
||||
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
|
||||
|
|
@ -76,7 +76,7 @@ public class IntervalIntegrationTest extends WalkerTest {
|
|||
executeTest("testUnmappedReadInclusion",spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(enabled = true)
|
||||
public void testUnmappedReadExclusion() {
|
||||
String md5 = "3153593c9f9ff80a8551fff5655e65ec";
|
||||
WalkerTest.WalkerTestSpec spec = new WalkerTest.WalkerTestSpec(
|
||||
|
|
|
|||
Loading…
Reference in New Issue