From afbea9ce599859506ed9af52786a44299c39340d Mon Sep 17 00:00:00 2001 From: depristo Date: Fri, 14 Jan 2011 03:14:24 +0000 Subject: [PATCH] SharedMemory and SharedFile implementations of GenomeLocProcessingTracker, along with serious unit tests that both pass. Slightly inefficient implementation but sufficient for further testing. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@4998 348d0f76-0448-11de-a6fe-93d51630548a --- .../threading/GenomeLocProcessingTracker.java | 32 ++- .../SharedFileGenomeLocProcessingTracker.java | 147 ++++++++++ ...haredMemoryGenomeLocProcessingTracker.java | 8 +- .../GenomeLocProcessingTrackerUnitTest.java | 260 ++++++++++++++++++ 4 files changed, 431 insertions(+), 16 deletions(-) create mode 100644 java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java create mode 100644 java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java diff --git a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java index 4d3507e4c..cdab0c610 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java @@ -1,6 +1,7 @@ package org.broadinstitute.sting.utils.threading; import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import java.util.List; @@ -16,17 +17,8 @@ public abstract class GenomeLocProcessingTracker { * Information about processing locations and their owners */ public static final class ProcessingLoc { - GenomeLoc loc; - String owner; - - /** - * Create an unowned loc - * @param loc - */ - public ProcessingLoc(GenomeLoc loc) { - this(loc, null); - - } + private final GenomeLoc loc; + private final String owner; /** * Create a loc that's already owned @@ -34,6 +26,10 @@ public abstract class GenomeLocProcessingTracker { * @param owner */ public ProcessingLoc(GenomeLoc loc, String owner) { + if ( loc == null || owner == null ) { + throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner); + } + this.loc = loc; this.owner = owner; } @@ -46,11 +42,19 @@ public abstract class GenomeLocProcessingTracker { return owner; } - public void setOwner(String owner) { - this.owner = owner; + public boolean isOwnedBy(String name) { + return getOwner().equals(name); + } + + public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); } + + public boolean equals(Object other) { + if (other instanceof ProcessingLoc ) + return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner); + else + return false; } - public boolean isOwned() { return owner != null; } } // -------------------------------------------------------------------------------- diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java new file mode 100644 index 000000000..e77862a8f --- /dev/null +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedFileGenomeLocProcessingTracker.java @@ -0,0 +1,147 @@ +package org.broadinstitute.sting.utils.threading; + +import com.google.common.collect.ArrayListMultimap; +import net.sf.picard.reference.FastaSequenceIndex; +import net.sf.picard.reference.FastaSequenceIndexBuilder; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; +import org.broadinstitute.sting.utils.exceptions.UserException; +import org.broadinstitute.sting.utils.file.FSLockWithShared; +import org.broadinstitute.sting.utils.file.FileSystemInabilityToLockException; +import org.broadinstitute.sting.utils.text.XReadLines; + +import java.io.*; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.List; + +/** + * + */ +public class SharedFileGenomeLocProcessingTracker extends GenomeLocProcessingTracker { + private static final boolean DEBUG = false; + private static Logger logger = Logger.getLogger(SharedFileGenomeLocProcessingTracker.class); + private List processingLocs = new ArrayList(); + private File sharedFile = null; + GenomeLocParser parser; + + // the file lock + private FileLock lock = null; + + // the file channel we open + private FileChannel channel = null; + + long lastReadPosition = 0; + + public SharedFileGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser) { + try { + this.sharedFile = sharedFile; + this.channel = new RandomAccessFile(sharedFile, "rw").getChannel(); + this.parser = parser; + } + catch (Exception e) { + throw new UserException.CouldNotCreateOutputFile(sharedFile, e); + } + } + + private void lock() { + try { + lock = channel.lock(); + } catch (ClosedChannelException e) { + throw new ReviewedStingException("Unable to lock file because the file channel is closed. " + sharedFile, e); + } +// catch (OverlappingFileLockException e) { +// logger.debug("Unable to lock file because you already have a lock on this file."); +// return false; +// } + catch (IOException e) { + throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); + } finally { + unlock(); + } + } + + private void unlock() { + try { + lock.release(); + //channel.close(); + } catch ( IOException e ) { + throw new ReviewedStingException("Could not free lock on file " + sharedFile, e); + } + } + + private void readLocs() { + try { + if ( sharedFile.exists() ) { + FileInputStream in = new FileInputStream(sharedFile); + if ( in.getChannel().size() > lastReadPosition ) { + in.skip(lastReadPosition); + + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + int counter = 0; + String line = reader.readLine(); // Read another line + while ( line != null ) { + String[] parts = line.split(" "); + if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line " + line); + GenomeLoc loc = parser.parseGenomeLoc(parts[0]); + String owner = parts[1]; + processingLocs.add(new ProcessingLoc(loc, owner)); + line = reader.readLine(); + counter++; + } + lastReadPosition = in.getChannel().position(); + if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, total locs is %d", + counter, lastReadPosition, processingLocs.size())); + } + in.close(); + } + } catch (FileNotFoundException e) { + throw new UserException.CouldNotReadInputFile(sharedFile, e); + } catch (IOException e) { + throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); + } + } + + private void writeLoc(ProcessingLoc proc) { + try { + PrintStream out = new PrintStream(new FileOutputStream(sharedFile, true)); + out.printf("%s %s%n", proc.getLoc(), proc.getOwner()); + if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file", proc)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotReadInputFile(sharedFile, e); + } + } + + public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { + ProcessingLoc owner = null; + try { + lock(); + readLocs(); + owner = super.findOwner(loc); + if ( owner == null ) { // we are unowned + owner = new ProcessingLoc(loc, myName); + writeLoc(owner); + } + } finally { + unlock(); + } + + return owner; + } + + protected synchronized List getProcessingLocs() { + try { + lock(); + readLocs(); + } finally { + unlock(); + } + + return processingLocs; + } +} diff --git a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java index a57921c4a..d8976ee0f 100644 --- a/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java @@ -1,5 +1,6 @@ package org.broadinstitute.sting.utils.threading; +import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLoc; import java.util.ArrayList; @@ -10,11 +11,12 @@ import java.util.List; * processing list in shared memory. */ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - List processingLocs = new ArrayList(); + private static Logger logger = Logger.getLogger(SharedMemoryGenomeLocProcessingTracker.class); + private List processingLocs = new ArrayList(); public synchronized ProcessingLoc claimOwnership(GenomeLoc loc, String myName) { // processingLocs is a shared memory synchronized object, and this - // method is synchonized, so we can just do our processing + // method is synchronized, so we can just do our processing ProcessingLoc owner = super.findOwner(loc); if ( owner == null ) { // we are unowned @@ -22,6 +24,8 @@ public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingT processingLocs.add(owner); } + //logger.warn(String.format("%s.claimOwnership(%s,%s) => %s", this, loc, myName, owner)); + return owner; } diff --git a/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java new file mode 100644 index 000000000..f99eb9406 --- /dev/null +++ b/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java @@ -0,0 +1,260 @@ +// our package +package org.broadinstitute.sting.utils.threading; + + +// the imports for unit testing. + + +import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.BaseTest; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.exceptions.UserException; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Basic unit test for GenomeLoc + */ +public class GenomeLocProcessingTrackerUnitTest extends BaseTest { + IndexedFastaSequenceFile fasta = null; + GenomeLocParser genomeLocParser = null; + File sharedFile = new File("synchronizationFile.txt"); + String chr1 = null; + + @BeforeTest + public void before() { + File referenceFile = new File(hg18Reference); + try { + fasta = new IndexedFastaSequenceFile(referenceFile); + chr1 = fasta.getSequenceDictionary().getSequence(1).getSequenceName(); + genomeLocParser = new GenomeLocParser(fasta); + + } + catch(FileNotFoundException ex) { + throw new UserException.CouldNotReadInputFile(referenceFile,ex); + } + } + + @BeforeMethod + public void cleanup() { + if ( sharedFile.exists() ) { + sharedFile.delete(); + } + } + + + abstract private class TestTarget { + String name; + int nShards; + int shardSize; + + protected TestTarget(String name, int nShards, int shardSize) { + this.name = name; + this.nShards = nShards; + this.shardSize = shardSize; + } + + public abstract GenomeLocProcessingTracker getTracker(); + + public List getShards() { + List shards = new ArrayList(); + for ( int i = 0; i < nShards; i++ ) { + int start = shardSize * i; + int stop = start + shardSize; + shards.add(genomeLocParser.createGenomeLoc(chr1, start, stop)); + } + return shards; + } + + public String toString() { + return String.format("TestTarget %s: nShards=%d shardSize=%d", name, nShards, shardSize); + } + } + + + @DataProvider(name = "data") + public Object[][] createData1() { + List params = new ArrayList(); + +// for ( int nShard : Arrays.asList(10) ) { +// for ( int shardSize : Arrays.asList(10) ) { + for ( int nShard : Arrays.asList(10, 100, 1000, 10000) ) { + for ( int shardSize : Arrays.asList(10, 100) ) { + // shared mem -- canonical implementation +// params.add(new TestTarget(nShard, shardSize) { +// SharedMemoryGenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(); +// public GenomeLocProcessingTracker getTracker() { return tracker; } +// }); + + // shared file -- working implementation + params.add(new TestTarget("SharedFile", nShard, shardSize) { + SharedFileGenomeLocProcessingTracker tracker = new SharedFileGenomeLocProcessingTracker(sharedFile, genomeLocParser); + public GenomeLocProcessingTracker getTracker() { return tracker; } + }); + } + } + + List params2 = new ArrayList(); + for ( TestTarget x : params ) params2.add(new Object[]{x}); + return params2.toArray(new Object[][]{}); + } + + private static final String NAME_ONE = "name1"; + private static final String NAME_TWO = "name2"; + + @Test(dataProvider = "data", enabled = true) + public void testSingleProcessTracker(TestTarget test) { + GenomeLocProcessingTracker tracker = test.getTracker(); + List shards = test.getShards(); + logger.warn("testSingleProcessTracker " + test); + + int counter = 0; + for ( GenomeLoc shard : shards ) { + counter++; + + Assert.assertNull(tracker.findOwner(shard)); + Assert.assertFalse(tracker.locIsOwned(shard)); + + GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); + Assert.assertNotNull(proc); + Assert.assertNotNull(proc.getLoc()); + Assert.assertNotNull(proc.getOwner()); + Assert.assertEquals(proc.getLoc(), shard); + Assert.assertEquals(proc.getOwner(), NAME_ONE); + Assert.assertEquals(tracker.findOwner(shard), proc); + Assert.assertTrue(tracker.locIsOwned(shard)); + Assert.assertNotNull(tracker.getProcessingLocs()); + Assert.assertEquals(tracker.getProcessingLocs().size(), counter); + + GenomeLocProcessingTracker.ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); + Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO)); + Assert.assertEquals(badClaimAttempt.getOwner(), NAME_ONE); + } + } + + @Test(dataProvider = "data", enabled = true) + public void testMarkedProcesses(TestTarget test) { + GenomeLocProcessingTracker tracker = test.getTracker(); + List shards = test.getShards(); + logger.warn("testMarkedProcesses " + test); + + List markedShards = new ArrayList(); + + for ( int i = 0; i < shards.size(); i++ ) { + if ( i % 2 == 0 ) { + markedShards.add(shards.get(i)); + tracker.claimOwnership(shards.get(i), NAME_TWO); + } + } + + for ( GenomeLoc shard : shards ) { + GenomeLocProcessingTracker.ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); + + Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO)); + + if ( proc.isOwnedBy(NAME_ONE) ) + Assert.assertTrue(! markedShards.contains(shard)); + else + Assert.assertTrue(markedShards.contains(shard)); + + if ( ! markedShards.contains(shard) ) { + Assert.assertEquals(tracker.findOwner(shard), proc); + } + } + } + + public class TestThread implements Callable { + public TestTarget test; + public String name; + public List ran; + + public TestThread(TestTarget test, int count) { + this.test = test; + this.name = "thread" + count; + this.ran = new ArrayList(); + } + + public Integer call() { + int nShards = test.getShards().size(); + for ( GenomeLoc shard : test.getShards() ) { + if ( ran.size() < nShards / 3 ) { + GenomeLocProcessingTracker.ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); + if ( proc.isOwnedBy(name) ) + ran.add(proc.getLoc()); + //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); + } + } + + return 1; + } + } + + private static TestThread findOwner(String name, List threads) { + for ( TestThread thread : threads ) { + if ( thread.name.equals(name) ) + return thread; + } + return null; + } + + @Test(dataProvider = "data", enabled = true) + public void testThreadedProcesses(TestTarget test) { + // start up 3 threads + logger.warn("ThreadedTesting " + test); + List threads = new ArrayList(); + for ( int i = 0; i < 4; i++) { + TestThread thread = new TestThread(test, i); + threads.add(thread); + } + ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size()); + + try { + List> results = exec.invokeAll(threads, 60, TimeUnit.SECONDS); + GenomeLocProcessingTracker tracker = test.getTracker(); + List shards = test.getShards(); + + for ( TestThread thread : threads ) + logger.warn(String.format("TestThread ran %d jobs", thread.ran.size())); + + // we ran everything + Assert.assertEquals(tracker.getProcessingLocs().size(), shards.size()); + + for ( GenomeLoc shard : shards ) { + Assert.assertTrue(tracker.locIsOwned(shard), "Unowned shard"); + + GenomeLocProcessingTracker.ProcessingLoc proc = tracker.findOwner(shard); + Assert.assertNotNull(proc, "Proc was null"); + + Assert.assertNotNull(proc.getOwner(), "Owner was null"); + Assert.assertEquals(proc.getLoc(), shard, "Shard loc doesn't make ProcessingLoc"); + + TestThread owner = findOwner(proc.getOwner(), threads); + Assert.assertNotNull(owner, "Couldn't find owner"); + + Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard"); + + for ( TestThread thread : threads ) + if ( ! proc.isOwnedBy(thread.name) ) + Assert.assertFalse(thread.ran.contains(shard), "Shard appears in another run list"); + + } + } catch (InterruptedException e) { + Assert.fail("Thread failure", e); + } + } +} \ No newline at end of file