From ae2eddec2d7fde6d0d01e5186b3371ff48dec552 Mon Sep 17 00:00:00 2001 From: depristo Date: Tue, 2 Jun 2009 13:31:12 +0000 Subject: [PATCH] Improving, yet again, the merging of bam files git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@874 348d0f76-0448-11de-a6fe-93d51630548a --- python/Gelis2PopSNPs.py | 13 +-- python/MergeBAMBatch.py | 84 ++++------------ python/MergeBAMsUtils.py | 209 +++++++++++++++++++++++++++++++++++++++ python/MergeBamsByKey.py | 79 +++++++++++++++ 4 files changed, 312 insertions(+), 73 deletions(-) create mode 100755 python/MergeBAMsUtils.py create mode 100755 python/MergeBamsByKey.py diff --git a/python/Gelis2PopSNPs.py b/python/Gelis2PopSNPs.py index 3f8629716..60f25456f 100755 --- a/python/Gelis2PopSNPs.py +++ b/python/Gelis2PopSNPs.py @@ -46,10 +46,11 @@ def main(): for geli in gelis: root, flowcellDotlane, ext = picard_utils.splitPath(geli) dbsnp_matches = os.path.join(root, flowcellDotlane) + '.dbsnp_matches' - TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP = picard_utils.read_dbsnp(dbsnp_matches) - nTotalSnps += int(TOTAL_SNPS) - nNovelSnps += int(NOVEL_SNPS) - print 'DATA: ', flowcellDotlane, TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP, dbsnp_matches + if os.path.exists(dbsnp_matches): + TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP = picard_utils.read_dbsnp(dbsnp_matches) + nTotalSnps += int(TOTAL_SNPS) + nNovelSnps += int(NOVEL_SNPS) + print 'DATA: ', flowcellDotlane, TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP, dbsnp_matches print 'DATA: TOTAL SNP CALLS SUMMED ACROSS LANES, NOT ACCOUNT FOR IDENTITY', nTotalSnps print 'DATA: NOVEL SNP CALLS SUMMED ACROSS LANES, NOT ACCOUNT FOR IDENTITY ', nNovelSnps print 'DATA: AVERAGE DBSNP RATE ACROSS LANES ', float(nTotalSnps - nNovelSnps) / nTotalSnps @@ -57,8 +58,8 @@ def main(): jobid = None for geli, variantOut in zip(gelis, variantsOut): if not os.path.exists(variantOut): - cmd = ("GeliToText.jar I=%s | awk '$7 > %f' > %s" % ( geli, OPTIONS.lod, variantsOut) ) - #jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False) + cmd = ("GeliToText.jar I=%s | awk '$7 > %f' > %s" % ( geli, OPTIONS.lod, variantOut) ) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False) cmd = ("cat %s | awk '$1 !~ \"@\" && $1 !~ \"#Sequence\" && $0 !~ \"GeliToText\"' | sort -k 1 -k 2 -n > tmp.calls" % ( ' '.join(variantsOut) ) ) jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False, waitID = jobid) diff --git a/python/MergeBAMBatch.py b/python/MergeBAMBatch.py index dcbffc8c5..7e7889f79 100755 --- a/python/MergeBAMBatch.py +++ b/python/MergeBAMBatch.py @@ -7,56 +7,14 @@ import glob import operator import ValidateGATK import picard_utils - -MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar' -bam_ext = '.bam' - -def readNAIdMap(NAIdFile): - m = dict() - for data in [line.split() for line in open(NAIdFile)]: - naid, pop = data[0:2] - print naid, ' => ', pop - assert naid not in m - m[naid] = pop - print 'Read NAID->population map' - print 'Contains', len(m), 'id -> population mappings' - print 'Distinct populations:', picard_utils.unique(m.values()) - return m - -class MergeFilesSpec: - def __init__(self, sources, pop, merged_filename_base ): - self.sourceFiles = sources - self.pop = pop - self.merged_filename_base = merged_filename_base - - def sources(self): - return self.sourceFiles - - def filename(self): - return self.merged_filename_base + '.' + self.pop +from MergeBAMsUtils import * def splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop): - if NAID2Pop == None: - return [MergeFilesSpec(allSources, '', merged_filename_base)] - else: - specs = dict() - for source in allSources: - spec = None - for naid, pop in NAID2Pop.iteritems(): - if source.find(naid) <> -1: - if pop in specs: - spec = specs[pop] - else: - spec = MergeFilesSpec([], pop, merged_filename_base) - specs[pop] = spec - #print 'Mapping', source, naid, pop - spec.sourceFiles.append(source) - if spec == None: - sys.exit('File contains an unknown NAID: ' + source) - return specs.values() + sourcePairs = [[source, source] for source in allSources] + return groupSources(sourcePairs, NAID2Pop, merged_filename_base) if __name__ == "__main__": - usage = "usage: %prog [options]" + usage = "usage: %prog files.list [options]" parser = OptionParser(usage=usage) parser.add_option("-q", "--farm", dest="farmQueue", type="string", default=None, @@ -64,11 +22,14 @@ if __name__ == "__main__": parser.add_option("-d", "--dir", dest="output_dir", type="string", default="./", help="Output directory") + parser.add_option("", "--dry", dest="dry", + action='store_true', default=False, + help="If provided, nothing actually gets run, just a dry run") parser.add_option("-i", "--ignoreExistingFiles", dest="ignoreExistingFiles", action='store_true', default=False, help="Ignores already written files, if present") parser.add_option("-m", "--mergeBin", dest="mergeBin", - type="string", default=MERGE_BIN, + type="string", default=None, help="Path to merge binary") parser.add_option("-n", "--naIDPops", dest="NAIDS2POP", type="string", default=None, @@ -87,34 +48,23 @@ if __name__ == "__main__": if OPTIONS.NAIDS2POP <> None: NAID2Pop = readNAIdMap(OPTIONS.NAIDS2POP) - today = date.today() - time_stamp = today.isoformat() - for line in open(args[0]): s = line.split() if ( s <> [] and s[0] <> '#' ): merged_filename_base = s[0] allSources = reduce( operator.__add__, map( glob.glob, s[1:] ), [] ) print 'Merging info:' - for mergeFilesSpec in splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop): - print '-----' - print ' Population', mergeFilesSpec.pop - print ' Filename', mergeFilesSpec.filename() - print ' N sources', len(mergeFilesSpec.sources()) - print ' sources', mergeFilesSpec.sources() - - output = os.path.join(directory, mergeFilesSpec.filename() + '.stdout') - output_filename = os.path.join(directory, mergeFilesSpec.filename() + bam_ext) - output_index = output_filename + ".bai" + for spec in splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop): + spec.setPath(directory) + spec.pprint() jobid = None - if OPTIONS.ignoreExistingFiles or not os.path.exists(output_filename): - #cmd = 'java -Xmx4096m -jar ' + OPTIONS.mergeBin + ' MSD=true AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(sources)) - cmd = picard_utils.mergeBAMCmd(output_filename, mergeFilesSpec.sources(), OPTIONS.mergeBin) + if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAM()): + output = spec.getMergedBase() + '.stdout' + cmd = spec.mergeCmd(OPTIONS.mergeBin) print cmd - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output) + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, just_print_commands = OPTIONS.dry) - if OPTIONS.ignoreExistingFiles or not os.path.exists(output_index): - cmd = "samtools index " + output_filename - jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, waitID = jobid) + if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAMIndex()): + jobid = farm_commands.cmd(spec.getIndexCmd(), OPTIONS.farmQueue, '', waitID = jobid, just_print_commands = OPTIONS.dry) diff --git a/python/MergeBAMsUtils.py b/python/MergeBAMsUtils.py new file mode 100755 index 000000000..b29de288f --- /dev/null +++ b/python/MergeBAMsUtils.py @@ -0,0 +1,209 @@ +import farm_commands +import os.path +import sys +from optparse import OptionParser +from datetime import date +import glob +import operator +import ValidateGATK +import picard_utils +import operator + +MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar' +bam_ext = 'bam' +bam_index_ext = 'bai' + +def readNAIdMap(NAIdFile): + m = dict() + for data in [line.split() for line in open(NAIdFile)]: + naid, pop = data[0:2] + print naid, ' => ', pop + assert naid not in m + m[naid] = pop + print 'Read NAID->population map' + print 'Contains', len(m), 'id -> population mappings' + print 'Distinct populations:', picard_utils.unique(m.values()) + return m + +_abbrevs = [ + (1<<50L, 'P'), + (1<<40L, 'T'), + (1<<30L, 'G'), + (1<<20L, 'M'), + (1<<10L, 'k'), + (1, '') + ] + +def greek(size): + """Return a string representing the greek/metric suffix of a size""" + for factor, suffix in _abbrevs: + if size > factor: + break + return '%.1f%s' % (float(size)/factor, suffix) + +class MergeFilesSpec: + def __init__(self, sources, group, merged_filename_base, path = '' ): + self.sourceFiles = sources + self.groupName = group + self.merged_filename_base = merged_filename_base + self.path = '' + + def __str__(self): + return 'MergeFilesSpec: ' + self.group() + + def group(self): + return self.groupName + + def sources(self): + return self.sourceFiles + + def filename(self): + if self.merged_filename_base <> None: + return self.merged_filename_base + '.' + self.group() + else: + return self.group() + + def pprint(self): + print '--------------------------------------------------------------------------------' + print ' Population: ', self.group() + print ' Merged filename: ', self.getMergedBAM() + print ' N sources: ', len(self.sources()) + print ' Sources: ', self.sources() + print ' Sizes: ', self.sourceSizes(humanReadable=True) + print ' Est. merged size: ', greek(reduce(operator.__add__, self.sourceSizes(), 0)) + + def setPath(self, path): + self.path = path + + def getMergedBase(self): + return os.path.join(self.path, self.filename()) + + def getMergedBAM(self): + return self.getMergedBase() + '.' + bam_ext + + def getMergedBAMIndex(self): + return self.getMergedBase() + '.' + bam_ext + '.' + bam_index_ext + + def sourceSizes(self, humanReadable=False): + sizes = map( os.path.getsize, self.sources() ) + if humanReadable: + sizes = map(greek, sizes) + return sizes + + def mergeCmd(self, mergeBin = None): + if mergeBin == None: + mergeBin = MERGE_BIN + + return picard_utils.mergeBAMCmd(self.getMergedBAM(), self.sources(), mergeBin) + + def getIndexCmd(self): + return "samtools index " + self.getMergedBAM() + +# Very general-purpose operation that returns merge specs given two lists of pairs: +# The first list, sources, contains superkey / sourceFile pairs. +# The second list, groups, contains key / group pairs. +# +# This function walks over all source pairs, and for each superkey, it +# looks for any key within groups contained within superkey. If it finds it, +# it associates sourceFile with the merge group in groups. +# +# The system requires that superkey match one (and only) one key in groups. It also +# requires that each group string be unique. The system can handle groups provided +# as a list of pairs of a dictionary. +# +# The function returns a list of MergeFileSpecs, one for each group with +# at least one associated sourceFile. +# +def groupSources(sources, groups, merged_filename_base): + if groups == None: + return [MergeFilesSpec(map( lambda x: x[1], sources), '', merged_filename_base)] + else: + specs = dict() + if type(groups) == list: groups = dict(groups) + + for superkey, sourceFile in sources: + spec = None + for key, group in groups.iteritems(): + #print 'Examining', superkey, key, group + if superkey.find(key) <> -1: + if group in specs: + spec = specs[group] + else: + spec = MergeFilesSpec([], group, merged_filename_base) + specs[group] = spec + print 'Mapping', group, key, superkey, sourceFile + spec.sourceFiles.append(sourceFile) + if spec == None: + sys.exit('File contains an unknown superkey: ' + superkey) + v = specs.values() + v.sort(key = MergeFilesSpec.group) + return v + +import unittest +class TestMergeBAMsUtils(unittest.TestCase): + def setUp(self): + import cStringIO + groupsString = """NA10846 ceu CEPH1 +NA10847 ceu CEPH1 +NA12144 ceu CEPH1 +NA12145 ceu CEPH1 +NA12146 yri CEPH1 +NA12239 yri CEPH1 +NA07029 ceu CEPH1 +NA07019 ceu CEPH1 +NA06994 ceu CEPH1 +NA07000 ceu CEPH1 +NA07022 ceu CEPH1 +NA07056 ceu CEPH1 +NA07048 ceu CEPH1 +NA06991 ceu CEPH1 +NA07034 ceu CEPH1 +""" + lanesString = """NA10846 30GA9AAXX 1 Paired CEPH 30GA9AAXX.1.observed_genotypes.geli +NA10846 30GA9AAXX 6 Paired CEPH 30GA9AAXX.6.observed_genotypes.geli +NA10847 30GA9AAXX 7 Paired CEPH 30GA9AAXX.7.observed_genotypes.geli +NA12146 30JLTAAXX 2 Paired CEPH 30JLTAAXX.2.observed_genotypes.geli +NA12239 30PNVAAXX 1 Paired CEPH 30PNVAAXX.1.observed_genotypes.geli +NA12144 30PYMAAXX 1 Paired CEPH 30PYMAAXX.1.observed_genotypes.geli +NA12146 30PYMAAXX 7 Paired CEPH 30PYMAAXX.7.observed_genotypes.geli +""" + + self.laneIDCounts = dict([["NA10846", 2], ["NA10847", 1], ["NA12146", 2], ["NA12239", 1], ["NA12144", 1]]) + + pops = [line.strip() for line in cStringIO.StringIO(groupsString)] + lanes = [line.strip() for line in cStringIO.StringIO(lanesString)] + + print pops + print lanes + + self.ids2pop = [line.split()[0:2] for line in pops] + self.ids2gelis = [[line.split()[0], line.split()[5]] for line in lanes] + self.ids2ids = dict([[line.split()[0]] * 2 for line in lanes]) + + def testPopGroups(self): + specs = groupSources(self.ids2gelis, self.ids2pop, 'foo') + print 'Specs', specs + self.assertEqual(len(specs), 2) + self.assertEqual(specs[0].group(), 'ceu') + self.assertEqual(specs[1].group(), 'yri') + + ceu = specs[0] + yri = specs[1] + #print ceu.sources() + self.assertEqual(len(ceu.sources()), 4) + self.assertEqual(len(yri.sources()), 3) + self.assertEqual(ceu.getMergedBAM(), 'foo.ceu.bam') + self.assertEqual(ceu.getMergedBAMIndex(), 'foo.ceu.bam.bai') + self.assertEqual(yri.getMergedBAM(), 'foo.yri.bam') + self.assertEqual(yri.getMergedBAMIndex(), 'foo.yri.bam.bai') + + def testIDGroups(self): + specs = groupSources(self.ids2gelis, self.ids2ids, 'foo') + self.assertEqual(len(specs), 5) + for spec in specs: + print 'Spec', spec + self.assertEqual(len(spec.sources()), self.laneIDCounts[spec.group()]) + self.assertEqual(spec.getMergedBAM(), 'foo.' + spec.group() + '.bam') + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/python/MergeBamsByKey.py b/python/MergeBamsByKey.py new file mode 100755 index 000000000..43d3909bd --- /dev/null +++ b/python/MergeBamsByKey.py @@ -0,0 +1,79 @@ +import farm_commands +import os.path +import sys +from optparse import OptionParser +import picard_utils +from MergeBAMsUtils import * + +def splitSourcesByKeys( bams, keys ): + keyPairs = [[key, key] for key in keys] + keybamPairs = zip(keys, bams) + return groupSources(keybamPairs, keyPairs, None) + +if __name__ == "__main__": + usage = """usage: %prog bams.list [options] +Merges BAM files by keys from a file of a list of bams. + +bams.list is a whitespace separated file. One column (--keyCol arg) is the key, and another +column (--bamCol) is a path to a bam file. This program will group the bam files +by key and spawn merge and index jobs to merge all of the files sharing the same key together""" + + parser = OptionParser(usage=usage) + parser.add_option("-q", "--farm", dest="farmQueue", + type="string", default=None, + help="Farm queue to send processing jobs to") + parser.add_option("-d", "--dir", dest="output_dir", + type="string", default="./", + help="Output directory") + parser.add_option("", "--dry", dest="dry", + action='store_true', default=False, + help="If provided, nothing actually gets run, just a dry run") + parser.add_option("-i", "--ignoreExistingFiles", dest="ignoreExistingFiles", + action='store_true', default=False, + help="Ignores already written files, if present") + parser.add_option("-m", "--mergeBin", dest="mergeBin", + type="string", default=None, + help="Path to merge binary") + parser.add_option("", "--keyCol", dest="keyCol", + type=int, default=1, + help="Column in the list file holding the key") + parser.add_option("", "--bamCol", dest="bamCol", + type=int, default=2, + help="Column in the list file holding the bam file path") + parser.add_option("-l", "--link", dest="link", + action='store_true', default=False, + help="If true, program will soft link single bam files that don't need merging") + + (OPTIONS, args) = parser.parse_args() + if len(args) != 1: + parser.error("incorrect number of arguments") + + directory = OPTIONS.output_dir + + if not os.path.exists(directory): + os.mkdir(directory) + + bamsList = [line.strip().split() for line in open(args[0])] + keys = map( lambda x: x[OPTIONS.keyCol-1], bamsList ) + bams = map( lambda x: x[OPTIONS.bamCol-1], bamsList ) + + print 'Merging info:' + for info in bamsList: print info + for spec in splitSourcesByKeys(bams, keys): + spec.setPath(directory) + spec.pprint() + + jobid = None + if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAM()): + output = spec.getMergedBase() + '.stdout' + if len(spec.sources()) == 1 and OPTIONS.link: + cmd = 'ln -s ' + spec.sources()[0] + ' ' + spec.getMergedBAM() + else: + cmd = spec.mergeCmd(OPTIONS.mergeBin) + print cmd + jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, just_print_commands = OPTIONS.dry) + + if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAMIndex()): + pass + jobid = farm_commands.cmd(spec.getIndexCmd(), OPTIONS.farmQueue, '', waitID = jobid, just_print_commands = OPTIONS.dry) +