Skip to content

Commit 7018eea

Browse files
leakonvalinkaguidobreitoddbaert
authored
feat(flagd): introduce fatalStatusCodes option (#1624)
Signed-off-by: lea konvalinka <lea.konvalinka@dynatrace.com> Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com> Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com> Signed-off-by: Todd Baert <todd.baert@dynatrace.com> Co-authored-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com> Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 7146dea commit 7018eea

20 files changed

Lines changed: 444 additions & 237 deletions

File tree

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package dev.openfeature.contrib.providers.flagd;
22

33
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
4+
import java.util.Arrays;
5+
import java.util.List;
46
import java.util.function.Function;
7+
import java.util.stream.Collectors;
58
import lombok.extern.slf4j.Slf4j;
69

710
/** Helper class to hold configuration default values. */
@@ -37,6 +40,7 @@ public final class Config {
3740
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
3841
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
3942
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
43+
static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES";
4044
/**
4145
* Environment variable to fetch Provider id.
4246
*
@@ -93,6 +97,18 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) {
9397
}
9498
}
9599

100+
static List<String> fallBackToEnvOrDefaultList(String key, List<String> defaultValue) {
101+
try {
102+
return System.getenv(key) != null
103+
? Arrays.stream(System.getenv(key).split(","))
104+
.map(String::trim)
105+
.collect(Collectors.toList())
106+
: defaultValue;
107+
} catch (Exception e) {
108+
return defaultValue;
109+
}
110+
}
111+
96112
static Resolver fromValueProvider(Function<String, String> provider) {
97113
final String resolverVar = provider.apply(RESOLVER_ENV_VAR);
98114
if (resolverVar == null) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd;
22

33
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;
4+
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList;
45
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;
56

67
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
@@ -122,6 +123,15 @@ public class FlagdOptions {
122123
@Builder.Default
123124
private int retryGracePeriod =
124125
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
126+
127+
/**
128+
* List of grpc response status codes for which the provider transitions into fatal state upon first connection.
129+
* Defaults to empty list
130+
*/
131+
@Builder.Default
132+
private List<String> fatalStatusCodes =
133+
fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of());
134+
125135
/**
126136
* Selector to be used with flag sync gRPC contract.
127137
*

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package dev.openfeature.contrib.providers.flagd;
22

33
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
4-
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
54
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
65
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
76
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
7+
import dev.openfeature.sdk.ErrorCode;
88
import dev.openfeature.sdk.EvaluationContext;
99
import dev.openfeature.sdk.EventProvider;
1010
import dev.openfeature.sdk.Hook;
@@ -192,8 +192,9 @@ EvaluationContext getEnrichedContext() {
192192
}
193193

194194
@SuppressWarnings("checkstyle:fallthrough")
195-
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
196-
log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
195+
private void onProviderEvent(
196+
ProviderEvent providerEvent, ProviderEventDetails providerEventDetails, Structure syncMetadata) {
197+
log.debug("FlagdProviderEvent event {} ", providerEvent);
197198
synchronized (syncResources) {
198199
/*
199200
* We only use Error and Ready as previous states.
@@ -204,10 +205,10 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
204205
* forward a configuration changed to the ready, if we are not in the ready
205206
* state.
206207
*/
207-
switch (flagdProviderEvent.getEvent()) {
208+
switch (providerEvent) {
208209
case PROVIDER_CONFIGURATION_CHANGED:
209210
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
210-
onConfigurationChanged(flagdProviderEvent);
211+
emit(providerEvent, providerEventDetails);
211212
break;
212213
}
213214
// intentional fall through
@@ -216,33 +217,30 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
216217
* Sync metadata is used to enrich the context, and is immutable in flagd,
217218
* so we only need it to be fetched once at READY.
218219
*/
219-
if (flagdProviderEvent.getSyncMetadata() != null) {
220-
syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
220+
if (syncMetadata != null) {
221+
syncResources.setEnrichedContext(contextEnricher.apply(syncMetadata));
221222
}
222223
onReady();
223224
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
224225
break;
225-
226226
case PROVIDER_ERROR:
227+
if (providerEventDetails != null
228+
&& providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) {
229+
onFatal();
230+
break;
231+
}
232+
227233
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
228234
onError();
229235
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
230236
}
231237
break;
232-
233238
default:
234-
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
239+
log.warn("Unknown event {}", providerEvent);
235240
}
236241
}
237242
}
238243

239-
private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
240-
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
241-
.flagsChanged(flagdProviderEvent.getFlagsChanged())
242-
.message("configuration changed")
243-
.build());
244-
}
245-
246244
private void onReady() {
247245
if (syncResources.initialize()) {
248246
log.info("Initialized FlagdProvider");
@@ -284,4 +282,17 @@ private void onError() {
284282
TimeUnit.SECONDS);
285283
}
286284
}
285+
286+
private void onFatal() {
287+
if (errorTask != null && !errorTask.isCancelled()) {
288+
errorTask.cancel(false);
289+
}
290+
this.syncResources.setFatal(true);
291+
292+
this.emitProviderError(ProviderEventDetails.builder()
293+
.errorCode(ErrorCode.PROVIDER_FATAL)
294+
.build());
295+
296+
shutdown();
297+
}
287298
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import dev.openfeature.sdk.EvaluationContext;
44
import dev.openfeature.sdk.ImmutableContext;
55
import dev.openfeature.sdk.ProviderEvent;
6+
import dev.openfeature.sdk.exceptions.FatalError;
67
import dev.openfeature.sdk.exceptions.GeneralError;
78
import lombok.Getter;
89
import lombok.Setter;
@@ -16,8 +17,11 @@ class FlagdProviderSyncResources {
1617
@Setter
1718
private volatile ProviderEvent previousEvent = null;
1819

20+
@Setter
21+
private volatile boolean isFatal;
22+
1923
private volatile EvaluationContext enrichedContext = new ImmutableContext();
20-
private volatile boolean initialized;
24+
private volatile boolean isInitialized;
2125
private volatile boolean isShutDown;
2226

2327
public void setEnrichedContext(EvaluationContext context) {
@@ -31,32 +35,40 @@ public void setEnrichedContext(EvaluationContext context) {
3135
* @return true iff this was the first call to {@code initialize()}
3236
*/
3337
public synchronized boolean initialize() {
34-
if (this.initialized) {
38+
if (this.isInitialized) {
3539
return false;
3640
}
37-
this.initialized = true;
41+
this.isInitialized = true;
42+
this.isFatal = false;
3843
this.notifyAll();
3944
return true;
4045
}
4146

4247
/**
43-
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
44-
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
45-
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
46-
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
47-
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
48+
* Blocks the calling thread until either
49+
* {@link FlagdProviderSyncResources#initialize()} or
50+
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is
51+
* exceeded, whatever happens first. If
52+
* {@link FlagdProviderSyncResources#initialize()} has been executed before
53+
* {@code waitForInitialization(long)} is
54+
* called, it will return instantly. If the deadline is exceeded, a GeneralError
55+
* will be thrown.
56+
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime,
57+
* an {@link IllegalStateException} will
4858
* be thrown. Otherwise, the method will return cleanly.
4959
*
5060
* @param deadline the maximum time in ms to wait
51-
* @throws GeneralError when the deadline is exceeded before
52-
* {@link FlagdProviderSyncResources#initialize()} is called on this object
53-
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
54-
* this object
61+
* @throws GeneralError when the deadline is exceeded before
62+
* {@link FlagdProviderSyncResources#initialize()} is
63+
* called on this object, or when
64+
* {@link FlagdProviderSyncResources#shutdown()}
65+
* @throws FatalError when the provider has been marked as fatal during
66+
* shutdown
5567
*/
5668
public void waitForInitialization(long deadline) {
5769
long start = System.currentTimeMillis();
5870
long end = start + deadline;
59-
while (!initialized && !isShutDown) {
71+
while (!isInitialized && !isShutDown) {
6072
long now = System.currentTimeMillis();
6173
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
6274
if (now >= end) {
@@ -68,7 +80,7 @@ public void waitForInitialization(long deadline) {
6880
if (isShutDown) {
6981
break;
7082
}
71-
if (initialized) { // might have changed in the meantime
83+
if (isInitialized) { // might have changed in the meantime
7284
return;
7385
}
7486
try {
@@ -80,7 +92,11 @@ public void waitForInitialization(long deadline) {
8092
}
8193
}
8294
if (isShutDown) {
83-
throw new IllegalStateException("Already shut down");
95+
String msg = "Already shut down due to previous error.";
96+
if (isFatal) {
97+
throw new FatalError(msg);
98+
}
99+
throw new GeneralError(msg);
84100
}
85101
}
86102

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
66
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
7-
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
87
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
98
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
109
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
@@ -20,13 +19,15 @@
2019
import dev.openfeature.sdk.ImmutableMetadata;
2120
import dev.openfeature.sdk.ProviderEvaluation;
2221
import dev.openfeature.sdk.ProviderEvent;
22+
import dev.openfeature.sdk.ProviderEventDetails;
2323
import dev.openfeature.sdk.Reason;
24+
import dev.openfeature.sdk.Structure;
2425
import dev.openfeature.sdk.Value;
2526
import dev.openfeature.sdk.exceptions.GeneralError;
2627
import dev.openfeature.sdk.exceptions.ParseError;
2728
import dev.openfeature.sdk.exceptions.TypeMismatchError;
29+
import dev.openfeature.sdk.internal.TriConsumer;
2830
import java.util.Map;
29-
import java.util.function.Consumer;
3031
import lombok.extern.slf4j.Slf4j;
3132
import org.apache.commons.lang3.StringUtils;
3233

@@ -38,7 +39,7 @@
3839
@Slf4j
3940
public class InProcessResolver implements Resolver {
4041
private final Storage flagStore;
41-
private final Consumer<FlagdProviderEvent> onConnectionEvent;
42+
private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent;
4243
private final Operator operator;
4344
private final String scope;
4445
private final QueueSource queueSource;
@@ -52,7 +53,8 @@ public class InProcessResolver implements Resolver {
5253
* @param onConnectionEvent lambda which handles changes in the
5354
* connection/stream
5455
*/
55-
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
56+
public InProcessResolver(
57+
FlagdOptions options, TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent) {
5658
this.queueSource = getQueueSource(options);
5759
this.flagStore = new FlagStore(queueSource);
5860
this.onConnectionEvent = onConnectionEvent;
@@ -73,14 +75,29 @@ public void init() throws Exception {
7375
switch (storageStateChange.getStorageState()) {
7476
case OK:
7577
log.debug("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
76-
onConnectionEvent.accept(new FlagdProviderEvent(
78+
79+
var eventDetails = ProviderEventDetails.builder()
80+
.flagsChanged(storageStateChange.getChangedFlagsKeys())
81+
.message("configuration changed")
82+
.build();
83+
84+
onConnectionEvent.accept(
7785
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
78-
storageStateChange.getChangedFlagsKeys(),
79-
storageStateChange.getSyncMetadata()));
86+
eventDetails,
87+
storageStateChange.getSyncMetadata());
88+
8089
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
8190
break;
91+
case STALE:
92+
onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null);
93+
break;
8294
case ERROR:
83-
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
95+
onConnectionEvent.accept(
96+
ProviderEvent.PROVIDER_ERROR,
97+
ProviderEventDetails.builder()
98+
.errorCode(ErrorCode.PROVIDER_FATAL)
99+
.build(),
100+
null);
84101
break;
85102
default:
86103
log.warn(String.format(

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.openfeature.sdk.ImmutableStructure;
1212
import dev.openfeature.sdk.Structure;
1313
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14+
import java.util.Collections;
1415
import java.util.HashMap;
1516
import java.util.List;
1617
import java.util.Map;
@@ -109,7 +110,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
109110
switch (payload.getType()) {
110111
case DATA:
111112
try {
112-
List<String> changedFlagsKeys;
113+
List<String> changedFlagsKeys = Collections.emptyList();
113114
ParsingResult parsingResult = FlagParser.parseString(payload.getFlagData(), throwIfInvalid);
114115
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
115116
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();
@@ -133,13 +134,19 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
133134
// catch all exceptions and avoid stream listener interruptions
134135
log.warn("Invalid flag sync payload from connector", e);
135136
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
136-
log.warn("Failed to convey STALE status, queue is full");
137+
log.warn("Failed to convey TRANSIENT_ERROR status, queue is full");
137138
}
138139
}
139140
break;
140141
case ERROR:
142+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
143+
log.warn("Failed to convey TRANSIENT_ERROR status, queue is full");
144+
}
145+
break;
146+
case SHUTDOWN:
147+
shutdown();
141148
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
142-
log.warn("Failed to convey ERROR status, queue is full");
149+
log.warn("Failed to convey FATAL_ERROR status, queue is full");
143150
}
144151
break;
145152
default:

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;
22

3-
/** Satus of the storage. */
3+
/** Status of the storage. */
44
public enum StorageState {
55
/** Storage is upto date and working as expected. */
66
OK,
7-
/** Storage has gone stale(most recent sync failed). May get to OK status with next sync. */
7+
/** Storage has gone stale (most recent sync failed). May get to OK status with next sync. */
88
STALE,
99
/** Storage is in an unrecoverable error stage. */
1010
ERROR,

0 commit comments

Comments
 (0)