A few stopgap fixes to get the GATK to the point where the old sharding
infrastructure can be torn down:
1) New sharding system emulates old MonolithicSharding mechanism.
2) Better awareness of differences between fasta and BAM files when creating
   shards.


git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@2948 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
hanna 2010-03-07 21:01:25 +00:00
parent dd6122f682
commit a7fe07c404
10 changed files with 103 additions and 52 deletions

View File

@ -66,7 +66,7 @@ public class SAMFileReader2 extends SAMFileReader {
* @param indexFile Location of index file, or null in order to use the default index file (if present).
* @param eagerDecode eagerDecode if true, decode SAM record entirely when reading it.
*/
public SAMFileReader2(final File file, final File indexFile, final boolean eagerDecode){
public SAMFileReader2(final File file, File indexFile, final boolean eagerDecode){
super(file,indexFile,eagerDecode);
this.sourceFile = file;
close();
@ -74,11 +74,13 @@ public class SAMFileReader2 extends SAMFileReader {
try {
BAMFileReader2 reader = new BAMFileReader2(file,eagerDecode,getDefaultValidationStringency());
reader.setReader(this);
BAMFileIndex2 index = new BAMFileIndex2(indexFile != null ? indexFile : findIndexFileFromParent(file));
reader.setFileIndex(index);
JVMUtils.setFieldValue(getField("mReader"),this,reader);
JVMUtils.setFieldValue(getField("mFileIndex"),this,index);
if(indexFile != null || findIndexFileFromParent(file) != null) {
BAMFileIndex2 index = new BAMFileIndex2(indexFile != null ? indexFile : findIndexFileFromParent(file));
reader.setFileIndex(index);
JVMUtils.setFieldValue(getField("mFileIndex"),this,index);
}
}
catch(IOException ex) {
throw new StingException("Unable to load BAM file: " + file,ex);

View File

@ -652,12 +652,14 @@ public class GenomeAnalysisEngine {
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL :
ShardStrategyFactory.SHATTER_STRATEGY.LINEAR;
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
argCollection.experimentalSharding ? ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL : shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals, maxIterations);
} else
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
argCollection.experimentalSharding ? ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL : ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
@ -669,12 +671,16 @@ public class GenomeAnalysisEngine {
shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS;
if (intervals != null && !intervals.isEmpty()) {
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,shardType,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,
intervals, maxIterations);
} else {
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,shardType,
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
shardType,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations);
}
@ -683,6 +689,7 @@ public class GenomeAnalysisEngine {
Utils.warnUser("walker is of type LocusWindow (which operates over intervals), but no intervals were provided." +
"This may be unintentional, check your command-line arguments.");
shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource,
ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL,
drivingDataSource.getSequenceDictionary(),
SHARD_SIZE,

View File

@ -4,6 +4,7 @@ import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.BlockDrivenSAMDataSource;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMReaderID;
@ -48,7 +49,7 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
/**
* The data source to use when performing this sharding.
*/
private final BlockDrivenSAMDataSource dataSource;
private final BlockDrivenSAMDataSource reads;
/** our storage of the genomic locations they'd like to shard over */
private final List<FilePointer> filePointers = new ArrayList<FilePointer>();
@ -60,34 +61,42 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
/**
* construct the shard strategy from a seq dictionary, a shard size, and genomeLocs
* @param dataSource Data source from which to load index data.
* @param reads Data source from which to load index data.
* @param locations List of locations for which to load data.
*/
IndexDelimitedLocusShardStrategy(SAMDataSource dataSource, GenomeLocSortedSet locations) {
if(dataSource != null) {
IndexDelimitedLocusShardStrategy(SAMDataSource reads, IndexedFastaSequenceFile reference, GenomeLocSortedSet locations) {
if(reads != null) {
// Shard based on reads.
// TODO: Push this sharding into the data source.
if(!(dataSource instanceof BlockDrivenSAMDataSource))
if(!(reads instanceof BlockDrivenSAMDataSource))
throw new StingException("Cannot power an IndexDelimitedLocusShardStrategy with this data source.");
List<GenomeLoc> intervals;
if(locations == null) {
// If no locations were passed in, shard the entire BAM file.
SAMFileHeader header = dataSource.getHeader();
SAMFileHeader header = reads.getHeader();
intervals = new ArrayList<GenomeLoc>();
for(SAMSequenceRecord sequenceRecord: header.getSequenceDictionary().getSequences())
intervals.add(GenomeLocParser.createGenomeLoc(sequenceRecord.getSequenceName(),1,sequenceRecord.getSequenceLength()));
for(SAMSequenceRecord readsSequenceRecord: header.getSequenceDictionary().getSequences()) {
// Check this sequence against the reference sequence dictionary.
// TODO: Do a better job of merging reads + reference.
SAMSequenceRecord refSequenceRecord = reference.getSequenceDictionary().getSequence(readsSequenceRecord.getSequenceName());
if(refSequenceRecord != null) {
final int length = Math.min(readsSequenceRecord.getSequenceLength(),refSequenceRecord.getSequenceLength());
intervals.add(GenomeLocParser.createGenomeLoc(readsSequenceRecord.getSequenceName(),1,length));
}
}
}
else
intervals = locations.toList();
this.dataSource = (BlockDrivenSAMDataSource)dataSource;
filePointers.addAll(IntervalSharder.shardIntervals(this.dataSource,intervals,this.dataSource.getNumIndexLevels()-1));
this.reads = (BlockDrivenSAMDataSource)reads;
filePointers.addAll(IntervalSharder.shardIntervals(this.reads,intervals,this.reads.getNumIndexLevels()-1));
}
else {
this.dataSource = null;
// TODO: Non-intervaled ref traversals.
this.reads = null;
for(GenomeLoc interval: locations)
filePointers.add(new FilePointer(interval));
}
@ -111,7 +120,7 @@ public class IndexDelimitedLocusShardStrategy implements ShardStrategy {
*/
public IndexDelimitedLocusShard next() {
FilePointer nextFilePointer = filePointerIterator.next();
Map<SAMReaderID,List<Chunk>> chunksBounding = dataSource!=null ? dataSource.getFilePointersBounding(nextFilePointer.bin) : null;
Map<SAMReaderID,List<Chunk>> chunksBounding = reads!=null ? reads.getFilePointersBounding(nextFilePointer.bin) : null;
return new IndexDelimitedLocusShard(nextFilePointer.locations,chunksBounding,Shard.ShardType.LOCUS_INTERVAL);
}

View File

@ -4,6 +4,7 @@ import net.sf.samtools.SAMSequenceDictionary;
import org.apache.log4j.Logger;
import org.broadinstitute.sting.utils.StingException;
import org.broadinstitute.sting.utils.GenomeLocSortedSet;
import org.broadinstitute.sting.utils.fasta.IndexedFastaSequenceFile;
import org.broadinstitute.sting.gatk.datasources.simpleDataSources.SAMDataSource;
import java.io.File;
@ -48,26 +49,28 @@ public class ShardStrategyFactory {
/**
* get a new shatter strategy
*
* @param dataSource File pointer to BAM. TODO: Eliminate this argument; pass a data source instead!
* @param readsDataSource File pointer to BAM.
* @param referenceDataSource File pointer to reference.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return a shard strategy capable of dividing input data into shards.
*/
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) {
return ShardStrategyFactory.shatter(dataSource, strat, dic, startingSize, -1L);
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize) {
return ShardStrategyFactory.shatter(readsDataSource, referenceDataSource, strat, dic, startingSize, -1L);
}
/**
* get a new shatter strategy
*
* @param dataSource File pointer to BAM.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @param readsDataSource File pointer to BAM.
* @param referenceDataSource File pointer to reference.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return a shard strategy capable of dividing input data into shards.
*/
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, long limitByCount) {
switch (strat) {
case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize, limitByCount);
@ -76,9 +79,9 @@ public class ShardStrategyFactory {
case INTERVAL:
throw new StingException("Requested trategy: " + strat + " doesn't work with the limiting count (-M) command line option");
case LOCUS_EXPERIMENTAL:
return new IndexDelimitedLocusShardStrategy(dataSource,null);
return new IndexDelimitedLocusShardStrategy(readsDataSource,referenceDataSource,null);
case READS_EXPERIMENTAL:
return new BlockDelimitedReadShardStrategy(dataSource,null);
return new BlockDelimitedReadShardStrategy(readsDataSource,null);
default:
throw new StingException("Strategy: " + strat + " isn't implemented for this type of shatter request");
}
@ -89,25 +92,29 @@ public class ShardStrategyFactory {
/**
* get a new shatter strategy
*
* @param readsDataSource File pointer to BAM.
* @param referenceDataSource File pointer to reference.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
* @return a shard strategy capable of dividing input data into shards.
*/
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) {
return ShardStrategyFactory.shatter(dataSource, strat, dic, startingSize, lst, -1l);
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst) {
return ShardStrategyFactory.shatter(readsDataSource, referenceDataSource, strat, dic, startingSize, lst, -1l);
}
/**
* get a new shatter strategy
*
* @param readsDataSource The reads used to shatter this file.
* @param referenceDataSource The reference used to shatter this file.
* @param strat what's our strategy - SHATTER_STRATEGY type
* @param dic the seq dictionary
* @param startingSize the starting size
* @return
* @return A strategy for shattering this data.
*/
static public ShardStrategy shatter(SAMDataSource dataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
static public ShardStrategy shatter(SAMDataSource readsDataSource, IndexedFastaSequenceFile referenceDataSource, SHATTER_STRATEGY strat, SAMSequenceDictionary dic, long startingSize, GenomeLocSortedSet lst, long limitDataCount) {
switch (strat) {
case LINEAR:
return new LinearLocusShardStrategy(dic, startingSize, lst, limitDataCount);
@ -116,9 +123,9 @@ public class ShardStrategyFactory {
case READS:
return new IntervalShardStrategy(startingSize, lst, Shard.ShardType.READ_INTERVAL);
case LOCUS_EXPERIMENTAL:
return new IndexDelimitedLocusShardStrategy(dataSource,lst);
return new IndexDelimitedLocusShardStrategy(readsDataSource,referenceDataSource,lst);
case READS_EXPERIMENTAL:
return new BlockDelimitedReadShardStrategy(dataSource,lst);
return new BlockDelimitedReadShardStrategy(readsDataSource,lst);
default:
throw new StingException("Strategy: " + strat + " isn't implemented");
}

View File

@ -245,18 +245,19 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
}
public StingSAMIterator seek(Shard shard) {
// todo: refresh monolithic sharding implementation
if(shard instanceof MonolithicShard)
return seekMonolithic(shard);
if(!(shard instanceof BAMFormatAwareShard))
throw new StingException("BlockDrivenSAMDataSource cannot operate on shards of type: " + shard.getClass());
BAMFormatAwareShard bamAwareShard = (BAMFormatAwareShard)shard;
if(bamAwareShard.buffersReads()) {
return bamAwareShard.iterator();
return bamAwareShard.iterator();
}
else {
SAMReaders readers = resourcePool.getAvailableReaders();
// Since the beginning of time for the GATK, enableVerification has been true only for ReadShards, because
//
return getIterator(readers,bamAwareShard,shard instanceof ReadShard);
}
}
@ -282,6 +283,31 @@ public class BlockDrivenSAMDataSource extends SAMDataSource {
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
}
/**
* A stopgap measure to handle monolithic sharding.
*
* Opens an iterator on every reader known to this data source, merges those
* iterators into a single stream, and wraps the result in the same decorating
* iterators that the other seek paths use.
*
* @param shard the (monolithic) shard. Only its runtime type is consulted here
*              (via the instanceof ReadShard check); no other property of the
*              shard is read by this method.
* @return An iterator over the monolithic shard.
*/
private StingSAMIterator seekMonolithic(Shard shard) {
// Check a set of readers out of the resource pool; the same set is handed to the ReleasingIterator below.
SAMReaders readers = resourcePool.getAvailableReaders();

// Open one iterator per reader ID, keyed by the reader itself as required by the merging iterator.
Map<SAMFileReader,CloseableIterator<SAMRecord>> readerToIteratorMap = new HashMap<SAMFileReader,CloseableIterator<SAMRecord>>();
for(SAMReaderID id: getReaderIDs()) {
// The cast assumes every pooled reader is a SAMFileReader2 -- TODO confirm against the resource pool.
SAMFileReader2 reader2 = (SAMFileReader2)readers.getReader(id);
readerToIteratorMap.put(reader2,reader2.iterator());
}

// Set up merging to dynamically merge together multiple BAMs into one coordinate-sorted stream.
// NOTE(review): no per-shard record filtering is applied in this method; the only filters
// involved are the supplemental filters passed to applyDecoratingIterators below.
SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(readers.values(),SAMFileHeader.SortOrder.coordinate,true);
CloseableIterator<SAMRecord> iterator = new MergingSamRecordIterator(headerMerger,readerToIteratorMap,true);

// The first argument is true only when the shard is a ReadShard.
// ReleasingIterator presumably returns the readers to the pool on close -- verify against its implementation.
return applyDecoratingIterators(shard instanceof ReadShard,
new ReleasingIterator(readers,StingSAMIteratorAdapter.adapt(reads,iterator)),
reads.getDownsamplingFraction(),
reads.getValidationExclusionList().contains(ValidationExclusion.TYPE.NO_READ_ORDER_VERIFICATION),
reads.getSupplementalFilters());
}
/**
* Gets the merged header from the SAM file.

View File

@ -49,7 +49,7 @@ public class ShardStrategyFactoryTest extends BaseTest {
@Test
public void testReadNonInterval() {
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100);
ShardStrategy st = ShardStrategyFactory.shatter(null,null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100);
assertTrue(st instanceof ReadShardStrategy);
}
@ -57,13 +57,13 @@ public class ShardStrategyFactoryTest extends BaseTest {
public void testReadInterval() {
GenomeLoc l = GenomeLocParser.createGenomeLoc(0,1,100);
set.add(l);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100,set);
ShardStrategy st = ShardStrategyFactory.shatter(null,null,ShardStrategyFactory.SHATTER_STRATEGY.READS,header.getSequenceDictionary(),100,set);
assertTrue(st instanceof IntervalShardStrategy);
}
@Test
public void testLinearNonInterval() {
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100);
ShardStrategy st = ShardStrategyFactory.shatter(null,null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100);
assertTrue(st instanceof LinearLocusShardStrategy);
}
@ -71,7 +71,7 @@ public class ShardStrategyFactoryTest extends BaseTest {
public void testLinearInterval() {
GenomeLoc l = GenomeLocParser.createGenomeLoc(0,1,100);
set.add(l);
ShardStrategy st = ShardStrategyFactory.shatter(null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100,set);
ShardStrategy st = ShardStrategyFactory.shatter(null,null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR,header.getSequenceDictionary(),100,set);
assertTrue(st instanceof LinearLocusShardStrategy);
}

View File

@ -89,7 +89,7 @@ public class SAMBAMDataSourceTest extends BaseTest {
// the sharding strat.
SAMDataSource data = new IndexDrivenSAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
ShardStrategy strat = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
int count = 0;
try {
@ -134,7 +134,7 @@ public class SAMBAMDataSourceTest extends BaseTest {
// the sharding strat.
SAMDataSource data = new IndexDrivenSAMDataSource(reads);
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
ShardStrategy strat = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
ArrayList<Integer> readcountPerShard = new ArrayList<Integer>();
ArrayList<Integer> readcountPerShard2 = new ArrayList<Integer>();
@ -177,7 +177,7 @@ public class SAMBAMDataSourceTest extends BaseTest {
count = 0;
// the sharding strat.
data = new IndexDrivenSAMDataSource(reads);
strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
strat = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.LINEAR, seq.getSequenceDictionary(), 100000);
logger.debug("Pile two:");
try {

View File

@ -105,7 +105,7 @@ public class SAMByIntervalTest extends BaseTest {
data.setResourcePool(gen);
GenomeLocSortedSet set = new GenomeLocSortedSet();
set.add(GenomeLocParser.createGenomeLoc(0, start, stop));
ShardStrategy strat = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL, gen.getHeader().getSequenceDictionary(), UNMAPPED_READ_COUNT, set);
ShardStrategy strat = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.INTERVAL, gen.getHeader().getSequenceDictionary(), UNMAPPED_READ_COUNT, set);
StingSAMIterator iter = data.seek(strat.next());
int count = 0;

View File

@ -112,7 +112,7 @@ public class SAMByReadsTest extends BaseTest {
IndexDrivenSAMDataSource data = new IndexDrivenSAMDataSource(reads);
data.setResourcePool(gen);
shardStrategy = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
shardStrategy = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
while (shardStrategy.hasNext()) {
StingSAMIterator ret = data.seek(shardStrategy.next());
assertTrue(ret != null);
@ -144,7 +144,7 @@ public class SAMByReadsTest extends BaseTest {
data.setResourcePool(gen);
shardStrategy = ShardStrategyFactory.shatter(data,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
shardStrategy = ShardStrategyFactory.shatter(data,null,ShardStrategyFactory.SHATTER_STRATEGY.READS, gen.getHeader().getSequenceDictionary(), targetReadCount);
while (shardStrategy.hasNext()) {

View File

@ -118,7 +118,7 @@ public class TraverseReadsTest extends BaseTest {
SAMDataSource dataSource = new IndexDrivenSAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(false);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ref,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ref.getSequenceDictionary(),
readSize);
@ -164,7 +164,7 @@ public class TraverseReadsTest extends BaseTest {
SAMDataSource dataSource = new IndexDrivenSAMDataSource(new Reads(bamList));
dataSource.viewUnmappedReads(true);
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ShardStrategy shardStrategy = ShardStrategyFactory.shatter(dataSource,ref,ShardStrategyFactory.SHATTER_STRATEGY.READS,
ref.getSequenceDictionary(),
readSize);