Incremental commit for distributed computation. Appears to work but has potential deadlock situation not yet debugged. Do not use yet.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5010 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2011-01-17 21:23:09 +00:00
parent 2f4a436719
commit f8ba76d87c
14 changed files with 486 additions and 177 deletions

View File

@ -134,6 +134,11 @@ public class GenomeAnalysisEngine {
*/ */
private Collection<SamRecordFilter> filters; private Collection<SamRecordFilter> filters;
/**
* A currently hacky unique name for this GATK instance
*/
private String myName = "GATK_" + Math.abs(new Random().nextInt());
/** /**
* our walker manager * our walker manager
*/ */
@ -236,6 +241,10 @@ public class GenomeAnalysisEngine {
return walkerManager.getName(walkerType); return walkerManager.getName(walkerType);
} }
public String getName() {
return myName;
}
/** /**
* Gets a list of the filters to associate with the given walker. Will NOT initialize the engine with this filters; * Gets a list of the filters to associate with the given walker. Will NOT initialize the engine with this filters;
* the caller must handle that directly. * the caller must handle that directly.

View File

@ -217,6 +217,16 @@ public class GATKArgumentCollection {
@Hidden @Hidden
public RMDTriplet.RMDStorageType rodInputType = null; public RMDTriplet.RMDStorageType rodInputType = null;
@Element(required=false)
@Argument(fullName="processingTracker",shortName="C",doc="A lockable, shared file for coordinating distributed GATK runs",required=false)
@Hidden
public File processingTrackerFile = null;
@Element(required=false)
@Argument(fullName="restartProcessingTracker",shortName="RPT",doc="Should we delete the processing tracker file at startup?",required=false)
@Hidden
public boolean restartProcessingTracker = false;
/** /**
* marshal the data out to a object * marshal the data out to a object
* *
@ -378,9 +388,16 @@ public class GATKArgumentCollection {
(other.performanceLog != null && !other.performanceLog.equals(this.performanceLog))) (other.performanceLog != null && !other.performanceLog.equals(this.performanceLog)))
return false; return false;
if ((other.processingTrackerFile == null && this.processingTrackerFile != null) ||
(other.processingTrackerFile != null && !other.processingTrackerFile.equals(this.processingTrackerFile)))
return false;
if(rodInputType != other.rodInputType) if(rodInputType != other.rodInputType)
return false; return false;
if ( restartProcessingTracker != other.restartProcessingTracker )
return false;
return true; return true;
} }

View File

@ -90,7 +90,7 @@ public class IntervalSharder {
// If the next section of the BAM to be processed is unmapped, handle this region separately. // If the next section of the BAM to be processed is unmapped, handle this region separately.
while(locusIterator.hasNext() && nextBatch.isEmpty()) { while(locusIterator.hasNext() && nextBatch.isEmpty()) {
contig = null; contig = null;
while(locusIterator.hasNext() && (contig == null || (locusIterator.peek() != GenomeLoc.UNMAPPED && locusIterator.peek().getContig().equals(contig)))) { while(locusIterator.hasNext() && (contig == null || (!GenomeLoc.isUnmapped(locusIterator.peek()) && locusIterator.peek().getContig().equals(contig)))) {
GenomeLoc nextLocus = locusIterator.next(); GenomeLoc nextLocus = locusIterator.next();
contig = nextLocus.getContig(); contig = nextLocus.getContig();
nextBatch.add(nextLocus); nextBatch.add(nextLocus);
@ -387,7 +387,7 @@ class FilePointer {
this.referenceSequence = location.getContig(); this.referenceSequence = location.getContig();
this.overlap = null; this.overlap = null;
this.locations = Collections.singletonList(location); this.locations = Collections.singletonList(location);
this.isRegionUnmapped = location == GenomeLoc.UNMAPPED; this.isRegionUnmapped = GenomeLoc.isUnmapped(location);
} }
public FilePointer(final String referenceSequence,final BAMOverlap overlap) { public FilePointer(final String referenceSequence,final BAMOverlap overlap) {

View File

@ -36,7 +36,12 @@ public interface Shard extends Serializable {
READ, LOCUS READ, LOCUS
} }
/** @return the genome location represented by this shard */ /**
* If isUnmapped is true, than getGenomeLocs by
* definition will return a singleton list with a GenomeLoc.UNMAPPED
*
* @return the genome location represented by this shard
*/
public List<GenomeLoc> getGenomeLocs(); public List<GenomeLoc> getGenomeLocs();
/** /**

View File

@ -866,9 +866,9 @@ public class SAMDataSource implements SimpleDataSource {
// If we find a mix of mapped/unmapped intervals, throw an exception. // If we find a mix of mapped/unmapped intervals, throw an exception.
boolean foundMappedIntervals = false; boolean foundMappedIntervals = false;
for(GenomeLoc location: intervals) { for(GenomeLoc location: intervals) {
if(location != GenomeLoc.UNMAPPED) if(! GenomeLoc.isUnmapped(location))
foundMappedIntervals = true; foundMappedIntervals = true;
keepOnlyUnmappedReads |= (location == GenomeLoc.UNMAPPED); keepOnlyUnmappedReads |= GenomeLoc.isUnmapped(location);
} }

View File

@ -50,23 +50,25 @@ public class LinearMicroScheduler extends MicroScheduler {
Accumulator accumulator = Accumulator.create(engine,walker); Accumulator accumulator = Accumulator.create(engine,walker);
for (Shard shard : shardStrategy) { for (Shard shard : shardStrategy) {
// New experimental code for managing locus intervals. if ( claimShard(shard) ) {
if(shard.getShardType() == Shard.ShardType.LOCUS) { // New experimental code for managing locus intervals.
LocusWalker lWalker = (LocusWalker)walker; if(shard.getShardType() == Shard.ShardType.LOCUS) {
WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata()); LocusWalker lWalker = (LocusWalker)walker;
for(WindowMaker.WindowMakerIterator iterator: windowMaker) { WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata());
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods); for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
}
windowMaker.close();
}
else {
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit()); Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result); accumulator.accumulate(dataProvider,result);
dataProvider.close(); dataProvider.close();
} }
windowMaker.close();
}
else {
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
} }
} }

View File

@ -26,6 +26,9 @@
package org.broadinstitute.sting.gatk.executive; package org.broadinstitute.sting.gatk.executive;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource;
@ -43,8 +46,12 @@ import java.lang.management.ManagementFactory;
import java.util.*; import java.util.*;
import net.sf.picard.reference.IndexedFastaSequenceFile; import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.GenomeLocProcessingTracker;
import org.broadinstitute.sting.utils.threading.NoOpGenomeLocProcessingTracker;
import org.broadinstitute.sting.utils.threading.SharedFileGenomeLocProcessingTracker;
import javax.management.JMException; import javax.management.JMException;
import javax.management.MBeanServer; import javax.management.MBeanServer;
@ -82,6 +89,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
private final MBeanServer mBeanServer; private final MBeanServer mBeanServer;
private final ObjectName mBeanName; private final ObjectName mBeanName;
private GenomeLocProcessingTracker processingTracker;
/** /**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the * MicroScheduler factory function. Create a microscheduler appropriate for reducing the
* selected walker. * selected walker.
@ -149,6 +158,19 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
catch (JMException ex) { catch (JMException ex) {
throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); throw new ReviewedStingException("Unable to register microscheduler with JMX", ex);
} }
// create the processing tracker
if ( engine.getArguments().processingTrackerFile != null ) {
if ( engine.getArguments().restartProcessingTracker && engine.getArguments().processingTrackerFile.exists() ) {
engine.getArguments().processingTrackerFile.delete();
logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile);
}
processingTracker = new SharedFileGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser());
logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile);
} else {
processingTracker = new NoOpGenomeLocProcessingTracker();
}
} }
/** /**
@ -161,6 +183,79 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/ */
public abstract Object execute(Walker walker, ShardStrategy shardStrategy); public abstract Object execute(Walker walker, ShardStrategy shardStrategy);
protected boolean claimShard(Shard shard) {
if ( shard.getGenomeLocs() == null ) {
if ( engine.getArguments().processingTrackerFile != null )
throw new UserException.BadArgumentValue("processingTrackerFile", "Cannot use processing tracking with unindexed data");
return true;
} else {
GenomeLoc shardSpan = shardSpan(shard);
GenomeLocProcessingTracker.ProcessingLoc proc = processingTracker.claimOwnership(shardSpan, engine.getName());
boolean actuallyProcess = proc.isOwnedBy(engine.getName());
//logger.debug(String.format("Shard %s claimed by %s => owned by me %b", shard, proc.getOwner(), actuallyProcess));
if ( ! actuallyProcess )
logger.info(String.format("DISTRIBUTED GATK: Shard %s already processed by %s", shard, proc.getOwner()));
return actuallyProcess;
}
}
private GenomeLoc shardSpan(Shard shard) {
if ( shard == null ) throw new ReviewedStingException("Shard is null!");
int start = Integer.MAX_VALUE;
int stop = Integer.MIN_VALUE;
String contig = null;
for ( GenomeLoc loc : shard.getGenomeLocs() ) {
if ( GenomeLoc.isUnmapped(loc) )
// special case the unmapped region marker, just abort out
return loc;
contig = loc.getContig();
if ( loc.getStart() < start ) start = loc.getStart();
if ( loc.getStop() > stop ) stop = loc.getStop();
}
return engine.getGenomeLocParser().createGenomeLoc(contig, start, stop);
}
// todo -- the execution code in the schedulers is duplicated and slightly different -- should be merged
// protected boolean executeShard(Walker walker, Shard shard, Accumulator accumulator) {
// if(shard.getShardType() == Shard.ShardType.LOCUS) {
// LocusWalker lWalker = (LocusWalker)walker;
//
// WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata());
//
// // ShardTraverser
//
// ShardDataProvider dataProvider = null;
// for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
// dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods);
// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
// accumulator.accumulate(dataProvider,result);
// dataProvider.close();
// }
// if (dataProvider != null) dataProvider.close();
// windowMaker.close();
// }
// else {
// ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
// accumulator.accumulate(dataProvider,result);
// dataProvider.close();
// }
//
//
// // ShardTraverser
//
// for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
// accumulator = traversalEngine.traverse( walker, dataProvider, accumulator );
// dataProvider.close();
// }
//
// return true;
// }
/** /**
* Retrieves the object responsible for tracking and managing output. * Retrieves the object responsible for tracking and managing output.
* @return An output tracker, for loading data in and extracting results. Will not be null. * @return An output tracker, for loading data in and extracting results. Will not be null.

View File

@ -33,7 +33,11 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Cloneable, Serializable
* the object may be used to refer to the region, as '==' comparisons are used * the object may be used to refer to the region, as '==' comparisons are used
* in comparators, etc. * in comparators, etc.
*/ */
// TODO - WARNING WARNING WARNING code somehow depends on the name of the contig being null!
public static final GenomeLoc UNMAPPED = new GenomeLoc(null,-1,0,0); public static final GenomeLoc UNMAPPED = new GenomeLoc(null,-1,0,0);
public static final boolean isUnmapped(GenomeLoc loc) {
return loc == UNMAPPED;
}
// -------------------------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------------------------
// //
@ -72,7 +76,7 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Cloneable, Serializable
public final int getStart() { return this.start; } public final int getStart() { return this.start; }
public final int getStop() { return this.stop; } public final int getStop() { return this.stop; }
public final String toString() { public final String toString() {
if(this == UNMAPPED) return "unmapped"; if(GenomeLoc.isUnmapped(this)) return "unmapped";
if ( throughEndOfContigP() && atBeginningOfContigP() ) if ( throughEndOfContigP() && atBeginningOfContigP() )
return getContig(); return getContig();
else if ( throughEndOfContigP() || getStart() == getStop() ) else if ( throughEndOfContigP() || getStart() == getStop() )
@ -100,8 +104,8 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Cloneable, Serializable
} }
public GenomeLoc merge( GenomeLoc that ) throws ReviewedStingException { public GenomeLoc merge( GenomeLoc that ) throws ReviewedStingException {
if(this == UNMAPPED || that == UNMAPPED) { if(GenomeLoc.isUnmapped(this) || GenomeLoc.isUnmapped(that)) {
if(this != UNMAPPED || that != UNMAPPED) if(! GenomeLoc.isUnmapped(this) || !GenomeLoc.isUnmapped(that))
throw new ReviewedStingException("Tried to merge a mapped and an unmapped genome loc"); throw new ReviewedStingException("Tried to merge a mapped and an unmapped genome loc");
return UNMAPPED; return UNMAPPED;
} }
@ -116,8 +120,8 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Cloneable, Serializable
} }
public GenomeLoc intersect( GenomeLoc that ) throws ReviewedStingException { public GenomeLoc intersect( GenomeLoc that ) throws ReviewedStingException {
if(this == UNMAPPED || that == UNMAPPED) { if(GenomeLoc.isUnmapped(this) || GenomeLoc.isUnmapped(that)) {
if(this != UNMAPPED || that != UNMAPPED) if(! GenomeLoc.isUnmapped(this) || !GenomeLoc.isUnmapped(that))
throw new ReviewedStingException("Tried to intersect a mapped and an unmapped genome loc"); throw new ReviewedStingException("Tried to intersect a mapped and an unmapped genome loc");
return UNMAPPED; return UNMAPPED;
} }
@ -238,9 +242,9 @@ public class GenomeLoc implements Comparable<GenomeLoc>, Cloneable, Serializable
if ( this == that ) { if ( this == that ) {
result = 0; result = 0;
} }
else if(this == UNMAPPED) else if(GenomeLoc.isUnmapped(this))
result = 1; result = 1;
else if(that == UNMAPPED) else if(GenomeLoc.isUnmapped(that))
result = -1; result = -1;
else { else {
final int cmpContig = compareContigs(that); final int cmpContig = compareContigs(that);

View File

@ -0,0 +1,8 @@
package org.broadinstitute.sting.utils;
/**
* Indicates that this object has a genomic location and provides a systematic interface to get it.
*/
public interface HasGenomeLocation {
public GenomeLoc getLocation();
}

View File

@ -1,22 +1,21 @@
package org.broadinstitute.sting.utils.threading; package org.broadinstitute.sting.utils.threading;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.HasGenomeLocation;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
/** /**
* Created by IntelliJ IDEA. *
* User: depristo
* Date: 1/13/11
* Time: 9:38 AM
* To change this template use File | Settings | File Templates.
*/ */
public abstract class GenomeLocProcessingTracker { public abstract class GenomeLocProcessingTracker {
/** /**
* Information about processing locations and their owners * Information about processing locations and their owners
*/ */
public static final class ProcessingLoc { public static final class ProcessingLoc implements Comparable<ProcessingLoc> {
private final GenomeLoc loc; private final GenomeLoc loc;
private final String owner; private final String owner;
@ -55,6 +54,9 @@ public abstract class GenomeLocProcessingTracker {
return false; return false;
} }
public int compareTo(ProcessingLoc other) {
return this.getLoc().compareTo(other.getLoc());
}
} }
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
@ -73,8 +75,14 @@ public abstract class GenomeLocProcessingTracker {
return findOwner(loc) != null; return findOwner(loc) != null;
} }
public ProcessingLoc findOwner(GenomeLoc loc) { // in general this isn't true for the list of locs, as they definitely can occur out of order
for ( ProcessingLoc l : getProcessingLocs() ) { protected static ProcessingLoc findOwnerInSortedList(GenomeLoc loc, List<ProcessingLoc> locs) {
int i = Collections.binarySearch(locs, new ProcessingLoc(loc, "ignore"));
return i < 0 ? null : locs.get(i);
}
protected static ProcessingLoc findOwnerInUnsortedList(GenomeLoc loc, List<ProcessingLoc> locs) {
for ( ProcessingLoc l : locs ) {
if ( l.getLoc().equals(loc) ) if ( l.getLoc().equals(loc) )
return l; return l;
} }
@ -82,6 +90,10 @@ public abstract class GenomeLocProcessingTracker {
return null; return null;
} }
public ProcessingLoc findOwner(GenomeLoc loc) {
return findOwnerInUnsortedList(loc, getProcessingLocs());
}
/** /**
* The workhorse routine. Attempt to claim processing ownership of loc, with my name. * The workhorse routine. Attempt to claim processing ownership of loc, with my name.
* This is an atomic operation -- other threads / processes will wait until this function * This is an atomic operation -- other threads / processes will wait until this function
@ -95,6 +107,33 @@ public abstract class GenomeLocProcessingTracker {
*/ */
public abstract ProcessingLoc claimOwnership(GenomeLoc loc, String myName); public abstract ProcessingLoc claimOwnership(GenomeLoc loc, String myName);
/**
* A higher-level, and more efficient, interface to obtain the next location we own. Takes an
* iterator producing objects that support the getLocation() interface, and returns the next
* object in that stream that we can claim ownership of. Returns null if we run out of elements
* during the iteration.
*
* Can be more efficiently implemented in subclasses to avoid multiple unlocking
*
* @param iterator
* @param myName
* @return
*/
public <T extends HasGenomeLocation> T claimOwnershipOfNextAvailable(Iterator<T> iterator, String myName) {
while ( iterator.hasNext() ) {
T elt = iterator.next();
GenomeLoc loc = elt.getLocation();
ProcessingLoc proc = claimOwnership(loc, myName);
if ( proc.isOwnedBy(myName) )
return elt;
// if not, we continue our search
}
// we never found an object, just return it.
return null;
}
/** /**
* Returns the list of currently owned locations, updating the database as necessary. * Returns the list of currently owned locations, updating the database as necessary.
* DO NOT MODIFY THIS LIST! As with all parallelizing data structures, the list may be * DO NOT MODIFY THIS LIST! As with all parallelizing data structures, the list may be
@ -105,4 +144,8 @@ public abstract class GenomeLocProcessingTracker {
* @return * @return
*/ */
protected abstract List<ProcessingLoc> getProcessingLocs(); protected abstract List<ProcessingLoc> getProcessingLocs();
protected void close() {
// by default we don't do anything
}
} }

View File

@ -0,0 +1,20 @@
package org.broadinstitute.sting.utils.threading;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.Collections;
import java.util.List;
/**
* Base class, and null tracker. Always says that a GenomeLoc is ready for processing
*/
public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
return new ProcessingLoc(loc, myName);
}
protected List<ProcessingLoc> getProcessingLocs() {
return Collections.emptyList();
}
}

View File

@ -1,22 +1,13 @@
package org.broadinstitute.sting.utils.threading; package org.broadinstitute.sting.utils.threading;
import com.google.common.collect.ArrayListMultimap;
import net.sf.picard.reference.FastaSequenceIndex;
import net.sf.picard.reference.FastaSequenceIndexBuilder;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.file.FSLockWithShared;
import org.broadinstitute.sting.utils.file.FileSystemInabilityToLockException;
import org.broadinstitute.sting.utils.text.XReadLines;
import java.io.*; import java.io.*;
import java.nio.channels.ClosedChannelException; import java.nio.channels.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -25,123 +16,206 @@ import java.util.List;
*/ */
public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker { public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static final boolean DEBUG = false; private static final boolean DEBUG = false;
private static final boolean REALLY_DEBUG = false;
private boolean ACTUALLY_USE_FILE_LOCK = true;
private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class); private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class);
private List<ProcessingLoc> processingLocs = new ArrayList<ProcessingLoc>();
private Object myLock = new Object();
private List<ProcessingLoc> processingLocs;
private File sharedFile = null; private File sharedFile = null;
GenomeLocParser parser; private GenomeLocParser parser;
// the file lock
private FileLock lock = null; private FileLock lock = null;
private RandomAccessFile raFile;
private long lastReadPosition = 0;
// the file channel we open // //
private FileChannel channel = null; // // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST
// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS
long lastReadPosition = 0; // // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED
// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK?
// //
//
public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) { public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) {
this(sharedFile, parser, true);
}
protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) {
processingLocs = new ArrayList<ProcessingLoc>();
ACTUALLY_USE_FILE_LOCK = false;
try { try {
this.sharedFile = sharedFile; this.sharedFile = sharedFile;
this.channel = new RandomAccessFile(sharedFile, "rw").getChannel(); this.raFile = new RandomAccessFile(sharedFile, "rws");
this.parser = parser; this.parser = parser;
} }
catch (Exception e) { catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e); throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} }
} }
public void close() {
if ( ACTUALLY_USE_FILE_LOCK ) {
try {
this.raFile.close();
}
catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
}
private void lock() { private void lock() {
try { if ( ACTUALLY_USE_FILE_LOCK ) {
lock = channel.lock();
} catch (ClosedChannelException e) { // Precondition -- lock is always null while we don't have a lock
throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); if ( lock != null )
} throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
// catch (OverlappingFileLockException e) {
// logger.debug("Unable to lock file because you already have a lock on this file."); try {
// return false; lock = raFile.getChannel().lock();
// } } catch (ClosedChannelException e) {
catch (IOException e) { throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e);
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); } catch (FileLockInterruptionException e) {
} finally { throw new ReviewedStingException("File lock interrupted", e);
unlock(); } catch (NonWritableChannelException e) {
throw new ReviewedStingException("File channel not writable", e);
} catch (OverlappingFileLockException e) {
// this only happens when multiple threads are running, and one is waiting
// for the lock above and we come here.
throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
} catch (IOException e) {
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
} }
} }
private void unlock() { private void unlock(boolean excepting) {
try { if ( ACTUALLY_USE_FILE_LOCK ) {
lock.release();
//channel.close(); // Precondition -- lock is never null while we have a lock
} catch ( IOException e ) { if ( lock == null ) {
throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); if ( ! excepting )
throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!");
} else {
if ( ! lock.isValid() )
throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!");
try {
lock.release();
lock = null;
//channel.close();
} catch ( IOException e ) {
throw new ReviewedStingException("Could not free lock on file " + sharedFile, e);
}
}
} }
} }
private void readLocs() { private List<ProcessingLoc> readLocs() {
try { if ( ACTUALLY_USE_FILE_LOCK ) {
if ( sharedFile.exists() ) { // we must have a lock to run this code
FileInputStream in = new FileInputStream(sharedFile); if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()");
if ( in.getChannel().size() > lastReadPosition ) {
in.skip(lastReadPosition); try {
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
int counter = 0; int counter = 0;
String line = reader.readLine(); // Read another line String line = raFile.readLine(); // Read another line
while ( line != null ) { while ( line != null ) {
String[] parts = line.split(" "); String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line " + line); if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
GenomeLoc loc = parser.parseGenomeLoc(parts[0]); GenomeLoc loc = parser.parseGenomeLoc(parts[0]);
String owner = parts[1]; String owner = parts[1];
processingLocs.add(new ProcessingLoc(loc, owner)); processingLocs.add(new ProcessingLoc(loc, owner));
line = reader.readLine(); line = raFile.readLine();
counter++; counter++;
} }
lastReadPosition = in.getChannel().position(); lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d", if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d",
counter, lastReadPosition, processingLocs.size())); counter, lastReadPosition, processingLocs.size()));
} }
in.close(); } catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
} }
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
}
}
private void writeLoc(ProcessingLoc proc) {
try {
PrintStream out = new PrintStream(new FileOutputStream(sharedFile, true));
out.printf("%s %s%n", proc.getLoc(), proc.getOwner());
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file", proc));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
}
}
public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
ProcessingLoc owner = null;
try {
lock();
readLocs();
owner = super.findOwner(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
writeLoc(owner);
}
} finally {
unlock();
}
return owner;
}
protected synchronized List<ProcessingLoc> getProcessingLocs() {
try {
lock();
readLocs();
} finally {
unlock();
} }
return processingLocs; return processingLocs;
} }
private void writeLoc(ProcessingLoc proc) {
if ( ACTUALLY_USE_FILE_LOCK ) {
// we must have a lock to run this code
if ( lock == null || ! lock.isValid() )
throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()");
try {
String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner());
long startPos = raFile.getFilePointer();
raFile.seek(raFile.length());
raFile.write(packet.getBytes());
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer()));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
} else {
processingLocs.add(proc);
}
}
private final void printOwners() {
for ( ProcessingLoc proc : processingLocs )
System.out.println(proc);
}
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName);
synchronized (processingLocs) {
boolean excepting = true;
ProcessingLoc owner = null;
if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile);
try {
lock();
owner = findOwnerInUnsortedList(loc, readLocs());
//owner = super.findOwner(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
writeLoc(owner);
}
excepting = false;
} finally {
if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner);
//printOwners();
unlock(excepting);
}
if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null");
return owner;
}
}
protected List<ProcessingLoc> getProcessingLocs() {
synchronized (processingLocs) {
boolean excepting = true;
if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
try {
lock();
readLocs();
excepting = false;
} finally {
unlock(excepting);
}
if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null");
return processingLocs;
}
}
} }

View File

@ -12,24 +12,27 @@ import java.util.List;
*/ */
public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class); private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class);
private List<ProcessingLoc> processingLocs = new ArrayList<ProcessingLoc>(); protected List<ProcessingLoc> processingLocs = new ArrayList<ProcessingLoc>();
public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
// processingLocs is a shared memory synchronized object, and this // processingLocs is a shared memory synchronized object, and this
// method is synchronized, so we can just do our processing // method is synchronized, so we can just do our processing
ProcessingLoc owner = super.findOwner(loc); synchronized (processingLocs) {
ProcessingLoc owner = super.findOwner(loc);
if ( owner == null ) { // we are unowned if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName); owner = new ProcessingLoc(loc, myName);
processingLocs.add(owner); processingLocs.add(owner);
}
return owner;
//logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner));
} }
//logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner));
return owner;
} }
protected synchronized List<ProcessingLoc> getProcessingLocs() { protected List<ProcessingLoc> getProcessingLocs() {
return processingLocs; synchronized (processingLocs) {
return processingLocs;
}
} }
} }

View File

@ -11,10 +11,7 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.exceptions.UserException;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.*;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -22,10 +19,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
* Basic unit test for GenomeLoc * Basic unit test for GenomeLoc
@ -34,10 +28,12 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
IndexedFastaSequenceFile fasta = null; IndexedFastaSequenceFile fasta = null;
GenomeLocParser genomeLocParser = null; GenomeLocParser genomeLocParser = null;
File sharedFile = new File("synchronizationFile.txt"); File sharedFile = new File("synchronizationFile.txt");
static final boolean USE_FILE_LOCK = false;
String chr1 = null; String chr1 = null;
@BeforeTest @BeforeTest
public void before() { public void before() {
logger.warn("SharedFile is " + sharedFile.getAbsolutePath());
File referenceFile = new File(hg18Reference); File referenceFile = new File(hg18Reference);
try { try {
fasta = new IndexedFastaSequenceFile(referenceFile); fasta = new IndexedFastaSequenceFile(referenceFile);
@ -50,13 +46,14 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
} }
} }
@BeforeMethod @AfterMethod
public void cleanup() { public void cleanup(Object[] data) {
if ( sharedFile.exists() ) { if ( sharedFile.exists() ) {
sharedFile.delete(); sharedFile.delete();
} }
}
((TestTarget)data[0]).getTracker().close();
}
abstract private class TestTarget { abstract private class TestTarget {
String name; String name;
@ -86,26 +83,37 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
} }
} }
private class SharedFileTest extends TestTarget {
protected SharedFileTest(int nShards, int shardSize) {
super("SharedFile", nShards, shardSize);
}
GenomeLocProcessingTracker tracker = null;
public GenomeLocProcessingTracker getTracker() {
if ( tracker == null )
//tracker = new SharedMemoryGenomeLocProcessingTracker();
tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser, USE_FILE_LOCK);
return tracker;
}
}
@DataProvider(name = "data") @DataProvider(name = "data")
public Object[][] createData1() { public Object[][] createData1() {
List<TestTarget> params = new ArrayList<TestTarget>(); List<TestTarget> params = new ArrayList<TestTarget>();
// for ( int nShard : Arrays.asList(10) ) { for ( int nShard : Arrays.asList(10, 100, 1000) ) {
// for ( int shardSize : Arrays.asList(10) ) { for ( int shardSize : Arrays.asList(10) ) {
for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) { // for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) {
for ( int shardSize : Arrays.asList(10, 100) ) { // for ( int shardSize : Arrays.asList(10, 100) ) {
// shared mem -- canonical implementation // shared mem -- canonical implementation
// params.add(new TestTarget(nShard, shardSize) { params.add(new TestTarget("SharedMem", nShard, shardSize) {
// SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(); SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker();
// public GenomeLocProcessingTracker getTracker() { return tracker; }
// });
// shared file -- working implementation
params.add(new TestTarget("SharedFile", nShard, shardSize) {
SharedFileGenomeLocProcessingTracker tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser);
public GenomeLocProcessingTracker getTracker() { return tracker; } public GenomeLocProcessingTracker getTracker() { return tracker; }
}); });
// // shared file -- working implementation
// params.add(new SharedFileTest(nShard, shardSize));
} }
} }
@ -168,9 +176,9 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO)); Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO));
if ( proc.isOwnedBy(NAME_ONE) ) if ( proc.isOwnedBy(NAME_ONE) )
Assert.assertTrue(! markedShards.contains(shard)); Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!");
else else
Assert.assertTrue(markedShards.contains(shard)); Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked");
if ( ! markedShards.contains(shard) ) { if ( ! markedShards.contains(shard) ) {
Assert.assertEquals(tracker.findOwner(shard), proc); Assert.assertEquals(tracker.findOwner(shard), proc);
@ -181,23 +189,25 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
public class TestThread implements Callable<Integer> { public class TestThread implements Callable<Integer> {
public TestTarget test; public TestTarget test;
public String name; public String name;
public List<GenomeLoc> ran; public List<GenomeLoc> ran, toRun;
public TestThread(TestTarget test, int count) { public TestThread(TestTarget test, int count, List<GenomeLoc> toRun) {
this.test = test; this.test = test;
this.toRun = toRun;
this.name = "thread" + count; this.name = "thread" + count;
this.ran = new ArrayList<GenomeLoc>(); this.ran = new ArrayList<GenomeLoc>();
} }
public Integer call() { public Integer call() {
int nShards = test.getShards().size(); logger.warn(String.format("Call() Thread %s", name));
for ( GenomeLoc shard : test.getShards() ) { for ( GenomeLoc shard : toRun ) {
if ( ran.size() < nShards / 3 ) { //System.out.printf("Claiming ownership in %s on %s%n", name, shard);
GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name);
if ( proc.isOwnedBy(name) ) //System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name));
ran.add(proc.getLoc()); if ( proc.isOwnedBy(name) ) {
//logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); ran.add(proc.getLoc());
} }
//logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner()));
} }
return 1; return 1;
@ -213,8 +223,26 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
} }
private static final <T> void assertAllThreadsFinished(List<Future<T>> futures) { private static final <T> void assertAllThreadsFinished(List<Future<T>> futures) {
for ( Future f : futures ) try {
Assert.assertTrue(f.isDone(), "Thread never finished running"); for ( Future f : futures ) {
Assert.assertTrue(f.isDone(), "Thread never finished running");
Assert.assertTrue(f.get() != null, "Finished successfully");
}
} catch (InterruptedException e) {
Assert.fail("Thread failed to run to completion", e);
} catch (ExecutionException e) {
Assert.fail("Thread generated an exception", e);
}
}
private static final List<GenomeLoc> subList(List<GenomeLoc> l, int i) {
List<GenomeLoc> r = new ArrayList<GenomeLoc>();
for ( int j = 0; j < l.size(); j++ ) {
if ( j % i == 0 )
r.add(l.get(j));
}
return r;
} }
@Test(dataProvider = "data", enabled = true) @Test(dataProvider = "data", enabled = true)
@ -223,7 +251,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
logger.warn("ThreadedTesting " + test); logger.warn("ThreadedTesting " + test);
List<TestThread> threads = new ArrayList<TestThread>(); List<TestThread> threads = new ArrayList<TestThread>();
for ( int i = 0; i < 4; i++) { for ( int i = 0; i < 4; i++) {
TestThread thread = new TestThread(test, i); List<GenomeLoc> toRun = subList(test.getShards(), i+1);
TestThread thread = new TestThread(test, i, toRun);
threads.add(thread); threads.add(thread);
} }
ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size()); ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size());
@ -234,7 +263,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
List<GenomeLoc> shards = test.getShards(); List<GenomeLoc> shards = test.getShards();
for ( TestThread thread : threads ) for ( TestThread thread : threads )
logger.warn(String.format("TestThread ran %d jobs", thread.ran.size())); logger.warn(String.format("TestThread %s ran %d jobs of %d to run", thread.name, thread.ran.size(), thread.toRun.size()));
assertAllThreadsFinished(results); assertAllThreadsFinished(results);
@ -256,8 +285,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard"); Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard");
for ( TestThread thread : threads ) for ( TestThread thread : threads )
if ( ! proc.isOwnedBy(thread.name) ) if ( ! proc.isOwnedBy(thread.name) && thread.ran.contains(shard) )
Assert.assertFalse(thread.ran.contains(shard), "Shard appears in another run list"); Assert.fail("Shard appears in another run list: proc=" + proc + " shard=" + shard + " also in jobs of " + thread.name + " obj=" + thread.ran.get(thread.ran.indexOf(shard)));
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {