From f8ba76d87c5c32b8ec03a4d92ba33caf61e62ef9 Mon Sep 17 00:00:00 2001 From: depristo Date: Mon, 17 Jan 2011 21:23:09 +0000 Subject: [PATCH] Incremental commit for distributed computation. Appears to work but has potential deadlock situation not yet debugged. Do not use yet. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5010 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/gatk/GenomeAnalysisEngine.java | 9 + .../arguments/GATKArgumentCollection.java | 17 ++ .../datasources/shards/IntervalSharder.java | 4 +- .../sting/gatk/datasources/shards/Shard.java | 7 +- .../simpleDataSources/SAMDataSource.java | 4 +- .../gatk/executive/LinearMicroScheduler.java | 28 +- .../sting/gatk/executive/MicroScheduler.java | 95 +++++++ .../broadinstitute/sting/utils/GenomeLoc.java | 18 +- .../sting/utils/HasGenomeLocation.java | 8 + .../threading/GenomeLocProcessingTracker.java | 59 +++- .../NoOpGenomeLocProcessingTracker.java | 20 ++ .../SharedFileGenomeLocProcessingTracker.java | 258 +++++++++++------- ...haredMemoryGenomeLocProcessingTracker.java | 27 +- .../GenomeLocProcessingTrackerUnitTest.java | 109 +++++--- 14 files changed, 486 insertions(+), 177 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/utils/HasGenomeLocation.java create mode 100644 java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java diff --git a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index eec54118c..e3a0b0b6d 100755 --- a/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -134,6 +134,11 @@ public class GenomeAnalysisEngine { */ private Collection filters; + /** + * A currently hacky unique name for this GATK instance + */ + private String myName = "GATK_" + Math.abs(new Random().nextInt()); + /** * our walker manager */ @@ -236,6 +241,10 @@ public class GenomeAnalysisEngine { return walkerManager.getName(walkerType); } + public String getName() { + return myName; + } + /** * Gets a list of the filters to associate with the given walker. Will NOT initialize the engine with this filters; * the caller must handle that directly. diff --git a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java index d31377dc5..82c8b292d 100755 --- a/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java +++ b/java/src/org/broadinstitute/sting/gatk/arguments/GATKArgumentCollection.java @@ -217,6 +217,16 @@ public class GATKArgumentCollection { @Hidden public RMDTriplet.RMDStorageType rodInputType = null; + @Element(required=false) + @Argument(fullName="processingTracker",shortName="C",doc="A lockable, shared file for coordinating distributed GATK runs",required=false) + @Hidden + public File processingTrackerFile = null; + + @Element(required=false) + @Argument(fullName="restartProcessingTracker",shortName="RPT",doc="Should we delete the processing tracker file at startup?",required=false) + @Hidden + public boolean restartProcessingTracker = false; + /** * marshal the data out to a object * @@ -378,9 +388,16 @@ public class GATKArgumentCollection { (other.performanceLog != null && !other.performanceLog.equals(this.performanceLog))) return false; + if ((other.processingTrackerFile == null && this.processingTrackerFile != null) || + (other.processingTrackerFile != null && !other.processingTrackerFile.equals(this.processingTrackerFile))) + return false; + if(rodInputType != other.rodInputType) return false; + if ( restartProcessingTracker != other.restartProcessingTracker ) + return false; + return true; } diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/IntervalSharder.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/IntervalSharder.java index 1fdd6b100..a85a6f3a4 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/IntervalSharder.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/IntervalSharder.java @@ -90,7 +90,7 @@ public class IntervalSharder { // If the next section of the BAM to be processed is unmapped, handle this region separately. while(locusIterator.hasNext() && nextBatch.isEmpty()) { contig = null; - while(locusIterator.hasNext() && (contig == null || (locusIterator.peek() != GenomeLoc.UNMAPPED && locusIterator.peek().getContig().equals(contig)))) { + while(locusIterator.hasNext() && (contig == null || (!GenomeLoc.isUnmapped(locusIterator.peek()) && locusIterator.peek().getContig().equals(contig)))) { GenomeLoc nextLocus = locusIterator.next(); contig = nextLocus.getContig(); nextBatch.add(nextLocus); @@ -387,7 +387,7 @@ class FilePointer { this.referenceSequence = location.getContig(); this.overlap = null; this.locations = Collections.singletonList(location); - this.isRegionUnmapped = location == GenomeLoc.UNMAPPED; + this.isRegionUnmapped = GenomeLoc.isUnmapped(location); } public FilePointer(final String referenceSequence,final BAMOverlap overlap) { diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/shards/Shard.java b/java/src/org/broadinstitute/sting/gatk/datasources/shards/Shard.java index 4a7c9dda5..061bece45 100644 --- a/java/src/org/broadinstitute/sting/gatk/datasources/shards/Shard.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/shards/Shard.java @@ -36,7 +36,12 @@ public interface Shard extends Serializable { READ, LOCUS } - /** @return the genome location represented by this shard */ + /** + * If isUnmapped is true, than getGenomeLocs by + * definition will return a singleton list with a GenomeLoc.UNMAPPED + * + * @return the genome location represented by this shard + */ public List getGenomeLocs(); /** diff --git a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java index 09dc64bca..95a8b6229 100755 --- a/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java +++ b/java/src/org/broadinstitute/sting/gatk/datasources/simpleDataSources/SAMDataSource.java @@ -866,9 +866,9 @@ public class SAMDataSource implements SimpleDataSource { // If we find a mix of mapped/unmapped intervals, throw an exception. boolean foundMappedIntervals = false; for(GenomeLoc location: intervals) { - if(location != GenomeLoc.UNMAPPED) + if(! GenomeLoc.isUnmapped(location)) foundMappedIntervals = true; - keepOnlyUnmappedReads |= (location == GenomeLoc.UNMAPPED); + keepOnlyUnmappedReads |= GenomeLoc.isUnmapped(location); } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 63829072d..13a721eb6 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -50,23 +50,25 @@ public class LinearMicroScheduler extends MicroScheduler { Accumulator accumulator = Accumulator.create(engine,walker); for (Shard shard : shardStrategy) { - // New experimental code for managing locus intervals. - if(shard.getShardType() == Shard.ShardType.LOCUS) { - LocusWalker lWalker = (LocusWalker)walker; - WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); - for(WindowMaker.WindowMakerIterator iterator: windowMaker) { - ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); + if ( claimShard(shard) ) { + // New experimental code for managing locus intervals. + if(shard.getShardType() == Shard.ShardType.LOCUS) { + LocusWalker lWalker = (LocusWalker)walker; + WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); + for(WindowMaker.WindowMakerIterator iterator: windowMaker) { + ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); + Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); + accumulator.accumulate(dataProvider,result); + dataProvider.close(); + } + windowMaker.close(); + } + else { + ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); accumulator.accumulate(dataProvider,result); dataProvider.close(); } - windowMaker.close(); - } - else { - ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); - Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); - accumulator.accumulate(dataProvider,result); - dataProvider.close(); } } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 52dfba70c..93d11eca4 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -26,6 +26,9 @@ package org.broadinstitute.sting.gatk.executive; import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider; +import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider; import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource; @@ -43,8 +46,12 @@ import java.lang.management.ManagementFactory; import java.util.*; import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.threading.GenomeLocProcessingTracker; +import org.broadinstitute.sting.utils.threading.NoOpGenomeLocProcessingTracker; +import org.broadinstitute.sting.utils.threading.SharedFileGenomeLocProcessingTracker; import javax.management.JMException; import javax.management.MBeanServer; @@ -82,6 +89,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { private final MBeanServer mBeanServer; private final ObjectName mBeanName; + private GenomeLocProcessingTracker processingTracker; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -149,6 +158,19 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { catch (JMException ex) { throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); } + + // create the processing tracker + if ( engine.getArguments().processingTrackerFile != null ) { + if ( engine.getArguments().restartProcessingTracker && engine.getArguments().processingTrackerFile.exists() ) { + engine.getArguments().processingTrackerFile.delete(); + logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile); + } + + processingTracker = new SharedFileGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser()); + logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile); + } else { + processingTracker = new NoOpGenomeLocProcessingTracker(); + } } /** @@ -161,6 +183,79 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { */ public abstract Object execute(Walker walker, ShardStrategy shardStrategy); + protected boolean claimShard(Shard shard) { + if ( shard.getGenomeLocs() == null ) { + if ( engine.getArguments().processingTrackerFile != null ) + throw new UserException.BadArgumentValue("processingTrackerFile", "Cannot use processing tracking with unindexed data"); + return true; + } else { + GenomeLoc shardSpan = shardSpan(shard); + + GenomeLocProcessingTracker.ProcessingLoc proc = processingTracker.claimOwnership(shardSpan, engine.getName()); + boolean actuallyProcess = proc.isOwnedBy(engine.getName()); + //logger.debug(String.format("Shard %s claimed by %s => owned by me %b", shard, proc.getOwner(), actuallyProcess)); + + if ( ! actuallyProcess ) + logger.info(String.format("DISTRIBUTED GATK: Shard %s already processed by %s", shard, proc.getOwner())); + + return actuallyProcess; + } + } + + private GenomeLoc shardSpan(Shard shard) { + if ( shard == null ) throw new ReviewedStingException("Shard is null!"); + int start = Integer.MAX_VALUE; + int stop = Integer.MIN_VALUE; + String contig = null; + + for ( GenomeLoc loc : shard.getGenomeLocs() ) { + if ( GenomeLoc.isUnmapped(loc) ) + // special case the unmapped region marker, just abort out + return loc; + contig = loc.getContig(); + if ( loc.getStart() < start ) start = loc.getStart(); + if ( loc.getStop() > stop ) stop = loc.getStop(); + } + return engine.getGenomeLocParser().createGenomeLoc(contig, start, stop); + } + + // todo -- the execution code in the schedulers is duplicated and slightly different -- should be merged +// protected boolean executeShard(Walker walker, Shard shard, Accumulator accumulator) { +// if(shard.getShardType() == Shard.ShardType.LOCUS) { +// LocusWalker lWalker = (LocusWalker)walker; +// +// WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); +// +// // ShardTraverser +// +// ShardDataProvider dataProvider = null; +// for(WindowMaker.WindowMakerIterator iterator: windowMaker) { +// dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); +// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); +// accumulator.accumulate(dataProvider,result); +// dataProvider.close(); +// } +// if (dataProvider != null) dataProvider.close(); +// windowMaker.close(); +// } +// else { +// ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods); +// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); +// accumulator.accumulate(dataProvider,result); +// dataProvider.close(); +// } +// +// +// // ShardTraverser +// +// for(WindowMaker.WindowMakerIterator iterator: windowMaker) { +// accumulator = traversalEngine.traverse( walker, dataProvider, accumulator ); +// dataProvider.close(); +// } +// +// return true; +// } + /** * Retrieves the object responsible for tracking and managing output. * @return An output tracker, for loading data in and extracting results. Will not be null. diff --git a/java/src/org/broadinstitute/sting/utils/GenomeLoc.java b/java/src/org/broadinstitute/sting/utils/GenomeLoc.java index 7fb93a208..c588125db 100644 --- a/java/src/org/broadinstitute/sting/utils/GenomeLoc.java +++ b/java/src/org/broadinstitute/sting/utils/GenomeLoc.java @@ -33,7 +33,11 @@ public class GenomeLoc implements Comparable, Cloneable, Serializable * the object may be used to refer to the region, as '==' comparisons are used * in comparators, etc. */ + // TODO - WARNING WARNING WARNING code somehow depends on the name of the contig being null! public static final GenomeLoc UNMAPPED = new GenomeLoc(null,-1,0,0); + public static final boolean isUnmapped(GenomeLoc loc) { + return loc == UNMAPPED; + } // -------------------------------------------------------------------------------------------------------------- // @@ -72,7 +76,7 @@ public class GenomeLoc implements Comparable, Cloneable, Serializable public final int getStart() { return this.start; } public final int getStop() { return this.stop; } public final String toString() { - if(this == UNMAPPED) return "unmapped"; + if(GenomeLoc.isUnmapped(this)) return "unmapped"; if ( throughEndOfContigP() && atBeginningOfContigP() ) return getContig(); else if ( throughEndOfContigP() || getStart() == getStop() ) @@ -100,8 +104,8 @@ public class GenomeLoc implements Comparable, Cloneable, Serializable } public GenomeLoc merge( GenomeLoc that ) throws ReviewedStingException { - if(this == UNMAPPED || that == UNMAPPED) { - if(this != UNMAPPED || that != UNMAPPED) + if(GenomeLoc.isUnmapped(this) || GenomeLoc.isUnmapped(that)) { + if(! GenomeLoc.isUnmapped(this) || !GenomeLoc.isUnmapped(that)) throw new ReviewedStingException("Tried to merge a mapped and an unmapped genome loc"); return UNMAPPED; } @@ -116,8 +120,8 @@ public class GenomeLoc implements Comparable, Cloneable, Serializable } public GenomeLoc intersect( GenomeLoc that ) throws ReviewedStingException { - if(this == UNMAPPED || that == UNMAPPED) { - if(this != UNMAPPED || that != UNMAPPED) + if(GenomeLoc.isUnmapped(this) || GenomeLoc.isUnmapped(that)) { + if(! GenomeLoc.isUnmapped(this) || !GenomeLoc.isUnmapped(that)) throw new ReviewedStingException("Tried to intersect a mapped and an unmapped genome loc"); return UNMAPPED; } @@ -238,9 +242,9 @@ public class GenomeLoc implements Comparable, Cloneable, Serializable if ( this == that ) { result = 0; } - else if(this == UNMAPPED) + else if(GenomeLoc.isUnmapped(this)) result = 1; - else if(that == UNMAPPED) + else if(GenomeLoc.isUnmapped(that)) result = -1; else { final int cmpContig = compareContigs(that); diff --git a/java/src/org/broadinstitute/sting/utils/HasGenomeLocation.java b/java/src/org/broadinstitute/sting/utils/HasGenomeLocation.java new file mode 100644 index 000000000..3c01dd897 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/HasGenomeLocation.java @@ -0,0 +1,8 @@ +package org.broadinstitute.sting.utils; + +/** + * Indicates that this object has a genomic location and provides a systematic interface to get it. + */ +public interface HasGenomeLocation { + public GenomeLoc getLocation(); +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index cdab0c610..cc805a8b8 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -1,22 +1,21 @@ package org.broadinstitute.sting.utils.threading; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.HasGenomeLocation; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import java.util.Collections; +import java.util.Iterator; import java.util.List; /** - * Created by IntelliJ IDEA. - * User: depristo - * Date: 1/13/11 - * Time: 9:38 AM - * To change this template use File | Settings | File Templates. + * */ public abstract class GenomeLocProcessingTracker { /** * Information about processing locations and their owners */ - public static final class ProcessingLoc { + public static final class ProcessingLoc implements Comparable { private final GenomeLoc loc; private final String owner; @@ -55,6 +54,9 @@ public abstract class GenomeLocProcessingTracker { return false; } + public int compareTo(ProcessingLoc other) { + return this.getLoc().compareTo(other.getLoc()); + } } // -------------------------------------------------------------------------------- @@ -73,8 +75,14 @@ public abstract class GenomeLocProcessingTracker { return findOwner(loc) != null; } - public ProcessingLoc findOwner(GenomeLoc loc) { - for ( ProcessingLoc l : getProcessingLocs() ) { + // in general this isn't true for the list of locs, as they definitely can occur out of order + protected static ProcessingLoc findOwnerInSortedList(GenomeLoc loc, List locs) { + int i = Collections.binarySearch(locs, new ProcessingLoc(loc, "ignore")); + return i < 0 ? null : locs.get(i); + } + + protected static ProcessingLoc findOwnerInUnsortedList(GenomeLoc loc, List locs) { + for ( ProcessingLoc l : locs ) { if ( l.getLoc().equals(loc) ) return l; } @@ -82,6 +90,10 @@ public abstract class GenomeLocProcessingTracker { return null; } + public ProcessingLoc findOwner(GenomeLoc loc) { + return findOwnerInUnsortedList(loc, getProcessingLocs()); + } + /** * The workhorse routine. Attempt to claim processing ownership of loc, with my name. * This is an atomic operation -- other threads / processes will wait until this function @@ -95,6 +107,33 @@ public abstract class GenomeLocProcessingTracker { */ public abstract ProcessingLoc claimOwnership(GenomeLoc loc, String myName); + /** + * A higher-level, and more efficient, interface to obtain the next location we own. Takes an + * iterator producing objects that support the getLocation() interface, and returns the next + * object in that stream that we can claim ownership of. Returns null if we run out of elements + * during the iteration. + * + * Can be more efficiently implemented in subclasses to avoid multiple unlocking + * + * @param iterator + * @param myName + * @return + */ + public T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { + while ( iterator.hasNext() ) { + T elt = iterator.next(); + GenomeLoc loc = elt.getLocation(); + ProcessingLoc proc = claimOwnership(loc, myName); + + if ( proc.isOwnedBy(myName) ) + return elt; + // if not, we continue our search + } + + // we never found an object, just return it. + return null; + } + /** * Returns the list of currently owned locations, updating the database as necessary. * DO NOT MODIFY THIS LIST! As with all parallelizing data structures, the list may be @@ -105,4 +144,8 @@ public abstract class GenomeLocProcessingTracker { * @return */ protected abstract List getProcessingLocs(); + + protected void close() { + // by default we don't do anything + } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java new file mode 100644 index 000000000..dd8f23152 --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java @@ -0,0 +1,20 @@ +package org.broadinstitute.sting.utils.threading; + +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; + +import java.util.Collections; +import java.util.List; + +/** + * Base class, and null tracker. Always says that a GenomeLoc is ready for processing + */ +public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { + public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + return new ProcessingLoc(loc, myName); + } + + protected List getProcessingLocs() { + return Collections.emptyList(); + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java index e77862a8f..b1353f51f 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java @@ -1,22 +1,13 @@ package org.broadinstitute.sting.utils.threading; -import com.google.common.collect.ArrayListMultimap; -import net.sf.picard.reference.FastaSequenceIndex; -import net.sf.picard.reference.FastaSequenceIndexBuilder; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import org.broadinstitute.sting.utils.file.FSLockWithShared; -import org.broadinstitute.sting.utils.file.FileSystemInabilityToLockException; -import org.broadinstitute.sting.utils.text.XReadLines; import java.io.*; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; +import java.nio.channels.*; import java.util.ArrayList; import java.util.List; @@ -25,123 +16,206 @@ import java.util.List; */ public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker { private static final boolean DEBUG = false; + private static final boolean REALLY_DEBUG = false; + + private boolean ACTUALLY_USE_FILE_LOCK = true; + private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class); - private List processingLocs = new ArrayList(); + + private Object myLock = new Object(); + private List processingLocs; private File sharedFile = null; - GenomeLocParser parser; - - // the file lock + private GenomeLocParser parser; private FileLock lock = null; + private RandomAccessFile raFile; + private long lastReadPosition = 0; - // the file channel we open - private FileChannel channel = null; - - long lastReadPosition = 0; - +// // +// // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST +// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS +// // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED +// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK? +// // +// public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) { + this(sharedFile, parser, true); + } + + protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) { + processingLocs = new ArrayList(); + ACTUALLY_USE_FILE_LOCK = false; try { this.sharedFile = sharedFile; - this.channel = new RandomAccessFile(sharedFile, "rw").getChannel(); + this.raFile = new RandomAccessFile(sharedFile, "rws"); this.parser = parser; } - catch (Exception e) { + catch (FileNotFoundException e) { throw new UserException.CouldNotCreateOutputFile(sharedFile, e); } } + public void close() { + if ( ACTUALLY_USE_FILE_LOCK ) { + try { + this.raFile.close(); + } + catch (IOException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } + } + private void lock() { - try { - lock = channel.lock(); - } catch (ClosedChannelException e) { - throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); - } -// catch (OverlappingFileLockException e) { -// logger.debug("Unable to lock file because you already have a lock on this file."); -// return false; -// } - catch (IOException e) { - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } finally { - unlock(); + if ( ACTUALLY_USE_FILE_LOCK ) { + + // Precondition -- lock is always null while we don't have a lock + if ( lock != null ) + throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); + + try { + lock = raFile.getChannel().lock(); + } catch (ClosedChannelException e) { + throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); + } catch (FileLockInterruptionException e) { + throw new ReviewedStingException("File lock interrupted", e); + } catch (NonWritableChannelException e) { + throw new ReviewedStingException("File channel not writable", e); + } catch (OverlappingFileLockException e) { + // this only happens when multiple threads are running, and one is waiting + // for the lock above and we come here. + throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen."); + } catch (IOException e) { + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); + } } } - private void unlock() { - try { - lock.release(); - //channel.close(); - } catch ( IOException e ) { - throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); + private void unlock(boolean excepting) { + if ( ACTUALLY_USE_FILE_LOCK ) { + + // Precondition -- lock is never null while we have a lock + if ( lock == null ) { + if ( ! excepting ) + throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!"); + } else { + if ( ! lock.isValid() ) + throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); + try { + lock.release(); + lock = null; + //channel.close(); + } catch ( IOException e ) { + throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); + } + } } } - private void readLocs() { - try { - if ( sharedFile.exists() ) { - FileInputStream in = new FileInputStream(sharedFile); - if ( in.getChannel().size() > lastReadPosition ) { - in.skip(lastReadPosition); + private List readLocs() { + if ( ACTUALLY_USE_FILE_LOCK ) { + // we must have a lock to run this code + if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()"); + + try { + if ( raFile.length() > lastReadPosition ) { + raFile.seek(lastReadPosition); - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); int counter = 0; - String line = reader.readLine(); // Read another line + String line = raFile.readLine(); // Read another line while ( line != null ) { String[] parts = line.split(" "); - if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line " + line); + if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); GenomeLoc loc = parser.parseGenomeLoc(parts[0]); String owner = parts[1]; processingLocs.add(new ProcessingLoc(loc, owner)); - line = reader.readLine(); + line = raFile.readLine(); counter++; } - lastReadPosition = in.getChannel().position(); + lastReadPosition = raFile.getFilePointer(); if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d", counter, lastReadPosition, processingLocs.size())); } - in.close(); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotReadInputFile(sharedFile, e); + } catch (IOException e) { + throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); } - } catch (FileNotFoundException e) { - throw new UserException.CouldNotReadInputFile(sharedFile, e); - } catch (IOException e) { - throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); - } - } - - private void writeLoc(ProcessingLoc proc) { - try { - PrintStream out = new PrintStream(new FileOutputStream(sharedFile, true)); - out.printf("%s %s%n", proc.getLoc(), proc.getOwner()); - if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file", proc)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotReadInputFile(sharedFile, e); - } - } - - public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { - ProcessingLoc owner = null; - try { - lock(); - readLocs(); - owner = super.findOwner(loc); - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - writeLoc(owner); - } - } finally { - unlock(); - } - - return owner; - } - - protected synchronized List getProcessingLocs() { - try { - lock(); - readLocs(); - } finally { - unlock(); } return processingLocs; } + + private void writeLoc(ProcessingLoc proc) { + if ( ACTUALLY_USE_FILE_LOCK ) { + // we must have a lock to run this code + if ( lock == null || ! lock.isValid() ) + throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()"); + + try { + String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner()); + long startPos = raFile.getFilePointer(); + raFile.seek(raFile.length()); + raFile.write(packet.getBytes()); + if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer())); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } catch (IOException e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } else { + processingLocs.add(proc); + } + } + + private final void printOwners() { + for ( ProcessingLoc proc : processingLocs ) + System.out.println(proc); + } + + public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName); + synchronized (processingLocs) { + boolean excepting = true; + ProcessingLoc owner = null; + + if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); + + if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile); + try { + lock(); + owner = findOwnerInUnsortedList(loc, readLocs()); + //owner = super.findOwner(loc); + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + writeLoc(owner); + } + excepting = false; + } finally { + if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner); + //printOwners(); + unlock(excepting); + } + + if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null"); + return owner; + } + } + + protected List getProcessingLocs() { + synchronized (processingLocs) { + boolean excepting = true; + if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock"); + + try { + lock(); + readLocs(); + excepting = false; + } finally { + unlock(excepting); + } + + if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null"); + return processingLocs; + } + } } diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index d8976ee0f..c247196d3 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -12,24 +12,27 @@ import java.util.List; */ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class); - private List processingLocs = new ArrayList(); + protected List processingLocs = new ArrayList(); - public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { // processingLocs is a shared memory synchronized object, and this // method is synchronized, so we can just do our processing - ProcessingLoc owner = super.findOwner(loc); + synchronized (processingLocs) { + ProcessingLoc owner = super.findOwner(loc); - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - processingLocs.add(owner); + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + processingLocs.add(owner); + } + + return owner; + //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); } - - //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); - - return owner; } - protected synchronized List getProcessingLocs() { - return processingLocs; + protected List getProcessingLocs() { + synchronized (processingLocs) { + return processingLocs; + } } } diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java index a7d2aed71..c19161f83 100644 --- a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -11,10 +11,7 @@ import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.exceptions.UserException; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; +import org.testng.annotations.*; import java.io.File; import java.io.FileNotFoundException; @@ -22,10 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Basic unit test for GenomeLoc @@ -34,10 +28,12 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { IndexedFastaSequenceFile fasta = null; GenomeLocParser genomeLocParser = null; File sharedFile = new File("synchronizationFile.txt"); + static final boolean USE_FILE_LOCK = false; String chr1 = null; @BeforeTest public void before() { + logger.warn("SharedFile is " + sharedFile.getAbsolutePath()); File referenceFile = new File(hg18Reference); try { fasta = new IndexedFastaSequenceFile(referenceFile); @@ -50,13 +46,14 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } } - @BeforeMethod - public void cleanup() { + @AfterMethod + public void cleanup(Object[] data) { if ( sharedFile.exists() ) { sharedFile.delete(); } - } + ((TestTarget)data[0]).getTracker().close(); + } abstract private class TestTarget { String name; @@ -86,26 +83,37 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } } + private class SharedFileTest extends TestTarget { + protected SharedFileTest(int nShards, int shardSize) { + super("SharedFile", nShards, shardSize); + } + + GenomeLocProcessingTracker tracker = null; + public GenomeLocProcessingTracker getTracker() { + if ( tracker == null ) + //tracker = new SharedMemoryGenomeLocProcessingTracker(); + tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser, USE_FILE_LOCK); + return tracker; + } + } + @DataProvider(name = "data") public Object[][] createData1() { List params = new ArrayList(); -// for ( int nShard : Arrays.asList(10) ) { -// for ( int shardSize : Arrays.asList(10) ) { - for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) { - for ( int shardSize : Arrays.asList(10, 100) ) { + for ( int nShard : Arrays.asList(10, 100, 1000) ) { + for ( int shardSize : Arrays.asList(10) ) { +// for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) { +// for ( int shardSize : Arrays.asList(10, 100) ) { // shared mem -- canonical implementation -// params.add(new TestTarget(nShard, shardSize) { -// SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(); -// public GenomeLocProcessingTracker getTracker() { return tracker; } -// }); - - // shared file -- working implementation - params.add(new TestTarget("SharedFile", nShard, shardSize) { - SharedFileGenomeLocProcessingTracker tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser); + params.add(new TestTarget("SharedMem", nShard, shardSize) { + SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(); public GenomeLocProcessingTracker getTracker() { return tracker; } }); + +// // shared file -- working implementation +// params.add(new SharedFileTest(nShard, shardSize)); } } @@ -168,9 +176,9 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO)); if ( proc.isOwnedBy(NAME_ONE) ) - Assert.assertTrue(! markedShards.contains(shard)); + Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!"); else - Assert.assertTrue(markedShards.contains(shard)); + Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked"); if ( ! markedShards.contains(shard) ) { Assert.assertEquals(tracker.findOwner(shard), proc); @@ -181,23 +189,25 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { public class TestThread implements Callable { public TestTarget test; public String name; - public List ran; + public List ran, toRun; - public TestThread(TestTarget test, int count) { + public TestThread(TestTarget test, int count, List toRun) { this.test = test; + this.toRun = toRun; this.name = "thread" + count; this.ran = new ArrayList(); } public Integer call() { - int nShards = test.getShards().size(); - for ( GenomeLoc shard : test.getShards() ) { - if ( ran.size() < nShards / 3 ) { - GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); - if ( proc.isOwnedBy(name) ) - ran.add(proc.getLoc()); - //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); + logger.warn(String.format("Call() Thread %s", name)); + for ( GenomeLoc shard : toRun ) { + //System.out.printf("Claiming ownership in %s on %s%n", name, shard); + GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); + //System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name)); + if ( proc.isOwnedBy(name) ) { + ran.add(proc.getLoc()); } + //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); } return 1; @@ -213,8 +223,26 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { } private static final void assertAllThreadsFinished(List> futures) { - for ( Future f : futures ) - Assert.assertTrue(f.isDone(), "Thread never finished running"); + try { + for ( Future f : futures ) { + Assert.assertTrue(f.isDone(), "Thread never finished running"); + Assert.assertTrue(f.get() != null, "Finished successfully"); + } + } catch (InterruptedException e) { + Assert.fail("Thread failed to run to completion", e); + } catch (ExecutionException e) { + Assert.fail("Thread generated an exception", e); + } + } + + private static final List subList(List l, int i) { + List r = new ArrayList(); + for ( int j = 0; j < l.size(); j++ ) { + if ( j % i == 0 ) + r.add(l.get(j)); + } + + return r; } @Test(dataProvider = "data", enabled = true) @@ -223,7 +251,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { logger.warn("ThreadedTesting " + test); List threads = new ArrayList(); for ( int i = 0; i < 4; i++) { - TestThread thread = new TestThread(test, i); + List toRun = subList(test.getShards(), i+1); + TestThread thread = new TestThread(test, i, toRun); threads.add(thread); } ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size()); @@ -234,7 +263,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { List shards = test.getShards(); for ( TestThread thread : threads ) - logger.warn(String.format("TestThread ran %d jobs", thread.ran.size())); + logger.warn(String.format("TestThread %s ran %d jobs of %d to run", thread.name, thread.ran.size(), thread.toRun.size())); assertAllThreadsFinished(results); @@ -256,8 +285,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest { Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard"); for ( TestThread thread : threads ) - if ( ! proc.isOwnedBy(thread.name) ) - Assert.assertFalse(thread.ran.contains(shard), "Shard appears in another run list"); + if ( ! proc.isOwnedBy(thread.name) && thread.ran.contains(shard) ) + Assert.fail("Shard appears in another run list: proc=" + proc + " shard=" + shard + " also in jobs of " + thread.name + " obj=" + thread.ran.get(thread.ran.indexOf(shard))); } } catch (InterruptedException e) {