Updated phasing pipeline to properly read samples from VCF and BAM files

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@5172 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
fromer 2011-02-02 07:16:05 +00:00
parent 43fb11b923
commit 3839fd1a25
2 changed files with 93 additions and 77 deletions

View File

@ -96,7 +96,9 @@ public class PhasingEval extends RodWalker<Integer, Integer> {
int homalt = vc.getHomVarCount(); int homalt = vc.getHomVarCount();
int het = vc.getHetCount(); int het = vc.getHetCount();
int ac = 2 * homalt + het; int ac = 2 * homalt + het;
//int an = 2 * (homref + homalt + het); //int an = 2 * (homref + homalt + het);
PhasingByAC data = phasingByACs.get(ac); PhasingByAC data = phasingByACs.get(ac);
data.nHets += het > 0 ? 1 : 0; data.nHets += het > 0 ? 1 : 0;
data.nHetsPhased += isPhysicallyPhased(vc.getGenotypes().values()) ? 1 : 0; data.nHetsPhased += isPhysicallyPhased(vc.getGenotypes().values()) ? 1 : 0;
@ -125,10 +127,10 @@ public class PhasingEval extends RodWalker<Integer, Integer> {
public void onTraversalDone(Integer sum) { public void onTraversalDone(Integer sum) {
if (analysis == Analysis.PHASING_BY_AC) { if (analysis == Analysis.PHASING_BY_AC) {
out.println(Utils.join("\t", Arrays.asList("ac", "nhets", "nhetphased"))); out.println(Utils.join("\t", Arrays.asList("ac", "an", "nhets", "nhetphased")));
for (PhasingByAC pac : phasingByACs) { for (PhasingByAC pac : phasingByACs) {
out.printf("%d\t%d\t%d%n", pac.myAC, pac.nHets, pac.nHetsPhased); out.printf("%d\t%d\t%d\t%d%n", pac.myAC, pac.myAN, pac.nHets, pac.nHetsPhased);
} }
} }
} }
} }

View File

@ -2,41 +2,39 @@ import org.broadinstitute.sting.queue.extensions.picard.PicardBamJarFunction
import org.broadinstitute.sting.queue.extensions.gatk._ import org.broadinstitute.sting.queue.extensions.gatk._
import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction import org.broadinstitute.sting.queue.extensions.samtools.SamtoolsIndexFunction
import org.broadinstitute.sting.queue.QScript import org.broadinstitute.sting.queue.QScript
import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.FilenameUtils
import tools.nsc.io.Process;
import org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType import org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType
import scala.io.Source._ import scala.io.Source._
class PhaseSamples extends QScript { class PhaseSamples extends QScript {
qscript => qscript =>
@Input(doc="bam input, as .bam or as a list of files", shortName="I", required=true) @Input(doc = "bam input, as .bam or as a list of files", shortName = "I", required = true)
var bams: File = _ var bams: File = _
@Input(doc="master VCF calls", shortName="C", required=true) @Input(doc = "master VCF calls", shortName = "C", required = true)
var masterCalls: File = _ var masterCalls: File = _
@Input(doc="file containing samples to be phased, as many as you like per line", shortName="samples", required=true) @Argument(doc = "gatk jar file", shortName = "J", required = true)
var samplesFile: File = _
@Argument(doc="gatk jar file", shortName="J", required=true)
var gatkJarFile: File = _ var gatkJarFile: File = _
@Argument(shortName = "R", doc="ref", required=true) @Argument(shortName = "R", doc = "ref", required = true)
var referenceFile: File = _ var referenceFile: File = _
@Argument(fullName = "prefix", doc="Prefix argument", required=false) @Argument(fullName = "prefix", doc = "Prefix argument", required = false)
var prefix: String = "" var prefix: String = ""
@Argument(shortName = "L", doc="Intervals", required=false) @Argument(shortName = "L", doc = "Intervals", required = false)
var intervals: String = _ var intervals: String = _
@Input(doc="level of parallelism for BAM phaser. By default is set to 0 [no scattering].", shortName="scatter", required=false) @Input(doc = "level of parallelism for BAM phaser. By default is set to 0 [no scattering].", shortName = "scatter", required = false)
var scatterCount = 0 var scatterCount = 0
@Input(doc="Samples to phase together. By default is set to 1 [one job per sample].", shortName="samplesPerJob", required=false) @Input(doc = "Samples to phase together. By default is set to 1 [one job per sample].", shortName = "samplesPerJob", required = false)
var samplesPerJob = 1 var samplesPerJob = 1
@Input(doc="Phased file to output", shortName="o", required=true) @Input(doc = "Phased file to output", shortName = "o", required = true)
var outputPhased: File = _ var outputPhased: File = _
trait CommandLineGATKArgs extends CommandLineGATK { trait CommandLineGATKArgs extends CommandLineGATK {
@ -47,93 +45,103 @@ class PhaseSamples extends QScript {
this.logging_level = "INFO" this.logging_level = "INFO"
} }
// A target has a list of samples and bam files to use for phasing // A target has a list of samples and bam files to use for phasing
class Target(val name: String, val samples: List[String], val bams: List[File]) { class Target(val name: String, val samples: List[String], val bams: List[File]) {
def phasedVCFFile = new File(name + "." + outputPhased) def phasedVCFFile = new File(name + "." + outputPhased)
override def toString(): String = String.format("[Target %s with samples %s against bams %s]", name, samples, bams) override def toString(): String = String.format("[Target %s with samples %s against bams %s]", name, samples, bams)
} }
def script = { def script = {
if ( qscript.scatterCount > 0 ) throw new RuntimeException("scatter/gather currently not implemented") if (qscript.scatterCount > 0) throw new RuntimeException("scatter/gather currently not implemented")
val samples: List[String] = getSamples(samplesFile) val samples: List[String] = getSamples()
Console.out.printf("Samples are %s%n", samples) Console.out.printf("Samples are %s%n", samples)
val targets: List[Target] = bamsToTargets(samples, bams) val targets: List[Target] = bamsToTargets(samples, bams)
for (target <- targets) { for (target <- targets) {
Console.out.printf("Target is %s%n", target) Console.out.printf("Target is %s%n", target)
add(new PhaseSamples(target)) add(new PhaseSamples(target))
} }
add(new CombineVariants(targets.map(_.phasedVCFFile))) add(new CombineVariants(targets.map(_.phasedVCFFile)))
}
def bamsToTargets(samples: List[String], bamsIn: File): List[Target] = { add(new PhasingByACeval())
}
def bamsToTargets(samples: List[String], bamsIn: File): List[Target] = {
val bams: List[File] = parseBamsInput(bamsIn) val bams: List[File] = parseBamsInput(bamsIn)
val sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = getMapOfBamsForSample(bams)
def buildTargets(samples: List[String], count: Int): List[Target] = (samples splitAt samplesPerJob) match { def buildTargets(samples: List[String], count: Int): List[Target] = (samples splitAt samplesPerJob) match {
case (Nil, y) => case (Nil, y) =>
return Nil return Nil
case (subsamples, remaining) => case (subsamples, remaining) =>
return new Target("group" + count, subsamples, findBamsForSamples(subsamples, bams)) :: return new Target("group" + count, subsamples, findBamsForSamples(subsamples, sampleToBams)) ::
buildTargets(remaining, count + 1) buildTargets(remaining, count + 1)
} }
return buildTargets(samples, 0) return buildTargets(samples, 0)
} }
// determines the bam files to use for these samples. If there's only a single bam, assumes this file def getMapOfBamsForSample(bams: List[File]): scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = bams match {
// is a merge of all sample bams. If there are multiple bams, for each sample, finds all bam files case Nil => return scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[File]]
// containing sample in their path (should really check header?). The phasing bams are the merge of
// these files
def findBamsForSamples( samples: List[String], bams: List[File] ): List[File] = bams match {
case x :: Nil => return bams
case _ =>
return findBamsForSamplesHelper(samples, bams)
}
def findBamsForSamplesHelper( samples: List[String], bams: List[File] ): List[File] = bams match {
case Nil => Nil
case x :: y => case x :: y =>
val line: File = matchSamplesInSingleBam(x, samples) val m: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]] = getMapOfBamsForSample(y)
val restOfList: List[File] = findBamsForSamplesHelper(samples, y)
if (line != null)
line :: restOfList
else
restOfList
}
// If the line (bam file name) contains ANY sample match, we use it: val getBamSamplesCommand: String = "samtools view -H " + x.getPath() + " | grep '^@RG' | awk '{for (i = 1; i <= NF; i++) if (substr($i,1,3) == \"SM:\") print substr($i,4)}' | sort | uniq"
def matchSamplesInSingleBam( line : File, samples: List[String] ) : File = samples match { //println("getBamSamplesCommand: " + getBamSamplesCommand)
case Nil => null val bamSamples: List[String] = Process(getBamSamplesCommand).iterator toList
case x :: y => if (line.getName().contains(x)) line else matchSamplesInSingleBam(line, y)
}
// todo -- allow us to get the master list of samples from the VCF for convenience for (s <- bamSamples) {
def getSamples(samplesFile: File): List[String] = { if (!m.contains(s))
return (for { line <- fromFile(samplesFile).getLines m += s -> scala.collection.mutable.Set.empty[File]
part <- line.split("\\s", 0).iterator }
yield part ).toList
}
def parseBamsInput(bamsIn: File): List[File] = FilenameUtils.getExtension(bamsIn.getPath) match { m(s) = m(s) + x
}
return m
}
def findBamsForSamples(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]]): List[File] = {
val l: List[File] = Nil
l ++ findBamsForSamplesHelper(samples, sampleToBams)
}
def findBamsForSamplesHelper(samples: List[String], sampleToBams: scala.collection.mutable.Map[String, scala.collection.mutable.Set[File]]): scala.collection.mutable.Set[File] = samples match {
case Nil => scala.collection.mutable.Set.empty[File]
case x :: y =>
var bamsForSampleX: scala.collection.mutable.Set[File] = scala.collection.mutable.Set.empty[File]
if (sampleToBams.contains(x))
bamsForSampleX = sampleToBams(x)
return bamsForSampleX ++ findBamsForSamplesHelper(y, sampleToBams)
}
def getSamples(): List[String] = {
val getSamplesCommand: String = "cat " + masterCalls.getPath() + " | grep '^#CHROM' | head -1 | awk '{for (i = 10; i <= NF; i++) print $i}'"
//println("getSamplesCommand: " + getSamplesCommand)
return Process(getSamplesCommand).iterator toList
}
def parseBamsInput(bamsIn: File): List[File] = FilenameUtils.getExtension(bamsIn.getPath) match {
case "bam" => return List(bamsIn) case "bam" => return List(bamsIn)
case "list" => return (for( line <- fromFile(bamsIn).getLines) yield new File(line)).toList case "list" => return (for (line <- fromFile(bamsIn).getLines) yield new File(line)).toList
case _ => throw new RuntimeException("Unexpected BAM input type: " + bamsIn + "; only permitted extensions are .bam and .list") case _ => throw new RuntimeException("Unexpected BAM input type: " + bamsIn + "; only permitted extensions are .bam and .list")
} }
class PhaseSamples(t: Target) extends org.broadinstitute.sting.queue.extensions.gatk.ReadBackedPhasing with CommandLineGATKArgs { class PhaseSamples(t: Target) extends org.broadinstitute.sting.queue.extensions.gatk.ReadBackedPhasing with CommandLineGATKArgs {
this.rodBind :+= RodBind("variant", "VCF", qscript.masterCalls) this.rodBind :+= RodBind("variant", "VCF", qscript.masterCalls)
this.out = t.phasedVCFFile this.out = t.phasedVCFFile
this.input_file = t.bams this.input_file = t.bams
this.sampleToPhase = t.samples this.sampleToPhase = t.samples
} }
class CombineVariants(vcfsToCombine: List[File]) extends org.broadinstitute.sting.queue.extensions.gatk.CombineVariants with CommandLineGATKArgs { class CombineVariants(vcfsToCombine: List[File]) extends org.broadinstitute.sting.queue.extensions.gatk.CombineVariants with CommandLineGATKArgs {
for ( vcf <- vcfsToCombine ) { for (vcf <- vcfsToCombine) {
this.rodBind :+= RodBind(vcf.getName, "VCF", vcf) this.rodBind :+= RodBind(vcf.getName, "VCF", vcf)
} }
// add the master call: // add the master call:
@ -141,6 +149,12 @@ class CombineVariants(vcfsToCombine: List[File]) extends org.broadinstitute.stin
this.variantMergeOptions = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.MASTER) this.variantMergeOptions = Some(org.broadinstitute.sting.gatk.contexts.variantcontext.VariantContextUtils.VariantMergeType.MASTER)
this.out = outputPhased this.out = outputPhased
} }
} class PhasingByACeval() extends org.broadinstitute.sting.queue.extensions.gatk.PhasingEval with CommandLineGATKArgs {
this.analysis = org.broadinstitute.sting.oneoffprojects.walkers.phasing.PhasingEval.Analysis.PHASING_BY_AC
this.out = new File("phasing_by_ac." + outputPhased + ".txt")
}
}