Merging the sharding-specific inherited classes down into the base.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3581 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-06-17 22:36:13 +00:00
parent 612c3fdd9d
commit 48cbc5ce37
20 changed files with 977 additions and 1267 deletions

View File

@ -814,7 +814,7 @@ public class GenomeAnalysisEngine {
if (reads.getReadsFiles().size() == 0)
return null;
return new BlockDrivenSAMDataSource(reads);
return new SAMDataSource(reads);
}
/**

View File

@ -1,139 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.*;
import net.sf.picard.filter.SamRecordFilter;
import java.util.*;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
/**
 * Expresses a shard of read data in block format: the shard buffers the
 * reads themselves alongside the BAM file spans they were loaded from.
 *
 * @author mhanna
 * @version 0.1
 */
public class BlockDelimitedReadShard extends ReadShard implements BAMFormatAwareShard {
    /**
     * Information about the origins of reads.
     */
    private final Reads sourceInfo;
    /**
     * The data backing the next chunks to deliver to the traversal engine,
     * keyed by the reader that produced each file span.
     */
    private final Map<SAMReaderID,SAMFileSpan> fileSpans;
    /**
     * The reads making up this shard, pre-sized to the expected shard capacity.
     */
    private final Collection<SAMRecord> reads = new ArrayList<SAMRecord>(BlockDelimitedReadShardStrategy.MAX_READS);
    /**
     * The filter to be applied to all reads entering this shard.
     */
    private final SamRecordFilter filter;
    /**
     * A BlockDelimitedReadShard can be used for either LOCUS or READ shard types.
     * Track which type is being used.
     */
    private final Shard.ShardType shardType;
    /**
     * Create a new block-delimited shard over the given file spans.
     * @param sourceInfo Metadata about the origin of the reads.
     * @param fileSpans Spans of the backing files covered by this shard.
     * @param filter Filter applied to reads entering this shard; may be null.
     * @param shardType Whether this shard is treated as a LOCUS or READ shard.
     */
    public BlockDelimitedReadShard(Reads sourceInfo, Map<SAMReaderID,SAMFileSpan> fileSpans, SamRecordFilter filter, Shard.ShardType shardType) {
        this.sourceInfo = sourceInfo;
        this.fileSpans = fileSpans;
        this.filter = filter;
        this.shardType = shardType;
    }
    /**
     * Returns true if this shard is meant to buffer reads, rather
     * than just holding pointers to their locations.
     * @return True if this shard can buffer reads. False otherwise.
     */
    @Override
    public boolean buffersReads() {
        return true;
    }
    /**
     * Returns true if the read buffer is currently empty.
     * @return True if this shard's buffer holds no reads.
     */
    @Override
    public boolean isBufferEmpty() {
        return reads.size() == 0;
    }
    /**
     * Returns true if the read buffer is currently full.
     * NOTE(review): uses '>' rather than '>=', so the buffer reports full only
     * once it has exceeded MAX_READS -- confirm this off-by-one is intentional.
     * @return True if this shard's buffer is full (and the shard can buffer reads).
     */
    @Override
    public boolean isBufferFull() {
        return reads.size() > BlockDelimitedReadShardStrategy.MAX_READS;
    }
    /**
     * Adds a read to the read buffer.
     * @param read Add a read to the internal shard buffer.
     */
    @Override
    public void addRead(SAMRecord read) {
        // DO NOT validate that the buffer is full. Paired read sharding will occasionally have to stuff another
        // read or two into the buffer.
        reads.add(read);
    }
    /**
     * Creates an iterator over reads stored in this shard's read cache.
     * @return An iterator over the buffered reads, wrapped with source info.
     */
    @Override
    public StingSAMIterator iterator() {
        return StingSAMIteratorAdapter.adapt(sourceInfo,reads.iterator());
    }
    /**
     * Gets the filter associated with this shard.
     * @return The filter supplied at construction; may be null.
     */
    @Override
    public SamRecordFilter getFilter() {
        return filter;
    }
    /**
     * Get the list of chunks delimiting this shard.
     * @return an unmodifiable view of the file spans backing this shard.
     */
    @Override
    public Map<SAMReaderID,SAMFileSpan> getFileSpans() {
        return Collections.unmodifiableMap(fileSpans);
    }
    /**
     * Returns the type of shard (LOCUS or READ), as set at construction.
     */
    @Override
    public ShardType getShardType() {
        return shardType;
    }
    /**
     * String representation of this shard.
     * @return A string representation of the boundaries of this shard.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for(Map.Entry<SAMReaderID,SAMFileSpan> entry: fileSpans.entrySet()) {
            sb.append(entry.getKey());
            sb.append(": ");
            sb.append(entry.getValue());
            sb.append(' ');
        }
        return sb.toString();
    }
}

View File

@ -1,159 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.*;
import net.sf.picard.filter.SamRecordFilter;
import java.util.*;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
/**
 * A read shard strategy that delimits based on the number of
 * blocks in the BAM file.
 *
 * @author mhanna
 * @version 0.1
 */
public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
    /**
     * What is the maximum number of reads which should go into a read shard.
     */
    protected static final int MAX_READS = 10000;
    /**
     * The data source used to shard; guaranteed block-driven by the constructor.
     */
    private final BlockDrivenSAMDataSource dataSource;
    /**
     * The intervals to be processed, or null to shard the entire file.
     */
    private final GenomeLocSortedSet locations;
    /**
     * The cached shard to be returned next. Prefetched in the peekable iterator style.
     */
    private Shard nextShard = null;
    /** our storage of the genomic locations they'd like to shard over */
    private final List<FilePointer> filePointers = new ArrayList<FilePointer>();
    /**
     * Iterator over the list of file pointers.
     */
    private final Iterator<FilePointer> filePointerIterator;
    /**
     * The file pointer currently being processed.
     */
    private FilePointer currentFilePointer;
    /**
     * Ending position of the last shard emitted, per reader; used to trim
     * already-consumed data from subsequent file spans.
     */
    private Map<SAMReaderID,SAMFileSpan> position;
    /**
     * Create a new read shard strategy, loading read shards from the given BAM file.
     * @param dataSource Data source from which to load shards; must be a BlockDrivenSAMDataSource.
     * @param locations intervals to use for sharding, or null to shard the whole file.
     * @throws StingException if the data source is not block-driven.
     */
    public BlockDelimitedReadShardStrategy(SAMDataSource dataSource, GenomeLocSortedSet locations) {
        if(!(dataSource instanceof BlockDrivenSAMDataSource))
            throw new StingException("Block-delimited read shard strategy cannot work without a block-driven data source.");
        this.dataSource = (BlockDrivenSAMDataSource)dataSource;
        this.position = this.dataSource.getCurrentPosition();
        this.locations = locations;
        if(locations != null)
            filePointerIterator = IntervalSharder.shardIntervals(this.dataSource,locations.toList());
        else
            filePointerIterator = filePointers.iterator(); // filePointers is empty here, so this iterator is empty
        if(filePointerIterator.hasNext())
            currentFilePointer = filePointerIterator.next();
        advance(); // prefetch the first shard
    }
    /**
     * do we have another read shard?
     * @return True if any more data is available. False otherwise.
     */
    public boolean hasNext() {
        return nextShard != null;
    }
    /**
     * Retrieves the next shard, if available.
     * @return The next shard, if available.
     * @throws NoSuchElementException if no such shard is available.
     */
    public Shard next() {
        if(!hasNext())
            throw new NoSuchElementException("No next read shard available");
        Shard currentShard = nextShard;
        advance();
        return currentShard;
    }
    /**
     * Prefetch the next non-empty shard into nextShard, leaving it null when
     * the data is exhausted, and record the new file position afterwards.
     */
    public void advance() {
        Map<SAMReaderID,SAMFileSpan> shardPosition = new HashMap<SAMReaderID,SAMFileSpan>();
        nextShard = null;
        SamRecordFilter filter = null;
        if(locations != null) {
            Map<SAMReaderID,SAMFileSpan> selectedReaders = new HashMap<SAMReaderID,SAMFileSpan>();
            // Walk file pointers until one yields a shard with buffered reads.
            while(selectedReaders.size() == 0 && currentFilePointer != null) {
                shardPosition = currentFilePointer.fileSpans;
                // Trim each reader's span to exclude data already consumed.
                for(SAMReaderID id: shardPosition.keySet()) {
                    SAMFileSpan fileSpan = shardPosition.get(id).removeContentsBefore(position.get(id));
                    if(!fileSpan.isEmpty())
                        selectedReaders.put(id,fileSpan);
                }
                if(selectedReaders.size() > 0) {
                    filter = new ReadOverlapFilter(currentFilePointer.locations);
                    BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),selectedReaders,filter,Shard.ShardType.READ);
                    dataSource.fillShard(shard);
                    if(!shard.isBufferEmpty()) {
                        nextShard = shard;
                        break;
                    }
                }
                selectedReaders.clear();
                currentFilePointer = filePointerIterator.hasNext() ? filePointerIterator.next() : null;
            }
        }
        else {
            // Unsharded case: read from the last recorded position with no overlap filter.
            BAMFormatAwareShard shard = new BlockDelimitedReadShard(dataSource.getReadsInfo(),position,filter,Shard.ShardType.READ);
            dataSource.fillShard(shard);
            nextShard = !shard.isBufferEmpty() ? shard : null;
        }
        this.position = dataSource.getCurrentPosition();
    }
    /**
     * @throws UnsupportedOperationException always.
     */
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported");
    }
    /**
     * Convenience method for using ShardStrategy in a foreach loop.
     * @return this strategy, which acts as its own iterator over shards.
     */
    public Iterator<Shard> iterator() {
        return this;
    }
}

View File

@ -1,120 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMFileSpan;
import net.sf.picard.filter.SamRecordFilter;
import java.util.List;
import java.util.Map;
/*
* Copyright (c) 2009 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
/**
 * A locus shard whose boundaries are delimited by the BAM index (file spans)
 * rather than by genomic intervals alone.
 */
public class IndexDelimitedLocusShard extends LocusShard implements BAMFormatAwareShard {
    /**
     * A map from each reader to the file span associated with this shard.
     */
    private final Map<SAMReaderID,SAMFileSpan> fileSpans;
    /**
     * Create a new locus shard, divided by index.
     * @param intervals List of intervals to process.
     * @param fileSpans File spans associated with that interval.
     */
    IndexDelimitedLocusShard(List<GenomeLoc> intervals, Map<SAMReaderID,SAMFileSpan> fileSpans) {
        super(intervals);
        this.fileSpans = fileSpans;
    }
    /**
     * Gets the file spans associated with this locus shard.
     * NOTE(review): returns the internal map directly, unlike
     * BlockDelimitedReadShard which wraps it unmodifiable -- confirm callers
     * do not mutate it.
     * @return A list of the file spans to use when retrieving locus data.
     */
    @Override
    public Map<SAMReaderID,SAMFileSpan> getFileSpans() {
        return fileSpans;
    }
    /**
     * Returns true if this shard is meant to buffer reads, rather
     * than just holding pointers to their locations.
     * @return Always false; locus shards never buffer reads.
     */
    @Override
    public boolean buffersReads() { return false; }
    /**
     * Unsupported: locus shards do not buffer reads.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public boolean isBufferEmpty() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
    /**
     * Unsupported: locus shards do not buffer reads.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public boolean isBufferFull() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
    /**
     * Unsupported: locus shards do not buffer reads.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void addRead(SAMRecord read) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
    /**
     * Unsupported: locus shards do not buffer reads.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public StingSAMIterator iterator() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
    /**
     * Gets a filter testing for overlap of a read with this shard's loci.
     * @return A filter capable of filtering out reads outside this shard.
     */
    @Override
    public SamRecordFilter getFilter() {
        return new ReadOverlapFilter(loci);
    }
    /**
     * returns the type of shard.
     * @return Always ShardType.LOCUS.
     */
    @Override
    public ShardType getShardType() {
        return ShardType.LOCUS;
    }
}

View File

@ -27,8 +27,8 @@ package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.*;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.apache.log4j.Logger;
import java.util.*;
@ -45,7 +45,7 @@ import net.sf.picard.util.PeekableIterator;
public class IntervalSharder {
private static Logger logger = Logger.getLogger(IntervalSharder.class);
public static Iterator<FilePointer> shardIntervals(final BlockDrivenSAMDataSource dataSource, final List<GenomeLoc> loci) {
public static Iterator<FilePointer> shardIntervals(final SAMDataSource dataSource, final List<GenomeLoc> loci) {
return new FilePointerIterator(dataSource,loci);
}
@ -53,11 +53,11 @@ public class IntervalSharder {
* A lazy-loading iterator over file pointers.
*/
private static class FilePointerIterator implements Iterator<FilePointer> {
final BlockDrivenSAMDataSource dataSource;
final SAMDataSource dataSource;
final PeekableIterator<GenomeLoc> locusIterator;
final Queue<FilePointer> cachedFilePointers = new LinkedList<FilePointer>();
public FilePointerIterator(final BlockDrivenSAMDataSource dataSource, final List<GenomeLoc> loci) {
public FilePointerIterator(final SAMDataSource dataSource, final List<GenomeLoc> loci) {
this.dataSource = dataSource;
locusIterator = new PeekableIterator<GenomeLoc>(loci.iterator());
advance();
@ -98,7 +98,7 @@ public class IntervalSharder {
}
}
private static List<FilePointer> shardIntervalsOnContig(final BlockDrivenSAMDataSource dataSource, final String contig, final List<GenomeLoc> loci) {
private static List<FilePointer> shardIntervalsOnContig(final SAMDataSource dataSource, final String contig, final List<GenomeLoc> loci) {
// Gather bins for the given loci, splitting loci as necessary so that each falls into exactly one lowest-level bin.
List<FilePointer> filePointers = new ArrayList<FilePointer>();
FilePointer lastFilePointer = null;

View File

@ -2,35 +2,104 @@ package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.Utils;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import java.util.List;
import java.util.Map;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import net.sf.picard.filter.SamRecordFilter;
/**
* This is the base class for locus shards. Right now it does little more then
* wrap GenomeLoc (actually nothing more), but it's good to have the class
* in place so it's easier to change guts later.
* Handles locus shards of BAM information.
* @author aaron
* @version 1.0
* @date Apr 7, 2009
*/
public class LocusShard implements Shard {
// currently our location
protected final List<GenomeLoc> loci;
public class LocusShard implements BAMFormatAwareShard {
/**
* A list of the chunks associated with this shard.
*/
private final Map<SAMReaderID,SAMFileSpan> fileSpans;
public LocusShard(List<GenomeLoc> loci) {
this.loci = loci;
// currently our location
private final List<GenomeLoc> loci;
/**
* Create a new locus shard, divided by index.
* @param intervals List of intervals to process.
* @param fileSpans File spans associated with that interval.
*/
public LocusShard(List<GenomeLoc> intervals, Map<SAMReaderID,SAMFileSpan> fileSpans) {
this.loci = intervals;
this.fileSpans = fileSpans;
}
/**
* Gets the file spans associated with this locus shard.
* @return A list of the file spans to use when retrieving locus data.
*/
@Override
public Map<SAMReaderID,SAMFileSpan> getFileSpans() {
return fileSpans;
}
/** @return the genome location represented by this shard */
public List<GenomeLoc> getGenomeLocs() {
return loci;
}
/**
* Returns true if this shard is meant to buffer reads, rather
* than just holding pointers to their locations.
* @return True if this shard can buffer reads. False otherwise.
*/
@Override
public boolean buffersReads() { return false; }
/**
* Returns true if the read buffer is currently full.
* @return True if this shard's buffer is full (and the shard can buffer reads).
*/
@Override
public boolean isBufferEmpty() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Returns true if the read buffer is currently full.
* @return True if this shard's buffer is full (and the shard can buffer reads).
*/
@Override
public boolean isBufferFull() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Adds a read to the read buffer.
* @param read Add a read to the internal shard buffer.
*/
@Override
public void addRead(SAMRecord read) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Gets the iterator over the elements cached in the shard.
* @return
*/
@Override
public StingSAMIterator iterator() { throw new UnsupportedOperationException("This shard does not buffer reads."); }
/**
* Gets a filter testing for overlap of this read with the given shard.
* @return A filter capable of filtering out reads outside a given shard.
*/
@Override
public SamRecordFilter getFilter() {
return new ReadOverlapFilter(loci);
}
/**
* what kind of shard do we return
*
* @return ShardType, indicating the type
* returns the type of shard.
*/
@Override
public ShardType getShardType() {
return ShardType.LOCUS;
}

View File

@ -1,22 +1,5 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import java.util.*;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMSequenceRecord;
import net.sf.samtools.SAMFileSpan;
/*
* Copyright (c) 2009 The Broad Institute
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
@ -29,7 +12,6 @@ import net.sf.samtools.SAMFileSpan;
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
@ -40,15 +22,29 @@ import net.sf.samtools.SAMFileSpan;
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import java.util.*;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMSequenceRecord;
import net.sf.samtools.SAMFileSpan;
/**
* A sharding strategy for loci based on reading of the index.
*/
public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
public class LocusShardStrategy implements ShardStrategy {
/**
* The data source to use when performing this sharding.
*/
private final BlockDrivenSAMDataSource reads;
private final SAMDataSource reads;
/**
* An iterator through the available file pointers.
@ -60,13 +56,8 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
* @param reads Data source from which to load index data.
* @param locations List of locations for which to load data.
*/
IndexDelimitedLocusShardStrategy(SAMDataSource reads, IndexedFastaSequenceFile reference, GenomeLocSortedSet locations) {
LocusShardStrategy(SAMDataSource reads, IndexedFastaSequenceFile reference, GenomeLocSortedSet locations) {
if(reads != null) {
// Shard based on reads.
// TODO: Push this sharding into the data source.
if(!(reads instanceof BlockDrivenSAMDataSource))
throw new StingException("Cannot power an IndexDelimitedLocusShardStrategy with this data source.");
List<GenomeLoc> intervals;
if(locations == null) {
// If no locations were passed in, shard the entire BAM file.
@ -86,7 +77,7 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
else
intervals = locations.toList();
this.reads = (BlockDrivenSAMDataSource)reads;
this.reads = reads;
this.filePointerIterator = IntervalSharder.shardIntervals(this.reads,intervals);
}
else {
@ -124,10 +115,10 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
*
* @return the next shard
*/
public IndexDelimitedLocusShard next() {
public LocusShard next() {
FilePointer nextFilePointer = filePointerIterator.next();
Map<SAMReaderID,SAMFileSpan> fileSpansBounding = nextFilePointer.fileSpans != null ? nextFilePointer.fileSpans : null;
return new IndexDelimitedLocusShard(nextFilePointer.locations,fileSpansBounding);
return new LocusShard(nextFilePointer.locations,fileSpansBounding);
}
/** we don't support the remove command */

View File

@ -1,51 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.shards;
/**
 * A read shard delimited by an actual read count, rather than blocks or any other
 * physical mapping of the BAM file.
 *
 * @author mhanna
 * @version 0.1
 */
public class ReadDelimitedReadShard extends ReadShard {
    // the count of the reads this shard should contain
    private int size = 0;
    /**
     * our tie in for the shard strategy. This allows us to signal to the shard
     * strategy that we've finished processing, so it can indicate that we're out of reads
     */
    private final ReadDelimitedReadShardStrategy strat;
    /**
     * create a read shard, given a read size
     * @param strat The sharding strategy used to create this shard.
     * @param size Size of the shard, in reads.
     */
    ReadDelimitedReadShard(ReadDelimitedReadShardStrategy strat, int size) {
        this.size = size;
        this.strat = strat;
    }
    /** @return the number of reads this shard represents */
    public int getSize() {
        return size;
    }
    /**
     * this method is used as a backend, to signal to the sharding strategy that we've
     * finished processing. When we move to a more read-aware bam system this method could disappear.
     */
    public void signalDone() {
        strat.signalDone();
    }
    /**
     * String representation of this shard.
     * @return A string representation of the boundaries of this shard.
     */
    @Override
    public String toString() {
        return String.format("%d reads", size);
    }
}

View File

@ -1,86 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import java.util.Iterator;
/**
 * A shard strategy that breaks up shards based on how many reads are
 * in each.
 *
 * @author mhanna
 * @version 0.1
 */
public class ReadDelimitedReadShardStrategy extends ReadShardStrategy {
    // our read bucket size; defaults to 1000 reads per shard
    protected long readCount = 1000L;
    // our hasnext flag, cleared when the data source signals it is out of reads
    boolean hasNext = true;
    // our limiting factor: total number of reads left to emit; <= 0 disables the limit
    long limitedSize = -1;
    // set once the limiting factor has been exhausted
    boolean stopDueToLimitingFactor = false;
    /**
     * the default constructor
     * @param size the read count to iterate over
     * @param limitedSize limit the shard to this length (non-positive disables the limit)
     */
    ReadDelimitedReadShardStrategy(long size, long limitedSize) {
        readCount = size;
        this.limitedSize = limitedSize;
    }
    /**
     * do we have another read shard?
     * @return False once the limit is hit or the data source has signaled done.
     */
    public boolean hasNext() {
        if (stopDueToLimitingFactor) {
            return false;
        }
        return hasNext;
    }
    /**
     * Produce the next shard, shrinking the final shard so the total read
     * count never exceeds the limiting factor.
     * NOTE(review): does not check hasNext() first -- calling next() after
     * exhaustion still returns a shard; confirm callers always guard.
     */
    public Shard next() {
        if (limitedSize > 0) {
            if (limitedSize > readCount) {
                // A full-size shard still fits under the limit; deduct it.
                limitedSize = limitedSize - readCount;
            }
            else {
                // Final shard: emit only the remaining reads, then stop.
                readCount = limitedSize;
                limitedSize = 0;
                stopDueToLimitingFactor = true;
            }
        }
        return new ReadDelimitedReadShard(this,(int)readCount);
    }
    /**
     * @throws UnsupportedOperationException always.
     */
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported");
    }
    /**
     * Convenience method for using ShardStrategy in a foreach loop.
     * @return this strategy, which acts as its own iterator over shards.
     */
    public Iterator<Shard> iterator() {
        return this;
    }
    /**
     * set the next shards size
     *
     * @param size adjust the next size to this
     */
    public void adjustNextShardSize(long size) {
        readCount = size;
    }
    /**
     * this function is a work-around for the fact that
     * we don't know when we're out of reads until the SAM data source
     * tells us so.
     */
    public void signalDone() {
        hasNext = false;
    }
}

View File

@ -1,8 +1,16 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import java.util.List;
import java.util.*;
import net.sf.samtools.SAMFileSpan;
import net.sf.samtools.SAMRecord;
import net.sf.picard.filter.SamRecordFilter;
/**
*
@ -21,21 +29,131 @@ import java.util.List;
*/
/**
* the base class for read shards.
* @author aaron
* Expresses a shard of read data in block format.
*
* @author mhanna
* @version 0.1
*/
public abstract class ReadShard implements Shard {
/**
 * Expresses a shard of read data in block format: the shard buffers the
 * reads themselves alongside the BAM file spans they were loaded from.
 *
 * @author mhanna
 * @version 0.1
 */
public class ReadShard implements BAMFormatAwareShard {
    /**
     * Information about the origins of reads.
     */
    private final Reads sourceInfo;
    /**
     * The data backing the next chunks to deliver to the traversal engine,
     * keyed by the reader that produced each file span.
     */
    private final Map<SAMReaderID,SAMFileSpan> fileSpans;
    /**
     * The reads making up this shard, pre-sized to the expected shard capacity.
     */
    private final Collection<SAMRecord> reads = new ArrayList<SAMRecord>(ReadShardStrategy.MAX_READS);
    /**
     * The filter to be applied to all reads entering this shard.
     */
    private final SamRecordFilter filter;
    /**
     * Create a new read shard over the given file spans.
     * @param sourceInfo Metadata about the origin of the reads.
     * @param fileSpans Spans of the backing files covered by this shard.
     * @param filter Filter applied to reads entering this shard; may be null.
     */
    public ReadShard(Reads sourceInfo, Map<SAMReaderID,SAMFileSpan> fileSpans, SamRecordFilter filter) {
        this.sourceInfo = sourceInfo;
        this.fileSpans = fileSpans;
        this.filter = filter;
    }
    /**
     * Get the list of chunks delimiting this shard.
     * @return an unmodifiable view of the file spans backing this shard.
     */
    @Override
    public Map<SAMReaderID,SAMFileSpan> getFileSpans() {
        return Collections.unmodifiableMap(fileSpans);
    }
    /**
     * Unsupported: read shards are not bounded by genomic location.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public List<GenomeLoc> getGenomeLocs() {
        throw new UnsupportedOperationException("ReadShard isn't genome loc aware");
    }
    /**
     * Returns true if this shard is meant to buffer reads, rather
     * than just holding pointers to their locations.
     * @return True if this shard can buffer reads. False otherwise.
     */
    @Override
    public boolean buffersReads() {
        return true;
    }
    /**
     * Returns true if the read buffer is currently empty.
     * @return True if this shard's buffer holds no reads.
     */
    @Override
    public boolean isBufferEmpty() {
        return reads.size() == 0;
    }
    /**
     * Returns true if the read buffer is currently full.
     * NOTE(review): uses '>' rather than '>=', so the buffer reports full only
     * once it has exceeded MAX_READS -- confirm this off-by-one is intentional.
     * @return True if this shard's buffer is full (and the shard can buffer reads).
     */
    @Override
    public boolean isBufferFull() {
        return reads.size() > ReadShardStrategy.MAX_READS;
    }
    /**
     * Adds a read to the read buffer.
     * @param read Add a read to the internal shard buffer.
     */
    @Override
    public void addRead(SAMRecord read) {
        // DO NOT validate that the buffer is full. Paired read sharding will occasionally have to stuff another
        // read or two into the buffer.
        reads.add(read);
    }
    /**
     * Creates an iterator over reads stored in this shard's read cache.
     * @return An iterator over the buffered reads, wrapped with source info.
     */
    @Override
    public StingSAMIterator iterator() {
        return StingSAMIteratorAdapter.adapt(sourceInfo,reads.iterator());
    }
    /**
     * Gets the filter associated with this shard.
     * @return The filter supplied at construction; may be null.
     */
    @Override
    public SamRecordFilter getFilter() {
        return filter;
    }
    /**
     * what kind of shard do we return
     *
     * @return ShardType.READ, always.
     */
    @Override
    public ShardType getShardType() {
        return ShardType.READ;
    }
    /**
     * String representation of this shard.
     * @return A string representation of the boundaries of this shard.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for(Map.Entry<SAMReaderID,SAMFileSpan> entry: fileSpans.entrySet()) {
            sb.append(entry.getKey());
            sb.append(": ");
            sb.append(entry.getValue());
            sb.append(' ');
        }
        return sb.toString();
    }
}

View File

@ -1,11 +1,5 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.SAMSequenceDictionary;
import java.util.Iterator;
/*
* Copyright (c) 2009 The Broad Institute
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
@ -18,7 +12,6 @@ import java.util.Iterator;
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
@ -29,22 +22,155 @@ import java.util.Iterator;
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.SAMFileSpan;
import net.sf.picard.filter.SamRecordFilter;
import java.util.*;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
/**
* The sharding strategy for reads using a simple counting mechanism. Each read shard
* has a specific number of reads (default to 10K) which is configured in the constructor.
* @author aaron
* @version 1.0
* @date Apr 14, 2009
* <p/>
* Class ReadShardStrategy
* <p/>
* The sharding strategy for reads using a simple counting mechanism. Each read shard
* has a specific number of reads (default to 100K) which is configured in the constructor.
*/
public abstract class ReadShardStrategy implements ShardStrategy {
public class ReadShardStrategy implements ShardStrategy {
/**
* What is the maximum number of reads which should go into a read shard.
*/
protected static final int MAX_READS = 10000;
// do we use unmapped reads in the sharding strategy
private boolean unMappedReads = true;
/**
* The data source used to shard.
*/
private final SAMDataSource dataSource;
/**
* The intervals to be processed.
*/
private final GenomeLocSortedSet locations;
/**
* The cached shard to be returned next. Prefetched in the peekable iterator style.
*/
private Shard nextShard = null;
/** our storage of the genomic locations they'd like to shard over */
private final List<FilePointer> filePointers = new ArrayList<FilePointer>();
/**
* Iterator over the list of file pointers.
*/
private final Iterator<FilePointer> filePointerIterator;
/**
* The file pointer currently being processed.
*/
private FilePointer currentFilePointer;
/**
* Ending position of the last shard in the file.
*/
private Map<SAMReaderID,SAMFileSpan> position;
/**
 * Create a new read shard strategy, loading read shards from the given BAM file.
 * Eagerly prefetches the first shard (peekable-iterator style) so that
 * hasNext()/next() can be answered immediately.
 * @param dataSource Data source from which to load shards.
 * @param locations intervals to use for sharding; may be null, in which case
 *                  the whole file is sharded by raw file position rather than by interval.
 */
public ReadShardStrategy(SAMDataSource dataSource, GenomeLocSortedSet locations) {
this.dataSource = dataSource;
// Snapshot the readers' current file positions; advance() consumes data from here forward.
this.position = this.dataSource.getCurrentPosition();
this.locations = locations;
// With intervals: walk file pointers produced by the interval sharder.
// Without: iterate the (empty) local filePointers list, so the interval branch
// of advance() is never entered.
if(locations != null)
filePointerIterator = IntervalSharder.shardIntervals(this.dataSource,locations.toList());
else
filePointerIterator = filePointers.iterator();
if(filePointerIterator.hasNext())
currentFilePointer = filePointerIterator.next();
// Prefetch the first shard so hasNext() is valid immediately after construction.
advance();
}
/**
 * Tests whether this strategy can hand out at least one more read shard.
 * @return true when a prefetched shard is waiting to be delivered; false otherwise.
 */
public boolean hasNext() {
    return null != nextShard;
}
/**
 * Hands out the prefetched shard and immediately prefetches its successor.
 * @return The next shard, if available.
 * @throws java.util.NoSuchElementException if no such shard is available.
 */
public Shard next() {
    if (nextShard == null)
        throw new NoSuchElementException("No next read shard available");
    final Shard result = nextShard;
    advance();
    return result;
}
/**
 * Prefetches the next shard into {@code nextShard}, or leaves it null when the
 * input is exhausted. Interval mode walks file pointers from the interval sharder,
 * clipping each span to exclude data already consumed; whole-file mode simply
 * continues from the last recorded position.
 */
public void advance() {
Map<SAMReaderID,SAMFileSpan> shardPosition = new HashMap<SAMReaderID,SAMFileSpan>();
nextShard = null;
SamRecordFilter filter = null;
if(locations != null) {
// Spans per reader that still contain unconsumed data for the current file pointer.
Map<SAMReaderID,SAMFileSpan> selectedReaders = new HashMap<SAMReaderID,SAMFileSpan>();
while(selectedReaders.size() == 0 && currentFilePointer != null) {
shardPosition = currentFilePointer.fileSpans;
for(SAMReaderID id: shardPosition.keySet()) {
// Clip away the portion of the span already delivered in earlier shards.
SAMFileSpan fileSpan = shardPosition.get(id).removeContentsBefore(position.get(id));
if(!fileSpan.isEmpty())
selectedReaders.put(id,fileSpan);
}
if(selectedReaders.size() > 0) {
// Only reads overlapping the current file pointer's intervals belong in this shard.
filter = new ReadOverlapFilter(currentFilePointer.locations);
BAMFormatAwareShard shard = new ReadShard(dataSource.getReadsInfo(),selectedReaders,filter);
dataSource.fillShard(shard);
// An empty buffer means everything in the span was filtered out; keep scanning.
if(!shard.isBufferEmpty()) {
nextShard = shard;
break;
}
}
selectedReaders.clear();
currentFilePointer = filePointerIterator.hasNext() ? filePointerIterator.next() : null;
}
}
else {
// Whole-file mode: shard from the last recorded position with no overlap filter.
BAMFormatAwareShard shard = new ReadShard(dataSource.getReadsInfo(),position,filter);
dataSource.fillShard(shard);
nextShard = !shard.isBufferEmpty() ? shard : null;
}
// Record how far the data source has advanced, so the next shard resumes there.
this.position = dataSource.getCurrentPosition();
}
/**
 * Shard removal is not part of this iterator's contract.
 * @throws UnsupportedOperationException always.
 */
public void remove() {
    throw new UnsupportedOperationException("Remove not supported");
}
/**
 * Allows this strategy to be the target of a foreach loop over shards.
 * @return this object, which acts as its own iterator.
 */
public Iterator<Shard> iterator() {
    return this;
}

View File

@ -1,14 +1,11 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.SAMSequenceDictionary;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import java.io.File;
/**
*
* User: aaron
@ -70,9 +67,9 @@ public class ShardStrategyFactory {
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
switch (strat) {
case LOCUS_EXPERIMENTAL:
return new IndexDelimitedLocusShardStrategy(readsDataSource,referenceDataSource,null);
return new LocusShardStrategy(readsDataSource,referenceDataSource,null);
case READS_EXPERIMENTAL:
return new BlockDelimitedReadShardStrategy(readsDataSource,null);
return new ReadShardStrategy(readsDataSource,null);
default:
throw new StingException("Strategy: " + strat + " isn't implemented for this type of shatter request");
}
@ -108,9 +105,9 @@ public class ShardStrategyFactory {
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
switch (strat) {
case LOCUS_EXPERIMENTAL:
return new IndexDelimitedLocusShardStrategy(readsDataSource,referenceDataSource,lst);
return new LocusShardStrategy(readsDataSource,referenceDataSource,lst);
case READS_EXPERIMENTAL:
return new BlockDelimitedReadShardStrategy(readsDataSource,lst);
return new ReadShardStrategy(readsDataSource,lst);
default:
throw new StingException("Strategy: " + strat + " isn't implemented");
}

View File

@ -1,526 +0,0 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import org.broadinstitute.sting.gatk.datasources.shards.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.utils.StingException;
import net.sf.samtools.*;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.picard.sam.MergingSamRecordIterator;
import net.sf.picard.filter.FilteringIterator;
import java.util.*;
import java.io.File;
import static net.sf.samtools.SAMFileHeader.SortOrder;
/**
* An iterator that's aware of how data is stored on disk in SAM format.
*
* @author mhanna
* @version 0.1
*/
public class BlockDrivenSAMDataSource extends SAMDataSource {
/**
 * A collection of readers driving the merging process.
 */
private final SAMResourcePool resourcePool;
/**
 * The sort order of the BAM files. Files without a sort order tag are assumed to be
 * in coordinate order.
 */
private SAMFileHeader.SortOrder sortOrder = null;
/**
 * The merged header.
 */
private final SAMFileHeader mergedHeader;
/**
 * Whether the read groups in overlapping files collide.
 */
private final boolean hasReadGroupCollisions;
/**
 * Maps the SAM readers' original read group ids to their revised ids.
 */
private final Map<SAMReaderID,ReadGroupMapping> mergedReadGroupMappings = new HashMap<SAMReaderID,ReadGroupMapping>();
/**
 * How far along is each reader?
 */
private final Map<SAMReaderID,SAMFileSpan> readerPositions = new HashMap<SAMReaderID,SAMFileSpan>();
/**
 * Create a new block-aware SAM data source given the supplied read metadata.
 * Validates that all inputs share one sort order, merges their headers, and
 * caches the original-to-merged read group id mapping.
 * @param reads The read metadata.
 */
public BlockDrivenSAMDataSource(Reads reads) {
super(reads);
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
SAMReaders readers = resourcePool.getAvailableReaders();
// Determine the sort order.
for(SAMFileReader reader: readers.values()) {
// Get the sort order, forcing it to coordinate if unsorted.
SAMFileHeader header = reader.getFileHeader();
SortOrder sortOrder = header.getSortOrder() != SortOrder.unsorted ? header.getSortOrder() : SortOrder.coordinate;
// Validate that all input files are sorted in the same order.
if(this.sortOrder != null && this.sortOrder != sortOrder)
throw new StingException(String.format("Attempted to process mixed of files sorted as %s and %s.",this.sortOrder,sortOrder));
// Update the sort order.
this.sortOrder = sortOrder;
}
initializeReaderPositions(readers);
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
mergedHeader = headerMerger.getMergedHeader();
hasReadGroupCollisions = headerMerger.hasReadGroupCollisions();
// cache the read group id (original) -> read group id (merged) mapping.
for(SAMReaderID id: readerIDs) {
SAMFileReader reader = readers.getReader(id);
ReadGroupMapping mapping = new ReadGroupMapping();
List<SAMReadGroupRecord> readGroups = reader.getFileHeader().getReadGroups();
for(SAMReadGroupRecord readGroup: readGroups) {
if(headerMerger.hasReadGroupCollisions())
mapping.put(readGroup.getReadGroupId(),headerMerger.getReadGroupId(reader,readGroup.getReadGroupId()));
else
mapping.put(readGroup.getReadGroupId(),readGroup.getReadGroupId());
}
mergedReadGroupMappings.put(id,mapping);
}
resourcePool.releaseReaders(readers);
}
/**
 * True if all readers have an index.
 * @return True when every reader has an index; false as soon as one lacks it.
 */
public boolean hasIndex() {
for(SAMFileReader reader: resourcePool.getReadersWithoutLocking()) {
if(!reader.hasIndex())
return false;
}
return true;
}
/**
 * Retrieves the sort order of the readers.
 * @return Sort order. Can be unsorted, coordinate order, or query name order.
 */
public SortOrder getSortOrder() {
return sortOrder;
}
/**
 * Gets the index for a particular reader. Always preloaded.
 * @param id Id of the reader.
 * @return The index. Will preload the index if necessary.
 */
public BrowseableBAMIndex getIndex(final SAMReaderID id) {
SAMReaders readers = resourcePool.getReadersWithoutLocking();
return readers.getReader(id).getBrowseableIndex();
}
/**
 * Retrieves the current position within the BAM file.
 * @return A mapping of reader to current position.
 */
public Map<SAMReaderID,SAMFileSpan> getCurrentPosition() {
return readerPositions;
}
/**
 * Fill the given buffering shard with reads until its buffer is full or the
 * stream is exhausted. Advances readerPositions as a side effect.
 * @param shard Shard to fill; must buffer reads.
 */
public void fillShard(BAMFormatAwareShard shard) {
if(!shard.buffersReads())
throw new StingException("Attempting to fill a non-buffering shard.");
SAMReaders readers = resourcePool.getAvailableReaders();
// Cache the most recently viewed read so that we can check whether we've reached the end of a pair.
SAMRecord read = null;
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,sortOrder == SortOrder.coordinate);
while(!shard.isBufferFull() && iterator.hasNext()) {
read = iterator.next();
addReadToBufferingShard(shard,getReaderID(readers,read),read);
}
// If the reads are sorted in queryname order, ensure that all reads
// having the same queryname become part of the same shard.
// NOTE(review): the first read pulled here with a DIFFERENT name is consumed
// from the iterator and then dropped on break — confirm this is intentional.
if(sortOrder == SortOrder.queryname) {
while(iterator.hasNext()) {
SAMRecord nextRead = iterator.next();
if(read == null || !read.getReadName().equals(nextRead.getReadName()))
break;
addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead);
}
}
iterator.close();
}
/**
 * Retrieves the id of the reader which built the given read.
 * @param read The read to test.
 * @return ID of the reader.
 */
public SAMReaderID getReaderID(SAMRecord read) {
return resourcePool.getReaderID(read.getFileSource().getReader());
}
/**
 * Adds this read to the given shard.
 * @param shard The shard to which to add the read.
 * @param id The id of the given reader.
 * @param read The read to add to the shard.
 */
private void addReadToBufferingShard(BAMFormatAwareShard shard,SAMReaderID id,SAMRecord read) {
// Advance this reader's position to just past the read that was consumed.
SAMFileSpan endChunk = read.getFileSource().getFilePointer().getContentsFollowing();
shard.addRead(read);
readerPositions.put(id,endChunk);
}
/**
 * Gets the reader associated with the given read.
 * @param readers Available readers.
 * @param read Read whose originating reader should be found.
 * @return ID of the reader that produced the read.
 */
private SAMReaderID getReaderID(SAMReaders readers, SAMRecord read) {
for(SAMReaderID id: getReaderIDs()) {
if(readers.getReader(id) == read.getFileSource().getReader())
return id;
}
throw new StingException("Unable to find id for reader associated with read " + read.getReadName());
}
/**
 * Initialize the current reader positions
 * @param readers Readers whose starting positions should be recorded.
 */
private void initializeReaderPositions(SAMReaders readers) {
for(SAMReaderID id: getReaderIDs())
readerPositions.put(id,readers.getReader(id).getFilePointerSpanningReads());
}
/**
 * Builds an iterator over the reads selected by the given shard. Buffered
 * shards are served from their buffer; otherwise readers are checked out of
 * the pool and released when the returned iterator is closed.
 */
public StingSAMIterator seek(Shard shard) {
// todo: refresh monolithic sharding implementation
if(shard instanceof MonolithicShard)
return seekMonolithic(shard);
if(!(shard instanceof BAMFormatAwareShard))
throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass());
BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard;
if(bamAwareShard.buffersReads()) {
return bamAwareShard.iterator();
}
else {
SAMReaders readers = resourcePool.getAvailableReaders();
return getIterator(readers,bamAwareShard,shard instanceof ReadShard);
}
}
/**
 * Get an iterator over the data types specified in the shard.
 * @param readers Readers from which to load data.
 * @param shard The shard specifying the data limits.
 * @param enableVerification True to verify. For compatibility with old sharding strategy.
 * TODO: Collapse this flag when the two sharding systems are merged.
 * @return An iterator over the selected data.
 */
private StingSAMIterator getIterator(SAMReaders readers, BAMFormatAwareShard shard, boolean enableVerification) {
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
// Set up merging to dynamically merge together multiple BAMs.
MergingSamRecordIterator mergingIterator = new MergingSamRecordIterator(headerMerger,true);
for(SAMReaderID id: getReaderIDs()) {
if(shard.getFileSpans().get(id) == null)
continue;
CloseableIterator<SAMRecord> iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id));
if(shard.getFilter() != null)
iterator = new FilteringIterator(iterator,shard.getFilter()); // not a counting iterator because we don't want to show the filtering of reads
mergingIterator.addIterator(readers.getReader(id),iterator);
}
return applyDecoratingIterators(enableVerification,
new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,mergingIterator)),
reads.getDownsamplingMethod().toFraction,
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
}
/**
 * A stopgap measure to handle monolithic sharding
 * @param shard the (monolithic) shard.
 * @return An iterator over the monolithic shard.
 */
private StingSAMIterator seekMonolithic(Shard shard) {
SAMReaders readers = resourcePool.getAvailableReaders();
// Set up merging and filtering to dynamically merge together multiple BAMs and filter out records not in the shard set.
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
MergingSamRecordIterator mergingIterator = new MergingSamRecordIterator(headerMerger,true);
for(SAMReaderID id: getReaderIDs())
mergingIterator.addIterator(readers.getReader(id),readers.getReader(id).iterator());
return applyDecoratingIterators(shard instanceof ReadShard,
new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,mergingIterator)),
reads.getDownsamplingMethod().toFraction,
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
}
/**
 * Gets the merged header from the SAM file.
 * @return The merged header.
 */
public SAMFileHeader getHeader() {
return mergedHeader;
}
// Gets the unmerged header for a single reader, identified by id.
public SAMFileHeader getHeader(SAMReaderID id) {
return resourcePool.getReadersWithoutLocking().getReader(id).getFileHeader();
}
/**
 * Reports whether read group ids collided (and were renamed) when the input
 * headers were merged.
 * @return True if the header merge detected read group collisions.
 */
public boolean hasReadGroupCollisions() {
return hasReadGroupCollisions;
}
/**
 * Gets the revised read group id mapped to this 'original' read group id.
 * @return Merged read group ID.
 */
public String getReadGroupId(final SAMReaderID reader, final String originalReadGroupId) {
return mergedReadGroupMappings.get(reader).get(originalReadGroupId);
}
// Pool of SAMReaders sets, so concurrent traversals can each hold an exclusive set.
private class SAMResourcePool {
/**
 * How many entries can be cached in this resource pool?
 */
private final int maxEntries;
/**
 * All iterators of this reference-ordered data.
 */
private List<SAMReaders> allResources = new ArrayList<SAMReaders>();
/**
 * All iterators that are not currently in service.
 */
private List<SAMReaders> availableResources = new ArrayList<SAMReaders>();
public SAMResourcePool(final int maxEntries) {
this.maxEntries = maxEntries;
}
/**
 * Dangerous internal method; retrieves any set of readers, whether in iteration or not.
 * Used to handle non-exclusive, stateless operations, such as index queries.
 * NOTE(review): allResources.get(0) is read outside the synchronized block — confirm
 * no resize can race with this access.
 * @return Any collection of SAMReaders, whether in iteration or not.
 */
protected SAMReaders getReadersWithoutLocking() {
synchronized(this) {
if(allResources.size() == 0)
createNewResource();
}
return allResources.get(0);
}
/**
 * Choose a set of readers from the pool to use for this query. When complete,
 * @return An exclusive set of readers; return it via releaseReaders().
 */
public synchronized SAMReaders getAvailableReaders() {
if(availableResources.size() == 0)
createNewResource();
SAMReaders readers = availableResources.get(0);
availableResources.remove(readers);
return readers;
}
public synchronized void releaseReaders(SAMReaders readers) {
if(!allResources.contains(readers))
throw new StingException("Tried to return readers from the pool that didn't originate in the pool.");
availableResources.add(readers);
}
/**
 * Gets the reader id for the given reader.
 * @param reader Reader for which to determine the id.
 * @return id of the given reader.
 */
protected synchronized SAMReaderID getReaderID(SAMFileReader reader) {
for(SAMReaders readers: allResources) {
SAMReaderID id = readers.getReaderID(reader);
if(id != null)
return id;
}
throw new StingException("No such reader id is available");
}
private synchronized void createNewResource() {
// NOTE(review): '>' permits maxEntries+1 resources; likely intended '>='.
// Harmless here because maxEntries is Integer.MAX_VALUE.
if(allResources.size() > maxEntries)
throw new StingException("Cannot create a new resource pool. All resources are in use.");
SAMReaders readers = new SAMReaders(reads);
allResources.add(readers);
availableResources.add(readers);
}
}
/**
 * A collection of readers derived from a reads metadata structure.
 */
private class SAMReaders implements Iterable<SAMFileReader> {
/**
 * Internal storage for a map of id -> reader.
 */
private final Map<SAMReaderID,SAMFileReader> readers = new LinkedHashMap<SAMReaderID,SAMFileReader>();
/**
 * Derive a new set of readers from the Reads metadata.
 * @param sourceInfo Metadata for the reads to load.
 */
public SAMReaders(Reads sourceInfo) {
for(File readsFile: sourceInfo.getReadsFiles()) {
SAMFileReader reader = new SAMFileReader(readsFile,true);
reader.enableFileSource(true);
reader.enableIndexCaching(true);
reader.setValidationStringency(sourceInfo.getValidationStringency());
// If no read group is present, hallucinate one.
// TODO: Straw poll to see whether this is really required.
final SAMFileHeader header = reader.getFileHeader();
logger.debug(String.format("Sort order is: " + header.getSortOrder()));
if (reader.getFileHeader().getReadGroups().size() < 1) {
SAMReadGroupRecord rec = new SAMReadGroupRecord(readsFile.getName());
rec.setLibrary(readsFile.getName());
rec.setSample(readsFile.getName());
reader.getFileHeader().addReadGroup(rec);
}
readers.put(new SAMReaderID(readsFile),reader);
}
}
/**
 * Retrieve the reader from the data structure.
 * @param id The ID of the reader to retrieve.
 * @return the reader associated with the given id.
 */
public SAMFileReader getReader(SAMReaderID id) {
if(!readers.containsKey(id))
throw new NoSuchElementException("No reader is associated with id " + id);
return readers.get(id);
}
/**
 * Searches for the reader id of this reader.
 * @param reader Reader for which to search.
 * @return The id associated the given reader, or null if the reader is not present in this collection.
 */
protected SAMReaderID getReaderID(SAMFileReader reader) {
for(Map.Entry<SAMReaderID,SAMFileReader> entry: readers.entrySet()) {
if(reader == entry.getValue())
return entry.getKey();
}
// Not found? return null.
return null;
}
/**
 * Returns an iterator over all readers in this structure.
 * @return An iterator over readers.
 */
public Iterator<SAMFileReader> iterator() {
return readers.values().iterator();
}
/**
 * Returns whether any readers are present in this structure.
 * @return True if the collection holds no readers.
 */
public boolean isEmpty() {
return readers.isEmpty();
}
/**
 * Gets all the actual readers out of this data structure.
 * @return A collection of the readers.
 */
public Collection<SAMFileReader> values() {
return readers.values();
}
}
// Iterator decorator that returns its SAMReaders set to the pool when closed.
private class ReleasingIterator implements StingSAMIterator {
/**
 * The resource acting as the source of the data.
 */
private final SAMReaders resource;
/**
 * The iterator to wrap.
 */
private final StingSAMIterator wrappedIterator;
public Reads getSourceInfo() {
return wrappedIterator.getSourceInfo();
}
public ReleasingIterator(SAMReaders resource, StingSAMIterator wrapped) {
this.resource = resource;
this.wrappedIterator = wrapped;
}
public ReleasingIterator iterator() {
return this;
}
public void remove() {
throw new UnsupportedOperationException("Can't remove from a StingSAMIterator");
}
// Closing both closes the wrapped iterator and releases the readers back to the pool.
public void close() {
wrappedIterator.close();
resourcePool.releaseReaders(resource);
}
public boolean hasNext() {
return wrappedIterator.hasNext();
}
public SAMRecord next() {
return wrappedIterator.next();
}
}
/**
 * Maps read groups in the original SAMFileReaders to read groups in
 * the merged header (original id -> merged id).
 */
private class ReadGroupMapping extends HashMap<String,String> {}
}

View File

@ -1,23 +1,5 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMRecord;
import net.sf.picard.filter.SamRecordFilter;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.iterators.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.filters.CountingFilteringIterator;
import org.broadinstitute.sting.utils.sam.SAMReadViolationHistogram;
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
/*
* Copyright (c) 2009 The Broad Institute
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
@ -30,7 +12,6 @@ import java.util.ArrayList;
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
@ -41,6 +22,30 @@ import java.util.ArrayList;
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import net.sf.samtools.*;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.picard.filter.FilteringIterator;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.picard.sam.MergingSamRecordIterator;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.BAMFormatAwareShard;
import org.broadinstitute.sting.gatk.datasources.shards.MonolithicShard;
import org.broadinstitute.sting.gatk.datasources.shards.ReadShard;
import org.broadinstitute.sting.gatk.iterators.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.filters.CountingFilteringIterator;
import org.broadinstitute.sting.utils.sam.SAMReadViolationHistogram;
import org.broadinstitute.sting.utils.StingException;
import java.io.File;
import java.util.*;
/**
* User: aaron
* Date: Mar 26, 2009
@ -48,7 +53,7 @@ import java.util.ArrayList;
* <p/>
* Converts shards to SAM iterators over the specified region
*/
public abstract class SAMDataSource implements SimpleDataSource {
public class SAMDataSource implements SimpleDataSource {
/** Backing support for reads. */
protected final Reads reads;
@ -57,31 +62,50 @@ public abstract class SAMDataSource implements SimpleDataSource {
*/
protected final List<SAMReaderID> readerIDs = new ArrayList<SAMReaderID>();
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(SAMDataSource.class);
/**
* How far along is each reader?
*/
private final Map<SAMReaderID, SAMFileSpan> readerPositions = new HashMap<SAMReaderID,SAMFileSpan>();
// do we take unmapped reads
protected boolean includeUnmappedReads = true;
/**
* The merged header.
*/
private final SAMFileHeader mergedHeader;
/**
* The sort order of the BAM files. Files without a sort order tag are assumed to be
* in coordinate order.
*/
private SAMFileHeader.SortOrder sortOrder = null;
/**
* Whether the read groups in overlapping files collide.
*/
private final boolean hasReadGroupCollisions;
/**
* Maps the SAM readers' original read group ids to their revised ids.
*/
private final Map<SAMReaderID,ReadGroupMapping> mergedReadGroupMappings = new HashMap<SAMReaderID,ReadGroupMapping>();
/** our log, which we want to capture anything from this class */
private static Logger logger = Logger.getLogger(SAMDataSource.class);
/**
* A histogram of exactly what reads were removed from the input stream and why.
*/
protected SAMReadViolationHistogram violations = new SAMReadViolationHistogram();
private SAMReadViolationHistogram violations = new SAMReadViolationHistogram();
/**
* Returns a histogram of reads that were screened out, grouped by the nature of the error.
* @return Histogram of reads. Will not be null.
* A collection of readers driving the merging process.
*/
public SAMReadViolationHistogram getViolationHistogram() {
return violations;
}
private final SAMResourcePool resourcePool;
/**
* constructor, given sam files
*
* @param reads the list of sam files
* Create a new SAM data source given the supplied read metadata.
* @param reads The read metadata.
*/
public SAMDataSource( Reads reads ) throws SimpleDataSourceLoadException {
public SAMDataSource(Reads reads) {
this.reads = reads;
// check the length
@ -94,42 +118,49 @@ public abstract class SAMDataSource implements SimpleDataSource {
}
readerIDs.add(new SAMReaderID(smFile));
}
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
SAMReaders readers = resourcePool.getAvailableReaders();
// Determine the sort order.
for(SAMFileReader reader: readers.values()) {
// Get the sort order, forcing it to coordinate if unsorted.
SAMFileHeader header = reader.getFileHeader();
SAMFileHeader.SortOrder sortOrder = header.getSortOrder() != SAMFileHeader.SortOrder.unsorted ? header.getSortOrder() : SAMFileHeader.SortOrder.coordinate;
// Validate that all input files are sorted in the same order.
if(this.sortOrder != null && this.sortOrder != sortOrder)
throw new StingException(String.format("Attempted to process mixed of files sorted as %s and %s.",this.sortOrder,sortOrder));
// Update the sort order.
this.sortOrder = sortOrder;
}
initializeReaderPositions(readers);
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
mergedHeader = headerMerger.getMergedHeader();
hasReadGroupCollisions = headerMerger.hasReadGroupCollisions();
// cache the read group id (original) -> read group id (merged) mapping.
for(SAMReaderID id: readerIDs) {
SAMFileReader reader = readers.getReader(id);
ReadGroupMapping mapping = new ReadGroupMapping();
List<SAMReadGroupRecord> readGroups = reader.getFileHeader().getReadGroups();
for(SAMReadGroupRecord readGroup: readGroups) {
if(headerMerger.hasReadGroupCollisions())
mapping.put(readGroup.getReadGroupId(),headerMerger.getReadGroupId(reader,readGroup.getReadGroupId()));
else
mapping.put(readGroup.getReadGroupId(),readGroup.getReadGroupId());
}
mergedReadGroupMappings.put(id,mapping);
}
resourcePool.releaseReaders(readers);
}
/**
* Do all BAM files backing this data source have an index? The case where hasIndex() is false
* is supported, but only in a few extreme cases.
* @return True if an index is present; false otherwise.
*/
public abstract boolean hasIndex();
/**
* Retrieves the sort order of the readers.
* @return Sort order. Can be unsorted, coordinate order, or query name order.
*/
public abstract SAMFileHeader.SortOrder getSortOrder();
/**
* Gets the (potentially merged) SAM file header.
*
* @return SAM file header.
*/
public abstract SAMFileHeader getHeader();
/**
* Gets the (unmerged) header for the given reader.
* @param reader Unique identifier for the reader.
* @return Unmerged header.
*/
public abstract SAMFileHeader getHeader(SAMReaderID reader);
/**
* Retrieves the id of the reader which built the given read.
* @param read The read to test.
* @return ID of the reader.
*/
public abstract SAMReaderID getReaderID(SAMRecord read);
/**
* Returns Reads data structure containing information about the reads data sources placed in this pool as well as
* information about how they are downsampled, sorted, and filtered
@ -137,13 +168,6 @@ public abstract class SAMDataSource implements SimpleDataSource {
*/
public Reads getReadsInfo() { return reads; }
/**
* Returns readers used by this data source.
*/
public List<SAMReaderID> getReaderIDs() {
return readerIDs;
}
/**
* Gets the SAM file associated with a given reader ID.
* @param id The reader for which to retrieve the source file.
@ -153,19 +177,234 @@ public abstract class SAMDataSource implements SimpleDataSource {
return id.samFile;
}
/** Returns true if there are read group duplicates within the merged headers. */
public abstract boolean hasReadGroupCollisions();
/** Returns the read group id that should be used for the input read and RG id. */
public abstract String getReadGroupId(final SAMReaderID reader, final String originalReadGroupId);
/**
* Returns readers used by this data source.
* @return A list of SAM reader IDs.
*/
public List<SAMReaderID> getReaderIDs() {
return readerIDs;
}
/**
*
* @param shard the shard to get data for
*
* @return an iterator for that region
* Retrieves the id of the reader which built the given read.
* @param read The read to test.
* @return ID of the reader.
*/
public abstract StingSAMIterator seek(Shard shard);
/**
 * Looks up which reader produced the given read.
 * @param read The read to test.
 * @return ID of the reader.
 */
public SAMReaderID getReaderID(SAMRecord read) {
    final SAMFileReader sourceReader = read.getFileSource().getReader();
    return resourcePool.getReaderID(sourceReader);
}
/**
 * Reports how far each reader has progressed through its BAM file.
 * @return A mapping of reader to current position.
 */
public Map<SAMReaderID,SAMFileSpan> getCurrentPosition() {
    return this.readerPositions;
}
/**
 * Accessor for the header produced by merging all input file headers.
 * @return The merged header.
 */
public SAMFileHeader getHeader() {
    return this.mergedHeader;
}
/**
 * Retrieves the unmerged header belonging to a single reader.
 * @param id Id of the reader whose header is wanted.
 * @return That reader's own (unmerged) file header.
 */
public SAMFileHeader getHeader(SAMReaderID id) {
    final SAMReaders readers = resourcePool.getReadersWithoutLocking();
    return readers.getReader(id).getFileHeader();
}
/**
 * Translates an original read group id into the id assigned during header merging.
 * @param reader for which to grab a read group.
 * @param originalReadGroupId ID of the original read group.
 * @return Merged read group ID.
 */
public String getReadGroupId(final SAMReaderID reader, final String originalReadGroupId) {
    final ReadGroupMapping mapping = mergedReadGroupMappings.get(reader);
    return mapping.get(originalReadGroupId);
}
/**
 * Reports whether read group ids collided (and were renamed) when the input
 * headers were merged. (Previous javadoc claimed "False always"; the method
 * actually returns the collision flag computed during the header merge.)
 * @return True if the header merge detected read group collisions.
 */
public boolean hasReadGroupCollisions() {
return hasReadGroupCollisions;
}
/**
 * Checks whether every backing reader is indexed.
 * @return True only when all readers have an index; false otherwise.
 */
public boolean hasIndex() {
    boolean allIndexed = true;
    for (SAMFileReader reader : resourcePool.getReadersWithoutLocking()) {
        if (!reader.hasIndex()) {
            allIndexed = false;
            break;
        }
    }
    return allIndexed;
}
/**
 * Fetches the browseable index backing a particular reader.
 * @param id Id of the reader.
 * @return The index. Will preload the index if necessary.
 */
public BrowseableBAMIndex getIndex(final SAMReaderID id) {
    return resourcePool.getReadersWithoutLocking().getReader(id).getBrowseableIndex();
}
/**
 * Accessor for the common sort order shared by all input files.
 * @return Sort order. Can be unsorted, coordinate order, or query name order.
 */
public SAMFileHeader.SortOrder getSortOrder() {
    return this.sortOrder;
}
/**
 * Accessor for the histogram tracking why reads were screened out of the stream.
 * @return Histogram of reads. Will not be null.
 */
public SAMReadViolationHistogram getViolationHistogram() {
    return this.violations;
}
/**
 * Fill the given buffering shard with reads until its buffer is full or the
 * stream is exhausted. Advances the per-reader positions as a side effect.
 * @param shard Shard to fill; must buffer reads.
 */
public void fillShard(BAMFormatAwareShard shard) {
if(!shard.buffersReads())
throw new StingException("Attempting to fill a non-buffering shard.");
SAMReaders readers = resourcePool.getAvailableReaders();
// Cache the most recently viewed read so that we can check whether we've reached the end of a pair.
SAMRecord read = null;
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,sortOrder == SAMFileHeader.SortOrder.coordinate);
while(!shard.isBufferFull() && iterator.hasNext()) {
read = iterator.next();
addReadToBufferingShard(shard,getReaderID(readers,read),read);
}
// If the reads are sorted in queryname order, ensure that all reads
// having the same queryname become part of the same shard.
// NOTE(review): the first read pulled here with a DIFFERENT name is consumed
// from the iterator and then dropped on break — confirm this is intentional.
if(sortOrder == SAMFileHeader.SortOrder.queryname) {
while(iterator.hasNext()) {
SAMRecord nextRead = iterator.next();
if(read == null || !read.getReadName().equals(nextRead.getReadName()))
break;
addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead);
}
}
iterator.close();
}
/**
 * Gets an iterator over the reads selected by the given shard.
 * @param shard Shard over which to iterate; must be a BAMFormatAwareShard
 *              (or a MonolithicShard, handled via a stopgap path).
 * @return An iterator over the selected reads.
 * @throws StingException if the shard type is unsupported.
 */
public StingSAMIterator seek(Shard shard) {
    // todo: refresh monolithic sharding implementation
    if(shard instanceof MonolithicShard)
        return seekMonolithic(shard);
    if(!(shard instanceof BAMFormatAwareShard))
        // Fixed stale class name in message: this class is now SAMDataSource
        // (BlockDrivenSAMDataSource was merged into the base class).
        throw new StingException("SAMDataSource cannot operate on shards of type: " + shard.getClass());
    BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard;
    if(bamAwareShard.buffersReads()) {
        // Buffering shards already hold their reads; just iterate over the buffer.
        return bamAwareShard.iterator();
    }
    else {
        // Non-buffering shards stream reads directly from the shard's file spans.
        SAMReaders readers = resourcePool.getAvailableReaders();
        return getIterator(readers,bamAwareShard,shard instanceof ReadShard);
    }
}
/**
 * Determines which reader produced the given read by matching the read's
 * recorded file source against the known readers.
 * @param readers Available readers.
 * @param read Read whose originating reader should be located.
 * @return Id of the reader that produced the read.
 * @throws StingException if no reader matches the read's file source.
 */
private SAMReaderID getReaderID(SAMReaders readers, SAMRecord read) {
    final Object source = read.getFileSource().getReader();
    for(SAMReaderID id: getReaderIDs()) {
        if(readers.getReader(id) == source)
            return id;
    }
    throw new StingException("Unable to find id for reader associated with read " + read.getReadName());
}
/**
 * Seeds the reader-position map with each reader's current file pointer
 * spanning the reads in that file.
 * @param readers Readers whose starting positions should be recorded.
 */
private void initializeReaderPositions(SAMReaders readers) {
    for(SAMReaderID id: getReaderIDs()) {
        SAMFileSpan startingPosition = readers.getReader(id).getFilePointerSpanningReads();
        readerPositions.put(id,startingPosition);
    }
}
/**
 * Get an iterator over the data types specified in the shard.
 * @param readers Readers from which to load data.
 * @param shard The shard specifying the data limits.
 * @param enableVerification True to verify. For compatibility with old sharding strategy.
 * TODO: Collapse this flag when the two sharding systems are merged.
 * @return An iterator over the selected data.
 */
private StingSAMIterator getIterator(SAMReaders readers, BAMFormatAwareShard shard, boolean enableVerification) {
    // Merge the headers of all inputs into one coordinate-sorted header.
    SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
    // Set up merging to dynamically merge together multiple BAMs.
    MergingSamRecordIterator mergingIterator = new MergingSamRecordIterator(headerMerger,true);
    for(SAMReaderID id: getReaderIDs()) {
        // A reader with no file span in this shard contributes no reads.
        if(shard.getFileSpans().get(id) == null)
            continue;
        CloseableIterator<SAMRecord> iterator = readers.getReader(id).iterator(shard.getFileSpans().get(id));
        if(shard.getFilter() != null)
            iterator = new FilteringIterator(iterator,shard.getFilter()); // not a counting iterator because we don't want to show the filtering of reads
        mergingIterator.addIterator(readers.getReader(id),iterator);
    }
    // The ReleasingIterator hands the readers back to the pool when the caller closes the stream.
    return applyDecoratingIterators(enableVerification,
        new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,mergingIterator)),
        reads.getDownsamplingMethod().toFraction,
        reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
        reads.getSupplementalFilters());
}
/**
 * A stopgap measure to handle monolithic sharding.
 * @param shard the (monolithic) shard.
 * @return An iterator over the monolithic shard.
 */
private StingSAMIterator seekMonolithic(Shard shard) {
    SAMReaders activeReaders = resourcePool.getAvailableReaders();
    // Merge all inputs into a single coordinate-sorted stream. No file-span
    // restriction is applied here: every read in every file is visited.
    SamFileHeaderMerger merger = new SamFileHeaderMerger(activeReaders.values(),SAMFileHeader.SortOrder.coordinate,true);
    MergingSamRecordIterator merged = new MergingSamRecordIterator(merger,true);
    for(SAMReaderID id: getReaderIDs())
        merged.addIterator(activeReaders.getReader(id),activeReaders.getReader(id).iterator());
    // Read-order verification is only enabled when traversing by reads.
    return applyDecoratingIterators(shard instanceof ReadShard,
        new ReleasingIterator(activeReaders,StingSAMIteratorAdapter.adapt(reads,merged)),
        reads.getDownsamplingMethod().toFraction,
        reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
        reads.getSupplementalFilters());
}
/**
 * Appends a read to the given buffering shard and advances the recorded
 * position of the originating reader past that read.
 * @param shard The shard to which to add the read.
 * @param id The id of the reader that produced the read.
 * @param read The read to add to the shard.
 */
private void addReadToBufferingShard(BAMFormatAwareShard shard,SAMReaderID id,SAMRecord read) {
    // Position immediately following this read; the next fill resumes from here.
    SAMFileSpan nextPosition = read.getFileSource().getFilePointer().getContentsFollowing();
    shard.addRead(read);
    readerPositions.put(id,nextPosition);
}
/**
* Filter reads based on user-specified criteria.
@ -197,10 +436,219 @@ public abstract class SAMDataSource implements SimpleDataSource {
for( SamRecordFilter supplementalFilter: supplementalFilters )
wrappedIterator = StingSAMIteratorAdapter.adapt(wrappedIterator.getSourceInfo(),
new CountingFilteringIterator(wrappedIterator,supplementalFilter));
new CountingFilteringIterator(wrappedIterator,supplementalFilter));
return wrappedIterator;
}
/**
 * A bounded pool of SAMReaders sets, allowing multiple concurrent traversals
 * to each hold an exclusive set of open file handles.
 */
private class SAMResourcePool {
    /**
     * Maximum number of reader sets this pool will ever create.
     */
    private final int maxEntries;

    /**
     * Every reader set ever created by this pool, whether in service or not.
     */
    private List<SAMReaders> allResources = new ArrayList<SAMReaders>();

    /**
     * Reader sets that are not currently in service.
     */
    private List<SAMReaders> availableResources = new ArrayList<SAMReaders>();

    public SAMResourcePool(final int maxEntries) {
        this.maxEntries = maxEntries;
    }

    /**
     * Dangerous internal method; retrieves any set of readers, whether in iteration or not.
     * Used to handle non-exclusive, stateless operations, such as index queries.
     * NOTE(review): the read of allResources below happens outside the synchronized
     * block; confirm no other thread can mutate the list concurrently.
     * @return Any collection of SAMReaders, whether in iteration or not.
     */
    protected SAMReaders getReadersWithoutLocking() {
        synchronized(this) {
            if(allResources.size() == 0)
                createNewResource();
        }
        return allResources.get(0);
    }

    /**
     * Checks out a set of readers for exclusive use, creating a new set if none
     * are free. Callers must hand the set back via releaseReaders().
     * @return An exclusively-held set of readers.
     */
    public synchronized SAMReaders getAvailableReaders() {
        if(availableResources.size() == 0)
            createNewResource();
        SAMReaders readers = availableResources.get(0);
        availableResources.remove(readers);
        return readers;
    }

    /**
     * Returns a previously checked-out set of readers to the pool.
     * @param readers Readers to release; must have been created by this pool.
     * @throws StingException if the readers did not originate in this pool.
     */
    public synchronized void releaseReaders(SAMReaders readers) {
        if(!allResources.contains(readers))
            throw new StingException("Tried to return readers from the pool that didn't originate in the pool.");
        availableResources.add(readers);
    }

    /**
     * Gets the reader id for the given reader.
     * @param reader Reader for which to determine the id.
     * @return id of the given reader.
     * @throws StingException if no pool member knows about the reader.
     */
    protected synchronized SAMReaderID getReaderID(SAMFileReader reader) {
        for(SAMReaders readers: allResources) {
            SAMReaderID id = readers.getReaderID(reader);
            if(id != null)
                return id;
        }
        throw new StingException("No such reader id is available");
    }

    /**
     * Creates a fresh set of readers and registers it as available.
     * Fixed off-by-one: the previous check (size() > maxEntries) allowed
     * maxEntries + 1 resources to exist before failing.
     */
    private synchronized void createNewResource() {
        if(allResources.size() >= maxEntries)
            throw new StingException("Cannot create a new resource pool. All resources are in use.");
        SAMReaders readers = new SAMReaders(reads);
        allResources.add(readers);
        availableResources.add(readers);
    }
}
/**
 * A collection of readers derived from a reads metadata structure.
 * Maps each SAMReaderID to its open SAMFileReader, preserving file order.
 */
private class SAMReaders implements Iterable<SAMFileReader> {
    /**
     * Internal storage for a map of id -> reader. LinkedHashMap keeps
     * iteration order matching the order the files were supplied.
     */
    private final Map<SAMReaderID,SAMFileReader> readers = new LinkedHashMap<SAMReaderID,SAMFileReader>();

    /**
     * Derive a new set of readers from the Reads metadata. Opens a reader per
     * input file, enables file-source tracking (needed to map reads back to
     * their reader) and index caching, and synthesizes a read group for files
     * that have none.
     * @param sourceInfo Metadata for the reads to load.
     */
    public SAMReaders(Reads sourceInfo) {
        for(File readsFile: sourceInfo.getReadsFiles()) {
            SAMFileReader reader = new SAMFileReader(readsFile,true);
            reader.enableFileSource(true);
            reader.enableIndexCaching(true);
            reader.setValidationStringency(sourceInfo.getValidationStringency());

            // If no read group is present, hallucinate one.
            // TODO: Straw poll to see whether this is really required.
            final SAMFileHeader header = reader.getFileHeader();
            logger.debug(String.format("Sort order is: " + header.getSortOrder()));

            if (reader.getFileHeader().getReadGroups().size() < 1) {
                // Use the file name as read group id, library, and sample.
                SAMReadGroupRecord rec = new SAMReadGroupRecord(readsFile.getName());
                rec.setLibrary(readsFile.getName());
                rec.setSample(readsFile.getName());

                reader.getFileHeader().addReadGroup(rec);
            }

            readers.put(new SAMReaderID(readsFile),reader);
        }
    }

    /**
     * Retrieve the reader from the data structure.
     * @param id The ID of the reader to retrieve.
     * @return the reader associated with the given id.
     * @throws NoSuchElementException if no reader carries the given id.
     */
    public SAMFileReader getReader(SAMReaderID id) {
        if(!readers.containsKey(id))
            throw new NoSuchElementException("No reader is associated with id " + id);
        return readers.get(id);
    }

    /**
     * Searches for the reader id of this reader.
     * @param reader Reader for which to search.
     * @return The id associated the given reader, or null if the reader is not present in this collection.
     */
    protected SAMReaderID getReaderID(SAMFileReader reader) {
        for(Map.Entry<SAMReaderID,SAMFileReader> entry: readers.entrySet()) {
            if(reader == entry.getValue())
                return entry.getKey();
        }
        // Not found? return null.
        return null;
    }

    /**
     * Returns an iterator over all readers in this structure.
     * @return An iterator over readers.
     */
    public Iterator<SAMFileReader> iterator() {
        return readers.values().iterator();
    }

    /**
     * Returns whether any readers are present in this structure.
     * @return True if the collection holds no readers.
     */
    public boolean isEmpty() {
        return readers.isEmpty();
    }

    /**
     * Gets all the actual readers out of this data structure.
     * @return A collection of the readers.
     */
    public Collection<SAMFileReader> values() {
        return readers.values();
    }
}
/**
 * An iterator decorator that returns its reader set to the resource pool
 * when the iteration is closed; all read traffic is delegated.
 */
private class ReleasingIterator implements StingSAMIterator {
    /** Reader set to hand back to the pool on close(). */
    private final SAMReaders resource;

    /** Underlying iterator to which all calls are delegated. */
    private final StingSAMIterator wrappedIterator;

    public ReleasingIterator(SAMReaders resource, StingSAMIterator wrapped) {
        this.resource = resource;
        this.wrappedIterator = wrapped;
    }

    public Reads getSourceInfo() {
        return wrappedIterator.getSourceInfo();
    }

    public ReleasingIterator iterator() {
        return this;
    }

    public boolean hasNext() {
        return wrappedIterator.hasNext();
    }

    public SAMRecord next() {
        return wrappedIterator.next();
    }

    public void remove() {
        throw new UnsupportedOperationException("Can't remove from a StingSAMIterator");
    }

    /**
     * Closes the delegate and returns the reader set to the pool for reuse.
     */
    public void close() {
        wrappedIterator.close();
        resourcePool.releaseReaders(resource);
    }
}
/**
 * Maps read group ids as they appear in an original SAMFileReader to the
 * (possibly renamed) read group ids after the headers are combined.
 * The original comment was truncated mid-sentence; presumably the targets are
 * the ids in the merged header -- see mergedReadGroupMappings. TODO confirm.
 */
private class ReadGroupMapping extends HashMap<String,String> {}
}

View File

@ -4,9 +4,8 @@ import org.junit.Test;
import org.junit.Assert;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.LocusShard;
import org.broadinstitute.sting.gatk.datasources.shards.MockLocusShard;
import org.broadinstitute.sting.gatk.iterators.GenomeLocusIterator;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
@ -57,7 +56,7 @@ public class LocusReferenceViewUnitTest extends ReferenceViewTemplate {
@Test
public void testOverlappingReferenceBases() {
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc(0, sequenceFile.getSequence("chrM").length() - 10, sequenceFile.getSequence("chrM").length())));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc(0, sequenceFile.getSequence("chrM").length() - 10, sequenceFile.getSequence("chrM").length())));
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, null, shard.getGenomeLocs().get(0), null, sequenceFile, null);
LocusReferenceView view = new LocusReferenceView(dataProvider);
@ -74,7 +73,7 @@ public class LocusReferenceViewUnitTest extends ReferenceViewTemplate {
/** Queries outside the bounds of the shard should result in reference context window trimmed at the shard boundary. */
@Test
public void testBoundsFailure() {
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc(0, 1, 50)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc(0, 1, 50)));
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, null, shard.getGenomeLocs().get(0), null, sequenceFile, null);
LocusReferenceView view = new LocusReferenceView(dataProvider);
@ -94,7 +93,7 @@ public class LocusReferenceViewUnitTest extends ReferenceViewTemplate {
* @param loc
*/
protected void validateLocation( GenomeLoc loc ) {
Shard shard = new LocusShard(Collections.singletonList(loc));
Shard shard = new MockLocusShard(Collections.singletonList(loc));
GenomeLocusIterator shardIterator = new GenomeLocusIterator(loc);
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, null, loc, null, sequenceFile, null);

View File

@ -9,6 +9,8 @@ import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.executive.WindowMaker;
import org.broadinstitute.sting.gatk.datasources.shards.LocusShard;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.MockLocusShard;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.LocusIteratorByState;
import org.broadinstitute.sting.utils.GenomeLoc;
@ -48,7 +50,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecordIterator iterator = new SAMRecordIterator();
GenomeLoc shardBounds = GenomeLocParser.createGenomeLoc("chr1", 1, 5);
Shard shard = new LocusShard(Collections.singletonList(shardBounds));
Shard shard = new LocusShard(Collections.singletonList(shardBounds),Collections.<SAMReaderID,SAMFileSpan>emptyMap());
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, null, window.getLocus(), window, null, null);
@ -64,7 +66,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecordIterator iterator = new SAMRecordIterator(read);
GenomeLoc shardBounds = GenomeLocParser.createGenomeLoc("chr1", 1, 5);
Shard shard = new LocusShard(Collections.singletonList(shardBounds));
Shard shard = new MockLocusShard(Collections.singletonList(shardBounds));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -79,7 +81,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read = buildSAMRecord("chr1", 1, 5);
SAMRecordIterator iterator = new SAMRecordIterator(read);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -93,7 +95,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read = buildSAMRecord("chr1", 6, 10);
SAMRecordIterator iterator = new SAMRecordIterator(read);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -107,7 +109,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read = buildSAMRecord("chr1", 3, 7);
SAMRecordIterator iterator = new SAMRecordIterator(read);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -121,7 +123,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read = buildSAMRecord("chr1", 1, 10);
SAMRecordIterator iterator = new SAMRecordIterator(read);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 6, 15)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 6, 15)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -135,7 +137,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read = buildSAMRecord("chr1", 6, 15);
SAMRecordIterator iterator = new SAMRecordIterator(read);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -150,7 +152,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read2 = buildSAMRecord("chr1", 6, 10);
SAMRecordIterator iterator = new SAMRecordIterator(read1, read2);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -169,7 +171,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read4 = buildSAMRecord("chr1", 6, 10);
SAMRecordIterator iterator = new SAMRecordIterator(read1, read2, read3, read4);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -188,7 +190,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read4 = buildSAMRecord("chr1", 5, 9);
SAMRecordIterator iterator = new SAMRecordIterator(read1, read2, read3, read4);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -209,7 +211,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecord read6 = buildSAMRecord("chr1", 6, 10);
SAMRecordIterator iterator = new SAMRecordIterator(read1, read2, read3, read4, read5, read6);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 1, 10)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);
@ -237,7 +239,7 @@ public abstract class LocusViewTemplate extends BaseTest {
SAMRecordIterator iterator = new SAMRecordIterator(read01, read02, read03, read04, read05, read06,
read07, read08, read09, read10, read11, read12);
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 6, 15)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chr1", 6, 15)));
WindowMaker windowMaker = new WindowMaker(iterator,shard.getGenomeLocs(),new ArrayList<SamRecordFilter>(), LocusIteratorByState.NO_FILTERS);
WindowMaker.WindowMakerIterator window = windowMaker.next();
LocusShardDataProvider dataProvider = new LocusShardDataProvider(shard, window.getSourceInfo(), window.getLocus(), window, null, null);

View File

@ -3,6 +3,7 @@ package org.broadinstitute.sting.gatk.datasources.providers;
import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.datasources.shards.LocusShard;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.MockLocusShard;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.gatk.refdata.ReferenceOrderedData;
@ -53,7 +54,7 @@ public class ReferenceOrderedViewUnitTest extends BaseTest {
*/
@Test
public void testNoBindings() {
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
LocusShardDataProvider provider = new LocusShardDataProvider(shard, null, shard.getGenomeLocs().get(0), null, seq, Collections.<ReferenceOrderedDataSource>emptyList());
ReferenceOrderedView view = new ManagingReferenceOrderedView( provider );
@ -70,7 +71,7 @@ public class ReferenceOrderedViewUnitTest extends BaseTest {
ReferenceOrderedData rod = new ReferenceOrderedData("tableTest", file, TabularROD.class);
ReferenceOrderedDataSource dataSource = new ReferenceOrderedDataSource(null, new RODRMDTrack(TabularROD.class,"tableTest",file,rod));
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
LocusShardDataProvider provider = new LocusShardDataProvider(shard, null, shard.getGenomeLocs().get(0), null, seq, Collections.singletonList(dataSource));
ReferenceOrderedView view = new ManagingReferenceOrderedView( provider );
@ -96,7 +97,7 @@ public class ReferenceOrderedViewUnitTest extends BaseTest {
ReferenceOrderedDataSource dataSource2 = new ReferenceOrderedDataSource(null,new RODRMDTrack(TabularROD.class,"tableTest2",file,rod2));;
Shard shard = new LocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
Shard shard = new MockLocusShard(Collections.singletonList(GenomeLocParser.createGenomeLoc("chrM",1,30)));
LocusShardDataProvider provider = new LocusShardDataProvider(shard, null, shard.getGenomeLocs().get(0), null, seq, Arrays.asList(dataSource1,dataSource2));
ReferenceOrderedView view = new ManagingReferenceOrderedView( provider );

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2010, The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.broadinstitute.sting.gatk.datasources.shards;
import org.broadinstitute.sting.utils.GenomeLoc;
import java.util.List;
/**
* A mock locus shard, usable for infrastructure that requires a shard to behave properly.
*
* @author mhanna
* @version 0.1
*/
public class MockLocusShard extends LocusShard {
public MockLocusShard(final List<GenomeLoc> intervals) {
super(intervals,null);
}
}

View File

@ -88,7 +88,7 @@ public class SAMBAMDataSourceUnitTest extends BaseTest {
Reads reads = new Reads(fl);
// the sharding strat.
SAMDataSource data = new BlockDrivenSAMDataSource(reads);
SAMDataSource data = new SAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,seq,ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL, seq.getSequenceDictionary(), 100000);
int count = 0;
@ -133,7 +133,7 @@ public class SAMBAMDataSourceUnitTest extends BaseTest {
Reads reads = new Reads(fl);
// the sharding strat.
SAMDataSource data = new BlockDrivenSAMDataSource(reads);
SAMDataSource data = new SAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,seq,ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL, seq.getSequenceDictionary(), 100000);
ArrayList<Integer> readcountPerShard = new ArrayList<Integer>();
@ -176,7 +176,7 @@ public class SAMBAMDataSourceUnitTest extends BaseTest {
count = 0;
// the sharding strat.
data = new BlockDrivenSAMDataSource(reads);
data = new SAMDataSource(reads);
strat = ShardStrategyFactory.shatter(data,seq,ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL, seq.getSequenceDictionary(), 100000);
logger.debug("Pile two:");

View File

@ -9,7 +9,6 @@ import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.walkers.qc.CountReadsWalker;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLocParser;
@ -115,7 +114,7 @@ public class TraverseReadsUnitTest extends BaseTest {
}
GenomeLocParser.setupRefContigOrdering(ref);
SAMDataSource dataSource = new BlockDrivenSAMDataSource(new Reads(bamList));
SAMDataSource dataSource = new SAMDataSource(new Reads(bamList));
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ref,ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL,
ref.getSequenceDictionary(),
readSize);