Skip to content

Commit de578a9

Browse files
rionmonsterwuchong
authored andcommitted
[FLUSS-2123][flink] Added Flink Wrapper for Histogram Statistics
1 parent 57c0de7 commit de578a9

File tree

2 files changed

+217
-4
lines changed

2 files changed

+217
-4
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ public long getCount() {
4141

4242
@Override
4343
public HistogramStatistics getStatistics() {
44-
45-
wrapped.getStatistics();
46-
47-
return null;
44+
return new FlinkHistogramStatistics(wrapped.getStatistics());
4845
}
4946

5047
private static class FlinkHistogramStatistics extends HistogramStatistics {
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.metrics;
19+
20+
import org.apache.fluss.metrics.Histogram;
21+
22+
import org.apache.flink.metrics.HistogramStatistics;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/** Test for {@link FlinkHistogram}. */
32+
class FlinkHistogramTest {
33+
34+
private FlinkHistogram flinkHistogram;
35+
private TestFlussHistogram testFlussHistogram;
36+
37+
@BeforeEach
38+
void setUp() {
39+
testFlussHistogram = new TestFlussHistogram();
40+
flinkHistogram = new FlinkHistogram(testFlussHistogram);
41+
}
42+
43+
@Test
44+
void testUpdate() {
45+
flinkHistogram.update(100L);
46+
assertThat(testFlussHistogram.getUpdateCount()).isEqualTo(1);
47+
assertThat(testFlussHistogram.getLastUpdateValue()).isEqualTo(100L);
48+
}
49+
50+
@Test
51+
void testGetCount() {
52+
testFlussHistogram.setCount(5L);
53+
assertThat(flinkHistogram.getCount()).isEqualTo(5L);
54+
}
55+
56+
@Test
57+
void testGetStatisticsReturnsNonNull() {
58+
HistogramStatistics statistics = flinkHistogram.getStatistics();
59+
assertThat(statistics).isNotNull();
60+
}
61+
62+
@Test
63+
void testGetStatisticsGetMin() {
64+
testFlussHistogram.setMin(10L);
65+
HistogramStatistics statistics = flinkHistogram.getStatistics();
66+
assertThat(statistics.getMin()).isEqualTo(10L);
67+
}
68+
69+
@Test
70+
void testGetStatisticsGetMax() {
71+
testFlussHistogram.setMax(100L);
72+
HistogramStatistics statistics = flinkHistogram.getStatistics();
73+
assertThat(statistics.getMax()).isEqualTo(100L);
74+
}
75+
76+
@Test
77+
void testGetStatisticsGetMean() {
78+
testFlussHistogram.setMean(50.5);
79+
HistogramStatistics statistics = flinkHistogram.getStatistics();
80+
assertThat(statistics.getMean()).isEqualTo(50.5);
81+
}
82+
83+
@Test
84+
void testGetStatisticsGetStdDev() {
85+
testFlussHistogram.setStdDev(15.2);
86+
HistogramStatistics statistics = flinkHistogram.getStatistics();
87+
assertThat(statistics.getStdDev()).isEqualTo(15.2);
88+
}
89+
90+
@Test
91+
void testGetStatisticsGetQuantile() {
92+
testFlussHistogram.setQuantile(0.5, 25.0);
93+
HistogramStatistics statistics = flinkHistogram.getStatistics();
94+
assertThat(statistics.getQuantile(0.5)).isEqualTo(25.0);
95+
}
96+
97+
@Test
98+
void testGetStatisticsGetValues() {
99+
long[] expectedValues = {1L, 2L, 3L};
100+
testFlussHistogram.setValues(expectedValues);
101+
HistogramStatistics statistics = flinkHistogram.getStatistics();
102+
assertThat(statistics.getValues()).isEqualTo(expectedValues);
103+
}
104+
105+
@Test
106+
void testGetStatisticsSize() {
107+
testFlussHistogram.setSize(42);
108+
HistogramStatistics statistics = flinkHistogram.getStatistics();
109+
assertThat(statistics.size()).isEqualTo(42);
110+
}
111+
112+
/** Test implementation of Fluss Histogram for unit testing. */
113+
private static class TestFlussHistogram implements Histogram {
114+
private long count = 0;
115+
private long lastUpdateValue = 0;
116+
private int updateCount = 0;
117+
private long min = 0;
118+
private long max = 0;
119+
private double mean = 0.0;
120+
private double stdDev = 0.0;
121+
private long[] values = new long[0];
122+
private int size = 0;
123+
private final Map<Double, Double> quantiles = new HashMap<>();
124+
125+
@Override
126+
public void update(long value) {
127+
lastUpdateValue = value;
128+
updateCount++;
129+
}
130+
131+
@Override
132+
public long getCount() {
133+
return count;
134+
}
135+
136+
public void setCount(long count) {
137+
this.count = count;
138+
}
139+
140+
@Override
141+
public org.apache.fluss.metrics.HistogramStatistics getStatistics() {
142+
return new org.apache.fluss.metrics.HistogramStatistics() {
143+
@Override
144+
public double getQuantile(double quantile) {
145+
return quantiles.getOrDefault(quantile, quantile);
146+
}
147+
148+
@Override
149+
public long[] getValues() {
150+
return values;
151+
}
152+
153+
@Override
154+
public int size() {
155+
return size;
156+
}
157+
158+
@Override
159+
public double getMean() {
160+
return mean;
161+
}
162+
163+
@Override
164+
public double getStdDev() {
165+
return stdDev;
166+
}
167+
168+
@Override
169+
public long getMax() {
170+
return max;
171+
}
172+
173+
@Override
174+
public long getMin() {
175+
return min;
176+
}
177+
};
178+
}
179+
180+
public long getLastUpdateValue() {
181+
return lastUpdateValue;
182+
}
183+
184+
public int getUpdateCount() {
185+
return updateCount;
186+
}
187+
188+
public void setMin(long min) {
189+
this.min = min;
190+
}
191+
192+
public void setMax(long max) {
193+
this.max = max;
194+
}
195+
196+
public void setMean(double mean) {
197+
this.mean = mean;
198+
}
199+
200+
public void setStdDev(double stdDev) {
201+
this.stdDev = stdDev;
202+
}
203+
204+
public void setValues(long[] values) {
205+
this.values = values;
206+
}
207+
208+
public void setSize(int size) {
209+
this.size = size;
210+
}
211+
212+
public void setQuantile(double quantile, double value) {
213+
this.quantiles.put(quantile, value);
214+
}
215+
}
216+
}

0 commit comments

Comments
 (0)