
Commit 8957d73

fixup! chore: filter workspaces at router and batchrouter - like destinationIDs

1 parent f54545c

5 files changed: +48 -54 lines

integration_test/multi_tenant_test/testdata/mtGatewayTest01.json (+1)

@@ -17,6 +17,7 @@
     {
       "id": "xxxyyyzzP9kQfzOoKd1tuxchYAG",
       "name": "Dev WebHook Integration Test 1",
+      "workspaceId": "{{.workspaceId}}",
       "enabled": true,
       "isProcessorEnabled": true,
       "config": {

jobsdb/distinct_values_cache.go (+25 -25)

@@ -26,29 +26,29 @@ type distinctValuesCache struct {
 // already cached, it returns the cached values. If not, it loads the values from the given load
 // function and caches them. The last dataset is never cached, so it is always loaded from the
 // load function. The load function is called with the missing datasets and the last dataset.
-func (dpc *distinctValuesCache) GetDistinctValues(key string, datasets []string, load func(datasets []string) (map[string][]string, error)) ([]string, error) {
+func (dvc *distinctValuesCache) GetDistinctValues(key string, datasets []string, load func(datasets []string) (map[string][]string, error)) ([]string, error) {
 	// First check if we are missing any datasets from the cache.
 	// If we are, we need to load them along with the last dataset
 	// The last dataset is never cached, so we need to load it every time
-	dpc.cacheMu.RLock()
-	missing := dpc.missing(key, datasets[:len(datasets)-1])
-	dpc.cacheMu.RUnlock()
+	dvc.cacheMu.RLock()
+	missing := dvc.missing(key, datasets[:len(datasets)-1])
+	dvc.cacheMu.RUnlock()

 	// If we are missing any datasets, we need to lock the key, so that
 	// we don't load the same datasets multiple times for the same key.
 	// This lock needs to be retained until the datasets are loaded into the cache.
 	if len(missing) > 0 {
-		dpc.klock.Lock(key)
+		dvc.klock.Lock(key)
 		// Check again if we are missing any datasets, to deal with race conditions
-		dpc.cacheMu.Lock()
-		missing = dpc.missing(key, datasets[:len(datasets)-1])
-		if _, ok := dpc.cache[key]; !ok {
-			dpc.cache[key] = make(map[string][]string)
+		dvc.cacheMu.Lock()
+		missing = dvc.missing(key, datasets[:len(datasets)-1])
+		if _, ok := dvc.cache[key]; !ok {
+			dvc.cache[key] = make(map[string][]string)
 		}
-		dpc.cacheMu.Unlock()
+		dvc.cacheMu.Unlock()
 		if len(missing) == 0 {
 			// If we are not missing any datasets, we need to unlock the key
-			dpc.klock.Unlock(key)
+			dvc.klock.Unlock(key)
 		}
 	}

@@ -59,25 +59,25 @@ func (dpc *distinctValuesCache) GetDistinctValues(key string, datasets []string,
 	}
 	// if we were missing any datasets, we need to add them to the cache and unlock the key
 	if len(missing) > 0 {
-		dpc.cacheMu.Lock()
+		dvc.cacheMu.Lock()
 		for _, ds := range missing {
-			dpc.cache[key][ds] = results[ds]
+			dvc.cache[key][ds] = results[ds]
 		}
-		dpc.cacheMu.Unlock()
-		dpc.klock.Unlock(key)
+		dvc.cacheMu.Unlock()
+		dvc.klock.Unlock(key)
 	}

 	// Now we need to get values for all the datasets requested so that we can calculate
 	// the distinct values.
 	// We already have some values in the results map (last dataset & missing), so we only need to fill in
 	// the rest of the datasets from the cache.
-	dpc.cacheMu.RLock()
+	dvc.cacheMu.RLock()
 	for _, ds := range datasets {
 		if _, ok := results[ds]; !ok {
-			results[ds] = dpc.cache[key][ds]
+			results[ds] = dvc.cache[key][ds]
 		}
 	}
-	dpc.cacheMu.RUnlock()
+	dvc.cacheMu.RUnlock()

 	// Calculating distinct values is easy, we just need to
 	// iterate over all the datasets and add them to a map

@@ -92,17 +92,17 @@ func (dpc *distinctValuesCache) GetDistinctValues(key string, datasets []string,
 }

 // RemoveDataset removes the dataset from the cache for all keys.
-func (dpc *distinctValuesCache) RemoveDataset(dataset string) {
-	dpc.cacheMu.Lock()
-	defer dpc.cacheMu.Unlock()
-	for key := range dpc.cache {
-		delete(dpc.cache[key], dataset)
+func (dvc *distinctValuesCache) RemoveDataset(dataset string) {
+	dvc.cacheMu.Lock()
+	defer dvc.cacheMu.Unlock()
+	for key := range dvc.cache {
+		delete(dvc.cache[key], dataset)
 	}
 }

-func (dpc *distinctValuesCache) missing(key string, datasets []string) []string {
+func (dvc *distinctValuesCache) missing(key string, datasets []string) []string {
 	var missing []string
-	dscache, ok := dpc.cache[key]
+	dscache, ok := dvc.cache[key]
 	if !ok {
 		return datasets
 	}
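
Aside from the receiver rename (dpc to dvc, matching the distinctValuesCache type), the method's locking scheme is the interesting part of this file. Below is a standalone Go sketch, a simplified reconstruction rather than the commit's code, of the pattern the comments describe: a read lock to detect missing entries, then a per-key lock plus a re-check so concurrent callers never load the same data twice. The cache type and load callback here are hypothetical stand-ins.

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu    sync.RWMutex
	data  map[string][]string
	klock sync.Mutex // stand-in for a real per-key lock
}

func (c *cache) get(key string, load func() ([]string, error)) ([]string, error) {
	c.mu.RLock()
	v, ok := c.data[key]
	c.mu.RUnlock()
	if ok {
		return v, nil
	}
	c.klock.Lock() // serialize loaders for this key
	defer c.klock.Unlock()
	c.mu.RLock()
	v, ok = c.data[key] // re-check: another goroutine may have loaded it meanwhile
	c.mu.RUnlock()
	if ok {
		return v, nil
	}
	v, err := load()
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	c.data[key] = v
	c.mu.Unlock()
	return v, nil
}

func main() {
	c := &cache{data: map[string][]string{}}
	v, _ := c.get("workspace_id", func() ([]string, error) {
		return []string{"ws-1", "ws-2"}, nil
	})
	fmt.Println(v)
}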

jobsdb/jobsdb.go (+11 -26)

@@ -1919,7 +1919,14 @@ FROM pending GROUP BY workspace_id, custom_val`
 	return g.Wait()
 }

-var parametersWithoutCustomval string = `SELECT '%[2]s', * FROM (
+func (jd *Handle) getDistinctValuesPerDataset(
+	ctx context.Context,
+	dsList []string,
+	param ParameterName,
+) (map[string][]string, error) {
+	var queries []string
+	for _, ds := range dsList {
+		queries = append(queries, fmt.Sprintf(`SELECT '%[2]s', * FROM (
 	WITH RECURSIVE t AS (
 		(SELECT %[1]s as parameter FROM %[2]q ORDER BY %[1]s LIMIT 1)
 		UNION ALL

@@ -1931,32 +1938,10 @@ var parametersWithoutCustomval string = `SELECT '%[2]s', * FROM (
 		)s
 	)
 )
-SELECT * FROM t) a`
-
-// var parametersWithCustomVal string = `SELECT '%[2]s', * FROM (
-// 	WITH RECURSIVE t AS (
-// 		(SELECT %[1]s as parameter FROM %[2]q WHERE custom_val = '%[3]s' ORDER BY %[1]s LIMIT 1)
-// 		UNION ALL
-// 		(
-// 			SELECT s.* FROM t, LATERAL(
-// 				SELECT %[1]s as parameter FROM %[2]q f
-// 				WHERE custom_val = '%[3]s' AND f.%[1]s > t.parameter
-// 				ORDER BY %[1]s LIMIT 1
-// 			)s
-// 		)
-// 	)
-// SELECT * FROM t) a`
-
-func (jd *Handle) getDistinctValuesPerDataset(
-	dsList []string,
-	param ParameterName,
-) (map[string][]string, error) {
-	var queries []string
-	for _, ds := range dsList {
-		queries = append(queries, fmt.Sprintf(parametersWithoutCustomval, param.string(), ds))
+SELECT * FROM t) a`, param.string(), ds))
 	}
 	query := strings.Join(queries, " UNION ")
-	rows, err := jd.dbHandle.Query(query)
+	rows, err := jd.dbHandle.QueryContext(ctx, query)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't query distinct parameter-%s: %w", param.string(), err)
 	}

@@ -1993,7 +1978,7 @@ func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameter Para
 		parameter.string(),
 		lo.Map(dsList, func(ds dataSetT, _ int) string { return ds.JobTable }),
 		func(datasets []string) (map[string][]string, error) {
-			return jd.getDistinctValuesPerDataset(datasets, parameter)
+			return jd.getDistinctValuesPerDataset(ctx, datasets, parameter)
 		},
 	)
 }
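
The refactor inlines what was previously the package-level parametersWithoutCustomval template into getDistinctValuesPerDataset, deletes the commented-out custom_val variant, and threads ctx through to QueryContext. To make the query shape concrete, here is a sketch of one rendered per-dataset query. Note that the LATERAL body sits between the two hunks and is not shown in this diff, so that part of the template below is inferred from the removed custom_val variant. The recursive CTE is a loose index scan: each iteration fetches only the next distinct value greater than the previous one, rather than scanning every row.

package main

import "fmt"

func main() {
	// Template as inlined by the commit; %[1]s is the parameter column,
	// %[2]q the quoted job table. The LATERAL body is inferred (assumption).
	const tpl = `SELECT '%[2]s', * FROM (
	WITH RECURSIVE t AS (
		(SELECT %[1]s as parameter FROM %[2]q ORDER BY %[1]s LIMIT 1)
		UNION ALL
		(
			SELECT s.* FROM t, LATERAL(
				SELECT %[1]s as parameter FROM %[2]q f
				WHERE f.%[1]s > t.parameter
				ORDER BY %[1]s LIMIT 1
			)s
		)
	)
	SELECT * FROM t) a`
	// One such query is built per dataset, then joined with " UNION ".
	fmt.Printf(tpl+"\n", "workspace_id", "jobs_1")
}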

processor/processor_isolation_test.go (+10 -2)

@@ -48,7 +48,11 @@ func TestProcessorIsolation(t *testing.T) {
 		pipelinesPerPartition = 3
 	)

-	for _, isolationMode := range []isolation.Mode{isolation.ModeNone, isolation.ModeWorkspace, isolation.ModeSource} {
+	for _, isolationMode := range []isolation.Mode{
+		isolation.ModeNone,
+		isolation.ModeWorkspace,
+		isolation.ModeSource,
+	} {
 		t.Run(fmt.Sprintf("%s isolation", isolationMode), func(t *testing.T) {
 			t.Run("1 worker", func(t *testing.T) {
 				ProcIsolationScenario(t, NewProcIsolationScenarioSpec(isolationMode, workspaces, jobsPerWorkspace, 1))

@@ -212,6 +216,10 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa
 		"workspaces":    spec.workspaces,
 		"destinationId": destinationID,
 	}
+	var destinationIDs []string
+	for i := 0; i < len(spec.workspaces); i++ {
+		destinationIDs = append(destinationIDs, destinationID+fmt.Sprintf("-%d", i))
+	}
 	configJsonPath := workspaceConfig.CreateTempFile(t, "testdata/procIsolationTestTemplate.json.tpl", templateCtx)
 	mockCBE := m.newMockConfigBackend(t, configJsonPath)
 	config.Set("CONFIG_BACKEND_URL", mockCBE.URL)

@@ -237,7 +245,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa
 	config.Set("JobsDB.backup.enabled", false)
 	config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
 	config.Set("JobsDB.payloadColumnType", "text")
-	config.Set("Router.toAbortDestinationIDs", destinationID)
+	config.Set("Router.toAbortDestinationIDs", destinationIDs)
 	config.Set("archival.Enabled", false)
 	config.Set("enableStats", false)

processor/testdata/procIsolationTestTemplate.json.tpl (+1 -1)

@@ -24,7 +24,7 @@
         "webhookMethod": "POST"
       },
       "secretConfig": {},
-      "id": "{{$.destinationId}}",
+      "id": "{{$.destinationId}}-{{$index}}",
       "name": "Des WebHook Integration Test 1",
      "enabled": true,
      "workspaceId": "{{$workspace}}",
