fixes for the Tree index, and some small clean-up in the GATK.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3991 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
aaron 2010-08-09 20:41:50 +00:00
parent 3eee3183fd
commit 0f29f2ae3f
10 changed files with 93 additions and 63 deletions

View File

@ -169,11 +169,10 @@ public class GenomeAnalysisEngine {
ShardStrategy shardStrategy = getShardStrategy(my_walker, ShardStrategy shardStrategy = getShardStrategy(my_walker,
microScheduler.getReference(), microScheduler.getReference(),
intervals, intervals,
argCollection.maximumEngineIterations,
readsDataSource != null ? readsDataSource.getReadsInfo().getValidationExclusionList() : null); readsDataSource != null ? readsDataSource.getReadsInfo().getValidationExclusionList() : null);
// execute the microscheduler, storing the results // execute the microscheduler, storing the results
Object result = microScheduler.execute(my_walker, shardStrategy, argCollection.maximumEngineIterations); Object result = microScheduler.execute(my_walker, shardStrategy);
//monitor.stop(); //monitor.stop();
//logger.info(String.format("Maximum heap size consumed: %d",monitor.getMaxMemoryUsed())); //logger.info(String.format("Maximum heap size consumed: %d",monitor.getMaxMemoryUsed()));
@ -719,13 +718,11 @@ public class GenomeAnalysisEngine {
* @param walker Walker for which to infer sharding strategy. * @param walker Walker for which to infer sharding strategy.
* @param drivingDataSource Data on which to shard. * @param drivingDataSource Data on which to shard.
* @param intervals Intervals to use when limiting sharding. * @param intervals Intervals to use when limiting sharding.
* @param maxIterations the maximum number of iterations to run through
* @return Sharding strategy for this driving data source. * @return Sharding strategy for this driving data source.
*/ */
protected ShardStrategy getShardStrategy(Walker walker, protected ShardStrategy getShardStrategy(Walker walker,
ReferenceSequenceFile drivingDataSource, ReferenceSequenceFile drivingDataSource,
GenomeLocSortedSet intervals, GenomeLocSortedSet intervals,
Integer maxIterations,
ValidationExclusion exclusions) { ValidationExclusion exclusions) {
// Use monolithic sharding if no index is present. Monolithic sharding is always required for the original // Use monolithic sharding if no index is present. Monolithic sharding is always required for the original
// sharding system; it's required with the new sharding system only for locus walkers. // sharding system; it's required with the new sharding system only for locus walkers.
@ -775,13 +772,13 @@ public class GenomeAnalysisEngine {
ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL, ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, SHARD_SIZE,
intervals, maxIterations); intervals);
} else } else
shardStrategy = ShardStrategyFactory.shatter(readsDataSource, shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource.getReference(), referenceDataSource.getReference(),
ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL, ShardStrategyFactory.SHATTER_STRATEGY.LOCUS_EXPERIMENTAL,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations); SHARD_SIZE);
} else if (walker instanceof ReadWalker || } else if (walker instanceof ReadWalker ||
walker instanceof DuplicateWalker) { walker instanceof DuplicateWalker) {
shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL; shardType = ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL;
@ -792,13 +789,13 @@ public class GenomeAnalysisEngine {
shardType, shardType,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, SHARD_SIZE,
intervals, maxIterations); intervals);
} else { } else {
shardStrategy = ShardStrategyFactory.shatter(readsDataSource, shardStrategy = ShardStrategyFactory.shatter(readsDataSource,
referenceDataSource.getReference(), referenceDataSource.getReference(),
shardType, shardType,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations); SHARD_SIZE);
} }
} else if (walker instanceof ReadPairWalker) { } else if (walker instanceof ReadPairWalker) {
if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.queryname) if(readsDataSource != null && readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.queryname)
@ -810,7 +807,7 @@ public class GenomeAnalysisEngine {
referenceDataSource.getReference(), referenceDataSource.getReference(),
ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL, ShardStrategyFactory.SHATTER_STRATEGY.READS_EXPERIMENTAL,
drivingDataSource.getSequenceDictionary(), drivingDataSource.getSequenceDictionary(),
SHARD_SIZE, maxIterations); SHARD_SIZE);
} else } else
throw new StingException("Unable to support walker of type" + walker.getClass().getName()); throw new StingException("Unable to support walker of type" + walker.getClass().getName());

View File

@ -128,10 +128,6 @@ public class GATKArgumentCollection {
@Output(fullName = "outerr", shortName = "oe", doc = "A joint file for 'normal' and error output presented to the walker. Will overwrite contents if file exists.", required = false) @Output(fullName = "outerr", shortName = "oe", doc = "A joint file for 'normal' and error output presented to the walker. Will overwrite contents if file exists.", required = false)
public String outErrFileName = null; public String outErrFileName = null;
@Element(required = false)
@Argument(fullName = "maximum_iterations", shortName = "M", doc = "Maximum number of iterations to process before exiting, the lower bound is zero. Intended only for testing", required = false)
public Integer maximumEngineIterations = -1;
@Element(required = false) @Element(required = false)
@Argument(fullName = "filterZeroMappingQualityReads", shortName = "fmq0", doc = "If true, mapping quality zero reads will be filtered at the lowest GATK level. Vastly improves performance at areas with abnormal depth due to mapping Q0 reads", required = false) @Argument(fullName = "filterZeroMappingQualityReads", shortName = "fmq0", doc = "If true, mapping quality zero reads will be filtered at the lowest GATK level. Vastly improves performance at areas with abnormal depth due to mapping Q0 reads", required = false)
public Boolean filterZeroMappingQualityReads = false; public Boolean filterZeroMappingQualityReads = false;
@ -294,9 +290,6 @@ public class GATKArgumentCollection {
if (!(other.readBufferSize == null && this.readBufferSize == null) && (other.readBufferSize == null || this.readBufferSize == null)) { if (!(other.readBufferSize == null && this.readBufferSize == null) && (other.readBufferSize == null || this.readBufferSize == null)) {
return false; return false;
} }
if (!other.maximumEngineIterations.equals(this.maximumEngineIterations)) {
return false;
}
if (!other.strictnessLevel.equals(this.strictnessLevel)) { if (!other.strictnessLevel.equals(this.strictnessLevel)) {
return false; return false;
} }

View File

@ -113,15 +113,11 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
} }
} }
public Object execute( Walker walker, ShardStrategy shardStrategy, int maxIterations ) { public Object execute( Walker walker, ShardStrategy shardStrategy ) {
// Fast fail for walkers not supporting TreeReducible interface. // Fast fail for walkers not supporting TreeReducible interface.
if (!( walker instanceof TreeReducible )) if (!( walker instanceof TreeReducible ))
throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers"); throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers");
// Having maxiterations in the execute method is a holdover from the old TraversalEngine days.
// Lets do something else with this.
traversalEngine.setMaximumIterations(maxIterations);
ReduceTree reduceTree = new ReduceTree(this); ReduceTree reduceTree = new ReduceTree(this);
initializeWalker(walker); initializeWalker(walker);

View File

@ -47,11 +47,7 @@ public class LinearMicroScheduler extends MicroScheduler {
* @param walker Computation to perform over dataset. * @param walker Computation to perform over dataset.
* @param shardStrategy A strategy for sharding the data. * @param shardStrategy A strategy for sharding the data.
*/ */
public Object execute(Walker walker, ShardStrategy shardStrategy, int maxIterations) { public Object execute(Walker walker, ShardStrategy shardStrategy) {
// Having maxiterations in the execute method is a holdover from the old TraversalEngine days.
// Lets do something else with this.
traversalEngine.setMaximumIterations(maxIterations);
walker.initialize(); walker.initialize();
Accumulator accumulator = Accumulator.create(engine,walker); Accumulator accumulator = Accumulator.create(engine,walker);

View File

@ -133,7 +133,7 @@ public abstract class MicroScheduler {
* *
* @return the return type of the walker * @return the return type of the walker
*/ */
public abstract Object execute(Walker walker, ShardStrategy shardStrategy, int iterations ); public abstract Object execute(Walker walker, ShardStrategy shardStrategy);
/** /**
* Retrieves the object responsible for tracking and managing output. * Retrieves the object responsible for tracking and managing output.

View File

@ -24,20 +24,9 @@ public abstract class TraversalEngine<M,T,WalkerType extends Walker<M,T>,Provide
private final long MAX_PROGRESS_PRINT_TIME = 30 * 1000; // 10 seconds in millisecs private final long MAX_PROGRESS_PRINT_TIME = 30 * 1000; // 10 seconds in millisecs
private final long N_RECORDS_TO_PRINT = 1000000; private final long N_RECORDS_TO_PRINT = 1000000;
// Maximum number of reads to process before finishing
protected long maximumIterations = -1;
/** our log, which we want to capture anything from this class */ /** our log, which we want to capture anything from this class */
protected static Logger logger = Logger.getLogger(TraversalEngine.class); protected static Logger logger = Logger.getLogger(TraversalEngine.class);
/**
* set the max number of iterations
* @param maximumIterations the number of iterations
*/
public void setMaximumIterations(final int maximumIterations) {
this.maximumIterations = maximumIterations;
}
/** /**
* @param curTime (current runtime, in millisecs) * @param curTime (current runtime, in millisecs)
* *

View File

@ -196,11 +196,6 @@ public class TraverseDuplicates<M,T> extends TraversalEngine<M,T,DuplicateWalker
} }
printProgress(DUPS_STRING, site); printProgress(DUPS_STRING, site);
if (this.maximumIterations > 0 && TraversalStatistics.nRecords > this.maximumIterations) {
logger.warn(String.format(("Maximum number of duplicate sets encountered, terminating traversal " + TraversalStatistics.nRecords)));
break;
}
} }
return sum; return sum;

View File

@ -76,11 +76,6 @@ public class TraverseLoci<M,T> extends TraversalEngine<M,T,LocusWalker<M,T>,Locu
sum = walker.reduce(x, sum); sum = walker.reduce(x, sum);
} }
if (this.maximumIterations > 0 && TraversalStatistics.nRecords > this.maximumIterations) {
logger.warn(String.format("Maximum number of reads encountered, terminating traversal " + TraversalStatistics.nRecords));
break;
}
printProgress(LOCI_STRING, locus.getLocation()); printProgress(LOCI_STRING, locus.getLocation());
} }
} }

View File

@ -80,7 +80,6 @@ public class GATKArgumentCollectionUnitTest extends BaseTest {
List<File> input = new ArrayList<File>(); List<File> input = new ArrayList<File>();
input.add(new File("test.file")); input.add(new File("test.file"));
collect.samFiles = input; collect.samFiles = input;
collect.maximumEngineIterations = -1;
collect.strictnessLevel = SAMFileReader.ValidationStringency.STRICT; collect.strictnessLevel = SAMFileReader.ValidationStringency.STRICT;
collect.referenceFile = new File("referenceFile".toLowerCase()); collect.referenceFile = new File("referenceFile".toLowerCase());
collect.DBSNPFile = "DBSNPFile".toLowerCase(); collect.DBSNPFile = "DBSNPFile".toLowerCase();

View File

@ -14,6 +14,7 @@ import org.broad.tribble.vcf.VCFCodec;
import org.broadinstitute.sting.BaseTest; import org.broadinstitute.sting.BaseTest;
import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTableCodec; import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTableCodec;
import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTableFeature; import org.broadinstitute.sting.gatk.refdata.features.annotator.AnnotatorInputTableFeature;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.GenomeLocParser; import org.broadinstitute.sting.utils.GenomeLocParser;
import org.broadinstitute.sting.utils.collections.Pair; import org.broadinstitute.sting.utils.collections.Pair;
import org.junit.Assert; import org.junit.Assert;
@ -162,35 +163,104 @@ public class IndexPerformanceTests extends BaseTest {
//@Test //@Test
public void testBigTable() { public void testBigTable() {
// store the mapping of location -> variant; this will get big
Map<GenomeLoc, Integer> features = new TreeMap<GenomeLoc,Integer>();
Map<Integer,Integer> bucketToCount = getMapOfFeatures(features,false);
Pair<BasicFeatureSource, SAMSequenceDictionary> pairing;
Map<GenomeLoc, Integer> features2 = new TreeMap<GenomeLoc,Integer>();
Map<Integer,Integer> bucketToCount2 = getMapOfFeatures(features2,true);
System.err.println("Summary: ");
System.err.println("Summary: tree " + features.size());
System.err.println("Summary: linear " + features2.size());
// compare the two
for (Map.Entry<GenomeLoc,Integer> entry: features.entrySet()) {
if (!features2.containsKey(entry.getKey())) {
System.err.println("key " + entry + " missing from linear, count " + entry.getValue());
}
else if (features2.get(entry.getKey()) != entry.getValue()) {
/*System.err.println("counts are not equal at " +
entry.getKey() +
" features2.get(entry.getKey()) = " +
features2.get(entry.getKey()) +
" feature1 = " + entry.getValue());*/
}
if (features2.containsKey(entry.getKey())) features2.remove(entry.getKey());
}
System.err.println("Missing from the tree :");
for (Map.Entry<GenomeLoc,Integer> entry2: features2.entrySet()) {
System.err.println("Position " + entry2.getKey() + " count = " + entry2.getValue());
}
for (Integer bucket : bucketToCount.keySet()) {
if (!bucketToCount2.get(bucket).equals(bucketToCount.get(bucket))) {
System.err.println("Bucket " + bucket + " tree != linear, " + bucketToCount2.get(bucket) + " " + bucketToCount.get(bucket));
}
}
}
private Map<Integer,Integer> getMapOfFeatures(Map<GenomeLoc, Integer> features, boolean useLinear) {
File bigTable = new File("/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/slowAnnotator/big.table.txt"); File bigTable = new File("/humgen/gsa-hpprojects/dev/depristo/oneOffProjects/slowAnnotator/big.table.txt");
TribbleRMDTrackBuilder.useLinearIndex = false; TribbleRMDTrackBuilder.useLinearIndex = useLinear;
TribbleRMDTrackBuilder.binSize = 1000; TribbleRMDTrackBuilder.binSize = 1000;
deleteIndex(inputFiles.get("Big Table")); deleteIndex(inputFiles.get("Big Table"));
// time creating the index // time creating the index
logger.warn("creating index"); logger.warn("creating index");
long createTime = System.currentTimeMillis();
Map<Integer,Integer> bucketToCount = new TreeMap<Integer,Integer>();
Pair<BasicFeatureSource, SAMSequenceDictionary> pairing = builder.createFeatureReader(inputTypes.get("Big Table"),inputFiles.get("Big Table")); Pair<BasicFeatureSource, SAMSequenceDictionary> pairing = builder.createFeatureReader(inputTypes.get("Big Table"),inputFiles.get("Big Table"));
createTime = System.currentTimeMillis() - createTime;
//System.err.println("index creation took " + createTime);
PrintWriter stream = null;
logger.warn("reading and writing");
try { try {
stream = new PrintWriter(new File("bigTable.out.tree")); for (int x = 5000; x < 6000; x = x + 1000) {
} catch (FileNotFoundException e) { int bucketCount = 0;
Assert.fail("Fail!!!");
}
try {
for (int x = 1; x < 200000; x = x + 1000) {
CloseableTribbleIterator<Feature> iter = pairing.first.query("chr1", x, x+1000); // query CloseableTribbleIterator<Feature> iter = pairing.first.query("chr1", x, x+1000); // query
for (Feature feat : iter) { for (Feature feat : iter) {
stream.println(((AnnotatorInputTableFeature)feat).toString()); GenomeLoc loc = GenomeLocParser.createGenomeLoc(feat.getChr(),feat.getStart(),feat.getEnd());
if (loc.getStop() < 5000 || loc.getStart() > 6000) continue;
int count = 0;
if (features.containsKey(loc))
count = features.get(loc)+1;
features.put(loc,count);
bucketCount++;
} }
bucketToCount.put(x,bucketCount);
} }
} catch (IOException e) { } catch (IOException e) {
Assert.fail("Unable to load file for query!!"); Assert.fail("Unable to load file for query!!");
} }
stream.close(); return bucketToCount;
}
//@Test
public void testGetTreeIndexLocation() {
File bigTable = new File("small.table.txt");
TribbleRMDTrackBuilder.useLinearIndex = false;
TribbleRMDTrackBuilder.binSize = 1000;
deleteIndex(bigTable);
// time creating the index
logger.warn("creating index");
Map<Integer,Integer> bucketToCount = new TreeMap<Integer,Integer>();
Pair<BasicFeatureSource, SAMSequenceDictionary> pairing = builder.createFeatureReader(inputTypes.get("Big Table"),bigTable);
try {
int count= 0;
CloseableTribbleIterator<Feature> iter = null;
for (int x = 5000; x < 6000; x = x + 1000)
iter = pairing.first.query("chr1", x, x+1000); // query
for (Feature feat : iter) {
GenomeLoc loc = GenomeLocParser.createGenomeLoc(feat.getChr(),feat.getStart(),feat.getEnd());
if (loc.getStop() < 5000 || loc.getStart() > 6000) continue;
count++;
}
System.err.println(count);
} catch (IOException e) {
Assert.fail("Unable to load file for query!!");
}
} }