Skip to content

Commit f6c9719

Browse files
committed
chore: caching parameters for multiple routers and batchrouters
1 parent 081e6c3 commit f6c9719

File tree

5 files changed

+189
-22
lines changed

5 files changed

+189
-22
lines changed

app/apphandlers/embeddedAppHandler.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
301301
return fmt.Errorf("failed to create rt throttler factory: %w", err)
302302
}
303303
rtFactory := &router.Factory{
304-
Logger: logger.NewLogger().Child("router"),
305-
Reporting: reporting,
306-
BackendConfig: backendconfig.DefaultBackendConfig,
307-
RouterDB: routerDB,
304+
Logger: logger.NewLogger().Child("router"),
305+
Reporting: reporting,
306+
BackendConfig: backendconfig.DefaultBackendConfig,
307+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
308+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
309+
routerDB,
310+
),
308311
ProcErrorDB: errDBForWrite,
309312
TransientSources: transientSources,
310313
RsourcesService: rsourcesService,
@@ -315,9 +318,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
315318
PendingEventsRegistry: pendingEventsRegistry,
316319
}
317320
brtFactory := &batchrouter.Factory{
318-
Reporting: reporting,
319-
BackendConfig: backendconfig.DefaultBackendConfig,
320-
RouterDB: batchRouterDB,
321+
Reporting: reporting,
322+
BackendConfig: backendconfig.DefaultBackendConfig,
323+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
324+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
325+
batchRouterDB,
326+
),
321327
ProcErrorDB: errDBForWrite,
322328
TransientSources: transientSources,
323329
RsourcesService: rsourcesService,

app/apphandlers/processorAppHandler.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,13 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
299299
return fmt.Errorf("failed to create throttler factory: %w", err)
300300
}
301301
rtFactory := &router.Factory{
302-
Logger: logger.NewLogger().Child("router"),
303-
Reporting: reporting,
304-
BackendConfig: backendconfig.DefaultBackendConfig,
305-
RouterDB: routerDB,
302+
Logger: logger.NewLogger().Child("router"),
303+
Reporting: reporting,
304+
BackendConfig: backendconfig.DefaultBackendConfig,
305+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
306+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
307+
routerDB,
308+
),
306309
ProcErrorDB: errDBForWrite,
307310
TransientSources: transientSources,
308311
RsourcesService: rsourcesService,
@@ -313,9 +316,12 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
313316
PendingEventsRegistry: pendingEventsRegistry,
314317
}
315318
brtFactory := &batchrouter.Factory{
316-
Reporting: reporting,
317-
BackendConfig: backendconfig.DefaultBackendConfig,
318-
RouterDB: batchRouterDB,
319+
Reporting: reporting,
320+
BackendConfig: backendconfig.DefaultBackendConfig,
321+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
322+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
323+
batchRouterDB,
324+
),
319325
ProcErrorDB: errDBForWrite,
320326
TransientSources: transientSources,
321327
RsourcesService: rsourcesService,

jobsdb/jobsdb_parameters_cache.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package jobsdb
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/samber/lo"
9+
10+
"github.com/rudderlabs/rudder-go-kit/config"
11+
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
12+
)
13+
14+
// NewCachingDistinctParameterValuesJobsdb creates a new JobsDB decorator that caches the results of GetDistinctParameterValues
15+
// for a specified duration. It uses a partition locker to ensure that only one goroutine can access the cache for a
16+
// specific parameter at a time. The cache is invalidated after the specified TTL.
17+
// The cache is stored in memory and is not persistent across restarts.
18+
// The cache is thread-safe and can be accessed by multiple goroutines concurrently.
19+
func NewCachingDistinctParameterValuesJobsdb(
20+
ttl config.ValueLoader[time.Duration],
21+
jobsdb JobsDB,
22+
) JobsDB {
23+
return &cachingDpvJobsDB{
24+
ttl: ttl,
25+
parameterLock: kitsync.NewPartitionLocker(),
26+
cache: make(map[string]lo.Tuple2[[]string, time.Time]),
27+
JobsDB: jobsdb,
28+
}
29+
}
30+
31+
type cachingDpvJobsDB struct {
32+
ttl config.ValueLoader[time.Duration]
33+
parameterLock *kitsync.PartitionLocker
34+
cacheMu sync.RWMutex
35+
cache map[string]lo.Tuple2[[]string, time.Time]
36+
JobsDB
37+
}
38+
39+
func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) {
40+
// only one goroutine can access the cache for a specific parameter at a time
41+
c.parameterLock.Lock(parameterName)
42+
defer c.parameterLock.Unlock(parameterName)
43+
44+
// read the cache
45+
c.cacheMu.RLock()
46+
if cachedEntry, ok := c.cache[parameterName]; ok && time.Since(cachedEntry.B) < c.ttl.Load() {
47+
c.cacheMu.RUnlock()
48+
return cachedEntry.A, nil
49+
}
50+
c.cacheMu.RUnlock()
51+
52+
// if not in cache or expired, fetch from DB
53+
// and update the cache
54+
values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameterName)
55+
if err != nil {
56+
return nil, err
57+
}
58+
// update the cache with the new values
59+
c.cacheMu.Lock()
60+
c.cache[parameterName] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()}
61+
c.cacheMu.Unlock()
62+
return values, nil
63+
}
+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package jobsdb
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/rudderlabs/rudder-go-kit/config"
13+
)
14+
15+
type cpvMockJobsdb struct {
16+
calls int
17+
JobsDB
18+
}
19+
20+
func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error) {
21+
j.calls++
22+
return []string{"value1", "value2"}, nil
23+
}
24+
25+
func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
26+
t.Run("single goroutine", func(t *testing.T) {
27+
// Create a mock JobsDB
28+
jobsdb := &cpvMockJobsdb{}
29+
30+
cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
31+
config.SingleValueLoader(100*time.Millisecond),
32+
jobsdb,
33+
)
34+
ctx := context.Background()
35+
36+
// First call should fetch from the mock JobsDB
37+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
38+
require.NoError(t, err)
39+
require.Equal(t, 1, jobsdb.calls)
40+
require.Equal(t, []string{"value1", "value2"}, values)
41+
42+
// Second call should hit the cache
43+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
44+
require.NoError(t, err)
45+
require.Equal(t, 1, jobsdb.calls)
46+
require.Equal(t, []string{"value1", "value2"}, values)
47+
48+
time.Sleep(100 * time.Millisecond)
49+
50+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
51+
require.NoError(t, err)
52+
require.Equal(t, 2, jobsdb.calls)
53+
require.Equal(t, []string{"value1", "value2"}, values)
54+
})
55+
56+
t.Run("multiple goroutines and parameters", func(t *testing.T) {
57+
jobsdb := &cpvMockJobsdb{}
58+
59+
cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
60+
config.SingleValueLoader(100*time.Millisecond),
61+
jobsdb,
62+
)
63+
ctx := context.Background()
64+
65+
var wg sync.WaitGroup
66+
wg.Add(20)
67+
for i := range 10 {
68+
go func(i int) {
69+
defer wg.Done()
70+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
71+
require.NoError(t, err)
72+
require.Equal(t, []string{"value1", "value2"}, values)
73+
time.Sleep(100 * time.Millisecond)
74+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
75+
require.NoError(t, err)
76+
require.Equal(t, []string{"value1", "value2"}, values)
77+
}(i)
78+
go func(i int) {
79+
defer wg.Done()
80+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
81+
require.NoError(t, err)
82+
require.Equal(t, []string{"value1", "value2"}, values)
83+
time.Sleep(100 * time.Millisecond)
84+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
85+
require.NoError(t, err)
86+
require.Equal(t, []string{"value1", "value2"}, values)
87+
}(i)
88+
}
89+
wg.Wait()
90+
require.Equal(t, 20, jobsdb.calls)
91+
})
92+
}

processor/manager.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ type LifecycleManager struct {
2828
mainCtx context.Context
2929
currentCancel context.CancelFunc
3030
waitGroup interface{ Wait() }
31-
gatewayDB *jobsdb.Handle
32-
routerDB *jobsdb.Handle
33-
batchRouterDB *jobsdb.Handle
34-
readErrDB *jobsdb.Handle
35-
writeErrDB *jobsdb.Handle
36-
esDB *jobsdb.Handle
37-
arcDB *jobsdb.Handle
31+
gatewayDB jobsdb.JobsDB
32+
routerDB jobsdb.JobsDB
33+
batchRouterDB jobsdb.JobsDB
34+
readErrDB jobsdb.JobsDB
35+
writeErrDB jobsdb.JobsDB
36+
esDB jobsdb.JobsDB
37+
arcDB jobsdb.JobsDB
3838
clearDB *bool
3939
ReportingI types.Reporting // need not initialize again
4040
BackendConfig backendconfig.BackendConfig
@@ -116,7 +116,7 @@ func (proc *LifecycleManager) Stop() {
116116
func New(
117117
ctx context.Context,
118118
clearDb *bool,
119-
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
119+
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB jobsdb.JobsDB,
120120
reporting types.Reporting,
121121
transientSources transientsource.Service,
122122
fileuploader fileuploader.Provider,

0 commit comments

Comments
 (0)