Very crude implementation of writing BAM 'schedules' to disk rather that 'meta-

indexes'.  Not yet elegant, but proves that it circumvents the performance
issues associated with the meta-index.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5454 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2011-03-16 21:48:47 +00:00
parent 8d0880d33e
commit 28a5a177ce
8 changed files with 329 additions and 342 deletions

View File

@ -210,7 +210,7 @@ public class GenomeAnalysisEngine {
// create the output streams "
initializeOutputStreams(microScheduler.getOutputTracker());
ShardStrategy shardStrategy = getShardStrategy(microScheduler.getReference());
ShardStrategy shardStrategy = getShardStrategy(readsDataSource,microScheduler.getReference(),intervals);
// execute the microscheduler, storing the results
Object result = microScheduler.execute(this.walker, shardStrategy);
@ -376,9 +376,7 @@ public class GenomeAnalysisEngine {
* @param drivingDataSource Data on which to shard.
* @return the sharding strategy
*/
protected ShardStrategy getShardStrategy(ReferenceSequenceFile drivingDataSource) {
GenomeLocSortedSet intervals = this.getIntervals();
SAMDataSource readsDataSource = this.getReadsDataSource();
protected ShardStrategy getShardStrategy(SAMDataSource readsDataSource, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) {
ValidationExclusion exclusions = (readsDataSource != null ? readsDataSource.getReadsInfo().getValidationExclusionList() : null);
ReferenceDataSource referenceDataSource = this.getReferenceDataSource();
// Use monolithic sharding if no index is present. Monolithic sharding is always required for the original
@ -673,7 +671,7 @@ public class GenomeAnalysisEngine {
setReferenceDataSource(argCollection.referenceFile);
validateSuppliedReads();
readsDataSource = createReadsDataSource(genomeLocParser, referenceDataSource.getReference());
readsDataSource = createReadsDataSource(argCollection,genomeLocParser,referenceDataSource.getReference());
sampleDataSource = new SampleDataSource(getSAMFileHeader(), argCollection.sampleFiles);
@ -840,16 +838,13 @@ public class GenomeAnalysisEngine {
*
* @return A data source for the given set of reads.
*/
private SAMDataSource createReadsDataSource(GenomeLocParser genomeLocParser, IndexedFastaSequenceFile refReader) {
private SAMDataSource createReadsDataSource(GATKArgumentCollection argCollection, GenomeLocParser genomeLocParser, IndexedFastaSequenceFile refReader) {
DownsamplingMethod method = getDownsamplingMethod();
if ( getWalkerBAQApplicationTime() == BAQ.ApplicationTime.FORBIDDEN && argCollection.BAQMode != BAQ.CalculationMode.OFF)
throw new UserException.BadArgumentValue("baq", "Walker cannot accept BAQ'd base qualities, and yet BAQ mode " + argCollection.BAQMode + " was requested.");
// TEMPORARY: Force low-memory sharding to be available.
SAMDataSource.enableLowMemorySharding(argCollection.enableLowMemorySharding);
return new SAMDataSource(
SAMDataSource dataSource = new SAMDataSource(
samReaderIDs,
genomeLocParser,
argCollection.useOriginalBaseQualities,
@ -863,7 +858,8 @@ public class GenomeAnalysisEngine {
getWalkerBAQApplicationTime() == BAQ.ApplicationTime.ON_INPUT ? argCollection.BAQMode : BAQ.CalculationMode.OFF,
getWalkerBAQQualityMode(),
refReader,
argCollection.defaultBaseQualities);
argCollection.defaultBaseQualities,argCollection.enableLowMemorySharding);
return dataSource;
}
/**

View File

@ -0,0 +1,256 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.Bin;
import net.sf.samtools.GATKBAMFileSpan;
import net.sf.samtools.GATKChunk;
import net.sf.samtools.util.CloseableIterator;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
/**
* Writes schedules for a single BAM file to a target output file.
*/
public class BAMSchedule implements CloseableIterator<BAMScheduleEntry> {
/**
* File in which to store schedule data.
*/
private final File scheduleFile;
/**
* File channel for the schedule file.
*/
private final FileChannel scheduleFileChannel;
/**
* Next schedule entry to be returned. Null if no additional entries are present.
*/
private BAMScheduleEntry nextScheduleEntry;
/**
* Sizes of ints and longs in bytes.
*/
private static final int INT_SIZE_IN_BYTES = Integer.SIZE / 8;
private static final int LONG_SIZE_IN_BYTES = Long.SIZE / 8;
/**
* Create a new BAM schedule based on the given index.
* @param index index to convert to a schedule.
*/
public BAMSchedule(final GATKBAMIndex index, final int referenceSequence) {
try {
scheduleFile = File.createTempFile("bamschedule."+referenceSequence,null);
scheduleFileChannel = new RandomAccessFile(scheduleFile,"rw").getChannel();
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to create BAM schedule file.",ex);
}
scheduleFile.deleteOnExit();
int currentBinInLowestLevel = GATKBAMIndex.getFirstBinInLevel(GATKBAMIndex.getNumIndexLevels()-1) - 1;
while(++currentBinInLowestLevel < GATKBAMIndex.MAX_BINS) {
BAMScheduleEntry scheduleEntry = BAMScheduleEntry.query(index,referenceSequence,currentBinInLowestLevel);
if(scheduleEntry.fileSpan.isEmpty())
continue;
// File format is binary in little endian; start of region, end of region, num chunks, then the chunks themselves.
ByteBuffer buffer = ByteBuffer.allocateDirect(2*INT_SIZE_IN_BYTES + INT_SIZE_IN_BYTES + scheduleEntry.fileSpan.getGATKChunks().size()*LONG_SIZE_IN_BYTES*2);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(scheduleEntry.start);
buffer.putInt(scheduleEntry.stop);
buffer.putInt(scheduleEntry.fileSpan.getGATKChunks().size());
for(GATKChunk chunk: scheduleEntry.fileSpan.getGATKChunks()) {
buffer.putLong(chunk.getChunkStart());
buffer.putLong(chunk.getChunkEnd());
}
// Prepare buffer for writing
buffer.flip();
try {
scheduleFileChannel.write(buffer);
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to create BAM schedule file.",ex);
}
}
// Move file pointer back to the start.
try {
scheduleFileChannel.position(0L);
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to rewind BAM schedule file.",ex);
}
advance();
}
/**
* Determine whether more ScheduleEntries are present in the iterator.
* @return Next schedule entry to parse.
*/
@Override
public boolean hasNext() {
return nextScheduleEntry != null;
}
/**
* Retrieve the next schedule entry in the list.
* @return next schedule entry in the queue.
*/
@Override
public BAMScheduleEntry next() {
BAMScheduleEntry currentScheduleEntry = nextScheduleEntry;
advance();
return currentScheduleEntry;
}
/**
* Close down and delete the file.
*/
@Override
public void close() {
try {
scheduleFileChannel.close();
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to close schedule file.");
}
}
/**
* Advance to the next schedule entry.
*/
private void advance() {
nextScheduleEntry = null;
ByteBuffer buffer = ByteBuffer.allocateDirect(2*INT_SIZE_IN_BYTES+INT_SIZE_IN_BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
int results;
try {
results = scheduleFileChannel.read(buffer);
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to read start, stop and chunk sizes from schedule file channel.",ex);
}
// No more data to read.
if(results <= 0)
return;
// Reorient buffer for reading.
buffer.flip();
final int start = buffer.getInt();
final int stop = buffer.getInt();
final int numChunks = buffer.getInt();
GATKChunk[] chunks = new GATKChunk[numChunks];
buffer = ByteBuffer.allocateDirect(numChunks * 2 * LONG_SIZE_IN_BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
try {
scheduleFileChannel.read(buffer);
}
catch(IOException ex) {
throw new ReviewedStingException("Unable to read chunk data from schedule file channel.",ex);
}
// Reposition for reading.
buffer.flip();
// Read out chunk data.
for(int i = 0; i < numChunks; i++)
chunks[i] = new GATKChunk(buffer.getLong(),buffer.getLong());
// Prep the iterator for the next schedule entry.
nextScheduleEntry = new BAMScheduleEntry(start,stop,new GATKBAMFileSpan(chunks));
}
@Override
public void remove() { throw new UnsupportedOperationException("Unable to remove from a schedule iterator."); }
}
/**
* A single proto-shard to be processed.
*/
class BAMScheduleEntry {
/**
* Starting position for the genomic entry.
*/
public final int start;
/**
* Ending position for the genomic entry.
*/
public final int stop;
/**
* The spans representing the given region.
*/
public final GATKBAMFileSpan fileSpan;
BAMScheduleEntry(final int start, final int stop, final GATKBAMFileSpan fileSpan) {
this.start = start;
this.stop = stop;
this.fileSpan = fileSpan;
}
public static BAMScheduleEntry query(final GATKBAMIndex index, final int referenceSequence, final int binNumber) {
final Bin bin = new Bin(referenceSequence,binNumber);
final int start = index.getFirstLocusInBin(bin);
final int stop = index.getLastLocusInBin(bin);
final GATKBAMFileSpan fileSpan = index.getSpanOverlapping(bin);
return new BAMScheduleEntry(start,stop,fileSpan);
}
/**
* Returns true if the location of this bin tree is before the given position.
* @param locus Locus to test.
* @return True if this bin sits completely before the given locus; false otherwise.
*/
public boolean isBefore(final GenomeLoc locus) {
return stop < locus.getStart();
}
/**
* Checks overlap between this bin tree and other bin trees.
* @param position the position over which to detect overlap.
* @return True if the segment overlaps. False otherwise.
*/
public boolean overlaps(final GenomeLoc position) {
return !(position.getStop() < start || position.getStart() > stop);
}
}

View File

@ -1,255 +0,0 @@
/*
* Copyright (c) 2011, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.Bin;
import net.sf.samtools.GATKBin;
import org.broadinstitute.sting.utils.GenomeLoc;
import java.io.File;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Represents a tree of overlapping bins in a single
* BAM index.
*/
public class BinTree {
/**
* The bins in this tree, organized by level.
*/
private final GATKBin[] bins;
/**
* Starting location of the bin tree.
*/
private final int binTreeStart;
/**
* Ending location of the bin tree.
*/
private final int binTreeStop;
/**
* Linear index entry associated with this location.
*/
private final long linearIndexEntry;
public BinTree(final int binTreeStart, final int binTreeStop,final GATKBin[] bins, final long linearIndexEntry) {
this.binTreeStart = binTreeStart;
this.binTreeStop = binTreeStop;
this.bins = bins;
this.linearIndexEntry = linearIndexEntry;
}
/**
* Retrieve the bins from the bin tree.
* @return list of bins.
*/
public GATKBin[] getBins() {
return bins;
}
/**
* Gets the number of bins in a given bin list.
* @return Number of bins in the list.
*/
public int size() {
return bins.length;
}
/**
* Returns the start of the region covered by this bin tree.
* @return Start of the region covered by this bin tree.
*/
public int getStart() {
return binTreeStart;
}
/**
* Returns the end of the region covered by this bin tree.
* @return End of the region covered by this bin tree.
*/
public int getStop() {
return binTreeStop;
}
/**
* The linear index entry associated with this bin tree.
* @return Linear index entry.
*/
public long getLinearIndexEntry() {
return linearIndexEntry;
}
/**
* Returns true if the location of this bin tree is before the given position.
* @param locus Locus to test.
* @return True if this bin sits completely before the given locus; false otherwise.
*/
public boolean isBefore(final GenomeLoc locus) {
return binTreeStop < locus.getStart();
}
/**
* Checks overlap between this bin tree and other bin trees.
* @param position the position over which to detect overlap.
* @return True if the segment overlaps. False otherwise.
*/
public boolean overlaps(final GenomeLoc position) {
for(GATKBin gatkBin: bins) {
if(gatkBin == null)
continue;
Bin bin = new Bin(gatkBin.getReferenceSequence(),gatkBin.getBinNumber());
// Overlap occurs when the position is not disjoint with the bin boundaries.
if(!(position.getStop() < binTreeStart || position.getStart() > binTreeStop))
return true;
}
return false;
}
}
/**
* Iterate through all bin trees in sequence, from those covering base 1 to those covering MAX_BINS.
*/
class BinTreeIterator implements Iterator<BinTree> {
/**
* The index over which to iterate.
*/
private final GATKBAMIndex index;
/**
* Master iterator over the BAM index.
*/
private final BAMIndexBinIterator binIterator;
/**
* Iterators over each individual level.
*/
private final PeekableIterator<GATKBin>[] levelIterators;
/**
* The next bin tree to be returned.
*/
private BinTree nextBinTree;
/**
* Each iteration through the bin tree has a corresponding lowest level. Make sure
* every lowest-level bin is covered, whether that bin is present or not.
*/
private int currentBinInLowestLevel;
public BinTreeIterator(final GATKBAMIndex index, final File indexFile, final int referenceSequence) {
this.index = index;
binIterator = new BAMIndexBinIterator(index,indexFile,referenceSequence);
levelIterators = new PeekableIterator[GATKBAMIndex.getNumIndexLevels()];
for(int level = 0; level < GATKBAMIndex.getNumIndexLevels(); level++)
levelIterators[level] = new PeekableIterator<GATKBin>(binIterator.getIteratorOverLevel(level));
// Set the current bin to one less that the first bin in the sequence. advance() will push it
// ahead to the first bin in the lowest level.
currentBinInLowestLevel = GATKBAMIndex.getFirstBinInLevel(GATKBAMIndex.getNumIndexLevels()-1) - 1;
advance();
}
public void close() {
for(PeekableIterator<GATKBin> levelIterator: levelIterators)
levelIterator.close();
}
public boolean hasNext() {
return nextBinTree != null;
}
/**
* Return the next BinTree in the level.
* @return Next BinTree in sequence.
*/
public BinTree next() {
if(!hasNext())
throw new NoSuchElementException("BinTreeIterator is out of elements");
BinTree currentBinTree = nextBinTree;
advance();
return currentBinTree;
}
/**
* Bring the bin tree ahead to the next overlapping structure.
*/
private void advance() {
final int lowestLevel = GATKBAMIndex.getNumIndexLevels()-1;
final int firstBinInLowestLevel = GATKBAMIndex.getFirstBinInLevel(lowestLevel);
final int binsInLowestLevel = index.getLevelSize(lowestLevel);
GATKBin[] bins = new GATKBin[GATKBAMIndex.getNumIndexLevels()];
nextBinTree = null;
while(nextBinTree == null) {
currentBinInLowestLevel++;
boolean levelIteratorsExhausted = true;
for(int level = lowestLevel; level >= 0; level--) {
if(!levelIterators[level].hasNext())
continue;
levelIteratorsExhausted = false;
final int firstBinInThisLevel = GATKBAMIndex.getFirstBinInLevel(level);
final int binsInThisLevel = index.getLevelSize(level);
final int currentBinInThisLevel = ((currentBinInLowestLevel-firstBinInLowestLevel)*binsInThisLevel/binsInLowestLevel) + firstBinInThisLevel;
while(levelIterators[level].hasNext() && levelIterators[level].peek().getBinNumber() < currentBinInThisLevel)
levelIterators[level].next();
if(levelIterators[level].hasNext() && levelIterators[level].peek().getBinNumber() == currentBinInThisLevel)
bins[level] = levelIterators[level].peek();
}
// No more bins available for this reference sequence? Break out of the loop.
if(levelIteratorsExhausted)
break;
// Found a compelling bin tree? Break out of the loop.
for(int level = 0; level <= lowestLevel; level++) {
if(bins[level] != null) {
Bin lowestLevelBin = new Bin(bins[level].getReferenceSequence(),currentBinInLowestLevel);
final int firstLocusInBin = index.getFirstLocusInBin(lowestLevelBin);
final int lastLocusInBin = index.getLastLocusInBin(lowestLevelBin);
final long linearIndexEntry = binIterator.getLinearIndexEntry(firstLocusInBin);
nextBinTree = new BinTree(firstLocusInBin,lastLocusInBin,bins,linearIndexEntry);
break;
}
}
}
}
/**
* Remove unsupported.
*/
public void remove() {
throw new UnsupportedOperationException("Cannot remove elements from a BinTreeIterator");
}
}

View File

@ -24,6 +24,7 @@
package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.samtools.GATKBAMFileSpan;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
@ -59,7 +60,7 @@ public class LocusShardStrategy implements ShardStrategy {
* @param reads Data source from which to load index data.
* @param locations List of locations for which to load data.
*/
LocusShardStrategy(SAMDataSource reads, IndexedFastaSequenceFile reference, GenomeLocParser genomeLocParser, GenomeLocSortedSet locations) {
public LocusShardStrategy(SAMDataSource reads, IndexedFastaSequenceFile reference, GenomeLocParser genomeLocParser, GenomeLocSortedSet locations) {
this.reads = reads;
this.genomeLocParser = genomeLocParser;
@ -83,8 +84,16 @@ public class LocusShardStrategy implements ShardStrategy {
else
intervals = locations;
if(SAMDataSource.isLowMemoryShardingEnabled())
if(reads.isLowMemoryShardingEnabled()) {
/*
Iterator<FilePointer> filePointerIterator = new LowMemoryIntervalSharder(this.reads,intervals);
List<FilePointer> filePointers = new ArrayList<FilePointer>();
while(filePointerIterator.hasNext())
filePointers.add(filePointerIterator.next());
this.filePointerIterator = filePointers.iterator();
*/
this.filePointerIterator = new LowMemoryIntervalSharder(this.reads,intervals);
}
else
this.filePointerIterator = IntervalSharder.shardIntervals(this.reads,intervals);
}
@ -122,6 +131,8 @@ public class LocusShardStrategy implements ShardStrategy {
return filePointerIterator.hasNext();
}
public long shardNumber = 0;
/**
* gets the next Shard
*
@ -130,6 +141,23 @@ public class LocusShardStrategy implements ShardStrategy {
public LocusShard next() {
FilePointer nextFilePointer = filePointerIterator.next();
Map<SAMReaderID,SAMFileSpan> fileSpansBounding = nextFilePointer.fileSpans != null ? nextFilePointer.fileSpans : null;
/*
System.out.printf("Shard %d: interval = {",++shardNumber);
for(GenomeLoc locus: nextFilePointer.locations)
System.out.printf("%s;",locus);
System.out.printf("}; ");
if(fileSpansBounding == null)
System.out.printf("no shard data%n");
else {
SortedMap<SAMReaderID,SAMFileSpan> sortedSpans = new TreeMap<SAMReaderID,SAMFileSpan>(fileSpansBounding);
for(Map.Entry<SAMReaderID,SAMFileSpan> entry: sortedSpans.entrySet()) {
System.out.printf("Shard %d:%s = {%s}%n",shardNumber,entry.getKey().samFile,entry.getValue());
}
}
*/
return new LocusShard(genomeLocParser, reads,nextFilePointer.locations,fileSpansBounding);
}

View File

@ -26,16 +26,12 @@ package org.broadinstitute.sting.gatk.datasources.reads;
import net.sf.picard.util.PeekableIterator;
import net.sf.samtools.GATKBAMFileSpan;
import net.sf.samtools.GATKBin;
import net.sf.samtools.GATKChunk;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -102,24 +98,23 @@ public class LowMemoryIntervalSharder implements Iterator<FilePointer> {
GenomeLoc coveredRegion = null;
for(SAMReaderID reader: dataSource.getReaderIDs()) {
GATKBAMIndex index = (GATKBAMIndex)dataSource.getIndex(reader);
BinTree binTree = getNextOverlappingBinTree(reader,(GATKBAMIndex)dataSource.getIndex(reader),currentLocus);
BAMScheduleEntry scheduleEntry = getNextOverlappingBAMScheduleEntry(reader,(GATKBAMIndex)dataSource.getIndex(reader),currentLocus);
// No overlapping data at all.
if(binTree != null) {
coveredRegionStart = Math.max(coveredRegionStart,binTree.getStart());
coveredRegionStop = Math.min(coveredRegionStop,binTree.getStop());
if(scheduleEntry != null) {
coveredRegionStart = Math.max(coveredRegionStart,scheduleEntry.start);
coveredRegionStop = Math.min(coveredRegionStop,scheduleEntry.stop);
coveredRegion = loci.getGenomeLocParser().createGenomeLoc(currentLocus.getContig(),coveredRegionStart,coveredRegionStop);
}
// Only a partial overlap, and the interval list precedes the bin. Force the bin tree to null.
// TODO: the only reason to do this is to generate shards with no data that are placeholders for the interval list. Manage this externally.
if(coveredRegion != null && currentLocus.startsBefore(coveredRegion))
binTree = null;
scheduleEntry = null;
// Always create a file span, whether there was covered data or not. If there was no covered data, then the binTree is empty.
GATKBAMFileSpan fileSpan = generateFileSpan(reader,index,currentLocus.getContigIndex(),binTree);
nextFilePointer.addFileSpans(reader,fileSpan);
//System.out.printf("Shard: index file = %s; reference sequence = %d; ",index.getIndexFile(),currentLocus.getContigIndex());
nextFilePointer.addFileSpans(reader,scheduleEntry != null ? scheduleEntry.fileSpan : new GATKBAMFileSpan());
}
// Early exit if no bins were found.
@ -178,74 +173,33 @@ public class LowMemoryIntervalSharder implements Iterator<FilePointer> {
/**
* The stateful iterator used to progress through the genoem.
*/
private Map<SAMReaderID, PeekableIterator<BinTree>> binTreeIterators = new HashMap<SAMReaderID, PeekableIterator<BinTree>>();
private Map<SAMReaderID, PeekableIterator<BAMScheduleEntry>> bamScheduleIterators = new HashMap<SAMReaderID, PeekableIterator<BAMScheduleEntry>>();
/**
* Get the next overlapping tree of bins associated with the given BAM file.
* @param index BAM index representation.
* @param locus Locus for which to grab the bin tree, if available.
* @return The BinTree overlapping the given locus.
*/
private BinTree getNextOverlappingBinTree(final SAMReaderID reader, final GATKBAMIndex index, final GenomeLoc locus) {
private BAMScheduleEntry getNextOverlappingBAMScheduleEntry(final SAMReaderID reader, final GATKBAMIndex index, final GenomeLoc locus) {
// Stale reference sequence or first invocation. (Re)create the binTreeIterator.
if(!lastReferenceSequenceLoaded.containsKey(reader) || lastReferenceSequenceLoaded.get(reader) != locus.getContigIndex()) {
if(binTreeIterators.containsKey(reader))
binTreeIterators.get(reader).close();
if(bamScheduleIterators.containsKey(reader))
bamScheduleIterators.get(reader).close();
lastReferenceSequenceLoaded.put(reader,locus.getContigIndex());
binTreeIterators.put(reader,new PeekableIterator<BinTree>(new BinTreeIterator(index, index.getIndexFile(), locus.getContigIndex())));
bamScheduleIterators.put(reader,new PeekableIterator<BAMScheduleEntry>(new BAMSchedule(index, locus.getContigIndex())));
}
PeekableIterator<BinTree> binTreeIterator = binTreeIterators.get(reader);
if(!binTreeIterator.hasNext())
PeekableIterator<BAMScheduleEntry> bamScheduleIterator = bamScheduleIterators.get(reader);
if(!bamScheduleIterator.hasNext())
return null;
// Peek the iterator along until finding the first binTree at or following the current locus.
BinTree binTree = binTreeIterator.peek();
while(binTree != null && binTree.isBefore(locus)) {
binTreeIterator.next();
binTree = binTreeIterator.hasNext() ? binTreeIterator.peek() : null;
BAMScheduleEntry bamScheduleEntry = bamScheduleIterator.peek();
while(bamScheduleEntry != null && bamScheduleEntry.isBefore(locus)) {
bamScheduleIterator.next();
bamScheduleEntry = bamScheduleIterator.hasNext() ? bamScheduleIterator.peek() : null;
}
return (binTree != null && binTree.overlaps(locus)) ? binTree : null;
return (bamScheduleEntry != null && bamScheduleEntry.overlaps(locus)) ? bamScheduleEntry : null;
}
/**
* Converts a bin list to a file span, trimmed based on the linear index and with overlapping regions removed.
* @param index BAM index.
* @param binTree Tree of data found to overlap the region. binTree.overlaps(initialRegion) must return true.
* @return File span mapping to given region.
*/
private GATKBAMFileSpan generateFileSpan(final SAMReaderID reader, final GATKBAMIndex index, final int referenceSequence, final BinTree binTree) {
System.out.printf("Shard %d: index file = %s; reference sequence = %d; span = %d-%d; ",++shardNumber,index.getIndexFile(),referenceSequence,(shardNumber-1)*16384+1,shardNumber*16384);
// Empty bin trees mean empty file spans.
if(binTree == null) {
System.out.printf("bins = {}; minimumOffset = 0, chunks = {}%n");
return new GATKBAMFileSpan();
}
System.out.printf("bins = {");
List<GATKChunk> chunks = new ArrayList<GATKChunk>(binTree.size());
for(GATKBin bin: binTree.getBins()) {
if(bin == null)
continue;
System.out.printf("%d,",bin.getBinNumber());
// The optimizer below will mutate the chunk list. Make sure each element is a clone of the reference sequence.
for(GATKChunk chunk: bin.getChunkList())
chunks.add(chunk.clone());
}
System.out.printf("}; ");
final long linearIndexMinimumOffset = binTree.getLinearIndexEntry();
System.out.printf("minimumOffset = %d, ",linearIndexMinimumOffset);
// Optimize the chunk list with a linear index optimization
chunks = index.optimizeChunkList(chunks,linearIndexMinimumOffset);
GATKBAMFileSpan fileSpan = new GATKBAMFileSpan(chunks.toArray(new GATKChunk[chunks.size()]));
System.out.printf("chunks = {%s}%n",fileSpan);
return fileSpan;
}
static long shardNumber = 0;
}

View File

@ -96,7 +96,7 @@ public class ReadShardStrategy implements ShardStrategy {
this.locations = locations;
if(locations != null)
filePointerIterator = SAMDataSource.isLowMemoryShardingEnabled() ? new LowMemoryIntervalSharder(this.dataSource,locations) : IntervalSharder.shardIntervals(this.dataSource,locations);
filePointerIterator = dataSource.isLowMemoryShardingEnabled() ? new LowMemoryIntervalSharder(this.dataSource,locations) : IntervalSharder.shardIntervals(this.dataSource,locations);
else
filePointerIterator = filePointers.iterator();

View File

@ -131,7 +131,7 @@ public class SAMDataSource {
/**
* Whether to enable the new low-memory sharding mechanism.
*/
private static boolean enableLowMemorySharding = false;
private boolean enableLowMemorySharding = false;
/**
* Create a new SAM data source given the supplied read metadata.
@ -148,6 +148,7 @@ public class SAMDataSource {
new ValidationExclusion(),
new ArrayList<SamRecordFilter>(),
false,
false,
false);
}
@ -164,7 +165,8 @@ public class SAMDataSource {
ValidationExclusion exclusionList,
Collection<SamRecordFilter> supplementalFilters,
boolean includeReadsWithDeletionAtLoci,
boolean generateExtendedEvents) {
boolean generateExtendedEvents,
boolean enableLowMemorySharding) {
this( samFiles,
genomeLocParser,
useOriginalBaseQualities,
@ -178,8 +180,8 @@ public class SAMDataSource {
BAQ.CalculationMode.OFF,
BAQ.QualityMode.DONT_MODIFY,
null, // no BAQ
(byte) -1
);
(byte) -1,
enableLowMemorySharding);
}
/**
@ -213,7 +215,9 @@ public class SAMDataSource {
BAQ.CalculationMode cmode,
BAQ.QualityMode qmode,
IndexedFastaSequenceFile refReader,
byte defaultBaseQualities) {
byte defaultBaseQualities,
boolean enableLowMemorySharding) {
this.enableLowMemorySharding(enableLowMemorySharding);
this.readMetrics = new ReadMetrics();
this.genomeLocParser = genomeLocParser;
@ -309,7 +313,7 @@ public class SAMDataSource {
* Enable experimental low-memory sharding.
* @param enable True to enable sharding. False otherwise.
*/
public static void enableLowMemorySharding(final boolean enable) {
public void enableLowMemorySharding(final boolean enable) {
enableLowMemorySharding = enable;
}
@ -317,7 +321,7 @@ public class SAMDataSource {
* Returns whether low-memory sharding is enabled.
* @return True if enabled, false otherwise.
*/
public static boolean isLowMemoryShardingEnabled() {
public boolean isLowMemoryShardingEnabled() {
return enableLowMemorySharding;
}

View File

@ -10,7 +10,7 @@ import java.io.File;
* @author mhanna
* @version 0.1
*/
public class SAMReaderID {
public class SAMReaderID implements Comparable {
/**
* The SAM file at the heart of this reader. SAMReaderID
* currently supports only file-based readers.
@ -69,4 +69,8 @@ public class SAMReaderID {
public int hashCode() {
return samFile.hashCode();
}
public int compareTo(Object other) {
return this.samFile.getAbsolutePath().compareTo(((SAMReaderID)other).samFile.getAbsolutePath());
}
}