2009-04-30 05:07:07 +08:00
package org.broadinstitute.sting.gatk.executive ;
2010-05-28 05:57:56 +08:00
import org.apache.log4j.Logger ;
2009-06-12 02:13:22 +08:00
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider ;
2010-03-12 02:40:31 +08:00
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider ;
2011-02-04 01:59:19 +08:00
import org.broadinstitute.sting.gatk.datasources.reads.Shard ;
2009-05-23 05:20:24 +08:00
import org.broadinstitute.sting.gatk.traversals.TraversalEngine ;
2009-08-23 08:56:02 +08:00
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker ;
2009-05-01 04:35:56 +08:00
import org.broadinstitute.sting.gatk.walkers.Walker ;
2010-05-31 02:00:12 +08:00
import org.broadinstitute.sting.gatk.walkers.LocusWalker ;
2010-09-12 23:07:38 +08:00
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException ;
2009-04-30 05:07:07 +08:00
import java.util.concurrent.Callable ;
/ * *
* User : hanna
* Date : Apr 29 , 2009
* Time : 4 : 40 : 38 PM
* BROAD INSTITUTE SOFTWARE COPYRIGHT NOTICE AND AGREEMENT
* Software and documentation are copyright 2005 by the Broad Institute .
* All rights are reserved .
*
* Users acknowledge that this software is supplied without any warranty or support .
* The Broad Institute is not responsible for its use , misuse , or
* functionality .
* /
/ * *
* Carries the walker over a given shard , in a callable interface .
* /
public class ShardTraverser implements Callable {
2009-05-30 06:19:27 +08:00
private HierarchicalMicroScheduler microScheduler ;
2009-04-30 05:07:07 +08:00
private Walker walker ;
2010-03-12 02:40:31 +08:00
private Shard shard ;
2009-05-23 05:20:24 +08:00
private TraversalEngine traversalEngine ;
2009-08-23 08:56:02 +08:00
private ThreadLocalOutputTracker outputTracker ;
private OutputMergeTask outputMergeTask ;
2010-05-28 05:57:56 +08:00
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger . getLogger ( ShardTraverser . class ) ;
2009-08-23 08:56:02 +08:00
/ * *
* Is this traversal complete ?
* /
private boolean complete = false ;
2009-04-30 05:07:07 +08:00
2009-05-30 06:19:27 +08:00
public ShardTraverser ( HierarchicalMicroScheduler microScheduler ,
TraversalEngine traversalEngine ,
2009-04-30 05:07:07 +08:00
Walker walker ,
2010-03-12 02:40:31 +08:00
Shard shard ,
2010-08-12 04:17:11 +08:00
ThreadLocalOutputTracker outputTracker ) {
2009-05-30 06:19:27 +08:00
this . microScheduler = microScheduler ;
2009-04-30 05:07:07 +08:00
this . walker = walker ;
this . traversalEngine = traversalEngine ;
2010-03-12 02:40:31 +08:00
this . shard = shard ;
2009-08-23 08:56:02 +08:00
this . outputTracker = outputTracker ;
2009-04-30 05:07:07 +08:00
}
public Object call ( ) {
2009-05-02 03:34:09 +08:00
try {
2010-07-15 04:34:43 +08:00
long startTime = System . currentTimeMillis ( ) ;
Object accumulator = walker . reduceInit ( ) ;
LocusWalker lWalker = ( LocusWalker ) walker ;
2011-03-28 04:48:24 +08:00
WindowMaker windowMaker = new WindowMaker ( shard , microScheduler . getEngine ( ) . getGenomeLocParser ( ) , microScheduler . getReadIterator ( shard ) , shard . getGenomeLocs ( ) , microScheduler . engine . getSampleMetadata ( ) ) ; // todo: microScheduler.engine is protected - is it okay to user it here?
2010-07-15 04:34:43 +08:00
ShardDataProvider dataProvider = null ;
2010-03-12 02:40:31 +08:00
for ( WindowMaker . WindowMakerIterator iterator : windowMaker ) {
2010-11-11 01:59:50 +08:00
dataProvider = new LocusShardDataProvider ( shard , iterator . getSourceInfo ( ) , microScheduler . getEngine ( ) . getGenomeLocParser ( ) , iterator . getLocus ( ) , iterator , microScheduler . reference , microScheduler . rods ) ;
2010-03-12 02:40:31 +08:00
accumulator = traversalEngine . traverse ( walker , dataProvider , accumulator ) ;
dataProvider . close ( ) ;
}
2010-07-15 04:34:43 +08:00
2010-05-28 05:57:56 +08:00
if ( dataProvider ! = null ) dataProvider . close ( ) ;
2010-03-12 02:40:31 +08:00
windowMaker . close ( ) ;
2009-08-23 08:56:02 +08:00
outputMergeTask = outputTracker . closeStorage ( ) ;
2010-07-15 04:34:43 +08:00
long endTime = System . currentTimeMillis ( ) ;
microScheduler . reportShardTraverseTime ( endTime - startTime ) ;
return accumulator ;
}
2010-07-23 06:13:01 +08:00
catch ( Throwable t ) {
// Notify that an exception has occurred and rethrow it.
microScheduler . notifyOfTraversalError ( t ) ;
2010-09-12 23:07:38 +08:00
throw new ReviewedStingException ( "An error has occurred during traversal" , t ) ;
2010-07-23 06:13:01 +08:00
}
2010-07-15 04:34:43 +08:00
finally {
2009-08-23 08:56:02 +08:00
synchronized ( this ) {
complete = true ;
notifyAll ( ) ;
}
2009-05-02 03:34:09 +08:00
}
2009-04-30 05:07:07 +08:00
}
2009-08-23 08:56:02 +08:00
/ * *
* Has this traversal completed ?
* @return True if completed , false otherwise .
* /
public boolean isComplete ( ) {
synchronized ( this ) {
return complete ;
}
}
/ * *
* Waits for any the given OutputMerger to be ready for merging .
* /
public void waitForComplete ( ) {
try {
synchronized ( this ) {
if ( isComplete ( ) )
return ;
wait ( ) ;
}
}
catch ( InterruptedException ex ) {
2010-09-12 23:07:38 +08:00
throw new ReviewedStingException ( "Interrupted while waiting for more output to be finalized." , ex ) ;
2009-08-23 08:56:02 +08:00
}
}
/ * *
* Gets the output merge task associated with the given shard .
* @return OutputMergeTask if one exists ; null if nothing needs to be merged .
* /
public OutputMergeTask getOutputMergeTask ( ) {
return outputMergeTask ;
}
2009-04-30 05:07:07 +08:00
}