Skip to content

Commit a8231cb

Browse files
authored
Merge pull request #499 from apache/ebpps
EB-PPS Sampling sketch
2 parents 10f1048 + d75f04d commit a8231cb

12 files changed

+1415
-54
lines changed

src/main/java/org/apache/datasketches/common/Family.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,12 @@ public enum Family {
146146
/**
147147
* CountMin Sketch
148148
*/
149-
COUNTMIN(18, "COUNTMIN", 2, 2);
149+
COUNTMIN(18, "COUNTMIN", 2, 2),
150+
151+
/**
152+
* Exact and Bounded, Probability Proportional to Size (EBPPS)
153+
*/
154+
EBPPS(19, "EBPPS", 1, 5);
150155

151156
private static final Map<Integer, Family> lookupID = new HashMap<>();
152157
private static final Map<String, Family> lookupFamName = new HashMap<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datasketches.sampling;
21+
22+
import static org.apache.datasketches.common.Util.LS;
23+
24+
import java.lang.reflect.Array;
25+
import java.util.ArrayList;
26+
import java.util.Random;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
29+
import org.apache.datasketches.common.SketchesArgumentException;
30+
31+
// this is a supporting class used to hold the raw data sample
32+
class EbppsItemsSample<T> {
33+
34+
private double c_; // Current sample size, including fractional part
35+
private T partialItem_; // a sample item corresponding to a partial weight
36+
private ArrayList<T> data_; // full sample items
37+
38+
private Random rand_; // ThreadLocalRandom.current() in general
39+
40+
// basic constructor
41+
EbppsItemsSample(final int reservedSize) {
42+
c_ = 0.0;
43+
data_ = new ArrayList<>(reservedSize);
44+
rand_ = ThreadLocalRandom.current();
45+
}
46+
47+
// copy constructor used during merge
48+
EbppsItemsSample(final EbppsItemsSample<T> other) {
49+
c_ = other.c_;
50+
partialItem_ = other.partialItem_;
51+
data_ = new ArrayList<>(other.data_);
52+
rand_ = other.rand_;
53+
}
54+
55+
// constructor used for deserialization and testing
56+
// does NOT copy the incoming ArrayList since this is an internal
57+
// class's package-private constructor, not something directly
58+
// taking user data
59+
EbppsItemsSample(ArrayList<T> data, T partialItem, final double c) {
60+
if (c < 0.0 || Double.isNaN(c) || Double.isInfinite(c))
61+
throw new SketchesArgumentException("C must be nonnegative and finite. Found: " + c);
62+
63+
c_ = c;
64+
partialItem_ = partialItem;
65+
data_ = data;
66+
rand_ = ThreadLocalRandom.current();
67+
}
68+
69+
// Used in lieu of a constructor to populate a temporary sample
70+
// with data before immediately merging it. This approach
71+
// avoids excessive object allocation calls.
72+
// rand_ is not set since it is not expected to be used from
73+
// this object
74+
void replaceContent(final T item, final double theta) {
75+
if (theta < 0.0 || theta > 1.0 || Double.isNaN(theta))
76+
throw new SketchesArgumentException("Theta must be in the range [0.0, 1.0]. Found: " + theta);
77+
78+
c_ = theta;
79+
if (theta == 1.0) {
80+
if (data_ != null && data_.size() == 1) {
81+
data_.set(0, item);
82+
} else {
83+
data_ = new ArrayList<T>(1);
84+
data_.add(item);
85+
}
86+
partialItem_ = null;
87+
} else {
88+
data_ = null;
89+
partialItem_ = item;
90+
}
91+
}
92+
93+
void reset() {
94+
c_ = 0.0;
95+
partialItem_ = null;
96+
data_.clear();
97+
}
98+
99+
ArrayList<T> getSample() {
100+
final double cFrac = c_ % 1;
101+
final boolean includePartial = partialItem_ != null && rand_.nextDouble() < cFrac;
102+
final int resultSize = (data_ != null ? data_.size() : 0) + (includePartial ? 1 : 0);
103+
104+
if (resultSize == 0)
105+
return null;
106+
107+
ArrayList<T> result = new ArrayList<>(resultSize);
108+
if (data_ != null)
109+
result.addAll(data_);
110+
111+
if (includePartial)
112+
result.add(partialItem_);
113+
114+
return result;
115+
}
116+
117+
@SuppressWarnings("unchecked")
118+
T[] getAllSamples(final Class<?> clazz) {
119+
// Is it faster to use sublist and append 1?
120+
final T[] itemsArray = (T[]) Array.newInstance(clazz, getNumRetainedItems());
121+
int i = 0;
122+
if (data_ != null) {
123+
for (T item : data_) {
124+
if (item != null) {
125+
itemsArray[i++] = item;
126+
}
127+
}
128+
}
129+
if (partialItem_ != null)
130+
itemsArray[i] = partialItem_; // no need to increment i again
131+
132+
return itemsArray;
133+
}
134+
135+
// package-private for use in merge and serialization
136+
ArrayList<T> getFullItems() {
137+
return data_;
138+
}
139+
140+
// package-private for use in merge and serialization
141+
T getPartialItem() {
142+
return partialItem_;
143+
}
144+
145+
double getC() { return c_; }
146+
147+
boolean hasPartialItem() { return partialItem_ != null; }
148+
149+
// for testing to allow setting the seed
150+
void replaceRandom(Random r) {
151+
rand_ = r;
152+
}
153+
154+
void downsample(final double theta) {
155+
if (theta >= 1.0) return;
156+
157+
double newC = theta * c_;
158+
double newCInt = Math.floor(newC);
159+
double newCFrac = newC % 1;
160+
double cInt = Math.floor(c_);
161+
double cFrac = c_ % 1;
162+
163+
if (newCInt == 0.0) {
164+
// no full items retained
165+
if (rand_.nextDouble() > (cFrac / c_)) {
166+
swapWithPartialItem();
167+
}
168+
data_.clear();
169+
} else if (newCInt == cInt) {
170+
// no items deleted
171+
if (rand_.nextDouble() > (1 - theta * cFrac)/(1 - newCFrac)) {
172+
swapWithPartialItem();
173+
}
174+
} else {
175+
if (rand_.nextDouble() < theta * cFrac) {
176+
// subsample data in random order; last item is partial
177+
// create sample size newC then swapWithPartialItem()
178+
subsample((int) newCInt);
179+
swapWithPartialItem();
180+
} else {
181+
// create sample size newCInt + 1 then moveOneToPartialItem()
182+
subsample((int) newCInt + 1);
183+
moveOneToPartialItem();
184+
}
185+
}
186+
187+
if (newC == newCInt)
188+
partialItem_ = null;
189+
190+
c_ = newC;
191+
}
192+
193+
void merge(final EbppsItemsSample<T> other) {
194+
//double cInt = Math.floor(c_);
195+
double cFrac = c_ % 1;
196+
double otherCFrac = other.c_ % 1;
197+
198+
// update c_ here but do NOT recompute fractional part yet
199+
c_ += other.c_;
200+
201+
if (other.data_ != null)
202+
data_.addAll(other.data_);
203+
204+
// This modifies the original algorithm slightly due to numeric
205+
// precision issues. Specifically, the test if cFrac + otherCFrac == 1.0
206+
// happens before tests for < 1.0 or > 1.0 and can also be triggered
207+
// if c_ == floor(c_) (the updated value of c_, not the input).
208+
//
209+
// We can still run into issues where cFrac + otherCFrac == epsilon
210+
// and the first case would have ideally triggered. As a result, we must
211+
// check if the partial item exists before adding to the data_ vector.
212+
213+
if (cFrac == 0.0 && otherCFrac == 0.0) {
214+
partialItem_ = null;
215+
} else if (cFrac + otherCFrac == 1.0 || c_ == Math.floor(c_)) {
216+
if (rand_.nextDouble() <= cFrac) {
217+
if (partialItem_ != null) {
218+
data_.add(partialItem_);
219+
}
220+
} else {
221+
if (other.partialItem_ != null) {
222+
data_.add(other.partialItem_);
223+
}
224+
}
225+
partialItem_ = null;
226+
} else if (cFrac + otherCFrac < 1.0) {
227+
if (rand_.nextDouble() > cFrac / (cFrac + otherCFrac)) {
228+
partialItem_ = other.partialItem_;
229+
}
230+
} else { // cFrac + otherCFrac > 1
231+
if (rand_.nextDouble() <= (1 - cFrac) / ((1 - cFrac) + (1 - otherCFrac))) {
232+
data_.add(other.partialItem_);
233+
} else {
234+
data_.add(partialItem_);
235+
partialItem_ = other.partialItem_;
236+
}
237+
}
238+
}
239+
240+
@Override
241+
public String toString() {
242+
final StringBuilder sb = new StringBuilder();
243+
244+
sb.append(" sample:").append(LS);
245+
int idx = 0;
246+
for (T item : data_)
247+
sb.append("\t").append(idx++).append(":\t").append(item.toString()).append(LS);
248+
sb.append(" partial: ");
249+
if (partialItem_ != null)
250+
sb.append(partialItem_.toString()).append(LS);
251+
else
252+
sb.append("NULL").append(LS);
253+
254+
return sb.toString();
255+
}
256+
257+
void subsample(final int numSamples) {
258+
// we can perform a Fisher-Yates style shuffle, stopping after
259+
// numSamples points since subsequent swaps would only be
260+
// between items after num_samples. This is valid since a
261+
// point from anywhere in the initial array would be eligible
262+
// to end up in the final subsample.
263+
264+
if (numSamples == data_.size()) return;
265+
266+
int dataLen = data_.size();
267+
for (int i = 0; i < numSamples; ++i) {
268+
int j = i + rand_.nextInt(dataLen - i);
269+
// swap i and j
270+
T tmp = data_.get(i);
271+
data_.set(i, data_.get(j));
272+
data_.set(j, tmp);
273+
}
274+
275+
// clear anything beyond numSamples
276+
data_.subList(numSamples, data_.size()).clear();
277+
}
278+
279+
void swapWithPartialItem() {
280+
if (partialItem_ == null) {
281+
moveOneToPartialItem();
282+
} else {
283+
int idx = rand_.nextInt(data_.size());
284+
T tmp = partialItem_;
285+
partialItem_ = data_.get(idx);
286+
data_.set(idx, tmp);
287+
}
288+
}
289+
290+
void moveOneToPartialItem() {
291+
int idx = rand_.nextInt(data_.size());
292+
// swap selected item to end so we can delete it easily
293+
int lastIdx = data_.size() - 1;
294+
if (idx != lastIdx) {
295+
T tmp = data_.get(idx);
296+
data_.set(idx, data_.get(lastIdx));
297+
partialItem_ = tmp;
298+
} else {
299+
partialItem_ = data_.get(lastIdx);
300+
}
301+
302+
data_.remove(lastIdx);
303+
}
304+
305+
int getNumRetainedItems() {
306+
return (data_ != null ? data_.size() : 0)
307+
+ (partialItem_ != null ? 1 : 0);
308+
}
309+
}

0 commit comments

Comments
 (0)