feat: migrate to pull model with gRPC unary RPCs #200
Replace the bidirectional gRPC streaming client with a polling-based architecture. The agent now pulls changed stacks from membership via the paginated ListStacks RPC and reports status via unary RPCs.

Key changes:
- New polling_client.go: cursor-based poll loop with orphan cleanup
- New membership_reporter.go: unary RPC wrapper for status reporting
- Informers adapted to use MembershipReporter instead of stream Send()
- Auth metadata attached via UnaryClientInterceptor
- Remove stream client, connection adapter, gRPC tracing wrappers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The tracer was only used by the deleted gRPC streaming code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The informer must see the stack via AddFunc before it can report a DeleteFunc. Wait for the StackStatus event before deleting. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
A newly created stack has no status, so the AddFunc doesn't emit a StackStatus event. Use a sleep to allow the informer cache to sync before deleting the stack. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add graceful disconnect RPC for agent shutdown signaling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CRITICAL:
- Fix agentID vs region.ID in UpsertVersion, DeleteVersion, regionPing (was using agent_id where the DB primary key region.id is needed)
- Extract resolveRegion() helper for consistent region lookup

HIGH:
- Revert Justfile proto ref to refs/heads/main
- Delete dead code: agent/storage.go, grpc/storage.go, server_impl.go, manager.go and their generated mocks

MEDIUM:
- Sanitize gRPC error messages (don't leak DB errors to clients)
- Use AnyRegion consistently for region lookups
- Only set nextCursor when hasMore is true
- Add nil check in InputUpdate.Validate()
- Clean .gitignore of stale pulumi entries

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When the last page has no results, the server returns an empty cursor. The agent was overwriting its saved cursor with this empty value, causing the next poll to re-scan from the beginning. Now the agent only updates its cursor when the server returns a non-empty one, preserving the previous position. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
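The cursor rule described in this commit can be sketched in isolation as below. This is a minimal illustration, not the PR's code: the `poller` type and the `advanceCursor` method are hypothetical names, and only the "keep the previous cursor when the server returns an empty one" behavior is taken from the commit message.

```go
package main

// poller is a hypothetical stand-in for the agent's polling client;
// only the saved pagination position matters for this sketch.
type poller struct {
	cursor string
}

// advanceCursor saves the server-provided cursor, but ignores an empty one
// so the next poll resumes from the last known position instead of
// re-scanning from the beginning.
func (p *poller) advanceCursor(next string) {
	if next != "" {
		p.cursor = next
	}
}
```

With this shape, a response carrying no cursor is a no-op for the saved position, and only a non-empty cursor moves the agent forward.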
	p.reconcileStack(ctx, stack)
}

if !resp.GetHasMore() {
	// Only update cursor if the server returned one.
	// An empty cursor means no results — keep the previous cursor
	// to avoid re-scanning from the beginning.
	if next := resp.GetNextCursor(); next != "" {
		p.cursor = next
reconcileStack cannot report failures, but SyncExistingStack can fail and return early after logging. This means the poll loop can still advance p.cursor and permanently skip a membership update that was not applied to Kubernetes. Return reconciliation errors and keep the cursor unchanged on failure, or add an explicit ack/retry path.
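One possible shape for the first option (return reconciliation errors and hold the cursor on failure) is sketched below. All names here are assumptions for illustration: `stack`, `page`, `applyPage`, the `reconcile` callback, and the `errNotApplied` sentinel are not from the PR, and the real ListStacks response type will differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotApplied is an illustrative sentinel, not part of the PR.
var errNotApplied = errors.New("stack not applied to Kubernetes")

// stack and page are hypothetical stand-ins for the membership messages.
type stack struct{ Name string }

type page struct {
	Stacks     []stack
	NextCursor string
}

type poller struct {
	cursor string
	// reconcile applies one stack to the cluster and reports failure
	// instead of only logging it.
	reconcile func(stack) error
}

// applyPage reconciles every stack in the page and advances the cursor only
// when all of them succeeded. On failure the cursor is left unchanged, so the
// same changes are requested again on the next poll.
func (p *poller) applyPage(pg page) error {
	var errs []error
	for _, s := range pg.Stacks {
		if err := p.reconcile(s); err != nil {
			errs = append(errs, fmt.Errorf("reconcile %s: %w", s.Name, err))
		}
	}
	// errors.Join returns nil when no error was collected.
	if err := errors.Join(errs...); err != nil {
		return err
	}
	if pg.NextCursor != "" {
		p.cursor = pg.NextCursor
	}
	return nil
}
```

The trade-off is that one failing stack makes the agent re-process the whole page, so this only works if reconciliation is idempotent. An explicit per-stack ack/retry path would avoid the re-processing at the cost of more protocol surface.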
if isFullSync {
	p.cleanupOrphans(ctx, allStackNames)
	p.fullSyncDone = true
The first full-sync cleanup depends on p.k8sClient.List, which is backed by the informer cache, but polling is started before informers are started/synced. If this one-shot cleanup sees an empty or stale cache, fullSyncDone is still set and orphan cleanup is skipped permanently. Make cleanup return an error and only set fullSyncDone after a successful list/delete pass, or wait for cache sync first.
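A rough sketch of what this could look like, combining both suggestions: cleanup refuses to run on an unsynced cache and returns an error, and `fullSyncDone` is only set after a successful pass. The callbacks `cacheSynced`, `listLocal`, and `deleteLocal`, the `finishFullSync` helper, and the `errCacheNotSynced` sentinel are hypothetical. In a real agent `cacheSynced` would presumably be backed by the informers' sync state, and the list/delete calls by `p.k8sClient`.

```go
package main

import "errors"

// errCacheNotSynced is an illustrative sentinel, not part of the PR.
var errCacheNotSynced = errors.New("informer cache not synced")

type poller struct {
	fullSyncDone bool
	// Hypothetical hooks standing in for the informer and Kubernetes client.
	cacheSynced func() bool
	listLocal   func() ([]string, error)
	deleteLocal func(name string) error
}

// cleanupOrphans deletes local stacks that membership did not return.
// It reports failures to the caller instead of only logging them.
func (p *poller) cleanupOrphans(known map[string]struct{}) error {
	if !p.cacheSynced() {
		return errCacheNotSynced
	}
	names, err := p.listLocal()
	if err != nil {
		return err
	}
	var errs []error
	for _, n := range names {
		if _, ok := known[n]; ok {
			continue
		}
		if err := p.deleteLocal(n); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// finishFullSync marks the full sync as done only after a successful
// list/delete pass; otherwise the next poll retries the cleanup.
func (p *poller) finishFullSync(known map[string]struct{}) error {
	if err := p.cleanupOrphans(known); err != nil {
		return err
	}
	p.fullSyncDone = true
	return nil
}
```

Starting the informers and waiting for cache sync before the first poll would make the `cacheSynced` guard redundant, but keeping it is cheap and protects against future reordering of startup steps.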
Restore the deleted `service Server { rpc Join(stream Message) returns
(stream Order) }` definition alongside the new AgentService. Old agent
binaries already deployed in production still speak the streaming
protocol; membership-api needs both service stubs generated to serve
them in parallel during the migration.
The agent binary itself only uses AgentService — the legacy schema is
shipped only so membership can keep streaming support behind a flag.
Sunset target: 2026-09-09 (~3 months).
Existing architecture and problems
The agent connects to membership via bidirectional gRPC streaming (Join RPC):
New architecture
Migration to a pull model with gRPC unary RPCs:
What we gain
Key changes
Companion PR
Membership: formancehq/membership-api#755 — both PRs must be deployed together.
Test plan
Generated with Claude Code