Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions decentralized-api/payloadstorage/managed_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (m *ManagedStorage) cleanupLoop() {

func (m *ManagedStorage) cleanup() {
m.mu.Lock()
defer m.mu.Unlock()

now := time.Now()
for id, c := range m.cache {
Expand All @@ -119,23 +118,36 @@ func (m *ManagedStorage) cleanup() {
}
}

var start, threshold uint64
if m.maxEpoch > m.retainCount {
threshold := m.maxEpoch - m.retainCount
threshold = m.maxEpoch - m.retainCount

if m.minPruned+maxPruneLookback < threshold {
m.minPruned = threshold - maxPruneLookback
}
start = m.minPruned
}
m.mu.Unlock()

for epoch := m.minPruned; epoch < threshold; epoch++ {
go func(e uint64) {
if err := m.storage.PruneEpoch(context.Background(), e); err != nil {
logging.Warn("Auto-prune failed", types.PayloadStorage, "epochId", e, "error", err)
} else {
logging.Info("Auto-pruned epoch", types.PayloadStorage, "epochId", e)
}
}(epoch)
m.pruneEpochs(start, threshold)
}

// pruneEpochs prunes epochs [start, threshold) in order, advancing minPruned
// only past epochs that pruned successfully so a failed epoch is retried on
// the next cleanup tick instead of being skipped forever (#850). Pruning runs
// outside m.mu so storage I/O does not block cache reads; cleanupLoop is the
// only caller, so prunes never overlap.
func (m *ManagedStorage) pruneEpochs(start, threshold uint64) {
for epoch := start; epoch < threshold; epoch++ {
if err := m.storage.PruneEpoch(context.Background(), epoch); err != nil {
logging.Warn("Auto-prune failed, will retry on next cleanup", types.PayloadStorage, "epochId", epoch, "error", err)
return
}
m.minPruned = threshold
logging.Info("Auto-pruned epoch", types.PayloadStorage, "epochId", epoch)

m.mu.Lock()
m.minPruned = epoch + 1
m.mu.Unlock()
}
}

Expand Down
99 changes: 93 additions & 6 deletions decentralized-api/payloadstorage/managed_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package payloadstorage
import (
"bytes"
"context"
"errors"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ type mockStorage struct {
data map[string][]byte
pruned []uint64
storeCb func(epochId uint64)
pruneCb func(epochId uint64) error
}

func newMockStorage() *mockStorage {
Expand Down Expand Up @@ -43,6 +45,11 @@ func (m *mockStorage) Retrieve(ctx context.Context, inferenceId string, epochId
func (m *mockStorage) PruneEpoch(ctx context.Context, epochId uint64) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.pruneCb != nil {
if err := m.pruneCb(epochId); err != nil {
return err
}
}
m.pruned = append(m.pruned, epochId)
return nil
}
Expand Down Expand Up @@ -150,12 +157,9 @@ func TestManagedStorage_AutoPruneTriggersInCleanup(t *testing.T) {
ms.Store(ctx, "inf-"+string(rune('a'+i)), i, []byte("p"), []byte("r"))
}

// Trigger cleanup manually
// Trigger cleanup manually; pruning runs synchronously
ms.cleanup()

// Wait for async prune goroutines
time.Sleep(50 * time.Millisecond)

pruned := mock.getPruned()
// threshold = 10 - 2 = 8
// minPruned starts at 0, but only last 10 should be pruned
Expand All @@ -175,7 +179,6 @@ func TestManagedStorage_AutoPruneSkipsOldEpochs(t *testing.T) {

// Trigger cleanup
ms.cleanup()
time.Sleep(50 * time.Millisecond)

pruned := mock.getPruned()
// threshold = 100 - 2 = 98
Expand All @@ -193,6 +196,91 @@ func TestManagedStorage_AutoPruneSkipsOldEpochs(t *testing.T) {
}
}

func TestManagedStorage_AutoPruneStopsAtFailedEpoch(t *testing.T) {
mock := newMockStorage()
pruneErr := errors.New("db connection lost")
mock.pruneCb = func(epochId uint64) error {
if epochId == 3 {
return pruneErr
}
return nil
}
ms := NewManagedStorageWithSize(mock, 2, time.Minute, 100)
ctx := context.Background()

for i := uint64(0); i <= 10; i++ {
ms.Store(ctx, "inf-"+string(rune('a'+i)), i, []byte("p"), []byte("r"))
}

// threshold = 10 - 2 = 8; epoch 3 fails, so only 0-2 should be pruned
ms.cleanup()

pruned := mock.getPruned()
if len(pruned) != 3 {
t.Errorf("expected 3 epochs pruned before failure, got %d: %v", len(pruned), pruned)
}
for _, e := range pruned {
if e >= 3 {
t.Errorf("epoch %d should not be pruned past the failed epoch 3", e)
}
}

ms.mu.RLock()
minPruned := ms.minPruned
ms.mu.RUnlock()
if minPruned != 3 {
t.Errorf("minPruned should stop at failed epoch 3, got %d", minPruned)
}
}

func TestManagedStorage_AutoPruneRetriesFailedEpoch(t *testing.T) {
mock := newMockStorage()
failOnce := true
mock.pruneCb = func(epochId uint64) error {
if epochId == 3 && failOnce {
failOnce = false
return errors.New("transient failure")
}
return nil
}
ms := NewManagedStorageWithSize(mock, 2, time.Minute, 100)
ctx := context.Background()

for i := uint64(0); i <= 10; i++ {
ms.Store(ctx, "inf-"+string(rune('a'+i)), i, []byte("p"), []byte("r"))
}

// First cleanup: prunes 0-2, fails at 3
ms.cleanup()
// Second cleanup: retries 3, then continues through 7
ms.cleanup()

pruned := mock.getPruned()
// threshold = 8: epochs 0-7 all pruned across the two ticks, none skipped
if len(pruned) != 8 {
t.Errorf("expected 8 epochs pruned after retry, got %d: %v", len(pruned), pruned)
}
seen := make(map[uint64]bool)
for _, e := range pruned {
if seen[e] {
t.Errorf("epoch %d pruned more than once", e)
}
seen[e] = true
}
for e := uint64(0); e < 8; e++ {
if !seen[e] {
t.Errorf("epoch %d was never pruned", e)
}
}

ms.mu.RLock()
minPruned := ms.minPruned
ms.mu.RUnlock()
if minPruned != 8 {
t.Errorf("minPruned should reach threshold 8 after retry, got %d", minPruned)
}
}

func TestManagedStorage_NoPruneWhenBelowRetainCount(t *testing.T) {
mock := newMockStorage()
ms := NewManagedStorageWithSize(mock, 5, time.Minute, 100)
Expand All @@ -204,7 +292,6 @@ func TestManagedStorage_NoPruneWhenBelowRetainCount(t *testing.T) {
}

ms.cleanup()
time.Sleep(50 * time.Millisecond)

pruned := mock.getPruned()
if len(pruned) != 0 {
Expand Down