From a6886a4cc0e21c368fcd06e36e144e41113bf0d0 Mon Sep 17 00:00:00 2001 From: Ryan Poplin Date: Wed, 4 Jan 2012 17:03:21 -0500 Subject: [PATCH] Initial commit of the Active Region Traversal. Not ready to be used by anyone yet. --- .../sting/gatk/GenomeAnalysisEngine.java | 2 +- .../executive/HierarchicalMicroScheduler.java | 1 - .../gatk/executive/LinearMicroScheduler.java | 8 +- .../sting/gatk/executive/MicroScheduler.java | 2 + .../traversals/TraverseActiveRegions.java | 213 ++++++++++++++++++ .../gatk/walkers/ActiveRegionWalker.java | 29 +++ .../broadinstitute/sting/utils/GenomeLoc.java | 6 +- .../sting/utils/activeregion/ActiveRead.java | 19 ++ .../utils/activeregion/ActiveRegion.java | 55 +++++ 9 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java create mode 100644 public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionWalker.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRead.java create mode 100644 public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java diff --git a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java index f954d7650..ede8e9340 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java +++ b/public/java/src/org/broadinstitute/sting/gatk/GenomeAnalysisEngine.java @@ -443,7 +443,7 @@ public class GenomeAnalysisEngine { if(!readsDataSource.hasIndex() && intervals != null && !argCollection.allowIntervalsWithUnindexedBAM) throw new UserException.CommandLineException("Cannot perform interval processing when reads are present but no index is available."); - if(walker instanceof LocusWalker) { + if(walker instanceof LocusWalker || walker instanceof ActiveRegionWalker) { if (readsDataSource.getSortOrder() != SAMFileHeader.SortOrder.coordinate) throw new UserException.MissortedBAM(SAMFileHeader.SortOrder.coordinate, "Locus walkers can only traverse coordinate-sorted data. Please resort your input BAM file(s) or set the Sort Order tag in the header appropriately."); if(intervals == null) diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java index 39e1bdc72..eec440820 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/HierarchicalMicroScheduler.java @@ -11,7 +11,6 @@ import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker; import org.broadinstitute.sting.gatk.walkers.TreeReducible; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; -import org.broadinstitute.sting.utils.exceptions.UserException; import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor; import java.util.Collection; diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java index ff5e1064b..774b532f3 100644 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/LinearMicroScheduler.java @@ -10,6 +10,7 @@ import org.broadinstitute.sting.gatk.datasources.reads.Shard; import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource; import org.broadinstitute.sting.gatk.io.DirectOutputTracker; import org.broadinstitute.sting.gatk.io.OutputTracker; +import org.broadinstitute.sting.gatk.traversals.TraverseActiveRegions; import org.broadinstitute.sting.gatk.walkers.LocusWalker; import org.broadinstitute.sting.gatk.walkers.Walker; import org.broadinstitute.sting.utils.SampleUtils; @@ -55,7 +56,6 @@ public class LinearMicroScheduler extends MicroScheduler { traversalEngine.startTimersIfNecessary(); if(shard.getShardType() == Shard.ShardType.LOCUS) { - LocusWalker lWalker = (LocusWalker)walker; WindowMaker windowMaker = new WindowMaker(shard, engine.getGenomeLocParser(), getReadIterator(shard), shard.getGenomeLocs(), SampleUtils.getSAMFileSamples(engine)); for(WindowMaker.WindowMakerIterator iterator: windowMaker) { @@ -77,6 +77,12 @@ public class LinearMicroScheduler extends MicroScheduler { done = walker.isDone(); } + // Special function call to empty out the work queue. Ugly for now but will be cleaned up when we push this functionality more into the engine + if( traversalEngine instanceof TraverseActiveRegions ) { + final Object result = ((TraverseActiveRegions) traversalEngine).endTraversal(walker, accumulator.getReduceInit()); + accumulator.accumulate(null, result); // Assumes only used with StandardAccumulator + } + Object result = accumulator.finishTraversal(); printOnTraversalDone(result); diff --git a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java index d013db7e8..508099708 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java +++ b/public/java/src/org/broadinstitute/sting/gatk/executive/MicroScheduler.java @@ -128,6 +128,8 @@ public abstract class MicroScheduler implements MicroSchedulerMBean { traversalEngine = new TraverseDuplicates(); } else if (walker instanceof ReadPairWalker) { traversalEngine = new TraverseReadPairs(); + } else if (walker instanceof ActiveRegionWalker) { + traversalEngine = new TraverseActiveRegions(); } else { throw new UnsupportedOperationException("Unable to determine traversal type, the walker is an unknown type."); } diff --git a/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java new file mode 100644 index 000000000..01bfe396a --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/traversals/TraverseActiveRegions.java @@ -0,0 +1,213 @@ +package org.broadinstitute.sting.gatk.traversals; + +import net.sf.samtools.SAMRecord; +import org.apache.log4j.Logger; +import org.broadinstitute.sting.gatk.WalkerManager; +import org.broadinstitute.sting.gatk.contexts.AlignmentContext; +import org.broadinstitute.sting.gatk.contexts.ReferenceContext; +import org.broadinstitute.sting.gatk.datasources.providers.*; +import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; +import org.broadinstitute.sting.gatk.walkers.ActiveRegionWalker; +import org.broadinstitute.sting.gatk.walkers.DataSource; +import org.broadinstitute.sting.gatk.walkers.Walker; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.activeregion.ActiveRegion; +import org.broadinstitute.sting.utils.pileup.PileupElement; +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.Queue; + +/** + * Created by IntelliJ IDEA. + * User: rpoplin + * Date: 12/9/11 + */ + +public class TraverseActiveRegions extends TraversalEngine,LocusShardDataProvider> { + /** + * our log, which we want to capture anything from this class + */ + protected static Logger logger = Logger.getLogger(TraversalEngine.class); + + private final Queue workQueue = new LinkedList(); + private final LinkedHashSet myReads = new LinkedHashSet(); + + @Override + protected String getTraversalType() { + return "active regions"; + } + + @Override + public T traverse( final ActiveRegionWalker walker, + final LocusShardDataProvider dataProvider, + T sum) { + logger.debug(String.format("TraverseActiveRegion.traverse: Shard is %s", dataProvider)); + + LocusView locusView = getLocusView( walker, dataProvider ); + + int minStart = Integer.MAX_VALUE; + final LocusReferenceView referenceView = new LocusReferenceView( walker, dataProvider ); + + if ( locusView.hasNext() ) { // trivial optimization to avoid unnecessary processing when there's nothing here at all + + final ArrayList isActiveList = new ArrayList(); + + //ReferenceOrderedView referenceOrderedDataView = new ReferenceOrderedView( dataProvider ); + ReferenceOrderedView referenceOrderedDataView = null; + if ( WalkerManager.getWalkerDataSource(walker) != DataSource.REFERENCE_ORDERED_DATA ) + referenceOrderedDataView = new ManagingReferenceOrderedView( dataProvider ); + else + referenceOrderedDataView = (RodLocusView)locusView; + + // We keep processing while the next reference location is within the interval + while( locusView.hasNext() ) { + final AlignmentContext locus = locusView.next(); + GenomeLoc location = locus.getLocation(); + + dataProvider.getShard().getReadMetrics().incrementNumIterations(); + + if ( locus.hasExtendedEventPileup() ) { + // if the alignment context we received holds an "extended" pileup (i.e. pileup of insertions/deletions + // associated with the current site), we need to update the location. The updated location still starts + // at the current genomic position, but it has to span the length of the longest deletion (if any). + location = engine.getGenomeLocParser().setStop(location,location.getStop()+locus.getExtendedEventPileup().getMaxDeletionLength()); + + // it is possible that the new expanded location spans the current shard boundary; the next method ensures + // that when it is the case, the reference sequence held by the ReferenceView will be reloaded so that + // the view has all the bases we are gonna need. If the location fits within the current view bounds, + // the next call will not do anything to the view: + referenceView.expandBoundsToAccomodateLoc(location); + } + + // create reference context. Note that if we have a pileup of "extended events", the context will + // hold the (longest) stretch of deleted reference bases (if deletions are present in the pileup). + final ReferenceContext refContext = referenceView.getReferenceContext(location); + + // Iterate forward to get all reference ordered data covering this location + final RefMetaDataTracker tracker = referenceOrderedDataView.getReferenceOrderedDataAtLocus(locus.getLocation(), refContext); + + // Call the walkers isActive function for this locus and add them to the list to be integrated later + final boolean isActive = walker.isActive( tracker, refContext, locus ); + isActiveList.add( new ActiveRegion(location, isActive, engine.getGenomeLocParser()) ); + + // Grab all the previously unseen reads from this pileup and add them to the massive read list + for( final PileupElement p : locus.getBasePileup() ) { + final SAMRecord read = p.getRead(); + if( !myReads.contains(read) ) { + myReads.add(read); + } + } + + // If this is the last pileup for this shard then need to calculate the minimum alignment start so that + // we know which active regions in the work queue are now safe to process + if( !locusView.hasNext() ) { + for( final PileupElement p : locus.getBasePileup() ) { + final SAMRecord read = p.getRead(); + if( read.getAlignmentStart() < minStart ) { minStart = read.getAlignmentStart(); } + } + } + printProgress(dataProvider.getShard(),locus.getLocation()); + } + + // Take the individual isActive calls and integrate them into contiguous active regions and + // add these blocks of work to the work queue + final ArrayList activeRegions = integrateActiveList( isActiveList ); + logger.debug("Integrated " + isActiveList.size() + " isActive calls into " + activeRegions.size() + " regions." ); + workQueue.addAll( activeRegions ); + } + + while( workQueue.peek().getLocation().getStop() < minStart ) { + final ActiveRegion activeRegion = workQueue.remove(); + sum = processActiveRegion( activeRegion, myReads, workQueue, sum, walker ); + } + + return sum; + } + + // Special function called in LinearMicroScheduler to empty out the work queue. Ugly for now but will be cleaned up when we push this functionality more into the engine + public T endTraversal( final Walker walker, T sum) { + while( workQueue.peek() != null ) { + final ActiveRegion activeRegion = workQueue.remove(); + sum = processActiveRegion( activeRegion, myReads, workQueue, sum, (ActiveRegionWalker) walker ); + } + + return sum; + } + + private T processActiveRegion( final ActiveRegion activeRegion, final LinkedHashSet reads, final Queue workQueue, final T sum, final ActiveRegionWalker walker ) { + final ArrayList placedReads = new ArrayList(); + for( final SAMRecord read : reads ) { + final GenomeLoc readLoc = this.engine.getGenomeLocParser().createGenomeLoc( read ); + if( activeRegion.getLocation().overlapsP( readLoc ) ) { + // The region which the highest amount of overlap is chosen as the primary region for the read (tie breaking is done as right most region) + long maxOverlap = activeRegion.getLocation().sizeOfOverlap( readLoc ); + ActiveRegion bestRegion = activeRegion; + for( final ActiveRegion otherRegionToTest : workQueue ) { + if( otherRegionToTest.getLocation().sizeOfOverlap(readLoc) >= maxOverlap ) { + maxOverlap = otherRegionToTest.getLocation().sizeOfOverlap(readLoc); + bestRegion = otherRegionToTest; + } + } + bestRegion.add( (GATKSAMRecord) read, true ); + + // The read is also added to all other region in which it overlaps but marked as non-primary + if( !bestRegion.equals(activeRegion) ) { + activeRegion.add( (GATKSAMRecord) read, false ); + } + for( final ActiveRegion otherRegionToTest : workQueue ) { + if( !bestRegion.equals(otherRegionToTest) && otherRegionToTest.getLocation().overlapsP( readLoc ) ) { + activeRegion.add( (GATKSAMRecord) read, false ); + } + } + placedReads.add( read ); + } + } + reads.removeAll( placedReads ); // remove all the reads which have been placed into their active region + + logger.debug(">> Map call with " + activeRegion.getReads().size() + " " + (activeRegion.isActive ? "active" : "inactive") + " reads @ " + activeRegion.getLocation() + " with full extent: " + activeRegion.getReferenceLocation()); + final M x = walker.map( activeRegion, null ); // BUGBUG: tracker needs to be filled in and passed to the walker + return walker.reduce( x, sum ); + } + + /** + * Gets the best view of loci for this walker given the available data. + * @param walker walker to interrogate. + * @param dataProvider Data which which to drive the locus view. + * @return A view of the locus data, where one iteration of the locus view maps to one iteration of the traversal. + */ + private LocusView getLocusView( Walker walker, LocusShardDataProvider dataProvider ) { + DataSource dataSource = WalkerManager.getWalkerDataSource(walker); + if( dataSource == DataSource.READS ) + return new CoveredLocusView(dataProvider); + else if( dataSource == DataSource.REFERENCE ) //|| ! GenomeAnalysisEngine.instance.getArguments().enableRodWalkers ) + return new AllLocusView(dataProvider); + else if( dataSource == DataSource.REFERENCE_ORDERED_DATA ) + return new RodLocusView(dataProvider); + else + throw new UnsupportedOperationException("Unsupported traversal type: " + dataSource); + } + + // integrate active regions into contiguous chunks based on active status + private ArrayList integrateActiveList( final ArrayList activeList ) { + final ArrayList returnList = new ArrayList(); + ActiveRegion prevLocus = activeList.remove(0); + ActiveRegion startLocus = prevLocus; + for( final ActiveRegion thisLocus : activeList ) { + if( prevLocus.isActive != thisLocus.isActive ) { + returnList.add( new ActiveRegion( engine.getGenomeLocParser().createGenomeLoc(startLocus.getLocation().getContig(), startLocus.getLocation().getStart(), prevLocus.getLocation().getStart()), + prevLocus.isActive, engine.getGenomeLocParser() ) ); + startLocus = thisLocus; + } + prevLocus = thisLocus; + } + // output the last region if necessary + if( startLocus != prevLocus ) { + returnList.add( new ActiveRegion( engine.getGenomeLocParser().createGenomeLoc(startLocus.getLocation().getContig(), startLocus.getLocation().getStart(), prevLocus.getLocation().getStart()), + prevLocus.isActive, engine.getGenomeLocParser() ) ); + } + return returnList; + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionWalker.java b/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionWalker.java new file mode 100644 index 000000000..d2891c959 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/gatk/walkers/ActiveRegionWalker.java @@ -0,0 +1,29 @@ +package org.broadinstitute.sting.gatk.walkers; + +import org.broadinstitute.sting.gatk.contexts.AlignmentContext; +import org.broadinstitute.sting.gatk.contexts.ReferenceContext; +import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker; +import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker; +import org.broadinstitute.sting.utils.activeregion.ActiveRegion; + +/** + * Created by IntelliJ IDEA. + * User: rpoplin + * Date: 12/7/11 + */ + +@By(DataSource.READS) +@Requires({DataSource.READS, DataSource.REFERENCE_BASES}) +@PartitionBy(PartitionType.READ) +public abstract class ActiveRegionWalker extends Walker { + // Do we actually want to operate on the context? + public boolean filter(final RefMetaDataTracker tracker, final ReferenceContext ref, final AlignmentContext context) { + return true; // We are keeping all the reads + } + + // Determine active status over the AlignmentContext + public abstract boolean isActive(final RefMetaDataTracker tracker, final ReferenceContext ref, final AlignmentContext context); + + // Map over the ActiveRegion + public abstract MapType map(final ActiveRegion activeRegion, final ReadMetaDataTracker metaDataTracker); +} diff --git a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java index 345161416..6941b888b 100644 --- a/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java +++ b/public/java/src/org/broadinstitute/sting/utils/GenomeLoc.java @@ -145,7 +145,7 @@ public class GenomeLoc implements Comparable, Serializable, HasGenome } return new GenomeLoc(getContig(), this.contigIndex, - Math.min(getStart(), that.getStart()), + Math.min( getStart(), that.getStart() ), Math.max( getStop(), that.getStop()) ); } @@ -465,4 +465,8 @@ public class GenomeLoc implements Comparable, Serializable, HasGenome private final static double overlapPercent(final GenomeLoc gl1, final GenomeLoc gl2) { return (1.0 * gl1.intersect(gl2).size()) / gl1.size(); } + + public long sizeOfOverlap( final GenomeLoc that ) { + return ( this.overlapsP(that) ? Math.min( getStop(), that.getStop() ) - Math.max( getStart(), that.getStart() ) : 0L ); + } } diff --git a/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRead.java b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRead.java new file mode 100644 index 000000000..8d08a29b6 --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRead.java @@ -0,0 +1,19 @@ +package org.broadinstitute.sting.utils.activeregion; + +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; + +/** + * Created by IntelliJ IDEA. + * User: rpoplin + * Date: 1/4/12 + */ + +public class ActiveRead { + final public GATKSAMRecord read; + final public boolean isPrimaryRegion; + + ActiveRead( final GATKSAMRecord read, final boolean isPrimaryRegion ) { + this.read = read; + this.isPrimaryRegion = isPrimaryRegion; + } +} diff --git a/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java new file mode 100644 index 000000000..e8908480c --- /dev/null +++ b/public/java/src/org/broadinstitute/sting/utils/activeregion/ActiveRegion.java @@ -0,0 +1,55 @@ +package org.broadinstitute.sting.utils.activeregion; + +import net.sf.picard.reference.IndexedFastaSequenceFile; +import org.broadinstitute.sting.utils.GenomeLoc; +import org.broadinstitute.sting.utils.GenomeLocParser; +import org.broadinstitute.sting.utils.HasGenomeLocation; +import org.broadinstitute.sting.utils.sam.GATKSAMRecord; + +import java.util.ArrayList; + +/** + * Created by IntelliJ IDEA. + * User: rpoplin + * Date: 1/4/12 + */ + +public class ActiveRegion implements HasGenomeLocation { + + private final ArrayList reads = new ArrayList(); + private byte[] reference = null; + private final GenomeLoc loc; + private GenomeLoc referenceLoc = null; + private final GenomeLocParser genomeLocParser; + public final boolean isActive; + + public ActiveRegion( final GenomeLoc loc, final boolean isActive, final GenomeLocParser genomeLocParser ) { + this.loc = loc; + this.isActive = isActive; + this.genomeLocParser = genomeLocParser; + referenceLoc = loc; + } + + // add each read to the bin and extend the reference genome loc if needed + public void add( final GATKSAMRecord read, final boolean isPrimaryRegion ) { + referenceLoc = referenceLoc.union( genomeLocParser.createGenomeLoc( read ) ); + reads.add( new ActiveRead(read, isPrimaryRegion) ); + } + + public ArrayList getReads() { return reads; } + + public byte[] getReference( final IndexedFastaSequenceFile referenceReader ) { + // set up the reference if we haven't done so yet + if ( reference == null ) { + reference = referenceReader.getSubsequenceAt(referenceLoc.getContig(), referenceLoc.getStart(), referenceLoc.getStop()).getBases(); + } + + return reference; + } + + public GenomeLoc getLocation() { return loc; } + + public GenomeLoc getReferenceLocation() { return referenceLoc; } + + public int size() { return reads.size(); } +} \ No newline at end of file