Feature parity between LocusIteratorByState and DownsamplingLocusIteratorByState, including pushing the
mrl (max reads at locus) limit and the LocusOverflowTracker into LocusIteratorByState.  Note that the
'Matt Hanna exception' is still enabled because I haven't yet validated the performance of the
DownsamplingLocusIteratorByState when running without downsampling.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3496 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-06-07 22:58:21 +00:00
parent 5c4d070566
commit 52ab9f2417
1 changed files with 68 additions and 29 deletions

View File

@ -53,6 +53,12 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
//public static final EnumSet<Discard> NO_DISCARDS = EnumSet.noneOf(Discard.class); //public static final EnumSet<Discard> NO_DISCARDS = EnumSet.noneOf(Discard.class);
public static final List<LocusIteratorFilter> NO_FILTERS = Arrays.asList(); public static final List<LocusIteratorFilter> NO_FILTERS = Arrays.asList();
/**
* the overflow tracker, which makes sure we get a limited number of warnings for locus pile-ups that
* exceed the max depth
*/
private LocusOverflowTracker overflowTracker;
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
private static Logger logger = Logger.getLogger(DownsamplingLocusIteratorByState.class); private static Logger logger = Logger.getLogger(DownsamplingLocusIteratorByState.class);
@ -275,9 +281,10 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
// TODO: Push in header via constructor // TODO: Push in header via constructor
if(GenomeAnalysisEngine.instance.getDataSource() != null) if(GenomeAnalysisEngine.instance.getDataSource() != null)
sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader())); sampleNames.addAll(SampleUtils.getSAMFileSamples(GenomeAnalysisEngine.instance.getSAMFileHeader()));
readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),sampleNames); readStates = new ReadStateManager(samIterator,readInformation.getDownsamplingMethod(),readInformation.getMaxReadsAtLocus(),sampleNames);
this.readInfo = readInformation; this.readInfo = readInformation;
this.filters = filters; this.filters = filters;
overflowTracker = new LocusOverflowTracker(readInformation.getMaxReadsAtLocus());
} }
public Iterator<AlignmentContext> iterator() { public Iterator<AlignmentContext> iterator() {
@ -292,6 +299,11 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
lazyLoadNextAlignmentContext(); lazyLoadNextAlignmentContext();
boolean r = (nextAlignmentContext != null); boolean r = (nextAlignmentContext != null);
//if ( DEBUG ) System.out.printf("hasNext() = %b%n", r); //if ( DEBUG ) System.out.printf("hasNext() = %b%n", r);
// if we don't have a next record, make sure we clean the warning queue
// TODO: Note that this implementation requires that hasNext() always be called before next().
if (!r) overflowTracker.cleanWarningQueue();
return r; return r;
} }
@ -489,7 +501,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
* @param tracker * @param tracker
*/ */
protected void setLocusOverflowTracker(LocusOverflowTracker tracker) { protected void setLocusOverflowTracker(LocusOverflowTracker tracker) {
// TODO: implement this.overflowTracker = tracker;
} }
/** /**
@ -497,16 +509,17 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
* @return the overflow tracker, null if none exists * @return the overflow tracker, null if none exists
*/ */
public LocusOverflowTracker getLocusOverflowTracker() { public LocusOverflowTracker getLocusOverflowTracker() {
// TODO: implement return this.overflowTracker;
return null;
} }
private class ReadStateManager implements Iterable<SAMRecordState> { private class ReadStateManager implements Iterable<SAMRecordState> {
private final PeekableIterator<SAMRecord> iterator; private final PeekableIterator<SAMRecord> iterator;
private final DownsamplingMethod downsamplingMethod; private final DownsamplingMethod downsamplingMethod;
private final Map<String,ReservoirDownsampler<SAMRecord>> downsamplersBySampleName = new HashMap<String,ReservoirDownsampler<SAMRecord>>(); private final Map<String,Collection<SAMRecord>> aggregatorsBySampleName = new HashMap<String,Collection<SAMRecord>>();
private final int targetCoverage; private final int targetCoverage;
private final int maxReadsAtLocus;
private final Deque<Map<String,List<SAMRecordState>>> readStatesByAlignmentStart; private final Deque<Map<String,List<SAMRecordState>>> readStatesByAlignmentStart;
private int totalReadStatesInHanger = 0; private int totalReadStatesInHanger = 0;
@ -518,15 +531,18 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
*/ */
private Random downsampleRandomizer = new Random(38148309L); private Random downsampleRandomizer = new Random(38148309L);
public ReadStateManager(Iterator<SAMRecord> source, DownsamplingMethod downsamplingMethod, Collection<String> sampleNames) { public ReadStateManager(Iterator<SAMRecord> source, DownsamplingMethod downsamplingMethod, int maxReadsAtLocus, Collection<String> sampleNames) {
this.iterator = new PeekableIterator<SAMRecord>(source); this.iterator = new PeekableIterator<SAMRecord>(source);
this.downsamplingMethod = downsamplingMethod; this.downsamplingMethod = downsamplingMethod;
this.targetCoverage = downsamplingMethod.toCoverage != null ? downsamplingMethod.toCoverage : 1; this.targetCoverage = downsamplingMethod.toCoverage != null ? downsamplingMethod.toCoverage : 1;
this.maxReadsAtLocus = maxReadsAtLocus;
if(downsamplingMethod.type == DownsampleType.NONE)
aggregatorsBySampleName.put(null,new ArrayList<SAMRecord>());
if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR)
downsamplersBySampleName.put(null,new ReservoirDownsampler<SAMRecord>(targetCoverage)); aggregatorsBySampleName.put(null,new ReservoirDownsampler<SAMRecord>(targetCoverage));
else { else {
for(String sampleName: sampleNames) for(String sampleName: sampleNames)
downsamplersBySampleName.put(sampleName,new ReservoirDownsampler<SAMRecord>(targetCoverage)); aggregatorsBySampleName.put(sampleName,new ReservoirDownsampler<SAMRecord>(targetCoverage));
} }
this.readStatesByAlignmentStart = new LinkedList<Map<String,List<SAMRecordState>>>(); this.readStatesByAlignmentStart = new LinkedList<Map<String,List<SAMRecordState>>>();
} }
@ -624,7 +640,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
int firstAlignmentStart = iterator.peek().getAlignmentStart(); int firstAlignmentStart = iterator.peek().getAlignmentStart();
while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) { while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) {
SAMRecord read = iterator.next(); SAMRecord read = iterator.next();
getDownsampler(read.getReadGroup().getSample()).add(read); getAggregator(read.getReadGroup().getSample()).add(read);
} }
} }
else { else {
@ -634,23 +650,30 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) { while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) {
SAMRecord read = iterator.next(); SAMRecord read = iterator.next();
getDownsampler(read.getReadGroup().getSample()).add(read); getAggregator(read.getReadGroup().getSample()).add(read);
} }
} }
Map<String,List<SAMRecordState>> culledReadStatesBySample = new HashMap<String,List<SAMRecordState>>(); Map<String,List<SAMRecordState>> culledReadStatesBySample = new HashMap<String,List<SAMRecordState>>();
int readStatesInHangerEntry = 0; int readStatesInHangerEntry = 0;
for(Map.Entry<String,ReservoirDownsampler<SAMRecord>> entry: downsamplersBySampleName.entrySet()) { for(Map.Entry<String,Collection<SAMRecord>> entry: aggregatorsBySampleName.entrySet()) {
String sampleName = entry.getKey(); String sampleName = entry.getKey();
ReservoirDownsampler<SAMRecord> downsampler = entry.getValue(); Collection<SAMRecord> aggregator = entry.getValue();
Collection<SAMRecord> newReads = downsampler.getDownsampledContents(); Collection<SAMRecord> newReads = new ArrayList<SAMRecord>(aggregator);
downsampler.clear(); aggregator.clear();
int readsInHanger = countReadsInHanger(sampleName); int readsInHanger = countReadsInHanger(sampleName);
if(readsInHanger+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) if(readsInHanger+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) {
readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,newReads.size()); int readLimit = newReads.size();
boolean mrlViolation = false;
if(readLimit > maxReadsAtLocus-totalReadStatesInHanger) {
readLimit = maxReadsAtLocus-totalReadStatesInHanger;
mrlViolation = true;
}
readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,readLimit,mrlViolation);
}
else { else {
Iterator<Map<String,List<SAMRecordState>>> backIterator = readStatesByAlignmentStart.descendingIterator(); Iterator<Map<String,List<SAMRecordState>>> backIterator = readStatesByAlignmentStart.descendingIterator();
boolean readPruned = true; boolean readPruned = true;
@ -672,7 +695,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
firstHangerForSample.clear(); firstHangerForSample.clear();
} }
readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,targetCoverage-readsInHanger); readStatesInHangerEntry += addReadsToHanger(culledReadStatesBySample,sampleName,newReads,targetCoverage-readsInHanger,false);
} }
} }
if(!culledReadStatesBySample.isEmpty()) { if(!culledReadStatesBySample.isEmpty()) {
@ -681,11 +704,11 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
} }
} }
private ReservoirDownsampler<SAMRecord> getDownsampler(String sampleName) { private Collection<SAMRecord> getAggregator(String sampleName) {
if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR)
return downsamplersBySampleName.get(null); return aggregatorsBySampleName.get(null);
else else
return downsamplersBySampleName.get(sampleName); return aggregatorsBySampleName.get(sampleName);
} }
private int countReadsInHanger(final String sampleName) { private int countReadsInHanger(final String sampleName) {
@ -705,22 +728,38 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
* @param maxReads Maximum number of reads to add. * @param maxReads Maximum number of reads to add.
* @return Total number of reads added. * @return Total number of reads added.
*/ */
private int addReadsToHanger(final Map<String,List<SAMRecordState>> newHangerEntry, final String sampleName, final Collection<SAMRecord> reads, final int maxReads) { private int addReadsToHanger(final Map<String,List<SAMRecordState>> newHangerEntry, final String sampleName, final Collection<SAMRecord> reads, final int maxReads, boolean atMaxReadsAtLocusLimit) {
if(reads.isEmpty()) if(reads.isEmpty())
return 0; return 0;
GenomeLoc location = null;
// the farthest right a read extends
Integer rightMostEnd = -1;
List<SAMRecordState> readStatesBySample = new LinkedList<SAMRecordState>(); List<SAMRecordState> readStatesBySample = new LinkedList<SAMRecordState>();
int readCount = 0; int readCount = 0;
for(SAMRecord read: reads) { for(SAMRecord read: reads) {
if(readCount >= maxReads) if(readCount < maxReads) {
break; SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents());
SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); state.stepForwardOnGenome();
state.stepForwardOnGenome(); readStatesBySample.add(state);
readStatesBySample.add(state); // TODO: What if we downsample the extended events away?
// TODO: What if we downsample the extended events away? if (state.hadIndel()) hasExtendedEvents = true;
if (state.hadIndel()) hasExtendedEvents = true; readCount++;
readCount++; }
else if(atMaxReadsAtLocusLimit) {
if (location == null)
location = GenomeLocParser.createGenomeLoc(read);
rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd;
}
} }
newHangerEntry.put(sampleName,readStatesBySample); newHangerEntry.put(sampleName,readStatesBySample);
if (location != null)
overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd),
readCount);
return readCount; return readCount;
} }
} }