Implement graceful shutdown for Garnet server (#1382) #1551

Open
yuseok-kim-edushare wants to merge 59 commits into microsoft:main from
yuseok-kim-edushare:yuseok-kim/graceful_shutdown

@yuseok-kim-edushare
Contributor

📌 Summary

This PR introduces a robust, graceful shutdown mechanism for Garnet, specifically addressing the data loss issues identified in [#1382].

The implementation ensures a deterministic shutdown sequence: Ingress Throttling → Connection Draining → Final Data Persistence (AOF/Checkpoint). This ensures that the server shuts down safely without losing in-flight data, especially when running as a Windows Service.


🛠 Key Changes

1️⃣ Graceful Shutdown Implementation (Lifecycle Management)

  • GarnetServer.ShutdownAsync: Introduced a centralized shutdown interface that orchestrates stopping listeners, waiting for active connections within a timeout, and committing final state (AOF/Checkpoint).
  • Worker & Entry Point Integration: Updated the Windows Service worker (Garnet.worker) and the CLI entry point (Program.cs) to trigger the graceful shutdown sequence upon receiving termination signals (SIGINT, SIGTERM).
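
The ordering described above (stop listeners, drain connections, then persist) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's code: `ShutdownSequence` and its three delegates are hypothetical stand-ins for the real listener, connection, and storage components.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical orchestrator: the three delegates stand in for the real
// "stop listening", "wait for connections", and "commit AOF/checkpoint" steps.
public sealed class ShutdownSequence
{
    private readonly Action stopListening;
    private readonly Func<TimeSpan, CancellationToken, Task> drainConnections;
    private readonly Func<CancellationToken, Task> persistData;

    public ShutdownSequence(
        Action stopListening,
        Func<TimeSpan, CancellationToken, Task> drainConnections,
        Func<CancellationToken, Task> persistData)
    {
        this.stopListening = stopListening;
        this.drainConnections = drainConnections;
        this.persistData = persistData;
    }

    public async Task RunAsync(TimeSpan timeout, CancellationToken token = default)
    {
        // 1. Refuse new connections first so the set of clients can only shrink.
        stopListening();

        // 2. Give in-flight clients up to `timeout` to finish.
        await drainConnections(timeout, token).ConfigureAwait(false);

        // 3. Persist only after draining, when the store is no longer changing.
        await persistData(token).ConfigureAwait(false);
    }
}
```

Persisting last is the point of the ordering: a commit taken while clients are still writing could miss their final operations.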

2️⃣ Enhanced Connection Handling (Infrastructure)

  • Listener Control: Added a StopListening method to IGarnetServer to immediately stop accepting new connections at the socket level while maintaining existing ones.
  • TCP Server Refactor: Implemented volatile bool isListening logic in GarnetServerTcp to safely break the Accept loop and handle socket closure exceptions gracefully.

3️⃣ Comprehensive Test Coverage

  • GarnetServerTcpTests: Added new tests to verify:
  • Idempotency of StopListening.
  • Immediate rejection of new connections after StopListening.
  • Proper data persistence (AOF commit) during the ShutdownAsync process.

💡 Why This Approach?

  • Data Integrity: By enforcing a final AOF commit and Checkpoint after all connections are drained, we guarantee that the "static" state of the store is fully persisted to disk.
  • Minimally Invasive Change: The modifications to Program.cs use a minimal-diff approach (replacing Thread.Sleep with a waitable event) to maintain codebase consistency while enabling essential functionality.
  • Architectural Alignment: The design follows the existing Garnet abstractions, extending GarnetServerBase to provide a consistent shutdown experience across different hosting environments.
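
As a rough illustration of the "waitable event instead of Thread.Sleep" idea, the sketch below blocks the main thread on a `ManualResetEventSlim` that a Ctrl+C handler sets. The `ShutdownSignal` class and its method names are hypothetical and are not taken from Program.cs; only Ctrl+C is wired up here, and handling of other termination signals is left out.

```csharp
using System;
using System.Threading;

// Hypothetical helper illustrating a waitable shutdown signal.
public static class ShutdownSignal
{
    // Creates an event that is set when the user presses Ctrl+C (SIGINT).
    public static ManualResetEventSlim HookCtrlC()
    {
        var signal = new ManualResetEventSlim(false);
        Console.CancelKeyPress += (sender, e) =>
        {
            e.Cancel = true; // keep the process alive so shutdown logic can run
            signal.Set();
        };
        return signal;
    }

    // Blocks until the event is set, then runs the shutdown callback.
    // Unlike Thread.Sleep(Timeout.Infinite), this wait can be released.
    public static void WaitThenShutdown(ManualResetEventSlim signal, Action onShutdown)
    {
        signal.Wait();
        onShutdown();
    }
}
```

An entry point would then call something like `ShutdownSignal.WaitThenShutdown(ShutdownSignal.HookCtrlC(), () => { /* graceful shutdown here */ })` in place of an infinite sleep.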

✅ Related Issues

yuseok-kim-edushare and others added 30 commits November 27, 2025 00:03
Adds a graceful shutdown mechanism to the Garnet server, ensuring new connections are stopped, active connections are awaited, and data is safely persisted (AOF commit and checkpoint) before exit. Updates include new ShutdownAsync logic in GarnetServer, StopListening support in server classes, and integration of shutdown handling in both Windows service and console entry points.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
A default of 30 seconds is more tolerant in production operations
(5 seconds is enough at our company's small scale, but is risky in large-scale production)
Change GetActiveConnectionCount return type and local accumulator from int to long and remove the redundant cast when adding garnetServerBase.get_conn_active(). This prevents potential integer overflow when summing active connections across multiple server instances; callers may need to handle the updated long return value.
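
A small sketch of why the wider accumulator matters. `ConnectionCounter.Sum` is a hypothetical stand-in for the summing logic, not the PR's GetActiveConnectionCount itself.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for summing active connections across server instances.
public static class ConnectionCounter
{
    public static long Sum(IEnumerable<long> perServerCounts)
    {
        long total = 0; // a long accumulator cannot wrap at int.MaxValue
        foreach (var count in perServerCounts)
        {
            total += count;
        }
        return total;
    }
}
```

With an `int` accumulator, a total just past `int.MaxValue` would wrap to a negative number in the default unchecked context, so a wait loop testing `count > 0` could end early.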
Replace OfType<T> with an is <T> check
Replace manual Stopwatch-based timeout logic with a linked CancellationTokenSource (linked to the external token) and CancelAfter(timeout). The loop now observes cts.Token for both external cancellation and timeout, and delay calls use the linked token. Improved exception handling: rethrow when the external token is canceled, log a warning when the timeout triggers, and centralize other error logging/retry behavior. This ensures correct timeout semantics and clearer error handling while waiting for active connections to close.
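
The linked-token pattern described in this commit message might look like the sketch below. It is an assumption-laden illustration, not the PR's code: `ConnectionDrain.WaitForZeroAsync`, the `getActiveConnections` delegate, and the 10 ms polling interval are all hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: waits until the active-connection count reaches zero.
public static class ConnectionDrain
{
    // Returns true if all connections closed, false if the timeout fired.
    // Rethrows OperationCanceledException if the caller's token was canceled.
    public static async Task<bool> WaitForZeroAsync(
        Func<long> getActiveConnections,
        TimeSpan timeout,
        CancellationToken externalToken = default)
    {
        // One token that fires on either external cancellation or timeout.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
        cts.CancelAfter(timeout);

        try
        {
            while (getActiveConnections() > 0)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(10), cts.Token).ConfigureAwait(false);
            }
            return true;
        }
        catch (OperationCanceledException) when (externalToken.IsCancellationRequested)
        {
            throw; // the caller asked to stop: propagate
        }
        catch (OperationCanceledException)
        {
            return false; // only the timeout fired: report it, do not throw
        }
    }
}
```

Checking `externalToken.IsCancellationRequested` in the exception filter is what distinguishes caller cancellation from the timeout, since both surface as the same exception type from the linked token.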
{
do
{
// Check isListening flag before processing and before calling AcceptAsync again
Collaborator

We don't need this isListening flag at all, or any changes to AcceptEventArg_Completed. Once you close the listenSocket, new connections are not accepted.

Contributor Author

Your feedback is spot on. I've tested and confirmed that closing the listening socket effectively prevents new connections from being accepted.
I updated the code in commit c6fcb19.

/// </summary>
private async Task FinalizeDataAsync(CancellationToken token)
{
// Commit AOF before checkpoint/shutdown
Collaborator

I believe we should first take the checkpoint and then flush the AOF, per the Redis specs.

Contributor Author

Yes. Using the test code implemented in aaaf45b, I checked that taking a checkpoint and flushing the AOF prevent data loss correctly in either order,
and I fixed the code in 714c2f6.

}

// Take checkpoint for tiered storage
if (opts.EnableStorageTier)
Collaborator

@badrishc Feb 13, 2026

The shutdown API should take a bool noSave argument (default false) and perform the checkpoint only if noSave is false. This will allow users to shut down much more quickly in the case of large databases.

Note that, technically, an AOF flush alone is sufficient to prevent data loss.

Contributor Author

I changed the code so that data is saved only once, by either an AOF flush or a checkpoint,
in 08e48a2.

/// <param name="timeout">Timeout for waiting on active connections (default: 30 seconds)</param>
/// <param name="token">Cancellation token</param>
/// <returns>Task representing the async shutdown operation</returns>
public async Task ShutdownAsync(TimeSpan? timeout = null, CancellationToken token = default)
Collaborator

We can take noSave as an argument here.

Contributor Author

I added a noSave argument to ShutdownAsync
in e9b0a3e,
but I don't yet have a good idea of how a noSave = true call would be made in Garnet.

I will think about that.

Contributor Author

#1004 requests the RESP SHUTDOWN command, among others,
and my ShutdownAsync can serve as the baseline for implementing RESP SHUTDOWN SAVE | NOSAVE.

Allow data persistence to be skipped during the shutdown process

: To reflect the PR comment from @badrishc
Introduce a new NUnit test suite ShutdownDataConsistencyTests to exercise data recovery across various shutdown/finalization sequences. The tests create a GarnetServer with AOF and storage enabled, populate main (string) and object (sorted set) stores, perform combinations of checkpoint and AOF commit (including interleaved writes), recover the server, and assert correctness. Covers scenarios: checkpoint→AOF, AOF→checkpoint, AOF only, checkpoint only, no finalization (baseline), checkpoint→more writes→AOF, and AOF→more writes→checkpoint. Uses StackExchange.Redis, TestUtils helpers, and verifies both main and object store recovery for KeyCount entries.
To reflect @badrishc's comments:
a single AOF commit or checkpoint call is enough to prevent data loss
(I verified this with test code),
and we don't need the isListening flag.

The following test code demonstrates this:

```
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Allure.NUnit;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;

namespace Garnet.test
{
    /// <summary>
    /// Tests validating graceful shutdown behavior:
    /// - StopListening prevents new connections (via listenSocket.Close + ObjectDisposedException/SocketError)
    /// - Existing connections remain functional after StopListening
    /// - ShutdownAsync performs end-to-end graceful shutdown
    /// - Data written before shutdown is preserved after recovery
    ///
    /// These tests demonstrate that the socket-close mechanism (ObjectDisposedException catch
    /// and SocketError.OperationAborted in HandleNewConnection) is sufficient to stop accepting
    /// new connections — the isListening flag is not strictly required for correctness.
    /// </summary>
    [AllureNUnit]
    [TestFixture]
    public class GracefulShutdownTests : AllureTestBase
    {
        private GarnetServer server;

        [SetUp]
        public void Setup()
        {
            TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
        }

        [TearDown]
        public void TearDown()
        {
            server?.Dispose();
            TestUtils.DeleteDirectory(TestUtils.MethodTestDir);
        }

        /// <summary>
        /// Validates that after calling ShutdownAsync (which calls StopListening internally),
        /// no new TCP connections can be established to the server port.
        /// This proves that listenSocket.Close() alone is sufficient to reject new connections.
        /// </summary>
        [Test]
        public async Task StopListening_RejectsNewConnections()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
            server.Start();

            // Verify server is accepting connections
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                db.StringSet("before-shutdown", "value1");
                ClassicAssert.AreEqual("value1", db.StringGet("before-shutdown").ToString());
            }

            // Perform graceful shutdown (calls StopListening → listenSocket.Close())
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false);

            // Verify new TCP connections are refused after StopListening
            // Use raw socket to avoid SE.Redis retry/reconnect logic
            using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            var connectException = Assert.ThrowsAsync<SocketException>(async () =>
            {
                await socket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, TestUtils.TestPort)).ConfigureAwait(false);
            });

            // Connection should be refused (or reset) because the listen socket is closed
            ClassicAssert.IsTrue(
                connectException.SocketErrorCode == SocketError.ConnectionRefused ||
                connectException.SocketErrorCode == SocketError.ConnectionReset,
                $"Expected ConnectionRefused or ConnectionReset but got {connectException.SocketErrorCode}");
        }

        /// <summary>
        /// Validates that an existing connection established before StopListening
        /// continues to function until the server is fully disposed.
        /// This proves that StopListening only affects the accept loop, not active handlers.
        /// </summary>
        [Test]
        public async Task StopListening_ExistingConnectionsContinueWorking()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
            server.Start();

            // Establish a connection before shutdown
            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
            var db = redis.GetDatabase(0);

            // Verify connection works
            db.StringSet("key1", "value1");
            ClassicAssert.AreEqual("value1", db.StringGet("key1").ToString());

            // Stop listening (but don't dispose yet — just stop accepting new connections)
            // ShutdownAsync calls StopListening internally, but with a short timeout
            // and noSave=true to avoid waiting for data finalization
            await server.ShutdownAsync(timeout: TimeSpan.FromMilliseconds(100), noSave: true).ConfigureAwait(false);

            // The existing connection should still be able to execute commands
            // because StopListening only closes the listen socket, not active connections
            try
            {
                var result = db.StringGet("key1");
                // If we get here, the existing connection is still alive
                ClassicAssert.AreEqual("value1", result.ToString());
            }
            catch (RedisConnectionException)
            {
                // If the server disposed active handlers as part of shutdown,
                // a connection exception is also acceptable behavior
            }
        }

        /// <summary>
        /// End-to-end graceful shutdown test: writes data, calls ShutdownAsync with data persistence,
        /// disposes, recovers, and verifies all data is intact.
        /// This validates the full shutdown sequence: StopListening → WaitForConnections → FinalizeData.
        /// </summary>
        [Test]
        public async Task ShutdownAsync_PreservesDataAfterRecovery()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true);
            server.Start();

            // Write data
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                for (var i = 0; i < 20; i++)
                {
                    db.StringSet($"graceful:key:{i}", $"graceful:value:{i}");
                }
            }

            // Perform graceful shutdown (includes AOF commit)
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5)).ConfigureAwait(false);
            server.Dispose(false);

            // Recover and verify
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true, tryRecover: true);
            server.Start();

            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                for (var i = 0; i < 20; i++)
                {
                    var value = db.StringGet($"graceful:key:{i}");
                    ClassicAssert.IsTrue(value.HasValue, $"Key graceful:key:{i} should exist after recovery");
                    ClassicAssert.AreEqual($"graceful:value:{i}", value.ToString(),
                        $"Key graceful:key:{i} has wrong value after recovery");
                }
            }
        }

        /// <summary>
        /// Validates that calling ShutdownAsync with cancellation token works correctly.
        /// The shutdown should stop gracefully when cancelled and still leave the server
        /// in a consistent state for dispose.
        /// </summary>
        [Test]
        public async Task ShutdownAsync_CancellationStopsGracefully()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true);
            server.Start();

            // Write some data
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                db.StringSet("cancel-test", "some-value");
            }

            // Cancel immediately
            using var cts = new CancellationTokenSource();
            cts.Cancel();

            // ShutdownAsync should handle cancellation gracefully (not throw)
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5), token: cts.Token).ConfigureAwait(false);

            // Server should still be disposable without error
            server.Dispose(false);

            // Recover — data may or may not be persisted (cancellation interrupted finalization),
            // but the server should start cleanly
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, enableAOF: true, tryRecover: true);
            Assert.DoesNotThrow(() => server.Start());
        }

        /// <summary>
        /// Validates that multiple rapid calls to ShutdownAsync / StopListening are idempotent
        /// and do not cause exceptions or deadlocks.
        /// Without isListening, this is handled by ObjectDisposedException on the already-closed socket.
        /// </summary>
        [Test]
        public async Task ShutdownAsync_MultipleCallsAreIdempotent()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
            server.Start();

            // Verify server works
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                db.StringSet("idempotent-test", "value");
                ClassicAssert.AreEqual("value", db.StringGet("idempotent-test").ToString());
            }

            // Call ShutdownAsync multiple times — should not throw or deadlock
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(2), noSave: true).ConfigureAwait(false);
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(2), noSave: true).ConfigureAwait(false);
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(2), noSave: true).ConfigureAwait(false);

            // Should still dispose cleanly
            server.Dispose();
            server = null;
        }

        /// <summary>
        /// Validates that the accept loop terminates correctly when the listen socket is closed,
        /// even under concurrent connection attempts.
        /// This simulates the race condition where connections arrive while StopListening is called.
        /// </summary>
        [Test]
        public async Task StopListening_UnderConcurrentConnectionAttempts()
        {
            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
            server.Start();

            // Verify server is working
            using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
            {
                var db = redis.GetDatabase(0);
                db.StringSet("concurrent-test", "value");
            }

            // Start concurrent connection attempts
            var connectionAttempts = 0;
            var connectionFailures = 0;
            using var stopCts = new CancellationTokenSource();

            var connectTask = Task.Run(async () =>
            {
                while (!stopCts.Token.IsCancellationRequested)
                {
                    try
                    {
                        using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                        socket.ReceiveTimeout = 1000;
                        socket.SendTimeout = 1000;
                        await socket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, TestUtils.TestPort)).ConfigureAwait(false);
                        Interlocked.Increment(ref connectionAttempts);
                        socket.Close();
                    }
                    catch
                    {
                        Interlocked.Increment(ref connectionFailures);
                    }

                    await Task.Delay(10, CancellationToken.None).ConfigureAwait(false);
                }
            });

            // Give some time for connection attempts to start
            await Task.Delay(100).ConfigureAwait(false);

            // Now shut down
            await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(3), noSave: true).ConfigureAwait(false);

            // Stop connection attempts
            stopCts.Cancel();
            await connectTask.ConfigureAwait(false);

            // Verify that at least some connections succeeded before shutdown.
            // Failures after shutdown are logged for information only and are not asserted.
            TestContext.Progress.WriteLine(
                $"Connection attempts: {connectionAttempts} succeeded, {connectionFailures} failed");

            ClassicAssert.IsTrue(connectionAttempts > 0,
                "Should have had at least some successful connections before shutdown");

            // Clean dispose
            server.Dispose();
            server = null;
        }
    }
}
```
The NoFinalization case does not guarantee that data is unrecoverable:
if a background AOF commit or checkpoint has already been created,
the data is still recoverable.
So test scenario 5 in Test\ShutdownDataConsistencyTests is informational only.
Detect cancellation requests and pass a noSave flag to server.ShutdownAsync so AOF commit/checkpoint are skipped during forced (OS) shutdowns. Keeps the existing 5s timeout and cancellation token, but avoids a lengthy graceful save when cancellationToken.IsCancellationRequested, allowing faster disposal while preserving normal graceful shutdown otherwise.
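
A minimal sketch of the behavior this commit message describes, under stated assumptions: `WorkerShutdown.StopAsync` and the `shutdownAsync` delegate are hypothetical stand-ins for the worker's stop handler and for `server.ShutdownAsync`, whose exact parameter order is not shown in this thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stop handler: chooses noSave from the state of the stop token.
public static class WorkerShutdown
{
    public static Task StopAsync(
        Func<TimeSpan, bool, CancellationToken, Task> shutdownAsync,
        CancellationToken cancellationToken)
    {
        // If the host has already canceled (forced/OS shutdown), skip the
        // AOF commit/checkpoint so disposal can proceed quickly.
        bool noSave = cancellationToken.IsCancellationRequested;

        // Keep the existing 5-second connection-drain timeout.
        return shutdownAsync(TimeSpan.FromSeconds(5), noSave, cancellationToken);
    }
}
```

The trade-off is explicit: a forced stop favors exiting on time over a final save, while a normal stop still performs the full graceful sequence.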