Skip to content

Commit 9b34787

Browse files
committed
feat(catalog): support for multiple catalog pods
Enables horizontal scaling through PostgreSQL-based leader election that coordinates database writes across multiple pods. All pods serve read requests from in-memory data and database queries. The leader alone performs database writes: fetches models, writes updates, and cleans up orphaned data. Leadership transfers automatically when the leader fails. Implementation: - Leader election package using pglock for distributed locking - Loader split into StartReadOnly() and StartLeader() modes - Configuration: CATALOG_LEADER_LOCK_DURATION and CATALOG_LEADER_HEARTBEAT environment variables - Integration tests for multi-pod scenarios Signed-off-by: Paul Boyd <[email protected]>
1 parent 4a37ec2 commit 9b34787

File tree

11 files changed

+1257
-76
lines changed

11 files changed

+1257
-76
lines changed

catalog/cmd/catalog.go

Lines changed: 155 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,21 @@ package cmd
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/http"
8+
"os"
9+
"os/signal"
710
"reflect"
11+
"sync"
12+
"syscall"
813
"time"
914

1015
"github.com/golang/glog"
1116
"github.com/kubeflow/model-registry/catalog/internal/catalog"
1217
"github.com/kubeflow/model-registry/catalog/internal/db/models"
1318
"github.com/kubeflow/model-registry/catalog/internal/db/service"
19+
"github.com/kubeflow/model-registry/catalog/internal/leader"
1420
"github.com/kubeflow/model-registry/catalog/internal/server/openapi"
1521
"github.com/kubeflow/model-registry/internal/datastore"
1622
"github.com/kubeflow/model-registry/internal/datastore/embedmd"
@@ -27,6 +33,49 @@ var catalogCfg = struct {
2733
PerformanceMetricsPath: []string{},
2834
}
2935

36+
const (
37+
leaderLockName = "catalog-leader"
38+
39+
defaultLeaderLockDuration = 60 * time.Second
40+
defaultLeaderHeartbeat = 15 * time.Second
41+
42+
envLeaderLockDuration = "CATALOG_LEADER_LOCK_DURATION"
43+
envLeaderHeartbeat = "CATALOG_LEADER_HEARTBEAT"
44+
)
45+
46+
// getLeaderElectionConfig reads leader election configuration from environment
47+
// variables, falling back to defaults when unset or invalid.
48+
func getLeaderElectionConfig() (lockDuration, heartbeat time.Duration) {
49+
lockDuration = defaultLeaderLockDuration
50+
heartbeat = defaultLeaderHeartbeat
51+
52+
if envDuration := os.Getenv(envLeaderLockDuration); envDuration != "" {
53+
if parsed, err := time.ParseDuration(envDuration); err == nil {
54+
lockDuration = parsed
55+
glog.Infof("Using leader lock duration from %s: %v", envLeaderLockDuration, lockDuration)
56+
} else {
57+
glog.Warningf("Invalid %s value %q: %v (using default %v)", envLeaderLockDuration, envDuration, err, defaultLeaderLockDuration)
58+
}
59+
}
60+
61+
if envHB := os.Getenv(envLeaderHeartbeat); envHB != "" {
62+
if parsed, err := time.ParseDuration(envHB); err == nil {
63+
heartbeat = parsed
64+
glog.Infof("Using leader heartbeat from %s: %v", envLeaderHeartbeat, heartbeat)
65+
} else {
66+
glog.Warningf("Invalid %s value %q: %v (using default %v)", envLeaderHeartbeat, envHB, err, defaultLeaderHeartbeat)
67+
}
68+
}
69+
70+
// Validate pglock requirement: heartbeat <= lockDuration/2
71+
if heartbeat > lockDuration/2 {
72+
glog.Warningf("Heartbeat (%v) exceeds half of lock duration (%v), required by pglock. Using defaults.", heartbeat, lockDuration)
73+
return defaultLeaderLockDuration, defaultLeaderHeartbeat
74+
}
75+
76+
return lockDuration, heartbeat
77+
}
78+
3079
var CatalogCmd = &cobra.Command{
3180
Use: "catalog",
3281
Short: "Catalog API server",
@@ -81,11 +130,23 @@ func runCatalogServer(cmd *cobra.Command, args []string) error {
81130
return nil
82131
})
83132

84-
err = loader.Start(context.Background())
85-
if err != nil {
86-
return fmt.Errorf("error loading catalog sources: %v", err)
133+
ctx, cancel := context.WithCancel(context.Background())
134+
defer cancel()
135+
136+
sigCh := make(chan os.Signal, 1)
137+
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
138+
go func() {
139+
sig := <-sigCh
140+
glog.Infof("Received signal %v, initiating graceful shutdown", sig)
141+
cancel()
142+
}()
143+
144+
glog.Info("Starting loader in read-only mode (standby)")
145+
if err := loader.StartReadOnly(ctx); err != nil {
146+
return fmt.Errorf("error starting loader in read-only mode: %v", err)
87147
}
88148

149+
// Set up HTTP server (runs continuously regardless of leadership)
89150
svc := openapi.NewModelCatalogServiceAPIService(
90151
catalog.NewDBCatalog(services, loader.Sources),
91152
loader.Sources,
@@ -94,8 +155,97 @@ func runCatalogServer(cmd *cobra.Command, args []string) error {
94155
)
95156
ctrl := openapi.NewModelCatalogServiceAPIController(svc)
96157

97-
glog.Infof("Catalog API server listening on %s", catalogCfg.ListenAddress)
98-
return http.ListenAndServe(catalogCfg.ListenAddress, openapi.NewRouter(ctrl))
158+
server := &http.Server{
159+
Addr: catalogCfg.ListenAddress,
160+
Handler: openapi.NewRouter(ctrl),
161+
}
162+
163+
var wg sync.WaitGroup
164+
165+
errCh := make(chan error, 3)
166+
wg.Add(1)
167+
go func() {
168+
defer wg.Done()
169+
glog.Infof("Catalog API server listening on %s", catalogCfg.ListenAddress)
170+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
171+
errCh <- fmt.Errorf("HTTP server failed: %w", err)
172+
}
173+
}()
174+
175+
go func() {
176+
<-ctx.Done()
177+
glog.Info("Shutting down HTTP server...")
178+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
179+
defer shutdownCancel()
180+
if err := server.Shutdown(shutdownCtx); err != nil {
181+
glog.Errorf("HTTP server shutdown error: %v", err)
182+
}
183+
}()
184+
185+
gormDB, err := ds.DB()
186+
if err != nil {
187+
return fmt.Errorf("error getting database connection: %w", err)
188+
}
189+
190+
lockDuration, heartbeat := getLeaderElectionConfig()
191+
glog.Infof("Leader election configured: lock duration=%v, heartbeat=%v", lockDuration, heartbeat)
192+
193+
elector, err := leader.NewLeaderElector(
194+
gormDB,
195+
ctx,
196+
leaderLockName,
197+
lockDuration,
198+
heartbeat,
199+
func(leaderCtx context.Context) {
200+
glog.Info("Became leader - starting leader-only operations")
201+
202+
// Monitor leaderCtx in separate goroutine and call StopLeader when lost
203+
go func() {
204+
<-leaderCtx.Done()
205+
glog.Info("Lost leadership, stopping leader operations...")
206+
if err := loader.StopLeader(); err != nil {
207+
glog.Errorf("Error stopping leader: %v", err)
208+
}
209+
}()
210+
211+
// StartLeader blocks until StopLeader is called or ctx cancels
212+
// Pass program context (ctx), NOT leaderCtx
213+
if err := loader.StartLeader(ctx); err != nil && !errors.Is(err, context.Canceled) {
214+
glog.Errorf("StartLeader exited with error: %v", err)
215+
}
216+
217+
glog.Info("Leader callback complete")
218+
},
219+
)
220+
if err != nil {
221+
return fmt.Errorf("error creating leader elector: %w", err)
222+
}
223+
224+
wg.Add(1)
225+
go func() {
226+
defer wg.Done()
227+
if err := elector.Run(ctx); err != nil {
228+
errCh <- fmt.Errorf("leader elector failed: %w", err)
229+
}
230+
}()
231+
232+
go func() {
233+
defer close(errCh)
234+
wg.Wait()
235+
}()
236+
237+
errs := []error{}
238+
for err := range errCh {
239+
if !errors.Is(err, context.Canceled) {
240+
errs = append(errs, err)
241+
}
242+
}
243+
244+
if err := loader.Shutdown(); err != nil {
245+
errs = append(errs, fmt.Errorf("loader shutdown error: %w", err))
246+
}
247+
248+
return errors.Join(errs...)
99249
}
100250

101251
func getRepo[T any](repoSet datastore.RepoSet) T {

catalog/cmd/catalog_config_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package cmd
2+
3+
import (
4+
"os"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestGetLeaderElectionConfig(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
lockDurationEnv string
13+
heartbeatEnv string
14+
expectedLockDuration time.Duration
15+
expectedHeartbeat time.Duration
16+
}{
17+
{
18+
name: "defaults when no env vars set",
19+
lockDurationEnv: "",
20+
heartbeatEnv: "",
21+
expectedLockDuration: defaultLeaderLockDuration,
22+
expectedHeartbeat: defaultLeaderHeartbeat,
23+
},
24+
{
25+
name: "custom valid values",
26+
lockDurationEnv: "30s",
27+
heartbeatEnv: "10s",
28+
expectedLockDuration: 30 * time.Second,
29+
expectedHeartbeat: 10 * time.Second,
30+
},
31+
{
32+
name: "fast failover for local dev",
33+
lockDurationEnv: "10s",
34+
heartbeatEnv: "3s",
35+
expectedLockDuration: 10 * time.Second,
36+
expectedHeartbeat: 3 * time.Second,
37+
},
38+
{
39+
name: "long lease for production",
40+
lockDurationEnv: "120s",
41+
heartbeatEnv: "30s",
42+
expectedLockDuration: 120 * time.Second,
43+
expectedHeartbeat: 30 * time.Second,
44+
},
45+
{
46+
name: "invalid lock duration uses default lock, keeps valid heartbeat",
47+
lockDurationEnv: "invalid",
48+
heartbeatEnv: "10s",
49+
expectedLockDuration: defaultLeaderLockDuration, // falls back to default
50+
expectedHeartbeat: 10 * time.Second, // uses parsed value
51+
},
52+
{
53+
name: "invalid heartbeat uses default heartbeat, keeps valid lock",
54+
lockDurationEnv: "30s",
55+
heartbeatEnv: "invalid",
56+
expectedLockDuration: 30 * time.Second, // uses parsed value
57+
expectedHeartbeat: defaultLeaderHeartbeat, // falls back to default
58+
},
59+
{
60+
name: "heartbeat exceeds lock duration/2 uses defaults",
61+
lockDurationEnv: "30s",
62+
heartbeatEnv: "20s", // 20s > 15s (half of 30s)
63+
expectedLockDuration: defaultLeaderLockDuration,
64+
expectedHeartbeat: defaultLeaderHeartbeat,
65+
},
66+
{
67+
name: "heartbeat exactly at lock duration/2 is valid",
68+
lockDurationEnv: "30s",
69+
heartbeatEnv: "15s",
70+
expectedLockDuration: 30 * time.Second,
71+
expectedHeartbeat: 15 * time.Second,
72+
},
73+
}
74+
75+
for _, tt := range tests {
76+
t.Run(tt.name, func(t *testing.T) {
77+
// Clear environment
78+
os.Unsetenv(envLeaderLockDuration)
79+
os.Unsetenv(envLeaderHeartbeat)
80+
81+
// Set test environment
82+
if tt.lockDurationEnv != "" {
83+
os.Setenv(envLeaderLockDuration, tt.lockDurationEnv)
84+
defer os.Unsetenv(envLeaderLockDuration)
85+
}
86+
if tt.heartbeatEnv != "" {
87+
os.Setenv(envLeaderHeartbeat, tt.heartbeatEnv)
88+
defer os.Unsetenv(envLeaderHeartbeat)
89+
}
90+
91+
// Get configuration
92+
lockDuration, heartbeat := getLeaderElectionConfig()
93+
94+
// Verify
95+
if lockDuration != tt.expectedLockDuration {
96+
t.Errorf("lock duration = %v, want %v", lockDuration, tt.expectedLockDuration)
97+
}
98+
if heartbeat != tt.expectedHeartbeat {
99+
t.Errorf("heartbeat = %v, want %v", heartbeat, tt.expectedHeartbeat)
100+
}
101+
102+
// Verify pglock requirement: heartbeat <= lockDuration/2
103+
if heartbeat > lockDuration/2 {
104+
t.Errorf("heartbeat (%v) exceeds half of lock duration (%v), violates pglock requirement", heartbeat, lockDuration)
105+
}
106+
})
107+
}
108+
}

0 commit comments

Comments
 (0)