A test walker that does consensus compression of deep read data sets.

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5702 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2011-04-27 22:00:48 +00:00
parent 3907377f37
commit 6cce3e00f3
6 changed files with 240 additions and 69 deletions

View File

@@ -12,11 +12,11 @@ import java.util.Iterator;
* Time: 8:49 AM
* To change this template use File | Settings | File Templates.
*/
public interface ConsensusReadCompressor extends Iterable<SAMRecord> {
void addAlignment(SAMRecord read);
public interface ConsensusReadCompressor { // extends Iterable<SAMRecord> {
Iterable<SAMRecord> addAlignment(SAMRecord read);
Iterable<SAMRecord> close();
Collection<SAMRecord> consensusReads();
@Override
Iterator<SAMRecord> iterator();
//Collection<SAMRecord> consensusReads();
// @Override
// Iterator<SAMRecord> iterator();
}

View File

@@ -49,32 +49,32 @@ public class MultiSampleConsensusReadCompressor implements ConsensusReadCompress
public MultiSampleConsensusReadCompressor(SAMFileHeader header,
final int readContextSize,
final GenomeLocParser glParser,
final String contig) {
final String contig,
final int minBpForRunningConsensus,
final int maxReadsAtVariableSites) {
for ( String name : SampleUtils.getSAMFileSamples(header) ) {
compressorsPerSample.put(name,
new SingleSampleConsensusReadCompressor(readContextSize, glParser, contig));
new SingleSampleConsensusReadCompressor(readContextSize,
glParser, contig, minBpForRunningConsensus, maxReadsAtVariableSites));
// todo -- argument for minConsensusSize
}
}
@Override
public void addAlignment(SAMRecord read) {
public Iterable<SAMRecord> addAlignment(SAMRecord read) {
String sample = read.getReadGroup().getSample();
SingleSampleConsensusReadCompressor compressor = compressorsPerSample.get(sample);
if ( compressor == null )
throw new ReviewedStingException("No compressor for sample " + sample);
compressor.addAlignment(read);
return compressor.addAlignment(read);
}
@Override
public Collection<SAMRecord> consensusReads() {
public Iterable<SAMRecord> close() {
List<SAMRecord> reads = new LinkedList<SAMRecord>();
for ( SingleSampleConsensusReadCompressor comp : compressorsPerSample.values() )
reads.addAll(comp.consensusReads());
for ( SAMRecord read : comp.close() )
reads.add(read);
return reads;
}
@Override
public Iterator<SAMRecord> iterator() {
return consensusReads().iterator();
}
}

View File

@@ -32,6 +32,7 @@ import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.io.StingSAMFileWriter;
import org.broadinstitute.sting.gatk.refdata.ReadMetaDataTracker;
import org.broadinstitute.sting.gatk.walkers.ReadWalker;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import java.io.*;
import java.util.ArrayList;
@@ -43,7 +44,7 @@ import java.util.Set;
* User: depristo
* Date: April 7, 2011
*/
public class ReduceReadsWalker extends ReadWalker<SAMRecord, Collection<SAMRecord>> {
public class ReduceReadsWalker extends ReadWalker<SAMRecord, ConsensusReadCompressor> {
@Output
protected StingSAMFileWriter out;
@@ -59,7 +60,14 @@ public class ReduceReadsWalker extends ReadWalker<SAMRecord, Collection<SAMRecor
@Argument(fullName = "useRead", shortName = "UR", doc = "", required = false)
protected Set<String> readNamesToUse;
@Argument(fullName = "minBpForRunningConsensus", shortName = "mbrc", doc = "", required = false)
protected int minBpForRunningConsensus = 1000;
@Argument(fullName = "maxReadsAtVariableSites", shortName = "mravs", doc = "", required = false)
protected int maxReadsAtVariableSites = 500;
protected int totalReads = 0;
int nCompressedReads = 0;
@Override
public void initialize() {
@@ -80,8 +88,10 @@ public class ReduceReadsWalker extends ReadWalker<SAMRecord, Collection<SAMRecor
* @return SAMFileWriter, set to the BAM output file if the command line option was set, null otherwise
*/
@Override
public Collection<SAMRecord> reduceInit() {
return new ArrayList<SAMRecord>();
public ConsensusReadCompressor reduceInit() {
return new MultiSampleConsensusReadCompressor(getToolkit().getSAMFileHeader(),
contextSize, getToolkit().getGenomeLocParser(), "20", // todo fixme
minBpForRunningConsensus, maxReadsAtVariableSites);
}
/**
@@ -89,35 +99,32 @@ public class ReduceReadsWalker extends ReadWalker<SAMRecord, Collection<SAMRecor
* @param read the read itself
* @return the SAMFileWriter, so that the next reduce can emit to the same source
*/
public Collection<SAMRecord> reduce( SAMRecord read, Collection<SAMRecord> reads ) {
if ( readNamesToUse == null || readNamesToUse.contains(read.getReadName()) )
reads.add(read);
return reads;
public ConsensusReadCompressor reduce( SAMRecord read, ConsensusReadCompressor comp ) {
if ( readNamesToUse == null || readNamesToUse.contains(read.getReadName()) ) {
if ( INCLUDE_RAW_READS )
out.addAlignment(read);
// write out compressed reads as they become available
for ( SAMRecord consensusRead : comp.addAlignment(read) ) {
out.addAlignment(consensusRead);
nCompressedReads++;
}
}
return comp;
}
@Override
public void onTraversalDone( Collection<SAMRecord> reads ) {
String contig = reads.iterator().next().getReferenceName();
ConsensusReadCompressor compressor =
new MultiSampleConsensusReadCompressor(getToolkit().getSAMFileHeader(),
contextSize, getToolkit().getGenomeLocParser(), contig);
// add all of the reads to the compressor
for ( SAMRecord read : reads ) {
if ( INCLUDE_RAW_READS )
out.addAlignment(read);
compressor.addAlignment(read);
}
// compress the reads
public void onTraversalDone( ConsensusReadCompressor compressor ) {
//compressor.writeConsensusBed(bedOut);
int nCompressedReads = 0;
for ( SAMRecord read : compressor ) {
out.addAlignment(read);
// write out any remaining reads
for ( SAMRecord consensusRead : compressor.close() ) {
out.addAlignment(consensusRead);
nCompressedReads++;
}
logger.info("Compressed reads : " + nCompressedReads);
double percent = (100.0 * nCompressedReads) / totalReads;
logger.info("Compressed reads : " + nCompressedReads + String.format(" (%.2f%%)", percent));
logger.info("Total reads : " + totalReads);
}

View File

@@ -23,6 +23,8 @@ public class RefPileupElement extends PileupElement {
/**
 * Creates a pileup element anchored to an offset within the reference window.
 *
 * @param read      the read this element comes from
 * @param offset    offset of the base within the read (elsewhere in this file a
 *                  deletion is represented with offset -1 -- see the CIGAR D case)
 * @param refOffset offset of this element from the start of the reference window;
 *                  must be >= 0
 * @throws ReviewedStingException if refOffset is negative
 */
public RefPileupElement(SAMRecord read, int offset, int refOffset) {
    super(read, offset);
    this.refOffset = refOffset;
    // Fail fast on a negative reference offset: it indicates a bug in the
    // caller's coordinate bookkeeping, not a recoverable runtime condition.
    if ( refOffset < 0 )
        throw new ReviewedStingException("Bad RefPileupElement: ref offset < 0: " + refOffset + " for read " + read);
}
public int getRefOffset() {
@@ -52,15 +54,18 @@ public class RefPileupElement extends PileupElement {
break;
case I :
for ( int i = 0; i < l; i++)
elts.add(new RefPileupElement(read, readI++, refI));
if ( refI >= 0 )
elts.add(new RefPileupElement(read, readI++, refI));
break;
case D :
for ( int i = 0; i < l; i++)
elts.add(new RefPileupElement(read, -1, refI++));
if ( refI >= 0 )
elts.add(new RefPileupElement(read, -1, refI++));
break;
case M :
for ( int i = 0; i < l; i++)
elts.add(new RefPileupElement(read, readI++, refI++));
if ( refI >= 0 )
elts.add(new RefPileupElement(read, readI++, refI++));
break;
default:
throw new ReviewedStingException("BUG: Unexpected CIGAR element " + elt + " in read " + read.getReadName());

View File

@@ -11,6 +11,8 @@ import org.broadinstitute.sting.utils.clipreads.ClippingRepresentation;
import org.broadinstitute.sting.utils.clipreads.ReadClipper;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.pileup.PileupElement;
import org.broadinstitute.sting.utils.sam.AlignmentUtils;
import org.broadinstitute.sting.utils.sam.ReadUtils;
import java.io.PrintStream;
import java.util.*;
@@ -52,63 +54,150 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres
private static final boolean DEBUG = false;
private static final boolean INVERT = false;
private static final boolean PRINT_CONSENSUS_READS = false;
private static final int CYCLES_BEFORE_RETRY = 1000;
private static final double MAX_FRACTION_DISAGREEING_BASES = 0.1;
private static final ClippingRepresentation VARIABLE_READ_REPRESENTATION = ClippingRepresentation.SOFTCLIP_BASES;
private static final double MIN_FRACT_BASES_FOR_VARIABLE_READ = 0.33; // todo -- should be variable
// todo -- should merge close together spans
/** The place where we ultimately write out our records */
Queue<SAMRecord> waitingReads = new LinkedList<SAMRecord>();
final int readContextSize;
final int maxReadsAtVariableSites = 50;
long nCompressedReads = 0;
final int maxReadsAtVariableSites;
final int minBpForRunningConsensus;
int retryTimer = 0;
final String contig;
final GenomeLocParser glParser;
SAMFileHeader header;
// todo -- require minimum read size of 2 bp in variable region
GenomeLoc lastProcessedRegion = null;
public SingleSampleConsensusReadCompressor(final int readContextSize,
final GenomeLocParser glParser,
final String contig) {
final String contig,
final int minBpForRunningConsensus,
final int maxReadsAtVariableSites) {
this.readContextSize = readContextSize;
this.glParser = glParser;
this.contig = contig;
this.minBpForRunningConsensus = minBpForRunningConsensus;
this.maxReadsAtVariableSites = maxReadsAtVariableSites;
}
// ------------------------------------------------------------------------------------------
//
// public interface functions
//
// ------------------------------------------------------------------------------------------
/**
* {@inheritDoc}
*/
public void addAlignment( SAMRecord read ) {
@Override
public Iterable<SAMRecord> addAlignment( SAMRecord read ) {
if ( header == null )
header = read.getHeader();
if ( ! waitingReads.isEmpty() && read.getAlignmentStart() < waitingReads.peek().getAlignmentStart() )
throw new ReviewedStingException(
String.format("Adding read %s starting at %d before current queue head start position %d",
read.getReadName(), read.getAlignmentStart(), waitingReads.peek().getAlignmentStart()));
Collection<SAMRecord> result = Collections.emptyList();
if ( retryTimer == 0 ) {
if ( chunkReadyForConsensus(read) ) {
result = consensusReads(false);
}
} else {
//logger.info("Retry: " + retryTimer);
retryTimer--;
}
if ( ! read.getDuplicateReadFlag() && ! read.getNotPrimaryAlignmentFlag() && ! read.getReadUnmappedFlag() )
waitingReads.add(read);
}
public void writeConsensusBed(PrintStream bedOut) {
for ( ConsensusSite site : calculateConsensusSites(waitingReads) ) {
GenomeLoc loc = site.getLoc();
bedOut.printf("%s\t%d\t%d\t%s%n", loc.getContig(), loc.getStart()-1, loc.getStop(), site.counts);
}
return result;
}
@Override
public Iterator<SAMRecord> iterator() {
return consensusReads().iterator();
public Iterable<SAMRecord> close() {
return consensusReads(true);
}
public Collection<SAMRecord> consensusReads() {
// ------------------------------------------------------------------------------------------
//
// private implementation functions
//
// ------------------------------------------------------------------------------------------
private boolean chunkReadyForConsensus(SAMRecord read) {
if ( ! waitingReads.isEmpty() ) {
List<ConsensusSite> sites = calculateConsensusSites(waitingReads);
List<ConsensusSpan> spans = calculateSpans(sites);
return consensusReadsFromSitesAndSpans(sites, spans);
SAMRecord firstRead = waitingReads.iterator().next();
int refStart = firstRead.getAlignmentStart();
int refEnd = read.getAlignmentStart();
int size = refEnd - refStart;
return size > minBpForRunningConsensus;
} else
return false;
}
//
// public void writeConsensusBed(PrintStream bedOut) {
// for ( ConsensusSite site : calculateConsensusSites(waitingReads) ) {
// GenomeLoc loc = site.getLoc();
// bedOut.printf("%s\t%d\t%d\t%s%n", loc.getContig(), loc.getStart()-1, loc.getStop(), site.counts);
// }
// }
private Collection<SAMRecord> consensusReads(boolean useAllRemainingReads) {
if ( ! waitingReads.isEmpty() ) {
logger.info("Calculating consensus reads");
List<ConsensusSite> sites = calculateConsensusSites(waitingReads, useAllRemainingReads, lastProcessedRegion);
List<ConsensusSpan> rawSpans = calculateSpans(sites);
List<ConsensusSpan> spans = useAllRemainingReads ? rawSpans : excludeFinalSpan(rawSpans);
if ( ! spans.isEmpty() ) {
lastProcessedRegion = spannedRegion(spans);
logger.info("Processing region: " + lastProcessedRegion);
updateWaitingReads(sites, spans);
return consensusReadsFromSitesAndSpans(sites, spans);
} else {
logger.info("Danger, spans is empty, may experience poor performance at: " + spannedRegion(rawSpans));
retryTimer = CYCLES_BEFORE_RETRY;
return Collections.emptyList();
}
} else {
return Collections.EMPTY_LIST;
return Collections.emptyList();
}
}
/**
 * Drops the final span from the list, since it may still be incomplete (more
 * reads overlapping it can arrive later).
 *
 * @param rawSpans the spans computed for the current window; may be empty
 * @return rawSpans without its last element; rawSpans itself when empty
 */
private static final List<ConsensusSpan> excludeFinalSpan(List<ConsensusSpan> rawSpans) {
    // Guard against an empty list: the caller invokes this before its own
    // emptiness check, and get(size-1) / subList(0, -1) would throw
    // IndexOutOfBoundsException on an empty rawSpans.
    if ( rawSpans.isEmpty() )
        return rawSpans;
    logger.info("Dropping final, potentially incomplete span: " + rawSpans.get(rawSpans.size()-1));
    return rawSpans.subList(0, rawSpans.size() - 1);
}
/**
 * Computes the single GenomeLoc covering all of the given spans, by merging
 * every span's location into one interval.
 *
 * NOTE(review): assumes spans is non-empty -- spans.get(0) throws on an empty
 * list; confirm all callers guarantee this.
 *
 * @param spans the consensus spans to cover
 * @return the merged location spanning every element of spans
 */
private static final GenomeLoc spannedRegion(List<ConsensusSpan> spans) {
    GenomeLoc region = spans.get(0).loc;
    for ( ConsensusSpan span : spans )
        region = region.merge(span.loc);
    return region;
}
/**
 * Rebuilds the waitingReads queue after a batch of spans has been emitted,
 * retaining only reads that overlap sites beyond the last processed span.
 *
 * NOTE(review): the subList lower bound is getOffsetFromStartOfSites() + 1,
 * i.e. one past the last span's START rather than its end -- confirm this is
 * intentional and does not re-queue reads already consumed by the span.
 *
 * @param sites consensus sites computed for the current window, in reference order
 * @param spans the spans that were just processed; must be non-empty (get(size-1))
 */
private void updateWaitingReads(List<ConsensusSite> sites, List<ConsensusSpan> spans) {
    ConsensusSpan lastSpan = spans.get(spans.size() - 1);
    // A Set de-duplicates reads that overlap more than one remaining site.
    Set<SAMRecord> unprocessedReads = new HashSet<SAMRecord>();
    for ( ConsensusSite site : sites.subList(lastSpan.getOffsetFromStartOfSites() + 1, sites.size()) ) {
        for ( PileupElement p : site.getOverlappingReads() )
            unprocessedReads.add(p.getRead());
    }
    logger.info(String.format("Updating waiting reads: old=%d reads, new=%d reads", waitingReads.size(), unprocessedReads.size()));
    // Restore the queue's coordinate-sorted invariant (addAlignment() rejects
    // out-of-order reads); the HashSet destroyed any ordering.
    waitingReads = new LinkedList<SAMRecord>(ReadUtils.coordinateSortReads(new ArrayList<SAMRecord>(unprocessedReads)));
}
private List<ConsensusSite> expandVariableSites(List<ConsensusSite> sites) {
for ( ConsensusSite site : sites )
site.setMarkedType(ConsensusType.CONSERVED);
@@ -169,11 +258,14 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres
}
private List<ConsensusSite> calculateConsensusSites(Collection<SAMRecord> reads) {
private List<ConsensusSite> calculateConsensusSites(Collection<SAMRecord> reads, boolean useAllRemainingReads, GenomeLoc lastProcessedRegion) {
SAMRecord firstRead = reads.iterator().next();
int refStart = firstRead.getAlignmentStart();
int minStart = lastProcessedRegion == null ? -1 : lastProcessedRegion.getStop() + 1;
int refStart = Math.max(firstRead.getAlignmentStart(), minStart);
int refEnd = furtherestEnd(reads);
logger.info("Calculating sites for region " + refStart + " to " + refEnd);
// set up the consensus site array
List<ConsensusSite> consensusSites = new ArrayList<ConsensusSite>();
int len = refEnd - refStart + 1;
@@ -207,6 +299,9 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres
return reads;
}
// todo -- should be smart -- should take reads in some priority order
// todo -- by length, and by strand, ideally. Perhaps alternating by strand
// todo -- in order of length?
private Collection<SAMRecord> downsample(Collection<SAMRecord> reads) {
if ( reads.size() > maxReadsAtVariableSites ) {
List<SAMRecord> readArray = new ArrayList<SAMRecord>(reads);
@@ -252,13 +347,16 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres
private Collection<SAMRecord> variableSpanReads(List<ConsensusSite> sites, ConsensusSpan span) {
Set<SAMRecord> reads = new HashSet<SAMRecord>();
int minNumBasesInRead = (int)Math.floor(MIN_FRACT_BASES_FOR_VARIABLE_READ * span.size());
for ( int i = 0; i < span.size(); i++ ) {
int refI = i + span.getOffsetFromStartOfSites();
ConsensusSite site = sites.get(refI);
for ( PileupElement p : site.getOverlappingReads() ) {
// if ( reads.contains(p.getRead()))
// logger.info("Skipping already added read: " + p.getRead().getReadName());
reads.add(clipReadToSpan(p.getRead(), span));
SAMRecord read = clipReadToSpan(p.getRead(), span);
if ( read.getReadLength() >= minNumBasesInRead )
reads.add(read);
}
}
@@ -281,7 +379,8 @@ public class SingleSampleConsensusReadCompressor implements ConsensusReadCompres
}
}
return clipper.clipRead(VARIABLE_READ_REPRESENTATION);
SAMRecord softClipped = clipper.clipRead(VARIABLE_READ_REPRESENTATION);
return ReadUtils.hardClipSoftClippedBases(softClipped);
}
private static int furtherestEnd(Collection<SAMRecord> reads) {

View File

@@ -364,6 +364,65 @@ public class ReadUtils {
return rec;
}
/**
 * Hard clips away (i.e., removes from the read) bases that were previously soft clipped.
 *
 * Every soft-clip (S) CIGAR element is rewritten as a hard clip (H) and the
 * corresponding bases and qualities are removed from the record. When any
 * clipping occurs, the record is cloned first so the caller's record is not
 * mutated.
 *
 * @param rec the read to process; returned unmodified when its CIGAR has a
 *            single element (cannot be soft clipped) or contains no S elements
 * @return a record whose former soft-clipped bases are hard clipped
 * @throws ReviewedStingException if the read already contains hard clips
 *         (unimplemented case) or if cloning fails
 */
public static SAMRecord hardClipSoftClippedBases(SAMRecord rec) {
    List<CigarElement> cigarElts = rec.getCigar().getCigarElements();
    if ( cigarElts.size() == 1 ) // can't be soft clipped, just return
        return rec;

    // basesStart counts only bases clipped off the FRONT of the read; trailing
    // soft clips are removed implicitly via newLength in the copy below.
    int basesStart = 0;
    List<CigarElement> newCigarElements = new LinkedList<CigarElement>();
    int basesToClip = 0;
    for ( int i = 0; i < cigarElts.size(); i++ ) {
        CigarElement ce = cigarElts.get(i);
        int l = ce.getLength();
        switch ( ce.getOperator() ) {
            case S:
                basesToClip += l;
                if ( i == 0 ) basesStart += l;
                // S becomes H of the same length; the bases themselves go away.
                newCigarElements.add(new CigarElement(l, CigarOperator.HARD_CLIP));
                break;
            case H:
                // TODO -- must be handled specially
                throw new ReviewedStingException("BUG: tell mark he forgot to implement this");
            default:
                newCigarElements.add(ce);
                break;
        }
    }

    if ( basesToClip > 0 ) {
        try {
            // Clone so the original record is untouched; simplifyRead presumably
            // strips fields not needed downstream -- NOTE(review): confirm.
            rec = SimplifyingSAMFileWriter.simplifyRead((SAMRecord)rec.clone());

            // copy over the unclipped bases
            final byte[] bases = rec.getReadBases();
            final byte[] quals = rec.getBaseQualities();
            int newLength = bases.length - basesToClip;
            byte[] newBases = new byte[newLength];
            byte[] newQuals = new byte[newLength];
            // NOTE(review): assumes soft clips occur only at the read ends (a
            // valid CIGAR); an interior S element would corrupt this copy.
            System.arraycopy(bases, basesStart, newBases, 0, newLength);
            System.arraycopy(quals, basesStart, newQuals, 0, newLength);
            rec.setReadBases(newBases);
            rec.setBaseQualities(newQuals);

            // now add a CIGAR element for the clipped bases
            Cigar newCigar = new Cigar(newCigarElements);
            rec.setCigar(newCigar);
        } catch ( CloneNotSupportedException e ) {
            throw new ReviewedStingException("WTF, where did clone go?", e);
        }
    }

    return rec;
}
private static int DEFAULT_ADAPTOR_SIZE = 100;
/**
@ -425,9 +484,10 @@ public class ReadUtils {
* @param reads
* @return
*/
public final static void coordinateSortReads(List<SAMRecord> reads) {
public final static List<SAMRecord> coordinateSortReads(List<SAMRecord> reads) {
final SAMRecordComparator comparer = new SAMRecordCoordinateComparator();
Collections.sort(reads, comparer);
return reads;
}
public final static int getFirstInsertionOffset(SAMRecord read) {