Skip to content

Commit e9a2431

Browse files
fix(gofeatureflag): issue when using inProcess with high concurency (#1781)
Signed-off-by: Thomas Poignant <thomas.poignant@gofeatureflag.org>
1 parent 733c2a3 commit e9a2431

6 files changed

Lines changed: 180 additions & 41 deletions

File tree

providers/go-feature-flag/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ You can configure the provider with several options to customize its behavior. T
8282
| **`exporterMetadata`** | `false` | exporterMetadata is the metadata we send to the GO Feature Flag relay proxy when we report the evaluation data usage. |
8383
| **`evaluationFlagList`** | `false` | If you are using in process evaluation, by default we will load in memory all the flags available in the relay proxy. If you want to limit the number of flags loaded in memory, you can use this parameter. By setting this parameter, you will only load the flags available in the list. <p>If null or empty, all the flags available in the relay proxy will be loaded.</p> |
8484
| **`flagChangePollingIntervalMs`** | `false` | interval time we poll the proxy to check if the configuration has changed. It is used for the in process evaluation to check if we should refresh our internal cache. default: `120000` |
85+
| **`wasmEvaluatorPoolSize`** | `false` | _(IN_PROCESS only)_ Number of WASM instances kept in the evaluation pool. Each instance owns independent memory, allowing fully concurrent flag evaluations without serialisation. Must be `>= 1`. _(default: number of available CPU cores)_ |
8586

8687
### Evaluate a feature flag
8788
The OpenFeature client is used to retrieve values for the current `EvaluationContext`. For example, retrieving a boolean value for the flag **"my-flag"**:

providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ public class GoFeatureFlagProviderOptions {
9191
*/
9292
private Long flagChangePollingIntervalMs;
9393

94+
/**
95+
* (optional) Number of WASM instances kept in the evaluation pool for in-process evaluation.
96+
* Each instance owns independent WASM linear memory, allowing fully concurrent evaluations
97+
* without serialisation. Must be &gt;= 1 when set explicitly.
98+
* Default: number of available CPU cores.
99+
*/
100+
private Integer wasmEvaluatorPoolSize;
101+
94102
/**
95103
* Validate the options provided to the provider.
96104
*
@@ -107,6 +115,10 @@ public void validate() throws InvalidOptions {
107115
throw new InvalidEndpoint("malformed endpoint: " + getEndpoint());
108116
}
109117

118+
if (getWasmEvaluatorPoolSize() != null && getWasmEvaluatorPoolSize() < 1) {
119+
throw new InvalidOptions("wasmEvaluatorPoolSize must be at least 1");
120+
}
121+
110122
if (getExporterMetadata() != null) {
111123
val acceptableExporterMetadataTypes = List.of("String", "Boolean", "Integer", "Double");
112124
for (Map.Entry<String, Object> entry : getExporterMetadata().entrySet()) {

providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import dev.openfeature.contrib.providers.gofeatureflag.bean.FlagConfigResponse;
77
import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse;
88
import dev.openfeature.contrib.providers.gofeatureflag.util.Const;
9-
import dev.openfeature.contrib.providers.gofeatureflag.wasm.EvaluationWasm;
9+
import dev.openfeature.contrib.providers.gofeatureflag.wasm.WasmEvaluatorPool;
1010
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.FlagContext;
1111
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput;
1212
import dev.openfeature.sdk.ErrorCode;
@@ -16,7 +16,6 @@
1616
import io.reactivex.rxjava3.core.Observable;
1717
import io.reactivex.rxjava3.disposables.Disposable;
1818
import io.reactivex.rxjava3.schedulers.Schedulers;
19-
import io.reactivex.rxjava3.subjects.PublishSubject;
2019
import java.util.ArrayList;
2120
import java.util.Collections;
2221
import java.util.Date;
@@ -35,23 +34,35 @@
3534
public class InProcessEvaluator implements IEvaluator {
3635
/** API to contact GO Feature Flag. */
3736
private final GoFeatureFlagApi api;
38-
/** WASM evaluation engine. */
39-
private final EvaluationWasm evaluationEngine;
37+
/** Pool of WASM evaluation engine instances for thread-safe concurrent evaluation. */
38+
private final WasmEvaluatorPool evaluationPool;
4039
/** Options to configure the provider. */
4140
private final GoFeatureFlagProviderOptions options;
4241
/** Method to call when we have a configuration change. */
4342
private final Consumer<ProviderEventDetails> emitProviderConfigurationChanged;
44-
/** Local copy of the flags' configuration. */
45-
private Map<String, Flag> flags;
46-
/** Evaluation context enrichment. */
47-
private Map<String, Object> evaluationContextEnrichment;
48-
/** Last hash of the flags' configuration. */
49-
private String etag;
50-
/** Last update of the flags' configuration. */
51-
private Date lastUpdate;
43+
/** Immutable snapshot of all flag configuration state; updated atomically by the polling daemon. */
44+
private volatile EvaluatorState state;
5245
/** disposable which manage the polling of the flag configurations. */
5346
private Disposable configurationDisposable;
5447

48+
private static final class EvaluatorState {
49+
final Map<String, Flag> flags;
50+
final Map<String, Object> evaluationContextEnrichment;
51+
final String etag;
52+
final Date lastUpdate;
53+
54+
EvaluatorState(
55+
Map<String, Flag> flags,
56+
Map<String, Object> evaluationContextEnrichment,
57+
String etag,
58+
Date lastUpdate) {
59+
this.flags = flags;
60+
this.evaluationContextEnrichment = evaluationContextEnrichment;
61+
this.etag = etag;
62+
this.lastUpdate = lastUpdate;
63+
}
64+
}
65+
5566
/**
5667
* Constructor of the InProcessEvaluator.
5768
*
@@ -64,17 +75,19 @@ public InProcessEvaluator(
6475
GoFeatureFlagProviderOptions options,
6576
Consumer<ProviderEventDetails> emitProviderConfigurationChanged) {
6677
this.api = api;
67-
this.flags = Collections.emptyMap();
68-
this.etag = "";
6978
this.options = options;
70-
this.lastUpdate = new Date(0);
7179
this.emitProviderConfigurationChanged = emitProviderConfigurationChanged;
72-
this.evaluationEngine = new EvaluationWasm();
80+
this.state = new EvaluatorState(Collections.emptyMap(), null, "", new Date(0));
81+
int poolSize = options.getWasmEvaluatorPoolSize() != null
82+
? options.getWasmEvaluatorPoolSize()
83+
: Const.DEFAULT_WASM_EVALUATOR_POOL_SIZE;
84+
this.evaluationPool = new WasmEvaluatorPool(poolSize);
7385
}
7486

7587
@Override
7688
public GoFeatureFlagResponse evaluate(String key, Object defaultValue, EvaluationContext evaluationContext) {
77-
if (this.flags.get(key) == null) {
89+
EvaluatorState current = this.state;
90+
if (current.flags.get(key) == null) {
7891
val err = new GoFeatureFlagResponse();
7992
err.setReason(Reason.ERROR.name());
8093
err.setErrorCode(ErrorCode.FLAG_NOT_FOUND.name());
@@ -85,30 +98,29 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio
8598
val wasmInput = WasmInput.builder()
8699
.flagContext(FlagContext.builder()
87100
.defaultSdkValue(defaultValue)
88-
.evaluationContextEnrichment(this.evaluationContextEnrichment)
101+
.evaluationContextEnrichment(current.evaluationContextEnrichment)
89102
.build())
90103
.evalContext(evaluationContext.asObjectMap())
91-
.flag(this.flags.get(key))
104+
.flag(current.flags.get(key))
92105
.flagKey(key)
93106
.build();
94-
return this.evaluationEngine.evaluate(wasmInput);
107+
return this.evaluationPool.evaluate(wasmInput);
95108
}
96109

97110
@Override
98111
public boolean isFlagTrackable(final String flagKey) {
99-
Flag flag = this.flags.get(flagKey);
112+
Flag flag = this.state.flags.get(flagKey);
100113
return flag != null && (flag.getTrackEvents() == null || flag.getTrackEvents());
101114
}
102115

103116
@Override
104117
public void init() {
105-
val configFlags = api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList());
106-
this.flags = configFlags.getFlags();
107-
this.etag = configFlags.getEtag();
108-
this.lastUpdate = configFlags.getLastUpdated();
109-
this.evaluationContextEnrichment = configFlags.getEvaluationContextEnrichment();
110-
// We call the WASM engine to avoid a cold start at the 1st evaluation
111-
this.evaluationEngine.preWarmWasm();
118+
val configFlags = api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList());
119+
this.state = new EvaluatorState(
120+
configFlags.getFlags(),
121+
configFlags.getEvaluationContextEnrichment(),
122+
configFlags.getEtag(),
123+
configFlags.getLastUpdated());
112124

113125
// start the polling of the flag configuration
114126
this.configurationDisposable = startCheckFlagConfigurationChangesDaemon();
@@ -131,13 +143,11 @@ private Disposable startCheckFlagConfigurationChangesDaemon() {
131143
? options.getFlagChangePollingIntervalMs()
132144
: Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS;
133145

134-
PublishSubject<Object> stopSignal = PublishSubject.create();
135-
Observable<Long> intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS);
146+
Observable<Long> intervalObservable =
147+
Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io());
136148
Observable<FlagConfigResponse> apiCallObservable = intervalObservable
137-
// as soon something is published in stopSignal, the interval will stop
138-
.takeUntil(stopSignal)
139-
.flatMap(tick -> Observable.fromCallable(
140-
() -> this.api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList()))
149+
.flatMap(tick -> Observable.fromCallable(() ->
150+
this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList()))
141151
.onErrorResumeNext(e -> {
142152
log.error("error while calling flag configuration API", e);
143153
return Observable.empty();
@@ -146,22 +156,24 @@ private Disposable startCheckFlagConfigurationChangesDaemon() {
146156

147157
return apiCallObservable.subscribe(
148158
response -> {
149-
if (response.getEtag().equals(this.etag)) {
159+
EvaluatorState current = this.state;
160+
if (response.getEtag().equals(current.etag)) {
150161
log.debug("flag configuration has not changed: {}", response);
151162
return;
152163
}
153164

154-
if (response.getLastUpdated().before(this.lastUpdate)) {
165+
if (response.getLastUpdated().before(current.lastUpdate)) {
155166
log.info("configuration received is older than the current one");
156167
return;
157168
}
158169

159170
log.info("flag configuration has changed");
160-
this.etag = response.getEtag();
161-
this.lastUpdate = response.getLastUpdated();
162-
val flagChanges = findFlagConfigurationChanges(this.flags, response.getFlags());
163-
this.flags = response.getFlags();
164-
this.evaluationContextEnrichment = response.getEvaluationContextEnrichment();
171+
val flagChanges = findFlagConfigurationChanges(current.flags, response.getFlags());
172+
this.state = new EvaluatorState(
173+
response.getFlags(),
174+
response.getEvaluationContextEnrichment(),
175+
response.getEtag(),
176+
response.getLastUpdated());
165177
val changeDetails = ProviderEventDetails.builder()
166178
.flagsChanged(flagChanges)
167179
.message("flag configuration has changed")

providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public class Const {
2121
public static final long DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS = 2L * 60L * 1000L;
2222
public static final long DEFAULT_FLUSH_INTERVAL_MS = Duration.ofMinutes(1).toMillis();
2323
public static final int DEFAULT_MAX_PENDING_EVENTS = 10000;
24+
public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE =
25+
Runtime.getRuntime().availableProcessors();
2426
// MAPPERS
2527
public static final ObjectMapper DESERIALIZE_OBJECT_MAPPER =
2628
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package dev.openfeature.contrib.providers.gofeatureflag.wasm;
2+
3+
import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse;
4+
import dev.openfeature.contrib.providers.gofeatureflag.exception.WasmFileNotFound;
5+
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput;
6+
import dev.openfeature.sdk.ErrorCode;
7+
import dev.openfeature.sdk.Reason;
8+
import java.util.concurrent.ArrayBlockingQueue;
9+
import java.util.concurrent.BlockingQueue;
10+
import lombok.extern.slf4j.Slf4j;
11+
12+
/**
13+
* WasmEvaluatorPool manages a fixed pool of EvaluationWasm instances.
14+
* Each instance owns independent WASM linear memory, allowing concurrent
15+
* evaluate() calls without interleaving memory operations.
16+
*/
17+
@Slf4j
18+
public final class WasmEvaluatorPool {
19+
private final BlockingQueue<EvaluationWasm> pool;
20+
21+
/**
22+
* Creates a pool of {@code size} independent EvaluationWasm instances.
23+
* All instances are allocated eagerly so that first-call latency is
24+
* absorbed at provider initialisation time.
25+
*
26+
* @param size number of WASM instances; must be >= 1
27+
* @throws WasmFileNotFound if the embedded WASM module cannot be loaded
28+
*/
29+
public WasmEvaluatorPool(int size) throws WasmFileNotFound {
30+
this.pool = new ArrayBlockingQueue<>(size);
31+
for (int i = 0; i < size; i++) {
32+
EvaluationWasm instance = new EvaluationWasm();
33+
instance.preWarmWasm();
34+
pool.add(instance);
35+
}
36+
}
37+
38+
/**
39+
* Evaluates a feature flag by borrowing one WASM instance from the pool,
40+
* delegating to it, and returning it when done.
41+
* Blocks if all instances are busy until one becomes available.
42+
*
43+
* @param wasmInput evaluation input
44+
* @return evaluation result
45+
*/
46+
public GoFeatureFlagResponse evaluate(WasmInput wasmInput) {
47+
EvaluationWasm instance;
48+
try {
49+
instance = pool.take();
50+
} catch (InterruptedException e) {
51+
Thread.currentThread().interrupt();
52+
GoFeatureFlagResponse err = new GoFeatureFlagResponse();
53+
err.setErrorCode(ErrorCode.GENERAL.name());
54+
err.setReason(Reason.ERROR.name());
55+
err.setErrorDetails("WASM evaluator pool interrupted while waiting for an available instance");
56+
return err;
57+
}
58+
try {
59+
return instance.evaluate(wasmInput);
60+
} finally {
61+
if (!pool.offer(instance)) {
62+
log.error("Failed to return WASM instance to pool — instance leaked, pool capacity may be compromised");
63+
}
64+
}
65+
}
66+
}

providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
3235
import lombok.SneakyThrows;
3336
import lombok.extern.slf4j.Slf4j;
3437
import lombok.val;
@@ -596,6 +599,49 @@ void shouldNotApplyAScheduledRolloutStepIfTheDateIsInTheFuture() {
596599
assertEquals(want, got);
597600
}
598601
}
602+
603+
@DisplayName("Should evaluate flags correctly under concurrent access")
604+
@SneakyThrows
605+
@Test
606+
void shouldEvaluateFlagsCorrectlyUnderConcurrentAccess() {
607+
GoFeatureFlagProvider provider = new GoFeatureFlagProvider(GoFeatureFlagProviderOptions.builder()
608+
.endpoint(baseUrl.toString())
609+
.evaluationType(EvaluationType.IN_PROCESS)
610+
.flagChangePollingIntervalMs(999999L)
611+
.build());
612+
OpenFeatureAPI.getInstance().setProviderAndWait(testName, provider);
613+
val client = OpenFeatureAPI.getInstance().getClient(testName);
614+
615+
int threadCount = 20;
616+
int evaluationsPerThread = 100;
617+
AtomicInteger errorCount = new AtomicInteger(0);
618+
CountDownLatch startGate = new CountDownLatch(1);
619+
CountDownLatch doneLatch = new CountDownLatch(threadCount);
620+
621+
for (int t = 0; t < threadCount; t++) {
622+
new Thread(() -> {
623+
try {
624+
startGate.await();
625+
for (int i = 0; i < evaluationsPerThread; i++) {
626+
FlagEvaluationDetails<Boolean> result = client.getBooleanDetails(
627+
"bool_targeting_match", false, TestUtils.defaultEvaluationContext);
628+
if (result.getErrorCode() != null) {
629+
errorCount.incrementAndGet();
630+
}
631+
}
632+
} catch (InterruptedException e) {
633+
Thread.currentThread().interrupt();
634+
} finally {
635+
doneLatch.countDown();
636+
}
637+
})
638+
.start();
639+
}
640+
641+
startGate.countDown();
642+
assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Threads did not finish in time");
643+
assertEquals(0, errorCount.get(), "Concurrent evaluations produced errors");
644+
}
599645
}
600646

601647
@Nested

0 commit comments

Comments
 (0)