Skip to content

Commit ddbdc62

Browse files
committed
feat(server): start and stop rules via tags
Signed-off-by: Elia Renzoni <elia.renzoni03@gmail.com>
1 parent ef0cb4d commit ddbdc62

File tree

3 files changed

+136
-0
lines changed

3 files changed

+136
-0
lines changed

internal/server/rest.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
206206
r.HandleFunc("/rules/{name}/explain", explainRuleHandler).Methods(http.MethodGet)
207207
r.HandleFunc("/rules/tags/match", rulesTagsHandler).Methods(http.MethodGet)
208208
r.HandleFunc("/rules/{name}/tags", ruleTagHandler).Methods(http.MethodPut, http.MethodPatch, http.MethodDelete)
209+
r.HandleFunc("/rules/bulkstart", rulesBulkOperationsHandler).Methods(http.MethodPost)
210+
r.HandleFunc("/rules/bulkstop", rulesBulkOperationsHandler).Methods(http.MethodPost)
209211
r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
210212
r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
211213
r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)

internal/server/rest_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ func (suite *RestTestSuite) SetupTest() {
9191
r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
9292
r.HandleFunc("/rules/{name}/reset_state", ruleStateHandler).Methods(http.MethodPut)
9393
r.HandleFunc("/rules/{name}/explain", explainRuleHandler).Methods(http.MethodGet)
94+
r.HandleFunc("/rules/bulkstart", rulesBulkOperationsHandler).Methods(http.MethodPost)
95+
r.HandleFunc("/rules/bulkstop", rulesBulkOperationsHandler).Methods(http.MethodPost)
9496
r.HandleFunc("/rules/{name}/trace/start", enableRuleTraceHandler).Methods(http.MethodPost)
9597
r.HandleFunc("/rules/{name}/trace/stop", disableRuleTraceHandler).Methods(http.MethodPost)
9698
r.HandleFunc("/rules/validate", validateRuleHandler).Methods(http.MethodPost)
@@ -1024,3 +1026,64 @@ func (suite *RestTestSuite) TestWaitStopRule() {
10241026
require.True(suite.T(), end.Sub(now) >= 300*time.Millisecond)
10251027
waitAllRuleStop()
10261028
}
1029+
1030+
func (suite *RestTestSuite) TestRulesBulkStartAndStop() {
1031+
1032+
timestamp := time.Now().UnixNano()
1033+
mockRules := []string{
1034+
fmt.Sprintf(`{"id":"r1_%d","sql":"SELECT * FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1035+
fmt.Sprintf(`{"id":"r2_%d","sql":"SELECT color FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1036+
fmt.Sprintf(`{"id":"r3_%d","sql":"SELECT size FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1037+
fmt.Sprintf(`{"id":"r4_%d","sql":"SELECT ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1038+
fmt.Sprintf(`{"id":"r5_%d","sql":"SELECT color,size FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1039+
fmt.Sprintf(`{"id":"r6_%d","sql":"SELECT color,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1040+
fmt.Sprintf(`{"id":"r7_%d","sql":"SELECT size,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1041+
fmt.Sprintf(`{"id":"r8_%d","sql":"SELECT color,size,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1042+
fmt.Sprintf(`{"id":"r9_%d","sql":"SELECT * FROM demo WHERE size>0","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1043+
fmt.Sprintf(`{"id":"r10_%d","sql":"SELECT color FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
1044+
}
1045+
1046+
// create stream
1047+
buf := bytes.NewBuffer([]byte(`{"sql":"CREATE STREAM demo (color STRING, size BIGINT, ts BIGINT) WITH (DATASOURCE=\"/data1\", TYPE=\"websocket\", FORMAT=\"json\", KEY=\"ts\")"}`))
1048+
req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf)
1049+
w := httptest.NewRecorder()
1050+
suite.r.ServeHTTP(w, req)
1051+
require.Equal(suite.T(), http.StatusCreated, w.Code)
1052+
1053+
// create rules
1054+
for i, rule := range mockRules {
1055+
buf := bytes.NewBuffer([]byte(rule))
1056+
req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf)
1057+
w := httptest.NewRecorder()
1058+
suite.r.ServeHTTP(w, req)
1059+
1060+
if w.Code != http.StatusCreated {
1061+
body, _ := io.ReadAll(w.Result().Body)
1062+
fmt.Printf("Error: %d (index %d): %s\n", i+1, i, string(body))
1063+
}
1064+
1065+
require.Equal(suite.T(), http.StatusCreated, w.Code)
1066+
}
1067+
1068+
// bulk start
1069+
buf = bytes.NewBuffer([]byte(`{"tags": ["mock-tag"]}`))
1070+
req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/bulkstart", buf)
1071+
w = httptest.NewRecorder()
1072+
suite.r.ServeHTTP(w, req)
1073+
if w.Code != http.StatusOK {
1074+
body, _ := io.ReadAll(w.Result().Body)
1075+
fmt.Println(string(body))
1076+
}
1077+
require.Equal(suite.T(), http.StatusOK, w.Code)
1078+
1079+
// bulk stop
1080+
buf = bytes.NewBuffer([]byte(`{"tags": ["mock-tag"]}`))
1081+
req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/bulkstop", buf)
1082+
w = httptest.NewRecorder()
1083+
suite.r.ServeHTTP(w, req)
1084+
if w.Code != http.StatusOK {
1085+
body, _ := io.ReadAll(w.Result().Body)
1086+
fmt.Println(string(body))
1087+
}
1088+
require.Equal(suite.T(), http.StatusOK, w.Code)
1089+
}

internal/server/rule_tag.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ package server
1616

1717
import (
1818
"encoding/json"
19+
"errors"
20+
"fmt"
1921
"net/http"
22+
"path"
2023

2124
"github.com/gorilla/mux"
25+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
2226
)
2327

2428
type RuleTagRequest struct {
@@ -215,3 +219,70 @@ func rulesTagsHandler(w http.ResponseWriter, r *http.Request) {
215219
resp := &RuleTagResponse{Rules: res}
216220
jsonResponse(resp, w, logger)
217221
}
222+
223+
func rulesBulkOperationsHandler(w http.ResponseWriter, r *http.Request) {
224+
tags := &RuleTagRequest{Tags: []string{}}
225+
if err := json.NewDecoder(r.Body).Decode(tags); err != nil {
226+
handleError(w, err, "decode body error", logger)
227+
return
228+
}
229+
230+
rules, err := ruleProcessor.GetAllRulesJson()
231+
if err != nil {
232+
handleError(w, err, "", logger)
233+
return
234+
}
235+
236+
resultSet := make([]string, 0)
237+
for ruleID, ruleJson := range rules {
238+
rule, err := ruleProcessor.GetRuleByJsonValidated(ruleID, ruleJson)
239+
if err != nil {
240+
continue
241+
}
242+
243+
if rule.IsTagsMatch(tags.Tags) {
244+
resultSet = append(resultSet, ruleID)
245+
}
246+
}
247+
248+
if len(resultSet) == 0 {
249+
handleError(w, errors.New(""), "no matching rules", logger)
250+
return
251+
}
252+
253+
for _, ruleID := range resultSet {
254+
if _, ok := registry.load(ruleID); !ok {
255+
ruleJson := rules[ruleID]
256+
rr, err := ruleProcessor.GetRuleByJsonValidated(ruleID, ruleJson)
257+
if err != nil {
258+
continue
259+
}
260+
rs := rule.NewState(rr, func(id string, b bool) {
261+
registry.updateTrigger(id, b)
262+
})
263+
registry.register(ruleID, rs)
264+
}
265+
}
266+
267+
for index, ruleID := range resultSet {
268+
var err error
269+
if path.Base(r.URL.Path) == "bulkstart" {
270+
err = registry.StartRule(ruleID)
271+
} else {
272+
err = registry.StopRule(ruleID)
273+
}
274+
275+
if err != nil {
276+
handleError(
277+
w,
278+
err,
279+
fmt.Sprintf("operation failed, completed %d operations out of %d", index, len(resultSet)),
280+
logger,
281+
)
282+
return
283+
}
284+
}
285+
286+
w.WriteHeader(http.StatusOK)
287+
w.Write([]byte("operation completed"))
288+
}

0 commit comments

Comments
 (0)