Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/semantic-router/pkg/config/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ type MemoryQualityScoringConfig struct {
InitialStrengthDays int `yaml:"initial_strength_days,omitempty"`
PruneThreshold float64 `yaml:"prune_threshold,omitempty"`
MaxMemoriesPerUser int `yaml:"max_memories_per_user,omitempty"`

// PruneInterval is the interval between background sweep runs (e.g. "6h", "24h").
// Empty or "0" disables the background sweep. Requires PruneSweepEnabled to be true.
PruneInterval string `yaml:"prune_interval,omitempty"`

// PruneBatchSize is the number of users to prune per batch during the background sweep (default: 50).
PruneBatchSize int `yaml:"prune_batch_size,omitempty"`

// PruneSweepEnabled enables the background prune sweep on this replica.
// In multi-replica deployments, set this to true on only one replica to avoid duplicate work.
PruneSweepEnabled bool `yaml:"prune_sweep_enabled,omitempty"`

// MaxConcurrentPrunes limits how many pruneIfOverCap goroutines can run at once
// to avoid overwhelming Milvus during high-throughput Store() bursts (default: 10).
MaxConcurrentPrunes int `yaml:"max_concurrent_prunes,omitempty"`
}

type MemoryReflectionConfig struct {
Expand Down
3 changes: 3 additions & 0 deletions src/semantic-router/pkg/extproc/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type OpenAIRouter struct {
// RateLimiter enforces per-user/model rate limits from multiple sources
// (Envoy RLS -> local limiter).
RateLimiter *ratelimit.RateLimitResolver

// StopPruneSweep stops the background memory prune sweep goroutine (nil if not started).
StopPruneSweep func()
}

// Ensure OpenAIRouter implements the ext_proc calls.
Expand Down
10 changes: 10 additions & 0 deletions src/semantic-router/pkg/extproc/router_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type routerComponents struct {
memoryExtractor *memory.MemoryExtractor
credentialResolver *authz.CredentialResolver
rateLimiter *ratelimit.RateLimitResolver
stopPruneSweep func()
}

// NewOpenAIRouter creates a new OpenAI API router instance.
Expand Down Expand Up @@ -104,6 +105,13 @@ func buildRouterComponents(cfg *config.RouterConfig) (*routerComponents, error)
credentialResolver := buildCredentialResolver(cfg)
rateLimiter := buildRateLimitResolver(cfg)

// Start background prune sweep if memory store is available and configured
var stopPruneSweep func()
if memoryStore != nil && cfg.Memory.QualityScoring.PruneSweepEnabled && cfg.Memory.QualityScoring.PruneInterval != "" {
stopPruneSweep = memory.StartPruneSweep(cfg.Memory.QualityScoring, memoryStore)
logging.Infof("Memory prune sweep enabled (interval=%s)", cfg.Memory.QualityScoring.PruneInterval)
}

if credentialResolver != nil {
logging.Infof("Credential resolver initialized with providers: %v", credentialResolver.ProviderNames())
}
Expand All @@ -125,6 +133,7 @@ func buildRouterComponents(cfg *config.RouterConfig) (*routerComponents, error)
memoryExtractor: memoryExtractor,
credentialResolver: credentialResolver,
rateLimiter: rateLimiter,
stopPruneSweep: stopPruneSweep,
}, nil
}

Expand All @@ -143,5 +152,6 @@ func (components *routerComponents) buildRouter() *OpenAIRouter {
MemoryExtractor: components.memoryExtractor,
CredentialResolver: components.credentialResolver,
RateLimiter: components.rateLimiter,
StopPruneSweep: components.stopPruneSweep,
}
}
7 changes: 6 additions & 1 deletion src/semantic-router/pkg/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,13 @@ func (s *Server) Start() error {
return nil
}

// Stop stops the gRPC server
// Stop stops the gRPC server and background goroutines
func (s *Server) Stop() {
if s.service != nil {
if r := s.service.current.Load(); r != nil && r.StopPruneSweep != nil {
r.StopPruneSweep()
}
}
if s.server != nil {
s.server.GracefulStop()
logging.Infof("Server stopped")
Expand Down
91 changes: 91 additions & 0 deletions src/semantic-router/pkg/memory/milvus_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package memory

import (
"context"
"fmt"
"strings"
"time"

"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
)

// isTransientError checks if an error is transient and should be retried
func isTransientError(err error) bool {
if err == nil {
return false
}

errStr := strings.ToLower(err.Error())

// Check for common transient error patterns
transientPatterns := []string{
"connection",
"timeout",
"deadline exceeded",
"context deadline exceeded",
"unavailable",
"temporary",
"retry",
"rate limit",
"too many requests",
"server error",
"internal error",
"service unavailable",
"network",
"broken pipe",
"connection reset",
"no connection",
"connection refused",
}

for _, pattern := range transientPatterns {
if strings.Contains(errStr, pattern) {
return true
}
}

return false
}

// retryWithBackoff runs operation, retrying with exponential backoff while the
// returned error is classified as transient by isTransientError.
//
// Each retry waits retryBaseDelay * 2^attempt, with the exponent capped at 30
// so the shift cannot overflow time.Duration. The wait honors ctx
// cancellation. Success and non-transient errors return immediately; after the
// final attempt the last error is returned.
func (m *MilvusStore) retryWithBackoff(ctx context.Context, operation func() error) error {
	// Guard against a misconfigured retry count: with maxRetries <= 0 the
	// loop below would never run and the function would silently return nil
	// without ever invoking the operation. Run it exactly once instead.
	if m.maxRetries <= 0 {
		return operation()
	}

	var lastErr error

	for attempt := 0; attempt < m.maxRetries; attempt++ {
		lastErr = operation()

		// If no error or non-transient error, return immediately
		if lastErr == nil || !isTransientError(lastErr) {
			return lastErr
		}

		// If this is the last attempt, return the error
		if attempt == m.maxRetries-1 {
			logging.Warnf("MilvusStore: operation failed after %d retries: %v", m.maxRetries, lastErr)
			return lastErr
		}

		// Exponential backoff: retryBaseDelay * 2^attempt. attempt is always
		// >= 0 here, so only the upper bound needs capping (max 30 keeps the
		// shift well inside int64 range).
		exponent := attempt
		if exponent > 30 {
			exponent = 30
		}
		delay := m.retryBaseDelay * time.Duration(1<<exponent) // 2^attempt * baseDelay

		logging.Debugf("MilvusStore: transient error on attempt %d/%d, retrying in %v: %v",
			attempt+1, m.maxRetries, delay, lastErr)

		// Wait with context cancellation support so callers can abort the
		// backoff sleep promptly.
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled during retry: %w", ctx.Err())
		case <-time.After(delay):
			// Continue to next retry
		}
	}

	return lastErr
}
95 changes: 13 additions & 82 deletions src/semantic-router/pkg/memory/milvus_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/milvus-io/milvus-sdk-go/v2/client"
Expand All @@ -23,6 +23,9 @@ const (
DefaultRetryBaseDelay = 100
)

// DefaultMaxConcurrentPrunes is the default limit for concurrent pruneIfOverCap goroutines.
const DefaultMaxConcurrentPrunes = 10

// MilvusStore provides memory retrieval from Milvus with similarity threshold filtering
type MilvusStore struct {
client client.Client
Expand All @@ -32,6 +35,8 @@ type MilvusStore struct {
maxRetries int
retryBaseDelay time.Duration
embeddingConfig EmbeddingConfig // Unified embedding configuration
pruneSem chan struct{} // bounds concurrent prune goroutines
pruneInFlight sync.Map // tracks userIDs with an active prune goroutine (dedup)
}

// MilvusStoreOptions contains configuration for creating a MilvusStore
Expand Down Expand Up @@ -80,6 +85,11 @@ func NewMilvusStore(options MilvusStoreOptions) (*MilvusStore, error) {
embeddingCfg = EmbeddingConfig{Model: EmbeddingModelBERT}
}

maxPrunes := cfg.QualityScoring.MaxConcurrentPrunes
if maxPrunes <= 0 {
maxPrunes = DefaultMaxConcurrentPrunes
}

store := &MilvusStore{
client: options.Client,
collectionName: options.CollectionName,
Expand All @@ -88,6 +98,7 @@ func NewMilvusStore(options MilvusStoreOptions) (*MilvusStore, error) {
maxRetries: DefaultMaxRetries,
retryBaseDelay: DefaultRetryBaseDelay * time.Millisecond,
embeddingConfig: embeddingCfg,
pruneSem: make(chan struct{}, maxPrunes),
}

// Auto-create collection if it doesn't exist
Expand Down Expand Up @@ -729,6 +740,7 @@ func (m *MilvusStore) Store(ctx context.Context, memory *Memory) error {
}

logging.Debugf("MilvusStore.Store: successfully stored memory id=%s", memory.ID)
m.triggerCapEnforcement(memory.UserID)
return nil
}

Expand Down Expand Up @@ -1402,84 +1414,3 @@ func (m *MilvusStore) forgetByScopeWithQuery(ctx context.Context, scope MemorySc
logging.Debugf("MilvusStore.ForgetByScope: deleted %d memories", len(idsToDelete))
return nil
}

// isTransientError checks if an error is transient and should be retried
func isTransientError(err error) bool {
if err == nil {
return false
}

errStr := strings.ToLower(err.Error())

// Check for common transient error patterns
transientPatterns := []string{
"connection",
"timeout",
"deadline exceeded",
"context deadline exceeded",
"unavailable",
"temporary",
"retry",
"rate limit",
"too many requests",
"server error",
"internal error",
"service unavailable",
"network",
"broken pipe",
"connection reset",
"no connection",
"connection refused",
}

for _, pattern := range transientPatterns {
if strings.Contains(errStr, pattern) {
return true
}
}

return false
}

// retryWithBackoff executes operation up to m.maxRetries times, backing off
// exponentially between attempts while the returned error is transient (per
// isTransientError). Success and non-transient errors stop the loop
// immediately; the backoff sleep is interruptible via ctx.
func (m *MilvusStore) retryWithBackoff(ctx context.Context, operation func() error) error {
	var err error

	for attempt := 0; attempt < m.maxRetries; attempt++ {
		if err = operation(); err == nil || !isTransientError(err) {
			// Success, or a permanent error: retrying cannot help.
			return err
		}

		if attempt == m.maxRetries-1 {
			// Budget exhausted: surface the last transient failure.
			logging.Warnf("MilvusStore: operation failed after %d retries: %v", m.maxRetries, err)
			return err
		}

		// Delay doubles each attempt; clamp the shift into [0, 30] so the
		// multiplication cannot overflow time.Duration.
		shift := attempt
		if shift < 0 {
			shift = 0
		}
		if shift > 30 {
			shift = 30
		}
		wait := m.retryBaseDelay * time.Duration(1<<shift)

		logging.Debugf("MilvusStore: transient error on attempt %d/%d, retrying in %v: %v",
			attempt+1, m.maxRetries, wait, err)

		// Sleep before the next attempt, aborting early on cancellation.
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled during retry: %w", ctx.Err())
		case <-time.After(wait):
		}
	}

	return err
}
Loading
Loading