305 lines
10 KiB
Python
305 lines
10 KiB
Python
|
|
#!/util/bin/python
|
||
|
|
#
|
||
|
|
#
|
||
|
|
# This file really needs to be cleaned up and functions need to be improved
|
||
|
|
#
|
||
|
|
# (1) utility operations should be moved to a utility file
|
||
|
|
# (2) Need to add support for windows paired reads, information about where
|
||
|
|
# reads come from (which file), and n-base pair loci
|
||
|
|
# (3) Move to C++ based SAM/BAM i/o system
|
||
|
|
# (4) Figure out a way to get the reference to stream, and use a automatically pickled version
|
||
|
|
# (5) Add capability to walk over all reference bases, regardless of coverage. Currently
|
||
|
|
# we only walk over covered bases.
|
||
|
|
#
|
||
|
|
#
|
||
|
|
from optparse import OptionParser
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
import random
|
||
|
|
import string
|
||
|
|
import os.path
|
||
|
|
import heapq
|
||
|
|
from itertools import *
|
||
|
|
|
||
|
|
from SAM import *
|
||
|
|
import SimpleSAM
|
||
|
|
#import SimulateReads # utility functions should be imported
|
||
|
|
|
||
|
|
# Global variable containing command line arguments
|
||
|
|
OPTIONS = None
|
||
|
|
|
||
|
|
#
|
||
|
|
# Trivial class that added pushback support to a stream
|
||
|
|
#
|
||
|
|
class peakStream(object):
|
||
|
|
"""A simple wrapper around a stream that adds unget operation"""
|
||
|
|
def __init__( self, stream ):
|
||
|
|
self.ungot = []
|
||
|
|
self.s = stream
|
||
|
|
|
||
|
|
def unget( self, elt ):
|
||
|
|
"""Put elt at the front of the underlying stream, making the next get call return it"""
|
||
|
|
|
||
|
|
#print 'unget', self.ungot
|
||
|
|
self.ungot.append(elt)
|
||
|
|
#print ' -> unget', self.ungot
|
||
|
|
|
||
|
|
# we hold the next() operation
|
||
|
|
def __iter__( self ): return self
|
||
|
|
|
||
|
|
def next(self):
|
||
|
|
"""For python iterator support"""
|
||
|
|
#print 'ungot', self.ungot
|
||
|
|
if self.ungot <> []:
|
||
|
|
elt = self.ungot.pop(0)
|
||
|
|
else:
|
||
|
|
elt = self.s.next()
|
||
|
|
|
||
|
|
#print ' -> ungot', self.ungot
|
||
|
|
#print ' elt', elt
|
||
|
|
return elt
|
||
|
|
|
||
|
|
#
|
||
|
|
# Returns a single, combined stream of SAM records in chromosome order from a list of
|
||
|
|
# input streams, each in reference order
|
||
|
|
#
|
||
|
|
def rawReadStream( ref, inputs ):
|
||
|
|
"""Returns a single iterable that returns records in reference order
|
||
|
|
from each of the input streams"""
|
||
|
|
def includePriorities( stream ):
|
||
|
|
return imap( lambda x: (x.getPos(), x), stream )
|
||
|
|
def prunePriorities( stream ):
|
||
|
|
return imap( lambda p: SimpleSAM.MappedReadFromSamRecord(ref, p[1]), stream )
|
||
|
|
|
||
|
|
with_priorities = map( includePriorities, inputs )
|
||
|
|
return prunePriorities( heapq.merge( *with_priorities ) )
|
||
|
|
|
||
|
|
#
|
||
|
|
# Just wraps the raw read stream objects in a list as they come out
|
||
|
|
#
|
||
|
|
def readStream( ref, inputs ):
|
||
|
|
"""Returns a single iterable that returns SAM records in reference order
|
||
|
|
from each of the input streams"""
|
||
|
|
rs = rawReadStream( ref, inputs )
|
||
|
|
return imap( lambda x: [x], rs )
|
||
|
|
|
||
|
|
#
|
||
|
|
# More complex stream object. Takes a list of input streams and creates a stream
|
||
|
|
# returning successive loci covered by the reads in the combined input stream.
|
||
|
|
#
|
||
|
|
class lociStream():
|
||
|
|
"""A streaming data structure that returns reads spanning each covered loci in
|
||
|
|
the input reads, offsets into them
|
||
|
|
where the bases are equivalent, and the position of the locus.
|
||
|
|
"""
|
||
|
|
def __init__( self, ref, inputs ):
|
||
|
|
self.ref = ref
|
||
|
|
self.rs = peakStream(rawReadStream( ref, inputs ))
|
||
|
|
self.window = []
|
||
|
|
|
||
|
|
self.chr = None
|
||
|
|
self.pos = None
|
||
|
|
|
||
|
|
def __iter__(self):
|
||
|
|
return self
|
||
|
|
|
||
|
|
def pushRead( self, read ):
|
||
|
|
self.window.append(read)
|
||
|
|
|
||
|
|
def cleanWindow( self, chr, pos ):
|
||
|
|
"""Walk over the window of reads, deleting those who no longer cover the current pos"""
|
||
|
|
if OPTIONS.debug:
|
||
|
|
print 'cleanWindow start:', chr, pos, self.window
|
||
|
|
|
||
|
|
def keepReadP( read ):
|
||
|
|
return read.chr == chr and pos >= read.start and pos <= read.end
|
||
|
|
self.window = filter( keepReadP, self.window )
|
||
|
|
|
||
|
|
if OPTIONS.debug:
|
||
|
|
print 'cleanWindow stop:', chr, pos, self.window
|
||
|
|
|
||
|
|
def expandWindow( self, chr, pos ):
|
||
|
|
"""Keep reading from the read stream until we've added all reads from the stream covering pos"""
|
||
|
|
if OPTIONS.debug:
|
||
|
|
print 'expandWindow start:', chr, pos, self.window
|
||
|
|
for read in self.rs:
|
||
|
|
#print 'read', read, pos
|
||
|
|
if read.chr == chr and read.start <= pos and read.end >= pos:
|
||
|
|
self.pushRead(read)
|
||
|
|
else:
|
||
|
|
self.rs.unget( read )
|
||
|
|
#self.rs = chain( [read], self.rs )
|
||
|
|
break
|
||
|
|
if OPTIONS.debug:
|
||
|
|
print 'expandWindow stop:', chr, pos, self.window
|
||
|
|
|
||
|
|
|
||
|
|
#
|
||
|
|
# This is the workhorse routine. It constructs a window of reads covering the
|
||
|
|
# next locus in the genome, and returns the reference, the contig, its position
|
||
|
|
# in the genome, along with a vector of offsets into the list of reads that denote
|
||
|
|
# the equivalent base among all the reads and reference.
|
||
|
|
#
|
||
|
|
def next(self):
|
||
|
|
if self.pos <> None:
|
||
|
|
self.cleanWindow(self.chr, self.pos) # consume reads not covering pos
|
||
|
|
self.expandWindow(self.chr, self.pos) # add reads to window covering pos
|
||
|
|
|
||
|
|
if self.window == []:
|
||
|
|
# the window is empty, we need to jump to the first pos of the first read in the stream:
|
||
|
|
nextRead = self.rs.next()
|
||
|
|
self.pushRead( nextRead )
|
||
|
|
self.chr = nextRead.chr
|
||
|
|
self.pos = nextRead.start
|
||
|
|
return self.next()
|
||
|
|
else:
|
||
|
|
# at this point, window contains all reads covering the pos, we need to return them
|
||
|
|
# and the offsets into each read for this loci
|
||
|
|
def calcOffset( read ):
|
||
|
|
offset = self.pos - read.start
|
||
|
|
return offset
|
||
|
|
|
||
|
|
offsets = map(calcOffset, self.window)
|
||
|
|
currPos = self.pos
|
||
|
|
self.pos += 1 # we are going to try to get the next position
|
||
|
|
return self.ref, self.chr, currPos, offsets, self.window
|
||
|
|
|
||
|
|
#
|
||
|
|
# Reference reader
|
||
|
|
#
|
||
|
|
def readRef(referenceFasta):
|
||
|
|
ref = {}
|
||
|
|
|
||
|
|
header = None
|
||
|
|
seq = ''
|
||
|
|
for line in open(referenceFasta):
|
||
|
|
if line[0] == '>':
|
||
|
|
if header <> None:
|
||
|
|
ref[header] = seq.lower()
|
||
|
|
seq = ''
|
||
|
|
header = line[1:].strip()
|
||
|
|
else:
|
||
|
|
seq += line.strip()
|
||
|
|
|
||
|
|
ref[header] = seq.lower()
|
||
|
|
#print ref
|
||
|
|
return ref
|
||
|
|
|
||
|
|
#
|
||
|
|
# Main() procedure
|
||
|
|
#
|
||
|
|
def main():
|
||
|
|
global OPTIONS, ROOT
|
||
|
|
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
#
|
||
|
|
# Setup command line options
|
||
|
|
#
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
usage = "usage: %prog [options] Walker [sam or bam file or file list]+"
|
||
|
|
parser = OptionParser(usage=usage)
|
||
|
|
parser.add_option("-r", "--ref", dest="reference",
|
||
|
|
type="string", default=None,
|
||
|
|
help="Reference DNA seqeunce in FASTA format")
|
||
|
|
parser.add_option("-m", "--maxRecords", dest="maxRecords",
|
||
|
|
type=int, default=None,
|
||
|
|
help="Max number of SAM records to process")
|
||
|
|
parser.add_option("-q", "--quiet", dest="quiet",
|
||
|
|
action='store_true', default=False,
|
||
|
|
help="Be quiet when generating output")
|
||
|
|
parser.add_option("-d", "--debug", dest="debug",
|
||
|
|
action='store_true', default=False,
|
||
|
|
help="Verbose debugging output?")
|
||
|
|
|
||
|
|
(OPTIONS, args) = parser.parse_args()
|
||
|
|
print 'args', args
|
||
|
|
if len(args) < 2:
|
||
|
|
parser.error("Incorrect number of arguments")
|
||
|
|
|
||
|
|
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
#
|
||
|
|
# Dynamically load the walker class (the first cmd line arg)
|
||
|
|
# and initialize a walker class object
|
||
|
|
#
|
||
|
|
#
|
||
|
|
# load walkers from standard location
|
||
|
|
#
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
walkerName = args[0]
|
||
|
|
walkerModule = __import__(walkerName, globals(), locals(), [], -1)
|
||
|
|
walkerClass = walkerModule.__dict__[walkerName]
|
||
|
|
walker = walkerClass()
|
||
|
|
|
||
|
|
print walkerName
|
||
|
|
print walkerModule
|
||
|
|
print walkerClass
|
||
|
|
print walker
|
||
|
|
|
||
|
|
print '------------------------------------------------------------'
|
||
|
|
print 'program:', sys.argv[0]
|
||
|
|
print ''
|
||
|
|
print ' ref: ', OPTIONS.reference
|
||
|
|
print ' walker: ', walker
|
||
|
|
if walker.hasOption('name'):
|
||
|
|
print ' -> ', walker.getOption('name')
|
||
|
|
if walker.hasOption('desc'):
|
||
|
|
print ' -> ', walker.getOption('desc')
|
||
|
|
print '------------------------------------------------------------'
|
||
|
|
|
||
|
|
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
#
|
||
|
|
# Initialize the engine
|
||
|
|
#
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
|
||
|
|
# read the reference
|
||
|
|
refs = readRef(OPTIONS.reference)
|
||
|
|
|
||
|
|
# create the low-level SAMIO streams
|
||
|
|
files = args[1:]
|
||
|
|
inputs = map( lambda file: SAMIO( file, debugging=OPTIONS.debug ), files )
|
||
|
|
|
||
|
|
# build the higher level stream object, either by record or by loci
|
||
|
|
if walker.isRecordWalker():
|
||
|
|
stream = readStream( refs, inputs )
|
||
|
|
if walker.isLociWalker():
|
||
|
|
stream = lociStream( refs, inputs )
|
||
|
|
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
#
|
||
|
|
# Move the walker object over the stream
|
||
|
|
#
|
||
|
|
# For each element in the record or loci stream, invoke the walker
|
||
|
|
# object on it. Use filter, map, and reduce to construct the sum
|
||
|
|
#
|
||
|
|
# ------------------------------------------------------------
|
||
|
|
sum = walker.reduceDefault
|
||
|
|
counter = 0
|
||
|
|
for elt in stream:
|
||
|
|
counter += 1
|
||
|
|
#print 'processing elt', counter, elt, OPTIONS.maxRecords
|
||
|
|
if OPTIONS.maxRecords <> None:
|
||
|
|
if (counter * 10) % OPTIONS.maxRecords == 0:
|
||
|
|
print ' => ', (counter * 100.0) / OPTIONS.maxRecords, '% done'
|
||
|
|
|
||
|
|
if counter > OPTIONS.maxRecords:
|
||
|
|
break
|
||
|
|
if walker.filterfunc( *elt ):
|
||
|
|
x = walker.mapfunc( *elt )
|
||
|
|
sum = walker.reducefunc( x, sum )
|
||
|
|
print 'Result is', sum
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
if True:
|
||
|
|
import cProfile, pstats
|
||
|
|
cProfile.run('main()', 'prof')
|
||
|
|
stats = pstats.Stats('prof')
|
||
|
|
stats.sort_stats('time')
|
||
|
|
stats.print_stats(20)
|
||
|
|
else:
|
||
|
|
main()
|
||
|
|
|
||
|
|
|