Made the MergingSamRecordIterator2 peekable. This iterator is becoming a duct-taped-together Swiss Army knife; the iterators could use a redo soon.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@593 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
aaron 2009-05-05 19:15:07 +00:00
parent 09b0b6b57d
commit daa2163ee8
2 changed files with 48 additions and 33 deletions

View File

@ -1,7 +1,6 @@
package org.broadinstitute.sting.gatk.dataSources.simpleDataSources; package org.broadinstitute.sting.gatk.dataSources.simpleDataSources;
import edu.mit.broad.picard.sam.SamFileHeaderMerger; import edu.mit.broad.picard.sam.SamFileHeaderMerger;
import edu.mit.broad.picard.util.PeekableIterator;
import net.sf.samtools.SAMFileHeader; import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader; import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMReadGroupRecord; import net.sf.samtools.SAMReadGroupRecord;
@ -184,18 +183,11 @@ public class SAMDataSource implements SimpleDataSource {
SamFileHeaderMerger headerMerger = CreateHeader(); SamFileHeaderMerger headerMerger = CreateHeader();
MergingSamRecordIterator2 iter = null; MergingSamRecordIterator2 iter = null;
/*if (false) { // !includeUnmappedReads) {
// make a merging iterator for this record
iter = new MergingSamRecordIterator2(headerMerger);
bound = fastMappedReadSeek(shard.getSize(), iter);
} else {*/
if (!intoUnmappedReads) { if (!intoUnmappedReads) {
// make a merging iterator for this record // make a merging iterator for this record
iter = new MergingSamRecordIterator2(headerMerger); iter = new MergingSamRecordIterator2(headerMerger);
//bound = unmappedReadSeek(shard.getSize(), iter); bound = fastMappedReadSeek(shard.getSize(), iter);
bound = fastMappedReadSeek(shard.getSize(), iter);
} }
if (bound == null || intoUnmappedReads) { if (bound == null || intoUnmappedReads) {
if (iter != null) { if (iter != null) {
@ -204,7 +196,6 @@ public class SAMDataSource implements SimpleDataSource {
iter = new MergingSamRecordIterator2(CreateHeader()); iter = new MergingSamRecordIterator2(CreateHeader());
bound = toUnmappedReads(shard.getSize(), iter); bound = toUnmappedReads(shard.getSize(), iter);
} }
//}
if (bound == null) { if (bound == null) {
shard.signalDone(); shard.signalDone();
@ -234,37 +225,35 @@ public class SAMDataSource implements SimpleDataSource {
private BoundedReadIterator toUnmappedReads(long readCount, MergingSamRecordIterator2 iter) throws SimpleDataSourceLoadException { private BoundedReadIterator toUnmappedReads(long readCount, MergingSamRecordIterator2 iter) throws SimpleDataSourceLoadException {
BoundedReadIterator bound;// is this the first time we're doing this? BoundedReadIterator bound;// is this the first time we're doing this?
int count = 0; int count = 0;
PeekableIterator<SAMRecord> peek = new PeekableIterator(iter);
// throw away as many reads as it takes
SAMRecord d = null; SAMRecord d = null;
while (peek.hasNext()) { while (iter.hasNext()) {
d = peek.peek(); d = iter.peek();
int x = d.getReferenceIndex(); int x = d.getReferenceIndex();
if (x < 0 || x >= d.getHeader().getSequenceDictionary().getSequences().size()) { if (x < 0 || x >= d.getHeader().getSequenceDictionary().getSequences().size()) {
// we have the magic read that starts the unmapped read segment! // we have the magic read that starts the unmapped read segment!
break; break;
} }
peek.next(); iter.next();
} }
// check to see what happened, did we run out of reads? // check to see what happened, did we run out of reads?
if (!peek.hasNext()) { if (!iter.hasNext()) {
return null; return null;
} }
// now walk until we've taken the unmapped read count // now walk until we've taken the unmapped read count
while (peek.hasNext() && count < this.readsTaken) { while (iter.hasNext() && count < this.readsTaken) {
peek.next(); iter.next();
} }
// check to see what happened, did we run out of reads? // check to see what happened, did we run out of reads?
if (!peek.hasNext()) { if (!iter.hasNext()) {
return null; return null;
} }
// we're good, increment our read cout // we're good, increment our read cout
this.readsTaken += readCount; this.readsTaken += readCount;
return new BoundedReadIterator(peek, readCount); return new BoundedReadIterator(iter, readCount);
} }

View File

@ -34,7 +34,7 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
protected final SamFileHeaderMerger samHeaderMerger; protected final SamFileHeaderMerger samHeaderMerger;
protected final SAMFileHeader.SortOrder sortOrder; protected final SAMFileHeader.SortOrder sortOrder;
protected static Logger logger = Logger.getLogger(MergingSamRecordIterator2.class); protected static Logger logger = Logger.getLogger(MergingSamRecordIterator2.class);
private SAMRecord mNextRecord;
protected boolean initialized = false; protected boolean initialized = false;
/** /**
@ -48,14 +48,11 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
} }
/** /** this class MUST only be initialized once, since the creation of the */
* this class MUST only be initialized once, since the creation of the
*/
private void lazyInitialization() { private void lazyInitialization() {
if (initialized) { if (initialized) {
throw new UnsupportedOperationException("You cannot double initialize a MergingSamRecordIterator2"); throw new UnsupportedOperationException("You cannot double initialize a MergingSamRecordIterator2");
} }
initialized = true;
final SAMRecordComparator comparator = getComparator(); final SAMRecordComparator comparator = getComparator();
for (final SAMFileReader reader : samHeaderMerger.getReaders()) { for (final SAMFileReader reader : samHeaderMerger.getReaders()) {
if (this.sortOrder != SAMFileHeader.SortOrder.unsorted && reader.getFileHeader().getSortOrder() != this.sortOrder) { if (this.sortOrder != SAMFileHeader.SortOrder.unsorted && reader.getFileHeader().getSortOrder() != this.sortOrder) {
@ -65,9 +62,11 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, comparator); final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, comparator);
addIfNotEmpty(iterator); addIfNotEmpty(iterator);
} }
setInitialized();
} }
public boolean supportsSeeking() { public boolean supportsSeeking() {
return true; return true;
} }
@ -75,7 +74,6 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
if (initialized) { if (initialized) {
throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2"); throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2");
} }
initialized = true;
final SAMRecordComparator comparator = getComparator(); final SAMRecordComparator comparator = getComparator();
for (final SAMFileReader reader : samHeaderMerger.getReaders()) { for (final SAMFileReader reader : samHeaderMerger.getReaders()) {
@ -83,13 +81,14 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator); final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator);
addIfNotEmpty(iterator); addIfNotEmpty(iterator);
} }
setInitialized();
} }
public void query(final String contig, final int start, final int stop, final boolean contained) { public void query(final String contig, final int start, final int stop, final boolean contained) {
if (initialized) { if (initialized) {
throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2"); throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2");
} }
initialized = true;
final SAMRecordComparator comparator = getComparator(); final SAMRecordComparator comparator = getComparator();
for (final SAMFileReader reader : samHeaderMerger.getReaders()) { for (final SAMFileReader reader : samHeaderMerger.getReaders()) {
//reader.close(); //reader.close();
@ -97,31 +96,57 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator); final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator);
addIfNotEmpty(iterator); addIfNotEmpty(iterator);
} }
setInitialized();
} }
public void queryContained(final String contig, final int start, final int stop) { public void queryContained(final String contig, final int start, final int stop) {
if (initialized) { if (initialized) {
throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2"); throw new IllegalStateException("You cannot double initialize a MergingSamRecordIterator2");
} }
initialized = true;
final SAMRecordComparator comparator = getComparator(); final SAMRecordComparator comparator = getComparator();
for (final SAMFileReader reader : samHeaderMerger.getReaders()) { for (final SAMFileReader reader : samHeaderMerger.getReaders()) {
Iterator<SAMRecord> recordIter = reader.queryContained(contig, start, stop); Iterator<SAMRecord> recordIter = reader.queryContained(contig, start, stop);
final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator); final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(reader, recordIter, comparator);
addIfNotEmpty(iterator); addIfNotEmpty(iterator);
} }
setInitialized();
} }
private void setInitialized() {
initialized = true;
mNextRecord = nextRecord();
}
/** Returns true if any of the underlying iterators has more records, otherwise false. */ /** Returns true if any of the underlying iterators has more records, otherwise false. */
public boolean hasNext() { public boolean hasNext() {
if (!initialized) { if (!initialized) {
lazyInitialization(); lazyInitialization();
} }
return !this.pq.isEmpty(); if (this.pq.isEmpty() && mNextRecord == null) {
return false;
}
return true;
}
public SAMRecord next() {
SAMRecord r = mNextRecord;
if (!this.pq.isEmpty()) {
mNextRecord = nextRecord();
} else {
mNextRecord = null;
}
return r;
}
public SAMRecord peek() {
return mNextRecord;
} }
/** Returns the next record from the top most iterator during merging. */ /** Returns the next record from the top most iterator during merging. */
public SAMRecord next() { public SAMRecord nextRecord() {
if (!initialized) { if (!initialized) {
lazyInitialization(); lazyInitialization();
} }
@ -147,7 +172,7 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
record.setAttribute(SAMTag.RG.toString(), readGroups.get(0).getReadGroupId()); record.setAttribute(SAMTag.RG.toString(), readGroups.get(0).getReadGroupId());
record.setAttribute(SAMTag.SM.toString(), readGroups.get(0).getReadGroupId()); record.setAttribute(SAMTag.SM.toString(), readGroups.get(0).getReadGroupId());
} else { } else {
logger.warn("Unable to set read group of ungrouped read: unable to pick default group, there are " + readGroups.size() + " possible."); logger.warn("Unable to set read group of ungrouped read: unable to pick default group, there are " + readGroups.size() + " possible.");
} }
} }
@ -222,7 +247,7 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
/** /**
* closes all the file handles for the readers....DO THIS or you will run out of handles * closes all the file handles for the readers....DO THIS or you will run out of handles
* with sharding. * with sharding.
*/ */
public void close() { public void close() {
for (SAMFileReader reader : samHeaderMerger.getReaders()) { for (SAMFileReader reader : samHeaderMerger.getReaders()) {
@ -233,6 +258,7 @@ public class MergingSamRecordIterator2 implements CloseableIterator<SAMRecord>,
/** /**
* allows us to be used in the new style for loops * allows us to be used in the new style for loops
*
* @return * @return
*/ */
public Iterator<SAMRecord> iterator() { public Iterator<SAMRecord> iterator() {