Performance improvements for dynamically merging BAMs in read walkers.
This change and my previous change have dropped runtime when dynamically merging 2k BAM files from 72.6min/1M reads to 46.8sec/1M reads. Note that many of these changes are stopgaps -- the real problem is the way ReadWalkers interface with Picard, and I'll have to work with Tim&Co to produce a more maintainable patch.
This commit is contained in:
parent
e61e5c7589
commit
3642a73c07
|
|
@ -0,0 +1,247 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2011, The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person
|
||||||
|
* obtaining a copy of this software and associated documentation
|
||||||
|
* files (the "Software"), to deal in the Software without
|
||||||
|
* restriction, including without limitation the rights to use,
|
||||||
|
* copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the
|
||||||
|
* Software is furnished to do so, subject to the following
|
||||||
|
* conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||||
|
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||||
|
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||||
|
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||||
|
* OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
package net.sf.picard.sam;
|
||||||
|
|
||||||
|
import net.sf.picard.PicardException;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
|
||||||
|
import net.sf.samtools.*;
|
||||||
|
import net.sf.samtools.util.CloseableIterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides an iterator interface for merging multiple underlying iterators into a single
|
||||||
|
* iterable stream. The underlying iterators/files must all have the same sort order unless
|
||||||
|
* the requested output format is unsorted, in which case any combination is valid.
|
||||||
|
*/
|
||||||
|
public class MergingSamRecordIterator implements CloseableIterator<SAMRecord> {
|
||||||
|
private final PriorityQueue<ComparableSamRecordIterator> pq;
|
||||||
|
private final SamFileHeaderMerger samHeaderMerger;
|
||||||
|
private final Collection<SAMFileReader> readers;
|
||||||
|
private final SAMFileHeader.SortOrder sortOrder;
|
||||||
|
private final SAMRecordComparator comparator;
|
||||||
|
|
||||||
|
private boolean initialized = false;
|
||||||
|
private boolean iterationStarted = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new merging iterator with the same set of readers and sort order as
|
||||||
|
* provided by the header merger parameter.
|
||||||
|
* @param headerMerger The merged header and contents of readers.
|
||||||
|
* @param forcePresorted True to ensure that the iterator checks the headers of the readers for appropriate sort order.
|
||||||
|
* @deprecated replaced by (SamFileHeaderMerger, Collection<SAMFileReader>, boolean)
|
||||||
|
*/
|
||||||
|
public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, final boolean forcePresorted) {
|
||||||
|
this(headerMerger, headerMerger.getReaders(), forcePresorted);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new merging iterator with the same set of readers and sort order as
|
||||||
|
* provided by the header merger parameter.
|
||||||
|
* @param headerMerger The merged header and contents of readers.
|
||||||
|
* @param assumeSorted false ensures that the iterator checks the headers of the readers for appropriate sort order.
|
||||||
|
*/
|
||||||
|
public MergingSamRecordIterator(final SamFileHeaderMerger headerMerger, Collection<SAMFileReader> readers, final boolean assumeSorted) {
|
||||||
|
this.samHeaderMerger = headerMerger;
|
||||||
|
this.sortOrder = headerMerger.getMergedHeader().getSortOrder();
|
||||||
|
this.comparator = getComparator();
|
||||||
|
this.readers = readers;
|
||||||
|
|
||||||
|
this.pq = new PriorityQueue<ComparableSamRecordIterator>(readers.size());
|
||||||
|
|
||||||
|
for (final SAMFileReader reader : readers) {
|
||||||
|
if (!assumeSorted && this.sortOrder != SAMFileHeader.SortOrder.unsorted &&
|
||||||
|
reader.getFileHeader().getSortOrder() != this.sortOrder){
|
||||||
|
throw new PicardException("Files are not compatible with sort order");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a given SAM file iterator to the merging iterator. Use this to restrict the merged iteration to a given genomic interval,
|
||||||
|
* rather than iterating over every read in the backing file or stream.
|
||||||
|
* @param reader Reader to add to the merging iterator.
|
||||||
|
* @param iterator Iterator traversing over reader contents.
|
||||||
|
*/
|
||||||
|
public void addIterator(final SAMFileReader reader, final CloseableIterator<SAMRecord> iterator) {
|
||||||
|
if(iterationStarted)
|
||||||
|
throw new PicardException("Cannot add another iterator; iteration has already begun");
|
||||||
|
if(!samHeaderMerger.containsHeader(reader.getFileHeader()))
|
||||||
|
throw new PicardException("All iterators to be merged must be accounted for in the SAM header merger");
|
||||||
|
final ComparableSamRecordIterator comparableIterator = new ComparableSamRecordIterator(reader,iterator,comparator);
|
||||||
|
addIfNotEmpty(comparableIterator);
|
||||||
|
initialized = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startIterationIfRequired() {
|
||||||
|
if(initialized)
|
||||||
|
return;
|
||||||
|
for(SAMFileReader reader: readers)
|
||||||
|
addIterator(reader,reader.iterator());
|
||||||
|
iterationStarted = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close down all open iterators.
|
||||||
|
*/
|
||||||
|
public void close() {
|
||||||
|
// Iterators not in the priority queue have already been closed; only close down the iterators that are still in the priority queue.
|
||||||
|
for(CloseableIterator<SAMRecord> iterator: pq)
|
||||||
|
iterator.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns true if any of the underlying iterators has more records, otherwise false. */
|
||||||
|
public boolean hasNext() {
|
||||||
|
startIterationIfRequired();
|
||||||
|
return !this.pq.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the next record from the top most iterator during merging. */
|
||||||
|
public SAMRecord next() {
|
||||||
|
startIterationIfRequired();
|
||||||
|
|
||||||
|
final ComparableSamRecordIterator iterator = this.pq.poll();
|
||||||
|
final SAMRecord record = iterator.next();
|
||||||
|
addIfNotEmpty(iterator);
|
||||||
|
record.setHeader(this.samHeaderMerger.getMergedHeader());
|
||||||
|
|
||||||
|
// Fix the read group if needs be
|
||||||
|
if (this.samHeaderMerger.hasReadGroupCollisions()) {
|
||||||
|
final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.READ_GROUP_ID);
|
||||||
|
if (oldGroupId != null ) {
|
||||||
|
final String newGroupId = this.samHeaderMerger.getReadGroupId(iterator.getReader().getFileHeader(),oldGroupId);
|
||||||
|
record.setAttribute(ReservedTagConstants.READ_GROUP_ID, newGroupId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fix the program group if needs be
|
||||||
|
if (this.samHeaderMerger.hasProgramGroupCollisions()) {
|
||||||
|
final String oldGroupId = (String) record.getAttribute(ReservedTagConstants.PROGRAM_GROUP_ID);
|
||||||
|
if (oldGroupId != null ) {
|
||||||
|
final String newGroupId = this.samHeaderMerger.getProgramGroupId(iterator.getReader().getFileHeader(),oldGroupId);
|
||||||
|
record.setAttribute(ReservedTagConstants.PROGRAM_GROUP_ID, newGroupId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fix up the sequence indexes if needs be
|
||||||
|
if (this.samHeaderMerger.hasMergedSequenceDictionary()) {
|
||||||
|
if (record.getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
record.setReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iterator.getReader().getFileHeader(),record.getReferenceIndex()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (record.getReadPairedFlag() && record.getMateReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
record.setMateReferenceIndex(this.samHeaderMerger.getMergedSequenceIndex(iterator.getReader().getFileHeader(),record.getMateReferenceIndex()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds iterator to priority queue. If the iterator has more records it is added
|
||||||
|
* otherwise it is closed and not added.
|
||||||
|
*/
|
||||||
|
private void addIfNotEmpty(final ComparableSamRecordIterator iterator) {
|
||||||
|
if (iterator.hasNext()) {
|
||||||
|
pq.offer(iterator);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
iterator.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Unsupported operation. */
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException("MergingSAMRecorderIterator.remove()");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the right comparator for a given sort order (coordinate, alphabetic). In the
|
||||||
|
* case of "unsorted" it will return a comparator that gives an arbitrary but reflexive
|
||||||
|
* ordering.
|
||||||
|
*/
|
||||||
|
private SAMRecordComparator getComparator() {
|
||||||
|
// For unsorted build a fake comparator that compares based on object ID
|
||||||
|
if (this.sortOrder == SAMFileHeader.SortOrder.unsorted) {
|
||||||
|
return new SAMRecordComparator() {
|
||||||
|
public int fileOrderCompare(final SAMRecord lhs, final SAMRecord rhs) {
|
||||||
|
return System.identityHashCode(lhs) - System.identityHashCode(rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int compare(final SAMRecord lhs, final SAMRecord rhs) {
|
||||||
|
return fileOrderCompare(lhs, rhs);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (samHeaderMerger.hasMergedSequenceDictionary() && sortOrder.equals(SAMFileHeader.SortOrder.coordinate)) {
|
||||||
|
return new MergedSequenceDictionaryCoordinateOrderComparator();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise try and figure out what kind of comparator to return and build it
|
||||||
|
return this.sortOrder.getComparatorInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the merged header that the merging iterator is working from. */
|
||||||
|
public SAMFileHeader getMergedHeader() {
|
||||||
|
return this.samHeaderMerger.getMergedHeader();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ugh. Basically does a regular coordinate compare, but looks up the sequence indices in the merged
|
||||||
|
* sequence dictionary. I hate the fact that this extends SAMRecordCoordinateComparator, but it avoids
|
||||||
|
* more copy & paste.
|
||||||
|
*/
|
||||||
|
private class MergedSequenceDictionaryCoordinateOrderComparator extends SAMRecordCoordinateComparator {
|
||||||
|
|
||||||
|
public int fileOrderCompare(final SAMRecord samRecord1, final SAMRecord samRecord2) {
|
||||||
|
final int referenceIndex1 = getReferenceIndex(samRecord1);
|
||||||
|
final int referenceIndex2 = getReferenceIndex(samRecord2);
|
||||||
|
if (referenceIndex1 != referenceIndex2) {
|
||||||
|
if (referenceIndex1 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
return 1;
|
||||||
|
} else if (referenceIndex2 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return referenceIndex1 - referenceIndex2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (referenceIndex1 == SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
// Both are unmapped.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return samRecord1.getAlignmentStart() - samRecord2.getAlignmentStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getReferenceIndex(final SAMRecord samRecord) {
|
||||||
|
if (samRecord.getReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
return samHeaderMerger.getMergedSequenceIndex(samRecord.getHeader(), samRecord.getReferenceIndex());
|
||||||
|
}
|
||||||
|
if (samRecord.getMateReferenceIndex() != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
|
||||||
|
return samHeaderMerger.getMergedSequenceIndex(samRecord.getHeader(), samRecord.getMateReferenceIndex());
|
||||||
|
}
|
||||||
|
return SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,744 @@
|
||||||
|
/*
|
||||||
|
* The MIT License
|
||||||
|
*
|
||||||
|
* Copyright (c) 2009 The Broad Institute
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to deal
|
||||||
|
* in the Software without restriction, including without limitation the rights
|
||||||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in
|
||||||
|
* all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
* THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
package net.sf.picard.sam;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import net.sf.picard.PicardException;
|
||||||
|
import net.sf.samtools.AbstractSAMHeaderRecord;
|
||||||
|
import net.sf.samtools.SAMFileHeader;
|
||||||
|
import net.sf.samtools.SAMFileReader;
|
||||||
|
import net.sf.samtools.SAMProgramRecord;
|
||||||
|
import net.sf.samtools.SAMReadGroupRecord;
|
||||||
|
import net.sf.samtools.SAMSequenceDictionary;
|
||||||
|
import net.sf.samtools.SAMSequenceRecord;
|
||||||
|
import net.sf.samtools.util.SequenceUtil;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merges SAMFileHeaders that have the same sequences into a single merged header
|
||||||
|
* object while providing read group translation for cases where read groups
|
||||||
|
* clash across input headers.
|
||||||
|
*/
|
||||||
|
public class SamFileHeaderMerger {
|
||||||
|
//Super Header to construct
|
||||||
|
private final SAMFileHeader mergedHeader;
|
||||||
|
private Collection<SAMFileReader> readers;
|
||||||
|
private final Collection<SAMFileHeader> headers;
|
||||||
|
|
||||||
|
//Translation of old group ids to new group ids
|
||||||
|
private final Map<SAMFileHeader, Map<String, String>> samReadGroupIdTranslation =
|
||||||
|
new IdentityHashMap<SAMFileHeader, Map<String, String>>();
|
||||||
|
|
||||||
|
//the read groups from different files use the same group ids
|
||||||
|
private boolean hasReadGroupCollisions = false;
|
||||||
|
|
||||||
|
//the program records from different files use the same program record ids
|
||||||
|
private boolean hasProgramGroupCollisions = false;
|
||||||
|
|
||||||
|
//Translation of old program group ids to new program group ids
|
||||||
|
private Map<SAMFileHeader, Map<String, String>> samProgramGroupIdTranslation =
|
||||||
|
new IdentityHashMap<SAMFileHeader, Map<String, String>>();
|
||||||
|
|
||||||
|
private boolean hasMergedSequenceDictionary = false;
|
||||||
|
|
||||||
|
// Translation of old sequence dictionary ids to new dictionary ids
|
||||||
|
// This is an IdentityHashMap because it can be quite expensive to compute the hashCode for
|
||||||
|
// large SAMFileHeaders. It is possible that two input files will have identical headers so that
|
||||||
|
// the regular HashMap would fold them together, but the value stored in each of the two
|
||||||
|
// Map entries will be the same, so it should not hurt anything.
|
||||||
|
private final Map<SAMFileHeader, Map<Integer, Integer>> samSeqDictionaryIdTranslationViaHeader =
|
||||||
|
new IdentityHashMap<SAMFileHeader, Map<Integer, Integer>>();
|
||||||
|
|
||||||
|
//HeaderRecordFactory that creates SAMReadGroupRecord instances.
|
||||||
|
private static final HeaderRecordFactory<SAMReadGroupRecord> READ_GROUP_RECORD_FACTORY = new HeaderRecordFactory<SAMReadGroupRecord>() {
|
||||||
|
public SAMReadGroupRecord createRecord(String id, SAMReadGroupRecord srcReadGroupRecord) {
|
||||||
|
return new SAMReadGroupRecord(id, srcReadGroupRecord);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
//HeaderRecordFactory that creates SAMProgramRecord instances.
|
||||||
|
private static final HeaderRecordFactory<SAMProgramRecord> PROGRAM_RECORD_FACTORY = new HeaderRecordFactory<SAMProgramRecord>() {
|
||||||
|
public SAMProgramRecord createRecord(String id, SAMProgramRecord srcProgramRecord) {
|
||||||
|
return new SAMProgramRecord(id, srcProgramRecord);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
//comparator used to sort lists of program group and read group records
|
||||||
|
private static final Comparator<AbstractSAMHeaderRecord> RECORD_ID_COMPARATOR = new Comparator<AbstractSAMHeaderRecord>() {
|
||||||
|
public int compare(AbstractSAMHeaderRecord o1, AbstractSAMHeaderRecord o2) {
|
||||||
|
return o1.getId().compareTo(o2.getId());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create SAMFileHeader with additional information. Required that sequence dictionaries agree.
|
||||||
|
*
|
||||||
|
* @param readers sam file readers to combine
|
||||||
|
* @param sortOrder sort order new header should have
|
||||||
|
* @deprecated replaced by SamFileHeaderMerger(Collection<SAMFileHeader>, SAMFileHeader.SortOrder, boolean)
|
||||||
|
*/
|
||||||
|
public SamFileHeaderMerger(final Collection<SAMFileReader> readers, final SAMFileHeader.SortOrder sortOrder) {
|
||||||
|
this(readers, sortOrder, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create SAMFileHeader with additional information.
|
||||||
|
*
|
||||||
|
* @param readers sam file readers to combine
|
||||||
|
* @param sortOrder sort order new header should have
|
||||||
|
* @param mergeDictionaries If true, merge sequence dictionaries in new header. If false, require that
|
||||||
|
* all input sequence dictionaries be identical.
|
||||||
|
* @deprecated replaced by SamFileHeaderMerger(Collection<SAMFileHeader>, SAMFileHeader.SortOrder, boolean)
|
||||||
|
*/
|
||||||
|
public SamFileHeaderMerger(final Collection<SAMFileReader> readers, final SAMFileHeader.SortOrder sortOrder, final boolean mergeDictionaries) {
|
||||||
|
this(sortOrder, getHeadersFromReaders(readers), mergeDictionaries);
|
||||||
|
this.readers = readers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create SAMFileHeader with additional information.. This is the preferred constructor.
|
||||||
|
*
|
||||||
|
* @param sortOrder sort order new header should have
|
||||||
|
* @param headers sam file headers to combine
|
||||||
|
* @param mergeDictionaries If true, merge sequence dictionaries in new header. If false, require that
|
||||||
|
* all input sequence dictionaries be identical.
|
||||||
|
*/
|
||||||
|
public SamFileHeaderMerger(final SAMFileHeader.SortOrder sortOrder, final Collection<SAMFileHeader> headers, final boolean mergeDictionaries) {
|
||||||
|
this.headers = headers;
|
||||||
|
this.mergedHeader = new SAMFileHeader();
|
||||||
|
|
||||||
|
SAMSequenceDictionary sequenceDictionary;
|
||||||
|
try {
|
||||||
|
sequenceDictionary = getSequenceDictionary(headers);
|
||||||
|
this.hasMergedSequenceDictionary = false;
|
||||||
|
}
|
||||||
|
catch (SequenceUtil.SequenceListsDifferException pe) {
|
||||||
|
if (mergeDictionaries) {
|
||||||
|
sequenceDictionary = mergeSequenceDictionaries(headers);
|
||||||
|
this.hasMergedSequenceDictionary = true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw pe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.mergedHeader.setSequenceDictionary(sequenceDictionary);
|
||||||
|
|
||||||
|
// Set program that creates input alignments
|
||||||
|
for (final SAMProgramRecord program : mergeProgramGroups(headers)) {
|
||||||
|
this.mergedHeader.addProgramRecord(program);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set read groups for merged header
|
||||||
|
final List<SAMReadGroupRecord> readGroups = mergeReadGroups(headers);
|
||||||
|
this.mergedHeader.setReadGroups(readGroups);
|
||||||
|
this.mergedHeader.setGroupOrder(SAMFileHeader.GroupOrder.none);
|
||||||
|
|
||||||
|
this.mergedHeader.setSortOrder(sortOrder);
|
||||||
|
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
for (final String comment : header.getComments()) {
|
||||||
|
this.mergedHeader.addComment(comment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Utilility method to make use with old constructor
|
||||||
|
private static List<SAMFileHeader> getHeadersFromReaders(Collection<SAMFileReader> readers) {
|
||||||
|
List<SAMFileHeader> headers = new ArrayList<SAMFileHeader>(readers.size());
|
||||||
|
for (SAMFileReader reader : readers) {
|
||||||
|
headers.add(reader.getFileHeader());
|
||||||
|
}
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks to see if there are clashes where different readers are using the same read
|
||||||
|
* group IDs. If yes, then those IDs that collided are remapped.
|
||||||
|
*
|
||||||
|
* @param headers headers to combine
|
||||||
|
* @return new list of read groups constructed from all the readers
|
||||||
|
*/
|
||||||
|
private List<SAMReadGroupRecord> mergeReadGroups(final Collection<SAMFileHeader> headers) {
|
||||||
|
//prepare args for mergeHeaderRecords(..) call
|
||||||
|
final HashSet<String> idsThatAreAlreadyTaken = new HashSet<String>();
|
||||||
|
|
||||||
|
final List<HeaderRecordAndFileHeader<SAMReadGroupRecord>> readGroupsToProcess = new LinkedList<HeaderRecordAndFileHeader<SAMReadGroupRecord>>();
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
for (final SAMReadGroupRecord readGroup : header.getReadGroups()) {
|
||||||
|
//verify that there are no existing id collisions in this input file
|
||||||
|
if(!idsThatAreAlreadyTaken.add(readGroup.getId()))
|
||||||
|
throw new PicardException("Input file: " + header + " contains more than one RG with the same id (" + readGroup.getId() + ")");
|
||||||
|
|
||||||
|
readGroupsToProcess.add(new HeaderRecordAndFileHeader<SAMReadGroupRecord>(readGroup, header));
|
||||||
|
}
|
||||||
|
idsThatAreAlreadyTaken.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<SAMReadGroupRecord> result = new LinkedList<SAMReadGroupRecord>();
|
||||||
|
|
||||||
|
hasReadGroupCollisions = mergeHeaderRecords(readGroupsToProcess, READ_GROUP_RECORD_FACTORY, idsThatAreAlreadyTaken, samReadGroupIdTranslation, result);
|
||||||
|
|
||||||
|
//sort the result list by record id
|
||||||
|
Collections.sort(result, RECORD_ID_COMPARATOR);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks to see if there are clashes where different readers are using the same program
|
||||||
|
* group IDs. If yes, then those IDs that collided are remapped.
|
||||||
|
*
|
||||||
|
* @param headers headers to combine
|
||||||
|
* @return new list of program groups constructed from all the readers
|
||||||
|
*/
|
||||||
|
private List<SAMProgramRecord> mergeProgramGroups(final Collection<SAMFileHeader> headers) {
|
||||||
|
|
||||||
|
final List<SAMProgramRecord> overallResult = new LinkedList<SAMProgramRecord>();
|
||||||
|
|
||||||
|
//this Set will accumulate all SAMProgramRecord ids that have been encountered so far.
|
||||||
|
final HashSet<String> idsThatAreAlreadyTaken = new HashSet<String>();
|
||||||
|
|
||||||
|
//need to process all program groups
|
||||||
|
List<HeaderRecordAndFileHeader<SAMProgramRecord>> programGroupsLeftToProcess = new LinkedList<HeaderRecordAndFileHeader<SAMProgramRecord>>();
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
for (final SAMProgramRecord programGroup : header.getProgramRecords()) {
|
||||||
|
//verify that there are no existing id collisions in this input file
|
||||||
|
if(!idsThatAreAlreadyTaken.add(programGroup.getId()))
|
||||||
|
throw new PicardException("Input file: " + header + " contains more than one PG with the same id (" + programGroup.getId() + ")");
|
||||||
|
|
||||||
|
programGroupsLeftToProcess.add(new HeaderRecordAndFileHeader<SAMProgramRecord>(programGroup, header));
|
||||||
|
}
|
||||||
|
idsThatAreAlreadyTaken.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
//A program group header (lets say ID=2 PN=B PP=1) may have a PP (previous program) attribute which chains it to
|
||||||
|
//another program group header (lets say ID=1 PN=A) to indicate that the given file was
|
||||||
|
//processed by program A followed by program B. These PP attributes potentially
|
||||||
|
//connect headers into one or more tree structures. Merging is done by
|
||||||
|
//first merging all headers that don't have PP attributes (eg. tree roots),
|
||||||
|
//then updating and merging all headers whose PPs point to the tree-root headers,
|
||||||
|
//and so on until all program group headers are processed.
|
||||||
|
|
||||||
|
//currentProgramGroups is the list of records to merge next. Start by merging the programGroups that don't have a PP attribute (eg. the tree roots).
|
||||||
|
List< HeaderRecordAndFileHeader<SAMProgramRecord> > currentProgramGroups = new LinkedList<HeaderRecordAndFileHeader<SAMProgramRecord>>();
|
||||||
|
for(final Iterator<HeaderRecordAndFileHeader<SAMProgramRecord>> programGroupsLeftToProcessIterator = programGroupsLeftToProcess.iterator(); programGroupsLeftToProcessIterator.hasNext(); ) {
|
||||||
|
final HeaderRecordAndFileHeader<SAMProgramRecord> pair = programGroupsLeftToProcessIterator.next();
|
||||||
|
if(pair.getHeaderRecord().getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG) == null) {
|
||||||
|
programGroupsLeftToProcessIterator.remove();
|
||||||
|
currentProgramGroups.add(pair);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//merge currentProgramGroups
|
||||||
|
while(!currentProgramGroups.isEmpty())
|
||||||
|
{
|
||||||
|
final List<SAMProgramRecord> currentResult = new LinkedList<SAMProgramRecord>();
|
||||||
|
|
||||||
|
hasProgramGroupCollisions |= mergeHeaderRecords(currentProgramGroups, PROGRAM_RECORD_FACTORY, idsThatAreAlreadyTaken, samProgramGroupIdTranslation, currentResult);
|
||||||
|
|
||||||
|
//add currentResults to overallResults
|
||||||
|
overallResult.addAll(currentResult);
|
||||||
|
|
||||||
|
//apply the newly-computed id translations to currentProgramGroups and programGroupsLeftToProcess
|
||||||
|
currentProgramGroups = translateIds(currentProgramGroups, samProgramGroupIdTranslation, false);
|
||||||
|
programGroupsLeftToProcess = translateIds(programGroupsLeftToProcess, samProgramGroupIdTranslation, true);
|
||||||
|
|
||||||
|
//find all records in programGroupsLeftToProcess whose ppId points to a record that was just processed (eg. a record that's in currentProgramGroups),
|
||||||
|
//and move them to the list of programGroupsToProcessNext.
|
||||||
|
LinkedList<HeaderRecordAndFileHeader<SAMProgramRecord>> programGroupsToProcessNext = new LinkedList<HeaderRecordAndFileHeader<SAMProgramRecord>>();
|
||||||
|
for(final Iterator<HeaderRecordAndFileHeader<SAMProgramRecord>> programGroupsLeftToProcessIterator = programGroupsLeftToProcess.iterator(); programGroupsLeftToProcessIterator.hasNext(); ) {
|
||||||
|
final HeaderRecordAndFileHeader<SAMProgramRecord> pairLeftToProcess = programGroupsLeftToProcessIterator.next();
|
||||||
|
final Object ppIdOfRecordLeftToProcess = pairLeftToProcess.getHeaderRecord().getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG);
|
||||||
|
//find what currentProgramGroups this ppId points to (NOTE: they have to come from the same file)
|
||||||
|
for(final HeaderRecordAndFileHeader<SAMProgramRecord> justProcessedPair : currentProgramGroups) {
|
||||||
|
String idJustProcessed = justProcessedPair.getHeaderRecord().getId();
|
||||||
|
if(pairLeftToProcess.getFileHeader() == justProcessedPair.getFileHeader() && ppIdOfRecordLeftToProcess.equals(idJustProcessed)) {
|
||||||
|
programGroupsLeftToProcessIterator.remove();
|
||||||
|
programGroupsToProcessNext.add(pairLeftToProcess);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentProgramGroups = programGroupsToProcessNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
//verify that all records were processed
|
||||||
|
if(!programGroupsLeftToProcess.isEmpty()) {
|
||||||
|
StringBuffer errorMsg = new StringBuffer(programGroupsLeftToProcess.size() + " program groups weren't processed. Do their PP ids point to existing PGs? \n");
|
||||||
|
for( final HeaderRecordAndFileHeader<SAMProgramRecord> pair : programGroupsLeftToProcess ) {
|
||||||
|
SAMProgramRecord record = pair.getHeaderRecord();
|
||||||
|
errorMsg.append("@PG ID:"+record.getProgramGroupId()+" PN:"+record.getProgramName()+" PP:"+record.getPreviousProgramGroupId() +"\n");
|
||||||
|
}
|
||||||
|
throw new PicardException(errorMsg.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
//sort the result list by record id
|
||||||
|
Collections.sort(overallResult, RECORD_ID_COMPARATOR);
|
||||||
|
|
||||||
|
return overallResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method that takes a list of program groups and remaps all their
|
||||||
|
* ids (including ppIds if requested) using the given idTranslationTable.
|
||||||
|
*
|
||||||
|
* NOTE: when remapping, this method creates new SAMProgramRecords and
|
||||||
|
* doesn't mutate any records in the programGroups list.
|
||||||
|
*
|
||||||
|
* @param programGroups The program groups to translate.
|
||||||
|
* @param idTranslationTable The translation table.
|
||||||
|
* @param translatePpIds Whether ppIds should be translated as well.
|
||||||
|
*
|
||||||
|
* @return The list of translated records.
|
||||||
|
*/
|
||||||
|
private List<HeaderRecordAndFileHeader<SAMProgramRecord>> translateIds(
|
||||||
|
List<HeaderRecordAndFileHeader<SAMProgramRecord>> programGroups,
|
||||||
|
Map<SAMFileHeader, Map<String, String>> idTranslationTable,
|
||||||
|
boolean translatePpIds) {
|
||||||
|
|
||||||
|
//go through programGroups and translate any IDs and PPs based on the idTranslationTable.
|
||||||
|
List<HeaderRecordAndFileHeader<SAMProgramRecord>> result = new LinkedList<HeaderRecordAndFileHeader<SAMProgramRecord>>();
|
||||||
|
for(final HeaderRecordAndFileHeader<SAMProgramRecord> pair : programGroups ) {
|
||||||
|
final SAMProgramRecord record = pair.getHeaderRecord();
|
||||||
|
final String id = record.getProgramGroupId();
|
||||||
|
final String ppId = (String) record.getAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG);
|
||||||
|
|
||||||
|
final SAMFileHeader header = pair.getFileHeader();
|
||||||
|
final Map<String, String> translations = idTranslationTable.get(header);
|
||||||
|
|
||||||
|
//see if one or both ids need to be translated
|
||||||
|
SAMProgramRecord translatedRecord = null;
|
||||||
|
if(translations != null)
|
||||||
|
{
|
||||||
|
String translatedId = translations.get( id );
|
||||||
|
String translatedPpId = translatePpIds ? translations.get( ppId ) : null;
|
||||||
|
|
||||||
|
boolean needToTranslateId = translatedId != null && !translatedId.equals(id);
|
||||||
|
boolean needToTranslatePpId = translatedPpId != null && !translatedPpId.equals(ppId);
|
||||||
|
|
||||||
|
if(needToTranslateId && needToTranslatePpId) {
|
||||||
|
translatedRecord = new SAMProgramRecord(translatedId, record);
|
||||||
|
translatedRecord.setAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG, translatedPpId);
|
||||||
|
} else if(needToTranslateId) {
|
||||||
|
translatedRecord = new SAMProgramRecord(translatedId, record);
|
||||||
|
} else if(needToTranslatePpId) {
|
||||||
|
translatedRecord = new SAMProgramRecord(id, record);
|
||||||
|
translatedRecord.setAttribute(SAMProgramRecord.PREVIOUS_PROGRAM_GROUP_ID_TAG, translatedPpId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(translatedRecord != null) {
|
||||||
|
result.add(new HeaderRecordAndFileHeader<SAMProgramRecord>(translatedRecord, header));
|
||||||
|
} else {
|
||||||
|
result.add(pair); //keep the original record
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method for merging a List of AbstractSAMHeaderRecords. If it finds
|
||||||
|
* records that have identical ids and attributes, it will collapse them
|
||||||
|
* into one record. If it finds records that have identical ids but
|
||||||
|
* non-identical attributes, this is treated as a collision. When collision happens,
|
||||||
|
* the records' ids are remapped, and an old-id to new-id mapping is added to the idTranslationTable.
|
||||||
|
*
|
||||||
|
* NOTE: Non-collided records also get recorded in the idTranslationTable as
|
||||||
|
* old-id to old-id. This way, an idTranslationTable lookup should never return null.
|
||||||
|
*
|
||||||
|
* @param headerRecords The header records to merge.
|
||||||
|
* @param headerRecordFactory Constructs a specific subclass of AbstractSAMHeaderRecord.
|
||||||
|
* @param idsThatAreAlreadyTaken If the id of a headerRecord matches an id in this set, it will be treated as a collision, and the headRecord's id will be remapped.
|
||||||
|
* @param idTranslationTable When records collide, their ids are remapped, and an old-id to new-id
|
||||||
|
* mapping is added to the idTranslationTable. Non-collided records also get recorded in the idTranslationTable as
|
||||||
|
* old-id to old-id. This way, an idTranslationTable lookup should never return null.
|
||||||
|
*
|
||||||
|
* @param result The list of merged header records.
|
||||||
|
*
|
||||||
|
* @return True if there were collisions.
|
||||||
|
*/
|
||||||
|
private <RecordType extends AbstractSAMHeaderRecord> boolean mergeHeaderRecords(final List<HeaderRecordAndFileHeader<RecordType>> headerRecords, HeaderRecordFactory<RecordType> headerRecordFactory,
|
||||||
|
final HashSet<String> idsThatAreAlreadyTaken, Map<SAMFileHeader, Map<String, String>> idTranslationTable, List<RecordType> result) {
|
||||||
|
|
||||||
|
//The outer Map bins the header records by their ids. The nested Map further collapses
|
||||||
|
//header records which, in addition to having the same id, also have identical attributes.
|
||||||
|
//In other words, each key in the nested map represents one or more
|
||||||
|
//header records which have both identical ids and identical attributes. The List of
|
||||||
|
//SAMFileHeaders keeps track of which readers these header record(s) came from.
|
||||||
|
final Map<String, Map<RecordType, List<SAMFileHeader>>> idToRecord =
|
||||||
|
new HashMap<String, Map<RecordType, List<SAMFileHeader>>>();
|
||||||
|
|
||||||
|
//Populate the idToRecord and seenIds data structures
|
||||||
|
for (final HeaderRecordAndFileHeader<RecordType> pair : headerRecords) {
|
||||||
|
final RecordType record = pair.getHeaderRecord();
|
||||||
|
final SAMFileHeader header = pair.getFileHeader();
|
||||||
|
final String recordId = record.getId();
|
||||||
|
Map<RecordType, List<SAMFileHeader>> recordsWithSameId = idToRecord.get(recordId);
|
||||||
|
if(recordsWithSameId == null) {
|
||||||
|
recordsWithSameId = new LinkedHashMap<RecordType, List<SAMFileHeader>>();
|
||||||
|
idToRecord.put(recordId, recordsWithSameId);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SAMFileHeader> fileHeaders = recordsWithSameId.get(record);
|
||||||
|
if(fileHeaders == null) {
|
||||||
|
fileHeaders = new LinkedList<SAMFileHeader>();
|
||||||
|
recordsWithSameId.put(record, fileHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
fileHeaders.add(header);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Resolve any collisions between header records by remapping their ids.
|
||||||
|
boolean hasCollisions = false;
|
||||||
|
for (final Map.Entry<String, Map<RecordType, List<SAMFileHeader>>> entry : idToRecord.entrySet() )
|
||||||
|
{
|
||||||
|
final String recordId = entry.getKey();
|
||||||
|
final Map<RecordType, List<SAMFileHeader>> recordsWithSameId = entry.getValue();
|
||||||
|
|
||||||
|
|
||||||
|
for( Map.Entry<RecordType, List<SAMFileHeader>> recordWithUniqueAttr : recordsWithSameId.entrySet()) {
|
||||||
|
final RecordType record = recordWithUniqueAttr.getKey();
|
||||||
|
final List<SAMFileHeader> fileHeaders = recordWithUniqueAttr.getValue();
|
||||||
|
|
||||||
|
String newId;
|
||||||
|
if(!idsThatAreAlreadyTaken.contains(recordId)) {
|
||||||
|
//don't remap 1st record. If there are more records
|
||||||
|
//with this id, they will be remapped in the 'else'.
|
||||||
|
newId = recordId;
|
||||||
|
idsThatAreAlreadyTaken.add(recordId);
|
||||||
|
} else {
|
||||||
|
//there is more than one record with this id.
|
||||||
|
hasCollisions = true;
|
||||||
|
|
||||||
|
//find a unique newId for this record
|
||||||
|
int idx=1;
|
||||||
|
while(idsThatAreAlreadyTaken.contains(newId = recordId + "." + Integer.toString(idx++)))
|
||||||
|
;
|
||||||
|
|
||||||
|
idsThatAreAlreadyTaken.add( newId );
|
||||||
|
}
|
||||||
|
|
||||||
|
for(SAMFileHeader fileHeader : fileHeaders) {
|
||||||
|
Map<String, String> readerTranslationTable = idTranslationTable.get(fileHeader);
|
||||||
|
if(readerTranslationTable == null) {
|
||||||
|
readerTranslationTable = new HashMap<String, String>();
|
||||||
|
idTranslationTable.put(fileHeader, readerTranslationTable);
|
||||||
|
}
|
||||||
|
readerTranslationTable.put(recordId, newId);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.add( headerRecordFactory.createRecord(newId, record) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return hasCollisions;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the sequences off the SAMFileHeader. Throws runtime exception if the sequence
|
||||||
|
* are different from one another.
|
||||||
|
*
|
||||||
|
* @param headers headers to pull sequences from
|
||||||
|
* @return sequences from files. Each file should have the same sequence
|
||||||
|
*/
|
||||||
|
private SAMSequenceDictionary getSequenceDictionary(final Collection<SAMFileHeader> headers) {
|
||||||
|
SAMSequenceDictionary sequences = null;
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
|
||||||
|
if (sequences == null) {
|
||||||
|
sequences = header.getSequenceDictionary();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final SAMSequenceDictionary currentSequences = header.getSequenceDictionary();
|
||||||
|
SequenceUtil.assertSequenceDictionariesEqual(sequences, currentSequences);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sequences;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the sequences from the SAMFileHeader, and merge the resulting sequence dictionaries.
|
||||||
|
*
|
||||||
|
* @param headers headers to pull sequences from
|
||||||
|
* @return sequences from files. Each file should have the same sequence
|
||||||
|
*/
|
||||||
|
private SAMSequenceDictionary mergeSequenceDictionaries(final Collection<SAMFileHeader> headers) {
|
||||||
|
SAMSequenceDictionary sequences = new SAMSequenceDictionary();
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
final SAMSequenceDictionary currentSequences = header.getSequenceDictionary();
|
||||||
|
sequences = mergeSequences(sequences, currentSequences);
|
||||||
|
}
|
||||||
|
// second pass, make a map of the original seqeunce id -> new sequence id
|
||||||
|
createSequenceMapping(headers, sequences);
|
||||||
|
return sequences;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* They've asked to merge the sequence headers. What we support right now is finding the sequence name superset.
|
||||||
|
*
|
||||||
|
* @param mergeIntoDict the result of merging so far. All SAMSequenceRecords in here have been cloned from the originals.
|
||||||
|
* @param mergeFromDict A new sequence dictionary to merge into mergeIntoDict.
|
||||||
|
* @return A new sequence dictionary that resulting from merging the two inputs.
|
||||||
|
*/
|
||||||
|
private SAMSequenceDictionary mergeSequences(SAMSequenceDictionary mergeIntoDict, SAMSequenceDictionary mergeFromDict) {
|
||||||
|
|
||||||
|
// a place to hold the sequences that we haven't found a home for, in the order the appear in mergeFromDict.
|
||||||
|
LinkedList<SAMSequenceRecord> holder = new LinkedList<SAMSequenceRecord>();
|
||||||
|
|
||||||
|
// Return value will be created from this.
|
||||||
|
LinkedList<SAMSequenceRecord> resultingDict = new LinkedList<SAMSequenceRecord>();
|
||||||
|
for (final SAMSequenceRecord sequenceRecord : mergeIntoDict.getSequences()) {
|
||||||
|
resultingDict.add(sequenceRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index into resultingDict of previous SAMSequenceRecord from mergeFromDict that already existed in mergeIntoDict.
|
||||||
|
int prevloc = -1;
|
||||||
|
// Previous SAMSequenceRecord from mergeFromDict that already existed in mergeIntoDict.
|
||||||
|
SAMSequenceRecord previouslyMerged = null;
|
||||||
|
|
||||||
|
for (SAMSequenceRecord sequenceRecord : mergeFromDict.getSequences()) {
|
||||||
|
// Does it already exist in resultingDict?
|
||||||
|
int loc = getIndexOfSequenceName(resultingDict, sequenceRecord.getSequenceName());
|
||||||
|
if (loc == -1) {
|
||||||
|
// If doesn't already exist in resultingDict, save it an decide where to insert it later.
|
||||||
|
holder.add(sequenceRecord.clone());
|
||||||
|
} else if (prevloc > loc) {
|
||||||
|
// If sequenceRecord already exists in resultingDict, but prior to the previous one
|
||||||
|
// from mergeIntoDict that already existed, cannot merge.
|
||||||
|
throw new PicardException("Cannot merge sequence dictionaries because sequence " +
|
||||||
|
sequenceRecord.getSequenceName() + " and " + previouslyMerged.getSequenceName() +
|
||||||
|
" are in different orders in two input sequence dictionaries.");
|
||||||
|
} else {
|
||||||
|
// Since sequenceRecord already exists in resultingDict, don't need to add it.
|
||||||
|
// Add in all the sequences prior to it that have been held in holder.
|
||||||
|
resultingDict.addAll(loc, holder);
|
||||||
|
// Remember the index of sequenceRecord so can check for merge imcompatibility.
|
||||||
|
prevloc = loc + holder.size();
|
||||||
|
previouslyMerged = sequenceRecord;
|
||||||
|
holder.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Append anything left in holder.
|
||||||
|
if (holder.size() != 0) {
|
||||||
|
resultingDict.addAll(holder);
|
||||||
|
}
|
||||||
|
return new SAMSequenceDictionary(resultingDict);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find sequence in list.
|
||||||
|
* @param list List to search for the sequence name.
|
||||||
|
* @param sequenceName Name to search for.
|
||||||
|
* @return Index of SAMSequenceRecord with the given name in list, or -1 if not found.
|
||||||
|
*/
|
||||||
|
private static int getIndexOfSequenceName(final List<SAMSequenceRecord> list, final String sequenceName) {
|
||||||
|
for (int i = 0; i < list.size(); ++i) {
|
||||||
|
if (list.get(i).getSequenceName().equals(sequenceName)) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create the sequence mapping. This map is used to convert the unmerged header sequence ID's to the merged
|
||||||
|
* list of sequence id's.
|
||||||
|
* @param headers the collections of headers.
|
||||||
|
* @param masterDictionary the superset dictionary we've created.
|
||||||
|
*/
|
||||||
|
private void createSequenceMapping(final Collection<SAMFileHeader> headers, SAMSequenceDictionary masterDictionary) {
|
||||||
|
LinkedList<String> resultingDictStr = new LinkedList<String>();
|
||||||
|
for (SAMSequenceRecord r : masterDictionary.getSequences()) {
|
||||||
|
resultingDictStr.add(r.getSequenceName());
|
||||||
|
}
|
||||||
|
for (final SAMFileHeader header : headers) {
|
||||||
|
Map<Integer, Integer> seqMap = new HashMap<Integer, Integer>();
|
||||||
|
SAMSequenceDictionary dict = header.getSequenceDictionary();
|
||||||
|
for (SAMSequenceRecord rec : dict.getSequences()) {
|
||||||
|
seqMap.put(rec.getSequenceIndex(), resultingDictStr.indexOf(rec.getSequenceName()));
|
||||||
|
}
|
||||||
|
this.samSeqDictionaryIdTranslationViaHeader.put(header, seqMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the read group id that should be used for the input read and RG id.
|
||||||
|
*
|
||||||
|
* @deprecated replaced by getReadGroupId(SAMFileHeader, String)
|
||||||
|
* */
|
||||||
|
public String getReadGroupId(final SAMFileReader reader, final String originalReadGroupId) {
|
||||||
|
return getReadGroupId(reader.getFileHeader(), originalReadGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the read group id that should be used for the input read and RG id. */
|
||||||
|
public String getReadGroupId(final SAMFileHeader header, final String originalReadGroupId) {
|
||||||
|
return this.samReadGroupIdTranslation.get(header).get(originalReadGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param reader one of the input files
|
||||||
|
* @param originalProgramGroupId a program group ID from the above input file
|
||||||
|
* @return new ID from the merged list of program groups in the output file
|
||||||
|
* @deprecated replaced by getProgramGroupId(SAMFileHeader, String)
|
||||||
|
*/
|
||||||
|
public String getProgramGroupId(final SAMFileReader reader, final String originalProgramGroupId) {
|
||||||
|
return getProgramGroupId(reader.getFileHeader(), originalProgramGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param header one of the input headers
|
||||||
|
* @param originalProgramGroupId a program group ID from the above input file
|
||||||
|
* @return new ID from the merged list of program groups in the output file
|
||||||
|
*/
|
||||||
|
public String getProgramGroupId(final SAMFileHeader header, final String originalProgramGroupId) {
|
||||||
|
return this.samProgramGroupIdTranslation.get(header).get(originalProgramGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns true if there are read group duplicates within the merged headers. */
|
||||||
|
public boolean hasReadGroupCollisions() {
|
||||||
|
return this.hasReadGroupCollisions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns true if there are program group duplicates within the merged headers. */
|
||||||
|
public boolean hasProgramGroupCollisions() {
|
||||||
|
return hasProgramGroupCollisions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return if we've merged the sequence dictionaries, return true */
|
||||||
|
public boolean hasMergedSequenceDictionary() {
|
||||||
|
return hasMergedSequenceDictionary;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the merged header that should be written to any output merged file. */
|
||||||
|
public SAMFileHeader getMergedHeader() {
|
||||||
|
return this.mergedHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the collection of readers that this header merger is working with. May return null.
|
||||||
|
* @deprecated replaced by getHeaders()
|
||||||
|
*/
|
||||||
|
public Collection<SAMFileReader> getReaders() {
|
||||||
|
return this.readers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the collection of readers that this header merger is working with.
|
||||||
|
*/
|
||||||
|
public Collection<SAMFileHeader> getHeaders() {
|
||||||
|
return this.headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tells whether this header merger contains a given SAM file header. Note that header presence
|
||||||
|
* is confirmed / blocked by == equality, rather than actually testing SAMFileHeader.equals(), for
|
||||||
|
* reasons of performance.
|
||||||
|
* @param header header to check for.
|
||||||
|
* @return True if the header exists in this HeaderMerger. False otherwise.
|
||||||
|
*/
|
||||||
|
boolean containsHeader(SAMFileHeader header) {
|
||||||
|
for(SAMFileHeader headerMergerHeader: headers) {
|
||||||
|
if(headerMergerHeader == header)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the new mapping for a specified reader, given it's old sequence index
|
||||||
|
* @param reader the reader
|
||||||
|
* @param oldReferenceSequenceIndex the old sequence (also called reference) index
|
||||||
|
* @return the new index value
|
||||||
|
* @deprecated replaced by getMergedSequenceIndex(SAMFileHeader, Integer)
|
||||||
|
*/
|
||||||
|
public Integer getMergedSequenceIndex(SAMFileReader reader, Integer oldReferenceSequenceIndex) {
|
||||||
|
return this.getMergedSequenceIndex(reader.getFileHeader(), oldReferenceSequenceIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Another mechanism for getting the new sequence index, for situations in which the reader is not available.
|
||||||
|
* Note that if the SAMRecord has already had its header replaced with the merged header, this won't work.
|
||||||
|
* @param header The original header for the input record in question.
|
||||||
|
* @param oldReferenceSequenceIndex The original sequence index.
|
||||||
|
* @return the new index value that is compatible with the merged sequence index.
|
||||||
|
*/
|
||||||
|
public Integer getMergedSequenceIndex(final SAMFileHeader header, Integer oldReferenceSequenceIndex) {
|
||||||
|
final Map<Integer, Integer> mapping = this.samSeqDictionaryIdTranslationViaHeader.get(header);
|
||||||
|
if (mapping == null) {
|
||||||
|
throw new PicardException("No sequence dictionary mapping available for header: " + header);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Integer newIndex = mapping.get(oldReferenceSequenceIndex);
|
||||||
|
if (newIndex == null) {
|
||||||
|
throw new PicardException("No mapping for reference index " + oldReferenceSequenceIndex + " from header: " + header);
|
||||||
|
}
|
||||||
|
|
||||||
|
return newIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementations of this interface are used by mergeHeaderRecords(..) to instantiate
|
||||||
|
* specific subclasses of AbstractSAMHeaderRecord.
|
||||||
|
*/
|
||||||
|
private static interface HeaderRecordFactory<RecordType extends AbstractSAMHeaderRecord> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new instance of RecordType.
|
||||||
|
* @param id The id of the new record.
|
||||||
|
* @param srcRecord Except for the id, the new record will be a copy of this source record.
|
||||||
|
*/
|
||||||
|
public RecordType createRecord(final String id, RecordType srcRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Struct that groups together a subclass of AbstractSAMHeaderRecord with the
|
||||||
|
* SAMFileHeader that it came from.
|
||||||
|
*/
|
||||||
|
private static class HeaderRecordAndFileHeader<RecordType extends AbstractSAMHeaderRecord> {
|
||||||
|
private RecordType headerRecord;
|
||||||
|
private SAMFileHeader samFileHeader;
|
||||||
|
|
||||||
|
public HeaderRecordAndFileHeader(RecordType headerRecord, SAMFileHeader samFileHeader) {
|
||||||
|
this.headerRecord = headerRecord;
|
||||||
|
this.samFileHeader = samFileHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordType getHeaderRecord() {
|
||||||
|
return headerRecord;
|
||||||
|
}
|
||||||
|
public SAMFileHeader getFileHeader() {
|
||||||
|
return samFileHeader;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -259,6 +259,11 @@ public class SAMDataSource {
|
||||||
validationStringency = strictness;
|
validationStringency = strictness;
|
||||||
if(readBufferSize != null)
|
if(readBufferSize != null)
|
||||||
ReadShard.setReadBufferSize(readBufferSize);
|
ReadShard.setReadBufferSize(readBufferSize);
|
||||||
|
else {
|
||||||
|
// Choose a sensible default for the read buffer size. For the moment, we're picking 1000 reads per BAM per shard (which effectively
|
||||||
|
// will mean per-thread once ReadWalkers are parallelized) with a max cap of 250K reads in memory at once.
|
||||||
|
ReadShard.setReadBufferSize(Math.min(1000*samFiles.size(),250000));
|
||||||
|
}
|
||||||
|
|
||||||
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
|
resourcePool = new SAMResourcePool(Integer.MAX_VALUE);
|
||||||
SAMReaders readers = resourcePool.getAvailableReaders();
|
SAMReaders readers = resourcePool.getAvailableReaders();
|
||||||
|
|
@ -479,10 +484,13 @@ public class SAMDataSource {
|
||||||
// Cache the most recently viewed read so that we can check whether we've reached the end of a pair.
|
// Cache the most recently viewed read so that we can check whether we've reached the end of a pair.
|
||||||
SAMRecord read = null;
|
SAMRecord read = null;
|
||||||
|
|
||||||
|
Map<SAMFileReader,GATKBAMFileSpan> positionUpdates = new IdentityHashMap<SAMFileReader,GATKBAMFileSpan>();
|
||||||
|
|
||||||
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,sortOrder == SAMFileHeader.SortOrder.coordinate);
|
CloseableIterator<SAMRecord> iterator = getIterator(readers,shard,sortOrder == SAMFileHeader.SortOrder.coordinate);
|
||||||
while(!shard.isBufferFull() && iterator.hasNext()) {
|
while(!shard.isBufferFull() && iterator.hasNext()) {
|
||||||
read = iterator.next();
|
read = iterator.next();
|
||||||
addReadToBufferingShard(shard,getReaderID(readers,read),read);
|
shard.addRead(read);
|
||||||
|
noteFilePositionUpdate(positionUpdates,read);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the reads are sorted in queryname order, ensure that all reads
|
// If the reads are sorted in queryname order, ensure that all reads
|
||||||
|
|
@ -492,11 +500,21 @@ public class SAMDataSource {
|
||||||
SAMRecord nextRead = iterator.next();
|
SAMRecord nextRead = iterator.next();
|
||||||
if(read == null || !read.getReadName().equals(nextRead.getReadName()))
|
if(read == null || !read.getReadName().equals(nextRead.getReadName()))
|
||||||
break;
|
break;
|
||||||
addReadToBufferingShard(shard,getReaderID(readers,nextRead),nextRead);
|
shard.addRead(nextRead);
|
||||||
|
noteFilePositionUpdate(positionUpdates,nextRead);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
iterator.close();
|
iterator.close();
|
||||||
|
|
||||||
|
// Make the updates specified by the reader.
|
||||||
|
for(Map.Entry<SAMFileReader,GATKBAMFileSpan> positionUpdate: positionUpdates.entrySet())
|
||||||
|
readerPositions.put(readers.getReaderID(positionUpdate.getKey()),positionUpdate.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void noteFilePositionUpdate(Map<SAMFileReader,GATKBAMFileSpan> positionMapping, SAMRecord read) {
|
||||||
|
GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing());
|
||||||
|
positionMapping.put(read.getFileSource().getReader(),endChunk);
|
||||||
}
|
}
|
||||||
|
|
||||||
public StingSAMIterator seek(Shard shard) {
|
public StingSAMIterator seek(Shard shard) {
|
||||||
|
|
@ -574,18 +592,6 @@ public class SAMDataSource {
|
||||||
readProperties.defaultBaseQualities());
|
readProperties.defaultBaseQualities());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds this read to the given shard.
|
|
||||||
* @param shard The shard to which to add the read.
|
|
||||||
* @param id The id of the given reader.
|
|
||||||
* @param read The read to add to the shard.
|
|
||||||
*/
|
|
||||||
private void addReadToBufferingShard(Shard shard,SAMReaderID id,SAMRecord read) {
|
|
||||||
GATKBAMFileSpan endChunk = new GATKBAMFileSpan(read.getFileSource().getFilePointer().getContentsFollowing());
|
|
||||||
shard.addRead(read);
|
|
||||||
readerPositions.put(id,endChunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter reads based on user-specified criteria.
|
* Filter reads based on user-specified criteria.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue