MergingIterator completely re-done. Now it is not a generic class (sorry guys), but rather it is tailored for merging ROD tracks. This implementation peeks the locations of next ROD annotations in each track, but does not actually read these RODs from underlying streams until the location is reached and it is time to actually return the object. Now underlying ROD track iterators (registered in the resource pool!) are not advanced prematurely past the current position and all the way to the next ROD record wherever it is, so that the sharding system can reuse them.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2582 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
c0891d512f
commit
d85461c463
|
|
@ -32,7 +32,7 @@ public class RodLocusView extends LocusView implements ReferenceOrderedView {
|
|||
/**
|
||||
* The data sources along with their current states.
|
||||
*/
|
||||
private MergingIterator<RODRecordList<ReferenceOrderedDatum>> rodQueue = null;
|
||||
private MergingIterator<ReferenceOrderedDatum> rodQueue = null;
|
||||
|
||||
RefMetaDataTracker tracker = null;
|
||||
GenomeLoc lastLoc = null;
|
||||
|
|
@ -86,7 +86,7 @@ public class RodLocusView extends LocusView implements ReferenceOrderedView {
|
|||
}
|
||||
}
|
||||
|
||||
rodQueue = new MergingIterator<RODRecordList<ReferenceOrderedDatum>>(iterators);
|
||||
rodQueue = new MergingIterator<ReferenceOrderedDatum>(iterators);
|
||||
|
||||
//throw new StingException("RodLocusView currently disabled");
|
||||
}
|
||||
|
|
@ -99,8 +99,7 @@ public class RodLocusView extends LocusView implements ReferenceOrderedView {
|
|||
if ( ! rodQueue.hasNext() )
|
||||
return false;
|
||||
else {
|
||||
RODRecordList<ReferenceOrderedDatum> peeked = rodQueue.peek();
|
||||
return ! peeked.getLocation().isPast(shard.getGenomeLoc());
|
||||
return ! rodQueue.peekLocation().isPast(shard.getGenomeLoc());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,38 +1,55 @@
|
|||
package org.broadinstitute.sting.utils;
|
||||
|
||||
import org.broadinstitute.sting.gatk.iterators.PeekingIterator;
|
||||
import org.broadinstitute.sting.gatk.iterators.PushbackIterator;
|
||||
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedDatum;
|
||||
import org.broadinstitute.sting.gatk.refdata.SeekableRODIterator;
|
||||
import org.broadinstitute.sting.gatk.refdata.RODRecordList;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class MergingIterator<E extends Comparable<E>> implements Iterator<E>, PeekingIterator<E>, Iterable<E> {
|
||||
public class MergingIterator<ROD extends ReferenceOrderedDatum> implements Iterator<RODRecordList<ROD>>, Iterable<RODRecordList<ROD>> {
|
||||
PriorityQueue<Element> queue = new PriorityQueue<Element>();
|
||||
|
||||
private class Element implements Comparable<Element> {
|
||||
public Iterator<E> it = null;
|
||||
public E value = null;
|
||||
public SeekableRODIterator it = null;
|
||||
//public E value = null;
|
||||
public GenomeLoc nextLoc = null;
|
||||
|
||||
public Element(Iterator<E> it) {
|
||||
this.it = it;
|
||||
update();
|
||||
public Element(Iterator<RODRecordList<ROD>> it) {
|
||||
if ( it instanceof SeekableRODIterator ) {
|
||||
this.it = (SeekableRODIterator)it;
|
||||
if ( ! it.hasNext() ) throw new StingException("Iterator is empty");
|
||||
update();
|
||||
} else {
|
||||
throw new StingException("Iterator passed to MergingIterator is not SeekableRODIterator");
|
||||
}
|
||||
}
|
||||
|
||||
public Element update() {
|
||||
if ( ! it.hasNext() )
|
||||
throw new RuntimeException("it is empty");
|
||||
|
||||
E prev = value;
|
||||
value = it.next();
|
||||
//System.out.printf("Updating %s to prev=%s, next=%s%n", this, prev, value);
|
||||
// E prev = value;
|
||||
nextLoc = it.peekNextLocation(); // will return null if there is no next location
|
||||
return this;
|
||||
}
|
||||
|
||||
public int compareTo(Element other) {
|
||||
return value.compareTo(other.value);
|
||||
if ( nextLoc == null ) {
|
||||
if ( other.nextLoc != null ) return 1; // null means no more data available, so its after any non-null position
|
||||
return 0;
|
||||
}
|
||||
if ( other.nextLoc == null ) return -1; // we can get to this point only if this.nextLoc != null
|
||||
|
||||
return nextLoc.compareTo(other.nextLoc);
|
||||
}
|
||||
|
||||
public RODRecordList<ROD> next() {
|
||||
RODRecordList<ROD> value = it.next();
|
||||
update();
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
public Iterator<E> iterator() {
|
||||
public Iterator<RODRecordList<ROD>> iterator() {
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -40,17 +57,21 @@ public class MergingIterator<E extends Comparable<E>> implements Iterator<E>, Pe
|
|||
;
|
||||
}
|
||||
|
||||
public MergingIterator(Iterator<E> it) {
|
||||
public MergingIterator(Iterator<RODRecordList<ROD>> it) {
|
||||
add(it);
|
||||
}
|
||||
|
||||
public MergingIterator(Collection<Iterator<E>> its) {
|
||||
for ( Iterator<E> it : its ) {
|
||||
public MergingIterator(Collection<Iterator<RODRecordList<ROD>>> its) {
|
||||
for ( Iterator<RODRecordList<ROD>> it : its ) {
|
||||
add(it);
|
||||
}
|
||||
}
|
||||
|
||||
public void add(Iterator<E> it) {
|
||||
/** If the iterator is non-empty (hasNext() is true), put it into the queue. The next location the iterator
|
||||
* will be after a call to next() is peeked into and cached as queue's priority value.
|
||||
* @param it
|
||||
*/
|
||||
public void add(Iterator<RODRecordList<ROD>> it) {
|
||||
if ( it.hasNext() )
|
||||
queue.add(new Element(it));
|
||||
}
|
||||
|
|
@ -59,35 +80,39 @@ public class MergingIterator<E extends Comparable<E>> implements Iterator<E>, Pe
|
|||
return ! queue.isEmpty();
|
||||
}
|
||||
|
||||
public E next() {
|
||||
public RODRecordList<ROD> next() {
|
||||
Element e = queue.poll();
|
||||
E value = e.value;
|
||||
RODRecordList<ROD> value = e.next(); // next() will also update next location cached by the Element
|
||||
|
||||
if ( e.it != null && e.it.hasNext() )
|
||||
queue.add(new Element(e.it));
|
||||
if ( e.nextLoc != null ) // we have more data in the track
|
||||
queue.add(e); // add the element back to queue (note: its next location, on which priority is based, was updated
|
||||
|
||||
//System.out.printf("Element is %s%n", e.value);
|
||||
return value;
|
||||
}
|
||||
|
||||
public E peek() {
|
||||
return queue.peek().value;
|
||||
/** Peeks into the genomic location of the record this iterator will return next.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public GenomeLoc peekLocation() {
|
||||
return queue.peek().nextLoc;
|
||||
}
|
||||
|
||||
public Collection<E> allElementsLTE(E elt) {
|
||||
public Collection<RODRecordList<ROD>> allElementsLTE(RODRecordList<ROD> elt) {
|
||||
return allElementsLTE(elt, true);
|
||||
}
|
||||
|
||||
public Collection<E> allElementsLTE(E elt, boolean includeElt) {
|
||||
LinkedList<E> all = new LinkedList<E>();
|
||||
public Collection<RODRecordList<ROD>> allElementsLTE(RODRecordList<ROD> elt, boolean includeElt) {
|
||||
LinkedList<RODRecordList<ROD>> all = new LinkedList<RODRecordList<ROD>>();
|
||||
|
||||
if ( includeElt ) all.add(elt);
|
||||
|
||||
while ( hasNext() ) {
|
||||
E x = peek();
|
||||
Element x = queue.peek();
|
||||
//System.out.printf("elt.compareTo(x) == %d%n", elt.compareTo(x));
|
||||
//System.out.printf("In allElementLTE%n");
|
||||
int cmp = elt.compareTo(x);
|
||||
int cmp = elt.getLocation().compareTo(x.nextLoc);
|
||||
//System.out.printf("x=%s%n elt=%s%n => elt.compareTo(x) == %d%n", x, elt, cmp);
|
||||
if ( cmp >= 0 ) {
|
||||
//System.out.printf(" Adding element x=%s, size = %d%n", x, all.size());
|
||||
|
|
|
|||
Loading…
Reference in New Issue