Fixes to try and speed up unmapped read traversals. Still not nearly as fast as they should be, but the next step would be to modify samtools code.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@592 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
aaron 2009-05-05 18:17:07 +00:00
parent 6550fe6f97
commit 09b0b6b57d
3 changed files with 110 additions and 74 deletions

View File

@ -1,6 +1,7 @@
package org.broadinstitute.sting.gatk.dataSources.simpleDataSources; package org.broadinstitute.sting.gatk.dataSources.simpleDataSources;
import edu.mit.broad.picard.sam.SamFileHeaderMerger; import edu.mit.broad.picard.sam.SamFileHeaderMerger;
import edu.mit.broad.picard.util.PeekableIterator;
import net.sf.samtools.SAMFileHeader; import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader; import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMReadGroupRecord; import net.sf.samtools.SAMReadGroupRecord;
@ -42,7 +43,7 @@ public class SAMDataSource implements SimpleDataSource {
private boolean locusMode = true; private boolean locusMode = true;
// How strict should we be with SAM/BAM parsing? // How strict should we be with SAM/BAM parsing?
protected SAMFileReader.ValidationStringency strictness = SAMFileReader.ValidationStringency.SILENT; //JRM for CSH protected SAMFileReader.ValidationStringency strictness = SAMFileReader.ValidationStringency.SILENT;
// our list of readers // our list of readers
private final List<File> samFileList = new ArrayList<File>(); private final List<File> samFileList = new ArrayList<File>();
@ -60,7 +61,6 @@ public class SAMDataSource implements SimpleDataSource {
private boolean intoUnmappedReads = false; private boolean intoUnmappedReads = false;
private int readsAtLastPos = 0; private int readsAtLastPos = 0;
/** /**
* constructor, given sam files * constructor, given sam files
* *
@ -121,10 +121,7 @@ public class SAMDataSource implements SimpleDataSource {
public MergingSamRecordIterator2 seekLocus(GenomeLoc location) throws SimpleDataSourceLoadException { public MergingSamRecordIterator2 seekLocus(GenomeLoc location) throws SimpleDataSourceLoadException {
// right now this is pretty damn heavy, it copies the file list into a reader list every time // right now this is pretty damn heavy, it copies the file list into a reader list every time
List<SAMFileReader> lst = GetReaderList(); SamFileHeaderMerger headerMerger = CreateHeader();
// now merge the headers
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(lst, SORT_ORDER);
// make a merging iterator for this record // make a merging iterator for this record
MergingSamRecordIterator2 iter = new MergingSamRecordIterator2(headerMerger); MergingSamRecordIterator2 iter = new MergingSamRecordIterator2(headerMerger);
@ -151,12 +148,10 @@ public class SAMDataSource implements SimpleDataSource {
*/ */
public CloseableIterator<SAMRecord> seek(Shard shard) throws SimpleDataSourceLoadException { public CloseableIterator<SAMRecord> seek(Shard shard) throws SimpleDataSourceLoadException {
if (shard.getShardType() == Shard.ShardType.READ) { if (shard.getShardType() == Shard.ShardType.READ) {
return seekRead((ReadShard)shard); return seekRead((ReadShard) shard);
} } else if (shard.getShardType() == Shard.ShardType.LOCUS) {
else if (shard.getShardType() == Shard.ShardType.LOCUS) {
return seekLocus(shard.getGenomeLoc()); return seekLocus(shard.getGenomeLoc());
} } else {
else {
throw new StingException("seek: Unknown shard type"); throw new StingException("seek: Unknown shard type");
} }
} }
@ -174,7 +169,7 @@ public class SAMDataSource implements SimpleDataSource {
includeUnmappedReads = seeUnMappedReads; includeUnmappedReads = seeUnMappedReads;
} }
/** /**
* <p> * <p>
* seek * seek
@ -184,28 +179,49 @@ public class SAMDataSource implements SimpleDataSource {
* @return an iterator for that region * @return an iterator for that region
*/ */
private BoundedReadIterator seekRead(ReadShard shard) throws SimpleDataSourceLoadException { private BoundedReadIterator seekRead(ReadShard shard) throws SimpleDataSourceLoadException {
// TODO: make extremely less horrible
List<SAMFileReader> lst = GetReaderList();
BoundedReadIterator bound = null; BoundedReadIterator bound = null;
SamFileHeaderMerger headerMerger = CreateHeader();
MergingSamRecordIterator2 iter = null;
// now merge the headers /*if (false) { // !includeUnmappedReads) {
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(lst, SORT_ORDER); // make a merging iterator for this record
iter = new MergingSamRecordIterator2(headerMerger);
// make a merging iterator for this record
MergingSamRecordIterator2 iter = new MergingSamRecordIterator2(headerMerger);
if (!includeUnmappedReads) {
bound = fastMappedReadSeek(shard.getSize(), iter); bound = fastMappedReadSeek(shard.getSize(), iter);
} else { } else {*/
bound = unmappedReadSeek(shard.getSize(), iter); if (!intoUnmappedReads) {
// make a merging iterator for this record
iter = new MergingSamRecordIterator2(headerMerger);
//bound = unmappedReadSeek(shard.getSize(), iter);
bound = fastMappedReadSeek(shard.getSize(), iter);
} }
if (bound == null || intoUnmappedReads) {
if (iter != null) {
iter.close();
}
iter = new MergingSamRecordIterator2(CreateHeader());
bound = toUnmappedReads(shard.getSize(), iter);
}
//}
if (bound == null) { if (bound == null) {
shard.signalDone(); shard.signalDone();
bound = new BoundedReadIterator(iter, 0);
} }
return bound; return bound;
} }
private SamFileHeaderMerger CreateHeader() {
// TODO: make extremely less horrible
List<SAMFileReader> lst = GetReaderList();
// now merge the headers
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(lst, SORT_ORDER);
return headerMerger;
}
/** /**
* Seek, if we want unmapped reads. This method will be faster then the unmapped read method, but you cannot extract the * Seek, if we want unmapped reads. This method will be faster then the unmapped read method, but you cannot extract the
* unmapped reads. * unmapped reads.
@ -215,30 +231,45 @@ public class SAMDataSource implements SimpleDataSource {
* @return the bounded iterator that you can use to get the intervaled reads from * @return the bounded iterator that you can use to get the intervaled reads from
* @throws SimpleDataSourceLoadException * @throws SimpleDataSourceLoadException
*/ */
private BoundedReadIterator unmappedReadSeek(long readCount, MergingSamRecordIterator2 iter) throws SimpleDataSourceLoadException { private BoundedReadIterator toUnmappedReads(long readCount, MergingSamRecordIterator2 iter) throws SimpleDataSourceLoadException {
BoundedReadIterator bound;// is this the first time we're doing this? BoundedReadIterator bound;// is this the first time we're doing this?
int count = 0; int count = 0;
PeekableIterator<SAMRecord> peek = new PeekableIterator(iter);
// throw away as many reads as it takes // throw away as many reads as it takes
while (count < this.readsTaken && iter.hasNext()) { SAMRecord d = null;
iter.next(); while (peek.hasNext()) {
++count; d = peek.peek();
int x = d.getReferenceIndex();
if (x < 0 || x >= d.getHeader().getSequenceDictionary().getSequences().size()) {
// we have the magic read that starts the unmapped read segment!
break;
}
peek.next();
} }
// check to see what happened, did we run out of reads? // check to see what happened, did we run out of reads?
if (count != this.readsTaken) { if (!peek.hasNext()) {
return null;
}
// now walk until we've taken the unmapped read count
while (peek.hasNext() && count < this.readsTaken) {
peek.next();
}
// check to see what happened, did we run out of reads?
if (!peek.hasNext()) {
return null; return null;
} }
// we're good, increment our read cout // we're good, increment our read cout
this.readsTaken += readCount; this.readsTaken += readCount;
return new BoundedReadIterator(iter,readCount); return new BoundedReadIterator(peek, readCount);
} }
/** /**
* Seek, if we want only mapped reads. This method will be faster then the unmapped read method, but you cannot extract the
* unmapped reads. * unmapped reads.
* *
* @param readCount how many reads to retrieve * @param readCount how many reads to retrieve
@ -250,10 +281,10 @@ public class SAMDataSource implements SimpleDataSource {
BoundedReadIterator bound;// is this the first time we're doing this? BoundedReadIterator bound;// is this the first time we're doing this?
if (lastReadPos == null) { if (lastReadPos == null) {
lastReadPos = new GenomeLoc(iter.getMergedHeader().getSequenceDictionary().getSequence(0).getSequenceIndex(), 0, 0); lastReadPos = new GenomeLoc(iter.getMergedHeader().getSequenceDictionary().getSequence(0).getSequenceIndex(), 0, 0);
iter.queryContained(lastReadPos.getContig(), 1, -1);
bound = new BoundedReadIterator(iter, readCount); bound = new BoundedReadIterator(iter, readCount);
this.readsTaken = readCount; this.readsTaken = readCount;
} }
// we're not at the beginning, not at the end, so we move forward with our ghastly plan... // we're not at the beginning, not at the end, so we move forward with our ghastly plan...
else { else {
@ -276,7 +307,7 @@ public class SAMDataSource implements SimpleDataSource {
if (iter.hasNext()) { if (iter.hasNext()) {
rec = iter.next(); rec = iter.next();
if (lastPos == rec.getAlignmentStart()) { if (lastPos == rec.getAlignmentStart()) {
this.readsAtLastPos++; ++this.readsAtLastPos;
} else { } else {
this.readsAtLastPos = 1; this.readsAtLastPos = 1;
} }
@ -285,15 +316,20 @@ public class SAMDataSource implements SimpleDataSource {
} else { } else {
// jump contigs // jump contigs
if (lastReadPos.toNextContig() == false) { if (lastReadPos.toNextContig() == false) {
// check to see if we're using unmapped reads, if not return, we're done
readsTaken = 0;
intoUnmappedReads = true;
return null; return null;
} else {
iter.close();
// now merge the headers
// right now this is pretty damn heavy, it copies the file list into a reader list every time
SamFileHeaderMerger mg = CreateHeader();
iter = new MergingSamRecordIterator2(mg);
iter.queryContained(lastReadPos.getContig(), 1, Integer.MAX_VALUE);
return new BoundedReadIterator(iter,readCount);
} }
iter.close();
// now merge the headers
// right now this is pretty damn heavy, it copies the file list into a reader list every time
List<SAMFileReader> lst2 = GetReaderList();
SamFileHeaderMerger mg = new SamFileHeaderMerger(lst2, SORT_ORDER);
iter = new MergingSamRecordIterator2(mg);
iter.queryContained(lastReadPos.getContig(), (int) lastReadPos.getStop(), -1);
} }
} }
@ -314,7 +350,8 @@ public class SAMDataSource implements SimpleDataSource {
} }
// in case we're run out of reads, get out // in case we're run out of reads, get out
else { else {
return null; throw new StingException("Danger");
//return null;
} }
bound = new BoundedReadIterator(iter, readCount); bound = new BoundedReadIterator(iter, readCount);
} }
@ -328,7 +365,7 @@ public class SAMDataSource implements SimpleDataSource {
/** /**
* A private function that, given the internal file list, generates a SamFileReader * A private function that, given the internal file list, generates a SamFileReader
* list of validated files. * list of validated files.
* *
* @return a list of SAMFileReaders that represent the stored file names * @return a list of SAMFileReaders that represent the stored file names
* @throws SimpleDataSourceLoadException if there's a problem loading the files * @throws SimpleDataSourceLoadException if there's a problem loading the files
*/ */
@ -339,8 +376,8 @@ public class SAMDataSource implements SimpleDataSource {
SAMFileReader reader = initializeSAMFile(f); SAMFileReader reader = initializeSAMFile(f);
if (reader.getFileHeader().getReadGroups().size() < 1) { if (reader.getFileHeader().getReadGroups().size() < 1) {
logger.warn("Setting header in reader " + f.getName()); //logger.warn("Setting header in reader " + f.getName());
SAMReadGroupRecord rec = new SAMReadGroupRecord(f.getName()); SAMReadGroupRecord rec = new SAMReadGroupRecord(f.getName());
rec.setLibrary(f.getName()); rec.setLibrary(f.getName());
rec.setSample(f.getName()); rec.setSample(f.getName());

View File

@ -40,7 +40,7 @@ public class BoundedReadIterator implements CloseableIterator<SAMRecord>, Iterab
private long currentCount = 0; private long currentCount = 0;
// the iterator we want to decorate // the iterator we want to decorate
private final MergingSamRecordIterator2 iterator; private final CloseableIterator<SAMRecord> iterator;
// our unmapped read flag // our unmapped read flag
private boolean doNotUseThatUnmappedReadPile = false; private boolean doNotUseThatUnmappedReadPile = false;
@ -56,7 +56,7 @@ public class BoundedReadIterator implements CloseableIterator<SAMRecord>, Iterab
* @param iter * @param iter
* @param readCount * @param readCount
*/ */
public BoundedReadIterator(MergingSamRecordIterator2 iter, long readCount) { public BoundedReadIterator(CloseableIterator<SAMRecord> iter, long readCount) {
if (iter != null) { if (iter != null) {
isOpen = true; isOpen = true;
@ -71,7 +71,12 @@ public class BoundedReadIterator implements CloseableIterator<SAMRecord>, Iterab
public SAMFileHeader getMergedHeader() { public SAMFileHeader getMergedHeader() {
return iterator.getMergedHeader(); // todo: this is bad, we need an iterface out there for samrecords that supports getting the header,
// regardless of the merging
if (iterator instanceof MergingSamRecordIterator2)
return ((MergingSamRecordIterator2)iterator).getMergedHeader();
else
return null;
} }
/** /**

View File

@ -1,16 +1,20 @@
package org.broadinstitute.sting.gatk.dataSources.simpleDataSources; package org.broadinstitute.sting.gatk.dataSources.simpleDataSources;
import static junit.framework.Assert.fail; import static junit.framework.Assert.fail;
import net.sf.samtools.SAMRecord;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.dataSources.shards.Shard;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.dataSources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator; import org.broadinstitute.sting.gatk.iterators.BoundedReadIterator;
import org.broadinstitute.sting.utils.GenomeLoc; import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.fasta.FastaSequenceFile2; import org.broadinstitute.sting.utils.fasta.FastaSequenceFile2;
import static org.junit.Assert.assertEquals;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
/** /**
@ -54,7 +58,7 @@ public class SAMByReadsTest extends BaseTest {
fl = new ArrayList<String>(); fl = new ArrayList<String>();
// sequence // sequence
seq = new FastaSequenceFile2(new File(seqLocation + "/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta")); seq = new FastaSequenceFile2(new File(seqLocation + "/references/Homo_sapiens_assembly17/v0/Homo_sapiens_assembly17.fasta"));
GenomeLoc.setupRefContigOrdering(seq.getSequenceDictionary()); GenomeLoc.setupRefContigOrdering(seq.getSequenceDictionary());
} }
@ -63,53 +67,43 @@ public class SAMByReadsTest extends BaseTest {
@Test @Test
public void testTotalReadCount() { public void testTotalReadCount() {
logger.warn("Executing testTotalReadCount"); logger.warn("Executing testTotalReadCount");
// the sharding strat.
//ShardStrategy strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
// setup the test files // setup the test files
fl.add("/humgen/gsa-scr1/GATK_Data/Validation_Data/10035.5.clean.bam"); fl.add("/humgen/gsa-scr1/GATK_Data/Validation_Data/index_test.bam");
final int targetReadCount = 5000;
// make sure we don't see dupes
HashSet<String> names = new HashSet<String>(); ShardStrategy shardStrategy = ShardStrategyFactory.shatterByReadCount(seq.getSequenceDictionary(),targetReadCount);
try { try {
SAMDataSource data = new SAMDataSource(fl); SAMDataSource data = new SAMDataSource(fl);
final int targetReadCount = 500000;
// check the total read count // check the total read count
final int totalReads = 1782980; final int totalReads = 10000;
int readsSeen = 0; int readsSeen = 0;
BoundedReadIterator iter; BoundedReadIterator iter;
/*
while ((iter = data.seek(targetReadCount)) != null) {
for (Shard sd : shardStrategy) {
int readcnt = 0; int readcnt = 0;
iter = (BoundedReadIterator)data.seek(sd);
for (SAMRecord r : iter) { for (SAMRecord r : iter) {
String readName = r.getReadName();
if (names.contains(readName)) {
fail("We saw read " + readName + " twice");
}
names.add(readName);
readcnt++; readcnt++;
} }
readsSeen += readcnt; readsSeen += readcnt;
logger.warn("Seen " + readsSeen + " reads."); //logger.warn("Seen " + readsSeen + " reads.");
} }
// make sure we've seen all the reads // make sure we've seen all the reads
assertEquals(totalReads,readsSeen); assertEquals(totalReads,readsSeen);
*/ logger.warn("Success " + readsSeen + " equals target count of " + totalReads);
} }
catch (SimpleDataSourceLoadException e) { catch (SimpleDataSourceLoadException e) {