Skip to content

Commit ca3c16a

Browse files
authored
fix: stream stability when connecting (wait4Ready) (#1458)
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 7213189 commit ca3c16a

13 files changed

Lines changed: 210 additions & 100 deletions

File tree

libs/providers/flagd/src/e2e/step-definitions/providerSteps.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { OpenFeature, ProviderEvents, ProviderStatus } from '@openfeature/server-sdk';
1+
import { OpenFeature, ProviderStatus } from '@openfeature/server-sdk';
22
import { FlagdComposeContainer } from '../tests/flagdComposeContainer';
33
import type { State, Steps } from './state';
44
import { FlagdProvider } from '../../lib/flagd-provider';
@@ -9,7 +9,7 @@ import { existsSync } from 'node:fs';
99

1010
export const providerSteps: Steps =
1111
(state: State) =>
12-
({ given, when, then, and }) => {
12+
({ given, when, then }) => {
1313
const container: FlagdComposeContainer = FlagdComposeContainer.build();
1414
beforeAll(async () => {
1515
console.log('Setting test harness...');
@@ -21,8 +21,8 @@ export const providerSteps: Steps =
2121
await container.stop();
2222
});
2323

24-
beforeEach(() => {
25-
OpenFeature.clearProviders();
24+
beforeEach(async () => {
25+
await OpenFeature.clearProviders();
2626
});
2727

2828
afterEach(async () => {
@@ -34,10 +34,11 @@ export const providerSteps: Steps =
3434
const flagdOptions: FlagdProviderOptions = {
3535
resolverType: state.resolverType,
3636
retryGracePeriod: 2, // retryGracePeriod is related to test expectations; this must be 2
37-
// these 3 options are optimized for test speed and stability
38-
deadlineMs: 4000,
37+
// these options are optimized for test speed and stability
38+
deadlineMs: 15000,
39+
keepAliveTime: 200,
3940
retryBackoffMaxMs: 1000,
40-
retryBackoffMs: 1000,
41+
retryBackoffMs: 100,
4142

4243
...state.config,
4344
...state.options,
@@ -77,7 +78,6 @@ export const providerSteps: Steps =
7778
}
7879

7980
await fetch('http://' + container.getLaunchpadUrl() + '/start?config=' + type);
80-
await new Promise((r) => setTimeout(r, 100));
8181
if (providerType == 'unavailable' || providerType == 'forbidden') {
8282
OpenFeature.setProvider(providerType, new FlagdProvider(flagdOptions));
8383
} else {

libs/providers/flagd/src/e2e/tests/in-process.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { contextSteps } from '../step-definitions/contextSteps';
1010
const steps = [providerSteps, configSteps, eventSteps, flagSteps, contextSteps];
1111

1212
jest.setTimeout(50000);
13+
jest.retryTimes(3);
14+
1315
describe('in-process', () => {
1416
const state: State = {
1517
resolverType: 'in-process',

libs/providers/flagd/src/e2e/tests/rpc.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { GHERKIN_FLAGD } from '../constants';
1010
const steps = [providerSteps, configSteps, eventSteps, flagSteps, contextSteps];
1111

1212
jest.setTimeout(50000);
13+
jest.retryTimes(3);
1314

1415
describe('rpc', () => {
1516
const state: State = {

libs/providers/flagd/src/lib/configuration.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Config, FlagdProviderOptions } from './configuration';
22
import { getConfig } from './configuration';
3-
import { DEFAULT_MAX_CACHE_SIZE, DEFAULT_RETRY_GRACE_PERIOD } from './constants';
3+
import { DEFAULT_MAX_BACKOFF_MS, DEFAULT_MAX_CACHE_SIZE, DEFAULT_RETRY_GRACE_PERIOD } from './constants';
44
import type { EvaluationContext } from '@openfeature/server-sdk';
55
import { configSteps } from '../e2e/step-definitions/configSteps';
66
import type { State } from '../e2e/step-definitions/state';
@@ -23,7 +23,7 @@ describe('Configuration', () => {
2323
maxCacheSize: DEFAULT_MAX_CACHE_SIZE,
2424
cache: 'lru',
2525
resolverType: 'rpc',
26-
retryBackoffMaxMs: 120000,
26+
retryBackoffMaxMs: DEFAULT_MAX_BACKOFF_MS,
2727
retryBackoffMs: 1000,
2828
selector: '',
2929
deadlineMs: 500,
@@ -124,7 +124,7 @@ describe('Configuration', () => {
124124
maxCacheSize: 1000,
125125
cache: 'lru',
126126
resolverType: 'rpc',
127-
retryBackoffMaxMs: 120000,
127+
retryBackoffMaxMs: DEFAULT_MAX_BACKOFF_MS,
128128
retryBackoffMs: 1000,
129129
selector: '',
130130
defaultAuthority: '',

libs/providers/flagd/src/lib/configuration.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { DEFAULT_MAX_CACHE_SIZE, DEFAULT_RETRY_GRACE_PERIOD } from './constants';
1+
import { DEFAULT_MAX_BACKOFF_MS, DEFAULT_MAX_CACHE_SIZE, DEFAULT_RETRY_GRACE_PERIOD } from './constants';
22
import type { EvaluationContext } from '@openfeature/server-sdk';
33

44
export type CacheOption = 'lru' | 'disabled';
@@ -158,7 +158,7 @@ const DEFAULT_CONFIG: Omit<FlagdConfig & FlagdGrpcConfig, 'port' | 'resolverType
158158
maxCacheSize: DEFAULT_MAX_CACHE_SIZE,
159159
contextEnricher: (syncContext: EvaluationContext | null) => syncContext ?? {},
160160
retryBackoffMs: 1000,
161-
retryBackoffMaxMs: 120000,
161+
retryBackoffMaxMs: DEFAULT_MAX_BACKOFF_MS,
162162
keepAliveTime: 0,
163163
retryGracePeriod: DEFAULT_RETRY_GRACE_PERIOD,
164164
};

libs/providers/flagd/src/lib/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export const EVENT_CONFIGURATION_CHANGE = 'configuration_change';
44
export const EVENT_PROVIDER_READY = 'provider_ready';
55
export const DEFAULT_MAX_CACHE_SIZE = 1000;
66
export const DEFAULT_RETRY_GRACE_PERIOD = 5;
7+
export const DEFAULT_MAX_BACKOFF_MS = 12000;

libs/providers/flagd/src/lib/flagd-provider.spec.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ describe(FlagdProvider.name, () => {
6767

6868
// mock ServiceClient to inject
6969
const basicServiceClientMock: ServiceClient = {
70+
waitForReady: jest.fn((_deadline: number, callback: (err?: Error) => void) => {
71+
callback();
72+
}),
7073
eventStream: jest.fn(() => {
7174
return {
7275
on: jest.fn((event: string, callback: (message: unknown) => void) => {
@@ -270,6 +273,9 @@ describe(FlagdProvider.name, () => {
270273

271274
// mock ServiceClient to inject
272275
const streamingServiceClientMock = {
276+
waitForReady: jest.fn((_deadline: number, callback: (err?: Error) => void) => {
277+
callback();
278+
}),
273279
eventStream: jest.fn(() => {
274280
return streamMock;
275281
}),
@@ -507,7 +513,15 @@ describe(FlagdProvider.name, () => {
507513
});
508514

509515
describe('connection/re-connection', () => {
510-
it('should watch channel for reconnect after error', () => {
516+
beforeEach(() => {
517+
jest.useFakeTimers();
518+
});
519+
520+
afterEach(() => {
521+
jest.useRealTimers();
522+
});
523+
524+
it('should attempt to reconnect after error', () => {
511525
const provider = new FlagdProvider(
512526
undefined,
513527
undefined,
@@ -523,8 +537,11 @@ describe(FlagdProvider.name, () => {
523537
// fake some errors
524538
registeredOnErrorCallback();
525539

526-
expect(streamingServiceClientMock.getChannel().getConnectivityState).toHaveBeenCalledWith(true);
527-
expect(streamingServiceClientMock.getChannel().watchConnectivityState).toHaveBeenCalled();
540+
// verify reconnection is scheduled via setTimeout
541+
jest.runAllTimers();
542+
543+
// eventStream should have been called twice (initial + reconnect attempt)
544+
expect(streamingServiceClientMock.eventStream).toHaveBeenCalledTimes(2);
528545
});
529546
});
530547
});
@@ -535,6 +552,9 @@ describe(FlagdProvider.name, () => {
535552

536553
// mock ServiceClient to inject
537554
const errServiceClientMock: ServiceClient = {
555+
waitForReady: jest.fn((_deadline: number, callback: (err?: Error) => void) => {
556+
callback();
557+
}),
538558
eventStream: jest.fn(() => {
539559
return {
540560
on: jest.fn((event: string, callback: (message: unknown) => void) => {
@@ -661,6 +681,9 @@ describe(FlagdProvider.name, () => {
661681

662682
// mock ServiceClient to inject
663683
const errServiceClientMock: ServiceClient = {
684+
waitForReady: jest.fn((_deadline: number, callback: (err?: Error) => void) => {
685+
callback();
686+
}),
664687
eventStream: jest.fn(() => {
665688
return {
666689
on: jest.fn((event: string, callback: (message: unknown) => void) => {

libs/providers/flagd/src/lib/service/common/grpc-util.spec.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,29 @@ describe('buildClientOptions', () => {
6060
expect(buildClientOptions(config)).toEqual({
6161
'grpc.default_authority': 'my-authority',
6262
'grpc.keepalive_time_ms': 5000,
63-
'grpc.service_config': buildRetryPolicy('flagd.service.v1.FlagService', 100, 200),
63+
'grpc.service_config': buildRetryPolicy(
64+
['flagd.evaluation.v1.Service', 'flagd.sync.v1.FlagSyncService'],
65+
100,
66+
200,
67+
),
68+
});
69+
});
70+
});
71+
72+
describe('buildRetryPolicy', () => {
73+
it('should create a single methodConfig with multiple services sharing one retryPolicy', () => {
74+
const result = JSON.parse(buildRetryPolicy(['service.A', 'service.B'], 2000, 60000));
75+
76+
expect(result.methodConfig).toHaveLength(1);
77+
expect(result.methodConfig[0]).toEqual({
78+
name: [{ service: 'service.A' }, { service: 'service.B' }],
79+
retryPolicy: {
80+
maxAttempts: 3,
81+
initialBackoff: '2.00s',
82+
maxBackoff: '60.00s',
83+
backoffMultiplier: 2,
84+
retryableStatusCodes: ['UNAVAILABLE', 'UNKNOWN'],
85+
},
6486
});
6587
});
6688
});

libs/providers/flagd/src/lib/service/common/grpc-util.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const CONFIG_TO_GRPC_OPTIONS: {
5858
export function buildClientOptions(config: Config): ClientOptions {
5959
const options: Partial<ClientOptions> = {
6060
'grpc.service_config': buildRetryPolicy(
61-
'flagd.service.v1.FlagService',
61+
['flagd.evaluation.v1.Service', 'flagd.sync.v1.FlagSyncService'],
6262
config.retryBackoffMs,
6363
config.retryBackoffMaxMs,
6464
),
@@ -76,27 +76,33 @@ export function buildClientOptions(config: Config): ClientOptions {
7676

7777
/**
7878
* Builds RetryPolicy for gRPC client options.
79-
* @param serviceName
79+
* @param serviceNames Array of service names to configure retry policy for
8080
* @param retryBackoffMs Initial backoff duration in milliseconds
8181
* @param retryBackoffMaxMs Maximum backoff duration in milliseconds
8282
* @returns gRPC client options with retry policy
8383
*/
84-
export const buildRetryPolicy = (serviceName: string, retryBackoffMs?: number, retryBackoffMaxMs?: number): string => {
84+
export const buildRetryPolicy = (
85+
serviceNames: string[],
86+
retryBackoffMs?: number,
87+
retryBackoffMaxMs?: number,
88+
): string => {
8589
const initialBackoff = retryBackoffMs ?? 1000;
8690
const maxBackoff = retryBackoffMaxMs ?? 120000;
8791

92+
const retryPolicy = {
93+
maxAttempts: 3,
94+
initialBackoff: `${Math.round(initialBackoff / 1000).toFixed(2)}s`,
95+
maxBackoff: `${Math.round(maxBackoff / 1000).toFixed(2)}s`,
96+
backoffMultiplier: 2,
97+
retryableStatusCodes: [statusName(status.UNAVAILABLE), statusName(status.UNKNOWN)],
98+
};
99+
88100
return JSON.stringify({
89101
loadBalancingConfig: [],
90102
methodConfig: [
91103
{
92-
name: [{ service: serviceName }],
93-
retryPolicy: {
94-
maxAttempts: 3,
95-
initialBackoff: `${Math.round(initialBackoff / 1000).toFixed(2)}s`,
96-
maxBackoff: `${Math.round(maxBackoff / 1000).toFixed(2)}s`,
97-
backoffMultiplier: 2,
98-
retryableStatusCodes: [statusName(status.UNAVAILABLE), statusName(status.UNKNOWN)],
99-
},
104+
name: serviceNames.map((serviceName) => ({ service: serviceName })),
105+
retryPolicy,
100106
},
101107
],
102108
});

libs/providers/flagd/src/lib/service/grpc/grpc-service.ts

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ClientReadableStream, ClientUnaryCall, ServiceError } from '@grpc/grpc-js';
1+
import { type ClientReadableStream, type ClientUnaryCall, type ServiceError } from '@grpc/grpc-js';
22
import { status } from '@grpc/grpc-js';
33
import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state';
44
import type { EvaluationContext, FlagValue, JsonValue, Logger, ResolutionDetails } from '@openfeature/server-sdk';
@@ -27,7 +27,12 @@ import type {
2727
} from '../../../proto/ts/flagd/evaluation/v1/evaluation';
2828
import { ServiceClient } from '../../../proto/ts/flagd/evaluation/v1/evaluation';
2929
import type { FlagdGrpcConfig } from '../../configuration';
30-
import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants';
30+
import {
31+
DEFAULT_MAX_BACKOFF_MS,
32+
DEFAULT_MAX_CACHE_SIZE,
33+
EVENT_CONFIGURATION_CHANGE,
34+
EVENT_PROVIDER_READY,
35+
} from '../../constants';
3136
import { FlagdProvider } from '../../flagd-provider';
3237
import type { Service } from '../service';
3338
import {
@@ -79,6 +84,8 @@ export class GRPCService implements Service {
7984
private readonly _fatalStatusCodes: Set<number>;
8085
private _initialized = false;
8186
private _streamDeadline: number;
87+
private _maxBackoffMs: number;
88+
private _errorThrottled = false;
8289

8390
private get _cacheActive() {
8491
// the cache is "active" (able to be used) if the config enabled it, AND the gRPC stream is live
@@ -94,6 +101,7 @@ export class GRPCService implements Service {
94101
const clientOptions = buildClientOptions(config);
95102
const channelCredentials = createChannelCredentials(tls, certPath);
96103

104+
this._maxBackoffMs = config.retryBackoffMaxMs || DEFAULT_MAX_BACKOFF_MS;
97105
this._client = client
98106
? client
99107
: new ServiceClient(socketPath ? `unix://${socketPath}` : `${host}:${port}`, channelCredentials, clientOptions);
@@ -175,32 +183,48 @@ export class GRPCService implements Service {
175183
// close the previous stream if we're reconnecting
176184
closeStreamIfDefined(this._eventStream);
177185

178-
const deadline = this._streamDeadline != 0 ? Date.now() + this._streamDeadline : undefined;
179-
const stream = this._client.eventStream({ waitForReady: true }, { deadline });
180-
stream.on('error', (err: Error) => {
181-
// Check if error is a fatal status code on first connection only
182-
if (isFatalStatusCodeError(err, this._initialized, this._fatalStatusCodes)) {
183-
handleFatalStatusCodeError(err, this.logger, disconnectCallback, rejectConnect);
184-
return;
185-
}
186-
rejectConnect?.(err);
187-
this.handleError(reconnectCallback, changedCallback, disconnectCallback);
188-
});
189-
stream.on('data', (message) => {
190-
if (message.type === EVENT_PROVIDER_READY) {
191-
this.logger?.debug(`${FlagdProvider.name}: streaming connection established with flagd`);
192-
this._initialized = true;
193-
// if resolveConnect is undefined, this is a reconnection; we only want to fire the reconnect callback in that case
194-
if (resolveConnect) {
195-
resolveConnect();
196-
} else {
197-
reconnectCallback();
186+
// wait for connection to be stable
187+
this._client.waitForReady(Date.now() + this._deadline, (err) => {
188+
if (err) {
189+
// Check if error is a fatal status code on first connection only
190+
if (isFatalStatusCodeError(err, this._initialized, this._fatalStatusCodes)) {
191+
handleFatalStatusCodeError(err, this.logger, disconnectCallback, rejectConnect);
192+
return;
198193
}
199-
} else if (message.type === EVENT_CONFIGURATION_CHANGE) {
200-
this.handleFlagsChanged(message, changedCallback);
194+
rejectConnect?.(err);
195+
this.handleError(reconnectCallback, changedCallback, disconnectCallback);
196+
} else {
197+
const streamDeadline = this._streamDeadline != 0 ? Date.now() + this._streamDeadline : undefined;
198+
const stream = this._client.eventStream({}, { deadline: streamDeadline });
199+
stream.on('error', (err: Error) => {
200+
// In cases where we get an explicit error status, we add a delay.
201+
// This prevents tight loops when errors are returned immediately, typically by intervening proxies like Envoy.
202+
this._errorThrottled = true;
203+
// Check if error is a fatal status code on first connection only
204+
if (isFatalStatusCodeError(err, this._initialized, this._fatalStatusCodes)) {
205+
handleFatalStatusCodeError(err, this.logger, disconnectCallback, rejectConnect);
206+
return;
207+
}
208+
rejectConnect?.(err);
209+
this.handleError(reconnectCallback, changedCallback, disconnectCallback);
210+
});
211+
stream.on('data', (message) => {
212+
if (message.type === EVENT_PROVIDER_READY) {
213+
this.logger?.debug(`${FlagdProvider.name}: streaming connection established with flagd`);
214+
this._initialized = true;
215+
// if resolveConnect is undefined, this is a reconnection; we only want to fire the reconnect callback in that case
216+
if (resolveConnect) {
217+
resolveConnect();
218+
} else {
219+
reconnectCallback();
220+
}
221+
} else if (message.type === EVENT_CONFIGURATION_CHANGE) {
222+
this.handleFlagsChanged(message, changedCallback);
223+
}
224+
});
225+
this._eventStream = stream;
201226
}
202227
});
203-
this._eventStream = stream;
204228
}
205229

206230
private handleFlagsChanged(message: EventStreamResponse, changedCallback: (flagsChanged: string[]) => void) {
@@ -228,10 +252,11 @@ export class GRPCService implements Service {
228252
changedCallback: (flagsChanged: string[]) => void,
229253
disconnectCallback: (message: string) => void,
230254
) {
231-
const channel = this._client.getChannel();
232-
channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => {
233-
this.listen(reconnectCallback, changedCallback, disconnectCallback);
234-
});
255+
setTimeout(
256+
() => this.listen(reconnectCallback, changedCallback, disconnectCallback),
257+
this._errorThrottled ? this._maxBackoffMs : 0,
258+
);
259+
this._errorThrottled = false;
235260
}
236261

237262
private handleError(

0 commit comments

Comments
 (0)