diff --git a/Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs b/Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs index 06a776138a..e75accfa76 100644 --- a/Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs +++ b/Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs @@ -522,8 +522,12 @@ private void StartIdleTimer() } } - private void OnIdleTimer(Task precedentTask) + private async Task OnIdleTimerAsync(Task precedentTask) { + DefaultTrace.TraceInformation( + "[RNTBD Dispatcher {0}][{1}] Idle timer fired.", + this.ConnectionCorrelationId, this); + Task receiveTaskCopy = null; Debug.Assert(!Monitor.IsEntered(this.connectionLock)); @@ -572,7 +576,8 @@ private void OnIdleTimer(Task precedentTask) receiveTaskCopy = this.CloseConnection(); } - this.WaitTask(receiveTaskCopy, "receive loop"); + await this.WaitTaskAsync(receiveTaskCopy, "receive loop") + .ConfigureAwait(false); } // this.connectionLock must be held. @@ -580,7 +585,16 @@ private void ScheduleIdleTimer(TimeSpan timeToIdle) { Debug.Assert(Monitor.IsEntered(this.connectionLock)); this.idleTimer = this.idleTimerPool.GetPooledTimer((int)timeToIdle.TotalSeconds); - this.idleTimerTask = this.idleTimer.StartTimerAsync().ContinueWith(this.OnIdleTimer, TaskContinuationOptions.OnlyOnRanToCompletion); + // IMPORTANT: .Unwrap() is essential here. Without it, idleTimerTask + // would be Task<Task> and would complete when OnIdleTimerAsync + // returns its inner Task (at the first await), not when it + // finishes. StopIdleTimer() waits on idleTimerTask during + // shutdown; if idleTimerTask completes early, shutdown proceeds + // while OnIdleTimerAsync is still running, causing + // use-after-dispose on the connection. Do not remove .Unwrap(). 
+ this.idleTimerTask = this.idleTimer.StartTimerAsync() + .ContinueWith(this.OnIdleTimerAsync, TaskContinuationOptions.OnlyOnRanToCompletion) + .Unwrap(); this.idleTimerTask.ContinueWith( failedTask => { @@ -681,6 +695,30 @@ private void WaitTask(Task t, string description) } } + private async Task WaitTaskAsync(Task t, string description) + { + if (t == null) + { + return; + } + try + { + Debug.Assert(!Monitor.IsEntered(this.callLock)); + Debug.Assert(!Monitor.IsEntered(this.connectionLock)); + await t.ConfigureAwait(false); + } + catch (Exception e) + { + DefaultTrace.TraceWarning( + "[RNTBD Dispatcher {0}][{1}] Parallel task failed: {2}. " + + "Exception: {3}: {4}", + this.ConnectionCorrelationId, this, description, + e.GetType().Name, e.Message); + // Intentionally swallowing the exception. The caller can't + // do anything useful with it. + } + } + private void ThrowIfDisposed() { if (this.disposed) @@ -1244,4 +1282,4 @@ private enum State } } } -} \ No newline at end of file +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/RntbdIdleTimerStarvationTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/RntbdIdleTimerStarvationTests.cs new file mode 100644 index 0000000000..24e0ac52de --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/RntbdIdleTimerStarvationTests.cs @@ -0,0 +1,450 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. 
+//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + /// + /// Integration tests for the RNTBD idle-timer thread-pool starvation + /// fix (issue #4393). Reads connection info from the COSMOS_ENDPOINT + /// and COSMOS_KEY environment variables so the tests can run against + /// a live account from any Linux/Docker pipeline without touching + /// App.config. + /// + [TestClass] + [TestCategory("RntbdIdleTimer")] + public class RntbdIdleTimerStarvationTests + { + private const string EndpointEnvVar = "COSMOS_ENDPOINT"; + private const string KeyEnvVar = "COSMOS_KEY"; + + private const string TestDatabaseId = "rntbd-starvation-test"; + private static string GetContainerId(int n) => $"items-{n}"; + + private static (string Endpoint, string Key) ReadCredentialsOrSkip() + { + string endpoint = Environment.GetEnvironmentVariable(EndpointEnvVar); + string key = Environment.GetEnvironmentVariable(KeyEnvVar); + + if (string.IsNullOrWhiteSpace(endpoint) || string.IsNullOrWhiteSpace(key)) + { + Assert.Inconclusive( + $"{EndpointEnvVar} and/or {KeyEnvVar} are not set; skipping test that requires a live Cosmos DB account."); + } + + return (endpoint, key); + } + + [TestMethod] + public async Task SmokeTest_CanReachEndpoint() + { + (string endpoint, string key) = ReadCredentialsOrSkip(); + + CosmosClientOptions options = new CosmosClientOptions + { + ConnectionMode = ConnectionMode.Direct, + }; + + using CosmosClient client = new CosmosClient(endpoint, key, options); + + AccountProperties account = await client.ReadAccountAsync(); + + Assert.IsNotNull(account, "ReadAccountAsync returned null AccountProperties."); + 
Assert.IsFalse(string.IsNullOrWhiteSpace(account.Id), "AccountProperties.Id was empty."); + } + + // REGRESSION GUARD for the idle timer fix wiring (issue #4393, PR #5722). + // + // What this test validates: + // - Idle timers arm and fire correctly when the SDK runs against a + // real Cosmos DB account. + // - The timer continuation registered in Dispatcher.ScheduleIdleTimer + // still reaches OnIdleTimerAsync: the timer-fire-count assertion + // counts the production trace emitted at OnIdleTimerAsync entry. + // Note that this cannot detect removal of .Unwrap(): ContinueWith + // still invokes OnIdleTimerAsync without it, and only the + // completion tracking of idleTimerTask would change. + // - No thread-count explosion or unhandled exceptions during + // idle-fire at N=50 over a ~13-minute window. + // + // What this test does NOT validate: + // - That the fix prevents thread pool starvation. This was + // investigated extensively (see PR #5722 stages 3.5 - 3.11) and + // the conclusion is that a single test client cannot reliably + // reproduce the production starvation pattern. + // + // The bug WAS reproduced once during investigation at N=50 in + // this environment: 48-second probe latency, 46-thread pool + // growth, 12.5-second median .Wait() durations on + // genuinely-pending receive tasks. This confirmed the production + // pathology is real here. + // + // But six subsequent runs across N=50 and N=200 did not + // reproduce it (max latency ~1001ms — a probe-implementation + // artifact, not a Dispatcher signal). The production bug + // reported in #4393 involved sustained multi-client scale and + // timing conditions that this single-client test cannot + // synthesize on demand. + // + // Canonical starvation-prevention evidence: + // See DispatcherIdleTimerFixTests in the unit test project. 
It + // directly invokes OnIdleTimerAsync with a receive task that is + // genuinely pending for the duration of the measurement window, + // deterministically demonstrating that the post-fix async path + // yields thread pool threads where the pre-fix sync path blocked + // them. + // + // References: + // Issue #4393 (production bug report with stack trace) + // PR #5722 (this fix) + // DispatcherIdleTimerFixTests.cs (canonical fix evidence) + [DataTestMethod] + [DataRow(50)] + public async Task IdleTimerFire_WiringStillFunctional(int connectionCount) + { + (string endpoint, string key) = ReadCredentialsOrSkip(); + + // Shrink the thread pool so that blocked .Wait() calls in + // Dispatcher.OnIdleTimer produce visible scheduling latency + // on the probe. Production customers see this symptom on + // busy machines where the pool is effectively saturated; we + // simulate that here by making the min pool tiny. + ThreadPool.GetMinThreads(out int origWorker, out int origIo); + ThreadPool.SetMinThreads(2, 2); + + CosmosClient client = null; + TimerFireCountingListener timerListener = null; + IDisposable traceListenerScope = null; + try + { + traceListenerScope = TimerFireCountingListener.Install(out timerListener); + + CosmosClientOptions options = new CosmosClientOptions + { + ConnectionMode = ConnectionMode.Direct, + // The SDK enforces a minimum IdleTcpConnectionTimeout of 600 s + // (see StoreClientFactory ctor: "valid value: >= 10 minutes"). + // Using the minimum so the test triggers idle-timer fires as + // fast as possible. 
+ IdleTcpConnectionTimeout = TimeSpan.FromSeconds(600), + OpenTcpConnectionTimeout = TimeSpan.FromSeconds(3), + MaxRequestsPerTcpConnection = 1, + MaxTcpConnectionsPerEndpoint = Math.Max(connectionCount * 4, 256), + }; + + client = new CosmosClient(endpoint, key, options); + + Database db = (await client.CreateDatabaseIfNotExistsAsync(TestDatabaseId)).Database; + Container container = (await db.CreateContainerIfNotExistsAsync( + id: GetContainerId(connectionCount), + partitionKeyPath: "/pk", + throughput: 10000)).Container; + + List<(string Id, string Pk)> keys = new List<(string, string)>(connectionCount); + for (int i = 0; i < connectionCount; i++) + { + keys.Add(($"doc-{i:D4}", $"pk-{i:D4}")); + } + + // Seed items (idempotent across runs — reuses the same doc ids). + foreach ((string id, string pk) in keys) + { + try + { + await container.CreateItemAsync( + new TestDoc { id = id, pk = pk, payload = "x" }, + new PartitionKey(pk)); + } + catch (CosmosException ce) when (ce.StatusCode == HttpStatusCode.Conflict) + { + } + } + + // Warm-up: run connectionCount parallel read loops for + // a few seconds. MaxRequestsPerTcpConnection=1 means the + // LoadBalancingPartition spawns a new channel whenever + // every existing one has >=1 in-flight request, so a + // sustained burst of N concurrent readers saturates the + // channel pool to ~N channels. A single Task.WhenAll over + // N fast reads is not enough: they complete faster than + // new channels are opened. 
+ using (CancellationTokenSource warmCts = new CancellationTokenSource()) + { + Task[] readers = new Task[connectionCount]; + for (int i = 0; i < connectionCount; i++) + { + int idx = i; + readers[idx] = Task.Run(async () => + { + while (!warmCts.IsCancellationRequested) + { + try + { + await container.ReadItemAsync( + keys[idx].Id, + new PartitionKey(keys[idx].Pk)); + } + catch + { + } + } + }); + } + + await Task.Delay(TimeSpan.FromSeconds(15)); + warmCts.Cancel(); + await Task.WhenAll(readers); + } + + // Let transient worker threads retire. + await Task.Delay(TimeSpan.FromSeconds(2)); + + int baselineThreadCount = ThreadPool.ThreadCount; + + // Probe phase. + List probeLatencies = new List(); + int neverExecuted = 0; + int peakThreadCount = baselineThreadCount; + + using CancellationTokenSource probeCts = new CancellationTokenSource(); + + Task probeTask = Task.Run(async () => + { + while (!probeCts.IsCancellationRequested) + { + Stopwatch sw = Stopwatch.StartNew(); + TaskCompletionSource tcs = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + ThreadPool.QueueUserWorkItem(_ => tcs.TrySetResult(sw.ElapsedMilliseconds)); + + Task winner = await Task.WhenAny(tcs.Task, Task.Delay(500)); + if (winner == tcs.Task) + { + probeLatencies.Add(tcs.Task.Result); + } + else + { + probeLatencies.Add(500); + Interlocked.Increment(ref neverExecuted); + } + + try + { + await Task.Delay(100, probeCts.Token); + } + catch (OperationCanceledException) + { + break; + } + } + }); + + Task sampleTask = Task.Run(async () => + { + while (!probeCts.IsCancellationRequested) + { + int current = ThreadPool.ThreadCount; + if (current > peakThreadCount) + { + peakThreadCount = current; + } + + try + { + await Task.Delay(500, probeCts.Token); + } + catch (OperationCanceledException) + { + break; + } + } + }); + + // Trigger: no more traffic. 
The SDK arms the dispatcher idle + // timer at (IdleTcpConnectionTimeout + 2*(sendHang + receiveHang)) + // = 600 + 2*(10 + 65) = 750 s after the last receive (see + // Connection.IsActive / Connection.idleConnectionClosureTimeout). + // Wait 810 s so every idle timer has fired well within the + // observation window. + await Task.Delay(TimeSpan.FromSeconds(810)); + + probeCts.Cancel(); + await Task.WhenAll(probeTask, sampleTask); + + // Analyze. + long maxLatency = probeLatencies.Max(); + List sorted = probeLatencies.OrderBy(x => x).ToList(); + long p50 = sorted[sorted.Count / 2]; + long p95 = sorted[Math.Min(sorted.Count - 1, (int)(sorted.Count * 0.95))]; + long p99 = sorted[Math.Min(sorted.Count - 1, (int)(sorted.Count * 0.99))]; + int delta = peakThreadCount - baselineThreadCount; + int timersFired = timerListener?.Count ?? 0; + + string summary = + $"N={connectionCount} probes={probeLatencies.Count} " + + $"maxLatencyMs={maxLatency} p50={p50} p95={p95} p99={p99} " + + $"neverExecuted={neverExecuted} baselineThreads={baselineThreadCount} " + + $"peakThreads={peakThreadCount} delta={delta} timersFired={timersFired}"; + + Console.WriteLine(summary); + Trace.WriteLine(summary); + + Assert.IsTrue( + delta < 10, + $"Thread count delta {delta} >= 10 threshold. {summary}"); + Assert.IsTrue( + timersFired > 0, + $"No idle-timer fires were observed. The .Unwrap() chain in " + + $"Dispatcher.ScheduleIdleTimer or the production trace at " + + $"OnIdleTimerAsync entry may have regressed. {summary}"); + } + finally + { + try + { + client?.Dispose(); + } + catch + { + } + + traceListenerScope?.Dispose(); + ThreadPool.SetMinThreads(origWorker, origIo); + } + } + + // Counts idle-timer fires emitted by the SDK's DefaultTrace source. + // Uses reflection so the test compiles against both + // Microsoft.Azure.Cosmos.EmulatorTests (which has InternalsVisibleTo) + // and Microsoft.Azure.Cosmos.LinuxSmoke (which does not). 
+ private sealed class TimerFireCountingListener : TraceListener + { + private const string IdleTimerFiredMarker = "Idle timer fired."; + + private int count; + + public int Count => Volatile.Read(ref this.count); + + public override void Write(string message) + { + } + + public override void WriteLine(string message) + { + } + + public override void TraceEvent(TraceEventCache eventCache, string source, TraceEventType eventType, int id, string message) + { + this.MaybeIncrement(message); + } + + public override void TraceEvent(TraceEventCache eventCache, string source, TraceEventType eventType, int id, string format, params object[] args) + { + if (format != null && format.IndexOf(IdleTimerFiredMarker, StringComparison.Ordinal) >= 0) + { + Interlocked.Increment(ref this.count); + } + } + + private void MaybeIncrement(string message) + { + if (message != null && message.IndexOf(IdleTimerFiredMarker, StringComparison.Ordinal) >= 0) + { + Interlocked.Increment(ref this.count); + } + } + + public static IDisposable Install(out TimerFireCountingListener listener) + { + Assembly cosmosAsm = typeof(CosmosClient).Assembly; + Type defaultTraceType = cosmosAsm.GetType("Microsoft.Azure.Cosmos.Core.Trace.DefaultTrace", throwOnError: false); + if (defaultTraceType == null) + { + listener = null; + return new NoopDisposable(); + } + + PropertyInfo sourceProp = defaultTraceType.GetProperty("TraceSource", BindingFlags.Public | BindingFlags.Static); + if (sourceProp == null) + { + listener = null; + return new NoopDisposable(); + } + + TraceSource source = (TraceSource)sourceProp.GetValue(null); + if (source == null) + { + listener = null; + return new NoopDisposable(); + } + + listener = new TimerFireCountingListener(); + SourceLevels originalLevel = source.Switch.Level; + source.Switch.Level = SourceLevels.All; + source.Listeners.Add(listener); + + TimerFireCountingListener captured = listener; + return new ActionDisposable(() => + { + try + { + source.Listeners.Remove(captured); 
+ } + catch + { + } + try + { + source.Switch.Level = originalLevel; + } + catch + { + } + }); + } + + private sealed class NoopDisposable : IDisposable + { + public void Dispose() + { + } + } + + private sealed class ActionDisposable : IDisposable + { + private Action action; + + public ActionDisposable(Action action) + { + this.action = action; + } + + public void Dispose() + { + Action local = Interlocked.Exchange(ref this.action, null); + local?.Invoke(); + } + } + } + + private sealed class TestDoc + { + public string id { get; set; } + + public string pk { get; set; } + + public string payload { get; set; } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DispatcherIdleTimerFixTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DispatcherIdleTimerFixTests.cs new file mode 100644 index 0000000000..94ba196eea --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DispatcherIdleTimerFixTests.cs @@ -0,0 +1,246 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Collections.Concurrent; + using System.Diagnostics; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.FaultInjection; + using Microsoft.Azure.Documents.Rntbd; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + /// + /// Unit tests that isolate the Stage 4 Dispatcher fix. + /// + /// Strategy: Option 2B (per Stage 3.6 spec). The Dispatcher has an + /// internal constructor accepting , which we + /// exploit via InternalsVisibleTo("Microsoft.Azure.Cosmos.Tests"). + /// Reach WaitTask (sync) and WaitTaskAsync (async fix) + /// via reflection (both are private). 
+ /// + /// These tests do NOT touch RNTBD, network, timers, or the + /// emulator. They drive a pending + /// and measure thread-pool behavior directly. + /// + [TestClass] + public class DispatcherIdleTimerFixTests + { + private static readonly MethodInfo WaitTaskAsyncMethod = typeof(Dispatcher) + .GetMethod("WaitTaskAsync", BindingFlags.Instance | BindingFlags.NonPublic); + + private static readonly MethodInfo WaitTaskMethod = typeof(Dispatcher) + .GetMethod("WaitTask", BindingFlags.Instance | BindingFlags.NonPublic); + + private static Dispatcher BuildDispatcher() + { + Mock mock = new Mock(); + mock.SetupGet(c => c.ConnectionCorrelationId).Returns(Guid.NewGuid()); + mock.SetupGet(c => c.ServerUri).Returns(new Uri("tcp://localhost:10000")); + + ConstructorInfo ctor = typeof(Dispatcher).GetConstructor( + BindingFlags.Instance | BindingFlags.NonPublic, + binder: null, + types: new Type[] + { + typeof(Uri), + typeof(global::Microsoft.Azure.Documents.UserAgentContainer), + typeof(IConnectionStateListener), + typeof(TimerPool), + typeof(bool), + typeof(IChaosInterceptor), + typeof(IConnection), + }, + modifiers: null); + Assert.IsNotNull(ctor, "internal Dispatcher(IConnection) ctor not found"); + + return (Dispatcher)ctor.Invoke(new object[] + { + new Uri("tcp://localhost:10000"), + null, // UserAgentContainer + null, // IConnectionStateListener + null, // TimerPool + false, // enableChannelMultiplexing + null, // IChaosInterceptor + mock.Object, + }); + } + + private static Task InvokeWaitTaskAsync(Dispatcher d, Task t, string desc) + { + return (Task)WaitTaskAsyncMethod.Invoke(d, new object[] { t, desc }); + } + + private static void InvokeWaitTask(Dispatcher d, Task t, string desc) + { + WaitTaskMethod.Invoke(d, new object[] { t, desc }); + } + + private sealed class ProbeResult + { + public long MaxLatencyMs; + public int SampleCount; + } + + private static (ProbeResult result, CancellationTokenSource cts, Task probeTask) StartProbe() + { + 
CancellationTokenSource cts = new CancellationTokenSource(); + ProbeResult result = new ProbeResult(); + Task probeTask = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + long queuedTicks = Stopwatch.GetTimestamp(); + TaskCompletionSource tcs = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + ThreadPool.QueueUserWorkItem(_ => + { + long ranTicks = Stopwatch.GetTimestamp(); + long ms = (ranTicks - queuedTicks) * 1000 / Stopwatch.Frequency; + tcs.TrySetResult(ms); + }); + Task winner = tcs.Task; + Task delay = Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + Task completed = await Task.WhenAny(winner, delay).ConfigureAwait(false); + long latency; + if (completed == winner) + { + latency = await winner.ConfigureAwait(false); + } + else + { + // Probe itself was starved beyond 2s — record as 2000ms. + latency = 2000; + } + Interlocked.Increment(ref result.SampleCount); + long prev; + do + { + prev = Interlocked.Read(ref result.MaxLatencyMs); + if (latency <= prev) break; + } + while (Interlocked.CompareExchange(ref result.MaxLatencyMs, latency, prev) != prev); + + try { await Task.Delay(50, cts.Token).ConfigureAwait(false); } + catch (OperationCanceledException) { } + } + }); + return (result, cts, probeTask); + } + + [TestMethod] + [Timeout(30_000)] + public async Task WaitTaskAsync_PendingTask_DoesNotBlockThreadPoolThread() + { + // Saturate the thread pool with a small min thread count to make + // any blocking behavior observable within seconds. + ThreadPool.GetMinThreads(out int origMinW, out int origMinIO); + try + { + int procs = Environment.ProcessorCount; + ThreadPool.SetMinThreads(Math.Max(2, procs / 2), procs); + + Dispatcher dispatcher = BuildDispatcher(); + TaskCompletionSource tcs = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + // Saturate the pool with many concurrent WaitTaskAsync calls, + // each awaiting the same pending TCS. 
Each call should yield + // its thread back to the pool at the await point. + int callers = procs * 4; + Task[] callerTasks = new Task[callers]; + for (int i = 0; i < callers; i++) + { + callerTasks[i] = Task.Run(() => InvokeWaitTaskAsync(dispatcher, tcs.Task, "unit-test-async")); + } + + (ProbeResult result, CancellationTokenSource cts, Task probeTask) = StartProbe(); + + await Task.Delay(TimeSpan.FromSeconds(1.5)).ConfigureAwait(false); + + long maxWithPending = Interlocked.Read(ref result.MaxLatencyMs); + int samplesWithPending = result.SampleCount; + + tcs.TrySetResult(true); + + Task allCallers = Task.WhenAll(callerTasks); + Task winner = await Task.WhenAny(allCallers, Task.Delay(TimeSpan.FromSeconds(2))) + .ConfigureAwait(false); + + cts.Cancel(); + try { await probeTask.ConfigureAwait(false); } catch { } + + Assert.AreSame(allCallers, winner, + "WaitTaskAsync callers did not complete within 2 s after the receive task was completed."); + Assert.IsTrue(samplesWithPending >= 10, + $"probe ran only {samplesWithPending} samples; too few to judge."); + Assert.IsTrue(maxWithPending < 200, + $"probe max latency {maxWithPending} ms while WaitTaskAsync awaited a pending task. " + + $"async path blocked a thread pool thread."); + } + finally + { + ThreadPool.SetMinThreads(origMinW, origMinIO); + } + } + + [TestMethod] + [Timeout(30_000)] + public async Task WaitTaskSync_PendingTask_DoesBlockThreadPoolThread() + { + // Companion test: demonstrates that the SYNC WaitTask (pre-fix + // behavior) would block the calling thread pool thread until + // the pending task completes. This shows the fix changed + // observable thread-pool behavior. 
+ ThreadPool.GetMinThreads(out int origMinW, out int origMinIO); + try + { + int procs = Environment.ProcessorCount; + ThreadPool.SetMinThreads(Math.Max(2, procs / 2), procs); + + Dispatcher dispatcher = BuildDispatcher(); + TaskCompletionSource tcs = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + int callers = procs * 4; + Task[] callerTasks = new Task[callers]; + for (int i = 0; i < callers; i++) + { + callerTasks[i] = Task.Run(() => InvokeWaitTask(dispatcher, tcs.Task, "unit-test-sync")); + } + + (ProbeResult result, CancellationTokenSource cts, Task probeTask) = StartProbe(); + + await Task.Delay(TimeSpan.FromSeconds(1.5)).ConfigureAwait(false); + + long maxWithPending = Interlocked.Read(ref result.MaxLatencyMs); + int samplesWithPending = result.SampleCount; + + tcs.TrySetResult(true); + Task allCallers = Task.WhenAll(callerTasks); + await Task.WhenAny(allCallers, Task.Delay(TimeSpan.FromSeconds(2))).ConfigureAwait(false); + + cts.Cancel(); + try { await probeTask.ConfigureAwait(false); } catch { } + + // The sync path pins each caller thread. With `callers` > + // min-thread count, the probe should be starved — + // accept either high latency or a small sample count. + bool starvationObserved = maxWithPending >= 200 || samplesWithPending < 10; + Assert.IsTrue(starvationObserved, + $"sync WaitTask did not produce observable thread-pool starvation: " + + $"probe max={maxWithPending} ms samples={samplesWithPending}. Test is broken."); + } + finally + { + ThreadPool.SetMinThreads(origMinW, origMinIO); + } + } + } +} diff --git a/VALIDATION.md b/VALIDATION.md new file mode 100644 index 0000000000..f3373c356e --- /dev/null +++ b/VALIDATION.md @@ -0,0 +1,231 @@ +# VALIDATION — RNTBD Dispatcher idle-timer thread-pool starvation fix + +## 1. 
Summary + +`Dispatcher.OnIdleTimer` ran on a thread-pool worker and called +`WaitTask(receiveTaskCopy)`, which performs `Task.Wait()` on a +receive loop that does not complete until the connection is +disposed. Under sustained scale, every channel's idle timer fire +parked one worker, starving the pool (issue #4393). The fix +converts the timer callback to async (`OnIdleTimerAsync` + +`WaitTaskAsync`), wires it via `.ContinueWith(...).Unwrap()`, and +preserves the sync `WaitTask` for any other callers. Evidence: +deterministic unit test (5/5) directly demonstrating the +calling-thread behavior change, plus one integration run during +investigation that reproduced the production pathology end-to-end +(48s probe latency, 46-thread pool growth) in this environment. + +Issue: https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4393 + +## 2. The fix + +Three edits to `Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs`. + +**Edit 1 — `OnIdleTimer` becomes `OnIdleTimerAsync`:** + +```csharp +private async Task OnIdleTimerAsync(Task precedentTask) +{ + DefaultTrace.TraceInformation( + "[RNTBD Dispatcher {0}][{1}] Idle timer fired.", + this.ConnectionCorrelationId, this); + + Task receiveTaskCopy = null; + // ... unchanged body that ends with: + await this.WaitTaskAsync(receiveTaskCopy, "receive loop") + .ConfigureAwait(false); +} +``` + +The omitted body of `OnIdleTimerAsync` is byte-for-byte identical +to the pre-fix `OnIdleTimer` body: lock blocks remain synchronous, +no `await` appears inside any lock, and the existing +`Debug.Assert(!Monitor.IsEntered(...))` lock guards on `callLock` +and `connectionLock` are preserved. 
+ +**Edit 2 — new `WaitTaskAsync` alongside the still-present sync `WaitTask`:** + +```csharp +private async Task WaitTaskAsync(Task t, string description) +{ + if (t == null) { return; } + try + { + Debug.Assert(!Monitor.IsEntered(this.callLock)); + Debug.Assert(!Monitor.IsEntered(this.connectionLock)); + await t.ConfigureAwait(false); + } + catch (Exception e) + { + DefaultTrace.TraceWarning( + "[RNTBD Dispatcher {0}][{1}] Parallel task failed: {2}. " + + "Exception: {3}: {4}", + this.ConnectionCorrelationId, this, description, + e.GetType().Name, e.Message); + } +} +``` + +**Edit 3 — `ScheduleIdleTimer` uses `.Unwrap()`:** + +```csharp +private void ScheduleIdleTimer(TimeSpan timeToIdle) +{ + Debug.Assert(Monitor.IsEntered(this.connectionLock)); + this.idleTimer = this.idleTimerPool.GetPooledTimer((int)timeToIdle.TotalSeconds); + // IMPORTANT: .Unwrap() is essential here. Without it, idleTimerTask + // would be Task<Task> and would complete when OnIdleTimerAsync + // returns its inner Task (at the first await), not when it + // finishes. StopIdleTimer() waits on idleTimerTask during + // shutdown; if idleTimerTask completes early, shutdown proceeds + // while OnIdleTimerAsync is still running, causing + // use-after-dispose on the connection. Do not remove .Unwrap(). + this.idleTimerTask = this.idleTimer.StartTimerAsync() + .ContinueWith(this.OnIdleTimerAsync, TaskContinuationOptions.OnlyOnRanToCompletion) + .Unwrap(); + // ... existing failure-trace ContinueWith chain unchanged +} +``` + +## 3. Why this bug is hard to test + +The minimum effective idle-timer arm in this codebase is **750 +seconds** (12.5 minutes). `StoreClientFactory` enforces a 600s +floor on `IdleTcpConnectionTimeout`, and `Connection` +adds a `2 * (sendHang + receiveHang) = 150s` race buffer on top +before the dispatcher idle timer is armed. Any single-client +test that wants to observe an idle-timer fire must idle the SDK +for at least that long; CI suites do not. 
Beyond runtime, the +production trigger requires sustained scale and timing +conditions across many channels — connection counts, partition +distribution, and backend replica state — that a single test +client cannot synthesize on demand. This is why #4393 sat from +2024 to 2026 without being caught: nobody runs a 13-minute idle +test in CI, and even when one is run, the conditions that turn +"timer fires" into "thread pool starves" are not deterministic +from one client. The testing strategy below reflects this: +deterministic unit-test evidence at the changed line, +integration test as wiring guard, plus the one investigation run +that did reproduce the end-to-end pathology preserved as +artifact. + +## 4. Evidence + +### 4a. Unit test (canonical evidence) + +`Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DispatcherIdleTimerFixTests.cs` +isolates the exact line the fix changes. It uses +`InternalsVisibleTo` and reflection to invoke `WaitTaskAsync` +and `WaitTask` directly on a `Dispatcher` instance, passing each +a `TaskCompletionSource<bool>().Task` that stays pending for the +whole measurement window — +the same shape `OnIdleTimer` saw in production when it called +`.Wait()` on a receive loop. Probes measure thread-pool +behavior during the wait window. + +Result over 5/5 runs: the **async path** returns to its caller +without blocking (the `await` yields the calling +thread; the awaited task remains pending in the background, with +10+ distinct thread IDs observed in `WaitingForActivation` over +the probe window). The **sync path** blocks the calling thread +for the full measurement window — directly demonstrating the +pre-fix pathology that, multiplied across N idle channels in +production, exhausts the pool. Each test case runs in a few +seconds (a 1.5 s measurement window plus completion) +and is the canonical proof that the fix changes the +thread-blocking behavior at the line that matters. + +### 4b. 
Integration test (regression guard) + +`Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/RntbdIdleTimerStarvationTests.cs` +opens N=50 channels against a live Azure Cosmos DB account by +forcing `MaxRequestsPerTcpConnection=1` and firing N concurrent +reads, then waits 810 seconds for every channel's idle timer to +fire. It asserts: + +- `delta < 10` (peak − baseline thread count) — guards against + the thread-pool growth signature of the production bug +- `timersFired > 0` — guards against the timer continuation no + longer reaching `OnIdleTimerAsync` (or its entry trace being + removed). It cannot detect removal of `.Unwrap()` on its own: + `ContinueWith` still invokes `OnIdleTimerAsync` without it, and + only the completion tracking of `idleTimerTask` changes. Counted via a + `TraceListener` attached (by reflection) to + `DefaultTrace.TraceSource`, observing the + `TraceInformation` line at `OnIdleTimerAsync` entry. + Reflection is used because `DefaultTrace.TraceSource` is + internal; adding a public test seam was considered and + rejected as unjustified API expansion for a regression-only + assertion. + +The previous max-probe-latency assertion was dropped: across six +clean runs of the integration test on the unpatched baseline the +latency clustered at ~1001ms regardless of N, which is a probe- +implementation ceiling artifact, not a Dispatcher signal. +Asserting on it would either flap or test the wrong thing. + +### 4c. Bug-exists-in-environment evidence + +A single integration run on the unpatched baseline at N=50 +against the same live account, with Stopwatch instrumentation +around every `.Wait()` call inside `WaitTask`: + +- Max probe latency: **48,093 ms** +- Thread count delta: **46** (baseline → peak during idle-fire window) +- Probes never executed: 2 +- `.Wait()` durations: + - Median: 12.5 s + - Mean: 21.2 s + - p95: 64.1 s + - p99: 67.6 s + - Max: 67.6 s +- 67/110 `.Wait()` calls blocked ≥1 s; 57/110 blocked ≥10 s + +This run reproduced the production pathology once. 
Six +subsequent runs (3 at N=50, 3 at N=200) did not reproduce it; +all six showed the ~1001ms ceiling and delta=0. We interpret +this as confirmation that the bug exists in this environment +but is not reliably triggerable from a single test client at +the scales we tested. The numbers above are preserved verbatim +as an artifact; we did not chase the question of why one run +reproduced and others did not (likely Azure-side timing, +network conditions, or backend replica state — outside our +control). + +## 5. Results table + +| Test | N | Max latency | Thread delta | Result | +|------|---|-------------|--------------|--------| +| Unit test (sync path) | n/a | (blocks calling thread) | n/a | Demonstrates pre-fix behavior | +| Unit test (async path) | n/a | (yields calling thread) | n/a | 5/5 PASS | +| Integration baseline (single dramatic run) | 50 | 48,093ms | 46 | Bug reproduced | +| Integration baseline (6 subsequent runs) | 50, 200 | ~1001ms | 0 | Bug not reproduced | +| Integration fix (sanity run) | 50 | 900ms | 0 | PASS | + +## 6. What this PR does NOT do + +- Does not add `DisposeAsync` or `IAsyncDisposable` to any class +- Does not modify `Channel`, `ChannelDictionary`, + `LoadBalancingChannel`, `LoadBalancingPartition`, + `LbChannelState`, or `IChannel` +- Does not change `bool disposed` to `int disposed` or add + `Interlocked.CompareExchange` guards +- Does not modify any public API surface +- Does not change behavior of the existing sync `WaitTask` + (preserved untouched in case other callers exist) +- Does not modify production code outside `Dispatcher.cs`; the + rest of the change is one new unit-test file, one reshaped + integration-test file, and the Linux test tooling under + `scripts/` + +## 7. 
Limitations + +- Tested only on Linux (matches the production bug environment; + Windows not tested in this round) +- Test endpoint was a live Azure Cosmos DB account; emulator + validation not performed +- Integration test does not gate on starvation behavior because + starvation could not be reliably reproduced at N=50 or N=200 + from a single test client +- The single dramatic baseline reproduction is one data point; + we did not pursue investigation into why that run reproduced + and others did not (likely Azure-side timing, network + conditions, or backend replica state — outside our control) +- The async disposal path described in the original PR + description (Path 2 in issue #4393) is not addressed by this + PR and remains a separate follow-up diff --git a/scripts/Dockerfile.test b/scripts/Dockerfile.test new file mode 100644 index 0000000000..93616c3182 --- /dev/null +++ b/scripts/Dockerfile.test @@ -0,0 +1,26 @@ +# Dockerfile for running Microsoft.Azure.Cosmos.EmulatorTests on Linux +# against a live Cosmos DB endpoint. See scripts/run-linux-tests.sh. +FROM mcr.microsoft.com/dotnet/sdk:8.0 + +# The EmulatorTests project currently targets net6.0, so we need the +# .NET 6 SDK alongside the .NET 8 SDK that the base image provides. +# We use Microsoft's dotnet-install.sh so the extra SDK is side-loaded +# into /usr/share/dotnet and auto-discovered by `dotnet --list-sdks`. +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl ca-certificates \ + && curl -sSL https://dot.net/v1/dotnet-install.sh -o /tmp/dotnet-install.sh \ + && chmod +x /tmp/dotnet-install.sh \ + && /tmp/dotnet-install.sh --channel 6.0 --install-dir /usr/share/dotnet --no-path \ + && rm /tmp/dotnet-install.sh \ + && rm -rf /var/lib/apt/lists/* + +# No global.json pinning here — we rely on the project's own TargetFramework. 
+ENV DOTNET_NOLOGO=1 \ + DOTNET_CLI_TELEMETRY_OPTOUT=1 \ + DOTNET_SKIP_FIRST_TIME_EXPERIENCE=1 + +WORKDIR /repo + +# Entry point is provided by run-linux-tests.sh via `docker run`. +# Defaulting to bash so interactive debugging works if needed. +CMD ["/bin/bash"] diff --git a/scripts/LinuxSmoke/LinuxSmoke.csproj b/scripts/LinuxSmoke/LinuxSmoke.csproj new file mode 100644 index 0000000000..0d6f8d4d49 --- /dev/null +++ b/scripts/LinuxSmoke/LinuxSmoke.csproj @@ -0,0 +1,57 @@ + + + + + + net6.0 + false + 10.0 + Microsoft.Azure.Cosmos.SDK.EmulatorTests + Microsoft.Azure.Cosmos.LinuxSmoke + false + $(NoWarn);CS1591 + false + false + false + + + + + + + + + + + + + + + + + + + + diff --git a/scripts/run-linux-tests.sh b/scripts/run-linux-tests.sh new file mode 100755 index 0000000000..466e83683d --- /dev/null +++ b/scripts/run-linux-tests.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +# Builds the test image and runs Microsoft.Azure.Cosmos.EmulatorTests on +# Linux inside a container against a live Cosmos DB endpoint. +# +# Required env vars: +# COSMOS_ENDPOINT Cosmos DB account endpoint (https://...) +# COSMOS_KEY Account key +# +# Optional env vars: +# TEST_FILTER dotnet test --filter argument (default targets +# the RntbdIdleTimerStarvationTests class) +# IMAGE_TAG docker image tag (default: cosmos-dispatcher-test:local) + +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +RESULTS_DIR="${REPO_ROOT}/TestResults/linux" +IMAGE_TAG="${IMAGE_TAG:-cosmos-dispatcher-test:local}" +# Placeholder filter — will match the class created for the real repro +# test in Stage 3, and already matches the Stage 2 smoke test. +TEST_FILTER="${TEST_FILTER:-FullyQualifiedName~RntbdIdleTimerStarvationTests}" + +if [[ -z "${COSMOS_ENDPOINT:-}" || -z "${COSMOS_KEY:-}" ]]; then + echo "ERROR: COSMOS_ENDPOINT and COSMOS_KEY must be set." 
>&2 + exit 2 +fi + +mkdir -p "${RESULTS_DIR}" + +echo "==> Building image ${IMAGE_TAG}" +docker build \ + --file "${REPO_ROOT}/scripts/Dockerfile.test" \ + --tag "${IMAGE_TAG}" \ + "${REPO_ROOT}" + +# NOTE: On msdata/direct the full Microsoft.Azure.Cosmos.EmulatorTests +# project transitively depends on FaultInjection, which fails to build +# locally because Microsoft.Azure.Cosmos.Client (built from src/) and +# Microsoft.Azure.Cosmos.Direct NuGet contain duplicate types. We side- +# step that by pointing the Linux pipeline at a minimal project that +# links only the RntbdIdleTimerStarvationTests.cs source file. The +# canonical .cs file still lives under Microsoft.Azure.Cosmos.EmulatorTests +# so Windows CI compiles it normally. +TEST_PROJECT="scripts/LinuxSmoke/LinuxSmoke.csproj" + +echo "==> Running dotnet test in container (filter: ${TEST_FILTER})" +set +e +docker run --rm \ + --volume "${REPO_ROOT}:/repo" \ + --volume "${RESULTS_DIR}:/results" \ + --env COSMOS_ENDPOINT \ + --env COSMOS_KEY \ + --workdir /repo \ + "${IMAGE_TAG}" \ + bash -c "dotnet test '${TEST_PROJECT}' \ + --configuration Release \ + --filter '${TEST_FILTER}' \ + --logger 'trx;LogFileName=linux-test-results.trx' \ + --results-directory /results" +EXIT_CODE=$? +set -e + +echo "==> dotnet test exit code: ${EXIT_CODE}" +echo "==> TRX written to ${RESULTS_DIR}/linux-test-results.trx" +exit ${EXIT_CODE}