Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/en_US/api/restapi/rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,32 @@ GET /rules/tags/match
"keys":["key1","key2"]
}
```

## Bulk start / stop rules by tag

These APIs are used to start or stop multiple rules based on the assigned tags.

- bulk start rules

```shell
POST /rules/bulkstart

{
"tags": ["t1"]
}
```

- bulk stop rules

```shell
POST /rules/bulkstop

{
"tags": ["t1"]
}
```

Both APIs return a list of rules with the operation result for each rule, indicating whether the operation was successful or failed.
In case of failure, an error message is returned for the affected rule.

These APIs are not atomic. If an error occurs during execution, some rules may be started or stopped successfully while others may not.
2 changes: 2 additions & 0 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/rules/{name}/explain", explainRuleHandler).Methods(http.MethodGet)
r.HandleFunc("/rules/tags/match", rulesTagsHandler).Methods(http.MethodGet)
r.HandleFunc("/rules/{name}/tags", ruleTagHandler).Methods(http.MethodPut, http.MethodPatch, http.MethodDelete)
r.HandleFunc("/rules/bulkstart", rulesBulkStartHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/bulkstop", rulesBulkStopHandler).Methods(http.MethodPost)
r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)
Expand Down
70 changes: 70 additions & 0 deletions internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (suite *RestTestSuite) SetupTest() {
r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
r.HandleFunc("/rules/{name}/reset_state", ruleStateHandler).Methods(http.MethodPut)
r.HandleFunc("/rules/{name}/explain", explainRuleHandler).Methods(http.MethodGet)
r.HandleFunc("/rules/bulkstart", rulesBulkStartHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/bulkstop", rulesBulkStopHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/{name}/trace/start", enableRuleTraceHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/{name}/trace/stop", disableRuleTraceHandler).Methods(http.MethodPost)
r.HandleFunc("/rules/validate", validateRuleHandler).Methods(http.MethodPost)
Expand Down Expand Up @@ -1024,3 +1026,71 @@ func (suite *RestTestSuite) TestWaitStopRule() {
require.True(suite.T(), end.Sub(now) >= 300*time.Millisecond)
waitAllRuleStop()
}

func (suite *RestTestSuite) TestRulesBulkStartAndStopHandlers() {
timestamp := time.Now().UnixNano()
mockRules := []string{
fmt.Sprintf(`{"id":"r1_%d","sql":"SELECT * FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r2_%d","sql":"SELECT color FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r3_%d","sql":"SELECT size FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r4_%d","sql":"SELECT ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r5_%d","sql":"SELECT color,size FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r6_%d","sql":"SELECT color,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r7_%d","sql":"SELECT size,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r8_%d","sql":"SELECT color,size,ts FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r9_%d","sql":"SELECT * FROM demo WHERE size>0","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
fmt.Sprintf(`{"id":"r10_%d","sql":"SELECT color FROM demo","actions":[{"log":{}}], "tags": ["mock-tag"], "triggered": true}`, timestamp),
}

// create stream
buf := bytes.NewBuffer([]byte(`{"sql":"CREATE STREAM demo (color STRING, size BIGINT, ts BIGINT) WITH (DATASOURCE=\"/data1\", TYPE=\"websocket\", FORMAT=\"json\", KEY=\"ts\")"}`))
req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf)
w := httptest.NewRecorder()
suite.r.ServeHTTP(w, req)
require.Equal(suite.T(), http.StatusCreated, w.Code)

// create rules
for i, rule := range mockRules {
buf := bytes.NewBuffer([]byte(rule))
req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf)
w := httptest.NewRecorder()
suite.r.ServeHTTP(w, req)

if w.Code != http.StatusCreated {
body, _ := io.ReadAll(w.Result().Body)
fmt.Printf("Error: %d (index %d): %s\n", i+1, i, string(body))
}

require.Equal(suite.T(), http.StatusCreated, w.Code)
}

// bulk start
resStart := []BulkOperationResponse{}

buf = bytes.NewBuffer([]byte(`{"tags": ["mock-tag"]}`))
req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/bulkstart", buf)
w = httptest.NewRecorder()
suite.r.ServeHTTP(w, req)
body, _ := io.ReadAll(w.Result().Body)
fmt.Println("------- bulk start ------")
json.Unmarshal(body, &resStart)
for _, v := range resStart {
fmt.Println(v)
}
require.Equal(suite.T(), http.StatusOK, w.Code)

// bulk stop
resStop := []BulkOperationResponse{}

buf = bytes.NewBuffer([]byte(`{"tags": ["mock-tag"]}`))
req, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules/bulkstop", buf)
w = httptest.NewRecorder()
suite.r.ServeHTTP(w, req)
body, _ = io.ReadAll(w.Result().Body)
fmt.Println("------- bulk stop ------")
json.Unmarshal(body, &resStop)
for _, v := range resStop {
fmt.Println(v)
}
require.Equal(suite.T(), http.StatusOK, w.Code)
}
118 changes: 118 additions & 0 deletions internal/server/rule_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ package server

import (
"encoding/json"
"errors"
"net/http"

"github.com/gorilla/mux"

"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
)

type RuleTagRequest struct {
Expand Down Expand Up @@ -221,3 +225,117 @@ func rulesTagsHandler(w http.ResponseWriter, r *http.Request) {
resp := &RuleTagResponse{Rules: res}
jsonResponse(resp, w, logger)
}

type BulkOperationResponse struct {
RuleID string `json:"ruleId"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}

func rulesBulkStartHandler(w http.ResponseWriter, r *http.Request) {
tags := &RuleTagRequest{Tags: []string{}}
if err := json.NewDecoder(r.Body).Decode(tags); err != nil {
handleError(w, err, "decode body error", logger)
return
}

resultSet, err := findRules(tags)
if err != nil {
handleError(w, err, "bulk start failed", logger)
return
}

payload := make([]BulkOperationResponse, 0)
for _, ruleID := range resultSet {
err := registry.StartRule(ruleID)
if err != nil {
payload = append(payload, BulkOperationResponse{
RuleID: ruleID,
Success: false,
Error: err.Error(),
})
continue
}

payload = append(payload, BulkOperationResponse{
RuleID: ruleID,
Success: true,
})
}

w.WriteHeader(http.StatusOK)
jsonResponse(payload, w, logger)
}

func rulesBulkStopHandler(w http.ResponseWriter, r *http.Request) {
tags := &RuleTagRequest{Tags: []string{}}
if err := json.NewDecoder(r.Body).Decode(tags); err != nil {
handleError(w, err, "decode body error", logger)
return
}

resultSet, err := findRules(tags)
if err != nil {
handleError(w, err, "bulk stop failed", logger)
return
}

payload := make([]BulkOperationResponse, 0)
for _, ruleID := range resultSet {
err := registry.StopRule(ruleID)
if err != nil {
payload = append(payload, BulkOperationResponse{
RuleID: ruleID,
Success: false,
Error: err.Error(),
})
continue
}

payload = append(payload, BulkOperationResponse{
RuleID: ruleID,
Success: true,
})
}

w.WriteHeader(http.StatusOK)
jsonResponse(payload, w, logger)
}

func findRules(tags *RuleTagRequest) ([]string, error) {
rules, err := ruleProcessor.GetAllRulesJson()
if err != nil {
return nil, err
}

resultSet := make([]string, 0)
fetchedRules := make(map[string]*def.Rule)
for ruleID, ruleJson := range rules {
rule, err := ruleProcessor.GetRuleByJsonValidated(ruleID, ruleJson)
if err != nil {
continue
}

fetchedRules[ruleID] = rule
if rule.IsTagsMatch(tags.Tags) {
resultSet = append(resultSet, ruleID)
}
}

if len(resultSet) == 0 {
return nil, errors.New("no matching rules")
}

for _, ruleID := range resultSet {
if _, ok := registry.load(ruleID); !ok {
rr := fetchedRules[ruleID]

rs := rule.NewState(rr, func(id string, b bool) {
registry.updateTrigger(id, b)
})
registry.register(ruleID, rs)
}
}

return resultSet, nil
}
Loading