V2 cleaner, easily testing, shared memory and distributed GATK job management. Serious unit testing. Very much cleaner processing. Some code cleanup remains in removing now unused classes but the system is ready for general testing. Confirmed that one can run the UG 100 ways parallel without error, but edge cases may remain.

See documentation at:

http://www.broadinstitute.org/gsa/wiki/index.php/Parallelism_and_the_GATK#Distributed_Parallelism_.28Experimental.29

for examples on how to run this, or the testing Scala script

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5032 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2011-01-20 12:58:13 +00:00
parent 41c8552d0a
commit 85553cf5cb
12 changed files with 874 additions and 452 deletions

View File

@ -410,7 +410,7 @@ public class GenomeAnalysisEngine {
region.add(getGenomeLocParser().createGenomeLoc(sequenceRecord.getSequenceName(),1,sequenceRecord.getSequenceLength()));
}
return new MonolithicShardStrategy(readsDataSource,shardType,region);
return new MonolithicShardStrategy(getGenomeLocParser(), readsDataSource,shardType,region);
}
ShardStrategy shardStrategy = null;

View File

@ -49,26 +49,26 @@ public class LinearMicroScheduler extends MicroScheduler {
walker.initialize();
Accumulator accumulator = Accumulator.create(engine,walker);
for (Shard shard : shardStrategy) {
if ( claimShard(shard) ) {
// New experimental code for managing locus intervals.
if(shard.getShardType() == Shard.ShardType.LOCUS) {
LocusWalker lWalker = (LocusWalker)walker;
WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata());
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
}
windowMaker.close();
}
else {
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
for (Shard shard : processingTracker.onlyOwned(shardStrategy, engine.getName())) {
if ( shard == null ) // we ran out of shards that aren't owned
break;
if(shard.getShardType() == Shard.ShardType.LOCUS) {
LocusWalker lWalker = (LocusWalker)walker;
WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata());
for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
ShardDataProvider dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
}
windowMaker.close();
}
else {
ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
accumulator.accumulate(dataProvider,result);
dataProvider.close();
}
}

View File

@ -26,9 +26,6 @@
package org.broadinstitute.sting.gatk.executive;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ReadShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource;
@ -41,7 +38,6 @@ import org.broadinstitute.sting.gatk.iterators.NullSAMIterator;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.ReadMetrics;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.*;
@ -50,8 +46,7 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.GenomeLocProcessingTracker;
import org.broadinstitute.sting.utils.threading.NoOpGenomeLocProcessingTracker;
import org.broadinstitute.sting.utils.threading.SharedFileGenomeLocProcessingTracker;
import org.broadinstitute.sting.utils.threading.ProcessingLoc;
import javax.management.JMException;
import javax.management.MBeanServer;
@ -63,7 +58,8 @@ import javax.management.ObjectName;
* User: mhanna
* Date: Apr 26, 2009
* Time: 12:37:23 PM
* To change this template use File | Settings | File Templates.
*
* General base class for all scheduling algorithms
*/
/** Shards and schedules data in manageable chunks. */
@ -89,7 +85,7 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
private final MBeanServer mBeanServer;
private final ObjectName mBeanName;
private GenomeLocProcessingTracker processingTracker;
protected GenomeLocProcessingTracker processingTracker;
/**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
@ -104,6 +100,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return The best-fit microscheduler.
*/
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse) {
if (engine.getArguments().processingTrackerFile != null) {
if ( walker instanceof ReadWalker )
throw new UserException.BadArgumentValue("C", String.format("Distributed GATK processing not enabled for read walkers"));
}
if (walker instanceof TreeReducible && nThreadsToUse > 1) {
if(walker.isReduceByInterval())
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
@ -166,10 +167,10 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile);
}
processingTracker = new SharedFileGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser());
processingTracker = GenomeLocProcessingTracker.createFileBackedDistributed(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser());
logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile);
} else {
processingTracker = new NoOpGenomeLocProcessingTracker();
processingTracker = GenomeLocProcessingTracker.createNoOp();
}
}
@ -183,79 +184,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
*/
public abstract Object execute(Walker walker, ShardStrategy shardStrategy);
protected boolean claimShard(Shard shard) {
if ( shard.getGenomeLocs() == null ) {
if ( engine.getArguments().processingTrackerFile != null )
throw new UserException.BadArgumentValue("processingTrackerFile", "Cannot use processing tracking with unindexed data");
return true;
} else {
GenomeLoc shardSpan = shardSpan(shard);
GenomeLocProcessingTracker.ProcessingLoc proc = processingTracker.claimOwnership(shardSpan, engine.getName());
boolean actuallyProcess = proc.isOwnedBy(engine.getName());
//logger.debug(String.format("Shard %s claimed by %s => owned by me %b", shard, proc.getOwner(), actuallyProcess));
if ( ! actuallyProcess )
logger.info(String.format("DISTRIBUTED GATK: Shard %s already processed by %s", shard, proc.getOwner()));
return actuallyProcess;
}
}
private GenomeLoc shardSpan(Shard shard) {
if ( shard == null ) throw new ReviewedStingException("Shard is null!");
int start = Integer.MAX_VALUE;
int stop = Integer.MIN_VALUE;
String contig = null;
for ( GenomeLoc loc : shard.getGenomeLocs() ) {
if ( GenomeLoc.isUnmapped(loc) )
// special case the unmapped region marker, just abort out
return loc;
contig = loc.getContig();
if ( loc.getStart() < start ) start = loc.getStart();
if ( loc.getStop() > stop ) stop = loc.getStop();
}
return engine.getGenomeLocParser().createGenomeLoc(contig, start, stop);
}
// todo -- the execution code in the schedulers is duplicated and slightly different -- should be merged
// protected boolean executeShard(Walker walker, Shard shard, Accumulator accumulator) {
// if(shard.getShardType() == Shard.ShardType.LOCUS) {
// LocusWalker lWalker = (LocusWalker)walker;
//
// WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), lWalker.getDiscards(), engine.getSampleMetadata());
//
// // ShardTraverser
//
// ShardDataProvider dataProvider = null;
// for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
// dataProvider = new LocusShardDataProvider(shard,iterator.getSourceInfo(),engine.getGenomeLocParser(),iterator.getLocus(),iterator,reference,rods);
// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
// accumulator.accumulate(dataProvider,result);
// dataProvider.close();
// }
// if (dataProvider != null) dataProvider.close();
// windowMaker.close();
// }
// else {
// ShardDataProvider dataProvider = new ReadShardDataProvider(shard,engine.getGenomeLocParser(),getReadIterator(shard),reference,rods);
// Object result = traversalEngine.traverse(walker, dataProvider, accumulator.getReduceInit());
// accumulator.accumulate(dataProvider,result);
// dataProvider.close();
// }
//
//
// // ShardTraverser
//
// for(WindowMaker.WindowMakerIterator iterator: windowMaker) {
// accumulator = traversalEngine.traverse( walker, dataProvider, accumulator );
// dataProvider.close();
// }
//
// return true;
// }
/**
* Retrieves the object responsible for tracking and managing output.
* @return An output tracker, for loading data in and extracting results. Will not be null.

View File

@ -0,0 +1,15 @@
package org.broadinstitute.sting.utils.threading;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by IntelliJ IDEA.
* User: depristo
* Date: 1/19/11
* Time: 9:50 AM
*
* Simple extension of a ReentrantLock that supports a close method
*/
public class ClosableReentrantLock extends ReentrantLock {
public void close() {}
}

View File

@ -0,0 +1,93 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
/**
* Keeps a copy of the processing locks in a file, in addition to tracking in memory via the base class
*/
public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private static final boolean DEBUG = false;
private File sharedFile = null;
private GenomeLocParser parser;
private RandomAccessFile raFile;
private long lastReadPosition = 0;
protected FileBackedGenomeLocProcessingTracker(File sharedFile, RandomAccessFile raFile, GenomeLocParser parser, ClosableReentrantLock lock) {
super(lock);
this.sharedFile = sharedFile;
this.raFile = raFile;
this.parser = parser;
}
protected void close() {
super.close();
try {
raFile.close();
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
@Override
protected List<ProcessingLoc> readNewLocs() {
List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>(); // todo -- gratitous object creation
try {
//logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition));
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
int counter = 0;
String line = raFile.readLine(); // Read another line
while ( line != null ) {
String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]);
//logger.warn(" Read " + ploc);
newPLocs.add(ploc);
line = raFile.readLine();
counter++;
}
lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d",
counter, lastReadPosition, newPLocs.size()));
}
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
}
return newPLocs;
}
@Override
protected void registerNewLoc(ProcessingLoc proc) {
try {
String packet = String.format("%s %s%n", proc.getLocation(), proc.getOwner());
long startPos = raFile.getFilePointer();
raFile.seek(raFile.length());
raFile.write(packet.getBytes());
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer()));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
}

View File

@ -1,62 +1,68 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.HasGenomeLocation;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
/**
*
*/
public abstract class GenomeLocProcessingTracker {
/**
* Information about processing locations and their owners
*/
public static final class ProcessingLoc implements Comparable<ProcessingLoc> {
private final GenomeLoc loc;
private final String owner;
private static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private Map<GenomeLoc, ProcessingLoc> processingLocs;
private ClosableReentrantLock lock;
private long nLockingEvents = 0;
/**
* Create a loc that's already owned
* @param loc
* @param owner
*/
public ProcessingLoc(GenomeLoc loc, String owner) {
if ( loc == null || owner == null ) {
throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner);
}
// --------------------------------------------------------------------------------
//
// Factory methods for creating ProcessingTrackers
//
// --------------------------------------------------------------------------------
this.loc = loc;
this.owner = owner;
public static NoOpGenomeLocProcessingTracker createNoOp() {
return new NoOpGenomeLocProcessingTracker();
}
public static SharedMemoryGenomeLocProcessingTracker createSharedMemory() {
return new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock());
}
public static FileBackedGenomeLocProcessingTracker createFileBackedThreaded(File sharedFile, GenomeLocParser parser) {
return createFileBacked(sharedFile, parser, false);
}
public static FileBackedGenomeLocProcessingTracker createFileBackedDistributed(File sharedFile, GenomeLocParser parser) {
return createFileBacked(sharedFile, parser, true);
}
private static FileBackedGenomeLocProcessingTracker createFileBacked(File sharedFile, GenomeLocParser parser, boolean useFileLockToo) {
try {
//logger.warn("Creating file backed GLPT at " + sharedFile);
RandomAccessFile raFile = new RandomAccessFile(sharedFile, "rws");
ClosableReentrantLock lock = useFileLockToo ? new SharedFileThreadSafeLock(raFile.getChannel()) : new ClosableReentrantLock();
return new FileBackedGenomeLocProcessingTracker(sharedFile, raFile, parser, lock);
}
public GenomeLoc getLoc() {
return loc;
catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
public String getOwner() {
return owner;
}
public boolean isOwnedBy(String name) {
return getOwner().equals(name);
}
public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); }
public boolean equals(Object other) {
if (other instanceof ProcessingLoc )
return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner);
else
return false;
}
public int compareTo(ProcessingLoc other) {
return this.getLoc().compareTo(other.getLoc());
}
// --------------------------------------------------------------------------------
//
// Creating ProcessingTrackers
//
// --------------------------------------------------------------------------------
public GenomeLocProcessingTracker(ClosableReentrantLock lock) {
processingLocs = new HashMap<GenomeLoc, ProcessingLoc>();
this.lock = lock;
}
// --------------------------------------------------------------------------------
@ -64,6 +70,7 @@ public abstract class GenomeLocProcessingTracker {
// Code to claim intervals for processing and query for their ownership
//
// --------------------------------------------------------------------------------
/**
* Queries the current database if a location is owned. Does not guarantee that the
* loc can be owned in a future call, though.
@ -75,23 +82,12 @@ public abstract class GenomeLocProcessingTracker {
return findOwner(loc) != null;
}
// in general this isn't true for the list of locs, as they definitely can occur out of order
protected static ProcessingLoc findOwnerInSortedList(GenomeLoc loc, List<ProcessingLoc> locs) {
int i = Collections.binarySearch(locs, new ProcessingLoc(loc, "ignore"));
return i < 0 ? null : locs.get(i);
}
protected static ProcessingLoc findOwnerInUnsortedList(GenomeLoc loc, List<ProcessingLoc> locs) {
for ( ProcessingLoc l : locs ) {
if ( l.getLoc().equals(loc) )
return l;
}
return null;
}
public ProcessingLoc findOwner(GenomeLoc loc) {
return findOwnerInUnsortedList(loc, getProcessingLocs());
// fast path to check if we already have the existing genome loc in memory for ownership claims
// getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it
// unless necessary
ProcessingLoc x = findOwnerInMap(loc, processingLocs);
return x == null ? findOwnerInMap(loc, updateLocs()) : x;
}
/**
@ -105,7 +101,24 @@ public abstract class GenomeLocProcessingTracker {
* @param myName
* @return
*/
public abstract ProcessingLoc claimOwnership(GenomeLoc loc, String myName);
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
// processingLocs is a shared memory synchronized object, and this
// method is synchronized, so we can just do our processing
lock();
try {
ProcessingLoc owner = findOwner(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
registerNewLoc(owner);
}
return owner;
//logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner));
} finally {
unlock();
}
}
/**
* A higher-level, and more efficient, interface to obtain the next location we own. Takes an
@ -120,18 +133,66 @@ public abstract class GenomeLocProcessingTracker {
* @return
*/
public <T extends HasGenomeLocation> T claimOwnershipOfNextAvailable(Iterator<T> iterator, String myName) {
while ( iterator.hasNext() ) {
T elt = iterator.next();
GenomeLoc loc = elt.getLocation();
ProcessingLoc proc = claimOwnership(loc, myName);
OwnershipIterator<T> myIt = new OwnershipIterator<T>(iterator, myName);
return myIt.next();
}
if ( proc.isOwnedBy(myName) )
return elt;
// if not, we continue our search
public <T extends HasGenomeLocation> Iterable<T> onlyOwned(Iterator<T> iterator, String myName) {
return new OwnershipIterator<T>(iterator, myName);
}
protected class OwnershipIterator<T extends HasGenomeLocation> implements Iterator<T>, Iterable<T> {
Iterator<T> subit;
String myName;
public OwnershipIterator(Iterator<T> subit, String myName) {
this.subit = subit;
this.myName = myName;
}
// we never found an object, just return it.
return null;
/**
* Will return true for all elements of subit, even if we can't get ownership of some of the future
* elements and so will return null there
* @return
*/
public boolean hasNext() {
return subit.hasNext();
}
/**
* High performance iterator that only locks and unlocks once per claimed object found. Avoids
* locking / unlocking for each query
*
* @return an object of type T owned by this thread, or null if none of the remaining object could be claimed
*/
public T next() {
lock();
try {
while ( subit.hasNext() ) {
T elt = subit.next();
//logger.warn("Checking elt for ownership " + elt);
GenomeLoc loc = elt.getLocation();
ProcessingLoc proc = claimOwnership(loc, myName);
if ( proc.isOwnedBy(myName) )
return elt;
// if not, we continue our search
}
// we never found an object, just return it.
return null;
} finally {
unlock();
}
}
public void remove() {
throw new UnsupportedOperationException();
}
public Iterator<T> iterator() {
return this;
}
}
/**
@ -143,9 +204,64 @@ public abstract class GenomeLocProcessingTracker {
*
* @return
*/
protected abstract List<ProcessingLoc> getProcessingLocs();
protected Collection<ProcessingLoc> getProcessingLocs() {
return updateLocs().values();
}
private Map<GenomeLoc, ProcessingLoc> updateLocs() {
lock();
try {
for ( ProcessingLoc p : readNewLocs() )
processingLocs.put(p.getLocation(), p);
return processingLocs;
} finally {
unlock();
}
}
// --------------------------------------------------------------------------------
//
// Low-level accessors / manipulators and utility functions
//
// --------------------------------------------------------------------------------
private final void lock() {
if ( ! lock.isHeldByCurrentThread() )
nLockingEvents++;
lock.lock();
}
private final void unlock() {
lock.unlock();
}
protected static ProcessingLoc findOwnerInCollection(GenomeLoc loc, Collection<ProcessingLoc> locs) {
for ( ProcessingLoc l : locs ) {
if ( l.getLocation().equals(loc) )
return l;
}
return null;
}
protected static ProcessingLoc findOwnerInMap(GenomeLoc loc, Map<GenomeLoc,ProcessingLoc> locs) {
return locs.get(loc);
}
// --------------------------------------------------------------------------------
//
// Code to override to change the dynamics of the the GenomeLocProcessingTracker
//
// --------------------------------------------------------------------------------
protected void close() {
lock.close();
logger.warn("Locking events: " + nLockingEvents);
// by default we don't do anything
}
protected abstract void registerNewLoc(ProcessingLoc loc);
protected abstract Collection<ProcessingLoc> readNewLocs();
}

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.utils.threading;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -10,11 +11,27 @@ import java.util.List;
* Base class, and null tracker. Always says that a GenomeLoc is ready for processing
*/
public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
protected NoOpGenomeLocProcessingTracker() {
super(new ClosableReentrantLock()); // todo -- should be lighter weight
}
@Override
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
return new ProcessingLoc(loc, myName);
}
@Override
protected List<ProcessingLoc> getProcessingLocs() {
return Collections.emptyList();
}
@Override
protected void registerNewLoc(ProcessingLoc loc) {
;
}
@Override
protected List<ProcessingLoc> readNewLocs() {
return Collections.emptyList();
}
}

View File

@ -0,0 +1,57 @@
package org.broadinstitute.sting.utils.threading;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.HasGenomeLocation;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* Created by IntelliJ IDEA.
* User: depristo
* Date: 1/19/11
* Time: 8:06 AM
*
* Information about processing locations and their owners
*/
public class ProcessingLoc implements HasGenomeLocation {
private final GenomeLoc loc;
private final String owner;
/**
* Create a loc that's already owned
* @param loc
* @param owner
*/
public ProcessingLoc(GenomeLoc loc, String owner) {
if ( loc == null || owner == null ) {
throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner);
}
this.loc = loc;
this.owner = owner;
}
public GenomeLoc getLocation() {
return loc;
}
public String getOwner() {
return owner;
}
public boolean isOwnedBy(String name) {
return getOwner().equals(name);
}
public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); }
public boolean equals(Object other) {
if (other instanceof ProcessingLoc )
return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner);
else
return false;
}
public int compareTo(ProcessingLoc other) {
return this.getLocation().compareTo(other.getLocation());
}
}

View File

@ -14,208 +14,208 @@ import java.util.List;
/**
*
*/
public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static final boolean DEBUG = false;
private static final boolean REALLY_DEBUG = false;
private boolean ACTUALLY_USE_FILE_LOCK = true;
private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class);
private Object myLock = new Object();
private List<ProcessingLoc> processingLocs;
private File sharedFile = null;
private GenomeLocParser parser;
private FileLock lock = null;
private RandomAccessFile raFile;
private long lastReadPosition = 0;
// //
// // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST
// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS
// // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED
// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK?
// //
//public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
// private static final boolean DEBUG = false;
// private static final boolean REALLY_DEBUG = false;
//
public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) {
this(sharedFile, parser, true);
}
protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) {
processingLocs = new ArrayList<ProcessingLoc>();
ACTUALLY_USE_FILE_LOCK = false;
try {
this.sharedFile = sharedFile;
this.raFile = new RandomAccessFile(sharedFile, "rws");
this.parser = parser;
}
catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
public void close() {
if ( ACTUALLY_USE_FILE_LOCK ) {
try {
this.raFile.close();
}
catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
}
private void lock() {
if ( ACTUALLY_USE_FILE_LOCK ) {
// Precondition -- lock is always null while we don't have a lock
if ( lock != null )
throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
try {
lock = raFile.getChannel().lock();
} catch (ClosedChannelException e) {
throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e);
} catch (FileLockInterruptionException e) {
throw new ReviewedStingException("File lock interrupted", e);
} catch (NonWritableChannelException e) {
throw new ReviewedStingException("File channel not writable", e);
} catch (OverlappingFileLockException e) {
// this only happens when multiple threads are running, and one is waiting
// for the lock above and we come here.
throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
} catch (IOException e) {
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
}
}
private void unlock(boolean excepting) {
if ( ACTUALLY_USE_FILE_LOCK ) {
// Precondition -- lock is never null while we have a lock
if ( lock == null ) {
if ( ! excepting )
throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!");
} else {
if ( ! lock.isValid() )
throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!");
try {
lock.release();
lock = null;
//channel.close();
} catch ( IOException e ) {
throw new ReviewedStingException("Could not free lock on file " + sharedFile, e);
}
}
}
}
private List<ProcessingLoc> readLocs() {
if ( ACTUALLY_USE_FILE_LOCK ) {
// we must have a lock to run this code
if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()");
try {
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
int counter = 0;
String line = raFile.readLine(); // Read another line
while ( line != null ) {
String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
GenomeLoc loc = parser.parseGenomeLoc(parts[0]);
String owner = parts[1];
processingLocs.add(new ProcessingLoc(loc, owner));
line = raFile.readLine();
counter++;
}
lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d",
counter, lastReadPosition, processingLocs.size()));
}
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
}
}
return processingLocs;
}
private void writeLoc(ProcessingLoc proc) {
if ( ACTUALLY_USE_FILE_LOCK ) {
// we must have a lock to run this code
if ( lock == null || ! lock.isValid() )
throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()");
try {
String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner());
long startPos = raFile.getFilePointer();
raFile.seek(raFile.length());
raFile.write(packet.getBytes());
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer()));
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
} else {
processingLocs.add(proc);
}
}
private final void printOwners() {
for ( ProcessingLoc proc : processingLocs )
System.out.println(proc);
}
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName);
synchronized (processingLocs) {
boolean excepting = true;
ProcessingLoc owner = null;
if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile);
try {
lock();
owner = findOwnerInUnsortedList(loc, readLocs());
//owner = super.findOwner(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
writeLoc(owner);
}
excepting = false;
} finally {
if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner);
//printOwners();
unlock(excepting);
}
if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null");
return owner;
}
}
protected List<ProcessingLoc> getProcessingLocs() {
synchronized (processingLocs) {
boolean excepting = true;
if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
try {
lock();
readLocs();
excepting = false;
} finally {
unlock(excepting);
}
if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null");
return processingLocs;
}
}
}
// private boolean ACTUALLY_USE_FILE_LOCK = true;
//
// private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class);
//
// private Object myLock = new Object();
// private List<ProcessingLoc> processingLocs;
// private File sharedFile = null;
// private GenomeLocParser parser;
// private FileLock lock = null;
// private RandomAccessFile raFile;
// private long lastReadPosition = 0;
//
//// //
//// // TODO -- I CAN'T FOR SOME REASON GET THE FILE LOCK TESTING TO WORK WITH MULTIPLE THREADS IN THE UNIT TEST
//// // TODO -- IT SEEMS THAT SOME LOCKS AREN'T BEING FREED, BUT IT DOESN'T SEEM POSSIBLE GIVEN THE CHECKS
//// // TODO -- IN THE CODE. I THINK THE LOCK IS SOMEHOW CONTINUING BEYOND THE UNLOCK CALL, OR THAT I NEED
//// // TODO -- TO CLOSE AND REOPEN THE CHANNEL FOR EACH LOCK?
//// //
////
// public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) {
// this(sharedFile, parser, true);
// }
//
// protected SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, boolean useFileLock) {
// processingLocs = new ArrayList<ProcessingLoc>();
// ACTUALLY_USE_FILE_LOCK = false;
// try {
// this.sharedFile = sharedFile;
// this.raFile = new RandomAccessFile(sharedFile, "rws");
// this.parser = parser;
// }
// catch (FileNotFoundException e) {
// throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
// }
// }
//
// public void close() {
// if ( ACTUALLY_USE_FILE_LOCK ) {
// try {
// this.raFile.close();
// }
// catch (IOException e) {
// throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
// }
// }
// }
//
// private void lock() {
// if ( ACTUALLY_USE_FILE_LOCK ) {
//
// // Precondition -- lock is always null while we don't have a lock
// if ( lock != null )
// throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
//
// try {
// lock = raFile.getChannel().lock();
// } catch (ClosedChannelException e) {
// throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e);
// } catch (FileLockInterruptionException e) {
// throw new ReviewedStingException("File lock interrupted", e);
// } catch (NonWritableChannelException e) {
// throw new ReviewedStingException("File channel not writable", e);
// } catch (OverlappingFileLockException e) {
// // this only happens when multiple threads are running, and one is waiting
// // for the lock above and we come here.
// throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
// } catch (IOException e) {
// throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
// }
// }
// }
//
// private void unlock(boolean excepting) {
// if ( ACTUALLY_USE_FILE_LOCK ) {
//
// // Precondition -- lock is never null while we have a lock
// if ( lock == null ) {
// if ( ! excepting )
// throw new ReviewedStingException("BUG: call to unlock() when we don't have the lock!");
// } else {
// if ( ! lock.isValid() )
// throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!");
// try {
// lock.release();
// lock = null;
// //channel.close();
// } catch ( IOException e ) {
// throw new ReviewedStingException("Could not free lock on file " + sharedFile, e);
// }
// }
// }
// }
//
// private List<ProcessingLoc> readLocs() {
// if ( ACTUALLY_USE_FILE_LOCK ) {
// // we must have a lock to run this code
// if ( lock == null || ! lock.isValid() ) throw new ReviewedStingException("File lock must be valid upon entry to readLocs()");
//
// try {
// if ( raFile.length() > lastReadPosition ) {
// raFile.seek(lastReadPosition);
//
// int counter = 0;
// String line = raFile.readLine(); // Read another line
// while ( line != null ) {
// String[] parts = line.split(" ");
// if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
// GenomeLoc loc = parser.parseGenomeLoc(parts[0]);
// String owner = parts[1];
// processingLocs.add(new ProcessingLoc(loc, owner));
// line = raFile.readLine();
// counter++;
// }
// lastReadPosition = raFile.getFilePointer();
// if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d",
// counter, lastReadPosition, processingLocs.size()));
// }
// } catch (FileNotFoundException e) {
// throw new UserException.CouldNotReadInputFile(sharedFile, e);
// } catch (IOException e) {
// throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
// }
// }
//
// return processingLocs;
// }
//
// private void writeLoc(ProcessingLoc proc) {
// if ( ACTUALLY_USE_FILE_LOCK ) {
// // we must have a lock to run this code
// if ( lock == null || ! lock.isValid() )
// throw new ReviewedStingException("File lock must be valid upon entry to writeLoc()");
//
// try {
// String packet = String.format("%s %s%n", proc.getLoc(), proc.getOwner());
// long startPos = raFile.getFilePointer();
// raFile.seek(raFile.length());
// raFile.write(packet.getBytes());
// if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", proc, startPos, packet.length(), raFile.getFilePointer()));
// } catch (FileNotFoundException e) {
// throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
// } catch (IOException e) {
// throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
// }
// } else {
// processingLocs.add(proc);
// }
// }
//
// private final void printOwners() {
// for ( ProcessingLoc proc : processingLocs )
// System.out.println(proc);
// }
//
// public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
// if ( REALLY_DEBUG ) System.out.printf(" claimOwnership %s%n", myName);
// synchronized (processingLocs) {
// boolean excepting = true;
// ProcessingLoc owner = null;
//
// if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
//
// if ( REALLY_DEBUG ) System.out.printf(" sync raFile %s %s%n", myName, raFile);
// try {
// lock();
// owner = findOwnerInUnsortedList(loc, readLocs());
// //owner = super.findOwner(loc);
// if ( owner == null ) { // we are unowned
// owner = new ProcessingLoc(loc, myName);
// writeLoc(owner);
// }
// excepting = false;
// } finally {
// if ( REALLY_DEBUG ) System.out.printf(" claimOwnership unlock %s excepting %s, owner %s%n", myName, excepting, owner);
// //printOwners();
// unlock(excepting);
// }
//
// if ( lock != null ) throw new ReviewedStingException("BUG: exiting claimOwnership synchronized block without setting lock to null");
// return owner;
// }
// }
//
// protected List<ProcessingLoc> getProcessingLocs() {
// synchronized (processingLocs) {
// boolean excepting = true;
// if ( lock != null ) throw new ReviewedStingException("BUG: into claimOwnership synchronized block while another thread owns the lock");
//
// try {
// lock();
// readLocs();
// excepting = false;
// } finally {
// unlock(excepting);
// }
//
// if ( lock != null ) throw new ReviewedStingException("BUG: exiting getProcessingLocs synchronized block without setting lock to null");
// return processingLocs;
// }
// }
//}

View File

@ -0,0 +1,114 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.IOException;
import java.nio.channels.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* User: depristo
* Date: 1/19/11
* Time: 8:24 AM
*
* A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common
* file in the file system. It itself a shared memory reenterant lock to managed thread safety and a
* FileChannel FileLock to handle the file integrity
*/
public class SharedFileThreadSafeLock extends ClosableReentrantLock {
private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class);
private static final boolean DEBUG = false;
/** The file lock itself that guards the file */
FileLock fileLock;
/** the channel object that 'owns' the file lock, and we use to request the lock */
FileChannel channel;
int fileLockReentrantCounter = 0;
/**
* Create a SharedFileThreadSafeLock object locking the file associated with channel
* @param channel
*/
public SharedFileThreadSafeLock(FileChannel channel) {
super();
this.channel = channel;
}
public void close() {
try {
channel.close();
}
catch (IOException e) {
throw new UserException("Count not close channel " + channel, e);
}
}
/**
* Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be
* unwound appropriately. Uses file channel lock *after* thread locking.
*/
@Override
public void lock() {
if ( DEBUG ) logger.warn("Attempting threadlock: " + Thread.currentThread().getName());
if ( super.isHeldByCurrentThread() ) {
if ( DEBUG ) logger.warn(" Already have threadlock, continuing: " + Thread.currentThread().getName());
super.lock(); // call the lock here so we can call unlock later
fileLockReentrantCounter++; // inc. the file lock counter
return;
} else {
super.lock();
if ( DEBUG ) logger.warn(" Have thread-lock, going for filelock: " + Thread.currentThread().getName());
try {
// Precondition -- lock is always null while we don't have a lock
if ( fileLock != null )
throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
if ( fileLockReentrantCounter == 0 )
fileLock = channel.lock();
fileLockReentrantCounter++;
if ( DEBUG ) logger.warn(" Have filelock: " + Thread.currentThread().getName());
} catch (ClosedChannelException e) {
throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + channel, e);
} catch (FileLockInterruptionException e) {
throw new ReviewedStingException("File lock interrupted", e);
} catch (NonWritableChannelException e) {
throw new ReviewedStingException("File channel not writable", e);
} catch (OverlappingFileLockException e) {
// this only happens when multiple threads are running, and one is waiting
// for the lock above and we come here.
throw new ReviewedStingException("BUG: Failed to acquire lock, should never happen.");
} catch (IOException e) {
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
}
}
@Override
public void unlock() {
try {
// update for reentrant unlocking
fileLockReentrantCounter--;
if ( fileLockReentrantCounter < 0 ) throw new ReviewedStingException("BUG: file lock counter < 0");
if ( fileLock != null && fileLockReentrantCounter == 0 ) {
if ( ! fileLock.isValid() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!");
if ( DEBUG ) logger.warn(" going to release filelock: " + Thread.currentThread().getName());
fileLock.release();
fileLock = null;
if ( DEBUG ) logger.warn(" released filelock: " + Thread.currentThread().getName());
} else {
if ( DEBUG ) logger.warn(" skipping filelock release, reenterring unlock via multiple threads " + Thread.currentThread().getName());
}
} catch ( IOException e ) {
throw new ReviewedStingException("Could not free lock on file " + channel, e);
} finally {
if ( DEBUG ) logger.warn(" going to release threadlock: " + Thread.currentThread().getName());
super.unlock();
if ( DEBUG ) logger.warn(" released threadlock: " + Thread.currentThread().getName());
}
}
}

View File

@ -5,34 +5,27 @@ import org.broadinstitute.sting.utils.GenomeLoc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
/**
* For algorithmic testing purposes only. Uses synchronization to keep a consistent
* processing list in shared memory.
* Thread-safe shared memory only implementation
*/
public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class);
protected List<ProcessingLoc> processingLocs = new ArrayList<ProcessingLoc>();
private List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>();
public ProcessingLoc claimOwnership(GenomeLoc loc, String myName) {
// processingLocs is a shared memory synchronized object, and this
// method is synchronized, so we can just do our processing
synchronized (processingLocs) {
ProcessingLoc owner = super.findOwner(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
processingLocs.add(owner);
}
return owner;
//logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner));
}
protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) {
super(lock);
}
protected List<ProcessingLoc> getProcessingLocs() {
synchronized (processingLocs) {
return processingLocs;
}
@Override
protected void registerNewLoc(ProcessingLoc loc) {
newPLocs.add(loc);
}
@Override
protected List<ProcessingLoc> readNewLocs() {
List<ProcessingLoc> r = newPLocs;
newPLocs = new ArrayList<ProcessingLoc>();
return r;
}
}

View File

@ -7,6 +7,7 @@ package org.broadinstitute.sting.utils.threading;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.iterators.GenomeLocusIterator;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.UserException;
@ -17,7 +18,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
@ -27,13 +28,11 @@ import java.util.concurrent.*;
public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
IndexedFastaSequenceFile fasta = null;
GenomeLocParser genomeLocParser = null;
File sharedFile = new File("synchronizationFile.txt");
static final boolean USE_FILE_LOCK = false;
String chr1 = null;
private final static String FILE_ROOT = "testdata/GLPTFile";
@BeforeTest
public void before() {
logger.warn("SharedFile is " + sharedFile.getAbsolutePath());
File referenceFile = new File(hg18Reference);
try {
fasta = new IndexedFastaSequenceFile(referenceFile);
@ -46,13 +45,16 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
}
}
@AfterMethod
public void cleanup(Object[] data) {
if ( sharedFile.exists() ) {
sharedFile.delete();
}
@BeforeMethod
public void beforeMethod(Object[] data) {
if ( data.length > 0 )
((TestTarget)data[0]).init();
}
((TestTarget)data[0]).getTracker().close();
@AfterMethod
public void afterMethod(Object[] data) {
if ( data.length > 0 )
((TestTarget)data[0]).getTracker().close();
}
abstract private class TestTarget {
@ -60,6 +62,8 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
int nShards;
int shardSize;
public void init() {}
protected TestTarget(String name, int nShards, int shardSize) {
this.name = name;
this.nShards = nShards;
@ -83,37 +87,44 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
}
}
private class SharedFileTest extends TestTarget {
protected SharedFileTest(int nShards, int shardSize) {
super("SharedFile", nShards, shardSize);
}
GenomeLocProcessingTracker tracker = null;
public GenomeLocProcessingTracker getTracker() {
if ( tracker == null )
//tracker = new SharedMemoryGenomeLocProcessingTracker();
tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser, USE_FILE_LOCK);
return tracker;
}
@DataProvider(name = "threadData")
public Object[][] createThreadData() {
return createData(Arrays.asList(10, 100, 1000, 10000), Arrays.asList(10));
}
@DataProvider(name = "data")
public Object[][] createData1() {
public Object[][] createData(List<Integer> nShards, List<Integer> shardSizes) {
List<TestTarget> params = new ArrayList<TestTarget>();
for ( int nShard : Arrays.asList(10, 100, 1000) ) {
for ( int shardSize : Arrays.asList(10) ) {
// for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) {
// for ( int shardSize : Arrays.asList(10, 100) ) {
int counter = 0;
// for ( int nShard : Arrays.asList(10,100,1000) ) {
// for ( int shardSize : Arrays.asList(10) ) {
for ( int nShard : nShards ) {
for ( int shardSize : shardSizes ) {
// shared mem -- canonical implementation
params.add(new TestTarget("SharedMem", nShard, shardSize) {
SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker();
params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize) {
SharedMemoryGenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createSharedMemory();
public GenomeLocProcessingTracker getTracker() { return tracker; }
});
// // shared file -- working implementation
// params.add(new SharedFileTest(nShard, shardSize));
final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize) {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedThreaded(file1, genomeLocParser);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public void init() {
if ( file1.exists() )
file1.delete();
}
});
final File file2 = new File(String.format("%s_ThreadSafeFileLockingFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileLockingFileBacked", nShard, shardSize) {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createFileBackedDistributed(file2, genomeLocParser);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public void init() {
if ( file2.exists() )
file2.delete();
}
});
}
}
@ -122,10 +133,28 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
return params2.toArray(new Object[][]{});
}
@DataProvider(name = "simpleData")
public Object[][] createSimpleData() {
return createData(Arrays.asList(1000), Arrays.asList(100));
}
private static final String NAME_ONE = "name1";
private static final String NAME_TWO = "name2";
@Test(dataProvider = "data", enabled = true)
@Test(enabled = true)
public void testNoop() {
GenomeLocProcessingTracker tracker = GenomeLocProcessingTracker.createNoOp();
for ( int start = 1; start < 100; start++ ) {
for ( int n = 0; n < 2; n++ ) {
GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1);
ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE);
Assert.assertTrue(ploc.isOwnedBy(NAME_ONE));
Assert.assertEquals(tracker.getProcessingLocs().size(), 0);
}
}
}
@Test(dataProvider = "simpleData", enabled = true)
public void testSingleProcessTracker(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
@ -138,24 +167,60 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
Assert.assertNull(tracker.findOwner(shard));
Assert.assertFalse(tracker.locIsOwned(shard));
GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
Assert.assertNotNull(proc);
Assert.assertNotNull(proc.getLoc());
Assert.assertNotNull(proc.getLocation());
Assert.assertNotNull(proc.getOwner());
Assert.assertEquals(proc.getLoc(), shard);
Assert.assertEquals(proc.getLocation(), shard);
Assert.assertEquals(proc.getOwner(), NAME_ONE);
Assert.assertEquals(tracker.findOwner(shard), proc);
Assert.assertTrue(tracker.locIsOwned(shard));
Assert.assertNotNull(tracker.getProcessingLocs());
Assert.assertEquals(tracker.getProcessingLocs().size(), counter);
GenomeLocProcessingTracker.ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO);
ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO);
Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO));
Assert.assertEquals(badClaimAttempt.getOwner(), NAME_ONE);
}
}
@Test(dataProvider = "data", enabled = true)
@Test(dataProvider = "simpleData", enabled = true)
public void testIterator(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
logger.warn("testIterator " + test);
List<GenomeLoc> markedShards = new ArrayList<GenomeLoc>();
List<GenomeLoc> toFind = new ArrayList<GenomeLoc>();
for ( int i = 0; i < shards.size(); i++ ) {
if ( ! (i % 10 == 0) ) {
markedShards.add(shards.get(i));
tracker.claimOwnership(shards.get(i), NAME_TWO);
} else {
toFind.add(shards.get(i));
}
}
int nFound = 0;
Iterator<GenomeLoc> it = shards.iterator();
while ( it.hasNext() ) {
GenomeLoc shard = tracker.claimOwnershipOfNextAvailable(it, NAME_ONE);
if ( shard == null ) { // everything to get is done
Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards");
} else {
nFound++;
ProcessingLoc proc = tracker.findOwner(shard);
Assert.assertTrue(proc.isOwnedBy(NAME_ONE));
Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!");
Assert.assertTrue(toFind.contains(shard), "Claimed shard wasn't one of the unmarked!");
}
}
}
@Test(dataProvider = "simpleData", enabled = true)
public void testMarkedProcesses(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
@ -171,7 +236,7 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
}
for ( GenomeLoc shard : shards ) {
GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO));
@ -190,24 +255,39 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
public TestTarget test;
public String name;
public List<GenomeLoc> ran, toRun;
boolean useIterator;
public TestThread(TestTarget test, int count, List<GenomeLoc> toRun) {
public TestThread(TestTarget test, int count, List<GenomeLoc> toRun, boolean useIterator) {
this.test = test;
this.toRun = toRun;
this.name = "thread" + count;
this.ran = new ArrayList<GenomeLoc>();
this.useIterator = useIterator;
}
public Integer call() {
logger.warn(String.format("Call() Thread %s", name));
for ( GenomeLoc shard : toRun ) {
//System.out.printf("Claiming ownership in %s on %s%n", name, shard);
GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name);
//System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name));
if ( proc.isOwnedBy(name) ) {
ran.add(proc.getLoc());
//logger.warn(String.format("Call() Thread %s", name));
if ( useIterator ) {
for ( GenomeLoc shard : test.getTracker().onlyOwned(toRun.iterator(), name) ) {
if ( shard != null ) { // ignore the unclaimable end of the stream
ran.add(shard);
// do some work here
for ( int sum =0, i = 0; i < 100000; i++) sum += i;
}
}
} else {
for ( GenomeLoc shard : toRun ) {
//System.out.printf("Claiming ownership in %s on %s%n", name, shard);
ProcessingLoc proc = test.getTracker().claimOwnership(shard,name);
//System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name));
if ( proc.isOwnedBy(name) ) {
ran.add(proc.getLocation());
// do some work here
for ( int sum =0, i = 0; i < 100000; i++) sum += i;
}
//logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner()));
}
//logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner()));
}
return 1;
@ -245,14 +325,23 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
return r;
}
@Test(dataProvider = "data", enabled = true)
public void testThreadedProcesses(TestTarget test) {
@Test(dataProvider = "threadData", enabled = true)
public void testThreadedProcessesLowLevelFunctions(TestTarget test) {
testThreading(test, false);
}
@Test(dataProvider = "threadData", enabled = true)
public void testThreadedProcessesIterator(TestTarget test) {
testThreading(test, true);
}
private void testThreading(TestTarget test, boolean useIterator) {
// start up 3 threads
logger.warn("ThreadedTesting " + test);
logger.warn("ThreadedTesting " + test + " using iterator " + useIterator);
List<TestThread> threads = new ArrayList<TestThread>();
for ( int i = 0; i < 4; i++) {
List<GenomeLoc> toRun = subList(test.getShards(), i+1);
TestThread thread = new TestThread(test, i, toRun);
TestThread thread = new TestThread(test, i, toRun, useIterator);
threads.add(thread);
}
ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size());
@ -273,11 +362,11 @@ public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
for ( GenomeLoc shard : shards ) {
Assert.assertTrue(tracker.locIsOwned(shard), "Unowned shard");
GenomeLocProcessingTracker.ProcessingLoc proc = tracker.findOwner(shard);
ProcessingLoc proc = tracker.findOwner(shard);
Assert.assertNotNull(proc, "Proc was null");
Assert.assertNotNull(proc.getOwner(), "Owner was null");
Assert.assertEquals(proc.getLoc(), shard, "Shard loc doesn't make ProcessingLoc");
Assert.assertEquals(proc.getLocation(), shard, "Shard loc doesn't make ProcessingLoc");
TestThread owner = findOwner(proc.getOwner(), threads);
Assert.assertNotNull(owner, "Couldn't find owner");