diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 758238279..0199901a1 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -30,11 +30,6 @@ import net.sf.picard.reference.IndexedFastaSequenceFile; * Requires a special walker tagged with a 'TreeReducible' interface. */ public class HierarchicalMicroScheduler extends MicroScheduler implements HierarchicalMicroSchedulerMBean, ReduceTree.TreeReduceNotifier { - /** - * Counts the number of instances of the class that are currently alive. - */ - private static int instanceNumber = 0; - /** * How many outstanding output merges are allowed before the scheduler stops * allowing new processes and starts merging flat-out. @@ -101,23 +96,6 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar if (!( walker instanceof TreeReducible )) throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers"); - // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. - // To get around this limitation and since we have no job identifier at this point, register a simple counter that - // will count the number of instances of this object that have been created in this JVM. - int thisInstance; - synchronized(HierarchicalMicroScheduler.class) { - thisInstance = instanceNumber++; - } - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = null; - try { - name = new ObjectName("org.broadinstitute.sting.gatk.executive:type=HierarchicalMicroScheduler,instanceNumber="+thisInstance); - mbs.registerMBean(this, name); - } - catch (JMException ex) { - throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); - } - traversalEngine.startTimers(); ReduceTree reduceTree = new ReduceTree(this); initializeWalker(walker); @@ -164,14 +142,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar throw new ReviewedStingException("Unable to retrieve result", ex); } + // do final cleanup operations outputTracker.close(); - - try { - mbs.unregisterMBean(name); - } - catch (JMException ex) { - throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex); - } + cleanup(); return result; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java index 3bb86e737..21a87963b 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroSchedulerMBean.java @@ -16,7 +16,7 @@ package org.broadinstitute.sting.gatk.executive; * An interface for retrieving runtime statistics about how the hierarchical * microscheduler is behaving. */ -public interface HierarchicalMicroSchedulerMBean { +public interface HierarchicalMicroSchedulerMBean extends MicroSchedulerMBean { /** * What is the total number of shards assigned to this microscheduler? * @return Total number of shards to process. diff --git a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index caf395730..63829072d 100644 --- a/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -75,6 +75,7 @@ public class LinearMicroScheduler extends MicroScheduler { printOnTraversalDone(result,engine.getCumulativeMetrics()); outputTracker.close(); + cleanup(); return accumulator; } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index de7623de6..52dfba70c 100755 --- a/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -30,21 +30,26 @@ import org.broadinstitute.sting.gatk.datasources.shards.Shard; import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource; -import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID; import org.broadinstitute.sting.gatk.traversals.*; import org.broadinstitute.sting.gatk.walkers.*; import org.broadinstitute.sting.gatk.io.OutputTracker; import org.broadinstitute.sting.gatk.iterators.StingSAMIterator; import org.broadinstitute.sting.gatk.iterators.NullSAMIterator; -import org.broadinstitute.sting.gatk.ReadProperties; import org.broadinstitute.sting.gatk.GenomeAnalysisEngine; import org.broadinstitute.sting.gatk.ReadMetrics; +import java.io.File; +import java.lang.management.ManagementFactory; import java.util.*; import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.ObjectName; + /** * Created by IntelliJ IDEA. @@ -55,9 +60,14 @@ import org.broadinstitute.sting.utils.exceptions.UserException; */ /** Shards and schedules data in manageable chunks. */ -public abstract class MicroScheduler { +public abstract class MicroScheduler implements MicroSchedulerMBean { protected static Logger logger = Logger.getLogger(MicroScheduler.class); + /** + * Counts the number of instances of the class that are currently alive. + */ + private static int instanceNumber = 0; + /** * The engine invoking this scheduler. */ @@ -69,6 +79,9 @@ public abstract class MicroScheduler { private final SAMDataSource reads; protected final Collection rods; + private final MBeanServer mBeanServer; + private final ObjectName mBeanName; + /** * MicroScheduler factory function. Create a microscheduler appropriate for reducing the * selected walker. @@ -123,6 +136,19 @@ public abstract class MicroScheduler { } traversalEngine.initialize(engine); + + // JMX does not allow multiple instances with the same ObjectName to be registered with the same platform MXBean. + // To get around this limitation and since we have no job identifier at this point, register a simple counter that + // will count the number of instances of this object that have been created in this JVM. + int thisInstance = instanceNumber++; + mBeanServer = ManagementFactory.getPlatformMBeanServer(); + try { + mBeanName = new ObjectName("org.broadinstitute.sting.gatk.executive:type=MicroScheduler,instanceNumber="+thisInstance); + mBeanServer.registerMBean(this, mBeanName); + } + catch (JMException ex) { + throw new ReviewedStingException("Unable to register microscheduler with JMX", ex); + } } /** @@ -175,4 +201,45 @@ public abstract class MicroScheduler { * @return The reference maintained by this scheduler. */ public IndexedFastaSequenceFile getReference() { return reference; } + + /** + * Gets the filename to which performance data is currently being written. + * @return Filename to which performance data is currently being written. + */ + public String getPerformanceLogFileName() { + return traversalEngine.getPerformanceLogFileName(); + } + + /** + * Set the filename of the log for performance. If set, + * @param fileName filename to use when writing performance data. + */ + public void setPerformanceLogFileName(String fileName) { + traversalEngine.setPerformanceLogFileName(fileName); + } + + /** + * Gets the frequency with which performance data is written. + * @return Frequency, in seconds, of performance log writes. + */ + public long getPerformanceProgressPrintFrequencySeconds() { + return traversalEngine.getPerformanceProgressPrintFrequencySeconds(); + } + + /** + * How often should the performance log message be written? + * @param seconds number of seconds between messages indicating performance frequency. + */ + public void setPerformanceProgressPrintFrequencySeconds(long seconds) { + traversalEngine.setPerformanceProgressPrintFrequencySeconds(seconds); + } + + protected void cleanup() { + try { + mBeanServer.unregisterMBean(mBeanName); + } + catch (JMException ex) { + throw new ReviewedStingException("Unable to unregister microscheduler with JMX", ex); + } + } } diff --git a/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java b/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java new file mode 100644 index 000000000..e510822b8 --- /dev/null +++ b/java/src/org/broadinstitute/sting/gatk/executive/MicroSchedulerMBean.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +package org.broadinstitute.sting.gatk.executive; + +/** + * Created by IntelliJ IDEA. + * User: mhanna + * Date: Jan 12, 2011 + * Time: 9:19:27 PM + * To change this template use File | Settings | File Templates. + */ +public interface MicroSchedulerMBean { + /** + * Gets the filename to which performance data is currently being written. + * @return Filename to which performance data is currently being written. + */ + public String getPerformanceLogFileName(); + + /** + * Set the filename of the log for performance. If set, + * @param fileName filename to use when writing performance data. + */ + public void setPerformanceLogFileName(String fileName); + + /** + * Gets the frequency with which performance data is written. + * @return Frequency, in seconds, of performance log writes. + */ + public long getPerformanceProgressPrintFrequencySeconds(); + + /** + * How often should the performance log message be written? + * @param seconds number of seconds between messages indicating performance frequency. + */ + public void setPerformanceProgressPrintFrequencySeconds(long seconds); +} diff --git a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java index c25ed6c24..2a86197fc 100755 --- a/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java +++ b/java/src/org/broadinstitute/sting/gatk/traversals/TraversalEngine.java @@ -34,6 +34,7 @@ import org.broadinstitute.sting.utils.*; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.exceptions.UserException; +import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; @@ -115,10 +116,12 @@ public abstract class TraversalEngine,Provide // How long can we go without printing some progress info? private long lastProgressPrintTime = -1; // When was the last time we printed progress log? - private final long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in seconds + private long PROGRESS_PRINT_FREQUENCY = 10 * 1000; // in seconds // for performance log private static final boolean PERFORMANCE_LOG_ENABLED = true; + private final Object performanceLogLock = new Object(); + private File performanceLogFile; private PrintStream performanceLog = null; private long lastPerformanceLogPrintTime = -1; // When was the last time we printed to the performance log? private final long PERFORMANCE_LOG_PRINT_FREQUENCY = PROGRESS_PRINT_FREQUENCY; // in seconds @@ -172,12 +175,9 @@ public abstract class TraversalEngine,Provide this.engine = engine; if ( PERFORMANCE_LOG_ENABLED && engine.getArguments() != null && engine.getArguments().performanceLog != null ) { - try { - performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); - List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); - performanceLog.println(Utils.join("\t", pLogHeader)); - } catch (FileNotFoundException e) { - throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); + synchronized(this.performanceLogLock) { + performanceLogFile = engine.getArguments().performanceLog; + createNewPerformanceLog(); } } @@ -189,6 +189,17 @@ public abstract class TraversalEngine,Provide targetSize = targetIntervals.coveredSize(); } + private void createNewPerformanceLog() { + synchronized(performanceLogLock) { + try { + performanceLog = new PrintStream(new FileOutputStream(engine.getArguments().performanceLog)); + List pLogHeader = Arrays.asList("elapsed.time", "units.processed", "processing.speed", "bp.processed", "bp.speed", "genome.fraction.complete", "est.total.runtime", "est.time.remaining"); + performanceLog.println(Utils.join("\t", pLogHeader)); + } catch (FileNotFoundException e) { + throw new UserException.CouldNotCreateOutputFile(engine.getArguments().performanceLog, e); + } + } + } /** * Should be called to indicate that we're going to process records and the timer should start ticking */ @@ -273,9 +284,11 @@ public abstract class TraversalEngine,Provide if ( printLog ) { lastPerformanceLogPrintTime = curTime; - performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", - elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t, - fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t); + synchronized(performanceLogLock) { + performanceLog.printf("%.2f\t%d\t%.2e\t%d\t%.2e\t%.2e\t%.2f\t%.2f%n", + elapsed.t, nRecords, unitRate.t, last.bpProcessed, bpRate.t, + fractionGenomeTargetCompleted, estTotalRuntime.t, timeToCompletion.t); + } } } } @@ -345,4 +358,50 @@ public abstract class TraversalEngine,Provide if ( performanceLog != null ) performanceLog.close(); } + + /** + * Gets the filename to which performance data is currently being written. + * @return Filename to which performance data is currently being written. + */ + public String getPerformanceLogFileName() { + synchronized(performanceLogLock) { + return performanceLogFile.getAbsolutePath(); + } + } + + /** + * Sets the filename of the log for performance. If set, will write performance data. + * @param fileName filename to use when writing performance data. + */ + public void setPerformanceLogFileName(String fileName) { + File file = new File(fileName); + + synchronized(performanceLogLock) { + // Ignore multiple calls to reset the same lock. + if(performanceLogFile != null && performanceLogFile.equals(fileName)) + return; + + // Close an existing log + if(performanceLog != null) performanceLog.close(); + + performanceLogFile = file; + createNewPerformanceLog(); + } + } + + /** + * Gets the frequency with which performance data is written. + * @return Frequency, in seconds, of performance log writes. + */ + public long getPerformanceProgressPrintFrequencySeconds() { + return PROGRESS_PRINT_FREQUENCY; + } + + /** + * How often should the performance log message be written? + * @param seconds number of seconds between messages indicating performance frequency. + */ + public void setPerformanceProgressPrintFrequencySeconds(long seconds) { + PROGRESS_PRINT_FREQUENCY = seconds; + } }