Skip to content

[WIP] chore: caching parameters for multiple routers and batchrouters #5784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
rtFactory := &router.Factory{
Logger: logger.NewLogger().Child("router"),
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: routerDB,
Logger: logger.NewLogger().Child("router"),
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
routerDB,
),
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand All @@ -315,9 +318,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
PendingEventsRegistry: pendingEventsRegistry,
}
brtFactory := &batchrouter.Factory{
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: batchRouterDB,
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
batchRouterDB,
),
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand Down
20 changes: 13 additions & 7 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,13 @@
return fmt.Errorf("failed to create throttler factory: %w", err)
}
rtFactory := &router.Factory{
Logger: logger.NewLogger().Child("router"),
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: routerDB,
Logger: logger.NewLogger().Child("router"),
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
routerDB,
),

Check warning on line 308 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L302-L308

Added lines #L302 - L308 were not covered by tests
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand All @@ -313,9 +316,12 @@
PendingEventsRegistry: pendingEventsRegistry,
}
brtFactory := &batchrouter.Factory{
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: batchRouterDB,
Reporting: reporting,
BackendConfig: backendconfig.DefaultBackendConfig,
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
batchRouterDB,
),

Check warning on line 324 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L319-L324

Added lines #L319 - L324 were not covered by tests
ProcErrorDB: errDBForWrite,
TransientSources: transientSources,
RsourcesService: rsourcesService,
Expand Down
63 changes: 63 additions & 0 deletions jobsdb/jobsdb_parameters_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package jobsdb

import (
"context"
"sync"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
)

// NewCachingDistinctParameterValuesJobsdb wraps the given JobsDB in a decorator that keeps the
// results of GetDistinctParameterValues in memory, per parameter name, for the duration reported
// by ttl.
//
// The ttl loader is consulted on every lookup, so a reloadable value takes effect without
// re-creating the decorator. The cache lives only in process memory and starts out empty.
// All other JobsDB methods are served by the wrapped instance unchanged.
func NewCachingDistinctParameterValuesJobsdb(
	ttl config.ValueLoader[time.Duration],
	jobsdb JobsDB,
) JobsDB {
	decorator := new(cachingDpvJobsDB)
	decorator.JobsDB = jobsdb
	decorator.ttl = ttl
	// serializes lookups per parameter name
	decorator.parameterLock = kitsync.NewPartitionLocker()
	decorator.cache = map[string]lo.Tuple2[[]string, time.Time]{}
	return decorator
}

type cachingDpvJobsDB struct {
ttl config.ValueLoader[time.Duration]
parameterLock *kitsync.PartitionLocker
cacheMu sync.RWMutex
cache map[string]lo.Tuple2[[]string, time.Time]
JobsDB
}

func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) {
// only one goroutine can access the cache for a specific parameter at a time
c.parameterLock.Lock(parameterName)
defer c.parameterLock.Unlock(parameterName)

// read the cache
c.cacheMu.RLock()
if cachedEntry, ok := c.cache[parameterName]; ok && time.Since(cachedEntry.B) < c.ttl.Load() {
c.cacheMu.RUnlock()
return cachedEntry.A, nil
}
c.cacheMu.RUnlock()

// if not in cache or expired, fetch from DB
// and update the cache
values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameterName)
if err != nil {
return nil, err
}

Check warning on line 57 in jobsdb/jobsdb_parameters_cache.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/jobsdb_parameters_cache.go#L56-L57

Added lines #L56 - L57 were not covered by tests
// update the cache with the new values
c.cacheMu.Lock()
c.cache[parameterName] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()}
c.cacheMu.Unlock()
return values, nil
}
92 changes: 92 additions & 0 deletions jobsdb/jobsdb_parameters_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package jobsdb

import (
"context"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
)

// cpvMockJobsdb is a JobsDB test double that only implements GetDistinctParameterValues and
// counts how many times it has been invoked. Any other JobsDB method would hit the nil embedded
// interface and panic.
type cpvMockJobsdb struct {
	mu    sync.Mutex // guards calls; the mock is invoked concurrently for different parameter names
	calls int        // number of GetDistinctParameterValues invocations so far
	JobsDB
}

// GetDistinctParameterValues records the invocation and returns a fixed pair of values,
// regardless of ctx and parameterName.
func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error) {
	j.mu.Lock()
	j.calls++
	j.mu.Unlock()
	return []string{"value1", "value2"}, nil
}

// TestCachingDistinctParameterValuesJobsdb verifies that the caching decorator queries the wrapped
// JobsDB only on a cache miss or after the TTL has elapsed, both for sequential callers and for
// concurrent callers spread over several parameter names.
func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
	const ttl = 100 * time.Millisecond
	expected := []string{"value1", "value2"}

	t.Run("single goroutine", func(t *testing.T) {
		jobsdb := &cpvMockJobsdb{}
		cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
			config.SingleValueLoader(ttl),
			jobsdb,
		)
		ctx := context.Background()

		// First call should fetch from the mock JobsDB
		values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
		require.NoError(t, err)
		require.Equal(t, 1, jobsdb.calls)
		require.Equal(t, expected, values)

		// Second call should hit the cache
		values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
		require.NoError(t, err)
		require.Equal(t, 1, jobsdb.calls)
		require.Equal(t, expected, values)

		// Let the entry expire: the next call should fetch from the mock JobsDB again
		time.Sleep(ttl)

		values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
		require.NoError(t, err)
		require.Equal(t, 2, jobsdb.calls)
		require.Equal(t, expected, values)
	})

	t.Run("multiple goroutines and parameters", func(t *testing.T) {
		const (
			parameters             = 10
			goroutinesPerParameter = 2
			callsPerGoroutine      = 2 // one before and one after the entry expires
		)

		jobsdb := &cpvMockJobsdb{}
		cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
			config.SingleValueLoader(ttl),
			jobsdb,
		)
		ctx := context.Background()

		// Workers only record what they observed; the assertions run on the test goroutine
		// because require calls t.FailNow, which must not be called from other goroutines.
		type outcome struct {
			values []string
			err    error
		}
		outcomes := make(chan outcome, parameters*goroutinesPerParameter*callsPerGoroutine)

		var wg sync.WaitGroup
		worker := func(parameterName string) {
			defer wg.Done()
			values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName)
			outcomes <- outcome{values: values, err: err}
			time.Sleep(ttl)
			values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName)
			outcomes <- outcome{values: values, err: err}
		}

		wg.Add(parameters * goroutinesPerParameter)
		for i := range parameters {
			parameterName := "test_parameter_" + strconv.Itoa(i)
			for range goroutinesPerParameter {
				go worker(parameterName)
			}
		}
		wg.Wait()
		close(outcomes)

		for o := range outcomes {
			require.NoError(t, o.err)
			require.Equal(t, expected, o.values)
		}
		// every parameter is fetched once initially and once more after its entry expired
		require.Equal(t, parameters*callsPerGoroutine, jobsdb.calls)
	})
}
16 changes: 8 additions & 8 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ type LifecycleManager struct {
mainCtx context.Context
currentCancel context.CancelFunc
waitGroup interface{ Wait() }
gatewayDB *jobsdb.Handle
routerDB *jobsdb.Handle
batchRouterDB *jobsdb.Handle
readErrDB *jobsdb.Handle
writeErrDB *jobsdb.Handle
esDB *jobsdb.Handle
arcDB *jobsdb.Handle
gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
batchRouterDB jobsdb.JobsDB
readErrDB jobsdb.JobsDB
writeErrDB jobsdb.JobsDB
esDB jobsdb.JobsDB
arcDB jobsdb.JobsDB
clearDB *bool
ReportingI types.Reporting // need not initialize again
BackendConfig backendconfig.BackendConfig
Expand Down Expand Up @@ -116,7 +116,7 @@ func (proc *LifecycleManager) Stop() {
func New(
ctx context.Context,
clearDb *bool,
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB jobsdb.JobsDB,
reporting types.Reporting,
transientSources transientsource.Service,
fileuploader fileuploader.Provider,
Expand Down
Loading