|
1 | 1 | package protocol |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "context" |
4 | 5 | "fmt" |
5 | 6 | "os" |
| 7 | + "os/signal" |
6 | 8 | "path/filepath" |
| 9 | + "syscall" |
7 | 10 |
|
8 | 11 | "github.com/datazip-inc/olake/constants" |
9 | 12 | "github.com/datazip-inc/olake/drivers/abstract" |
@@ -80,13 +83,55 @@ var RootCmd = &cobra.Command{ |
80 | 83 | }, |
81 | 84 | } |
82 | 85 |
|
| 86 | +// CreateRootCommand wires the cobra root for the given driver. It mutates |
| 87 | +// package-level state (RootCmd, connector) and installs a process-wide signal |
| 88 | +// handler, so it must be called at most once per process — the existing |
| 89 | +// connector.RegisterDriver entry point already enforces this. |
83 | 90 | func CreateRootCommand(_ bool, driver any) *cobra.Command { |
84 | 91 | RootCmd.AddCommand(commands...) |
85 | | - connector = abstract.NewAbstractDriver(RootCmd.Context(), driver.(abstract.DriverInterface)) |
| 92 | + |
| 93 | + // Wire SIGINT/SIGTERM into the root context so CDC, backfill and |
| 94 | + // destination-writer paths reach their existing ctx.Done() branches on |
| 95 | + // pod eviction, docker stop, or Ctrl-C, instead of being killed mid-read. |
| 96 | + ctx := signalAwareRootContext(RootCmd.Context()) |
| 97 | + RootCmd.SetContext(ctx) |
| 98 | + |
| 99 | + connector = abstract.NewAbstractDriver(ctx, driver.(abstract.DriverInterface)) |
86 | 100 |
|
87 | 101 | return RootCmd |
88 | 102 | } |
89 | 103 |
|
| 104 | +// signalAwareRootContext wraps parent so that the returned context cancels on |
| 105 | +// SIGINT / SIGTERM as well as on any parent cancellation. Used to wire pod |
| 106 | +// eviction, docker stop, and Ctrl-C through to the existing ctx.Done() |
| 107 | +// branches in CDC, backfill, and destination-writer paths. |
| 108 | +// |
| 109 | +// Source / destination consistency on cancel is still owned by each |
| 110 | +// driver.PostCDC and destination writer.Close implementation. This wrapper only |
| 111 | +// makes process signals visible through ctx.Done(); it does not make source |
| 112 | +// checkpoints and destination commits atomic. Any implementation that performs |
| 113 | +// a final commit after work has been written must continue to check ctx.Done() |
| 114 | +// before that commit and must treat a canceled context as a reason to avoid |
| 115 | +// advancing only one side of the source/destination boundary. |
| 116 | +// |
| 117 | +// The Kafka driver has a separate `TODO: Add 2PC support for Kafka` for a |
| 118 | +// future stricter contract. Other drivers may have similar source-specific |
| 119 | +// checkpointing constraints, so this helper should not be used as a substitute |
| 120 | +// for driver-level cancellation safety. |
| 121 | +func signalAwareRootContext(parent context.Context) context.Context { |
| 122 | + ctx, stop := signal.NotifyContext(parent, syscall.SIGINT, syscall.SIGTERM) |
| 123 | + // signal.NotifyContext keeps the signal handler installed until stop() is |
| 124 | + // called. Releasing it after the first cancellation lets a subsequent |
| 125 | + // SIGINT/SIGTERM fall through to the Go runtime default (terminate), which |
| 126 | + // is the behavior an operator hitting Ctrl-C twice expects. |
| 127 | + go func() { |
| 128 | + <-ctx.Done() |
| 129 | + stop() |
| 130 | + }() |
| 131 | + |
| 132 | + return ctx |
| 133 | +} |
| 134 | + |
90 | 135 | func init() { |
91 | 136 | // TODO: replace --catalog flag with --streams |
92 | 137 | commands = append(commands, specCmd, checkCmd, discoverCmd, syncCmd, clearCmd) |
|
0 commit comments