From 3642a73c07d9f6da85ee59c62ea3a8c9587a364e Mon Sep 17 00:00:00 2001 From: Matt Hanna Date: Fri, 16 Dec 2011 09:37:44 -0500 Subject: [PATCH] Performance improvements for dynamically merging BAMs in read walkers. This change and my previous change have dropped runtime when dynamically merging 2k BAM files from 72.6min/1M reads to 46.8sec/1M reads. Note that many of these changes are stopgaps -- the real problem is the way ReadWalkers interface with Picard, and I'll have to work with Tim&Co to produce a more maintainable patch. --- .../picard/sam/MergingSamRecordIterator.java | 247 ++++++ .../sf/picard/sam/SamFileHeaderMerger.java | 744 ++++++++++++++++++ .../gatk/datasources/reads/SAMDataSource.java | 34 +- 3 files changed, 1011 insertions(+), 14 deletions(-) create mode 100644 public/java/src/net/sf/picard/sam/MergingSamRecordIterator.java create mode 100644 public/java/src/net/sf/picard/sam/SamFileHeaderMerger.java diff --git a/public/java/src/net/sf/picard/sam/MergingSamRecordIterator.java b/public/java/src/net/sf/picard/sam/MergingSamRecordIterator.java new file mode 100644 index 000000000..4b1c7a999 --- /dev/null +++ b/public/java/src/net/sf/picard/sam/MergingSamRecordIterator.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2011, The Broad Institute + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +package net.sf.picard.sam; + +import net.sf.picard.PicardException; + +import java.util.*; +import java.lang.reflect.Constructor; + +import net.sf.samtools.*; +import net.sf.samtools.util.CloseableIterator; + +/** + * Provides an iterator interface for merging multiple underlying iterators into a single + * iterable stream. The underlying iterators/files must all have the same sort order unless + * the requested output format is unsorted, in which case any combination is valid. + */ +public class MergingSamRecordIterator implements CloseableIterator { + private final PriorityQueue pq; + private final SamFileHeaderMerger samHeaderMerger; + private final Collection readers; + private final SAMFileHeader.SortOrder sortOrder; + private final SAMRecordComparator comparator; + + private boolean initialized = false; + private boolean iterationStarted = false; + + /** + * Constructs a new merging iterator with the same set of readers and sort order as + * provided by the header merger parameter. + * @param headerMerger The merged header and contents of readers. + * @param forcePresorted True to ensure that the iterator checks the headers of the readers for appropriate sort order. + * @deprecated replaced by (SamFileHeaderMerger, Collection, boolean) + */ + public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, final boolean forcePresorted) { + this(headerMerger, headerMerger.getReaders(), forcePresorted); + } + + /** + * Constructs a new merging iterator with the same set of readers and sort order as + * provided by the header merger parameter. + * @param headerMerger The merged header and contents of readers. + * @param assumeSorted false ensures that the iterator checks the headers of the readers for appropriate sort order. + */ + public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, Collection readers, final boolean assumeSorted) { + this.samHeaderMerger = headerMerger; + this.sortOrder = headerMerger.getMergedHeader().getSortOrder(); + this.comparator = getComparator(); + this.readers = readers; + + this.pq = new PriorityQueue(readers.size()); + + for (final SAMFileReader reader : readers) { + if (!assumeSorted && this.sortOrder != SAMFileHeader.SortOrder.unsorted && + reader.getFileHeader().getSortOrder() != this.sortOrder){ + throw new PicardException("Files are not compatible with sort order"); + } + } + } + + /** + * Add a given SAM file iterator to the merging iterator. Use this to restrict the merged iteration to a given genomic interval, + * rather than iterating over every read in the backing file or stream. + * @param reader Reader to add to the merging iterator. + * @param iterator Iterator traversing over reader contents. + */ + public void addIterator(final SAMFileReader reader, final CloseableIterator iterator) { + if(iterationStarted) + throw new PicardException("Cannot add another iterator; iteration has already begun"); + if(!samHeaderMerger.containsHeader(reader.getFileHeader())) + throw new PicardException("All iterators to be merged must be accounted for in the SAM header merger"); + final ComparableSamRecordIterator comparableIterator = new ComparableSamRecordIterator(reader,iterator,comparator); + addIfNotEmpty(comparableIterator); + initialized = true; + } + + private void startIterationIfRequired() { + if(initialized) + return; + for(SAMFileReader reader: readers) + addIterator(reader,reader.iterator()); + iterationStarted = true; + } + + /** + * Close down all open iterators. + */ + public void close() { + // Iterators not in the priority queue have already been closed; only close down the iterators that are still in the priority queue. + for(CloseableIterator iterator: pq) + iterator.close(); + } + + /** Returns true if any of the underlying iterators has more records, otherwise false. */ + public boolean hasNext() { + startIterationIfRequired(); + return !this.pq.isEmpty(); + } + + /** Returns the next record from the top most iterator during merging. */ + public SAMRecord next() { + startIterationIfRequired(); + + final ComparableSamRecordIterator iterator = this.pq.poll(); + final SAMRecord record = iterator.next(); + addIfNotEmpty(iterator); + record.setHeader(this.samHeaderMerger.getMergedHeader()); + + // Fix the read group if needs be + if (this.samHeaderMerger.hasReadGroupCollisions()) { + final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.READ_GROUP_ID); + if (oldGroupId != null ) { + final String newGroupId = this.samHeaderMerger.getReadGroupId(iterator.getReader().getFileHeader(),oldGroupId); + record.setAttribute(ReservedTagConstants.READ_GROUP_ID, newGroupId); + } + } + + // Fix the program group if needs be + if (this.samHeaderMerger.hasProgramGroupCollisions()) { + final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.PROGRAM_GROUP_ID); + if (oldGroupId != null ) { + final String newGroupId = this.samHeaderMerger.getProgramGroupId(iterator.getReader().getFileHeader(),oldGroupId); + record.setAttribute(ReservedTagConstants.PROGRAM_GROUP_ID, newGroupId); + } + } + + // Fix up the sequence indexes if needs be + if (this.samHeaderMerger.hasMergedSequenceDictionary()) { + if (record.getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + record.setReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iterator.getReader().getFileHeader(),record.getReferenceIndex())); + } + + if (record.getReadPairedFlag() && record.getMateReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + record.setMateReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iterator.getReader().getFileHeader(),record.getMateReferenceIndex())); + } + } + + return record; + } + + /** + * Adds iterator to priority queue. If the iterator has more records it is added + * otherwise it is closed and not added. + */ + private void addIfNotEmpty(final ComparableSamRecordIterator iterator) { + if (iterator.hasNext()) { + pq.offer(iterator); + } + else { + iterator.close(); + } + } + + /** Unsupported operation. */ + public void remove() { + throw new UnsupportedOperationException("MergingSAMRecorderIterator.remove()"); + } + + /** + * Get the right comparator for a given sort order (coordinate, alphabetic). In the + * case of "unsorted" it will return a comparator that gives an arbitrary but reflexive + * ordering. + */ + private SAMRecordComparator getComparator() { + // For unsorted build a fake comparator that compares based on object ID + if (this.sortOrder == SAMFileHeader.SortOrder.unsorted) { + return new SAMRecordComparator() { + public int fileOrderCompare(final SAMRecord lhs, final SAMRecord rhs) { + return System.identityHashCode(lhs) - System.identityHashCode(rhs); + } + + public int compare(final SAMRecord lhs, final SAMRecord rhs) { + return fileOrderCompare(lhs, rhs); + } + }; + } + if (samHeaderMerger.hasMergedSequenceDictionary() && sortOrder.equals(SAMFileHeader.SortOrder.coordinate)) { + return new MergedSequenceDictionaryCoordinateOrderComparator(); + } + + // Otherwise try and figure out what kind of comparator to return and build it + return this.sortOrder.getComparatorInstance(); + } + + /** Returns the merged header that the merging iterator is working from. */ + public SAMFileHeader getMergedHeader() { + return this.samHeaderMerger.getMergedHeader(); + } + + /** + * Ugh. Basically does a regular coordinate compare, but looks up the sequence indices in the merged + * sequence dictionary. I hate the fact that this extends SAMRecordCoordinateComparator, but it avoids + * more copy & paste. + */ + private class MergedSequenceDictionaryCoordinateOrderComparator extends SAMRecordCoordinateComparator { + + public int fileOrderCompare(final SAMRecord samRecord1, final SAMRecord samRecord2) { + final int referenceIndex1 = getReferenceIndex(samRecord1); + final int referenceIndex2 = getReferenceIndex(samRecord2); + if (referenceIndex1 != referenceIndex2) { + if (referenceIndex1 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + return 1; + } else if (referenceIndex2 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + return -1; + } else { + return referenceIndex1 - referenceIndex2; + } + } + if (referenceIndex1 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + // Both are unmapped. + return 0; + } + return samRecord1.getAlignmentStart() - samRecord2.getAlignmentStart(); + } + + private int getReferenceIndex(final SAMRecord samRecord) { + if (samRecord.getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + return samHeaderMerger.getMergedSequenceIndex(samRecord.getHeader(), samRecord.getReferenceIndex()); + } + if (samRecord.getMateReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) { + return samHeaderMerger.getMergedSequenceIndex(samRecord.getHeader(), samRecord.getMateReferenceIndex()); + } + return SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX; + } + } +} diff --git a/public/java/src/net/sf/picard/sam/SamFileHeaderMerger.java b/public/java/src/net/sf/picard/sam/SamFileHeaderMerger.java new file mode 100644 index 000000000..f78cd81da --- /dev/null +++ b/public/java/src/net/sf/picard/sam/SamFileHeaderMerger.java @@ -0,0 +1,744 @@ +/* + * The MIT License + * + * Copyright (c) 2009 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package net.sf.picard.sam; + +import java.util.*; + +import net.sf.picard.PicardException; +import net.sf.samtools.AbstractSAMHeaderRecord; +import net.sf.samtools.SAMFileHeader; +import net.sf.samtools.SAMFileReader; +import net.sf.samtools.SAMProgramRecord; +import net.sf.samtools.SAMReadGroupRecord; +import net.sf.samtools.SAMSequenceDictionary; +import net.sf.samtools.SAMSequenceRecord; +import net.sf.samtools.util.SequenceUtil; + +/** + * Merges SAMFileHeaders that have the same sequences into a single merged header + * object while providing read group translation for cases where read groups + * clash across input headers. + */ +public class SamFileHeaderMerger { + //Super Header to construct + private final SAMFileHeader mergedHeader; + private Collection readers; + private final Collection headers; + + //Translation of old group ids to new group ids + private final Map> samReadGroupIdTranslation = + new IdentityHashMap>(); + + //the read groups from different files use the same group ids + private boolean hasReadGroupCollisions = false; + + //the program records from different files use the same program record ids + private boolean hasProgramGroupCollisions = false; + + //Translation of old program group ids to new program group ids + private Map> samProgramGroupIdTranslation = + new IdentityHashMap>(); + + private boolean hasMergedSequenceDictionary = false; + + // Translation of old sequence dictionary ids to new dictionary ids + // This is an IdentityHashMap because it can be quite expensive to compute the hashCode for + // large SAMFileHeaders. It is possible that two input files will have identical headers so that + // the regular HashMap would fold them together, but the value stored in each of the two + // Map entries will be the same, so it should not hurt anything. + private final Map> samSeqDictionaryIdTranslationViaHeader = + new IdentityHashMap>(); + + //HeaderRecordFactory that creates SAMReadGroupRecord instances. + private static final HeaderRecordFactory READ_GROUP_RECORD_FACTORY = new HeaderRecordFactory() { + public SAMReadGroupRecord createRecord(String id, SAMReadGroupRecord srcReadGroupRecord) { + return new SAMReadGroupRecord(id, srcReadGroupRecord); + } + }; + + //HeaderRecordFactory that creates SAMProgramRecord instances. + private static final HeaderRecordFactory PROGRAM_RECORD_FACTORY = new HeaderRecordFactory() { + public SAMProgramRecord createRecord(String id, SAMProgramRecord srcProgramRecord) { + return new SAMProgramRecord(id, srcProgramRecord); + } + }; + + //comparator used to sort lists of program group and read group records + private static final Comparator RECORD_ID_COMPARATOR = new Comparator() { + public int compare(AbstractSAMHeaderRecord o1, AbstractSAMHeaderRecord o2) { + return o1.getId().compareTo(o2.getId()); + } + }; + + /** + * Create SAMFileHeader with additional information. Required that sequence dictionaries agree. + * + * @param readers sam file readers to combine + * @param sortOrder sort order new header should have + * @deprecated replaced by SamFileHeaderMerger(Collection, SAMFileHeader.SortOrder, boolean) + */ + public SamFileHeaderMerger(final Collection readers, final SAMFileHeader.SortOrder sortOrder) { + this(readers, sortOrder, false); + } + + /** + * Create SAMFileHeader with additional information. + * + * @param readers sam file readers to combine + * @param sortOrder sort order new header should have + * @param mergeDictionaries If true, merge sequence dictionaries in new header. If false, require that + * all input sequence dictionaries be identical. + * @deprecated replaced by SamFileHeaderMerger(Collection, SAMFileHeader.SortOrder, boolean) + */ + public SamFileHeaderMerger(final Collection readers, final SAMFileHeader.SortOrder sortOrder, final boolean mergeDictionaries) { + this(sortOrder, getHeadersFromReaders(readers), mergeDictionaries); + this.readers = readers; + } + + /** + * Create SAMFileHeader with additional information.. This is the preferred constructor. + * + * @param sortOrder sort order new header should have + * @param headers sam file headers to combine + * @param mergeDictionaries If true, merge sequence dictionaries in new header. If false, require that + * all input sequence dictionaries be identical. + */ + public SamFileHeaderMerger(final SAMFileHeader.SortOrder sortOrder, final Collection headers, final boolean mergeDictionaries) { + this.headers = headers; + this.mergedHeader = new SAMFileHeader(); + + SAMSequenceDictionary sequenceDictionary; + try { + sequenceDictionary = getSequenceDictionary(headers); + this.hasMergedSequenceDictionary = false; + } + catch (SequenceUtil.SequenceListsDifferException pe) { + if (mergeDictionaries) { + sequenceDictionary = mergeSequenceDictionaries(headers); + this.hasMergedSequenceDictionary = true; + } + else { + throw pe; + } + } + + this.mergedHeader.setSequenceDictionary(sequenceDictionary); + + // Set program that creates input alignments + for (final SAMProgramRecord program : mergeProgramGroups(headers)) { + this.mergedHeader.addProgramRecord(program); + } + + // Set read groups for merged header + final List readGroups = mergeReadGroups(headers); + this.mergedHeader.setReadGroups(readGroups); + this.mergedHeader.setGroupOrder(SAMFileHeader.GroupOrder.none); + + this.mergedHeader.setSortOrder(sortOrder); + + for (final SAMFileHeader header : headers) { + for (final String comment : header.getComments()) { + this.mergedHeader.addComment(comment); + } + } + } + + // Utilility method to make use with old constructor + private static List getHeadersFromReaders(Collection readers) { + List headers = new ArrayList(readers.size()); + for (SAMFileReader reader : readers) { + headers.add(reader.getFileHeader()); + } + return headers; + } + + + /** + * Checks to see if there are clashes where different readers are using the same read + * group IDs. If yes, then those IDs that collided are remapped. + * + * @param headers headers to combine + * @return new list of read groups constructed from all the readers + */ + private List mergeReadGroups(final Collection headers) { + //prepare args for mergeHeaderRecords(..) call + final HashSet idsThatAreAlreadyTaken = new HashSet(); + + final List> readGroupsToProcess = new LinkedList>(); + for (final SAMFileHeader header : headers) { + for (final SAMReadGroupRecord readGroup : header.getReadGroups()) { + //verify that there are no existing id collisions in this input file + if(!idsThatAreAlreadyTaken.add(readGroup.getId())) + throw new PicardException("Input file: " + header + " contains more than one RG with the same id (" + readGroup.getId() + ")"); + + readGroupsToProcess.add(new HeaderRecordAndFileHeader(readGroup, header)); + } + idsThatAreAlreadyTaken.clear(); + } + + final List result = new LinkedList(); + + hasReadGroupCollisions = mergeHeaderRecords(readGroupsToProcess, READ_GROUP_RECORD_FACTORY, idsThatAreAlreadyTaken, samReadGroupIdTranslation, result); + + //sort the result list by record id + Collections.sort(result, RECORD_ID_COMPARATOR); + + return result; + } + + + /** + * Checks to see if there are clashes where different readers are using the same program + * group IDs. If yes, then those IDs that collided are remapped. + * + * @param headers headers to combine + * @return new list of program groups constructed from all the readers + */ + private List mergeProgramGroups(final Collection headers) { + + final List overallResult = new LinkedList(); + + //this Set will accumulate all SAMProgramRecord ids that have been encountered so far. + final HashSet idsThatAreAlreadyTaken = new HashSet(); + + //need to process all program groups + List> programGroupsLeftToProcess = new LinkedList>(); + for (final SAMFileHeader header : headers) { + for (final SAMProgramRecord programGroup : header.getProgramRecords()) { + //verify that there are no existing id collisions in this input file + if(!idsThatAreAlreadyTaken.add(programGroup.getId())) + throw new PicardException("Input file: " + header + " contains more than one PG with the same id (" + programGroup.getId() + ")"); + + programGroupsLeftToProcess.add(new HeaderRecordAndFileHeader(programGroup, header)); + } + idsThatAreAlreadyTaken.clear(); + } + + //A program group header (lets say ID=2 PN=B PP=1) may have a PP (previous program) attribute which chains it to + //another program group header (lets say ID=1 PN=A) to indicate that the given file was + //processed by program A followed by program B. These PP attributes potentially + //connect headers into one or more tree structures. Merging is done by + //first merging all headers that don't have PP attributes (eg. tree roots), + //then updating and merging all headers whose PPs point to the tree-root headers, + //and so on until all program group headers are processed. + + //currentProgramGroups is the list of records to merge next. Start by merging the programGroups that don't have a PP attribute (eg. the tree roots). + List< HeaderRecordAndFileHeader > currentProgramGroups = new LinkedList>(); + for(final Iterator> programGroupsLeftToProcessIterator = programGroupsLeftToProcess.iterator(); programGroupsLeftToProcessIterator.hasNext(); ) { + final HeaderRecordAndFileHeader pair = programGroupsLeftToProcessIterator.next(); + if(pair.getHeaderRecord().getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG) == null) { + programGroupsLeftToProcessIterator.remove(); + currentProgramGroups.add(pair); + } + } + + //merge currentProgramGroups + while(!currentProgramGroups.isEmpty()) + { + final List currentResult = new LinkedList(); + + hasProgramGroupCollisions |= mergeHeaderRecords(currentProgramGroups, PROGRAM_RECORD_FACTORY, idsThatAreAlreadyTaken, samProgramGroupIdTranslation, currentResult); + + //add currentResults to overallResults + overallResult.addAll(currentResult); + + //apply the newly-computed id translations to currentProgramGroups and programGroupsLeftToProcess + currentProgramGroups = translateIds(currentProgramGroups, samProgramGroupIdTranslation, false); + programGroupsLeftToProcess = translateIds(programGroupsLeftToProcess, samProgramGroupIdTranslation, true); + + //find all records in programGroupsLeftToProcess whose ppId points to a record that was just processed (eg. a record that's in currentProgramGroups), + //and move them to the list of programGroupsToProcessNext. + LinkedList> programGroupsToProcessNext = new LinkedList>(); + for(final Iterator> programGroupsLeftToProcessIterator = programGroupsLeftToProcess.iterator(); programGroupsLeftToProcessIterator.hasNext(); ) { + final HeaderRecordAndFileHeader pairLeftToProcess = programGroupsLeftToProcessIterator.next(); + final Object ppIdOfRecordLeftToProcess = pairLeftToProcess.getHeaderRecord().getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG); + //find what currentProgramGroups this ppId points to (NOTE: they have to come from the same file) + for(final HeaderRecordAndFileHeader justProcessedPair : currentProgramGroups) { + String idJustProcessed = justProcessedPair.getHeaderRecord().getId(); + if(pairLeftToProcess.getFileHeader() == justProcessedPair.getFileHeader() && ppIdOfRecordLeftToProcess.equals(idJustProcessed)) { + programGroupsLeftToProcessIterator.remove(); + programGroupsToProcessNext.add(pairLeftToProcess); + break; + } + } + } + + currentProgramGroups = programGroupsToProcessNext; + } + + //verify that all records were processed + if(!programGroupsLeftToProcess.isEmpty()) { + StringBuffer errorMsg = new StringBuffer(programGroupsLeftToProcess.size() + " program groups weren't processed. Do their PP ids point to existing PGs? \n"); + for( final HeaderRecordAndFileHeader pair : programGroupsLeftToProcess ) { + SAMProgramRecord record = pair.getHeaderRecord(); + errorMsg.append("@PG ID:"+record.getProgramGroupId()+" PN:"+record.getProgramName()+" PP:"+record.getPreviousProgramGroupId() +"\n"); + } + throw new PicardException(errorMsg.toString()); + } + + //sort the result list by record id + Collections.sort(overallResult, RECORD_ID_COMPARATOR); + + return overallResult; + } + + + /** + * Utility method that takes a list of program groups and remaps all their + * ids (including ppIds if requested) using the given idTranslationTable. + * + * NOTE: when remapping, this method creates new SAMProgramRecords and + * doesn't mutate any records in the programGroups list. + * + * @param programGroups The program groups to translate. + * @param idTranslationTable The translation table. + * @param translatePpIds Whether ppIds should be translated as well. + * + * @return The list of translated records. + */ + private List> translateIds( + List> programGroups, + Map> idTranslationTable, + boolean translatePpIds) { + + //go through programGroups and translate any IDs and PPs based on the idTranslationTable. + List> result = new LinkedList>(); + for(final HeaderRecordAndFileHeader pair : programGroups ) { + final SAMProgramRecord record = pair.getHeaderRecord(); + final String id = record.getProgramGroupId(); + final String ppId = (String) record.getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG); + + final SAMFileHeader header = pair.getFileHeader(); + final Map translations = idTranslationTable.get(header); + + //see if one or both ids need to be translated + SAMProgramRecord translatedRecord = null; + if(translations != null) + { + String translatedId = translations.get( id ); + String translatedPpId = translatePpIds ? translations.get( ppId ) : null; + + boolean needToTranslateId = translatedId != null && !translatedId.equals(id); + boolean needToTranslatePpId = translatedPpId != null && !translatedPpId.equals(ppId); + + if(needToTranslateId && needToTranslatePpId) { + translatedRecord = new SAMProgramRecord(translatedId, record); + translatedRecord.setAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG, translatedPpId); + } else if(needToTranslateId) { + translatedRecord = new SAMProgramRecord(translatedId, record); + } else if(needToTranslatePpId) { + translatedRecord = new SAMProgramRecord(id, record); + translatedRecord.setAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG, translatedPpId); + } + } + + if(translatedRecord != null) { + result.add(new HeaderRecordAndFileHeader(translatedRecord, header)); + } else { + result.add(pair); //keep the original record + } + } + + return result; + } + + + /** + * Utility method for merging a List of AbstractSAMHeaderRecords. If it finds + * records that have identical ids and attributes, it will collapse them + * into one record. If it finds records that have identical ids but + * non-identical attributes, this is treated as a collision. When collision happens, + * the records' ids are remapped, and an old-id to new-id mapping is added to the idTranslationTable. + * + * NOTE: Non-collided records also get recorded in the idTranslationTable as + * old-id to old-id. This way, an idTranslationTable lookup should never return null. + * + * @param headerRecords The header records to merge. + * @param headerRecordFactory Constructs a specific subclass of AbstractSAMHeaderRecord. + * @param idsThatAreAlreadyTaken If the id of a headerRecord matches an id in this set, it will be treated as a collision, and the headRecord's id will be remapped. + * @param idTranslationTable When records collide, their ids are remapped, and an old-id to new-id + * mapping is added to the idTranslationTable. Non-collided records also get recorded in the idTranslationTable as + * old-id to old-id. This way, an idTranslationTable lookup should never return null. + * + * @param result The list of merged header records. + * + * @return True if there were collisions. + */ + private boolean mergeHeaderRecords(final List> headerRecords, HeaderRecordFactory headerRecordFactory, + final HashSet idsThatAreAlreadyTaken, Map> idTranslationTable, List result) { + + //The outer Map bins the header records by their ids. The nested Map further collapses + //header records which, in addition to having the same id, also have identical attributes. + //In other words, each key in the nested map represents one or more + //header records which have both identical ids and identical attributes. The List of + //SAMFileHeaders keeps track of which readers these header record(s) came from. + final Map>> idToRecord = + new HashMap>>(); + + //Populate the idToRecord and seenIds data structures + for (final HeaderRecordAndFileHeader pair : headerRecords) { + final RecordType record = pair.getHeaderRecord(); + final SAMFileHeader header = pair.getFileHeader(); + final String recordId = record.getId(); + Map> recordsWithSameId = idToRecord.get(recordId); + if(recordsWithSameId == null) { + recordsWithSameId = new LinkedHashMap>(); + idToRecord.put(recordId, recordsWithSameId); + } + + List fileHeaders = recordsWithSameId.get(record); + if(fileHeaders == null) { + fileHeaders = new LinkedList(); + recordsWithSameId.put(record, fileHeaders); + } + + fileHeaders.add(header); + } + + //Resolve any collisions between header records by remapping their ids. + boolean hasCollisions = false; + for (final Map.Entry>> entry : idToRecord.entrySet() ) + { + final String recordId = entry.getKey(); + final Map> recordsWithSameId = entry.getValue(); + + + for( Map.Entry> recordWithUniqueAttr : recordsWithSameId.entrySet()) { + final RecordType record = recordWithUniqueAttr.getKey(); + final List fileHeaders = recordWithUniqueAttr.getValue(); + + String newId; + if(!idsThatAreAlreadyTaken.contains(recordId)) { + //don't remap 1st record. If there are more records + //with this id, they will be remapped in the 'else'. + newId = recordId; + idsThatAreAlreadyTaken.add(recordId); + } else { + //there is more than one record with this id. + hasCollisions = true; + + //find a unique newId for this record + int idx=1; + while(idsThatAreAlreadyTaken.contains(newId = recordId + "." + Integer.toString(idx++))) + ; + + idsThatAreAlreadyTaken.add( newId ); + } + + for(SAMFileHeader fileHeader : fileHeaders) { + Map readerTranslationTable = idTranslationTable.get(fileHeader); + if(readerTranslationTable == null) { + readerTranslationTable = new HashMap(); + idTranslationTable.put(fileHeader, readerTranslationTable); + } + readerTranslationTable.put(recordId, newId); + } + + result.add( headerRecordFactory.createRecord(newId, record) ); + } + } + + return hasCollisions; + } + + + /** + * Get the sequences off the SAMFileHeader. Throws runtime exception if the sequence + * are different from one another. + * + * @param headers headers to pull sequences from + * @return sequences from files. Each file should have the same sequence + */ + private SAMSequenceDictionary getSequenceDictionary(final Collection headers) { + SAMSequenceDictionary sequences = null; + for (final SAMFileHeader header : headers) { + + if (sequences == null) { + sequences = header.getSequenceDictionary(); + } + else { + final SAMSequenceDictionary currentSequences = header.getSequenceDictionary(); + SequenceUtil.assertSequenceDictionariesEqual(sequences, currentSequences); + } + } + + return sequences; + } + + /** + * Get the sequences from the SAMFileHeader, and merge the resulting sequence dictionaries. + * + * @param headers headers to pull sequences from + * @return sequences from files. Each file should have the same sequence + */ + private SAMSequenceDictionary mergeSequenceDictionaries(final Collection headers) { + SAMSequenceDictionary sequences = new SAMSequenceDictionary(); + for (final SAMFileHeader header : headers) { + final SAMSequenceDictionary currentSequences = header.getSequenceDictionary(); + sequences = mergeSequences(sequences, currentSequences); + } + // second pass, make a map of the original seqeunce id -> new sequence id + createSequenceMapping(headers, sequences); + return sequences; + } + + /** + * They've asked to merge the sequence headers. What we support right now is finding the sequence name superset. + * + * @param mergeIntoDict the result of merging so far. All SAMSequenceRecords in here have been cloned from the originals. + * @param mergeFromDict A new sequence dictionary to merge into mergeIntoDict. + * @return A new sequence dictionary that resulting from merging the two inputs. + */ + private SAMSequenceDictionary mergeSequences(SAMSequenceDictionary mergeIntoDict, SAMSequenceDictionary mergeFromDict) { + + // a place to hold the sequences that we haven't found a home for, in the order the appear in mergeFromDict. + LinkedList holder = new LinkedList(); + + // Return value will be created from this. + LinkedList resultingDict = new LinkedList(); + for (final SAMSequenceRecord sequenceRecord : mergeIntoDict.getSequences()) { + resultingDict.add(sequenceRecord); + } + + // Index into resultingDict of previous SAMSequenceRecord from mergeFromDict that already existed in mergeIntoDict. + int prevloc = -1; + // Previous SAMSequenceRecord from mergeFromDict that already existed in mergeIntoDict. + SAMSequenceRecord previouslyMerged = null; + + for (SAMSequenceRecord sequenceRecord : mergeFromDict.getSequences()) { + // Does it already exist in resultingDict? + int loc = getIndexOfSequenceName(resultingDict, sequenceRecord.getSequenceName()); + if (loc == -1) { + // If doesn't already exist in resultingDict, save it an decide where to insert it later. + holder.add(sequenceRecord.clone()); + } else if (prevloc > loc) { + // If sequenceRecord already exists in resultingDict, but prior to the previous one + // from mergeIntoDict that already existed, cannot merge. + throw new PicardException("Cannot merge sequence dictionaries because sequence " + + sequenceRecord.getSequenceName() + " and " + previouslyMerged.getSequenceName() + + " are in different orders in two input sequence dictionaries."); + } else { + // Since sequenceRecord already exists in resultingDict, don't need to add it. + // Add in all the sequences prior to it that have been held in holder. + resultingDict.addAll(loc, holder); + // Remember the index of sequenceRecord so can check for merge imcompatibility. + prevloc = loc + holder.size(); + previouslyMerged = sequenceRecord; + holder.clear(); + } + } + // Append anything left in holder. + if (holder.size() != 0) { + resultingDict.addAll(holder); + } + return new SAMSequenceDictionary(resultingDict); + } + + /** + * Find sequence in list. + * @param list List to search for the sequence name. + * @param sequenceName Name to search for. + * @return Index of SAMSequenceRecord with the given name in list, or -1 if not found. + */ + private static int getIndexOfSequenceName(final List list, final String sequenceName) { + for (int i = 0; i < list.size(); ++i) { + if (list.get(i).getSequenceName().equals(sequenceName)) { + return i; + } + } + return -1; + } + + /** + * create the sequence mapping. This map is used to convert the unmerged header sequence ID's to the merged + * list of sequence id's. + * @param headers the collections of headers. + * @param masterDictionary the superset dictionary we've created. + */ + private void createSequenceMapping(final Collection headers, SAMSequenceDictionary masterDictionary) { + LinkedList resultingDictStr = new LinkedList(); + for (SAMSequenceRecord r : masterDictionary.getSequences()) { + resultingDictStr.add(r.getSequenceName()); + } + for (final SAMFileHeader header : headers) { + Map seqMap = new HashMap(); + SAMSequenceDictionary dict = header.getSequenceDictionary(); + for (SAMSequenceRecord rec : dict.getSequences()) { + seqMap.put(rec.getSequenceIndex(), resultingDictStr.indexOf(rec.getSequenceName())); + } + this.samSeqDictionaryIdTranslationViaHeader.put(header, seqMap); + } + } + + + + /** + * Returns the read group id that should be used for the input read and RG id. + * + * @deprecated replaced by getReadGroupId(SAMFileHeader, String) + * */ + public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) { + return getReadGroupId(reader.getFileHeader(), originalReadGroupId); + } + + /** Returns the read group id that should be used for the input read and RG id. */ + public String getReadGroupId(final SAMFileHeader header, final String originalReadGroupId) { + return this.samReadGroupIdTranslation.get(header).get(originalReadGroupId); + } + + /** + * @param reader one of the input files + * @param originalProgramGroupId a program group ID from the above input file + * @return new ID from the merged list of program groups in the output file + * @deprecated replaced by getProgramGroupId(SAMFileHeader, String) + */ + public String getProgramGroupId(final SAMFileReader reader, final String originalProgramGroupId) { + return getProgramGroupId(reader.getFileHeader(), originalProgramGroupId); + } + + /** + * @param header one of the input headers + * @param originalProgramGroupId a program group ID from the above input file + * @return new ID from the merged list of program groups in the output file + */ + public String getProgramGroupId(final SAMFileHeader header, final String originalProgramGroupId) { + return this.samProgramGroupIdTranslation.get(header).get(originalProgramGroupId); + } + + /** Returns true if there are read group duplicates within the merged headers. */ + public boolean hasReadGroupCollisions() { + return this.hasReadGroupCollisions; + } + + /** Returns true if there are program group duplicates within the merged headers. */ + public boolean hasProgramGroupCollisions() { + return hasProgramGroupCollisions; + } + + /** @return if we've merged the sequence dictionaries, return true */ + public boolean hasMergedSequenceDictionary() { + return hasMergedSequenceDictionary; + } + + /** Returns the merged header that should be written to any output merged file. */ + public SAMFileHeader getMergedHeader() { + return this.mergedHeader; + } + + /** Returns the collection of readers that this header merger is working with. May return null. + * @deprecated replaced by getHeaders() + */ + public Collection getReaders() { + return this.readers; + } + + /** Returns the collection of readers that this header merger is working with. + */ + public Collection getHeaders() { + return this.headers; + } + + /** + * Tells whether this header merger contains a given SAM file header. Note that header presence + * is confirmed / blocked by == equality, rather than actually testing SAMFileHeader.equals(), for + * reasons of performance. + * @param header header to check for. + * @return True if the header exists in this HeaderMerger. False otherwise. + */ + boolean containsHeader(SAMFileHeader header) { + for(SAMFileHeader headerMergerHeader: headers) { + if(headerMergerHeader == header) + return true; + } + return false; + } + + /** + * returns the new mapping for a specified reader, given it's old sequence index + * @param reader the reader + * @param oldReferenceSequenceIndex the old sequence (also called reference) index + * @return the new index value + * @deprecated replaced by getMergedSequenceIndex(SAMFileHeader, Integer) + */ + public Integer getMergedSequenceIndex(SAMFileReader reader, Integer oldReferenceSequenceIndex) { + return this.getMergedSequenceIndex(reader.getFileHeader(), oldReferenceSequenceIndex); + } + + /** + * Another mechanism for getting the new sequence index, for situations in which the reader is not available. + * Note that if the SAMRecord has already had its header replaced with the merged header, this won't work. + * @param header The original header for the input record in question. + * @param oldReferenceSequenceIndex The original sequence index. + * @return the new index value that is compatible with the merged sequence index. + */ + public Integer getMergedSequenceIndex(final SAMFileHeader header, Integer oldReferenceSequenceIndex) { + final Map mapping = this.samSeqDictionaryIdTranslationViaHeader.get(header); + if (mapping == null) { + throw new PicardException("No sequence dictionary mapping available for header: " + header); + } + + final Integer newIndex = mapping.get(oldReferenceSequenceIndex); + if (newIndex == null) { + throw new PicardException("No mapping for reference index " + oldReferenceSequenceIndex + " from header: " + header); + } + + return newIndex; + } + + + /** + * Implementations of this interface are used by mergeHeaderRecords(..) to instantiate + * specific subclasses of AbstractSAMHeaderRecord. + */ + private static interface HeaderRecordFactory { + + /** + * Constructs a new instance of RecordType. + * @param id The id of the new record. + * @param srcRecord Except for the id, the new record will be a copy of this source record. + */ + public RecordType createRecord(final String id, RecordType srcRecord); + } + + /** + * Struct that groups together a subclass of AbstractSAMHeaderRecord with the + * SAMFileHeader that it came from. + */ + private static class HeaderRecordAndFileHeader { + private RecordType headerRecord; + private SAMFileHeader samFileHeader; + + public HeaderRecordAndFileHeader(RecordType headerRecord, SAMFileHeader samFileHeader) { + this.headerRecord = headerRecord; + this.samFileHeader = samFileHeader; + } + + public RecordType getHeaderRecord() { + return headerRecord; + } + public SAMFileHeader getFileHeader() { + return samFileHeader; + } + } +} diff --git a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java index d5d8ab2ef..2729941bc 100755 --- a/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java +++ b/public/java/src/org/broadinstitute/sting/gatk/datasources/reads/SAMDataSource.java @@ -259,6 +259,11 @@ public class SAMDataSource { validationStringency = strictness; if(readBufferSize != null) ReadShard.setReadBufferSize(readBufferSize); + else { + // Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively + // will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once. + ReadShard.setReadBufferSize(Math.min(1000*samFiles.size(),250000)); + } resourcePool = new SAMResourcePool(Integer.MAX_VALUE); SAMReaders readers = resourcePool.getAvailableReaders(); @@ -479,10 +484,13 @@ public class SAMDataSource { // Cache the most recently viewed read so that we can check whether we've reached the end of a pair. SAMRecord read = null; + Map positionUpdates = new IdentityHashMap(); + CloseableIterator iterator = getIterator(readers,shard,sortOrder == SAMFileHeader.SortOrder.coordinate); while(!shard.isBufferFull() && iterator.hasNext()) { read = iterator.next(); - addReadToBufferingShard(shard,getReaderID(readers,read),read); + shard.addRead(read); + noteFilePositionUpdate(positionUpdates,read); } // If the reads are sorted in queryname order, ensure that all reads @@ -492,11 +500,21 @@ public class SAMDataSource { SAMRecord nextRead = iterator.next(); if(read == null || !read.getReadName().equals(nextRead.getReadName())) break; - addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead); + shard.addRead(nextRead); + noteFilePositionUpdate(positionUpdates,nextRead); } } iterator.close(); + + // Make the updates specified by the reader. + for(Map.Entry positionUpdate: positionUpdates.entrySet()) + readerPositions.put(readers.getReaderID(positionUpdate.getKey()),positionUpdate.getValue()); + } + + private void noteFilePositionUpdate(Map positionMapping, SAMRecord read) { + GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing()); + positionMapping.put(read.getFileSource().getReader(),endChunk); } public StingSAMIterator seek(Shard shard) { @@ -574,18 +592,6 @@ public class SAMDataSource { readProperties.defaultBaseQualities()); } - /** - * Adds this read to the given shard. - * @param shard The shard to which to add the read. - * @param id The id of the given reader. - * @param read The read to add to the shard. - */ - private void addReadToBufferingShard(Shard shard,SAMReaderID id,SAMRecord read) { - GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing()); - shard.addRead(read); - readerPositions.put(id,endChunk); - } - /** * Filter reads based on user-specified criteria. *