RandomPercentile.java

/*
 * Licensed to the Hipparchus project under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The Hipparchus project licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.hipparchus.stat.descriptive.rank;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;

import org.hipparchus.exception.LocalizedCoreFormats;
import org.hipparchus.exception.MathIllegalArgumentException;
import org.hipparchus.exception.MathIllegalStateException;
import org.hipparchus.exception.NullArgumentException;
import org.hipparchus.random.RandomGenerator;
import org.hipparchus.random.Well19937c;
import org.hipparchus.stat.StatUtils;
import org.hipparchus.stat.descriptive.AbstractStorelessUnivariateStatistic;
import org.hipparchus.stat.descriptive.AggregatableStatistic;
import org.hipparchus.stat.descriptive.StorelessUnivariateStatistic;
import org.hipparchus.util.FastMath;
import org.hipparchus.util.MathArrays;

/**
 * A {@link StorelessUnivariateStatistic} estimating percentiles using the
 * <a href=http://dimacs.rutgers.edu/~graham/pubs/papers/nquantiles.pdf>RANDOM</a>
 * Algorithm.
 * <p>
 * Storage requirements for the RANDOM algorithm depend on the desired
 * accuracy of quantile estimates. Quantile estimate accuracy is defined as follows.
 * <p>
 * Let \(X\) be the set of all data values consumed from the stream and let \(q\)
 * be a quantile (measured between 0 and 1) to be estimated. If <ul>
 * <li>\(\epsilon\) is the configured accuracy</li>
 * <li> \(\hat{q}\) is a RandomPercentile estimate for \(q\) (what is returned
 *      by {@link #getResult()} or {@link #getResult(double)}) with \(100q\) as
 *      actual parameter)</li>
 * <li> \(rank(\hat{q}) = |\{x \in X : x &lt; \hat{q}\}|\) is the actual rank of
 *      \(\hat{q}\) in the full data stream</li>
 * <li>\(n = |X|\) is the number of observations</li></ul>
 * then we can expect \((q - \epsilon)n &lt; rank(\hat{q}) &lt; (q + \epsilon)n\).
 * <p>
 * The algorithm maintains \(\left\lceil {log_{2}(1/\epsilon)}\right\rceil + 1\) buffers
 * of size \(\left\lceil {1/\epsilon \sqrt{log_2(1/\epsilon)}}\right\rceil\).  When
 * {@code epsilon} is set to the default value of \(10^{-4}\), this makes 15 buffers
 * of size 36,453.
 * <p>
 * The algorithm uses the buffers to maintain samples of data from the stream.  Until
 * all buffers are full, the entire sample is stored in the buffers.
 * If one of the {@code getResult} methods is called when all data are available in memory
 * and there is room to make a copy of the data (meaning the combined set of buffers is
 * less than half full), the {@code getResult} method delegates to a {@link Percentile}
 * instance to compute and return the exact value for the desired quantile.
 * For default {@code epsilon}, this means exact values will be returned whenever fewer than
 * \(\left\lceil {15 \times 36453 / 2} \right\rceil = 273,398\) values have been consumed
 * from the data stream.
 * <p>
 * When buffers become full, the algorithm merges buffers so that they effectively represent
 * a larger set of values than they can hold. Subsequently, data values are sampled from the
 * stream to fill buffers freed by merge operations.  Both the merging and the sampling
 * require random selection, which is done using a {@code RandomGenerator}.  To get
 * repeatable results for large data streams, users should provide {@code RandomGenerator}
 * instances with fixed seeds. {@code RandomPercentile} itself does not reseed or otherwise
 * initialize the {@code RandomGenerator} provided to it.  By default, it uses a
 * {@link Well19937c} generator with the default seed.
 * <p>
 * Note: This implementation is not thread-safe.
 */
public class RandomPercentile
    extends AbstractStorelessUnivariateStatistic implements StorelessUnivariateStatistic,
    AggregatableStatistic<RandomPercentile>, Serializable {

    /** Default quantile estimation error setting */
    public static final double DEFAULT_EPSILON = 1e-4;
    /** Serialization version id */
    private static final long serialVersionUID = 1L;
    /** Storage size of each buffer */
    private final int s;
    /** Maximum number of buffers minus 1 */
    private final int h;
    /** Data structure used to manage buffers */
    private final BufferMap bufferMap;
    /** Bound on the quantile estimation error */
    private final double epsilon;
    /** Source of random data */
    private final RandomGenerator randomGenerator;
    /** Number of elements consumed from the input data stream */
    private long n;
    /** Buffer currently being filled */
    private Buffer currentBuffer;

    /**
     * Constructs a {@code RandomPercentile} with quantile estimation error
     * {@code epsilon} using {@code randomGenerator} as its source of random data.
     *
     * @param epsilon bound on quantile estimation error (see class javadoc)
     * @param randomGenerator PRNG used in sampling and merge operations
     * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
     */
    public RandomPercentile(double epsilon, RandomGenerator randomGenerator) {
        if (epsilon <= 0) {
            throw new MathIllegalArgumentException(LocalizedCoreFormats.NUMBER_TOO_SMALL,
                                                   epsilon, 0);
        }
        this.h = (int) FastMath.ceil(log2(1/epsilon));
        this.s = (int) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
        this.randomGenerator = randomGenerator;
        bufferMap = new BufferMap(h + 1, s, randomGenerator);
        currentBuffer = bufferMap.create(0);
        this.epsilon = epsilon;
    }

    /**
     * Constructs a {@code RandomPercentile} with default estimation error
     * using {@code randomGenerator} as its source of random data.
     *
     * @param randomGenerator PRNG used in sampling and merge operations
     * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
     */
    public RandomPercentile(RandomGenerator randomGenerator) {
        this(DEFAULT_EPSILON, randomGenerator);
    }

    /**
     * Constructs a {@code RandomPercentile} with quantile estimation error
     * {@code epsilon} using the default PRNG as source of random data.
     *
     * @param epsilon bound on quantile estimation error (see class javadoc)
     * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
     */
    public RandomPercentile(double epsilon) {
        this(epsilon, new Well19937c());
    }

    /**
     * Constructs a {@code RandomPercentile} with quantile estimation error
     * set to the default ({@link #DEFAULT_EPSILON}), using the default PRNG
     * as source of random data.
     */
    public RandomPercentile() {
        this(DEFAULT_EPSILON, new Well19937c());
    }

    /**
     * Copy constructor, creates a new {@code RandomPercentile} identical
     * to the {@code original}.  Note: the RandomGenerator used by the new
     * instance is referenced, not copied - i.e., the new instance shares
     * a generator with the original.
     *
     * @param original the {@code PSquarePercentile} instance to copy
     */
    public RandomPercentile(RandomPercentile original) {
        super();
        this.h = original.h;
        this.n = original.n;
        this.s = original.s;
        this.epsilon = original.epsilon;
        this.bufferMap = new BufferMap(original.bufferMap);
        this.randomGenerator = original.randomGenerator;
        Iterator<Buffer> iterator = bufferMap.iterator();
        Buffer current = null;
        Buffer curr = null;
        // See if there is a partially filled buffer - that will be currentBuffer
        while (current == null && iterator.hasNext()) {
            curr = iterator.next();
            if (curr.hasCapacity()) {
                current = curr;
            }
        }
        // If there is no partially filled buffer, just assign the last one.
        // Next increment() will find no capacity and create a new one or trigger
        // a merge.
        this.currentBuffer = current == null ? curr : current;
    }

    @Override
    public long getN() {
        return n;
    }

    /**
     * Returns an estimate of the given percentile, computed using the designated
     * array segment as input data.
     *
     * @param values source of input data
     * @param begin position of the first element of the values array to include
     * @param length number of array elements to include
     * @param percentile desired percentile (scaled 0 - 100)
     *
     * @return estimated percentile
     * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
     */
    public double evaluate(final double percentile, final double[] values, final int begin, final int length)
        throws MathIllegalArgumentException {
        if (MathArrays.verifyValues(values, begin, length)) {
            RandomPercentile randomPercentile = new RandomPercentile(this.epsilon,
                                                                     this.randomGenerator);
            randomPercentile.incrementAll(values, begin, length);
            return randomPercentile.getResult(percentile);
        }
        return Double.NaN;
    }

    /**
     * Returns an estimate of the median, computed using the designated
     * array segment as input data.
     *
     * @param values source of input data
     * @param begin position of the first element of the values array to include
     * @param length number of array elements to include
     *
     * @return estimated percentile
     * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
     */
    @Override
    public double evaluate(final double[] values, final int begin, final int length) {
        return evaluate(50d, values, begin, length);
    }

    /**
     * Returns an estimate of percentile over the given array.
     *
     * @param values source of input data
     * @param percentile desired percentile (scaled 0 - 100)
     *
     * @return estimated percentile
     * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
     */
    public double evaluate(final double percentile, final double[] values) {
        return evaluate(percentile, values, 0, values.length);
    }

    @Override
    public RandomPercentile copy() {
        return new RandomPercentile(this);
    }

    @Override
    public void clear() {
        n = 0;
        bufferMap.clear();
        currentBuffer = bufferMap.create(0);
    }

    /**
     * Returns an estimate of the median.
     */
    @Override
    public double getResult() {
        return getResult(50d);

    }

    /**
     * Returns an estimate of the given percentile.
     *
     * @param percentile desired percentile (scaled 0 - 100)
     * @return estimated percentile
     * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
     */
    public double getResult(double percentile) {
        if (percentile > 100 || percentile < 0) {
            throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
                                                   percentile, 0, 100);
        }
        // Convert to internal quantile scale
        final double q = percentile / 100;
        // First get global min and max to bound search.
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double bMin;
        double bMax;
        Iterator<Buffer> bufferIterator = bufferMap.iterator();
        while (bufferIterator.hasNext()) {
            Buffer buffer = bufferIterator.next();
            bMin = buffer.min();
            if (bMin < min) {
                min = bMin;
            }
            bMax = buffer.max();
            if (bMax > max) {
                max = bMax;
            }
        }

        // Handle degenerate cases
        if (Double.compare(q, 0d) == 0 || n == 1) {
            return min;
        }
        if (Double.compare(q, 1) == 0) {
            return max;
        }
        if (n == 0) {
            return Double.NaN;
        }

        // See if we have all data in memory and enough free memory to copy.
        // If so, use Percentile to perform exact computation.
        if (bufferMap.halfEmpty()) {
            return new Percentile(percentile).evaluate(bufferMap.levelZeroData());
        }

        // Compute target rank
        final double targetRank = q * n;

        // Start with initial guess min + quantile * (max - min).
        double estimate = min + q * (max - min);
        double estimateRank = getRank(estimate);
        double lower;
        double upper;
        if (estimateRank > targetRank) {
            upper = estimate;
            lower = min;
        } else if (estimateRank < targetRank) {
            lower = estimate;
            upper = max;
        } else {
            return estimate;
        }
        final double eps = epsilon / 2;
        final double rankTolerance = eps * n;
        final double minWidth = eps / n;
        double intervalWidth = FastMath.abs(upper - lower);
        while (FastMath.abs(estimateRank - targetRank) > rankTolerance && intervalWidth > minWidth) {
            if (estimateRank > targetRank) {
                upper = estimate;
            } else {
                lower = estimate;
            }
            intervalWidth = upper - lower;
            estimate = lower + intervalWidth / 2;
            estimateRank = getRank(estimate);
        }
        return estimate;
    }

    /**
     * Gets the estimated rank of {@code value}, i.e.  \(|\{x \in X : x &lt; value\}|\)
     * where \(X\) is the set of values that have been consumed from the stream.
     *
     * @param value value whose overall rank is sought
     * @return estimated number of sample values that are strictly less than {@code value}
     */
    public double getRank(double value) {
        double rankSum = 0;
        Iterator<Buffer> bufferIterator = bufferMap.iterator();
        while (bufferIterator.hasNext()) {
            Buffer buffer = bufferIterator.next();
            rankSum += buffer.rankOf(value) * FastMath.pow(2, buffer.level);
        }
        return rankSum;
    }

    /**
     * Returns the estimated quantile position of value in the dataset.
     * Specifically, what is returned is an estimate of \(|\{x \in X : x &lt; value\}| / |X|\)
     * where \(X\) is the set of values that have been consumed from the stream.
     *
     * @param value value whose quantile rank is sought.
     * @return estimated proportion of sample values that are strictly less than {@code value}
     */
    public double getQuantileRank(double value) {
        return getRank(value) / getN();
    }

    @Override
    public void increment(double d) {
        n++;
        if (!currentBuffer.hasCapacity()) { // Need to get a new buffer to fill
            // First see if we have not yet created all the buffers
            if (bufferMap.canCreate()) {
                final int level = (int) Math.ceil(Math.max(0, log2(n/(s * FastMath.pow(2, h - 1)))));
                currentBuffer = bufferMap.create(level);
            } else { // All buffers have been created - need to merge to free one
                currentBuffer = bufferMap.merge();
            }
        }
        currentBuffer.consume(d);
    }

    /**
     * Maintains a buffer of values sampled from the input data stream.
     * <p>
     * The {@link #level} of a buffer determines its sampling frequency.
     * The {@link #consume(double)} method retains 1 out of every 2^level values
     * read in from the stream.
     * <p>
     * The {@link #size} of the buffer is the number of values that it can store
     * The buffer is considered full when it has consumed 2^level * size values.
     * <p>
     * The {@link #blockSize} of a buffer is 2^level.
     * The consume method starts each block by generating a random integer in
     * [0, blockSize - 1].  It then skips over all but the element with that offset
     * in the block, retaining only the selected value. So after 2^level * size
     * elements have been consumed, it will have retained size elements - one
     * from each 2^level block.
     * <p>
     * The {@link #mergeWith(Buffer)} method merges this buffer with another one,
     * The merge operation merges the data from the other buffer into this and clears
     * the other buffer (so it can consume data). Both buffers have their level
     * incremented by the merge. This operation is only used on full buffers.
     */
    private static class Buffer implements Serializable {
        /** Serialization version id */
        private static final long serialVersionUID = 1L;
        /** Number of values actually stored in the buffer */
        private final int size;
        /** Data sampled from the stream */
        private final double[] data;
        /** PRNG used for merges and stream sampling */
        private final RandomGenerator randomGenerator;
        /** Level of the buffer */
        private int level;
        /** Block size  = 2^level */
        private long blockSize;
        /** Next location in backing array for stored (taken) value */
        private int next;
        /** Number of values consumed in current 2^level block of values from the stream */
        private long consumed;
        /** Index of next value to take in current 2^level block */
        private long nextToTake;
        /** ID */
        private final UUID id;

        /**
         * Creates a new buffer capable of retaining size values with the given level.
         *
         * @param size number of values the buffer can retain
         * @param level the base 2 log of the sampling frequency
         *        (one out of every 2^level values is retained)
         * @param randomGenerator PRNG used for sampling and merge operations
         */
        Buffer(int size, int level, RandomGenerator randomGenerator) {
            this.size = size;
            data = new double[size];
            this.level = level;
            this.randomGenerator = randomGenerator;
            this.id = UUID.randomUUID();
            computeBlockSize();
        }

        /**
         * Sets blockSize and nextToTake based on level.
         */
        private void computeBlockSize() {
            if (level == 0) {
                blockSize = 1;
            } else {
                long product = 1;
                for (int i = 0; i < level; i++) {
                    product *= 2;
                }
                blockSize = product;
            }
            if (blockSize > 1) {
                nextToTake = randomGenerator.nextLong(blockSize);
            }
        }

        /**
         * Consumes a value from the input stream.
         * <p>
         * For each 2^level values consumed, one is added to the buffer.
         * The buffer is not considered full until 2^level * size values
         * have been consumed.
         * <p>
         * Sorts the data array if the consumption renders the buffer full.
         * <p>
         * There is no capacity check in this method.  Clients are expected
         * to use {@link #hasCapacity()} before invoking this method.  If
         * it is invoked on a full buffer, an ArrayIndexOutOfBounds exception
         * will result.
         *
         * @param value value to consume from the stream
         */
        public void consume(double value) {
            if (consumed == nextToTake) {
                data[next] = value;
                next++;
            }
            consumed++;
            if (consumed == blockSize) {
                if (next == size) {   // Buffer is full
                    Arrays.sort(data);
                } else {              // Reset in-block counter and nextToTake
                    consumed = 0;
                    if (blockSize > 1) {
                        nextToTake = randomGenerator.nextLong(blockSize);
                    }
                }
            }
        }

        /**
         * Merges this with other.
         * <p>
         * After the merge, this will be the merged buffer and other will be free.
         * Both will have level+1.
         * Post-merge, other can be used to accept new data.
         * <p>
         * The contents of the merged buffer (this after the merge) are determined
         * by randomly choosing one of the two retained elements in each of the
         * [0...size - 1] positions in the two buffers.
         * <p>
         * This and other must have the same level and both must be full.
         *
         * @param other initially full other buffer at the same level as this.
         * @throws MathIllegalArgumentException if either buffer is not full or they
         * have different levels
         */
        public void mergeWith(Buffer other) {
            // Make sure both this and other are full and have the same level
            if (this.hasCapacity() || other.hasCapacity() || other.level != this.level) {
                throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
            }
            // Randomly select one of the two entries for each slot
            for (int i = 0; i < size; i++) {
                if (randomGenerator.nextBoolean()) {
                    data[i] = other.data[i];
                }
            }
            // Re-sort data
            Arrays.sort(data);
            // Bump level of both buffers
            other.setLevel(level + 1);
            this.setLevel(level + 1);
            // Clear the free one (and compute new blocksize)
            other.clear();
        }

        /**
         * Merge this into a higher-level buffer.
         * <p>
         * Does not alter this; but after the merge, higher may have some of its
         * data replaced by data from this.  Levels are not changed for either buffer.
         * <p>
         * Probability of selection into the newly constituted higher buffer is weighted
         * according to level. So for example, if this has level 0 and higher has level
         * 2, the ith element of higher is 4 times more likely than the corresponding
         * element of this to retain its spot.
         * <p>
         * This method is only used when aggregating RandomPercentile instances.
         * <p>
         * Preconditions:
         * <ol><li> this.level < higher.level </li>
         *     <li> this.size = higher.size </li>
         *     <li> Both buffers are full </li>
         * </ol>
         *
         * @param higher higher-level buffer to merge this into
         * @throws MathIllegalArgumentException if the buffers have different sizes,
         * either buffer is not full or this has level greater than or equal to higher
         */
        public void mergeInto(Buffer higher) {
            // Check preconditions
            if (this.size != higher.size || this.hasCapacity() || higher.hasCapacity() ||
                    this.level >= higher.level) {
                throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
            }
            final int levelDifference = higher.level - this.level;
            int m = 1;
            for (int i = 0; i < levelDifference; i++) {
                m *= 2;
            }
            // Randomly select one of the two entries for each slot in higher, giving
            // m-times higher weight to the entries of higher.
            for (int i = 0; i < size; i++) {
                // data[i] <-> {0}, higher.data[i] <-> {1, ..., m}
                if (randomGenerator.nextInt(m + 1) == 0) {
                    higher.data[i] = data[i];
                }
            }
            // Resort higher's data
            Arrays.sort(higher.data);
        }

        /**
         * @return true if the buffer has capacity; false if it is full
         */
        public boolean hasCapacity() {
            // Buffer has capacity if it has not yet set all of its data
            // values or if it has but still has not finished its last block
            return next < size || consumed < blockSize;
        }

        /**
         * Sets the level of the buffer.
         *
         * @param level new level value
         */
        public void setLevel(int level) {
            this.level = level;
        }

        /**
         * Clears data, recomputes blockSize and resets consumed and nextToTake.
         */
        public void clear() {
            consumed = 0;
            next = 0;
            computeBlockSize();
        }

        /**
         * Returns a copy of the data that has been added to the buffer
         *
         * @return possibly unsorted copy of the portion of the buffer that has been filled
         */
        public double[] getData() {
            final double[] out = new double[next];
            System.arraycopy(data, 0, out, 0, next);
            return out;
        }

        /**
         * Returns the ordinal rank of value among the sampled values in this buffer.
         *
         * @param value value whose rank is sought
         * @return |{v in data : v < value}|
         */
        public int rankOf(double value) {
            int ret = 0;
            if (!hasCapacity()) { // Full sorted buffer, can do binary search
                ret = Arrays.binarySearch(data, value);
                if (ret < 0) {
                    return -ret - 1;
                } else {
                    return ret;
                }
            } else { // have to count - not sorted yet and can't sort yet
                for (int i = 0; i < next; i++) {
                    if (data[i] < value) {
                        ret++;
                    }
                }
                return ret;
            }
        }

        /**
         * @return the smallest value held in this buffer
         */
        public double min() {
            if (!hasCapacity()) {
                return data[0];
            } else {
                return StatUtils.min(getData());
            }
        }

        /**
         * @return the largest value held in this buffer
         */
        public double max() {
            if (!hasCapacity()) {
                return data[data.length - 1];
            } else {
                return StatUtils.max(getData());
            }
        }

        /**
         * @return the level of this buffer
         */
        public int getLevel() {
            return level;
        }

        /**
         * @return the id
         */
        public UUID getId() {
            return id;
        }
    }

    /**
     * Computes base 2 log of the argument.
     *
     * @param x input value
     * @return the value y such that 2^y = x
     */
    private static double log2(double x) {
        return Math.log(x) / Math.log(2);
    }

    /**
     * A map structure to hold the buffers.
     * Keys are levels and values are lists of buffers at the given level.
     * Overall capacity is limited by the total number of buffers.
     */
    private static class BufferMap implements Iterable<Buffer>, Serializable {
        /** Serialization version ID */
        private static final long serialVersionUID = 1L;
        /** Total number of buffers that can be created - cap for count */
        private final int capacity;
        /** PRNG used in merges */
        private final RandomGenerator randomGenerator;
        /** Total count of all buffers */
        private int count;
        /** Uniform buffer size */
        private final int bufferSize;
        /** Backing store for the buffer map. Keys are levels, values are lists of registered buffers. */
        private final Map<Integer,List<Buffer>> registry;
        /** Maximum buffer level */
        private int maxLevel;

        /**
         * Creates a BufferMap that can manage up to capacity buffers.
         * Buffers created by the pool with have size = buffersize.
         *
         * @param capacity cap on the number of buffers
         * @param bufferSize size of each buffer
         * @param randomGenerator RandomGenerator to use in merges
         */
        BufferMap(int capacity, int bufferSize, RandomGenerator randomGenerator) {
            this.bufferSize = bufferSize;
            this.capacity = capacity;
            this.randomGenerator = randomGenerator;
            this.registry = new HashMap<>();
        }

        /**
         * Copy constructor.
         *
         * @param original BufferMap to copy
         */
        BufferMap(BufferMap original) {
            super();
            this.bufferSize = original.bufferSize;
            this.capacity = original.capacity;
            this.count = 0;
            this.randomGenerator = original.randomGenerator;
            this.registry = new HashMap<>();
            Iterator<Buffer> iterator = original.iterator();
            while (iterator.hasNext()) {
                final Buffer current = iterator.next();
                // Create and register a new buffer at the same level
                final Buffer newCopy = create(current.getLevel());
                // Consume the data
                final double[] data = current.getData();
                for (double value : data) {
                    newCopy.consume(value);
                }
            }
        }

        /**
         * Tries to create a buffer with the given level.
         * <p>
         * If there is capacity to create a new buffer (i.e., fewer than
         * count have been created), a new buffer is created with the given
         * level, registered and returned.  If capacity has been reached,
         * null is returned.
         *
         * @param level level of the new buffer
         * @return an empty buffer or null if a buffer can't be provided
         */
        public Buffer create(int level) {
            if (!canCreate()) {
                return null;
            }
            count++;
            Buffer buffer = new Buffer(bufferSize, level, randomGenerator);
            List<Buffer> bufferList = registry.get(level);
            if (bufferList == null) {
                bufferList = new ArrayList<>();
                registry.put(level, bufferList);
            }
            bufferList.add(buffer);
            if (level > maxLevel) {
                maxLevel = level;
            }
            return buffer;
        }

        /**
         * Returns true if there is capacity to create a new buffer.
         *
         * @return true if fewer than capacity buffers have been created.
         */
        public boolean canCreate() {
            return count < capacity;
        }

        /**
         * Returns true if we have used less than half of the allocated storage.
         * <p>
         * Includes a check to make sure all buffers have level 0;
         * but this should always be the case.
         * <p>
         * When this method returns true, we have all consumed data in storage
         * and enough space to make a copy of the combined dataset.
         *
         * @return true if all buffers have level 0 and less than half of the
         * available storage has been used
         */
        public boolean halfEmpty() {
            return count * 2 < capacity &&
                    registry.size() == 1 &&
                    registry.containsKey(0);
        }

        /**
         * Returns a fresh copy of all data from level 0 buffers.
         *
         * @return combined data stored in all level 0 buffers
         */
        public double[] levelZeroData() {
            List<Buffer> levelZeroBuffers = registry.get(0);
            // First determine the combined size of the data
            int length = 0;
            for (Buffer buffer : levelZeroBuffers) {
                if (!buffer.hasCapacity()) { // full buffer
                    length += buffer.size;
                } else {
                    length += buffer.next;  // filled amount
                }
            }
            // Copy the data
            int pos = 0;
            int currLen;
            final double[] out = new double[length];
            for (Buffer buffer : levelZeroBuffers) {
                if (!buffer.hasCapacity()) {
                    currLen = buffer.size;
                } else {
                    currLen =  buffer.next;
                }
                System.arraycopy(buffer.data, 0, out, pos, currLen);
                pos += currLen;
            }
            return out;
        }

        /**
         * Finds the lowest level l where there exist at least two buffers,
         * merges them to create a new buffer with level l+1 and returns
         * a free buffer with level l+1.
         *
         * @return free buffer that can accept data
         */
        public Buffer merge() {
            int l = 0;
            List<Buffer> mergeCandidates = null;
            // Find the lowest level containing at least two buffers
            while (mergeCandidates == null && l <= maxLevel) {
                final List<Buffer> bufferList = registry.get(l);
                if (bufferList != null && bufferList.size() > 1) {
                    mergeCandidates = bufferList;
                } else {
                    l++;
                }
            }
            if (mergeCandidates == null) {
                // Should never happen
                throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
            }
            Buffer buffer1 = mergeCandidates.get(0);
            Buffer buffer2 = mergeCandidates.get(1);
            // Remove buffers to be merged
            mergeCandidates.remove(0);
            mergeCandidates.remove(0);
            // If these are the last level-l buffers, remove the empty list
            if (registry.get(l).isEmpty()) {
                registry.remove(l);
            }
            // Merge the buffers
            buffer1.mergeWith(buffer2);
            // Now both buffers have level l+1; buffer1 is full and buffer2 is free.
            // Register both buffers
            register(buffer1);
            register(buffer2);

            // Return the free one
            return buffer2;
        }

        /**
         * Clears the buffer map.
         */
        public void clear() {
            for (List<Buffer> bufferList : registry.values()) {
                bufferList.clear();
            }
            registry.clear();
            count = 0;
        }

        /**
         * Registers a buffer.
         *
         * @param buffer Buffer to be registered.
         */
        public void register(Buffer buffer) {
            final int level = buffer.getLevel();
            List<Buffer> list = registry.get(level);
            if (list == null) {
                list = new ArrayList<>();
                registry.put(level, list);
                if (level > maxLevel) {
                    maxLevel = level;
                }
            }
            list.add(buffer);
        }

        /**
         * De-register a buffer, without clearing it.
         *
         * @param buffer Buffer to be de-registered
         * @throws IllegalStateException if the buffer is not registered
         */
        public void deRegister(Buffer buffer) {
            final Iterator<Buffer> iterator = registry.get(buffer.getLevel()).iterator();
            while (iterator.hasNext()) {
                if (iterator.next().getId().equals(buffer.getId())) {
                    iterator.remove();
                    return;
                }
            }
            throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
        }

        /**
         * Returns an iterator over all of the buffers. Iteration goes by level, with
         * level 0 first.  Assumes there are no empty buffer lists.
         */
        @Override
        public Iterator<Buffer> iterator() {
            return new Iterator<Buffer>() {

                /** Outer loop iterator, from level to level. */
                private final Iterator<Integer> levelIterator = registry.keySet().iterator();

                /** List of buffers at current level. */
                private List<Buffer> currentList = registry.get(levelIterator.next());

                /** Inner loop iterator, from buffer to buffer. */
                private Iterator<Buffer> bufferIterator =
                        currentList == null ? null : currentList.iterator();

                @Override
                public boolean hasNext() {
                    if (bufferIterator == null) {
                        return false;
                    }
                    if (bufferIterator.hasNext()) {
                        return true;
                    }
                    // The current level iterator has just finished, try to bump level
                    if (levelIterator.hasNext()) {
                        List<Buffer> currentList = registry.get(levelIterator.next());
                        bufferIterator = currentList.iterator();
                        return true;
                    } else {
                        // Nothing left, signal this by nulling bufferIterator
                        bufferIterator = null;
                        return false;
                    }
                }

                @Override
                public Buffer next() {
                     if (hasNext()) {
                         return bufferIterator.next();
                     }
                     throw new NoSuchElementException();
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        /**
         * Absorbs the data in other into this, merging buffers as necessary to trim
         * the aggregate down to capacity. This method is only used when aggregating
         * RandomPercentile instances.
         *
         * @param other other BufferMap to merge in
         */
        public void absorb(BufferMap other) {
            // Add all of other's buffers to the map - possibly exceeding cap
            boolean full = true;
            Iterator<Buffer> otherIterator = other.iterator();
            while (otherIterator.hasNext()) {
                Buffer buffer = otherIterator.next();
                if (buffer.hasCapacity()) {
                    full = false;
                }
                register(buffer);
                count++;
            }
            // Now eliminate the excess by merging
            while (count > capacity || (count == capacity && full)) {
                mergeUp();
                count--;
            }
        }

        /**
         * Find two buffers, first and second, of minimal level. Then merge
         * first into second and discard first.
         * <p>
         * If the buffers have different levels, make second the higher level
         * buffer and make probability of selection in the merge proportional
         * to level weight ratio.
         * <p>
         * This method is only used when aggregating RandomPercentile instances.
         */
        public void mergeUp() {
            // Find two minimum-level buffers to merge
            // Loop depends on two invariants:
            //   0) iterator goes in level order
            //   1) there are no empty lists in the registry
            Iterator<Buffer> bufferIterator = iterator();
            Buffer first = null;
            Buffer second = null;
            while ((first == null || second == null) && bufferIterator.hasNext()) {
                Buffer buffer = bufferIterator.next();
                if (!buffer.hasCapacity()) { // Skip not full buffers
                    if (first == null) {
                        first = buffer;
                    } else {
                        second = buffer;
                    }
                }
            }
            if (first == null || second == null || first.level > second.level) {
                throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
            }
            // Merge first into second and deregister first.
            // Assumes that first has level <= second (checked above).
            if (first.getLevel() == second.getLevel()) {
                deRegister(first);
                deRegister(second);
                second.mergeWith(first);
                register(second);
            } else {
                deRegister(first);
                first.mergeInto(second);
            }
        }
    }

    /**
     * Computes the given percentile by combining the data from the collection
     * of aggregates. The result describes the combined sample of all data added
     * to any of the aggregates.
     *
     * @param percentile desired percentile (scaled 0-100)
     * @param aggregates RandomPercentile instances to combine data from
     * @return estimate of the given percentile using combined data from the aggregates
     * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
     */
    public double reduce(double percentile, Collection<RandomPercentile> aggregates) {
        if (percentile > 100 || percentile < 0) {
            throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
                                                   percentile, 0, 100);
        }

        // First see if we can copy all data and just compute exactly.
        // The following could be improved to verify that all have only level 0 buffers
        // and the sum of the data sizes is less than 1/2 total capacity.  Here we
        // just check that each of the aggregates is less than half full.
        Iterator<RandomPercentile> iterator = aggregates.iterator();
        boolean small = true;
        while (small && iterator.hasNext()) {
            small = iterator.next().bufferMap.halfEmpty();
        }
        if (small) {
            iterator = aggregates.iterator();
            double[] combined = {};
            while (iterator.hasNext()) {
               combined = MathArrays.concatenate(combined, iterator.next().bufferMap.levelZeroData());
            }
            final Percentile exactP = new Percentile(percentile);
            return exactP.evaluate(combined);
        }

        // Below largely duplicates code in getResult(percentile).
        // Common binary search code with function parameter should be factored out.

        // Get global max and min to bound binary search and total N
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double combinedN = 0;
        iterator = aggregates.iterator();
        while (iterator.hasNext()) {
            final RandomPercentile curr = iterator.next();
            final double curMin = curr.getResult(0);
            final double curMax = curr.getResult(100);
            if (curMin < min) {
                min = curMin;
            }
            if (curMax > max) {
                max = curMax;
            }
            combinedN += curr.getN();
        }

        final double q = percentile / 100;
        // Handle degenerate cases
        if (Double.compare(q, 0d) == 0) {
            return min;
        }
        if (Double.compare(q, 1) == 0) {
            return max;
        }

        // Compute target rank
        final double targetRank = q * combinedN;

        // Perform binary search using aggregated rank computation
        // Start with initial guess min + quantile * (max - min).
        double estimate = min + q * (max - min);
        double estimateRank = getAggregateRank(estimate, aggregates);
        double lower;
        double upper;
        if (estimateRank > targetRank) {
            upper = estimate;
            lower = min;
        } else if (estimateRank < targetRank) {
            lower = estimate;
            upper = max;
        } else {
            return estimate;
        }
        final double eps = epsilon / 2;
        double intervalWidth = FastMath.abs(upper - lower);
        while (FastMath.abs(estimateRank / combinedN - q) > eps && intervalWidth > eps / combinedN) {
            if (estimateRank == targetRank) {
                return estimate;
            }
            if (estimateRank > targetRank) {
                upper = estimate;
            } else {
                lower = estimate;
            }
            intervalWidth = FastMath.abs(upper - lower);
            estimate = lower + intervalWidth / 2;
            estimateRank = getAggregateRank(estimate, aggregates);
        }
        return estimate;
    }

    /**
     * Computes the estimated rank of value in the combined dataset of the aggregates.
     * Sums the values from {@link #getRank(double)}.
     *
     * @param value value whose rank is sought
     * @param aggregates collection to aggregate rank over
     * @return estimated number of elements in the combined dataset that are less than value
     */
    public double getAggregateRank(double value, Collection<RandomPercentile> aggregates) {
        double result = 0;
        final Iterator<RandomPercentile> iterator = aggregates.iterator();
        while (iterator.hasNext()) {
            result += iterator.next().getRank(value);
        }
        return result;
    }

    /**
     * Returns the estimated quantile position of value in the combined dataset of the aggregates.
     * Specifically, what is returned is an estimate of \(|\{x \in X : x &lt; value\}| / |X|\)
     * where \(X\) is the set of values that have been consumed from all of the datastreams
     * feeding the aggregates.
     *
     * @param value value whose quantile rank is sought.
     * @param aggregates collection of RandomPercentile instances being combined
     * @return estimated proportion of combined sample values that are strictly less than {@code value}
     */
    public double getAggregateQuantileRank(double value, Collection<RandomPercentile> aggregates) {
        return getAggregateRank(value, aggregates) / getAggregateN(aggregates);
    }

    /**
     * Returns the total number of values that have been consumed by the aggregates.
     *
     * @param aggregates collection of RandomPercentile instances whose combined sample size is sought
     * @return total number of values that have been consumed by the aggregates
     */
    public double getAggregateN(Collection<RandomPercentile> aggregates) {
        double result = 0;
        final Iterator<RandomPercentile> iterator = aggregates.iterator();
        while (iterator.hasNext()) {
            result += iterator.next().getN();
        }
        return result;
    }

    /**
     * Aggregates the provided instance into this instance.
     * <p>
     * Other must have the same buffer size as this. If the combined data size
     * exceeds the maximum storage configured for this instance, buffers are
     * merged to create capacity. If all that is needed is computation of
     * aggregate results, {@link #reduce(double, Collection)} is faster,
     * may be more accurate and does not require the buffer sizes to be the same.
     *
     * @param other the instance to aggregate into this instance
     * @throws NullArgumentException if the input is null
     * @throws IllegalArgumentException if other has different buffer size than this
     */
    @Override
    public void aggregate(RandomPercentile other)
        throws NullArgumentException {
        if (other == null) {
            throw new NullArgumentException();
        }
        if (other.s != s) {
            throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
        }
        bufferMap.absorb(other.bufferMap);
        n += other.n;
    }

    /**
     * Returns the maximum number of {@code double} values that a {@code RandomPercentile}
     * instance created with the given {@code epsilon} value will retain in memory.
     * <p>
     * If the number of values that have been consumed from the stream is less than 1/2
     * of this value, reported statistics are exact.
     *
     * @param epsilon bound on the relative quantile error (see class javadoc)
     * @return upper bound on the total number of primitive double values retained in memory
     * @throws MathIllegalArgumentException if epsilon is not in the interval (0,1)
     */
    public static long maxValuesRetained(double epsilon) {
        if (epsilon >= 1) {
            throw new MathIllegalArgumentException(
                    LocalizedCoreFormats.NUMBER_TOO_LARGE_BOUND_EXCLUDED, epsilon, 1);
        }
        if (epsilon <= 0) {
            throw new MathIllegalArgumentException(
                    LocalizedCoreFormats.NUMBER_TOO_SMALL_BOUND_EXCLUDED, epsilon, 0);
        }
        final long h = (long) FastMath.ceil(log2(1/epsilon));
        final long s = (long) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
        return (h+1) * s;
    }
}