First successful test of new sharding system prototype. Can traverse over reads from a single BAM file.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2587 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-01-15 03:35:55 +00:00
parent db9570ae29
commit b19bb19f3d
27 changed files with 971 additions and 517 deletions

View File

@ -36,6 +36,7 @@ public class BAMChunkIterator implements Iterator<Chunk> {
this.blockIterator = blockIterator;
this.prefetchedSegments = new LinkedList<BlockSegment>();
this.filters = new PriorityQueue<Chunk>(filters);
seedNextSegments();
}
/**

View File

@ -8,6 +8,8 @@ import java.io.File;
import java.io.IOException;
import java.io.DataInputStream;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
/**
* Loads a BAM file header from a file, optionally providing its position
@ -27,6 +29,8 @@ public class BAMFileHeaderLoader {
*/
private final Chunk location;
public static final Chunk preambleLocation = new Chunk(0<<16 | 0, 0<<16 | 3);
/**
* Load the header from the given file.
* @param header the parsed header for the BAM file.
@ -69,9 +73,55 @@ public class BAMFileHeaderLoader {
headerCodec.setValidationStringency(SAMFileReader.ValidationStringency.SILENT);
SAMFileHeader header = headerCodec.decode(new StringLineReader(textHeader),file.getAbsolutePath());
// directly copied from BAMFileReader...
final int sequenceCount = binaryCodec.readInt();
if (header.getSequenceDictionary().size() > 0) {
// It is allowed to have binary sequences but no text sequences, so only validate if both are present
if (sequenceCount != header.getSequenceDictionary().size()) {
throw new SAMFormatException("Number of sequences in text header (" +
header.getSequenceDictionary().size() +
") != number of sequences in binary header (" + sequenceCount + ") for file " + file);
}
for (int i = 0; i < sequenceCount; i++) {
final SAMSequenceRecord binarySequenceRecord = readSequenceRecord(binaryCodec,file);
final SAMSequenceRecord sequenceRecord = header.getSequence(i);
if (!sequenceRecord.getSequenceName().equals(binarySequenceRecord.getSequenceName())) {
throw new SAMFormatException("For sequence " + i + ", text and binary have different names in file " +
binaryCodec);
}
if (sequenceRecord.getSequenceLength() != binarySequenceRecord.getSequenceLength()) {
throw new SAMFormatException("For sequence " + i + ", text and binary have different lengths in file " +
binaryCodec);
}
}
} else {
// If only binary sequences are present, copy them into mFileHeader
final List<SAMSequenceRecord> sequences = new ArrayList<SAMSequenceRecord>(sequenceCount);
for (int i = 0; i < sequenceCount; i++) {
sequences.add(readSequenceRecord(binaryCodec,file));
}
header.setSequenceDictionary(new SAMSequenceDictionary(sequences));
}
inputStream.close();
return new BAMFileHeaderLoader(header,new Chunk(buffer.length,inputStream.getFilePointer()));
return new BAMFileHeaderLoader(header,new Chunk(buffer.length,inputStream.getFilePointer()-1));
}
/**
 * Reads a single binary sequence record from the file or stream.
 * @param binaryCodec stream to read from.
 * @param file Note that this is used only for reporting errors.
 * @return an individual sequence record.
 */
private static SAMSequenceRecord readSequenceRecord(final BinaryCodec binaryCodec, final File file) {
    // The stored length includes the trailing null terminator, so a value
    // of 1 would mean an empty sequence name.
    final int nameLengthWithTerminator = binaryCodec.readInt();
    if (nameLengthWithTerminator <= 1) {
        throw new SAMFormatException("Invalid BAM file header: missing sequence name in file " + file.getAbsolutePath());
    }
    final String name = binaryCodec.readString(nameLengthWithTerminator - 1);
    binaryCodec.readByte();    // consume the null terminator
    final int length = binaryCodec.readInt();
    return new SAMSequenceRecord(name, length);
}
}

View File

@ -490,10 +490,7 @@ class BAMFileReader2
mFilePointerLimit = endOffset;
}
// Pull next record from stream
final SAMRecord record = super.getNextRecord();
if (record == null) {
return null;
}
return super.getNextRecord();
}
}
}

View File

@ -51,7 +51,7 @@ class BlockSegment {
* @return the chunk equivalent of this block.
*/
public Chunk toChunk() {
return new Chunk(position << 16 & blockStart,position << 16 & blockStop);
return new Chunk(position << 16 | blockStart,position << 16 | blockStop);
}
/**

View File

@ -12,7 +12,7 @@ import java.util.ArrayList;
* @author mhanna
* @version 0.1
*/
class Chunk implements Comparable<Chunk> {
public class Chunk implements Comparable<Chunk> {
private long mChunkStart;
private long mChunkEnd;

View File

@ -26,13 +26,14 @@
package org.broadinstitute.sting.gatk;
import net.sf.picard.reference.ReferenceSequenceFile;
import net.sf.picard.sam.SamFileHeaderMerger;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.samtools.*;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.IndexDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
@ -329,11 +330,11 @@ public class GenomeAnalysisEngine {
public List<Set<String>> getSamplesByReaders() {
SamFileHeaderMerger hm = getDataSource().getHeaderMerger();
Collection<SAMFileReader> readers = getDataSource().getReaders();
List<Set<String>> sample_sets = new ArrayList<Set<String>>(hm.getReaders().size());
List<Set<String>> sample_sets = new ArrayList<Set<String>>(readers.size());
for (SAMFileReader r : hm.getReaders()) {
for (SAMFileReader r : readers) {
Set<String> samples = new HashSet<String>(1);
sample_sets.add(samples);
@ -358,11 +359,11 @@ public class GenomeAnalysisEngine {
public List<Set<String>> getLibrariesByReaders() {
SamFileHeaderMerger hm = getDataSource().getHeaderMerger();
Collection<SAMFileReader> readers = getDataSource().getReaders();
List<Set<String>> lib_sets = new ArrayList<Set<String>>(hm.getReaders().size());
List<Set<String>> lib_sets = new ArrayList<Set<String>>(readers.size());
for (SAMFileReader r : hm.getReaders()) {
for (SAMFileReader r : readers) {
Set<String> libs = new HashSet<String>(2);
lib_sets.add(libs);
@ -387,20 +388,20 @@ public class GenomeAnalysisEngine {
public List<Set<String>> getMergedReadGroupsByReaders() {
SamFileHeaderMerger hm = getDataSource().getHeaderMerger();
Collection<SAMFileReader> readers = getDataSource().getReaders();
List<Set<String>> rg_sets = new ArrayList<Set<String>>(hm.getReaders().size());
List<Set<String>> rg_sets = new ArrayList<Set<String>>(readers.size());
for (SAMFileReader r : hm.getReaders()) {
for (SAMFileReader r : readers) {
Set<String> groups = new HashSet<String>(5);
rg_sets.add(groups);
for (SAMReadGroupRecord g : r.getFileHeader().getReadGroups()) {
if (hm.hasReadGroupCollisions()) { // Check if there were read group clashes with hasGroupIdDuplicates and if so:
if (getDataSource().hasReadGroupCollisions()) { // Check if there were read group clashes with hasGroupIdDuplicates and if so:
// use HeaderMerger to translate original read group id from the reader into the read group id in the
// merged stream, and save that remapped read group id to associate it with specific reader
groups.add(hm.getReadGroupId(r, g.getReadGroupId()));
groups.add(getDataSource().getReadGroupId(r, g.getReadGroupId()));
} else {
// otherwise, pass through the unmapped read groups since this is what Picard does as well
groups.add(g.getReadGroupId());
@ -609,26 +610,29 @@ public class GenomeAnalysisEngine {
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL :
ShardStrategyFactory.SHATTER_STRATEGY.LINEAR;
shardStrategy = ShardStrategyFactory.shatter(shardType,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals, maxIterations);
} else
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
} else if (walker instanceof ReadWalker ||
walker instanceof DuplicateWalker) {
shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS;
if(argCollection.experimentalSharding)
shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL;
else
shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS;
if (intervals != null && !intervals.isEmpty()) {
shardStrategy = ShardStrategyFactory.shatter(shardType,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals, maxIterations);
} else {
shardStrategy = ShardStrategyFactory.shatter(shardType,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
}
@ -636,7 +640,8 @@ public class GenomeAnalysisEngine {
if ((intervals == null || intervals.isEmpty()) && !exclusions.contains(ValidationExclusion.TYPE.ALLOW_EMPTY_INTERVAL_LIST))
Utils.warnUser("walker is of type LocusWindow (which operates over intervals), but no intervals were provided." +
"This may be unintentional, check your command-line arguments.");
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals, maxIterations);
@ -657,7 +662,11 @@ public class GenomeAnalysisEngine {
if (reads.getReadsFiles().size() == 0)
return null;
SAMDataSource dataSource = new SAMDataSource(reads);
SAMDataSource dataSource = null;
if(argCollection.experimentalSharding)
dataSource = new BlockDrivenSAMDataSource(reads);
else
dataSource = new IndexDrivenSAMDataSource(reads);
return dataSource;
}

View File

@ -146,6 +146,9 @@ public class GATKArgumentCollection {
@Argument(fullName = "enableRodWalkers", shortName = "erw", doc = "Enable experimental rodWalker support. TEMPORARY HACK TO ALLOW EXPERIMENTATION WITH ROD WALKERS. [default is false]}.", required = false)
public boolean enableRodWalkers = false;
@Element(required = false)
@Argument(fullName = "experimental_sharding",shortName="es", doc="Use the experimental sharding strategy. Will not work for all traversal types.", required = false)
public boolean experimentalSharding = false;
/**
* marshal the data out to a object

View File

@ -0,0 +1,44 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.Chunk;
import java.util.List;
/**
* Expresses a shard of read data in block format.
*
* @author mhanna
* @version 0.1
*/
public class BlockDelimitedReadShard extends ReadShard {
    /** Chunks backing this shard, in the order they should be read. */
    private final List<Chunk> chunks;

    /**
     * Create a read shard backed by the given chunks.
     * @param chunks The list of chunks to retrieve when loading this shard.
     */
    public BlockDelimitedReadShard(List<Chunk> chunks) {
        this.chunks = chunks;
    }

    /**
     * Get the list of chunks delimiting this shard.
     * @return a list of chunks that contain data for this shard.
     */
    public List<Chunk> getChunks() {
        return chunks;
    }

    /**
     * String representation of this shard.
     * @return every chunk's string form, each followed by a single space.
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < chunks.size(); i++) {
            builder.append(chunks.get(i)).append(' ');
        }
        return builder.toString();
    }
}

View File

@ -0,0 +1,101 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import net.sf.samtools.Chunk;
import net.sf.samtools.BAMFileHeaderLoader;
import net.sf.samtools.BAMChunkIterator;
import net.sf.samtools.BAMBlockIterator;
import java.util.*;
import java.io.File;
import java.io.IOException;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
/**
* A read shard strategy that delimits based on the number of
* blocks in the BAM file.
*
* @author mhanna
* @version 0.1
*/
public class BlockDelimitedReadShardStrategy extends ReadShardStrategy {
    /**
     * Number of chunks bundled into each shard.
     * NOTE(review): despite the name, advance() counts chunks, not BGZF blocks — confirm intent.
     */
    protected int blockCount = 100;

    /**
     * The actual chunks streaming into the file.
     */
    private final BAMChunkIterator chunkIterator;

    /**
     * The data backing the next chunks to deliver to the traversal engine.
     */
    private final List<Chunk> nextChunks;

    /**
     * Create a new read shard strategy, loading read shards from the given BAM file.
     * @param dataSource Data source from which to load shards.
     * @throws UnsupportedOperationException if more than one BAM file backs the data source.
     * @throws StingException if the BAM file cannot be opened.
     */
    public BlockDelimitedReadShardStrategy(SAMDataSource dataSource) {
        if(dataSource.getReadsInfo().getReadsFiles().size() > 1)
            throw new UnsupportedOperationException("Experimental sharding only works with a single BAM at the moment.");
        File bamFile = dataSource.getReadsInfo().getReadsFiles().get(0);
        try {
            // Skip over the BAM preamble and header so shards contain only read data.
            Chunk headerLocation = BAMFileHeaderLoader.load(bamFile).getLocation();
            chunkIterator = new BAMChunkIterator(new BAMBlockIterator(bamFile),Arrays.asList(BAMFileHeaderLoader.preambleLocation,headerLocation));
        }
        catch(IOException ex) {
            // Preserve the failing file and underlying cause so the error is actionable.
            throw new StingException("Unable to open BAM file for sharding: " + bamFile + "; " + ex.getMessage());
        }
        nextChunks = new ArrayList<Chunk>();
        advance();
    }

    /**
     * do we have another read shard?
     * @return True if any more data is available. False otherwise.
     */
    public boolean hasNext() {
        return nextChunks.size() > 0;
    }

    /**
     * Retrieves the next shard, if available.
     * @return The next shard, if available.
     * @throws NoSuchElementException if no such shard is available.
     */
    public Shard next() {
        if(!hasNext())
            throw new NoSuchElementException("No such element available: SAM reader has arrived at last shard.");
        // Snapshot the prefetched chunks before advance() clears them.
        Shard shard = new BlockDelimitedReadShard(Collections.unmodifiableList(new ArrayList<Chunk>(nextChunks)));
        advance();
        return shard;
    }

    /**
     * @throws UnsupportedOperationException always.
     */
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported");
    }

    /**
     * Convenience method for using ShardStrategy in a foreach loop.
     * @return An iterator over shards.
     */
    public Iterator<Shard> iterator() {
        return this;
    }

    /**
     * Prefetch up to blockCount chunks into nextChunks, clearing any
     * previously buffered chunks first. Leaves nextChunks empty at EOF.
     */
    private void advance() {
        nextChunks.clear();
        int chunksCopied = 0;
        while(chunksCopied++ < blockCount && chunkIterator.hasNext())
            nextChunks.add(chunkIterator.next());
    }
}

View File

@ -62,4 +62,13 @@ public class IntervalShard implements Shard {
public Shard.ShardType getShardType() {
return mType;
}
/**
* String representation of this shard.
* @return A string representation of the boundaries of this shard.
*/
@Override
public String toString() {
return mSet.toString();
}
}

View File

@ -0,0 +1,51 @@
package org.broadinstitute.sting.gatk.datasources.shards;
/**
* A read shard delimited by an actual read count, rather than blocks or any other
* physical mapping of the BAM file.
*
* @author mhanna
* @version 0.1
*/
public class ReadDelimitedReadShard extends ReadShard {
    /** Number of reads this shard covers. */
    private int size = 0;

    /**
     * Back-reference to the owning shard strategy; lets the shard signal
     * that processing has finished so the strategy can report that it is
     * out of reads.
     */
    private final ReadDelimitedReadShardStrategy strat;

    /**
     * create a read shard, given a read size
     * @param strat The sharding strategy used to create this shard.
     * @param size Size of the shard, in reads.
     */
    ReadDelimitedReadShard(ReadDelimitedReadShardStrategy strat, int size) {
        this.strat = strat;
        this.size = size;
    }

    /** @return the number of reads represented by this shard */
    public int getSize() {
        return size;
    }

    /**
     * Backend hook signaling to the sharding strategy that we've finished
     * processing. When we move to a more read-aware BAM system this method
     * could disappear.
     */
    public void signalDone() {
        strat.signalDone();
    }

    /**
     * String representation of this shard.
     * @return A string representation of the boundaries of this shard.
     */
    @Override
    public String toString() {
        return size + " reads";
    }
}

View File

@ -0,0 +1,86 @@
package org.broadinstitute.sting.gatk.datasources.shards;
import java.util.Iterator;
/**
* A shard strategy that breaks up shards based on how many reads are
* in each.
*
* @author mhanna
* @version 0.1
*/
public class ReadDelimitedReadShardStrategy extends ReadShardStrategy {
    /** Reads per shard; defaults to 100K. */
    protected long readCount = 100000L;

    /** Flips to false once the SAM data source signals that reads are exhausted. */
    boolean hasNext = true;

    /** Total read budget; non-positive means unlimited. */
    long limitedSize = -1;

    /** Set once the read budget has been fully handed out. */
    boolean stopDueToLimitingFactor = false;

    /**
     * the default constructor
     * @param size the read count to iterate over
     * @param limitedSize limit the shard to this length
     */
    ReadDelimitedReadShardStrategy(long size, long limitedSize) {
        this.readCount = size;
        this.limitedSize = limitedSize;
    }

    /**
     * do we have another read shard?
     * @return true while reads remain and the read budget is not exhausted.
     */
    public boolean hasNext() {
        return !stopDueToLimitingFactor && hasNext;
    }

    /**
     * Produce the next shard, shrinking the final shard so the total read
     * count never exceeds the configured limit.
     * @return the next read-delimited shard.
     */
    public Shard next() {
        if (limitedSize > 0) {
            if (limitedSize > readCount) {
                // Budget remains after this shard; just deduct it.
                limitedSize = limitedSize - readCount;
            } else {
                // Final shard: trim to the remaining budget, then stop.
                readCount = limitedSize;
                limitedSize = 0;
                stopDueToLimitingFactor = true;
            }
        }
        return new ReadDelimitedReadShard(this, (int)readCount);
    }

    /**
     * @throws UnsupportedOperationException always.
     */
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported");
    }

    /**
     * Convenience method for using this strategy in a foreach loop.
     * @return this object, viewed as an iterator over shards.
     */
    public Iterator<Shard> iterator() {
        return this;
    }

    /**
     * set the next shards size
     *
     * @param size adjust the next size to this
     */
    public void adjustNextShardSize(long size) {
        readCount = size;
    }

    /**
     * Work-around for the fact that we don't know when we're out of reads
     * until the SAM data source tells us so.
     */
    public void signalDone() {
        hasNext = false;
    }
}

View File

@ -18,68 +18,16 @@ import org.broadinstitute.sting.utils.GenomeLoc;
*
*/
/**
* the base class for read shards.
* @author aaron
* <p/>
* ReadShard
* <p/>
* the base class for read shards.
*/
public class ReadShard implements Shard {
// the count of the reads we want to copy off
private int size = 0;
/**
* our tie in for the shard strategy. This allows us to signal to the shard
* strategy that we've finished process, so it can indicate that we're out of reads
*/
private final ReadShardStrategy str;
// the reference back to our read shard strategy
private final ReadShardStrategy strat;
/**
* create a read shard, given a read size
*
* @param size
*/
ReadShard(int size, ReadShardStrategy strat) {
this.str = null;
this.size = size;
this.strat = strat;
}
/**
* create a read shard, given a read size
*
* @param size
*/
ReadShard(ReadShardStrategy caller, int size, ReadShardStrategy strat) {
this.str = caller;
this.size = size;
this.strat = strat;
}
public abstract class ReadShard implements Shard {
/** @return the genome location represented by this shard */
public GenomeLoc getGenomeLoc() {
throw new UnsupportedOperationException("ReadShard isn't genome loc aware");
}
/** @return the genome location represented by this shard */
public int getSize() {
return size;
}
/**
* this method is used as a backend, to signal to the sharding strategy that we've
* finished processing. When we move to a more read-aware bam system this method could disappear.
*/
public void signalDone() {
strat.signalDone();
}
/**
* what kind of shard do we return
*

View File

@ -40,85 +40,12 @@ import java.util.Iterator;
* The sharding strategy for reads using a simple counting mechanism. Each read shard
* has a specific number of reads (default to 100K) which is configured in the constructor.
*/
public class ReadShardStrategy implements ShardStrategy {
public abstract class ReadShardStrategy implements ShardStrategy {
// do we use unmapped reads in the sharding strategy
private boolean unMappedReads = true;
// our read bucket size, default
protected long readCount = 100000L;
// our sequence dictionary
final private SAMSequenceDictionary dic;
// our hasnext flag
boolean hasNext = true;
// our limiting factor
long limitedSize = -1;
boolean stopDueToLimitingFactor = false;
/**
* the default constructor
* @param dic the sequence dictionary to use
* @param size the read count to iterate over
*/
ReadShardStrategy(SAMSequenceDictionary dic, long size, long limitedSize) {
this.dic = dic;
readCount = size;
this.limitedSize = limitedSize;
}
/**
* do we have another read shard?
* @return
*/
public boolean hasNext() {
if (stopDueToLimitingFactor) {
return false;
}
return hasNext;
}
public Shard next() {
if (limitedSize > 0) {
if (limitedSize > readCount) {
limitedSize = limitedSize - readCount;
}
else {
readCount = limitedSize;
limitedSize = 0;
stopDueToLimitingFactor = true;
}
}
return new ReadShard((int)readCount, this);
}
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
public Iterator<Shard> iterator() {
return this;
}
/**
* set the next shards size
*
* @param size adjust the next size to this
*/
public void adjustNextShardSize(long size) {
readCount = size;
}
/**
* this function is a work-around for the fact that
* we don't know when we're out of reads until the SAM data source
* tells us so.
*/
public void signalDone() {
hasNext = false;
}
}

View File

@ -28,13 +28,4 @@ import java.util.Iterator;
class, but now this will be an interface to accommodate read-based sharding
*/
public interface ShardStrategy extends Iterator<Shard>, Iterable<Shard> {
/**
* set the next shards size
*
* @param size adjust the next size to this
*/
public abstract void adjustNextShardSize(long size);
}

View File

@ -4,6 +4,9 @@ import net.sf.samtools.SAMSequenceDictionary;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import java.io.File;
/**
*
@ -37,6 +40,7 @@ public class ShardStrategyFactory {
LINEAR,
EXPONENTIAL,
READS,
READS_EXPERIMENTAL,
INTERVAL,
MONOLITHIC // Put all of the available data into one shard.
}
@ -48,31 +52,35 @@ public class ShardStrategyFactory {
/**
* get a new shatter strategy
*
* @param dataSource File pointer to BAM. TODO: Eliminate this argument; pass a data source instead!
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
* @return a shard strategy capable of dividing input data into shards.
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) {
return ShardStrategyFactory.shatter(strat, dic, startingSize, -1L);
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) {
return ShardStrategyFactory.shatter(dataSource, strat, dic, startingSize, -1L);
}
/**
* get a new shatter strategy
*
* @param dataSource File pointer to BAM.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
* @return a shard strategy capable of dividing input data into shards.
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
switch (strat) {
case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize, limitByCount);
case EXPONENTIAL:
return new ExpGrowthLocusShardStrategy(dic, startingSize, limitByCount);
case READS:
return new ReadShardStrategy(dic, startingSize, limitByCount);
return new ReadDelimitedReadShardStrategy(startingSize, limitByCount);
case READS_EXPERIMENTAL:
return new BlockDelimitedReadShardStrategy(dataSource);
case INTERVAL:
throw new StingException("Requested trategy: " + strat + " doesn't work with the limiting count (-M) command line option");
default:
@ -90,8 +98,8 @@ public class ShardStrategyFactory {
* @param startingSize the starting size
* @return
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) {
return ShardStrategyFactory.shatter(strat, dic, startingSize, lst, -1l);
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) {
return ShardStrategyFactory.shatter(dataSource, strat, dic, startingSize, lst, -1l);
}
@ -103,7 +111,7 @@ public class ShardStrategyFactory {
* @param startingSize the starting size
* @return
*/
static public ShardStrategy shatter(SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
switch (strat) {
case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize, lst, limitDataCount);
@ -113,6 +121,8 @@ public class ShardStrategyFactory {
return new IntervalShardStrategy(startingSize, lst, Shard.ShardType.LOCUS_INTERVAL);
case READS:
return new IntervalShardStrategy(startingSize, lst, Shard.ShardType.READ_INTERVAL);
case READS_EXPERIMENTAL:
throw new UnsupportedOperationException("Cannot do experimental read sharding with intervals");
default:
throw new StingException("Strategy: " + strat + " isn't implemented");
}

View File

@ -0,0 +1,88 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.BlockDelimitedReadShard;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
import org.broadinstitute.sting.utils.StingException;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMFileReader2;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.util.CloseableIterator;
import java.util.Collection;
import java.io.File;
/**
* An iterator that's aware of how data is stored on disk in SAM format.
*
* @author mhanna
* @version 0.1
*/
public class BlockDrivenSAMDataSource extends SAMDataSource {
    /** Block-aware reader used to iterate over explicit chunk lists. */
    private final SAMFileReader2 reader;

    /**
     * Create a new block-aware SAM data source given the supplied read metadata.
     * @param reads The read metadata.
     * @throws StingException if more than one BAM file was supplied.
     */
    public BlockDrivenSAMDataSource(Reads reads) {
        super(reads);
        if(reads.getReadsFiles().size() > 1)
            throw new StingException("Experimental sharding strategy cannot handle multiple BAM files at this point.");
        reader = new SAMFileReader2(reads.getReadsFiles().get(0));
    }

    /** @return true if the underlying reader has an index. */
    public boolean hasIndex() {
        return reader.hasIndex();
    }

    /**
     * Build an iterator over the reads covered by the given shard.
     * @param shard a block-delimited read shard describing which chunks to load.
     * @return a decorated iterator over the shard's reads.
     * @throws StingException if the shard is not block-delimited.
     */
    public StingSAMIterator seek(Shard shard) {
        if(!(shard instanceof BlockDelimitedReadShard))
            throw new StingException("Currently unable to operate on types other than block delimited read shards.");
        BlockDelimitedReadShard blockShard = (BlockDelimitedReadShard)shard;
        CloseableIterator<SAMRecord> iterator = reader.iterator(blockShard.getChunks());
        return applyDecoratingIterators(true,
                StingSAMIteratorAdapter.adapt(reads, iterator),
                reads.getDownsamplingFraction(),
                reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
                reads.getSupplementalFilters());
    }

    /**
     * Gets the merged header from the SAM file.
     * @return The merged header.
     */
    public SAMFileHeader getHeader() {
        return reader.getFileHeader();
    }

    /**
     * Currently unsupported.
     * @return never returns; always throws.
     * @throws StingException always.
     */
    public Collection<SAMFileReader> getReaders() {
        throw new StingException("Currently unable to get readers for shard-based fields.");
    }

    /**
     * No read group collisions at this time because only one SAM file is currently supported.
     * @return False always.
     */
    public boolean hasReadGroupCollisions() {
        return false;
    }

    /**
     * Currently unsupported.
     * @return never returns; always throws.
     * @throws UnsupportedOperationException always.
     */
    public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) {
        throw new UnsupportedOperationException("Getting read group ID from this experimental SAM reader is not currently supported.");
    }
}

View File

@ -0,0 +1,423 @@
package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.filter.FilteringIterator;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.picard.sam.SamFileHeaderMerger;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.shards.ReadShard;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.MonolithicShard;
import org.broadinstitute.sting.gatk.datasources.shards.ReadDelimitedReadShard;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.iterators.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.sam.SAMReadViolationHistogram;
import java.io.File;
import java.util.Collection;
/*
* Copyright (c) 2009 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* User: aaron
* Date: Mar 26, 2009
* Time: 2:36:16 PM
* <p/>
* Converts shards to SAM iterators over the specified region
*/
public class IndexDrivenSAMDataSource extends SAMDataSource {
    // Used in the by-read traversal case: the running count of reads retrieved so far.
    long readsTaken = 0;

    // Our last genome loc position in a by-read traversal; null until the first seek.
    protected GenomeLoc lastReadPos = null;

    // Do we return unmapped reads when traversing by read?
    private boolean includeUnmappedReads = true;

    // By-read traversal state: set once all mapped reads have been exhausted.
    private boolean intoUnmappedReads = false;

    // How many reads were already handed out at lastReadPos; used to skip past
    // them when the next seek lands on the same position (see correctForReadPileupSeek).
    private int readsSeenAtLastPos = 0;

    /**
     * A histogram of exactly what reads were removed from the input stream and why.
     */
    private SAMReadViolationHistogram violations = new SAMReadViolationHistogram();

    // A pool of SAM iterators.
    private SAMResourcePool resourcePool = null;

    // The interval handed out by the previous interval-shard seek; used to detect
    // reads overlapping the boundary between consecutive interval shards.
    private GenomeLoc mLastInterval = null;

    /**
     * Returns a histogram of reads that were screened out, grouped by the nature of the error.
     * @return Histogram of reads. Will not be null.
     */
    public SAMReadViolationHistogram getViolationHistogram() {
        return violations;
    }

    /**
     * constructor, given sam files
     *
     * @param reads the list of sam files
     */
    public IndexDrivenSAMDataSource( Reads reads ) throws SimpleDataSourceLoadException {
        super(reads);
        resourcePool = new SAMResourcePool(reads);
    }

    /**
     * Do all BAM files backing this data source have an index? The case where hasIndex() is false
     * is supported, but only in a few extreme cases.
     * @return True if an index is present; false otherwise.
     */
    public boolean hasIndex() {
        return resourcePool.hasIndex;
    }

    /**
     * Gets the (potentially merged) SAM file header.
     *
     * @return SAM file header.
     */
    public SAMFileHeader getHeader() {
        return resourcePool.getHeader();
    }

    /**
     * Returns Reads data structure containing information about the reads data sources placed in this pool as well as
     * information about how they are downsampled, sorted, and filtered
     * @return the Reads structure backing this data source.
     */
    public Reads getReadsInfo() { return reads; }

    /**
     * Returns the individual file readers maintained by the system (and hence access to the
     * per-file headers prior to merging), via the header merger kept by the resource pool.
     * @return the readers backing this data source.
     */
    public Collection<SAMFileReader> getReaders() { return resourcePool.getHeaderMerger().getReaders(); }

    /** Returns true if there are read group duplicates within the merged headers. */
    public boolean hasReadGroupCollisions() {
        return resourcePool.getHeaderMerger().hasReadGroupCollisions();
    }

    /** Returns the read group id that should be used for the input read and RG id. */
    public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) {
        return resourcePool.getHeaderMerger().getReadGroupId(reader,originalReadGroupId);
    }

    /**
     * Gets an iterator over the data backing the given shard, decorated with the
     * configured downsampling / validation / supplemental filters.
     *
     * @param shard the shard to get data for
     *
     * @return an iterator for that region
     */
    public StingSAMIterator seek( Shard shard ) throws SimpleDataSourceLoadException {
        // READ shards query by containment; every other shard type queries by overlap.
        boolean queryOverlapping = shard.getShardType() != Shard.ShardType.READ;
        resourcePool.setQueryOverlapping(queryOverlapping);

        StingSAMIterator iterator = null;
        if (shard.getShardType() == Shard.ShardType.READ) {
            iterator = seekRead(shard);
            iterator = applyDecoratingIterators(true,
                    iterator,
                    reads.getDownsamplingFraction(),
                    reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
                    reads.getSupplementalFilters());
        } else if (shard.getShardType() == Shard.ShardType.LOCUS) {
            iterator = seekLocus(shard);
            iterator = applyDecoratingIterators(false,
                    iterator,
                    reads.getDownsamplingFraction(),
                    reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
                    reads.getSupplementalFilters());
        } else if ((shard.getShardType() == Shard.ShardType.LOCUS_INTERVAL) ||
                (shard.getShardType() == Shard.ShardType.READ_INTERVAL)) {
            iterator = seekLocus(shard);
            iterator = applyDecoratingIterators(false,
                    iterator,
                    reads.getDownsamplingFraction(),
                    reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
                    reads.getSupplementalFilters());
            // add the new overlapping detection iterator, if we have a last interval and we're a read based shard
            if (mLastInterval != null && shard.getShardType() == Shard.ShardType.READ_INTERVAL )
                iterator = new PlusOneFixIterator(shard.getGenomeLoc(),new IntervalOverlapIterator(iterator,mLastInterval,false));
            mLastInterval = shard.getGenomeLoc();
        } else {
            throw new StingException("seek: Unknown shard type");
        }

        return iterator;
    }

    /**
     * <p>
     * seekLocus
     * </p>
     *
     * @param shard the shard containing the genome location to extract data for
     *
     * @return an iterator for that region
     */
    private StingSAMIterator seekLocus( Shard shard ) throws SimpleDataSourceLoadException {
        // A monolithic shard means "give me everything"; no positioning required.
        if(shard instanceof MonolithicShard)
            return createIterator(new EntireStream());

        // Without a sequence dictionary there is no coordinate system to seek into.
        if( getHeader().getSequenceDictionary().getSequences().size() == 0 )
            throw new StingException("Unable to seek to the given locus; reads data source has no alignment information.");

        return createIterator( new MappedStreamSegment(shard.getGenomeLoc()) );
    }

    /**
     * <p>
     * seek
     * </p>
     *
     * @param shard the read shard to extract from
     *
     * @return an iterator for that region
     */
    private StingSAMIterator seekRead( Shard shard ) throws SimpleDataSourceLoadException {
        if(shard instanceof MonolithicShard)
            return createIterator(new EntireStream());

        ReadDelimitedReadShard readShard = (ReadDelimitedReadShard)shard;
        StingSAMIterator iter = null;

        // If there are no entries in the sequence dictionary, there can't possibly be any mapped reads.  Force state to 'unmapped'.
        if( isSequenceDictionaryEmpty() )
            intoUnmappedReads = true;

        if (!intoUnmappedReads) {
            if (lastReadPos == null) {
                // First seek: start at the beginning of the first contig.
                lastReadPos = GenomeLocParser.createGenomeLoc(getHeader().getSequenceDictionary().getSequence(0).getSequenceIndex(), 0, Integer.MAX_VALUE);
                iter = createIterator(new MappedStreamSegment(lastReadPos));
                return InitialReadIterator(readShard.getSize(), iter);
            } else {
                // Subsequent seek: re-open at the stored position and advance past already-returned reads.
                lastReadPos = GenomeLocParser.setStop(lastReadPos,-1);
                iter = fastMappedReadSeek(readShard.getSize(), StingSAMIteratorAdapter.adapt(reads, createIterator(new MappedStreamSegment(lastReadPos))));
            }

            // Mapped reads are exhausted and the caller doesn't want unmapped reads: traversal is over.
            if (intoUnmappedReads && !includeUnmappedReads)
                readShard.signalDone();
        }

        if (intoUnmappedReads && includeUnmappedReads) {
            if (iter != null)
                iter.close();
            iter = toUnmappedReads(readShard.getSize());
            if (!iter.hasNext())
                readShard.signalDone();
        }
        return iter;
    }

    /**
     * If we're in by-read mode, this indicates if we want
     * to see unmapped reads too. Only seeing mapped reads
     * is much faster, but most BAM files have significant
     * unmapped read counts.
     *
     * @param seeUnMappedReads true to see unmapped reads, false otherwise
     */
    public void viewUnmappedReads( boolean seeUnMappedReads ) {
        includeUnmappedReads = seeUnMappedReads;
    }

    /**
     * For unit testing, add a custom iterator pool.
     *
     * @param resourcePool Custom mock iterator pool.
     */
    void setResourcePool( SAMResourcePool resourcePool ) {
        this.resourcePool = resourcePool;
    }

    /**
     * Retrieve unmapped reads.
     *
     * @param readCount how many reads to retrieve
     *
     * @return the bounded iterator that you can use to get the intervaled reads from
     */
    StingSAMIterator toUnmappedReads( long readCount ) {
        // readsTaken doubles as the offset into the unmapped-read stream once
        // intoUnmappedReads is set (it is reset to 0 when mapped reads run out).
        StingSAMIterator iter = createIterator(new UnmappedStreamSegment(readsTaken, readCount));
        readsTaken += readCount;
        return iter;
    }

    /**
     * A seek function for mapped reads.
     *
     * @param readCount how many reads to retrieve
     * @param iter the iterator to use, seeked to the correct start location
     *
     * @return the bounded iterator that you can use to get the intervaled reads from.  Will be a zero-length
     *         iterator if no reads are available.
     * @throws SimpleDataSourceLoadException
     */
    StingSAMIterator fastMappedReadSeek( long readCount, StingSAMIterator iter ) throws SimpleDataSourceLoadException {
        BoundedReadIterator bound;
        correctForReadPileupSeek(iter);

        if (readsTaken == 0) {
            return InitialReadIterator(readCount, iter);
        }

        // Skip over the readsTaken reads handed out by the previous shard, tracking
        // how many reads pile up at the final position so the next seek can skip them too.
        int x = 0;
        SAMRecord rec = null;

        // Assuming that lastReadPos should never be null, because this is a mappedReadSeek
        // and initial queries are handled by the previous conditional.
        int lastContig = lastReadPos.getContigIndex();
        int lastPos = (int)lastReadPos.getStart();
        while (x < readsTaken) {
            if (iter.hasNext()) {
                rec = iter.next();
                if (lastContig == rec.getReferenceIndex() && lastPos == rec.getAlignmentStart()) ++this.readsSeenAtLastPos;
                else this.readsSeenAtLastPos = 1;
                lastPos = rec.getAlignmentStart();
                ++x;
            } else {
                iter.close();

                // jump contigs
                lastReadPos = GenomeLocParser.toNextContig(lastReadPos);
                if (lastReadPos == null) {
                    // No more contigs: switch to unmapped-read territory.
                    readsTaken = 0;
                    intoUnmappedReads = true;

                    // fastMappedReadSeek must return an iterator, even if that iterator iterates through nothing.
                    return new NullSAMIterator(reads);
                } else {
                    // Start fresh on the next contig.
                    readsTaken = readCount;
                    readsSeenAtLastPos = 0;
                    lastReadPos = GenomeLocParser.setStop(lastReadPos,-1);
                    CloseableIterator<SAMRecord> ret = createIterator(new MappedStreamSegment(lastReadPos));
                    return new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, ret), readCount);
                }
            }
        }

        // if we're off the end of the last contig (into unmapped territory)
        if (rec != null && rec.getAlignmentStart() == 0) {
            readsTaken += readCount;
            intoUnmappedReads = true;
        }
        // else we're not off the end, store our location
        else if (rec != null) {
            int stopPos = rec.getAlignmentStart();
            if (stopPos < lastReadPos.getStart()) {
                // Alignment start went backwards: we must have crossed onto the next contig.
                lastReadPos = GenomeLocParser.createGenomeLoc(lastReadPos.getContigIndex() + 1, stopPos, stopPos);
            } else {
                lastReadPos = GenomeLocParser.setStart(lastReadPos,rec.getAlignmentStart());
            }
        }
        // in case we've run out of reads, get out
        else {
            throw new StingException("Danger: we've run out of reads in fastMappedReadSeek");
        }

        bound = new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), readCount);

        // return the iterator
        return bound;
    }

    /**
     * Determines whether the BAM file is completely unsequenced.  Requires that the resource pool be initialized.
     * @return True if the sequence dictionary is completely empty.  False otherwise.
     */
    private boolean isSequenceDictionaryEmpty() {
        return getHeader().getSequenceDictionary().isEmpty();
    }

    /**
     * Even though the iterator has seeked to the correct location, there may be multiple reads at that location,
     * and we may have given some of them out already.  Move the iterator to the correct location using the readsSeenAtLastPos variable
     *
     * @param iter the iterator
     */
    private void correctForReadPileupSeek( StingSAMIterator iter ) {
        // move past the number of reads we already returned at the last position
        boolean atLeastOneReadSeen = false; // we have a problem where some chromosomes don't have a single read (i.e. the chrN_random chrom.)
        for(int i = 0; i < this.readsSeenAtLastPos && iter.hasNext(); i++,iter.next())
            atLeastOneReadSeen = true;
        if (readsSeenAtLastPos > 0 && !atLeastOneReadSeen) {
            throw new SimpleDataSourceLoadException("Seek problem: reads at last position count != 0");
        }
    }

    /**
     * set the initial iterator
     *
     * @param readCount the number of reads
     * @param iter      the merging iterator
     *
     * @return a bounded read iterator at the first read position in the file.
     */
    private BoundedReadIterator InitialReadIterator( long readCount, CloseableIterator<SAMRecord> iter ) {
        BoundedReadIterator bound;
        bound = new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), readCount);
        this.readsTaken = readCount;
        return bound;
    }

    /**
     * Creates an iterator over the selected segment, from a resource pulled from the pool.
     * @param segment Segment over which to gather reads.
     * @return An iterator over just the reads in the given segment.
     */
    private StingSAMIterator createIterator( DataStreamSegment segment ) {
        StingSAMIterator iterator = resourcePool.iterator(segment);
        // Screen out malformed records (recording them in the violations histogram),
        // then wrap reads for downstream consumption.
        StingSAMIterator malformedWrappedIterator = new MalformedSAMFilteringIterator( getHeader(), iterator, violations );
        StingSAMIterator readWrappingIterator = new ReadWrappingIterator(malformedWrappedIterator);
        return readWrappingIterator;
    }
}

View File

@ -2,21 +2,17 @@ package org.broadinstitute.sting.gatk.datasources.simpleDataSources;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMRecord;
import net.sf.samtools.SAMFileReader;
import net.sf.samtools.util.CloseableIterator;
import net.sf.picard.filter.FilteringIterator;
import net.sf.picard.filter.SamRecordFilter;
import net.sf.picard.sam.SamFileHeaderMerger;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.gatk.datasources.shards.ReadShard;
import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.MonolithicShard;
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
import org.broadinstitute.sting.gatk.iterators.*;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.sam.SAMReadViolationHistogram;
import java.io.File;
@ -54,36 +50,21 @@ import java.util.Collection;
* <p/>
* Converts shards to SAM iterators over the specified region
*/
public class SAMDataSource implements SimpleDataSource {
public abstract class SAMDataSource implements SimpleDataSource {
/** Backing support for reads. */
private final Reads reads;
protected final Reads reads;
/** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(SAMDataSource.class);
// used for the reads case, the last count of reads retrieved
long readsTaken = 0;
// our last genome loc position
protected GenomeLoc lastReadPos = null;
// do we take unmapped reads
private boolean includeUnmappedReads = true;
// reads based traversal variables
private boolean intoUnmappedReads = false;
private int readsSeenAtLastPos = 0;
protected boolean includeUnmappedReads = true;
/**
* A histogram of exactly what reads were removed from the input stream and why.
*/
private SAMReadViolationHistogram violations = new SAMReadViolationHistogram();
// A pool of SAM iterators.
private SAMResourcePool resourcePool = null;
private GenomeLoc mLastInterval = null;
protected SAMReadViolationHistogram violations = new SAMReadViolationHistogram();
/**
* Returns a histogram of reads that were screened out, grouped by the nature of the error.
@ -110,7 +91,6 @@ public class SAMDataSource implements SimpleDataSource {
throw new SimpleDataSourceLoadException("SAMDataSource: Unable to load file: " + smFile.getName());
}
}
resourcePool = new SAMResourcePool(reads);
}
/**
@ -118,18 +98,14 @@ public class SAMDataSource implements SimpleDataSource {
* is supported, but only in a few extreme cases.
* @return True if an index is present; false otherwise.
*/
public boolean hasIndex() {
return resourcePool.hasIndex;
}
public abstract boolean hasIndex();
/**
* Gets the (potentially merged) SAM file header.
*
* @return SAM file header.
*/
public SAMFileHeader getHeader() {
return resourcePool.getHeader();
}
public abstract SAMFileHeader getHeader();
/**
@ -140,12 +116,15 @@ public class SAMDataSource implements SimpleDataSource {
public Reads getReadsInfo() { return reads; }
/**
* Returns header merger: a class that keeps the mapping between original read groups and read groups
* of the merged stream; merger also provides access to the individual file readers (and hence headers
* prior to the merging too) maintained by the system.
* @return
* Returns readers used by this data source.
*/
public SamFileHeaderMerger getHeaderMerger() { return resourcePool.getHeaderMerger(); }
public abstract Collection<SAMFileReader> getReaders();
/** Returns true if there are read group duplicates within the merged headers. */
public abstract boolean hasReadGroupCollisions();
/** Returns the read group id that should be used for the input read and RG id. */
public abstract String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId);
/**
*
@ -153,110 +132,7 @@ public class SAMDataSource implements SimpleDataSource {
*
* @return an iterator for that region
*/
public StingSAMIterator seek( Shard shard ) throws SimpleDataSourceLoadException {
// setup the iterator pool if it's not setup
boolean queryOverlapping = ( shard.getShardType() == Shard.ShardType.READ ) ? false : true;
resourcePool.setQueryOverlapping(queryOverlapping);
StingSAMIterator iterator = null;
if (shard.getShardType() == Shard.ShardType.READ) {
iterator = seekRead(shard);
iterator = applyDecoratingIterators(true,
iterator,
reads.getDownsamplingFraction(),
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
} else if (shard.getShardType() == Shard.ShardType.LOCUS) {
iterator = seekLocus(shard);
iterator = applyDecoratingIterators(false,
iterator,
reads.getDownsamplingFraction(),
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
} else if ((shard.getShardType() == Shard.ShardType.LOCUS_INTERVAL) ||
(shard.getShardType() == Shard.ShardType.READ_INTERVAL)) {
iterator = seekLocus(shard);
iterator = applyDecoratingIterators(false,
iterator,
reads.getDownsamplingFraction(),
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
// add the new overlapping detection iterator, if we have a last interval and we're a read based shard
if (mLastInterval != null && shard.getShardType() == Shard.ShardType.READ_INTERVAL )
iterator = new PlusOneFixIterator(shard.getGenomeLoc(),new IntervalOverlapIterator(iterator,mLastInterval,false));
mLastInterval = shard.getGenomeLoc();
} else {
throw new StingException("seek: Unknown shard type");
}
return iterator;
}
/**
* <p>
* seekLocus
* </p>
*
* @param shard the shard containing the genome location to extract data for
*
* @return an iterator for that region
*/
private StingSAMIterator seekLocus( Shard shard ) throws SimpleDataSourceLoadException {
if(shard instanceof MonolithicShard)
return createIterator(new EntireStream());
if( getHeader().getSequenceDictionary().getSequences().size() == 0 )
throw new StingException("Unable to seek to the given locus; reads data source has no alignment information.");
return createIterator( new MappedStreamSegment(shard.getGenomeLoc()) );
}
/**
* <p>
* seek
* </p>
*
* @param shard the read shard to extract from
*
* @return an iterator for that region
*/
private StingSAMIterator seekRead( Shard shard ) throws SimpleDataSourceLoadException {
if(shard instanceof MonolithicShard)
return createIterator(new EntireStream());
ReadShard readShard = (ReadShard)shard;
StingSAMIterator iter = null;
// If there are no entries in the sequence dictionary, there can't possibly be any unmapped reads. Force state to 'unmapped'.
if( isSequenceDictionaryEmpty() )
intoUnmappedReads = true;
if (!intoUnmappedReads) {
if (lastReadPos == null) {
lastReadPos = GenomeLocParser.createGenomeLoc(getHeader().getSequenceDictionary().getSequence(0).getSequenceIndex(), 0, Integer.MAX_VALUE);
iter = createIterator(new MappedStreamSegment(lastReadPos));
return InitialReadIterator(readShard.getSize(), iter);
} else {
lastReadPos = GenomeLocParser.setStop(lastReadPos,-1);
iter = fastMappedReadSeek(readShard.getSize(), StingSAMIteratorAdapter.adapt(reads, createIterator(new MappedStreamSegment(lastReadPos))));
}
if (intoUnmappedReads && !includeUnmappedReads)
readShard.signalDone();
}
if (intoUnmappedReads && includeUnmappedReads) {
if (iter != null)
iter.close();
iter = toUnmappedReads(readShard.getSize());
if (!iter.hasNext())
readShard.signalDone();
}
return iter;
}
public abstract StingSAMIterator seek(Shard shard);
/**
* If we're in by-read mode, this indicates if we want
@ -270,160 +146,6 @@ public class SAMDataSource implements SimpleDataSource {
includeUnmappedReads = seeUnMappedReads;
}
/**
* For unit testing, add a custom iterator pool.
*
* @param resourcePool Custom mock iterator pool.
*/
void setResourcePool( SAMResourcePool resourcePool ) {
this.resourcePool = resourcePool;
}
/**
* Retrieve unmapped reads.
*
* @param readCount how many reads to retrieve
*
* @return the bounded iterator that you can use to get the intervaled reads from
*/
StingSAMIterator toUnmappedReads( long readCount ) {
StingSAMIterator iter = createIterator(new UnmappedStreamSegment(readsTaken, readCount));
readsTaken += readCount;
return iter;
}
/**
* A seek function for mapped reads.
*
* @param readCount how many reads to retrieve
* @param iter the iterator to use, seeked to the correct start location
*
* @return the bounded iterator that you can use to get the intervaled reads from. Will be a zero-length
* iterator if no reads are available.
* @throws SimpleDataSourceLoadException
*/
StingSAMIterator fastMappedReadSeek( long readCount, StingSAMIterator iter ) throws SimpleDataSourceLoadException {
BoundedReadIterator bound;
correctForReadPileupSeek(iter);
if (readsTaken == 0) {
return InitialReadIterator(readCount, iter);
}
int x = 0;
SAMRecord rec = null;
// Assuming that lastReadPos should never be null, because this is a mappedReadSeek
// and initial queries are handled by the previous conditional.
int lastContig = lastReadPos.getContigIndex();
int lastPos = (int)lastReadPos.getStart();
while (x < readsTaken) {
if (iter.hasNext()) {
rec = iter.next();
if (lastContig == rec.getReferenceIndex() && lastPos == rec.getAlignmentStart()) ++this.readsSeenAtLastPos;
else this.readsSeenAtLastPos = 1;
lastPos = rec.getAlignmentStart();
++x;
} else {
iter.close();
// jump contigs
lastReadPos = GenomeLocParser.toNextContig(lastReadPos);
if (lastReadPos == null) {
// check to see if we're using unmapped reads, if not return, we're done
readsTaken = 0;
intoUnmappedReads = true;
// fastMappedReadSeek must return an iterator, even if that iterator iterates through nothing.
return new NullSAMIterator(reads);
} else {
readsTaken = readCount;
readsSeenAtLastPos = 0;
lastReadPos = GenomeLocParser.setStop(lastReadPos,-1);
CloseableIterator<SAMRecord> ret = createIterator(new MappedStreamSegment(lastReadPos));
return new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, ret), readCount);
}
}
}
// if we're off the end of the last contig (into unmapped territory)
if (rec != null && rec.getAlignmentStart() == 0) {
readsTaken += readCount;
intoUnmappedReads = true;
}
// else we're not off the end, store our location
else if (rec != null) {
int stopPos = rec.getAlignmentStart();
if (stopPos < lastReadPos.getStart()) {
lastReadPos = GenomeLocParser.createGenomeLoc(lastReadPos.getContigIndex() + 1, stopPos, stopPos);
} else {
lastReadPos = GenomeLocParser.setStart(lastReadPos,rec.getAlignmentStart());
}
}
// in case we're run out of reads, get out
else {
throw new StingException("Danger: weve run out reads in fastMappedReadSeek");
//return null;
}
bound = new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), readCount);
// return the iterator
return bound;
}
/**
* Determines whether the BAM file is completely unsequenced. Requires that the resource pool be initialized.
* @return True if the sequence dictionary is completely empty. False otherwise.
*/
private boolean isSequenceDictionaryEmpty() {
return getHeader().getSequenceDictionary().isEmpty();
}
/**
* Even though the iterator has seeked to the correct location, there may be multiple reads at that location,
* and we may have given some of them out already. Move the iterator to the correct location using the readsAtLastPos variable
*
* @param iter the iterator
*/
private void correctForReadPileupSeek( StingSAMIterator iter ) {
// move the number of reads we read from the last pos
boolean atLeastOneReadSeen = false; // we have a problem where some chomesomes don't have a single read (i.e. the chrN_random chrom.)
for(int i = 0; i < this.readsSeenAtLastPos && iter.hasNext(); i++,iter.next())
atLeastOneReadSeen = true;
if (readsSeenAtLastPos > 0 && !atLeastOneReadSeen) {
throw new SimpleDataSourceLoadException("Seek problem: reads at last position count != 0");
}
}
/**
* set the initial iterator
*
* @param readCount the number of reads
* @param iter the merging iterator
*
* @return a bounded read iterator at the first read position in the file.
*/
private BoundedReadIterator InitialReadIterator( long readCount, CloseableIterator<SAMRecord> iter ) {
BoundedReadIterator bound;
bound = new BoundedReadIterator(StingSAMIteratorAdapter.adapt(reads, iter), readCount);
this.readsTaken = readCount;
return bound;
}
/**
* Creates an iterator over the selected segment, from a resource pulled from the pool.
* @param segment Segment over which to gather reads.
* @return An iterator over just the reads in the given segment.
*/
private StingSAMIterator createIterator( DataStreamSegment segment ) {
StingSAMIterator iterator = resourcePool.iterator(segment);
StingSAMIterator malformedWrappedIterator = new MalformedSAMFilteringIterator( getHeader(), iterator, violations );
StingSAMIterator readWrappingIterator = new ReadWrappingIterator(malformedWrappedIterator);
return readWrappingIterator;
}
/**
* Filter reads based on user-specified criteria.
*
@ -434,11 +156,11 @@ public class SAMDataSource implements SimpleDataSource {
* @param supplementalFilters additional filters to apply to the reads.
* @return An iterator wrapped with filters reflecting the passed-in parameters. Will not be null.
*/
private StingSAMIterator applyDecoratingIterators(boolean enableVerification,
StingSAMIterator wrappedIterator,
Double downsamplingFraction,
Boolean noValidationOfReadOrder,
Collection<SamRecordFilter> supplementalFilters) {
protected StingSAMIterator applyDecoratingIterators(boolean enableVerification,
StingSAMIterator wrappedIterator,
Double downsamplingFraction,
Boolean noValidationOfReadOrder,
Collection<SamRecordFilter> supplementalFilters) {
// NOTE: this (and other filtering) should be done before on-the-fly sorting
// as there is no reason to sort something that we will end of throwing away
if (downsamplingFraction != null)

View File

@ -34,7 +34,6 @@ import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource
import org.broadinstitute.sting.gatk.traversals.*;
import org.broadinstitute.sting.gatk.walkers.*;
import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import java.util.*;

View File

@ -71,12 +71,7 @@ public class TraverseReads extends TraversalEngine {
ShardDataProvider dataProvider,
T sum) {
if (shard instanceof ReadShard) {
logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((ReadShard) shard).getSize()));
} else if (shard instanceof IntervalShard) {
logger.debug(String.format("TraverseReads.traverse Genomic interval is %s", ((IntervalShard) shard).getGenomeLoc()));
}
logger.debug(String.format("TraverseReads.traverse Covered dataset is %s", shard));
if (!(walker instanceof ReadWalker))
throw new IllegalArgumentException("Walker isn't a read walker!");

View File

@ -49,7 +49,7 @@ public class ShardStrategyFactoryTest extends BaseTest {
@Test
public void testReadNonInterval() {
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100);
assertTrue(st instanceof ReadShardStrategy);
}
@ -57,19 +57,19 @@ public class ShardStrategyFactoryTest extends BaseTest {
public void testReadInterval() {
GenomeLoc l = GenomeLocParser.createGenomeLoc(0,1,100);
set.add(l);
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100,set);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100,set);
assertTrue(st instanceof IntervalShardStrategy);
}
@Test
public void testLinearNonInterval() {
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100);
assertTrue(st instanceof LinearLocusShardStrategy);
}
@Test
public void testExpNonInterval() {
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.EXPONENTIAL,header.getSequenceDictionary(),100);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.EXPONENTIAL,header.getSequenceDictionary(),100);
assertTrue(st instanceof ExpGrowthLocusShardStrategy);
}
@ -77,7 +77,7 @@ public class ShardStrategyFactoryTest extends BaseTest {
public void testExpInterval() {
GenomeLoc l = GenomeLocParser.createGenomeLoc(0,1,100);
set.add(l);
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.EXPONENTIAL,header.getSequenceDictionary(),100,set);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.EXPONENTIAL,header.getSequenceDictionary(),100,set);
assertTrue(st instanceof ExpGrowthLocusShardStrategy);
}
@ -85,7 +85,7 @@ public class ShardStrategyFactoryTest extends BaseTest {
public void testLinearInterval() {
GenomeLoc l = GenomeLocParser.createGenomeLoc(0,1,100);
set.add(l);
ShardStrategy st = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100,set);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100,set);
assertTrue(st instanceof LinearLocusShardStrategy);
}

View File

@ -81,16 +81,17 @@ public class SAMBAMDataSourceTest extends BaseTest {
@Test
public void testLinearBreakIterateAll() {
logger.warn("Executing testLinearBreakIterateAll");
// the sharding strat.
ShardStrategy strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
int count = 0;
// setup the data
fl.add(new File(validationDataLocation + "/NA12878.chrom6.SLX.SRP000032.2009_06.selected.bam"));
Reads reads = new Reads(fl);
// the sharding strat.
SAMDataSource data = new IndexDrivenSAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
int count = 0;
try {
SAMDataSource data = new SAMDataSource(reads);
for (Shard sh : strat) {
int readCount = 0;
count++;
@ -124,13 +125,14 @@ public class SAMBAMDataSourceTest extends BaseTest {
@Test
public void testMergingTwoBAMFiles() {
logger.warn("Executing testMergingTwoBAMFiles");
// the sharding strat.
ShardStrategy strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
// setup the test files
fl.add(new File(seqLocation + "/dirseq/analysis/cancer_exome/twoflowcell_sams/TCGA-06-0188.aligned.duplicates_marked.bam"));
Reads reads = new Reads(fl);
Reads reads = new Reads(fl);
// the sharding strat.
SAMDataSource data = new IndexDrivenSAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
ArrayList<Integer> readcountPerShard = new ArrayList<Integer>();
ArrayList<Integer> readcountPerShard2 = new ArrayList<Integer>();
@ -140,7 +142,6 @@ public class SAMBAMDataSourceTest extends BaseTest {
int count = 0;
try {
SAMDataSource data = new SAMDataSource(reads);
for (Shard sh : strat) {
int readCount = 0;
count++;
@ -173,11 +174,11 @@ public class SAMBAMDataSourceTest extends BaseTest {
count = 0;
// the sharding strat.
strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
data = new IndexDrivenSAMDataSource(reads);
strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
logger.debug("Pile two:");
try {
SAMDataSource data = new SAMDataSource(reads);
for (Shard sh : strat) {
int readCount = 0;
count++;

View File

@ -101,11 +101,11 @@ public class SAMByIntervalTest extends BaseTest {
int unmappedReadsSeen = 0;
int iterations = 0;
SAMDataSource data = new SAMDataSource(reads);
IndexDrivenSAMDataSource data = new IndexDrivenSAMDataSource(reads);
data.setResourcePool(gen);
GenomeLocSortedSet set = new GenomeLocSortedSet();
set.add(GenomeLocParser.createGenomeLoc(0, start, stop));
ShardStrategy strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL, gen.getHeader().getSequenceDictionary(), UNMAPPED_READ_COUNT, set);
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL, gen.getHeader().getSequenceDictionary(), UNMAPPED_READ_COUNT, set);
StingSAMIterator iter = data.seek(strat.next());
int count = 0;

View File

@ -79,7 +79,7 @@ public class SAMByReadsTest extends BaseTest {
int unmappedReadsSeen = 0;
int iterations = 0;
SAMDataSource data = new SAMDataSource(reads);
IndexDrivenSAMDataSource data = new IndexDrivenSAMDataSource(reads);
data.setResourcePool(gen);
++iterations;
StingSAMIterator ret = data.toUnmappedReads(100);
@ -109,10 +109,10 @@ public class SAMByReadsTest extends BaseTest {
targetReadCount = 5;
try {
int readCount = 0;
SAMDataSource data = new SAMDataSource(reads);
IndexDrivenSAMDataSource data = new IndexDrivenSAMDataSource(reads);
data.setResourcePool(gen);
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
shardStrategy = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
while (shardStrategy.hasNext()) {
StingSAMIterator ret = data.seek(shardStrategy.next());
assertTrue(ret != null);
@ -140,11 +140,11 @@ public class SAMByReadsTest extends BaseTest {
targetReadCount = 3;
try {
int readCount = 0;
SAMDataSource data = new SAMDataSource(reads);
IndexDrivenSAMDataSource data = new IndexDrivenSAMDataSource(reads);
data.setResourcePool(gen);
shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
shardStrategy = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
while (shardStrategy.hasNext()) {

View File

@ -9,6 +9,7 @@ import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SimpleDataSourceLoadException;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.IndexDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.Reads;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
@ -83,22 +84,21 @@ public class BoundedReadIteratorTest extends BaseTest {
@Test
public void testBounding() {
logger.warn("Executing testBounding");
// the sharding strat.
ShardStrategy strat = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
int count = 0;
// setup the test files
fl.add(new File(seqLocation + "/dirseq/analysis/cancer_exome/twoflowcell_sams/TCGA-06-0188.aligned.duplicates_marked.bam"));
Reads reads = new Reads(fl);
SAMDataSource data = new IndexDrivenSAMDataSource(reads);
// the sharding strat.
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
int count = 0;
// our target read
final long boundedReadCount = 100;
long shardReadCount = 0;
try {
SAMDataSource data = new SAMDataSource(reads);
// make sure we have a shard
if (!strat.hasNext()) {
fail("Our shatter didn't give us a single piece, this is bad");

View File

@ -8,6 +8,7 @@ import org.broadinstitute.sting.gatk.datasources.shards.Shard;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategy;
import org.broadinstitute.sting.gatk.datasources.shards.ShardStrategyFactory;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.IndexDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.walkers.qc.CountReadsWalker;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.GenomeLocParser;
@ -115,13 +116,12 @@ public class TraverseReadsTest extends BaseTest {
}
GenomeLocParser.setupRefContigOrdering(ref);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,
SAMDataSource dataSource = new IndexDrivenSAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(false);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ref.getSequenceDictionary(),
readSize);
SAMDataSource dataSource = new SAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(false);
countReadWalker.initialize();
Object accumulator = countReadWalker.reduceInit();
@ -162,13 +162,12 @@ public class TraverseReadsTest extends BaseTest {
}
GenomeLocParser.setupRefContigOrdering(ref);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(ShardStrategyFactory.SHATTER_STRATEGY.READS,
SAMDataSource dataSource = new IndexDrivenSAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(true);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ref.getSequenceDictionary(),
readSize);
SAMDataSource dataSource = new SAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(true);
countReadWalker.initialize();
Object accumulator = countReadWalker.reduceInit();