Skip to content

Commit 1bfa90f

Browse files
Fix VirtualClusterRequestInvoker async path and TestableDateTimeProvider.DeadTime (#195)
RequestAsync previously wrapped the sync Request in Task.FromResult, which meant PostData.StreamHandler's async writer was never invoked — leaving RequestBodyInBytes empty for callers using the async transport path. Now RequestAsync is truly async with its own HandleRulesAsync chain that calls InMemoryRequestInvoker.BuildResponseAsync (and thus PostData.WriteAsync). Also implement TestableDateTimeProvider.DeadTime with exponential backoff matching DefaultDateTimeProvider instead of throwing NotImplementedException, which crashed when content-type mismatches caused the transport to mark a node as dead. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent a0b3e44 commit 1bfa90f

3 files changed

Lines changed: 204 additions & 5 deletions

File tree

src/Elastic.Transport.VirtualizedCluster/Components/VirtualClusterConnection.cs

Lines changed: 195 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,62 @@ private void UpdateCluster(VirtualCluster cluster)
118118
private bool IsPingRequest(Endpoint endpoint) => _productRegistration.IsPingRequest(endpoint);
119119

120120
/// <inheritdoc cref="IRequestInvoker.RequestAsync{TResponse}"/>>
121-
public Task<TResponse> RequestAsync<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken)
122-
where TResponse : TransportResponse, new() =>
123-
Task.FromResult(Request<TResponse>(endpoint, boundConfiguration, postData));
121+
public async Task<TResponse> RequestAsync<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken)
122+
where TResponse : TransportResponse, new()
123+
{
124+
if (!_calls.TryGetValue(endpoint.Uri.Port, out var state))
125+
throw new Exception($"Expected a call to happen on port {endpoint.Uri.Port} but received none");
126+
127+
try
128+
{
129+
if (IsSniffRequest(endpoint))
130+
{
131+
_ = Interlocked.Increment(ref state.Sniffed);
132+
return await HandleRulesAsync<TResponse, ISniffRule>(
133+
endpoint,
134+
boundConfiguration,
135+
postData,
136+
nameof(VirtualCluster.Sniff),
137+
_cluster.SniffingRules,
138+
boundConfiguration.RequestTimeout,
139+
r => UpdateCluster(r.NewClusterState),
140+
_ => _productRegistration.CreateSniffResponseBytes(_cluster.Nodes, _cluster.ElasticsearchVersion, _cluster.PublishAddressOverride, _cluster.SniffShouldReturnFqnd),
141+
cancellationToken
142+
).ConfigureAwait(false);
143+
}
144+
if (IsPingRequest(endpoint))
145+
{
146+
_ = Interlocked.Increment(ref state.Pinged);
147+
return await HandleRulesAsync<TResponse, IRule>(
148+
endpoint,
149+
boundConfiguration,
150+
postData,
151+
nameof(VirtualCluster.Ping),
152+
_cluster.PingingRules,
153+
boundConfiguration.PingTimeout,
154+
_ => { },
155+
_ => null, //HEAD request
156+
cancellationToken
157+
).ConfigureAwait(false);
158+
}
159+
_ = Interlocked.Increment(ref state.Called);
160+
return await HandleRulesAsync<TResponse, IClientCallRule>(
161+
endpoint,
162+
boundConfiguration,
163+
postData,
164+
nameof(VirtualCluster.ClientCalls),
165+
_cluster.ClientCallRules,
166+
boundConfiguration.RequestTimeout,
167+
_ => { },
168+
CallResponse,
169+
cancellationToken
170+
).ConfigureAwait(false);
171+
}
172+
catch (TheException e)
173+
{
174+
return ResponseFactory.Create<TResponse>(endpoint, boundConfiguration, postData, e, null, null, Stream.Null, null, -1, null, null);
175+
}
176+
}
124177

125178
/// <inheritdoc cref="IRequestInvoker.Request{TResponse}"/>>
126179
public TResponse Request<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData)
@@ -177,6 +230,8 @@ public TResponse Request<TResponse>(Endpoint endpoint, BoundConfiguration boundC
177230
}
178231
}
179232

233+
// --- Sync path ---
234+
180235
private TResponse HandleRules<TResponse, TRule>(
181236
Endpoint endpoint,
182237
BoundConfiguration boundConfiguration,
@@ -316,6 +371,143 @@ TRule rule
316371
return _inMemoryRequestInvoker.BuildResponse<TResponse>(endpoint, boundConfiguration, postData, successResponse(rule), contentType: rule.ReturnContentType);
317372
}
318373

374+
// --- Async path ---
375+
376+
private async Task<TResponse> HandleRulesAsync<TResponse, TRule>(
377+
Endpoint endpoint,
378+
BoundConfiguration boundConfiguration,
379+
PostData? postData,
380+
string origin,
381+
IList<TRule> rules,
382+
TimeSpan timeout,
383+
Action<TRule> beforeReturn,
384+
Func<TRule, byte[]?> successResponse,
385+
CancellationToken cancellationToken
386+
)
387+
where TResponse : TransportResponse, new()
388+
where TRule : IRule
389+
{
390+
if (rules.Count == 0)
391+
throw new Exception($"No {origin} defined for the current VirtualCluster, so we do not know how to respond");
392+
393+
var (matched, response) = await TryMatchRulesAsync<TResponse, TRule>(rules.Where(s => s.OnPort.HasValue && s.PathFilter != null), endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, cancellationToken).ConfigureAwait(false);
394+
if (matched) return response!;
395+
(matched, response) = await TryMatchRulesAsync<TResponse, TRule>(rules.Where(s => s.OnPort.HasValue && s.PathFilter == null), endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, cancellationToken).ConfigureAwait(false);
396+
if (matched) return response!;
397+
(matched, response) = await TryMatchRulesAsync<TResponse, TRule>(rules.Where(s => !s.OnPort.HasValue && s.PathFilter != null), endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, cancellationToken).ConfigureAwait(false);
398+
if (matched) return response!;
399+
(matched, response) = await TryMatchRulesAsync<TResponse, TRule>(rules.Where(s => !s.OnPort.HasValue && s.PathFilter == null), endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, cancellationToken).ConfigureAwait(false);
400+
if (matched) return response!;
401+
402+
var count = _calls.Sum(kv => kv.Value.Called);
403+
throw new Exception($@"No global or port specific {origin} rule ({endpoint.Uri.Port}) matches any longer after {count} calls in to the cluster");
404+
}
405+
406+
private async Task<(bool matched, TResponse? response)> TryMatchRulesAsync<TResponse, TRule>(
407+
IEnumerable<TRule> rules,
408+
Endpoint endpoint,
409+
BoundConfiguration boundConfiguration,
410+
PostData? postData,
411+
TimeSpan timeout,
412+
Action<TRule> beforeReturn,
413+
Func<TRule, byte[]?> successResponse,
414+
CancellationToken cancellationToken
415+
)
416+
where TResponse : TransportResponse, new()
417+
where TRule : IRule
418+
{
419+
foreach (var rule in rules)
420+
{
421+
if (rule.OnPort.HasValue && rule.OnPort.Value != endpoint.Uri.Port) continue;
422+
if (rule.PathFilter != null && !rule.PathFilter(endpoint.PathAndQuery)) continue;
423+
424+
var always = rule.Times.Match(_ => true, _ => false);
425+
var times = rule.Times.Match(_ => -1, t => t);
426+
427+
if (always)
428+
return (true, await AlwaysAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, rule, cancellationToken).ConfigureAwait(false));
429+
430+
if (rule.ExecuteCount > times) continue;
431+
432+
return (true, await SometimesAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, timeout, beforeReturn, successResponse, rule, cancellationToken).ConfigureAwait(false));
433+
}
434+
return (false, default);
435+
}
436+
437+
private async Task<TResponse> AlwaysAsync<TResponse, TRule>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, TimeSpan timeout, Action<TRule> beforeReturn, Func<TRule, byte[]?> successResponse, TRule rule, CancellationToken cancellationToken)
438+
where TResponse : TransportResponse, new()
439+
where TRule : IRule
440+
{
441+
if (rule.Takes.HasValue)
442+
{
443+
var time = timeout < rule.Takes.Value ? timeout : rule.Takes.Value;
444+
_dateTimeProvider.ChangeTime(d => d.Add(time));
445+
if (rule.Takes.Value > boundConfiguration.RequestTimeout)
446+
throw new TheException(
447+
$"Request timed out after {time} : call configured to take {rule.Takes.Value} while requestTimeout was: {timeout}");
448+
}
449+
450+
return rule.Succeeds
451+
? await SuccessAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, beforeReturn, successResponse, rule, cancellationToken).ConfigureAwait(false)
452+
: await FailAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, rule, cancellationToken).ConfigureAwait(false);
453+
}
454+
455+
private async Task<TResponse> SometimesAsync<TResponse, TRule>(
456+
Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, TimeSpan timeout, Action<TRule> beforeReturn, Func<TRule, byte[]?> successResponse, TRule rule, CancellationToken cancellationToken
457+
)
458+
where TResponse : TransportResponse, new()
459+
where TRule : IRule
460+
{
461+
if (rule.Takes.HasValue)
462+
{
463+
var time = timeout < rule.Takes.Value ? timeout : rule.Takes.Value;
464+
_dateTimeProvider.ChangeTime(d => d.Add(time));
465+
if (rule.Takes.Value > boundConfiguration.RequestTimeout)
466+
throw new TheException(
467+
$"Request timed out after {time} : call configured to take {rule.Takes.Value} while requestTimeout was: {timeout}");
468+
}
469+
470+
if (rule.Succeeds)
471+
return await SuccessAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, beforeReturn, successResponse, rule, cancellationToken).ConfigureAwait(false);
472+
473+
return await FailAsync<TResponse, TRule>(endpoint, boundConfiguration, postData, rule, cancellationToken).ConfigureAwait(false);
474+
}
475+
476+
private async Task<TResponse> FailAsync<TResponse, TRule>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, TRule rule, CancellationToken cancellationToken, RuleOption<Exception, int>? returnOverride = null)
477+
where TResponse : TransportResponse, new()
478+
where TRule : IRule
479+
{
480+
var state = _calls[endpoint.Uri.Port];
481+
_ = Interlocked.Increment(ref state.Failures);
482+
var ret = returnOverride ?? rule.Return;
483+
rule.RecordExecuted();
484+
485+
if (ret == null)
486+
throw new TheException();
487+
488+
return await ret.Match<Task<TResponse>>(
489+
e => throw e,
490+
async statusCode => await _inMemoryRequestInvoker.BuildResponseAsync<TResponse>(endpoint, boundConfiguration, postData, cancellationToken, CallResponse(rule),
491+
statusCode is >= 200 and < 300 ? 502 : statusCode, rule.ReturnContentType).ConfigureAwait(false)
492+
).ConfigureAwait(false);
493+
}
494+
495+
private async Task<TResponse> SuccessAsync<TResponse, TRule>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, Action<TRule> beforeReturn, Func<TRule, byte[]?> successResponse,
496+
TRule rule, CancellationToken cancellationToken
497+
)
498+
where TResponse : TransportResponse, new()
499+
where TRule : IRule
500+
{
501+
var state = _calls[endpoint.Uri.Port];
502+
_ = Interlocked.Increment(ref state.Successes);
503+
rule.RecordExecuted();
504+
505+
beforeReturn.Invoke(rule);
506+
return await _inMemoryRequestInvoker.BuildResponseAsync<TResponse>(endpoint, boundConfiguration, postData, cancellationToken, successResponse(rule), contentType: rule.ReturnContentType).ConfigureAwait(false);
507+
}
508+
509+
// --- Shared helpers ---
510+
319511
private static byte[] CallResponse<TRule>(TRule rule)
320512
where TRule : IRule
321513
{

src/Elastic.Transport.VirtualizedCluster/Providers/TestableDateTimeProvider.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,12 @@ public sealed class TestableDateTimeProvider : DateTimeProvider
2020
/// <param name="change">A fun that gets passed the current <see cref="Now"/> and needs to return the new value</param>
2121
public void ChangeTime(Func<DateTimeOffset, DateTimeOffset> change) => MutableNow = change(MutableNow);
2222

23-
public override DateTimeOffset DeadTime(int attempts, TimeSpan? minDeadTimeout, TimeSpan? maxDeadTimeout) => throw new NotImplementedException();
23+
/// <inheritdoc cref="DateTimeProvider.DeadTime"/>
24+
public override DateTimeOffset DeadTime(int attempts, TimeSpan? minDeadTimeout, TimeSpan? maxDeadTimeout)
25+
{
26+
var timeout = minDeadTimeout.GetValueOrDefault(TimeSpan.FromSeconds(60));
27+
var maxTimeout = maxDeadTimeout.GetValueOrDefault(TimeSpan.FromMinutes(30));
28+
var milliSeconds = Math.Min(timeout.TotalMilliseconds * 2 * Math.Pow(2, (attempts * 0.5) - 1), maxTimeout.TotalMilliseconds);
29+
return Now().AddMilliseconds(milliSeconds);
30+
}
2431
}

src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public async Task<TResponse> BuildResponseAsync<TResponse>(Endpoint endpoint, Bo
124124
Stream responseStream = body != null ? boundConfiguration.MemoryStreamFactory.Create(body) : boundConfiguration.MemoryStreamFactory.Create(EmptyBody);
125125

126126
return await ResponseFactory
127-
.CreateAsync<TResponse>(endpoint, boundConfiguration, postData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
127+
.CreateAsync<TResponse>(endpoint, boundConfiguration, postData, _exception, sc, _headers, responseStream, contentType ?? _contentType ?? BoundConfiguration.DefaultContentType, body?.Length ?? 0, null, null, cancellationToken)
128128
.ConfigureAwait(false);
129129
}
130130
}

0 commit comments

Comments
 (0)