Skip to content

Commit d745198

Browse files
authored
Change the Java SDK harness cache timeout for bundle processors to one hour for streaming pipelines instead of one minute. (#33175)
* Change the cache timeout for bundle processors to be an hour for streaming pipelines instead of 1 minute. Use a hidden option so that it can be controlled further if desired.
1 parent aa21e4a commit d745198

File tree

3 files changed

+59
-34
lines changed

3 files changed

+59
-34
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2121

2222
import com.fasterxml.jackson.annotation.JsonCreator;
23+
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.HashMap;
@@ -386,4 +387,21 @@ static List<Logger> getConfiguredLoggerFromOptions(SdkHarnessOptions loggingOpti
386387
}
387388
return configuredLoggers;
388389
}
390+
391+
@Hidden
392+
@Description(
393+
"Timeout used for cache of bundle processors. Defaults to a minute for batch and an hour for streaming.")
394+
@Default.InstanceFactory(BundleProcessorCacheTimeoutFactory.class)
395+
Duration getBundleProcessorCacheTimeout();
396+
397+
void setBundleProcessorCacheTimeout(Duration duration);
398+
399+
class BundleProcessorCacheTimeoutFactory implements DefaultValueFactory<Duration> {
400+
@Override
401+
public Duration create(PipelineOptions options) {
402+
return options.as(StreamingOptions.class).isStreaming()
403+
? Duration.ofHours(1)
404+
: Duration.ofMinutes(1);
405+
}
406+
}
389407
}

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.beam.sdk.metrics.MetricsEnvironment;
8484
import org.apache.beam.sdk.metrics.MetricsEnvironment.MetricsEnvironmentState;
8585
import org.apache.beam.sdk.options.PipelineOptions;
86+
import org.apache.beam.sdk.options.SdkHarnessOptions;
8687
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
8788
import org.apache.beam.sdk.util.WindowedValue;
8889
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -188,7 +189,8 @@ public ProcessBundleHandler(
188189
executionStateSampler,
189190
REGISTERED_RUNNER_FACTORIES,
190191
processWideCache,
191-
new BundleProcessorCache(),
192+
new BundleProcessorCache(
193+
options.as(SdkHarnessOptions.class).getBundleProcessorCacheTimeout()),
192194
dataSampler);
193195
}
194196

@@ -927,25 +929,25 @@ public int hashCode() {
927929
return super.hashCode();
928930
}
929931

930-
BundleProcessorCache() {
931-
this.cachedBundleProcessors =
932+
BundleProcessorCache(Duration timeout) {
933+
CacheBuilder<String, ConcurrentLinkedQueue<ProcessBundleHandler.BundleProcessor>> builder =
932934
CacheBuilder.newBuilder()
933-
.expireAfterAccess(Duration.ofMinutes(1L))
934935
.removalListener(
935-
removalNotification -> {
936-
((ConcurrentLinkedQueue<BundleProcessor>) removalNotification.getValue())
937-
.forEach(
938-
bundleProcessor -> {
939-
bundleProcessor.shutdown();
940-
});
941-
})
942-
.build(
943-
new CacheLoader<String, ConcurrentLinkedQueue<BundleProcessor>>() {
944-
@Override
945-
public ConcurrentLinkedQueue<BundleProcessor> load(String s) throws Exception {
946-
return new ConcurrentLinkedQueue<>();
947-
}
948-
});
936+
removalNotification ->
937+
removalNotification
938+
.getValue()
939+
.forEach(bundleProcessor -> bundleProcessor.shutdown()));
940+
if (timeout.compareTo(Duration.ZERO) > 0) {
941+
builder = builder.expireAfterAccess(timeout);
942+
}
943+
this.cachedBundleProcessors =
944+
builder.build(
945+
new CacheLoader<String, ConcurrentLinkedQueue<BundleProcessor>>() {
946+
@Override
947+
public ConcurrentLinkedQueue<BundleProcessor> load(String s) throws Exception {
948+
return new ConcurrentLinkedQueue<>();
949+
}
950+
});
949951
// We specifically use a weak hash map so that references will automatically go out of scope
950952
// and not need to be freed explicitly from the cache.
951953
this.activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap<>());

sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.mockito.Mockito.when;
4949

5050
import java.io.IOException;
51+
import java.time.Duration;
5152
import java.util.ArrayList;
5253
import java.util.Collection;
5354
import java.util.Collections;
@@ -354,6 +355,10 @@ void reset() throws Exception {
354355

355356
private static class TestBundleProcessorCache extends BundleProcessorCache {
356357

358+
TestBundleProcessorCache() {
359+
super(Duration.ZERO);
360+
}
361+
357362
@Override
358363
BundleProcessor get(
359364
InstructionRequest processBundleRequest,
@@ -376,7 +381,7 @@ public void testTrySplitBeforeBundleDoesNotFail() {
376381
executionStateSampler,
377382
ImmutableMap.of(),
378383
Caches.noop(),
379-
new BundleProcessorCache(),
384+
new BundleProcessorCache(Duration.ZERO),
380385
null /* dataSampler */);
381386

382387
BeamFnApi.InstructionResponse response =
@@ -407,7 +412,7 @@ public void testProgressBeforeBundleDoesNotFail() throws Exception {
407412
executionStateSampler,
408413
ImmutableMap.of(),
409414
Caches.noop(),
410-
new BundleProcessorCache(),
415+
new BundleProcessorCache(Duration.ZERO),
411416
null /* dataSampler */);
412417

413418
handler.progress(
@@ -487,7 +492,7 @@ public void testOrderOfStartAndFinishCalls() throws Exception {
487492
DATA_INPUT_URN, startFinishRecorder,
488493
DATA_OUTPUT_URN, startFinishRecorder),
489494
Caches.noop(),
490-
new BundleProcessorCache(),
495+
new BundleProcessorCache(Duration.ZERO),
491496
null /* dataSampler */);
492497

493498
handler.processBundle(
@@ -592,7 +597,7 @@ public void testOrderOfSetupTeardownCalls() throws Exception {
592597
executionStateSampler,
593598
urnToPTransformRunnerFactoryMap,
594599
Caches.noop(),
595-
new BundleProcessorCache(),
600+
new BundleProcessorCache(Duration.ZERO),
596601
null /* dataSampler */);
597602

598603
handler.processBundle(
@@ -699,7 +704,7 @@ private static InstructionRequest processBundleRequestFor(
699704
public void testBundleProcessorIsFoundWhenActive() {
700705
BundleProcessor bundleProcessor = mock(BundleProcessor.class);
701706
when(bundleProcessor.getInstructionId()).thenReturn("known");
702-
BundleProcessorCache cache = new BundleProcessorCache();
707+
BundleProcessorCache cache = new BundleProcessorCache(Duration.ZERO);
703708

704709
// Check that an unknown bundle processor is not found
705710
assertNull(cache.find("unknown"));
@@ -811,7 +816,7 @@ public void testCreatingPTransformExceptionsArePropagated() throws Exception {
811816
throw new IllegalStateException("TestException");
812817
}),
813818
Caches.noop(),
814-
new BundleProcessorCache(),
819+
new BundleProcessorCache(Duration.ZERO),
815820
null /* dataSampler */);
816821
assertThrows(
817822
"TestException",
@@ -862,7 +867,7 @@ public void testBundleFinalizationIsPropagated() throws Exception {
862867
return null;
863868
}),
864869
Caches.noop(),
865-
new BundleProcessorCache(),
870+
new BundleProcessorCache(Duration.ZERO),
866871
null /* dataSampler */);
867872
BeamFnApi.InstructionResponse.Builder response =
868873
handler.processBundle(
@@ -916,7 +921,7 @@ public void testPTransformStartExceptionsArePropagated() {
916921
return null;
917922
}),
918923
Caches.noop(),
919-
new BundleProcessorCache(),
924+
new BundleProcessorCache(Duration.ZERO),
920925
null /* dataSampler */);
921926
assertThrows(
922927
"TestException",
@@ -1094,7 +1099,7 @@ public void onCompleted() {}
10941099
executionStateSampler,
10951100
urnToPTransformRunnerFactoryMap,
10961101
Caches.noop(),
1097-
new BundleProcessorCache(),
1102+
new BundleProcessorCache(Duration.ZERO),
10981103
null /* dataSampler */);
10991104
}
11001105

@@ -1427,7 +1432,7 @@ public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws
14271432
return null;
14281433
}),
14291434
Caches.noop(),
1430-
new BundleProcessorCache(),
1435+
new BundleProcessorCache(Duration.ZERO),
14311436
null /* dataSampler */);
14321437
handler.processBundle(
14331438
BeamFnApi.InstructionRequest.newBuilder()
@@ -1500,7 +1505,7 @@ public void testDataProcessingExceptionsArePropagated() throws Exception {
15001505
return null;
15011506
}),
15021507
Caches.noop(),
1503-
new BundleProcessorCache(),
1508+
new BundleProcessorCache(Duration.ZERO),
15041509
null /* dataSampler */);
15051510
assertThrows(
15061511
"TestException",
@@ -1551,7 +1556,7 @@ public void testPTransformFinishExceptionsArePropagated() throws Exception {
15511556
return null;
15521557
}),
15531558
Caches.noop(),
1554-
new BundleProcessorCache(),
1559+
new BundleProcessorCache(Duration.ZERO),
15551560
null /* dataSampler */);
15561561
assertThrows(
15571562
"TestException",
@@ -1647,7 +1652,7 @@ private void doStateCalls(BeamFnStateClient beamFnStateClient) {
16471652
}
16481653
}),
16491654
Caches.noop(),
1650-
new BundleProcessorCache(),
1655+
new BundleProcessorCache(Duration.ZERO),
16511656
null /* dataSampler */);
16521657
handler.processBundle(
16531658
BeamFnApi.InstructionRequest.newBuilder()
@@ -1698,7 +1703,7 @@ private void doStateCalls(BeamFnStateClient beamFnStateClient) {
16981703
}
16991704
}),
17001705
Caches.noop(),
1701-
new BundleProcessorCache(),
1706+
new BundleProcessorCache(Duration.ZERO),
17021707
null /* dataSampler */);
17031708
assertThrows(
17041709
"State API calls are unsupported",
@@ -1787,7 +1792,7 @@ public void reset() {
17871792
return null;
17881793
};
17891794

1790-
BundleProcessorCache bundleProcessorCache = new BundleProcessorCache();
1795+
BundleProcessorCache bundleProcessorCache = new BundleProcessorCache(Duration.ZERO);
17911796
ProcessBundleHandler handler =
17921797
new ProcessBundleHandler(
17931798
PipelineOptionsFactory.create(),
@@ -1930,7 +1935,7 @@ public Object createRunnerForPTransform(Context context) throws IOException {
19301935
}
19311936
}),
19321937
Caches.noop(),
1933-
new BundleProcessorCache(),
1938+
new BundleProcessorCache(Duration.ZERO),
19341939
null /* dataSampler */);
19351940
assertThrows(
19361941
"Timers are unsupported",

0 commit comments

Comments
 (0)