Skip to content

Commit db67020

Browse files
committed
chore: caching parameters for multiple routers and batchrouters
1 parent 081e6c3 commit db67020

File tree

5 files changed

+150
-22
lines changed

5 files changed

+150
-22
lines changed

app/apphandlers/embeddedAppHandler.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,13 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
301301
return fmt.Errorf("failed to create rt throttler factory: %w", err)
302302
}
303303
rtFactory := &router.Factory{
304-
Logger: logger.NewLogger().Child("router"),
305-
Reporting: reporting,
306-
BackendConfig: backendconfig.DefaultBackendConfig,
307-
RouterDB: routerDB,
304+
Logger: logger.NewLogger().Child("router"),
305+
Reporting: reporting,
306+
BackendConfig: backendconfig.DefaultBackendConfig,
307+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
308+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
309+
routerDB,
310+
),
308311
ProcErrorDB: errDBForWrite,
309312
TransientSources: transientSources,
310313
RsourcesService: rsourcesService,
@@ -315,9 +318,12 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
315318
PendingEventsRegistry: pendingEventsRegistry,
316319
}
317320
brtFactory := &batchrouter.Factory{
318-
Reporting: reporting,
319-
BackendConfig: backendconfig.DefaultBackendConfig,
320-
RouterDB: batchRouterDB,
321+
Reporting: reporting,
322+
BackendConfig: backendconfig.DefaultBackendConfig,
323+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
324+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
325+
batchRouterDB,
326+
),
321327
ProcErrorDB: errDBForWrite,
322328
TransientSources: transientSources,
323329
RsourcesService: rsourcesService,

app/apphandlers/processorAppHandler.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,13 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
299299
return fmt.Errorf("failed to create throttler factory: %w", err)
300300
}
301301
rtFactory := &router.Factory{
302-
Logger: logger.NewLogger().Child("router"),
303-
Reporting: reporting,
304-
BackendConfig: backendconfig.DefaultBackendConfig,
305-
RouterDB: routerDB,
302+
Logger: logger.NewLogger().Child("router"),
303+
Reporting: reporting,
304+
BackendConfig: backendconfig.DefaultBackendConfig,
305+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple routers can share the same cache and not hit the DB every time
306+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
307+
routerDB,
308+
),
306309
ProcErrorDB: errDBForWrite,
307310
TransientSources: transientSources,
308311
RsourcesService: rsourcesService,
@@ -313,9 +316,12 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
313316
PendingEventsRegistry: pendingEventsRegistry,
314317
}
315318
brtFactory := &batchrouter.Factory{
316-
Reporting: reporting,
317-
BackendConfig: backendconfig.DefaultBackendConfig,
318-
RouterDB: batchRouterDB,
319+
Reporting: reporting,
320+
BackendConfig: backendconfig.DefaultBackendConfig,
321+
RouterDB: jobsdb.NewCachingDistinctParameterValuesJobsdb( // using a cache so that multiple batch routers can share the same cache and not hit the DB every time
322+
config.GetReloadableDurationVar(1, time.Second, "JobsDB.rt.parameterValuesCacheTtl", "JobsDB.parameterValuesCacheTtl"),
323+
batchRouterDB,
324+
),
319325
ProcErrorDB: errDBForWrite,
320326
TransientSources: transientSources,
321327
RsourcesService: rsourcesService,

jobsdb/jobsdb_parameters_cache.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package jobsdb
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/samber/lo"
9+
10+
"github.com/rudderlabs/rudder-go-kit/config"
11+
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
12+
)
13+
14+
// NewCachingDistinctParameterValuesJobsdb creates a new JobsDB decorator that caches the results of GetDistinctParameterValues
15+
// for a specified duration. It uses a partition locker to ensure that only one goroutine can access the cache for a
16+
// specific parameter at a time. The cache is invalidated after the specified TTL.
17+
// The cache is stored in memory and is not persistent across restarts.
18+
// The cache is thread-safe and can be accessed by multiple goroutines concurrently.
19+
func NewCachingDistinctParameterValuesJobsdb(
20+
ttl config.ValueLoader[time.Duration],
21+
jobsdb JobsDB,
22+
) JobsDB {
23+
return &cachingDpvJobsDB{
24+
ttl: ttl,
25+
parameterLock: kitsync.NewPartitionLocker(),
26+
cache: make(map[string]lo.Tuple2[[]string, time.Time]),
27+
JobsDB: jobsdb,
28+
}
29+
}
30+
31+
type cachingDpvJobsDB struct {
32+
ttl config.ValueLoader[time.Duration]
33+
parameterLock *kitsync.PartitionLocker
34+
cacheMu sync.RWMutex
35+
cache map[string]lo.Tuple2[[]string, time.Time]
36+
JobsDB
37+
}
38+
39+
func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) {
40+
// only one goroutine can access the cache for a specific parameter at a time
41+
c.parameterLock.Lock(parameterName)
42+
defer c.parameterLock.Unlock(parameterName)
43+
44+
// read the cache
45+
c.cacheMu.RLock()
46+
if cachedEntry, ok := c.cache[parameterName]; ok && time.Since(cachedEntry.B) < c.ttl.Load() {
47+
c.cacheMu.RUnlock()
48+
return cachedEntry.A, nil
49+
}
50+
c.cacheMu.RUnlock()
51+
52+
// if not in cache or expired, fetch from DB
53+
// and update the cache
54+
values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameterName)
55+
if err != nil {
56+
return nil, err
57+
}
58+
// update the cache with the new values
59+
c.cacheMu.Lock()
60+
c.cache[parameterName] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()}
61+
c.cacheMu.Unlock()
62+
return values, nil
63+
}
+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package jobsdb
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
10+
// Create a mock JobsDB
11+
jobsdb := &MockJobsDB{
12+
GetDistinctParameterValuesFunc: func(ctx context.Context, parameterName string) ([]string, error) {
13+
return []string{"value1", "value2"}, nil
14+
},
15+
}
16+
17+
// Create a caching decorator with a TTL of 1 second
18+
cachingJobsdb := NewCachingDistinctParameterValuesJobsdb(
19+
config.NewValueLoader[time.Duration](1*time.Second),
20+
jobsdb,
21+
)
22+
23+
ctx := context.Background()
24+
25+
// First call should fetch from the mock JobsDB
26+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
27+
if err != nil {
28+
t.Fatalf("expected no error, got %v", err)
29+
}
30+
if len(values) != 2 {
31+
t.Fatalf("expected 2 values, got %d", len(values))
32+
}
33+
34+
// Second call should hit the cache
35+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
36+
if err != nil {
37+
t.Fatalf("expected no error, got %v", err)
38+
}
39+
if len(values) != 2 {
40+
t.Fatalf("expected 2 values, got %d", len(values))
41+
}
42+
43+
time.Sleep(2 * time.Second)
44+
45+
// After TTL expires, it should fetch from the mock JobsDB again
46+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
47+
if err != nil {
48+
t.Fatalf("expected no error, got %v", err)
49+
}
50+
if len(values) != 2 {
51+
t.Fatalf("expected 2 values, got %d", len(values))
52+
}
53+
}

processor/manager.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ type LifecycleManager struct {
2828
mainCtx context.Context
2929
currentCancel context.CancelFunc
3030
waitGroup interface{ Wait() }
31-
gatewayDB *jobsdb.Handle
32-
routerDB *jobsdb.Handle
33-
batchRouterDB *jobsdb.Handle
34-
readErrDB *jobsdb.Handle
35-
writeErrDB *jobsdb.Handle
36-
esDB *jobsdb.Handle
37-
arcDB *jobsdb.Handle
31+
gatewayDB jobsdb.JobsDB
32+
routerDB jobsdb.JobsDB
33+
batchRouterDB jobsdb.JobsDB
34+
readErrDB jobsdb.JobsDB
35+
writeErrDB jobsdb.JobsDB
36+
esDB jobsdb.JobsDB
37+
arcDB jobsdb.JobsDB
3838
clearDB *bool
3939
ReportingI types.Reporting // need not initialize again
4040
BackendConfig backendconfig.BackendConfig
@@ -116,7 +116,7 @@ func (proc *LifecycleManager) Stop() {
116116
func New(
117117
ctx context.Context,
118118
clearDb *bool,
119-
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
119+
gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB jobsdb.JobsDB,
120120
reporting types.Reporting,
121121
transientSources transientsource.Service,
122122
fileuploader fileuploader.Provider,

0 commit comments

Comments
 (0)