Basic use cases for merging BAM files with the new sharding system work.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2815 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-02-09 22:14:37 +00:00
parent 934d4b93a2
commit 0250338ce7
11 changed files with 142 additions and 55 deletions

View File

@ -30,6 +30,7 @@ import java.util.Iterator;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.util.CloseableIterator;
/**
* Iterator for SAM records that implements Comparable to enable sorting of iterators.
@ -37,6 +38,7 @@ import net.sf.samtools.SAMFileReader;
* record in another iterator and returning the ordering between those SAM records.
*/
class ComparableSamRecordIterator extends PeekableIterator<SAMRecord> implements Comparable<ComparableSamRecordIterator> {
private final CloseableIterator<SAMRecord> iterator;
private final Comparator<SAMRecord> comparator;
/**
@ -46,11 +48,16 @@ class ComparableSamRecordIterator extends PeekableIterator<SAMRecord> implements
* @param iterator the wrapped iterator.
* @param comparator the Comparator to use to provide ordering of SAMRecords
*/
public ComparableSamRecordIterator(final CloseableIterator<SAMRecord> iterator, final Comparator<SAMRecord> comparator) {
    super(iterator);
    // Retain the closeable handle so the caller can later close the underlying source.
    this.iterator = iterator;
    this.comparator = comparator;
}
/**
 * Gets the underlying closeable iterator wrapped by this object, allowing
 * the code that opened the source to close it when iteration is complete.
 * @return The wrapped CloseableIterator supplying SAMRecords.
 */
public CloseableIterator<SAMRecord> getWrappedIterator() {
    return iterator;
}
/**
* Compares this iterator to another comparable iterator based on the next record
* available in each iterator. If the two comparable iterators have different

View File

@ -29,6 +29,7 @@ import java.util.*;
import java.lang.reflect.Constructor;
import net.sf.samtools.*;
import net.sf.samtools.util.CloseableIterator;
/**
* Provides an iterator interface for merging multiple underlying iterators into a single
@ -61,7 +62,7 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
* @param readerToIteratorMap A mapping of reader to iterator.
* @param forcePresorted True to ensure that the iterator checks the headers of the readers for appropriate sort order.
*/
public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, final Map<SAMFileReader,Iterator<SAMRecord>> readerToIteratorMap, final boolean forcePresorted) {
public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, final Map<SAMFileReader, CloseableIterator<SAMRecord>> readerToIteratorMap, final boolean forcePresorted) {
this.samHeaderMerger = headerMerger;
this.sortOrder = headerMerger.getMergedHeader().getSortOrder();
final SAMRecordComparator comparator = getComparator();
@ -77,7 +78,7 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
final ComparableSamRecordIterator iterator = new ComparableSamRecordIterator(readerToIteratorMap.get(reader),comparator);
addIfNotEmpty(iterator);
iteratorToSourceMap.put(iterator,reader);
iteratorToSourceMap.put(iterator.getWrappedIterator(),reader);
}
}
@ -86,8 +87,8 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
* @param readers The readers from which to derive iterators.
* @return A map of reader to its associated iterator.
*/
private static Map<SAMFileReader,Iterator<SAMRecord>> createWholeFileIterators(Collection<SAMFileReader> readers) {
Map<SAMFileReader,Iterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,Iterator<SAMRecord>>();
private static Map<SAMFileReader,CloseableIterator<SAMRecord>> createWholeFileIterators(Collection<SAMFileReader> readers) {
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
for(final SAMFileReader reader: readers)
readerToIteratorMap.put(reader,reader.iterator());
return readerToIteratorMap;
@ -109,7 +110,7 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
if (this.samHeaderMerger.hasReadGroupCollisions()) {
final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.READ_GROUP_ID);
if (oldGroupId != null ) {
final String newGroupId = this.samHeaderMerger.getReadGroupId(iteratorToSourceMap.get(iterator), oldGroupId);
final String newGroupId = this.samHeaderMerger.getReadGroupId(iteratorToSourceMap.get(iterator.getWrappedIterator()), oldGroupId);
record.setAttribute(ReservedTagConstants.READ_GROUP_ID, newGroupId);
}
}
@ -118,7 +119,7 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
if (this.samHeaderMerger.hasProgramGroupCollisions()) {
final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.PROGRAM_GROUP_ID);
if (oldGroupId != null ) {
final String newGroupId = this.samHeaderMerger.getProgramGroupId(iteratorToSourceMap.get(iterator), oldGroupId);
final String newGroupId = this.samHeaderMerger.getProgramGroupId(iteratorToSourceMap.get(iterator.getWrappedIterator()), oldGroupId);
record.setAttribute(ReservedTagConstants.PROGRAM_GROUP_ID, newGroupId);
}
}
@ -126,11 +127,11 @@ public class MergingSamRecordIterator implements Iterator<SAMRecord> {
// Fix up the sequence indexes if needs be
if (this.samHeaderMerger.hasMergedSequenceDictionary()) {
if (record.getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
record.setReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iteratorToSourceMap.get(iterator),record.getReferenceIndex()));
record.setReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iteratorToSourceMap.get(iterator.getWrappedIterator()),record.getReferenceIndex()));
}
if (record.getReadPairedFlag() && record.getMateReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
record.setMateReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iteratorToSourceMap.get(iterator), record.getMateReferenceIndex()));
record.setMateReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iteratorToSourceMap.get(iterator.getWrappedIterator()), record.getMateReferenceIndex()));
}
}

View File

@ -54,6 +54,11 @@ public class BAMFileIndex2 extends BAMFileIndex
*/
protected final SortedMap<Integer,LinearIndex> referenceToLinearIndices = new TreeMap<Integer,LinearIndex>();
/**
* A mapping from bin to the chunks contained in that bin.
*/
protected final SortedMap<Bin,List<Chunk>> binToChunks = new TreeMap<Bin,List<Chunk>>();
protected BAMFileIndex2(final File file) {
super(file);
loadIndex(file);
@ -121,7 +126,8 @@ public class BAMFileIndex2 extends BAMFileIndex
final long chunkEnd = readLong(fileBuffer);
chunkList.add(new Chunk(chunkBegin, chunkEnd));
}
bins[bin] = new Bin(sequence,indexBin,chunkList);
bins[bin] = new Bin(sequence,indexBin);
binToChunks.put(bins[bin],chunkList);
}
referenceToBins.put(sequence,bins);
@ -162,7 +168,7 @@ public class BAMFileIndex2 extends BAMFileIndex
List<Chunk> chunkList = new ArrayList<Chunk>();
for(Bin bin: bins)
chunkList.addAll(bin.chunks);
chunkList.addAll(binToChunks.get(bin));
if (chunkList.isEmpty()) {
return null;
@ -180,7 +186,7 @@ public class BAMFileIndex2 extends BAMFileIndex
}
long[] getFilePointersBounding(final Bin bin) {
    // Chunk lists now live in the binToChunks map rather than on the Bin itself.
    return convertToArray(binToChunks.get(bin));
}
/**

View File

@ -3,7 +3,7 @@ package net.sf.samtools;
import java.util.List;
/**
* An individual bin in a BAM file, divided into chunks plus a linear index.
* An individual bin in a BAM file.
*
* @author mhanna
* @version 0.1
@ -19,15 +19,9 @@ public class Bin implements Comparable {
*/
public final int binNumber;
/**
* The chunks contained within this bin.
*/
public final List<Chunk> chunks;
/**
 * Creates a bin covering the given reference sequence and bin number.
 * @param referenceSequence Index of the reference sequence this bin covers.
 * @param binNumber Number of this bin within the BAM indexing scheme.
 */
public Bin(int referenceSequence, int binNumber) {
    this.referenceSequence = referenceSequence;
    this.binNumber = binNumber;
}
/**
@ -36,6 +30,7 @@ public class Bin implements Comparable {
* @param other The other Bin to which to compare this.
* @return True if the two bins are equal. False otherwise.
*/
@Override
public boolean equals(Object other) {
if(other == null) return false;
if(!(other instanceof Bin)) return false;
@ -44,11 +39,21 @@ public class Bin implements Comparable {
return this.referenceSequence == otherBin.referenceSequence && this.binNumber == otherBin.binNumber;
}
/**
 * Computes a hash code from the reference sequence and bin number — the
 * same two fields compared by equals().  Note the result is not guaranteed
 * unique; distinct bins may collide.
 * @return A hash code combining the reference sequence and bin number.
 */
@Override
public int hashCode() {
    // Integer.hashCode() of an int is the int itself, so XOR the raw values
    // directly and avoid two needless autoboxing allocations.
    return referenceSequence ^ binNumber;
}
/**
* Compare two bins to see what ordering they should appear in.
* @param other Other bin to which this bin should be compared.
* @return -1 if this < other, 0 if this == other, 1 if this > other.
*/
@Override
public int compareTo(Object other) {
if(other == null)
throw new ClassCastException("Cannot compare to a null object");

View File

@ -9,6 +9,7 @@ import java.io.IOException;
import java.io.FileInputStream;
import java.io.PrintStream;
import java.nio.channels.FileChannel;
import java.util.List;
import net.sf.samtools.*;
@ -106,8 +107,9 @@ public class BAMFileStat extends CommandLineProgram {
outputStream.printf("Reference sequence: %d%n",referenceSequence);
outputStream.printf("number of bins: %d%n",bins.length);
for(Bin bin: bins) {
outputStream.printf("\tBin: %d, number of chunks: %d%n", bin.binNumber, bin.chunks.size());
for(Chunk chunk: bin.chunks)
List<Chunk> chunks = binToChunks.get(bin);
outputStream.printf("\tBin: %d, number of chunks: %d%n", bin.binNumber, chunks.size());
for(Chunk chunk: chunks)
outputStream.printf("\t\tChunk: %s%n", chunk);
}
LinearIndex linearIndex = referenceToLinearIndices.get(referenceSequence);

View File

@ -1,8 +1,10 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.Chunk;
import net.sf.samtools.SAMFileReader2;
import java.util.List;
import java.util.Map;
/**
* A common interface for shards that natively understand the BAM format.
@ -15,5 +17,5 @@ public interface BAMFormatAwareShard extends Shard {
* Get the chunks delimiting this shard, grouped by the reader from which they originate.
* @return a map from reader to the list of chunks that contain data for this shard.
*/
public Map<SAMFileReader2,List<Chunk>> getChunks();
}

View File

@ -1,8 +1,11 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.Chunk;
import net.sf.samtools.SAMFileReader2;
import java.util.List;
import java.util.Map;
import java.util.Collections;
/**
* Expresses a shard of read data in block format.
@ -11,12 +14,18 @@ import java.util.List;
* @version 0.1
*/
public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAwareShard {
/**
* The reader associated with this shard.
*/
private final SAMFileReader2 reader;
/**
* The list of chunks to retrieve when loading this shard.
*/
private final List<Chunk> chunks;
/**
 * Creates a read shard backed by the given reader and chunk list.
 * @param reader Reader from which this shard's data will be loaded.
 * @param chunks Chunks to retrieve when loading this shard.
 */
public BlockDelimitedReadShard(SAMFileReader2 reader, List<Chunk> chunks) {
    this.reader = reader;
    this.chunks = chunks;
}
@ -24,8 +33,8 @@ public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAware
* Get the chunks delimiting this shard, keyed by the reader that owns them.
* @return a map from reader to the chunks that contain data for this shard.
*/
public List<Chunk> getChunks() {
return chunks;
public Map<SAMFileReader2,List<Chunk>> getChunks() {
return Collections.singletonMap(reader,chunks);
}
/**

View File

@ -1,9 +1,6 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.Chunk;
import net.sf.samtools.BAMFileHeaderLoader;
import net.sf.samtools.BAMChunkIterator;
import net.sf.samtools.BAMBlockIterator;
import net.sf.samtools.*;
import java.util.*;
import java.io.File;
@ -25,6 +22,11 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
*/
protected int blockCount = 100;
/**
* The data source used to shard.
*/
protected final SAMDataSource dataSource;
/**
* The actual chunks streaming into the file.
*/
@ -40,6 +42,8 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
* @param dataSource Data source from which to load shards.
*/
public BlockDelimitedReadShardStrategy(SAMDataSource dataSource) {
this.dataSource = dataSource;
if(dataSource.getReadsInfo().getReadsFiles().size() > 1)
throw new UnsupportedOperationException("Experimental sharding only works with a single BAM at the moment.");
File bamFile = dataSource.getReadsInfo().getReadsFiles().get(0);
@ -71,7 +75,11 @@ public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
public Shard next() {
if(!hasNext())
throw new NoSuchElementException("No such element available: SAM reader has arrived at last shard.");
Shard shard = new BlockDelimitedReadShard(Collections.unmodifiableList(new ArrayList<Chunk>(nextChunks)));
if(dataSource.getReaders().size() == 0)
throw new StingException("Unable to shard with no readers present.");
Shard shard = new BlockDelimitedReadShard((SAMFileReader2)dataSource.getReaders().iterator().next(),Collections.unmodifiableList(new ArrayList<Chunk>(nextChunks)));
advance();
return shard;
}

View File

@ -1,11 +1,12 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.StingException;
import net.sf.samtools.Chunk;
import net.sf.samtools.SAMFileReader2;
import java.util.List;
import java.util.Map;
/*
@ -40,7 +41,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
/**
* A list of the chunks associated with this shard.
*/
private final List<Chunk> chunks;
private final Map<SAMFileReader2,List<Chunk>> chunks;
/**
* An IndexDelimitedLocusShard can be used either for LOCUS or LOCUS_INTERVAL shard types.
@ -54,7 +55,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
* @param chunks Chunks associated with that interval.
* @param shardType Type of the shard; must be either LOCUS or LOCUS_INTERVAL.
*/
IndexDelimitedLocusShard(List<GenomeLoc> intervals, List<Chunk> chunks, ShardType shardType) {
IndexDelimitedLocusShard(List<GenomeLoc> intervals, Map<SAMFileReader2,List<Chunk>> chunks, ShardType shardType) {
super(intervals);
this.chunks = chunks;
if(shardType != ShardType.LOCUS && shardType != ShardType.LOCUS_INTERVAL)
@ -66,7 +67,7 @@ public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwa
* Gets the chunks associated with this locus shard.
* @return A map of reader to the chunks to use when retrieving locus data.
*/
public List<Chunk> getChunks() {
public Map<SAMFileReader2,List<Chunk>> getChunks() {
return chunks;
}

View File

@ -10,6 +10,7 @@ import java.util.*;
import net.sf.samtools.Chunk;
import net.sf.samtools.Bin;
import net.sf.samtools.SAMFileReader2;
/*
* Copyright (c) 2009 The Broad Institute
@ -75,7 +76,6 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
}
// Group the loci by bin, sorted in the order in which bins appear in the file. Only use the smallest bins in the set.
// TODO -- does this work with large interval lists?
for(String contig: locationToReference.keySet()) {
SortedMap<Bin,List<GenomeLoc>> bins = new TreeMap<Bin,List<GenomeLoc>>();
for(GenomeLoc location: locationToReference.get(contig)) {
@ -111,7 +111,7 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
*/
public IndexDelimitedLocusShard next() {
FilePointer nextFilePointer = filePointerIterator.next();
List<Chunk> chunksBounding = blockDrivenDataSource.getFilePointersBounding(nextFilePointer.bin);
Map<SAMFileReader2,List<Chunk>> chunksBounding = blockDrivenDataSource.getFilePointersBounding(nextFilePointer.bin);
return new IndexDelimitedLocusShard(nextFilePointer.locations,chunksBounding,Shard.ShardType.LOCUS_INTERVAL);
}

View File

@ -9,9 +9,10 @@ import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLoc;
import net.sf.samtools.*;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.picard.sam.MergingSamRecordIterator;
import java.util.Collection;
import java.util.List;
import java.util.*;
import java.io.File;
/**
@ -22,7 +23,8 @@ import java.io.File;
*/
public class BlockDrivenSAMDataSource extends SAMDataSource {
private final SAMFileReader2 reader;
private final SamFileHeaderMerger headerMerger;
/**
* Create a new block-aware SAM data source given the supplied read metadata.
* @param reads The read metadata.
@ -32,34 +34,64 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
logger.warn("Experimental sharding is enabled. Many use cases are not supported. Please use with care.");
if(reads.getReadsFiles().size() > 1)
throw new StingException("Experimental sharding strategy cannot handle multiple BAM files at this point.");
Collection<SAMFileReader> readers = new ArrayList<SAMFileReader>();
for(File readsFile: reads.getReadsFiles()) {
SAMFileReader2 reader = new SAMFileReader2(readsFile);
reader.setValidationStringency(reads.getValidationStringency());
readers.add(reader);
}
File readsFile = reads.getReadsFiles().get(0);
reader = new SAMFileReader2(readsFile);
reader.setValidationStringency(reads.getValidationStringency());
this.headerMerger = new SamFileHeaderMerger(readers,SAMFileHeader.SortOrder.coordinate,true);
}
/**
 * Checks whether an index is available for every reader in this data source.
 * @return True only if all readers have an index; false if any reader lacks one.
 */
public boolean hasIndex() {
    for(SAMFileReader reader: headerMerger.getReaders()) {
        if(!reader.hasIndex())
            return false;
    }
    return true;
}
public List<Bin> getOverlappingBins(GenomeLoc location) {
/**
* Gets a list of the bins in each BAM file that overlap with the given interval list.
* @param location Location for which to determine the bin.
* @return A map of reader back to bin.
*/
public List<Bin> getOverlappingBins(final GenomeLoc location) {
if(headerMerger.getReaders().size() == 0)
return Collections.emptyList();
// All readers will have the same bin structure, so just use the first bin as an example.
SAMFileReader2 reader = (SAMFileReader2)headerMerger.getReaders().iterator().next();
return reader.getOverlappingBins(location.getContig(),(int)location.getStart(),(int)location.getStop());
}
/**
 * Gets the file pointers bounded by this bin, grouped by the reader of origination.
 * @param bin The bin for which to load data.
 * @return A map from each reader to the file pointers that reader holds for the bin.
 */
public Map<SAMFileReader2,List<Chunk>> getFilePointersBounding(final Bin bin) {
    Map<SAMFileReader2,List<Chunk>> filePointers = new HashMap<SAMFileReader2,List<Chunk>>();
    for(SAMFileReader reader: headerMerger.getReaders()) {
        // Readers are stored under the base type; downcast to reach the index-aware API.
        SAMFileReader2 reader2 = (SAMFileReader2)reader;
        filePointers.put(reader2,reader2.getFilePointersBounding(bin));
    }
    return filePointers;
}
/**
* Get the number of levels employed by this index.
* @return Number of levels in this index.
*/
public int getNumIndexLevels() {
    if(headerMerger.getReaders().size() == 0)
        throw new StingException("Unable to determine number of index levels; no BAMs are present.");
    if(!hasIndex())
        throw new SAMException("Unable to determine number of index levels; BAM file index is not present.");
    // All indices share the same structure; read the level count from the first reader.
    SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next();
    return firstReader.getNumIndexLevels();
}
/**
@ -68,20 +100,34 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return the level associated with the given bin number.
*/
public int getLevelForBin(final Bin bin) {
    if(headerMerger.getReaders().size() == 0)
        throw new StingException("Unable to determine number of level for bin; no BAMs are present.");
    if(!hasIndex())
        throw new SAMException("Unable to determine number of level for bin; BAM file index is not present.");
    // All indices share the same structure; resolve the level via the first reader.
    SAMFileReader2 firstReader = (SAMFileReader2)headerMerger.getReaders().iterator().next();
    return firstReader.getLevelForBin(bin);
}
public StingSAMIterator seek(Shard shard) {
if(!(shard instanceof BAMFormatAwareShard))
throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass());
BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard;
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards. I don't
// know why this is. Please add a comment here if you do.
boolean enableVerification = shard instanceof ReadShard;
CloseableIterator<SAMRecord> iterator = reader.iterator(((BAMFormatAwareShard)shard).getChunks());
if(shard instanceof ReadShard && reads.getReadsFiles().size() > 1)
throw new StingException("Experimental read sharding cannot handle multiple BAM files at this point.");
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
for(Map.Entry<SAMFileReader2,List<Chunk>> chunksByReader: bamAwareShard.getChunks().entrySet()) {
SAMFileReader2 reader = chunksByReader.getKey();
List<Chunk> chunks = chunksByReader.getValue();
readerToIteratorMap.put(reader,reader.iterator(chunks));
}
MergingSamRecordIterator iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true);
return applyDecoratingIterators(enableVerification,
StingSAMIteratorAdapter.adapt(reads,iterator),
reads.getDownsamplingFraction(),
@ -94,7 +140,7 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
* @return The merged header.
*/
public SAMFileHeader getHeader() {
    // The merged header spans all readers managed by this data source.
    return headerMerger.getMergedHeader();
}
/**