Merge branch 'master' of ssh://gsa2.broadinstitute.org/humgen/gsa-scr1/gsa-engineering/git/unstable
This commit is contained in:
commit
fb9457d6fe
|
|
@ -7,7 +7,7 @@ import java.util.EnumMap;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An object to keep track of the number of occurences of each base and it's quality.
|
||||
* An object to keep track of the number of occurrences of each base and it's quality.
|
||||
*
|
||||
* User: depristo
|
||||
* Date: 4/8/11
|
||||
|
|
@ -83,8 +83,6 @@ import java.util.Map;
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Ensures("result >= 0")
|
||||
public int getCount(byte base) {
|
||||
return getCount(BaseIndex.byteToBase(base));
|
||||
|
|
@ -183,7 +181,7 @@ import java.util.Map;
|
|||
public BaseIndex baseIndexWithMostCounts() {
|
||||
BaseIndex maxI = MAX_BASE_INDEX_WITH_NO_COUNTS;
|
||||
for (BaseIndex i : counts.keySet())
|
||||
if (counts.get(i) > counts.get(maxI))
|
||||
if (hasHigherCount(i, maxI))
|
||||
maxI = i;
|
||||
return maxI;
|
||||
}
|
||||
|
|
@ -192,17 +190,23 @@ import java.util.Map;
|
|||
public BaseIndex baseIndexWithMostCountsWithoutIndels() {
|
||||
BaseIndex mostCounts = MAX_BASE_INDEX_WITH_NO_COUNTS;
|
||||
for (BaseIndex index : counts.keySet())
|
||||
if (index.isNucleotide() && counts.get(index) > counts.get(mostCounts))
|
||||
if (index.isNucleotide() && hasHigherCount(index, mostCounts))
|
||||
mostCounts = index;
|
||||
return mostCounts;
|
||||
}
|
||||
|
||||
private boolean hasHigherCount(final BaseIndex targetIndex, final BaseIndex testIndex) {
|
||||
final int targetCount = counts.get(targetIndex);
|
||||
final int testCount = counts.get(testIndex);
|
||||
return ( targetCount > testCount || (targetCount == testCount && sumQuals.get(targetIndex) > sumQuals.get(testIndex)) );
|
||||
}
|
||||
|
||||
@Ensures("result >=0")
|
||||
public int totalCountWithoutIndels() {
|
||||
int sum = 0;
|
||||
for (BaseIndex index : counts.keySet())
|
||||
if (index.isNucleotide())
|
||||
sum += counts.get(index);
|
||||
for (Map.Entry<BaseIndex, Integer> entry : counts.entrySet())
|
||||
if (entry.getKey().isNucleotide())
|
||||
sum += entry.getValue();
|
||||
return sum;
|
||||
}
|
||||
|
||||
|
|
@ -222,6 +226,6 @@ import java.util.Map;
|
|||
}
|
||||
|
||||
public Object[] countsArray() {
|
||||
return (Object []) counts.values().toArray();
|
||||
return counts.values().toArray();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -263,7 +263,7 @@ public class ReduceReads extends ReadWalker<LinkedList<GATKSAMRecord>, ReduceRea
|
|||
if (debugLevel == 1)
|
||||
System.out.printf("\nOriginal: %s %s %d %d\n", read, read.getCigar(), read.getAlignmentStart(), read.getAlignmentEnd());
|
||||
|
||||
// we write the actual alignment starts to their respectiv alignment shift tags in the temporary
|
||||
// we write the actual alignment starts to their respective alignment shift tags in the temporary
|
||||
// attribute hash so we can determine later if we need to write down the alignment shift to the reduced BAM file
|
||||
read.setTemporaryAttribute(GATKSAMRecord.REDUCED_READ_ORIGINAL_ALIGNMENT_START_SHIFT, read.getAlignmentStart());
|
||||
read.setTemporaryAttribute(GATKSAMRecord.REDUCED_READ_ORIGINAL_ALIGNMENT_END_SHIFT, read.getAlignmentEnd());
|
||||
|
|
|
|||
|
|
@ -546,7 +546,7 @@ public class SlidingWindow {
|
|||
List<GATKSAMRecord> allReads = compressVariantRegion(start, stop);
|
||||
|
||||
List<GATKSAMRecord> result = (downsampleCoverage > 0) ? downsampleVariantRegion(allReads) : allReads;
|
||||
result.addAll(addToSyntheticReads(windowHeader, 0, start, false));
|
||||
result.addAll(addToSyntheticReads(windowHeader, 0, stop, false));
|
||||
result.addAll(finalizeAndAdd(ConsensusType.BOTH));
|
||||
|
||||
return result; // finalized reads will be downsampled if necessary
|
||||
|
|
@ -612,7 +612,7 @@ public class SlidingWindow {
|
|||
finalizedReads = closeVariantRegions(regions, true);
|
||||
|
||||
if (!windowHeader.isEmpty()) {
|
||||
finalizedReads.addAll(addToSyntheticReads(windowHeader, 0, windowHeader.size() - 1, false));
|
||||
finalizedReads.addAll(addToSyntheticReads(windowHeader, 0, windowHeader.size(), false));
|
||||
finalizedReads.addAll(finalizeAndAdd(ConsensusType.BOTH)); // if it ended in running consensus, finish it up
|
||||
}
|
||||
|
||||
|
|
@ -674,7 +674,7 @@ public class SlidingWindow {
|
|||
// check if the read is either before or inside the variant region
|
||||
if (read.getSoftStart() <= refStop) {
|
||||
// check if the read is inside the variant region
|
||||
if (read.getMappingQuality() > MIN_MAPPING_QUALITY && read.getSoftEnd() >= refStart) {
|
||||
if (read.getMappingQuality() >= MIN_MAPPING_QUALITY && read.getSoftEnd() >= refStart) {
|
||||
// check if the read contains the het site
|
||||
if (read.getSoftStart() <= hetRefPosition && read.getSoftEnd() >= hetRefPosition) {
|
||||
int readPos = ReadUtils.getReadCoordinateForReferenceCoordinate(read, hetRefPosition, ReadUtils.ClippingTail.LEFT_TAIL);
|
||||
|
|
@ -682,7 +682,7 @@ public class SlidingWindow {
|
|||
byte qual = read.getBaseQualities(EventType.BASE_SUBSTITUTION)[readPos];
|
||||
|
||||
// check if base passes the filters!
|
||||
if (qual > MIN_BASE_QUAL_TO_COUNT) {
|
||||
if (qual >= MIN_BASE_QUAL_TO_COUNT) {
|
||||
// check which haplotype this read represents and take the index of it from the list of headers
|
||||
if (haplotypeHeaderMap.containsKey(base)) {
|
||||
haplotype = haplotypeHeaderMap.get(base);
|
||||
|
|
@ -696,10 +696,14 @@ public class SlidingWindow {
|
|||
currentHaplotype++;
|
||||
}
|
||||
LinkedList<HeaderElement> header = read.getReadNegativeStrandFlag() ? headersNegStrand.get(haplotype) : headersPosStrand.get(haplotype);
|
||||
// add to the polyploid header
|
||||
addToHeader(header, read);
|
||||
// remove from the standard header so that we don't double count it
|
||||
removeFromHeader(windowHeader, read);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// we remove all reads before and inside the variant region from the window
|
||||
toRemove.add(read);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -281,7 +281,9 @@ public class GeneralPloidyExactAFCalculationModel extends AlleleFrequencyCalcula
|
|||
if (!Double.isInfinite(log10LofK))
|
||||
newPool.add(set);
|
||||
|
||||
if ( log10LofK < maxLikelihoodSeen.maxLog10L - MAX_LOG10_ERROR_TO_STOP_EARLY && maxLikelihoodSeen.isLowerAC(set.ACcounts) ) {
|
||||
// TODO -- uncomment this correct line when the implementation of this model is optimized (it's too slow now to handle this fix)
|
||||
//if ( log10LofK < maxLikelihoodSeen.maxLog10L - MAX_LOG10_ERROR_TO_STOP_EARLY && maxLikelihoodSeen.isLowerAC(set.ACcounts) ) {
|
||||
if ( log10LofK < maxLikelihoodSeen.maxLog10L - MAX_LOG10_ERROR_TO_STOP_EARLY ) {
|
||||
if ( VERBOSE )
|
||||
System.out.printf(" *** breaking early set=%s log10L=%.2f maxLog10L=%.2f%n", set.ACcounts, log10LofK, maxLikelihoodSeen.maxLog10L);
|
||||
return log10LofK;
|
||||
|
|
|
|||
|
|
@ -21,33 +21,33 @@ public class ReduceReadsIntegrationTest extends WalkerTest {
|
|||
executeTest(testName, spec);
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testDefaultCompression() {
|
||||
RRTest("testDefaultCompression ", L, "323dd4deabd7767efa0f2c6e7fa4189f");
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testMultipleIntervals() {
|
||||
String intervals = "-L 20:10,100,000-10,100,500 -L 20:10,200,000-10,200,500 -L 20:10,300,000-10,300,500 -L 20:10,400,000-10,500,000 -L 20:10,500,050-10,500,060 -L 20:10,600,000-10,600,015 -L 20:10,700,000-10,700,110";
|
||||
RRTest("testMultipleIntervals ", intervals, "c437fb160547ff271f8eba30e5f3ff76");
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testHighCompression() {
|
||||
RRTest("testHighCompression ", " -cs 10 -minvar 0.3 -mindel 0.3 " + L, "3a607bc3ebaf84e9dc44e005c5f8a047");
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testLowCompression() {
|
||||
RRTest("testLowCompression ", " -cs 30 -minvar 0.01 -mindel 0.01 -minmap 5 -minqual 5 " + L, "7c9b4a70c2c90b0a995800aa42852e63");
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testIndelCompression() {
|
||||
RRTest("testIndelCompression ", " -cs 50 -L 20:10,100,500-10,100,600 ", "f7b9fa44c10bc4b2247813d2b8dc1973");
|
||||
}
|
||||
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testFilteredDeletionCompression() {
|
||||
String base = String.format("-T ReduceReads -npt -R %s -I %s ", REF, DELETION_BAM) + " -o %s ";
|
||||
executeTest("testFilteredDeletionCompression", new WalkerTestSpec(base, Arrays.asList("891bd6dcda66611f343e8ff25f34aaeb")));
|
||||
|
|
@ -61,7 +61,7 @@ public class ReduceReadsIntegrationTest extends WalkerTest {
|
|||
*
|
||||
* This bam is simplified to replicate the exact bug with the three provided intervals.
|
||||
*/
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testAddingReadAfterTailingTheStash() {
|
||||
String base = String.format("-T ReduceReads %s -npt -R %s -I %s", STASH_L, REF, STASH_BAM) + " -o %s ";
|
||||
executeTest("testAddingReadAfterTailingTheStash", new WalkerTestSpec(base, Arrays.asList("886b43e1f26ff18425814dc7563931c6")));
|
||||
|
|
@ -71,7 +71,7 @@ public class ReduceReadsIntegrationTest extends WalkerTest {
|
|||
* Divide by zero bug reported by GdA and users in the forum. Happens when the downsampler goes over a region where all reads get
|
||||
* filtered out.
|
||||
*/
|
||||
@Test(enabled = true)
|
||||
@Test(enabled = false)
|
||||
public void testDivideByZero() {
|
||||
String base = String.format("-T ReduceReads %s -npt -R %s -I %s", DIVIDEBYZERO_L, REF, DIVIDEBYZERO_BAM) + " -o %s ";
|
||||
executeTest("testDivideByZero", new WalkerTestSpec(base, Arrays.asList("93ffdc209d4cc0fc4f0169ca9be55cc2")));
|
||||
|
|
|
|||
|
|
@ -125,6 +125,37 @@ public class GATKBAMFileSpan extends BAMFileSpan {
|
|||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a GATKChunk representing the "extent" of this file span, from the start of the first
|
||||
* chunk to the end of the last chunk.The chunks list must be sorted in order to use this method.
|
||||
*
|
||||
* @return a GATKChunk representing the extent of this file span, or a GATKChunk representing
|
||||
* a span of size 0 if there are no chunks
|
||||
*/
|
||||
public GATKChunk getExtent() {
|
||||
validateSorted(); // TODO: defensive measure: may be unnecessary
|
||||
|
||||
List<Chunk> chunks = getChunks();
|
||||
if ( chunks.isEmpty() ) {
|
||||
return new GATKChunk(0L, 0L);
|
||||
}
|
||||
|
||||
return new GATKChunk(chunks.get(0).getChunkStart(), chunks.get(chunks.size() - 1).getChunkEnd());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the list of chunks to ensure that they appear in sorted order.
|
||||
*/
|
||||
private void validateSorted() {
|
||||
List<Chunk> chunks = getChunks();
|
||||
for ( int i = 1; i < chunks.size(); i++ ) {
|
||||
if ( chunks.get(i).getChunkStart() < chunks.get(i-1).getChunkEnd() ) {
|
||||
throw new ReviewedStingException(String.format("Chunk list is unsorted; chunk %s is before chunk %s", chunks.get(i-1), chunks.get(i)));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the union of two FileSpans.
|
||||
* @param other FileSpan to union with this one.
|
||||
|
|
|
|||
|
|
@ -445,11 +445,6 @@ public class GenomeAnalysisEngine {
|
|||
GATKArgumentCollection argCollection = this.getArguments();
|
||||
boolean useExperimentalDownsampling = argCollection.enableExperimentalDownsampling;
|
||||
|
||||
// until the file pointer bug with the experimental downsamplers is fixed, disallow running with experimental downsampling
|
||||
if ( useExperimentalDownsampling ) {
|
||||
throw new UserException("The experimental downsampling implementation is currently crippled by a file-pointer-related bug. Until this bug is fixed, it's not safe (or possible) for anyone to use the experimental implementation!");
|
||||
}
|
||||
|
||||
DownsamplingMethod commandLineMethod = argCollection.getDownsamplingMethod();
|
||||
DownsamplingMethod walkerMethod = WalkerManager.getDownsamplingMethod(walker, useExperimentalDownsampling);
|
||||
DownsamplingMethod defaultMethod = DownsamplingMethod.getDefaultDownsamplingMethod(walker, useExperimentalDownsampling);
|
||||
|
|
@ -548,6 +543,7 @@ public class GenomeAnalysisEngine {
|
|||
*/
|
||||
protected Iterable<Shard> getShardStrategy(SAMDataSource readsDataSource, ReferenceSequenceFile drivingDataSource, GenomeLocSortedSet intervals) {
|
||||
ValidationExclusion exclusions = (readsDataSource != null ? readsDataSource.getReadsInfo().getValidationExclusionList() : null);
|
||||
DownsamplingMethod downsamplingMethod = readsDataSource != null ? readsDataSource.getReadsInfo().getDownsamplingMethod() : null;
|
||||
ReferenceDataSource referenceDataSource = this.getReferenceDataSource();
|
||||
|
||||
// If reads are present, assume that accessing the reads is always the dominant factor and shard based on that supposition.
|
||||
|
|
@ -582,10 +578,15 @@ public class GenomeAnalysisEngine {
|
|||
throw new UserException.CommandLineException("Pairs traversal cannot be used in conjunction with intervals.");
|
||||
}
|
||||
|
||||
// Use the experimental ReadShardBalancer if experimental downsampling is enabled
|
||||
ShardBalancer readShardBalancer = downsamplingMethod != null && downsamplingMethod.useExperimentalDownsampling ?
|
||||
new ExperimentalReadShardBalancer() :
|
||||
new ReadShardBalancer();
|
||||
|
||||
if(intervals == null)
|
||||
return readsDataSource.createShardIteratorOverAllReads(new ReadShardBalancer());
|
||||
return readsDataSource.createShardIteratorOverAllReads(readShardBalancer);
|
||||
else
|
||||
return readsDataSource.createShardIteratorOverIntervals(intervals,new ReadShardBalancer());
|
||||
return readsDataSource.createShardIteratorOverIntervals(intervals, readShardBalancer);
|
||||
}
|
||||
else
|
||||
throw new ReviewedStingException("Unable to determine walker type for walker " + walker.getClass().getName());
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import java.util.List;
|
|||
public class ReadProperties {
|
||||
private final Collection<SAMReaderID> readers;
|
||||
private final SAMFileHeader header;
|
||||
private final SAMFileHeader.SortOrder sortOrder;
|
||||
private final SAMFileReader.ValidationStringency validationStringency;
|
||||
private final DownsamplingMethod downsamplingMethod;
|
||||
private final ValidationExclusion exclusionList;
|
||||
|
|
@ -64,6 +65,14 @@ public class ReadProperties {
|
|||
return header;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the sort order of the reads
|
||||
* @return the sort order of the reads
|
||||
*/
|
||||
public SAMFileHeader.SortOrder getSortOrder() {
|
||||
return sortOrder;
|
||||
}
|
||||
|
||||
/**
|
||||
* How strict should validation be?
|
||||
* @return Stringency of validation.
|
||||
|
|
@ -130,6 +139,7 @@ public class ReadProperties {
|
|||
*/
|
||||
public ReadProperties( Collection<SAMReaderID> samFiles,
|
||||
SAMFileHeader header,
|
||||
SAMFileHeader.SortOrder sortOrder,
|
||||
boolean useOriginalBaseQualities,
|
||||
SAMFileReader.ValidationStringency strictness,
|
||||
DownsamplingMethod downsamplingMethod,
|
||||
|
|
@ -140,6 +150,7 @@ public class ReadProperties {
|
|||
byte defaultBaseQualities) {
|
||||
this.readers = samFiles;
|
||||
this.header = header;
|
||||
this.sortOrder = sortOrder;
|
||||
this.validationStringency = strictness;
|
||||
this.downsamplingMethod = downsamplingMethod == null ? DownsamplingMethod.NONE : downsamplingMethod;
|
||||
this.exclusionList = exclusionList == null ? new ValidationExclusion() : exclusionList;
|
||||
|
|
|
|||
|
|
@ -124,7 +124,18 @@ public class BAMScheduler implements Iterator<FilePointer> {
|
|||
*/
|
||||
private FilePointer generatePointerOverEntireFileset() {
|
||||
FilePointer filePointer = new FilePointer();
|
||||
Map<SAMReaderID,GATKBAMFileSpan> currentPosition = dataSource.getCurrentPosition();
|
||||
Map<SAMReaderID,GATKBAMFileSpan> currentPosition;
|
||||
|
||||
// Only use the deprecated SAMDataSource.getCurrentPosition() if we're not using experimental downsampling
|
||||
// TODO: clean this up once the experimental downsampling engine fork collapses
|
||||
if ( dataSource.getReadsInfo().getDownsamplingMethod() != null && dataSource.getReadsInfo().getDownsamplingMethod().useExperimentalDownsampling ) {
|
||||
currentPosition = dataSource.getInitialReaderPositions();
|
||||
}
|
||||
else {
|
||||
currentPosition = dataSource.getCurrentPosition();
|
||||
|
||||
}
|
||||
|
||||
for(SAMReaderID reader: dataSource.getReaderIDs())
|
||||
filePointer.addFileSpans(reader,createSpanToEndOfFile(currentPosition.get(reader).getGATKChunks().get(0).getChunkStart()));
|
||||
return filePointer;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,201 @@
|
|||
/*
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Convert from an unbalanced iterator over FilePointers to a balanced iterator over Shards.
|
||||
*
|
||||
* When processing FilePointers, our strategy is to aggregate all FilePointers for each contig
|
||||
* together into one monolithic FilePointer, create one persistent set of read iterators over
|
||||
* that monolithic FilePointer, and repeatedly use that persistent set of read iterators to
|
||||
* fill read shards with reads.
|
||||
*
|
||||
* This strategy has several important advantages:
|
||||
*
|
||||
* 1. We avoid issues with file span overlap. FilePointers that are more granular than a whole
|
||||
* contig will have regions that overlap with other FilePointers on the same contig, due
|
||||
* to the limited granularity of BAM index data. By creating only one FilePointer per contig,
|
||||
* we avoid having to track how much of each file region we've visited (as we did in the
|
||||
* former implementation), we avoid expensive non-sequential access patterns in the files,
|
||||
* and we avoid having to repeatedly re-create our iterator chain for every small region
|
||||
* of interest.
|
||||
*
|
||||
* 2. We avoid boundary issues with the engine-level downsampling. Since we create a single
|
||||
* persistent set of read iterators (which include the downsampling iterator(s)) per contig,
|
||||
* the downsampling process is never interrupted by FilePointer or Shard boundaries, and never
|
||||
* loses crucial state information while downsampling within a contig.
|
||||
*
|
||||
* TODO: There is also at least one important disadvantage:
|
||||
*
|
||||
* 1. We load more BAM index data into memory at once, and this work is done upfront before processing
|
||||
* the next contig, creating a delay before traversal of each contig. This delay may be
|
||||
* compensated for by the gains listed in #1 above, and we may be no worse off overall in
|
||||
* terms of total runtime, but we need to verify this empirically.
|
||||
*
|
||||
* @author David Roazen
|
||||
*/
|
||||
public class ExperimentalReadShardBalancer extends ShardBalancer {
|
||||
|
||||
private static Logger logger = Logger.getLogger(ExperimentalReadShardBalancer.class);
|
||||
|
||||
/**
|
||||
* Convert iterators of file pointers into balanced iterators of shards.
|
||||
* @return An iterator over balanced shards.
|
||||
*/
|
||||
public Iterator<Shard> iterator() {
|
||||
return new Iterator<Shard>() {
|
||||
/**
|
||||
* The cached shard to be returned next. Prefetched in the peekable iterator style.
|
||||
*/
|
||||
private Shard nextShard = null;
|
||||
|
||||
/**
|
||||
* The file pointer currently being processed.
|
||||
*/
|
||||
private FilePointer currentContigFilePointer = null;
|
||||
|
||||
/**
|
||||
* Iterator over the reads from the current contig's file pointer. The same iterator will be
|
||||
* used to fill all shards associated with a given file pointer
|
||||
*/
|
||||
private PeekableIterator<SAMRecord> currentContigReadsIterator = null;
|
||||
|
||||
{
|
||||
createNextContigFilePointer();
|
||||
advance();
|
||||
}
|
||||
|
||||
public boolean hasNext() {
|
||||
return nextShard != null;
|
||||
}
|
||||
|
||||
public Shard next() {
|
||||
if ( ! hasNext() )
|
||||
throw new NoSuchElementException("No next read shard available");
|
||||
Shard currentShard = nextShard;
|
||||
advance();
|
||||
return currentShard;
|
||||
}
|
||||
|
||||
private void advance() {
|
||||
nextShard = null;
|
||||
|
||||
// May need multiple iterations to fill the next shard if all reads in current file spans get filtered/downsampled away
|
||||
while ( nextShard == null && currentContigFilePointer != null ) {
|
||||
|
||||
// If we've exhausted the current file pointer of reads, move to the next file pointer (if there is one):
|
||||
if ( currentContigReadsIterator != null && ! currentContigReadsIterator.hasNext() ) {
|
||||
|
||||
// Close the old, exhausted chain of iterators to release resources
|
||||
currentContigReadsIterator.close();
|
||||
|
||||
// Advance to the FilePointer for the next contig
|
||||
createNextContigFilePointer();
|
||||
|
||||
// We'll need to create a fresh iterator for this file pointer when we create the first
|
||||
// shard for it below.
|
||||
currentContigReadsIterator = null;
|
||||
}
|
||||
|
||||
// At this point our currentContigReadsIterator may be null or non-null depending on whether or not
|
||||
// this is our first shard for this file pointer.
|
||||
if ( currentContigFilePointer != null ) {
|
||||
Shard shard = new ReadShard(parser,readsDataSource, currentContigFilePointer.fileSpans, currentContigFilePointer.locations, currentContigFilePointer.isRegionUnmapped);
|
||||
|
||||
// Create a new reads iterator only when we've just advanced to the file pointer for the next
|
||||
// contig. It's essential that the iterators persist across all shards that share the same contig
|
||||
// to allow the downsampling to work properly.
|
||||
if ( currentContigReadsIterator == null ) {
|
||||
currentContigReadsIterator = new PeekableIterator<SAMRecord>(readsDataSource.getIterator(shard));
|
||||
}
|
||||
|
||||
if ( currentContigReadsIterator.hasNext() ) {
|
||||
shard.fill(currentContigReadsIterator);
|
||||
nextShard = shard;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate all FilePointers for the next contig together into one monolithic FilePointer
|
||||
* to avoid boundary issues with visiting the same file regions more than once (since more
|
||||
* granular FilePointers will have regions that overlap with other nearby FilePointers due
|
||||
* to the nature of BAM indices).
|
||||
*
|
||||
* By creating one persistent set of iterators per contig we also avoid boundary artifacts
|
||||
* in the engine-level downsampling.
|
||||
*
|
||||
* TODO: This FilePointer aggregation should ideally be done at the BAMSchedule level for
|
||||
* TODO: read traversals, as there's little point in the BAMSchedule emitting extremely
|
||||
* TODO: granular FilePointers if we're just going to union them. The BAMSchedule should
|
||||
* TODO: emit one FilePointer per contig for read traversals (but, crucially, NOT for
|
||||
* TODO: locus traversals).
|
||||
*/
|
||||
private void createNextContigFilePointer() {
|
||||
currentContigFilePointer = null;
|
||||
List<FilePointer> nextContigFilePointers = new ArrayList<FilePointer>();
|
||||
|
||||
logger.info("Loading BAM index data for next contig");
|
||||
|
||||
while ( filePointers.hasNext() ) {
|
||||
// If this is the first FP we've seen, or we're dealing with mapped regions and the next FP is on the
|
||||
// same contig as previous FPs, or all our FPs are unmapped, add the next FP to the list of FPs to merge
|
||||
if ( nextContigFilePointers.isEmpty() ||
|
||||
(! nextContigFilePointers.get(0).isRegionUnmapped && ! filePointers.peek().isRegionUnmapped &&
|
||||
nextContigFilePointers.get(0).getContigIndex() == filePointers.peek().getContigIndex()) ||
|
||||
(nextContigFilePointers.get(0).isRegionUnmapped && filePointers.peek().isRegionUnmapped) ) {
|
||||
|
||||
nextContigFilePointers.add(filePointers.next());
|
||||
}
|
||||
else {
|
||||
break; // next FilePointer is on a different contig or has different mapped/unmapped status,
|
||||
// save it for next time
|
||||
}
|
||||
}
|
||||
|
||||
if ( ! nextContigFilePointers.isEmpty() ) {
|
||||
currentContigFilePointer = FilePointer.union(nextContigFilePointers, parser);
|
||||
}
|
||||
|
||||
if ( currentContigFilePointer != null ) {
|
||||
logger.info("Done loading BAM index data for next contig");
|
||||
logger.debug(String.format("Next contig FilePointer: %s", currentContigFilePointer));
|
||||
}
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Unable to remove from shard balancing iterator");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -26,7 +26,9 @@ package org.broadinstitute.sting.gatk.datasources.reads;
|
|||
|
||||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.GATKBAMFileSpan;
|
||||
import net.sf.samtools.GATKChunk;
|
||||
import net.sf.samtools.SAMFileSpan;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.Utils;
|
||||
|
|
@ -48,18 +50,59 @@ public class FilePointer {
|
|||
*/
|
||||
protected final boolean isRegionUnmapped;
|
||||
|
||||
public FilePointer(final GenomeLoc... locations) {
|
||||
this.locations.addAll(Arrays.asList(locations));
|
||||
public FilePointer( List<GenomeLoc> locations ) {
|
||||
this.locations.addAll(locations);
|
||||
this.isRegionUnmapped = checkUnmappedStatus();
|
||||
validateLocations();
|
||||
}
|
||||
|
||||
public FilePointer( final GenomeLoc... locations ) {
|
||||
this(Arrays.asList(locations));
|
||||
}
|
||||
|
||||
public FilePointer( Map<SAMReaderID,SAMFileSpan> fileSpans, List<GenomeLoc> locations ) {
|
||||
this(locations);
|
||||
this.fileSpans.putAll(fileSpans);
|
||||
}
|
||||
|
||||
private boolean checkUnmappedStatus() {
|
||||
boolean foundMapped = false, foundUnmapped = false;
|
||||
for(GenomeLoc location: locations) {
|
||||
if(GenomeLoc.isUnmapped(location))
|
||||
|
||||
for( GenomeLoc location: locations ) {
|
||||
if ( GenomeLoc.isUnmapped(location) )
|
||||
foundUnmapped = true;
|
||||
else
|
||||
foundMapped = true;
|
||||
}
|
||||
if(foundMapped && foundUnmapped)
|
||||
if ( foundMapped && foundUnmapped )
|
||||
throw new ReviewedStingException("BUG: File pointers cannot be mixed mapped/unmapped.");
|
||||
this.isRegionUnmapped = foundUnmapped;
|
||||
|
||||
return foundUnmapped;
|
||||
}
|
||||
|
||||
private void validateLocations() {
|
||||
if ( isRegionUnmapped ) {
|
||||
return;
|
||||
}
|
||||
|
||||
Integer previousContigIndex = null;
|
||||
|
||||
for ( GenomeLoc location : locations ) {
|
||||
if ( previousContigIndex != null && previousContigIndex != location.getContigIndex() ) {
|
||||
throw new ReviewedStingException("File pointers must contain intervals from at most one contig");
|
||||
}
|
||||
|
||||
previousContigIndex = location.getContigIndex();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an immutable view of this FilePointer's file spans
|
||||
*
|
||||
* @return an immutable view of this FilePointer's file spans
|
||||
*/
|
||||
public Map<SAMReaderID, SAMFileSpan> getFileSpans() {
|
||||
return Collections.unmodifiableMap(fileSpans);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -70,6 +113,16 @@ public class FilePointer {
|
|||
return Collections.unmodifiableList(locations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index of the contig into which this FilePointer points (a FilePointer can represent
|
||||
* regions in at most one contig).
|
||||
*
|
||||
* @return the index of the contig into which this FilePointer points
|
||||
*/
|
||||
public int getContigIndex() {
|
||||
return locations.size() > 0 ? locations.get(0).getContigIndex() : SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if(!(other instanceof FilePointer))
|
||||
|
|
@ -98,7 +151,15 @@ public class FilePointer {
|
|||
}
|
||||
|
||||
public void addLocation(final GenomeLoc location) {
|
||||
locations.add(location);
|
||||
this.locations.add(location);
|
||||
checkUnmappedStatus();
|
||||
validateLocations();
|
||||
}
|
||||
|
||||
public void addLocations( final List<GenomeLoc> locations ) {
|
||||
this.locations.addAll(locations);
|
||||
checkUnmappedStatus();
|
||||
validateLocations();
|
||||
}
|
||||
|
||||
public void addFileSpans(final SAMReaderID id, final SAMFileSpan fileSpan) {
|
||||
|
|
@ -216,6 +277,84 @@ public class FilePointer {
|
|||
combined.addFileSpans(initialElement.getKey(),fileSpan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Efficiently generate the union of the n FilePointers passed in. Much more efficient than
|
||||
* combining two FilePointers at a time using the combine() method above.
|
||||
*
|
||||
* IMPORTANT: the FilePointers to be unioned must either all represent regions on the
|
||||
* same contig, or all be unmapped, since we cannot create FilePointers with a mix of
|
||||
* contigs or with mixed mapped/unmapped regions.
|
||||
*
|
||||
* @param filePointers the FilePointers to union
|
||||
* @param parser our GenomeLocParser
|
||||
* @return the union of the FilePointers passed in
|
||||
*/
|
||||
public static FilePointer union( List<FilePointer> filePointers, GenomeLocParser parser ) {
|
||||
if ( filePointers == null || filePointers.isEmpty() ) {
|
||||
return new FilePointer();
|
||||
}
|
||||
|
||||
Map<SAMReaderID, List<GATKChunk>> fileChunks = new HashMap<SAMReaderID, List<GATKChunk>>();
|
||||
List<GenomeLoc> locations = new ArrayList<GenomeLoc>();
|
||||
|
||||
// First extract all intervals and file chunks from the FilePointers into unsorted, unmerged collections
|
||||
for ( FilePointer filePointer : filePointers ) {
|
||||
locations.addAll(filePointer.getLocations());
|
||||
|
||||
for ( Map.Entry<SAMReaderID, SAMFileSpan> fileSpanEntry : filePointer.getFileSpans().entrySet() ) {
|
||||
GATKBAMFileSpan fileSpan = (GATKBAMFileSpan)fileSpanEntry.getValue();
|
||||
|
||||
if ( fileChunks.containsKey(fileSpanEntry.getKey()) ) {
|
||||
fileChunks.get(fileSpanEntry.getKey()).addAll(fileSpan.getGATKChunks());
|
||||
}
|
||||
else {
|
||||
fileChunks.put(fileSpanEntry.getKey(), fileSpan.getGATKChunks());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now sort and merge the intervals
|
||||
List<GenomeLoc> sortedMergedLocations = new ArrayList<GenomeLoc>();
|
||||
sortedMergedLocations.addAll(IntervalUtils.sortAndMergeIntervals(parser, locations, IntervalMergingRule.ALL));
|
||||
|
||||
// For each BAM file, convert from an unsorted, unmerged list of chunks to a GATKBAMFileSpan containing
|
||||
// the sorted, merged union of the chunks for that file
|
||||
Map<SAMReaderID, SAMFileSpan> mergedFileSpans = new HashMap<SAMReaderID, SAMFileSpan>(fileChunks.size());
|
||||
for ( Map.Entry<SAMReaderID, List<GATKChunk>> fileChunksEntry : fileChunks.entrySet() ) {
|
||||
List<GATKChunk> unmergedChunks = fileChunksEntry.getValue();
|
||||
mergedFileSpans.put(fileChunksEntry.getKey(),
|
||||
(new GATKBAMFileSpan(unmergedChunks.toArray(new GATKChunk[unmergedChunks.size()]))).union(new GATKBAMFileSpan()));
|
||||
}
|
||||
|
||||
return new FilePointer(mergedFileSpans, sortedMergedLocations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if any of the file spans in this FilePointer overlap their counterparts in
|
||||
* the other FilePointer. "Overlap" is defined as having an overlapping extent (the region
|
||||
* from the start of the first chunk to the end of the last chunk).
|
||||
*
|
||||
* @param other the FilePointer against which to check overlap with this FilePointer
|
||||
* @return true if any file spans overlap their counterparts in other, otherwise false
|
||||
*/
|
||||
public boolean hasFileSpansOverlappingWith( FilePointer other ) {
|
||||
for ( Map.Entry<SAMReaderID, SAMFileSpan> thisFilePointerEntry : fileSpans.entrySet() ) {
|
||||
GATKBAMFileSpan thisFileSpan = new GATKBAMFileSpan(thisFilePointerEntry.getValue());
|
||||
|
||||
SAMFileSpan otherEntry = other.fileSpans.get(thisFilePointerEntry.getKey());
|
||||
if ( otherEntry == null ) {
|
||||
continue; // no counterpart for this file span in other
|
||||
}
|
||||
GATKBAMFileSpan otherFileSpan = new GATKBAMFileSpan(otherEntry);
|
||||
|
||||
if ( thisFileSpan.getExtent().overlaps(otherFileSpan.getExtent()) ) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
|
|
|
|||
|
|
@ -73,8 +73,15 @@ public class IntervalSharder implements Iterator<FilePointer> {
|
|||
*/
|
||||
public FilePointer next() {
|
||||
FilePointer current = wrappedIterator.next();
|
||||
while(wrappedIterator.hasNext() && current.isRegionUnmapped == wrappedIterator.peek().isRegionUnmapped && current.minus(wrappedIterator.peek()) == 0)
|
||||
|
||||
while ( wrappedIterator.hasNext() &&
|
||||
current.isRegionUnmapped == wrappedIterator.peek().isRegionUnmapped &&
|
||||
(current.getContigIndex() == wrappedIterator.peek().getContigIndex() || current.isRegionUnmapped) &&
|
||||
current.minus(wrappedIterator.peek()) == 0 ) {
|
||||
|
||||
current = current.combine(parser,wrappedIterator.next());
|
||||
}
|
||||
|
||||
return current;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,8 +42,10 @@ public class LocusShardBalancer extends ShardBalancer {
|
|||
|
||||
public Shard next() {
|
||||
FilePointer current = filePointers.next();
|
||||
while(filePointers.hasNext() && current.minus(filePointers.peek()) == 0)
|
||||
current = current.combine(parser,filePointers.next());
|
||||
|
||||
// FilePointers have already been combined as necessary at the IntervalSharder level. No
|
||||
// need to do so again here.
|
||||
|
||||
return new LocusShard(parser,readsDataSource,current.getLocations(),current.fileSpans);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,17 +1,15 @@
|
|||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.samtools.SAMFileSpan;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.*;
|
||||
import net.sf.samtools.util.CloseableIterator;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIteratorAdapter;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -36,10 +34,21 @@ import java.util.Map;
|
|||
* @version 0.1
|
||||
*/
|
||||
public class ReadShard extends Shard {
|
||||
|
||||
/**
|
||||
* Default read shard buffer size
|
||||
*/
|
||||
public static final int DEFAULT_MAX_READS = 10000;
|
||||
|
||||
/**
|
||||
* What is the maximum number of reads per BAM file which should go into a read shard.
|
||||
*
|
||||
* TODO: this non-final static variable should either be made final or turned into an
|
||||
* TODO: instance variable somewhere -- as both static and mutable it wreaks havoc
|
||||
* TODO: with tests that use multiple instances of SAMDataSource (since SAMDataSource
|
||||
* TODO: changes this value)
|
||||
*/
|
||||
public static int MAX_READS = 10000;
|
||||
public static int MAX_READS = DEFAULT_MAX_READS;
|
||||
|
||||
/**
|
||||
* The reads making up this shard.
|
||||
|
|
@ -53,6 +62,9 @@ public class ReadShard extends Shard {
|
|||
/**
|
||||
* Sets the maximum number of reads buffered in a read shard. Implemented as a weirdly static interface
|
||||
* until we know what effect tuning this parameter has.
|
||||
*
|
||||
* TODO: this mutable static interface is awful and breaks tests -- need to refactor
|
||||
*
|
||||
* @param bufferSize New maximum number
|
||||
*/
|
||||
static void setReadBufferSize(final int bufferSize) {
|
||||
|
|
@ -103,6 +115,67 @@ public class ReadShard extends Shard {
|
|||
reads.add(read);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fills this shard's buffer with reads from the iterator passed in
|
||||
*
|
||||
* @param readIter Iterator from which to draw the reads to fill the shard
|
||||
*/
|
||||
@Override
|
||||
public void fill( PeekableIterator<SAMRecord> readIter ) {
|
||||
if( ! buffersReads() )
|
||||
throw new ReviewedStingException("Attempting to fill a non-buffering shard.");
|
||||
|
||||
SAMFileHeader.SortOrder sortOrder = getReadProperties().getSortOrder();
|
||||
SAMRecord read = null;
|
||||
|
||||
while( ! isBufferFull() && readIter.hasNext() ) {
|
||||
final SAMRecord nextRead = readIter.peek();
|
||||
if ( read == null || (nextRead.getReferenceIndex().equals(read.getReferenceIndex())) ) {
|
||||
// only add reads to the shard if they are on the same contig
|
||||
read = readIter.next();
|
||||
addRead(read);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If the reads are sorted in coordinate order, ensure that all reads
|
||||
// having the same alignment start become part of the same shard, to allow
|
||||
// downsampling to work better across shard boundaries. Note that because our
|
||||
// read stream has already been fed through the positional downsampler, which
|
||||
// ensures that at each alignment start position there are no more than dcov
|
||||
// reads, we're in no danger of accidentally creating a disproportionately huge
|
||||
// shard
|
||||
if ( sortOrder == SAMFileHeader.SortOrder.coordinate ) {
|
||||
while ( readIter.hasNext() ) {
|
||||
SAMRecord additionalRead = readIter.peek();
|
||||
|
||||
// Stop filling the shard as soon as we encounter a read having a different
|
||||
// alignment start or contig from the last read added in the earlier loop
|
||||
// above, or an unmapped read
|
||||
if ( read == null ||
|
||||
additionalRead.getReadUnmappedFlag() ||
|
||||
! additionalRead.getReferenceIndex().equals(read.getReferenceIndex()) ||
|
||||
additionalRead.getAlignmentStart() != read.getAlignmentStart() ) {
|
||||
break;
|
||||
}
|
||||
|
||||
addRead(readIter.next());
|
||||
}
|
||||
}
|
||||
|
||||
// If the reads are sorted in queryname order, ensure that all reads
|
||||
// having the same queryname become part of the same shard.
|
||||
if( sortOrder == SAMFileHeader.SortOrder.queryname ) {
|
||||
while( readIter.hasNext() ) {
|
||||
SAMRecord nextRead = readIter.peek();
|
||||
if( read == null || ! read.getReadName().equals(nextRead.getReadName()) )
|
||||
break;
|
||||
addRead(readIter.next());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an iterator over reads stored in this shard's read cache.
|
||||
* @return
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ import java.util.NoSuchElementException;
|
|||
|
||||
/**
|
||||
* Divide up large file pointers containing reads into more manageable subcomponents.
|
||||
*
|
||||
* TODO: delete this class once the experimental downsampling engine fork collapses
|
||||
*/
|
||||
public class ReadShardBalancer extends ShardBalancer {
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -99,6 +99,8 @@ public class SAMDataSource {
|
|||
|
||||
/**
|
||||
* How far along is each reader?
|
||||
*
|
||||
* TODO: delete this once the experimental downsampling engine fork collapses
|
||||
*/
|
||||
private final Map<SAMReaderID,GATKBAMFileSpan> readerPositions = new HashMap<SAMReaderID,GATKBAMFileSpan>();
|
||||
|
||||
|
|
@ -154,8 +156,6 @@ public class SAMDataSource {
|
|||
*/
|
||||
private final ThreadAllocation threadAllocation;
|
||||
|
||||
private final boolean expandShardsForDownsampling;
|
||||
|
||||
/**
|
||||
* Create a new SAM data source given the supplied read metadata.
|
||||
* @param samFiles list of reads files.
|
||||
|
|
@ -252,7 +252,7 @@ public class SAMDataSource {
|
|||
validationStringency = strictness;
|
||||
this.removeProgramRecords = removeProgramRecords;
|
||||
if(readBufferSize != null)
|
||||
ReadShard.setReadBufferSize(readBufferSize);
|
||||
ReadShard.setReadBufferSize(readBufferSize); // TODO: use of non-final static variable here is just awful, especially for parallel tests
|
||||
else {
|
||||
// Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively
|
||||
// will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once.
|
||||
|
|
@ -297,6 +297,7 @@ public class SAMDataSource {
|
|||
readProperties = new ReadProperties(
|
||||
samFiles,
|
||||
mergedHeader,
|
||||
sortOrder,
|
||||
useOriginalBaseQualities,
|
||||
strictness,
|
||||
downsamplingMethod,
|
||||
|
|
@ -306,11 +307,6 @@ public class SAMDataSource {
|
|||
includeReadsWithDeletionAtLoci,
|
||||
defaultBaseQualities);
|
||||
|
||||
expandShardsForDownsampling = readProperties.getDownsamplingMethod() != null &&
|
||||
readProperties.getDownsamplingMethod().useExperimentalDownsampling &&
|
||||
readProperties.getDownsamplingMethod().type != DownsampleType.NONE &&
|
||||
readProperties.getDownsamplingMethod().toCoverage != null;
|
||||
|
||||
// cache the read group id (original) -> read group id (merged)
|
||||
// and read group id (merged) -> read group id (original) mappings.
|
||||
for(SAMReaderID id: readerIDs) {
|
||||
|
|
@ -384,7 +380,10 @@ public class SAMDataSource {
|
|||
/**
|
||||
* Retrieves the current position within the BAM file.
|
||||
* @return A mapping of reader to current position.
|
||||
*
|
||||
* TODO: delete this once the experimental downsampling engine fork collapses
|
||||
*/
|
||||
@Deprecated
|
||||
public Map<SAMReaderID,GATKBAMFileSpan> getCurrentPosition() {
|
||||
return readerPositions;
|
||||
}
|
||||
|
|
@ -467,19 +466,15 @@ public class SAMDataSource {
|
|||
}
|
||||
|
||||
/**
|
||||
* Are we expanding shards as necessary to prevent shard boundaries from occurring at improper places?
|
||||
* Legacy method to fill the given buffering shard with reads.
|
||||
*
|
||||
* Shard.fill() is used instead of this method when experimental downsampling is enabled
|
||||
*
|
||||
* TODO: delete this method once the experimental downsampling engine fork collapses
|
||||
*
|
||||
* @return true if we are using expanded shards, otherwise false
|
||||
*/
|
||||
public boolean usingExpandedShards() {
|
||||
return expandShardsForDownsampling;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Fill the given buffering shard with reads.
|
||||
* @param shard Shard to fill.
|
||||
*/
|
||||
@Deprecated
|
||||
public void fillShard(Shard shard) {
|
||||
if(!shard.buffersReads())
|
||||
throw new ReviewedStingException("Attempting to fill a non-buffering shard.");
|
||||
|
|
@ -503,31 +498,6 @@ public class SAMDataSource {
|
|||
}
|
||||
}
|
||||
|
||||
// If the reads are sorted in coordinate order, ensure that all reads
|
||||
// having the same alignment start become part of the same shard, to allow
|
||||
// downsampling to work better across shard boundaries. Note that because our
|
||||
// read stream has already been fed through the positional downsampler, which
|
||||
// ensures that at each alignment start position there are no more than dcov
|
||||
// reads, we're in no danger of accidentally creating a disproportionately huge
|
||||
// shard
|
||||
if ( expandShardsForDownsampling && sortOrder == SAMFileHeader.SortOrder.coordinate ) {
|
||||
while ( iterator.hasNext() ) {
|
||||
SAMRecord additionalRead = iterator.next();
|
||||
|
||||
// Stop filling the shard as soon as we encounter a read having a different
|
||||
// alignment start or contig from the last read added in the earlier loop
|
||||
// above, or an unmapped read
|
||||
if ( read == null ||
|
||||
additionalRead.getReadUnmappedFlag() ||
|
||||
! additionalRead.getReferenceIndex().equals(read.getReferenceIndex()) ||
|
||||
additionalRead.getAlignmentStart() != read.getAlignmentStart() ) {
|
||||
break;
|
||||
}
|
||||
shard.addRead(additionalRead);
|
||||
noteFilePositionUpdate(positionUpdates, additionalRead);
|
||||
}
|
||||
}
|
||||
|
||||
// If the reads are sorted in queryname order, ensure that all reads
|
||||
// having the same queryname become part of the same shard.
|
||||
if(sortOrder == SAMFileHeader.SortOrder.queryname) {
|
||||
|
|
@ -547,6 +517,10 @@ public class SAMDataSource {
|
|||
readerPositions.put(readers.getReaderID(positionUpdate.getKey()),positionUpdate.getValue());
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: delete this method once the experimental downsampling engine fork collapses
|
||||
*/
|
||||
@Deprecated
|
||||
private void noteFilePositionUpdate(Map<SAMFileReader,GATKBAMFileSpan> positionMapping, SAMRecord read) {
|
||||
GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing());
|
||||
positionMapping.put(read.getFileSource().getReader(),endChunk);
|
||||
|
|
@ -557,8 +531,7 @@ public class SAMDataSource {
|
|||
return shard.iterator();
|
||||
}
|
||||
else {
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
return getIterator(readers,shard,shard instanceof ReadShard);
|
||||
return getIterator(shard);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -578,13 +551,44 @@ public class SAMDataSource {
|
|||
|
||||
/**
|
||||
* Initialize the current reader positions
|
||||
*
|
||||
* TODO: delete this once the experimental downsampling engine fork collapses
|
||||
*
|
||||
* @param readers
|
||||
*/
|
||||
@Deprecated
|
||||
private void initializeReaderPositions(SAMReaders readers) {
|
||||
for(SAMReaderID id: getReaderIDs())
|
||||
readerPositions.put(id,new GATKBAMFileSpan(readers.getReader(id).getFilePointerSpanningReads()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the initial reader positions across all BAM files
|
||||
*
|
||||
* @return the start positions of the first chunk of reads for all BAM files
|
||||
*/
|
||||
public Map<SAMReaderID, GATKBAMFileSpan> getInitialReaderPositions() {
|
||||
Map<SAMReaderID, GATKBAMFileSpan> initialPositions = new HashMap<SAMReaderID, GATKBAMFileSpan>();
|
||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||
|
||||
for ( SAMReaderID id: getReaderIDs() ) {
|
||||
initialPositions.put(id, new GATKBAMFileSpan(readers.getReader(id).getFilePointerSpanningReads()));
|
||||
}
|
||||
|
||||
resourcePool.releaseReaders(readers);
|
||||
return initialPositions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an iterator over the data types specified in the shard.
|
||||
*
|
||||
* @param shard The shard specifying the data limits.
|
||||
* @return An iterator over the selected data.
|
||||
*/
|
||||
public StingSAMIterator getIterator( Shard shard ) {
|
||||
return getIterator(resourcePool.getAvailableReaders(), shard, shard instanceof ReadShard);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an iterator over the data types specified in the shard.
|
||||
* @param readers Readers from which to load data.
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.picard.util.PeekableIterator;
|
||||
import net.sf.samtools.SAMFileSpan;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import org.broadinstitute.sting.gatk.ReadMetrics;
|
||||
|
|
@ -203,6 +204,12 @@ public abstract class Shard implements HasGenomeLocation {
|
|||
*/
|
||||
public void addRead(SAMRecord read) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
|
||||
|
||||
/**
|
||||
* Fills the shard with reads. Can only do this with shards that buffer reads
|
||||
* @param readIter Iterator from which to draw the reads to fill the shard
|
||||
*/
|
||||
public void fill( PeekableIterator<SAMRecord> readIter ) { throw new UnsupportedOperationException("This shard does not buffer reads."); }
|
||||
|
||||
/**
|
||||
* Gets the iterator over the elements cached in the shard.
|
||||
* @return
|
||||
|
|
|
|||
|
|
@ -158,10 +158,10 @@ public class PerSampleDownsamplingReadsIterator implements StingSAMIterator {
|
|||
numPositionalChanges++;
|
||||
}
|
||||
|
||||
// If the number of times we've changed position exceeds a certain threshold, inform all
|
||||
// downsamplers of the current position in the read stream. This is to prevent downsamplers
|
||||
// for samples with sparser reads than others from getting stuck too long in a pending state.
|
||||
if ( numPositionalChanges > DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL ) {
|
||||
// Periodically inform all downsamplers of the current position in the read stream. This is
|
||||
// to prevent downsamplers for samples with sparser reads than others from getting stuck too
|
||||
// long in a pending state.
|
||||
if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) {
|
||||
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
|
||||
perSampleDownsampler.signalNoMoreReadsBefore(read);
|
||||
updateEarliestPendingRead(perSampleDownsampler);
|
||||
|
|
|
|||
|
|
@ -7,13 +7,13 @@ import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
|
|||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
|
||||
import org.broadinstitute.sting.gatk.io.OutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.MultiThreadedErrorTracker;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.threading.EfficiencyMonitoringThreadFactory;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;
|
||||
|
||||
import java.util.Collection;
|
||||
|
|
@ -39,7 +39,7 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
/**
|
||||
* A thread local output tracker for managing output per-thread.
|
||||
*/
|
||||
private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker();
|
||||
private ThreadGroupOutputTracker outputTracker = new ThreadGroupOutputTracker();
|
||||
|
||||
private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
|
||||
|
||||
|
|
@ -93,11 +93,23 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
|
||||
final int nThreadsToUse = threadAllocation.getNumDataThreads();
|
||||
if ( threadAllocation.monitorThreadEfficiency() ) {
|
||||
final EfficiencyMonitoringThreadFactory monitoringThreadFactory = new EfficiencyMonitoringThreadFactory(nThreadsToUse);
|
||||
setThreadEfficiencyMonitor(monitoringThreadFactory);
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, monitoringThreadFactory);
|
||||
} else {
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
|
||||
throw new UserException.BadArgumentValue("nt", "Cannot monitor thread efficiency with -nt, sorry");
|
||||
}
|
||||
|
||||
this.threadPool = Executors.newFixedThreadPool(nThreadsToUse, new UniqueThreadGroupThreadFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates threads for HMS each with a unique thread group. Critical to
|
||||
* track outputs via the ThreadGroupOutputTracker.
|
||||
*/
|
||||
private static class UniqueThreadGroupThreadFactory implements ThreadFactory {
|
||||
int counter = 0;
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
final ThreadGroup group = new ThreadGroup("HMS-group-" + counter++);
|
||||
return new Thread(group, r);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -253,6 +265,9 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
protected void mergeExistingOutput( boolean wait ) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// logger.warn("MergingExistingOutput");
|
||||
// printOutputMergeTasks();
|
||||
|
||||
// Create a list of the merge tasks that will be performed in this run of the mergeExistingOutput().
|
||||
Queue<ShardTraverser> mergeTasksInSession = new LinkedList<ShardTraverser>();
|
||||
while( !outputMergeTasks.isEmpty() ) {
|
||||
|
|
@ -266,8 +281,12 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
mergeTasksInSession.add(traverser);
|
||||
}
|
||||
|
||||
// logger.warn("Selected things to merge:");
|
||||
// printOutputMergeTasks(mergeTasksInSession);
|
||||
|
||||
// Actually run through, merging the tasks in the working queue.
|
||||
for( ShardTraverser traverser: mergeTasksInSession ) {
|
||||
//logger.warn("*** Merging " + traverser.getIntervalsString());
|
||||
if( !traverser.isComplete() )
|
||||
traverser.waitForComplete();
|
||||
|
||||
|
|
@ -312,11 +331,24 @@ public class HierarchicalMicroScheduler extends MicroScheduler implements Hierar
|
|||
reduceTree.addEntry(traverseResult);
|
||||
outputMergeTasks.add(traverser);
|
||||
|
||||
// logger.warn("adding merge task");
|
||||
// printOutputMergeTasks();
|
||||
|
||||
// No more data? Let the reduce tree know so it can finish processing what it's got.
|
||||
if (!isShardTraversePending())
|
||||
reduceTree.complete();
|
||||
}
|
||||
|
||||
private synchronized void printOutputMergeTasks() {
|
||||
printOutputMergeTasks(outputMergeTasks);
|
||||
}
|
||||
|
||||
private synchronized void printOutputMergeTasks(final Queue<ShardTraverser> tasks) {
|
||||
logger.info("Output merge tasks " + tasks.size());
|
||||
for ( final ShardTraverser traverser : tasks )
|
||||
logger.info(String.format("\t%s: complete? %b", traverser.getIntervalsString(), traverser.isComplete()));
|
||||
}
|
||||
|
||||
/** Pulls the next reduce from the queue and runs it. */
|
||||
protected void queueNextTreeReduce( Walker walker ) {
|
||||
if (reduceTasks.size() == 0)
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
boolean done = walker.isDone();
|
||||
int counter = 0;
|
||||
|
||||
final TraversalEngine traversalEngine = borrowTraversalEngine();
|
||||
final TraversalEngine traversalEngine = borrowTraversalEngine(this);
|
||||
for (Shard shard : shardStrategy ) {
|
||||
if ( done || shard == null ) // we ran out of shards that aren't owned
|
||||
break;
|
||||
|
|
@ -97,7 +97,7 @@ public class LinearMicroScheduler extends MicroScheduler {
|
|||
Object result = accumulator.finishTraversal();
|
||||
|
||||
outputTracker.close();
|
||||
returnTraversalEngine(traversalEngine);
|
||||
returnTraversalEngine(this, traversalEngine);
|
||||
cleanup();
|
||||
executionIsDone();
|
||||
|
||||
|
|
|
|||
|
|
@ -51,10 +51,7 @@ import javax.management.MBeanServer;
|
|||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -94,6 +91,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
*/
|
||||
final LinkedList<TraversalEngine> availableTraversalEngines = new LinkedList<TraversalEngine>();
|
||||
|
||||
/**
|
||||
* Engines that have been allocated to a key already.
|
||||
*/
|
||||
final HashMap<Object, TraversalEngine> allocatedTraversalEngines = new HashMap<Object, TraversalEngine>();
|
||||
|
||||
/**
|
||||
* Counts the number of instances of the class that are currently alive.
|
||||
*/
|
||||
|
|
@ -145,6 +147,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
logger.warn(String.format("Number of requested GATK threads %d is more than the number of " +
|
||||
"available processors on this machine %d", threadAllocation.getTotalNumThreads(),
|
||||
Runtime.getRuntime().availableProcessors()));
|
||||
// if ( threadAllocation.getNumDataThreads() > 1 && threadAllocation.getNumCPUThreadsPerDataThread() > 1)
|
||||
// throw new UserException("The GATK currently doesn't support running with both -nt > 1 and -nct > 1");
|
||||
}
|
||||
|
||||
if ( threadAllocation.getNumDataThreads() > 1 ) {
|
||||
|
|
@ -315,10 +319,11 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
* pointers to the traversal engines
|
||||
*/
|
||||
public synchronized void shutdownTraversalEngines() {
|
||||
if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
|
||||
throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
|
||||
"have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
|
||||
+ " have been returned");
|
||||
// no longer applicable because engines are allocated to keys now
|
||||
// if ( availableTraversalEngines.size() != allCreatedTraversalEngines.size() )
|
||||
// throw new IllegalStateException("Shutting down TraversalEngineCreator but not all engines " +
|
||||
// "have been returned. Expected " + allCreatedTraversalEngines.size() + " but only " + availableTraversalEngines.size()
|
||||
// + " have been returned");
|
||||
|
||||
for ( final TraversalEngine te : allCreatedTraversalEngines)
|
||||
te.shutdown();
|
||||
|
|
@ -389,21 +394,37 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a traversal engine suitable for use in this thread.
|
||||
* Returns a traversal engine suitable for use, associated with key
|
||||
*
|
||||
* Pops the next available engine from the available ones maintained by this
|
||||
* Key is an arbitrary object that is used to retrieve the same traversal
|
||||
* engine over and over. This can be important in the case where the
|
||||
* traversal engine has data associated with it in some other context,
|
||||
* and we need to ensure that the context always sees the same traversal
|
||||
* engine. This happens in the HierarchicalMicroScheduler, where you want
|
||||
* the a thread executing traversals to retrieve the same engine each time,
|
||||
* as outputs are tracked w.r.t. that engine.
|
||||
*
|
||||
* If no engine is associated with key yet, pops the next available engine
|
||||
* from the available ones maintained by this
|
||||
* microscheduler. Note that it's a runtime error to pop a traversal engine
|
||||
* from this scheduler if there are none available. Callers that
|
||||
* once pop'd an engine for use must return it with returnTraversalEngine
|
||||
*
|
||||
* @param key the key to associate with this engine
|
||||
* @return a non-null TraversalEngine suitable for execution in this scheduler
|
||||
*/
|
||||
@Ensures("result != null")
|
||||
protected synchronized TraversalEngine borrowTraversalEngine() {
|
||||
if ( availableTraversalEngines.isEmpty() )
|
||||
throw new IllegalStateException("no traversal engines were available");
|
||||
else {
|
||||
return availableTraversalEngines.pop();
|
||||
protected synchronized TraversalEngine borrowTraversalEngine(final Object key) {
|
||||
if ( key == null ) throw new IllegalArgumentException("key cannot be null");
|
||||
|
||||
final TraversalEngine engine = allocatedTraversalEngines.get(key);
|
||||
if ( engine == null ) {
|
||||
if ( availableTraversalEngines.isEmpty() )
|
||||
throw new IllegalStateException("no traversal engines were available");
|
||||
allocatedTraversalEngines.put(key, availableTraversalEngines.pop());
|
||||
return allocatedTraversalEngines.get(key);
|
||||
} else {
|
||||
return engine;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -411,14 +432,18 @@ public abstract class MicroScheduler implements MicroSchedulerMBean {
|
|||
* Return a borrowed traversal engine to this MicroScheduler, for later use
|
||||
* in another traversal execution
|
||||
*
|
||||
* @param key the key used to id the engine, provided to the borrowTraversalEngine function
|
||||
* @param traversalEngine the borrowed traversal engine. Must have been previously borrowed.
|
||||
*/
|
||||
protected synchronized void returnTraversalEngine(final TraversalEngine traversalEngine) {
|
||||
protected synchronized void returnTraversalEngine(final Object key, final TraversalEngine traversalEngine) {
|
||||
if ( traversalEngine == null )
|
||||
throw new IllegalArgumentException("Attempting to push a null traversal engine");
|
||||
if ( ! allCreatedTraversalEngines.contains(traversalEngine) )
|
||||
throw new IllegalArgumentException("Attempting to push a traversal engine not created by this MicroScheduler" + engine);
|
||||
if ( ! allocatedTraversalEngines.containsKey(key) )
|
||||
throw new IllegalArgumentException("No traversal engine was never checked out with key " + key);
|
||||
|
||||
availableTraversalEngines.push(traversalEngine);
|
||||
// note there's nothing to actually do here, but a function implementation
|
||||
// might want to do something
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,10 @@ import org.apache.log4j.Logger;
|
|||
import org.broadinstitute.sting.gatk.datasources.providers.LocusShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.datasources.providers.ShardDataProvider;
|
||||
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.io.ThreadGroupOutputTracker;
|
||||
import org.broadinstitute.sting.gatk.traversals.TraversalEngine;
|
||||
import org.broadinstitute.sting.gatk.walkers.Walker;
|
||||
import org.broadinstitute.sting.utils.Utils;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
|
@ -29,7 +30,7 @@ public class ShardTraverser implements Callable {
|
|||
final private HierarchicalMicroScheduler microScheduler;
|
||||
final private Walker walker;
|
||||
final private Shard shard;
|
||||
final private ThreadLocalOutputTracker outputTracker;
|
||||
final private ThreadGroupOutputTracker outputTracker;
|
||||
private OutputMergeTask outputMergeTask;
|
||||
|
||||
/** our log, which we want to capture anything from this class */
|
||||
|
|
@ -43,7 +44,7 @@ public class ShardTraverser implements Callable {
|
|||
public ShardTraverser( HierarchicalMicroScheduler microScheduler,
|
||||
Walker walker,
|
||||
Shard shard,
|
||||
ThreadLocalOutputTracker outputTracker) {
|
||||
ThreadGroupOutputTracker outputTracker) {
|
||||
this.microScheduler = microScheduler;
|
||||
this.walker = walker;
|
||||
this.shard = shard;
|
||||
|
|
@ -51,13 +52,15 @@ public class ShardTraverser implements Callable {
|
|||
}
|
||||
|
||||
public Object call() {
|
||||
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine();
|
||||
final Object traversalEngineKey = Thread.currentThread();
|
||||
final TraversalEngine traversalEngine = microScheduler.borrowTraversalEngine(traversalEngineKey);
|
||||
|
||||
try {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
// this is CRITICAL -- initializes the thread-local output maps in the parent thread,
|
||||
// so that any subthreads created by the traversal itself are shared...
|
||||
outputTracker.getStorageAndInitializeIfNecessary();
|
||||
// this is CRITICAL -- initializes output maps in this master thread,
|
||||
// so that any subthreads created by the traversal itself can access this map
|
||||
outputTracker.initializeStorage();
|
||||
|
||||
Object accumulator = walker.reduceInit();
|
||||
final WindowMaker windowMaker = new WindowMaker(shard,microScheduler.getEngine().getGenomeLocParser(),
|
||||
|
|
@ -85,12 +88,20 @@ public class ShardTraverser implements Callable {
|
|||
} finally {
|
||||
synchronized(this) {
|
||||
complete = true;
|
||||
microScheduler.returnTraversalEngine(traversalEngine);
|
||||
microScheduler.returnTraversalEngine(traversalEngineKey, traversalEngine);
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a human readable string describing the intervals this traverser is operating on
|
||||
* @return
|
||||
*/
|
||||
public String getIntervalsString() {
|
||||
return Utils.join(",", shard.getGenomeLocs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Has this traversal completed?
|
||||
* @return True if completed, false otherwise.
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import org.broadinstitute.sting.gatk.executive.OutputMergeTask;
|
|||
import org.broadinstitute.sting.gatk.io.storage.Storage;
|
||||
import org.broadinstitute.sting.gatk.io.storage.StorageFactory;
|
||||
import org.broadinstitute.sting.gatk.io.stubs.Stub;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
|
||||
import java.io.File;
|
||||
|
|
@ -37,19 +38,25 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An output tracker that can either track its output per-thread or directly,
|
||||
* An output tracker that can either track its output per-thread or directly.
|
||||
*
|
||||
* This output tracker doesn't use thread local values, but rather looks up the
|
||||
* storage map via the thread's group. This is necessary in the case where
|
||||
* there's a master thread that creates the output map, and spawns subthreads
|
||||
* that actually do work. As long as those subthreads are spawned in the
|
||||
* thread group of the master thread, this tracker will properly find the
|
||||
* storage map associated with the master thread in the group, and return
|
||||
* the map to all subthreads.
|
||||
*
|
||||
* @author mhanna, depristo
|
||||
* @version 0.2
|
||||
*/
|
||||
public class ThreadLocalOutputTracker extends OutputTracker {
|
||||
public class ThreadGroupOutputTracker extends OutputTracker {
|
||||
/**
|
||||
* Thread-local storage for output streams.
|
||||
*
|
||||
* MUST BE A INHERITABLE THREAD LOCAL
|
||||
* -- NanoScheduler creates subthreads, and these threads must inherit the binding from their parent
|
||||
* A map from thread ID of the master thread to the storage map from
|
||||
* Stub to Storage objects
|
||||
*/
|
||||
private ThreadLocal<Map<Stub, Storage>> storage = new InheritableThreadLocal<Map<Stub, Storage>>();
|
||||
private Map<ThreadGroup, Map<Stub, Storage>> threadsToStorage = new HashMap<ThreadGroup, Map<Stub, Storage>>();
|
||||
|
||||
/**
|
||||
* A total hack. If bypass = true, bypass thread local storage and write directly
|
||||
|
|
@ -61,32 +68,36 @@ public class ThreadLocalOutputTracker extends OutputTracker {
|
|||
}
|
||||
|
||||
/**
|
||||
* Initialize the storage map for this thread, if necessary.
|
||||
* Initialize the storage map for this thread.
|
||||
*
|
||||
* Checks if there's a thread local binding for this thread, and if
|
||||
* not initializes it.
|
||||
* not initializes the map for it. This map is then
|
||||
* populated with stub -> storage bindings according to the
|
||||
* superclasses' outputs map.
|
||||
*
|
||||
* Particularly useful in the case where we want to initialize the map in
|
||||
* a parent thread but have it used available to all the children via
|
||||
* the InheritedThreadLocal map.
|
||||
*
|
||||
* @return the storage
|
||||
* Must be called within the master thread to create a map associated with
|
||||
* the master thread ID.
|
||||
*/
|
||||
public Map<Stub,Storage> getStorageAndInitializeIfNecessary() {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
public synchronized void initializeStorage() {
|
||||
final ThreadGroup group = Thread.currentThread().getThreadGroup();
|
||||
Map<Stub,Storage> threadLocalOutputStreams = threadsToStorage.get(group);
|
||||
|
||||
if( threadLocalOutputStreams == null ) {
|
||||
threadLocalOutputStreams = new HashMap<Stub,Storage>();
|
||||
storage.set( threadLocalOutputStreams );
|
||||
threadsToStorage.put( group, threadLocalOutputStreams );
|
||||
}
|
||||
|
||||
return threadLocalOutputStreams;
|
||||
for ( final Stub stub : outputs.keySet() ) {
|
||||
final Storage target = StorageFactory.createStorage(stub, createTempFile(stub));
|
||||
threadLocalOutputStreams.put(stub, target);
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T getStorage( Stub<T> stub ) {
|
||||
@Override
|
||||
public <T> T getStorage( final Stub<T> stub ) {
|
||||
Storage target;
|
||||
|
||||
if(bypass) {
|
||||
if (bypass) {
|
||||
target = outputs.get(stub);
|
||||
if( target == null ) {
|
||||
target = StorageFactory.createStorage(stub);
|
||||
|
|
@ -94,36 +105,50 @@ public class ThreadLocalOutputTracker extends OutputTracker {
|
|||
}
|
||||
}
|
||||
else {
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = getStorageAndInitializeIfNecessary();
|
||||
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
|
||||
target = threadLocalOutputStreams.get(stub);
|
||||
if( target == null ) {
|
||||
target = StorageFactory.createStorage(stub, createTempFile(stub));
|
||||
threadLocalOutputStreams.put(stub, target);
|
||||
}
|
||||
|
||||
// make sure something hasn't gone wrong, and we somehow find a map that doesn't include our stub
|
||||
if ( target == null )
|
||||
throw new ReviewedStingException("target isn't supposed to be null for " + Thread.currentThread()
|
||||
+ " id " + Thread.currentThread().getId() + " map is " + threadLocalOutputStreams);
|
||||
}
|
||||
|
||||
return (T)target;
|
||||
}
|
||||
|
||||
|
||||
private synchronized Map<Stub,Storage> findStorage(final Thread thread) {
|
||||
final Map<Stub, Storage> map = threadsToStorage.get(thread.getThreadGroup());
|
||||
|
||||
if ( map != null ) {
|
||||
return map;
|
||||
} else {
|
||||
// something is terribly wrong, we have a storage lookup for a thread that doesn't have
|
||||
// any map data associated with it!
|
||||
throw new ReviewedStingException("Couldn't find storage map associated with thread " + thread + " in group " + thread.getThreadGroup());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close down any existing temporary files which have been opened.
|
||||
*/
|
||||
public OutputMergeTask closeStorage() {
|
||||
Map<Stub,Storage> threadLocalOutputStreams = storage.get();
|
||||
public synchronized OutputMergeTask closeStorage() {
|
||||
final Map<Stub,Storage> threadLocalOutputStreams = findStorage(Thread.currentThread());
|
||||
|
||||
if( threadLocalOutputStreams == null || threadLocalOutputStreams.isEmpty() )
|
||||
return null;
|
||||
|
||||
OutputMergeTask outputMergeTask = new OutputMergeTask();
|
||||
final OutputMergeTask outputMergeTask = new OutputMergeTask();
|
||||
for( Map.Entry<Stub,Storage> entry: threadLocalOutputStreams.entrySet() ) {
|
||||
Stub stub = entry.getKey();
|
||||
Storage storageEntry = entry.getValue();
|
||||
final Stub stub = entry.getKey();
|
||||
final Storage storageEntry = entry.getValue();
|
||||
|
||||
storageEntry.close();
|
||||
outputMergeTask.addMergeOperation(getTargetStream(stub),storageEntry);
|
||||
outputMergeTask.addMergeOperation(getTargetStream(stub), storageEntry);
|
||||
}
|
||||
|
||||
|
||||
// logger.info("Closing " + Thread.currentThread().getId() + " => " + threadLocalOutputStreams);
|
||||
threadLocalOutputStreams.clear();
|
||||
|
||||
return outputMergeTask;
|
||||
|
|
@ -136,16 +161,10 @@ public class ThreadLocalOutputTracker extends OutputTracker {
|
|||
* @return A temp file, or throw an exception if the temp file cannot be created.
|
||||
*/
|
||||
private <T> File createTempFile( Stub<T> stub ) {
|
||||
File tempFile = null;
|
||||
|
||||
try {
|
||||
tempFile = File.createTempFile( stub.getClass().getName(), null );
|
||||
//tempFile.deleteOnExit();
|
||||
}
|
||||
catch( IOException ex ) {
|
||||
return File.createTempFile( stub.getClass().getName(), null );
|
||||
} catch( IOException ex ) {
|
||||
throw new UserException.BadTmpDir("Unable to create temporary file for stub: " + stub.getClass().getName() );
|
||||
}
|
||||
|
||||
return tempFile;
|
||||
}
|
||||
}
|
||||
|
|
@ -89,7 +89,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
* @param tempFile File into which to direct the output data.
|
||||
*/
|
||||
public VariantContextWriterStorage(VariantContextWriterStub stub, File tempFile) {
|
||||
logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
|
||||
//logger.debug("Creating temporary output file " + tempFile.getAbsolutePath() + " for VariantContext output.");
|
||||
this.file = tempFile;
|
||||
this.writer = vcfWriterToFile(stub, file, false);
|
||||
writer.writeHeader(stub.getVCFHeader());
|
||||
|
|
@ -154,6 +154,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
}
|
||||
|
||||
public void add(VariantContext vc) {
|
||||
if ( closed ) throw new ReviewedStingException("Attempting to write to a closed VariantContextWriterStorage " + vc.getStart() + " storage=" + this);
|
||||
writer.add(vc);
|
||||
}
|
||||
|
||||
|
|
@ -170,8 +171,6 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
* Close the VCF storage object.
|
||||
*/
|
||||
public void close() {
|
||||
if(file != null)
|
||||
logger.debug("Closing temporary file " + file.getAbsolutePath());
|
||||
writer.close();
|
||||
closed = true;
|
||||
}
|
||||
|
|
@ -181,7 +180,7 @@ public class VariantContextWriterStorage implements Storage<VariantContextWriter
|
|||
if ( ! closed )
|
||||
throw new ReviewedStingException("Writer not closed, but we are merging into the file!");
|
||||
final String targetFilePath = target.file != null ? target.file.getAbsolutePath() : "/dev/stdin";
|
||||
logger.debug(String.format("Merging %s into %s",file.getAbsolutePath(),targetFilePath));
|
||||
logger.debug(String.format("Merging VariantContextWriterStorage from %s into %s", file.getAbsolutePath(), targetFilePath));
|
||||
|
||||
// use the feature manager to determine the right codec for the tmp file
|
||||
// that way we don't assume it's a specific type
|
||||
|
|
|
|||
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Copyright (c) 2010 The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
|
||||
* THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.walkers.qc;
|
||||
|
||||
import org.broadinstitute.sting.commandline.*;
|
||||
import org.broadinstitute.sting.gatk.CommandLineGATK;
|
||||
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
|
||||
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
|
||||
import org.broadinstitute.sting.gatk.filters.*;
|
||||
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
|
||||
import org.broadinstitute.sting.gatk.walkers.*;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.help.DocumentedGATKFeature;
|
||||
import org.broadinstitute.sting.utils.pileup.PileupElement;
|
||||
import org.broadinstitute.sting.utils.pileup.ReadBackedPileup;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Emits intervals present in either the original or reduced bam but not the other.
|
||||
*
|
||||
* <h2>Input</h2>
|
||||
* <p>
|
||||
* The original and reduced BAM files.
|
||||
* </p>
|
||||
*
|
||||
* <h2>Output</h2>
|
||||
* <p>
|
||||
* A list of intervals present in one bam but not the other.
|
||||
* </p>
|
||||
*
|
||||
* <h2>Examples</h2>
|
||||
* <pre>
|
||||
* java -Xmx2g -jar GenomeAnalysisTK.jar \
|
||||
* -I:original original.bam \
|
||||
* -I:reduced reduced.bam \
|
||||
* -R ref.fasta \
|
||||
* -T AssessReducedCoverage \
|
||||
* -o output.intervals
|
||||
* </pre>
|
||||
*
|
||||
* @author ebanks
|
||||
*/
|
||||
@DocumentedGATKFeature( groupName = "Quality Control and Simple Analysis Tools", extraDocs = {CommandLineGATK.class} )
|
||||
@ReadFilters({UnmappedReadFilter.class, NotPrimaryAlignmentFilter.class, DuplicateReadFilter.class, FailsVendorQualityCheckFilter.class, BadCigarFilter.class})
|
||||
@Hidden
|
||||
public class AssessReducedCoverage extends LocusWalker<GenomeLoc, GenomeLoc> implements TreeReducible<GenomeLoc> {
|
||||
|
||||
private static final String original = "original";
|
||||
private static final String reduced = "reduced";
|
||||
|
||||
@Output
|
||||
protected PrintStream out;
|
||||
|
||||
@Override
|
||||
public boolean includeReadsWithDeletionAtLoci() { return true; }
|
||||
|
||||
@Argument(fullName = "output_reduced_only_coverage", shortName = "output_reduced_only_coverage", doc = "Output an interval if the reduced bam has coverage where the original does not", required = false)
|
||||
public boolean OUTPUT_REDUCED_ONLY_INTERVALS = false;
|
||||
|
||||
public void initialize() {}
|
||||
|
||||
public GenomeLoc map(RefMetaDataTracker tracker, ReferenceContext ref, AlignmentContext context) {
|
||||
|
||||
if ( tracker == null )
|
||||
return null;
|
||||
|
||||
Set<String> tags = getAllTags(context.getBasePileup());
|
||||
return (tags.contains(original) && !tags.contains(reduced)) ||
|
||||
(OUTPUT_REDUCED_ONLY_INTERVALS && tags.contains(reduced) && !tags.contains(original)) ? ref.getLocus() : null;
|
||||
}
|
||||
|
||||
private Set<String> getAllTags(final ReadBackedPileup pileup) {
|
||||
|
||||
final Set<String> tags = new HashSet<String>(10);
|
||||
|
||||
for ( final PileupElement p : pileup ) {
|
||||
if ( (int)p.getQual() > 2 && p.getMappingQual() > 0 && !p.isDeletion() )
|
||||
tags.addAll(getToolkit().getReaderIDForRead(p.getRead()).getTags().getPositionalTags());
|
||||
}
|
||||
|
||||
return tags;
|
||||
}
|
||||
|
||||
public void onTraversalDone(GenomeLoc sum) {
|
||||
if ( sum != null )
|
||||
out.println(sum);
|
||||
}
|
||||
|
||||
public GenomeLoc reduceInit() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public GenomeLoc treeReduce(GenomeLoc lhs, GenomeLoc rhs) {
|
||||
if ( lhs == null )
|
||||
return rhs;
|
||||
|
||||
if ( rhs == null )
|
||||
return lhs;
|
||||
|
||||
// if contiguous, just merge them
|
||||
if ( lhs.contiguousP(rhs) )
|
||||
return getToolkit().getGenomeLocParser().createGenomeLoc(lhs.getContig(), lhs.getStart(), rhs.getStop());
|
||||
|
||||
// otherwise, print the lhs and start over with the rhs
|
||||
out.println(lhs);
|
||||
return rhs;
|
||||
}
|
||||
|
||||
public GenomeLoc reduce(GenomeLoc value, GenomeLoc sum) {
|
||||
if ( value == null )
|
||||
return sum;
|
||||
|
||||
if ( sum == null )
|
||||
return value;
|
||||
|
||||
// if contiguous, just merge them
|
||||
if ( sum.contiguousP(value) )
|
||||
return getToolkit().getGenomeLocParser().createGenomeLoc(sum.getContig(), sum.getStart(), value.getStop());
|
||||
|
||||
// otherwise, print the sum and start over with the value
|
||||
out.println(sum);
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
|
@ -171,6 +171,9 @@ public class VariantEval extends RodWalker<Integer, Integer> implements TreeRedu
|
|||
@Argument(shortName="mvq", fullName="mendelianViolationQualThreshold", doc="Minimum genotype QUAL score for each trio member required to accept a site as a violation. Default is 50.", required=false)
|
||||
protected double MENDELIAN_VIOLATION_QUAL_THRESHOLD = 50;
|
||||
|
||||
@Argument(shortName="ploidy", fullName="samplePloidy", doc="Per-sample ploidy (number of chromosomes per sample)", required=false)
|
||||
protected int ploidy = VariantContextUtils.DEFAULT_PLOIDY;
|
||||
|
||||
@Argument(fullName="ancestralAlignments", shortName="aa", doc="Fasta file with ancestral alleles", required=false)
|
||||
private File ancestralAlignmentsFile = null;
|
||||
|
||||
|
|
@ -574,6 +577,7 @@ public class VariantEval extends RodWalker<Integer, Integer> implements TreeRedu
|
|||
|
||||
public double getMinPhaseQuality() { return MIN_PHASE_QUALITY; }
|
||||
|
||||
public int getSamplePloidy() { return ploidy; }
|
||||
public double getMendelianViolationQualThreshold() { return MENDELIAN_VIOLATION_QUAL_THRESHOLD; }
|
||||
|
||||
public static String getAllSampleName() { return ALL_SAMPLE_NAME; }
|
||||
|
|
|
|||
|
|
@ -27,9 +27,9 @@ public class AlleleCount extends VariantStratifier {
|
|||
if ( getVariantEvalWalker().getEvals().size() != 1 && !getVariantEvalWalker().mergeEvals )
|
||||
throw new UserException.BadArgumentValue("AlleleCount", "AlleleCount stratification only works with a single eval vcf");
|
||||
|
||||
// There are 2 x n sample chromosomes for diploids
|
||||
// There are ploidy x n sample chromosomes
|
||||
// TODO -- generalize to handle multiple ploidy
|
||||
nchrom = getVariantEvalWalker().getSampleNamesForEvaluation().size() * 2;
|
||||
nchrom = getVariantEvalWalker().getSampleNamesForEvaluation().size() * getVariantEvalWalker().getSamplePloidy();
|
||||
if ( nchrom < 2 )
|
||||
throw new UserException.BadArgumentValue("AlleleCount", "AlleleCount stratification requires an eval vcf with at least one sample");
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@
|
|||
package org.broadinstitute.sting.utils;
|
||||
|
||||
import com.google.java.contract.Ensures;
|
||||
import com.google.java.contract.Invariant;
|
||||
import com.google.java.contract.Requires;
|
||||
import com.google.java.contract.ThrowEnsures;
|
||||
import net.sf.picard.reference.ReferenceSequenceFile;
|
||||
|
|
@ -70,7 +69,6 @@ public final class GenomeLocParser {
|
|||
private CachingSequenceDictionary getContigInfo() {
|
||||
if ( contigInfoPerThread.get() == null ) {
|
||||
// initialize for this thread
|
||||
logger.debug("Creating thread-local caching sequence dictionary for thread " + Thread.currentThread().getName());
|
||||
contigInfoPerThread.set(new CachingSequenceDictionary(SINGLE_MASTER_SEQUENCE_DICTIONARY));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ public class NanoScheduler<InputType, MapType, ReduceType> {
|
|||
runningMapJobSlots = new Semaphore(this.bufferSize);
|
||||
|
||||
this.inputExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-input-thread-%d"));
|
||||
this.masterExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("NS-master-thread-%d"));
|
||||
}
|
||||
|
||||
// start timing the time spent outside of the nanoScheduler
|
||||
|
|
|
|||
|
|
@ -250,7 +250,7 @@ class VCFWriter extends IndexingVariantContextWriter {
|
|||
mWriter.write("\n");
|
||||
mWriter.flush(); // necessary so that writing to an output stream will work
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Unable to write the VCF object to " + getStreamName());
|
||||
throw new RuntimeException("Unable to write the VCF object to " + getStreamName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ package org.broadinstitute.sting.gatk.datasources.reads;
|
|||
|
||||
import com.google.caliper.Param;
|
||||
import net.sf.picard.filter.FilteringIterator;
|
||||
import net.sf.samtools.SAMFileHeader;
|
||||
import net.sf.samtools.SAMFileReader;
|
||||
import net.sf.samtools.SAMRecord;
|
||||
import org.broadinstitute.sting.commandline.Tags;
|
||||
|
|
@ -71,6 +72,7 @@ public class DownsamplerBenchmark extends ReadProcessingBenchmark {
|
|||
SAMFileReader reader = new SAMFileReader(inputFile);
|
||||
ReadProperties readProperties = new ReadProperties(Collections.<SAMReaderID>singletonList(new SAMReaderID(inputFile,new Tags())),
|
||||
reader.getFileHeader(),
|
||||
SAMFileHeader.SortOrder.coordinate,
|
||||
false,
|
||||
SAMFileReader.ValidationStringency.SILENT,
|
||||
downsampling.create(),
|
||||
|
|
|
|||
|
|
@ -0,0 +1,203 @@
|
|||
/*
|
||||
* Copyright (c) 2012, The Broad Institute
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following
|
||||
* conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
* OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package org.broadinstitute.sting.gatk.datasources.reads;
|
||||
|
||||
import net.sf.samtools.*;
|
||||
import org.broadinstitute.sting.BaseTest;
|
||||
import org.broadinstitute.sting.commandline.Tags;
|
||||
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
|
||||
import org.broadinstitute.sting.gatk.downsampling.DownsampleType;
|
||||
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
|
||||
import org.broadinstitute.sting.gatk.filters.ReadFilter;
|
||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
|
||||
import org.broadinstitute.sting.utils.sam.ArtificialSingleSampleReadStream;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
import org.testng.Assert;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class ExperimentalReadShardBalancerUnitTest extends BaseTest {
|
||||
|
||||
/**
|
||||
* Tests to ensure that ExperimentalReadShardBalancer works as expected and does not place shard boundaries
|
||||
* at inappropriate places, such as within an alignment start position
|
||||
*/
|
||||
private static class ExperimentalReadShardBalancerTest extends TestDataProvider {
|
||||
private int numContigs;
|
||||
private int numStacksPerContig;
|
||||
private int stackSize;
|
||||
private int numUnmappedReads;
|
||||
private DownsamplingMethod downsamplingMethod;
|
||||
private int expectedReadCount;
|
||||
|
||||
private SAMFileHeader header;
|
||||
private SAMReaderID testBAM;
|
||||
|
||||
public ExperimentalReadShardBalancerTest( int numContigs,
|
||||
int numStacksPerContig,
|
||||
int stackSize,
|
||||
int numUnmappedReads,
|
||||
int downsamplingTargetCoverage ) {
|
||||
super(ExperimentalReadShardBalancerTest.class);
|
||||
|
||||
this.numContigs = numContigs;
|
||||
this.numStacksPerContig = numStacksPerContig;
|
||||
this.stackSize = stackSize;
|
||||
this.numUnmappedReads = numUnmappedReads;
|
||||
|
||||
this.downsamplingMethod = new DownsamplingMethod(DownsampleType.BY_SAMPLE, downsamplingTargetCoverage, null, true);
|
||||
this.expectedReadCount = Math.min(stackSize, downsamplingTargetCoverage) * numStacksPerContig * numContigs + numUnmappedReads;
|
||||
|
||||
setName(String.format("%s: numContigs=%d numStacksPerContig=%d stackSize=%d numUnmappedReads=%d downsamplingTargetCoverage=%d",
|
||||
getClass().getSimpleName(), numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage));
|
||||
}
|
||||
|
||||
public void run() {
|
||||
createTestBAM();
|
||||
|
||||
SAMDataSource dataSource = new SAMDataSource(Arrays.asList(testBAM),
|
||||
new ThreadAllocation(),
|
||||
null,
|
||||
new GenomeLocParser(header.getSequenceDictionary()),
|
||||
false,
|
||||
SAMFileReader.ValidationStringency.SILENT,
|
||||
ReadShard.DEFAULT_MAX_READS, // reset ReadShard.MAX_READS to ReadShard.DEFAULT_MAX_READS for each test
|
||||
downsamplingMethod,
|
||||
new ValidationExclusion(),
|
||||
new ArrayList<ReadFilter>(),
|
||||
false);
|
||||
|
||||
Iterable<Shard> shardIterator = dataSource.createShardIteratorOverAllReads(new ExperimentalReadShardBalancer());
|
||||
|
||||
SAMRecord readAtEndOfLastShard = null;
|
||||
int totalReadsSeen = 0;
|
||||
|
||||
for ( Shard shard : shardIterator ) {
|
||||
int numContigsThisShard = 0;
|
||||
SAMRecord lastRead = null;
|
||||
|
||||
for ( SAMRecord read : shard.iterator() ) {
|
||||
totalReadsSeen++;
|
||||
|
||||
if ( lastRead == null ) {
|
||||
numContigsThisShard = 1;
|
||||
}
|
||||
else if ( ! read.getReadUnmappedFlag() && ! lastRead.getReferenceIndex().equals(read.getReferenceIndex()) ) {
|
||||
numContigsThisShard++;
|
||||
}
|
||||
|
||||
// If the last read from the previous shard is not unmapped, we have to make sure
|
||||
// that no reads in this shard start at the same position
|
||||
if ( readAtEndOfLastShard != null && ! readAtEndOfLastShard.getReadUnmappedFlag() ) {
|
||||
Assert.assertFalse(readAtEndOfLastShard.getReferenceIndex().equals(read.getReferenceIndex()) &&
|
||||
readAtEndOfLastShard.getAlignmentStart() == read.getAlignmentStart(),
|
||||
String.format("Reads from alignment start position %d:%d are split across multiple shards",
|
||||
read.getReferenceIndex(), read.getAlignmentStart()));
|
||||
}
|
||||
|
||||
lastRead = read;
|
||||
}
|
||||
|
||||
// There should never be reads from more than 1 contig in a shard (ignoring unmapped reads)
|
||||
Assert.assertTrue(numContigsThisShard == 1, "found a shard with reads from multiple contigs");
|
||||
|
||||
readAtEndOfLastShard = lastRead;
|
||||
}
|
||||
|
||||
Assert.assertEquals(totalReadsSeen, expectedReadCount, "did not encounter the expected number of reads");
|
||||
}
|
||||
|
||||
private void createTestBAM() {
|
||||
header = ArtificialSAMUtils.createArtificialSamHeader(numContigs, 1, 100000);
|
||||
SAMReadGroupRecord readGroup = new SAMReadGroupRecord("foo");
|
||||
readGroup.setSample("testSample");
|
||||
header.addReadGroup(readGroup);
|
||||
ArtificialSingleSampleReadStream artificialReads = new ArtificialSingleSampleReadStream(header,
|
||||
"foo",
|
||||
numContigs,
|
||||
numStacksPerContig,
|
||||
stackSize,
|
||||
stackSize,
|
||||
1,
|
||||
100,
|
||||
50,
|
||||
150,
|
||||
numUnmappedReads);
|
||||
|
||||
File testBAMFile;
|
||||
try {
|
||||
testBAMFile = File.createTempFile("SAMDataSourceFillShardBoundaryTest", ".bam");
|
||||
testBAMFile.deleteOnExit();
|
||||
}
|
||||
catch ( IOException e ) {
|
||||
throw new ReviewedStingException(String.format("Failed to create temp bam file for test %s. %s", this, e.getMessage()));
|
||||
}
|
||||
|
||||
SAMFileWriter bamWriter = new SAMFileWriterFactory().setCreateIndex(true).makeBAMWriter(header, true, testBAMFile);
|
||||
for ( SAMRecord read : artificialReads ) {
|
||||
bamWriter.addAlignment(read);
|
||||
}
|
||||
bamWriter.close();
|
||||
|
||||
testBAM = new SAMReaderID(testBAMFile, new Tags());
|
||||
|
||||
new File(testBAM.getSamFilePath().replace(".bam", ".bai")).deleteOnExit();
|
||||
new File(testBAM.getSamFilePath() + ".bai").deleteOnExit();
|
||||
}
|
||||
}
|
||||
|
||||
@DataProvider(name = "ExperimentalReadShardBalancerTestDataProvider")
|
||||
public Object[][] createExperimentalReadShardBalancerTests() {
|
||||
for ( int numContigs = 1; numContigs <= 3; numContigs++ ) {
|
||||
for ( int numStacksPerContig : Arrays.asList(1, 2, 4) ) {
|
||||
// Use crucial read shard boundary values as the stack sizes
|
||||
for ( int stackSize : Arrays.asList(ReadShard.DEFAULT_MAX_READS / 2, ReadShard.DEFAULT_MAX_READS / 2 + 10, ReadShard.DEFAULT_MAX_READS, ReadShard.DEFAULT_MAX_READS - 1, ReadShard.DEFAULT_MAX_READS + 1, ReadShard.DEFAULT_MAX_READS * 2) ) {
|
||||
for ( int numUnmappedReads : Arrays.asList(0, ReadShard.DEFAULT_MAX_READS / 2, ReadShard.DEFAULT_MAX_READS * 2) ) {
|
||||
// The first value will result in no downsampling at all, the others in some downsampling
|
||||
for ( int downsamplingTargetCoverage : Arrays.asList(ReadShard.DEFAULT_MAX_READS * 10, ReadShard.DEFAULT_MAX_READS, ReadShard.DEFAULT_MAX_READS / 2) ) {
|
||||
new ExperimentalReadShardBalancerTest(numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ExperimentalReadShardBalancerTest.getTests(ExperimentalReadShardBalancerTest.class);
|
||||
}
|
||||
|
||||
@Test(dataProvider = "ExperimentalReadShardBalancerTestDataProvider")
|
||||
public void runExperimentalReadShardBalancerTest( ExperimentalReadShardBalancerTest test ) {
|
||||
logger.warn("Running test: " + test);
|
||||
|
||||
test.run();
|
||||
}
|
||||
}
|
||||
|
|
@ -29,30 +29,21 @@ import net.sf.samtools.*;
|
|||
import org.broadinstitute.sting.BaseTest;
|
||||
import org.broadinstitute.sting.commandline.Tags;
|
||||
import org.broadinstitute.sting.gatk.arguments.ValidationExclusion;
|
||||
import org.broadinstitute.sting.gatk.downsampling.DownsampleType;
|
||||
import org.broadinstitute.sting.gatk.downsampling.DownsamplingMethod;
|
||||
import org.broadinstitute.sting.gatk.filters.ReadFilter;
|
||||
import org.broadinstitute.sting.gatk.iterators.ReadTransformer;
|
||||
import org.broadinstitute.sting.gatk.iterators.StingSAMIterator;
|
||||
import org.broadinstitute.sting.gatk.resourcemanagement.ThreadAllocation;
|
||||
import org.broadinstitute.sting.utils.GenomeLoc;
|
||||
import org.broadinstitute.sting.utils.GenomeLocParser;
|
||||
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
|
||||
import org.broadinstitute.sting.utils.exceptions.UserException;
|
||||
import org.broadinstitute.sting.utils.fasta.CachingIndexedFastaSequenceFile;
|
||||
import org.broadinstitute.sting.utils.sam.ArtificialSAMUtils;
|
||||
import org.broadinstitute.sting.utils.sam.ArtificialSingleSampleReadStream;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
import org.testng.Assert;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -66,165 +57,12 @@ import static org.testng.Assert.*;
|
|||
*/
|
||||
public class SAMDataSourceUnitTest extends BaseTest {
|
||||
|
||||
// TODO: These legacy tests should really be replaced with a more comprehensive suite of tests for SAMDataSource
|
||||
|
||||
private List<SAMReaderID> readers;
|
||||
private IndexedFastaSequenceFile seq;
|
||||
private GenomeLocParser genomeLocParser;
|
||||
|
||||
|
||||
/***********************************
|
||||
* Tests for the fillShard() method
|
||||
***********************************/
|
||||
|
||||
/**
|
||||
* Tests to ensure that the fillShard() method does not place shard boundaries at inappropriate places,
|
||||
* such as within an alignment start position
|
||||
*/
|
||||
private static class SAMDataSourceFillShardBoundaryTest extends TestDataProvider {
|
||||
private int numContigs;
|
||||
private int numStacksPerContig;
|
||||
private int stackSize;
|
||||
private int numUnmappedReads;
|
||||
private DownsamplingMethod downsamplingMethod;
|
||||
|
||||
private SAMFileHeader header;
|
||||
|
||||
public SAMDataSourceFillShardBoundaryTest( int numContigs,
|
||||
int numStacksPerContig,
|
||||
int stackSize,
|
||||
int numUnmappedReads,
|
||||
int downsamplingTargetCoverage ) {
|
||||
super(SAMDataSourceFillShardBoundaryTest.class);
|
||||
|
||||
this.numContigs = numContigs;
|
||||
this.numStacksPerContig = numStacksPerContig;
|
||||
this.stackSize = stackSize;
|
||||
this.numUnmappedReads = numUnmappedReads;
|
||||
|
||||
this.downsamplingMethod = new DownsamplingMethod(DownsampleType.BY_SAMPLE, downsamplingTargetCoverage, null, true);
|
||||
|
||||
setName(String.format("%s: numContigs=%d numStacksPerContig=%d stackSize=%d numUnmappedReads=%d downsamplingTargetCoverage=%d",
|
||||
getClass().getSimpleName(), numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage));
|
||||
}
|
||||
|
||||
public void run() {
|
||||
SAMDataSource dataSource = new SAMDataSource(Arrays.asList(createTestBAM()),
|
||||
new ThreadAllocation(),
|
||||
null,
|
||||
new GenomeLocParser(header.getSequenceDictionary()),
|
||||
false,
|
||||
SAMFileReader.ValidationStringency.SILENT,
|
||||
null,
|
||||
downsamplingMethod,
|
||||
new ValidationExclusion(),
|
||||
new ArrayList<ReadFilter>(),
|
||||
false);
|
||||
|
||||
Assert.assertTrue(dataSource.usingExpandedShards());
|
||||
|
||||
Iterable<Shard> shardIterator = dataSource.createShardIteratorOverAllReads(new ReadShardBalancer());
|
||||
|
||||
SAMRecord readAtEndOfLastShard = null;
|
||||
|
||||
for ( Shard shard : shardIterator ) {
|
||||
int numContigsThisShard = 0;
|
||||
SAMRecord lastRead = null;
|
||||
|
||||
for ( SAMRecord read : shard.iterator() ) {
|
||||
if ( lastRead == null ) {
|
||||
numContigsThisShard = 1;
|
||||
}
|
||||
else if ( ! read.getReadUnmappedFlag() && ! lastRead.getReferenceIndex().equals(read.getReferenceIndex()) ) {
|
||||
numContigsThisShard++;
|
||||
}
|
||||
|
||||
// If the last read from the previous shard is not unmapped, we have to make sure
|
||||
// that no reads in this shard start at the same position
|
||||
if ( readAtEndOfLastShard != null && ! readAtEndOfLastShard.getReadUnmappedFlag() ) {
|
||||
Assert.assertFalse(readAtEndOfLastShard.getReferenceIndex().equals(read.getReferenceIndex()) &&
|
||||
readAtEndOfLastShard.getAlignmentStart() == read.getAlignmentStart(),
|
||||
String.format("Reads from alignment start position %d:%d are split across multiple shards",
|
||||
read.getReferenceIndex(), read.getAlignmentStart()));
|
||||
}
|
||||
|
||||
lastRead = read;
|
||||
}
|
||||
|
||||
// There should never be reads from more than 1 contig in a shard (ignoring unmapped reads)
|
||||
Assert.assertTrue(numContigsThisShard == 1, "found a shard with reads from multiple contigs");
|
||||
|
||||
readAtEndOfLastShard = lastRead;
|
||||
}
|
||||
}
|
||||
|
||||
private SAMReaderID createTestBAM() {
|
||||
header = ArtificialSAMUtils.createArtificialSamHeader(numContigs, 1, 100000);
|
||||
SAMReadGroupRecord readGroup = new SAMReadGroupRecord("foo");
|
||||
readGroup.setSample("testSample");
|
||||
header.addReadGroup(readGroup);
|
||||
ArtificialSingleSampleReadStream artificialReads = new ArtificialSingleSampleReadStream(header,
|
||||
"foo",
|
||||
numContigs,
|
||||
numStacksPerContig,
|
||||
stackSize,
|
||||
stackSize,
|
||||
1,
|
||||
100,
|
||||
50,
|
||||
150,
|
||||
numUnmappedReads);
|
||||
|
||||
File testBAMFile;
|
||||
try {
|
||||
testBAMFile = File.createTempFile("SAMDataSourceFillShardBoundaryTest", ".bam");
|
||||
testBAMFile.deleteOnExit();
|
||||
}
|
||||
catch ( IOException e ) {
|
||||
throw new ReviewedStingException(String.format("Failed to create temp bam file for test %s. %s", this, e.getMessage()));
|
||||
}
|
||||
|
||||
SAMFileWriter bamWriter = new SAMFileWriterFactory().setCreateIndex(true).makeBAMWriter(header, true, testBAMFile);
|
||||
for ( SAMRecord read : artificialReads ) {
|
||||
bamWriter.addAlignment(read);
|
||||
}
|
||||
bamWriter.close();
|
||||
|
||||
return new SAMReaderID(testBAMFile, new Tags());
|
||||
}
|
||||
}
|
||||
|
||||
@DataProvider(name = "SAMDataSourceFillShardTestDataProvider")
|
||||
public Object[][] createSAMDataSourceFillShardBoundaryTests() {
|
||||
// Take downsampling out of the equation for these tests -- we are only interested in whether the
|
||||
// shard boundaries occur at the right places in the read stream, and removing downsampling as a
|
||||
// factor simplifies that task (note that we still need to provide a specific downsampling method with
|
||||
// experimental downsampling enabled to trigger the shard expansion behavior, for now)
|
||||
int downsamplingTargetCoverage = ReadShard.MAX_READS * 10;
|
||||
|
||||
for ( int numContigs = 1; numContigs <= 3; numContigs++ ) {
|
||||
for ( int numStacksPerContig : Arrays.asList(1, 2, 4) ) {
|
||||
// Use crucial read shard boundary values as the stack sizes
|
||||
for ( int stackSize : Arrays.asList(ReadShard.MAX_READS / 2, ReadShard.MAX_READS / 2 + 10, ReadShard.MAX_READS, ReadShard.MAX_READS - 1, ReadShard.MAX_READS + 1, ReadShard.MAX_READS * 2) ) {
|
||||
for ( int numUnmappedReads : Arrays.asList(0, ReadShard.MAX_READS / 2, ReadShard.MAX_READS * 2) ) {
|
||||
new SAMDataSourceFillShardBoundaryTest(numContigs, numStacksPerContig, stackSize, numUnmappedReads, downsamplingTargetCoverage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return SAMDataSourceFillShardBoundaryTest.getTests(SAMDataSourceFillShardBoundaryTest.class);
|
||||
}
|
||||
|
||||
// TODO: re-enable these tests once the issues with filepointer ordering + the downsamplers are worked out
|
||||
@Test(dataProvider = "SAMDataSourceFillShardTestDataProvider", enabled = false)
|
||||
public void testSAMDataSourceFillShard( SAMDataSourceFillShardBoundaryTest test ) {
|
||||
logger.warn("Running test: " + test);
|
||||
|
||||
test.run();
|
||||
}
|
||||
|
||||
|
||||
// TODO: the legacy tests below should really be replaced with a more comprehensive suite of tests for SAMDataSource
|
||||
|
||||
/**
|
||||
* This function does the setup of our parser, before each method call.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -502,6 +502,7 @@ public class LocusIteratorByStateExperimentalUnitTest extends BaseTest {
|
|||
return new ReadProperties(
|
||||
Collections.<SAMReaderID>emptyList(),
|
||||
new SAMFileHeader(),
|
||||
SAMFileHeader.SortOrder.coordinate,
|
||||
false,
|
||||
SAMFileReader.ValidationStringency.STRICT,
|
||||
downsamplingMethod,
|
||||
|
|
|
|||
|
|
@ -333,6 +333,7 @@ public class LocusIteratorByStateUnitTest extends BaseTest {
|
|||
return new ReadProperties(
|
||||
Collections.<SAMReaderID>emptyList(),
|
||||
new SAMFileHeader(),
|
||||
SAMFileHeader.SortOrder.coordinate,
|
||||
false,
|
||||
SAMFileReader.ValidationStringency.STRICT,
|
||||
null,
|
||||
|
|
|
|||
|
|
@ -349,7 +349,7 @@ public class UnifiedGenotyperIntegrationTest extends WalkerTest {
|
|||
WalkerTest.WalkerTestSpec spec2 = new WalkerTest.WalkerTestSpec(
|
||||
baseCommandIndels + " --genotyping_mode GENOTYPE_GIVEN_ALLELES -alleles " + result.get(0).getAbsolutePath() + " -I " + validationDataLocation +
|
||||
"low_coverage_CEU.chr1.10k-11k.bam -o %s -L 1:10450700-10551000", 1,
|
||||
Arrays.asList("1a4d856bfe53d9acee0ea303c4b83bb1"));
|
||||
Arrays.asList("c7792e27477ecf99893a76ecbac5c2f9"));
|
||||
executeTest("test MultiSample Pilot1 CEU indels using GENOTYPE_GIVEN_ALLELES", spec2);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -598,8 +598,8 @@ public class VariantContextUtilsUnitTest extends BaseTest {
|
|||
|
||||
private RepeatDetectorTest(boolean isTrueRepeat, String ref, String refAlleleString, String ... altAlleleStrings) {
|
||||
super(RepeatDetectorTest.class);
|
||||
this.ref = "N" + ref; // add a dummy base for the event here
|
||||
this.isTrueRepeat = isTrueRepeat;
|
||||
this.ref = ref;
|
||||
|
||||
List<Allele> alleles = new LinkedList<Allele>();
|
||||
final Allele refAllele = Allele.create(refAlleleString, true);
|
||||
|
|
@ -609,7 +609,7 @@ public class VariantContextUtilsUnitTest extends BaseTest {
|
|||
alleles.add(alt);
|
||||
}
|
||||
|
||||
VariantContextBuilder builder = new VariantContextBuilder("test", "chr1", 1, 1 + refAllele.length(), alleles);
|
||||
VariantContextBuilder builder = new VariantContextBuilder("test", "chr1", 1, refAllele.length(), alleles);
|
||||
this.vc = builder.make();
|
||||
}
|
||||
|
||||
|
|
@ -620,31 +620,31 @@ public class VariantContextUtilsUnitTest extends BaseTest {
|
|||
|
||||
@DataProvider(name = "RepeatDetectorTest")
|
||||
public Object[][] makeRepeatDetectorTest() {
|
||||
new RepeatDetectorTest(true, "AAC", "-", "A");
|
||||
new RepeatDetectorTest(true, "AAC", "A", "-");
|
||||
new RepeatDetectorTest(false, "AAC", "AA", "-");
|
||||
new RepeatDetectorTest(false, "AAC", "-", "C");
|
||||
new RepeatDetectorTest(true, "NAAC", "N", "NA");
|
||||
new RepeatDetectorTest(true, "NAAC", "NA", "N");
|
||||
new RepeatDetectorTest(false, "NAAC", "NAA", "N");
|
||||
new RepeatDetectorTest(false, "NAAC", "N", "NC");
|
||||
new RepeatDetectorTest(false, "AAC", "A", "C");
|
||||
|
||||
// running out of ref bases => false
|
||||
new RepeatDetectorTest(false, "AAC", "-", "CAGTA");
|
||||
new RepeatDetectorTest(false, "NAAC", "N", "NCAGTA");
|
||||
|
||||
// complex repeats
|
||||
new RepeatDetectorTest(true, "ATATATC", "-", "AT");
|
||||
new RepeatDetectorTest(true, "ATATATC", "-", "ATA");
|
||||
new RepeatDetectorTest(true, "ATATATC", "-", "ATAT");
|
||||
new RepeatDetectorTest(true, "ATATATC", "AT", "-");
|
||||
new RepeatDetectorTest(false, "ATATATC", "ATA", "-");
|
||||
new RepeatDetectorTest(false, "ATATATC", "ATAT", "-");
|
||||
new RepeatDetectorTest(true, "NATATATC", "N", "NAT");
|
||||
new RepeatDetectorTest(true, "NATATATC", "N", "NATA");
|
||||
new RepeatDetectorTest(true, "NATATATC", "N", "NATAT");
|
||||
new RepeatDetectorTest(true, "NATATATC", "NAT", "N");
|
||||
new RepeatDetectorTest(false, "NATATATC", "NATA", "N");
|
||||
new RepeatDetectorTest(false, "NATATATC", "NATAT", "N");
|
||||
|
||||
// multi-allelic
|
||||
new RepeatDetectorTest(true, "ATATATC", "-", "AT", "ATAT");
|
||||
new RepeatDetectorTest(true, "ATATATC", "-", "AT", "ATA");
|
||||
new RepeatDetectorTest(true, "ATATATC", "AT", "-", "ATAT");
|
||||
new RepeatDetectorTest(true, "ATATATC", "AT", "-", "ATA"); // two As
|
||||
new RepeatDetectorTest(false, "ATATATC", "AT", "-", "ATC"); // false
|
||||
new RepeatDetectorTest(false, "ATATATC", "AT", "-", "CC"); // false
|
||||
new RepeatDetectorTest(false, "ATATATC", "AT", "ATAT", "CC"); // false
|
||||
new RepeatDetectorTest(true, "NATATATC", "N", "NAT", "NATAT");
|
||||
new RepeatDetectorTest(true, "NATATATC", "N", "NAT", "NATA");
|
||||
new RepeatDetectorTest(true, "NATATATC", "NAT", "N", "NATAT");
|
||||
new RepeatDetectorTest(true, "NATATATC", "NAT", "N", "NATA"); // two As
|
||||
new RepeatDetectorTest(false, "NATATATC", "NAT", "N", "NATC"); // false
|
||||
new RepeatDetectorTest(false, "NATATATC", "NAT", "N", "NCC"); // false
|
||||
new RepeatDetectorTest(false, "NATATATC", "NAT", "NATAT", "NCC"); // false
|
||||
|
||||
return RepeatDetectorTest.getTests(RepeatDetectorTest.class);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class DataProcessingPipelineTest {
|
|||
" -D " + BaseTest.publicTestDir + "exampleDBSNP.vcf",
|
||||
" -test ",
|
||||
" -p " + projectName).mkString
|
||||
spec.fileMD5s += testOut -> "60d39ae909fdd049920b54e0965b6d3c"
|
||||
spec.fileMD5s += testOut -> "45d97df6d291695b92668e8a55c54cd0"
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ class DataProcessingPipelineTest {
|
|||
" -bwa /home/unix/carneiro/bin/bwa",
|
||||
" -bwape ",
|
||||
" -p " + projectName).mkString
|
||||
spec.fileMD5s += testOut -> "61ca3237afdfabf78ee27a5bb80dae59"
|
||||
spec.fileMD5s += testOut -> "6e70efbe6bafc3fedd60bd406bd201db"
|
||||
PipelineTest.executeTest(spec)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue