New, much less memory-intensive implementation of BAM file sharding. Streams indices together with the expectation
that bins will be present in the sparse bin array, which avoids the problem of having to hold the sparse bin array stored in every BAM file index in memory at the same time.
git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3075 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
4398a8b370
commit
46c14ec63f
|
|
@ -27,7 +27,12 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
||||||
/**
|
/**
|
||||||
* The data source used to shard.
|
* The data source used to shard.
|
||||||
*/
|
*/
|
||||||
protected final BlockDrivenSAMDataSource dataSource;
|
private final BlockDrivenSAMDataSource dataSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The intervals to be processed.
|
||||||
|
*/
|
||||||
|
private final GenomeLocSortedSet locations;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The cached shard to be returned next. Prefetched in the peekable iterator style.
|
* The cached shard to be returned next. Prefetched in the peekable iterator style.
|
||||||
|
|
@ -63,10 +68,13 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
||||||
|
|
||||||
this.dataSource = (BlockDrivenSAMDataSource)dataSource;
|
this.dataSource = (BlockDrivenSAMDataSource)dataSource;
|
||||||
this.position = this.dataSource.getCurrentPosition();
|
this.position = this.dataSource.getCurrentPosition();
|
||||||
if(locations != null)
|
this.locations = locations;
|
||||||
filePointers.addAll(IntervalSharder.shardIntervals(this.dataSource,locations.toList()));
|
|
||||||
|
if(locations != null)
|
||||||
|
filePointerIterator = IntervalSharder.shardIntervals(this.dataSource,locations.toList());
|
||||||
|
else
|
||||||
|
filePointerIterator = filePointers.iterator();
|
||||||
|
|
||||||
filePointerIterator = filePointers.iterator();
|
|
||||||
if(filePointerIterator.hasNext())
|
if(filePointerIterator.hasNext())
|
||||||
currentFilePointer = filePointerIterator.next();
|
currentFilePointer = filePointerIterator.next();
|
||||||
|
|
||||||
|
|
@ -99,7 +107,7 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
|
||||||
nextShard = null;
|
nextShard = null;
|
||||||
SamRecordFilter filter = null;
|
SamRecordFilter filter = null;
|
||||||
|
|
||||||
if(!filePointers.isEmpty()) {
|
if(locations != null) {
|
||||||
Map<SAMReaderID,List<Chunk>> selectedReaders = new HashMap<SAMReaderID,List<Chunk>>();
|
Map<SAMReaderID,List<Chunk>> selectedReaders = new HashMap<SAMReaderID,List<Chunk>>();
|
||||||
while(selectedReaders.size() == 0 && currentFilePointer != null) {
|
while(selectedReaders.size() == 0 && currentFilePointer != null) {
|
||||||
shardPosition = currentFilePointer.chunks;
|
shardPosition = currentFilePointer.chunks;
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
import net.sf.samtools.Chunk;
|
import net.sf.samtools.Chunk;
|
||||||
import net.sf.samtools.SAMFileReader;
|
|
||||||
import net.sf.samtools.SAMFileHeader;
|
import net.sf.samtools.SAMFileHeader;
|
||||||
import net.sf.samtools.SAMSequenceRecord;
|
import net.sf.samtools.SAMSequenceRecord;
|
||||||
|
|
||||||
|
|
@ -51,9 +50,6 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
|
||||||
*/
|
*/
|
||||||
private final BlockDrivenSAMDataSource reads;
|
private final BlockDrivenSAMDataSource reads;
|
||||||
|
|
||||||
/** our storage of the genomic locations they'd like to shard over */
|
|
||||||
private final List<FilePointer> filePointers = new ArrayList<FilePointer>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An iterator through the available file pointers.
|
* An iterator through the available file pointers.
|
||||||
*/
|
*/
|
||||||
|
|
@ -90,13 +86,13 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
|
||||||
else
|
else
|
||||||
intervals = locations.toList();
|
intervals = locations.toList();
|
||||||
|
|
||||||
|
|
||||||
this.reads = (BlockDrivenSAMDataSource)reads;
|
this.reads = (BlockDrivenSAMDataSource)reads;
|
||||||
filePointers.addAll(IntervalSharder.shardIntervals(this.reads,intervals));
|
this.filePointerIterator = IntervalSharder.shardIntervals(this.reads,intervals);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
final int maxShardSize = 100000;
|
final int maxShardSize = 100000;
|
||||||
this.reads = null;
|
this.reads = null;
|
||||||
|
List<FilePointer> filePointers = new ArrayList<FilePointer>();
|
||||||
if(locations == null) {
|
if(locations == null) {
|
||||||
for(SAMSequenceRecord refSequenceRecord: reference.getSequenceDictionary().getSequences()) {
|
for(SAMSequenceRecord refSequenceRecord: reference.getSequenceDictionary().getSequences()) {
|
||||||
for(int shardStart = 1; shardStart <= refSequenceRecord.getSequenceLength(); shardStart += maxShardSize) {
|
for(int shardStart = 1; shardStart <= refSequenceRecord.getSequenceLength(); shardStart += maxShardSize) {
|
||||||
|
|
@ -109,9 +105,9 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
|
||||||
for(GenomeLoc interval: locations)
|
for(GenomeLoc interval: locations)
|
||||||
filePointers.add(new FilePointer(interval));
|
filePointers.add(new FilePointer(interval));
|
||||||
}
|
}
|
||||||
|
filePointerIterator = filePointers.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
filePointerIterator = filePointers.iterator();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,6 @@
|
||||||
package org.broadinstitute.sting.gatk.datasources.shards;
|
package org.broadinstitute.sting.gatk.datasources.shards;
|
||||||
|
|
||||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
import org.broadinstitute.sting.utils.*;
|
||||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
|
||||||
import org.broadinstitute.sting.utils.StingException;
|
|
||||||
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
|
|
||||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
|
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
|
||||||
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
|
||||||
|
|
||||||
|
|
@ -19,192 +16,113 @@ import net.sf.picard.util.PeekableIterator;
|
||||||
* @version 0.1
|
* @version 0.1
|
||||||
*/
|
*/
|
||||||
public class IntervalSharder {
|
public class IntervalSharder {
|
||||||
protected static List<FilePointer> shardIntervals(final BlockDrivenSAMDataSource dataSource, final List<GenomeLoc> loci) {
|
public static Iterator<FilePointer> shardIntervals(final BlockDrivenSAMDataSource dataSource, final List<GenomeLoc> loci) {
|
||||||
Map<SAMReaderID,List<FilePointer>> filePointersByReader = new HashMap<SAMReaderID,List<FilePointer>>();
|
return new FilePointerIterator(dataSource,loci);
|
||||||
for(SAMReaderID id: dataSource.getReaderIDs()) {
|
|
||||||
PreloadedBAMFileIndex index = dataSource.getIndex(id);
|
|
||||||
// Gather bins for the given loci, splitting loci as necessary so that each falls into exactly one lowest-level bin.\
|
|
||||||
filePointersByReader.put(id,shardIntervalsOverIndex(dataSource,id,index,loci,index.getNumIndexLevels()-1));
|
|
||||||
index.close();
|
|
||||||
}
|
|
||||||
return combineFilePointers(filePointersByReader);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combine adjacent file pointers into a structure that can be streamed in.
|
* A lazy-loading iterator over file pointers.
|
||||||
* @param filePointersByReader File pointers broken down by reader.
|
|
||||||
* @return A large structure of file pointers.
|
|
||||||
*/
|
*/
|
||||||
private static List<FilePointer> combineFilePointers(Map<SAMReaderID,List<FilePointer>> filePointersByReader) {
|
private static class FilePointerIterator implements Iterator<FilePointer> {
|
||||||
PeekableIterator<FilePointer> mergingIterator = new PeekableIterator<FilePointer>(new FilePointerMergingIterator(filePointersByReader));
|
final BlockDrivenSAMDataSource dataSource;
|
||||||
|
final PeekableIterator<GenomeLoc> locusIterator;
|
||||||
|
final Queue<FilePointer> cachedFilePointers = new LinkedList<FilePointer>();
|
||||||
|
|
||||||
List<FilePointer> overlappingFilePointers = new ArrayList<FilePointer>();
|
public FilePointerIterator(final BlockDrivenSAMDataSource dataSource, final List<GenomeLoc> loci) {
|
||||||
List<FilePointer> mergedFilePointers = new ArrayList<FilePointer>();
|
this.dataSource = dataSource;
|
||||||
|
locusIterator = new PeekableIterator<GenomeLoc>(loci.iterator());
|
||||||
while(mergingIterator.hasNext()) {
|
advance();
|
||||||
GenomeLoc bounds = null;
|
|
||||||
|
|
||||||
// Load up a segment where file pointers overlap
|
|
||||||
while(mergingIterator.hasNext() && (overlappingFilePointers.size() == 0 || mergingIterator.peek().getBounds().overlapsP(bounds))) {
|
|
||||||
FilePointer filePointer = mergingIterator.next();
|
|
||||||
if(bounds != null)
|
|
||||||
bounds = GenomeLocParser.createGenomeLoc(bounds.getContig(),
|
|
||||||
Math.min(bounds.getStart(),filePointer.getBounds().getStart()),
|
|
||||||
Math.max(bounds.getStop(),filePointer.getBounds().getStop()));
|
|
||||||
else
|
|
||||||
bounds = filePointer.getBounds();
|
|
||||||
overlappingFilePointers.add(filePointer);
|
|
||||||
}
|
|
||||||
|
|
||||||
// determine the complete set of unique locations defining this set.
|
|
||||||
List<GenomeLoc> overlappingLocations = new ArrayList<GenomeLoc>();
|
|
||||||
for(FilePointer filePointer: overlappingFilePointers)
|
|
||||||
overlappingLocations.addAll(filePointer.locations);
|
|
||||||
Collections.sort(overlappingLocations);
|
|
||||||
overlappingLocations = GenomeLocSortedSet.mergeOverlappingLocations(overlappingLocations);
|
|
||||||
|
|
||||||
while(!overlappingLocations.isEmpty()) {
|
|
||||||
long overlapStart = overlappingLocations.get(0).getStart();
|
|
||||||
long overlapStop = overlappingLocations.get(overlappingLocations.size()-1).getStop();
|
|
||||||
|
|
||||||
for(FilePointer overlappingFilePointer: overlappingFilePointers) {
|
|
||||||
if(overlappingFilePointer.getBounds().getStop() < overlapStart)
|
|
||||||
continue;
|
|
||||||
if(overlappingFilePointer.getBounds().getStart() > overlapStart) overlapStop = Math.min(overlapStop,overlappingFilePointer.getBounds().getStart()-1);
|
|
||||||
if(overlappingFilePointer.getBounds().getStop() < overlapStop) overlapStop = Math.min(overlapStop,overlappingFilePointer.getBounds().getStop());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the overlapping genome locs.
|
|
||||||
List<GenomeLoc> segmentOverlap = new ArrayList<GenomeLoc>();
|
|
||||||
for(GenomeLoc overlappingLocation: overlappingLocations) {
|
|
||||||
if(overlappingLocation.getStop() <= overlapStop) {
|
|
||||||
// segment is completely before end of overlap.
|
|
||||||
segmentOverlap.add(overlappingLocation);
|
|
||||||
}
|
|
||||||
else if(overlappingLocation.getStart() <= overlapStop) {
|
|
||||||
// segment is partially before end of overlap.
|
|
||||||
segmentOverlap.add(GenomeLocParser.setStop(overlappingLocation,overlapStop));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// segment starts after overlap ends.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trim the overlapping genome locs of the overlapping locations list.
|
|
||||||
while(!overlappingLocations.isEmpty() && overlappingLocations.get(0).getStart() <= overlapStop) {
|
|
||||||
GenomeLoc location = overlappingLocations.remove(0);
|
|
||||||
if(location.getStop() > overlapStop)
|
|
||||||
overlappingLocations.add(0,GenomeLocParser.setStart(location,overlapStop+1));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge together all file pointers that overlap with these bounds.
|
|
||||||
GenomeLoc overlapBounds = GenomeLocParser.createGenomeLoc(segmentOverlap.get(0).getContigIndex(),overlapStart,overlapStop);
|
|
||||||
FilePointer mergedFilePointer = null;
|
|
||||||
for(FilePointer overlappingFilePointer: overlappingFilePointers) {
|
|
||||||
if(overlappingFilePointer.getBounds().overlapsP(overlapBounds))
|
|
||||||
mergedFilePointer = overlappingFilePointer.merge(mergedFilePointer,segmentOverlap);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the resulting file pointer and clear state.
|
|
||||||
mergedFilePointers.add(mergedFilePointer);
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset
|
|
||||||
overlappingFilePointers.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
return mergedFilePointers;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class FilePointerMergingIterator implements Iterator<FilePointer> {
|
|
||||||
private PriorityQueue<PeekableIterator<FilePointer>> filePointerQueue;
|
|
||||||
|
|
||||||
public FilePointerMergingIterator(Map<SAMReaderID,List<FilePointer>> filePointers) {
|
|
||||||
filePointerQueue = new PriorityQueue<PeekableIterator<FilePointer>>(filePointers.size(),new FilePointerMergingComparator());
|
|
||||||
for(List<FilePointer> filePointersByReader: filePointers.values())
|
|
||||||
filePointerQueue.add(new PeekableIterator<FilePointer>(filePointersByReader.iterator()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
return !filePointerQueue.isEmpty();
|
return !cachedFilePointers.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public FilePointer next() {
|
public FilePointer next() {
|
||||||
if(!hasNext()) throw new NoSuchElementException("FilePointerMergingIterator is out of elements");
|
if(!hasNext())
|
||||||
PeekableIterator<FilePointer> nextIterator = filePointerQueue.remove();
|
throw new NoSuchElementException("FilePointerIterator iteration is complete");
|
||||||
FilePointer nextFilePointer = nextIterator.next();
|
FilePointer filePointer = cachedFilePointers.remove();
|
||||||
if(nextIterator.hasNext())
|
if(cachedFilePointers.isEmpty())
|
||||||
filePointerQueue.add(nextIterator);
|
advance();
|
||||||
return nextFilePointer;
|
return filePointer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void remove() { throw new UnsupportedOperationException("Cannot remove from a merging iterator."); }
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException("Cannot remove from a FilePointerIterator");
|
||||||
|
}
|
||||||
|
|
||||||
private class FilePointerMergingComparator implements Comparator<PeekableIterator<FilePointer>> {
|
private void advance() {
|
||||||
public int compare(PeekableIterator<FilePointer> lhs, PeekableIterator<FilePointer> rhs) {
|
List<GenomeLoc> nextBatch = new ArrayList<GenomeLoc>();
|
||||||
if(!lhs.hasNext() && !rhs.hasNext()) return 0;
|
String contig = null;
|
||||||
if(!rhs.hasNext()) return -1;
|
|
||||||
if(!lhs.hasNext()) return 1;
|
while(locusIterator.hasNext() && nextBatch.isEmpty()) {
|
||||||
return lhs.peek().getBounds().compareTo(rhs.peek().getBounds());
|
contig = null;
|
||||||
|
while(locusIterator.hasNext() && (contig == null || locusIterator.peek().getContig().equals(contig))) {
|
||||||
|
GenomeLoc nextLocus = locusIterator.next();
|
||||||
|
contig = nextLocus.getContig();
|
||||||
|
nextBatch.add(nextLocus);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(nextBatch.size() > 0)
|
||||||
|
cachedFilePointers.addAll(shardIntervalsOnContig(dataSource,contig,nextBatch));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<FilePointer> shardIntervalsOverIndex(final BlockDrivenSAMDataSource dataSource, final SAMReaderID id, final PreloadedBAMFileIndex index, final List<GenomeLoc> loci, final int binsDeeperThan) {
|
private static List<FilePointer> shardIntervalsOnContig(final BlockDrivenSAMDataSource dataSource, final String contig, final List<GenomeLoc> loci) {
|
||||||
// Gather bins for the given loci, splitting loci as necessary so that each falls into exactly one lowest-level bin.
|
// Gather bins for the given loci, splitting loci as necessary so that each falls into exactly one lowest-level bin.
|
||||||
List<FilePointer> filePointers = new ArrayList<FilePointer>();
|
List<FilePointer> filePointers = new ArrayList<FilePointer>();
|
||||||
FilePointer lastFilePointer = null;
|
FilePointer lastFilePointer = null;
|
||||||
Bin lastBin = null;
|
BAMOverlap lastBAMOverlap = null;
|
||||||
|
|
||||||
|
Map<SAMReaderID,PreloadedBAMFileIndex> readerToIndexMap = new HashMap<SAMReaderID,PreloadedBAMFileIndex>();
|
||||||
|
BinMergingIterator binMerger = new BinMergingIterator();
|
||||||
|
for(SAMReaderID id: dataSource.getReaderIDs()) {
|
||||||
|
final SAMSequenceRecord referenceSequence = dataSource.getHeader(id).getSequence(contig);
|
||||||
|
final PreloadedBAMFileIndex index = dataSource.getIndex(id);
|
||||||
|
binMerger.addReader(id,
|
||||||
|
index,
|
||||||
|
referenceSequence.getSequenceIndex(),
|
||||||
|
index.getBinsOverlapping(referenceSequence.getSequenceIndex(),1,referenceSequence.getSequenceLength()).iterator());
|
||||||
|
// Cache the reader for later data lookup.
|
||||||
|
readerToIndexMap.put(id,index);
|
||||||
|
}
|
||||||
|
PeekableIterator<BAMOverlap> binIterator = new PeekableIterator<BAMOverlap>(binMerger);
|
||||||
|
|
||||||
for(GenomeLoc location: loci) {
|
for(GenomeLoc location: loci) {
|
||||||
// If crossing contigs, be sure to reset the filepointer that's been accumulating shard data.
|
if(!location.getContig().equals(contig))
|
||||||
if(lastFilePointer != null && lastFilePointer.referenceSequence != location.getContigIndex()) {
|
throw new StingException("Location outside bounds of contig");
|
||||||
filePointers.add(lastFilePointer);
|
|
||||||
lastFilePointer = null;
|
|
||||||
lastBin = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
int locationStart = (int)location.getStart();
|
int locationStart = (int)location.getStart();
|
||||||
final int locationStop = (int)location.getStop();
|
final int locationStop = (int)location.getStop();
|
||||||
|
|
||||||
List<Bin> bins = findBinsAtLeastAsDeepAs(index,getOverlappingBins(dataSource,id,index,location),binsDeeperThan);
|
// Advance to first bin.
|
||||||
|
while(binIterator.peek().stop < locationStart)
|
||||||
|
binIterator.next();
|
||||||
|
|
||||||
// Recursive stopping condition -- algorithm is at the zero point and no bins have been found.
|
// Add all relevant bins to a list. If the given bin extends beyond the end of the current interval, make
|
||||||
if(binsDeeperThan == 0 && bins.size() == 0) {
|
// sure the extending bin is not pruned from the list.
|
||||||
filePointers.add(new FilePointer(location));
|
List<BAMOverlap> bamOverlaps = new ArrayList<BAMOverlap>();
|
||||||
continue;
|
while(binIterator.hasNext() && binIterator.peek().stop <= locationStop)
|
||||||
}
|
bamOverlaps.add(binIterator.next());
|
||||||
|
if(binIterator.hasNext() && binIterator.peek().start <= locationStop)
|
||||||
// No bins found; step up a level and search again.
|
bamOverlaps.add(binIterator.peek());
|
||||||
if(bins.size() == 0) {
|
|
||||||
if(lastFilePointer != null && lastFilePointer.locations.size() > 0) {
|
|
||||||
filePointers.add(lastFilePointer);
|
|
||||||
lastFilePointer = null;
|
|
||||||
lastBin = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
filePointers.addAll(shardIntervalsOverIndex(dataSource,id,index,Collections.singletonList(location),binsDeeperThan-1));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bins found; try to match bins with locations.
|
// Bins found; try to match bins with locations.
|
||||||
Collections.sort(bins);
|
Iterator<BAMOverlap> bamOverlapIterator = bamOverlaps.iterator();
|
||||||
Iterator<Bin> binIterator = bins.iterator();
|
|
||||||
|
|
||||||
while(locationStop >= locationStart) {
|
while(locationStop >= locationStart) {
|
||||||
int binStart = lastFilePointer!=null ? index.getFirstLocusInBin(lastBin) : 0;
|
int binStart = lastFilePointer!=null ? lastFilePointer.overlap.start : 0;
|
||||||
int binStop = lastFilePointer!=null ? index.getLastLocusInBin(lastBin) : 0;
|
int binStop = lastFilePointer!=null ? lastFilePointer.overlap.stop : 0;
|
||||||
|
|
||||||
while(binStop < locationStart && binIterator.hasNext()) {
|
while(binStop < locationStart && bamOverlapIterator.hasNext()) {
|
||||||
if(lastFilePointer != null && lastFilePointer.locations.size() > 0)
|
if(lastFilePointer != null && lastFilePointer.locations.size() > 0)
|
||||||
filePointers.add(lastFilePointer);
|
filePointers.add(lastFilePointer);
|
||||||
|
|
||||||
lastBin = binIterator.next();
|
lastBAMOverlap = bamOverlapIterator.next();
|
||||||
lastFilePointer = new FilePointer(id,lastBin.referenceSequence,getFilePointersBounding(index,lastBin));
|
lastFilePointer = new FilePointer(contig,lastBAMOverlap);
|
||||||
binStart = index.getFirstLocusInBin(lastBin);
|
binStart = lastFilePointer.overlap.start;
|
||||||
binStop = index.getLastLocusInBin(lastBin);
|
binStop = lastFilePointer.overlap.stop;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(locationStart < binStart) {
|
if(locationStart < binStart) {
|
||||||
|
|
@ -212,13 +130,13 @@ public class IntervalSharder {
|
||||||
if(lastFilePointer != null && lastFilePointer.locations.size() > 0) {
|
if(lastFilePointer != null && lastFilePointer.locations.size() > 0) {
|
||||||
filePointers.add(lastFilePointer);
|
filePointers.add(lastFilePointer);
|
||||||
lastFilePointer = null;
|
lastFilePointer = null;
|
||||||
lastBin = null;
|
lastBAMOverlap = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final int regionStop = Math.min(locationStop,binStart-1);
|
final int regionStop = Math.min(locationStop,binStart-1);
|
||||||
|
|
||||||
GenomeLoc subset = GenomeLocParser.createGenomeLoc(location.getContig(),locationStart,regionStop);
|
GenomeLoc subset = GenomeLocParser.createGenomeLoc(location.getContig(),locationStart,regionStop);
|
||||||
filePointers.addAll(shardIntervalsOverIndex(dataSource,id,index,Collections.singletonList(subset),binsDeeperThan-1));
|
lastFilePointer = new FilePointer(subset);
|
||||||
|
|
||||||
locationStart = regionStop + 1;
|
locationStart = regionStop + 1;
|
||||||
}
|
}
|
||||||
|
|
@ -227,20 +145,21 @@ public class IntervalSharder {
|
||||||
if(lastFilePointer != null && lastFilePointer.locations.size() > 0) {
|
if(lastFilePointer != null && lastFilePointer.locations.size() > 0) {
|
||||||
filePointers.add(lastFilePointer);
|
filePointers.add(lastFilePointer);
|
||||||
lastFilePointer = null;
|
lastFilePointer = null;
|
||||||
lastBin = null;
|
lastBAMOverlap = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
GenomeLoc subset = GenomeLocParser.createGenomeLoc(location.getContig(),locationStart,locationStop);
|
GenomeLoc subset = GenomeLocParser.createGenomeLoc(location.getContig(),locationStart,locationStop);
|
||||||
filePointers.addAll(shardIntervalsOverIndex(dataSource,id,index,Collections.singletonList(subset),binsDeeperThan-1));
|
filePointers.add(new FilePointer(subset));
|
||||||
|
|
||||||
locationStart = locationStop + 1;
|
locationStart = locationStop + 1;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
if(lastFilePointer == null)
|
||||||
|
throw new StingException("Illegal state: initializer failed to create cached file pointer.");
|
||||||
|
|
||||||
// The start of the region overlaps the bin. Add the overlapping subset.
|
// The start of the region overlaps the bin. Add the overlapping subset.
|
||||||
final int regionStop = Math.min(locationStop,binStop);
|
final int regionStop = Math.min(locationStop,binStop);
|
||||||
lastFilePointer.addLocation(GenomeLocParser.createGenomeLoc(location.getContig(),
|
lastFilePointer.addLocation(GenomeLocParser.createGenomeLoc(location.getContig(),locationStart,regionStop));
|
||||||
locationStart,
|
|
||||||
regionStop));
|
|
||||||
locationStart = regionStop + 1;
|
locationStart = regionStop + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -249,48 +168,204 @@ public class IntervalSharder {
|
||||||
if(lastFilePointer != null && lastFilePointer.locations.size() > 0)
|
if(lastFilePointer != null && lastFilePointer.locations.size() > 0)
|
||||||
filePointers.add(lastFilePointer);
|
filePointers.add(lastFilePointer);
|
||||||
|
|
||||||
|
// Lookup the locations for every file pointer in the index.
|
||||||
|
for(SAMReaderID id: dataSource.getReaderIDs()) {
|
||||||
|
PreloadedBAMFileIndex index = readerToIndexMap.get(id);
|
||||||
|
for(FilePointer filePointer: filePointers)
|
||||||
|
filePointer.addChunks(id,index.getChunksOverlapping(filePointer.overlap.getBin(id)));
|
||||||
|
index.close();
|
||||||
|
}
|
||||||
|
|
||||||
return filePointers;
|
return filePointers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Bin> findBinsAtLeastAsDeepAs(final PreloadedBAMFileIndex index, final List<Bin> bins, final int deepestBinLevel) {
|
private static class BinMergingIterator implements Iterator<BAMOverlap> {
|
||||||
List<Bin> deepestBins = new ArrayList<Bin>();
|
private PriorityQueue<BinQueueState> binQueue = new PriorityQueue<BinQueueState>();
|
||||||
for(Bin bin: bins) {
|
private Queue<BAMOverlap> pendingOverlaps = new LinkedList<BAMOverlap>();
|
||||||
if(index.getLevelForBin(bin) >= deepestBinLevel)
|
|
||||||
deepestBins.add(bin);
|
public void addReader(final SAMReaderID id, final PreloadedBAMFileIndex index, final int referenceSequence, Iterator<Bin> bins) {
|
||||||
|
binQueue.add(new BinQueueState(id,index,referenceSequence,new LowestLevelBinFilteringIterator(index,bins)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasNext() {
|
||||||
|
return pendingOverlaps.size() > 0 || !binQueue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public BAMOverlap next() {
|
||||||
|
if(!hasNext())
|
||||||
|
throw new NoSuchElementException("No elements left in merging iterator");
|
||||||
|
if(pendingOverlaps.isEmpty())
|
||||||
|
advance();
|
||||||
|
return pendingOverlaps.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void advance() {
|
||||||
|
List<ReaderBin> bins = new ArrayList<ReaderBin>();
|
||||||
|
int boundsStart, boundsStop;
|
||||||
|
|
||||||
|
// Prime the pump
|
||||||
|
if(binQueue.isEmpty())
|
||||||
|
return;
|
||||||
|
bins.add(getNextBin());
|
||||||
|
boundsStart = bins.get(0).getStart();
|
||||||
|
boundsStop = bins.get(0).getStop();
|
||||||
|
|
||||||
|
// Accumulate all the bins that overlap the current bin, in sorted order.
|
||||||
|
while(!binQueue.isEmpty() && peekNextBin().getStart() <= boundsStop) {
|
||||||
|
ReaderBin bin = getNextBin();
|
||||||
|
bins.add(bin);
|
||||||
|
boundsStart = Math.min(boundsStart,bin.getStart());
|
||||||
|
boundsStop = Math.max(boundsStop,bin.getStop());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Pair<Integer,Integer>> range = new ArrayList<Pair<Integer,Integer>>();
|
||||||
|
int start = bins.get(0).getStart();
|
||||||
|
int stop = bins.get(0).getStop();
|
||||||
|
while(start <= boundsStop) {
|
||||||
|
// Find the next stopping point.
|
||||||
|
for(ReaderBin bin: bins) {
|
||||||
|
stop = Math.min(stop,bin.getStop());
|
||||||
|
if(start < bin.getStart())
|
||||||
|
stop = Math.min(stop,bin.getStart()-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
range.add(new Pair<Integer,Integer>(start,stop));
|
||||||
|
// If the last entry added included the last element, stop.
|
||||||
|
if(stop >= boundsStop)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Find the next start.
|
||||||
|
start = stop + 1;
|
||||||
|
for(ReaderBin bin: bins) {
|
||||||
|
if(start >= bin.getStart() && start <= bin.getStop())
|
||||||
|
break;
|
||||||
|
else if(start < bin.getStart()) {
|
||||||
|
start = bin.getStart();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the next series of BAM overlaps to the window.
|
||||||
|
for(Pair<Integer,Integer> window: range) {
|
||||||
|
BAMOverlap bamOverlap = new BAMOverlap(window.first,window.second);
|
||||||
|
for(ReaderBin bin: bins)
|
||||||
|
bamOverlap.addBin(bin.id,bin.bin);
|
||||||
|
pendingOverlaps.add(bamOverlap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() { throw new UnsupportedOperationException("Cannot remove from a merging iterator."); }
|
||||||
|
|
||||||
|
private ReaderBin peekNextBin() {
|
||||||
|
if(binQueue.isEmpty())
|
||||||
|
throw new NoSuchElementException("No more bins are available");
|
||||||
|
BinQueueState current = binQueue.peek();
|
||||||
|
return new ReaderBin(current.id,current.index,current.referenceSequence,current.bins.peek());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReaderBin getNextBin() {
|
||||||
|
if(binQueue.isEmpty())
|
||||||
|
throw new NoSuchElementException("No more bins are available");
|
||||||
|
BinQueueState current = binQueue.remove();
|
||||||
|
ReaderBin readerBin = new ReaderBin(current.id,current.index,current.referenceSequence,current.bins.next());
|
||||||
|
if(current.bins.hasNext())
|
||||||
|
binQueue.add(current);
|
||||||
|
return readerBin;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReaderBin {
|
||||||
|
public final SAMReaderID id;
|
||||||
|
public final PreloadedBAMFileIndex index;
|
||||||
|
public final int referenceSequence;
|
||||||
|
public final Bin bin;
|
||||||
|
|
||||||
|
public ReaderBin(final SAMReaderID id, final PreloadedBAMFileIndex index, final int referenceSequence, final Bin bin) {
|
||||||
|
this.id = id;
|
||||||
|
this.index = index;
|
||||||
|
this.referenceSequence = referenceSequence;
|
||||||
|
this.bin = bin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getStart() {
|
||||||
|
return index.getFirstLocusInBin(bin);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getStop() {
|
||||||
|
return index.getLastLocusInBin(bin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class BinQueueState implements Comparable<BinQueueState> {
|
||||||
|
public final SAMReaderID id;
|
||||||
|
public final PreloadedBAMFileIndex index;
|
||||||
|
public final int referenceSequence;
|
||||||
|
public final PeekableIterator<Bin> bins;
|
||||||
|
|
||||||
|
public BinQueueState(final SAMReaderID id, final PreloadedBAMFileIndex index, final int referenceSequence, final Iterator<Bin> bins) {
|
||||||
|
this.id = id;
|
||||||
|
this.index = index;
|
||||||
|
this.referenceSequence = referenceSequence;
|
||||||
|
this.bins = new PeekableIterator<Bin>(bins);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int compareTo(BinQueueState other) {
|
||||||
|
if(!this.bins.hasNext() && !other.bins.hasNext()) return 0;
|
||||||
|
if(!this.bins.hasNext()) return -1;
|
||||||
|
if(!this.bins.hasNext()) return 1;
|
||||||
|
|
||||||
|
int thisStart = this.index.getFirstLocusInBin(this.bins.peek());
|
||||||
|
int otherStart = other.index.getFirstLocusInBin(other.bins.peek());
|
||||||
|
|
||||||
|
// Straight integer subtraction works here because lhsStart, rhsStart always positive.
|
||||||
|
if(thisStart != otherStart)
|
||||||
|
return thisStart - otherStart;
|
||||||
|
|
||||||
|
int thisStop = this.index.getLastLocusInBin(this.bins.peek());
|
||||||
|
int otherStop = other.index.getLastLocusInBin(other.bins.peek());
|
||||||
|
|
||||||
|
// Straight integer subtraction works here because lhsStop, rhsStop always positive.
|
||||||
|
return thisStop - otherStop;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return deepestBins;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets a list of the bins in each BAM file that overlap with the given interval list.
|
* Filters out bins not at the lowest level in the tree.
|
||||||
* @param location Location for which to determine the bin.
|
|
||||||
* @return A map of reader back to bin.
|
|
||||||
*/
|
*/
|
||||||
private static List<Bin> getOverlappingBins(final BlockDrivenSAMDataSource dataSource, final SAMReaderID id, final PreloadedBAMFileIndex index, final GenomeLoc location) {
|
private static class LowestLevelBinFilteringIterator implements Iterator<Bin> {
|
||||||
// All readers will have the same bin structure, so just use the first bin as an example.
|
private PreloadedBAMFileIndex index;
|
||||||
final SAMFileHeader fileHeader = dataSource.getHeader(id);
|
private Iterator<Bin> wrappedIterator;
|
||||||
int referenceIndex = fileHeader.getSequenceIndex(location.getContig());
|
|
||||||
if (referenceIndex != -1) {
|
private Bin nextBin;
|
||||||
return index.getBinsContaining(referenceIndex,(int)location.getStart(),(int)location.getStop());
|
|
||||||
|
public LowestLevelBinFilteringIterator(final PreloadedBAMFileIndex index, Iterator<Bin> iterator) {
|
||||||
|
this.index = index;
|
||||||
|
this.wrappedIterator = iterator;
|
||||||
|
advance();
|
||||||
}
|
}
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
public boolean hasNext() {
|
||||||
* Gets the file pointers bounded by this bin, grouped by the reader of origination.
|
return nextBin != null;
|
||||||
* @param bin The bin for which to load data.
|
|
||||||
* @return A map of the file pointers bounding the bin.
|
|
||||||
*/
|
|
||||||
private static List<Chunk> getFilePointersBounding(final PreloadedBAMFileIndex index, final Bin bin) {
|
|
||||||
if(bin != null) {
|
|
||||||
List<Chunk> chunks = index.getSearchBins(bin);
|
|
||||||
return chunks != null ? chunks : Collections.<Chunk>emptyList();
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
public Bin next() {
|
||||||
|
Bin bin = nextBin;
|
||||||
|
advance();
|
||||||
|
return bin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() { throw new UnsupportedOperationException("Remove operation is not supported"); }
|
||||||
|
|
||||||
|
private void advance() {
|
||||||
|
nextBin = null;
|
||||||
|
while(wrappedIterator.hasNext() && nextBin == null) {
|
||||||
|
Bin bin = wrappedIterator.next();
|
||||||
|
if(index.getLevelForBin(bin) == index.getNumIndexLevels()-1)
|
||||||
|
nextBin = bin;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -298,47 +373,53 @@ public class IntervalSharder {
|
||||||
*/
|
*/
|
||||||
class FilePointer {
|
class FilePointer {
|
||||||
protected final Map<SAMReaderID,List<Chunk>> chunks = new HashMap<SAMReaderID,List<Chunk>>();
|
protected final Map<SAMReaderID,List<Chunk>> chunks = new HashMap<SAMReaderID,List<Chunk>>();
|
||||||
protected final int referenceSequence;
|
protected final String referenceSequence;
|
||||||
|
protected final BAMOverlap overlap;
|
||||||
protected final List<GenomeLoc> locations;
|
protected final List<GenomeLoc> locations;
|
||||||
|
|
||||||
public FilePointer(SAMReaderID id, int referenceSequence, List<Chunk> chunks) {
|
public FilePointer(final GenomeLoc location) {
|
||||||
this.referenceSequence = referenceSequence;
|
referenceSequence = location.getContig();
|
||||||
this.chunks.put(id,chunks);
|
overlap = null;
|
||||||
this.locations = new ArrayList<GenomeLoc>();
|
|
||||||
}
|
|
||||||
|
|
||||||
public FilePointer(GenomeLoc location) {
|
|
||||||
referenceSequence = location.getContigIndex();
|
|
||||||
locations = Collections.singletonList(location);
|
locations = Collections.singletonList(location);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public FilePointer(final String referenceSequence,final BAMOverlap overlap) {
|
||||||
* Private constructor for merge operation.
|
|
||||||
* @param referenceSequence Sequence to merge.
|
|
||||||
* @param locations Merged locations.
|
|
||||||
*/
|
|
||||||
private FilePointer(final int referenceSequence, final List<GenomeLoc> locations) {
|
|
||||||
this.referenceSequence = referenceSequence;
|
this.referenceSequence = referenceSequence;
|
||||||
this.locations = locations;
|
this.overlap = overlap;
|
||||||
}
|
this.locations = new ArrayList<GenomeLoc>();
|
||||||
|
|
||||||
public FilePointer merge(FilePointer other, List<GenomeLoc> locations) {
|
|
||||||
FilePointer merged = new FilePointer(referenceSequence,locations);
|
|
||||||
merged.chunks.putAll(this.chunks);
|
|
||||||
if(other != null)
|
|
||||||
merged.chunks.putAll(other.chunks);
|
|
||||||
return merged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addLocation(GenomeLoc location) {
|
public void addLocation(GenomeLoc location) {
|
||||||
locations.add(location);
|
locations.add(location);
|
||||||
}
|
}
|
||||||
|
|
||||||
public GenomeLoc getBounds() {
|
public void addChunks(SAMReaderID id, List<Chunk> chunks) {
|
||||||
final long boundaryStart = locations.get(0).getStart();
|
this.chunks.put(id,chunks);
|
||||||
final long boundaryStop = locations.get(locations.size()-1).getStop();
|
}
|
||||||
return GenomeLocParser.createGenomeLoc(locations.get(0).getContigIndex(),boundaryStart,boundaryStop);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Models a bin at which all BAM files in the merged input stream overlap.
|
||||||
|
*/
|
||||||
|
class BAMOverlap {
|
||||||
|
public final int start;
|
||||||
|
public final int stop;
|
||||||
|
|
||||||
|
private final Map<SAMReaderID,Bin> bins = new HashMap<SAMReaderID,Bin>();
|
||||||
|
|
||||||
|
public BAMOverlap(final int start, final int stop) {
|
||||||
|
this.start = start;
|
||||||
|
this.stop = stop;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addBin(final SAMReaderID id, final Bin bin) {
|
||||||
|
bins.put(id,bin);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Bin getBin(final SAMReaderID id) {
|
||||||
|
return bins.get(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -1,3 +1,3 @@
|
||||||
<ivy-module version="1.0">
|
<ivy-module version="1.0">
|
||||||
<info organisation="edu.mit.broad" module="picard-private-parts" revision="1333-sharding-3" status="integration" publication="20100323163000" />
|
<info organisation="edu.mit.broad" module="picard-private-parts" revision="1333-sharding-4" status="integration" publication="20100324095800" />
|
||||||
</ivy-module>
|
</ivy-module>
|
||||||
Binary file not shown.
|
|
@ -1,3 +1,3 @@
|
||||||
<ivy-module version="1.0">
|
<ivy-module version="1.0">
|
||||||
<info organisation="net.sf" module="picard" revision="1.16.363-sharding" status="release" />
|
<info organisation="net.sf" module="picard" revision="1.16.364-sharding" status="release" />
|
||||||
</ivy-module>
|
</ivy-module>
|
||||||
Binary file not shown.
|
|
@ -1,3 +1,3 @@
|
||||||
<ivy-module version="1.0">
|
<ivy-module version="1.0">
|
||||||
<info organisation="net.sf" module="sam" revision="1.16.363-sharding" status="release" />
|
<info organisation="net.sf" module="sam" revision="1.16.364-sharding" status="release" />
|
||||||
</ivy-module>
|
</ivy-module>
|
||||||
Loading…
Reference in New Issue