Skip to content

Commit 1d1c1ad

Browse files
aepfliCopilot
andauthored
refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource (#1734)
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 20e7181 commit 1d1c1ad

2 files changed

Lines changed: 162 additions & 74 deletions

File tree

  • providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync
  • tools/flagd-core/src/main/resources/flagd/schemas

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import io.grpc.stub.StreamObserver;
2222
import java.util.List;
2323
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.Executors;
2425
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.RejectedExecutionException;
27+
import java.util.concurrent.ScheduledExecutorService;
2528
import java.util.concurrent.TimeUnit;
2629
import java.util.concurrent.atomic.AtomicBoolean;
2730
import lombok.extern.slf4j.Slf4j;
@@ -48,6 +51,11 @@ public class SyncStreamQueueSource implements QueueSource {
4851
private final FlagdOptions options;
4952
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
5053
private final List<String> fatalStatusCodes;
54+
private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
55+
Thread t = new Thread(r, "flagd-sync-retry-scheduler");
56+
t.setDaemon(true);
57+
return t;
58+
});
5159
private volatile GrpcComponents grpcComponents;
5260

5361
/**
@@ -143,9 +151,7 @@ public synchronized void reinitializeChannelComponents() {
143151

144152
/** Initialize sync stream connector. */
145153
public void init() throws Exception {
146-
Thread listener = new Thread(this::observeSyncStream);
147-
listener.setDaemon(true);
148-
listener.start();
154+
retryScheduler.execute(this::observeSyncStream);
149155
}
150156

151157
/** Get blocking queue to obtain payloads exposed by this connector. */
@@ -167,6 +173,8 @@ public void shutdown() throws InterruptedException {
167173
return;
168174
}
169175

176+
retryScheduler.shutdownNow();
177+
retryScheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS);
170178
grpcComponents.channelConnector.shutdown();
171179
}
172180

@@ -181,12 +189,12 @@ private void observeSyncStream() {
181189
try {
182190
if (shouldThrottle.getAndSet(false)) {
183191
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
184-
Thread.sleep(this.maxBackoffMs);
185-
186-
// Check shutdown again after sleep to avoid unnecessary work
187-
if (shutdown.get()) {
188-
break;
192+
try {
193+
retryScheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS);
194+
} catch (RejectedExecutionException e) {
195+
log.debug("Retry scheduling rejected, most likely shutdown was invoked", e);
189196
}
197+
return;
190198
}
191199

192200
log.debug("Initializing sync stream request");
@@ -198,8 +206,8 @@ private void observeSyncStream() {
198206
log.info(
199207
"Fatal status code for metadata request: {}, not retrying",
200208
metaEx.getStatus().getCode());
201-
shutdown();
202209
enqueue(QueuePayload.SHUTDOWN);
210+
shutdown();
203211
} else {
204212
// retry for other status codes
205213
String message = metaEx.getMessage();
@@ -218,8 +226,8 @@ private void observeSyncStream() {
218226
log.info(
219227
"Fatal status code during sync stream: {}, not retrying",
220228
ex.getStatus().getCode());
221-
shutdown();
222229
enqueue(QueuePayload.SHUTDOWN);
230+
shutdown();
223231
} else {
224232
// retry for other status codes
225233
log.error("Unexpected sync stream exception, will restart.", ex);
@@ -228,7 +236,8 @@ private void observeSyncStream() {
228236
shouldThrottle.set(true);
229237
}
230238
} catch (InterruptedException ie) {
231-
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
239+
log.debug("Stream observer interrupted, most likely shutdown was invoked", ie);
240+
Thread.currentThread().interrupt();
232241
}
233242
}
234243

Lines changed: 142 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,123 @@
11
{
22
"$id": "https://flagd.dev/schema/v0/flags.json",
33
"$schema": "http://json-schema.org/draft-07/schema#",
4-
"title": "flagd Flag Configuration",
5-
"description": "Defines flags for use in flagd, including typed variants and rules.",
6-
"type": "object",
7-
"properties": {
8-
"flags": {
4+
"$ref": "#/definitions/providerConfig",
5+
"definitions": {
6+
"flagsMap": {
97
"title": "Flags",
108
"description": "Top-level flags object. All flags are defined here.",
119
"type": "object",
1210
"$comment": "flag objects are one of the 4 flag types defined in definitions",
1311
"additionalProperties": false,
1412
"patternProperties": {
1513
"^.{1,}$": {
16-
"oneOf": [
17-
{
18-
"title": "Boolean flag",
19-
"description": "A flag having boolean values.",
20-
"$ref": "#/definitions/booleanFlag"
21-
},
22-
{
23-
"title": "String flag",
24-
"description": "A flag having string values.",
25-
"$ref": "#/definitions/stringFlag"
14+
"$ref": "#/definitions/anyFlag"
15+
}
16+
}
17+
},
18+
"flagsArray": {
19+
"title": "Flags",
20+
"description": "Top-level flags array. All flags are defined here.",
21+
"type": "array",
22+
"items": {
23+
"allOf": [
24+
{
25+
"$ref": "#/definitions/anyFlag"
26+
},
27+
{
28+
"type": "object",
29+
"properties": {
30+
"key": {
31+
"description": "Key of the flag: uniquely identifies this flag within it's flagSet",
32+
"type": "string",
33+
"minLength": 1
34+
}
2635
},
27-
{
28-
"title": "Numeric flag",
29-
"description": "A flag having numeric values.",
30-
"$ref": "#/definitions/numberFlag"
36+
"required": [
37+
"key"
38+
]
39+
}
40+
]
41+
}
42+
},
43+
"baseConfig": {
44+
"title": "flagd Flag Configuration",
45+
"description": "Defines flags for use in flagd providers, including typed variants and rules.",
46+
"type": "object",
47+
"properties": {
48+
"$evaluators": {
49+
"title": "Evaluators",
50+
"description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.",
51+
"type": "object",
52+
"additionalProperties": false,
53+
"patternProperties": {
54+
"^.{1,}$": {
55+
"$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path",
56+
"$ref": "./targeting.json"
57+
}
58+
}
59+
},
60+
"metadata": {
61+
"title": "Flag Set Metadata",
62+
"description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.",
63+
"properties": {
64+
"flagSetId": {
65+
"description": "The unique identifier for the flag set.",
66+
"type": "string"
3167
},
32-
{
33-
"title": "Object flag",
34-
"description": "A flag having arbitrary object values.",
35-
"$ref": "#/definitions/objectFlag"
68+
"version": {
69+
"description": "The version of the flag set.",
70+
"type": "string"
3671
}
37-
]
72+
},
73+
"$ref": "#/definitions/metadata"
3874
}
3975
}
4076
},
41-
"$evaluators": {
42-
"title": "Evaluators",
43-
"description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.",
77+
"providerConfig": {
78+
"description": "Defines flags for use in providers (not flagd), including typed variants and rules.",
4479
"type": "object",
45-
"additionalProperties": false,
46-
"patternProperties": {
47-
"^.{1,}$": {
48-
"$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path",
49-
"$ref": "./targeting.json"
80+
"allOf": [
81+
{
82+
"$ref": "#/definitions/baseConfig"
5083
}
51-
}
52-
},
53-
"metadata": {
54-
"title": "Flag Set Metadata",
55-
"description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.",
84+
],
5685
"properties": {
57-
"flagSetId": {
58-
"description": "The unique identifier for the flag set.",
59-
"type": "string"
60-
},
61-
"version": {
62-
"description": "The version of the flag set.",
63-
"type": "string"
86+
"flags": {
87+
"$ref": "#/definitions/flagsMap"
6488
}
6589
},
66-
"$ref": "#/definitions/metadata"
67-
}
68-
},
69-
"definitions": {
70-
"flag": {
90+
"required": [
91+
"flags"
92+
]
93+
},
94+
"flagdConfig": {
95+
"description": "Defines flags for use in the flagd daemon (a superset of what's available in providers), including typed variants and rules. Flags can be defined as an array or an object.",
96+
"type": "object",
97+
"allOf": [
98+
{
99+
"$ref": "#/definitions/baseConfig"
100+
},
101+
{
102+
"properties": {
103+
"flags": {
104+
"oneOf": [
105+
{
106+
"$ref": "#/definitions/flagsMap"
107+
},
108+
{
109+
"$ref": "#/definitions/flagsArray"
110+
}
111+
]
112+
}
113+
}
114+
}
115+
],
116+
"required": [
117+
"flags"
118+
]
119+
},
120+
"baseFlag": {
71121
"$comment": "base flag object; no title/description here, allows for better UX, keep it in the overrides",
72122
"type": "object",
73123
"properties": {
@@ -82,8 +132,11 @@
82132
},
83133
"defaultVariant": {
84134
"title": "Default Variant",
85-
"description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null).",
86-
"type": "string"
135+
"description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null). Set to null to use code-defined default.",
136+
"type": [
137+
"string",
138+
"null"
139+
]
87140
},
88141
"targeting": {
89142
"$ref": "./targeting.json"
@@ -92,11 +145,19 @@
92145
"title": "Flag Metadata",
93146
"description": "Metadata about an individual feature flag, with keys of type string, and values of type boolean, string, or number.",
94147
"$ref": "#/definitions/metadata"
148+
},
149+
"variants": {
150+
"type": "object",
151+
"minProperties": 1,
152+
"additionalProperties": false,
153+
"patternProperties": {
154+
"^.{1,}$": {}
155+
}
95156
}
96157
},
97158
"required": [
98159
"state",
99-
"defaultVariant"
160+
"variants"
100161
]
101162
},
102163
"booleanVariants": {
@@ -109,10 +170,6 @@
109170
"^.{1,}$": {
110171
"type": "boolean"
111172
}
112-
},
113-
"default": {
114-
"true": true,
115-
"false": false
116173
}
117174
}
118175
}
@@ -159,41 +216,65 @@
159216
}
160217
}
161218
},
219+
"anyFlag": {
220+
"anyOf": [
221+
{
222+
"$ref": "#/definitions/booleanFlag"
223+
},
224+
{
225+
"$ref": "#/definitions/numberFlag"
226+
},
227+
{
228+
"$ref": "#/definitions/stringFlag"
229+
},
230+
{
231+
"$ref": "#/definitions/objectFlag"
232+
}
233+
]
234+
},
162235
"booleanFlag": {
163236
"$comment": "merge the variants with the base flag to build our typed flags",
237+
"title": "Boolean flag",
238+
"description": "A flag having boolean values.",
164239
"allOf": [
165240
{
166-
"$ref": "#/definitions/flag"
241+
"$ref": "#/definitions/baseFlag"
167242
},
168243
{
169244
"$ref": "#/definitions/booleanVariants"
170245
}
171246
]
172247
},
173248
"stringFlag": {
249+
"title": "String flag",
250+
"description": "A flag having string values.",
174251
"allOf": [
175252
{
176-
"$ref": "#/definitions/flag"
253+
"$ref": "#/definitions/baseFlag"
177254
},
178255
{
179256
"$ref": "#/definitions/stringVariants"
180257
}
181258
]
182259
},
183260
"numberFlag": {
261+
"title": "Numeric flag",
262+
"description": "A flag having numeric values.",
184263
"allOf": [
185264
{
186-
"$ref": "#/definitions/flag"
265+
"$ref": "#/definitions/baseFlag"
187266
},
188267
{
189268
"$ref": "#/definitions/numberVariants"
190269
}
191270
]
192271
},
193272
"objectFlag": {
273+
"title": "Object flag",
274+
"description": "A flag having arbitrary object values.",
194275
"allOf": [
195276
{
196-
"$ref": "#/definitions/flag"
277+
"$ref": "#/definitions/baseFlag"
197278
},
198279
{
199280
"$ref": "#/definitions/objectVariants"
@@ -203,14 +284,12 @@
203284
"metadata": {
204285
"type": "object",
205286
"additionalProperties": {
206-
"description": "Any additional key/value pair with value of type boolean, string, or number.",
207287
"type": [
208288
"string",
209289
"number",
210290
"boolean"
211291
]
212-
},
213-
"required": []
292+
}
214293
}
215294
}
216295
}

0 commit comments

Comments
 (0)