Improving, yet again, the merging of bam files

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@874 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2009-06-02 13:31:12 +00:00
parent c4cb867d74
commit ae2eddec2d
4 changed files with 312 additions and 73 deletions

View File

@ -46,10 +46,11 @@ def main():
for geli in gelis:
root, flowcellDotlane, ext = picard_utils.splitPath(geli)
dbsnp_matches = os.path.join(root, flowcellDotlane) + '.dbsnp_matches'
TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP = picard_utils.read_dbsnp(dbsnp_matches)
nTotalSnps += int(TOTAL_SNPS)
nNovelSnps += int(NOVEL_SNPS)
print 'DATA: ', flowcellDotlane, TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP, dbsnp_matches
if os.path.exists(dbsnp_matches):
TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP = picard_utils.read_dbsnp(dbsnp_matches)
nTotalSnps += int(TOTAL_SNPS)
nNovelSnps += int(NOVEL_SNPS)
print 'DATA: ', flowcellDotlane, TOTAL_SNPS, NOVEL_SNPS, PCT_DBSNP, NUM_IN_DB_SNP, dbsnp_matches
print 'DATA: TOTAL SNP CALLS SUMMED ACROSS LANES, NOT ACCOUNT FOR IDENTITY', nTotalSnps
print 'DATA: NOVEL SNP CALLS SUMMED ACROSS LANES, NOT ACCOUNT FOR IDENTITY ', nNovelSnps
print 'DATA: AVERAGE DBSNP RATE ACROSS LANES ', float(nTotalSnps - nNovelSnps) / nTotalSnps
@ -57,8 +58,8 @@ def main():
jobid = None
for geli, variantOut in zip(gelis, variantsOut):
if not os.path.exists(variantOut):
cmd = ("GeliToText.jar I=%s | awk '$7 > %f' > %s" % ( geli, OPTIONS.lod, variantsOut) )
#jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False)
cmd = ("GeliToText.jar I=%s | awk '$7 > %f' > %s" % ( geli, OPTIONS.lod, variantOut) )
jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False)
cmd = ("cat %s | awk '$1 !~ \"@\" && $1 !~ \"#Sequence\" && $0 !~ \"GeliToText\"' | sort -k 1 -k 2 -n > tmp.calls" % ( ' '.join(variantsOut) ) )
jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, just_print_commands=False, waitID = jobid)

View File

@ -7,56 +7,14 @@ import glob
import operator
import ValidateGATK
import picard_utils
# Default Picard merge tool; overridable via the --mergeBin option.
MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar'
# Extension (with leading dot) appended to merged output files.
bam_ext = '.bam'
def readNAIdMap(NAIdFile):
    """Read a whitespace-separated file of (NA id, population, ...) rows and
    return a dict mapping each NA id to its population."""
    m = dict()
    for data in [line.split() for line in open(NAIdFile)]:
        naid, pop = data[0:2]
        print naid, ' => ', pop
        # Each NA id may appear at most once in the file.
        assert naid not in m
        m[naid] = pop
    print 'Read NAID->population map'
    print 'Contains', len(m), 'id -> population mappings'
    print 'Distinct populations:', picard_utils.unique(m.values())
    return m
class MergeFilesSpec:
    """Groups a set of source files under one population label, plus the base
    name used to build the merged output filename."""
    def __init__(self, sources, pop, merged_filename_base ):
        # sources              -- list of source file paths to merge
        # pop                  -- population label for this group
        # merged_filename_base -- base name for the merged output
        self.sourceFiles = sources
        self.pop = pop
        self.merged_filename_base = merged_filename_base
    def sources(self):
        # Source file paths to be merged.
        return self.sourceFiles
    def filename(self):
        # Merged output base name: '<base>.<pop>'.
        return self.merged_filename_base + '.' + self.pop
from MergeBAMsUtils import *
def splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop):
    """Partition allSources into MergeFilesSpec groups keyed by population
    (matching each NA id from NAID2Pop as a substring of the source path),
    or return a single unnamed spec when no map is given."""
    if NAID2Pop == None:
        return [MergeFilesSpec(allSources, '', merged_filename_base)]
    else:
        specs = dict()
        for source in allSources:
            spec = None
            for naid, pop in NAID2Pop.iteritems():
                # A source belongs to a population when its path contains the NA id.
                if source.find(naid) <> -1:
                    if pop in specs:
                        spec = specs[pop]
                    else:
                        spec = MergeFilesSpec([], pop, merged_filename_base)
                        specs[pop] = spec
                    #print 'Mapping', source, naid, pop
            spec.sourceFiles.append(source)
            if spec == None:
                sys.exit('File contains an unknown NAID: ' + source)
        return specs.values()
    # NOTE(review): the two lines below are unreachable -- both branches above
    # return. They appear to be the diff's replacement body (delegating to
    # MergeBAMsUtils.groupSources) interleaved with the removed old body.
    sourcePairs = [[source, source] for source in allSources]
    return groupSources(sourcePairs, NAID2Pop, merged_filename_base)
if __name__ == "__main__":
usage = "usage: %prog [options]"
usage = "usage: %prog files.list [options]"
parser = OptionParser(usage=usage)
parser.add_option("-q", "--farm", dest="farmQueue",
type="string", default=None,
@ -64,11 +22,14 @@ if __name__ == "__main__":
parser.add_option("-d", "--dir", dest="output_dir",
type="string", default="./",
help="Output directory")
parser.add_option("", "--dry", dest="dry",
action='store_true', default=False,
help="If provided, nothing actually gets run, just a dry run")
parser.add_option("-i", "--ignoreExistingFiles", dest="ignoreExistingFiles",
action='store_true', default=False,
help="Ignores already written files, if present")
parser.add_option("-m", "--mergeBin", dest="mergeBin",
type="string", default=MERGE_BIN,
type="string", default=None,
help="Path to merge binary")
parser.add_option("-n", "--naIDPops", dest="NAIDS2POP",
type="string", default=None,
@ -87,34 +48,23 @@ if __name__ == "__main__":
if OPTIONS.NAIDS2POP <> None:
NAID2Pop = readNAIdMap(OPTIONS.NAIDS2POP)
today = date.today()
time_stamp = today.isoformat()
for line in open(args[0]):
s = line.split()
if ( s <> [] and s[0] <> '#' ):
merged_filename_base = s[0]
allSources = reduce( operator.__add__, map( glob.glob, s[1:] ), [] )
print 'Merging info:'
for mergeFilesSpec in splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop):
print '-----'
print ' Population', mergeFilesSpec.pop
print ' Filename', mergeFilesSpec.filename()
print ' N sources', len(mergeFilesSpec.sources())
print ' sources', mergeFilesSpec.sources()
output = os.path.join(directory, mergeFilesSpec.filename() + '.stdout')
output_filename = os.path.join(directory, mergeFilesSpec.filename() + bam_ext)
output_index = output_filename + ".bai"
for spec in splitSourcesByPopulation(allSources, merged_filename_base, NAID2Pop):
spec.setPath(directory)
spec.pprint()
jobid = None
if OPTIONS.ignoreExistingFiles or not os.path.exists(output_filename):
#cmd = 'java -Xmx4096m -jar ' + OPTIONS.mergeBin + ' MSD=true AS=true SO=coordinate O=' + output_filename + ' VALIDATION_STRINGENCY=SILENT ' + ' I=' + (' I='.join(sources))
cmd = picard_utils.mergeBAMCmd(output_filename, mergeFilesSpec.sources(), OPTIONS.mergeBin)
if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAM()):
output = spec.getMergedBase() + '.stdout'
cmd = spec.mergeCmd(OPTIONS.mergeBin)
print cmd
jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output)
jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, just_print_commands = OPTIONS.dry)
if OPTIONS.ignoreExistingFiles or not os.path.exists(output_index):
cmd = "samtools index " + output_filename
jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, waitID = jobid)
if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAMIndex()):
jobid = farm_commands.cmd(spec.getIndexCmd(), OPTIONS.farmQueue, '', waitID = jobid, just_print_commands = OPTIONS.dry)

View File

@ -0,0 +1,209 @@
import farm_commands
import os.path
import sys
from optparse import OptionParser
from datetime import date
import glob
import operator
import ValidateGATK
import picard_utils
import operator
# Default Picard merge tool; overridable via the --mergeBin option / mergeCmd() argument.
MERGE_BIN = '/seq/software/picard/current/bin/MergeSamFiles.jar'
# Extension fragments (no leading dot) used to build merged BAM / index names.
bam_ext = 'bam'
bam_index_ext = 'bai'
def readNAIdMap(NAIdFile):
    """Parse a whitespace-separated file whose first two columns are an NA id
    and a population label; return a dict mapping NA id -> population.
    A duplicated NA id is an error (assert)."""
    id_to_pop = dict()
    for line in open(NAIdFile):
        fields = line.split()
        naid, pop = fields[0:2]
        print('%s  =>  %s' % (naid, pop))
        # Reject duplicate NA ids rather than silently overwriting.
        assert naid not in id_to_pop
        id_to_pop[naid] = pop
    print('Read NAID->population map')
    print('Contains %d id -> population mappings' % len(id_to_pop))
    print('Distinct populations: %s' % (picard_utils.unique(id_to_pop.values()),))
    return id_to_pop
# Binary-magnitude thresholds, largest first, paired with their metric suffix.
_abbrevs = [
    (1 << 50, 'P'),
    (1 << 40, 'T'),
    (1 << 30, 'G'),
    (1 << 20, 'M'),
    (1 << 10, 'k'),
    (1, ''),
]

def greek(size):
    """Format *size* (bytes) as a short human-readable string with a
    greek/metric suffix, e.g. 2048 -> '2.0k'."""
    chosen = _abbrevs[-1]
    for entry in _abbrevs:
        if size > entry[0]:
            chosen = entry
            break
    factor, suffix = chosen
    return '%.1f%s' % (float(size) / factor, suffix)
class MergeFilesSpec:
    """One planned merge: a named group of source BAM files plus the naming
    information needed to build the merged BAM path, its index path, and the
    farm commands that produce them."""

    def __init__(self, sources, group, merged_filename_base, path=''):
        # sources              -- list of source file paths to merge
        # group                -- group label (population or individual id)
        # merged_filename_base -- base name for outputs; None -> use group alone
        # path                 -- output directory for merged files
        self.sourceFiles = sources
        self.groupName = group
        self.merged_filename_base = merged_filename_base
        # BUG FIX: previously assigned '' unconditionally, silently ignoring
        # the path argument and forcing callers to use setPath().
        self.path = path

    def __str__(self):
        return 'MergeFilesSpec: ' + self.group()

    def group(self):
        """Return the group label for this merge."""
        return self.groupName

    def sources(self):
        """Return the list of source file paths."""
        return self.sourceFiles

    def filename(self):
        """Merged base name: '<base>.<group>', or just the group when no base was given."""
        if self.merged_filename_base is not None:
            return self.merged_filename_base + '.' + self.group()
        else:
            return self.group()

    def pprint(self):
        """Print a human-readable summary of this merge spec."""
        print('-' * 80)
        print('  Population:       %s' % self.group())
        print('  Merged filename:  %s' % self.getMergedBAM())
        print('  N sources:        %d' % len(self.sources()))
        print('  Sources:          %s' % (self.sources(),))
        print('  Sizes:            %s' % (self.sourceSizes(humanReadable=True),))
        print('  Est. merged size: %s' % greek(sum(self.sourceSizes())))

    def setPath(self, path):
        """Set the output directory used for the merged BAM and index."""
        self.path = path

    def getMergedBase(self):
        """Full output path of the merged file, minus extension."""
        return os.path.join(self.path, self.filename())

    def getMergedBAM(self):
        """Full path of the merged BAM file."""
        return self.getMergedBase() + '.' + bam_ext

    def getMergedBAMIndex(self):
        """Full path of the merged BAM's index (<bam>.bai)."""
        return self.getMergedBAM() + '.' + bam_index_ext

    def sourceSizes(self, humanReadable=False):
        """Sizes of the source files in bytes, or greek()-formatted strings
        when humanReadable is True.  Always returns a concrete list (the old
        map() form would be a lazy iterator under Python 3)."""
        sizes = [os.path.getsize(f) for f in self.sources()]
        if humanReadable:
            sizes = [greek(s) for s in sizes]
        return sizes

    def mergeCmd(self, mergeBin=None):
        """Build the Picard merge command; mergeBin defaults to MERGE_BIN."""
        if mergeBin is None:
            mergeBin = MERGE_BIN
        return picard_utils.mergeBAMCmd(self.getMergedBAM(), self.sources(), mergeBin)

    def getIndexCmd(self):
        """Build the samtools command that indexes the merged BAM."""
        return "samtools index " + self.getMergedBAM()
# Very general-purpose operation that returns merge specs given two lists of pairs:
# The first list, sources, contains superkey / sourceFile pairs.
# The second list, groups, contains key / group pairs.
#
# This function walks over all source pairs, and for each superkey, it
# looks for any key within groups contained within superkey. If it finds it,
# it associates sourceFile with the merge group in groups.
#
# The system requires that superkey match one (and only one) key in groups. It also
# requires that each group string be unique. The system can handle groups provided
# either as a list of pairs or as a dictionary.
#
# The function returns a list of MergeFilesSpec objects, one for each group with
# at least one associated sourceFile.
#
def groupSources(sources, groups, merged_filename_base):
    """Group source files into MergeFilesSpec objects.

    sources -- list of [superkey, sourceFile] pairs.
    groups  -- dict or list of [key, group] pairs; a source belongs to a
               group when its superkey contains that group's key as a
               substring.  None -> a single unnamed spec holding everything.
    merged_filename_base -- base name passed through to each spec.

    Returns the specs sorted by group name.  Exits (sys.exit) when a superkey
    matches no key at all.
    """
    if groups is None:
        # List comprehension (not map()) so the spec holds a real list under
        # both Python 2 and 3.
        return [MergeFilesSpec([pair[1] for pair in sources], '', merged_filename_base)]

    # Groups may be supplied as a list of pairs or as a dict.
    if isinstance(groups, list):
        groups = dict(groups)
    specs = dict()
    for superkey, sourceFile in sources:
        spec = None
        for key, group in groups.items():
            if superkey.find(key) != -1:
                if group in specs:
                    spec = specs[group]
                else:
                    spec = MergeFilesSpec([], group, merged_filename_base)
                    specs[group] = spec
                print('Mapping %s %s %s %s' % (group, key, superkey, sourceFile))
        # BUG FIX: the unknown-superkey check must come BEFORE the append.
        # Previously spec.sourceFiles.append(sourceFile) ran first, so an
        # unmatched superkey raised AttributeError on None instead of
        # reaching the intended sys.exit message.
        if spec is None:
            sys.exit('File contains an unknown superkey: ' + superkey)
        spec.sourceFiles.append(sourceFile)
    v = list(specs.values())
    v.sort(key=MergeFilesSpec.group)
    return v
import unittest
class TestMergeBAMsUtils(unittest.TestCase):
    """Unit tests for groupSources(): grouping geli sources by population and
    by individual id."""
    def setUp(self):
        import cStringIO
        # Whitespace-separated rows: NA id, population label, plate name.
        groupsString = """NA10846 ceu CEPH1
NA10847 ceu CEPH1
NA12144 ceu CEPH1
NA12145 ceu CEPH1
NA12146 yri CEPH1
NA12239 yri CEPH1
NA07029 ceu CEPH1
NA07019 ceu CEPH1
NA06994 ceu CEPH1
NA07000 ceu CEPH1
NA07022 ceu CEPH1
NA07056 ceu CEPH1
NA07048 ceu CEPH1
NA06991 ceu CEPH1
NA07034 ceu CEPH1
"""
        # Lane rows: NA id, flowcell, lane, end type, project, geli filename.
        lanesString = """NA10846 30GA9AAXX 1 Paired CEPH 30GA9AAXX.1.observed_genotypes.geli
NA10846 30GA9AAXX 6 Paired CEPH 30GA9AAXX.6.observed_genotypes.geli
NA10847 30GA9AAXX 7 Paired CEPH 30GA9AAXX.7.observed_genotypes.geli
NA12146 30JLTAAXX 2 Paired CEPH 30JLTAAXX.2.observed_genotypes.geli
NA12239 30PNVAAXX 1 Paired CEPH 30PNVAAXX.1.observed_genotypes.geli
NA12144 30PYMAAXX 1 Paired CEPH 30PYMAAXX.1.observed_genotypes.geli
NA12146 30PYMAAXX 7 Paired CEPH 30PYMAAXX.7.observed_genotypes.geli
"""
        # Expected number of lanes per NA id; used by testIDGroups.
        self.laneIDCounts = dict([["NA10846", 2], ["NA10847", 1], ["NA12146", 2], ["NA12239", 1], ["NA12144", 1]])
        pops = [line.strip() for line in cStringIO.StringIO(groupsString)]
        lanes = [line.strip() for line in cStringIO.StringIO(lanesString)]
        print pops
        print lanes
        # [naid, population] pairs -> the 'groups' argument of groupSources.
        self.ids2pop = [line.split()[0:2] for line in pops]
        # [naid, geli filename] pairs -> the 'sources' argument of groupSources.
        self.ids2gelis = [[line.split()[0], line.split()[5]] for line in lanes]
        # naid -> naid identity map: groups each lane under its own individual.
        self.ids2ids = dict([[line.split()[0]] * 2 for line in lanes])
    def testPopGroups(self):
        # Group the lanes by population; fixture yields 4 ceu and 3 yri lanes,
        # and groupSources returns specs sorted by group name.
        specs = groupSources(self.ids2gelis, self.ids2pop, 'foo')
        print 'Specs', specs
        self.assertEqual(len(specs), 2)
        self.assertEqual(specs[0].group(), 'ceu')
        self.assertEqual(specs[1].group(), 'yri')
        ceu = specs[0]
        yri = specs[1]
        #print ceu.sources()
        self.assertEqual(len(ceu.sources()), 4)
        self.assertEqual(len(yri.sources()), 3)
        # Merged names follow '<base>.<group>.bam[.bai]'.
        self.assertEqual(ceu.getMergedBAM(), 'foo.ceu.bam')
        self.assertEqual(ceu.getMergedBAMIndex(), 'foo.ceu.bam.bai')
        self.assertEqual(yri.getMergedBAM(), 'foo.yri.bam')
        self.assertEqual(yri.getMergedBAMIndex(), 'foo.yri.bam.bai')
    def testIDGroups(self):
        # Group the lanes by individual id: one spec per distinct NA id.
        specs = groupSources(self.ids2gelis, self.ids2ids, 'foo')
        self.assertEqual(len(specs), 5)
        for spec in specs:
            print 'Spec', spec
            self.assertEqual(len(spec.sources()), self.laneIDCounts[spec.group()])
            self.assertEqual(spec.getMergedBAM(), 'foo.' + spec.group() + '.bam')
if __name__ == '__main__':
    unittest.main()

View File

@ -0,0 +1,79 @@
import farm_commands
import os.path
import sys
from optparse import OptionParser
import picard_utils
from MergeBAMsUtils import *
def splitSourcesByKeys(bams, keys):
    """Group the BAM paths in *bams* by their parallel *keys*, returning one
    MergeFilesSpec per distinct key (no merged filename base)."""
    # Each key maps onto itself, so groupSources matches keys exactly.
    identity_pairs = [[k, k] for k in keys]
    return groupSources(zip(keys, bams), identity_pairs, None)
if __name__ == "__main__":
    # Driver: group BAM files by a key column and farm out merge + index jobs.
    usage = """usage: %prog bams.list [options]
Merges BAM files by keys from a file of a list of bams.
bams.list is a whitespace separated file. One column (--keyCol arg) is the key, and another
column (--bamCol) is a path to a bam file. This program will group the bam files
by key and spawn merge and index jobs to merge all of the files sharing the same key together"""
    parser = OptionParser(usage=usage)
    parser.add_option("-q", "--farm", dest="farmQueue",
                      type="string", default=None,
                      help="Farm queue to send processing jobs to")
    parser.add_option("-d", "--dir", dest="output_dir",
                      type="string", default="./",
                      help="Output directory")
    parser.add_option("", "--dry", dest="dry",
                      action='store_true', default=False,
                      help="If provided, nothing actually gets run, just a dry run")
    parser.add_option("-i", "--ignoreExistingFiles", dest="ignoreExistingFiles",
                      action='store_true', default=False,
                      help="Ignores already written files, if present")
    parser.add_option("-m", "--mergeBin", dest="mergeBin",
                      type="string", default=None,
                      help="Path to merge binary")
    parser.add_option("", "--keyCol", dest="keyCol",
                      type=int, default=1,
                      help="Column in the list file holding the key")
    parser.add_option("", "--bamCol", dest="bamCol",
                      type=int, default=2,
                      help="Column in the list file holding the bam file path")
    parser.add_option("-l", "--link", dest="link",
                      action='store_true', default=False,
                      help="If true, program will soft link single bam files that don't need merging")
    (OPTIONS, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("incorrect number of arguments")
    # Ensure the output directory exists.
    directory = OPTIONS.output_dir
    if not os.path.exists(directory):
        os.mkdir(directory)
    # Read bams.list; keyCol/bamCol are 1-based column indices into each row.
    bamsList = [line.strip().split() for line in open(args[0])]
    keys = map( lambda x: x[OPTIONS.keyCol-1], bamsList )
    bams = map( lambda x: x[OPTIONS.bamCol-1], bamsList )
    print 'Merging info:'
    for info in bamsList: print info
    for spec in splitSourcesByKeys(bams, keys):
        spec.setPath(directory)
        spec.pprint()
        jobid = None
        # Submit a merge job unless the merged BAM already exists.
        if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAM()):
            output = spec.getMergedBase() + '.stdout'
            # A single source needs no real merge; optionally just soft-link it.
            if len(spec.sources()) == 1 and OPTIONS.link:
                cmd = 'ln -s ' + spec.sources()[0] + ' ' + spec.getMergedBAM()
            else:
                cmd = spec.mergeCmd(OPTIONS.mergeBin)
            print cmd
            jobid = farm_commands.cmd(cmd, OPTIONS.farmQueue, output, just_print_commands = OPTIONS.dry)
        # Submit the index job, waiting on the merge job if one was spawned.
        if OPTIONS.ignoreExistingFiles or not os.path.exists(spec.getMergedBAMIndex()):
            pass
            # NOTE(review): the stray 'pass' above looks like leftover debris;
            # the index submission is placed inside this 'if' to mirror the
            # sibling MergeBAMs script -- confirm against original indentation.
            jobid = farm_commands.cmd(spec.getIndexCmd(), OPTIONS.farmQueue, '', waitID = jobid, just_print_commands = OPTIONS.dry)