From 6cce3e00f32b77aa9009a09e19adaad047e0fda4 Mon Sep 17 00:00:00 2001 From: depristo Date: Wed, 27 Apr 2011 22:00:48 +0000 Subject: [PATCH] A test walker that does consensus compression of deep read data sets. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5702 348d0f76-0448-11de-a6fe-93d51630548a --- .../reducereads/ConsensusReadCompressor.java | 12 +- .../MultiSampleConsensusReadCompressor.java | 22 +-- .../reducereads/ReduceReadsWalker.java | 57 ++++--- .../walkers/reducereads/RefPileupElement.java | 11 +- .../SingleSampleConsensusReadCompressor.java | 145 +++++++++++++++--- .../sting/utils/sam/ReadUtils.java | 62 +++++++- 6 files changed, 240 insertions(+), 69 deletions(-) diff --git a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ConsensusReadCompressor.java b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ConsensusReadCompressor.java index fa91137e6..2ef27226a 100644 --- a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ConsensusReadCompressor.java +++ b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ConsensusReadCompressor.java @@ -12,11 +12,11 @@ import java.util.Iterator; * Time: 8:49 AM * To change this template use File | Settings | File Templates. */ -public interface ConsensusReadCompressor extends Iterable { - void addAlignment(SAMRecord read); +public interface ConsensusReadCompressor { // extends Iterable { + Iterable addAlignment(SAMRecord read); + Iterable close(); - Collection consensusReads(); - - @Override - Iterator iterator(); +//Collection consensusReads(); +// @Override +// Iterator iterator(); } diff --git a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/MultiSampleConsensusReadCompressor.java b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/MultiSampleConsensusReadCompressor.java index e79a8ad0c..bb3540906 100644 --- a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/MultiSampleConsensusReadCompressor.java +++ b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/MultiSampleConsensusReadCompressor.java @@ -49,32 +49,32 @@ public class MultiSampleConsensusReadCompressor implements ConsensusReadCompress public MultiSampleConsensusReadCompressor(SAMFileHeader header, final int readContextSize, final GenomeLocParser glParser, - final String contig) { + final String contig, + final int minBpForRunningConsensus, + final int maxReadsAtVariableSites) { for ( String name : SampleUtils.getSAMFileSamples(header) ) { compressorsPerSample.put(name, - new SingleSampleConsensusReadCompressor(readContextSize, glParser, contig)); + new SingleSampleConsensusReadCompressor(readContextSize, + glParser, contig, minBpForRunningConsensus, maxReadsAtVariableSites)); + // todo -- argument for minConsensusSize } } @Override - public void addAlignment(SAMRecord read) { + public Iterable addAlignment(SAMRecord read) { String sample = read.getReadGroup().getSample(); SingleSampleConsensusReadCompressor compressor = compressorsPerSample.get(sample); if ( compressor == null ) throw new ReviewedStingException("No compressor for sample " + sample); - compressor.addAlignment(read); + return compressor.addAlignment(read); } @Override - public Collection consensusReads() { + public Iterable close() { List reads = new LinkedList(); for ( SingleSampleConsensusReadCompressor comp : compressorsPerSample.values() ) - reads.addAll(comp.consensusReads()); + for ( SAMRecord read : comp.close() ) + reads.add(read); return reads; } - - @Override - public Iterator iterator() { - return consensusReads().iterator(); - } } diff --git a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ReduceReadsWalker.java b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ReduceReadsWalker.java index 6561fe121..e0984988a 100755 --- a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ReduceReadsWalker.java +++ b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/ReduceReadsWalker.java @@ -32,6 +32,7 @@ import org.broadinstitute.sting.gatk.contexts.ReferenceContext; import org.broadinstitute.sting.gatk.io.StingSAMFileWriter; import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker; import org.broadinstitute.sting.gatk.walkers.ReadWalker; +import org.broadinstitute.sting.utils.sam.ReadUtils; import java.io.*; import java.util.ArrayList; @@ -43,7 +44,7 @@ import java.util.Set; * User: depristo * Date: April 7, 2011 */ -public class ReduceReadsWalker extends ReadWalker> { +public class ReduceReadsWalker extends ReadWalker { @Output protected StingSAMFileWriter out; @@ -59,7 +60,14 @@ public class ReduceReadsWalker extends ReadWalker readNamesToUse; + @Argument(fullName = "minBpForRunningConsensus", shortName = "mbrc", doc = "", required = false) + protected int minBpForRunningConsensus = 1000; + + @Argument(fullName = "maxReadsAtVariableSites", shortName = "mravs", doc = "", required = false) + protected int maxReadsAtVariableSites = 500; + protected int totalReads = 0; + int nCompressedReads = 0; @Override public void initialize() { @@ -80,8 +88,10 @@ public class ReduceReadsWalker extends ReadWalker reduceInit() { - return new ArrayList(); + public ConsensusReadCompressor reduceInit() { + return new MultiSampleConsensusReadCompressor(getToolkit().getSAMFileHeader(), + contextSize, getToolkit().getGenomeLocParser(), "20", // todo fixme + minBpForRunningConsensus, maxReadsAtVariableSites); } /** @@ -89,35 +99,32 @@ public class ReduceReadsWalker extends ReadWalker reduce( SAMRecord read, Collection reads ) { - if ( readNamesToUse == null || readNamesToUse.contains(read.getReadName()) ) - reads.add(read); - return reads; + public ConsensusReadCompressor reduce( SAMRecord read, ConsensusReadCompressor comp ) { + if ( readNamesToUse == null || readNamesToUse.contains(read.getReadName()) ) { + if ( INCLUDE_RAW_READS ) + out.addAlignment(read); + + // write out compressed reads as they become available + for ( SAMRecord consensusRead : comp.addAlignment(read) ) { + out.addAlignment(consensusRead); + nCompressedReads++; + } + } + + return comp; } @Override - public void onTraversalDone( Collection reads ) { - String contig = reads.iterator().next().getReferenceName(); - ConsensusReadCompressor compressor = - new MultiSampleConsensusReadCompressor(getToolkit().getSAMFileHeader(), - contextSize, getToolkit().getGenomeLocParser(), contig); - - // add all of the reads to the compressor - for ( SAMRecord read : reads ) { - if ( INCLUDE_RAW_READS ) - out.addAlignment(read); - compressor.addAlignment(read); - } - - // compress the reads + public void onTraversalDone( ConsensusReadCompressor compressor ) { //compressor.writeConsensusBed(bedOut); - int nCompressedReads = 0; - for ( SAMRecord read : compressor ) { - out.addAlignment(read); + // write out any remaining reads + for ( SAMRecord consensusRead : compressor.close() ) { + out.addAlignment(consensusRead); nCompressedReads++; } - logger.info("Compressed reads : " + nCompressedReads); + double percent = (100.0 * nCompressedReads) / totalReads; + logger.info("Compressed reads : " + nCompressedReads + String.format(" (%.2f%%)", percent)); logger.info("Total reads : " + totalReads); } diff --git a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/RefPileupElement.java b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/RefPileupElement.java index f3e820a3c..edc0ab990 100755 --- a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/RefPileupElement.java +++ b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/RefPileupElement.java @@ -23,6 +23,8 @@ public class RefPileupElement extends PileupElement { public RefPileupElement(SAMRecord read, int offset, int refOffset) { super(read, offset); this.refOffset = refOffset; + if ( refOffset < 0 ) + throw new ReviewedStingException("Bad RefPileupElement: ref offset < 0: " + refOffset + " for read " + read); } public int getRefOffset() { @@ -52,15 +54,18 @@ public class RefPileupElement extends PileupElement { break; case I : for ( int i = 0; i < l; i++) - elts.add(new RefPileupElement(read, readI++, refI)); + if ( refI >= 0 ) + elts.add(new RefPileupElement(read, readI++, refI)); break; case D : for ( int i = 0; i < l; i++) - elts.add(new RefPileupElement(read, -1, refI++)); + if ( refI >= 0 ) + elts.add(new RefPileupElement(read, -1, refI++)); break; case M : for ( int i = 0; i < l; i++) - elts.add(new RefPileupElement(read, readI++, refI++)); + if ( refI >= 0 ) + elts.add(new RefPileupElement(read, readI++, refI++)); break; default: throw new ReviewedStingException("BUG: Unexpected CIGAR element " + elt + " in read " + read.getReadName()); diff --git a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/SingleSampleConsensusReadCompressor.java b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/SingleSampleConsensusReadCompressor.java index 3697f3f6c..ce91de1ab 100644 --- a/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/SingleSampleConsensusReadCompressor.java +++ b/java/src/org/broadinstitute/sting/oneoffprojects/walkers/reducereads/SingleSampleConsensusReadCompressor.java @@ -11,6 +11,8 @@ import org.broadinstitute.sting.utils.clipreads.ClippingRepresentation; import org.broadinstitute.sting.utils.clipreads.ReadClipper; import org.broadinstitute.sting.utils.exceptions.ReviewedStingException; import org.broadinstitute.sting.utils.pileup.PileupElement; +import org.broadinstitute.sting.utils.sam.AlignmentUtils; +import org.broadinstitute.sting.utils.sam.ReadUtils; import java.io.PrintStream; import java.util.*; @@ -52,63 +54,150 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres private static final boolean DEBUG = false; private static final boolean INVERT = false; private static final boolean PRINT_CONSENSUS_READS = false; + private static final int CYCLES_BEFORE_RETRY = 1000; private static final double MAX_FRACTION_DISAGREEING_BASES = 0.1; private static final ClippingRepresentation VARIABLE_READ_REPRESENTATION = ClippingRepresentation.SOFTCLIP_BASES; + private static final double MIN_FRACT_BASES_FOR_VARIABLE_READ = 0.33; // todo -- should be variable + + // todo -- should merge close together spans /** The place where we ultimately write out our records */ Queue waitingReads = new LinkedList(); final int readContextSize; - final int maxReadsAtVariableSites = 50; - long nCompressedReads = 0; + final int maxReadsAtVariableSites; + final int minBpForRunningConsensus; + int retryTimer = 0; final String contig; final GenomeLocParser glParser; SAMFileHeader header; - - // todo -- require minimum read size of 2 bp in variable region + GenomeLoc lastProcessedRegion = null; public SingleSampleConsensusReadCompressor(final int readContextSize, final GenomeLocParser glParser, - final String contig) { + final String contig, + final int minBpForRunningConsensus, + final int maxReadsAtVariableSites) { this.readContextSize = readContextSize; this.glParser = glParser; this.contig = contig; + this.minBpForRunningConsensus = minBpForRunningConsensus; + this.maxReadsAtVariableSites = maxReadsAtVariableSites; } + // ------------------------------------------------------------------------------------------ + // + // public interface functions + // + // ------------------------------------------------------------------------------------------ + /** * @{inheritDoc} */ - public void addAlignment( SAMRecord read ) { + @Override + public Iterable addAlignment( SAMRecord read ) { if ( header == null ) header = read.getHeader(); + if ( ! waitingReads.isEmpty() && read.getAlignmentStart() < waitingReads.peek().getAlignmentStart() ) + throw new ReviewedStingException( + String.format("Adding read %s starting at %d before current queue head start position %d", + read.getReadName(), read.getAlignmentStart(), waitingReads.peek().getAlignmentStart())); + + Collection result = Collections.emptyList(); + if ( retryTimer == 0 ) { + if ( chunkReadyForConsensus(read) ) { + result = consensusReads(false); + } + } else { + //logger.info("Retry: " + retryTimer); + retryTimer--; + } + if ( ! read.getDuplicateReadFlag() && ! read.getNotPrimaryAlignmentFlag() && ! read.getReadUnmappedFlag() ) waitingReads.add(read); - } - public void writeConsensusBed(PrintStream bedOut) { - for ( ConsensusSite site : calculateConsensusSites(waitingReads) ) { - GenomeLoc loc = site.getLoc(); - bedOut.printf("%s\t%d\t%d\t%s%n", loc.getContig(), loc.getStart()-1, loc.getStop(), site.counts); - } + return result; } @Override - public Iterator iterator() { - return consensusReads().iterator(); + public Iterable close() { + return consensusReads(true); } - public Collection consensusReads() { + // ------------------------------------------------------------------------------------------ + // + // private implementation functions + // + // ------------------------------------------------------------------------------------------ + + private boolean chunkReadyForConsensus(SAMRecord read) { if ( ! waitingReads.isEmpty() ) { - List sites = calculateConsensusSites(waitingReads); - List spans = calculateSpans(sites); - return consensusReadsFromSitesAndSpans(sites, spans); + SAMRecord firstRead = waitingReads.iterator().next(); + int refStart = firstRead.getAlignmentStart(); + int refEnd = read.getAlignmentStart(); + int size = refEnd - refStart; + return size > minBpForRunningConsensus; + } else + return false; + } + +// +// public void writeConsensusBed(PrintStream bedOut) { +// for ( ConsensusSite site : calculateConsensusSites(waitingReads) ) { +// GenomeLoc loc = site.getLoc(); +// bedOut.printf("%s\t%d\t%d\t%s%n", loc.getContig(), loc.getStart()-1, loc.getStop(), site.counts); +// } +// } + + private Collection consensusReads(boolean useAllRemainingReads) { + if ( ! waitingReads.isEmpty() ) { + logger.info("Calculating consensus reads"); + List sites = calculateConsensusSites(waitingReads, useAllRemainingReads, lastProcessedRegion); + List rawSpans = calculateSpans(sites); + List spans = useAllRemainingReads ? rawSpans : excludeFinalSpan(rawSpans); + + if ( ! spans.isEmpty() ) { + lastProcessedRegion = spannedRegion(spans); + logger.info("Processing region: " + lastProcessedRegion); + updateWaitingReads(sites, spans); + return consensusReadsFromSitesAndSpans(sites, spans); + } else { + logger.info("Danger, spans is empty, may experience poor performance at: " + spannedRegion(rawSpans)); + retryTimer = CYCLES_BEFORE_RETRY; + return Collections.emptyList(); + } } else { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } } + private static final List excludeFinalSpan(List rawSpans) { + logger.info("Dropping final, potentially incomplete span: " + rawSpans.get(rawSpans.size()-1)); + return rawSpans.subList(0, rawSpans.size() - 1); + } + + private static final GenomeLoc spannedRegion(List spans) { + GenomeLoc region = spans.get(0).loc; + for ( ConsensusSpan span : spans ) + region = region.merge(span.loc); + return region; + } + + private void updateWaitingReads(List sites, List spans) { + ConsensusSpan lastSpan = spans.get(spans.size() - 1); + Set unprocessedReads = new HashSet(); + + for ( ConsensusSite site : sites.subList(lastSpan.getOffsetFromStartOfSites() + 1, sites.size()) ) { + for ( PileupElement p : site.getOverlappingReads() ) + unprocessedReads.add(p.getRead()); + } + + logger.info(String.format("Updating waiting reads: old=%d reads, new=%d reads", waitingReads.size(), unprocessedReads.size())); + waitingReads = new LinkedList(ReadUtils.coordinateSortReads(new ArrayList(unprocessedReads))); + } + private List expandVariableSites(List sites) { for ( ConsensusSite site : sites ) site.setMarkedType(ConsensusType.CONSERVED); @@ -169,11 +258,14 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres } - private List calculateConsensusSites(Collection reads) { + private List calculateConsensusSites(Collection reads, boolean useAllRemainingReads, GenomeLoc lastProcessedRegion) { SAMRecord firstRead = reads.iterator().next(); - int refStart = firstRead.getAlignmentStart(); + + int minStart = lastProcessedRegion == null ? -1 : lastProcessedRegion.getStop() + 1; + int refStart = Math.max(firstRead.getAlignmentStart(), minStart); int refEnd = furtherestEnd(reads); + logger.info("Calculating sites for region " + refStart + " to " + refEnd); // set up the consensus site array List consensusSites = new ArrayList(); int len = refEnd - refStart + 1; @@ -207,6 +299,9 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres return reads; } + // todo -- should be smart -- should take reads in some priority order + // todo -- by length, and by strand, ideally. Perhaps alternating by strand + // todo -- in order of length? private Collection downsample(Collection reads) { if ( reads.size() > maxReadsAtVariableSites ) { List readArray = new ArrayList(reads); @@ -252,13 +347,16 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres private Collection variableSpanReads(List sites, ConsensusSpan span) { Set reads = new HashSet(); + int minNumBasesInRead = (int)Math.floor(MIN_FRACT_BASES_FOR_VARIABLE_READ * span.size()); for ( int i = 0; i < span.size(); i++ ) { int refI = i + span.getOffsetFromStartOfSites(); ConsensusSite site = sites.get(refI); for ( PileupElement p : site.getOverlappingReads() ) { // if ( reads.contains(p.getRead())) // logger.info("Skipping already added read: " + p.getRead().getReadName()); - reads.add(clipReadToSpan(p.getRead(), span)); + SAMRecord read = clipReadToSpan(p.getRead(), span); + if ( read.getReadLength() >= minNumBasesInRead ) + reads.add(read); } } @@ -281,7 +379,8 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres } } - return clipper.clipRead(VARIABLE_READ_REPRESENTATION); + SAMRecord softClipped = clipper.clipRead(VARIABLE_READ_REPRESENTATION); + return ReadUtils.hardClipSoftClippedBases(softClipped); } private static int furtherestEnd(Collection reads) { diff --git a/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java b/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java index 0fa227fee..02a1bdb5d 100644 --- a/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java +++ b/java/src/org/broadinstitute/sting/utils/sam/ReadUtils.java @@ -364,6 +364,65 @@ public class ReadUtils { return rec; } + /** + * Hard clips away (i.e.g, removes from the read) bases that were previously soft clipped. + * + * @param rec + * @return + */ + public static SAMRecord hardClipSoftClippedBases(SAMRecord rec) { + List cigarElts = rec.getCigar().getCigarElements(); + + if ( cigarElts.size() == 1 ) // can't be soft clipped, just return + return rec; + + int basesStart = 0; + List newCigarElements = new LinkedList(); + int basesToClip = 0; + + for ( int i = 0; i < cigarElts.size(); i++ ) { + CigarElement ce = cigarElts.get(i); + int l = ce.getLength(); + switch ( ce.getOperator() ) { + case S: + basesToClip += l; + if ( i == 0 ) basesStart += l; + newCigarElements.add(new CigarElement(l, CigarOperator.HARD_CLIP)); + break; + case H: + // TODO -- must be handled specially + throw new ReviewedStingException("BUG: tell mark he forgot to implement this"); + default: + newCigarElements.add(ce); + break; + } + } + + if ( basesToClip > 0 ) { + try { + rec = SimplifyingSAMFileWriter.simplifyRead((SAMRecord)rec.clone()); + // copy over the unclipped bases + final byte[] bases = rec.getReadBases(); + final byte[] quals = rec.getBaseQualities(); + int newLength = bases.length - basesToClip; + byte[] newBases = new byte[newLength]; + byte[] newQuals = new byte[newLength]; + System.arraycopy(bases, basesStart, newBases, 0, newLength); + System.arraycopy(quals, basesStart, newQuals, 0, newLength); + rec.setReadBases(newBases); + rec.setBaseQualities(newQuals); + + // now add a CIGAR element for the clipped bases + Cigar newCigar = new Cigar(newCigarElements); + rec.setCigar(newCigar); + } catch ( CloneNotSupportedException e ) { + throw new ReviewedStingException("WTF, where did clone go?", e); + } + } + + return rec; + } + private static int DEFAULT_ADAPTOR_SIZE = 100; /** @@ -425,9 +484,10 @@ public class ReadUtils { * @param reads * @return */ - public final static void coordinateSortReads(List reads) { + public final static List coordinateSortReads(List reads) { final SAMRecordComparator comparer = new SAMRecordCoordinateComparator(); Collections.sort(reads, comparer); + return reads; } public final static int getFirstInsertionOffset(SAMRecord read) {