From b12ea7371adbac3db708de7f14202422dcfa3be5 Mon Sep 17 00:00:00 2001
From: Nicolas De Loof
Date: Wed, 6 May 2026 08:45:21 +0200
Subject: [PATCH] fix(streaming): drain bridge on ctx cancel before vmConn.Close

On Windows, vmConn.Close() traverses the AF_UNIX <-> vsock proxy and
arrives at vminitd as a vsock SHUTDOWN packet. When the per-RPC ctx
cancels mid-stream, the deferred Close() fires while the VM may still
have in-flight data; the SHUTDOWN cascades into Task.Kill -> Delete ->
Shutdown and tears down the entire VM, surfacing as
state_error="rpc error: code = Unknown desc = ttrpc: closed".
Linux/macOS use native vsock without the AF_UNIX intermediate, so the
cascade does not occur there.

Empirical evidence (docker/sandboxes#2529): across 10 parallel CI runs
of identical content, every PASS produced exactly 1 EOF event on the
proxy AF_UNIX socket while every FAIL produced 13-30. Zero overlap.

This change has two pieces, both of which serialise teardown so that
v2t (the VM->client bridge) drains cleanly before vmConn.Close() runs:

1. In Stream's <-ctx.Done() branch (per-RPC cancel mid-stream): call
   vmConn.SetReadDeadline(time.Now()) to interrupt binary.Read via the
   Go runtime poller -- no wire-level packet -- and drain v2t before
   the deferred Close() fires.

2. On shim shutdown (ContainerStop of the kit container): track every
   in-flight stream's vmConn so the shutdown callback can
   SetReadDeadline on all of them and wait (via a WaitGroup) for every
   Stream handler to return before reporting back. Without this, N
   concurrent handlers race ttrpc.server.Close() and the transport may
   go away before all bridges have drained, leaving partially-drained
   streams whose eventual Close() re-triggers the SHUTDOWN cascade
   against the already-stopping VM.

Late-arriving Stream RPCs (after shutdown has set closing=true) are
rejected with errdefs.ErrUnavailable rather than being allowed to
register a vmConn the drain has already finished waiting for.

Tests:
- TestStreamReturnsAfterContextCancelOnIdleStream (per-RPC drain).
- TestStreamPreservesStartStreamCancel (StartStream observes ctx).
- TestShutdownDrainsAllInFlightStreams (shim-shutdown drain across N
  concurrent streams).
- TestStreamRejectedAfterShutdown (post-shutdown new-stream rejection).
- 3 pre-existing tests still pass.

Refs: docker/sandboxes#2529
Signed-off-by: Nicolas De Loof
---
 plugins/shim/streaming/plugin.go      |  95 ++++++++++-
 plugins/shim/streaming/plugin_test.go | 221 +++++++++++++++++++++++++-
 2 files changed, 309 insertions(+), 7 deletions(-)

diff --git a/plugins/shim/streaming/plugin.go b/plugins/shim/streaming/plugin.go
index c1dbac5..7f95d76 100644
--- a/plugins/shim/streaming/plugin.go
+++ b/plugins/shim/streaming/plugin.go
@@ -22,10 +22,15 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net"
+	"sync"
+	"time"
 
 	streamapi "github.com/containerd/containerd/api/services/streaming/v1"
 	ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types"
+	"github.com/containerd/containerd/v2/pkg/shutdown"
 	cplugins "github.com/containerd/containerd/v2/plugins"
+	"github.com/containerd/errdefs"
 	"github.com/containerd/log"
 	"github.com/containerd/plugin"
 	"github.com/containerd/plugin/registry"
@@ -43,17 +48,24 @@ func init() {
 		Type: cplugins.TTRPCPlugin,
 		ID:   "streaming",
 		Requires: []plugin.Type{
+			cplugins.InternalPlugin,
 			plugins.SandboxPlugin,
 		},
 		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
+			ss, err := ic.GetByID(cplugins.InternalPlugin, "shutdown")
+			if err != nil {
+				return nil, err
+			}
 			sb, err := ic.GetSingle(plugins.SandboxPlugin)
 			if err != nil {
 				return nil, err
 			}
-
-			return &service{
-				sb: sb.(sandbox.Sandbox),
-			}, nil
+			s := &service{
+				sb:      sb.(sandbox.Sandbox),
+				streams: make(map[string]net.Conn),
+			}
+			ss.(shutdown.Service).RegisterCallback(s.shutdown)
+			return s, nil
 		},
 	})
 }
@@ -63,6 +75,11 @@ const maxFrameSize = 10 << 20
 
 type service struct {
 	sb sandbox.Sandbox
+
+	mu      sync.Mutex
+	streams map[string]net.Conn
+	closing bool
+	wg      sync.WaitGroup
 }
 
 func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
@@ -88,7 +105,28 @@ func (s *service) Stream(ctx context.Context, srv streamapi.TTRPCStreaming_Strea
 	if err != nil {
 		return fmt.Errorf("failed to start vm stream: %w", err)
 	}
-	defer vmConn.Close()
+
+	// Track the stream so the shim shutdown callback can drain all
+	// in-flight bridges via SetReadDeadline before sandbox.Stop tears
+	// down the VM. If the shim is already shutting down, reject the
+	// new stream rather than racing against teardown.
+	s.mu.Lock()
+	if s.closing {
+		s.mu.Unlock()
+		vmConn.Close()
+		return fmt.Errorf("streaming plugin is shutting down: %w", errdefs.ErrUnavailable)
+	}
+	s.streams[i.ID] = vmConn
+	s.wg.Add(1)
+	s.mu.Unlock()
+
+	defer func() {
+		s.mu.Lock()
+		delete(s.streams, i.ID)
+		s.mu.Unlock()
+		vmConn.Close()
+		s.wg.Done()
+	}()
 
 	log.G(ctx).WithField("stream", i.ID).Debug("stream bridge established")
 
@@ -148,11 +186,58 @@ func (s *service) Stream(ctx context.Context, srv streamapi.TTRPCStreaming_Strea
 			log.G(ctx).WithError(err).WithField("stream", i.ID).Debug("server->client bridge ended")
 		}
 	case <-ctx.Done():
+		// On Windows, the AF_UNIX <-> vsock proxy turns vmConn.Close()
+		// into a vsock SHUTDOWN that cascades into Task.Kill ->
+		// Delete -> Shutdown when fired while the VM is mid-stream.
+		// SetReadDeadline interrupts the bridge's binary.Read via the
+		// Go runtime poller (no wire-level packet) so v2t drains
+		// cleanly before the deferred Close() runs. See
+		// docker/sandboxes#2529.
+		if err := vmConn.SetReadDeadline(time.Now()); err != nil {
+			log.G(ctx).WithError(err).WithField("stream", i.ID).Debug("failed to set read deadline on vm conn")
+		}
+		<-v2t
 	}
 
 	return nil
 }
 
+// shutdown is registered as a shim shutdown callback. It drains every
+// in-flight Stream handler before sandbox.Stop tears down the VM so
+// that all bridges have exited cleanly and the deferred vmConn.Close()
+// calls have already fired by the time the shim/ttrpc transport goes
+// away. Without this serialisation, N concurrent Stream handlers race
+// the transport teardown on shim shutdown and leak partially-drained
+// bridges that re-trigger the Windows SHUTDOWN cascade documented in
+// docker/sandboxes#2529.
+func (s *service) shutdown(ctx context.Context) error {
+	s.mu.Lock()
+	s.closing = true
+	streams := make([]net.Conn, 0, len(s.streams))
+	for _, c := range s.streams {
+		streams = append(streams, c)
+	}
+	s.mu.Unlock()
+
+	for _, c := range streams {
+		if err := c.SetReadDeadline(time.Now()); err != nil {
+			log.G(ctx).WithError(err).Debug("failed to set read deadline on vm conn during shutdown")
+		}
+	}
+
+	done := make(chan struct{})
+	go func() {
+		s.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
 // bridgeTTRPCToVM reads typeurl.Any messages from the TTRPC stream and
 // writes them as length-prefixed proto frames to the VM connection.
 func bridgeTTRPCToVM(srv streamapi.TTRPCStreaming_StreamServer, conn io.Writer) error {
diff --git a/plugins/shim/streaming/plugin_test.go b/plugins/shim/streaming/plugin_test.go
index cc8114d..08d5cca 100644
--- a/plugins/shim/streaming/plugin_test.go
+++ b/plugins/shim/streaming/plugin_test.go
@@ -164,7 +164,7 @@ func startStream(t *testing.T, ctx context.Context, id string) (srv *fakeStreamS
 	srv = newFakeStreamServer(ctx)
 	srv.recvCh <- streamInitAny(t, id)
 
-	svc := &service{sb: &fakeSandbox{conn: shimSide}}
+	svc := &service{sb: &fakeSandbox{conn: shimSide}, streams: make(map[string]net.Conn)}
 
 	d := make(chan error, 1)
 	go func() { d <- svc.Stream(ctx, srv) }()
@@ -325,6 +325,223 @@ func TestStreamForwardsBothDirections(t *testing.T) {
 	}
 }
 
+// TestStreamReturnsAfterContextCancelOnIdleStream covers the abandoned-
+// exec scenario: a client disconnects (per-RPC ctx cancels) while the
+// VM holds the stream open but is not sending any data. The handler
+// must drain via the SetReadDeadline-driven exit path on <-ctx.Done()
+// rather than leak its goroutines and the vmConn FD waiting for a VM
+// EOF that will never come.
+//
+// The c2v goroutine exits cleanly when srv.Recv() observes ctx.Err(),
+// but its zero-length EOF marker is written into the shim->VM half of
+// the pipe; v2t reads from the VM->shim half and therefore never sees
+// it. The handler's <-ctx.Done() branch must SetReadDeadline on vmConn
+// to unblock v2t's binary.Read.
+func TestStreamReturnsAfterContextCancelOnIdleStream(t *testing.T) {
+	parent := t.Context()
+	ctx, cancel := context.WithCancel(parent)
+	defer cancel()
+
+	_, _, done := startStream(t, ctx, "stream-cancel-idle")
+
+	// Neither side ever sends a frame. The handler is now waiting on
+	// v2t. Cancelling the per-RPC ctx mimics ttrpc tearing down the
+	// stream after the caller has gone away.
+	cancel()
+
+	select {
+	case err := <-done:
+		// Either nil (clean drain) or context.Canceled (propagated
+		// from srv.Recv) is acceptable; what matters is that the
+		// handler returned at all.
+		if err != nil && !errors.Is(err, context.Canceled) {
+			t.Fatalf("Stream returned unexpected error: %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("Stream handler did not return after ctx cancel; v2t is still blocked in binary.Read(vmConn). The fix must give ctx.Done() a controlled exit path that unblocks the VM->client bridge (e.g. SetReadDeadline) without firing vmConn.Close() before v2t has drained.")
+	}
+}
+
+// blockingStartSandbox blocks in StartStream until ctx is cancelled or
+// release is closed, mimicking the real libkrun StartStream retry loop
+// that polls for the guest stream socket to appear and observes
+// ctx.Done() between attempts.
+type blockingStartSandbox struct {
+	called  chan struct{}
+	release chan struct{}
+}
+
+func (s *blockingStartSandbox) Start(context.Context, ...sandbox.Opt) error {
+	return errdefs.ErrNotImplemented
+}
+func (s *blockingStartSandbox) Stop(context.Context) error { return errdefs.ErrNotImplemented }
+func (s *blockingStartSandbox) Client() (*ttrpc.Client, error) { return nil, errdefs.ErrNotImplemented }
+func (s *blockingStartSandbox) StartStream(ctx context.Context, _ string) (net.Conn, error) {
+	close(s.called)
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-s.release:
+		return nil, errdefs.ErrNotImplemented
+	}
+}
+
+// TestStreamPreservesStartStreamCancel asserts that StartStream observes
+// the caller's ctx cancellation: a timed-out or cancelled RPC must abort
+// connection establishment promptly rather than block in libkrun's
+// retry loop for the full polling window.
+func TestStreamPreservesStartStreamCancel(t *testing.T) {
+	parent := t.Context()
+	ctx, cancel := context.WithCancel(parent)
+	defer cancel()
+
+	sb := &blockingStartSandbox{
+		called:  make(chan struct{}),
+		release: make(chan struct{}),
+	}
+	t.Cleanup(func() { close(sb.release) })
+
+	srv := newFakeStreamServer(ctx)
+	srv.recvCh <- streamInitAny(t, "slow-start")
+
+	svc := &service{sb: sb, streams: make(map[string]net.Conn)}
+
+	done := make(chan error, 1)
+	go func() { done <- svc.Stream(ctx, srv) }()
+
+	select {
+	case <-sb.called:
+	case <-time.After(2 * time.Second):
+		t.Fatal("StartStream was never invoked")
+	}
+
+	// Caller is gone. StartStream must observe and abort.
+	cancel()
+
+	select {
+	case err := <-done:
+		if !errors.Is(err, context.Canceled) {
+			t.Fatalf("expected context.Canceled propagation, got %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("Stream did not return after ctx cancel during StartStream; the retry loop ignored the cancel signal. The handler must pass the caller's ctx to StartStream so its retry loop aborts on cancel/timeout.")
+	}
+}
+
+// TestShutdownDrainsAllInFlightStreams covers the shim shutdown drain
+// path: when ContainerStop tears down the kit shim, multiple Stream
+// handlers may be in flight simultaneously (dockerd healthcheck poll,
+// readiness exec, sentinel hold, ...) and their bridges race the
+// transport teardown. The shutdown callback must SetReadDeadline on
+// every tracked vmConn and wait for all handlers to exit before
+// returning, so that vmConn.Close() has fired on every stream by the
+// time sandbox.Stop tears down the VM.
+func TestShutdownDrainsAllInFlightStreams(t *testing.T) {
+	ctx := t.Context()
+
+	const n = 3
+	shimSides := make([]net.Conn, n)
+	vmSides := make([]net.Conn, n)
+	conns := make(map[string]net.Conn, n)
+	for i := 0; i < n; i++ {
+		shim, vm := net.Pipe()
+		shimSides[i] = shim
+		vmSides[i] = vm
+		conns[fmt.Sprintf("stream-%d", i)] = shim
+		t.Cleanup(func() { shim.Close(); vm.Close() })
+	}
+
+	svc := &service{
+		sb:      &fakeMultiSandbox{conns: conns},
+		streams: make(map[string]net.Conn),
+	}
+
+	// Open n concurrent streams; none send any data so all handlers
+	// are blocked in v2t's binary.Read.
+	dones := make([]<-chan error, n)
+	srvs := make([]*fakeStreamServer, n)
+	for i := 0; i < n; i++ {
+		srvs[i] = newFakeStreamServer(ctx)
+		srvs[i].recvCh <- streamInitAny(t, fmt.Sprintf("stream-%d", i))
+		d := make(chan error, 1)
+		go func(i int) { d <- svc.Stream(ctx, srvs[i]) }(i)
+		dones[i] = d
+		select {
+		case <-srvs[i].sendCh:
+		case <-time.After(2 * time.Second):
+			t.Fatalf("stream %d: timed out waiting for ack", i)
+		}
+	}
+
+	// Confirm all n streams are tracked.
+	svc.mu.Lock()
+	if len(svc.streams) != n {
+		svc.mu.Unlock()
+		t.Fatalf("expected %d tracked streams, got %d", n, len(svc.streams))
+	}
+	svc.mu.Unlock()
+
+	// Trigger the shim shutdown drain. It must SetReadDeadline on each
+	// vmConn (unblocking every v2t bridge) and wait for every handler
+	// to return before reporting back.
+	shutDone := make(chan error, 1)
+	go func() { shutDone <- svc.shutdown(ctx) }()
+
+	select {
+	case err := <-shutDone:
+		if err != nil {
+			t.Fatalf("shutdown returned %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("shutdown did not return; some Stream handlers are still in flight (their v2t bridges blocked in binary.Read). The shutdown callback must SetReadDeadline on every tracked vmConn and wait for all handlers to exit.")
+	}
+
+	// All Stream handlers must have returned.
+	for i, d := range dones {
+		select {
+		case <-d:
+		case <-time.After(time.Second):
+			t.Fatalf("stream %d: handler did not return after shutdown", i)
+		}
+	}
+
+	// Tracking map must be empty (every handler's defer ran).
+	svc.mu.Lock()
+	if got := len(svc.streams); got != 0 {
+		svc.mu.Unlock()
+		t.Fatalf("expected streams map empty after shutdown, got %d remaining", got)
+	}
+	svc.mu.Unlock()
+}
+
+// TestStreamRejectedAfterShutdown asserts that the streaming service
+// refuses new Stream RPCs once shutdown has marked it closing, so that
+// late-arriving requests cannot register a vmConn that the shutdown
+// drain has already finished waiting for.
+func TestStreamRejectedAfterShutdown(t *testing.T) {
+	ctx := t.Context()
+
+	shimSide, vmSide := net.Pipe()
+	t.Cleanup(func() { shimSide.Close(); vmSide.Close() })
+
+	svc := &service{
+		sb:      &fakeSandbox{conn: shimSide},
+		streams: make(map[string]net.Conn),
+	}
+
+	if err := svc.shutdown(ctx); err != nil {
+		t.Fatalf("shutdown returned %v", err)
+	}
+
+	srv := newFakeStreamServer(ctx)
+	srv.recvCh <- streamInitAny(t, "post-shutdown")
+
+	err := svc.Stream(ctx, srv)
+	if !errors.Is(err, errdefs.ErrUnavailable) {
+		t.Fatalf("expected ErrUnavailable, got %v", err)
+	}
+}
+
 // fakeMultiSandbox returns a different net.Conn from StartStream for
 // each registered stream ID, allowing a single ttrpc connection to host
 // multiple streams with different VM-side behavior in the same test.
@@ -405,7 +622,7 @@ func TestSlowStreamDoesNotBlockOtherStreams(t *testing.T) {
 	if err != nil {
 		t.Fatalf("NewServer: %v", err)
 	}
-	streamapi.RegisterTTRPCStreamingService(server, &service{sb: sb})
+	streamapi.RegisterTTRPCStreamingService(server, &service{sb: sb, streams: make(map[string]net.Conn)})
 
 	// Stay under the AF_UNIX 104-byte sun_path limit on macOS:
 	// t.TempDir() embeds the full test name, pushing the path over the