Skip to content

Commit

Permalink
Skip BoundedTrie on Dataflow until the service supports BoundedTrie (#33921) (
Browse files Browse the repository at this point in the history
#33947)

* Temporarily stop publishing BoundedTrie metrics until they are supported in Dataflow.

* Exclude UsesBoundedTrieMetrics from Dataflow runner

* Fix failing test

* Trivial change to postcommit to trigger it
  • Loading branch information
rohitsinha54 authored Feb 11, 2025
1 parent c5b4db0 commit a9fcaaf
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/33921": "noting that PR #33921 should run this test"
}
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
]

def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesExternalService',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesStringSetMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesMultimapState',
Expand All @@ -205,6 +205,7 @@ def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
]

// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
Expand Down Expand Up @@ -77,14 +78,18 @@ public class BatchModeExecutionContext
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";

// When false, extractMetricUpdates() substitutes an empty list for the BoundedTrie updates.
// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
private final boolean populateBoundedTrieMetrics;

private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry) {
DataflowExecutionStateRegistry executionStateRegistry,
boolean populateBoundedTrieMetrics) {
super(
counterFactory,
createMetricsContainerRegistry(),
Expand All @@ -97,6 +102,7 @@ private BatchModeExecutionContext(
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
this.populateBoundedTrieMetrics = populateBoundedTrieMetrics;
}

private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
Expand Down Expand Up @@ -132,7 +138,8 @@ public static BatchModeExecutionContext forTesting(
counterFactory,
options,
"test-work-item-id"),
stateRegistry);
stateRegistry,
true);
}

public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
Expand Down Expand Up @@ -245,7 +252,8 @@ public static BatchModeExecutionContext create(
counterFactory,
options,
workItemId),
executionStateRegistry);
executionStateRegistry,
false);
}

/** Create a new {@link StepContext}. */
Expand Down Expand Up @@ -520,7 +528,10 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(updates.boundedTrieUpdates())
FluentIterable.from(
populateBoundedTrieMetrics
? updates.boundedTrieUpdates()
: Collections.emptyList())
.transform(
update ->
MetricsToCounterUpdateConverter.fromBoundedTrie(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -99,6 +100,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final Clock clock;

// When false, extractUpdates() appends an empty list instead of boundedTrieUpdates().
// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
@VisibleForTesting boolean populateBoundedTrieMetrics;

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -219,7 +223,7 @@ public Iterable<CounterUpdate> extractUpdates() {
.append(distributionUpdates())
.append(gaugeUpdates())
.append(stringSetUpdates())
.append(boundedTrieUpdates());
.append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

Expand Down Expand Up @@ -385,6 +386,7 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto()));

((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name2Update));

Expand All @@ -396,6 +398,7 @@ public void testBoundedTrieUpdateExtraction() {
name1Update.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
}
Expand Down

0 comments on commit a9fcaaf

Please sign in to comment.