Skip to content

Commit cab9169

Browse files
committed
Parse struct returned from Dataflow API to BoundedTrieData
1 parent 1de0d1f commit cab9169

File tree

3 files changed

+209
-9
lines changed

3 files changed

+209
-9
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public final String toString() {
311311
* intended to be used directly outside of {@link BoundedTrieData} with multiple threads.
312312
*/
313313
@VisibleForTesting
314-
static class BoundedTrieNode implements Serializable {
314+
public static class BoundedTrieNode implements Serializable {
315315

316316
public static final String TRUNCATED_TRUE = String.valueOf(true);
317317
public static final String TRUNCATED_FALSE = String.valueOf(false);
@@ -334,7 +334,7 @@ static class BoundedTrieNode implements Serializable {
334334
private int size;
335335

336336
/** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */
337-
BoundedTrieNode() {
337+
public BoundedTrieNode() {
338338
this(new HashMap<>(), false, 1);
339339
}
340340

@@ -345,7 +345,7 @@ static class BoundedTrieNode implements Serializable {
345345
* @param truncated Whether this node is truncated.
346346
* @param size The size of the subtree rooted at this node.
347347
*/
348-
BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
348+
public BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
349349
this.children = children;
350350
this.size = size;
351351
this.truncated = truncated;
@@ -561,7 +561,7 @@ boolean contains(List<String> segments) {
561561
*
562562
* @return The size of the subtree.
563563
*/
564-
int getSize() {
564+
public int getSize() {
565565
return size;
566566
}
567567

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.HashMap;
3030
import java.util.HashSet;
3131
import java.util.List;
32+
import java.util.Map;
3233
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
3334
import org.apache.beam.model.pipeline.v1.RunnerApi;
3435
import org.apache.beam.runners.core.metrics.BoundedTrieData;
@@ -44,9 +45,11 @@
4445
import org.apache.beam.sdk.metrics.MetricsFilter;
4546
import org.apache.beam.sdk.metrics.StringSetResult;
4647
import org.apache.beam.sdk.runners.AppliedPTransform;
48+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
4749
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
4850
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
4951
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
52+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
5053
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
5154
import org.checkerframework.checker.nullness.qual.Nullable;
5255
import org.slf4j.Logger;
@@ -133,7 +136,8 @@ private JobMetrics getJobMetrics() throws IOException {
133136
return result;
134137
}
135138

136-
private static class DataflowMetricResultExtractor {
139+
@VisibleForTesting
140+
static class DataflowMetricResultExtractor {
137141
private final ImmutableList.Builder<MetricResult<Long>> counterResults;
138142
private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
139143
private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
@@ -206,12 +210,21 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
206210
}
207211

208212
private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
209-
if (metricUpdate.getTrie() == null) {
213+
BoundedTrieData trieData = null;
214+
Object trieFromResponse = metricUpdate.getTrie();
215+
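// Fail-safe conversion: the Dataflow API may return the trie either as a BoundedTrie proto or
// as a generic protobuf Struct; any other type (including null) yields an empty result.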
216+
if (trieFromResponse instanceof BoundedTrie) {
217+
BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
218+
trieData = BoundedTrieData.fromProto(bTrie);
219+
} else if (trieFromResponse instanceof com.google.protobuf.Struct) {
220+
trieData = trieFromStruct((com.google.protobuf.Struct) trieFromResponse);
221+
}
222+
223+
if (trieData != null) {
224+
return BoundedTrieResult.create(trieData.extractResult().getResult());
225+
} else {
210226
return BoundedTrieResult.empty();
211227
}
212-
BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
213-
BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
214-
return BoundedTrieResult.create(trieData.extractResult().getResult());
215228
}
216229

217230
private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
@@ -226,6 +239,68 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
226239
return DistributionResult.create(sum, count, min, max);
227240
}
228241

242+
/** Translate Struct proto returned by Dataflow API client to BoundedTrieData. */
243+
@VisibleForTesting
244+
@SuppressWarnings("ReferenceEquality") // compare with protobuf Struct default instance
245+
static BoundedTrieData trieFromStruct(com.google.protobuf.Struct responseProto) {
246+
Map<String, com.google.protobuf.Value> fieldsMap = responseProto.getFieldsMap();
247+
int bound = 0;
248+
List<String> singleton = null;
249+
com.google.protobuf.Value maybeBound = fieldsMap.get("bound");
250+
if (maybeBound != null) {
251+
bound = (int) maybeBound.getNumberValue();
252+
}
253+
com.google.protobuf.Value maybeSingleton = fieldsMap.get("singleton");
254+
if (maybeSingleton != null) {
255+
List<com.google.protobuf.Value> valueList = maybeSingleton.getListValue().getValuesList();
256+
ImmutableList.Builder<String> builder = ImmutableList.builder();
257+
for (com.google.protobuf.Value stringValue : valueList) {
258+
builder.add(stringValue.getStringValue());
259+
}
260+
singleton = builder.build();
261+
}
262+
com.google.protobuf.Value maybeRoot = fieldsMap.get("root");
263+
BoundedTrieData.BoundedTrieNode root = null;
264+
if (maybeRoot != null
265+
&& maybeRoot.getStructValue() != com.google.protobuf.Struct.getDefaultInstance()) {
266+
root = trieNodeFromStruct(maybeRoot.getStructValue());
267+
}
268+
return new BoundedTrieData(singleton, root, bound);
269+
}
270+
271+
/**
272+
* Translate Struct proto returned by Dataflow API client to BoundedTrieData.BoundedTrieNode.
273+
*/
274+
@SuppressWarnings("ReferenceEquality") // compare with protobuf Struct default instance
275+
private static BoundedTrieData.BoundedTrieNode trieNodeFromStruct(
276+
com.google.protobuf.Struct responseProto) {
277+
Map<String, com.google.protobuf.Value> fieldsMap = responseProto.getFieldsMap();
278+
boolean truncated = false;
279+
com.google.protobuf.Value mayTruncated = fieldsMap.get("truncated");
280+
if (mayTruncated != null) {
281+
truncated = mayTruncated.getBoolValue();
282+
}
283+
int childrenSize = 0;
284+
ImmutableMap.Builder<String, BoundedTrieData.BoundedTrieNode> builder =
285+
ImmutableMap.builder();
286+
com.google.protobuf.Value maybeChildren = fieldsMap.get("children");
287+
if (maybeChildren != null) {
288+
Map<String, com.google.protobuf.Value> allChildren =
289+
maybeChildren.getStructValue().getFieldsMap();
290+
for (Map.Entry<String, com.google.protobuf.Value> childValue : allChildren.entrySet()) {
291+
com.google.protobuf.Struct maybeChild = childValue.getValue().getStructValue();
292+
if (maybeChild != com.google.protobuf.Struct.getDefaultInstance()) {
293+
BoundedTrieData.BoundedTrieNode child =
294+
trieNodeFromStruct(childValue.getValue().getStructValue());
295+
builder.put(childValue.getKey(), child);
296+
childrenSize += child.getSize();
297+
}
298+
}
299+
}
300+
Map<String, BoundedTrieData.BoundedTrieNode> children = builder.build();
301+
return new BoundedTrieData.BoundedTrieNode(children, truncated, Math.max(1, childrenSize));
302+
}
303+
229304
public Iterable<MetricResult<DistributionResult>> getDistributionResults() {
230305
return distributionResults.build();
231306
}

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.hamcrest.Matchers.containsString;
2626
import static org.hamcrest.Matchers.empty;
2727
import static org.hamcrest.Matchers.is;
28+
import static org.junit.Assert.assertEquals;
2829
import static org.junit.Assert.assertThrows;
2930
import static org.junit.Assert.fail;
3031
import static org.mockito.Mockito.mock;
@@ -38,6 +39,9 @@
3839
import com.google.api.services.dataflow.model.JobMetrics;
3940
import com.google.api.services.dataflow.model.MetricStructuredName;
4041
import com.google.api.services.dataflow.model.MetricUpdate;
42+
import com.google.protobuf.Struct;
43+
import com.google.protobuf.TextFormat;
44+
import com.google.protobuf.TextFormat.ParseException;
4145
import java.io.IOException;
4246
import java.math.BigDecimal;
4347
import java.util.Set;
@@ -296,6 +300,127 @@ public void testSingleStringSetUpdates() throws IOException {
296300
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
297301
}
298302

303+
@Test
304+
public void testSingletonBoundedTrieFromMessage() throws ParseException {
305+
String textProto =
306+
" fields {\n"
307+
+ " key: \"bound\"\n"
308+
+ " value {\n"
309+
+ " number_value: 100\n"
310+
+ " }\n"
311+
+ " }\n"
312+
+ " fields {\n"
313+
+ " key: \"singleton\"\n"
314+
+ " value {\n"
315+
+ " list_value {\n"
316+
+ " values {\n"
317+
+ " string_value: \"pubsub:\"\n"
318+
+ " }\n"
319+
+ " values {\n"
320+
+ " string_value: \"topic:\"\n"
321+
+ " }\n"
322+
+ " values {\n"
323+
+ " string_value: \"`google.com:abc`.\"\n"
324+
+ " }\n"
325+
+ " values {\n"
326+
+ " string_value: \"some-topic\"\n"
327+
+ " }\n"
328+
+ " }\n"
329+
+ " }\n"
330+
+ " }";
331+
Struct response = TextFormat.parse(textProto, Struct.class);
332+
BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response);
333+
assertEquals(
334+
"BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString());
335+
}
336+
337+
@Test
338+
public void testNestedBoundedTrieFromMessage() throws ParseException {
339+
String textProto =
340+
"fields {\n"
341+
+ " key: \"bound\"\n"
342+
+ " value {\n"
343+
+ " number_value: 100\n"
344+
+ " }\n"
345+
+ "}\n"
346+
+ "fields {\n"
347+
+ " key: \"root\"\n"
348+
+ " value {\n"
349+
+ " struct_value {\n"
350+
+ " fields {\n"
351+
+ " key: \"children\"\n"
352+
+ " value {\n"
353+
+ " struct_value {\n"
354+
+ " fields {\n"
355+
+ " key: \"gcs:\"\n"
356+
+ " value {\n"
357+
+ " struct_value {\n"
358+
+ " fields {\n"
359+
+ " key: \"children\"\n"
360+
+ " value {\n"
361+
+ " struct_value {\n"
362+
+ " fields {\n"
363+
+ " key: \"some-bucket.\"\n"
364+
+ " value {\n"
365+
+ " struct_value {\n"
366+
+ " fields {\n"
367+
+ " key: \"children\"\n"
368+
+ " value {\n"
369+
+ " struct_value {\n"
370+
+ " fields {\n"
371+
+ " key: \"some-folder/\"\n"
372+
+ " value {\n"
373+
+ " struct_value {\n"
374+
+ " fields {\n"
375+
+ " key: \"truncated\"\n"
376+
+ " value {\n"
377+
+ " bool_value: true\n"
378+
+ " }\n"
379+
+ " }\n"
380+
+ " }\n"
381+
+ " }\n"
382+
+ " }\n"
383+
+ " }\n"
384+
+ " }\n"
385+
+ " }\n"
386+
+ " fields {\n"
387+
+ " key: \"truncated\"\n"
388+
+ " value {\n"
389+
+ " bool_value: false\n"
390+
+ " }\n"
391+
+ " }\n"
392+
+ " }\n"
393+
+ " }\n"
394+
+ " }\n"
395+
+ " }\n"
396+
+ " }\n"
397+
+ " }\n"
398+
+ " fields {\n"
399+
+ " key: \"truncated\"\n"
400+
+ " value {\n"
401+
+ " bool_value: false\n"
402+
+ " }\n"
403+
+ " }\n"
404+
+ " }\n"
405+
+ " }\n"
406+
+ " }\n"
407+
+ " }\n"
408+
+ " }\n"
409+
+ " }\n"
410+
+ " fields {\n"
411+
+ " key: \"truncated\"\n"
412+
+ " value {\n"
413+
+ " bool_value: false\n"
414+
+ " }\n"
415+
+ " }\n"
416+
+ " }\n"
417+
+ " }\n"
418+
+ "}";
419+
Struct response = TextFormat.parse(textProto, Struct.class);
420+
BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response);
421+
assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/true'})", result.toString());
422+
}
423+
299424
@Test
300425
public void testSingleBoundedTrieUpdates() throws IOException {
301426
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);

0 commit comments

Comments
 (0)