Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,9 @@ public async Task Init()
{
await _jsonSchemaValidator.InitializeAsync().ConfigureAwait(false);

var latch = new CountdownEvent(1);
var handleEventsThread = new Thread(async () => await HandleEvents(latch).ConfigureAwait(false))
{
IsBackground = true
};
handleEventsThread.Start();
await Task.Run(() => latch.Wait()).ConfigureAwait(false);
var initComplete = new TaskCompletionSource<bool>();
_ = Task.Run(() => HandleEvents(initComplete));
await initComplete.Task.ConfigureAwait(false);
}

public async Task Shutdown()
Expand Down Expand Up @@ -108,27 +104,24 @@ public Task<ResolutionDetails<Value>> ResolveStructureValueAsync(string flagKey,
return Task.FromResult(_evaluator.ResolveStructureValueAsync(flagKey, defaultValue, context));
}

private async Task HandleEvents(CountdownEvent latch)
private async Task HandleEvents(TaskCompletionSource<bool> tcs)
{
CancellationToken token = _cancellationTokenSource.Token;
while (!token.IsCancellationRequested)
{
var call = _client.SyncFlags(new SyncFlagsRequest
{
Selector = _config.SourceSelector
});
try
{
// Read the response stream asynchronously
var call = _client.SyncFlags(new SyncFlagsRequest
{
Selector = _config.SourceSelector
}, cancellationToken: token);

while (!token.IsCancellationRequested && await call.ResponseStream.MoveNext(token).ConfigureAwait(false))
{
var response = call.ResponseStream.Current;
this._evaluator.Sync(FlagConfigurationUpdateType.ALL, response.FlagConfiguration);

if (!latch.IsSet)
{
latch.Signal();
}
tcs.TrySetResult(true);

// Reset delay backoff on successful response
this._eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff;
Expand All @@ -148,18 +141,36 @@ private async Task HandleEvents(CountdownEvent latch)
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// do nothing, we've been shutdown
// The operation was cancelled, which is expected during shutdown.
break;
}
catch (RpcException)
{
// This is a transient error, so we signal that Init() can complete,
// but the provider is in an error state. We will then retry.
// Emit the error event first, so that the error state is propagated
// before Init() completes. This avoids a race condition where the
// provider appears ready but hasn't signaled the error yet.
var flagdEvent = new FlagdProviderEvent(ProviderEventTypes.ProviderError, new List<string>(), Structure.Empty);
ProviderEvent?.Invoke(this, flagdEvent);

// Handle the dropped connection by reconnecting and retrying the stream
tcs.TrySetResult(true);

this._eventStreamRetryBackoff = Math.Min(this._eventStreamRetryBackoff * 2, MaxEventStreamRetryBackoff);
await Task.Delay(this._eventStreamRetryBackoff * 1000).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(this._eventStreamRetryBackoff), token).ConfigureAwait(false);
}
catch (Exception ex)
{
// This is an unexpected and likely non-transient error.
// We should fail Init() and stop the event loop.
tcs.TrySetException(ex);
// It would be good to log the exception here.
return;
}
}

// If the loop exits cleanly (e.g., via cancellation), ensure the TCS is completed.
tcs.TrySetResult(true);
}

private static Value ExtractValue(Google.Protobuf.WellKnownTypes.Value value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ public async Task TestInProcessResolver()
);

mockGrpcClient.SyncFlags(
Arg.Any<SyncFlagsRequest>(), null, null, CancellationToken.None)
Arg.Any<SyncFlagsRequest>(), null, null, Arg.Any<CancellationToken>())
.Returns(grpcEventStreamResp);


Expand All @@ -760,7 +760,7 @@ await Utils.AssertUntilAsync(
Assert.True(val.Value);
});

mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is<SyncFlagsRequest>(req => req.Selector == "source-selector"), null, null, CancellationToken.None);
mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is<SyncFlagsRequest>(req => req.Selector == "source-selector"), null, null, Arg.Any<CancellationToken>());

await flagdProvider.ShutdownAsync();
}
Expand Down Expand Up @@ -799,7 +799,7 @@ public async Task TestInProcessResolverDefaultValueIfNotFound()
);

mockGrpcClient.SyncFlags(
Arg.Any<SyncFlagsRequest>(), null, null, CancellationToken.None)
Arg.Any<SyncFlagsRequest>(), null, null, Arg.Any<CancellationToken>())
.Returns(grpcEventStreamResp);


Expand All @@ -820,7 +820,7 @@ await Utils.AssertUntilAsync(
Assert.Equal(ErrorType.FlagNotFound, exception.ErrorType);
});

mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is<SyncFlagsRequest>(req => req.Selector == "source-selector"), null, null, CancellationToken.None);
mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is<SyncFlagsRequest>(req => req.Selector == "source-selector"), null, null, Arg.Any<CancellationToken>());

await flagdProvider.ShutdownAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private static (FlagSyncService.FlagSyncServiceClient, IAsyncStreamReader<SyncFl
asyncStreamReader.Current.Returns(_ => enumerator.Current);

var grpcEventStreamResp = new AsyncServerStreamingCall<SyncFlagsResponse>(asyncStreamReader, null, null, null, null, null);
mockGrpcClient.SyncFlags(Arg.Any<SyncFlagsRequest>(), null, null, CancellationToken.None).Returns(grpcEventStreamResp);
mockGrpcClient.SyncFlags(Arg.Any<SyncFlagsRequest>(), null, null, Arg.Any<CancellationToken>()).Returns(grpcEventStreamResp);

return (mockGrpcClient, asyncStreamReader);
}
Expand Down