1 /*
2 * Licensed to the Hipparchus project under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The Hipparchus project licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.hipparchus.stat.descriptive.rank;
18
19 import java.io.Serializable;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.NoSuchElementException;
28 import java.util.UUID;
29
30 import org.hipparchus.exception.LocalizedCoreFormats;
31 import org.hipparchus.exception.MathIllegalArgumentException;
32 import org.hipparchus.exception.MathIllegalStateException;
33 import org.hipparchus.exception.NullArgumentException;
34 import org.hipparchus.random.RandomGenerator;
35 import org.hipparchus.random.Well19937c;
36 import org.hipparchus.stat.StatUtils;
37 import org.hipparchus.stat.descriptive.AbstractStorelessUnivariateStatistic;
38 import org.hipparchus.stat.descriptive.AggregatableStatistic;
39 import org.hipparchus.stat.descriptive.StorelessUnivariateStatistic;
40 import org.hipparchus.util.FastMath;
41 import org.hipparchus.util.MathArrays;
42
43 /**
44 * A {@link StorelessUnivariateStatistic} estimating percentiles using the
45 * <a href=http://dimacs.rutgers.edu/~graham/pubs/papers/nquantiles.pdf>RANDOM</a>
46 * Algorithm.
47 * <p>
48 * Storage requirements for the RANDOM algorithm depend on the desired
49 * accuracy of quantile estimates. Quantile estimate accuracy is defined as follows.
50 * <p>
51 * Let \(X\) be the set of all data values consumed from the stream and let \(q\)
52 * be a quantile (measured between 0 and 1) to be estimated. If <ul>
53 * <li>\(\epsilon\) is the configured accuracy</li>
54 * <li> \(\hat{q}\) is a RandomPercentile estimate for \(q\) (what is returned
55 * by {@link #getResult()} or {@link #getResult(double)}) with \(100q\) as
56 * actual parameter)</li>
57 * <li> \(rank(\hat{q}) = |\{x \in X : x < \hat{q}\}|\) is the actual rank of
58 * \(\hat{q}\) in the full data stream</li>
59 * <li>\(n = |X|\) is the number of observations</li></ul>
60 * then we can expect \((q - \epsilon)n < rank(\hat{q}) < (q + \epsilon)n\).
61 * <p>
62 * The algorithm maintains \(\left\lceil {log_{2}(1/\epsilon)}\right\rceil + 1\) buffers
63 * of size \(\left\lceil {1/\epsilon \sqrt{log_2(1/\epsilon)}}\right\rceil\). When
64 * {@code epsilon} is set to the default value of \(10^{-4}\), this makes 15 buffers
65 * of size 36,453.
66 * <p>
67 * The algorithm uses the buffers to maintain samples of data from the stream. Until
68 * all buffers are full, the entire sample is stored in the buffers.
69 * If one of the {@code getResult} methods is called when all data are available in memory
70 * and there is room to make a copy of the data (meaning the combined set of buffers is
71 * less than half full), the {@code getResult} method delegates to a {@link Percentile}
72 * instance to compute and return the exact value for the desired quantile.
73 * For default {@code epsilon}, this means exact values will be returned whenever fewer than
74 * \(\left\lceil {15 \times 36453 / 2} \right\rceil = 273,398\) values have been consumed
75 * from the data stream.
76 * <p>
77 * When buffers become full, the algorithm merges buffers so that they effectively represent
78 * a larger set of values than they can hold. Subsequently, data values are sampled from the
79 * stream to fill buffers freed by merge operations. Both the merging and the sampling
80 * require random selection, which is done using a {@code RandomGenerator}. To get
81 * repeatable results for large data streams, users should provide {@code RandomGenerator}
82 * instances with fixed seeds. {@code RandomPercentile} itself does not reseed or otherwise
83 * initialize the {@code RandomGenerator} provided to it. By default, it uses a
84 * {@link Well19937c} generator with the default seed.
85 * <p>
86 * Note: This implementation is not thread-safe.
87 */
88 public class RandomPercentile
89 extends AbstractStorelessUnivariateStatistic implements StorelessUnivariateStatistic,
90 AggregatableStatistic<RandomPercentile>, Serializable {
91
92 /** Default quantile estimation error setting */
93 public static final double DEFAULT_EPSILON = 1e-4;
94 /** Serialization version id */
95 private static final long serialVersionUID = 1L;
96 /** Storage size of each buffer */
97 private final int s;
98 /** Maximum number of buffers minus 1 */
99 private final int h;
100 /** Data structure used to manage buffers */
101 private final BufferMap bufferMap;
102 /** Bound on the quantile estimation error */
103 private final double epsilon;
104 /** Source of random data */
105 private final RandomGenerator randomGenerator;
106 /** Number of elements consumed from the input data stream */
107 private long n;
108 /** Buffer currently being filled */
109 private Buffer currentBuffer;
110
111 /**
112 * Constructs a {@code RandomPercentile} with quantile estimation error
113 * {@code epsilon} using {@code randomGenerator} as its source of random data.
114 *
115 * @param epsilon bound on quantile estimation error (see class javadoc)
116 * @param randomGenerator PRNG used in sampling and merge operations
117 * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
118 */
119 public RandomPercentile(double epsilon, RandomGenerator randomGenerator) {
120 if (epsilon <= 0) {
121 throw new MathIllegalArgumentException(LocalizedCoreFormats.NUMBER_TOO_SMALL,
122 epsilon, 0);
123 }
124 this.h = (int) FastMath.ceil(log2(1/epsilon));
125 this.s = (int) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
126 this.randomGenerator = randomGenerator;
127 bufferMap = new BufferMap(h + 1, s, randomGenerator);
128 currentBuffer = bufferMap.create(0);
129 this.epsilon = epsilon;
130 }
131
132 /**
133 * Constructs a {@code RandomPercentile} with default estimation error
134 * using {@code randomGenerator} as its source of random data.
135 *
136 * @param randomGenerator PRNG used in sampling and merge operations
137 * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
138 */
139 public RandomPercentile(RandomGenerator randomGenerator) {
140 this(DEFAULT_EPSILON, randomGenerator);
141 }
142
143 /**
144 * Constructs a {@code RandomPercentile} with quantile estimation error
145 * {@code epsilon} using the default PRNG as source of random data.
146 *
147 * @param epsilon bound on quantile estimation error (see class javadoc)
148 * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
149 */
150 public RandomPercentile(double epsilon) {
151 this(epsilon, new Well19937c());
152 }
153
154 /**
155 * Constructs a {@code RandomPercentile} with quantile estimation error
156 * set to the default ({@link #DEFAULT_EPSILON}), using the default PRNG
157 * as source of random data.
158 */
159 public RandomPercentile() {
160 this(DEFAULT_EPSILON, new Well19937c());
161 }
162
163 /**
164 * Copy constructor, creates a new {@code RandomPercentile} identical
165 * to the {@code original}. Note: the RandomGenerator used by the new
166 * instance is referenced, not copied - i.e., the new instance shares
167 * a generator with the original.
168 *
169 * @param original the {@code PSquarePercentile} instance to copy
170 */
171 public RandomPercentile(RandomPercentile original) {
172 super();
173 this.h = original.h;
174 this.n = original.n;
175 this.s = original.s;
176 this.epsilon = original.epsilon;
177 this.bufferMap = new BufferMap(original.bufferMap);
178 this.randomGenerator = original.randomGenerator;
179 Iterator<Buffer> iterator = bufferMap.iterator();
180 Buffer current = null;
181 Buffer curr = null;
182 // See if there is a partially filled buffer - that will be currentBuffer
183 while (current == null && iterator.hasNext()) {
184 curr = iterator.next();
185 if (curr.hasCapacity()) {
186 current = curr;
187 }
188 }
189 // If there is no partially filled buffer, just assign the last one.
190 // Next increment() will find no capacity and create a new one or trigger
191 // a merge.
192 this.currentBuffer = current == null ? curr : current;
193 }
194
195 @Override
196 public long getN() {
197 return n;
198 }
199
200 /**
201 * Returns an estimate of the given percentile, computed using the designated
202 * array segment as input data.
203 *
204 * @param values source of input data
205 * @param begin position of the first element of the values array to include
206 * @param length number of array elements to include
207 * @param percentile desired percentile (scaled 0 - 100)
208 *
209 * @return estimated percentile
210 * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
211 */
212 public double evaluate(final double percentile, final double[] values, final int begin, final int length)
213 throws MathIllegalArgumentException {
214 if (MathArrays.verifyValues(values, begin, length)) {
215 RandomPercentile randomPercentile = new RandomPercentile(this.epsilon,
216 this.randomGenerator);
217 randomPercentile.incrementAll(values, begin, length);
218 return randomPercentile.getResult(percentile);
219 }
220 return Double.NaN;
221 }
222
223 /**
224 * Returns an estimate of the median, computed using the designated
225 * array segment as input data.
226 *
227 * @param values source of input data
228 * @param begin position of the first element of the values array to include
229 * @param length number of array elements to include
230 *
231 * @return estimated percentile
232 * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
233 */
234 @Override
235 public double evaluate(final double[] values, final int begin, final int length) {
236 return evaluate(50d, values, begin, length);
237 }
238
239 /**
240 * Returns an estimate of percentile over the given array.
241 *
242 * @param values source of input data
243 * @param percentile desired percentile (scaled 0 - 100)
244 *
245 * @return estimated percentile
246 * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
247 */
248 public double evaluate(final double percentile, final double[] values) {
249 return evaluate(percentile, values, 0, values.length);
250 }
251
252 @Override
253 public RandomPercentile copy() {
254 return new RandomPercentile(this);
255 }
256
257 @Override
258 public void clear() {
259 n = 0;
260 bufferMap.clear();
261 currentBuffer = bufferMap.create(0);
262 }
263
264 /**
265 * Returns an estimate of the median.
266 */
267 @Override
268 public double getResult() {
269 return getResult(50d);
270
271 }
272
273 /**
274 * Returns an estimate of the given percentile.
275 *
276 * @param percentile desired percentile (scaled 0 - 100)
277 * @return estimated percentile
278 * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
279 */
280 public double getResult(double percentile) {
281 if (percentile > 100 || percentile < 0) {
282 throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
283 percentile, 0, 100);
284 }
285 // Convert to internal quantile scale
286 final double q = percentile / 100;
287 // First get global min and max to bound search.
288 double min = Double.POSITIVE_INFINITY;
289 double max = Double.NEGATIVE_INFINITY;
290 double bMin;
291 double bMax;
292 for (Buffer buffer : bufferMap) {
293 bMin = buffer.min();
294 if (bMin < min) {
295 min = bMin;
296 }
297 bMax = buffer.max();
298 if (bMax > max) {
299 max = bMax;
300 }
301 }
302
303 // Handle degenerate cases
304 if (Double.compare(q, 0d) == 0 || n == 1) {
305 return min;
306 }
307 if (Double.compare(q, 1) == 0) {
308 return max;
309 }
310 if (n == 0) {
311 return Double.NaN;
312 }
313
314 // See if we have all data in memory and enough free memory to copy.
315 // If so, use Percentile to perform exact computation.
316 if (bufferMap.halfEmpty()) {
317 return new Percentile(percentile).evaluate(bufferMap.levelZeroData());
318 }
319
320 // Compute target rank
321 final double targetRank = q * n;
322
323 // Start with initial guess min + quantile * (max - min).
324 double estimate = min + q * (max - min);
325 double estimateRank = getRank(estimate);
326 double lower;
327 double upper;
328 if (estimateRank > targetRank) {
329 upper = estimate;
330 lower = min;
331 } else if (estimateRank < targetRank) {
332 lower = estimate;
333 upper = max;
334 } else {
335 return estimate;
336 }
337 final double eps = epsilon / 2;
338 final double rankTolerance = eps * n;
339 final double minWidth = eps / n;
340 double intervalWidth = FastMath.abs(upper - lower);
341 while (FastMath.abs(estimateRank - targetRank) > rankTolerance && intervalWidth > minWidth) {
342 if (estimateRank > targetRank) {
343 upper = estimate;
344 } else {
345 lower = estimate;
346 }
347 intervalWidth = upper - lower;
348 estimate = lower + intervalWidth / 2;
349 estimateRank = getRank(estimate);
350 }
351 return estimate;
352 }
353
354 /**
355 * Gets the estimated rank of {@code value}, i.e. \(|\{x \in X : x < value\}|\)
356 * where \(X\) is the set of values that have been consumed from the stream.
357 *
358 * @param value value whose overall rank is sought
359 * @return estimated number of sample values that are strictly less than {@code value}
360 */
361 public double getRank(double value) {
362 double rankSum = 0;
363 for (Buffer buffer : bufferMap) {
364 rankSum += buffer.rankOf(value) * FastMath.pow(2, buffer.level);
365 }
366 return rankSum;
367 }
368
369 /**
370 * Returns the estimated quantile position of value in the dataset.
371 * Specifically, what is returned is an estimate of \(|\{x \in X : x < value\}| / |X|\)
372 * where \(X\) is the set of values that have been consumed from the stream.
373 *
374 * @param value value whose quantile rank is sought.
375 * @return estimated proportion of sample values that are strictly less than {@code value}
376 */
377 public double getQuantileRank(double value) {
378 return getRank(value) / getN();
379 }
380
381 @Override
382 public void increment(double d) {
383 n++;
384 if (!currentBuffer.hasCapacity()) { // Need to get a new buffer to fill
385 // First see if we have not yet created all the buffers
386 if (bufferMap.canCreate()) {
387 final int level = (int) Math.ceil(Math.max(0, log2(n/(s * FastMath.pow(2, h - 1)))));
388 currentBuffer = bufferMap.create(level);
389 } else { // All buffers have been created - need to merge to free one
390 currentBuffer = bufferMap.merge();
391 }
392 }
393 currentBuffer.consume(d);
394 }
395
396 /**
397 * Maintains a buffer of values sampled from the input data stream.
398 * <p>
399 * The {@link #level} of a buffer determines its sampling frequency.
400 * The {@link #consume(double)} method retains 1 out of every 2^level values
401 * read in from the stream.
402 * <p>
403 * The {@link #size} of the buffer is the number of values that it can store
404 * The buffer is considered full when it has consumed 2^level * size values.
405 * <p>
406 * The {@link #blockSize} of a buffer is 2^level.
407 * The consume method starts each block by generating a random integer in
408 * [0, blockSize - 1]. It then skips over all but the element with that offset
409 * in the block, retaining only the selected value. So after 2^level * size
410 * elements have been consumed, it will have retained size elements - one
411 * from each 2^level block.
412 * <p>
413 * The {@link #mergeWith(Buffer)} method merges this buffer with another one,
414 * The merge operation merges the data from the other buffer into this and clears
415 * the other buffer (so it can consume data). Both buffers have their level
416 * incremented by the merge. This operation is only used on full buffers.
417 */
418 private static class Buffer implements Serializable {
419 /** Serialization version id */
420 private static final long serialVersionUID = 1L;
421 /** Number of values actually stored in the buffer */
422 private final int size;
423 /** Data sampled from the stream */
424 private final double[] data;
425 /** PRNG used for merges and stream sampling */
426 private final RandomGenerator randomGenerator;
427 /** Level of the buffer */
428 private int level;
429 /** Block size = 2^level */
430 private long blockSize;
431 /** Next location in backing array for stored (taken) value */
432 private int next;
433 /** Number of values consumed in current 2^level block of values from the stream */
434 private long consumed;
435 /** Index of next value to take in current 2^level block */
436 private long nextToTake;
437 /** ID */
438 private final UUID id;
439
440 /**
441 * Creates a new buffer capable of retaining size values with the given level.
442 *
443 * @param size number of values the buffer can retain
444 * @param level the base 2 log of the sampling frequency
445 * (one out of every 2^level values is retained)
446 * @param randomGenerator PRNG used for sampling and merge operations
447 */
448 Buffer(int size, int level, RandomGenerator randomGenerator) {
449 this.size = size;
450 data = new double[size];
451 this.level = level;
452 this.randomGenerator = randomGenerator;
453 this.id = UUID.randomUUID();
454 computeBlockSize();
455 }
456
457 /**
458 * Sets blockSize and nextToTake based on level.
459 */
460 private void computeBlockSize() {
461 if (level == 0) {
462 blockSize = 1;
463 } else {
464 long product = 1;
465 for (int i = 0; i < level; i++) {
466 product *= 2;
467 }
468 blockSize = product;
469 }
470 if (blockSize > 1) {
471 nextToTake = randomGenerator.nextLong(blockSize);
472 }
473 }
474
475 /**
476 * Consumes a value from the input stream.
477 * <p>
478 * For each 2^level values consumed, one is added to the buffer.
479 * The buffer is not considered full until 2^level * size values
480 * have been consumed.
481 * <p>
482 * Sorts the data array if the consumption renders the buffer full.
483 * <p>
484 * There is no capacity check in this method. Clients are expected
485 * to use {@link #hasCapacity()} before invoking this method. If
486 * it is invoked on a full buffer, an ArrayIndexOutOfBounds exception
487 * will result.
488 *
489 * @param value value to consume from the stream
490 */
491 public void consume(double value) {
492 if (consumed == nextToTake) {
493 data[next] = value;
494 next++;
495 }
496 consumed++;
497 if (consumed == blockSize) {
498 if (next == size) { // Buffer is full
499 Arrays.sort(data);
500 } else { // Reset in-block counter and nextToTake
501 consumed = 0;
502 if (blockSize > 1) {
503 nextToTake = randomGenerator.nextLong(blockSize);
504 }
505 }
506 }
507 }
508
509 /**
510 * Merges this with other.
511 * <p>
512 * After the merge, this will be the merged buffer and other will be free.
513 * Both will have level+1.
514 * Post-merge, other can be used to accept new data.
515 * <p>
516 * The contents of the merged buffer (this after the merge) are determined
517 * by randomly choosing one of the two retained elements in each of the
518 * [0...size - 1] positions in the two buffers.
519 * <p>
520 * This and other must have the same level and both must be full.
521 *
522 * @param other initially full other buffer at the same level as this.
523 * @throws MathIllegalArgumentException if either buffer is not full or they
524 * have different levels
525 */
526 public void mergeWith(Buffer other) {
527 // Make sure both this and other are full and have the same level
528 if (this.hasCapacity() || other.hasCapacity() || other.level != this.level) {
529 throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
530 }
531 // Randomly select one of the two entries for each slot
532 for (int i = 0; i < size; i++) {
533 if (randomGenerator.nextBoolean()) {
534 data[i] = other.data[i];
535 }
536 }
537 // Re-sort data
538 Arrays.sort(data);
539 // Bump level of both buffers
540 other.setLevel(level + 1);
541 this.setLevel(level + 1);
542 // Clear the free one (and compute new blocksize)
543 other.clear();
544 }
545
546 /**
547 * Merge this into a higher-level buffer.
548 * <p>
549 * Does not alter this; but after the merge, higher may have some of its
550 * data replaced by data from this. Levels are not changed for either buffer.
551 * <p>
552 * Probability of selection into the newly constituted higher buffer is weighted
553 * according to level. So for example, if this has level 0 and higher has level
554 * 2, the ith element of higher is 4 times more likely than the corresponding
555 * element of this to retain its spot.
556 * <p>
557 * This method is only used when aggregating RandomPercentile instances.
558 * <p>
559 * Preconditions:
560 * <ol><li> this.level < higher.level </li>
561 * <li> this.size = higher.size </li>
562 * <li> Both buffers are full </li>
563 * </ol>
564 *
565 * @param higher higher-level buffer to merge this into
566 * @throws MathIllegalArgumentException if the buffers have different sizes,
567 * either buffer is not full or this has level greater than or equal to higher
568 */
569 public void mergeInto(Buffer higher) {
570 // Check preconditions
571 if (this.size != higher.size || this.hasCapacity() || higher.hasCapacity() ||
572 this.level >= higher.level) {
573 throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
574 }
575 final int levelDifference = higher.level - this.level;
576 int m = 1;
577 for (int i = 0; i < levelDifference; i++) {
578 m *= 2;
579 }
580 // Randomly select one of the two entries for each slot in higher, giving
581 // m-times higher weight to the entries of higher.
582 for (int i = 0; i < size; i++) {
583 // data[i] <-> {0}, higher.data[i] <-> {1, ..., m}
584 if (randomGenerator.nextInt(m + 1) == 0) {
585 higher.data[i] = data[i];
586 }
587 }
588 // Resort higher's data
589 Arrays.sort(higher.data);
590 }
591
592 /**
593 * @return true if the buffer has capacity; false if it is full
594 */
595 public boolean hasCapacity() {
596 // Buffer has capacity if it has not yet set all of its data
597 // values or if it has but still has not finished its last block
598 return next < size || consumed < blockSize;
599 }
600
601 /**
602 * Sets the level of the buffer.
603 *
604 * @param level new level value
605 */
606 public void setLevel(int level) {
607 this.level = level;
608 }
609
610 /**
611 * Clears data, recomputes blockSize and resets consumed and nextToTake.
612 */
613 public void clear() {
614 consumed = 0;
615 next = 0;
616 computeBlockSize();
617 }
618
619 /**
620 * Returns a copy of the data that has been added to the buffer
621 *
622 * @return possibly unsorted copy of the portion of the buffer that has been filled
623 */
624 public double[] getData() {
625 final double[] out = new double[next];
626 System.arraycopy(data, 0, out, 0, next);
627 return out;
628 }
629
630 /**
631 * Returns the ordinal rank of value among the sampled values in this buffer.
632 *
633 * @param value value whose rank is sought
634 * @return |{v in data : v < value}|
635 */
636 public int rankOf(double value) {
637 int ret = 0;
638 if (!hasCapacity()) { // Full sorted buffer, can do binary search
639 ret = Arrays.binarySearch(data, value);
640 if (ret < 0) {
641 return -ret - 1;
642 } else {
643 return ret;
644 }
645 } else { // have to count - not sorted yet and can't sort yet
646 for (int i = 0; i < next; i++) {
647 if (data[i] < value) {
648 ret++;
649 }
650 }
651 return ret;
652 }
653 }
654
655 /**
656 * @return the smallest value held in this buffer
657 */
658 public double min() {
659 if (!hasCapacity()) {
660 return data[0];
661 } else {
662 return StatUtils.min(getData());
663 }
664 }
665
666 /**
667 * @return the largest value held in this buffer
668 */
669 public double max() {
670 if (!hasCapacity()) {
671 return data[data.length - 1];
672 } else {
673 return StatUtils.max(getData());
674 }
675 }
676
677 /**
678 * @return the level of this buffer
679 */
680 public int getLevel() {
681 return level;
682 }
683
684 /**
685 * @return the id
686 */
687 public UUID getId() {
688 return id;
689 }
690 }
691
692 /**
693 * Computes base 2 log of the argument.
694 *
695 * @param x input value
696 * @return the value y such that 2^y = x
697 */
698 private static double log2(double x) {
699 return Math.log(x) / Math.log(2);
700 }
701
702 /**
703 * A map structure to hold the buffers.
704 * Keys are levels and values are lists of buffers at the given level.
705 * Overall capacity is limited by the total number of buffers.
706 */
707 private static class BufferMap implements Iterable<Buffer>, Serializable {
708 /** Serialization version ID */
709 private static final long serialVersionUID = 1L;
710 /** Total number of buffers that can be created - cap for count */
711 private final int capacity;
712 /** PRNG used in merges */
713 private final RandomGenerator randomGenerator;
714 /** Total count of all buffers */
715 private int count;
716 /** Uniform buffer size */
717 private final int bufferSize;
718 /** Backing store for the buffer map. Keys are levels, values are lists of registered buffers. */
719 private final Map<Integer,List<Buffer>> registry;
720 /** Maximum buffer level */
721 private int maxLevel;
722
723 /**
724 * Creates a BufferMap that can manage up to capacity buffers.
725 * Buffers created by the pool with have size = buffersize.
726 *
727 * @param capacity cap on the number of buffers
728 * @param bufferSize size of each buffer
729 * @param randomGenerator RandomGenerator to use in merges
730 */
731 BufferMap(int capacity, int bufferSize, RandomGenerator randomGenerator) {
732 this.bufferSize = bufferSize;
733 this.capacity = capacity;
734 this.randomGenerator = randomGenerator;
735 this.registry = new HashMap<>();
736 }
737
738 /**
739 * Copy constructor.
740 *
741 * @param original BufferMap to copy
742 */
743 BufferMap(BufferMap original) {
744 super();
745 this.bufferSize = original.bufferSize;
746 this.capacity = original.capacity;
747 this.count = 0;
748 this.randomGenerator = original.randomGenerator;
749 this.registry = new HashMap<>();
750 for (Buffer current : original) {
751 // Create and register a new buffer at the same level
752 final Buffer newCopy = create(current.getLevel());
753 // Consume the data
754 final double[] data = current.getData();
755 for (double value : data) {
756 newCopy.consume(value);
757 }
758 }
759 }
760
761 /**
762 * Tries to create a buffer with the given level.
763 * <p>
764 * If there is capacity to create a new buffer (i.e., fewer than
765 * count have been created), a new buffer is created with the given
766 * level, registered and returned. If capacity has been reached,
767 * null is returned.
768 *
769 * @param level level of the new buffer
770 * @return an empty buffer or null if a buffer can't be provided
771 */
772 public Buffer create(int level) {
773 if (!canCreate()) {
774 return null;
775 }
776 count++;
777 Buffer buffer = new Buffer(bufferSize, level, randomGenerator);
778 List<Buffer> bufferList = registry.computeIfAbsent(level, k -> new ArrayList<>());
779 bufferList.add(buffer);
780 if (level > maxLevel) {
781 maxLevel = level;
782 }
783 return buffer;
784 }
785
786 /**
787 * Returns true if there is capacity to create a new buffer.
788 *
789 * @return true if fewer than capacity buffers have been created.
790 */
791 public boolean canCreate() {
792 return count < capacity;
793 }
794
795 /**
796 * Returns true if we have used less than half of the allocated storage.
797 * <p>
798 * Includes a check to make sure all buffers have level 0;
799 * but this should always be the case.
800 * <p>
801 * When this method returns true, we have all consumed data in storage
802 * and enough space to make a copy of the combined dataset.
803 *
804 * @return true if all buffers have level 0 and less than half of the
805 * available storage has been used
806 */
807 public boolean halfEmpty() {
808 return count * 2 < capacity &&
809 registry.size() == 1 &&
810 registry.containsKey(0);
811 }
812
813 /**
814 * Returns a fresh copy of all data from level 0 buffers.
815 *
816 * @return combined data stored in all level 0 buffers
817 */
818 public double[] levelZeroData() {
819 List<Buffer> levelZeroBuffers = registry.get(0);
820 // First determine the combined size of the data
821 int length = 0;
822 for (Buffer buffer : levelZeroBuffers) {
823 if (!buffer.hasCapacity()) { // full buffer
824 length += buffer.size;
825 } else {
826 length += buffer.next; // filled amount
827 }
828 }
829 // Copy the data
830 int pos = 0;
831 int currLen;
832 final double[] out = new double[length];
833 for (Buffer buffer : levelZeroBuffers) {
834 if (!buffer.hasCapacity()) {
835 currLen = buffer.size;
836 } else {
837 currLen = buffer.next;
838 }
839 System.arraycopy(buffer.data, 0, out, pos, currLen);
840 pos += currLen;
841 }
842 return out;
843 }
844
845 /**
846 * Finds the lowest level l where there exist at least two buffers,
847 * merges them to create a new buffer with level l+1 and returns
848 * a free buffer with level l+1.
849 *
850 * @return free buffer that can accept data
851 */
852 public Buffer merge() {
853 int l = 0;
854 List<Buffer> mergeCandidates = null;
855 // Find the lowest level containing at least two buffers
856 while (mergeCandidates == null && l <= maxLevel) {
857 final List<Buffer> bufferList = registry.get(l);
858 if (bufferList != null && bufferList.size() > 1) {
859 mergeCandidates = bufferList;
860 } else {
861 l++;
862 }
863 }
864 if (mergeCandidates == null) {
865 // Should never happen
866 throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
867 }
868 Buffer buffer1 = mergeCandidates.get(0);
869 Buffer buffer2 = mergeCandidates.get(1);
870 // Remove buffers to be merged
871 mergeCandidates.remove(0);
872 mergeCandidates.remove(0);
873 // If these are the last level-l buffers, remove the empty list
874 if (registry.get(l).isEmpty()) {
875 registry.remove(l);
876 }
877 // Merge the buffers
878 buffer1.mergeWith(buffer2);
879 // Now both buffers have level l+1; buffer1 is full and buffer2 is free.
880 // Register both buffers
881 register(buffer1);
882 register(buffer2);
883
884 // Return the free one
885 return buffer2;
886 }
887
888 /**
889 * Clears the buffer map.
890 */
891 public void clear() {
892 for (List<Buffer> bufferList : registry.values()) {
893 bufferList.clear();
894 }
895 registry.clear();
896 count = 0;
897 }
898
899 /**
900 * Registers a buffer.
901 *
902 * @param buffer Buffer to be registered.
903 */
904 public void register(Buffer buffer) {
905 final int level = buffer.getLevel();
906 List<Buffer> list = registry.get(level);
907 if (list == null) {
908 list = new ArrayList<>();
909 registry.put(level, list);
910 if (level > maxLevel) {
911 maxLevel = level;
912 }
913 }
914 list.add(buffer);
915 }
916
917 /**
918 * De-register a buffer, without clearing it.
919 *
920 * @param buffer Buffer to be de-registered
921 * @throws IllegalStateException if the buffer is not registered
922 */
923 public void deRegister(Buffer buffer) {
924 final Iterator<Buffer> iterator = registry.get(buffer.getLevel()).iterator();
925 while (iterator.hasNext()) {
926 if (iterator.next().getId().equals(buffer.getId())) {
927 iterator.remove();
928 return;
929 }
930 }
931 throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
932 }
933
934 /**
935 * Returns an iterator over all of the buffers. Iteration goes by level, with
936 * level 0 first. Assumes there are no empty buffer lists.
937 */
938 @Override
939 public Iterator<Buffer> iterator() {
940 return new Iterator<Buffer>() {
941
942 /** Outer loop iterator, from level to level. */
943 private final Iterator<Integer> levelIterator = registry.keySet().iterator();
944
945 /** List of buffers at current level. */
946 private List<Buffer> currentList = registry.get(levelIterator.next()); // NOPMD - cannot use local variable in anonymous class
947
948 /** Inner loop iterator, from buffer to buffer. */
949 private Iterator<Buffer> bufferIterator =
950 currentList == null ? null : currentList.iterator();
951
952 @Override
953 public boolean hasNext() {
954 if (bufferIterator == null) {
955 return false;
956 }
957 if (bufferIterator.hasNext()) {
958 return true;
959 }
960 // The current level iterator has just finished, try to bump level
961 if (levelIterator.hasNext()) {
962 List<Buffer> currentList = registry.get(levelIterator.next());
963 bufferIterator = currentList.iterator();
964 return true;
965 } else {
966 // Nothing left, signal this by nulling bufferIterator
967 bufferIterator = null;
968 return false;
969 }
970 }
971
972 @Override
973 public Buffer next() {
974 if (hasNext()) {
975 return bufferIterator.next();
976 }
977 throw new NoSuchElementException();
978 }
979
980 @Override
981 public void remove() {
982 throw new UnsupportedOperationException();
983 }
984 };
985 }
986
987 /**
988 * Absorbs the data in other into this, merging buffers as necessary to trim
989 * the aggregate down to capacity. This method is only used when aggregating
990 * RandomPercentile instances.
991 *
992 * @param other other BufferMap to merge in
993 */
994 public void absorb(BufferMap other) {
995 // Add all of other's buffers to the map - possibly exceeding cap
996 boolean full = true;
997 for (Buffer buffer : other) {
998 if (buffer.hasCapacity()) {
999 full = false;
1000 }
1001 register(buffer);
1002 count++;
1003 }
1004 // Now eliminate the excess by merging
1005 while (count > capacity || (count == capacity && full)) {
1006 mergeUp();
1007 count--;
1008 }
1009 }
1010
1011 /**
1012 * Find two buffers, first and second, of minimal level. Then merge
1013 * first into second and discard first.
1014 * <p>
1015 * If the buffers have different levels, make second the higher level
1016 * buffer and make probability of selection in the merge proportional
1017 * to level weight ratio.
1018 * <p>
1019 * This method is only used when aggregating RandomPercentile instances.
1020 */
1021 public void mergeUp() {
1022 // Find two minimum-level buffers to merge
1023 // Loop depends on two invariants:
1024 // 0) iterator goes in level order
1025 // 1) there are no empty lists in the registry
1026 Iterator<Buffer> bufferIterator = iterator();
1027 Buffer first = null;
1028 Buffer second = null;
1029 while ((first == null || second == null) && bufferIterator.hasNext()) {
1030 Buffer buffer = bufferIterator.next();
1031 if (!buffer.hasCapacity()) { // Skip not full buffers
1032 if (first == null) {
1033 first = buffer;
1034 } else {
1035 second = buffer;
1036 }
1037 }
1038 }
1039 if (first == null || second == null || first.level > second.level) {
1040 throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
1041 }
1042 // Merge first into second and deregister first.
1043 // Assumes that first has level <= second (checked above).
1044 if (first.getLevel() == second.getLevel()) {
1045 deRegister(first);
1046 deRegister(second);
1047 second.mergeWith(first);
1048 register(second);
1049 } else {
1050 deRegister(first);
1051 first.mergeInto(second);
1052 }
1053 }
1054 }
1055
1056 /**
1057 * Computes the given percentile by combining the data from the collection
1058 * of aggregates. The result describes the combined sample of all data added
1059 * to any of the aggregates.
1060 *
1061 * @param percentile desired percentile (scaled 0-100)
1062 * @param aggregates RandomPercentile instances to combine data from
1063 * @return estimate of the given percentile using combined data from the aggregates
1064 * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
1065 */
1066 public double reduce(double percentile, Collection<RandomPercentile> aggregates) {
1067 if (percentile > 100 || percentile < 0) {
1068 throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
1069 percentile, 0, 100);
1070 }
1071
1072 // First see if we can copy all data and just compute exactly.
1073 // The following could be improved to verify that all have only level 0 buffers
1074 // and the sum of the data sizes is less than 1/2 total capacity. Here we
1075 // just check that each of the aggregates is less than half full.
1076 Iterator<RandomPercentile> iterator = aggregates.iterator();
1077 boolean small = true;
1078 while (small && iterator.hasNext()) {
1079 small = iterator.next().bufferMap.halfEmpty();
1080 }
1081 if (small) {
1082 iterator = aggregates.iterator();
1083 double[] combined = {};
1084 while (iterator.hasNext()) {
1085 combined = MathArrays.concatenate(combined, iterator.next().bufferMap.levelZeroData());
1086 }
1087 final Percentile exactP = new Percentile(percentile);
1088 return exactP.evaluate(combined);
1089 }
1090
1091 // Below largely duplicates code in getResult(percentile).
1092 // Common binary search code with function parameter should be factored out.
1093
1094 // Get global max and min to bound binary search and total N
1095 double min = Double.POSITIVE_INFINITY;
1096 double max = Double.NEGATIVE_INFINITY;
1097 double combinedN = 0;
1098 iterator = aggregates.iterator();
1099 while (iterator.hasNext()) {
1100 final RandomPercentile curr = iterator.next();
1101 final double curMin = curr.getResult(0);
1102 final double curMax = curr.getResult(100);
1103 if (curMin < min) {
1104 min = curMin;
1105 }
1106 if (curMax > max) {
1107 max = curMax;
1108 }
1109 combinedN += curr.getN();
1110 }
1111
1112 final double q = percentile / 100;
1113 // Handle degenerate cases
1114 if (Double.compare(q, 0d) == 0) {
1115 return min;
1116 }
1117 if (Double.compare(q, 1) == 0) {
1118 return max;
1119 }
1120
1121 // Compute target rank
1122 final double targetRank = q * combinedN;
1123
1124 // Perform binary search using aggregated rank computation
1125 // Start with initial guess min + quantile * (max - min).
1126 double estimate = min + q * (max - min);
1127 double estimateRank = getAggregateRank(estimate, aggregates);
1128 double lower;
1129 double upper;
1130 if (estimateRank > targetRank) {
1131 upper = estimate;
1132 lower = min;
1133 } else if (estimateRank < targetRank) {
1134 lower = estimate;
1135 upper = max;
1136 } else {
1137 return estimate;
1138 }
1139 final double eps = epsilon / 2;
1140 double intervalWidth = FastMath.abs(upper - lower);
1141 while (FastMath.abs(estimateRank / combinedN - q) > eps && intervalWidth > eps / combinedN) {
1142 if (estimateRank == targetRank) {
1143 return estimate;
1144 }
1145 if (estimateRank > targetRank) {
1146 upper = estimate;
1147 } else {
1148 lower = estimate;
1149 }
1150 intervalWidth = FastMath.abs(upper - lower);
1151 estimate = lower + intervalWidth / 2;
1152 estimateRank = getAggregateRank(estimate, aggregates);
1153 }
1154 return estimate;
1155 }
1156
1157 /**
1158 * Computes the estimated rank of value in the combined dataset of the aggregates.
1159 * Sums the values from {@link #getRank(double)}.
1160 *
1161 * @param value value whose rank is sought
1162 * @param aggregates collection to aggregate rank over
1163 * @return estimated number of elements in the combined dataset that are less than value
1164 */
1165 public double getAggregateRank(double value, Collection<RandomPercentile> aggregates) {
1166 double result = 0;
1167 for (RandomPercentile aggregate : aggregates) {
1168 result += aggregate.getRank(value);
1169 }
1170 return result;
1171 }
1172
1173 /**
1174 * Returns the estimated quantile position of value in the combined dataset of the aggregates.
1175 * Specifically, what is returned is an estimate of \(|\{x \in X : x < value\}| / |X|\)
1176 * where \(X\) is the set of values that have been consumed from all of the datastreams
1177 * feeding the aggregates.
1178 *
1179 * @param value value whose quantile rank is sought.
1180 * @param aggregates collection of RandomPercentile instances being combined
1181 * @return estimated proportion of combined sample values that are strictly less than {@code value}
1182 */
1183 public double getAggregateQuantileRank(double value, Collection<RandomPercentile> aggregates) {
1184 return getAggregateRank(value, aggregates) / getAggregateN(aggregates);
1185 }
1186
1187 /**
1188 * Returns the total number of values that have been consumed by the aggregates.
1189 *
1190 * @param aggregates collection of RandomPercentile instances whose combined sample size is sought
1191 * @return total number of values that have been consumed by the aggregates
1192 */
1193 public double getAggregateN(Collection<RandomPercentile> aggregates) {
1194 double result = 0;
1195 for (RandomPercentile aggregate : aggregates) {
1196 result += aggregate.getN();
1197 }
1198 return result;
1199 }
1200
1201 /**
1202 * Aggregates the provided instance into this instance.
1203 * <p>
1204 * Other must have the same buffer size as this. If the combined data size
1205 * exceeds the maximum storage configured for this instance, buffers are
1206 * merged to create capacity. If all that is needed is computation of
1207 * aggregate results, {@link #reduce(double, Collection)} is faster,
1208 * may be more accurate and does not require the buffer sizes to be the same.
1209 *
1210 * @param other the instance to aggregate into this instance
1211 * @throws NullArgumentException if the input is null
1212 * @throws IllegalArgumentException if other has different buffer size than this
1213 */
1214 @Override
1215 public void aggregate(RandomPercentile other)
1216 throws NullArgumentException {
1217 if (other == null) {
1218 throw new NullArgumentException();
1219 }
1220 if (other.s != s) {
1221 throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
1222 }
1223 bufferMap.absorb(other.bufferMap);
1224 n += other.n;
1225 }
1226
1227 /**
1228 * Returns the maximum number of {@code double} values that a {@code RandomPercentile}
1229 * instance created with the given {@code epsilon} value will retain in memory.
1230 * <p>
1231 * If the number of values that have been consumed from the stream is less than 1/2
1232 * of this value, reported statistics are exact.
1233 *
1234 * @param epsilon bound on the relative quantile error (see class javadoc)
1235 * @return upper bound on the total number of primitive double values retained in memory
1236 * @throws MathIllegalArgumentException if epsilon is not in the interval (0,1)
1237 */
1238 public static long maxValuesRetained(double epsilon) {
1239 if (epsilon >= 1) {
1240 throw new MathIllegalArgumentException(
1241 LocalizedCoreFormats.NUMBER_TOO_LARGE_BOUND_EXCLUDED, epsilon, 1);
1242 }
1243 if (epsilon <= 0) {
1244 throw new MathIllegalArgumentException(
1245 LocalizedCoreFormats.NUMBER_TOO_SMALL_BOUND_EXCLUDED, epsilon, 0);
1246 }
1247 final long h = (long) FastMath.ceil(log2(1/epsilon));
1248 final long s = (long) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
1249 return (h+1) * s;
1250 }
1251 }