Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a5e132a
make possible to build image on macOS
fbalicchia Mar 31, 2026
c057836
valy db configurable
fbalicchia Mar 31, 2026
f7edb92
add valkeys params
fbalicchia Apr 1, 2026
3281513
Take into consideration cache price too
fbalicchia Apr 1, 2026
e34740d
wip
fbalicchia Apr 1, 2026
81f37b6
git commit add test
fbalicchia Apr 2, 2026
35a997f
remove duplicate code
fbalicchia Apr 7, 2026
3d03309
use valkey instead of redis
fbalicchia Apr 8, 2026
24095d7
replace FT.SEARCH with direct key lookup for pending cache entries
fbalicchia Apr 8, 2026
917b084
add month case in ParseUnit
fbalicchia Apr 8, 2026
bb2da6b
Fix: make golint happy
fbalicchia Apr 8, 2026
599c1d6
remove blank line before error check in valkey_cache.go
fbalicchia Apr 8, 2026
bfca243
yet another format fix
fbalicchia Apr 8, 2026
03753d7
fix
fbalicchia Apr 8, 2026
92435ea
Fix : cross-compile arch
fbalicchia Apr 8, 2026
a8647c9
Merge branch 'main' into feat/global-model-ratelimit-clean
Xunzhuo Apr 9, 2026
77e8a0c
Fix: add missing helper_provider.go extracted from helper.go
fbalicchia Apr 9, 2026
1140d90
run locally make precommit-local
fbalicchia Apr 9, 2026
b756d7d
Update src/semantic-router/pkg/extproc/processor_res_cache.go
fbalicchia Apr 9, 2026
7fb6a36
Apply copilot change
fbalicchia Apr 9, 2026
b939324
Merge branch 'main' into feat/global-model-ratelimit-clean
fbalicchia Apr 9, 2026
dbad827
Fix TTL race conditions in Valkey cache and ratelimit providers
fbalicchia Apr 9, 2026
e973fd0
Fix TTL race in ratelimit, update cache index schema, and fix usage test
fbalicchia Apr 10, 2026
9c62e3a
Merge branch 'main' into feat/global-model-ratelimit-clean
fbalicchia Apr 10, 2026
7a82228
Fix: golint after merge
fbalicchia Apr 10, 2026
e620ce2
Merge branch 'main' into feat/global-model-ratelimit-clean
fbalicchia Apr 10, 2026
d69f2da
Merge branch 'main' into feat/global-model-ratelimit-clean
fbalicchia Apr 10, 2026
a693f1b
Revert bundled cache changes and improve ratelimit providers
fbalicchia Apr 10, 2026
341428a
Revert bundled cache changes and improve ratelimit providers
fbalicchia Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@ global:
providers:
- type: redis
address: redis:6379
db: 0
domain: api
rules:
- name: premium-per-minute
Expand Down
87 changes: 0 additions & 87 deletions src/semantic-router/pkg/cache/valkey_cache_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,93 +7,6 @@ import (
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging"
)

// pendingEntry is the parsed form of a single pending cache entry as it comes
// back from a search result.
type pendingEntry struct {
	// docID is the key under which the document appeared in the result map.
	docID string
	// model, query and requestBodyStr are the stringified "model", "query"
	// and "request_body" fields of that document; each stays empty when the
	// field is absent.
	model, query, requestBodyStr string
}

// extractPendingFields builds a pendingEntry from a Valkey GLIDE doc map (the
// second element of an FT.SEARCH reply), which maps a document key to that
// document's field map.
//
// Only one document is read; since Go map iteration order is unspecified, a
// map holding several documents yields an arbitrary one. An empty map produces
// an empty, non-nil entry and no error. An error is returned when the
// document's value is not itself a map[string]interface{}.
func extractPendingFields(docMap map[string]interface{}) (*pendingEntry, error) {
	parsed := new(pendingEntry)

	// Every path through the loop body returns, so at most one document is visited.
	for key, raw := range docMap {
		parsed.docID = key

		fields, isMap := raw.(map[string]interface{})
		if !isMap {
			logging.Warnf("UpdateWithResponse: document fields is not a map, type=%T", raw)
			return nil, fmt.Errorf("invalid search result: expected fields map")
		}

		// Copy each known field, when present, into its destination as a string.
		wanted := []struct {
			name string
			dst  *string
		}{
			{"model", &parsed.model},
			{"query", &parsed.query},
			{"request_body", &parsed.requestBodyStr},
		}
		for _, w := range wanted {
			if val, present := fields[w.name]; present {
				*w.dst = fmt.Sprint(val)
			}
		}

		return parsed, nil
	}

	return parsed, nil
}

// parsePendingSearchResult turns a raw Valkey FT.SEARCH reply into a
// pendingEntry.
//
// The reply must be a []interface{} whose first element is an int64 hit count
// and whose second element is a map of document key to field map. requestID is
// used only in log messages. prefix is the expected document-key prefix; a
// mismatch is logged as a warning but does not fail the parse.
//
// An error is returned when the reply has the wrong shape, when the hit count
// is zero, or when the parsed entry lacks a model or a query.
func parsePendingSearchResult(results interface{}, requestID string, prefix string) (*pendingEntry, error) {
	reply, isArray := results.([]interface{})
	if !isArray || len(reply) == 0 {
		logging.Infof("ValkeyCache.UpdateWithResponse: invalid result format for request_id=%s", requestID)
		return nil, fmt.Errorf("invalid search result format")
	}

	hitCount, isCount := reply[0].(int64)
	switch {
	case !isCount:
		logging.Infof("ValkeyCache.UpdateWithResponse: invalid count type for request_id=%s (got %T)", requestID, reply[0])
		return nil, fmt.Errorf("invalid search result count type")
	case hitCount == 0:
		logging.Infof("ValkeyCache.UpdateWithResponse: no pending entry found with request_id=%s (count=0, may still be indexing)", requestID)
		return nil, fmt.Errorf("no pending entry found (indexing may still be in progress)")
	}

	logging.Infof("UpdateWithResponse: found %d result(s) for request_id=%s", hitCount, requestID)

	// A non-zero count must be followed by the document map itself.
	if len(reply) < 2 {
		logging.Warnf("UpdateWithResponse: resultsArray only has %d elements", len(reply))
		return nil, fmt.Errorf("invalid search result: expected at least 2 elements")
	}

	docs, isDocMap := reply[1].(map[string]interface{})
	if !isDocMap {
		logging.Warnf("UpdateWithResponse: resultsArray[1] is not a map, type=%T", reply[1])
		return nil, fmt.Errorf("invalid search result: expected map at index 1")
	}

	pending, err := extractPendingFields(docs)
	if err != nil {
		return nil, err
	}

	// An unexpected key prefix is only reported, never rejected.
	if hasPrefix := strings.HasPrefix(pending.docID, prefix); !hasPrefix {
		logging.Warnf("UpdateWithResponse: docID '%s' doesn't have expected prefix '%s'", pending.docID, prefix)
	}

	logging.Debugf("UpdateWithResponse: extracted docID='%s', model='%s', query='%s'", pending.docID, pending.model, pending.query)

	if pending.model != "" && pending.query != "" {
		return pending, nil
	}

	logging.Warnf("UpdateWithResponse: missing required fields (model='%s', query='%s')", pending.model, pending.query)
	return nil, fmt.Errorf("missing required fields in pending entry")
}

// searchMatch holds the parsed fields from a vector search result.
type searchMatch struct {
distance float64
Expand Down
86 changes: 50 additions & 36 deletions src/semantic-router/pkg/cache/valkey_cache_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,64 +223,69 @@ func TestValkeyCacheIntegration_AddPendingRequest(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

func TestValkeyCacheIntegration_UpdateWithResponse(t *testing.T) {
cache := setupValkeyCacheIntegration(t)
defer func() { _ = cache.Close() }()

requestID := "req_update_456"
model := "gpt-4"
query := "What is Python?"
requestBody := []byte(`{"model":"gpt-4"}`)
ttlSeconds := 300

err := cache.AddPendingRequest(requestID, model, query, requestBody, ttlSeconds)
require.NoError(t, err)

// Wait for indexing with retry logic
time.Sleep(200 * time.Millisecond)

responseBody := []byte(`{"choices":[{"message":{"content":"Python is a programming language."}}]}`)

// Retry UpdateWithResponse to handle indexing delays
// retryUpdateWithResponse retries UpdateWithResponse up to maxRetries times with backoff.
func retryUpdateWithResponse(t *testing.T, cache *ValkeyCache, requestID string, responseBody []byte, ttlSeconds int) {
t.Helper()
var updateErr error
maxRetries := 5
for i := 0; i < maxRetries; i++ {
for i := 0; i < 5; i++ {
if i > 0 {
t.Logf("Retry %d/%d for UpdateWithResponse", i+1, maxRetries)
t.Logf("Retry %d/5 for UpdateWithResponse", i+1)
time.Sleep(time.Duration(200*(i+1)) * time.Millisecond)
}

updateErr = cache.UpdateWithResponse(requestID, responseBody, ttlSeconds)
if updateErr == nil {
break
return
}
t.Logf("UpdateWithResponse attempt %d failed: %v", i+1, updateErr)
}

require.NoError(t, updateErr, "UpdateWithResponse should succeed after retries")
}

// Wait longer for vector index to update with the new response and retry search
var foundResponse []byte
var hit bool
searchRetries := 5

for i := 0; i < searchRetries; i++ {
// retryFindSimilar retries FindSimilar until a hit is found or retries are exhausted.
func retryFindSimilar(t *testing.T, cache *ValkeyCache, model, query string) ([]byte, bool) {
t.Helper()
for i := 0; i < 5; i++ {
if i > 0 {
t.Logf("Search retry %d/%d for updated entry", i+1, searchRetries)
t.Logf("Search retry %d/5 for updated entry", i+1)
}
time.Sleep(time.Duration(200*(i+1)) * time.Millisecond)

foundResponse, hit, err = cache.FindSimilar(model, query)
foundResponse, hit, err := cache.FindSimilar(model, query)
require.NoError(t, err)

if hit {
break
return foundResponse, true
}
}
return nil, false
}

// TestValkeyCacheIntegration_UpdateWithResponse registers a pending request,
// completes it with a response body, and then checks two things: the completed
// entry is returned by FindSimilar, and the pending lookup key no longer
// exists.
func TestValkeyCacheIntegration_UpdateWithResponse(t *testing.T) {
	const (
		requestID  = "req_update_456"
		model      = "gpt-4"
		query      = "What is Python?"
		ttlSeconds = 300
	)

	cache := setupValkeyCacheIntegration(t)
	defer func() { _ = cache.Close() }()

	pendingBody := []byte(`{"model":"gpt-4"}`)
	require.NoError(t, cache.AddPendingRequest(requestID, model, query, pendingBody, ttlSeconds))

	// Short pause before the first update attempt; the helpers below retry on
	// their own if the store is not ready yet.
	time.Sleep(200 * time.Millisecond)

	completion := []byte(`{"choices":[{"message":{"content":"Python is a programming language."}}]}`)
	retryUpdateWithResponse(t, cache, requestID, completion, ttlSeconds)

	cached, found := retryFindSimilar(t, cache, model, query)
	require.True(t, found, "Should find the updated entry after retries")
	assert.NotNil(t, cached)
	assert.Contains(t, string(cached), "Python", "Updated response should be findable")

	// Verify the pending lookup key was cleaned up
	lookupKey := cache.config.Index.Prefix + "pending:" + requestID
	reply, err := cache.client.CustomCommand(context.Background(), []string{"GET", lookupKey})
	assert.NoError(t, err)
	assert.Nil(t, reply, "Pending lookup key should be deleted after UpdateWithResponse")
}

func TestValkeyCacheIntegration_UpdateWithResponseSpecialChars(t *testing.T) {
Expand Down Expand Up @@ -334,6 +339,15 @@ func TestValkeyCacheIntegration_UpdateWithResponseSpecialChars(t *testing.T) {
}
}

// TestValkeyCacheIntegration_UpdateWithResponseUnknownRequestID checks that
// UpdateWithResponse reports an error mentioning "no pending entry found"
// when called with a request ID that was never registered.
func TestValkeyCacheIntegration_UpdateWithResponseUnknownRequestID(t *testing.T) {
	cache := setupValkeyCacheIntegration(t)
	defer func() { _ = cache.Close() }()

	err := cache.UpdateWithResponse("nonexistent-request-id", []byte(`{"response":"test"}`), 300)
	// require (not assert) so the test stops here on a nil error; otherwise
	// the err.Error() call below would panic with a nil dereference instead
	// of producing a readable failure.
	require.Error(t, err, "UpdateWithResponse should fail for unknown request ID")
	assert.Contains(t, err.Error(), "no pending entry found")
}

func TestValkeyCacheIntegration_TTLExpiration(t *testing.T) {
cache := setupValkeyCacheIntegration(t)
defer func() { _ = cache.Close() }()
Expand Down
1 change: 1 addition & 0 deletions src/semantic-router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type RateLimitConfig struct {
// RateLimitProviderConfig is the YAML shape of a single rate-limit provider entry.
type RateLimitProviderConfig struct {
// Type is read from the required "type" key.
Type string `yaml:"type"`
// Address is read from the optional "address" key.
Address string `yaml:"address,omitempty"`
// DB is read from the optional "db" key; the zero value is omitted on output.
// NOTE(review): presumably the logical database index on the backing
// Redis/Valkey server — confirm against the provider that consumes it.
DB int `yaml:"db,omitempty"`
// Domain is read from the optional "domain" key.
Domain string `yaml:"domain,omitempty"`
// Rules is read from the optional "rules" list.
Rules []RateLimitRule `yaml:"rules,omitempty"`
}
Expand Down
Loading
Loading