Reenabled downsampling by sample after basic sanity testing and fixes of the

new implementation.  Hard testing and performance enhancements are still
pending.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3566 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-06-16 17:23:27 +00:00
parent c44fd05aa1
commit e77f76f8e1
1 changed files with 167 additions and 46 deletions

View File

@ -408,6 +408,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
if ( state.getRead().getMappingQuality() == 0 ) { if ( state.getRead().getMappingQuality() == 0 ) {
nMQ0Reads++; nMQ0Reads++;
} }
// TODO: sample split!
if( indelPile.size() != 0 ) fullExtendedEventPileup.put(sampleName,new UnifiedReadBackedExtendedEventPileup(loc,indelPile,size,maxDeletionLength,nDeletions,nInsertions,nMQ0Reads)); if( indelPile.size() != 0 ) fullExtendedEventPileup.put(sampleName,new UnifiedReadBackedExtendedEventPileup(loc,indelPile,size,maxDeletionLength,nDeletions,nInsertions,nMQ0Reads));
} }
} }
@ -538,15 +539,15 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
private final PeekableIterator<SAMRecord> iterator; private final PeekableIterator<SAMRecord> iterator;
private final DownsamplingMethod downsamplingMethod; private final DownsamplingMethod downsamplingMethod;
private final ReadSelector firstReadSelector; private final ReadSelector chainedReadSelector;
private final SamplePartitioner samplePartitioner; private final SamplePartitioner samplePartitioner;
private final Map<String,Deque<SAMRecordState>> readStatesBySample = new HashMap<String,Deque<SAMRecordState>>(); private final Map<String,PerSampleReadStateManager> readStatesBySample = new HashMap<String,PerSampleReadStateManager>();
private final int targetCoverage; private final int targetCoverage;
private final int maxReadsAtLocus; private final int maxReadsAtLocus;
private int totalReadStatesInHanger = 0; private int totalReadStates = 0;
/** /**
* Store a random number generator with a consistent seed for consistent downsampling from run to run. * Store a random number generator with a consistent seed for consistent downsampling from run to run.
@ -574,7 +575,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
samplePartitioner = new SamplePartitioner(sampleNames); samplePartitioner = new SamplePartitioner(sampleNames);
for(String sampleName: sampleNames) for(String sampleName: sampleNames)
readStatesBySample.put(sampleName,new LinkedList<SAMRecordState>()); readStatesBySample.put(sampleName,new PerSampleReadStateManager());
ReadSelector primaryReadSelector; ReadSelector primaryReadSelector;
if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) { if(downsamplingMethod.type == DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) {
@ -583,7 +584,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
else else
primaryReadSelector = samplePartitioner; primaryReadSelector = samplePartitioner;
firstReadSelector = maxReadsAtLocus!=Integer.MAX_VALUE ? new FirstNReadSelector(primaryReadSelector,maxReadsAtLocus) : primaryReadSelector; chainedReadSelector = maxReadsAtLocus!=Integer.MAX_VALUE ? new FirstNReadSelector(primaryReadSelector,maxReadsAtLocus) : primaryReadSelector;
} }
public Iterator<SAMRecordState> iteratorForSample(final String sampleName) { public Iterator<SAMRecordState> iteratorForSample(final String sampleName) {
@ -600,26 +601,22 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
public void remove() { public void remove() {
wrappedIterator.remove(); wrappedIterator.remove();
totalReadStatesInHanger--; totalReadStates--;
} }
}; };
} }
public boolean isEmpty() { public boolean isEmpty() {
return totalReadStatesInHanger == 0; return totalReadStates == 0;
} }
public int size() { public int size() {
int size = 0; return totalReadStates;
for(Deque<SAMRecordState> readStates: readStatesBySample.values()) {
size += readStates.size();
}
return size;
} }
public SAMRecordState getFirst() { public SAMRecordState getFirst() {
for(String sampleName: sampleNames) { for(String sampleName: sampleNames) {
Deque<SAMRecordState> reads = readStatesBySample.get(sampleName); PerSampleReadStateManager reads = readStatesBySample.get(sampleName);
if(!reads.isEmpty()) if(!reads.isEmpty())
return reads.peek(); return reads.peek();
} }
@ -627,7 +624,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
} }
public boolean hasNext() { public boolean hasNext() {
return totalReadStatesInHanger > 0 || iterator.hasNext(); return totalReadStates > 0 || iterator.hasNext();
} }
public void collectPendingReads() { public void collectPendingReads() {
@ -638,7 +635,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
int firstContigIndex = iterator.peek().getReferenceIndex(); int firstContigIndex = iterator.peek().getReferenceIndex();
int firstAlignmentStart = iterator.peek().getAlignmentStart(); int firstAlignmentStart = iterator.peek().getAlignmentStart();
while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) { while(iterator.hasNext() && iterator.peek().getReferenceIndex() == firstContigIndex && iterator.peek().getAlignmentStart() == firstAlignmentStart) {
firstReadSelector.submitRead(iterator.next()); chainedReadSelector.submitRead(iterator.next());
} }
} }
else { else {
@ -647,59 +644,67 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
return; return;
while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) { while (iterator.hasNext() && !readIsPastCurrentPosition(iterator.peek())) {
firstReadSelector.submitRead(iterator.next()); chainedReadSelector.submitRead(iterator.next());
} }
} }
firstReadSelector.complete(); chainedReadSelector.complete();
int readStatesInHangerEntry = 0;
for(String sampleName: sampleNames) { for(String sampleName: sampleNames) {
ReadSelector aggregator = samplePartitioner.getSelectedReads(sampleName); ReadSelector aggregator = samplePartitioner.getSelectedReads(sampleName);
Collection<SAMRecord> newReads = new ArrayList<SAMRecord>(aggregator.getSelectedReads()); Collection<SAMRecord> newReads = new ArrayList<SAMRecord>(aggregator.getSelectedReads());
Deque<SAMRecordState> hanger = readStatesBySample.get(sampleName); PerSampleReadStateManager statesBySample = readStatesBySample.get(sampleName);
int readsInHanger = hanger.size(); int numReads = statesBySample.size();
if(readsInHanger+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) { if(numReads+newReads.size()<=targetCoverage || downsamplingMethod.type==DownsampleType.NONE || downsamplingMethod.type==DownsampleType.EXPERIMENTAL_NAIVE_DUPLICATE_ELIMINATOR) {
int readLimit = newReads.size(); int readLimit = newReads.size();
boolean mrlViolation = false; boolean mrlViolation = false;
if(readLimit > maxReadsAtLocus-totalReadStatesInHanger) { if(readLimit > maxReadsAtLocus-totalReadStates) {
readLimit = maxReadsAtLocus-totalReadStatesInHanger; readLimit = maxReadsAtLocus-totalReadStates;
mrlViolation = true; mrlViolation = true;
} }
readStatesInHangerEntry += addReadsToSample(hanger,newReads,readLimit,mrlViolation); totalReadStates += addReadsToSample(statesBySample,newReads,readLimit,mrlViolation);
} }
else { else {
// TODO: implement downsampling mechanism int[] counts = statesBySample.getCountsPerAlignmentStart();
/* int[] updatedCounts = new int[counts.length];
Iterator<List<SAMRecordState>> backIterator = hanger.descendingIterator(); System.arraycopy(counts,0,updatedCounts,0,counts.length);
boolean readPruned = true; boolean readPruned = true;
while(readsInHanger+newReads.size()>targetCoverage && readPruned) { while(numReads+newReads.size()>targetCoverage && readPruned) {
readPruned = false; readPruned = false;
while(readsInHanger+newReads.size()>targetCoverage && backIterator.hasNext()) { for(int alignmentStart=updatedCounts.length-1;numReads+newReads.size()>targetCoverage&&alignmentStart>=0;alignmentStart--) {
List<SAMRecordState> readsAtLocus = backIterator.next(); if(updatedCounts[alignmentStart] > 1) {
if(readsAtLocus.size() > 1) { updatedCounts[alignmentStart]--;
readsAtLocus.remove(downsampleRandomizer.nextInt(readsAtLocus.size())); numReads--;
readPruned = true; readPruned = true;
readsInHanger--;
} }
} }
} }
if(readsInHanger == targetCoverage) { if(numReads == targetCoverage) {
List<SAMRecordState> readsInFirstHanger = hanger.remove(); updatedCounts[0]--;
readsInHanger -= readsInFirstHanger.size(); numReads--;
} }
readStatesInHangerEntry += addReadsToSample(hanger,newReads,targetCoverage-readsInHanger,false); BitSet toPurge = new BitSet(readStates.size());
*/ int readOffset = 0;
}
totalReadStatesInHanger += readStatesInHangerEntry; for(int i = 0; i < updatedCounts.length; i++) {
int n = counts[i];
int k = updatedCounts[i];
for(Integer purgedElement: MathUtils.sampleIndicesWithoutReplacement(n,n-k))
toPurge.set(readOffset+purgedElement);
readOffset += counts[i];
}
statesBySample.purge(toPurge);
totalReadStates += addReadsToSample(statesBySample,newReads,targetCoverage-numReads,false);
}
} }
firstReadSelector.reset(); chainedReadSelector.reset();
} }
/** /**
@ -709,7 +714,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
* @param maxReads Maximum number of reads to add. * @param maxReads Maximum number of reads to add.
* @return Total number of reads added. * @return Total number of reads added.
*/ */
private int addReadsToSample(final Deque<SAMRecordState> readStates, final Collection<SAMRecord> reads, final int maxReads, boolean atMaxReadsAtLocusLimit) { private int addReadsToSample(final PerSampleReadStateManager readStates, final Collection<SAMRecord> reads, final int maxReads, boolean atMaxReadsAtLocusLimit) {
if(reads.isEmpty()) if(reads.isEmpty())
return 0; return 0;
@ -718,12 +723,13 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
// the farthest right a read extends // the farthest right a read extends
Integer rightMostEnd = -1; Integer rightMostEnd = -1;
Collection<SAMRecordState> newReadStates = new LinkedList<SAMRecordState>();
int readCount = 0; int readCount = 0;
for(SAMRecord read: reads) { for(SAMRecord read: reads) {
if(readCount <= maxReads) { if(readCount < maxReads) {
SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents()); SAMRecordState state = new SAMRecordState(read, readInfo.generateExtendedEvents());
state.stepForwardOnGenome(); state.stepForwardOnGenome();
readStates.add(state); newReadStates.add(state);
// TODO: What if we downsample the extended events away? // TODO: What if we downsample the extended events away?
if (state.hadIndel()) hasExtendedEvents = true; if (state.hadIndel()) hasExtendedEvents = true;
readCount++; readCount++;
@ -734,6 +740,7 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd; rightMostEnd = (read.getAlignmentEnd() > rightMostEnd) ? read.getAlignmentEnd() : rightMostEnd;
} }
} }
readStates.addStatesAtNextAlignmentStart(newReadStates);
if (location != null) if (location != null)
overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd), overflowTracker.exceeded(GenomeLocParser.createGenomeLoc(location.getContigIndex(),location.getStart(),rightMostEnd),
@ -741,11 +748,125 @@ public class DownsamplingLocusIteratorByState extends LocusIterator {
return readCount; return readCount;
} }
private class PerSampleReadStateManager implements Iterable<SAMRecordState> {
private final Queue<SAMRecordState> readStates = new LinkedList<SAMRecordState>();
private final Deque<Counter> readStateCounter = new LinkedList<Counter>();
public void addStatesAtNextAlignmentStart(Collection<SAMRecordState> states) {
readStates.addAll(states);
readStateCounter.add(new Counter(states.size()));
}
public boolean isEmpty() {
return readStates.isEmpty();
}
public SAMRecordState peek() {
return readStates.peek();
}
public int size() {
return readStates.size();
}
public int[] getCountsPerAlignmentStart() {
int[] counts = new int[readStateCounter.size()];
int index = 0;
for(Counter counter: readStateCounter)
counts[index++] = counter.getCount();
return counts;
}
public Iterator<SAMRecordState> iterator() {
return new Iterator<SAMRecordState>() {
private Iterator<SAMRecordState> wrappedIterator = readStates.iterator();
public boolean hasNext() {
return wrappedIterator.hasNext();
}
public SAMRecordState next() {
return wrappedIterator.next();
}
public void remove() {
wrappedIterator.remove();
Counter counter = readStateCounter.peek();
counter.decrement();
if(counter.getCount() == 0)
readStateCounter.remove();
}
};
}
/**
* Purge the given elements from the bitset. If an element in the bitset is true, purge
* the corresponding read state.
* @param elements bits from the set to purge.
*/
public void purge(final BitSet elements) {
if(elements.isEmpty() || readStates.isEmpty()) return;
Iterator<SAMRecordState> readStateIterator = readStates.iterator();
Iterator<Counter> counterIterator = readStateCounter.iterator();
Counter currentCounter = counterIterator.next();
int readIndex = 0;
long alignmentStartCounter = currentCounter.getCount();
int toPurge = elements.nextSetBit(0);
int removedCount = 0;
while(readStateIterator.hasNext() && toPurge >= 0) {
readStateIterator.next();
if(readIndex == toPurge) {
readStateIterator.remove();
currentCounter.decrement();
if(currentCounter.getCount() == 0)
counterIterator.remove();
removedCount++;
toPurge = elements.nextSetBit(toPurge+1);
}
readIndex++;
alignmentStartCounter--;
if(alignmentStartCounter == 0 && counterIterator.hasNext()) {
currentCounter = counterIterator.next();
alignmentStartCounter = currentCounter.getCount();
}
}
totalReadStates -= removedCount;
}
}
}
/**
* Note: assuming that, whenever we downsample, we downsample to an integer capacity.
*/
private class Counter {
private int count;
public Counter(int count) {
this.count = count;
}
public int getCount() {
return count;
}
public void decrement() {
count--;
}
} }
} }
/** /**
* Selects reads passed to it based on a criteria decided through inheritance. * Selects reads passed to it based on a criteria decided through inheritance.
* TODO: This is a temporary abstraction until we can get rid of this downsampling implementation and the mrl option. Get rid of this.
*/ */
interface ReadSelector { interface ReadSelector {
/** /**
@ -984,7 +1105,7 @@ class SamplePartitioner implements ReadSelector {
readsSeen = 0; readsSeen = 0;
} }
} }
} }