From d31b176e159046e70880897b99777543d942ad17 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Wed, 20 Jul 2011 16:26:09 -0400 Subject: [PATCH 1/4] Removed GATK use of distributed parallelism framework. Moved distributed GATK prototype code into distributedutils, separating from threading package --- .../gatk/executive/LinearMicroScheduler.java | 2 +- .../sting/gatk/executive/MicroScheduler.java | 38 ------------------- .../ClosableReentrantLock.java | 2 +- .../FileBackedGenomeLocProcessingTracker.java | 2 +- .../GenomeLocProcessingTracker.java | 2 +- .../NoOpGenomeLocProcessingTracker.java | 2 +- .../ProcessingLoc.java | 2 +- .../SharedFileLock.java | 2 +- .../SharedFileThreadSafeLock.java | 2 +- ...haredMemoryGenomeLocProcessingTracker.java | 2 +- .../utils/distributedutils/package-info.java | 28 ++++++++++++++ .../GenomeLocProcessingTrackerUnitTest.java | 4 +- 12 files changed, 39 insertions(+), 49 deletions(-) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/ClosableReentrantLock.java (86%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/FileBackedGenomeLocProcessingTracker.java (98%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/GenomeLocProcessingTracker.java (99%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/NoOpGenomeLocProcessingTracker.java (93%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/ProcessingLoc.java (97%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/SharedFileLock.java (99%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/SharedFileThreadSafeLock.java (97%) rename public/java/src/org/broadinstitute/sting/utils/{threading => distributedutils}/SharedMemoryGenomeLocProcessingTracker.java (94%) create mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java rename public/java/test/org/broadinstitute/sting/utils/{threading => distributedutils}/GenomeLocProcessingTrackerUnitTest.java (99%) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index 9466fdf75..48fd73e0b 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -49,7 +49,7 @@ public class LinearMicroScheduler extends MicroScheduler { Accumulator accumulator = Accumulator.create(engine,walker); int counter = 0; - for (Shard shard : processingTracker.onlyOwned(shardStrategy, engine.getName())) { + for (Shard shard : shardStrategy ) { if ( shard == null ) // we ran out of shards that aren't owned break; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index 23e5769f1..e731b9864 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -39,14 +39,10 @@ import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; -import org.broadinstitute.sting.utils.threading.*; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; import java.lang.management.ManagementFactory; import java.util.Collection; @@ -83,8 +79,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { private final MBeanServer mBeanServer; private final ObjectName mBeanName; - protected GenomeLocProcessingTracker processingTracker; - /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -98,11 +92,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { * @return The best-fit microscheduler. */ public static MicroScheduler create(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection rods, int nThreadsToUse) { - if (engine.getArguments().processingTrackerFile != null) { - if ( walker instanceof ReadWalker ) - throw new UserException.BadArgumentValue("C", String.format("Distributed GATK processing not enabled for read walkers")); - } - if (walker instanceof TreeReducible && nThreadsToUse > 1) { if(walker.isReduceByInterval()) throw new UserException.BadArgumentValue("nt", String.format("The analysis %s aggregates results by interval. Due to a current limitation of the GATK, analyses of this type do not currently support parallel execution. Please run your analysis without the -nt option.", engine.getWalkerName(walker.getClass()))); @@ -157,33 +146,6 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { catch (JMException ex) { throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); } - - // - // create the processing tracker - // - if ( engine.getArguments().processingTrackerFile != null ) { - logger.warn("Distributed GATK is an experimental engine feature, and is likely to not work correctly or reliably."); - if ( engine.getArguments().restartProcessingTracker && engine.getArguments().processingTrackerFile.exists() ) { - engine.getArguments().processingTrackerFile.delete(); - logger.info("Deleting ProcessingTracker file " + engine.getArguments().processingTrackerFile); - } - - PrintStream statusStream = null; - if ( engine.getArguments().processingTrackerStatusFile != null ) { - try { - statusStream = new PrintStream(new FileOutputStream(engine.getArguments().processingTrackerStatusFile)); - } catch ( FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().processingTrackerStatusFile, e); - } - } - - ClosableReentrantLock lock = new SharedFileThreadSafeLock(engine.getArguments().processingTrackerFile, engine.getArguments().processTrackerID); - processingTracker = new FileBackedGenomeLocProcessingTracker(engine.getArguments().processingTrackerFile, engine.getGenomeLocParser(), lock, statusStream) ; - logger.info("Creating ProcessingTracker using shared file " + engine.getArguments().processingTrackerFile + " process.id = " + engine.getName() + " CID = " + engine.getArguments().processTrackerID); - } else { - // create a NoOp version that doesn't do anything but say "yes" - processingTracker = new NoOpGenomeLocProcessingTracker(); - } } /** diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java similarity index 86% rename from public/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java index d16c19130..7f0c879e8 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/ClosableReentrantLock.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import java.util.concurrent.locks.ReentrantLock; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java similarity index 98% rename from public/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java index 3763ec67d..eac68cbdd 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/FileBackedGenomeLocProcessingTracker.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.GenomeLocParser; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java similarity index 99% rename from public/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java index e97a73fb8..a7310743b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTracker.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.apache.log4j.Logger; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java similarity index 93% rename from public/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java index ad2a6d31b..9807b6efa 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/NoOpGenomeLocProcessingTracker.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import java.util.Collection; import java.util.Collections; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java similarity index 97% rename from public/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java index ee2283dcf..0957ac1ae 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/ProcessingLoc.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.HasGenomeLocation; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java similarity index 99% rename from public/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java index 0f47da413..bda62b890 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/SharedFileLock.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import org.apache.log4j.Logger; import org.apache.lucene.store.*; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java similarity index 97% rename from public/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java index d70879a0a..49fc8208a 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/SharedFileThreadSafeLock.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import org.apache.log4j.Logger; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; diff --git a/public/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java similarity index 94% rename from public/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java rename to public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java index 9bf8b58b1..0e62d0f63 100644 --- a/public/java/src/org/broadinstitute/sting/utils/threading/SharedMemoryGenomeLocProcessingTracker.java +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java @@ -1,4 +1,4 @@ -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; import java.io.PrintStream; import java.util.ArrayList; diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java new file mode 100644 index 000000000..033120dc5 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +/** + * Utilities for prototype distributed GATK. No longer in use in the codebase + */ +package org.broadinstitute.sting.utils.distributedutils; diff --git a/public/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java similarity index 99% rename from public/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java rename to public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java index 78ab916db..139937e29 100644 --- a/public/java/test/org/broadinstitute/sting/utils/threading/GenomeLocProcessingTrackerUnitTest.java +++ b/public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java @@ -1,5 +1,5 @@ // our package -package org.broadinstitute.sting.utils.threading; +package org.broadinstitute.sting.utils.distributedutils; // the imports for unit testing. @@ -7,9 +7,9 @@ package org.broadinstitute.sting.utils.threading; import net.sf.picard.reference.IndexedFastaSequenceFile; import org.broadinstitute.sting.BaseTest; -import org.broadinstitute.sting.gatk.iterators.GenomeLocusIterator; import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.distributedutils.*; import org.broadinstitute.sting.utils.exceptions.UserException; import org.testng.Assert; import org.testng.annotations.*; From 172b35372babc6f681e733924d1308bd943c4669 Mon Sep 17 00:00:00 2001 From: Mark DePristo Date: Fri, 22 Jul 2011 09:20:32 -0400 Subject: [PATCH 2/4] Moved all of the distributed GATK code to archive. --- .../ClosableReentrantLock.java | 16 - .../FileBackedGenomeLocProcessingTracker.java | 114 ---- .../GenomeLocProcessingTracker.java | 486 ------------------ .../NoOpGenomeLocProcessingTracker.java | 26 - .../utils/distributedutils/ProcessingLoc.java | 71 --- .../distributedutils/SharedFileLock.java | 171 ------ .../SharedFileThreadSafeLock.java | 75 --- ...haredMemoryGenomeLocProcessingTracker.java | 34 -- .../utils/distributedutils/package-info.java | 28 - .../GenomeLocProcessingTrackerUnitTest.java | 402 --------------- 10 files changed, 1423 deletions(-) delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java delete mode 100644 public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java delete mode 100644 public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java deleted file mode 100644 index 7f0c879e8..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/ClosableReentrantLock.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import java.util.concurrent.locks.ReentrantLock; - -/** - * Created by IntelliJ IDEA. - * User: depristo - * Date: 1/19/11 - * Time: 9:50 AM - * - * Simple extension of a ReentrantLock that supports a close method. - */ -public class ClosableReentrantLock extends ReentrantLock { - public boolean ownsLock() { return super.isHeldByCurrentThread(); } - public void close() {} -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java deleted file mode 100644 index eac68cbdd..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/FileBackedGenomeLocProcessingTracker.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.GenomeLocParser; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.*; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * Keeps a copy of the processing locks in a file - */ -public class FileBackedGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - private static final Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); - private static final boolean DEBUG = false; - private static final String READ_MODE = "r"; - private static final String WRITE_MODE = "rws"; - - private final File sharedFile; - private final GenomeLocParser parser; - private long lastReadPosition = 0; - - public FileBackedGenomeLocProcessingTracker(File sharedFile, GenomeLocParser parser, ClosableReentrantLock lock, PrintStream status) { - super(lock, status); - - this.sharedFile = sharedFile; - this.parser = parser; - } - - private RandomAccessFile openFile(String mode) { - try { - return new RandomAccessFile(sharedFile, mode); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } - } - - private void closeFile(RandomAccessFile raFile) { - try { - if ( raFile != null ) raFile.close(); - } catch (IOException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } - } - - @Override - protected List readNewLocs() { - List newPLocs = new ArrayList(); // todo -- gratitous object creation - - if ( sharedFile.exists() ) { - RandomAccessFile raFile = null; - try { - raFile = openFile(READ_MODE); - //logger.warn(String.format("Reading new locs at: file.length=%d last=%d", raFile.length(), lastReadPosition)); - if ( raFile.length() > lastReadPosition ) { - raFile.seek(lastReadPosition); - - int counter = 0; - String line = raFile.readLine(); // Read another line - while ( line != null ) { - String[] parts = line.split(" "); - if ( parts.length != 2 ) throw new ReviewedStingException("BUG: bad sharedFile line '" + line + "' at " + raFile.getFilePointer()); - ProcessingLoc ploc = new ProcessingLoc(parser.parseGenomeLoc(parts[0]), parts[1]); - //logger.warn(" Read " + ploc); - newPLocs.add(ploc); - line = raFile.readLine(); - counter++; - } - lastReadPosition = raFile.getFilePointer(); - if ( DEBUG ) logger.warn(String.format("Read %s locs from file, current pos is %d, # read new locs is %d", - counter, lastReadPosition, newPLocs.size())); - } - } catch (FileNotFoundException e) { - throw new UserException.CouldNotReadInputFile(sharedFile, e); - } catch (IOException e) { - throw new ReviewedStingException("Couldn't read sharedFile " + sharedFile, e); - } finally { - closeFile(raFile); - } - } - - return newPLocs; - } - - @Override - protected void registerNewLocs(Collection plocs) { - RandomAccessFile raFile = null; - - try { - raFile = openFile(WRITE_MODE); - long startPos = raFile.getFilePointer(); - raFile.seek(raFile.length()); - StringBuffer bytes = new StringBuffer(); - for ( ProcessingLoc ploc : plocs ) { - String packet = String.format("%s %s%n", ploc.getLocation(), ploc.getOwner()); - bytes.append(packet); - if ( DEBUG ) logger.warn(String.format("Wrote loc %s to file: %d + %d bytes ending at %d", ploc, startPos, packet.length(), raFile.getFilePointer())); - } - raFile.write(bytes.toString().getBytes()); - //raFile.getChannel().force(true); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } catch (IOException e) { - throw new UserException.CouldNotCreateOutputFile(sharedFile, e); - } finally { - closeFile(raFile); - } - } -} - - diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java deleted file mode 100644 index a7310743b..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTracker.java +++ /dev/null @@ -1,486 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import net.sf.picard.reference.IndexedFastaSequenceFile; -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.GenomeLoc; -import org.broadinstitute.sting.utils.GenomeLocParser; -import org.broadinstitute.sting.utils.HasGenomeLocation; -import org.broadinstitute.sting.utils.SimpleTimer; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.PrintStream; -import java.text.SimpleDateFormat; -import java.util.*; - -/** - * Abstract base class to coordinating data processing by a collecting for processes / threads. - * - * Conceptually, the genome is viewed as a collection of non-overlapping genome location: - * - * chr1:1-10 - * chr1:11-20 - * chr1:21-30 - * etc. - * - * This class, and it's concrete derived classes, provide the ability to claim individual locations - * as "mine", and exclude other processes / threads from processing them. At the lowest-level this - * is implemented by the claimOwnership(loc, name) function, that returns true if loc free (unclaimed) - * and makes name the owner of loc. High-level, and more efficient operations provide claiming - * iterators over streams of objects implementing the HasGenomeLocation interface, so that you can - * write code that looks like: - * - * for ( GenomeLoc ownedLoc : onlyOwned(allLocsToProcess.iterator) ) { - * doSomeWork(ownedLoc) - * - * Much of the code in this class is actually surrounding debugging and performance metrics code. - * The actual synchronization code is separated out into the ClosableReentrantLock() system - * and the two abstract functions: - * - * protected abstract void registerNewLocs(Collection plocs); - * protected abstract Collection readNewLocs(); - * - * That maintain the state of the tracker. - * - * That is, the ProcessingTracker is made of two components: a thread / process locking system and - * a subclass that implements the methods to record new claimed state changes and to read out updates - * that may have occurred by another thread or process. - * - * NOTE: this class assumes that all threads / processes are working with the same set of potential - * GenomeLocs to own. Claiming chr1:1-10 and then chr1:5-6 is allowed by the system. Basically, - * you only can stake claim to GenomeLocs that are .equal(). - */ -public abstract class GenomeLocProcessingTracker { - private final static Logger logger = Logger.getLogger(FileBackedGenomeLocProcessingTracker.class); - private final static SimpleDateFormat STATUS_FORMAT = new SimpleDateFormat("HH:mm:ss,SSS"); - private final static int DEFAULT_OWNERSHIP_ITERATOR_SIZE = 1; - - /** - * Useful state strings for printing status - */ - private final static String GOING_FOR_LOCK = "going_for_lock"; - private final static String RELEASING_LOCK = "releasing_lock"; - private final static String HAVE_LOCK = "have_lock"; - private final static String RUNNING = "running"; - - /** - * A map, for efficiency, that allows quick lookup of the processing loc for a - * given GenomeLoc. The map points from loc -> loc / owner as a ProcessingLoc - */ - private final Map processingLocs; - - /** - * The locking object used to protect data from simulatanous access by multiple - * threads or processes. - */ - private final ClosableReentrantLock lock; - - /** A stream for writing status messages. Can be null if we aren't writing status */ - private final PrintStream status; - - // - // Timers for recording performance information - // Note -- these cannot be used because this class isn't thread safe, and neither are the - // timers, so they result in invalid operations w.r.t. the SimpleTimer contract - // -// protected final SimpleTimer writeTimer = new SimpleTimer("writeTimer"); -// protected final SimpleTimer readTimer = new SimpleTimer("readTimer"); -// protected final SimpleTimer lockWaitTimer = new SimpleTimer("lockWaitTimer"); - protected final SimpleTimer timer = new SimpleTimer(); - protected long nLocks = 0, nWrites = 0, nReads = 0; - - // -------------------------------------------------------------------------------- - // - // Creating ProcessingTrackers - // - // -------------------------------------------------------------------------------- - public GenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) { - this.processingLocs = new HashMap(); - this.status = status; - this.lock = lock; - printStatusHeader(); - } - - // -------------------------------------------------------------------------------- - // - // Code to override to change the dynamics of the the GenomeLocProcessingTracker - // - // -------------------------------------------------------------------------------- - - protected void close() { - lock.close(); - if ( status != null ) status.close(); - } - - /** - * Takes a collection of newly claimed (i.e., previous unclaimed) genome locs - * and the name of their owner and "registers" this data in some persistent way that's - * visible to all threads / processes communicating via this GenomeLocProcessingTracker. - * - * Could be a in-memory data structure (a list) if we are restricting ourselves to intra-memory - * parallelism, a locked file on a shared file system, or a server we communicate with. - * - * @param plocs - */ - protected abstract void registerNewLocs(Collection plocs); - - /** - * The inverse of the registerNewLocs() function. Looks at the persistent data store - * shared by all threads / processes and returns the ones that have appeared since the last - * call to readNewLocs(). Note that we expect the pair of registerNewLocs and readNewLocs to - * include everything, even locs registered by this thread / process. For example: - * - * readNewLocs() => List() - * registerNewLocs(List(x, y,)) => void - * readNewLocs() => List(x,y)) - * - * even for this thread or process. - * @return - */ - protected abstract Collection readNewLocs(); - - - // -------------------------------------------------------------------------------- - // - // Code to claim intervals for processing and query for their ownership - // - // -------------------------------------------------------------------------------- - - /** - * Queries the current database if a location is owned. Does not guarantee that the - * loc can be owned in a future call, though. - * - * @param loc - * @return - */ - public final boolean locIsOwned(GenomeLoc loc, String id) { - return findOwner(loc, id) != null; - } - - /** - * The workhorse routine. Attempt to claim processing ownership of loc, with my name. - * This is an atomic operation -- other threads / processes will wait until this function - * returns. The return result is the ProcessingLoc object describing who owns this - * location. If the location isn't already claimed and we now own the location, the pl owner - * will be myName. Otherwise, the name of the owner can found in the pl. - * - * @param loc - * @param myName - * @return - */ - public final ProcessingLoc claimOwnership(final GenomeLoc loc, final String myName) { - // processingLocs is a shared memory synchronized object, and this - // method is synchronized, so we can just do our processing - return new WithLock(myName) { - public ProcessingLoc doBody() { - ProcessingLoc owner = findOwner(loc, myName); - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - registerNewLocsWithTimers(Arrays.asList(owner), myName); - } - return owner; - } - }.run(); - } - - - // -------------------------------------------------------------------------------- - // - // High-level iterator-style interface to claiming ownership - // - // -------------------------------------------------------------------------------- - - /** - * A higher-level, and more efficient, interface to obtain the next location we own. Takes an - * iterator producing objects that support the getLocation() interface, and returns the next - * object in that stream that we can claim ownership of. Returns null if we run out of elements - * during the iteration. - * - * Can be more efficiently implemented in subclasses to avoid multiple unlocking - * - * @param iterator - * @param myName - * @return - */ - public final T claimOwnershipOfNextAvailable(Iterator iterator, String myName) { - OwnershipIterator myIt = new OwnershipIterator(iterator, myName, 1); - return myIt.next(); - } - - public final Iterable onlyOwned(Iterator iterator, String myName) { - return new OwnershipIterator(iterator, myName); - } - - private final class OwnershipIterator implements Iterator, Iterable { - private final Iterator subit; - private final String myName; - private final Queue cache; - private final int cacheSize; - - public OwnershipIterator(Iterator subit, String myName) { - this(subit, myName, DEFAULT_OWNERSHIP_ITERATOR_SIZE); - } - - public OwnershipIterator(Iterator subit, String myName, int cacheSize) { - this.subit = subit; - this.myName = myName; - cache = new LinkedList(); - this.cacheSize = cacheSize; - } - - /** - * Will return true for all elements of subit, even if we can't get ownership of some of the future - * elements and so will return null there - * @return - */ - public final boolean hasNext() { - return cache.peek() != null || subit.hasNext(); - } - - /** - * High performance iterator that only locks and unlocks once per claimed object found. Avoids - * locking / unlocking for each query - * - * @return an object of type T owned by this thread, or null if none of the remaining object could be claimed - */ - public final T next() { - if ( cache.peek() != null) - return cache.poll(); - else { - // cache is empty, we need to fill up the cache and return the first element of the queue - return new WithLock(myName) { - public T doBody() { - // read once the database of owners at the start - updateAndGetProcessingLocs(myName); - - boolean done = false; - Queue pwns = new LinkedList(); // ;-) - while ( !done && cache.size() < cacheSize && subit.hasNext() ) { - final T elt = subit.next(); - GenomeLoc loc = elt.getLocation(); - - ProcessingLoc owner = processingLocs.get(loc); - - if ( owner == null ) { // we are unowned - owner = new ProcessingLoc(loc, myName); - pwns.offer(owner); - if ( ! cache.offer(elt) ) throw new ReviewedStingException("Cache offer unexpectedly failed"); - if ( GenomeLoc.isUnmapped(loc) ) done = true; - } - // if not, we continue our search - } - - registerNewLocsWithTimers(pwns, myName); - - // we've either filled up the cache or run out of elements. Either way we return - // the first element of the cache. If the cache is empty, we return null here. - return cache.poll(); - } - }.run(); - } - } - - public final void remove() { - throw new UnsupportedOperationException(); - } - - public final Iterator iterator() { - return this; - } - } - - // -------------------------------------------------------------------------------- - // - // private / protected low-level accessors / manipulators and utility functions - // - // -------------------------------------------------------------------------------- - - /** - * Useful debugging function that returns the ProcessingLoc who owns loc. ID - * is provided for debugging purposes - * @param loc - * @param id - * @return - */ - protected final ProcessingLoc findOwner(GenomeLoc loc, String id) { - // fast path to check if we already have the existing genome loc in memory for ownership claims - // getProcessingLocs() may be expensive [reading from disk, for example] so we shouldn't call it - // unless necessary - ProcessingLoc x = processingLocs.get(loc); - return x == null ? updateAndGetProcessingLocs(id).get(loc) : x; - } - - /** - * Returns the list of currently owned locations, updating the database as necessary. - * DO NOT MODIFY THIS MAP! As with all parallelizing data structures, the list may be - * out of date immediately after the call returns, or may be updating on the fly. - * @return - */ - protected final Map updateAndGetProcessingLocs(String myName) { - return new WithLock>(myName) { - public Map doBody() { -// readTimer.restart(); - for ( ProcessingLoc p : readNewLocs() ) - processingLocs.put(p.getLocation(), p); -// readTimer.stop(); - nReads++; - return processingLocs; - } - }.run(); - } - - /** - * Wrapper around registerNewLocs that also times the operation - * - * @param plocs - * @param myName - */ - protected final void registerNewLocsWithTimers(Collection plocs, String myName) { -// writeTimer.restart(); - registerNewLocs(plocs); - nWrites++; -// writeTimer.stop(); - } - - private final void printStatusHeader() { - if ( status != null ) status.printf("process.id\thr.time\ttime\tstate%n"); - } - - private final void printStatus(String id, long machineTime, String state) { - // prints a line like processID human-readable-time machine-time state - if ( status != null ) { - status.printf("%s\t%s\t%d\t%s%n", id, STATUS_FORMAT.format(machineTime), machineTime, state); - status.flush(); - } - } - - - /** - * Lock the data structure, preventing other threads / processes from reading and writing to the - * common store - * @param id the name of the process doing the locking - */ - private final void lock(String id) { - //lockWaitTimer.restart(); - boolean hadLock = lock.ownsLock(); - if ( ! hadLock ) { - nLocks++; - //printStatus(id, lockWaitTimer.currentTime(), GOING_FOR_LOCK); - } - lock.lock(); - //lockWaitTimer.stop(); - //if ( ! hadLock ) printStatus(id, lockWaitTimer.currentTime(), HAVE_LOCK); - } - - /** - * Unlock the data structure, allowing other threads / processes to read and write to the common store - * @param id the name of the process doing the unlocking - */ - private final void unlock(String id) { - if ( lock.getHoldCount() == 1 ) printStatus(id, timer.currentTime(), RELEASING_LOCK); - lock.unlock(); - if ( ! lock.ownsLock() ) printStatus(id, timer.currentTime(), RUNNING); - } - - // useful code for getting - public final long getNLocks() { return nLocks; } - public final long getNReads() { return nReads; } - public final long getNWrites() { return nWrites; } -// public final double getTimePerLock() { return lockWaitTimer.getElapsedTime() / Math.max(nLocks, 1); } -// public final double getTimePerRead() { return readTimer.getElapsedTime() / Math.max(nReads,1); } -// public final double getTimePerWrite() { return writeTimer.getElapsedTime() / Math.max(nWrites,1); } - - // -------------------------------------------------------------------------------- - // - // Java-style functional form for with lock do { x }; - // - // -------------------------------------------------------------------------------- - - /** - * Private utility class that executes doBody() method with the lock() acquired and - * handles property unlock()ing the system, even if an error occurs. Allows one to write - * clean code like: - * - * new WithLock(name) { - * public Integer doBody() { doSomething(); return 1; } - * }.run() - * - * @param the return type of the doBody() method - */ - private abstract class WithLock { - private final String myName; - - public WithLock(String myName) { - this.myName = myName; - } - - protected abstract T doBody(); - - public T run() { - boolean locked = false; - try { - lock(myName); - locked = true; - return doBody(); - } finally { - if (locked) unlock(myName); - } - } - } - - // -------------------------------------------------------------------------------- - // - // main function for testing performance - // - // -------------------------------------------------------------------------------- - public static void main(String[] args) { - //BasicConfigurator.configure(); - - final String ref = args[0]; - final File file = new File(args[1]); - final int cycles = Integer.valueOf(args[2]); - - File referenceFile = new File(ref); - try { - final IndexedFastaSequenceFile fasta = new IndexedFastaSequenceFile(referenceFile); - final String chr1 = fasta.getSequenceDictionary().getSequence(1).getSequenceName(); - final GenomeLocParser genomeLocParser = new GenomeLocParser(fasta); - - final class MyTest { - String name; - GenomeLocProcessingTracker tracker; - - MyTest(String name, GenomeLocProcessingTracker tracker) { - this.name = name; - this.tracker = tracker; - } - - public void execute(int cycles) { - SimpleTimer delta = new SimpleTimer("delta"); - SimpleTimer timer = new SimpleTimer("none"); - - if ( file.exists() ) file.delete(); - timer.start(); - delta.start(); - for ( int i = 1; i < cycles; i++ ) { - tracker.claimOwnership(genomeLocParser.createGenomeLoc(chr1, i, i+1), "ABCDEFGHIJKL"); - if ( i % 1000 == 0 ) { - System.out.printf("%s\t%d\t%d\t%.4f\t%.4f%n", name, i, timer.currentTime(), timer.getElapsedTime(), delta.getElapsedTime() ); - delta.restart(); - } - } - } - } - - System.out.printf("name\tcycle\tcurrent.time\telapsed.time\tdelta%n"); - new MyTest("in-memory", new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock())).execute(cycles); - new MyTest("nio", new FileBackedGenomeLocProcessingTracker(file, genomeLocParser, new ClosableReentrantLock(), null)).execute(cycles); - new MyTest("nio-file-lock", new FileBackedGenomeLocProcessingTracker(file, genomeLocParser, new SharedFileThreadSafeLock(file,1), null)).execute(cycles); - } - catch(FileNotFoundException ex) { - throw new UserException.CouldNotReadInputFile(referenceFile,ex); - } - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java deleted file mode 100644 index 9807b6efa..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/NoOpGenomeLocProcessingTracker.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * Base class, and null tracker. Always says that a GenomeLoc is ready for processing. It is - * critical that this class already return that a loc is owned, no matter if it's been seen before, - * etc. ReadShards can differ in their contents but have the same "unmapped" genome loc - */ -public class NoOpGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - public NoOpGenomeLocProcessingTracker() { - super(new ClosableReentrantLock(), null); - } - - @Override - protected void registerNewLocs(Collection loc) { - ; - } - - @Override - protected List readNewLocs() { - return Collections.emptyList(); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java deleted file mode 100644 index 0957ac1ae..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/ProcessingLoc.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import org.broadinstitute.sting.utils.GenomeLoc; -import org.broadinstitute.sting.utils.HasGenomeLocation; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -/** - * Created by IntelliJ IDEA. - * User: depristo - * Date: 1/19/11 - * Time: 8:06 AM - * - * Information about processing locations and their owners. Contains two basic data, associated - * together. The first is a genome loc, and the second is the name of the owner, as a string. - * - * chr1:1-10 Mark - * chr2:11-20 DePristo - * - * would be two ProcessingLocs that first indicate that the first 10 bp of chr1 are owned by Mark, - * and the second is owned by DePristo. - */ -public class ProcessingLoc implements HasGenomeLocation { - private final GenomeLoc loc; - private final String owner; - - /** - * Create a loc that's already owned - * @param loc - * @param owner - */ - public ProcessingLoc(GenomeLoc loc, String owner) { - if ( loc == null || owner == null ) { - throw new ReviewedStingException("BUG: invalid ProcessingLoc detected: " + loc + " owner " + owner); - } - - this.loc = loc; - this.owner = owner.intern(); // reduce memory consumption by interning the string - } - - public GenomeLoc getLocation() { - return loc; - } - - public String getOwner() { - return owner; - } - - /** - * Returns true iff the owner of this processing loc is name. Can be used to determine - * the owner of this processing location. - * - * @param name - * @return - */ - public boolean isOwnedBy(String name) { - return getOwner().equals(name); - } - - public String toString() { return String.format("ProcessingLoc(%s,%s)", loc, owner); } - - public boolean equals(Object other) { - if (other instanceof ProcessingLoc ) - return this.loc.equals(((ProcessingLoc)other).loc) && this.owner.equals(((ProcessingLoc)other).owner); - else - return false; - } - - public int compareTo(ProcessingLoc other) { - return this.getLocation().compareTo(other.getLocation()); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java deleted file mode 100644 index bda62b890..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileLock.java +++ /dev/null @@ -1,171 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import org.apache.log4j.Logger; -import org.apache.lucene.store.*; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; - -import java.io.File; -import java.io.IOException; - -/** - * User: depristo - * Date: 1/19/11 - * Time: 8:24 AM - * - * A reentrant lock for a shared file common file in the file system. Relies on a a Lucene SimpleFSLock - * to manage on disk file locking. - */ -public class SharedFileLock extends ClosableReentrantLock { // todo -- kinda gross inheritance. The super lock is never used - private static Logger logger = Logger.getLogger(SharedFileLock.class); - - private static final String VERIFY_HOST = System.getProperty("verify.host", "gsa1"); - private static final boolean VERIFY = false; - private static final int VERIFY_PORT = 5050; - - // 5 minutes => 360 seconds of trying -> failure - protected static final int DEFAULT_N_TRIES = 1000; - protected static final long DEFAULT_MILLISECONDS_PER_TRY = 360; - - /** The file we are locking */ - private final File file; - - private final LockFactory lockFactory; - private Lock fileLock = null; - - /** - * A counter that indicates the number of 'locks' on this file. - * If locks == 2, then two unlocks are required - * before any resources are freed. - */ - int fileLockReentrantCounter = 0; - - // type of locking - private final int nRetries; - private final long milliSecPerTry; - - /** - * Create a SharedFileThreadSafeLock object locking the file - * @param file - */ - public SharedFileLock(File file, int nRetries, long milliSecPerTry, int ID) { - super(); - this.file = file; - this.nRetries = nRetries; - this.milliSecPerTry = milliSecPerTry; - - File lockDir = new File(file.getParent() == null ? "./" : file.getParent()); - try { - LockFactory factory = new SimpleFSLockFactory(lockDir); - if ( VERIFY ) { // don't forget to start up the VerifyLockServer - this.lockFactory = new VerifyingLockFactory((byte)ID, factory, VERIFY_HOST, VERIFY_PORT); - } else { - this.lockFactory = factory; - } - } catch (IOException e) { - throw new UserException.CouldNotCreateOutputFile(lockDir, "Could not create coordination file locking directory " + lockDir, e); - } - } - - public SharedFileLock(File file, int ID) { - this(file, DEFAULT_N_TRIES, DEFAULT_MILLISECONDS_PER_TRY, ID); - } - - @Override - public void close() { - if ( ownsLock() ) throw new ReviewedStingException("closing SharedFileLock while still owned: ownership count " + fileLockReentrantCounter); - } - - @Override - public int getHoldCount() { - return fileLockReentrantCounter; - } - - @Override - public boolean ownsLock() { - return fileLockReentrantCounter > 0; - } - - // ------------------------------------------------------------------------------------------ - // - // workhorse routines -- acquiring file locks - // - // ------------------------------------------------------------------------------------------ - - private boolean obtainFileLock() throws IOException { - // annoying bug work around for verifylockserver - if ( VERIFY ) - try { - return fileLock.obtain(1); - } catch ( LockObtainFailedException e ) { - return false; - } - else - return fileLock.obtain(); - } - - /** - * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be - * unwound appropriately. Uses file channel lock *after* thread locking. - */ - @Override - public void lock() { - if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", fileLockReentrantCounter = " + fileLockReentrantCounter); - if ( fileLockReentrantCounter++ == 0 ) { - // Precondition -- lock is always null while we don't have a lock - if ( fileLock != null ) - throw new ReviewedStingException("BUG: lock() function called when a lock already is owned!"); - - int i = 1; - fileLock = lockFactory.makeLock(file.getName() + ".lock"); - try { - boolean obtained = obtainFileLock(); // todo -- maybe use intrinsic lock features - for ( ; ! obtained && i < nRetries; i++ ) { - try { - //logger.warn("tryLock failed on try " + i + ", waiting " + milliSecPerTry + " millseconds for retry"); - Thread.sleep(milliSecPerTry); - } catch ( InterruptedException e ) { - throw new UserException("SharedFileThreadSafeLock interrupted during wait for file lock", e); - } - obtained = obtainFileLock(); // gross workaround for error in verify server - } - - if ( i > 1 ) logger.warn("tryLock required " + i + " tries before completing, waited " + i * milliSecPerTry + " millseconds"); - - if ( ! obtained ) { - fileLock = null; - // filelock == null -> we never managed to acquire the lock! - throw new UserException("SharedFileThreadSafeLock failed to obtain the lock after " + nRetries + " attempts"); - } - - if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" lock() " + Thread.currentThread().getName() + ", obtained = " + obtained + ", tries = " + i); - } catch (IOException e) { - fileLock = null; - throw new ReviewedStingException("Coordination file could not be created because a lock could not be obtained.", e); - } - } - } - - @Override - public void unlock() { - // update for reentrant unlocking - if ( fileLock == null ) throw new ReviewedStingException("BUG: file lock is null -- file lock was not obtained"); - if ( fileLockReentrantCounter <= 0 ) throw new ReviewedStingException("BUG: file lock counter < 0"); - - // this unlock counts as 1 unlock. If this is our last unlock, actually do something - if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", count = " + fileLockReentrantCounter); - if ( --fileLockReentrantCounter == 0 ) { - try { - if ( ! fileLock.isLocked() ) throw new ReviewedStingException("BUG: call to unlock() when we don't have a valid lock!"); - fileLock.release(); - if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", actually releasing"); - } catch ( IOException e ) { - throw new ReviewedStingException("Could not free file lock on file " + file, e); - } finally { // make sure we null out the filelock, regardless of our state - fileLock = null; - } - } else { - if ( SharedFileThreadSafeLock.DEBUG ) logger.warn(" unlock() " + Thread.currentThread().getName() + ", skipping, count = " + fileLockReentrantCounter); - } - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java deleted file mode 100644 index 49fc8208a..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedFileThreadSafeLock.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import org.apache.log4j.Logger; -import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; - -import java.io.File; - -/** - * User: depristo - * Date: 1/19/11 - * Time: 8:24 AM - * - * A reentrant lock that supports multi-threaded locking as well as a shared file lock on a common - * file in the file system. It itself a shared memory reenterant lock to managed thread safety and - * contains a SharedFileLock to handle the file integrity. - */ -public class SharedFileThreadSafeLock extends ClosableReentrantLock { - private static Logger logger = Logger.getLogger(SharedFileThreadSafeLock.class); - protected static final boolean DEBUG = false; - - private final SharedFileLock fileLock; - - /** - * Create a SharedFileThreadSafeLock object locking the file - * @param file - */ - public SharedFileThreadSafeLock(File file, int nRetries, long milliSecPerTry, int ID) { - super(); - this.fileLock = new SharedFileLock(file, nRetries, milliSecPerTry, ID); - } - - public SharedFileThreadSafeLock(File file, int ID) { - this(file, SharedFileLock.DEFAULT_N_TRIES, SharedFileLock.DEFAULT_MILLISECONDS_PER_TRY, ID); - } - - @Override - public void close() { - super.close(); - fileLock.close(); - } - - @Override - public int getHoldCount() { - if ( super.getHoldCount() != fileLock.getHoldCount() ) - throw new ReviewedStingException("BUG: unequal hold counts. threadlock = " + super.getHoldCount() + ", filelock = " + fileLock.getHoldCount()); - return super.getHoldCount(); - } - - @Override - public boolean ownsLock() { - return super.isHeldByCurrentThread() && fileLock.ownsLock(); - } - - /** - * Two stage [threading then file] locking mechanism. Reenterant in that multiple lock calls will be - * unwound appropriately. Uses file channel lock *after* thread locking. - */ - @Override - public void lock() { - if ( DEBUG ) logger.warn("Attempting SharedFileThreadSafe lock: " + Thread.currentThread().getName()); - if ( DEBUG ) logger.warn(" going for thread lock: " + Thread.currentThread().getName()); - super.lock(); - if ( DEBUG ) logger.warn(" going for file lock: " + Thread.currentThread().getName()); - fileLock.lock(); // todo -- should this be in a try? - } - - @Override - public void unlock() { - if ( DEBUG ) logger.warn(" releasing filelock: " + Thread.currentThread().getName()); - fileLock.unlock(); - if ( DEBUG ) logger.warn(" releasing threadlock: " + Thread.currentThread().getName()); - super.unlock(); - if ( DEBUG ) logger.warn(" unlock() complete: " + Thread.currentThread().getName()); - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java deleted file mode 100644 index 0e62d0f63..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/SharedMemoryGenomeLocProcessingTracker.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.broadinstitute.sting.utils.distributedutils; - -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * Thread-safe shared memory only implementation. Uses a simple list to manage the newly - * added processing locations. - */ -public class SharedMemoryGenomeLocProcessingTracker extends GenomeLocProcessingTracker { - private List newPLocs = new ArrayList(); - - protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock) { - super(lock, null); - } - - protected SharedMemoryGenomeLocProcessingTracker(ClosableReentrantLock lock, PrintStream status) { - super(lock, status); - } - - @Override - protected void registerNewLocs(Collection plocs) { - newPLocs.addAll(plocs); - } - - @Override - protected List readNewLocs() { - List r = newPLocs; - newPLocs = new ArrayList(); - return r; - } -} diff --git a/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java b/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java deleted file mode 100644 index 033120dc5..000000000 --- a/public/java/src/org/broadinstitute/sting/utils/distributedutils/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2011, The Broad Institute - * - * Permission is hereby granted, free of charge, to any person - * obtaining a copy of this software and associated documentation - * files (the "Software"), to deal in the Software without - * restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following - * conditions: - * - * The above copyright notice and this permission notice shall be - * included in all copies or substantial portions of the Software. - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES - * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT - * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ - -/** - * Utilities for prototype distributed GATK. No longer in use in the codebase - */ -package org.broadinstitute.sting.utils.distributedutils; diff --git a/public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java b/public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java deleted file mode 100644 index 139937e29..000000000 --- a/public/java/test/org/broadinstitute/sting/utils/distributedutils/GenomeLocProcessingTrackerUnitTest.java +++ /dev/null @@ -1,402 +0,0 @@ -// our package -package org.broadinstitute.sting.utils.distributedutils; - - -// the imports for unit testing. - - -import net.sf.picard.reference.IndexedFastaSequenceFile; -import org.broadinstitute.sting.BaseTest; -import org.broadinstitute.sting.utils.GenomeLoc; -import org.broadinstitute.sting.utils.GenomeLocParser; -import org.broadinstitute.sting.utils.distributedutils.*; -import org.broadinstitute.sting.utils.exceptions.UserException; -import org.testng.Assert; -import org.testng.annotations.*; - -import java.io.File; -import java.io.FileNotFoundException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.*; - -/** - * Basic unit test for GenomeLoc - */ -public class GenomeLocProcessingTrackerUnitTest extends BaseTest { - IndexedFastaSequenceFile fasta = null; - GenomeLocParser genomeLocParser = null; - String chr1 = null; - private final static String FILE_ROOT = "public/testdata/GLPTFile"; - - @BeforeTest - public void before() { - File referenceFile = new File(hg18Reference); - try { - fasta = new IndexedFastaSequenceFile(referenceFile); - chr1 = fasta.getSequenceDictionary().getSequence(1).getSequenceName(); - genomeLocParser = new GenomeLocParser(fasta); - - } - catch(FileNotFoundException ex) { - throw new UserException.CouldNotReadInputFile(referenceFile,ex); - } - } - - @BeforeMethod - public void beforeMethod(Object[] data) { - if ( data.length > 0 ) - ((TestTarget)data[0]).init(); - } - - @AfterMethod - public void afterMethod(Object[] data) { - if ( data.length > 0 ) { - ((TestTarget)data[0]).getTracker().close(); - ((TestTarget)data[0]).cleanup(); - } - } - - abstract private class TestTarget { - String name; - int nShards; - int shardSize; - File file; - - public void init() { cleanup(); } - - public void cleanup() { - if ( file != null && file.exists() ) - file.delete(); - } - - public boolean isThreadSafe() { return true; } - - protected TestTarget(String name, int nShards, int shardSize, File file) { - this.name = name; - this.nShards = nShards; - this.shardSize = shardSize; - this.file = file; - } - - public abstract GenomeLocProcessingTracker getTracker(); - - public List getShards() { - List shards = new ArrayList(); - for ( int i = 0; i < nShards; i++ ) { - int start = shardSize * i; - int stop = start + shardSize; - shards.add(genomeLocParser.createGenomeLoc(chr1, start, stop)); - } - return shards; - } - - public String toString() { - return String.format("TestTarget %s: nShards=%d shardSize=%d", name, nShards, shardSize); - } - } - - @DataProvider(name = "threadData") - public Object[][] createThreadData() { - // gotta keep the tests small... - return createData(Arrays.asList(10, 100), Arrays.asList(10)); - //return createData(Arrays.asList(10, 100, 1000, 10000), Arrays.asList(10)); - } - - public Object[][] createData(List nShards, List shardSizes) { - List params = new ArrayList(); - - int counter = 0; - String name = null; - for ( int nShard : nShards ) { - for ( int shardSize : shardSizes ) { - // shared mem -- canonical implementation - params.add(new TestTarget("ThreadSafeSharedMemory", nShard, shardSize, null) { - GenomeLocProcessingTracker tracker = new SharedMemoryGenomeLocProcessingTracker(new ClosableReentrantLock()); - public GenomeLocProcessingTracker getTracker() { return tracker; } - }); - - final File file1 = new File(String.format("%s_ThreadSafeFileBacked_%d_%d", FILE_ROOT, counter++, nShard, shardSize)); - params.add(new TestTarget("ThreadSafeFileBacked", nShard, shardSize, file1) { - GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file1, genomeLocParser, new ClosableReentrantLock(), null); - public GenomeLocProcessingTracker getTracker() { return tracker; } - }); - - name = "FileBackedSharedFileThreadSafe"; - final File file2 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize)); - params.add(new TestTarget(name, nShard, shardSize, file2) { - GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file2, genomeLocParser, new SharedFileThreadSafeLock(file2, -1), null); - public GenomeLocProcessingTracker getTracker() { return tracker; } - }); - - name = "FileBackedSharedFile"; - final File file3 = new File(String.format("%s_%s_%d_%d", FILE_ROOT, name, counter++, nShard, shardSize)); - params.add(new TestTarget(name, nShard, shardSize, file3) { - GenomeLocProcessingTracker tracker = new FileBackedGenomeLocProcessingTracker(file3, genomeLocParser, new SharedFileLock(file3, -1), null); - public GenomeLocProcessingTracker getTracker() { return tracker; } - public boolean isThreadSafe() { return false; } - }); - } - } - - List params2 = new ArrayList(); - for ( TestTarget x : params ) params2.add(new Object[]{x}); - return params2.toArray(new Object[][]{}); - } - - @DataProvider(name = "simpleData") - public Object[][] createSimpleData() { - return createData(Arrays.asList(1000), Arrays.asList(100)); - } - - private static final String NAME_ONE = "name1"; - private static final String NAME_TWO = "name2"; - - @Test(enabled = true) - public void testNoop() { - GenomeLocProcessingTracker tracker = new NoOpGenomeLocProcessingTracker(); - for ( int start = 1; start < 100; start++ ) { - for ( int n = 0; n < 2; n++ ) { - GenomeLoc loc = genomeLocParser.createGenomeLoc(chr1, start, start +1); - ProcessingLoc ploc = tracker.claimOwnership(loc, NAME_ONE); - Assert.assertTrue(ploc.isOwnedBy(NAME_ONE)); - Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), 0); - } - } - } - - @Test(dataProvider = "simpleData", enabled = true) - public void testSingleProcessTracker(TestTarget test) { - GenomeLocProcessingTracker tracker = test.getTracker(); - List shards = test.getShards(); - logger.warn("testSingleProcessTracker " + test); - - int counter = 0; - for ( GenomeLoc shard : shards ) { - counter++; - - Assert.assertNull(tracker.findOwner(shard, NAME_ONE)); - Assert.assertFalse(tracker.locIsOwned(shard, NAME_ONE)); - - ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); - Assert.assertNotNull(proc); - Assert.assertNotNull(proc.getLocation()); - Assert.assertNotNull(proc.getOwner()); - Assert.assertEquals(proc.getLocation(), shard); - Assert.assertEquals(proc.getOwner(), NAME_ONE); - Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc); - Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE)); - Assert.assertNotNull(tracker.updateAndGetProcessingLocs(NAME_ONE)); - Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), counter); - - ProcessingLoc badClaimAttempt = tracker.claimOwnership(shard,NAME_TWO); - Assert.assertFalse(badClaimAttempt.getOwner().equals(NAME_TWO)); - Assert.assertEquals(badClaimAttempt.getOwner(), NAME_ONE); - } - } - - @Test(dataProvider = "simpleData", enabled = true) - public void testIterator(TestTarget test) { - GenomeLocProcessingTracker tracker = test.getTracker(); - List shards = test.getShards(); - logger.warn("testIterator " + test); - - List markedShards = new ArrayList(); - List toFind = new ArrayList(); - - for ( int i = 0; i < shards.size(); i++ ) { - if ( ! (i % 10 == 0) ) { - markedShards.add(shards.get(i)); - tracker.claimOwnership(shards.get(i), NAME_TWO); - } else { - toFind.add(shards.get(i)); - } - } - - int nFound = 0; - Iterator it = shards.iterator(); - while ( it.hasNext() ) { - GenomeLoc shard = tracker.claimOwnershipOfNextAvailable(it, NAME_ONE); - - if ( shard == null ) { // everything to get is done - Assert.assertEquals(nFound, toFind.size(), "Didn't find all of the available shards"); - } else { - nFound++; - ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE); - - Assert.assertTrue(proc.isOwnedBy(NAME_ONE)); - Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!"); - Assert.assertTrue(toFind.contains(shard), "Claimed shard wasn't one of the unmarked!"); - } - } - } - - @Test(dataProvider = "simpleData", enabled = true) - public void testMarkedProcesses(TestTarget test) { - GenomeLocProcessingTracker tracker = test.getTracker(); - List shards = test.getShards(); - logger.warn("testMarkedProcesses " + test); - - List markedShards = new ArrayList(); - - for ( int i = 0; i < shards.size(); i++ ) { - if ( i % 2 == 0 ) { - markedShards.add(shards.get(i)); - tracker.claimOwnership(shards.get(i), NAME_TWO); - } - } - - for ( GenomeLoc shard : shards ) { - ProcessingLoc proc = tracker.claimOwnership(shard,NAME_ONE); - - Assert.assertTrue(proc.isOwnedBy(NAME_ONE) || proc.isOwnedBy(NAME_TWO)); - - if ( proc.isOwnedBy(NAME_ONE) ) - Assert.assertTrue(! markedShards.contains(shard), "Ran process was already marked!"); - else - Assert.assertTrue(markedShards.contains(shard), "Unran process wasn't marked"); - - if ( ! markedShards.contains(shard) ) { - Assert.assertEquals(tracker.findOwner(shard, NAME_ONE), proc); - } - } - } - - public class TestThread implements Callable { - public TestTarget test; - public String name; - public List ran, toRun; - boolean useIterator; - - public TestThread(TestTarget test, int count, List toRun, boolean useIterator) { - this.test = test; - this.toRun = toRun; - this.name = "thread" + count; - this.ran = new ArrayList(); - this.useIterator = useIterator; - } - - public Integer call() { - //logger.warn(String.format("Call() Thread %s", name)); - if ( useIterator ) { - for ( GenomeLoc shard : test.getTracker().onlyOwned(toRun.iterator(), name) ) { - if ( shard != null ) { // ignore the unclaimable end of the stream - ran.add(shard); - // do some work here - for ( int sum =0, i = 0; i < 100000; i++) sum += i; - } - } - - } else { - for ( GenomeLoc shard : toRun ) { - //System.out.printf("Claiming ownership in %s on %s%n", name, shard); - ProcessingLoc proc = test.getTracker().claimOwnership(shard,name); - //System.out.printf(" => ownership of %s is %s (I own? %b)%n", shard, proc.getOwner(), proc.isOwnedBy(name)); - if ( proc.isOwnedBy(name) ) { - ran.add(proc.getLocation()); - // do some work here - for ( int sum =0, i = 0; i < 100000; i++) sum += i; - } - //logger.warn(String.format("Thread %s on %s -> owned by %s", name, shard, proc.getOwner())); - } - } - - return 1; - } - } - - private static TestThread findOwner(String name, List threads) { - for ( TestThread thread : threads ) { - if ( thread.name.equals(name) ) - return thread; - } - return null; - } - - private static final void assertAllThreadsFinished(List> futures) { - try { - for ( Future f : futures ) { - Assert.assertTrue(f.isDone(), "Thread never finished running"); - Assert.assertTrue(f.get() != null, "Finished successfully"); - } - } catch (InterruptedException e) { - Assert.fail("Thread failed to run to completion", e); - } catch (ExecutionException e) { - Assert.fail("Thread generated an exception", e); - } - } - - private static final List subList(List l, int i) { - List r = new ArrayList(); - for ( int j = 0; j < l.size(); j++ ) { - if ( j % i == 0 ) - r.add(l.get(j)); - } - - return r; - } - - @Test(dataProvider = "threadData", enabled = true) - public void testThreadedProcessesLowLevelFunctions(TestTarget test) { - testThreading(test, false); - } - - @Test(dataProvider = "threadData", enabled = true) - public void testThreadedProcessesIterator(TestTarget test) { - testThreading(test, true); - } - - private void testThreading(TestTarget test, boolean useIterator) { - if ( ! test.isThreadSafe() ) - // skip tests that aren't thread safe - return; - - // start up 3 threads - logger.warn("ThreadedTesting " + test + " using iterator " + useIterator); - List threads = new ArrayList(); - for ( int i = 0; i < 4; i++) { - List toRun = subList(test.getShards(), i+1); - TestThread thread = new TestThread(test, i, toRun, useIterator); - threads.add(thread); - } - ExecutorService exec = java.util.concurrent.Executors.newFixedThreadPool(threads.size()); - - try { - List> results = exec.invokeAll(threads, 300, TimeUnit.SECONDS); - GenomeLocProcessingTracker tracker = test.getTracker(); - List shards = test.getShards(); - - for ( TestThread thread : threads ) - logger.warn(String.format("TestThread %s ran %d jobs of %d to run", thread.name, thread.ran.size(), thread.toRun.size())); - - assertAllThreadsFinished(results); - - // we ran everything - Assert.assertEquals(tracker.updateAndGetProcessingLocs(NAME_ONE).size(), shards.size(), "Not all shards were run"); - - for ( GenomeLoc shard : shards ) { - Assert.assertTrue(tracker.locIsOwned(shard, NAME_ONE), "Unowned shard"); - - ProcessingLoc proc = tracker.findOwner(shard, NAME_ONE); - Assert.assertNotNull(proc, "Proc was null"); - - Assert.assertNotNull(proc.getOwner(), "Owner was null"); - Assert.assertEquals(proc.getLocation(), shard, "Shard loc doesn't make ProcessingLoc"); - - TestThread owner = findOwner(proc.getOwner(), threads); - Assert.assertNotNull(owner, "Couldn't find owner"); - - Assert.assertTrue(owner.ran.contains(shard), "Owner doesn't contain ran shard"); - - for ( TestThread thread : threads ) - if ( ! proc.isOwnedBy(thread.name) && thread.ran.contains(shard) ) - Assert.fail("Shard appears in another run list: proc=" + proc + " shard=" + shard + " also in jobs of " + thread.name + " obj=" + thread.ran.get(thread.ran.indexOf(shard))); - - } - } catch (InterruptedException e) { - Assert.fail("Thread failure", e); - } - } -} \ No newline at end of file From 7ffedf211ca8123db6d5cb64db526d1114d7fbe4 Mon Sep 17 00:00:00 2001 From: Mauricio Carneiro Date: Sun, 24 Jul 2011 02:24:04 -0400 Subject: [PATCH 3/4] Contig comparator -- sorting contigs like Picard This is very useful if you want to output your text files or manipulate data in the usual chromosome ordering : 1 2 3 ... 21 22 X Y GL??? ... Just use this comparator in any SortedSet class constructor and your data will be sorted like in the BAM file. --- .../sting/utils/ContigComparator.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 public/java/src/org/broadinstitute/sting/utils/ContigComparator.java diff --git a/public/java/src/org/broadinstitute/sting/utils/ContigComparator.java b/public/java/src/org/broadinstitute/sting/utils/ContigComparator.java new file mode 100644 index 000000000..5e573418d --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/ContigComparator.java @@ -0,0 +1,57 @@ +package org.broadinstitute.sting.utils; + +import java.util.Comparator; +import java.util.Set; +import java.util.TreeSet; + +/** + * Created by IntelliJ IDEA. + * User: carneiro + * Date: 7/23/11 + * Time: 6:07 PM + * To change this template use File | Settings | File Templates. + */ +public class ContigComparator implements Comparator { + private Set specialChrs; + + public ContigComparator() { + specialChrs = new TreeSet(); + specialChrs.add("X"); + specialChrs.add("Y"); + } + + public int compare(String chr1, String chr2) { + if (chr1.equals(chr2)) + return 0; + + Integer x = convertStringWithoutException(chr1); + Integer y = convertStringWithoutException(chr2); + // both contigs are numbered + if (x != null && y != null) + return (x < y) ? -1:1; + + // both contigs are named + if (x == null && y == null) { + // both contigs are special contigs or neither contig is a special contigs + if (specialChrs.contains(chr1) && specialChrs.contains(chr2) || (!specialChrs.contains(chr1) && !specialChrs.contains(chr2))) + return chr1.compareTo(chr2); + // one contig is a special and the other is not special + if (specialChrs.contains(chr1)) + return -1; + return 1; + } + + // one contig is named the other is numbered + if (x != null) + return -1; + return 1; + } + + private Integer convertStringWithoutException(String contig) { + Integer x = null; + try { + x = Integer.decode(contig); + } catch (NumberFormatException n){} + return x; + } +}