diff --git a/python/MergeBAMBatch.py b/python/MergeBAMBatch.py index e950aae8d..dcbffc8c5 100755 --- a/python/MergeBAMBatch.py +++ b/python/MergeBAMBatch.py @@ -6,10 +6,55 @@ from datetime import date import glob import operator import ValidateGATK +import picard_utils MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar' bam_ext = '.bam' +def readNAIdMap(NAIdFile): + m = dict() + for data in [line.split() for line in open(NAIdFile)]: + naid, pop = data[0:2] + print naid, ' => ', pop + assert naid not in m + m[naid] = pop + print 'Read NAID->population map' + print 'Contains', len(m), 'id -> population mappings' + print 'Distinct populations:', picard_utils.unique(m.values()) + return m + +class MergeFilesSpec: + def __init__(self, sources, pop, merged_filename_base ): + self.sourceFiles = sources + self.pop = pop + self.merged_filename_base = merged_filename_base + + def sources(self): + return self.sourceFiles + + def filename(self): + return self.merged_filename_base + '.' + self.pop + +def splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop): + if NAID2Pop == None: + return [MergeFilesSpec(allSources, '', merged_filename_base)] + else: + specs = dict() + for source in allSources: + spec = None + for naid, pop in NAID2Pop.iteritems(): + if source.find(naid) <> -1: + if pop in specs: + spec = specs[pop] + else: + spec = MergeFilesSpec([], pop, merged_filename_base) + specs[pop] = spec + #print 'Mapping', source, naid, pop + spec.sourceFiles.append(source) + if spec == None: + sys.exit('File contains an unknown NAID: ' + source) + return specs.values() + if __name__ == "__main__": usage = "usage: %prog [options]" parser = OptionParser(usage=usage) @@ -25,7 +70,10 @@ if __name__ == "__main__": parser.add_option("-m", "--mergeBin", dest="mergeBin", type="string", default=MERGE_BIN, help="Path to merge binary") - + parser.add_option("-n", "--naIDPops", dest="NAIDS2POP", + type="string", default=None, + help="Path to file contains NAID POP names. If provided, input files will be merged by population") + (OPTIONS, args) = parser.parse_args() if len(args) != 1: parser.error("incorrect number of arguments") @@ -35,23 +83,38 @@ if __name__ == "__main__": if not os.path.exists(directory): os.mkdir(directory) + NAID2Pop = None + if OPTIONS.NAIDS2POP <> None: + NAID2Pop = readNAIdMap(OPTIONS.NAIDS2POP) + today = date.today() time_stamp = today.isoformat() for line in open(args[0]): s = line.split() if ( s <> [] and s[0] <> '#' ): - merged_filename = s[0] - output = os.path.join(directory, merged_filename + '.stdout') - output_filename = os.path.join(directory, merged_filename + bam_ext) - output_index = output_filename + ".bai" - sources = reduce( operator.__add__, map( glob.glob, s[1:] ), [] ) - - if OPTIONS.ignoreExistingFiles or not os.path.exists(output_filename): - cmd = 'java -Xmx4096m -jar ' + OPTIONS.mergeBin + ' MSD=true AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(sources)) - print cmd - farm_commands.cmd(cmd, OPTIONS.farmQueue, output) + merged_filename_base = s[0] + allSources = reduce( operator.__add__, map( glob.glob, s[1:] ), [] ) + print 'Merging info:' + for mergeFilesSpec in splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop): + print '-----' + print ' Population', mergeFilesSpec.pop + print ' Filename', mergeFilesSpec.filename() + print ' N sources', len(mergeFilesSpec.sources()) + print ' sources', mergeFilesSpec.sources() - if OPTIONS.ignoreExistingFiles or not os.path.exists(output_index): - ValidateGATK.indexBAM(output_filename, OPTIONS.farmQueue) + output = os.path.join(directory, mergeFilesSpec.filename() + '.stdout') + output_filename = os.path.join(directory, mergeFilesSpec.filename() + bam_ext) + output_index = output_filename + ".bai" + + jobid = None + if OPTIONS.ignoreExistingFiles or not os.path.exists(output_filename): + #cmd = 'java -Xmx4096m -jar ' + OPTIONS.mergeBin + ' MSD=true AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(sources)) + cmd = picard_utils.mergeBAMCmd(output_filename, mergeFilesSpec.sources(), OPTIONS.mergeBin) + print cmd + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output) + + if OPTIONS.ignoreExistingFiles or not os.path.exists(output_index): + cmd = "samtools index " + output_filename + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, waitID = jobid) diff --git a/python/picard_utils.py b/python/picard_utils.py index 85b5c9a49..0a3fdf5d9 100755 --- a/python/picard_utils.py +++ b/python/picard_utils.py @@ -22,10 +22,9 @@ MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar' CALL_GENOTYPES_BIN = '/seq/software/picard/current/bin/CallGenotypes.jar' def unique(l): - # NotÊorderÊpreservingÊÊÊÊ return list(set(l)) -def genotypes2heterozyosity(genotypes, nIndividuals = -1): +def genotypes2heterozygosity(genotypes, nIndividuals = -1): def isHET(genotype): return genotype[0] <> genotype[1] @@ -39,34 +38,98 @@ def genotypes2heterozyosity(genotypes, nIndividuals = -1): # print genotypes, ' => hets', hets return [nhets / (1.0*n), nhets, n] -def genotypes2allele(genotypes, nIndividuals = -1): - def isHET(genotype): - return genotype[0] <> genotype[1] - +def genotypes2allelefrequencies(ref, genotypes, nIndividuals = -1): if nIndividuals == -1: n = len(genotypes) else: n = nIndividuals - - hets = filter( isHET, genotypes ) - nhets = len(hets) - print genotypes, ' => hets', hets - return [nhets / (1.0*n), nhets, n] + alleles = ''.join(genotypes) + nChroms = 2 * n + nCalledChroms = 2 * len(genotypes) + nMissingChroms = nChroms - nCalledChroms + nRefChroms = alleles.count(ref) + nMissingChroms + nAltChroms = nChroms - nRefChroms + p = float(nRefChroms) / nChroms + q = float(nAltChroms) / nChroms + + assert p + q == 1 + + return [p, q, n] + +class PicardSNP: + def __init__( self, loc, ref, polymorphism, heterozygosity, allelefrequencies, genotypes): + self.loc = loc + self.ref = ref + self.polymorphism = polymorphism + self.heterozygosity = heterozygosity + self.nIndividuals = allelefrequencies[2] + self.allelefrequencies = allelefrequencies + self.genotypes = genotypes + + def refGenotype(self): + return self.ref + self.ref + + def hetGenotype(self): + return self.ref + self.alt() + + def homVarGenotype(self): + return self.alt() + self.alt() + + def alt(self): + return self.polymorphism[1] + + def het(self): + return self.heterozygosity[0] + + def p(self): + return self.allelefrequencies[0] + + def q(self): + return self.allelefrequencies[1] + + def countGenotype(self, genotype): + r = len(filter( lambda x: x == genotype, self.genotypes )) + #print 'countGenotype', genotype, self.genotypes, r + return r + + def nRefGenotypes(self): + return self.nIndividuals - self.nHetGenotypes() - self.nHomVarGenotypes() + + def nHetGenotypes(self): + return self.countGenotype(self.hetGenotype()) + + def nHomVarGenotypes(self): + return self.countGenotype(self.homVarGenotype()) + + + def __str__(self): + return '%s %s %s %s %s %s' % ( self.loc, self.ref, str(self.polymorphism), str(self.het()), str(self.allelefrequencies), str(self.genotypes)) + def aggregatedGeliCalls2SNP( geliCallsAtSite, nIndividuals ): #print 'geliCallsAtSite', geliCallsAtSite loc = geliCallsAtSite[0] + #print loc refBases = map( lambda call: call[2], geliCallsAtSite[1] ) + refBase = refBases[0] #print 'refBases', refBases - genotypes = map( lambda call: call[5], geliCallsAtSite[1] ) + + genotypes = map( lambda call: ''.join(sorted(call[5])), geliCallsAtSite[1] ) + allBases = unique(''.join(genotypes) + refBase) + #print 'All bases => ', allBases, genotypes + + if len(allBases) > 2: + print '*** WARNING, tri-state allele [ref=%s, all bases observed = %s] discovered at %s, ignoring the call' % ( refBase, ''.join(allBases), loc ) + return None + #print 'genotypes', genotypes - polymorphism = unique(list(refBases[0] + genotypes[0])) - if polymorphism[0] <> refBases[0]: polymorphism.reverse() - assert polymorphism[0] == refBases[0] + polymorphism = unique(list(refBase + genotypes[0])) + if polymorphism[0] <> refBase: polymorphism.reverse() + #print 'polymorphism', polymorphism genotype = list(geliCallsAtSite[1][0][5]) - return [loc, polymorphism, genotypes2heterozyosity(genotypes, nIndividuals), genotypes] + return PicardSNP(loc, refBase, polymorphism, genotypes2heterozygosity(genotypes, nIndividuals), genotypes2allelefrequencies(refBase, genotypes, nIndividuals), genotypes) #return '%s %s %s 0.002747 -411.622578 -420.661738 0.000000 9.039160 364.000000 %d 1 0' % (loc, genotype[0], genotype[1], len(geliCallsAtSite)) @@ -74,12 +137,14 @@ def call2loc(call): return call[0] + ':' + call[1] def aggregateGeliCalls( sortedGeliCalls ): + #return [[loc, list(sharedCallsGroup)] for (loc, sharedCallsGroup) in itertools.groupby(sortedGeliCalls, call2loc)] return [[loc, list(sharedCallsGroup)] for (loc, sharedCallsGroup) in itertools.groupby(sortedGeliCalls, call2loc)] def mergeBAMCmd( output_filename, inputFiles, mergeBin = MERGE_BIN ): if type(inputFiles) <> list: inputFiles = list(inputFiles) return 'java -Xmx4096m -jar ' + mergeBin + ' MSD=true AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(inputFiles)) + #return 'java -Xmx4096m -jar ' + mergeBin + ' AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(inputFiles)) def getPicardPath(lane, picardRoot = '/seq/picard/'): flowcell, laneNo = lane.split('.') @@ -118,6 +183,23 @@ def readPicardConcordance(file): data = [parse1(line) for line in open(file) if p.match(line) <> None] return data +def splitPath(geli): + root, filename = os.path.split(geli) + s = filename.split('.') + flowcellDotlane = '.'.join(s[0:2]) + ext = '.'.join(s[2:]) + return [root, flowcellDotlane, ext] + +def read_dbsnp(dbsnp_matches): + next = False + for line in open(dbsnp_matches): + s = line.split() + if next: + return s + if len(s) > 0 and s[0] == "TOTAL_SNPS": + next = True + return [] + # ------------------------------------------------------------------------------------------ # # Unit testing! @@ -146,7 +228,7 @@ chr1 1111204 C 31 99 CT 18.784479 18.784479 self.nhets = [1, 2, 2, 1, 1, 2] self.altAlleles = [1, 2, 2, 1, 1, 4] - self.aaf = map( lambda x: (1.0*x) / self.nIndividuals, self.altAlleles ) + self.aaf = map( lambda x: (1.0*x) / (2 * self.nIndividuals), self.altAlleles ) self.hets = map( lambda x: (1.0*x) / self.nIndividuals, self.nhets ) def testGenotypesSize(self): @@ -154,19 +236,35 @@ chr1 1111204 C 31 99 CT 18.784479 18.784479 def testGenotypes2Het(self): print 'testGenotypes2Het...' - self.assertEqual(genotypes2heterozyosity(['AT']), [1, 1, 1]) - self.assertEqual(genotypes2heterozyosity(['AA']), [0, 0, 1]) - self.assertEqual(genotypes2heterozyosity(['TT']), [0, 0, 1]) - self.assertEqual(genotypes2heterozyosity(['AT', 'AT']), [1, 2, 2]) - self.assertEqual(genotypes2heterozyosity(['AA', 'AA']), [0, 0, 2]) - self.assertEqual(genotypes2heterozyosity(['AT', 'AA']), [0.5, 1, 2]) - self.assertEqual(genotypes2heterozyosity(['AT', 'TT']), [0.5, 1, 2]) - self.assertEqual(genotypes2heterozyosity(['AT', 'TT', 'AA']), [1.0/3, 1, 3]) - self.assertEqual(genotypes2heterozyosity(['AT', 'AT', 'AA']), [2.0/3, 2, 3]) + self.assertEqual(genotypes2heterozygosity(['AT']), [1, 1, 1]) + self.assertEqual(genotypes2heterozygosity(['AA']), [0, 0, 1]) + self.assertEqual(genotypes2heterozygosity(['TT']), [0, 0, 1]) + self.assertEqual(genotypes2heterozygosity(['AT', 'AT']), [1, 2, 2]) + self.assertEqual(genotypes2heterozygosity(['AA', 'AA']), [0, 0, 2]) + self.assertEqual(genotypes2heterozygosity(['AT', 'AA']), [0.5, 1, 2]) + self.assertEqual(genotypes2heterozygosity(['AT', 'TT']), [0.5, 1, 2]) + self.assertEqual(genotypes2heterozygosity(['AT', 'TT', 'AA']), [1.0/3, 1, 3]) + self.assertEqual(genotypes2heterozygosity(['AT', 'AT', 'AA']), [2.0/3, 2, 3]) - self.assertEqual(genotypes2heterozyosity(['AT', 'AT'], 10), [2.0/10, 2, 10]) - self.assertEqual(genotypes2heterozyosity(['AT', 'AA'], 10), [1.0/10, 1, 10]) + self.assertEqual(genotypes2heterozygosity(['AT', 'AT'], 10), [2.0/10, 2, 10]) + self.assertEqual(genotypes2heterozygosity(['AT', 'AA'], 10), [1.0/10, 1, 10]) + def testAlleleFreqs(self): + print 'testAlleleFreqs...' + self.assertEqual(genotypes2allelefrequencies('A', ['AT']), [0.5, 0.5, 1]) + self.assertEqual(genotypes2allelefrequencies('T', ['AT']), [0.5, 0.5, 1]) + self.assertEqual(genotypes2allelefrequencies('A', ['AA']), [1.0, 0.0, 1]) + self.assertEqual(genotypes2allelefrequencies('A', ['TT']), [0.0, 1.0, 1]) + + self.assertEqual(genotypes2allelefrequencies('A', ['TT'], 2), [0.5, 0.5, 2]) + self.assertEqual(genotypes2allelefrequencies('A', ['AA'], 2), [1.0, 0.0, 2]) + self.assertEqual(genotypes2allelefrequencies('A', ['AT'], 2), [3.0/4, 1.0/4, 2]) + self.assertEqual(genotypes2allelefrequencies('A', ['AT'], 3), [5.0/6, 1.0/6, 3]) + self.assertEqual(genotypes2allelefrequencies('T', ['AT'], 3), [5.0/6, 1.0/6, 3]) + + self.assertEqual(genotypes2allelefrequencies('A', ['AT', 'AT'], 3), [4.0/6, 2.0/6, 3]) + self.assertEqual(genotypes2allelefrequencies('A', ['AT', 'TT'], 3), [3.0/6, 3.0/6, 3]) + self.assertEqual(genotypes2allelefrequencies('A', ['AT', 'TT', 'AA']), [3.0/6, 3.0/6, 3]) def testGenotypeSetLocs(self): for set, loc in zip(self.genotypesSets, self.locs): @@ -175,14 +273,23 @@ chr1 1111204 C 31 99 CT 18.784479 18.784479 def testGenotypeLocs(self): for genotype, loc in zip(self.genotypes, self.locs): - self.assertEqual(genotype[0], loc) + self.assertEqual(genotype.loc, loc) def testGenotypeHets(self): print 'testGenotypeHets:' for genotype, het in zip(self.genotypes, self.hets): print ' => ', genotype, het - self.assertEqual(genotype[2][0], het) + self.assertEqual(genotype.het(), het) + def testGenotypeAlleleFreqs(self): + print 'testGenotypeAlleleFreqs:' + for genotype, af in zip(self.genotypes, self.aaf): + print ' => ', genotype, af + self.assertEqual(genotype.allelefrequencies, [1 - af, af, self.nIndividuals]) + + def testSplit(self): + self.assertEqual(splitPath('/seq/picard/30GA9AAXX/C1-152_2008-10-23_2009-04-05/1/Solexa-8267/30GA9AAXX.1.observed_genotypes.geli'), ['/seq/picard/30GA9AAXX/C1-152_2008-10-23_2009-04-05/1/Solexa-8267', '30GA9AAXX.1', 'observed_genotypes.geli']) + self.assertEqual(splitPath('/seq/picard/30GA9AAXX/C1-152_2008-10-23_2009-04-05/2/Solexa-8268/30GA9AAXX.2.observed_genotypes.geli'), ['/seq/picard/30GA9AAXX/C1-152_2008-10-23_2009-04-05/2/Solexa-8268', '30GA9AAXX.2', 'observed_genotypes.geli']) if __name__ == '__main__': unittest.main()