Merged bug fix from Stable into Unstable
This commit is contained in:
commit
d0cd29cb36
|
|
@ -104,6 +104,56 @@ public class PerSampleDownsamplingReadsIterator implements StingSAMIterator {
|
|||
readComparator.compare(orderedDownsampledReadsCache.peek(), earliestPendingRead) <= 0;
|
||||
}
|
||||
|
||||
private boolean fillDownsampledReadsCache() {
|
||||
SAMRecord prevRead = null;
|
||||
int numPositionalChanges = 0;
|
||||
|
||||
// Continue submitting reads to the per-sample downsamplers until the read at the top of the priority queue
|
||||
// can be released without violating global sort order
|
||||
while ( nestedSAMIterator.hasNext() && ! readyToReleaseReads() ) {
|
||||
SAMRecord read = nestedSAMIterator.next();
|
||||
String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null;
|
||||
|
||||
ReadsDownsampler<SAMRecord> thisSampleDownsampler = perSampleDownsamplers.get(sampleName);
|
||||
if ( thisSampleDownsampler == null ) {
|
||||
thisSampleDownsampler = downsamplerFactory.newInstance();
|
||||
perSampleDownsamplers.put(sampleName, thisSampleDownsampler);
|
||||
}
|
||||
|
||||
thisSampleDownsampler.submit(read);
|
||||
processFinalizedAndPendingItems(thisSampleDownsampler);
|
||||
|
||||
if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) {
|
||||
numPositionalChanges++;
|
||||
}
|
||||
|
||||
// Periodically inform all downsamplers of the current position in the read stream. This is
|
||||
// to prevent downsamplers for samples with sparser reads than others from getting stuck too
|
||||
// long in a pending state.
|
||||
if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) {
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
perSampleDownsampler.signalNoMoreReadsBefore(read);
|
||||
processFinalizedAndPendingItems(perSampleDownsampler);
|
||||
}
|
||||
}
|
||||
|
||||
prevRead = read;
|
||||
}
|
||||
|
||||
if ( ! nestedSAMIterator.hasNext() ) {
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
perSampleDownsampler.signalEndOfInput();
|
||||
if ( perSampleDownsampler.hasFinalizedItems() ) {
|
||||
orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
|
||||
}
|
||||
}
|
||||
earliestPendingRead = null;
|
||||
earliestPendingDownsampler = null;
|
||||
}
|
||||
|
||||
return readyToReleaseReads();
|
||||
}
|
||||
|
||||
private void updateEarliestPendingRead( ReadsDownsampler<SAMRecord> currentDownsampler ) {
|
||||
// If there is no recorded earliest pending read and this downsampler has pending items,
|
||||
// then this downsampler's first pending item becomes the new earliest pending read:
|
||||
|
|
@ -135,57 +185,11 @@ public class PerSampleDownsamplingReadsIterator implements StingSAMIterator {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean fillDownsampledReadsCache() {
|
||||
SAMRecord prevRead = null;
|
||||
int numPositionalChanges = 0;
|
||||
|
||||
// Continue submitting reads to the per-sample downsamplers until the read at the top of the priority queue
|
||||
// can be released without violating global sort order
|
||||
while ( nestedSAMIterator.hasNext() && ! readyToReleaseReads() ) {
|
||||
SAMRecord read = nestedSAMIterator.next();
|
||||
String sampleName = read.getReadGroup() != null ? read.getReadGroup().getSample() : null;
|
||||
|
||||
ReadsDownsampler<SAMRecord> thisSampleDownsampler = perSampleDownsamplers.get(sampleName);
|
||||
if ( thisSampleDownsampler == null ) {
|
||||
thisSampleDownsampler = downsamplerFactory.newInstance();
|
||||
perSampleDownsamplers.put(sampleName, thisSampleDownsampler);
|
||||
}
|
||||
|
||||
thisSampleDownsampler.submit(read);
|
||||
updateEarliestPendingRead(thisSampleDownsampler);
|
||||
|
||||
if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) {
|
||||
numPositionalChanges++;
|
||||
}
|
||||
|
||||
// Periodically inform all downsamplers of the current position in the read stream. This is
|
||||
// to prevent downsamplers for samples with sparser reads than others from getting stuck too
|
||||
// long in a pending state.
|
||||
if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) {
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
perSampleDownsampler.signalNoMoreReadsBefore(read);
|
||||
updateEarliestPendingRead(perSampleDownsampler);
|
||||
}
|
||||
}
|
||||
|
||||
prevRead = read;
|
||||
private void processFinalizedAndPendingItems( ReadsDownsampler<SAMRecord> currentDownsampler ) {
|
||||
if ( currentDownsampler.hasFinalizedItems() ) {
|
||||
orderedDownsampledReadsCache.addAll(currentDownsampler.consumeFinalizedItems());
|
||||
}
|
||||
|
||||
if ( ! nestedSAMIterator.hasNext() ) {
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
perSampleDownsampler.signalEndOfInput();
|
||||
}
|
||||
earliestPendingRead = null;
|
||||
earliestPendingDownsampler = null;
|
||||
}
|
||||
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
if ( perSampleDownsampler.hasFinalizedItems() ) {
|
||||
orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
|
||||
}
|
||||
}
|
||||
|
||||
return readyToReleaseReads();
|
||||
updateEarliestPendingRead(currentDownsampler);
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue