From 32d86cf457bcd9dc3abd6a87761c99769e89c7d2 Mon Sep 17 00:00:00 2001 From: hanna Date: Wed, 21 Apr 2010 19:50:26 +0000 Subject: [PATCH] Rev the reservoir downsampler to support partitioning through a functor. git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3232 348d0f76-0448-11de-a6fe-93d51630548a --- .../sting/utils/ReservoirDownsampler.java | 120 +++++++++++++++--- .../utils/sam/AlignmentStartComparator.java | 3 + 2 files changed, 104 insertions(+), 19 deletions(-) rename java/{test => src}/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java (83%) diff --git a/java/src/org/broadinstitute/sting/utils/ReservoirDownsampler.java b/java/src/org/broadinstitute/sting/utils/ReservoirDownsampler.java index a4a8e7f03..13af0041c 100644 --- a/java/src/org/broadinstitute/sting/utils/ReservoirDownsampler.java +++ b/java/src/org/broadinstitute/sting/utils/ReservoirDownsampler.java @@ -10,6 +10,9 @@ import java.util.*; * with a Reservoir" (Vitter 1985). At time of writing, this paper is located here: * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.138.784&rep=rep1&type=pdf * + * Contains an enhancement allowing users to partition downsampled data. If a partitioner + * is used, each partition will be allowed to contain maxElements elements. + * * Note that using the ReservoirDownsampler will leave the given iterator in an undefined * state. Do not attempt to use the iterator (other than closing it) after the Downsampler * completes. @@ -33,6 +36,11 @@ public class ReservoirDownsampler implements Iterator> { */ private final Comparator comparator; + /** + * Partitions the elements into subsets, each having an equal number of maxElements. + */ + private final Partitioner partitioner; + /** * What is the maximum number of reads that can be returned in a single batch. */ @@ -42,11 +50,23 @@ public class ReservoirDownsampler implements Iterator> { * Create a new downsampler with the given source iterator and given comparator. * @param iterator Source of the data stream. * @param comparator Used to compare two records to see whether they're 'equal' at this position. - * @param maxElements What is the maximum number of reads that can be returned in any call of this + * @param maxElements What is the maximum number of reads that can be returned in any partition of any call of this iterator. */ public ReservoirDownsampler(final Iterator iterator, final Comparator comparator, final int maxElements) { + this(iterator,comparator,null,maxElements); + } + + /** + * Create a new downsampler with the given source iterator and given comparator. + * @param iterator Source of the data stream. + * @param comparator Used to compare two records to see whether they're 'equal' at this position. + * @param partitioner Used to divide the elements into bins. Each bin can have maxElements elements. + * @param maxElements What is the maximum number of reads that can be returned in any call of this + */ + public ReservoirDownsampler(final Iterator iterator, final Comparator comparator, final Partitioner partitioner, final int maxElements) { this.iterator = new PeekableIterator(iterator); this.comparator = comparator; + this.partitioner = partitioner; if(maxElements < 0) throw new StingException("Unable to work with an negative size collection of elements"); this.maxElements = maxElements; @@ -65,39 +85,101 @@ public class ReservoirDownsampler implements Iterator> { if(!hasNext()) throw new NoSuchElementException("No next element is present."); - List batch = new ArrayList(maxElements); - int currentElement = 0; + Map> partitions = new HashMap>(); // Determine our basis of equality. T first = iterator.next(); + if(maxElements > 0) - batch.add(first); - currentElement++; + getPartitionForEntry(partitions,first).add(first); - // Fill the reservoir - while(iterator.hasNext() && - currentElement < maxElements && - comparator.compare(first,iterator.peek()) == 0) { - batch.add(iterator.next()); - currentElement++; - } - - // Trim off remaining elements, randomly selecting them using the process as described by Vitter. while(iterator.hasNext() && comparator.compare(first,iterator.peek()) == 0) { T candidate = iterator.next(); - final int slot = random.nextInt(currentElement); - if(slot >= 0 && slot < maxElements) - batch.set(slot,candidate); - currentElement++; - } + getPartitionForEntry(partitions,candidate).add(candidate); + } + + LinkedList batch = new LinkedList(); + for(Partition partition: partitions.values()) + batch.addAll(partition.elements); return batch; } + /** + * Gets the appropriate partition for the given entry from storage. + * @param partitions List of partitions from which to choose. + * @param entry Entry for which to compute the partition. + * @return The partition associated with this entry. Will be created if not present. + */ + private Partition getPartitionForEntry(final Map> partitions, final T entry) { + Object partition = partitioner!=null ? partitioner.partition(entry) : null; + if(!partitions.containsKey(partition)) + partitions.put(partition,new Partition(maxElements)); + return partitions.get(partition); + } + /** * Unsupported; throws exception to that effect. */ public void remove() { throw new UnsupportedOperationException("Cannot remove from a ReservoirDownsampler."); } + + /** + * A common interface for a functor that can take data of + * some type and return an object that can be used to partition + * that data in some way. Really just a declaration of a + * specialized map function. + */ + public interface Partitioner { + public Object partition(T input); + } + + /** + * Models a partition of a given set of elements. Knows how to select + * random elements with replacement. + * @param Data type for the elements of the partition. + */ + private class Partition { + /** + * How large can this partition grow? + */ + private final int partitionSize; + + /** + * The elements of the partition. + */ + private List elements = new ArrayList(); + + /** + * The total number of elements seen. + */ + private long elementsSeen = 0; + + public Partition(final int partitionSize) { + this.partitionSize = partitionSize; + } + + /** + * Add a new element to this collection, downsampling as necessary so that the partition + * stays under partitionSize elements. + * @param element Element to conditionally add. + */ + public void add(T element) { + if(elements.size() < partitionSize) + elements.add(element); + else { + // Get a uniformly distributed long > 0 and remap it to the range from [0,elementsSeen). + long slot = random.nextLong(); + while(slot == Long.MIN_VALUE) + slot = random.nextLong(); + slot = (long)(((float)Math.abs(slot))/Long.MAX_VALUE * (elementsSeen-1)); + + // If the chosen slot lives within the partition, replace the entry in that slot with the newest entry. + if(slot >= 0 && slot < partitionSize) + elements.set((int)slot,element); + } + elementsSeen++; + } + } } diff --git a/java/test/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java b/java/src/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java similarity index 83% rename from java/test/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java rename to java/src/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java index 6b0f6d036..0e12fa87e 100644 --- a/java/test/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java +++ b/java/src/org/broadinstitute/sting/utils/sam/AlignmentStartComparator.java @@ -16,6 +16,9 @@ import java.util.Comparator; */ public class AlignmentStartComparator implements Comparator { public int compare(SAMRecord lhs, SAMRecord rhs) { + if(!lhs.getReferenceIndex().equals(rhs.getReferenceIndex())) + return lhs.getReferenceIndex() - rhs.getReferenceIndex(); + // Note: no integer overflow here because alignment starts are >= 0. return lhs.getAlignmentStart() - rhs.getAlignmentStart(); }