Skip to content

Commit 8d0a868

Browse files
committed
fixup! chore: caching parameters for multiple routers and batchrouters
1 parent 4d83c2d commit 8d0a868

File tree

3 files changed

+37
-59
lines changed

3 files changed

+37
-59
lines changed

jobsdb/jobsdb.go

+21-45
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
"github.com/samber/lo"
4242
"github.com/tidwall/gjson"
4343
"golang.org/x/sync/errgroup"
44-
"golang.org/x/sync/singleflight"
4544

4645
"github.com/rudderlabs/rudder-go-kit/bytesize"
4746
"github.com/rudderlabs/rudder-go-kit/config"
@@ -471,14 +470,13 @@ type Handle struct {
471470
logger logger.Logger
472471
stats stats.Stats
473472

474-
datasetList []dataSetT
475-
datasetRangeList []dataSetRangeT
476-
dsRangeFuncMap map[string]func() (dsRangeMinMax, error)
477-
distinctValuesCache *distinctValuesCache
478-
distinctParameterSingleFlight *singleflight.Group
479-
dsListLock *lock.Locker
480-
dsMigrationLock *lock.Locker
481-
noResultsCache *cache.NoResultsCache[ParameterFilterT]
473+
datasetList []dataSetT
474+
datasetRangeList []dataSetRangeT
475+
dsRangeFuncMap map[string]func() (dsRangeMinMax, error)
476+
distinctValuesCache *distinctValuesCache
477+
dsListLock *lock.Locker
478+
dsMigrationLock *lock.Locker
479+
noResultsCache *cache.NoResultsCache[ParameterFilterT]
482480

483481
// table count stats
484482
statTableCount stats.Measurement
@@ -781,7 +779,6 @@ func (jd *Handle) init() {
781779
jd.logger = logger.NewLogger().Child("jobsdb").Child(jd.tablePrefix)
782780
}
783781
jd.dsRangeFuncMap = make(map[string]func() (dsRangeMinMax, error))
784-
jd.distinctParameterSingleFlight = new(singleflight.Group)
785782
jd.distinctValuesCache = NewDistinctValuesCache()
786783

787784
if jd.config == nil {
@@ -1983,43 +1980,22 @@ func (jd *Handle) getDistinctValuesPerDataset(
19831980
}
19841981

19851982
func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameter ParameterName) ([]string, error) {
1986-
res, err, shared := jd.distinctParameterSingleFlight.Do(parameter.string(), func() (interface{}, error) {
1987-
if !jd.dsMigrationLock.RTryLockWithCtx(ctx) {
1988-
return nil, fmt.Errorf("could not acquire a migration read lock: %w", ctx.Err())
1989-
}
1990-
defer jd.dsMigrationLock.RUnlock()
1991-
if !jd.dsListLock.RTryLockWithCtx(ctx) {
1992-
return nil, fmt.Errorf("could not acquire a dslist read lock: %w", ctx.Err())
1993-
}
1994-
dsList := jd.getDSList()
1995-
jd.logger.Info(dsList)
1996-
jd.dsListLock.RUnlock()
1997-
values, err := jd.distinctValuesCache.GetDistinctValues(
1998-
parameter.string(),
1999-
lo.Map(dsList, func(ds dataSetT, _ int) string { return ds.JobTable }),
2000-
func(datasets []string) (map[string][]string, error) {
2001-
return jd.getDistinctValuesPerDataset(datasets, parameter)
2002-
},
2003-
)
2004-
if err != nil {
2005-
return nil, err
2006-
}
2007-
return values, nil
2008-
})
2009-
if shared {
2010-
jd.stats.NewTaggedStat("jobsdb_get_distinct_parameter_values_shared", stats.CountType, stats.Tags{
2011-
"parameter": parameter.string(),
2012-
"tablePrefix": jd.tablePrefix,
2013-
}).Increment()
2014-
}
2015-
if err != nil {
2016-
return nil, err
1983+
if !jd.dsMigrationLock.RTryLockWithCtx(ctx) {
1984+
return nil, fmt.Errorf("could not acquire a migration read lock: %w", ctx.Err())
20171985
}
2018-
val, ok := res.([]string)
2019-
if !ok {
2020-
return nil, fmt.Errorf("type assertion failed")
1986+
defer jd.dsMigrationLock.RUnlock()
1987+
if !jd.dsListLock.RTryLockWithCtx(ctx) {
1988+
return nil, fmt.Errorf("could not acquire a dslist read lock: %w", ctx.Err())
20211989
}
2022-
return val, nil
1990+
dsList := jd.getDSList()
1991+
jd.dsListLock.RUnlock()
1992+
return jd.distinctValuesCache.GetDistinctValues(
1993+
parameter.string(),
1994+
lo.Map(dsList, func(ds dataSetT, _ int) string { return ds.JobTable }),
1995+
func(datasets []string) (map[string][]string, error) {
1996+
return jd.getDistinctValuesPerDataset(datasets, parameter)
1997+
},
1998+
)
20231999
}
20242000

20252001
func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobList []*JobT) error {

jobsdb/jobsdb_parameters_cache.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,28 @@ type cachingDpvJobsDB struct {
3636
JobsDB
3737
}
3838

39-
func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameterName string) (values []string, err error) {
39+
func (c *cachingDpvJobsDB) GetDistinctParameterValues(ctx context.Context, parameter ParameterName) (values []string, err error) {
4040
// only one goroutine can access the cache for a specific parameter at a time
41-
c.parameterLock.Lock(parameterName)
42-
defer c.parameterLock.Unlock(parameterName)
41+
c.parameterLock.Lock(parameter.string())
42+
defer c.parameterLock.Unlock(parameter.string())
4343

4444
// read the cache
4545
c.cacheMu.RLock()
46-
if cachedEntry, ok := c.cache[parameterName]; ok && time.Since(cachedEntry.B) < c.ttl.Load() {
46+
if cachedEntry, ok := c.cache[parameter.string()]; ok && time.Since(cachedEntry.B) < c.ttl.Load() {
4747
c.cacheMu.RUnlock()
4848
return cachedEntry.A, nil
4949
}
5050
c.cacheMu.RUnlock()
5151

5252
// if not in cache or expired, fetch from DB
5353
// and update the cache
54-
values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameterName)
54+
values, err = c.JobsDB.GetDistinctParameterValues(ctx, parameter)
5555
if err != nil {
5656
return nil, err
5757
}
5858
// update the cache with the new values
5959
c.cacheMu.Lock()
60-
c.cache[parameterName] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()}
60+
c.cache[parameter.string()] = lo.Tuple2[[]string, time.Time]{A: values, B: time.Now()}
6161
c.cacheMu.Unlock()
6262
return values, nil
6363
}

jobsdb/jobsdb_parameters_cache_test.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ type cpvMockJobsdb struct {
1717
JobsDB
1818
}
1919

20-
func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameterName string) ([]string, error) {
20+
func (j *cpvMockJobsdb) GetDistinctParameterValues(ctx context.Context, parameter ParameterName) ([]string, error) {
2121
j.calls++
2222
return []string{"value1", "value2"}, nil
2323
}
2424

25+
var testParameter parameterName = "test_parameter"
26+
2527
func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
2628
t.Run("single goroutine", func(t *testing.T) {
2729
// Create a mock JobsDB
@@ -34,20 +36,20 @@ func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
3436
ctx := context.Background()
3537

3638
// First call should fetch from the mock JobsDB
37-
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
39+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, testParameter)
3840
require.NoError(t, err)
3941
require.Equal(t, 1, jobsdb.calls)
4042
require.Equal(t, []string{"value1", "value2"}, values)
4143

4244
// Second call should hit the cache
43-
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
45+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, testParameter)
4446
require.NoError(t, err)
4547
require.Equal(t, 1, jobsdb.calls)
4648
require.Equal(t, []string{"value1", "value2"}, values)
4749

4850
time.Sleep(100 * time.Millisecond)
4951

50-
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter")
52+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, testParameter)
5153
require.NoError(t, err)
5254
require.Equal(t, 2, jobsdb.calls)
5355
require.Equal(t, []string{"value1", "value2"}, values)
@@ -67,21 +69,21 @@ func TestCachingDistinctParameterValuesJobsdb(t *testing.T) {
6769
for i := range 10 {
6870
go func(i int) {
6971
defer wg.Done()
70-
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
72+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)))
7173
require.NoError(t, err)
7274
require.Equal(t, []string{"value1", "value2"}, values)
7375
time.Sleep(100 * time.Millisecond)
74-
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
76+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)))
7577
require.NoError(t, err)
7678
require.Equal(t, []string{"value1", "value2"}, values)
7779
}(i)
7880
go func(i int) {
7981
defer wg.Done()
80-
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
82+
values, err := cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)))
8183
require.NoError(t, err)
8284
require.Equal(t, []string{"value1", "value2"}, values)
8385
time.Sleep(100 * time.Millisecond)
84-
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, "test_parameter_"+strconv.Itoa(i))
86+
values, err = cachingJobsdb.GetDistinctParameterValues(ctx, parameterName("test_parameter_"+strconv.Itoa(i)))
8587
require.NoError(t, err)
8688
require.Equal(t, []string{"value1", "value2"}, values)
8789
}(i)

0 commit comments

Comments
 (0)