Support for initialize() and onTraversalDone() output from parallelized walkers.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@1911 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2009-10-26 20:18:31 +00:00
parent 62c1001790
commit a1e8a532ad
3 changed files with 72 additions and 22 deletions

View File

@ -102,7 +102,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
ReduceTree reduceTree = new ReduceTree(this); ReduceTree reduceTree = new ReduceTree(this);
walker.initialize(); initializeWalker(walker);
for (Shard shard : shardStrategy) for (Shard shard : shardStrategy)
traverseTasks.add(shard); traverseTasks.add(shard);
@ -124,29 +124,59 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
queueNextShardTraverse(walker, reduceTree); queueNextShardTraverse(walker, reduceTree);
} }
threadPool.shutdown();
// Merge any lingering output files. If these files aren't ready, // Merge any lingering output files. If these files aren't ready,
// sit around and wait for them, then merge them. // sit around and wait for them, then merge them.
mergeExistingOutput(true); mergeExistingOutput(true);
threadPool.shutdown();
Object result = null; Object result = null;
try { try {
result = reduceTree.getResult().get(); result = reduceTree.getResult().get();
notifyTraversalDone(walker,result);
} }
catch (Exception ex) { catch (Exception ex) {
throw new StingException("Unable to retrieve result", ex); throw new StingException("Unable to retrieve result", ex);
} }
walker.onTraversalDone(result); outputTracker.close();
printOnTraversalDone(result);
getOutputTracker().close();
return result; return result;
} }
/**
* Run the initialize method of the walker. Ensure that any calls
* to the output stream will bypass thread local storage and write
* directly to the output file.
* @param walker Walker to initialize.
*/
protected void initializeWalker(Walker walker) {
outputTracker.bypassThreadLocalStorage(true);
try {
walker.initialize();
}
finally {
outputTracker.bypassThreadLocalStorage(false);
}
}
/**
* Run the initialize method of the walker. Ensure that any calls
* to the output stream will bypass thread local storage and write
* directly to the output file.
* @param walker Walker to initialize.
*/
protected void notifyTraversalDone(Walker walker, Object result) {
outputTracker.bypassThreadLocalStorage(true);
try {
walker.onTraversalDone(result);
printOnTraversalDone(result);
}
finally {
outputTracker.bypassThreadLocalStorage(false);
}
}
/** /**
* @{inheritDoc} * @{inheritDoc}
*/ */

View File

@ -59,7 +59,7 @@ public class LinearMicroScheduler extends MicroScheduler {
printOnTraversalDone(result); printOnTraversalDone(result);
getOutputTracker().close(); outputTracker.close();
return accumulator; return accumulator;
} }

View File

@ -36,7 +36,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
/** /**
* An output tracker that tracks its output per-thread. * An output tracker that can either track its output per-thread or directly,
* *
* @author mhanna * @author mhanna
* @version 0.1 * @version 0.1
@ -47,7 +47,26 @@ public class ThreadLocalOutputTracker extends OutputTracker {
*/ */
private ThreadLocal<Map<Stub, Storage>> storage = new ThreadLocal<Map<Stub,Storage>>(); private ThreadLocal<Map<Stub, Storage>> storage = new ThreadLocal<Map<Stub,Storage>>();
/**
* A total hack. If bypass = true, bypass thread local storage and write directly
* to the target file. Used to handle output during initialize() and onTraversalDone().
*/
private boolean bypass = false;
public void bypassThreadLocalStorage(boolean bypass) {
this.bypass = bypass;
}
public <T> T getStorage( Stub<T> stub ) { public <T> T getStorage( Stub<T> stub ) {
Storage target;
if(bypass) {
target = outputs.get(stub);
if( target == null ) {
target = StorageFactory.createStorage(stub);
outputs.put(stub, target);
}
}
else {
Map<Stub,Storage> threadLocalOutputStreams = storage.get(); Map<Stub,Storage> threadLocalOutputStreams = storage.get();
if( threadLocalOutputStreams == null ) { if( threadLocalOutputStreams == null ) {
@ -55,11 +74,12 @@ public class ThreadLocalOutputTracker extends OutputTracker {
storage.set( threadLocalOutputStreams ); storage.set( threadLocalOutputStreams );
} }
Storage target = threadLocalOutputStreams.get(stub); target = threadLocalOutputStreams.get(stub);
if( target == null ) { if( target == null ) {
target = StorageFactory.createStorage(stub, createTempFile(stub)); target = StorageFactory.createStorage(stub, createTempFile(stub));
threadLocalOutputStreams.put(stub, target); threadLocalOutputStreams.put(stub, target);
} }
}
return (T)target; return (T)target;
} }
@ -68,13 +88,13 @@ public class ThreadLocalOutputTracker extends OutputTracker {
* Close down any existing temporary files which have been opened. * Close down any existing temporary files which have been opened.
*/ */
public OutputMergeTask closeStorage() { public OutputMergeTask closeStorage() {
Map<Stub,Storage> threadLocalStorage = storage.get(); Map<Stub,Storage> threadLocalOutputStreams = storage.get();
if( threadLocalStorage == null || threadLocalStorage.isEmpty() ) if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() )
return null; return null;
OutputMergeTask outputMergeTask = new OutputMergeTask(); OutputMergeTask outputMergeTask = new OutputMergeTask();
for( Map.Entry<Stub,Storage> entry: threadLocalStorage.entrySet() ) { for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) {
Stub stub = entry.getKey(); Stub stub = entry.getKey();
Storage storageEntry = entry.getValue(); Storage storageEntry = entry.getValue();
@ -82,7 +102,7 @@ public class ThreadLocalOutputTracker extends OutputTracker {
outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry); outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry);
} }
threadLocalStorage.clear(); threadLocalOutputStreams.clear();
return outputMergeTask; return outputMergeTask;
} }