236 lines
8.7 KiB
Python
236 lines
8.7 KiB
Python
|
|
# VCF Validation Analysis library -- classes and functions to help VCF validation
|
||
|
|
|
||
|
|
header = ["Name","Filtered","Called","False Positive","2+ Alleles","Discordant","True Positives","Now not called singleton"]
|
||
|
|
|
||
|
|
vcf_header = ["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"]
|
||
|
|
|
||
|
|
def COUNT_ALLELES(vcf_fields):
|
||
|
|
al = 0
|
||
|
|
for i in range(len(vcf_fields) ):
|
||
|
|
if ( i > 8 ):
|
||
|
|
gt = vcf_fields[i].split(":")[0]
|
||
|
|
#print(gt)
|
||
|
|
if ( gt.find("1") != -1 ):
|
||
|
|
al = 1 + al
|
||
|
|
|
||
|
|
return al
|
||
|
|
|
||
|
|
class VCFGenotypeValidator:
|
||
|
|
def __init__(self,name):
|
||
|
|
self.name = name
|
||
|
|
self.chipHets = set()
|
||
|
|
self.chipHoms = set()
|
||
|
|
self.chipRefs = set()
|
||
|
|
self.chipNoCalls = set()
|
||
|
|
self.concordantGenotypes = set()
|
||
|
|
self.falseNegativeGenotypes = set()
|
||
|
|
self.falsePositiveGenotypes = set()
|
||
|
|
self.homsCalledHets = set()
|
||
|
|
self.hetsCalledHoms = set()
|
||
|
|
self.header = "INITIALIZED"
|
||
|
|
|
||
|
|
def getFalsePositives(self):
|
||
|
|
return self.falsePositiveGenotypes
|
||
|
|
|
||
|
|
def setHeader(self, header):
|
||
|
|
try:
|
||
|
|
self.offset = header.index(self.name)
|
||
|
|
self.header = header
|
||
|
|
except ValueError:
|
||
|
|
print("Index error -- name is "+self.name)
|
||
|
|
self.offset = -1
|
||
|
|
|
||
|
|
def importData(self,splitVCFLine):
|
||
|
|
chrpos = splitVCFLine[0]+":"+splitVCFLine[1]
|
||
|
|
genotype = splitVCFLine[self.offset].split(":")[0]
|
||
|
|
if ( genotype == "1/1" ):
|
||
|
|
self.chipHoms.add(chrpos)
|
||
|
|
elif ( genotype == "0/0" ):
|
||
|
|
self.chipRefs.add(chrpos)
|
||
|
|
elif ( genotype.startswith(".") ):
|
||
|
|
self.chipNoCalls.add(chrpos)
|
||
|
|
else:
|
||
|
|
self.chipHets.add(chrpos)
|
||
|
|
|
||
|
|
def checkOffset(self):
|
||
|
|
if ( self.header[self.offset] != self.name ):
|
||
|
|
raise ValueError("Header[offset] is not name")
|
||
|
|
|
||
|
|
def checkOffset(self,newHeader):
|
||
|
|
if ( newHeader[self.offset] != self.name ):
|
||
|
|
raise ValueError("The internal offset is not appropriate for the input header.")
|
||
|
|
|
||
|
|
def checkData(self, splitVCFLine):
|
||
|
|
chrpos = splitVCFLine[0]+":"+splitVCFLine[1]
|
||
|
|
genotype = splitVCFLine[self.offset].split(":")[0]
|
||
|
|
if ( chrpos in self.chipRefs ):
|
||
|
|
self.checkFalsePositive(chrpos,genotype)
|
||
|
|
self.chipRefs.remove(chrpos)
|
||
|
|
elif ( chrpos in self.chipHets ):
|
||
|
|
self.checkChipHet(chrpos,genotype)
|
||
|
|
self.chipHets.remove(chrpos)
|
||
|
|
elif ( chrpos in self.chipHoms ):
|
||
|
|
self.checkChipHom(chrpos,genotype)
|
||
|
|
self.chipHoms.remove(chrpos)
|
||
|
|
# else ignore the site
|
||
|
|
|
||
|
|
def checkFalsePositive(self,chrpos,genotype):
|
||
|
|
if ( genotype.count("1") != 0 ):
|
||
|
|
self.falsePositiveGenotypes.add(chrpos)
|
||
|
|
|
||
|
|
def checkChipHet(self,chrpos,genotype):
|
||
|
|
if ( genotype.count("1") == 2 ):
|
||
|
|
self.hetsCalledHoms.add(chrpos)
|
||
|
|
elif ( genotype.count("1") == 0 ):
|
||
|
|
self.falseNegativeGenotypes.add(chrpos)
|
||
|
|
else:
|
||
|
|
self.concordantGenotypes.add(chrpos)
|
||
|
|
|
||
|
|
def checkChipHom(self,chrpos,genotype):
|
||
|
|
if ( genotype.count("1") == 2 ):
|
||
|
|
self.concordantGenotypes.add(chrpos)
|
||
|
|
elif ( genotype.count("1") == 0 ):
|
||
|
|
self.falseNegativeGenotypes.add(chrpos)
|
||
|
|
else:
|
||
|
|
self.homsCalledHets.add(chrpos)
|
||
|
|
|
||
|
|
def __str__(self):
|
||
|
|
false_negs = str(len(self.falseNegativeGenotypes))
|
||
|
|
false_pos = str(len(self.falsePositiveGenotypes))
|
||
|
|
hetsCalledHoms = str(len(self.hetsCalledHoms))
|
||
|
|
homsCalledHets = str(len(self.homsCalledHets))
|
||
|
|
concordant = str(len(self.concordantGenotypes))
|
||
|
|
variants_called_only_in_chip = str(len(self.chipHets)+len(self.chipHoms))
|
||
|
|
out_fields = [self.name, concordant, homsCalledHets, hetsCalledHoms, false_pos, false_negs, variants_called_only_in_chip]
|
||
|
|
return "\t".join(out_fields)
|
||
|
|
|
||
|
|
|
||
|
|
class VCFSingletonValidator:
|
||
|
|
|
||
|
|
def __init__(self, infokeys, name):
|
||
|
|
self.name = name
|
||
|
|
self.infokeys = infokeys
|
||
|
|
self.numCalls = 0
|
||
|
|
self.concordantTPCalls = 0
|
||
|
|
self.numFalsePositives = 0
|
||
|
|
self.numSingletonsActuallyDoublePlus = 0
|
||
|
|
self.filteredCalls = 0
|
||
|
|
self.falsePositives = list()
|
||
|
|
self.wrongAF = list()
|
||
|
|
self.allSites = list()
|
||
|
|
|
||
|
|
def update(self, vcfLine, args="NONE"):
|
||
|
|
if ( self.useLine(vcfLine) ):
|
||
|
|
self.numCalls = 1 + self.numCalls
|
||
|
|
fields = vcfLine.strip().split("\t")
|
||
|
|
self.allSites.append(fields[0]+":"+fields[1])
|
||
|
|
info = fields[7]
|
||
|
|
info = info.split(";")
|
||
|
|
infodict = dict()
|
||
|
|
for pair in info:
|
||
|
|
keyval = pair.split("=")
|
||
|
|
infodict[keyval[0]]=keyval[1]
|
||
|
|
ref = fields[3]
|
||
|
|
alt = fields[4]
|
||
|
|
numAlleles = int(infodict["nonrefAlleles"])
|
||
|
|
snpName = infodict["snpID"]
|
||
|
|
call = snpName.split("_g")[1].split("_")[0].upper()
|
||
|
|
if ( numAlleles == 0 ):
|
||
|
|
self.numFalsePositives = 1 + self.numFalsePositives
|
||
|
|
self.falsePositives.append(fields[0]+":"+fields[1])
|
||
|
|
else:
|
||
|
|
if ( numAlleles > 1 ):
|
||
|
|
if ( args == "CHECK_NEW_CALL" ):
|
||
|
|
contig = fields[0]+":"+fields[1]
|
||
|
|
newcallAlleles = self.calledCounts.get(contig)
|
||
|
|
if ( int(newcallAlleles > 1) ):
|
||
|
|
self.originalSingletonsNowCalledHigher = 1 + self.originalSingletonsNowCalledHigher
|
||
|
|
else:
|
||
|
|
self.numSingletonsActuallyDoublePlus = 1 + self.numSingletonsActuallyDoublePlus
|
||
|
|
self.wrongAF.append(fields[0]+":"+fields[1])
|
||
|
|
else:
|
||
|
|
self.numSingletonsActuallyDoublePlus = 1 + self.numSingletonsActuallyDoublePlus
|
||
|
|
self.wrongAF.append(fields[0]+":"+fields[1])
|
||
|
|
if ( call.find(alt) != -1 ):
|
||
|
|
self.concordantTPCalls = 1 + self.concordantTPCalls
|
||
|
|
elif ( alt.find(",") == -1 ):
|
||
|
|
print("Discordant site at "+fields[0]+":"+fields[1]+" with call "+call+"and alt "+alt)
|
||
|
|
else:
|
||
|
|
print("Tri allelic site at "+fields[0]+":"+fields[1])
|
||
|
|
|
||
|
|
def useLine(self, vcfLine):
|
||
|
|
for key in self.infokeys:
|
||
|
|
if ( vcfLine.find(key) == -1 ):
|
||
|
|
return False
|
||
|
|
|
||
|
|
if ( vcfLine.split("\t")[6] != "." ):
|
||
|
|
self.filteredCalls = 1 + self.filteredCalls
|
||
|
|
return False
|
||
|
|
|
||
|
|
return True
|
||
|
|
|
||
|
|
def __str__(self):
|
||
|
|
entries = [self.name, str(self.filteredCalls), str(self.numCalls), str(self.numFalsePositives),
|
||
|
|
str(self.numSingletonsActuallyDoublePlus),
|
||
|
|
str(self.numCalls-self.concordantTPCalls-self.numFalsePositives),str(self.concordantTPCalls)]
|
||
|
|
return ("\t".join(entries))
|
||
|
|
|
||
|
|
def falsePositiveSites(self):
|
||
|
|
return "\n".join(self.falsePositives)
|
||
|
|
|
||
|
|
def wrongAFSites(self):
|
||
|
|
return "\n".join(self.wrongAF)
|
||
|
|
|
||
|
|
def printAllSites(self):
|
||
|
|
return "\n".join(self.allSites)
|
||
|
|
|
||
|
|
class ContigFilteredValidator(VCFSingletonValidator):
|
||
|
|
# only need a couple of things
|
||
|
|
def __init__(self,keyset,name,alleledict):
|
||
|
|
VCFSingletonValidator.__init__(self,keyset,name)
|
||
|
|
self.calledCounts = alleledict
|
||
|
|
self.originalSingletonsNowCalledHigher = 0
|
||
|
|
|
||
|
|
def useLine(self, vcfLine):
|
||
|
|
spline = vcfLine.strip().split("\t")
|
||
|
|
contig = spline[0]+":"+spline[1]
|
||
|
|
if ( contig in self.infokeys):
|
||
|
|
if ( vcfLine.split("\t")[6] != "." ):
|
||
|
|
self.filteredCalls = 1 + self.filteredCalls
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
return False
|
||
|
|
|
||
|
|
def update(self, vcfLine):
|
||
|
|
VCFSingletonValidator.update(self,vcfLine,"CHECK_NEW_CALL")
|
||
|
|
|
||
|
|
def __str__(self):
|
||
|
|
fields = VCFSingletonValidator.__str__(self).split("\t")
|
||
|
|
fields.append(str(self.originalSingletonsNowCalledHigher))
|
||
|
|
return "\t".join(fields)
|
||
|
|
|
||
|
|
class SingletonExclusionValidator(VCFSingletonValidator):
|
||
|
|
# override the useLine function -- must have the first
|
||
|
|
# key but no others
|
||
|
|
def __init__(self,keyset,name):
|
||
|
|
VCFSingletonValidator.__init__(self,keyset,name)
|
||
|
|
|
||
|
|
def useLine(self,vcfLine):
|
||
|
|
if ( vcfLine.find(self.infokeys[0]) == -1 ):
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
for key in self.infokeys:
|
||
|
|
if ( key != self.infokeys[0] and vcfLine.find(key) != -1 ):
|
||
|
|
return False
|
||
|
|
|
||
|
|
if ( vcfLine.split("\t")[6]!="." ):
|
||
|
|
self.filteredCalls = 1 + self.filteredCalls
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
return True
|
||
|
|
|
||
|
|
def update(self, vcfLine):
|
||
|
|
VCFSingletonValidator.update(self,vcfLine)
|