Optimizations for parallel read walkers
-- TraverseReadsNano creates its NanoScheduler only once (in the constructor) and shuts it down in printOnTraversalDone()
-- Nicer debugging output in NanoScheduler (each debug message is prefixed with the current thread id)
-- ReadShard now has a getReadBufferSize() method
This commit is contained in:
parent
5066b14335
commit
fde9824765
|
|
@ -58,6 +58,15 @@ public class ReadShard extends Shard {
|
||||||
MAX_READS = bufferSize;
|
MAX_READS = bufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* What read buffer size are we using?
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static int getReadBufferSize() {
|
||||||
|
return MAX_READS;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if this shard is meant to buffer reads, rather
|
* Returns true if this shard is meant to buffer reads, rather
|
||||||
* than just holding pointers to their locations.
|
* than just holding pointers to their locations.
|
||||||
|
|
|
||||||
|
|
@ -51,12 +51,12 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
/** our log, which we want to capture anything from this class */
|
/** our log, which we want to capture anything from this class */
|
||||||
protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);
|
protected static final Logger logger = Logger.getLogger(TraverseReadsNano.class);
|
||||||
private static final boolean DEBUG = false;
|
private static final boolean DEBUG = false;
|
||||||
final int bufferSize = ReadShard.MAX_READS;
|
final NanoScheduler<SAMRecord, M, T> nanoScheduler;
|
||||||
final int mapGroupSize = bufferSize / 10 + 1;
|
|
||||||
final int nThreads;
|
|
||||||
|
|
||||||
public TraverseReadsNano(int nThreads) {
|
public TraverseReadsNano(int nThreads) {
|
||||||
this.nThreads = nThreads;
|
final int bufferSize = ReadShard.getReadBufferSize() + 1; // actually has 1 more than max
|
||||||
|
final int mapGroupSize = bufferSize / 10 + 1;
|
||||||
|
nanoScheduler = new NanoScheduler<SAMRecord, M, T>(bufferSize, mapGroupSize, nThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -87,18 +87,23 @@ public class TraverseReadsNano<M,T> extends TraversalEngine<M,T,ReadWalker<M,T>,
|
||||||
final ReadReferenceView reference = new NotImplementedReadReferenceView(dataProvider);
|
final ReadReferenceView reference = new NotImplementedReadReferenceView(dataProvider);
|
||||||
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
final ReadBasedReferenceOrderedView rodView = new ReadBasedReferenceOrderedView(dataProvider);
|
||||||
|
|
||||||
final NanoScheduler<SAMRecord, M, T> nanoScheduler = new NanoScheduler<SAMRecord, M, T>(bufferSize, mapGroupSize, nThreads);
|
|
||||||
nanoScheduler.setDebug(DEBUG);
|
nanoScheduler.setDebug(DEBUG);
|
||||||
final TraverseReadsMap myMap = new TraverseReadsMap(reads, reference, rodView, walker);
|
final TraverseReadsMap myMap = new TraverseReadsMap(reads, reference, rodView, walker);
|
||||||
final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker);
|
final TraverseReadsReduce myReduce = new TraverseReadsReduce(walker);
|
||||||
|
|
||||||
T result = nanoScheduler.execute(reads.iterator().iterator(), myMap, sum, myReduce);
|
T result = nanoScheduler.execute(reads.iterator().iterator(), myMap, sum, myReduce);
|
||||||
nanoScheduler.shutdown();
|
// TODO -- how do we print progress?
|
||||||
//printProgress(dataProvider.getShard(), ???);
|
//printProgress(dataProvider.getShard(), ???);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void printOnTraversalDone() {
|
||||||
|
nanoScheduler.shutdown();
|
||||||
|
super.printOnTraversalDone(); //To change body of overridden methods use File | Settings | File Templates.
|
||||||
|
}
|
||||||
|
|
||||||
private static class NotImplementedReadReferenceView extends ReadReferenceView {
|
private static class NotImplementedReadReferenceView extends ReadReferenceView {
|
||||||
private NotImplementedReadReferenceView(ShardDataProvider provider) {
|
private NotImplementedReadReferenceView(ShardDataProvider provider) {
|
||||||
super(provider);
|
super(provider);
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
|
|
||||||
private void debugPrint(final String format, Object ... args) {
|
private void debugPrint(final String format, Object ... args) {
|
||||||
if ( isDebug() )
|
if ( isDebug() )
|
||||||
logger.info(String.format(format, args));
|
logger.info("Thread " + Thread.currentThread().getId() + ":" + String.format(format, args));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -205,7 +205,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
||||||
final MapFunction<InputType, MapType> map,
|
final MapFunction<InputType, MapType> map,
|
||||||
final ReduceType initialValue,
|
final ReduceType initialValue,
|
||||||
final ReduceFunction<MapType, ReduceType> reduce) {
|
final ReduceFunction<MapType, ReduceType> reduce) {
|
||||||
debugPrint("Executing nanoScheduler with initial reduce value " + initialValue);
|
debugPrint("Executing nanoScheduler");
|
||||||
ReduceType sum = initialValue;
|
ReduceType sum = initialValue;
|
||||||
while ( inputReader.hasNext() ) {
|
while ( inputReader.hasNext() ) {
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue