Skip to content

Commit 5b20930

Browse files
wesmclaude
andauthored
Fix sync now to connect if daemon not yet connected (#69)
## Summary - Fix `roborev sync now` failing when the daemon hasn't connected to PostgreSQL yet - Add `/api/sync/status` endpoint for health checking ## Changes **SyncNowWithProgress auto-connects** If called before the background sync loop has established a connection (e.g., immediately after daemon start), `SyncNowWithProgress` now attempts to connect first rather than returning "not connected to PostgreSQL". **Connection serialization** Added `connectMu` mutex to prevent concurrent connection attempts. If multiple callers try to connect simultaneously, only one will proceed and others will find the connection already established. **Sync status endpoint** New `/api/sync/status` endpoint returns: ```json {"enabled": true, "connected": true, "message": "ok"} ``` Useful for monitoring and debugging sync health. ## Test plan - [x] Start daemon with sync enabled - [x] Immediately run `roborev sync now` before background sync connects - [x] Verify it connects and syncs successfully instead of failing --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c77f6e8 commit 5b20930

File tree

2 files changed

+104
-29
lines changed

2 files changed

+104
-29
lines changed

internal/daemon/server.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func NewServer(db *storage.DB, cfg *config.Config) *Server {
7171
mux.HandleFunc("/api/status", s.handleStatus)
7272
mux.HandleFunc("/api/stream/events", s.handleStreamEvents)
7373
mux.HandleFunc("/api/sync/now", s.handleSyncNow)
74+
mux.HandleFunc("/api/sync/status", s.handleSyncStatus)
7475

7576
s.httpServer = &http.Server{
7677
Addr: cfg.ServerAddr,
@@ -234,6 +235,32 @@ func (s *Server) handleSyncNow(w http.ResponseWriter, r *http.Request) {
234235
})
235236
}
236237

238+
// handleSyncStatus returns the current sync worker health status
239+
func (s *Server) handleSyncStatus(w http.ResponseWriter, r *http.Request) {
240+
if r.Method != http.MethodGet {
241+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
242+
return
243+
}
244+
245+
w.Header().Set("Content-Type", "application/json")
246+
247+
if s.syncWorker == nil {
248+
json.NewEncoder(w).Encode(map[string]interface{}{
249+
"enabled": false,
250+
"connected": false,
251+
"message": "sync not enabled",
252+
})
253+
return
254+
}
255+
256+
healthy, message := s.syncWorker.HealthCheck()
257+
json.NewEncoder(w).Encode(map[string]interface{}{
258+
"enabled": true,
259+
"connected": healthy,
260+
"message": message,
261+
})
262+
}
263+
237264
// API request/response types
238265

239266
type EnqueueRequest struct {

internal/storage/syncworker.go

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ import (
1212

1313
// SyncWorker handles background synchronization between SQLite and PostgreSQL
1414
type SyncWorker struct {
15-
db *DB
16-
cfg config.SyncConfig
17-
pgPool *PgPool
18-
stopCh chan struct{}
19-
doneCh chan struct{}
20-
mu sync.Mutex // protects running and pgPool
21-
syncMu sync.Mutex // serializes sync operations (doSync, SyncNow, FinalPush)
22-
running bool
15+
db *DB
16+
cfg config.SyncConfig
17+
pgPool *PgPool
18+
stopCh chan struct{}
19+
doneCh chan struct{}
20+
mu sync.Mutex // protects running and pgPool
21+
syncMu sync.Mutex // serializes sync operations (doSync, SyncNow, FinalPush)
22+
connectMu sync.Mutex // serializes connect operations
23+
running bool
2324
}
2425

2526
// NewSyncWorker creates a new sync worker
@@ -179,6 +180,7 @@ func (w *SyncWorker) SyncNow() (*SyncStats, error) {
179180
}
180181

181182
// SyncNowWithProgress is like SyncNow but calls progressFn after each batch.
183+
// If not connected, attempts to connect first.
182184
func (w *SyncWorker) SyncNowWithProgress(progressFn func(SyncProgress)) (*SyncStats, error) {
183185
w.mu.Lock()
184186
if !w.running {
@@ -188,8 +190,33 @@ func (w *SyncWorker) SyncNowWithProgress(progressFn func(SyncProgress)) (*SyncSt
188190
pool := w.pgPool
189191
w.mu.Unlock()
190192

193+
// If not connected, attempt to connect now
191194
if pool == nil {
192-
return nil, fmt.Errorf("not connected to PostgreSQL")
195+
connectTimeout := 30 * time.Second
196+
if _, err := w.connect(connectTimeout); err != nil {
197+
// Log full error server-side, return generic error to caller
198+
// (connection errors may contain credentials or host details)
199+
log.Printf("Sync: connection failed: %v", err)
200+
return nil, fmt.Errorf("failed to connect to PostgreSQL")
201+
}
202+
// Re-check running state and get pool under lock
203+
// (Stop may have been called during connect)
204+
w.mu.Lock()
205+
if !w.running {
206+
// Worker was stopped during connect - close pool and abort
207+
if w.pgPool != nil {
208+
w.pgPool.Close()
209+
w.pgPool = nil
210+
}
211+
w.mu.Unlock()
212+
return nil, fmt.Errorf("sync worker stopped during connect")
213+
}
214+
pool = w.pgPool
215+
w.mu.Unlock()
216+
if pool == nil {
217+
return nil, fmt.Errorf("connection succeeded but pool is nil")
218+
}
219+
log.Printf("Sync: connected to PostgreSQL (triggered by sync now)")
193220
}
194221

195222
// Serialize with other sync operations
@@ -293,7 +320,8 @@ func (w *SyncWorker) run(stopCh, doneCh chan struct{}, interval, connectTimeout
293320
}
294321

295322
// Try to connect
296-
if err := w.connect(connectTimeout); err != nil {
323+
newConn, err := w.connect(connectTimeout)
324+
if err != nil {
297325
log.Printf("Sync: connection failed: %v (retry in %v)", err, backoff)
298326
select {
299327
case <-stopCh:
@@ -306,10 +334,13 @@ func (w *SyncWorker) run(stopCh, doneCh chan struct{}, interval, connectTimeout
306334

307335
// Connected - reset backoff
308336
backoff = time.Second
309-
log.Printf("Sync: connected to PostgreSQL")
337+
if newConn {
338+
log.Printf("Sync: connected to PostgreSQL")
339+
}
310340

311341
// Run sync loop until disconnection or stop
312-
w.syncLoop(stopCh, interval)
342+
// Only do initial sync if we made the connection (not if SyncNow connected for us)
343+
w.syncLoop(stopCh, interval, newConn)
313344

314345
// If we get here, we disconnected - try to reconnect
315346
w.mu.Lock()
@@ -321,11 +352,24 @@ func (w *SyncWorker) run(stopCh, doneCh chan struct{}, interval, connectTimeout
321352
}
322353
}
323354

324-
// connect establishes the PostgreSQL connection
325-
func (w *SyncWorker) connect(timeout time.Duration) error {
355+
// connect establishes the PostgreSQL connection.
356+
// Serialized by connectMu to prevent concurrent connection attempts.
357+
// Returns (true, nil) if a new connection was made, (false, nil) if already connected.
358+
func (w *SyncWorker) connect(timeout time.Duration) (bool, error) {
359+
w.connectMu.Lock()
360+
defer w.connectMu.Unlock()
361+
362+
// Check if already connected (another goroutine may have connected while we waited)
363+
w.mu.Lock()
364+
if w.pgPool != nil {
365+
w.mu.Unlock()
366+
return false, nil
367+
}
368+
w.mu.Unlock()
369+
326370
url := w.cfg.PostgresURLExpanded()
327371
if url == "" {
328-
return fmt.Errorf("postgres_url not configured")
372+
return false, fmt.Errorf("postgres_url not configured")
329373
}
330374

331375
ctx, cancel := context.WithTimeout(context.Background(), timeout)
@@ -336,26 +380,26 @@ func (w *SyncWorker) connect(timeout time.Duration) error {
336380

337381
pool, err := NewPgPool(ctx, url, cfg)
338382
if err != nil {
339-
return err
383+
return false, err
340384
}
341385

342386
// Ensure schema exists
343387
if err := pool.EnsureSchema(ctx); err != nil {
344388
pool.Close()
345-
return fmt.Errorf("ensure schema: %w", err)
389+
return false, fmt.Errorf("ensure schema: %w", err)
346390
}
347391

348392
// Check if this is a new/different Postgres database
349393
dbID, err := pool.GetDatabaseID(ctx)
350394
if err != nil {
351395
pool.Close()
352-
return fmt.Errorf("get database ID: %w", err)
396+
return false, fmt.Errorf("get database ID: %w", err)
353397
}
354398

355399
lastTargetID, err := w.db.GetSyncState(SyncStateSyncTargetID)
356400
if err != nil {
357401
pool.Close()
358-
return fmt.Errorf("get sync target ID: %w", err)
402+
return false, fmt.Errorf("get sync target ID: %w", err)
359403
}
360404

361405
if lastTargetID != "" && lastTargetID != dbID {
@@ -370,50 +414,54 @@ func (w *SyncWorker) connect(timeout time.Duration) error {
370414
log.Printf("Sync: detected new Postgres database (was %s, now %s), clearing sync state for full re-sync", oldID, newID)
371415
if err := w.db.ClearAllSyncedAt(); err != nil {
372416
pool.Close()
373-
return fmt.Errorf("clear synced_at: %w", err)
417+
return false, fmt.Errorf("clear synced_at: %w", err)
374418
}
375419
// Also clear pull cursors so we pull all data from the new database
376420
for _, key := range []string{SyncStateLastJobCursor, SyncStateLastReviewCursor, SyncStateLastResponseID} {
377421
if err := w.db.SetSyncState(key, ""); err != nil {
378422
pool.Close()
379-
return fmt.Errorf("clear %s: %w", key, err)
423+
return false, fmt.Errorf("clear %s: %w", key, err)
380424
}
381425
}
382426
}
383427

384428
// Update the sync target ID
385429
if err := w.db.SetSyncState(SyncStateSyncTargetID, dbID); err != nil {
386430
pool.Close()
387-
return fmt.Errorf("set sync target ID: %w", err)
431+
return false, fmt.Errorf("set sync target ID: %w", err)
388432
}
389433

390434
// Register this machine
391435
machineID, err := w.db.GetMachineID()
392436
if err != nil {
393437
pool.Close()
394-
return fmt.Errorf("get machine ID: %w", err)
438+
return false, fmt.Errorf("get machine ID: %w", err)
395439
}
396440

397441
if err := pool.RegisterMachine(ctx, machineID, w.cfg.MachineName); err != nil {
398442
pool.Close()
399-
return fmt.Errorf("register machine: %w", err)
443+
return false, fmt.Errorf("register machine: %w", err)
400444
}
401445

402446
w.mu.Lock()
403447
w.pgPool = pool
404448
w.mu.Unlock()
405449

406-
return nil
450+
return true, nil
407451
}
408452

409453
// syncLoop runs the periodic sync until stop or disconnection
410-
func (w *SyncWorker) syncLoop(stopCh <-chan struct{}, interval time.Duration) {
454+
// doInitialSync controls whether to sync immediately on entry (only when we made the connection)
455+
func (w *SyncWorker) syncLoop(stopCh <-chan struct{}, interval time.Duration, doInitialSync bool) {
411456
ticker := time.NewTicker(interval)
412457
defer ticker.Stop()
413458

414-
// Do initial sync immediately
415-
if err := w.doSync(); err != nil {
416-
log.Printf("Sync: error: %v", err)
459+
// Do initial sync immediately only if we made the connection
460+
// (if SyncNow connected, it will handle its own sync)
461+
if doInitialSync {
462+
if err := w.doSync(); err != nil {
463+
log.Printf("Sync: error: %v", err)
464+
}
417465
}
418466

419467
for {

0 commit comments

Comments
 (0)