Finally restored code after accidentally removing three days worth of work:
schedule file infrastructure has been restored, and is now a single file. Only the exact bins required for the traversal are stored in the schedule. Very close to being able to merge schedule entries. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5497 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
parent
69646ff840
commit
37fbf17da8
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.Bin;
|
||||
import net.sf.samtools.GATKBAMFileSpan;
|
||||
import net.sf.samtools.GATKChunk;
|
||||
|
|
@ -37,6 +38,12 @@ import java.io.RandomAccessFile;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Writes schedules for a single BAM file to a target output file.
|
||||
|
|
@ -45,18 +52,34 @@ public class BAMSchedule implements CloseableIterator<BAMScheduleEntry> {
|
|||
/**
|
||||
* File in which to store schedule data.
|
||||
*/
|
||||
private final File scheduleFile;
|
||||
private File scheduleFile;
|
||||
|
||||
/**
|
||||
* File channel for the schedule file.
|
||||
*/
|
||||
private final FileChannel scheduleFileChannel;
|
||||
private FileChannel scheduleFileChannel;
|
||||
|
||||
/**
|
||||
* The definitive, sorted list of reader IDs. Order is important here: the order
|
||||
* in which the reader IDs are presented here maps to the order in which they appear in the file.
|
||||
*/
|
||||
private final List<SAMReaderID> readerIDs = new ArrayList<SAMReaderID>();
|
||||
|
||||
/**
|
||||
* Iterators over the schedule. Stored in the same order as readerIDs, above.
|
||||
*/
|
||||
private final List<PeekableIterator<BAMScheduleEntry>> scheduleIterators = new ArrayList<PeekableIterator<BAMScheduleEntry>>();
|
||||
|
||||
/**
|
||||
* Next schedule entry to be returned. Null if no additional entries are present.
|
||||
*/
|
||||
private BAMScheduleEntry nextScheduleEntry;
|
||||
|
||||
/**
|
||||
* Reference sequence for which to write the schedule.
|
||||
*/
|
||||
private final int referenceSequence;
|
||||
|
||||
/**
|
||||
* Sizes of ints and longs in bytes.
|
||||
*/
|
||||
|
|
@ -65,52 +88,77 @@ public class BAMSchedule implements CloseableIterator<BAMScheduleEntry> {
|
|||
|
||||
/**
|
||||
* Create a new BAM schedule based on the given index.
|
||||
* @param index index to convert to a schedule.
|
||||
* @param indices Mapping from readers to indices.
|
||||
* @param intervals List of
|
||||
*/
|
||||
public BAMSchedule(final GATKBAMIndex index, final int referenceSequence) {
|
||||
try {
|
||||
scheduleFile = File.createTempFile("bamschedule."+referenceSequence,null);
|
||||
scheduleFileChannel = new RandomAccessFile(scheduleFile,"rw").getChannel();
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to create BAM schedule file.",ex);
|
||||
}
|
||||
scheduleFile.deleteOnExit();
|
||||
public BAMSchedule(final Map<SAMReaderID,GATKBAMIndex> indices, final List<GenomeLoc> intervals) {
|
||||
if(intervals.isEmpty())
|
||||
throw new ReviewedStingException("Tried to write schedule for empty interval list.");
|
||||
|
||||
int currentBinInLowestLevel = GATKBAMIndex.getFirstBinInLevel(GATKBAMIndex.getNumIndexLevels()-1) - 1;
|
||||
while(++currentBinInLowestLevel < GATKBAMIndex.MAX_BINS) {
|
||||
BAMScheduleEntry scheduleEntry = BAMScheduleEntry.query(index,referenceSequence,currentBinInLowestLevel);
|
||||
if(scheduleEntry.fileSpan.isEmpty())
|
||||
continue;
|
||||
referenceSequence = intervals.get(0).getContigIndex();
|
||||
|
||||
// File format is binary in little endian; start of region, end of region, num chunks, then the chunks themselves.
|
||||
ByteBuffer buffer = ByteBuffer.allocateDirect(2*INT_SIZE_IN_BYTES + INT_SIZE_IN_BYTES + scheduleEntry.fileSpan.getGATKChunks().size()*LONG_SIZE_IN_BYTES*2);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
buffer.putInt(scheduleEntry.start);
|
||||
buffer.putInt(scheduleEntry.stop);
|
||||
buffer.putInt(scheduleEntry.fileSpan.getGATKChunks().size());
|
||||
for(GATKChunk chunk: scheduleEntry.fileSpan.getGATKChunks()) {
|
||||
buffer.putLong(chunk.getChunkStart());
|
||||
buffer.putLong(chunk.getChunkEnd());
|
||||
createScheduleFile();
|
||||
|
||||
readerIDs.addAll(indices.keySet());
|
||||
|
||||
for(final SAMReaderID reader: readerIDs) {
|
||||
GATKBAMIndex index = indices.get(reader);
|
||||
|
||||
int currentBinInLowestLevel = GATKBAMIndex.getFirstBinInLevel(GATKBAMIndex.getNumIndexLevels()-1);
|
||||
Iterator<GenomeLoc> locusIterator = intervals.iterator();
|
||||
GenomeLoc currentLocus = locusIterator.next();
|
||||
|
||||
final long readerStartOffset = position();
|
||||
|
||||
int maxChunkCount = 0;
|
||||
|
||||
while(currentBinInLowestLevel < GATKBAMIndex.MAX_BINS && currentLocus != null) {
|
||||
final Bin bin = new Bin(referenceSequence,currentBinInLowestLevel);
|
||||
final int binStart = index.getFirstLocusInBin(bin);
|
||||
final int binStop = index.getLastLocusInBin(bin);
|
||||
|
||||
// In required, pull bin iterator ahead to the point of the next GenomeLoc.
|
||||
if(binStop < currentLocus.getStart()) {
|
||||
currentBinInLowestLevel++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// At this point, the bin stop is guaranteed to be >= the start of the locus.
|
||||
// If the bins have gone past the current locus, update the current locus if at all possible.
|
||||
if(binStart > currentLocus.getStop()) {
|
||||
currentLocus = locusIterator.hasNext() ? locusIterator.next() : null;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Code at this point knows that the current bin is neither before nor after the current locus,
|
||||
// so it must overlap. Add this region to the filesystem.
|
||||
final GATKBAMFileSpan fileSpan = index.getSpanOverlapping(bin);
|
||||
|
||||
if(!fileSpan.isEmpty()) {
|
||||
// File format is binary in little endian; start of region, end of region, num chunks, then the chunks themselves.
|
||||
ByteBuffer buffer = allocateByteBuffer(2*INT_SIZE_IN_BYTES + INT_SIZE_IN_BYTES + fileSpan.getGATKChunks().size()*LONG_SIZE_IN_BYTES*2);
|
||||
buffer.putInt(binStart);
|
||||
buffer.putInt(binStop);
|
||||
buffer.putInt(fileSpan.getGATKChunks().size());
|
||||
for(GATKChunk chunk: fileSpan.getGATKChunks()) {
|
||||
buffer.putLong(chunk.getChunkStart());
|
||||
buffer.putLong(chunk.getChunkEnd());
|
||||
}
|
||||
maxChunkCount = Math.max(maxChunkCount,fileSpan.getGATKChunks().size());
|
||||
|
||||
// Prepare buffer for writing
|
||||
buffer.flip();
|
||||
|
||||
// And write.
|
||||
write(buffer);
|
||||
}
|
||||
|
||||
currentBinInLowestLevel++;
|
||||
}
|
||||
|
||||
// Prepare buffer for writing
|
||||
buffer.flip();
|
||||
final long readerStopOffset = position();
|
||||
|
||||
try {
|
||||
scheduleFileChannel.write(buffer);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to create BAM schedule file.",ex);
|
||||
}
|
||||
}
|
||||
|
||||
// Move file pointer back to the start.
|
||||
try {
|
||||
scheduleFileChannel.position(0L);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to rewind BAM schedule file.",ex);
|
||||
scheduleIterators.add(new PeekableIterator<BAMScheduleEntry>(new BAMScheduleIterator(reader,readerStartOffset,readerStopOffset,maxChunkCount)));
|
||||
}
|
||||
|
||||
advance();
|
||||
|
|
@ -155,52 +203,225 @@ public class BAMSchedule implements CloseableIterator<BAMScheduleEntry> {
|
|||
private void advance() {
|
||||
nextScheduleEntry = null;
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.allocateDirect(2*INT_SIZE_IN_BYTES+INT_SIZE_IN_BYTES);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
int results;
|
||||
BitSet selectedIterators = new BitSet(readerIDs.size());
|
||||
int currentStart = Integer.MAX_VALUE;
|
||||
int currentStop = Integer.MAX_VALUE;
|
||||
|
||||
try {
|
||||
results = scheduleFileChannel.read(buffer);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to read start, stop and chunk sizes from schedule file channel.",ex);
|
||||
// Select every iterator whose next element is the lowest element in the list.
|
||||
for(int reader = 0; reader < scheduleIterators.size(); reader++) {
|
||||
PeekableIterator<BAMScheduleEntry> scheduleIterator = scheduleIterators.get(reader);
|
||||
if(!scheduleIterator.hasNext())
|
||||
continue;
|
||||
|
||||
// If the iterator starts after this one, skip over it.
|
||||
if(scheduleIterator.peek().start > currentStart)
|
||||
continue;
|
||||
|
||||
// If the iterator starts at the same point as this one, add it to the list.
|
||||
if(scheduleIterator.peek().start == currentStart) {
|
||||
selectedIterators.set(reader);
|
||||
currentStop = Math.min(scheduleIterator.peek().stop,currentStop);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the iterator is less than anything seen before it, purge the selections and make this one current.
|
||||
if(scheduleIterator.peek().start < currentStart) {
|
||||
selectedIterators.clear();
|
||||
selectedIterators.set(reader);
|
||||
currentStart = scheduleIterator.peek().start;
|
||||
currentStop = scheduleIterator.peek().stop;
|
||||
}
|
||||
}
|
||||
|
||||
// No more data to read.
|
||||
if(results <= 0)
|
||||
// Out of iterators? Abort early.
|
||||
if(selectedIterators.isEmpty())
|
||||
return;
|
||||
|
||||
// Reorient buffer for reading.
|
||||
buffer.flip();
|
||||
BAMScheduleEntry mergedScheduleEntry = new BAMScheduleEntry(currentStart,currentStop);
|
||||
for (int reader = selectedIterators.nextSetBit(0); reader >= 0; reader = selectedIterators.nextSetBit(reader+1)) {
|
||||
PeekableIterator<BAMScheduleEntry> scheduleIterator = scheduleIterators.get(reader);
|
||||
BAMScheduleEntry individualScheduleEntry = scheduleIterator.peek();
|
||||
mergedScheduleEntry.mergeInto(individualScheduleEntry);
|
||||
|
||||
final int start = buffer.getInt();
|
||||
final int stop = buffer.getInt();
|
||||
final int numChunks = buffer.getInt();
|
||||
|
||||
GATKChunk[] chunks = new GATKChunk[numChunks];
|
||||
buffer = ByteBuffer.allocateDirect(numChunks * 2 * LONG_SIZE_IN_BYTES);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
try {
|
||||
scheduleFileChannel.read(buffer);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to read chunk data from schedule file channel.",ex);
|
||||
// If the schedule iterator ends after this entry, consume it.
|
||||
if(individualScheduleEntry.stop <= currentStop)
|
||||
scheduleIterator.next();
|
||||
}
|
||||
|
||||
// Reposition for reading.
|
||||
buffer.flip();
|
||||
|
||||
// Read out chunk data.
|
||||
for(int i = 0; i < numChunks; i++)
|
||||
chunks[i] = new GATKChunk(buffer.getLong(),buffer.getLong());
|
||||
|
||||
// Prep the iterator for the next schedule entry.
|
||||
nextScheduleEntry = new BAMScheduleEntry(start,stop,new GATKBAMFileSpan(chunks));
|
||||
nextScheduleEntry = mergedScheduleEntry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() { throw new UnsupportedOperationException("Unable to remove from a schedule iterator."); }
|
||||
|
||||
/**
|
||||
* Create a new schedule file, containing schedule information for all BAM files being dynamically merged.
|
||||
*/
|
||||
private void createScheduleFile() {
|
||||
try {
|
||||
scheduleFile = File.createTempFile("bamschedule."+referenceSequence,null);
|
||||
scheduleFileChannel = new RandomAccessFile(scheduleFile,"rw").getChannel();
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to create BAM schedule file.",ex);
|
||||
}
|
||||
scheduleFile.deleteOnExit();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new byte buffer of the given size.
|
||||
* @param size the size of buffer to allocate.
|
||||
* @return Newly allocated byte buffer.
|
||||
*/
|
||||
private ByteBuffer allocateByteBuffer(final int size) {
|
||||
ByteBuffer buffer = ByteBuffer.allocateDirect(size);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the contents at the current position on disk into the given buffer.
|
||||
* @param buffer buffer to fill.
|
||||
*/
|
||||
private int read(final ByteBuffer buffer) {
|
||||
try {
|
||||
return scheduleFileChannel.read(buffer);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to read data from BAM schedule file..",ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void write(final ByteBuffer buffer) {
|
||||
try {
|
||||
scheduleFileChannel.write(buffer);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to write data to BAM schedule file.",ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the current position from the file channel.
|
||||
* @return Current position within file channel.
|
||||
*/
|
||||
private long position() {
|
||||
try {
|
||||
return scheduleFileChannel.position();
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to retrieve position of BAM schedule file.",ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reposition the file channel to the specified offset wrt the start of the file.
|
||||
* @param position The position.
|
||||
*/
|
||||
private void position(final long position) {
|
||||
try {
|
||||
scheduleFileChannel.position(position);
|
||||
}
|
||||
catch(IOException ex) {
|
||||
throw new ReviewedStingException("Unable to position BAM schedule file.",ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An iterator over the schedule for a single BAM file.
|
||||
*/
|
||||
private class BAMScheduleIterator implements Iterator<BAMScheduleEntry> {
|
||||
/**
|
||||
* ID of the reader associated with the given schedule.
|
||||
*/
|
||||
private final SAMReaderID reader;
|
||||
|
||||
/**
|
||||
* Current position in the file.
|
||||
*/
|
||||
private long currentPosition;
|
||||
|
||||
/**
|
||||
* Stopping file position of last bin in file for this reader, exclusive.
|
||||
*/
|
||||
private final long stopPosition;
|
||||
|
||||
/**
|
||||
* Byte buffer used to store BAM header info.
|
||||
*/
|
||||
private final ByteBuffer binHeader;
|
||||
|
||||
/**
|
||||
* Byte buffer used to store chunk data.
|
||||
*/
|
||||
private final ByteBuffer chunkData;
|
||||
|
||||
public BAMScheduleIterator(final SAMReaderID reader, final long startPosition, final long stopPosition, final int maxChunkCount) {
|
||||
this.reader = reader;
|
||||
this.currentPosition = startPosition;
|
||||
this.stopPosition = stopPosition;
|
||||
binHeader = allocateByteBuffer(INT_SIZE_IN_BYTES*3);
|
||||
chunkData = allocateByteBuffer(maxChunkCount*LONG_SIZE_IN_BYTES*2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return currentPosition < stopPosition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BAMScheduleEntry next() {
|
||||
position(currentPosition);
|
||||
|
||||
// Read data.
|
||||
read(binHeader);
|
||||
|
||||
// Decode contents.
|
||||
binHeader.flip();
|
||||
final int start = binHeader.getInt();
|
||||
final int stop = binHeader.getInt();
|
||||
final int numChunks = binHeader.getInt();
|
||||
|
||||
// Prepare bin buffer for next read.
|
||||
binHeader.flip();
|
||||
|
||||
// Prepare a target buffer for chunks.
|
||||
GATKChunk[] chunks = new GATKChunk[numChunks];
|
||||
|
||||
// Read all chunk data.
|
||||
chunkData.limit(numChunks*LONG_SIZE_IN_BYTES*2);
|
||||
long bytesRead = read(chunkData);
|
||||
if(bytesRead != numChunks*LONG_SIZE_IN_BYTES*2)
|
||||
throw new ReviewedStingException("Unable to read all chunks from file");
|
||||
|
||||
// Prepare for reading.
|
||||
chunkData.flip();
|
||||
|
||||
for(int i = 0; i < numChunks; i++)
|
||||
chunks[i] = new GATKChunk(chunkData.getLong(),chunkData.getLong());
|
||||
|
||||
// Prepare chunk buffer for next read.
|
||||
chunkData.flip();
|
||||
|
||||
BAMScheduleEntry nextScheduleEntry = new BAMScheduleEntry(start,stop);
|
||||
nextScheduleEntry.addFileSpan(reader,new GATKBAMFileSpan(chunks));
|
||||
|
||||
// Reset the position of the iterator at the next contig.
|
||||
currentPosition = position();
|
||||
|
||||
return nextScheduleEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Not supported.
|
||||
*/
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Unable to remove from a BAMScheduleIterator");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -220,20 +441,33 @@ class BAMScheduleEntry {
|
|||
/**
|
||||
* The spans representing the given region.
|
||||
*/
|
||||
public final GATKBAMFileSpan fileSpan;
|
||||
public final Map<SAMReaderID,GATKBAMFileSpan> fileSpans = new HashMap<SAMReaderID,GATKBAMFileSpan>();
|
||||
|
||||
BAMScheduleEntry(final int start, final int stop, final GATKBAMFileSpan fileSpan) {
|
||||
BAMScheduleEntry(final int start, final int stop) {
|
||||
this.start = start;
|
||||
this.stop = stop;
|
||||
this.fileSpan = fileSpan;
|
||||
}
|
||||
|
||||
public static BAMScheduleEntry query(final GATKBAMIndex index, final int referenceSequence, final int binNumber) {
|
||||
final Bin bin = new Bin(referenceSequence,binNumber);
|
||||
final int start = index.getFirstLocusInBin(bin);
|
||||
final int stop = index.getLastLocusInBin(bin);
|
||||
final GATKBAMFileSpan fileSpan = index.getSpanOverlapping(bin);
|
||||
return new BAMScheduleEntry(start,stop,fileSpan);
|
||||
/**
|
||||
* Add a new file span to this schedule.
|
||||
* @param reader Reader associated with the span.
|
||||
* @param fileSpan Blocks to read in the given reader.
|
||||
*/
|
||||
public void addFileSpan(final SAMReaderID reader, final GATKBAMFileSpan fileSpan) {
|
||||
fileSpans.put(reader,fileSpan);
|
||||
}
|
||||
|
||||
/**
|
||||
* A naive merge operation. Merge the fileSpans in other into this, blowing up if conflicts are
|
||||
* detected. Completely ignores merging start and stop.
|
||||
* @param other Other schedule entry to merging into this one.
|
||||
*/
|
||||
public void mergeInto(final BAMScheduleEntry other) {
|
||||
final int thisSize = fileSpans.size();
|
||||
final int otherSize = other.fileSpans.size();
|
||||
fileSpans.putAll(other.fileSpans);
|
||||
if(fileSpans.size() != thisSize+otherSize)
|
||||
throw new ReviewedStingException("Unable to handle overlaps when merging BAM schedule entries.");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.samtools.GATKBAMFileSpan;
|
||||
import net.sf.samtools.SAMFileSpan;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
|
||||
|
|
@ -65,11 +66,15 @@ class FilePointer {
|
|||
this(referenceSequence,null);
|
||||
}
|
||||
|
||||
public void addLocation(GenomeLoc location) {
|
||||
public void addLocation(final GenomeLoc location) {
|
||||
locations.add(location);
|
||||
}
|
||||
|
||||
public void addFileSpans(SAMReaderID id, SAMFileSpan fileSpan) {
|
||||
public void addFileSpans(final SAMReaderID id, final SAMFileSpan fileSpan) {
|
||||
this.fileSpans.put(id,fileSpan);
|
||||
}
|
||||
|
||||
public void addFileSpans(final Map<SAMReaderID, GATKBAMFileSpan> fileSpans) {
|
||||
this.fileSpans.putAll(fileSpans);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,8 +30,13 @@ import org.apache.log4j.Logger;
|
|||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
|
|
@ -43,16 +48,20 @@ public class LowMemoryIntervalSharder implements Iterator<FilePointer> {
|
|||
|
||||
private final SAMDataSource dataSource;
|
||||
|
||||
private final Map<SAMReaderID,GATKBAMIndex> indices = new HashMap<SAMReaderID,GATKBAMIndex>();
|
||||
|
||||
private FilePointer nextFilePointer = null;
|
||||
|
||||
private final GenomeLocSortedSet loci;
|
||||
|
||||
private final PeekableIterator<GenomeLoc> locusIterator;
|
||||
|
||||
private GenomeLoc currentLocus;
|
||||
|
||||
private FilePointer nextFilePointer = null;
|
||||
private GenomeLoc currentLocus;
|
||||
|
||||
public LowMemoryIntervalSharder(final SAMDataSource dataSource, final GenomeLocSortedSet loci) {
|
||||
this.dataSource = dataSource;
|
||||
for(SAMReaderID reader: dataSource.getReaderIDs())
|
||||
indices.put(reader,(GATKBAMIndex)dataSource.getIndex(reader));
|
||||
this.loci = loci;
|
||||
locusIterator = new PeekableIterator<GenomeLoc>(loci.iterator());
|
||||
if(locusIterator.hasNext())
|
||||
|
|
@ -97,28 +106,26 @@ public class LowMemoryIntervalSharder implements Iterator<FilePointer> {
|
|||
int coveredRegionStop = Integer.MAX_VALUE;
|
||||
GenomeLoc coveredRegion = null;
|
||||
|
||||
for(SAMReaderID reader: dataSource.getReaderIDs()) {
|
||||
BAMScheduleEntry scheduleEntry = getNextOverlappingBAMScheduleEntry(reader,(GATKBAMIndex)dataSource.getIndex(reader),currentLocus);
|
||||
BAMScheduleEntry scheduleEntry = getNextOverlappingBAMScheduleEntry(indices,currentLocus);
|
||||
|
||||
// No overlapping data at all.
|
||||
if(scheduleEntry != null) {
|
||||
coveredRegionStart = Math.max(coveredRegionStart,scheduleEntry.start);
|
||||
coveredRegionStop = Math.min(coveredRegionStop,scheduleEntry.stop);
|
||||
coveredRegion = loci.getGenomeLocParser().createGenomeLoc(currentLocus.getContig(),coveredRegionStart,coveredRegionStop);
|
||||
}
|
||||
|
||||
// Only a partial overlap, and the interval list precedes the bin. Force the bin tree to null.
|
||||
// TODO: the only reason to do this is to generate shards with no data that are placeholders for the interval list. Manage this externally.
|
||||
if(coveredRegion != null && currentLocus.startsBefore(coveredRegion))
|
||||
scheduleEntry = null;
|
||||
// No overlapping data at all.
|
||||
if(scheduleEntry != null) {
|
||||
coveredRegionStart = Math.max(coveredRegionStart,scheduleEntry.start);
|
||||
coveredRegionStop = Math.min(coveredRegionStop,scheduleEntry.stop);
|
||||
coveredRegion = loci.getGenomeLocParser().createGenomeLoc(currentLocus.getContig(),coveredRegionStart,coveredRegionStop);
|
||||
|
||||
// Always create a file span, whether there was covered data or not. If there was no covered data, then the binTree is empty.
|
||||
//System.out.printf("Shard: index file = %s; reference sequence = %d; ",index.getIndexFile(),currentLocus.getContigIndex());
|
||||
nextFilePointer.addFileSpans(reader,scheduleEntry != null ? scheduleEntry.fileSpan : new GATKBAMFileSpan());
|
||||
nextFilePointer.addFileSpans(scheduleEntry.fileSpans);
|
||||
}
|
||||
else {
|
||||
for(SAMReaderID reader: indices.keySet())
|
||||
nextFilePointer.addFileSpans(reader,new GATKBAMFileSpan());
|
||||
}
|
||||
|
||||
// Early exit if no bins were found.
|
||||
if(coveredRegion == null) {
|
||||
// for debugging only: maximum split is 16384.
|
||||
if(currentLocus.size() > 16384) {
|
||||
GenomeLoc[] splitContigs = currentLocus.split(currentLocus.getStart()+16384);
|
||||
nextFilePointer.addLocation(splitContigs[0]);
|
||||
|
|
@ -165,41 +172,51 @@ public class LowMemoryIntervalSharder implements Iterator<FilePointer> {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The last reference sequence processed by this iterator.
|
||||
*/
|
||||
private Map<SAMReaderID,Integer> lastReferenceSequenceLoaded = new HashMap<SAMReaderID,Integer>();
|
||||
private Integer lastReferenceSequenceLoaded = null;
|
||||
|
||||
/**
|
||||
* The stateful iterator used to progress through the genoem.
|
||||
*/
|
||||
private Map<SAMReaderID, PeekableIterator<BAMScheduleEntry>> bamScheduleIterators = new HashMap<SAMReaderID, PeekableIterator<BAMScheduleEntry>>();
|
||||
private PeekableIterator<BAMScheduleEntry> bamScheduleIterator = null;
|
||||
|
||||
/**
|
||||
* Get the next overlapping tree of bins associated with the given BAM file.
|
||||
* @param index BAM index representation.
|
||||
* @param locus Locus for which to grab the bin tree, if available.
|
||||
* @return The BinTree overlapping the given locus.
|
||||
* @param indices BAM index representation.
|
||||
* @param currentLocus The actual locus for which to check overlap.
|
||||
* @return The next schedule entry overlapping with the given list of loci.
|
||||
*/
|
||||
private BAMScheduleEntry getNextOverlappingBAMScheduleEntry(final SAMReaderID reader, final GATKBAMIndex index, final GenomeLoc locus) {
|
||||
private BAMScheduleEntry getNextOverlappingBAMScheduleEntry(final Map<SAMReaderID,GATKBAMIndex> indices, final GenomeLoc currentLocus) {
|
||||
// Stale reference sequence or first invocation. (Re)create the binTreeIterator.
|
||||
if(!lastReferenceSequenceLoaded.containsKey(reader) || lastReferenceSequenceLoaded.get(reader) != locus.getContigIndex()) {
|
||||
if(bamScheduleIterators.containsKey(reader))
|
||||
bamScheduleIterators.get(reader).close();
|
||||
lastReferenceSequenceLoaded.put(reader,locus.getContigIndex());
|
||||
bamScheduleIterators.put(reader,new PeekableIterator<BAMScheduleEntry>(new BAMSchedule(index, locus.getContigIndex())));
|
||||
if(lastReferenceSequenceLoaded == null || lastReferenceSequenceLoaded != currentLocus.getContigIndex()) {
|
||||
if(bamScheduleIterator != null)
|
||||
bamScheduleIterator.close();
|
||||
lastReferenceSequenceLoaded = currentLocus.getContigIndex();
|
||||
|
||||
// Naive algorithm: find all elements in current contig for proper schedule creation.
|
||||
List<GenomeLoc> lociInContig = new LinkedList<GenomeLoc>();
|
||||
for(GenomeLoc locus: loci) {
|
||||
if(locus.getContigIndex() == lastReferenceSequenceLoaded)
|
||||
lociInContig.add(locus);
|
||||
}
|
||||
|
||||
bamScheduleIterator = new PeekableIterator<BAMScheduleEntry>(new BAMSchedule(indices,lociInContig));
|
||||
}
|
||||
|
||||
PeekableIterator<BAMScheduleEntry> bamScheduleIterator = bamScheduleIterators.get(reader);
|
||||
if(!bamScheduleIterator.hasNext())
|
||||
return null;
|
||||
|
||||
// Peek the iterator along until finding the first binTree at or following the current locus.
|
||||
BAMScheduleEntry bamScheduleEntry = bamScheduleIterator.peek();
|
||||
while(bamScheduleEntry != null && bamScheduleEntry.isBefore(locus)) {
|
||||
while(bamScheduleEntry != null && bamScheduleEntry.isBefore(currentLocus)) {
|
||||
bamScheduleIterator.next();
|
||||
bamScheduleEntry = bamScheduleIterator.hasNext() ? bamScheduleIterator.peek() : null;
|
||||
}
|
||||
|
||||
return (bamScheduleEntry != null && bamScheduleEntry.overlaps(locus)) ? bamScheduleEntry : null;
|
||||
return (bamScheduleEntry != null && bamScheduleEntry.overlaps(currentLocus)) ? bamScheduleEntry : null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue