Skip to content

Commit dbc08f1

Browse files
authored
Set the "missing" key in ES sorter (temporalio#8964)
## What changed? Explicitly set the `"missing"` key in the ES sorter. This addresses a pagination bug in OpenSearch 2.8.0+ (see opensearch-project/OpenSearch#8212 (comment)). Also run CI tests against the latest versions of OpenSearch 2 and OpenSearch 3. ## Why? Full support of OpenSearch as a visibility store. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks
1 parent 4ed59cd commit dbc08f1

7 files changed

Lines changed: 52 additions & 23 deletions

File tree

.github/workflows/run-tests.yml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ jobs:
427427
- cass_es
428428
- cass_es8
429429
- cass_os2
430+
- cass_os3
430431
- sqlite
431432
- mysql8
432433
- postgres12
@@ -452,6 +453,10 @@ jobs:
452453
persistence_type: nosql
453454
persistence_driver: cassandra
454455
containers: [cassandra, opensearch2]
456+
- name: cass_os3
457+
persistence_type: nosql
458+
persistence_driver: cassandra
459+
containers: [cassandra, opensearch3]
455460
- name: sqlite
456461
persistence_type: sql
457462
persistence_driver: sqlite
@@ -604,7 +609,7 @@ jobs:
604609
strategy:
605610
fail-fast: false
606611
matrix:
607-
name: [cass_es, cass_es8, cass_os2, mysql8, postgres12, postgres12_pgx]
612+
name: [cass_es, cass_es8, cass_os2, cass_os3, mysql8, postgres12, postgres12_pgx]
608613
include:
609614
- name: cass_es
610615
persistence_type: nosql
@@ -624,6 +629,10 @@ jobs:
624629
persistence_type: nosql
625630
persistence_driver: cassandra
626631
containers: [cassandra, opensearch2]
632+
- name: cass_os3
633+
persistence_type: nosql
634+
persistence_driver: cassandra
635+
containers: [cassandra, opensearch3]
627636
- name: mysql8
628637
persistence_type: sql
629638
persistence_driver: mysql8
@@ -766,6 +775,7 @@ jobs:
766775
- cass_es
767776
- cass_es8
768777
- cass_os2
778+
- cass_os3
769779
- mysql8
770780
- postgres12
771781
- postgres12_pgx
@@ -788,6 +798,10 @@ jobs:
788798
persistence_type: nosql
789799
persistence_driver: cassandra
790800
containers: [cassandra, opensearch2]
801+
- name: cass_os3
802+
persistence_type: nosql
803+
persistence_driver: cassandra
804+
containers: [cassandra, opensearch3]
791805
- name: mysql8
792806
persistence_type: sql
793807
persistence_driver: mysql8

common/persistence/visibility/store/elasticsearch/converter_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ var supportedWhereOrderCases = map[string]struct {
7878
}{
7979
"id > 1 order by id asc, order_id desc": {
8080
query: `{"bool":{"filter":{"range":{"id":{"from":1,"include_lower":false,"include_upper":true,"to":null}}}}}`,
81-
sorter: `[{"id":{"order":"asc"}},{"order_id":{"order":"desc"}}]`,
81+
sorter: `[{"id":{"missing":"_last","order":"asc"}},{"order_id":{"missing":"_last","order":"desc"}}]`,
8282
},
8383
"id is null order by `order`.abc": {
8484
query: `{"bool":{"must_not":{"exists":{"field":"id"}}}}`,
85-
sorter: `[{"order.abc":{"order":"asc"}}]`,
85+
sorter: `[{"order.abc":{"missing":"_last","order":"asc"}}]`,
8686
},
8787
"id beTweeN 1 AnD 3 ORdeR BY random_id DESC": {
8888
query: `{"bool":{"filter":{"range":{"id":{"from":1,"include_lower":true,"include_upper":true,"to":3}}}}}`,
89-
sorter: `[{"random_id":{"order":"desc"}}]`,
89+
sorter: `[{"random_id":{"missing":"_last","order":"desc"}}]`,
9090
},
9191
}
9292

@@ -153,7 +153,7 @@ func TestEmptySelectWhere(t *testing.T) {
153153
assert.Len(t, queryParams.Sorter, 1)
154154
actualSorterMap, _ := queryParams.Sorter[0].Source()
155155
actualSorterJson, _ := json.Marshal([]interface{}{actualSorterMap})
156-
assert.Equal(t, `[{"Id":{"order":"desc"}}]`, string(actualSorterJson))
156+
assert.JSONEq(t, `[{"Id":{"missing":"_last","order":"desc"}}]`, string(actualSorterJson))
157157
}
158158

159159
func TestSupportedSelectWhereOrder(t *testing.T) {

common/persistence/visibility/store/elasticsearch/visibility_store.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ var (
106106
}
107107
if item.missing_first {
108108
fs.Missing("_first")
109+
} else {
110+
fs.Missing("_last")
109111
}
110112
ret = append(ret, fs)
111113
}
@@ -749,7 +751,7 @@ func (s *VisibilityStore) convertQuery(
749751
sqlparser.String(orderByExpr),
750752
)
751753
}
752-
fieldSort := elastic.NewFieldSort(colName.FieldName)
754+
fieldSort := elastic.NewFieldSort(colName.FieldName).Missing("_last")
753755
if orderByExpr.Direction == sqlparser.DescScr {
754756
fieldSort = fieldSort.Desc()
755757
}

common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
261261
SearchAfter: nil,
262262
PageSize: testPageSize,
263263
Sorter: []elastic.Sorter{
264-
elastic.NewFieldSort(sadefs.WorkflowID).Asc(),
264+
elastic.NewFieldSort(sadefs.WorkflowID).Asc().Missing("_last"),
265265
elastic.NewFieldSort(sadefs.RunID).Desc(),
266266
},
267267
}, p)
@@ -355,7 +355,7 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy() {
355355
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
356356
s.NoError(err)
357357
s.Equal(`{"bool":{"filter":[{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},{"bool":{"filter":{"term":{"WorkflowId":"wid"}}}}],"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
358-
s.Equal(`[{"StartTime":{"order":"desc"}}]`, s.sorterToJSON(queryParams.Sorter))
358+
s.JSONEq(`[{"StartTime":{"missing":"_last","order":"desc"}}]`, s.sorterToJSON(queryParams.Sorter))
359359

360360
query = `WorkflowId = 'wid' and CloseTime is null`
361361
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
@@ -373,7 +373,7 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy() {
373373
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
374374
s.NoError(err)
375375
s.Equal(`{"bool":{"filter":[{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},{"bool":{"must_not":{"exists":{"field":"CloseTime"}}}}],"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
376-
s.Equal(`[{"CloseTime":{"order":"desc"}}]`, s.sorterToJSON(queryParams.Sorter))
376+
s.JSONEq(`[{"CloseTime":{"missing":"_last","order":"desc"}}]`, s.sorterToJSON(queryParams.Sorter))
377377

378378
query = `StartTime = "2018-06-07T15:04:05.123456789-08:00"`
379379
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
@@ -409,13 +409,13 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy() {
409409
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
410410
s.NoError(err)
411411
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
412-
s.Equal(`[{"ExecutionTime":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
412+
s.JSONEq(`[{"ExecutionTime":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
413413

414414
query = `order by StartTime desc, CloseTime asc`
415415
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
416416
s.NoError(err)
417417
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
418-
s.Equal(`[{"StartTime":{"order":"desc"}},{"CloseTime":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
418+
s.JSONEq(`[{"StartTime":{"missing":"_last","order":"desc"}},{"CloseTime":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
419419

420420
query = `order by CustomTextField desc`
421421
_, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
@@ -427,7 +427,7 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy() {
427427
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
428428
s.NoError(err)
429429
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
430-
s.Equal(`[{"CustomIntField":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
430+
s.JSONEq(`[{"CustomIntField":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
431431

432432
query = `ExecutionTime < "unable to parse"`
433433
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
@@ -479,13 +479,13 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy_Mapper() {
479479
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
480480
s.NoError(err)
481481
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
482-
s.Equal(`[{"ExecutionTime":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
482+
s.JSONEq(`[{"ExecutionTime":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
483483

484484
query = `order by AliasForCustomKeywordField asc`
485485
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
486486
s.NoError(err)
487487
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
488-
s.Equal(`[{"CustomKeywordField":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
488+
s.JSONEq(`[{"CustomKeywordField":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
489489

490490
query = `order by CustomKeywordField asc`
491491
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
@@ -519,7 +519,7 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy_Mapper_Error() {
519519
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, query, nil, chasm.UnspecifiedArchetypeID)
520520
s.NoError(err)
521521
s.Equal(`{"bool":{"filter":{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
522-
s.Equal(`[{"ExecutionTime":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
522+
s.JSONEq(`[{"ExecutionTime":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
523523

524524
s.visibilityStore.searchAttributesMapperProvider = nil
525525
}
@@ -571,7 +571,7 @@ func (s *ESVisibilitySuite) Test_convertQuery() {
571571
elastic.NewTermQuery(sadefs.WorkflowID, "wid"),
572572
),
573573
),
574-
Sorter: []elastic.Sorter{elastic.NewFieldSort(sadefs.WorkflowID)},
574+
Sorter: []elastic.Sorter{elastic.NewFieldSort(sadefs.WorkflowID).Missing("_last")},
575575
GroupBy: []string{},
576576
},
577577
},
@@ -609,7 +609,7 @@ func (s *ESVisibilitySuite) Test_convertQuery() {
609609
MinimumNumberShouldMatch(1),
610610
),
611611
),
612-
Sorter: []elastic.Sorter{elastic.NewFieldSort("CustomKeywordField")},
612+
Sorter: []elastic.Sorter{elastic.NewFieldSort("CustomKeywordField").Missing("_last")},
613613
GroupBy: []string{},
614614
},
615615
},
@@ -1957,7 +1957,7 @@ func (s *ESVisibilitySuite) Test_convertQueryLegacy_ChasmMapper() {
19571957
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, queryStr, chasmMapper, chasm.UnspecifiedArchetypeID)
19581958
s.NoError(err)
19591959
s.JSONEq(`{"bool":{"filter":[{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},{"bool":{"filter":{"match":{"TemporalBool01":{"query":true}}}}}],"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
1960-
s.JSONEq(`[{"TemporalKeyword01":{"order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
1960+
s.JSONEq(`[{"TemporalKeyword01":{"missing":"_last","order":"asc"}}]`, s.sorterToJSON(queryParams.Sorter))
19611961

19621962
queryStr = `ChasmStatus = 'active' and WorkflowId = 'wid'`
19631963
queryParams, err = s.visibilityStore.convertQueryLegacy(testNamespace, testNamespaceID, queryStr, chasmMapper, chasm.UnspecifiedArchetypeID)
@@ -2051,7 +2051,7 @@ func (s *ESVisibilitySuite) Test_convertQuery_ChasmMapper() {
20512051
elastic.NewTermQuery("TemporalBool01", true),
20522052
),
20532053
),
2054-
Sorter: []elastic.Sorter{elastic.NewFieldSort("TemporalKeyword01")},
2054+
Sorter: []elastic.Sorter{elastic.NewFieldSort("TemporalKeyword01").Missing("_last")},
20552055
GroupBy: []string{},
20562056
},
20572057
},
@@ -2166,7 +2166,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2_ChasmMapper() {
21662166
SearchAfter: nil,
21672167
PageSize: testPageSize,
21682168
Sorter: []elastic.Sorter{
2169-
elastic.NewFieldSort("TemporalKeyword01").Asc(),
2169+
elastic.NewFieldSort("TemporalKeyword01").Asc().Missing("_last"),
21702170
elastic.NewFieldSort(sadefs.RunID).Desc(),
21712171
},
21722172
}, p)

common/persistence/visibility/store/query/converter_legacy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func (c *ConverterLegacy) convertSelect(sel *sqlparser.Select) (*QueryParamsLega
244244
if err != nil {
245245
return nil, wrapConverterError("unable to convert 'order by' column name", err)
246246
}
247-
fieldSort := elastic.NewFieldSort(colName)
247+
fieldSort := elastic.NewFieldSort(colName).Missing("_last")
248248
if orderByExpr.Direction == sqlparser.DescScr {
249249
fieldSort = fieldSort.Desc()
250250
}

common/persistence/visibility/store/query/interceptors_legacy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestNameInterceptor(t *testing.T) {
6060
}
6161
//nolint:staticcheck
6262
actualSorterJson, _ := json.Marshal(actualSorterMaps)
63-
require.JSONEq(t, `[{"StartTime1":{"order":"asc"}}]`, string(actualSorterJson))
63+
require.JSONEq(t, `[{"StartTime1":{"missing":"_last","order":"asc"}}]`, string(actualSorterJson))
6464

6565
_, err = c.ConvertWhereOrderBy("error='Running' order by StartTime")
6666
require.Error(t, err)

develop/github/docker-compose.yml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,20 @@ services:
5353
- ES_JAVA_OPTS=-Xms1g -Xmx1g
5454

5555
opensearch2:
56-
image: opensearchproject/opensearch:2.7.0
56+
image: opensearchproject/opensearch:2
57+
ports:
58+
- "9200:9200"
59+
environment:
60+
- cluster.routing.allocation.disk.threshold_enabled=true
61+
- cluster.routing.allocation.disk.watermark.low=512mb
62+
- cluster.routing.allocation.disk.watermark.high=256mb
63+
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
64+
- discovery.type=single-node
65+
- DISABLE_SECURITY_PLUGIN=true
66+
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
67+
68+
opensearch3:
69+
image: opensearchproject/opensearch:3
5770
ports:
5871
- "9200:9200"
5972
environment:

0 commit comments

Comments
 (0)