diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs index cb1fb142..81cf951c 100644 --- a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs @@ -58,13 +58,9 @@ public async Task Init() { await _jsonSchemaValidator.InitializeAsync().ConfigureAwait(false); - var latch = new CountdownEvent(1); - var handleEventsThread = new Thread(async () => await HandleEvents(latch).ConfigureAwait(false)) - { - IsBackground = true - }; - handleEventsThread.Start(); - await Task.Run(() => latch.Wait()).ConfigureAwait(false); + var initComplete = new TaskCompletionSource(); + _ = Task.Run(() => HandleEvents(initComplete)); + await initComplete.Task.ConfigureAwait(false); } public async Task Shutdown() @@ -108,27 +104,24 @@ public Task> ResolveStructureValueAsync(string flagKey, return Task.FromResult(_evaluator.ResolveStructureValueAsync(flagKey, defaultValue, context)); } - private async Task HandleEvents(CountdownEvent latch) + private async Task HandleEvents(TaskCompletionSource tcs) { CancellationToken token = _cancellationTokenSource.Token; while (!token.IsCancellationRequested) { - var call = _client.SyncFlags(new SyncFlagsRequest - { - Selector = _config.SourceSelector - }); try { - // Read the response stream asynchronously + var call = _client.SyncFlags(new SyncFlagsRequest + { + Selector = _config.SourceSelector + }, cancellationToken: token); + while (!token.IsCancellationRequested && await call.ResponseStream.MoveNext(token).ConfigureAwait(false)) { var response = call.ResponseStream.Current; this._evaluator.Sync(FlagConfigurationUpdateType.ALL, response.FlagConfiguration); - if (!latch.IsSet) - { - latch.Signal(); - } + tcs.TrySetResult(true); // Reset delay backoff on successful response this._eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; @@ -148,18 +141,36 @@ private async Task HandleEvents(CountdownEvent latch) } catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) { - // do nothing, we've been shutdown + // The operation was cancelled, which is expected during shutdown. + break; } catch (RpcException) { + // This is a transient error, so we signal that Init() can complete, + // but the provider is in an error state. We will then retry. + // Emit the error event first, so that the error state is propagated + // before Init() completes. This avoids a race condition where the + // provider appears ready but hasn't signaled the error yet. var flagdEvent = new FlagdProviderEvent(ProviderEventTypes.ProviderError, new List(), Structure.Empty); ProviderEvent?.Invoke(this, flagdEvent); - // Handle the dropped connection by reconnecting and retrying the stream + tcs.TrySetResult(true); + this._eventStreamRetryBackoff = Math.Min(this._eventStreamRetryBackoff * 2, MaxEventStreamRetryBackoff); - await Task.Delay(this._eventStreamRetryBackoff * 1000).ConfigureAwait(false); + await Task.Delay(TimeSpan.FromSeconds(this._eventStreamRetryBackoff), token).ConfigureAwait(false); + } + catch (Exception ex) + { + // This is an unexpected and likely non-transient error. + // We should fail Init() and stop the event loop. + tcs.TrySetException(ex); + // It would be good to log the exception here. + return; } } + + // If the loop exits cleanly (e.g., via cancellation), ensure the TCS is completed. + tcs.TrySetResult(true); } private static Value ExtractValue(Google.Protobuf.WellKnownTypes.Value value) diff --git a/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs b/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs index 415c3267..5aea37bc 100644 --- a/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs +++ b/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs @@ -739,7 +739,7 @@ public async Task TestInProcessResolver() ); mockGrpcClient.SyncFlags( - Arg.Any(), null, null, CancellationToken.None) + Arg.Any(), null, null, Arg.Any()) .Returns(grpcEventStreamResp); @@ -760,7 +760,7 @@ await Utils.AssertUntilAsync( Assert.True(val.Value); }); - mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is(req => req.Selector == "source-selector"), null, null, CancellationToken.None); + mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is(req => req.Selector == "source-selector"), null, null, Arg.Any()); await flagdProvider.ShutdownAsync(); } @@ -799,7 +799,7 @@ public async Task TestInProcessResolverDefaultValueIfNotFound() ); mockGrpcClient.SyncFlags( - Arg.Any(), null, null, CancellationToken.None) + Arg.Any(), null, null, Arg.Any()) .Returns(grpcEventStreamResp); @@ -820,7 +820,7 @@ await Utils.AssertUntilAsync( Assert.Equal(ErrorType.FlagNotFound, exception.ErrorType); }); - mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is(req => req.Selector == "source-selector"), null, null, CancellationToken.None); + mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is(req => req.Selector == "source-selector"), null, null, Arg.Any()); await flagdProvider.ShutdownAsync(); } diff --git a/test/OpenFeature.Contrib.Providers.Flagd.Test/Resolver/InProcess/InProcessResolverTests.cs b/test/OpenFeature.Contrib.Providers.Flagd.Test/Resolver/InProcess/InProcessResolverTests.cs index d3f73dbe..f6e03195 100644 --- a/test/OpenFeature.Contrib.Providers.Flagd.Test/Resolver/InProcess/InProcessResolverTests.cs +++ b/test/OpenFeature.Contrib.Providers.Flagd.Test/Resolver/InProcess/InProcessResolverTests.cs @@ -138,7 +138,7 @@ private static (FlagSyncService.FlagSyncServiceClient, IAsyncStreamReader enumerator.Current); var grpcEventStreamResp = new AsyncServerStreamingCall(asyncStreamReader, null, null, null, null, null); - mockGrpcClient.SyncFlags(Arg.Any(), null, null, CancellationToken.None).Returns(grpcEventStreamResp); + mockGrpcClient.SyncFlags(Arg.Any(), null, null, Arg.Any()).Returns(grpcEventStreamResp); return (mockGrpcClient, asyncStreamReader); }