Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/shim/streaming/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"time"

streamapi "github.com/containerd/containerd/api/services/streaming/v1"
ptypes "github.com/containerd/containerd/v2/pkg/protobuf/types"
Expand Down Expand Up @@ -148,6 +149,17 @@ func (s *service) Stream(ctx context.Context, srv streamapi.TTRPCStreaming_Strea
log.G(ctx).WithError(err).WithField("stream", i.ID).Debug("server->client bridge ended")
}
case <-ctx.Done():
// On Windows, the AF_UNIX <-> vsock proxy turns vmConn.Close()
// into a vsock SHUTDOWN that cascades into Task.Kill ->
// Delete -> Shutdown when fired while the VM is mid-stream.
// SetReadDeadline interrupts the bridge's binary.Read via the
// Go runtime poller (no wire-level packet) so v2t drains
// cleanly before the deferred Close() runs. See
// docker/sandboxes#2529.
if dc, ok := vmConn.(interface{ SetReadDeadline(time.Time) error }); ok {
_ = dc.SetReadDeadline(time.Now())
Comment on lines +159 to +160
}
<-v2t
}

return nil
Expand Down
105 changes: 105 additions & 0 deletions plugins/shim/streaming/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,111 @@ func TestStreamForwardsBothDirections(t *testing.T) {
}
}

// TestStreamReturnsAfterContextCancelOnIdleStream covers the abandoned-
// exec scenario flagged in the review of #177: a client disconnects
// (per-RPC ctx cancels) while the VM holds the stream open but is not
// sending any data. The handler must not leak its goroutines and the
// vmConn FD waiting for a VM EOF that will never come.
//
// Without the SetReadDeadline-driven exit path, bridgeVMToTTRPC blocks
// indefinitely in binary.Read(vmConn) because vmConn's lifetime was
// detached from ctx. The c2v goroutine exits cleanly when srv.Recv()
// observes ctx.Err(), but its zero-length EOF marker is written into
// the shim->VM half of the pipe; v2t reads from the VM->shim half and
// therefore never sees it.
Comment on lines +334 to +339
func TestStreamReturnsAfterContextCancelOnIdleStream(t *testing.T) {
parent := t.Context()
ctx, cancel := context.WithCancel(parent)
defer cancel()

_, _, done := startStream(t, ctx, "stream-cancel-idle")

// Neither side ever sends a frame. The handler is now waiting on
// v2t. Cancelling the per-RPC ctx mimics ttrpc tearing down the
// stream after the caller has gone away.
cancel()

select {
case err := <-done:
// Either nil (clean drain) or context.Canceled (propagated
// from srv.Recv) is acceptable; what matters is that the
// handler returned at all.
if err != nil && !errors.Is(err, context.Canceled) {
t.Fatalf("Stream returned unexpected error: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Stream handler did not return after ctx cancel; v2t is still blocked in binary.Read(vmConn). The fix must give ctx.Done() a controlled exit path that unblocks the VM->client bridge (e.g. SetReadDeadline) without firing vmConn.Close() before v2t has drained.")
}
}

// blockingStartSandbox blocks in StartStream until ctx is cancelled or
// release is closed, mimicking the real libkrun StartStream retry loop
// that polls for the guest stream socket to appear and observes
// ctx.Done() between attempts.
type blockingStartSandbox struct {
called chan struct{}
release chan struct{}
}

func (s *blockingStartSandbox) Start(context.Context, ...sandbox.Opt) error {
return errdefs.ErrNotImplemented
}
func (s *blockingStartSandbox) Stop(context.Context) error { return errdefs.ErrNotImplemented }
func (s *blockingStartSandbox) Client() (*ttrpc.Client, error) { return nil, errdefs.ErrNotImplemented }
func (s *blockingStartSandbox) StartStream(ctx context.Context, _ string) (net.Conn, error) {
close(s.called)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.release:
return nil, errdefs.ErrNotImplemented
}
}

// TestStreamPreservesStartStreamCancel covers Copilot's second concern
// on #177: passing context.WithoutCancel(ctx) into StartStream strips the
// caller's deadline and cancellation, so the libkrun retry loop polls
// for the full ~50s window even after the RPC has been torn down. The
// handler must instead pass the original ctx to StartStream so a
// timed-out or cancelled RPC aborts connection establishment promptly.
Comment on lines +389 to +394
func TestStreamPreservesStartStreamCancel(t *testing.T) {
parent := t.Context()
ctx, cancel := context.WithCancel(parent)
defer cancel()

sb := &blockingStartSandbox{
called: make(chan struct{}),
release: make(chan struct{}),
}
t.Cleanup(func() { close(sb.release) })

srv := newFakeStreamServer(ctx)
srv.recvCh <- streamInitAny(t, "slow-start")

svc := &service{sb: sb}

done := make(chan error, 1)
go func() { done <- svc.Stream(ctx, srv) }()

select {
case <-sb.called:
case <-time.After(2 * time.Second):
t.Fatal("StartStream was never invoked")
}

// Caller is gone. StartStream must observe and abort.
cancel()

select {
case err := <-done:
if !errors.Is(err, context.Canceled) {
t.Fatalf("expected context.Canceled propagation, got %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Stream did not return after ctx cancel during StartStream; the retry loop ignored the cancel signal. This is the regression introduced by passing context.WithoutCancel(ctx) into StartStream — the original ctx must be used so its retry loop can abort.")
}
}

// fakeMultiSandbox returns a different net.Conn from StartStream for
// each registered stream ID, allowing a single ttrpc connection to host
// multiple streams with different VM-side behavior in the same test.
Expand Down
Loading