Temporary commit of parallelization support for RealignerTargetCreator. Tim begged us for this and I got assurances from Khalid/Matt that this would also be extremely helpful for the whole genome calling pipeline, so I spent a while working on this. Needs to be fixed up though because apparently only the leaves in the hierarchical reduce get their output aggregated. Worked out a better solution with Matt.

This commit is contained in:
Eric Banks 2011-10-06 13:41:36 -04:00
parent c3eff7451a
commit c4dfc1fb8b
1 changed files with 99 additions and 20 deletions

View File

@ -103,7 +103,7 @@ import java.util.List;
@Allows(value={DataSource.READS, DataSource.REFERENCE})
@By(DataSource.REFERENCE)
@BAQMode(ApplicationTime = BAQ.ApplicationTime.FORBIDDEN)
public class RealignerTargetCreator extends RodWalker<RealignerTargetCreator.Event, RealignerTargetCreator.Event> {
public class RealignerTargetCreator extends RodWalker<RealignerTargetCreator.Event, RealignerTargetCreator.EventPair> implements TreeReducible<RealignerTargetCreator.EventPair> {
/**
* The target intervals for realignment.
@ -251,38 +251,117 @@ public class RealignerTargetCreator extends RodWalker<RealignerTargetCreator.Eve
return new Event(eventLoc, furthestStopPos, eventType);
}
public void onTraversalDone(Event sum) {
if ( sum != null && sum.isReportableEvent() )
out.println(sum.toString());
public void onTraversalDone(EventPair sum) {
if ( sum.left != null && sum.left.isReportableEvent() )
out.println(sum.left.toString());
if ( sum.right != null && sum.right.isReportableEvent() )
out.println(sum.right.toString());
}
public Event reduceInit() {
return null;
public EventPair reduceInit() {
return new EventPair(null, null);
}
public Event reduce(Event value, Event sum) {
// ignore no new events
if ( value == null )
return sum;
public EventPair treeReduce(EventPair lhs, EventPair rhs) {
EventPair result;
// if it's the first good value, use it
if ( sum == null )
return value;
if ( lhs.left == null ) {
result = rhs;
} else if ( rhs.left == null ) {
result = lhs;
} else if ( lhs.right == null ) {
if ( rhs.right == null ) {
if ( canBeMerged(lhs.left, rhs.left) )
result = new EventPair(mergeEvents(lhs.left, rhs.left), null);
else
result = new EventPair(lhs.left, rhs.left);
} else {
if ( canBeMerged(lhs.left, rhs.left) )
result = new EventPair(mergeEvents(lhs.left, rhs.left), rhs.right);
else {
if ( rhs.left.isReportableEvent() )
out.println(rhs.left.toString());
result = new EventPair(lhs.left, rhs.right);
}
}
} else if ( rhs.right == null ) {
if ( canBeMerged(lhs.right, rhs.left) )
result = new EventPair(lhs.left, mergeEvents(lhs.right, rhs.left));
else {
if ( lhs.right.isReportableEvent() )
out.println(lhs.right.toString());
result = new EventPair(lhs.left, rhs.left);
}
} else {
if ( canBeMerged(lhs.right, rhs.left) ) {
Event merge = mergeEvents(lhs.right, rhs.left);
if ( merge.isReportableEvent() )
out.println(merge.toString());
} else {
if ( lhs.right.isReportableEvent() )
out.println(lhs.right.toString());
if ( rhs.left.isReportableEvent() )
out.println(rhs.left.toString());
}
// if we hit a new contig or they have no overlapping reads, then they are separate events - so clear sum
if ( sum.loc.getContigIndex() != value.loc.getContigIndex() || sum.furthestStopPos < value.loc.getStart() ) {
if ( sum.isReportableEvent() )
out.println(sum.toString());
return value;
result = new EventPair(lhs.left, rhs.right);
}
return result;
}
public EventPair reduce(Event value, EventPair sum) {
if ( value == null ) {
; // do nothing
} else if ( sum.left == null ) {
sum.left = value;
} else if ( sum.right == null ) {
if ( canBeMerged(sum.left, value) )
sum.left = mergeEvents(sum.left, value);
// While ideally we shouldn't do anything differently depending on the number of threads, there is a practical reason for doing so:
// if we know that we are single-threaded then we can ensure that the intervals are emitted in order, which leads to a large
// performance improvement (esp. memory) in the IndelRealigner (because we don't have to store all of the intervals in memory to sort them).
else if ( getToolkit().getArguments().numberOfThreads > 1 )
sum.right = value;
else {
if ( sum.left.isReportableEvent() )
out.println(sum.left.toString());
sum.left = value;
}
} else {
if ( canBeMerged(sum.right, value) )
sum.right = mergeEvents(sum.right, value);
else {
if ( sum.right.isReportableEvent() )
out.println(sum.right.toString());
sum.right = value;
}
}
// otherwise, merge the two events
sum.merge(value);
return sum;
}
static private boolean canBeMerged(Event left, Event right) {
return left.loc.getContigIndex() == right.loc.getContigIndex() && left.furthestStopPos >= right.loc.getStart();
}
@com.google.java.contract.Requires({"left != null", "right != null"})
static private Event mergeEvents(Event left, Event right) {
left.merge(right);
return left;
}
private enum EVENT_TYPE { POINT_EVENT, INDEL_EVENT, BOTH }
class EventPair {
public Event left, right;
public EventPair(Event left, Event right) {
this.left = left;
this.right = right;
}
}
class Event {
public int furthestStopPos;