# VCF Validation Analysis library -- classes and functions to help VCF validation header = ["Name","Filtered","Called","False Positive","2+ Alleles","Discordant","True Positives","Now not called singleton"] vcf_header = ["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] def COUNT_ALLELES(vcf_fields): al = 0 for i in range(len(vcf_fields) ): if ( i > 8 ): gt = vcf_fields[i].split(":")[0] #print(gt) if ( gt.find("1") != -1 ): al = 1 + al return al class VCFGenotypeValidator: def __init__(self,name): self.name = name self.chipHets = set() self.chipHoms = set() self.chipRefs = set() self.chipNoCalls = set() self.concordantGenotypes = set() self.falseNegativeGenotypes = set() self.falsePositiveGenotypes = set() self.homsCalledHets = set() self.hetsCalledHoms = set() self.header = "INITIALIZED" def getFalsePositives(self): return self.falsePositiveGenotypes def setHeader(self, header): try: self.offset = header.index(self.name) self.header = header except ValueError: print("Index error -- name is "+self.name) self.offset = -1 def importData(self,splitVCFLine): chrpos = splitVCFLine[0]+":"+splitVCFLine[1] genotype = splitVCFLine[self.offset].split(":")[0] if ( genotype == "1/1" ): self.chipHoms.add(chrpos) elif ( genotype == "0/0" ): self.chipRefs.add(chrpos) elif ( genotype.startswith(".") ): self.chipNoCalls.add(chrpos) else: self.chipHets.add(chrpos) def checkOffset(self): if ( self.header[self.offset] != self.name ): raise ValueError("Header[offset] is not name") def checkOffset(self,newHeader): if ( newHeader[self.offset] != self.name ): raise ValueError("The internal offset is not appropriate for the input header.") def checkData(self, splitVCFLine): chrpos = splitVCFLine[0]+":"+splitVCFLine[1] genotype = splitVCFLine[self.offset].split(":")[0] if ( chrpos in self.chipRefs ): self.checkFalsePositive(chrpos,genotype) self.chipRefs.remove(chrpos) elif ( chrpos in self.chipHets ): self.checkChipHet(chrpos,genotype) self.chipHets.remove(chrpos) elif ( chrpos in self.chipHoms ): self.checkChipHom(chrpos,genotype) self.chipHoms.remove(chrpos) # else ignore the site def checkFalsePositive(self,chrpos,genotype): if ( genotype.count("1") != 0 ): self.falsePositiveGenotypes.add(chrpos) def checkChipHet(self,chrpos,genotype): if ( genotype.count("1") == 2 ): self.hetsCalledHoms.add(chrpos) elif ( genotype.count("1") == 0 ): self.falseNegativeGenotypes.add(chrpos) else: self.concordantGenotypes.add(chrpos) def checkChipHom(self,chrpos,genotype): if ( genotype.count("1") == 2 ): self.concordantGenotypes.add(chrpos) elif ( genotype.count("1") == 0 ): self.falseNegativeGenotypes.add(chrpos) else: self.homsCalledHets.add(chrpos) def __str__(self): false_negs = str(len(self.falseNegativeGenotypes)) false_pos = str(len(self.falsePositiveGenotypes)) hetsCalledHoms = str(len(self.hetsCalledHoms)) homsCalledHets = str(len(self.homsCalledHets)) concordant = str(len(self.concordantGenotypes)) variants_called_only_in_chip = str(len(self.chipHets)+len(self.chipHoms)) out_fields = [self.name, concordant, homsCalledHets, hetsCalledHoms, false_pos, false_negs, variants_called_only_in_chip] return "\t".join(out_fields) class VCFSingletonValidator: def __init__(self, infokeys, name): self.name = name self.infokeys = infokeys self.numCalls = 0 self.concordantTPCalls = 0 self.numFalsePositives = 0 self.numSingletonsActuallyDoublePlus = 0 self.filteredCalls = 0 self.falsePositives = list() self.wrongAF = list() self.allSites = list() def update(self, vcfLine, args="NONE"): if ( self.useLine(vcfLine) ): self.numCalls = 1 + self.numCalls fields = vcfLine.strip().split("\t") self.allSites.append(fields[0]+":"+fields[1]) info = fields[7] info = info.split(";") infodict = dict() for pair in info: keyval = pair.split("=") infodict[keyval[0]]=keyval[1] ref = fields[3] alt = fields[4] numAlleles = int(infodict["nonrefAlleles"]) snpName = infodict["snpID"] call = snpName.split("_g")[1].split("_")[0].upper() if ( numAlleles == 0 ): self.numFalsePositives = 1 + self.numFalsePositives self.falsePositives.append(fields[0]+":"+fields[1]) else: if ( numAlleles > 1 ): if ( args == "CHECK_NEW_CALL" ): contig = fields[0]+":"+fields[1] newcallAlleles = self.calledCounts.get(contig) if ( int(newcallAlleles > 1) ): self.originalSingletonsNowCalledHigher = 1 + self.originalSingletonsNowCalledHigher else: self.numSingletonsActuallyDoublePlus = 1 + self.numSingletonsActuallyDoublePlus self.wrongAF.append(fields[0]+":"+fields[1]) else: self.numSingletonsActuallyDoublePlus = 1 + self.numSingletonsActuallyDoublePlus self.wrongAF.append(fields[0]+":"+fields[1]) if ( call.find(alt) != -1 ): self.concordantTPCalls = 1 + self.concordantTPCalls elif ( alt.find(",") == -1 ): print("Discordant site at "+fields[0]+":"+fields[1]+" with call "+call+"and alt "+alt) else: print("Tri allelic site at "+fields[0]+":"+fields[1]) def useLine(self, vcfLine): for key in self.infokeys: if ( vcfLine.find(key) == -1 ): return False if ( vcfLine.split("\t")[6] != "." ): self.filteredCalls = 1 + self.filteredCalls return False return True def __str__(self): entries = [self.name, str(self.filteredCalls), str(self.numCalls), str(self.numFalsePositives), str(self.numSingletonsActuallyDoublePlus), str(self.numCalls-self.concordantTPCalls-self.numFalsePositives),str(self.concordantTPCalls)] return ("\t".join(entries)) def falsePositiveSites(self): return "\n".join(self.falsePositives) def wrongAFSites(self): return "\n".join(self.wrongAF) def printAllSites(self): return "\n".join(self.allSites) class ContigFilteredValidator(VCFSingletonValidator): # only need a couple of things def __init__(self,keyset,name,alleledict): VCFSingletonValidator.__init__(self,keyset,name) self.calledCounts = alleledict self.originalSingletonsNowCalledHigher = 0 def useLine(self, vcfLine): spline = vcfLine.strip().split("\t") contig = spline[0]+":"+spline[1] if ( contig in self.infokeys): if ( vcfLine.split("\t")[6] != "." ): self.filteredCalls = 1 + self.filteredCalls return False else: return True else: return False def update(self, vcfLine): VCFSingletonValidator.update(self,vcfLine,"CHECK_NEW_CALL") def __str__(self): fields = VCFSingletonValidator.__str__(self).split("\t") fields.append(str(self.originalSingletonsNowCalledHigher)) return "\t".join(fields) class SingletonExclusionValidator(VCFSingletonValidator): # override the useLine function -- must have the first # key but no others def __init__(self,keyset,name): VCFSingletonValidator.__init__(self,keyset,name) def useLine(self,vcfLine): if ( vcfLine.find(self.infokeys[0]) == -1 ): return False else: for key in self.infokeys: if ( key != self.infokeys[0] and vcfLine.find(key) != -1 ): return False if ( vcfLine.split("\t")[6]!="." ): self.filteredCalls = 1 + self.filteredCalls return False else: return True def update(self, vcfLine): VCFSingletonValidator.update(self,vcfLine)