New ActiveRegionShardBalancer allows efficient NanoScheduling
-- Previously we used the LocusShardBalancer for the haplotype caller, which meant that TraverseActiveRegions saw its shards grouped in 16kb chunks of the genome. These locus shards are useful when you want to use the HierarchicalMicroScheduler, as they provide fine-grained access to the underlying BAM, but they have two major drawbacks: (1) we have to fairly frequently reset our state in TAR to handle moving between shard boundaries, and (2) with the nano-scheduled TAR we end up blocking at the end of each shard while our threads all finish processing. -- This commit changes the system over to using an ActiveRegionShardBalancer, which combines all of the shard data for a single contig into a single combined shard. This ensures that TAR, and by extension the HaplotypeCaller, gets all of the data on a single contig together so that the NanoScheduler runs efficiently instead of blocking over and over at shard boundaries. This simple change allows us to scale efficiently to around 8 threads in the nano scheduler: -- See https://www.dropbox.com/s/k7f280pd2zt0lyh/hc_nano_linear_scale.pdf -- See https://www.dropbox.com/s/fflpnan802m2906/hc_nano_log_scale.pdf -- Misc. changes throughout the codebase so we use the ActiveRegionShardBalancer where appropriate. -- Added unit tests for ActiveRegionShardBalancer to confirm it does the merging as expected. -- Fix bad toString in FilePointer
This commit is contained in:
parent
b4f482a421
commit
39e4396de0
|
|
@ -570,9 +570,9 @@ public class GenomeAnalysisEngine {
|
|||
if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate)
|
||||
throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Active region walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately.");
|
||||
if(intervals == null)
|
||||
return readsDataSource.createShardIteratorOverMappedReads(new LocusShardBalancer());
|
||||
return readsDataSource.createShardIteratorOverMappedReads(new ActiveRegionShardBalancer());
|
||||
else
|
||||
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new LocusShardBalancer());
|
||||
return readsDataSource.createShardIteratorOverIntervals(((ActiveRegionWalker)walker).extendIntervals(intervals, this.genomeLocParser, this.getReferenceDataSource().getReference()), new ActiveRegionShardBalancer());
|
||||
}
|
||||
else if(walker instanceof ReadWalker || walker instanceof ReadPairWalker || walker instanceof DuplicateWalker) {
|
||||
// Apply special validation to read pair walkers.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright (c) 2012 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ActiveRegionShardBalancer
|
||||
*
|
||||
* Merges all of the file pointer information for a single contig index into a single
|
||||
* combined shard. The purpose of doing this is to ensure that the HaplotypeCaller, which
|
||||
* doesn't support TreeReduction by construction, gets all of the data on a single
|
||||
* contig together so the the NanoSchedule runs efficiently
|
||||
*/
|
||||
public class ActiveRegionShardBalancer extends ShardBalancer {
|
||||
/**
|
||||
* Convert iterators of file pointers into balanced iterators of shards.
|
||||
* @return An iterator over balanced shards.
|
||||
*/
|
||||
public Iterator<Shard> iterator() {
|
||||
return new Iterator<Shard>() {
|
||||
public boolean hasNext() {
|
||||
return filePointers.hasNext();
|
||||
}
|
||||
|
||||
public Shard next() {
|
||||
FilePointer current = getCombinedFilePointersOnSingleContig();
|
||||
|
||||
// FilePointers have already been combined as necessary at the IntervalSharder level. No
|
||||
// need to do so again here.
|
||||
|
||||
return new LocusShard(parser,readsDataSource,current.getLocations(),current.fileSpans);
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Unable to remove from shard balancing iterator");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Combine all of the file pointers in the filePointers iterator into a single combined
|
||||
* FilePointer that spans all of the file pointers on a single contig
|
||||
* @return a non-null FilePointer
|
||||
*/
|
||||
private FilePointer getCombinedFilePointersOnSingleContig() {
|
||||
FilePointer current = filePointers.next();
|
||||
|
||||
final List<FilePointer> toCombine = new LinkedList<>();
|
||||
toCombine.add(current);
|
||||
|
||||
while ( filePointers.hasNext() &&
|
||||
current.isRegionUnmapped == filePointers.peek().isRegionUnmapped &&
|
||||
(current.getContigIndex() == filePointers.peek().getContigIndex() || current.isRegionUnmapped) ) {
|
||||
toCombine.add(filePointers.next());
|
||||
}
|
||||
|
||||
return FilePointer.union(toCombine, parser);
|
||||
}
|
||||
}
|
||||
|
|
@ -407,10 +407,10 @@ public class FilePointer {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("FilePointer:%n");
|
||||
builder.append("FilePointer:\n");
|
||||
builder.append("\tlocations = {");
|
||||
builder.append(Utils.join(";",locations));
|
||||
builder.append("}%n\tregions = %n");
|
||||
builder.append("}\n\tregions = \n");
|
||||
for(Map.Entry<SAMReaderID,SAMFileSpan> entry: fileSpans.entrySet()) {
|
||||
builder.append(entry.getKey());
|
||||
builder.append("= {");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Copyright (c) 2012 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.samtools.SAMFileHeader;
|
||||
import net.sf.samtools.SAMFileSpan;
|
||||
import net.sf.samtools.SAMSequenceRecord;
|
||||
import org.broadinstitute.sting.BaseTest;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.*;
|
||||
|
||||
public class ActiveRegionShardBalancerUnitTest extends BaseTest {
|
||||
// example genome loc parser for this test, can be deleted if you don't use the reference
|
||||
private GenomeLocParser genomeLocParser;
|
||||
protected SAMDataSource readsDataSource;
|
||||
|
||||
@BeforeClass
|
||||
public void setup() throws FileNotFoundException {
|
||||
// sequence
|
||||
final SAMFileHeader header = ArtificialSAMUtils.createArtificialSamHeader(10, 0, 10000);
|
||||
genomeLocParser = new GenomeLocParser(header.getSequenceDictionary());
|
||||
readsDataSource = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergingManyContigs() {
|
||||
executeTest(genomeLocParser.getContigs().getSequences());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergingAllPointersOnSingleContig() {
|
||||
executeTest(Arrays.asList(genomeLocParser.getContigs().getSequences().get(1)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergingMultipleDiscontinuousContigs() {
|
||||
final List<SAMSequenceRecord> all = genomeLocParser.getContigs().getSequences();
|
||||
executeTest(Arrays.asList(all.get(1), all.get(3)));
|
||||
}
|
||||
|
||||
private void executeTest(final Collection<SAMSequenceRecord> records) {
|
||||
final ActiveRegionShardBalancer balancer = new ActiveRegionShardBalancer();
|
||||
|
||||
final List<Set<GenomeLoc>> expectedLocs = new LinkedList<>();
|
||||
final List<FilePointer> pointers = new LinkedList<>();
|
||||
|
||||
for ( final SAMSequenceRecord record : records ) {
|
||||
final int size = 10;
|
||||
int end = 0;
|
||||
for ( int i = 0; i < record.getSequenceLength(); i += size) {
|
||||
final int myEnd = i + size - 1;
|
||||
end = myEnd;
|
||||
final GenomeLoc loc = genomeLocParser.createGenomeLoc(record.getSequenceName(), i, myEnd);
|
||||
final Map<SAMReaderID, SAMFileSpan> fileSpans = Collections.emptyMap();
|
||||
final FilePointer fp = new FilePointer(fileSpans, Collections.singletonList(loc));
|
||||
pointers.add(fp);
|
||||
}
|
||||
expectedLocs.add(Collections.singleton(genomeLocParser.createGenomeLoc(record.getSequenceName(), 0, end)));
|
||||
}
|
||||
|
||||
balancer.initialize(readsDataSource, pointers.iterator(), genomeLocParser);
|
||||
|
||||
int i = 0;
|
||||
int nShardsFound = 0;
|
||||
for ( final Shard shard : balancer ) {
|
||||
nShardsFound++;
|
||||
Assert.assertEquals(new HashSet<>(shard.getGenomeLocs()), expectedLocs.get(i++));
|
||||
}
|
||||
Assert.assertEquals(nShardsFound, records.size(), "Didn't find exactly one shard for each contig in the sequence dictionary");
|
||||
}
|
||||
}
|
||||
|
|
@ -490,7 +490,7 @@ public class TraverseActiveRegionsUnitTest extends BaseTest {
|
|||
|
||||
traverseActiveRegions.initialize(engine, walker);
|
||||
List<LocusShardDataProvider> providers = new ArrayList<LocusShardDataProvider>();
|
||||
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new LocusShardBalancer())) {
|
||||
for (Shard shard : dataSource.createShardIteratorOverIntervals(new GenomeLocSortedSet(genomeLocParser, intervals), new ActiveRegionShardBalancer())) {
|
||||
for (WindowMaker.WindowMakerIterator window : new WindowMaker(shard, genomeLocParser, dataSource.seek(shard), shard.getGenomeLocs(), samples)) {
|
||||
providers.add(new LocusShardDataProvider(shard, shard.getReadProperties(), genomeLocParser, window.getLocus(), window, reference, new ArrayList<ReferenceOrderedDataSource>()));
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue