From f6c97197a96f4d68725d17d727ea5faa49d88abf Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Mon, 28 Apr 2025 20:11:20 +0300 Subject: [PATCH] chore: caching parameters for multiple routers and batchrouters --- app/apphandlers/embeddedAppHandler.go | 20 ++++-- app/apphandlers/processorAppHandler.go | 20 ++++-- jobsdb/jobsdb_parameters_cache.go | 63 ++++++++++++++++++ jobsdb/jobsdb_parameters_cache_test.go | 92 ++++++++++++++++++++++++++ processor/manager.go | 16 ++--- 5 files changed, 189 insertions(+), 22 deletions(-) create mode 100644 jobsdb/jobsdb_parameters_cache.go create mode 100644 jobsdb/jobsdb_parameters_cache_test.go diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index fd3557d2ad..79fd1bad46 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -301,10 +301,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) return fmt.Errorf("failed to create rt throttler factory: %w", err) } rtFactory := &router.Factory{ - Logger: logger.NewLogger().Child("router"), - Reporting: reporting, - BackendConfig: backendconfig.DefaultBackendConfig, - RouterDB: routerDB, + Logger: logger.NewLogger().Child("router"), + Reporting: reporting, + BackendConfig: backendconfig.DefaultBackendConfig, + RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time + config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"), + routerDB, + ), ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, @@ -315,9 +318,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) PendingEventsRegistry: pendingEventsRegistry, } brtFactory := &batchrouter.Factory{ - Reporting: reporting, - BackendConfig: backendconfig.DefaultBackendConfig, - RouterDB: batchRouterDB, + Reporting: reporting, + BackendConfig: backendconfig.DefaultBackendConfig, + RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time + config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"), + batchRouterDB, + ), ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index 4697337870..63c240885f 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -299,10 +299,13 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return fmt.Errorf("failed to create throttler factory: %w", err) } rtFactory := &router.Factory{ - Logger: logger.NewLogger().Child("router"), - Reporting: reporting, - BackendConfig: backendconfig.DefaultBackendConfig, - RouterDB: routerDB, + Logger: logger.NewLogger().Child("router"), + Reporting: reporting, + BackendConfig: backendconfig.DefaultBackendConfig, + RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time + config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"), + routerDB, + ), ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, @@ -313,9 +316,12 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options PendingEventsRegistry: pendingEventsRegistry, } brtFactory := &batchrouter.Factory{ - Reporting: reporting, - BackendConfig: backendconfig.DefaultBackendConfig, - RouterDB: batchRouterDB, + Reporting: reporting, + BackendConfig: backendconfig.DefaultBackendConfig, + RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time + config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"), + batchRouterDB, + ), ProcErrorDB: errDBForWrite, TransientSources: transientSources, RsourcesService: rsourcesService, diff --git a/jobsdb/jobsdb_parameters_cache.go b/jobsdb/jobsdb_parameters_cache.go new file mode 100644 index 0000000000..5654743d1d --- /dev/null +++ b/jobsdb/jobsdb_parameters_cache.go @@ -0,0 +1,63 @@ +package jobsdb + +import ( + "context" + "sync" + "time" + + "github.com/samber/lo" + + "github.com/rudderlabs/rudder-go-kit/config" + kitsync "github.com/rudderlabs/rudder-go-kit/sync" +) + +// NewCachingDistinctParameterValuesJobsdb creates a new JobsDB decorator that caches the results of GetDistinctParameterValues +// for a specified duration. It uses a partition locker to ensure that only one goroutine can access the cache for a +// specific parameter at a time. The cache is invalidated after the specified TTL. +// The cache is stored in memory and is not persistent across restarts. +// The cache is thread-safe and can be accessed by multiple goroutines concurrently. +func NewCachingDistinctParameterValuesJobsdb( + ttl config.ValueLoader[time.Duration], + jobsdb JobsDB, +) JobsDB { + return &cachingDpvJobsDB{ + ttl: ttl, + parameterLock: kitsync.NewPartitionLocker(), + cache: make(map[string]lo.Tuple2[[]string, time.Time]), + JobsDB: jobsdb, + } +} + +type cachingDpvJobsDB struct { + ttl config.ValueLoader[time.Duration] + parameterLock *kitsync.PartitionLocker + cacheMu sync.RWMutex + cache map[string]lo.Tuple2[[]string, time.Time] + JobsDB +} + +func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) { + // only one goroutine can access the cache for a specific parameter at a time + c.parameterLock.Lock(parameterName) + defer c.parameterLock.Unlock(parameterName) + + // read the cache + c.cacheMu.RLock() + if cachedEntry, ok := c.cache[parameterName]; ok && time.Since(cachedEntry.B) < c.ttl.Load() { + c.cacheMu.RUnlock() + return cachedEntry.A, nil + } + c.cacheMu.RUnlock() + + // if not in cache or expired, fetch from DB + // and update the cache + values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameterName) + if err != nil { + return nil, err + } + // update the cache with the new values + c.cacheMu.Lock() + c.cache[parameterName] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()} + c.cacheMu.Unlock() + return values, nil +} diff --git a/jobsdb/jobsdb_parameters_cache_test.go b/jobsdb/jobsdb_parameters_cache_test.go new file mode 100644 index 0000000000..dc773e2b16 --- /dev/null +++ b/jobsdb/jobsdb_parameters_cache_test.go @@ -0,0 +1,92 @@ +package jobsdb + +import ( + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/config" +) + +type cpvMockJobsdb struct { + calls int + JobsDB +} + +func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error) { + j.calls++ + return []string{"value1", "value2"}, nil +} + +func TestCachingDistinctParameterValuesJobsdb(t *testing.T) { + t.Run("single goroutine", func(t *testing.T) { + // Create a mock JobsDB + jobsdb := &cpvMockJobsdb{} + + cachingJobsdb := NewCachingDistinctParameterValuesJobsdb( + config.SingleValueLoader(100*time.Millisecond), + jobsdb, + ) + ctx := context.Background() + + // First call should fetch from the mock JobsDB + values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter") + require.NoError(t, err) + require.Equal(t, 1, jobsdb.calls) + require.Equal(t, []string{"value1", "value2"}, values) + + // Second call should hit the cache + values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter") + require.NoError(t, err) + require.Equal(t, 1, jobsdb.calls) + require.Equal(t, []string{"value1", "value2"}, values) + + time.Sleep(100 * time.Millisecond) + + values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter") + require.NoError(t, err) + require.Equal(t, 2, jobsdb.calls) + require.Equal(t, []string{"value1", "value2"}, values) + }) + + t.Run("multiple goroutines and parameters", func(t *testing.T) { + jobsdb := &cpvMockJobsdb{} + + cachingJobsdb := NewCachingDistinctParameterValuesJobsdb( + config.SingleValueLoader(100*time.Millisecond), + jobsdb, + ) + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(20) + for i := range 10 { + go func(i int) { + defer wg.Done() + values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i)) + require.NoError(t, err) + require.Equal(t, []string{"value1", "value2"}, values) + time.Sleep(100 * time.Millisecond) + values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i)) + require.NoError(t, err) + require.Equal(t, []string{"value1", "value2"}, values) + }(i) + go func(i int) { + defer wg.Done() + values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i)) + require.NoError(t, err) + require.Equal(t, []string{"value1", "value2"}, values) + time.Sleep(100 * time.Millisecond) + values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i)) + require.NoError(t, err) + require.Equal(t, []string{"value1", "value2"}, values) + }(i) + } + wg.Wait() + require.Equal(t, 20, jobsdb.calls) + }) +} diff --git a/processor/manager.go b/processor/manager.go index 191501c7b2..a1b047b267 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -28,13 +28,13 @@ type LifecycleManager struct { mainCtx context.Context currentCancel context.CancelFunc waitGroup interface{ Wait() } - gatewayDB *jobsdb.Handle - routerDB *jobsdb.Handle - batchRouterDB *jobsdb.Handle - readErrDB *jobsdb.Handle - writeErrDB *jobsdb.Handle - esDB *jobsdb.Handle - arcDB *jobsdb.Handle + gatewayDB jobsdb.JobsDB + routerDB jobsdb.JobsDB + batchRouterDB jobsdb.JobsDB + readErrDB jobsdb.JobsDB + writeErrDB jobsdb.JobsDB + esDB jobsdb.JobsDB + arcDB jobsdb.JobsDB clearDB *bool ReportingI types.Reporting // need not initialize again BackendConfig backendconfig.BackendConfig @@ -116,7 +116,7 @@ func (proc *LifecycleManager) Stop() { func New( ctx context.Context, clearDb *bool, - gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle, + gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB jobsdb.JobsDB, reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider,