Merge branch 'master' into help

This commit is contained in:
Mark DePristo 2011-07-24 18:14:32 -04:00
commit ff85687679
12 changed files with 58 additions and 1434 deletions

View File

@ -49,7 +49,7 @@ public class LinearMicroScheduler extends MicroScheduler {
Accumulator accumulator = Accumulator.create(engine,walker);
int counter = 0;
for (Shard shard : processingTracker.onlyOwned(shardStrategy, engine.getName())) {
for (Shard shard : shardStrategy ) {
if ( shard == null ) // we ran out of shards that aren't owned
break;

View File

@ -39,14 +39,10 @@ import org.broadinstitute.sting.gatk.traversals.*;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.broadinstitute.sting.utils.threading.*;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.util.Collection;
@ -83,8 +79,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
private final MBeanServer mBeanServer;
private final ObjectName mBeanName;
protected GenomeLocProcessingTracker processingTracker;
/**
* MicroScheduler factory function. Create a microscheduler appropriate for reducing the
* selected walker.
@ -98,11 +92,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
* @return The best-fit microscheduler.
*/
public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse) {
if (engine.getArguments().processingTrackerFile != null) {
if ( walker instanceof ReadWalker )
throw new UserException.BadArgumentValue("C", String.format("Distributed GATK processing not enabled for read walkers"));
}
if (walker instanceof TreeReducible && nThreadsToUse > 1) {
if(walker.isReduceByInterval())
throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass())));
@ -157,33 +146,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
catch (JMException ex) {
throw new ReviewedStingException("Unable to register microscheduler with JMX", ex);
}
//
// create the processing tracker
//
if ( engine.getArguments().processingTrackerFile != null ) {
logger.warn("Distributed GATK is an experimental engine feature, and is likely to not work correctly or reliably.");
if ( engine.getArguments().restartProcessingTracker && engine.getArguments().processingTrackerFile.exists() ) {
engine.getArguments().processingTrackerFile.delete();
logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile);
}
PrintStream statusStream = null;
if ( engine.getArguments().processingTrackerStatusFile != null ) {
try {
statusStream = new PrintStream(new FileOutputStream(engine.getArguments().processingTrackerStatusFile));
} catch ( FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(engine.getArguments().processingTrackerStatusFile, e);
}
}
ClosableReentrantLock lock = new SharedFileThreadSafeLock(engine.getArguments().processingTrackerFile, engine.getArguments().processTrackerID);
processingTracker = new FileBackedGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), lock, statusStream) ;
logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName() + " CID = " + engine.getArguments().processTrackerID);
} else {
// create a NoOp version that doesn't do anything but say "yes"
processingTracker = new NoOpGenomeLocProcessingTracker();
}
}
/**

View File

@ -0,0 +1,57 @@
package org.broadinstitute.sting.utils;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
/**
* Created by IntelliJ IDEA.
* User: carneiro
* Date: 7/23/11
* Time: 6:07 PM
* To change this template use File | Settings | File Templates.
*/
public class ContigComparator implements Comparator<String> {
private Set<String> specialChrs;
public ContigComparator() {
specialChrs = new TreeSet<String>();
specialChrs.add("X");
specialChrs.add("Y");
}
public int compare(String chr1, String chr2) {
if (chr1.equals(chr2))
return 0;
Integer x = convertStringWithoutException(chr1);
Integer y = convertStringWithoutException(chr2);
// both contigs are numbered
if (x != null && y != null)
return (x < y) ? -1:1;
// both contigs are named
if (x == null && y == null) {
// both contigs are special contigs or neither contig is a special contigs
if (specialChrs.contains(chr1) && specialChrs.contains(chr2) || (!specialChrs.contains(chr1) && !specialChrs.contains(chr2)))
return chr1.compareTo(chr2);
// one contig is a special and the other is not special
if (specialChrs.contains(chr1))
return -1;
return 1;
}
// one contig is named the other is numbered
if (x != null)
return -1;
return 1;
}
private Integer convertStringWithoutException(String contig) {
Integer x = null;
try {
x = Integer.decode(contig);
} catch (NumberFormatException n){}
return x;
}
}

View File

@ -1,16 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by IntelliJ IDEA.
* User: depristo
* Date: 1/19/11
* Time: 9:50 AM
*
* Simple extension of a ReentrantLock that supports a close method.
*/
public class ClosableReentrantLock extends ReentrantLock {
public boolean ownsLock() { return super.isHeldByCurrentThread(); }
public void close() {}
}

View File

@ -1,114 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Keeps a copy of the processing locks in a file
*/
public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private static final Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private static final boolean DEBUG = false;
private static final String READ_MODE = "r";
private static final String WRITE_MODE = "rws";
private final File sharedFile;
private final GenomeLocParser parser;
private long lastReadPosition = 0;
public FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) {
super(lock, status);
this.sharedFile = sharedFile;
this.parser = parser;
}
private RandomAccessFile openFile(String mode) {
try {
return new RandomAccessFile(sharedFile, mode);
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
private void closeFile(RandomAccessFile raFile) {
try {
if ( raFile != null ) raFile.close();
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
}
}
@Override
protected List<ProcessingLoc> readNewLocs() {
List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>(); // todo -- gratitous object creation
if ( sharedFile.exists() ) {
RandomAccessFile raFile = null;
try {
raFile = openFile(READ_MODE);
//logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition));
if ( raFile.length() > lastReadPosition ) {
raFile.seek(lastReadPosition);
int counter = 0;
String line = raFile.readLine(); // Read another line
while ( line != null ) {
String[] parts = line.split(" ");
if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer());
ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]);
//logger.warn(" Read " + ploc);
newPLocs.add(ploc);
line = raFile.readLine();
counter++;
}
lastReadPosition = raFile.getFilePointer();
if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d",
counter, lastReadPosition, newPLocs.size()));
}
} catch (FileNotFoundException e) {
throw new UserException.CouldNotReadInputFile(sharedFile, e);
} catch (IOException e) {
throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e);
} finally {
closeFile(raFile);
}
}
return newPLocs;
}
@Override
protected void registerNewLocs(Collection<ProcessingLoc> plocs) {
RandomAccessFile raFile = null;
try {
raFile = openFile(WRITE_MODE);
long startPos = raFile.getFilePointer();
raFile.seek(raFile.length());
StringBuffer bytes = new StringBuffer();
for ( ProcessingLoc ploc : plocs ) {
String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner());
bytes.append(packet);
if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer()));
}
raFile.write(bytes.toString().getBytes());
//raFile.getChannel().force(true);
} catch (FileNotFoundException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(sharedFile, e);
} finally {
closeFile(raFile);
}
}
}

View File

@ -1,486 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.HasGenomeLocation;
import org.broadinstitute.sting.utils.SimpleTimer;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* Abstract base class to coordinating data processing by a collecting for processes / threads.
*
* Conceptually, the genome is viewed as a collection of non-overlapping genome location:
*
* chr1:1-10
* chr1:11-20
* chr1:21-30
* etc.
*
* This class, and it's concrete derived classes, provide the ability to claim individual locations
* as "mine", and exclude other processes / threads from processing them. At the lowest-level this
* is implemented by the claimOwnership(loc, name) function, that returns true if loc free (unclaimed)
* and makes name the owner of loc. High-level, and more efficient operations provide claiming
* iterators over streams of objects implementing the HasGenomeLocation interface, so that you can
* write code that looks like:
*
* for ( GenomeLoc ownedLoc : onlyOwned(allLocsToProcess.iterator) ) {
* doSomeWork(ownedLoc)
*
* Much of the code in this class is actually surrounding debugging and performance metrics code.
* The actual synchronization code is separated out into the ClosableReentrantLock() system
* and the two abstract functions:
*
* protected abstract void registerNewLocs(Collection<ProcessingLoc> plocs);
* protected abstract Collection<ProcessingLoc> readNewLocs();
*
* That maintain the state of the tracker.
*
* That is, the ProcessingTracker is made of two components: a thread / process locking system and
* a subclass that implements the methods to record new claimed state changes and to read out updates
* that may have occurred by another thread or process.
*
* NOTE: this class assumes that all threads / processes are working with the same set of potential
* GenomeLocs to own. Claiming chr1:1-10 and then chr1:5-6 is allowed by the system. Basically,
* you only can stake claim to GenomeLocs that are .equal().
*/
public abstract class GenomeLocProcessingTracker {
private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class);
private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS");
private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 1;
/**
* Useful state strings for printing status
*/
private final static String GOING_FOR_LOCK = "going_for_lock";
private final static String RELEASING_LOCK = "releasing_lock";
private final static String HAVE_LOCK = "have_lock";
private final static String RUNNING = "running";
/**
* A map, for efficiency, that allows quick lookup of the processing loc for a
* given GenomeLoc. The map points from loc -> loc / owner as a ProcessingLoc
*/
private final Map<GenomeLoc, ProcessingLoc> processingLocs;
/**
* The locking object used to protect data from simulatanous access by multiple
* threads or processes.
*/
private final ClosableReentrantLock lock;
/** A stream for writing status messages. Can be null if we aren't writing status */
private final PrintStream status;
//
// Timers for recording performance information
// Note -- these cannot be used because this class isn't thread safe, and neither are the
// timers, so they result in invalid operations w.r.t. the SimpleTimer contract
//
// protected final SimpleTimer writeTimer = new SimpleTimer("writeTimer");
// protected final SimpleTimer readTimer = new SimpleTimer("readTimer");
// protected final SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer");
protected final SimpleTimer timer = new SimpleTimer();
protected long nLocks = 0, nWrites = 0, nReads = 0;
// --------------------------------------------------------------------------------
//
// Creating ProcessingTrackers
//
// --------------------------------------------------------------------------------
public GenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) {
this.processingLocs = new HashMap<GenomeLoc, ProcessingLoc>();
this.status = status;
this.lock = lock;
printStatusHeader();
}
// --------------------------------------------------------------------------------
//
// Code to override to change the dynamics of the the GenomeLocProcessingTracker
//
// --------------------------------------------------------------------------------
protected void close() {
lock.close();
if ( status != null ) status.close();
}
/**
* Takes a collection of newly claimed (i.e., previous unclaimed) genome locs
* and the name of their owner and "registers" this data in some persistent way that's
* visible to all threads / processes communicating via this GenomeLocProcessingTracker.
*
* Could be a in-memory data structure (a list) if we are restricting ourselves to intra-memory
* parallelism, a locked file on a shared file system, or a server we communicate with.
*
* @param plocs
*/
protected abstract void registerNewLocs(Collection<ProcessingLoc> plocs);
/**
* The inverse of the registerNewLocs() function. Looks at the persistent data store
* shared by all threads / processes and returns the ones that have appeared since the last
* call to readNewLocs(). Note that we expect the pair of registerNewLocs and readNewLocs to
* include everything, even locs registered by this thread / process. For example:
*
* readNewLocs() => List()
* registerNewLocs(List(x, y,)) => void
* readNewLocs() => List(x,y))
*
* even for this thread or process.
* @return
*/
protected abstract Collection<ProcessingLoc> readNewLocs();
// --------------------------------------------------------------------------------
//
// Code to claim intervals for processing and query for their ownership
//
// --------------------------------------------------------------------------------
/**
* Queries the current database if a location is owned. Does not guarantee that the
* loc can be owned in a future call, though.
*
* @param loc
* @return
*/
public final boolean locIsOwned(GenomeLoc loc, String id) {
return findOwner(loc, id) != null;
}
/**
* The workhorse routine. Attempt to claim processing ownership of loc, with my name.
* This is an atomic operation -- other threads / processes will wait until this function
* returns. The return result is the ProcessingLoc object describing who owns this
* location. If the location isn't already claimed and we now own the location, the pl owner
* will be myName. Otherwise, the name of the owner can found in the pl.
*
* @param loc
* @param myName
* @return
*/
public final ProcessingLoc claimOwnership(final GenomeLoc loc, final String myName) {
// processingLocs is a shared memory synchronized object, and this
// method is synchronized, so we can just do our processing
return new WithLock<ProcessingLoc>(myName) {
public ProcessingLoc doBody() {
ProcessingLoc owner = findOwner(loc, myName);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
registerNewLocsWithTimers(Arrays.asList(owner), myName);
}
return owner;
}
}.run();
}
// --------------------------------------------------------------------------------
//
// High-level iterator-style interface to claiming ownership
//
// --------------------------------------------------------------------------------
/**
* A higher-level, and more efficient, interface to obtain the next location we own. Takes an
* iterator producing objects that support the getLocation() interface, and returns the next
* object in that stream that we can claim ownership of. Returns null if we run out of elements
* during the iteration.
*
* Can be more efficiently implemented in subclasses to avoid multiple unlocking
*
* @param iterator
* @param myName
* @return
*/
public final <T extends HasGenomeLocation> T claimOwnershipOfNextAvailable(Iterator<T> iterator, String myName) {
OwnershipIterator<T> myIt = new OwnershipIterator<T>(iterator, myName, 1);
return myIt.next();
}
public final <T extends HasGenomeLocation> Iterable<T> onlyOwned(Iterator<T> iterator, String myName) {
return new OwnershipIterator<T>(iterator, myName);
}
private final class OwnershipIterator<T extends HasGenomeLocation> implements Iterator<T>, Iterable<T> {
private final Iterator<T> subit;
private final String myName;
private final Queue<T> cache;
private final int cacheSize;
public OwnershipIterator(Iterator<T> subit, String myName) {
this(subit, myName, DEFAULT_OWNERSHIP_ITERATOR_SIZE);
}
public OwnershipIterator(Iterator<T> subit, String myName, int cacheSize) {
this.subit = subit;
this.myName = myName;
cache = new LinkedList<T>();
this.cacheSize = cacheSize;
}
/**
* Will return true for all elements of subit, even if we can't get ownership of some of the future
* elements and so will return null there
* @return
*/
public final boolean hasNext() {
return cache.peek() != null || subit.hasNext();
}
/**
* High performance iterator that only locks and unlocks once per claimed object found. Avoids
* locking / unlocking for each query
*
* @return an object of type T owned by this thread, or null if none of the remaining object could be claimed
*/
public final T next() {
if ( cache.peek() != null)
return cache.poll();
else {
// cache is empty, we need to fill up the cache and return the first element of the queue
return new WithLock<T>(myName) {
public T doBody() {
// read once the database of owners at the start
updateAndGetProcessingLocs(myName);
boolean done = false;
Queue<ProcessingLoc> pwns = new LinkedList<ProcessingLoc>(); // ;-)
while ( !done && cache.size() < cacheSize && subit.hasNext() ) {
final T elt = subit.next();
GenomeLoc loc = elt.getLocation();
ProcessingLoc owner = processingLocs.get(loc);
if ( owner == null ) { // we are unowned
owner = new ProcessingLoc(loc, myName);
pwns.offer(owner);
if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed");
if ( GenomeLoc.isUnmapped(loc) ) done = true;
}
// if not, we continue our search
}
registerNewLocsWithTimers(pwns, myName);
// we've either filled up the cache or run out of elements. Either way we return
// the first element of the cache. If the cache is empty, we return null here.
return cache.poll();
}
}.run();
}
}
public final void remove() {
throw new UnsupportedOperationException();
}
public final Iterator<T> iterator() {
return this;
}
}
// --------------------------------------------------------------------------------
//
// private / protected low-level accessors / manipulators and utility functions
//
// --------------------------------------------------------------------------------
/**
* Useful debugging function that returns the ProcessingLoc who owns loc. ID
* is provided for debugging purposes
* @param loc
* @param id
* @return
*/
protected final ProcessingLoc findOwner(GenomeLoc loc, String id) {
// fast path to check if we already have the existing genome loc in memory for ownership claims
// getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it
// unless necessary
ProcessingLoc x = processingLocs.get(loc);
return x == null ? updateAndGetProcessingLocs(id).get(loc) : x;
}
/**
* Returns the list of currently owned locations, updating the database as necessary.
* DO NOT MODIFY THIS MAP! As with all parallelizing data structures, the list may be
* out of date immediately after the call returns, or may be updating on the fly.
* @return
*/
protected final Map<GenomeLoc, ProcessingLoc> updateAndGetProcessingLocs(String myName) {
return new WithLock<Map<GenomeLoc, ProcessingLoc>>(myName) {
public Map<GenomeLoc, ProcessingLoc> doBody() {
// readTimer.restart();
for ( ProcessingLoc p : readNewLocs() )
processingLocs.put(p.getLocation(), p);
// readTimer.stop();
nReads++;
return processingLocs;
}
}.run();
}
/**
* Wrapper around registerNewLocs that also times the operation
*
* @param plocs
* @param myName
*/
protected final void registerNewLocsWithTimers(Collection<ProcessingLoc> plocs, String myName) {
// writeTimer.restart();
registerNewLocs(plocs);
nWrites++;
// writeTimer.stop();
}
private final void printStatusHeader() {
if ( status != null ) status.printf("process.id\thr.time\ttime\tstate%n");
}
private final void printStatus(String id, long machineTime, String state) {
// prints a line like processID human-readable-time machine-time state
if ( status != null ) {
status.printf("%s\t%s\t%d\t%s%n", id, STATUS_FORMAT.format(machineTime), machineTime, state);
status.flush();
}
}
/**
* Lock the data structure, preventing other threads / processes from reading and writing to the
* common store
* @param id the name of the process doing the locking
*/
private final void lock(String id) {
//lockWaitTimer.restart();
boolean hadLock = lock.ownsLock();
if ( ! hadLock ) {
nLocks++;
//printStatus(id, lockWaitTimer.currentTime(), GOING_FOR_LOCK);
}
lock.lock();
//lockWaitTimer.stop();
//if ( ! hadLock ) printStatus(id, lockWaitTimer.currentTime(), HAVE_LOCK);
}
/**
* Unlock the data structure, allowing other threads / processes to read and write to the common store
* @param id the name of the process doing the unlocking
*/
private final void unlock(String id) {
if ( lock.getHoldCount() == 1 ) printStatus(id, timer.currentTime(), RELEASING_LOCK);
lock.unlock();
if ( ! lock.ownsLock() ) printStatus(id, timer.currentTime(), RUNNING);
}
// useful code for getting
public final long getNLocks() { return nLocks; }
public final long getNReads() { return nReads; }
public final long getNWrites() { return nWrites; }
// public final double getTimePerLock() { return lockWaitTimer.getElapsedTime() / Math.max(nLocks, 1); }
// public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); }
// public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); }
// --------------------------------------------------------------------------------
//
// Java-style functional form for with lock do { x };
//
// --------------------------------------------------------------------------------
/**
* Private utility class that executes doBody() method with the lock() acquired and
* handles property unlock()ing the system, even if an error occurs. Allows one to write
* clean code like:
*
* new WithLock<Integer>(name) {
* public Integer doBody() { doSomething(); return 1; }
* }.run()
*
* @param <T> the return type of the doBody() method
*/
private abstract class WithLock<T> {
private final String myName;
public WithLock(String myName) {
this.myName = myName;
}
protected abstract T doBody();
public T run() {
boolean locked = false;
try {
lock(myName);
locked = true;
return doBody();
} finally {
if (locked) unlock(myName);
}
}
}
// --------------------------------------------------------------------------------
//
// main function for testing performance
//
// --------------------------------------------------------------------------------
public static void main(String[] args) {
//BasicConfigurator.configure();
final String ref = args[0];
final File file = new File(args[1]);
final int cycles = Integer.valueOf(args[2]);
File referenceFile = new File(ref);
try {
final IndexedFastaSequenceFile fasta = new IndexedFastaSequenceFile(referenceFile);
final String chr1 = fasta.getSequenceDictionary().getSequence(1).getSequenceName();
final GenomeLocParser genomeLocParser = new GenomeLocParser(fasta);
final class MyTest {
String name;
GenomeLocProcessingTracker tracker;
MyTest(String name, GenomeLocProcessingTracker tracker) {
this.name = name;
this.tracker = tracker;
}
public void execute(int cycles) {
SimpleTimer delta = new SimpleTimer("delta");
SimpleTimer timer = new SimpleTimer("none");
if ( file.exists() ) file.delete();
timer.start();
delta.start();
for ( int i = 1; i < cycles; i++ ) {
tracker.claimOwnership(genomeLocParser.createGenomeLoc(chr1, i, i+1), "ABCDEFGHIJKL");
if ( i % 1000 == 0 ) {
System.out.printf("%s\t%d\t%d\t%.4f\t%.4f%n", name, i, timer.currentTime(), timer.getElapsedTime(), delta.getElapsedTime() );
delta.restart();
}
}
}
}
System.out.printf("name\tcycle\tcurrent.time\telapsed.time\tdelta%n");
new MyTest("in-memory", new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock())).execute(cycles);
new MyTest("nio", new FileBackedGenomeLocProcessingTracker(file, genomeLocParser, new ClosableReentrantLock(), null)).execute(cycles);
new MyTest("nio-file-lock", new FileBackedGenomeLocProcessingTracker(file, genomeLocParser, new SharedFileThreadSafeLock(file,1), null)).execute(cycles);
}
catch(FileNotFoundException ex) {
throw new UserException.CouldNotReadInputFile(referenceFile,ex);
}
}
}

View File

@ -1,26 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Base class, and null tracker. Always says that a GenomeLoc is ready for processing. It is
* critical that this class already return that a loc is owned, no matter if it's been seen before,
* etc. ReadShards can differ in their contents but have the same "unmapped" genome loc
*/
public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
public NoOpGenomeLocProcessingTracker() {
super(new ClosableReentrantLock(), null);
}
@Override
protected void registerNewLocs(Collection<ProcessingLoc> loc) {
;
}
@Override
protected List<ProcessingLoc> readNewLocs() {
return Collections.emptyList();
}
}

View File

@ -1,71 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.HasGenomeLocation;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
/**
* Created by IntelliJ IDEA.
* User: depristo
* Date: 1/19/11
* Time: 8:06 AM
*
* Information about processing locations and their owners. Contains two basic data, associated
* together. The first is a genome loc, and the second is the name of the owner, as a string.
*
* chr1:1-10 Mark
* chr2:11-20 DePristo
*
* would be two ProcessingLocs that first indicate that the first 10 bp of chr1 are owned by Mark,
* and the second is owned by DePristo.
*/
public class ProcessingLoc implements HasGenomeLocation {
private final GenomeLoc loc;
private final String owner;
/**
* Create a loc that's already owned
* @param loc
* @param owner
*/
public ProcessingLoc(GenomeLoc loc, String owner) {
if ( loc == null || owner == null ) {
throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner);
}
this.loc = loc;
this.owner = owner.intern(); // reduce memory consumption by interning the string
}
public GenomeLoc getLocation() {
return loc;
}
public String getOwner() {
return owner;
}
/**
* Returns true iff the owner of this processing loc is name. Can be used to determine
* the owner of this processing location.
*
* @param name
* @return
*/
public boolean isOwnedBy(String name) {
return getOwner().equals(name);
}
public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); }
public boolean equals(Object other) {
if (other instanceof ProcessingLoc )
return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner);
else
return false;
}
public int compareTo(ProcessingLoc other) {
return this.getLocation().compareTo(other.getLocation());
}
}

View File

@ -1,171 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.apache.lucene.store.*;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.exceptions.UserException;
import java.io.File;
import java.io.IOException;
/**
* User: depristo
* Date: 1/19/11
* Time: 8:24 AM
*
* A reentrant lock for a shared file common file in the file system. Relies on a a Lucene SimpleFSLock
* to manage on disk file locking.
*/
public class SharedFileLock extends ClosableReentrantLock { // todo -- kinda gross inheritance. The super lock is never used
private static Logger logger = Logger.getLogger(SharedFileLock.class);
private static final String VERIFY_HOST = System.getProperty("verify.host", "gsa1");
private static final boolean VERIFY = false;
private static final int VERIFY_PORT = 5050;
// 5 minutes => 360 seconds of trying -> failure
protected static final int DEFAULT_N_TRIES = 1000;
protected static final long DEFAULT_MILLISECONDS_PER_TRY = 360;
/** The file we are locking */
private final File file;
private final LockFactory lockFactory;
private Lock fileLock = null;
/**
* A counter that indicates the number of 'locks' on this file.
* If locks == 2, then two unlocks are required
* before any resources are freed.
*/
int fileLockReentrantCounter = 0;
// type of locking
private final int nRetries;
private final long milliSecPerTry;
/**
* Create a SharedFileThreadSafeLock object locking the file
* @param file
*/
public SharedFileLock(File file, int nRetries, long milliSecPerTry, int ID) {
super();
this.file = file;
this.nRetries = nRetries;
this.milliSecPerTry = milliSecPerTry;
File lockDir = new File(file.getParent() == null ? "./" : file.getParent());
try {
LockFactory factory = new SimpleFSLockFactory(lockDir);
if ( VERIFY ) { // don't forget to start up the VerifyLockServer
this.lockFactory = new VerifyingLockFactory((byte)ID, factory, VERIFY_HOST, VERIFY_PORT);
} else {
this.lockFactory = factory;
}
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(lockDir, "Could not create coordination file locking directory " + lockDir, e);
}
}
public SharedFileLock(File file, int ID) {
this(file, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY, ID);
}
@Override
public void close() {
if ( ownsLock() ) throw new ReviewedStingException("closing SharedFileLock while still owned: ownership count " + fileLockReentrantCounter);
}
@Override
public int getHoldCount() {
return fileLockReentrantCounter;
}
@Override
public boolean ownsLock() {
return fileLockReentrantCounter > 0;
}
// ------------------------------------------------------------------------------------------
//
// workhorse routines -- acquiring file locks
//
// ------------------------------------------------------------------------------------------
private boolean obtainFileLock() throws IOException {
// annoying bug work around for verifylockserver
if ( VERIFY )
try {
return fileLock.obtain(1);
} catch ( LockObtainFailedException e ) {
return false;
}
else
return fileLock.obtain();
}
/**
* Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be
* unwound appropriately. Uses file channel lock *after* thread locking.
*/
@Override
public void lock() {
if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", fileLockReentrantCounter = " + fileLockReentrantCounter);
if ( fileLockReentrantCounter++ == 0 ) {
// Precondition -- lock is always null while we don't have a lock
if ( fileLock != null )
throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!");
int i = 1;
fileLock = lockFactory.makeLock(file.getName() + ".lock");
try {
boolean obtained = obtainFileLock(); // todo -- maybe use intrinsic lock features
for ( ; ! obtained && i < nRetries; i++ ) {
try {
//logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry");
Thread.sleep(milliSecPerTry);
} catch ( InterruptedException e ) {
throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e);
}
obtained = obtainFileLock(); // gross workaround for error in verify server
}
if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds");
if ( ! obtained ) {
fileLock = null;
// filelock == null -> we never managed to acquire the lock!
throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts");
}
if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", obtained = " + obtained + ", tries = " + i);
} catch (IOException e) {
fileLock = null;
throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e);
}
}
}
@Override
public void unlock() {
// update for reentrant unlocking
if ( fileLock == null ) throw new ReviewedStingException("BUG: file lock is null -- file lock was not obtained");
if ( fileLockReentrantCounter <= 0 ) throw new ReviewedStingException("BUG: file lock counter < 0");
// this unlock counts as 1 unlock. If this is our last unlock, actually do something
if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", count = " + fileLockReentrantCounter);
if ( --fileLockReentrantCounter == 0 ) {
try {
if ( ! fileLock.isLocked() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!");
fileLock.release();
if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", actually releasing");
} catch ( IOException e ) {
throw new ReviewedStingException("Could not free file lock on file " + file, e);
} finally { // make sure we null out the filelock, regardless of our state
fileLock = null;
}
} else {
if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", skipping, count = " + fileLockReentrantCounter);
}
}
}

View File

@ -1,75 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.io.File;
/**
* User: depristo
* Date: 1/19/11
* Time: 8:24 AM
*
* A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common
* file in the file system. It itself a shared memory reenterant lock to managed thread safety and
* contains a SharedFileLock to handle the file integrity.
*/
public class SharedFileThreadSafeLock extends ClosableReentrantLock {
private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class);
protected static final boolean DEBUG = false;
private final SharedFileLock fileLock;
/**
* Create a SharedFileThreadSafeLock object locking the file
* @param file
*/
public SharedFileThreadSafeLock(File file, int nRetries, long milliSecPerTry, int ID) {
super();
this.fileLock = new SharedFileLock(file, nRetries, milliSecPerTry, ID);
}
public SharedFileThreadSafeLock(File file, int ID) {
this(file, SharedFileLock.DEFAULT_N_TRIES, SharedFileLock.DEFAULT_MILLISECONDS_PER_TRY, ID);
}
@Override
public void close() {
super.close();
fileLock.close();
}
@Override
public int getHoldCount() {
if ( super.getHoldCount() != fileLock.getHoldCount() )
throw new ReviewedStingException("BUG: unequal hold counts. threadlock = " + super.getHoldCount() + ", filelock = " + fileLock.getHoldCount());
return super.getHoldCount();
}
@Override
public boolean ownsLock() {
return super.isHeldByCurrentThread() && fileLock.ownsLock();
}
/**
* Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be
* unwound appropriately. Uses file channel lock *after* thread locking.
*/
@Override
public void lock() {
if ( DEBUG ) logger.warn("Attempting SharedFileThreadSafe lock: " + Thread.currentThread().getName());
if ( DEBUG ) logger.warn(" going for thread lock: " + Thread.currentThread().getName());
super.lock();
if ( DEBUG ) logger.warn(" going for file lock: " + Thread.currentThread().getName());
fileLock.lock(); // todo -- should this be in a try?
}
@Override
public void unlock() {
if ( DEBUG ) logger.warn(" releasing filelock: " + Thread.currentThread().getName());
fileLock.unlock();
if ( DEBUG ) logger.warn(" releasing threadlock: " + Thread.currentThread().getName());
super.unlock();
if ( DEBUG ) logger.warn(" unlock() complete: " + Thread.currentThread().getName());
}
}

View File

@ -1,34 +0,0 @@
package org.broadinstitute.sting.utils.threading;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* Thread-safe shared memory only implementation. Uses a simple list to manage the newly
* added processing locations.
*/
public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker {
private List<ProcessingLoc> newPLocs = new ArrayList<ProcessingLoc>();
protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) {
super(lock, null);
}
protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) {
super(lock, status);
}
@Override
protected void registerNewLocs(Collection<ProcessingLoc> plocs) {
newPLocs.addAll(plocs);
}
@Override
protected List<ProcessingLoc> readNewLocs() {
List<ProcessingLoc> r = newPLocs;
newPLocs = new ArrayList<ProcessingLoc>();
return r;
}
}

View File

@ -1,402 +0,0 @@
// our package
package org.broadinstitute.sting.utils.threading;
// the imports for unit testing.
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.iterators.GenomeLocusIterator;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.exceptions.UserException;
import org.testng.Assert;
import org.testng.annotations.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
/**
* Basic unit test for GenomeLoc
*/
public class GenomeLocProcessingTrackerUnitTest extends BaseTest {
IndexedFastaSequenceFile fasta = null;
GenomeLocParser genomeLocParser = null;
String chr1 = null;
private final static String FILE_ROOT = "public/testdata/GLPTFile";
@BeforeTest
public void before() {
File referenceFile = new File(hg18Reference);
try {
fasta = new IndexedFastaSequenceFile(referenceFile);
chr1 = fasta.getSequenceDictionary().getSequence(1).getSequenceName();
genomeLocParser = new GenomeLocParser(fasta);
}
catch(FileNotFoundException ex) {
throw new UserException.CouldNotReadInputFile(referenceFile,ex);
}
}
@BeforeMethod
public void beforeMethod(Object[] data) {
if ( data.length > 0 )
((TestTarget)data[0]).init();
}
@AfterMethod
public void afterMethod(Object[] data) {
if ( data.length > 0 ) {
((TestTarget)data[0]).getTracker().close();
((TestTarget)data[0]).cleanup();
}
}
abstract private class TestTarget {
String name;
int nShards;
int shardSize;
File file;
public void init() { cleanup(); }
public void cleanup() {
if ( file != null && file.exists() )
file.delete();
}
public boolean isThreadSafe() { return true; }
protected TestTarget(String name, int nShards, int shardSize, File file) {
this.name = name;
this.nShards = nShards;
this.shardSize = shardSize;
this.file = file;
}
public abstract GenomeLocProcessingTracker getTracker();
public List<GenomeLoc> getShards() {
List<GenomeLoc> shards = new ArrayList<GenomeLoc>();
for ( int i = 0; i < nShards; i++ ) {
int start = shardSize * i;
int stop = start + shardSize;
shards.add(genomeLocParser.createGenomeLoc(chr1, start, stop));
}
return shards;
}
public String toString() {
return String.format("TestTarget %s: nShards=%d shardSize=%d", name, nShards, shardSize);
}
}
@DataProvider(name = "threadData")
public Object[][] createThreadData() {
// gotta keep the tests small...
return createData(Arrays.asList(10, 100), Arrays.asList(10));
//return createData(Arrays.asList(10, 100, 1000, 10000), Arrays.asList(10));
}
public Object[][] createData(List<Integer> nShards, List<Integer> shardSizes) {
List<TestTarget> params = new ArrayList<TestTarget>();
int counter = 0;
String name = null;
for ( int nShard : nShards ) {
for ( int shardSize : shardSizes ) {
// shared mem -- canonical implementation
params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize, null) {
GenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock());
public GenomeLocProcessingTracker getTracker() { return tracker; }
});
final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize));
params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize, file1) {
GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file1, genomeLocParser, new ClosableReentrantLock(), null);
public GenomeLocProcessingTracker getTracker() { return tracker; }
});
name = "FileBackedSharedFileThreadSafe";
final File file2 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize));
params.add(new TestTarget(name, nShard, shardSize, file2) {
GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file2, genomeLocParser, new SharedFileThreadSafeLock(file2, -1), null);
public GenomeLocProcessingTracker getTracker() { return tracker; }
});
name = "FileBackedSharedFile";
final File file3 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize));
params.add(new TestTarget(name, nShard, shardSize, file3) {
GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file3, genomeLocParser, new SharedFileLock(file3, -1), null);
public GenomeLocProcessingTracker getTracker() { return tracker; }
public boolean isThreadSafe() { return false; }
});
}
}
List<Object[]> params2 = new ArrayList<Object[]>();
for ( TestTarget x : params ) params2.add(new Object[]{x});
return params2.toArray(new Object[][]{});
}
@DataProvider(name = "simpleData")
public Object[][] createSimpleData() {
return createData(Arrays.asList(1000), Arrays.asList(100));
}
private static final String NAME_ONE = "name1";
private static final String NAME_TWO = "name2";
@Test(enabled = true)
public void testNoop() {
GenomeLocProcessingTracker tracker = new NoOpGenomeLocProcessingTracker();
for ( int start = 1; start < 100; start++ ) {
for ( int n = 0; n < 2; n++ ) {
GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1);
ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE);
Assert.assertTrue(ploc.isOwnedBy(NAME_ONE));
Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), 0);
}
}
}
@Test(dataProvider = "simpleData", enabled = true)
public void testSingleProcessTracker(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
logger.warn("testSingleProcessTracker " + test);
int counter = 0;
for ( GenomeLoc shard : shards ) {
counter++;
Assert.assertNull(tracker.findOwner(shard, NAME_ONE));
Assert.assertFalse(tracker.locIsOwned(shard, NAME_ONE));
ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
Assert.assertNotNull(proc);
Assert.assertNotNull(proc.getLocation());
Assert.assertNotNull(proc.getOwner());
Assert.assertEquals(proc.getLocation(), shard);
Assert.assertEquals(proc.getOwner(), NAME_ONE);
Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc);
Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE));
Assert.assertNotNull(tracker.updateAndGetProcessingLocs(NAME_ONE));
Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), counter);
ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO);
Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO));
Assert.assertEquals(badClaimAttempt.getOwner(), NAME_ONE);
}
}
@Test(dataProvider = "simpleData", enabled = true)
public void testIterator(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
logger.warn("testIterator " + test);
List<GenomeLoc> markedShards = new ArrayList<GenomeLoc>();
List<GenomeLoc> toFind = new ArrayList<GenomeLoc>();
for ( int i = 0; i < shards.size(); i++ ) {
if ( ! (i % 10 == 0) ) {
markedShards.add(shards.get(i));
tracker.claimOwnership(shards.get(i), NAME_TWO);
} else {
toFind.add(shards.get(i));
}
}
int nFound = 0;
Iterator<GenomeLoc> it = shards.iterator();
while ( it.hasNext() ) {
GenomeLoc shard = tracker.claimOwnershipOfNextAvailable(it, NAME_ONE);
if ( shard == null ) { // everything to get is done
Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards");
} else {
nFound++;
ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE);
Assert.assertTrue(proc.isOwnedBy(NAME_ONE));
Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!");
Assert.assertTrue(toFind.contains(shard), "Claimed shard wasn't one of the unmarked!");
}
}
}
@Test(dataProvider = "simpleData", enabled = true)
public void testMarkedProcesses(TestTarget test) {
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
logger.warn("testMarkedProcesses " + test);
List<GenomeLoc> markedShards = new ArrayList<GenomeLoc>();
for ( int i = 0; i < shards.size(); i++ ) {
if ( i % 2 == 0 ) {
markedShards.add(shards.get(i));
tracker.claimOwnership(shards.get(i), NAME_TWO);
}
}
for ( GenomeLoc shard : shards ) {
ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE);
Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO));
if ( proc.isOwnedBy(NAME_ONE) )
Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!");
else
Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked");
if ( ! markedShards.contains(shard) ) {
Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc);
}
}
}
public class TestThread implements Callable<Integer> {
public TestTarget test;
public String name;
public List<GenomeLoc> ran, toRun;
boolean useIterator;
public TestThread(TestTarget test, int count, List<GenomeLoc> toRun, boolean useIterator) {
this.test = test;
this.toRun = toRun;
this.name = "thread" + count;
this.ran = new ArrayList<GenomeLoc>();
this.useIterator = useIterator;
}
public Integer call() {
//logger.warn(String.format("Call() Thread %s", name));
if ( useIterator ) {
for ( GenomeLoc shard : test.getTracker().onlyOwned(toRun.iterator(), name) ) {
if ( shard != null ) { // ignore the unclaimable end of the stream
ran.add(shard);
// do some work here
for ( int sum =0, i = 0; i < 100000; i++) sum += i;
}
}
} else {
for ( GenomeLoc shard : toRun ) {
//System.out.printf("Claiming ownership in %s on %s%n", name, shard);
ProcessingLoc proc = test.getTracker().claimOwnership(shard,name);
//System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name));
if ( proc.isOwnedBy(name) ) {
ran.add(proc.getLocation());
// do some work here
for ( int sum =0, i = 0; i < 100000; i++) sum += i;
}
//logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner()));
}
}
return 1;
}
}
private static TestThread findOwner(String name, List<TestThread> threads) {
for ( TestThread thread : threads ) {
if ( thread.name.equals(name) )
return thread;
}
return null;
}
private static final <T> void assertAllThreadsFinished(List<Future<T>> futures) {
try {
for ( Future f : futures ) {
Assert.assertTrue(f.isDone(), "Thread never finished running");
Assert.assertTrue(f.get() != null, "Finished successfully");
}
} catch (InterruptedException e) {
Assert.fail("Thread failed to run to completion", e);
} catch (ExecutionException e) {
Assert.fail("Thread generated an exception", e);
}
}
private static final List<GenomeLoc> subList(List<GenomeLoc> l, int i) {
List<GenomeLoc> r = new ArrayList<GenomeLoc>();
for ( int j = 0; j < l.size(); j++ ) {
if ( j % i == 0 )
r.add(l.get(j));
}
return r;
}
@Test(dataProvider = "threadData", enabled = true)
public void testThreadedProcessesLowLevelFunctions(TestTarget test) {
testThreading(test, false);
}
@Test(dataProvider = "threadData", enabled = true)
public void testThreadedProcessesIterator(TestTarget test) {
testThreading(test, true);
}
private void testThreading(TestTarget test, boolean useIterator) {
if ( ! test.isThreadSafe() )
// skip tests that aren't thread safe
return;
// start up 3 threads
logger.warn("ThreadedTesting " + test + " using iterator " + useIterator);
List<TestThread> threads = new ArrayList<TestThread>();
for ( int i = 0; i < 4; i++) {
List<GenomeLoc> toRun = subList(test.getShards(), i+1);
TestThread thread = new TestThread(test, i, toRun, useIterator);
threads.add(thread);
}
ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size());
try {
List<Future<Integer>> results = exec.invokeAll(threads, 300, TimeUnit.SECONDS);
GenomeLocProcessingTracker tracker = test.getTracker();
List<GenomeLoc> shards = test.getShards();
for ( TestThread thread : threads )
logger.warn(String.format("TestThread %s ran %d jobs of %d to run", thread.name, thread.ran.size(), thread.toRun.size()));
assertAllThreadsFinished(results);
// we ran everything
Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run");
for ( GenomeLoc shard : shards ) {
Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE), "Unowned shard");
ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE);
Assert.assertNotNull(proc, "Proc was null");
Assert.assertNotNull(proc.getOwner(), "Owner was null");
Assert.assertEquals(proc.getLocation(), shard, "Shard loc doesn't make ProcessingLoc");
TestThread owner = findOwner(proc.getOwner(), threads);
Assert.assertNotNull(owner, "Couldn't find owner");
Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard");
for ( TestThread thread : threads )
if ( ! proc.isOwnedBy(thread.name) && thread.ran.contains(shard) )
Assert.fail("Shard appears in another run list: proc=" + proc + " shard=" + shard + " also in jobs of " + thread.name + " obj=" + thread.ran.get(thread.ran.indexOf(shard)));
}
} catch (InterruptedException e) {
Assert.fail("Thread failure", e);
}
}
}