Conversation

locnguyen1986 (Contributor):

Implements a caching system using Valkey (primary), with Redis as a fallback and a NoOp implementation for graceful degradation.
Optimizes performance for:

  • UserService.FindByPublicID (called on every authenticated request)
  • Inference Model Registry
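
A minimal sketch of the cache interface this design implies, inferred from the Set and GetWithFallback calls quoted later in this thread (sketch only; the PR's literal definition may differ):

```go
// Sketch: the cache abstraction behind the Valkey/Redis/NoOp backends.
type CacheService interface {
	// Set stores a value under key with a TTL. Quoted early in this
	// review with value as `any`; later revised to take a string.
	Set(ctx context.Context, key string, value any, expiration time.Duration) error
	// GetWithFallback reads key into dest, invoking fallback on a miss
	// and caching its result with the given expiration.
	GetWithFallback(ctx context.Context, key string, dest any, fallback func() (any, error), expiration time.Duration) error
}
```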


```go
// Check every 5 minutes instead of every minute
ctab.AddJob("*/5 * * * *", func() {
	cs.CheckInferenceModels(ctx)
})
```
Collaborator:
That means 9-10 minutes of downtime.

Contributor Author:
I may add another mechanism to handle this cron soon, but let me roll back to every minute for now. -))

Collaborator:
BTW, I used to use a mechanism called "ticker" for health checks.
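
For reference, a minimal sketch of that mechanism using Go's standard time.Ticker (the function name, interval, and check callback here are placeholders, not the project's actual values):

```go
// Sketch: a periodic health check driven by time.Ticker instead of cron.
// Stops cleanly when ctx is cancelled.
func runHealthCheck(ctx context.Context, interval time.Duration, check func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check(ctx)
		}
	}
}
```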

Contributor Author:
Just checked: Ticker is good too and simple to run, but I think cron is fine as well, and we can centralize the jobs in one place. Moved back to the every-minute scan.

Comment on lines 72 to 77
```go
for id, model1 := range map1 {
	model2, exists := map2[id]
	if !exists || model1.Object != model2.Object || model1.OwnedBy != model2.OwnedBy {
		return false
	}
}
```
Collaborator:
You should compare the maps in the reverse direction:

```go
for id, model2 := range map2 {
```

Consider renaming the variables for clarity.
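
For reference, a common Go idiom that covers both directions with a single loop is to compare lengths first, then iterate one map. A sketch using the field names from the hunk above (the PR author ultimately switched to a simpler comparison, as noted below):

```go
// Sketch: equal lengths plus every key of map1 present and equal in
// map2 implies the key sets match, so one loop suffices.
func modelsEqual(map1, map2 map[string]inferencemodel.Model) bool {
	if len(map1) != len(map2) {
		return false
	}
	for id, model1 := range map1 {
		model2, exists := map2[id]
		if !exists || model1.Object != model2.Object || model1.OwnedBy != model2.OwnedBy {
			return false
		}
	}
	return true
}
```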

Collaborator:
(but somehow I feel map1 and map2 are good enough)

Contributor Author:
I switched to comparing only the model name with the service name; it's clearer.

Comment on lines 33 to 36
```go
CACHE_URL      string
CACHE_PASSWORD string
CACHE_DB       string
CACHE_TYPE     string // "valkey" (primary) or "redis" (alternative)
```
Collaborator:
Remove CACHE_TYPE and specify CACHE_REDIS_... or CACHE_VALKEY_... instead.

Contributor Author:
Now we use REDIS_-prefixed variables only.

Comment on lines 11 to 26
```go
cacheType := strings.ToLower(environment_variables.EnvironmentVariables.CACHE_TYPE)

// Default to Valkey if no cache type is specified
if cacheType == "" {
	cacheType = "valkey"
}

switch cacheType {
case "redis":
	return NewRedisCacheService()
case "valkey":
	return NewValkeyCacheService()
default:
	// Fallback to Valkey for unknown types
	return NewValkeyCacheService()
}
```
Collaborator:
Can we use just one specific service for caching? Supporting several will soon become a maintenance nightmare.

Contributor Author:
Now we use Redis only.

```go
address, password, db, err := parseValkeyURL(valkeyURL)
if err != nil {
	// Return a no-op implementation for graceful degradation
	return &NoOpCacheService{}
}
```
Collaborator:
Panic here instead of silently degrading.
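
What the reviewer is asking for, sketched (fail fast on misconfiguration rather than degrading to a no-op cache; the error message wording is an assumption):

```go
address, password, db, err := parseValkeyURL(valkeyURL)
if err != nil {
	// Fail fast: a bad cache URL is a deployment/configuration error,
	// not a condition to silently degrade around.
	panic(fmt.Sprintf("invalid cache URL %q: %v", valkeyURL, err))
}
```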

Comment on lines 101 to 118
```go
func (r *InferenceModelRegistry) SetModels(ctx context.Context, serviceName string, models []inferencemodel.Model) {
	// Check if models have actually changed to avoid unnecessary cache operations
	if !r.hasModelsChanged(serviceName, models) {
		return // No changes, skip cache update
	}

	r.endpointToModels[serviceName] = functional.Map(models, func(model inferencemodel.Model) string {
		r.modelsDetail[model.ID] = model
		return model.ID
	})
	r.rebuild()

	// Invalidate cache after setting models
	r.invalidateCache(ctx)

	// Populate cache with new registry data
	r.populateCache(ctx)
}
```
Collaborator:
Race condition: if we have n pods, each pod will rebuild the cache when a user adds a new model.
With poor timing, the pods will enter an endless rebuild loop (line 114 + line 117).

Contributor Author:
A lock has been added.
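
A minimal sketch of how such a distributed lock might look with go-redsync, which the thread later mentions by name (the function and lock key names here are assumptions):

```go
import (
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
	goredislib "github.com/redis/go-redis/v9"
)

// Sketch: only one pod at a time may rebuild the registry cache.
func rebuildWithLock(client *goredislib.Client, rebuild func() error) error {
	rs := redsync.New(goredis.NewPool(client))
	mutex := rs.NewMutex("lock:registry-rebuild") // assumed key name
	if err := mutex.Lock(); err != nil {
		return err // another pod holds the lock; skip this rebuild
	}
	defer mutex.Unlock()
	return rebuild()
}
```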

Comment on lines 67 to 74
```go
func (r *RedisCacheService) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
	jsonValue, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("failed to marshal value: %w", err)
	}

	return r.client.Set(ctx, key, jsonValue, expiration).Err()
}
```
Collaborator:
json.Marshal(value) should not be used in a general-purpose Set function.

Contributor Author:
You are right. I was considering a type check, but no: just leave the argument as a string, and the calling function will handle the conversion.
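
The revised shape that exchange implies, sketched (the actual signature merged in the PR may differ):

```go
// Sketch: the cache layer stays type-agnostic; callers marshal values
// themselves before writing.
func (r *RedisCacheService) Set(ctx context.Context, key, value string, expiration time.Duration) error {
	return r.client.Set(ctx, key, value, expiration).Err()
}
```

This matches the call sites quoted later in the thread, e.g. r.cache.Set(ctx, cache.ModelsCacheKey, string(modelsJSON), r.cacheExpiry).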

Comment on lines 157 to 160
```go
err := r.cache.GetWithFallback(ctx, cache.RegistryModelEndpointsKey, &modelToEndpoints, func() (any, error) {
	// Cache miss, return from memory
	return r.modelToEndpoints, nil
}, r.cacheExpiry)
```
Collaborator:
Consider using a sorted set or list for the model cache. Should we use LRange?
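
A sketch of that list idea with go-redis (the key name is an assumption): store the model IDs as a Redis list and read them back with LRANGE, instead of one JSON blob.

```go
// Sketch: replace the JSON blob with a Redis list of model IDs. The
// delete-then-push runs in a transaction pipeline so readers never see
// a half-written list.
func storeModelIDs(ctx context.Context, client *redis.Client, ids []string) error {
	pipe := client.TxPipeline()
	pipe.Del(ctx, "registry:model-ids")
	for _, id := range ids {
		pipe.RPush(ctx, "registry:model-ids", id)
	}
	_, err := pipe.Exec(ctx)
	return err
}

func loadModelIDs(ctx context.Context, client *redis.Client) ([]string, error) {
	return client.LRange(ctx, "registry:model-ids", 0, -1).Result()
}
```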

Contributor Author:
I will introduce new entities for models soon; we may have different providers.

Comment on lines 84 to 88
```go
existingModels, exists := r.endpointToModels[serviceName]
if !exists {
	// Service doesn't exist, so it's a change
	return len(newModels) > 0
}
```
Collaborator:
We should retrieve it from Redis here when comparing models.

  • The local cache (endpointToModels) is simply syntactic sugar for accessing the data.

Contributor Author:
The local variables have been removed.

```go
}

// NewRedisCacheService creates a new Redis cache service
func NewRedisCacheService() CacheService {
```
Collaborator:
Return (CacheService, error) instead.

Contributor Author:
I throw a panic instead :D :D

```go
// Cache TTL constants
const (
	// ModelsCacheTTL is the TTL for cached models list
	ModelsCacheTTL = 10 * time.Minute
)
```
Collaborator:
If we apply a TTL to the vLLM models:

  • the expiration will cause downtime for /chat/completions whenever we get models from the cache;
  • the worst-case downtime is almost 2 minutes.

Collaborator:
By the way, can we add back model verification in /chat/completions and return an appropriate response if the model is not provided?
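
A sketch of such a check, using plain net/http for illustration (the project's actual handler and framework are not shown in this thread, so all names here are assumptions):

```go
// Sketch: validate the requested model before proxying the completion,
// returning an explicit error response instead of failing downstream.
func verifyModel(w http.ResponseWriter, modelID string, knownModel func(string) bool) bool {
	if modelID == "" {
		http.Error(w, `{"error":"model is required"}`, http.StatusBadRequest)
		return false
	}
	if !knownModel(modelID) {
		http.Error(w, `{"error":"unknown model"}`, http.StatusNotFound)
		return false
	}
	return true
}
```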

Contributor Author:
This cache TTL may be used when the cron fails, or in another context. The downtime should be < 2 minutes with the cron running.

Collaborator:
The TTL here will definitely introduce downtime. Why do we have to deliberately introduce downtime to handle cases that may not exist?

Collaborator:
Sorry, I checked your latest implementation. Basically, you refresh (extend the TTL of) the cache every minute. It's good to go.
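
What is being approved here, sketched against the cron call quoted at the top of this thread: each tick re-fetches the models and writes them back with an expiration, which resets the keys' TTL, so the expiry only elapses if the cron itself stops.

```go
// Sketch: the every-minute job keeps cache entries alive by rewriting
// them; Set with an expiration resets each key's TTL.
ctab.AddJob("* * * * *", func() {
	cs.CheckInferenceModels(ctx)
})
```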

Contributor Author:
:| I see, we actually have different contexts here.
ModelsCacheTTL is used by JanInferenceProvider, which is not called anywhere in the current system; I may use it later when combining models.
The cron fetches the models and saves them into the registry, and we load models via the Registry, not from JanInferenceProvider.
But let me keep this PR clean (YAGNI): let's remove this cache from JanInferenceProvider. If I need it, I will add it back.

Comment on lines +245 to +252
```go
for _, model := range janModelResp.Data {
	models = append(models, inferencemodel.Model{
		ID:      model.ID,
		Object:  model.Object,
		Created: model.Created,
		OwnedBy: model.OwnedBy,
	})
}
```
Collaborator:
janModelResp.Data is []inferencemodel.Model. Why do we need to copy it again?

Contributor Author:
They will diverge soon: one is the model type returned by the inference client, and the other will be our local model type.

Comment on lines 255 to 274
```go
if len(models) > 0 {
	modelsJSON, _ := json.Marshal(models)
	r.cache.Set(ctx, cache.ModelsCacheKey, string(modelsJSON), r.cacheExpiry)

	// Store service models mapping
	serviceCacheKey := cache.RegistryEndpointModelsKey + ":" + sanitizeKeyPart(r.janClient.BaseURL)
	modelIDs := functional.Map(models, func(model inferencemodel.Model) string {
		return model.ID
	})
	modelIDsJSON, _ := json.Marshal(modelIDs)
	r.cache.Set(ctx, serviceCacheKey, string(modelIDsJSON), r.cacheExpiry)

	// Build model-to-endpoints mapping
	modelToEndpoints := make(map[string][]string)
	for _, model := range models {
		modelToEndpoints[model.ID] = append(modelToEndpoints[model.ID], r.janClient.BaseURL)
	}
	modelToEndpointsJSON, _ := json.Marshal(modelToEndpoints)
	r.cache.Set(ctx, cache.RegistryModelEndpointsKey, string(modelToEndpointsJSON), r.cacheExpiry)
}
```
Collaborator:
If you are writing multiple cache entries from a single source of truth, you will need a lock.
Can we change the plan and store everything in a single slot?
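
A sketch of that single-slot idea (the snapshot type and key name are assumptions, not the PR's actual names): pack the whole registry state into one JSON value under one key, so a single Set is atomic and no cross-key lock is needed.

```go
// Sketch: one atomic write for the whole registry snapshot.
type registrySnapshot struct {
	EndpointToModels map[string][]string `json:"endpoint_to_models"`
	ModelToEndpoints map[string][]string `json:"model_to_endpoints"`
}

func (r *InferenceModelRegistry) storeSnapshot(ctx context.Context, snap registrySnapshot) error {
	payload, err := json.Marshal(snap)
	if err != nil {
		return err
	}
	// "registry:snapshot" is an assumed key, not one from the PR.
	return r.cache.Set(ctx, "registry:snapshot", string(payload), r.cacheExpiry)
}
```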

Contributor Author:
The cache structure will change soon:
application -> organization -> project -> user

Comment on lines 31 to 39
```go
if err != nil {
	logger.GetLogger().Error(fmt.Sprintf("Failed to parse Redis URL: %v", err))
	// Fallback to default configuration
	opts = &redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	}
}
```
Collaborator:
Panic here instead of falling back to a default configuration.

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := client.Ping(ctx).Err(); err != nil {
```
Collaborator:
Panic here as well if the ping fails.

```go
var updateErr error

// Execute with lock using go-redsync
err := cache.WithLock(s.cache, lockKey, func() error {
```
Collaborator:
Do not add locks everywhere. Use optimistic locking here: we just need to cache the user instance and ignore the error.

Collaborator:
You can pick one of these cache strategies here:

  • delete the cache entry on update;
  • update the cache with a single instruction.
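
A sketch of the first option, delete-on-update (the Update and Delete method names and the key format are assumptions): after a successful DB write, drop the cached entry and let the next read repopulate it; a cache-side failure is logged and ignored, so no distributed lock is needed.

```go
// Sketch: invalidate on write; the next FindByPublicID repopulates.
func (s *UserService) UpdateUser(ctx context.Context, u *User) error {
	if err := s.userrepo.Update(ctx, u); err != nil {
		return err
	}
	// Delete and the "user:" key prefix are assumed names for this sketch.
	if err := s.cache.Delete(ctx, "user:"+u.PublicID); err != nil {
		logger.GetLogger().Error(fmt.Sprintf("cache invalidation failed: %v", err))
	}
	return nil
}
```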


```go
type UserService struct {
	userrepo UserRepository
	cache    cache.CacheService
```
Collaborator:
We used to store the cache in the repository, but I feel it's not mandatory.

  • The user service should not be aware of changes in the user repository (persistence layer).

Collaborator:
UserCacheTTL is now stored in the Cache package, which may become a large file and violate the SRP soon. The cache is similar to an RDB, so we can wrap it with a service/repository.

locnguyen1986 merged commit 55814f9 into main on Sep 29, 2025 (1 check passed).
locnguyen1986 deleted the feat/170/add-redis-support-models-loading branch on September 29, 2025 at 02:11.