View Javadoc
1   /*
2    * Licensed to the Hipparchus project under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The Hipparchus project licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.hipparchus.stat.descriptive.rank;
18  
19  import java.io.Serializable;
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NoSuchElementException;
28  import java.util.UUID;
29  
30  import org.hipparchus.exception.LocalizedCoreFormats;
31  import org.hipparchus.exception.MathIllegalArgumentException;
32  import org.hipparchus.exception.MathIllegalStateException;
33  import org.hipparchus.exception.NullArgumentException;
34  import org.hipparchus.random.RandomGenerator;
35  import org.hipparchus.random.Well19937c;
36  import org.hipparchus.stat.StatUtils;
37  import org.hipparchus.stat.descriptive.AbstractStorelessUnivariateStatistic;
38  import org.hipparchus.stat.descriptive.AggregatableStatistic;
39  import org.hipparchus.stat.descriptive.StorelessUnivariateStatistic;
40  import org.hipparchus.util.FastMath;
41  import org.hipparchus.util.MathArrays;
42  
43  /**
44   * A {@link StorelessUnivariateStatistic} estimating percentiles using the
45   * <a href=http://dimacs.rutgers.edu/~graham/pubs/papers/nquantiles.pdf>RANDOM</a>
46   * Algorithm.
47   * <p>
48   * Storage requirements for the RANDOM algorithm depend on the desired
49   * accuracy of quantile estimates. Quantile estimate accuracy is defined as follows.
50   * <p>
51   * Let \(X\) be the set of all data values consumed from the stream and let \(q\)
52   * be a quantile (measured between 0 and 1) to be estimated. If <ul>
53   * <li>\(\epsilon\) is the configured accuracy</li>
54   * <li> \(\hat{q}\) is a RandomPercentile estimate for \(q\) (what is returned
55   *      by {@link #getResult()} or {@link #getResult(double)}) with \(100q\) as
56   *      actual parameter)</li>
57   * <li> \(rank(\hat{q}) = |\{x \in X : x &lt; \hat{q}\}|\) is the actual rank of
58   *      \(\hat{q}\) in the full data stream</li>
59   * <li>\(n = |X|\) is the number of observations</li></ul>
60   * then we can expect \((q - \epsilon)n &lt; rank(\hat{q}) &lt; (q + \epsilon)n\).
61   * <p>
62   * The algorithm maintains \(\left\lceil {log_{2}(1/\epsilon)}\right\rceil + 1\) buffers
63   * of size \(\left\lceil {1/\epsilon \sqrt{log_2(1/\epsilon)}}\right\rceil\).  When
64   * {@code epsilon} is set to the default value of \(10^{-4}\), this makes 15 buffers
65   * of size 36,453.
66   * <p>
67   * The algorithm uses the buffers to maintain samples of data from the stream.  Until
68   * all buffers are full, the entire sample is stored in the buffers.
69   * If one of the {@code getResult} methods is called when all data are available in memory
70   * and there is room to make a copy of the data (meaning the combined set of buffers is
71   * less than half full), the {@code getResult} method delegates to a {@link Percentile}
72   * instance to compute and return the exact value for the desired quantile.
73   * For default {@code epsilon}, this means exact values will be returned whenever fewer than
74   * \(\left\lceil {15 \times 36453 / 2} \right\rceil = 273,398\) values have been consumed
75   * from the data stream.
76   * <p>
77   * When buffers become full, the algorithm merges buffers so that they effectively represent
78   * a larger set of values than they can hold. Subsequently, data values are sampled from the
79   * stream to fill buffers freed by merge operations.  Both the merging and the sampling
80   * require random selection, which is done using a {@code RandomGenerator}.  To get
81   * repeatable results for large data streams, users should provide {@code RandomGenerator}
82   * instances with fixed seeds. {@code RandomPercentile} itself does not reseed or otherwise
83   * initialize the {@code RandomGenerator} provided to it.  By default, it uses a
84   * {@link Well19937c} generator with the default seed.
85   * <p>
86   * Note: This implementation is not thread-safe.
87   */
88  public class RandomPercentile
89      extends AbstractStorelessUnivariateStatistic implements StorelessUnivariateStatistic,
90      AggregatableStatistic<RandomPercentile>, Serializable {
91  
92      /** Default quantile estimation error setting */
93      public static final double DEFAULT_EPSILON = 1e-4;
94      /** Serialization version id */
95      private static final long serialVersionUID = 1L;
96      /** Storage size of each buffer */
97      private final int s;
98      /** Maximum number of buffers minus 1 */
99      private final int h;
100     /** Data structure used to manage buffers */
101     private final BufferMap bufferMap;
102     /** Bound on the quantile estimation error */
103     private final double epsilon;
104     /** Source of random data */
105     private final RandomGenerator randomGenerator;
106     /** Number of elements consumed from the input data stream */
107     private long n;
108     /** Buffer currently being filled */
109     private Buffer currentBuffer;
110 
111     /**
112      * Constructs a {@code RandomPercentile} with quantile estimation error
113      * {@code epsilon} using {@code randomGenerator} as its source of random data.
114      *
115      * @param epsilon bound on quantile estimation error (see class javadoc)
116      * @param randomGenerator PRNG used in sampling and merge operations
117      * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
118      */
119     public RandomPercentile(double epsilon, RandomGenerator randomGenerator) {
120         if (epsilon <= 0) {
121             throw new MathIllegalArgumentException(LocalizedCoreFormats.NUMBER_TOO_SMALL,
122                                                    epsilon, 0);
123         }
124         this.h = (int) FastMath.ceil(log2(1/epsilon));
125         this.s = (int) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
126         this.randomGenerator = randomGenerator;
127         bufferMap = new BufferMap(h + 1, s, randomGenerator);
128         currentBuffer = bufferMap.create(0);
129         this.epsilon = epsilon;
130     }
131 
132     /**
133      * Constructs a {@code RandomPercentile} with default estimation error
134      * using {@code randomGenerator} as its source of random data.
135      *
136      * @param randomGenerator PRNG used in sampling and merge operations
137      * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
138      */
139     public RandomPercentile(RandomGenerator randomGenerator) {
140         this(DEFAULT_EPSILON, randomGenerator);
141     }
142 
143     /**
144      * Constructs a {@code RandomPercentile} with quantile estimation error
145      * {@code epsilon} using the default PRNG as source of random data.
146      *
147      * @param epsilon bound on quantile estimation error (see class javadoc)
148      * @throws MathIllegalArgumentException if percentile is not in the range [0, 100]
149      */
150     public RandomPercentile(double epsilon) {
151         this(epsilon, new Well19937c());
152     }
153 
154     /**
155      * Constructs a {@code RandomPercentile} with quantile estimation error
156      * set to the default ({@link #DEFAULT_EPSILON}), using the default PRNG
157      * as source of random data.
158      */
159     public RandomPercentile() {
160         this(DEFAULT_EPSILON, new Well19937c());
161     }
162 
163     /**
164      * Copy constructor, creates a new {@code RandomPercentile} identical
165      * to the {@code original}.  Note: the RandomGenerator used by the new
166      * instance is referenced, not copied - i.e., the new instance shares
167      * a generator with the original.
168      *
169      * @param original the {@code PSquarePercentile} instance to copy
170      */
171     public RandomPercentile(RandomPercentile original) {
172         super();
173         this.h = original.h;
174         this.n = original.n;
175         this.s = original.s;
176         this.epsilon = original.epsilon;
177         this.bufferMap = new BufferMap(original.bufferMap);
178         this.randomGenerator = original.randomGenerator;
179         Iterator<Buffer> iterator = bufferMap.iterator();
180         Buffer current = null;
181         Buffer curr = null;
182         // See if there is a partially filled buffer - that will be currentBuffer
183         while (current == null && iterator.hasNext()) {
184             curr = iterator.next();
185             if (curr.hasCapacity()) {
186                 current = curr;
187             }
188         }
189         // If there is no partially filled buffer, just assign the last one.
190         // Next increment() will find no capacity and create a new one or trigger
191         // a merge.
192         this.currentBuffer = current == null ? curr : current;
193     }
194 
195     @Override
196     public long getN() {
197         return n;
198     }
199 
200     /**
201      * Returns an estimate of the given percentile, computed using the designated
202      * array segment as input data.
203      *
204      * @param values source of input data
205      * @param begin position of the first element of the values array to include
206      * @param length number of array elements to include
207      * @param percentile desired percentile (scaled 0 - 100)
208      *
209      * @return estimated percentile
210      * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
211      */
212     public double evaluate(final double percentile, final double[] values, final int begin, final int length)
213         throws MathIllegalArgumentException {
214         if (MathArrays.verifyValues(values, begin, length)) {
215             RandomPercentile randomPercentile = new RandomPercentile(this.epsilon,
216                                                                      this.randomGenerator);
217             randomPercentile.incrementAll(values, begin, length);
218             return randomPercentile.getResult(percentile);
219         }
220         return Double.NaN;
221     }
222 
223     /**
224      * Returns an estimate of the median, computed using the designated
225      * array segment as input data.
226      *
227      * @param values source of input data
228      * @param begin position of the first element of the values array to include
229      * @param length number of array elements to include
230      *
231      * @return estimated percentile
232      * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
233      */
234     @Override
235     public double evaluate(final double[] values, final int begin, final int length) {
236         return evaluate(50d, values, begin, length);
237     }
238 
239     /**
240      * Returns an estimate of percentile over the given array.
241      *
242      * @param values source of input data
243      * @param percentile desired percentile (scaled 0 - 100)
244      *
245      * @return estimated percentile
246      * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
247      */
248     public double evaluate(final double percentile, final double[] values) {
249         return evaluate(percentile, values, 0, values.length);
250     }
251 
252     @Override
253     public RandomPercentile copy() {
254         return new RandomPercentile(this);
255     }
256 
257     @Override
258     public void clear() {
259         n = 0;
260         bufferMap.clear();
261         currentBuffer = bufferMap.create(0);
262     }
263 
264     /**
265      * Returns an estimate of the median.
266      */
267     @Override
268     public double getResult() {
269         return getResult(50d);
270 
271     }
272 
273     /**
274      * Returns an estimate of the given percentile.
275      *
276      * @param percentile desired percentile (scaled 0 - 100)
277      * @return estimated percentile
278      * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
279      */
280     public double getResult(double percentile) {
281         if (percentile > 100 || percentile < 0) {
282             throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
283                                                    percentile, 0, 100);
284         }
285         // Convert to internal quantile scale
286         final double q = percentile / 100;
287         // First get global min and max to bound search.
288         double min = Double.POSITIVE_INFINITY;
289         double max = Double.NEGATIVE_INFINITY;
290         double bMin;
291         double bMax;
292         Iterator<Buffer> bufferIterator = bufferMap.iterator();
293         while (bufferIterator.hasNext()) {
294             Buffer buffer = bufferIterator.next();
295             bMin = buffer.min();
296             if (bMin < min) {
297                 min = bMin;
298             }
299             bMax = buffer.max();
300             if (bMax > max) {
301                 max = bMax;
302             }
303         }
304 
305         // Handle degenerate cases
306         if (Double.compare(q, 0d) == 0 || n == 1) {
307             return min;
308         }
309         if (Double.compare(q, 1) == 0) {
310             return max;
311         }
312         if (n == 0) {
313             return Double.NaN;
314         }
315 
316         // See if we have all data in memory and enough free memory to copy.
317         // If so, use Percentile to perform exact computation.
318         if (bufferMap.halfEmpty()) {
319             return new Percentile(percentile).evaluate(bufferMap.levelZeroData());
320         }
321 
322         // Compute target rank
323         final double targetRank = q * n;
324 
325         // Start with initial guess min + quantile * (max - min).
326         double estimate = min + q * (max - min);
327         double estimateRank = getRank(estimate);
328         double lower;
329         double upper;
330         if (estimateRank > targetRank) {
331             upper = estimate;
332             lower = min;
333         } else if (estimateRank < targetRank) {
334             lower = estimate;
335             upper = max;
336         } else {
337             return estimate;
338         }
339         final double eps = epsilon / 2;
340         final double rankTolerance = eps * n;
341         final double minWidth = eps / n;
342         double intervalWidth = FastMath.abs(upper - lower);
343         while (FastMath.abs(estimateRank - targetRank) > rankTolerance && intervalWidth > minWidth) {
344             if (estimateRank > targetRank) {
345                 upper = estimate;
346             } else {
347                 lower = estimate;
348             }
349             intervalWidth = upper - lower;
350             estimate = lower + intervalWidth / 2;
351             estimateRank = getRank(estimate);
352         }
353         return estimate;
354     }
355 
356     /**
357      * Gets the estimated rank of {@code value}, i.e.  \(|\{x \in X : x &lt; value\}|\)
358      * where \(X\) is the set of values that have been consumed from the stream.
359      *
360      * @param value value whose overall rank is sought
361      * @return estimated number of sample values that are strictly less than {@code value}
362      */
363     public double getRank(double value) {
364         double rankSum = 0;
365         Iterator<Buffer> bufferIterator = bufferMap.iterator();
366         while (bufferIterator.hasNext()) {
367             Buffer buffer = bufferIterator.next();
368             rankSum += buffer.rankOf(value) * FastMath.pow(2, buffer.level);
369         }
370         return rankSum;
371     }
372 
373     /**
374      * Returns the estimated quantile position of value in the dataset.
375      * Specifically, what is returned is an estimate of \(|\{x \in X : x &lt; value\}| / |X|\)
376      * where \(X\) is the set of values that have been consumed from the stream.
377      *
378      * @param value value whose quantile rank is sought.
379      * @return estimated proportion of sample values that are strictly less than {@code value}
380      */
381     public double getQuantileRank(double value) {
382         return getRank(value) / getN();
383     }
384 
385     @Override
386     public void increment(double d) {
387         n++;
388         if (!currentBuffer.hasCapacity()) { // Need to get a new buffer to fill
389             // First see if we have not yet created all the buffers
390             if (bufferMap.canCreate()) {
391                 final int level = (int) Math.ceil(Math.max(0, log2(n/(s * FastMath.pow(2, h - 1)))));
392                 currentBuffer = bufferMap.create(level);
393             } else { // All buffers have been created - need to merge to free one
394                 currentBuffer = bufferMap.merge();
395             }
396         }
397         currentBuffer.consume(d);
398     }
399 
400     /**
401      * Maintains a buffer of values sampled from the input data stream.
402      * <p>
403      * The {@link #level} of a buffer determines its sampling frequency.
404      * The {@link #consume(double)} method retains 1 out of every 2^level values
405      * read in from the stream.
406      * <p>
407      * The {@link #size} of the buffer is the number of values that it can store
408      * The buffer is considered full when it has consumed 2^level * size values.
409      * <p>
410      * The {@link #blockSize} of a buffer is 2^level.
411      * The consume method starts each block by generating a random integer in
412      * [0, blockSize - 1].  It then skips over all but the element with that offset
413      * in the block, retaining only the selected value. So after 2^level * size
414      * elements have been consumed, it will have retained size elements - one
415      * from each 2^level block.
416      * <p>
417      * The {@link #mergeWith(Buffer)} method merges this buffer with another one,
418      * The merge operation merges the data from the other buffer into this and clears
419      * the other buffer (so it can consume data). Both buffers have their level
420      * incremented by the merge. This operation is only used on full buffers.
421      */
422     private static class Buffer implements Serializable {
423         /** Serialization version id */
424         private static final long serialVersionUID = 1L;
425         /** Number of values actually stored in the buffer */
426         private final int size;
427         /** Data sampled from the stream */
428         private final double[] data;
429         /** PRNG used for merges and stream sampling */
430         private final RandomGenerator randomGenerator;
431         /** Level of the buffer */
432         private int level;
433         /** Block size  = 2^level */
434         private long blockSize;
435         /** Next location in backing array for stored (taken) value */
436         private int next;
437         /** Number of values consumed in current 2^level block of values from the stream */
438         private long consumed;
439         /** Index of next value to take in current 2^level block */
440         private long nextToTake;
441         /** ID */
442         private final UUID id;
443 
444         /**
445          * Creates a new buffer capable of retaining size values with the given level.
446          *
447          * @param size number of values the buffer can retain
448          * @param level the base 2 log of the sampling frequency
449          *        (one out of every 2^level values is retained)
450          * @param randomGenerator PRNG used for sampling and merge operations
451          */
452         Buffer(int size, int level, RandomGenerator randomGenerator) {
453             this.size = size;
454             data = new double[size];
455             this.level = level;
456             this.randomGenerator = randomGenerator;
457             this.id = UUID.randomUUID();
458             computeBlockSize();
459         }
460 
461         /**
462          * Sets blockSize and nextToTake based on level.
463          */
464         private void computeBlockSize() {
465             if (level == 0) {
466                 blockSize = 1;
467             } else {
468                 long product = 1;
469                 for (int i = 0; i < level; i++) {
470                     product *= 2;
471                 }
472                 blockSize = product;
473             }
474             if (blockSize > 1) {
475                 nextToTake = randomGenerator.nextLong(blockSize);
476             }
477         }
478 
479         /**
480          * Consumes a value from the input stream.
481          * <p>
482          * For each 2^level values consumed, one is added to the buffer.
483          * The buffer is not considered full until 2^level * size values
484          * have been consumed.
485          * <p>
486          * Sorts the data array if the consumption renders the buffer full.
487          * <p>
488          * There is no capacity check in this method.  Clients are expected
489          * to use {@link #hasCapacity()} before invoking this method.  If
490          * it is invoked on a full buffer, an ArrayIndexOutOfBounds exception
491          * will result.
492          *
493          * @param value value to consume from the stream
494          */
495         public void consume(double value) {
496             if (consumed == nextToTake) {
497                 data[next] = value;
498                 next++;
499             }
500             consumed++;
501             if (consumed == blockSize) {
502                 if (next == size) {   // Buffer is full
503                     Arrays.sort(data);
504                 } else {              // Reset in-block counter and nextToTake
505                     consumed = 0;
506                     if (blockSize > 1) {
507                         nextToTake = randomGenerator.nextLong(blockSize);
508                     }
509                 }
510             }
511         }
512 
513         /**
514          * Merges this with other.
515          * <p>
516          * After the merge, this will be the merged buffer and other will be free.
517          * Both will have level+1.
518          * Post-merge, other can be used to accept new data.
519          * <p>
520          * The contents of the merged buffer (this after the merge) are determined
521          * by randomly choosing one of the two retained elements in each of the
522          * [0...size - 1] positions in the two buffers.
523          * <p>
524          * This and other must have the same level and both must be full.
525          *
526          * @param other initially full other buffer at the same level as this.
527          * @throws MathIllegalArgumentException if either buffer is not full or they
528          * have different levels
529          */
530         public void mergeWith(Buffer other) {
531             // Make sure both this and other are full and have the same level
532             if (this.hasCapacity() || other.hasCapacity() || other.level != this.level) {
533                 throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
534             }
535             // Randomly select one of the two entries for each slot
536             for (int i = 0; i < size; i++) {
537                 if (randomGenerator.nextBoolean()) {
538                     data[i] = other.data[i];
539                 }
540             }
541             // Re-sort data
542             Arrays.sort(data);
543             // Bump level of both buffers
544             other.setLevel(level + 1);
545             this.setLevel(level + 1);
546             // Clear the free one (and compute new blocksize)
547             other.clear();
548         }
549 
550         /**
551          * Merge this into a higher-level buffer.
552          * <p>
553          * Does not alter this; but after the merge, higher may have some of its
554          * data replaced by data from this.  Levels are not changed for either buffer.
555          * <p>
556          * Probability of selection into the newly constituted higher buffer is weighted
557          * according to level. So for example, if this has level 0 and higher has level
558          * 2, the ith element of higher is 4 times more likely than the corresponding
559          * element of this to retain its spot.
560          * <p>
561          * This method is only used when aggregating RandomPercentile instances.
562          * <p>
563          * Preconditions:
564          * <ol><li> this.level < higher.level </li>
565          *     <li> this.size = higher.size </li>
566          *     <li> Both buffers are full </li>
567          * </ol>
568          *
569          * @param higher higher-level buffer to merge this into
570          * @throws MathIllegalArgumentException if the buffers have different sizes,
571          * either buffer is not full or this has level greater than or equal to higher
572          */
573         public void mergeInto(Buffer higher) {
574             // Check preconditions
575             if (this.size != higher.size || this.hasCapacity() || higher.hasCapacity() ||
576                     this.level >= higher.level) {
577                 throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
578             }
579             final int levelDifference = higher.level - this.level;
580             int m = 1;
581             for (int i = 0; i < levelDifference; i++) {
582                 m *= 2;
583             }
584             // Randomly select one of the two entries for each slot in higher, giving
585             // m-times higher weight to the entries of higher.
586             for (int i = 0; i < size; i++) {
587                 // data[i] <-> {0}, higher.data[i] <-> {1, ..., m}
588                 if (randomGenerator.nextInt(m + 1) == 0) {
589                     higher.data[i] = data[i];
590                 }
591             }
592             // Resort higher's data
593             Arrays.sort(higher.data);
594         }
595 
596         /**
597          * @return true if the buffer has capacity; false if it is full
598          */
599         public boolean hasCapacity() {
600             // Buffer has capacity if it has not yet set all of its data
601             // values or if it has but still has not finished its last block
602             return next < size || consumed < blockSize;
603         }
604 
605         /**
606          * Sets the level of the buffer.
607          *
608          * @param level new level value
609          */
610         public void setLevel(int level) {
611             this.level = level;
612         }
613 
614         /**
615          * Clears data, recomputes blockSize and resets consumed and nextToTake.
616          */
617         public void clear() {
618             consumed = 0;
619             next = 0;
620             computeBlockSize();
621         }
622 
623         /**
624          * Returns a copy of the data that has been added to the buffer
625          *
626          * @return possibly unsorted copy of the portion of the buffer that has been filled
627          */
628         public double[] getData() {
629             final double[] out = new double[next];
630             System.arraycopy(data, 0, out, 0, next);
631             return out;
632         }
633 
634         /**
635          * Returns the ordinal rank of value among the sampled values in this buffer.
636          *
637          * @param value value whose rank is sought
638          * @return |{v in data : v < value}|
639          */
640         public int rankOf(double value) {
641             int ret = 0;
642             if (!hasCapacity()) { // Full sorted buffer, can do binary search
643                 ret = Arrays.binarySearch(data, value);
644                 if (ret < 0) {
645                     return -ret - 1;
646                 } else {
647                     return ret;
648                 }
649             } else { // have to count - not sorted yet and can't sort yet
650                 for (int i = 0; i < next; i++) {
651                     if (data[i] < value) {
652                         ret++;
653                     }
654                 }
655                 return ret;
656             }
657         }
658 
659         /**
660          * @return the smallest value held in this buffer
661          */
662         public double min() {
663             if (!hasCapacity()) {
664                 return data[0];
665             } else {
666                 return StatUtils.min(getData());
667             }
668         }
669 
670         /**
671          * @return the largest value held in this buffer
672          */
673         public double max() {
674             if (!hasCapacity()) {
675                 return data[data.length - 1];
676             } else {
677                 return StatUtils.max(getData());
678             }
679         }
680 
681         /**
682          * @return the level of this buffer
683          */
684         public int getLevel() {
685             return level;
686         }
687 
688         /**
689          * @return the id
690          */
691         public UUID getId() {
692             return id;
693         }
694     }
695 
696     /**
697      * Computes base 2 log of the argument.
698      *
699      * @param x input value
700      * @return the value y such that 2^y = x
701      */
702     private static double log2(double x) {
703         return Math.log(x) / Math.log(2);
704     }
705 
706     /**
707      * A map structure to hold the buffers.
708      * Keys are levels and values are lists of buffers at the given level.
709      * Overall capacity is limited by the total number of buffers.
710      */
711     private static class BufferMap implements Iterable<Buffer>, Serializable {
712         /** Serialization version ID */
713         private static final long serialVersionUID = 1L;
714         /** Total number of buffers that can be created - cap for count */
715         private final int capacity;
716         /** PRNG used in merges */
717         private final RandomGenerator randomGenerator;
718         /** Total count of all buffers */
719         private int count;
720         /** Uniform buffer size */
721         private final int bufferSize;
722         /** Backing store for the buffer map. Keys are levels, values are lists of registered buffers. */
723         private final Map<Integer,List<Buffer>> registry;
724         /** Maximum buffer level */
725         private int maxLevel;
726 
727         /**
728          * Creates a BufferMap that can manage up to capacity buffers.
729          * Buffers created by the pool with have size = buffersize.
730          *
731          * @param capacity cap on the number of buffers
732          * @param bufferSize size of each buffer
733          * @param randomGenerator RandomGenerator to use in merges
734          */
735         BufferMap(int capacity, int bufferSize, RandomGenerator randomGenerator) {
736             this.bufferSize = bufferSize;
737             this.capacity = capacity;
738             this.randomGenerator = randomGenerator;
739             this.registry = new HashMap<>();
740         }
741 
742         /**
743          * Copy constructor.
744          *
745          * @param original BufferMap to copy
746          */
747         BufferMap(BufferMap original) {
748             super();
749             this.bufferSize = original.bufferSize;
750             this.capacity = original.capacity;
751             this.count = 0;
752             this.randomGenerator = original.randomGenerator;
753             this.registry = new HashMap<>();
754             Iterator<Buffer> iterator = original.iterator();
755             while (iterator.hasNext()) {
756                 final Buffer current = iterator.next();
757                 // Create and register a new buffer at the same level
758                 final Buffer newCopy = create(current.getLevel());
759                 // Consume the data
760                 final double[] data = current.getData();
761                 for (double value : data) {
762                     newCopy.consume(value);
763                 }
764             }
765         }
766 
767         /**
768          * Tries to create a buffer with the given level.
769          * <p>
770          * If there is capacity to create a new buffer (i.e., fewer than
771          * count have been created), a new buffer is created with the given
772          * level, registered and returned.  If capacity has been reached,
773          * null is returned.
774          *
775          * @param level level of the new buffer
776          * @return an empty buffer or null if a buffer can't be provided
777          */
778         public Buffer create(int level) {
779             if (!canCreate()) {
780                 return null;
781             }
782             count++;
783             Buffer buffer = new Buffer(bufferSize, level, randomGenerator);
784             List<Buffer> bufferList = registry.get(level);
785             if (bufferList == null) {
786                 bufferList = new ArrayList<>();
787                 registry.put(level, bufferList);
788             }
789             bufferList.add(buffer);
790             if (level > maxLevel) {
791                 maxLevel = level;
792             }
793             return buffer;
794         }
795 
796         /**
797          * Returns true if there is capacity to create a new buffer.
798          *
799          * @return true if fewer than capacity buffers have been created.
800          */
801         public boolean canCreate() {
802             return count < capacity;
803         }
804 
805         /**
806          * Returns true if we have used less than half of the allocated storage.
807          * <p>
808          * Includes a check to make sure all buffers have level 0;
809          * but this should always be the case.
810          * <p>
811          * When this method returns true, we have all consumed data in storage
812          * and enough space to make a copy of the combined dataset.
813          *
814          * @return true if all buffers have level 0 and less than half of the
815          * available storage has been used
816          */
817         public boolean halfEmpty() {
818             return count * 2 < capacity &&
819                     registry.size() == 1 &&
820                     registry.containsKey(0);
821         }
822 
823         /**
824          * Returns a fresh copy of all data from level 0 buffers.
825          *
826          * @return combined data stored in all level 0 buffers
827          */
828         public double[] levelZeroData() {
829             List<Buffer> levelZeroBuffers = registry.get(0);
830             // First determine the combined size of the data
831             int length = 0;
832             for (Buffer buffer : levelZeroBuffers) {
833                 if (!buffer.hasCapacity()) { // full buffer
834                     length += buffer.size;
835                 } else {
836                     length += buffer.next;  // filled amount
837                 }
838             }
839             // Copy the data
840             int pos = 0;
841             int currLen;
842             final double[] out = new double[length];
843             for (Buffer buffer : levelZeroBuffers) {
844                 if (!buffer.hasCapacity()) {
845                     currLen = buffer.size;
846                 } else {
847                     currLen =  buffer.next;
848                 }
849                 System.arraycopy(buffer.data, 0, out, pos, currLen);
850                 pos += currLen;
851             }
852             return out;
853         }
854 
855         /**
856          * Finds the lowest level l where there exist at least two buffers,
857          * merges them to create a new buffer with level l+1 and returns
858          * a free buffer with level l+1.
859          *
860          * @return free buffer that can accept data
861          */
862         public Buffer merge() {
863             int l = 0;
864             List<Buffer> mergeCandidates = null;
865             // Find the lowest level containing at least two buffers
866             while (mergeCandidates == null && l <= maxLevel) {
867                 final List<Buffer> bufferList = registry.get(l);
868                 if (bufferList != null && bufferList.size() > 1) {
869                     mergeCandidates = bufferList;
870                 } else {
871                     l++;
872                 }
873             }
874             if (mergeCandidates == null) {
875                 // Should never happen
876                 throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
877             }
878             Buffer buffer1 = mergeCandidates.get(0);
879             Buffer buffer2 = mergeCandidates.get(1);
880             // Remove buffers to be merged
881             mergeCandidates.remove(0);
882             mergeCandidates.remove(0);
883             // If these are the last level-l buffers, remove the empty list
884             if (registry.get(l).isEmpty()) {
885                 registry.remove(l);
886             }
887             // Merge the buffers
888             buffer1.mergeWith(buffer2);
889             // Now both buffers have level l+1; buffer1 is full and buffer2 is free.
890             // Register both buffers
891             register(buffer1);
892             register(buffer2);
893 
894             // Return the free one
895             return buffer2;
896         }
897 
898         /**
899          * Clears the buffer map.
900          */
901         public void clear() {
902             for (List<Buffer> bufferList : registry.values()) {
903                 bufferList.clear();
904             }
905             registry.clear();
906             count = 0;
907         }
908 
909         /**
910          * Registers a buffer.
911          *
912          * @param buffer Buffer to be registered.
913          */
914         public void register(Buffer buffer) {
915             final int level = buffer.getLevel();
916             List<Buffer> list = registry.get(level);
917             if (list == null) {
918                 list = new ArrayList<>();
919                 registry.put(level, list);
920                 if (level > maxLevel) {
921                     maxLevel = level;
922                 }
923             }
924             list.add(buffer);
925         }
926 
927         /**
928          * De-register a buffer, without clearing it.
929          *
930          * @param buffer Buffer to be de-registered
931          * @throws IllegalStateException if the buffer is not registered
932          */
933         public void deRegister(Buffer buffer) {
934             final Iterator<Buffer> iterator = registry.get(buffer.getLevel()).iterator();
935             while (iterator.hasNext()) {
936                 if (iterator.next().getId().equals(buffer.getId())) {
937                     iterator.remove();
938                     return;
939                 }
940             }
941             throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
942         }
943 
944         /**
945          * Returns an iterator over all of the buffers. Iteration goes by level, with
946          * level 0 first.  Assumes there are no empty buffer lists.
947          */
948         @Override
949         public Iterator<Buffer> iterator() {
950             return new Iterator<Buffer>() {
951 
952                 /** Outer loop iterator, from level to level. */
953                 private final Iterator<Integer> levelIterator = registry.keySet().iterator();
954 
955                 /** List of buffers at current level. */
956                 private List<Buffer> currentList = registry.get(levelIterator.next());
957 
958                 /** Inner loop iterator, from buffer to buffer. */
959                 private Iterator<Buffer> bufferIterator =
960                         currentList == null ? null : currentList.iterator();
961 
962                 @Override
963                 public boolean hasNext() {
964                     if (bufferIterator == null) {
965                         return false;
966                     }
967                     if (bufferIterator.hasNext()) {
968                         return true;
969                     }
970                     // The current level iterator has just finished, try to bump level
971                     if (levelIterator.hasNext()) {
972                         List<Buffer> currentList = registry.get(levelIterator.next());
973                         bufferIterator = currentList.iterator();
974                         return true;
975                     } else {
976                         // Nothing left, signal this by nulling bufferIterator
977                         bufferIterator = null;
978                         return false;
979                     }
980                 }
981 
982                 @Override
983                 public Buffer next() {
984                      if (hasNext()) {
985                          return bufferIterator.next();
986                      }
987                      throw new NoSuchElementException();
988                 }
989 
990                 @Override
991                 public void remove() {
992                     throw new UnsupportedOperationException();
993                 }
994             };
995         }
996 
997         /**
998          * Absorbs the data in other into this, merging buffers as necessary to trim
999          * the aggregate down to capacity. This method is only used when aggregating
1000          * RandomPercentile instances.
1001          *
1002          * @param other other BufferMap to merge in
1003          */
1004         public void absorb(BufferMap other) {
1005             // Add all of other's buffers to the map - possibly exceeding cap
1006             boolean full = true;
1007             Iterator<Buffer> otherIterator = other.iterator();
1008             while (otherIterator.hasNext()) {
1009                 Buffer buffer = otherIterator.next();
1010                 if (buffer.hasCapacity()) {
1011                     full = false;
1012                 }
1013                 register(buffer);
1014                 count++;
1015             }
1016             // Now eliminate the excess by merging
1017             while (count > capacity || (count == capacity && full)) {
1018                 mergeUp();
1019                 count--;
1020             }
1021         }
1022 
1023         /**
1024          * Find two buffers, first and second, of minimal level. Then merge
1025          * first into second and discard first.
1026          * <p>
1027          * If the buffers have different levels, make second the higher level
1028          * buffer and make probability of selection in the merge proportional
1029          * to level weight ratio.
1030          * <p>
1031          * This method is only used when aggregating RandomPercentile instances.
1032          */
1033         public void mergeUp() {
1034             // Find two minimum-level buffers to merge
1035             // Loop depends on two invariants:
1036             //   0) iterator goes in level order
1037             //   1) there are no empty lists in the registry
1038             Iterator<Buffer> bufferIterator = iterator();
1039             Buffer first = null;
1040             Buffer second = null;
1041             while ((first == null || second == null) && bufferIterator.hasNext()) {
1042                 Buffer buffer = bufferIterator.next();
1043                 if (!buffer.hasCapacity()) { // Skip not full buffers
1044                     if (first == null) {
1045                         first = buffer;
1046                     } else {
1047                         second = buffer;
1048                     }
1049                 }
1050             }
1051             if (first == null || second == null || first.level > second.level) {
1052                 throw new MathIllegalStateException(LocalizedCoreFormats.INTERNAL_ERROR);
1053             }
1054             // Merge first into second and deregister first.
1055             // Assumes that first has level <= second (checked above).
1056             if (first.getLevel() == second.getLevel()) {
1057                 deRegister(first);
1058                 deRegister(second);
1059                 second.mergeWith(first);
1060                 register(second);
1061             } else {
1062                 deRegister(first);
1063                 first.mergeInto(second);
1064             }
1065         }
1066     }
1067 
1068     /**
1069      * Computes the given percentile by combining the data from the collection
1070      * of aggregates. The result describes the combined sample of all data added
1071      * to any of the aggregates.
1072      *
1073      * @param percentile desired percentile (scaled 0-100)
1074      * @param aggregates RandomPercentile instances to combine data from
1075      * @return estimate of the given percentile using combined data from the aggregates
1076      * @throws MathIllegalArgumentException if percentile is out of the range [0, 100]
1077      */
1078     public double reduce(double percentile, Collection<RandomPercentile> aggregates) {
1079         if (percentile > 100 || percentile < 0) {
1080             throw new MathIllegalArgumentException(LocalizedCoreFormats.OUT_OF_RANGE,
1081                                                    percentile, 0, 100);
1082         }
1083 
1084         // First see if we can copy all data and just compute exactly.
1085         // The following could be improved to verify that all have only level 0 buffers
1086         // and the sum of the data sizes is less than 1/2 total capacity.  Here we
1087         // just check that each of the aggregates is less than half full.
1088         Iterator<RandomPercentile> iterator = aggregates.iterator();
1089         boolean small = true;
1090         while (small && iterator.hasNext()) {
1091             small = iterator.next().bufferMap.halfEmpty();
1092         }
1093         if (small) {
1094             iterator = aggregates.iterator();
1095             double[] combined = {};
1096             while (iterator.hasNext()) {
1097                combined = MathArrays.concatenate(combined, iterator.next().bufferMap.levelZeroData());
1098             }
1099             final Percentile exactP = new Percentile(percentile);
1100             return exactP.evaluate(combined);
1101         }
1102 
1103         // Below largely duplicates code in getResult(percentile).
1104         // Common binary search code with function parameter should be factored out.
1105 
1106         // Get global max and min to bound binary search and total N
1107         double min = Double.POSITIVE_INFINITY;
1108         double max = Double.NEGATIVE_INFINITY;
1109         double combinedN = 0;
1110         iterator = aggregates.iterator();
1111         while (iterator.hasNext()) {
1112             final RandomPercentile curr = iterator.next();
1113             final double curMin = curr.getResult(0);
1114             final double curMax = curr.getResult(100);
1115             if (curMin < min) {
1116                 min = curMin;
1117             }
1118             if (curMax > max) {
1119                 max = curMax;
1120             }
1121             combinedN += curr.getN();
1122         }
1123 
1124         final double q = percentile / 100;
1125         // Handle degenerate cases
1126         if (Double.compare(q, 0d) == 0) {
1127             return min;
1128         }
1129         if (Double.compare(q, 1) == 0) {
1130             return max;
1131         }
1132 
1133         // Compute target rank
1134         final double targetRank = q * combinedN;
1135 
1136         // Perform binary search using aggregated rank computation
1137         // Start with initial guess min + quantile * (max - min).
1138         double estimate = min + q * (max - min);
1139         double estimateRank = getAggregateRank(estimate, aggregates);
1140         double lower;
1141         double upper;
1142         if (estimateRank > targetRank) {
1143             upper = estimate;
1144             lower = min;
1145         } else if (estimateRank < targetRank) {
1146             lower = estimate;
1147             upper = max;
1148         } else {
1149             return estimate;
1150         }
1151         final double eps = epsilon / 2;
1152         double intervalWidth = FastMath.abs(upper - lower);
1153         while (FastMath.abs(estimateRank / combinedN - q) > eps && intervalWidth > eps / combinedN) {
1154             if (estimateRank == targetRank) {
1155                 return estimate;
1156             }
1157             if (estimateRank > targetRank) {
1158                 upper = estimate;
1159             } else {
1160                 lower = estimate;
1161             }
1162             intervalWidth = FastMath.abs(upper - lower);
1163             estimate = lower + intervalWidth / 2;
1164             estimateRank = getAggregateRank(estimate, aggregates);
1165         }
1166         return estimate;
1167     }
1168 
1169     /**
1170      * Computes the estimated rank of value in the combined dataset of the aggregates.
1171      * Sums the values from {@link #getRank(double)}.
1172      *
1173      * @param value value whose rank is sought
1174      * @param aggregates collection to aggregate rank over
1175      * @return estimated number of elements in the combined dataset that are less than value
1176      */
1177     public double getAggregateRank(double value, Collection<RandomPercentile> aggregates) {
1178         double result = 0;
1179         final Iterator<RandomPercentile> iterator = aggregates.iterator();
1180         while (iterator.hasNext()) {
1181             result += iterator.next().getRank(value);
1182         }
1183         return result;
1184     }
1185 
1186     /**
1187      * Returns the estimated quantile position of value in the combined dataset of the aggregates.
1188      * Specifically, what is returned is an estimate of \(|\{x \in X : x &lt; value\}| / |X|\)
1189      * where \(X\) is the set of values that have been consumed from all of the datastreams
1190      * feeding the aggregates.
1191      *
1192      * @param value value whose quantile rank is sought.
1193      * @param aggregates collection of RandomPercentile instances being combined
1194      * @return estimated proportion of combined sample values that are strictly less than {@code value}
1195      */
1196     public double getAggregateQuantileRank(double value, Collection<RandomPercentile> aggregates) {
1197         return getAggregateRank(value, aggregates) / getAggregateN(aggregates);
1198     }
1199 
1200     /**
1201      * Returns the total number of values that have been consumed by the aggregates.
1202      *
1203      * @param aggregates collection of RandomPercentile instances whose combined sample size is sought
1204      * @return total number of values that have been consumed by the aggregates
1205      */
1206     public double getAggregateN(Collection<RandomPercentile> aggregates) {
1207         double result = 0;
1208         final Iterator<RandomPercentile> iterator = aggregates.iterator();
1209         while (iterator.hasNext()) {
1210             result += iterator.next().getN();
1211         }
1212         return result;
1213     }
1214 
1215     /**
1216      * Aggregates the provided instance into this instance.
1217      * <p>
1218      * Other must have the same buffer size as this. If the combined data size
1219      * exceeds the maximum storage configured for this instance, buffers are
1220      * merged to create capacity. If all that is needed is computation of
1221      * aggregate results, {@link #reduce(double, Collection)} is faster,
1222      * may be more accurate and does not require the buffer sizes to be the same.
1223      *
1224      * @param other the instance to aggregate into this instance
1225      * @throws NullArgumentException if the input is null
1226      * @throws IllegalArgumentException if other has different buffer size than this
1227      */
1228     @Override
1229     public void aggregate(RandomPercentile other)
1230         throws NullArgumentException {
1231         if (other == null) {
1232             throw new NullArgumentException();
1233         }
1234         if (other.s != s) {
1235             throw new MathIllegalArgumentException(LocalizedCoreFormats.INTERNAL_ERROR);
1236         }
1237         bufferMap.absorb(other.bufferMap);
1238         n += other.n;
1239     }
1240 
1241     /**
1242      * Returns the maximum number of {@code double} values that a {@code RandomPercentile}
1243      * instance created with the given {@code epsilon} value will retain in memory.
1244      * <p>
1245      * If the number of values that have been consumed from the stream is less than 1/2
1246      * of this value, reported statistics are exact.
1247      *
1248      * @param epsilon bound on the relative quantile error (see class javadoc)
1249      * @return upper bound on the total number of primitive double values retained in memory
1250      * @throws MathIllegalArgumentException if epsilon is not in the interval (0,1)
1251      */
1252     public static long maxValuesRetained(double epsilon) {
1253         if (epsilon >= 1) {
1254             throw new MathIllegalArgumentException(
1255                     LocalizedCoreFormats.NUMBER_TOO_LARGE_BOUND_EXCLUDED, epsilon, 1);
1256         }
1257         if (epsilon <= 0) {
1258             throw new MathIllegalArgumentException(
1259                     LocalizedCoreFormats.NUMBER_TOO_SMALL_BOUND_EXCLUDED, epsilon, 0);
1260         }
1261         final long h = (long) FastMath.ceil(log2(1/epsilon));
1262         final long s = (long) FastMath.ceil(FastMath.sqrt(log2(1/epsilon)) / epsilon);
1263         return (h+1) * s;
1264     }
1265 }