Use 2 MongoDB collections (tables): one for INFO/attributes, one for samples/genotypes.

This commit is contained in:
Joel Thibault 2012-04-23 16:38:41 -04:00
parent 04e1be9106
commit db3cd1abd5
3 changed files with 172 additions and 126 deletions

View File

@ -16,11 +16,14 @@ import org.broadinstitute.sting.commandline.RodBinding;
import org.broadinstitute.sting.gatk.contexts.AlignmentContext;
import org.broadinstitute.sting.gatk.contexts.ReferenceContext;
import org.broadinstitute.sting.gatk.refdata.RefMetaDataTracker;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.exceptions.StingException;
import org.broadinstitute.sting.utils.variantcontext.VariantContext;
import java.io.File;
import java.io.PrintStream;
import java.util.Collection;
import java.util.List;
/**
* Inserts all of the RODs in the input data set into MongoDB. Each VariantContext is converted to MongoDB documents by VariantContext.toMongoDB() before insertion.
@ -35,10 +38,12 @@ public class InsertRODsWalker extends RodWalker<Integer, Integer> {
private final static String MONGO_HOST = "gsa4.broadinstitute.org";
private final static Integer MONGO_PORT = 43054;
private final static String MONGO_DB_NAME = "bjorn";
private final static String MONGO_VC_COLLECTION = "vcs";
private final static String MONGO_ATTRIBUTES_COLLECTION = "attributes";
private final static String MONGO_SAMPLES_COLLECTION = "samples";
protected Mongo mongo;
protected DBCollection mongoCollection;
protected DBCollection mongoAttributes;
protected DBCollection mongoSamples;
private String RODFileName;
@ -47,22 +52,31 @@ public class InsertRODsWalker extends RodWalker<Integer, Integer> {
try {
mongo = new Mongo(MONGO_HOST, MONGO_PORT);
DB mongoDb = mongo.getDB(MONGO_DB_NAME);
mongoCollection = mongoDb.getCollection(MONGO_VC_COLLECTION);
mongoAttributes = mongoDb.getCollection(MONGO_ATTRIBUTES_COLLECTION);
mongoSamples = mongoDb.getCollection(MONGO_SAMPLES_COLLECTION);
RODFileName = input.getSource();
int lastSep = RODFileName.lastIndexOf(File.separator);
RODFileName = RODFileName.substring(lastSep + 1);
// set up indices
mongoCollection.ensureIndex("location");
mongoCollection.ensureIndex("sample");
mongoCollection.ensureIndex("sourceROD");
mongoCollection.ensureIndex("contig");
mongoCollection.ensureIndex("start");
mongoCollection.ensureIndex("stop");
// set up primary key
mongoCollection.ensureIndex(new BasicDBObject("location", 1).append("sample", 1).append("sourceROD", 1).append("alleles", 1), new BasicDBObject("unique", 1));
mongoAttributes.ensureIndex("location");
mongoAttributes.ensureIndex("sourceROD");
mongoAttributes.ensureIndex("contig");
mongoAttributes.ensureIndex("start");
mongoAttributes.ensureIndex("stop");
mongoSamples.ensureIndex("location");
mongoSamples.ensureIndex("sample");
mongoSamples.ensureIndex("sourceROD");
mongoSamples.ensureIndex("contig");
mongoSamples.ensureIndex("start");
mongoSamples.ensureIndex("stop");
// set up primary keys
mongoAttributes.ensureIndex(new BasicDBObject("location", 1).append("sourceROD", 1).append("alleles", 1), new BasicDBObject("unique", 1));
mongoSamples.ensureIndex(new BasicDBObject("location", 1).append("sourceROD", 1).append("alleles", 1).append("sample", 1), new BasicDBObject("unique", 1));
}
catch (MongoException e) {
throw e;
@ -93,11 +107,13 @@ public class InsertRODsWalker extends RodWalker<Integer, Integer> {
for ( Feature feature : tracker.getValues(Feature.class, context.getLocation()) ) {
if ( feature instanceof VariantContext ) {
VariantContext vc = (VariantContext) feature;
for (BasicDBObject vcForMongo : vc.toMongoDB(RODFileName)) {
mongoCollection.insert(vcForMongo);
Pair<BasicDBObject,List<BasicDBObject>> mongoCollections = vc.toMongoDB(RODFileName);
mongoAttributes.insert(mongoCollections.first);
for (BasicDBObject sampleForMongo : mongoCollections.second) {
mongoSamples.insert(sampleForMongo);
}
}
}
return 1;

View File

@ -352,10 +352,12 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
private final static String MONGO_HOST = "gsa4.broadinstitute.org";
private final static Integer MONGO_PORT = 43054;
private final static String MONGO_DB_NAME = "bjorn";
private final static String MONGO_VC_COLLECTION = "vcs";
private final static String MONGO_ATTRIBUTES_COLLECTION = "attributes";
private final static String MONGO_SAMPLES_COLLECTION = "samples";
protected Mongo mongo;
protected DBCollection mongoCollection;
protected DBCollection mongoAttributes;
protected DBCollection mongoSamples;
/**
* Set up the VCF writer, the sample expressions and regexes, and the JEXL matcher
@ -460,7 +462,8 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
try {
mongo = new Mongo(MONGO_HOST, MONGO_PORT);
DB mongoDb = mongo.getDB(MONGO_DB_NAME);
mongoCollection = mongoDb.getCollection(MONGO_VC_COLLECTION);
mongoAttributes = mongoDb.getCollection(MONGO_ATTRIBUTES_COLLECTION);
mongoSamples = mongoDb.getCollection(MONGO_SAMPLES_COLLECTION);
}
catch (MongoException e) {
throw e;
@ -568,14 +571,14 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
query.put("start", start);
// can't know stop location for deletions from reference
DBCursor cursor = mongoCollection.find(query);
Map<Pair<String,List<Allele>>,DBObject> results = new HashMap<Pair<String,List<Allele>>,DBObject>();
Map<Pair<String,List<Allele>>,List<Genotype>> genotypes = new HashMap<Pair<String,List<Allele>>,List<Genotype>>();
DBCursor attributesCursor = mongoAttributes.find(query);
DBCursor samplesCursor = mongoSamples.find(query);
while(cursor.hasNext()) {
DBObject oneResult = cursor.next();
Map<Pair<String,List<Allele>>,VariantContextBuilder> attributesFromDB = new HashMap<Pair<String,List<Allele>>,VariantContextBuilder>();
while(attributesCursor.hasNext()) {
DBObject oneResult = attributesCursor.next();
String sample = (String)oneResult.get("sample");
String sourceROD = (String)oneResult.get("sourceROD");
ArrayList<Allele> alleles = new ArrayList<Allele>();
@ -587,6 +590,69 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
alleles.add(Allele.create(allele, isRef));
}
// primary key to uniquely identify variant
Pair<String, List<Allele>> sourceRodAllelePair = new Pair<String, List<Allele>>(sourceROD, alleles);
Map<String, Object> attributes = new TreeMap<String, Object>();
BasicDBList attrsInDb = (BasicDBList)oneResult.get("attributes");
for (Object attrInDb : attrsInDb) {
BasicDBObject attrKVP = (BasicDBObject)attrInDb;
String key = (String)attrKVP.get("key");
Object value = attrKVP.get("value");
attributes.put(key, value);
}
Set<String> filters = new HashSet<String>();
BasicDBObject filtersInDb = (BasicDBObject)oneResult.get("filters");
if (filtersInDb != null) {
for (Object filterInDb : filtersInDb.values()) {
filters.add((String)filterInDb);
}
}
String source = (String)oneResult.get("source");
String id = (String)oneResult.get("id");
Double error = (Double)oneResult.get("error");
Long stop = (Long)oneResult.get("stop");
VariantContextBuilder builder = new VariantContextBuilder(source, contig, start, stop, sourceRodAllelePair.getSecond());
builder.id(id);
builder.log10PError(error);
builder.attributes(attributes);
builder.filters(filters);
long index = start - ref.getWindow().getStart() - 1;
if ( index >= 0 ) {
// we were given enough reference context to create the VariantContext
builder.referenceBaseForIndel(ref.getBases()[(int)index]); // TODO: needed?
}
builder.referenceBaseForIndel(ref.getBases()[0]); // TODO: correct?
attributesFromDB.put(sourceRodAllelePair, builder);
}
while(samplesCursor.hasNext()) {
DBObject oneResult = samplesCursor.next();
String sourceROD = (String)oneResult.get("sourceROD");
ArrayList<Allele> alleles = new ArrayList<Allele>();
BasicDBObject allelesInDb = (BasicDBObject)oneResult.get("alleles");
for (Object alleleInDb : allelesInDb.values()) {
String rawAllele = (String)alleleInDb;
boolean isRef = rawAllele.contains("*");
String allele = rawAllele.replace("*", "");
alleles.add(Allele.create(allele, isRef));
}
// primary key to uniquely identify variant
Pair<String, List<Allele>> sourceRodAllelePair = new Pair<String, List<Allele>>(sourceROD, alleles);
VariantContextBuilder builder = attributesFromDB.get(sourceRodAllelePair);
String sample = (String)oneResult.get("sample");
BasicDBObject genotypeInDb = (BasicDBObject)oneResult.get("genotype");
Double genotypeError = (Double)genotypeInDb.get("error");
@ -609,59 +675,7 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
}
Genotype genotype = new Genotype(sample, genotypeAlleles, genotypeError);
// primary key to uniquely identify variant
Pair<String, List<Allele>> sourceRodAllelePair = new Pair<String, List<Allele>>(sourceROD, alleles);
if (!genotypes.containsKey(sourceRodAllelePair))
genotypes.put(sourceRodAllelePair, new ArrayList<Genotype>());
Collection<Genotype> genotypesBySourceROD = genotypes.get(sourceRodAllelePair);
genotypesBySourceROD.add(Genotype.modifyAttributes(genotype, genotypeAttributes));
results.put(sourceRodAllelePair, oneResult);
}
for (Pair<String, List<Allele>> sourceRodAllelePair : results.keySet()) {
DBObject result = results.get(sourceRodAllelePair);
Map<String, Object> attributes = new TreeMap<String, Object>();
BasicDBList attrsInDb = (BasicDBList)result.get("attributes");
for (Object attrInDb : attrsInDb) {
BasicDBObject attrKVP = (BasicDBObject)attrInDb;
String key = (String)attrKVP.get("key");
Object value = attrKVP.get("value");
attributes.put(key, value);
}
Set<String> filters = new HashSet<String>();
BasicDBObject filtersInDb = (BasicDBObject)result.get("filters");
if (filtersInDb != null) {
for (Object filterInDb : filtersInDb.values()) {
filters.add((String)filterInDb);
}
}
String source = (String)result.get("source");
String id = (String)result.get("id");
Double error = (Double)result.get("error");
Long stop = (Long)result.get("stop");
VariantContextBuilder builder = new VariantContextBuilder(source, contig, start, stop, sourceRodAllelePair.getSecond());
builder.id(id);
builder.log10PError(error);
builder.genotypes(genotypes.get(sourceRodAllelePair));
builder.attributes(attributes);
builder.filters(filters);
long index = start - ref.getWindow().getStart() - 1;
if ( index >= 0 ) {
// we were given enough reference context to create the VariantContext
builder.referenceBaseForIndel(ref.getBases()[(int)index]);
}
builder.referenceBaseForIndel(ref.getBases()[0]);
builder.genotypes(Genotype.modifyAttributes(genotype, genotypeAttributes));
vcs.add(builder.make());
}
@ -827,6 +841,7 @@ public class SelectVariants extends RodWalker<Integer, Integer> implements TreeR
}
public void onTraversalDone(Integer result) {
mongo.close();
logger.info(result + " records processed.");
if (SELECT_RANDOM_NUMBER) {

View File

@ -7,6 +7,7 @@ import org.broad.tribble.TribbleException;
import org.broad.tribble.util.ParsingUtils;
import org.broadinstitute.sting.utils.GenomeLoc;
import org.broadinstitute.sting.utils.codecs.vcf.VCFConstants;
import org.broadinstitute.sting.utils.collections.Pair;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import java.util.*;
@ -1221,42 +1222,68 @@ public class VariantContext implements Feature { // to enable tribble integratio
this.getGenotypes());
}
public List<BasicDBObject> toMongoDB(String sourceROD) {
List<BasicDBObject> vcDocs = new ArrayList<BasicDBObject>();
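/**
* Generate one MongoDB document for the attributes collection and one document per genotype for the samples collection.
* @param sourceROD value stored in the "sourceROD" field of every generated document
* @return a Pair whose first element is the attributes document and whose second element is the list of per-sample documents
*/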
public Pair<BasicDBObject,List<BasicDBObject>> toMongoDB(String sourceROD) {
// fields common to both attributes and samples collections
BasicDBObject siteDoc = new BasicDBObject();
siteDoc.put("location", contig + ":" + (start - stop == 0 ? start : start + "-" + stop));
siteDoc.put("contig", contig);
siteDoc.put("start", start);
siteDoc.put("stop", stop);
siteDoc.put("id", this.getID());
siteDoc.put("error", this.getLog10PError());
siteDoc.put("source", this.getSource());
siteDoc.put("sourceROD", sourceROD);
siteDoc.put("type", this.getType().toString());
Integer alleleIndex = 0;
BasicDBObject allelesDoc = new BasicDBObject();
for (Allele allele : this.getAlleles())
{
String index = alleleIndex.toString();
allelesDoc.put(index, allele.toString());
alleleIndex++;
}
siteDoc.put("alleles", allelesDoc);
Integer filterIndex = 0;
BasicDBObject filtersDoc = new BasicDBObject();
for (String filter : this.getFilters())
{
String index = filterIndex.toString();
filtersDoc.put(index, filter.toString());
filterIndex++;
}
if (filterIndex > 0) {
siteDoc.put("filters", filtersDoc);
}
// attributes collection
BasicDBObject attributesDoc = new BasicDBObject(siteDoc);
List<BasicDBObject> attributeKVPs = new ArrayList<BasicDBObject>();
for (Map.Entry<String, Object> attribute : this.getAttributes().entrySet() )
{
String key = attribute.getKey();
Object value = attribute.getValue();
BasicDBObject attributeKVP = new BasicDBObject();
attributeKVP.put("key", key);
attributeKVP.put("value", value);
attributeKVPs.add(attributeKVP);
}
attributesDoc.put("attributes", attributeKVPs);
// samples collection
List<BasicDBObject> samplesDocs = new ArrayList<BasicDBObject>();
for (Genotype genotype : this.getGenotypes()) {
BasicDBObject vcDoc = new BasicDBObject();
vcDoc.put("location", contig + ":" + (start - stop == 0 ? start : start + "-" + stop));
vcDoc.put("contig", contig);
vcDoc.put("start", start);
vcDoc.put("stop", stop);
vcDoc.put("id", this.getID());
vcDoc.put("error", this.getLog10PError());
vcDoc.put("sample", genotype.getSampleName());
vcDoc.put("source", this.getSource());
vcDoc.put("sourceROD", sourceROD);
vcDoc.put("type", this.getType().toString());
Integer alleleIndex = 0;
BasicDBObject allelesDoc = new BasicDBObject();
for (Allele allele : this.getAlleles())
{
String index = alleleIndex.toString();
allelesDoc.put(index, allele.toString());
alleleIndex++;
}
vcDoc.put("alleles", allelesDoc);
List<BasicDBObject> attributesDocs = new ArrayList<BasicDBObject>();
for (Map.Entry<String, Object> attribute : this.getAttributes().entrySet() )
{
String key = attribute.getKey();
Object value = attribute.getValue();
BasicDBObject attributesDoc = new BasicDBObject();
attributesDoc.put("key", key);
attributesDoc.put("value", value);
attributesDocs.add(attributesDoc);
}
vcDoc.put("attributes", attributesDocs);
BasicDBObject sampleDoc = new BasicDBObject(siteDoc);
sampleDoc.put("sample", genotype.getSampleName());
BasicDBObject genotypesDoc = new BasicDBObject();
Integer genotypeAlleleIndex = 0;
@ -1282,24 +1309,12 @@ public class VariantContext implements Feature { // to enable tribble integratio
genotypesDoc.put("attributes", genotypesAttributesDocs);
genotypesDoc.put("error", genotype.getLog10PError());
vcDoc.put("genotype", genotypesDoc);
sampleDoc.put("genotype", genotypesDoc);
Integer filterIndex = 0;
BasicDBObject filtersDoc = new BasicDBObject();
for (String filter : this.getFilters())
{
String index = filterIndex.toString();
filtersDoc.put(index, filter.toString());
filterIndex++;
}
if (filterIndex > 0) {
vcDoc.put("filters", filtersDoc);
}
vcDocs.add(vcDoc);
samplesDocs.add(sampleDoc);
}
return vcDocs;
return new Pair<BasicDBObject,List<BasicDBObject>>(attributesDoc, samplesDocs);
}
// protected basic manipulation routines