Skip to content

Commit 05c0a08

Browse files
committed
fix(galleryop): persist cancellation + periodically reap orphaned ops
Two distributed gaps surfaced when a replica was killed mid-upgrade on a live cluster, leaving the backend stuck 'processing' in the UI forever:

1. CancelOperation flipped the in-memory status to cancelled and broadcast a NATS event but never persisted the terminal status. On the next replica restart the still-active row re-hydrated straight back into processingBackends and the UI spun again. It now calls store.Cancel(id) so the cancel survives a restart.

2. CleanStale (which marks abandoned active ops failed) only ran once on startup, so an op orphaned AFTER startup - its owning replica's foreground handler goroutine gone - was never reaped until the next restart. Add GalleryService.ReapStaleOperations and run it on a 15m ticker (CleanStale now returns the reaped count for observability).

Neither is covered by the OpCache self-evict fix: an orphaned op never reaches Processed, so it would never self-evict.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 7824105 commit 05c0a08

4 files changed

Lines changed: 178 additions & 12 deletions

File tree

core/application/startup.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import (
2323
"github.com/mudler/LocalAI/core/services/routing/pii"
2424
"github.com/mudler/LocalAI/core/services/routing/router"
2525
"github.com/mudler/LocalAI/core/services/storage"
26-
"github.com/mudler/LocalAI/pkg/signals"
2726
coreStartup "github.com/mudler/LocalAI/core/startup"
2827
"github.com/mudler/LocalAI/internal"
28+
"github.com/mudler/LocalAI/pkg/signals"
2929
"github.com/mudler/LocalAI/pkg/vram"
3030

3131
"github.com/mudler/LocalAI/pkg/model"
@@ -308,10 +308,31 @@ func New(opts ...config.AppOption) (*Application, error) {
308308
application.galleryService.SetNATSClient(distSvc.Nats)
309309
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
310310
// Clean up stale in-progress operations from previous crashed instances
311-
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
311+
if _, err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
312312
xlog.Warn("Failed to clean stale gallery operations", "error", err)
313313
}
314314
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
315+
316+
// Reap stale ops periodically, not just at boot: an op orphaned by
317+
// a replica that died mid-install (its foreground handler goroutine
318+
// gone) would otherwise linger "processing" in the UI until the next
319+
// restart. 30m matches the install/upgrade ceiling so a genuinely
320+
// slow op is never reaped out from under itself.
321+
gsvc := application.galleryService
322+
go func() {
323+
ticker := time.NewTicker(15 * time.Minute)
324+
defer ticker.Stop()
325+
for {
326+
select {
327+
case <-options.Context.Done():
328+
return
329+
case <-ticker.C:
330+
if _, err := gsvc.ReapStaleOperations(30 * time.Minute); err != nil {
331+
xlog.Warn("Failed to reap stale gallery operations", "error", err)
332+
}
333+
}
334+
}
335+
}()
315336
}
316337
// Hydrate from the store first so the wildcard subscriber finds an
317338
// already-populated statuses map for any operations still in flight

core/services/distributed/gallery.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,18 +180,21 @@ func (s *GalleryStore) Cancel(id string) error {
180180
return s.UpdateStatus(id, "cancelled", "")
181181
}
182182

183-
// CleanStale marks abandoned in-progress operations as failed.
184-
// Should be called on startup to recover from crashed instances that
185-
// left records in pending/downloading/processing state.
186-
func (s *GalleryStore) CleanStale(age time.Duration) error {
183+
// CleanStale marks abandoned in-progress operations as failed and returns the
184+
// number of rows reaped. Called on startup AND periodically to recover from
185+
// crashed/restarted instances that left records in pending/downloading/
186+
// processing state — an op orphaned after startup would otherwise linger
187+
// "processing" until the next restart.
188+
func (s *GalleryStore) CleanStale(age time.Duration) (int64, error) {
187189
cutoff := time.Now().Add(-age)
188-
return s.db.Model(&GalleryOperationRecord{}).
190+
res := s.db.Model(&GalleryOperationRecord{}).
189191
Where("updated_at < ? AND status IN ?", cutoff, activeStatuses).
190192
Updates(map[string]any{
191193
"status": "failed",
192-
"error": "stale operation cleaned up on startup",
194+
"error": "stale operation reaped (abandoned by a crashed or restarted instance)",
193195
"updated_at": time.Now(),
194-
}).Error
196+
})
197+
return res.RowsAffected, res.Error
195198
}
196199

197200
// CleanOld removes operations older than the given duration.
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package galleryop_test
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
10+
"github.com/mudler/LocalAI/core/config"
11+
"github.com/mudler/LocalAI/core/services/distributed"
12+
"github.com/mudler/LocalAI/core/services/galleryop"
13+
"github.com/mudler/LocalAI/core/services/testutil"
14+
)
15+
16+
// Reproduces "a cancelled/orphaned op resurrects as 'processing' after a pod
17+
// restart". CancelOperation flipped the in-memory status to cancelled and
18+
// broadcast a NATS event, but never persisted the terminal status to the
19+
// gallery store. On the next replica restart the still-"pending" row hydrated
20+
// straight back into processingBackends and the UI spun again. CancelOperation
21+
// must persist the cancellation so it survives a restart.
//
// The spec asserts two things: the stored row reads "cancelled" after
// CancelOperation returns, and a second service hydrated from the same store
// gets nil from GetStatus for that op ID.
22+
var _ = Describe("GalleryService.CancelOperation persistence", func() {
23+
It("persists the cancelled status to the gallery store", func() {
24+
db := testutil.SetupTestDB()
25+
store, err := distributed.NewGalleryStore(db)
26+
Expect(err).ToNot(HaveOccurred())
27+
28+
// Seed an in-flight op as if a replica was mid-install.
29+
Expect(store.Create(&distributed.GalleryOperationRecord{
30+
ID: "op-cancel",
31+
GalleryElementName: "llama-cpp-development",
32+
OpType: "backend_install",
33+
Status: "pending",
34+
Progress: 0,
35+
})).To(Succeed())
36+
37+
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
38+
svc.SetGalleryStore(store)
39+
// Make the op locally cancellable so CancelOperation proceeds.
// The registered cancel func is a no-op: nothing is actually running in this
// spec, only the persistence side effect is under test.
40+
svc.StoreCancellation("op-cancel", context.CancelFunc(func() {}))
41+
42+
Expect(svc.CancelOperation("op-cancel")).To(Succeed())
43+
44+
// The persisted row must now be terminal — otherwise it re-hydrates as
45+
// pending on the next restart.
46+
rec, err := store.Get("op-cancel")
47+
Expect(err).ToNot(HaveOccurred())
48+
Expect(rec.Status).To(Equal("cancelled"))
49+
50+
// And a fresh service hydrating from the store must NOT see it as active.
// "fresh" shares the same store but none of svc's in-memory state, which
// stands in for a replica that has just restarted.
51+
fresh := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
52+
fresh.SetGalleryStore(store)
53+
Expect(fresh.Hydrate()).To(Succeed())
54+
Expect(fresh.GetStatus("op-cancel")).To(BeNil(),
55+
"a cancelled op must not hydrate back as active after a restart")
56+
})
57+
})
58+
59+
// Reproduces "an op orphaned by a replica that died mid-flight stays 'pending'
60+
// forever". CleanStale (which marks abandoned active ops failed) only ran once
61+
// on startup, so an op orphaned AFTER startup was never reaped until the next
62+
// restart. The service must reap stale ops on an interval, not just at boot.
//
// The spec seeds one row backdated by an hour and one freshly created row,
// calls ReapStaleOperations with a 30-minute age, and expects exactly one row
// reaped: the backdated row ends "failed", the fresh row stays "downloading".
63+
var _ = Describe("GalleryService.ReapStaleOperations", func() {
64+
It("marks abandoned active ops terminal once they pass the age cutoff", func() {
65+
db := testutil.SetupTestDB()
66+
store, err := distributed.NewGalleryStore(db)
67+
Expect(err).ToNot(HaveOccurred())
68+
69+
Expect(store.Create(&distributed.GalleryOperationRecord{
70+
ID: "orphan-op",
71+
GalleryElementName: "llama-cpp-development",
72+
OpType: "backend_install",
73+
Status: "pending",
74+
Progress: 0,
75+
})).To(Succeed())
76+
// Force the row's updated_at into the past so it is older than the cutoff.
// NOTE(review): raw SQL is presumably used because the store's own write
// paths would stamp updated_at with the current time — confirm.
77+
Expect(db.Exec(
78+
"UPDATE gallery_operations SET updated_at = ? WHERE id = ?",
79+
time.Now().Add(-1*time.Hour), "orphan-op",
80+
).Error).To(Succeed())
81+
82+
// A fresh, still-progressing op must NOT be reaped.
83+
Expect(store.Create(&distributed.GalleryOperationRecord{
84+
ID: "live-op",
85+
GalleryElementName: "vllm-development",
86+
OpType: "backend_install",
87+
Status: "downloading",
88+
Progress: 50,
89+
})).To(Succeed())
90+
91+
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
92+
svc.SetGalleryStore(store)
93+
// 30-minute age: the orphan was backdated by one hour, so it is past the
// cutoff; live-op was created moments ago and is not.
94+
reaped, err := svc.ReapStaleOperations(30 * time.Minute)
95+
Expect(err).ToNot(HaveOccurred())
96+
Expect(reaped).To(Equal(int64(1)))
97+
98+
orphan, err := store.Get("orphan-op")
99+
Expect(err).ToNot(HaveOccurred())
100+
Expect(orphan.Status).To(Equal("failed"))
101+
102+
live, err := store.Get("live-op")
103+
Expect(err).ToNot(HaveOccurred())
104+
Expect(live.Status).To(Equal("downloading"), "a recently-updated op must not be reaped")
105+
})
106+
})

core/services/galleryop/service.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"sync"
8+
"time"
89

910
"github.com/mudler/LocalAI/core/config"
1011
"github.com/mudler/LocalAI/core/gallery"
@@ -31,9 +32,9 @@ type GalleryService struct {
3132
// natsClient is the wider MessagingClient (Publisher + subscribe methods)
3233
// when wired by the distributed startup path; broadcastSubs holds the
3334
// progress + cancel subscriptions opened by SubscribeBroadcasts.
34-
natsClient messaging.MessagingClient
35-
galleryStore *distributed.GalleryStore
36-
broadcastSubs []messaging.Subscription
35+
natsClient messaging.MessagingClient
36+
galleryStore *distributed.GalleryStore
37+
broadcastSubs []messaging.Subscription
3738

3839
// OnBackendOpCompleted is fired after every successful install/upgrade/delete
3940
// on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck
@@ -274,6 +275,29 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus {
274275
return g.statuses
275276
}
276277

278+
// ReapStaleOperations marks abandoned in-progress operations (pending/
279+
// downloading/processing) older than `age` as failed, so an op orphaned by a
280+
// replica that died mid-flight does not linger as "processing" forever. The
281+
// store's CleanStale runs once on startup; this exposes it for periodic
282+
// invocation (a post-startup orphan is otherwise not reaped until the next
283+
// restart). No-op when no gallery store is wired. Returns rows reaped.
//
// `age` is passed unchanged to the store's CleanStale. On a store error the
// returned count is 0 and the error is returned as-is. An Info log line with
// the count is emitted only when at least one row was reaped.
284+
func (g *GalleryService) ReapStaleOperations(age time.Duration) (int64, error) {
285+
// Copy the store pointer while holding g's lock, then release the lock so the
// CleanStale call below runs without it held.
g.Lock()
286+
store := g.galleryStore
287+
g.Unlock()
288+
// No store wired: nothing to reap, and not an error.
if store == nil {
289+
return 0, nil
290+
}
291+
n, err := store.CleanStale(age)
292+
if err != nil {
293+
// The count is reported as 0 on failure; n is discarded.
return 0, err
294+
}
295+
// Stay quiet on the common "nothing to reap" tick.
if n > 0 {
296+
xlog.Info("Reaped stale gallery operations", "count", n)
297+
}
298+
return n, nil
299+
}
300+
277301
// CancelOperation cancels an in-progress operation by its ID.
278302
//
279303
// In distributed mode the UI's cancel click may land on a different replica
@@ -295,6 +319,7 @@ func (g *GalleryService) CancelOperation(id string) error {
295319
}
296320

297321
nc := g.natsClient
322+
store := g.galleryStore
298323

299324
if !localExists && nc == nil {
300325
g.Unlock()
@@ -315,6 +340,17 @@ func (g *GalleryService) CancelOperation(id string) error {
315340
}
316341
g.Unlock()
317342

343+
// Persist the terminal status so the cancel survives a restart. Without
344+
// this the row stays in its active state and re-hydrates straight back into
345+
// processingBackends on the next replica boot — the UI spins again on an op
346+
// the admin already cancelled. The peer that broadcasts wins the write; a
347+
// no-op when standalone (store nil).
348+
if store != nil {
349+
if err := store.Cancel(id); err != nil {
350+
xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err)
351+
}
352+
}
353+
318354
// I/O and user-provided callback after Unlock — the cancel-wildcard
319355
// subscriber loops back into applyCancel on this same replica, which
320356
// would otherwise deadlock on g.Mutex.

0 commit comments

Comments
 (0)