diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index e3a0b0b6d..fee6dfd7e 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -410,7 +410,7 @@ public class GenomeAnalysisEngine { region.add(getGenomeLocParser().createGenomeLoc(sequenceRecord.getSequenceName(),1,sequenceRecord.getSequenceLength())); } - return new MonolithicShardStrategy(readsDataSource,shardType,region); + return new MonolithicShardStrategy(getGenomeLocParser(), readsDataSource,shardType,region); } ShardStrategy shardStrategy = null; diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 13a721eb6..6f4415bc5 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -49,26 +49,26 @@ public class LinearMicroScheduler extends MicroScheduler { walker.initialize(); Accumulator accumulator = Accumulator.create(engine,walker); - for (Shard shard : shardStrategy) { - if ( claimShard(shard) ) { - // New experimental code for managing locus intervals. - if(shard.getShardType() == Shard.ShardType.LOCUS) { - LocusWalker lWalker = (LocusWalker)walker; - WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); - for(WindowMaker.WindowMakerIterator iterator: windowMaker) { - ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); - Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); - accumulator.accumulate(dataProvider,result); - dataProvider.close(); - } - windowMaker.close(); - } - else { - ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); + for (Shard shard : processingTracker.onlyOwned(shardStrategy, engine.getName())) { + if ( shard == null ) // we ran out of shards that aren't owned + break; + + if(shard.getShardType() == Shard.ShardType.LOCUS) { + LocusWalker lWalker = (LocusWalker)walker; + WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); + for(WindowMaker.WindowMakerIterator iterator: windowMaker) { + ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); } + windowMaker.close(); + } + else { + ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); + Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); + accumulator.accumulate(dataProvider,result); + dataProvider.close(); } } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 93d11eca4..69d74a364 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -26,9 +26,6 @@ package org.broadinstitute.sting.gatk.executive; import org.apache.log4j.Logger; -import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; -import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; -import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource; @@ -41,7 +38,6 @@ import org.broadinstitute.sting.gatk.iterators.NullSAMIterator; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.ReadMetrics; -import java.io.File; import java.lang.management.ManagementFactory; import java.util.*; @@ -50,8 +46,7 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.GenomeLocProcessingTracker; -import org.broadinstitute.sting.utils.threading.NoOpGenomeLocProcessingTracker; -import org.broadinstitute.sting.utils.threading.SharedFileGenomeLocProcessingTracker; +import org.broadinstitute.sting.utils.threading.ProcessingLoc; import javax.management.JMException; import javax.management.MBeanServer; @@ -63,7 +58,8 @@ import javax.management.ObjectName; * User: mhanna * Date: Apr 26, 2009 * Time: 12:37:23 PM - * To change this template use File | Settings | File Templates. + * + * General base class for all scheduling algorithms */ /** Shards and schedules data in manageable chunks. */ @@ -89,7 +85,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { private final MBeanServer mBeanServer; private final ObjectName mBeanName; - private GenomeLocProcessingTracker processingTracker; + protected GenomeLocProcessingTracker processingTracker; /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the @@ -104,6 +100,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return The best-fit microscheduler. */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, int nThreadsToUse) { + if (engine.getArguments().processingTrackerFile != null) { + if ( walker instanceof ReadWalker ) + throw new UserException.BadArgumentValue("C", String.format("Distributed GATK processing not enabled for read walkers")); + } + if (walker instanceof TreeReducible && nThreadsToUse > 1) { if(walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); @@ -166,10 +167,10 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile); } - processingTracker = new SharedFileGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser()); + processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser()); logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile); } else { - processingTracker = new NoOpGenomeLocProcessingTracker(); + processingTracker = GenomeLocProcessingTracker.createNoOp(); } } @@ -183,79 +184,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public abstract Object execute(Walker walker, ShardStrategy shardStrategy); - protected boolean claimShard(Shard shard) { - if ( shard.getGenomeLocs() == null ) { - if ( engine.getArguments().processingTrackerFile != null ) - throw new UserException.BadArgumentValue("processingTrackerFile", "Cannot use processing tracking with unindexed data"); - return true; - } else { - GenomeLoc shardSpan = shardSpan(shard); - - GenomeLocProcessingTracker.ProcessingLoc proc = processingTracker.claimOwnership(shardSpan, engine.getName()); - boolean actuallyProcess = proc.isOwnedBy(engine.getName()); - //logger.debug(String.format("Shard %s claimed by %s => owned by me %b", shard, proc.getOwner(), actuallyProcess)); - - if ( ! actuallyProcess ) - logger.info(String.format("DISTRIBUTED GATK: Shard %s already processed by %s", shard, proc.getOwner())); - - return actuallyProcess; - } - } - - private GenomeLoc shardSpan(Shard shard) { - if ( shard == null ) throw new ReviewedStingException("Shard is null!"); - int start = Integer.MAX_VALUE; - int stop = Integer.MIN_VALUE; - String contig = null; - - for ( GenomeLoc loc : shard.getGenomeLocs() ) { - if ( GenomeLoc.isUnmapped(loc) ) - // special case the unmapped region marker, just abort out - return loc; - contig = loc.getContig(); - if ( loc.getStart() < start ) start = loc.getStart(); - if ( loc.getStop() > stop ) stop = loc.getStop(); - } - return engine.getGenomeLocParser().createGenomeLoc(contig, start, stop); - } - - // todo -- the execution code in the schedulers is duplicated and slightly different -- should be merged -// protected boolean executeShard(Walker walker, Shard shard, Accumulator accumulator) { -// if(shard.getShardType() == Shard.ShardType.LOCUS) { -// LocusWalker lWalker = (LocusWalker)walker; -// -// WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); -// -// // ShardTraverser -// -// ShardDataProvider dataProvider = null; -// for(WindowMaker.WindowMakerIterator iterator: windowMaker) { -// dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); -// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); -// accumulator.accumulate(dataProvider,result); -// dataProvider.close(); -// } -// if (dataProvider != null) dataProvider.close(); -// windowMaker.close(); -// } -// else { -// ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); -// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); -// accumulator.accumulate(dataProvider,result); -// dataProvider.close(); -// } -// -// -// // ShardTraverser -// -// for(WindowMaker.WindowMakerIterator iterator: windowMaker) { -// accumulator = traversalEngine.traverse( walker, dataProvider, accumulator ); -// dataProvider.close(); -// } -// -// return true; -// } - /** * Retrieves the object responsible for tracking and managing output. * @return An output tracker, for loading data in and extracting results. Will not be null. diff --git a/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java new file mode 100644 index 000000000..b305ffff7 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java @@ -0,0 +1,15 @@ +package org.broadinstitute.sting.utils.threading; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Created by IntelliJ IDEA. + * User: depristo + * Date: 1/19/11 + * Time: 9:50 AM + * + * Simple extension of a ReentrantLock that supports a close method + */ +public class ClosableReentrantLock extends ReentrantLock { + public void close() {} +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java new file mode 100644 index 000000000..70d2d5d5e --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java @@ -0,0 +1,93 @@ +package org.broadinstitute.sting.utils.threading; + +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Keeps a copy of the processing locks in a file, in addition to tracking in memory via the base class + */ +public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker { + private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); + private static final boolean DEBUG = false; + private File sharedFile = null; + private GenomeLocParser parser; + private RandomAccessFile raFile; + private long lastReadPosition = 0; + + protected FileBackedGenomeLocProcessingTracker(File sharedFile, RandomAccessFile raFile, GenomeLocParser parser, ClosableReentrantLock lock) { + super(lock); + + this.sharedFile = sharedFile; + this.raFile = raFile; + this.parser = parser; + } + + protected void close() { + super.close(); + try { + raFile.close(); + } catch (IOException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } + + @Override + protected List readNewLocs() { + List newPLocs = new ArrayList(); // todo -- gratitous object creation + try { + //logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition)); + if ( raFile.length() > lastReadPosition ) { + raFile.seek(lastReadPosition); + + int counter = 0; + String line = raFile.readLine(); // Read another line + while ( line != null ) { + String[] parts = line.split(" "); + if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); + ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]); + //logger.warn(" Read " + ploc); + newPLocs.add(ploc); + line = raFile.readLine(); + counter++; + } + lastReadPosition = raFile.getFilePointer(); + if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d", + counter, lastReadPosition, newPLocs.size())); + } + } catch (FileNotFoundException e) { + throw new UserException.CouldNotReadInputFile(sharedFile, e); + } catch (IOException e) { + throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); + } + + return newPLocs; + } + + @Override + protected void registerNewLoc(ProcessingLoc proc) { + try { + String packet = String.format("%s %s%n", proc.getLocation(), proc.getOwner()); + long startPos = raFile.getFilePointer(); + raFile.seek(raFile.length()); + raFile.write(packet.getBytes()); + if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer())); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } catch (IOException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } +} + + diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index cc805a8b8..ba8e7429b 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -1,62 +1,68 @@ package org.broadinstitute.sting.utils.threading; +import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.HasGenomeLocation; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.util.*; +import java.util.concurrent.locks.ReentrantLock; /** * */ public abstract class GenomeLocProcessingTracker { - /** - * Information about processing locations and their owners - */ - public static final class ProcessingLoc implements Comparable { - private final GenomeLoc loc; - private final String owner; + private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); + private Map processingLocs; + private ClosableReentrantLock lock; + private long nLockingEvents = 0; - /** - * Create a loc that's already owned - * @param loc - * @param owner - */ - public ProcessingLoc(GenomeLoc loc, String owner) { - if ( loc == null || owner == null ) { - throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner); - } + // -------------------------------------------------------------------------------- + // + // Factory methods for creating ProcessingTrackers + // + // -------------------------------------------------------------------------------- - this.loc = loc; - this.owner = owner; + public static NoOpGenomeLocProcessingTracker createNoOp() { + return new NoOpGenomeLocProcessingTracker(); + } + + public static SharedMemoryGenomeLocProcessingTracker createSharedMemory() { + return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); + } + + public static FileBackedGenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) { + return createFileBacked(sharedFile, parser, false); + } + + public static FileBackedGenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) { + return createFileBacked(sharedFile, parser, true); + } + + private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean useFileLockToo) { + try { + //logger.warn("Creating file backed GLPT at " + sharedFile); + RandomAccessFile raFile = new RandomAccessFile(sharedFile, "rws"); + ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(raFile.getChannel()) : new ClosableReentrantLock(); + return new FileBackedGenomeLocProcessingTracker(sharedFile, raFile, parser, lock); } - - public GenomeLoc getLoc() { - return loc; + catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); } + } - public String getOwner() { - return owner; - } - - public boolean isOwnedBy(String name) { - return getOwner().equals(name); - } - - public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); } - - public boolean equals(Object other) { - if (other instanceof ProcessingLoc ) - return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner); - else - return false; - } - - public int compareTo(ProcessingLoc other) { - return this.getLoc().compareTo(other.getLoc()); - } + // -------------------------------------------------------------------------------- + // + // Creating ProcessingTrackers + // + // -------------------------------------------------------------------------------- + public GenomeLocProcessingTracker(ClosableReentrantLock lock) { + processingLocs = new HashMap(); + this.lock = lock; } // -------------------------------------------------------------------------------- @@ -64,6 +70,7 @@ public abstract class GenomeLocProcessingTracker { // Code to claim intervals for processing and query for their ownership // // -------------------------------------------------------------------------------- + /** * Queries the current database if a location is owned. Does not guarantee that the * loc can be owned in a future call, though. @@ -75,23 +82,12 @@ public abstract class GenomeLocProcessingTracker { return findOwner(loc) != null; } - // in general this isn't true for the list of locs, as they definitely can occur out of order - protected static ProcessingLoc findOwnerInSortedList(GenomeLoc loc, List locs) { - int i = Collections.binarySearch(locs, new ProcessingLoc(loc, "ignore")); - return i < 0 ? null : locs.get(i); - } - - protected static ProcessingLoc findOwnerInUnsortedList(GenomeLoc loc, List locs) { - for ( ProcessingLoc l : locs ) { - if ( l.getLoc().equals(loc) ) - return l; - } - - return null; - } - public ProcessingLoc findOwner(GenomeLoc loc) { - return findOwnerInUnsortedList(loc, getProcessingLocs()); + // fast path to check if we already have the existing genome loc in memory for ownership claims + // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it + // unless necessary + ProcessingLoc x = findOwnerInMap(loc, processingLocs); + return x == null ? findOwnerInMap(loc, updateLocs()) : x; } /** @@ -105,7 +101,24 @@ public abstract class GenomeLocProcessingTracker { * @param myName * @return */ - public abstract ProcessingLoc claimOwnership(GenomeLoc loc, String myName); + public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + // processingLocs is a shared memory synchronized object, and this + // method is synchronized, so we can just do our processing + lock(); + try { + ProcessingLoc owner = findOwner(loc); + + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + registerNewLoc(owner); + } + + return owner; + //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); + } finally { + unlock(); + } + } /** * A higher-level, and more efficient, interface to obtain the next location we own. Takes an @@ -120,18 +133,66 @@ public abstract class GenomeLocProcessingTracker { * @return */ public T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { - while ( iterator.hasNext() ) { - T elt = iterator.next(); - GenomeLoc loc = elt.getLocation(); - ProcessingLoc proc = claimOwnership(loc, myName); + OwnershipIterator myIt = new OwnershipIterator(iterator, myName); + return myIt.next(); + } - if ( proc.isOwnedBy(myName) ) - return elt; - // if not, we continue our search + public Iterable onlyOwned(Iterator iterator, String myName) { + return new OwnershipIterator(iterator, myName); + } + + protected class OwnershipIterator implements Iterator, Iterable { + Iterator subit; + String myName; + + public OwnershipIterator(Iterator subit, String myName) { + this.subit = subit; + this.myName = myName; } - // we never found an object, just return it. - return null; + /** + * Will return true for all elements of subit, even if we can't get ownership of some of the future + * elements and so will return null there + * @return + */ + public boolean hasNext() { + return subit.hasNext(); + } + + /** + * High performance iterator that only locks and unlocks once per claimed object found. Avoids + * locking / unlocking for each query + * + * @return an object of type T owned by this thread, or null if none of the remaining object could be claimed + */ + public T next() { + lock(); + try { + while ( subit.hasNext() ) { + T elt = subit.next(); + //logger.warn("Checking elt for ownership " + elt); + GenomeLoc loc = elt.getLocation(); + ProcessingLoc proc = claimOwnership(loc, myName); + + if ( proc.isOwnedBy(myName) ) + return elt; + // if not, we continue our search + } + + // we never found an object, just return it. + return null; + } finally { + unlock(); + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + public Iterator iterator() { + return this; + } } /** @@ -143,9 +204,64 @@ public abstract class GenomeLocProcessingTracker { * * @return */ - protected abstract List getProcessingLocs(); + protected Collection getProcessingLocs() { + return updateLocs().values(); + } + + private Map updateLocs() { + lock(); + try { + for ( ProcessingLoc p : readNewLocs() ) + processingLocs.put(p.getLocation(), p); + return processingLocs; + } finally { + unlock(); + } + } + + + // -------------------------------------------------------------------------------- + // + // Low-level accessors / manipulators and utility functions + // + // -------------------------------------------------------------------------------- + + private final void lock() { + if ( ! lock.isHeldByCurrentThread() ) + nLockingEvents++; + lock.lock(); + } + + private final void unlock() { + lock.unlock(); + } + + protected static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection locs) { + for ( ProcessingLoc l : locs ) { + if ( l.getLocation().equals(loc) ) + return l; + } + + return null; + } + + protected static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map locs) { + return locs.get(loc); + } + + + // -------------------------------------------------------------------------------- + // + // Code to override to change the dynamics of the the GenomeLocProcessingTracker + // + // -------------------------------------------------------------------------------- protected void close() { + lock.close(); + logger.warn("Locking events: " + nLockingEvents); // by default we don't do anything } + + protected abstract void registerNewLoc(ProcessingLoc loc); + protected abstract Collection readNewLocs(); } diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java index dd8f23152..d5743a3c7 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.threading; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -10,11 +11,27 @@ import java.util.List; * Base class, and null tracker. Always says that a GenomeLoc is ready for processing */ public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { + protected NoOpGenomeLocProcessingTracker() { + super(new ClosableReentrantLock()); // todo -- should be lighter weight + } + + @Override public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { return new ProcessingLoc(loc, myName); } + @Override protected List getProcessingLocs() { return Collections.emptyList(); } + + @Override + protected void registerNewLoc(ProcessingLoc loc) { + ; + } + + @Override + protected List readNewLocs() { + return Collections.emptyList(); + } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java b/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java new file mode 100644 index 000000000..d2ec1fe9a --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java @@ -0,0 +1,57 @@ +package org.broadinstitute.sting.utils.threading; + +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.HasGenomeLocation; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +/** + * Created by IntelliJ IDEA. + * User: depristo + * Date: 1/19/11 + * Time: 8:06 AM + * + * Information about processing locations and their owners + */ +public class ProcessingLoc implements HasGenomeLocation { + private final GenomeLoc loc; + private final String owner; + + /** + * Create a loc that's already owned + * @param loc + * @param owner + */ + public ProcessingLoc(GenomeLoc loc, String owner) { + if ( loc == null || owner == null ) { + throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner); + } + + this.loc = loc; + this.owner = owner; + } + + public GenomeLoc getLocation() { + return loc; + } + + public String getOwner() { + return owner; + } + + public boolean isOwnedBy(String name) { + return getOwner().equals(name); + } + + public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); } + + public boolean equals(Object other) { + if (other instanceof ProcessingLoc ) + return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner); + else + return false; + } + + public int compareTo(ProcessingLoc other) { + return this.getLocation().compareTo(other.getLocation()); + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java index b1353f51f..cac55d5ff 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java @@ -14,208 +14,208 @@ import java.util.List; /** * */ -public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - private static final boolean DEBUG = false; - private static final boolean REALLY_DEBUG = false; - - private boolean ACTUALLY_USE_FILE_LOCK = true; - - private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class); - - private Object myLock = new Object(); - private List processingLocs; - private File sharedFile = null; - private GenomeLocParser parser; - private FileLock lock = null; - private RandomAccessFile raFile; - private long lastReadPosition = 0; - -// // -// // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST -// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS -// // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED -// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK? -// // +//public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker { +// private static final boolean DEBUG = false; +// private static final boolean REALLY_DEBUG = false; // - public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) { - this(sharedFile, parser, true); - } - - protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) { - processingLocs = new ArrayList(); - ACTUALLY_USE_FILE_LOCK = false; - try { - this.sharedFile = sharedFile; - this.raFile = new RandomAccessFile(sharedFile, "rws"); - this.parser = parser; - } - catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } - } - - public void close() { - if ( ACTUALLY_USE_FILE_LOCK ) { - try { - this.raFile.close(); - } - catch (IOException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } - } - } - - private void lock() { - if ( ACTUALLY_USE_FILE_LOCK ) { - - // Precondition -- lock is always null while we don't have a lock - if ( lock != null ) - throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); - - try { - lock = raFile.getChannel().lock(); - } catch (ClosedChannelException e) { - throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); - } catch (FileLockInterruptionException e) { - throw new ReviewedStingException("File lock interrupted", e); - } catch (NonWritableChannelException e) { - throw new ReviewedStingException("File channel not writable", e); - } catch (OverlappingFileLockException e) { - // this only happens when multiple threads are running, and one is waiting - // for the lock above and we come here. - throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); - } catch (IOException e) { - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } - } - } - - private void unlock(boolean excepting) { - if ( ACTUALLY_USE_FILE_LOCK ) { - - // Precondition -- lock is never null while we have a lock - if ( lock == null ) { - if ( ! excepting ) - throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!"); - } else { - if ( ! lock.isValid() ) - throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); - try { - lock.release(); - lock = null; - //channel.close(); - } catch ( IOException e ) { - throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); - } - } - } - } - - private List readLocs() { - if ( ACTUALLY_USE_FILE_LOCK ) { - // we must have a lock to run this code - if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()"); - - try { - if ( raFile.length() > lastReadPosition ) { - raFile.seek(lastReadPosition); - - int counter = 0; - String line = raFile.readLine(); // Read another line - while ( line != null ) { - String[] parts = line.split(" "); - if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); - GenomeLoc loc = parser.parseGenomeLoc(parts[0]); - String owner = parts[1]; - processingLocs.add(new ProcessingLoc(loc, owner)); - line = raFile.readLine(); - counter++; - } - lastReadPosition = raFile.getFilePointer(); - if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d", - counter, lastReadPosition, processingLocs.size())); - } - } catch (FileNotFoundException e) { - throw new UserException.CouldNotReadInputFile(sharedFile, e); - } catch (IOException e) { - throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); - } - } - - return processingLocs; - } - - private void writeLoc(ProcessingLoc proc) { - if ( ACTUALLY_USE_FILE_LOCK ) { - // we must have a lock to run this code - if ( lock == null || ! lock.isValid() ) - throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()"); - - try { - String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner()); - long startPos = raFile.getFilePointer(); - raFile.seek(raFile.length()); - raFile.write(packet.getBytes()); - if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer())); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } catch (IOException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } - } else { - processingLocs.add(proc); - } - } - - private final void printOwners() { - for ( ProcessingLoc proc : processingLocs ) - System.out.println(proc); - } - - public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { - if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName); - synchronized (processingLocs) { - boolean excepting = true; - ProcessingLoc owner = null; - - if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); - - if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile); - try { - lock(); - owner = findOwnerInUnsortedList(loc, readLocs()); - //owner = super.findOwner(loc); - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - writeLoc(owner); - } - excepting = false; - } finally { - if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner); - //printOwners(); - unlock(excepting); - } - - if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null"); - return owner; - } - } - - protected List getProcessingLocs() { - synchronized (processingLocs) { - boolean excepting = true; - if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); - - try { - lock(); - readLocs(); - excepting = false; - } finally { - unlock(excepting); - } - - if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null"); - return processingLocs; - } - } -} +// private boolean ACTUALLY_USE_FILE_LOCK = true; +// +// private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class); +// +// private Object myLock = new Object(); +// private List processingLocs; +// private File sharedFile = null; +// private GenomeLocParser parser; +// private FileLock lock = null; +// private RandomAccessFile raFile; +// private long lastReadPosition = 0; +// +//// // +//// // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST +//// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS +//// // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED +//// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK? +//// // +//// +// public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) { +// this(sharedFile, parser, true); +// } +// +// protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) { +// processingLocs = new ArrayList(); +// ACTUALLY_USE_FILE_LOCK = false; +// try { +// this.sharedFile = sharedFile; +// this.raFile = new RandomAccessFile(sharedFile, "rws"); +// this.parser = parser; +// } +// catch (FileNotFoundException e) { +// throw new UserException.CouldNotCreateOutputFile(sharedFile, e); +// } +// } +// +// public void close() { +// if ( ACTUALLY_USE_FILE_LOCK ) { +// try { +// this.raFile.close(); +// } +// catch (IOException e) { +// throw new UserException.CouldNotCreateOutputFile(sharedFile, e); +// } +// } +// } +// +// private void lock() { +// if ( ACTUALLY_USE_FILE_LOCK ) { +// +// // Precondition -- lock is always null while we don't have a lock +// if ( lock != null ) +// throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); +// +// try { +// lock = raFile.getChannel().lock(); +// } catch (ClosedChannelException e) { +// throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); +// } catch (FileLockInterruptionException e) { +// throw new ReviewedStingException("File lock interrupted", e); +// } catch (NonWritableChannelException e) { +// throw new ReviewedStingException("File channel not writable", e); +// } catch (OverlappingFileLockException e) { +// // this only happens when multiple threads are running, and one is waiting +// // for the lock above and we come here. +// throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); +// } catch (IOException e) { +// throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); +// } +// } +// } +// +// private void unlock(boolean excepting) { +// if ( ACTUALLY_USE_FILE_LOCK ) { +// +// // Precondition -- lock is never null while we have a lock +// if ( lock == null ) { +// if ( ! excepting ) +// throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!"); +// } else { +// if ( ! lock.isValid() ) +// throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); +// try { +// lock.release(); +// lock = null; +// //channel.close(); +// } catch ( IOException e ) { +// throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); +// } +// } +// } +// } +// +// private List readLocs() { +// if ( ACTUALLY_USE_FILE_LOCK ) { +// // we must have a lock to run this code +// if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()"); +// +// try { +// if ( raFile.length() > lastReadPosition ) { +// raFile.seek(lastReadPosition); +// +// int counter = 0; +// String line = raFile.readLine(); // Read another line +// while ( line != null ) { +// String[] parts = line.split(" "); +// if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); +// GenomeLoc loc = parser.parseGenomeLoc(parts[0]); +// String owner = parts[1]; +// processingLocs.add(new ProcessingLoc(loc, owner)); +// line = raFile.readLine(); +// counter++; +// } +// lastReadPosition = raFile.getFilePointer(); +// if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d", +// counter, lastReadPosition, processingLocs.size())); +// } +// } catch (FileNotFoundException e) { +// throw new UserException.CouldNotReadInputFile(sharedFile, e); +// } catch (IOException e) { +// throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); +// } +// } +// +// return processingLocs; +// } +// +// private void writeLoc(ProcessingLoc proc) { +// if ( ACTUALLY_USE_FILE_LOCK ) { +// // we must have a lock to run this code +// if ( lock == null || ! lock.isValid() ) +// throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()"); +// +// try { +// String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner()); +// long startPos = raFile.getFilePointer(); +// raFile.seek(raFile.length()); +// raFile.write(packet.getBytes()); +// if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer())); +// } catch (FileNotFoundException e) { +// throw new UserException.CouldNotCreateOutputFile(sharedFile, e); +// } catch (IOException e) { +// throw new UserException.CouldNotCreateOutputFile(sharedFile, e); +// } +// } else { +// processingLocs.add(proc); +// } +// } +// +// private final void printOwners() { +// for ( ProcessingLoc proc : processingLocs ) +// System.out.println(proc); +// } +// +// public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { +// if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName); +// synchronized (processingLocs) { +// boolean excepting = true; +// ProcessingLoc owner = null; +// +// if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); +// +// if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile); +// try { +// lock(); +// owner = findOwnerInUnsortedList(loc, readLocs()); +// //owner = super.findOwner(loc); +// if ( owner == null ) { // we are unowned +// owner = new ProcessingLoc(loc, myName); +// writeLoc(owner); +// } +// excepting = false; +// } finally { +// if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner); +// //printOwners(); +// unlock(excepting); +// } +// +// if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null"); +// return owner; +// } +// } +// +// protected List getProcessingLocs() { +// synchronized (processingLocs) { +// boolean excepting = true; +// if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); +// +// try { +// lock(); +// readLocs(); +// excepting = false; +// } finally { +// unlock(excepting); +// } +// +// if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null"); +// return processingLocs; +// } +// } +//} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java new file mode 100644 index 000000000..20f36bdb9 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java @@ -0,0 +1,114 @@ +package org.broadinstitute.sting.utils.threading; + +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; + +import java.io.IOException; +import java.nio.channels.*; +import java.util.concurrent.locks.ReentrantLock; + +/** + * User: depristo + * Date: 1/19/11 + * Time: 8:24 AM + * + * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common + * file in the file system. It itself a shared memory reenterant lock to managed thread safety and a + * FileChannel FileLock to handle the file integrity + */ +public class SharedFileThreadSafeLock extends ClosableReentrantLock { + private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class); + private static final boolean DEBUG = false; + + /** The file lock itself that guards the file */ + FileLock fileLock; + + /** the channel object that 'owns' the file lock, and we use to request the lock */ + FileChannel channel; + int fileLockReentrantCounter = 0; + + /** + * Create a SharedFileThreadSafeLock object locking the file associated with channel + * @param channel + */ + public SharedFileThreadSafeLock(FileChannel channel) { + super(); + this.channel = channel; + } + + public void close() { + try { + channel.close(); + } + catch (IOException e) { + throw new UserException("Count not close channel " + channel, e); + } + } + + /** + * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be + * unwound appropriately. Uses file channel lock *after* thread locking. + */ + @Override + public void lock() { + if ( DEBUG ) logger.warn("Attempting threadlock: " + Thread.currentThread().getName()); + + if ( super.isHeldByCurrentThread() ) { + if ( DEBUG ) logger.warn(" Already have threadlock, continuing: " + Thread.currentThread().getName()); + super.lock(); // call the lock here so we can call unlock later + fileLockReentrantCounter++; // inc. the file lock counter + return; + } else { + super.lock(); + if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName()); + try { + // Precondition -- lock is always null while we don't have a lock + if ( fileLock != null ) + throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); + if ( fileLockReentrantCounter == 0 ) + fileLock = channel.lock(); + fileLockReentrantCounter++; + if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName()); + } catch (ClosedChannelException e) { + throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + channel, e); + } catch (FileLockInterruptionException e) { + throw new ReviewedStingException("File lock interrupted", e); + } catch (NonWritableChannelException e) { + throw new ReviewedStingException("File channel not writable", e); + } catch (OverlappingFileLockException e) { + // this only happens when multiple threads are running, and one is waiting + // for the lock above and we come here. + throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); + } catch (IOException e) { + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); + } + } + } + + @Override + public void unlock() { + try { + // update for reentrant unlocking + fileLockReentrantCounter--; + if ( fileLockReentrantCounter < 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); + + if ( fileLock != null && fileLockReentrantCounter == 0 ) { + if ( ! fileLock.isValid() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); + + if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName()); + fileLock.release(); + fileLock = null; + if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName()); + } else { + if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName()); + } + } catch ( IOException e ) { + throw new ReviewedStingException("Could not free lock on file " + channel, e); + } finally { + if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName()); + super.unlock(); + if ( DEBUG ) logger.warn(" released threadlock: " + Thread.currentThread().getName()); + } + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index c247196d3..46b13d163 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -5,34 +5,27 @@ import org.broadinstitute.sting.utils.GenomeLoc; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; /** - * For algorithmic testing purposes only. Uses synchronization to keep a consistent - * processing list in shared memory. + * Thread-safe shared memory only implementation */ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class); - protected List processingLocs = new ArrayList(); + private List newPLocs = new ArrayList(); - public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { - // processingLocs is a shared memory synchronized object, and this - // method is synchronized, so we can just do our processing - synchronized (processingLocs) { - ProcessingLoc owner = super.findOwner(loc); - - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - processingLocs.add(owner); - } - - return owner; - //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); - } + protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) { + super(lock); } - protected List getProcessingLocs() { - synchronized (processingLocs) { - return processingLocs; - } + @Override + protected void registerNewLoc(ProcessingLoc loc) { + newPLocs.add(loc); + } + + @Override + protected List readNewLocs() { + List r = newPLocs; + newPLocs = new ArrayList(); + return r; } } diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index c19161f83..9ee8a1fc1 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -7,6 +7,7 @@ package org.broadinstitute.sting.utils.threading; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.gatk.iterators.GenomeLocusIterator; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.UserException; @@ -17,7 +18,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.*; @@ -27,13 +28,11 @@ import java.util.concurrent.*; public class GenomeLocProcessingTrackerUnitTest extends BaseTest { IndexedFastaSequenceFile fasta = null; GenomeLocParser genomeLocParser = null; - File sharedFile = new File("synchronizationFile.txt"); - static final boolean USE_FILE_LOCK = false; String chr1 = null; + private final static String FILE_ROOT = "testdata/GLPTFile"; @BeforeTest public void before() { - logger.warn("SharedFile is " + sharedFile.getAbsolutePath()); File referenceFile = new File(hg18Reference); try { fasta = new IndexedFastaSequenceFile(referenceFile); @@ -46,13 +45,16 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } } - @AfterMethod - public void cleanup(Object[] data) { - if ( sharedFile.exists() ) { - sharedFile.delete(); - } + @BeforeMethod + public void beforeMethod(Object[] data) { + if ( data.length > 0 ) + ((TestTarget)data[0]).init(); + } - ((TestTarget)data[0]).getTracker().close(); + @AfterMethod + public void afterMethod(Object[] data) { + if ( data.length > 0 ) + ((TestTarget)data[0]).getTracker().close(); } abstract private class TestTarget { @@ -60,6 +62,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { int nShards; int shardSize; + public void init() {} + protected TestTarget(String name, int nShards, int shardSize) { this.name = name; this.nShards = nShards; @@ -83,37 +87,44 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } } - private class SharedFileTest extends TestTarget { - protected SharedFileTest(int nShards, int shardSize) { - super("SharedFile", nShards, shardSize); - } - - GenomeLocProcessingTracker tracker = null; - public GenomeLocProcessingTracker getTracker() { - if ( tracker == null ) - //tracker = new SharedMemoryGenomeLocProcessingTracker(); - tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser, USE_FILE_LOCK); - return tracker; - } + @DataProvider(name = "threadData") + public Object[][] createThreadData() { + return createData(Arrays.asList(10, 100, 1000, 10000), Arrays.asList(10)); } - - @DataProvider(name = "data") - public Object[][] createData1() { + public Object[][] createData(List nShards, List shardSizes) { List params = new ArrayList(); - for ( int nShard : Arrays.asList(10, 100, 1000) ) { - for ( int shardSize : Arrays.asList(10) ) { -// for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) { -// for ( int shardSize : Arrays.asList(10, 100) ) { + int counter = 0; +// for ( int nShard : Arrays.asList(10,100,1000) ) { +// for ( int shardSize : Arrays.asList(10) ) { + for ( int nShard : nShards ) { + for ( int shardSize : shardSizes ) { // shared mem -- canonical implementation - params.add(new TestTarget("SharedMem", nShard, shardSize) { - SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(); + params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize) { + SharedMemoryGenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createSharedMemory(); public GenomeLocProcessingTracker getTracker() { return tracker; } }); -// // shared file -- working implementation -// params.add(new SharedFileTest(nShard, shardSize)); + final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); + params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize) { + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser); + public GenomeLocProcessingTracker getTracker() { return tracker; } + public void init() { + if ( file1.exists() ) + file1.delete(); + } + }); + + final File file2 = new File(String.format("%s_ThreadSafeFileLockingFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); + params.add(new TestTarget("ThreadSafeFileLockingFileBacked", nShard, shardSize) { + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser); + public GenomeLocProcessingTracker getTracker() { return tracker; } + public void init() { + if ( file2.exists() ) + file2.delete(); + } + }); } } @@ -122,10 +133,28 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { return params2.toArray(new Object[][]{}); } + @DataProvider(name = "simpleData") + public Object[][] createSimpleData() { + return createData(Arrays.asList(1000), Arrays.asList(100)); + } + private static final String NAME_ONE = "name1"; private static final String NAME_TWO = "name2"; - @Test(dataProvider = "data", enabled = true) + @Test(enabled = true) + public void testNoop() { + GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createNoOp(); + for ( int start = 1; start < 100; start++ ) { + for ( int n = 0; n < 2; n++ ) { + GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1); + ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE); + Assert.assertTrue(ploc.isOwnedBy(NAME_ONE)); + Assert.assertEquals(tracker.getProcessingLocs().size(), 0); + } + } + } + + @Test(dataProvider = "simpleData", enabled = true) public void testSingleProcessTracker(TestTarget test) { GenomeLocProcessingTracker tracker = test.getTracker(); List shards = test.getShards(); @@ -138,24 +167,60 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertNull(tracker.findOwner(shard)); Assert.assertFalse(tracker.locIsOwned(shard)); - GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); + ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); Assert.assertNotNull(proc); - Assert.assertNotNull(proc.getLoc()); + Assert.assertNotNull(proc.getLocation()); Assert.assertNotNull(proc.getOwner()); - Assert.assertEquals(proc.getLoc(), shard); + Assert.assertEquals(proc.getLocation(), shard); Assert.assertEquals(proc.getOwner(), NAME_ONE); Assert.assertEquals(tracker.findOwner(shard), proc); Assert.assertTrue(tracker.locIsOwned(shard)); Assert.assertNotNull(tracker.getProcessingLocs()); Assert.assertEquals(tracker.getProcessingLocs().size(), counter); - GenomeLocProcessingTracker.ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); + ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO)); Assert.assertEquals(badClaimAttempt.getOwner(), NAME_ONE); } } - @Test(dataProvider = "data", enabled = true) + @Test(dataProvider = "simpleData", enabled = true) + public void testIterator(TestTarget test) { + GenomeLocProcessingTracker tracker = test.getTracker(); + List shards = test.getShards(); + logger.warn("testIterator " + test); + + List markedShards = new ArrayList(); + List toFind = new ArrayList(); + + for ( int i = 0; i < shards.size(); i++ ) { + if ( ! (i % 10 == 0) ) { + markedShards.add(shards.get(i)); + tracker.claimOwnership(shards.get(i), NAME_TWO); + } else { + toFind.add(shards.get(i)); + } + } + + int nFound = 0; + Iterator it = shards.iterator(); + while ( it.hasNext() ) { + GenomeLoc shard = tracker.claimOwnershipOfNextAvailable(it, NAME_ONE); + + if ( shard == null ) { // everything to get is done + Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards"); + } else { + nFound++; + ProcessingLoc proc = tracker.findOwner(shard); + + Assert.assertTrue(proc.isOwnedBy(NAME_ONE)); + Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!"); + Assert.assertTrue(toFind.contains(shard), "Claimed shard wasn't one of the unmarked!"); + } + } + } + + @Test(dataProvider = "simpleData", enabled = true) public void testMarkedProcesses(TestTarget test) { GenomeLocProcessingTracker tracker = test.getTracker(); List shards = test.getShards(); @@ -171,7 +236,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } for ( GenomeLoc shard : shards ) { - GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); + ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO)); @@ -190,24 +255,39 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { public TestTarget test; public String name; public List ran, toRun; + boolean useIterator; - public TestThread(TestTarget test, int count, List toRun) { + public TestThread(TestTarget test, int count, List toRun, boolean useIterator) { this.test = test; this.toRun = toRun; this.name = "thread" + count; this.ran = new ArrayList(); + this.useIterator = useIterator; } public Integer call() { - logger.warn(String.format("Call() Thread %s", name)); - for ( GenomeLoc shard : toRun ) { - //System.out.printf("Claiming ownership in %s on %s%n", name, shard); - GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); - //System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name)); - if ( proc.isOwnedBy(name) ) { - ran.add(proc.getLoc()); + //logger.warn(String.format("Call() Thread %s", name)); + if ( useIterator ) { + for ( GenomeLoc shard : test.getTracker().onlyOwned(toRun.iterator(), name) ) { + if ( shard != null ) { // ignore the unclaimable end of the stream + ran.add(shard); + // do some work here + for ( int sum =0, i = 0; i < 100000; i++) sum += i; + } + } + + } else { + for ( GenomeLoc shard : toRun ) { + //System.out.printf("Claiming ownership in %s on %s%n", name, shard); + ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); + //System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name)); + if ( proc.isOwnedBy(name) ) { + ran.add(proc.getLocation()); + // do some work here + for ( int sum =0, i = 0; i < 100000; i++) sum += i; + } + //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); } - //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); } return 1; @@ -245,14 +325,23 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { return r; } - @Test(dataProvider = "data", enabled = true) - public void testThreadedProcesses(TestTarget test) { + @Test(dataProvider = "threadData", enabled = true) + public void testThreadedProcessesLowLevelFunctions(TestTarget test) { + testThreading(test, false); + } + + @Test(dataProvider = "threadData", enabled = true) + public void testThreadedProcessesIterator(TestTarget test) { + testThreading(test, true); + } + + private void testThreading(TestTarget test, boolean useIterator) { // start up 3 threads - logger.warn("ThreadedTesting " + test); + logger.warn("ThreadedTesting " + test + " using iterator " + useIterator); List threads = new ArrayList(); for ( int i = 0; i < 4; i++) { List toRun = subList(test.getShards(), i+1); - TestThread thread = new TestThread(test, i, toRun); + TestThread thread = new TestThread(test, i, toRun, useIterator); threads.add(thread); } ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size()); @@ -273,11 +362,11 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { for ( GenomeLoc shard : shards ) { Assert.assertTrue(tracker.locIsOwned(shard), "Unowned shard"); - GenomeLocProcessingTracker.ProcessingLoc proc = tracker.findOwner(shard); + ProcessingLoc proc = tracker.findOwner(shard); Assert.assertNotNull(proc, "Proc was null"); Assert.assertNotNull(proc.getOwner(), "Owner was null"); - Assert.assertEquals(proc.getLoc(), shard, "Shard loc doesn't make ProcessingLoc"); + Assert.assertEquals(proc.getLocation(), shard, "Shard loc doesn't make ProcessingLoc"); TestThread owner = findOwner(proc.getOwner(), threads); Assert.assertNotNull(owner, "Couldn't find owner");