RandomPercentile.java
/*
* Licensed to the Hipparchus project under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The Hipparchus project licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hipparchus.stat.descriptive.rank;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import org.hipparchus.exception.LocalizedCoreFormats;
import org.hipparchus.exception.MathIllegalArgumentException;
import org.hipparchus.exception.MathIllegalStateException;
import org.hipparchus.exception.NullArgumentException;
import org.hipparchus.random.RandomGenerator;
import org.hipparchus.random.Well19937c;
import org.hipparchus.stat.StatUtils;
import org.hipparchus.stat.descriptive.AbstractStorelessUnivariateStatistic;
import org.hipparchus.stat.descriptive.AggregatableStatistic;
import org.hipparchus.stat.descriptive.StorelessUnivariateStatistic;
import org.hipparchus.util.FastMath;
import org.hipparchus.util.MathArrays;
/**
* A {@link StorelessUnivariateStatistic} estimating percentiles using the
* <a href=http://dimacs.rutgers.edu/~graham/pubs/papers/nquantiles.pdf>RANDOM</a>
* Algorithm.
* <p>
* Storage requirements for the RANDOM algorithm depend on the desired
* accuracy of quantile estimates. Quantile estimate accuracy is defined as follows.
* <p>
* Let \(X\) be the set of all data values consumed from the stream and let \(q\)
* be a quantile (measured between 0 and 1) to be estimated. If <ul>
* <li>\(\epsilon\) is the configured accuracy</li>
* <li> \(\hat{q}\) is a RandomPercentile estimate for \(q\) (what is returned
* by {@link #getResult()} or {@link #getResult(double)}) with \(100q\) as
* actual parameter)</li>
* <li> \(rank(\hat{q}) = |\{x \in X : x < \hat{q}\}|\) is the actual rank of
* \(\hat{q}\) in the full data stream</li>
* <li>\(n = |X|\) is the number of observations</li></ul>
* then we can expect \((q - \epsilon)n < rank(\hat{q}) < (q + \epsilon)n\).
* <p>
* The algorithm maintains \(\left\lceil {log_{2}(1/\epsilon)}\right\rceil + 1\) buffers
* of size \(\left\lceil {1/\epsilon \sqrt{log_2(1/\epsilon)}}\right\rceil\). When
* {@code epsilon} is set to the default value of \(10^{-4}\), this makes 15 buffers
* of size 36,453.
* <p>
* The algorithm uses the buffers to maintain samples of data from the stream. Until
* all buffers are full, the entire sample is stored in the buffers.
* If one of the {@code getResult} methods is called when all data are available in memory
* and there is room to make a copy of the data (meaning the combined set of buffers is
* less than half full), the {@code getResult} method delegates to a {@link Percentile}
* instance to compute and return the exact value for the desired quantile.
* For default {@code epsilon}, this means exact values will be returned whenever fewer than
* \(\left\lceil {15 \times 36453 / 2} \right\rceil = 273,398\) values have been consumed
* from the data stream.
* <p>
* When buffers become full, the algorithm merges buffers so that they effectively represent
* a larger set of values than they can hold. Subsequently, data values are sampled from the
* stream to fill buffers freed by merge operations. Both the merging and the sampling
* require random selection, which is done using a {@code RandomGenerator}. To get
* repeatable results for large data streams, users should provide {@code RandomGenerator}
* instances with fixed seeds. {@code RandomPercentile} itself does not reseed or otherwise
* initialize the {@code RandomGenerator} provided to it. By default, it uses a
* {@link Well19937c} generator with the default seed.
* <p>
* Note: This implementation is not thread-safe.
*/
public class RandomPercentile
extends AbstractStorelessUnivariateStatistic implements StorelessUnivariateStatistic,
AggregatableStatistic<RandomPercentile>, Serializable {
/** Default quantile estimation error setting */
public static final double DEFAULT_EPSILON = 1e-4;
/** Serialization version id */
private static final long serialVersionUID = 1L;
/** Storage size of each buffer */
private final int s;
/** Maximum number of buffers minus 1 */
private final int h;
/** Data structure used to manage buffers */
private final BufferMap bufferMap;
/** Bound on the quantile estimation error */
private final double epsilon;
/** Source of random data */
private final RandomGenerator randomGenerator;
/** Number of elements consumed from the input data stream */
private long n;
/** Buffer currently being filled */
private Buffer currentBuffer;
/**
* Constructs a {@code RandomPercentile} with quantile estimation error
* {@code epsilon} using {@code randomGenerator} as its source of random data.
*
* @param epsilon bound on quantile estimation error (see class javadoc)
* @param randomGenerator PRNG used in sampling and merge operations
* @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
*/
public RandomPercentile(double epsilon, RandomGenerator randomGenerator) {
if (epsilon <= 0) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.NUMBER_TOO_SMALL,
epsilon, 0);
}
this.h = (int) FastMath.ceil(log2(1/epsilon));
this.s = (int) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
this.randomGenerator = randomGenerator;
bufferMap = new BufferMap(h + 1, s, randomGenerator);
currentBuffer = bufferMap.create(0);
this.epsilon = epsilon;
}
/**
* Constructs a {@code RandomPercentile} with default estimation error
* using {@code randomGenerator} as its source of random data.
*
* @param randomGenerator PRNG used in sampling and merge operations
* @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
*/
public RandomPercentile(RandomGenerator randomGenerator) {
this(DEFAULT_EPSILON, randomGenerator);
}
/**
* Constructs a {@code RandomPercentile} with quantile estimation error
* {@code epsilon} using the default PRNG as source of random data.
*
* @param epsilon bound on quantile estimation error (see class javadoc)
* @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
*/
public RandomPercentile(double epsilon) {
this(epsilon, new Well19937c());
}
/**
* Constructs a {@code RandomPercentile} with quantile estimation error
* set to the default ({@link #DEFAULT_EPSILON}), using the default PRNG
* as source of random data.
*/
public RandomPercentile() {
this(DEFAULT_EPSILON, new Well19937c());
}
/**
* Copy constructor, creates a new {@code RandomPercentile} identical
* to the {@code original}. Note: the RandomGenerator used by the new
* instance is referenced, not copied - i.e., the new instance shares
* a generator with the original.
*
* @param original the {@code PSquarePercentile} instance to copy
*/
public RandomPercentile(RandomPercentile original) {
super();
this.h = original.h;
this.n = original.n;
this.s = original.s;
this.epsilon = original.epsilon;
this.bufferMap = new BufferMap(original.bufferMap);
this.randomGenerator = original.randomGenerator;
Iterator<Buffer> iterator = bufferMap.iterator();
Buffer current = null;
Buffer curr = null;
// See if there is a partially filled buffer - that will be currentBuffer
while (current == null && iterator.hasNext()) {
curr = iterator.next();
if (curr.hasCapacity()) {
current = curr;
}
}
// If there is no partially filled buffer, just assign the last one.
// Next increment() will find no capacity and create a new one or trigger
// a merge.
this.currentBuffer = current == null ? curr : current;
}
@Override
public long getN() {
return n;
}
/**
* Returns an estimate of the given percentile, computed using the designated
* array segment as input data.
*
* @param values source of input data
* @param begin position of the first element of the values array to include
* @param length number of array elements to include
* @param percentile desired percentile (scaled 0 - 100)
*
* @return estimated percentile
* @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
*/
public double evaluate(final double percentile, final double[] values, final int begin, final int length)
throws MathIllegalArgumentException {
if (MathArrays.verifyValues(values, begin, length)) {
RandomPercentile randomPercentile = new RandomPercentile(this.epsilon,
this.randomGenerator);
randomPercentile.incrementAll(values, begin, length);
return randomPercentile.getResult(percentile);
}
return Double.NaN;
}
/**
* Returns an estimate of the median, computed using the designated
* array segment as input data.
*
* @param values source of input data
* @param begin position of the first element of the values array to include
* @param length number of array elements to include
*
* @return estimated percentile
* @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
*/
@Override
public double evaluate(final double[] values, final int begin, final int length) {
return evaluate(50d, values, begin, length);
}
/**
* Returns an estimate of percentile over the given array.
*
* @param values source of input data
* @param percentile desired percentile (scaled 0 - 100)
*
* @return estimated percentile
* @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
*/
public double evaluate(final double percentile, final double[] values) {
return evaluate(percentile, values, 0, values.length);
}
@Override
public RandomPercentile copy() {
return new RandomPercentile(this);
}
@Override
public void clear() {
n = 0;
bufferMap.clear();
currentBuffer = bufferMap.create(0);
}
/**
* Returns an estimate of the median.
*/
@Override
public double getResult() {
return getResult(50d);
}
/**
* Returns an estimate of the given percentile.
*
* @param percentile desired percentile (scaled 0 - 100)
* @return estimated percentile
* @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
*/
public double getResult(double percentile) {
if (percentile > 100 || percentile < 0) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
percentile, 0, 100);
}
// Convert to internal quantile scale
final double q = percentile / 100;
// First get global min and max to bound search.
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double bMin;
double bMax;
Iterator<Buffer> bufferIterator = bufferMap.iterator();
while (bufferIterator.hasNext()) {
Buffer buffer = bufferIterator.next();
bMin = buffer.min();
if (bMin < min) {
min = bMin;
}
bMax = buffer.max();
if (bMax > max) {
max = bMax;
}
}
// Handle degenerate cases
if (Double.compare(q, 0d) == 0 || n == 1) {
return min;
}
if (Double.compare(q, 1) == 0) {
return max;
}
if (n == 0) {
return Double.NaN;
}
// See if we have all data in memory and enough free memory to copy.
// If so, use Percentile to perform exact computation.
if (bufferMap.halfEmpty()) {
return new Percentile(percentile).evaluate(bufferMap.levelZeroData());
}
// Compute target rank
final double targetRank = q * n;
// Start with initial guess min + quantile * (max - min).
double estimate = min + q * (max - min);
double estimateRank = getRank(estimate);
double lower;
double upper;
if (estimateRank > targetRank) {
upper = estimate;
lower = min;
} else if (estimateRank < targetRank) {
lower = estimate;
upper = max;
} else {
return estimate;
}
final double eps = epsilon / 2;
final double rankTolerance = eps * n;
final double minWidth = eps / n;
double intervalWidth = FastMath.abs(upper - lower);
while (FastMath.abs(estimateRank - targetRank) > rankTolerance && intervalWidth > minWidth) {
if (estimateRank > targetRank) {
upper = estimate;
} else {
lower = estimate;
}
intervalWidth = upper - lower;
estimate = lower + intervalWidth / 2;
estimateRank = getRank(estimate);
}
return estimate;
}
/**
* Gets the estimated rank of {@code value}, i.e. \(|\{x \in X : x < value\}|\)
* where \(X\) is the set of values that have been consumed from the stream.
*
* @param value value whose overall rank is sought
* @return estimated number of sample values that are strictly less than {@code value}
*/
public double getRank(double value) {
double rankSum = 0;
Iterator<Buffer> bufferIterator = bufferMap.iterator();
while (bufferIterator.hasNext()) {
Buffer buffer = bufferIterator.next();
rankSum += buffer.rankOf(value) * FastMath.pow(2, buffer.level);
}
return rankSum;
}
/**
* Returns the estimated quantile position of value in the dataset.
* Specifically, what is returned is an estimate of \(|\{x \in X : x < value\}| / |X|\)
* where \(X\) is the set of values that have been consumed from the stream.
*
* @param value value whose quantile rank is sought.
* @return estimated proportion of sample values that are strictly less than {@code value}
*/
public double getQuantileRank(double value) {
return getRank(value) / getN();
}
@Override
public void increment(double d) {
n++;
if (!currentBuffer.hasCapacity()) { // Need to get a new buffer to fill
// First see if we have not yet created all the buffers
if (bufferMap.canCreate()) {
final int level = (int) Math.ceil(Math.max(0, log2(n/(s * FastMath.pow(2, h - 1)))));
currentBuffer = bufferMap.create(level);
} else { // All buffers have been created - need to merge to free one
currentBuffer = bufferMap.merge();
}
}
currentBuffer.consume(d);
}
/**
* Maintains a buffer of values sampled from the input data stream.
* <p>
* The {@link #level} of a buffer determines its sampling frequency.
* The {@link #consume(double)} method retains 1 out of every 2^level values
* read in from the stream.
* <p>
* The {@link #size} of the buffer is the number of values that it can store
* The buffer is considered full when it has consumed 2^level * size values.
* <p>
* The {@link #blockSize} of a buffer is 2^level.
* The consume method starts each block by generating a random integer in
* [0, blockSize - 1]. It then skips over all but the element with that offset
* in the block, retaining only the selected value. So after 2^level * size
* elements have been consumed, it will have retained size elements - one
* from each 2^level block.
* <p>
* The {@link #mergeWith(Buffer)} method merges this buffer with another one,
* The merge operation merges the data from the other buffer into this and clears
* the other buffer (so it can consume data). Both buffers have their level
* incremented by the merge. This operation is only used on full buffers.
*/
private static class Buffer implements Serializable {
/** Serialization version id */
private static final long serialVersionUID = 1L;
/** Number of values actually stored in the buffer */
private final int size;
/** Data sampled from the stream */
private final double[] data;
/** PRNG used for merges and stream sampling */
private final RandomGenerator randomGenerator;
/** Level of the buffer */
private int level;
/** Block size = 2^level */
private long blockSize;
/** Next location in backing array for stored (taken) value */
private int next;
/** Number of values consumed in current 2^level block of values from the stream */
private long consumed;
/** Index of next value to take in current 2^level block */
private long nextToTake;
/** ID */
private final UUID id;
/**
* Creates a new buffer capable of retaining size values with the given level.
*
* @param size number of values the buffer can retain
* @param level the base 2 log of the sampling frequency
* (one out of every 2^level values is retained)
* @param randomGenerator PRNG used for sampling and merge operations
*/
Buffer(int size, int level, RandomGenerator randomGenerator) {
this.size = size;
data = new double[size];
this.level = level;
this.randomGenerator = randomGenerator;
this.id = UUID.randomUUID();
computeBlockSize();
}
/**
* Sets blockSize and nextToTake based on level.
*/
private void computeBlockSize() {
if (level == 0) {
blockSize = 1;
} else {
long product = 1;
for (int i = 0; i < level; i++) {
product *= 2;
}
blockSize = product;
}
if (blockSize > 1) {
nextToTake = randomGenerator.nextLong(blockSize);
}
}
/**
* Consumes a value from the input stream.
* <p>
* For each 2^level values consumed, one is added to the buffer.
* The buffer is not considered full until 2^level * size values
* have been consumed.
* <p>
* Sorts the data array if the consumption renders the buffer full.
* <p>
* There is no capacity check in this method. Clients are expected
* to use {@link #hasCapacity()} before invoking this method. If
* it is invoked on a full buffer, an ArrayIndexOutOfBounds exception
* will result.
*
* @param value value to consume from the stream
*/
public void consume(double value) {
if (consumed == nextToTake) {
data[next] = value;
next++;
}
consumed++;
if (consumed == blockSize) {
if (next == size) { // Buffer is full
Arrays.sort(data);
} else { // Reset in-block counter and nextToTake
consumed = 0;
if (blockSize > 1) {
nextToTake = randomGenerator.nextLong(blockSize);
}
}
}
}
/**
* Merges this with other.
* <p>
* After the merge, this will be the merged buffer and other will be free.
* Both will have level+1.
* Post-merge, other can be used to accept new data.
* <p>
* The contents of the merged buffer (this after the merge) are determined
* by randomly choosing one of the two retained elements in each of the
* [0...size - 1] positions in the two buffers.
* <p>
* This and other must have the same level and both must be full.
*
* @param other initially full other buffer at the same level as this.
* @throws MathIllegalArgumentException if either buffer is not full or they
* have different levels
*/
public void mergeWith(Buffer other) {
// Make sure both this and other are full and have the same level
if (this.hasCapacity() || other.hasCapacity() || other.level != this.level) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
}
// Randomly select one of the two entries for each slot
for (int i = 0; i < size; i++) {
if (randomGenerator.nextBoolean()) {
data[i] = other.data[i];
}
}
// Re-sort data
Arrays.sort(data);
// Bump level of both buffers
other.setLevel(level + 1);
this.setLevel(level + 1);
// Clear the free one (and compute new blocksize)
other.clear();
}
/**
* Merge this into a higher-level buffer.
* <p>
* Does not alter this; but after the merge, higher may have some of its
* data replaced by data from this. Levels are not changed for either buffer.
* <p>
* Probability of selection into the newly constituted higher buffer is weighted
* according to level. So for example, if this has level 0 and higher has level
* 2, the ith element of higher is 4 times more likely than the corresponding
* element of this to retain its spot.
* <p>
* This method is only used when aggregating RandomPercentile instances.
* <p>
* Preconditions:
* <ol><li> this.level < higher.level </li>
* <li> this.size = higher.size </li>
* <li> Both buffers are full </li>
* </ol>
*
* @param higher higher-level buffer to merge this into
* @throws MathIllegalArgumentException if the buffers have different sizes,
* either buffer is not full or this has level greater than or equal to higher
*/
public void mergeInto(Buffer higher) {
// Check preconditions
if (this.size != higher.size || this.hasCapacity() || higher.hasCapacity() ||
this.level >= higher.level) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
}
final int levelDifference = higher.level - this.level;
int m = 1;
for (int i = 0; i < levelDifference; i++) {
m *= 2;
}
// Randomly select one of the two entries for each slot in higher, giving
// m-times higher weight to the entries of higher.
for (int i = 0; i < size; i++) {
// data[i] <-> {0}, higher.data[i] <-> {1, ..., m}
if (randomGenerator.nextInt(m + 1) == 0) {
higher.data[i] = data[i];
}
}
// Resort higher's data
Arrays.sort(higher.data);
}
/**
* @return true if the buffer has capacity; false if it is full
*/
public boolean hasCapacity() {
// Buffer has capacity if it has not yet set all of its data
// values or if it has but still has not finished its last block
return next < size || consumed < blockSize;
}
/**
* Sets the level of the buffer.
*
* @param level new level value
*/
public void setLevel(int level) {
this.level = level;
}
/**
* Clears data, recomputes blockSize and resets consumed and nextToTake.
*/
public void clear() {
consumed = 0;
next = 0;
computeBlockSize();
}
/**
* Returns a copy of the data that has been added to the buffer
*
* @return possibly unsorted copy of the portion of the buffer that has been filled
*/
public double[] getData() {
final double[] out = new double[next];
System.arraycopy(data, 0, out, 0, next);
return out;
}
/**
* Returns the ordinal rank of value among the sampled values in this buffer.
*
* @param value value whose rank is sought
* @return |{v in data : v < value}|
*/
public int rankOf(double value) {
int ret = 0;
if (!hasCapacity()) { // Full sorted buffer, can do binary search
ret = Arrays.binarySearch(data, value);
if (ret < 0) {
return -ret - 1;
} else {
return ret;
}
} else { // have to count - not sorted yet and can't sort yet
for (int i = 0; i < next; i++) {
if (data[i] < value) {
ret++;
}
}
return ret;
}
}
/**
* @return the smallest value held in this buffer
*/
public double min() {
if (!hasCapacity()) {
return data[0];
} else {
return StatUtils.min(getData());
}
}
/**
* @return the largest value held in this buffer
*/
public double max() {
if (!hasCapacity()) {
return data[data.length - 1];
} else {
return StatUtils.max(getData());
}
}
/**
* @return the level of this buffer
*/
public int getLevel() {
return level;
}
/**
* @return the id
*/
public UUID getId() {
return id;
}
}
/**
* Computes base 2 log of the argument.
*
* @param x input value
* @return the value y such that 2^y = x
*/
private static double log2(double x) {
return Math.log(x) / Math.log(2);
}
/**
* A map structure to hold the buffers.
* Keys are levels and values are lists of buffers at the given level.
* Overall capacity is limited by the total number of buffers.
*/
private static class BufferMap implements Iterable<Buffer>, Serializable {
/** Serialization version ID */
private static final long serialVersionUID = 1L;
/** Total number of buffers that can be created - cap for count */
private final int capacity;
/** PRNG used in merges */
private final RandomGenerator randomGenerator;
/** Total count of all buffers */
private int count;
/** Uniform buffer size */
private final int bufferSize;
/** Backing store for the buffer map. Keys are levels, values are lists of registered buffers. */
private final Map<Integer,List<Buffer>> registry;
/** Maximum buffer level */
private int maxLevel;
/**
* Creates a BufferMap that can manage up to capacity buffers.
* Buffers created by the pool with have size = buffersize.
*
* @param capacity cap on the number of buffers
* @param bufferSize size of each buffer
* @param randomGenerator RandomGenerator to use in merges
*/
BufferMap(int capacity, int bufferSize, RandomGenerator randomGenerator) {
this.bufferSize = bufferSize;
this.capacity = capacity;
this.randomGenerator = randomGenerator;
this.registry = new HashMap<>();
}
/**
* Copy constructor.
*
* @param original BufferMap to copy
*/
BufferMap(BufferMap original) {
super();
this.bufferSize = original.bufferSize;
this.capacity = original.capacity;
this.count = 0;
this.randomGenerator = original.randomGenerator;
this.registry = new HashMap<>();
Iterator<Buffer> iterator = original.iterator();
while (iterator.hasNext()) {
final Buffer current = iterator.next();
// Create and register a new buffer at the same level
final Buffer newCopy = create(current.getLevel());
// Consume the data
final double[] data = current.getData();
for (double value : data) {
newCopy.consume(value);
}
}
}
/**
* Tries to create a buffer with the given level.
* <p>
* If there is capacity to create a new buffer (i.e., fewer than
* count have been created), a new buffer is created with the given
* level, registered and returned. If capacity has been reached,
* null is returned.
*
* @param level level of the new buffer
* @return an empty buffer or null if a buffer can't be provided
*/
public Buffer create(int level) {
if (!canCreate()) {
return null;
}
count++;
Buffer buffer = new Buffer(bufferSize, level, randomGenerator);
List<Buffer> bufferList = registry.get(level);
if (bufferList == null) {
bufferList = new ArrayList<>();
registry.put(level, bufferList);
}
bufferList.add(buffer);
if (level > maxLevel) {
maxLevel = level;
}
return buffer;
}
/**
* Returns true if there is capacity to create a new buffer.
*
* @return true if fewer than capacity buffers have been created.
*/
public boolean canCreate() {
return count < capacity;
}
/**
* Returns true if we have used less than half of the allocated storage.
* <p>
* Includes a check to make sure all buffers have level 0;
* but this should always be the case.
* <p>
* When this method returns true, we have all consumed data in storage
* and enough space to make a copy of the combined dataset.
*
* @return true if all buffers have level 0 and less than half of the
* available storage has been used
*/
public boolean halfEmpty() {
return count * 2 < capacity &&
registry.size() == 1 &&
registry.containsKey(0);
}
/**
* Returns a fresh copy of all data from level 0 buffers.
*
* @return combined data stored in all level 0 buffers
*/
public double[] levelZeroData() {
List<Buffer> levelZeroBuffers = registry.get(0);
// First determine the combined size of the data
int length = 0;
for (Buffer buffer : levelZeroBuffers) {
if (!buffer.hasCapacity()) { // full buffer
length += buffer.size;
} else {
length += buffer.next; // filled amount
}
}
// Copy the data
int pos = 0;
int currLen;
final double[] out = new double[length];
for (Buffer buffer : levelZeroBuffers) {
if (!buffer.hasCapacity()) {
currLen = buffer.size;
} else {
currLen = buffer.next;
}
System.arraycopy(buffer.data, 0, out, pos, currLen);
pos += currLen;
}
return out;
}
/**
* Finds the lowest level l where there exist at least two buffers,
* merges them to create a new buffer with level l+1 and returns
* a free buffer with level l+1.
*
* @return free buffer that can accept data
*/
public Buffer merge() {
int l = 0;
List<Buffer> mergeCandidates = null;
// Find the lowest level containing at least two buffers
while (mergeCandidates == null && l <= maxLevel) {
final List<Buffer> bufferList = registry.get(l);
if (bufferList != null && bufferList.size() > 1) {
mergeCandidates = bufferList;
} else {
l++;
}
}
if (mergeCandidates == null) {
// Should never happen
throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
}
Buffer buffer1 = mergeCandidates.get(0);
Buffer buffer2 = mergeCandidates.get(1);
// Remove buffers to be merged
mergeCandidates.remove(0);
mergeCandidates.remove(0);
// If these are the last level-l buffers, remove the empty list
if (registry.get(l).isEmpty()) {
registry.remove(l);
}
// Merge the buffers
buffer1.mergeWith(buffer2);
// Now both buffers have level l+1; buffer1 is full and buffer2 is free.
// Register both buffers
register(buffer1);
register(buffer2);
// Return the free one
return buffer2;
}
/**
* Clears the buffer map.
*/
public void clear() {
for (List<Buffer> bufferList : registry.values()) {
bufferList.clear();
}
registry.clear();
count = 0;
}
/**
* Registers a buffer.
*
* @param buffer Buffer to be registered.
*/
public void register(Buffer buffer) {
final int level = buffer.getLevel();
List<Buffer> list = registry.get(level);
if (list == null) {
list = new ArrayList<>();
registry.put(level, list);
if (level > maxLevel) {
maxLevel = level;
}
}
list.add(buffer);
}
/**
* De-register a buffer, without clearing it.
*
* @param buffer Buffer to be de-registered
* @throws IllegalStateException if the buffer is not registered
*/
public void deRegister(Buffer buffer) {
final Iterator<Buffer> iterator = registry.get(buffer.getLevel()).iterator();
while (iterator.hasNext()) {
if (iterator.next().getId().equals(buffer.getId())) {
iterator.remove();
return;
}
}
throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
}
/**
* Returns an iterator over all of the buffers. Iteration goes by level, with
* level 0 first. Assumes there are no empty buffer lists.
*/
@Override
public Iterator<Buffer> iterator() {
return new Iterator<Buffer>() {
/** Outer loop iterator, from level to level. */
private final Iterator<Integer> levelIterator = registry.keySet().iterator();
/** List of buffers at current level. */
private List<Buffer> currentList = registry.get(levelIterator.next());
/** Inner loop iterator, from buffer to buffer. */
private Iterator<Buffer> bufferIterator =
currentList == null ? null : currentList.iterator();
@Override
public boolean hasNext() {
if (bufferIterator == null) {
return false;
}
if (bufferIterator.hasNext()) {
return true;
}
// The current level iterator has just finished, try to bump level
if (levelIterator.hasNext()) {
List<Buffer> currentList = registry.get(levelIterator.next());
bufferIterator = currentList.iterator();
return true;
} else {
// Nothing left, signal this by nulling bufferIterator
bufferIterator = null;
return false;
}
}
@Override
public Buffer next() {
if (hasNext()) {
return bufferIterator.next();
}
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
/**
* Absorbs the data in other into this, merging buffers as necessary to trim
* the aggregate down to capacity. This method is only used when aggregating
* RandomPercentile instances.
*
* @param other other BufferMap to merge in
*/
public void absorb(BufferMap other) {
// Add all of other's buffers to the map - possibly exceeding cap
boolean full = true;
Iterator<Buffer> otherIterator = other.iterator();
while (otherIterator.hasNext()) {
Buffer buffer = otherIterator.next();
if (buffer.hasCapacity()) {
full = false;
}
register(buffer);
count++;
}
// Now eliminate the excess by merging
while (count > capacity || (count == capacity && full)) {
mergeUp();
count--;
}
}
/**
* Find two buffers, first and second, of minimal level. Then merge
* first into second and discard first.
* <p>
* If the buffers have different levels, make second the higher level
* buffer and make probability of selection in the merge proportional
* to level weight ratio.
* <p>
* This method is only used when aggregating RandomPercentile instances.
*/
public void mergeUp() {
// Find two minimum-level buffers to merge
// Loop depends on two invariants:
// 0) iterator goes in level order
// 1) there are no empty lists in the registry
Iterator<Buffer> bufferIterator = iterator();
Buffer first = null;
Buffer second = null;
while ((first == null || second == null) && bufferIterator.hasNext()) {
Buffer buffer = bufferIterator.next();
if (!buffer.hasCapacity()) { // Skip not full buffers
if (first == null) {
first = buffer;
} else {
second = buffer;
}
}
}
if (first == null || second == null || first.level > second.level) {
throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
}
// Merge first into second and deregister first.
// Assumes that first has level <= second (checked above).
if (first.getLevel() == second.getLevel()) {
deRegister(first);
deRegister(second);
second.mergeWith(first);
register(second);
} else {
deRegister(first);
first.mergeInto(second);
}
}
}
/**
* Computes the given percentile by combining the data from the collection
* of aggregates. The result describes the combined sample of all data added
* to any of the aggregates.
*
* @param percentile desired percentile (scaled 0-100)
* @param aggregates RandomPercentile instances to combine data from
* @return estimate of the given percentile using combined data from the aggregates
* @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
*/
public double reduce(double percentile, Collection<RandomPercentile> aggregates) {
if (percentile > 100 || percentile < 0) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
percentile, 0, 100);
}
// First see if we can copy all data and just compute exactly.
// The following could be improved to verify that all have only level 0 buffers
// and the sum of the data sizes is less than 1/2 total capacity. Here we
// just check that each of the aggregates is less than half full.
Iterator<RandomPercentile> iterator = aggregates.iterator();
boolean small = true;
while (small && iterator.hasNext()) {
small = iterator.next().bufferMap.halfEmpty();
}
if (small) {
iterator = aggregates.iterator();
double[] combined = {};
while (iterator.hasNext()) {
combined = MathArrays.concatenate(combined, iterator.next().bufferMap.levelZeroData());
}
final Percentile exactP = new Percentile(percentile);
return exactP.evaluate(combined);
}
// Below largely duplicates code in getResult(percentile).
// Common binary search code with function parameter should be factored out.
// Get global max and min to bound binary search and total N
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double combinedN = 0;
iterator = aggregates.iterator();
while (iterator.hasNext()) {
final RandomPercentile curr = iterator.next();
final double curMin = curr.getResult(0);
final double curMax = curr.getResult(100);
if (curMin < min) {
min = curMin;
}
if (curMax > max) {
max = curMax;
}
combinedN += curr.getN();
}
final double q = percentile / 100;
// Handle degenerate cases
if (Double.compare(q, 0d) == 0) {
return min;
}
if (Double.compare(q, 1) == 0) {
return max;
}
// Compute target rank
final double targetRank = q * combinedN;
// Perform binary search using aggregated rank computation
// Start with initial guess min + quantile * (max - min).
double estimate = min + q * (max - min);
double estimateRank = getAggregateRank(estimate, aggregates);
double lower;
double upper;
if (estimateRank > targetRank) {
upper = estimate;
lower = min;
} else if (estimateRank < targetRank) {
lower = estimate;
upper = max;
} else {
return estimate;
}
final double eps = epsilon / 2;
double intervalWidth = FastMath.abs(upper - lower);
while (FastMath.abs(estimateRank / combinedN - q) > eps && intervalWidth > eps / combinedN) {
if (estimateRank == targetRank) {
return estimate;
}
if (estimateRank > targetRank) {
upper = estimate;
} else {
lower = estimate;
}
intervalWidth = FastMath.abs(upper - lower);
estimate = lower + intervalWidth / 2;
estimateRank = getAggregateRank(estimate, aggregates);
}
return estimate;
}
/**
* Computes the estimated rank of value in the combined dataset of the aggregates.
* Sums the values from {@link #getRank(double)}.
*
* @param value value whose rank is sought
* @param aggregates collection to aggregate rank over
* @return estimated number of elements in the combined dataset that are less than value
*/
public double getAggregateRank(double value, Collection<RandomPercentile> aggregates) {
double result = 0;
final Iterator<RandomPercentile> iterator = aggregates.iterator();
while (iterator.hasNext()) {
result += iterator.next().getRank(value);
}
return result;
}
/**
* Returns the estimated quantile position of value in the combined dataset of the aggregates.
* Specifically, what is returned is an estimate of \(|\{x \in X : x < value\}| / |X|\)
* where \(X\) is the set of values that have been consumed from all of the datastreams
* feeding the aggregates.
*
* @param value value whose quantile rank is sought.
* @param aggregates collection of RandomPercentile instances being combined
* @return estimated proportion of combined sample values that are strictly less than {@code value}
*/
public double getAggregateQuantileRank(double value, Collection<RandomPercentile> aggregates) {
return getAggregateRank(value, aggregates) / getAggregateN(aggregates);
}
/**
* Returns the total number of values that have been consumed by the aggregates.
*
* @param aggregates collection of RandomPercentile instances whose combined sample size is sought
* @return total number of values that have been consumed by the aggregates
*/
public double getAggregateN(Collection<RandomPercentile> aggregates) {
double result = 0;
final Iterator<RandomPercentile> iterator = aggregates.iterator();
while (iterator.hasNext()) {
result += iterator.next().getN();
}
return result;
}
/**
* Aggregates the provided instance into this instance.
* <p>
* Other must have the same buffer size as this. If the combined data size
* exceeds the maximum storage configured for this instance, buffers are
* merged to create capacity. If all that is needed is computation of
* aggregate results, {@link #reduce(double, Collection)} is faster,
* may be more accurate and does not require the buffer sizes to be the same.
*
* @param other the instance to aggregate into this instance
* @throws NullArgumentException if the input is null
* @throws IllegalArgumentException if other has different buffer size than this
*/
@Override
public void aggregate(RandomPercentile other)
throws NullArgumentException {
if (other == null) {
throw new NullArgumentException();
}
if (other.s != s) {
throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
}
bufferMap.absorb(other.bufferMap);
n += other.n;
}
/**
* Returns the maximum number of {@code double} values that a {@code RandomPercentile}
* instance created with the given {@code epsilon} value will retain in memory.
* <p>
* If the number of values that have been consumed from the stream is less than 1/2
* of this value, reported statistics are exact.
*
* @param epsilon bound on the relative quantile error (see class javadoc)
* @return upper bound on the total number of primitive double values retained in memory
* @throws MathIllegalArgumentException if epsilon is not in the interval (0,1)
*/
public static long maxValuesRetained(double epsilon) {
if (epsilon >= 1) {
throw new MathIllegalArgumentException(
LocalizedCoreFormats.NUMBER_TOO_LARGE_BOUND_EXCLUDED, epsilon, 1);
}
if (epsilon <= 0) {
throw new MathIllegalArgumentException(
LocalizedCoreFormats.NUMBER_TOO_SMALL_BOUND_EXCLUDED, epsilon, 0);
}
final long h = (long) FastMath.ceil(log2(1/epsilon));
final long s = (long) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
return (h+1) * s;
}
}