Skip to content

Commit 49a67f3

Browse files
authored
feat: support rule tags (#3721)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 1b6725a commit 49a67f3

File tree

10 files changed

+384
-0
lines changed

10 files changed

+384
-0
lines changed

docs/en_US/api/restapi/rules.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,39 @@ GET http://localhost:9081/rules/usage/cpu
256256
```
257257

258258
Get the CPU time used by all rules in the past 30 seconds, in milliseconds.
259+
260+
## Add tags on rules
261+
262+
This API is used to add tags to rules
263+
264+
```shell
265+
PUT /rules/{id}/tags
266+
267+
{
268+
"tags": ["t1","t2"]
269+
}
270+
```
271+
272+
## Delete tags on rules
273+
274+
This API is used to delete tags from rules
275+
276+
```shell
277+
DELETE /rules/{id}/tags
278+
279+
{
280+
"keys":["key1","key2"]
281+
}
282+
```
283+
284+
## Query rules based on tags
285+
286+
This API is used to query rules containing a given tags and return a list of rule names that meet the conditions
287+
288+
```shell
289+
GET /rules/tags/match
290+
291+
{
292+
"keys":["key1","key2"]
293+
}
294+
```

docs/en_US/guide/rules/overview.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ The table below is a detailed explanation of the row component:
3939
| graph | required if sql is not defined | The json presentation of the rule's DAG(directed acyclic graph) |
4040
| options | true | A map of options |
4141
| triggerd | true | Whether to start the rule after creation. Default is true. |
42+
| tags | yes | string list, rule tags, used to filter rules |
4243

4344
## Rule Logic
4445

docs/zh_CN/api/restapi/rules.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,3 +227,39 @@ GET http://localhost:9081/rules/usage/cpu
227227
```
228228

229229
获取所有规则在过去 30s 内的所使用的 CPU 时间,单位为毫秒
230+
231+
## 添加标签
232+
233+
该 API 用于给规则添加标签
234+
235+
```shell
236+
PUT /rules/{id}/tags
237+
238+
{
239+
"tags": ["t1","t2"]
240+
}
241+
```
242+
243+
## 删除标签
244+
245+
该 API 用于给规则删除标签
246+
247+
```shell
248+
DELETE /rules/{id}/tags
249+
250+
{
251+
"tags": ["t1","t2"]
252+
}
253+
```
254+
255+
## 根据标签查询规则
256+
257+
该 API 用于根据给定标签查询包含该标签的规则们,返回符合条件的规则名列表
258+
259+
```shell
260+
GET /rules/tags/match
261+
262+
{
263+
"tags": ["t1","t2"]
264+
}
265+
```

docs/zh_CN/guide/rules/overview.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
| graph | 如果 sql 未定义,则该属性必须定义 | 规则有向无环图的 JSON 表示 |
3434
| options || 选项列表 |
3535
| triggerd || 布尔值,设置是否创建完规则后立刻运行,默认是 true |
36+
| tags || 字符串列表,规则标签,用来筛选规则 |
3637

3738
## 规则逻辑
3839

fvt/rulestate_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,66 @@ func (s *RuleStateTestSuite) TestCreateStoppedRule() {
267267
s.Equal(http.StatusOK, res.StatusCode)
268268
})
269269
}
270+
271+
func (s *RuleStateTestSuite) TestRuleTags() {
272+
s.Run("clean up", func() {
273+
client.DeleteStream("simStream1")
274+
client.DeleteRule("ruleTags")
275+
})
276+
s.Run("create rule and attach labels", func() {
277+
conf := map[string]any{
278+
"interval": "10ms",
279+
}
280+
resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf)
281+
s.Require().NoError(err)
282+
s.Require().Equal(http.StatusOK, resp.StatusCode)
283+
284+
streamSql := `{"sql": "create stream simStream1() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
285+
resp, err = client.CreateStream(streamSql)
286+
s.Require().NoError(err)
287+
s.T().Log(GetResponseText(resp))
288+
s.Require().Equal(http.StatusCreated, resp.StatusCode)
289+
ruleJson := `{
290+
"id": "ruleTags",
291+
"triggered": false,
292+
"sql": "SELECT * FROM simStream1",
293+
"actions": [
294+
{
295+
"nop":{}
296+
}
297+
],
298+
"options": {
299+
"sendError": false,
300+
"bufferLength": 2
301+
}
302+
}`
303+
resp, err = client.CreateRule(ruleJson)
304+
s.Require().NoError(err)
305+
s.T().Log(GetResponseText(resp))
306+
s.Require().Equal(http.StatusCreated, resp.StatusCode)
307+
308+
resp, err = client.AddRuleTags("ruleTags", []string{"t1", "t2"})
309+
s.Require().NoError(err)
310+
s.T().Log(GetResponseText(resp))
311+
s.Require().Equal(http.StatusOK, resp.StatusCode)
312+
313+
lists, err := client.GetRulesByTags([]string{"t1", "t2"})
314+
s.Require().NoError(err)
315+
s.Require().Equal(http.StatusOK, resp.StatusCode)
316+
s.Require().Equal([]string{"ruleTags"}, lists)
317+
318+
resp, err = client.RemoveRuleTags("ruleTags", []string{"t1"})
319+
s.Require().NoError(err)
320+
s.T().Log(GetResponseText(resp))
321+
s.Require().Equal(http.StatusOK, resp.StatusCode)
322+
323+
lists, err = client.GetRulesByTags([]string{"t1", "t2"})
324+
s.Require().NoError(err)
325+
s.Require().Equal(http.StatusOK, resp.StatusCode)
326+
s.Require().Equal([]string{}, lists)
327+
})
328+
s.Run("clean up", func() {
329+
client.DeleteStream("simStream1")
330+
client.DeleteRule("ruleTags")
331+
})
332+
}

fvt/sdk.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,47 @@ func (sdk *SDK) BatchRequest(reqs []*server.EachRequest) ([]*server.EachResponse
216216
return resps, nil
217217
}
218218

219+
func (sdk *SDK) AddRuleTags(name string, tags []string) (resp *http.Response, err error) {
220+
v, _ := json.Marshal(&server.RuleTagRequest{Tags: tags})
221+
url := sdk.baseUrl.JoinPath("rules", name, "tags").String()
222+
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(v))
223+
if err != nil {
224+
fmt.Println(err)
225+
return
226+
}
227+
return sdk.httpClient.Do(req)
228+
}
229+
230+
func (sdk *SDK) RemoveRuleTags(name string, keys []string) (resp *http.Response, err error) {
231+
v, _ := json.Marshal(&server.RuleTagRequest{Tags: keys})
232+
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("rules", name, "tags").String(), bytes.NewBuffer(v))
233+
if err != nil {
234+
fmt.Println(err)
235+
return
236+
}
237+
return sdk.httpClient.Do(req)
238+
}
239+
240+
func (sdk *SDK) GetRulesByTags(tags []string) (list []string, err error) {
241+
v, _ := json.Marshal(&server.RuleTagRequest{Tags: tags})
242+
req, err := http.NewRequest(http.MethodGet, sdk.baseUrl.JoinPath("rules", "tags", "match").String(), bytes.NewBuffer(v))
243+
if err != nil {
244+
fmt.Println(err)
245+
return
246+
}
247+
resp, err := sdk.httpClient.Do(req)
248+
if err != nil {
249+
fmt.Println(err)
250+
return
251+
}
252+
rsp := server.RuleTagResponse{Rules: make([]string, 0)}
253+
if err := json.NewDecoder(resp.Body).Decode(&rsp); err != nil {
254+
fmt.Println(err)
255+
return nil, err
256+
}
257+
return rsp.Rules, nil
258+
}
259+
219260
func TryAssert(count int, interval time.Duration, tryFunc func() bool) bool {
220261
for count > 0 {
221262
time.Sleep(interval)

internal/pkg/def/rule.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ type Rule struct {
103103
Graph *RuleGraph `json:"graph,omitempty" yaml:"graph,omitempty"`
104104
Actions []map[string]interface{} `json:"actions,omitempty" yaml:"actions,omitempty"`
105105
Options *RuleOption `json:"options,omitempty" yaml:"options,omitempty"`
106+
Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`
107+
}
108+
109+
func (r *Rule) IsTagsMatch(tags []string) bool {
110+
if len(r.Tags) < len(tags) {
111+
return false
112+
}
113+
mTags := make(map[string]struct{})
114+
for _, tag := range r.Tags {
115+
mTags[tag] = struct{}{}
116+
}
117+
for _, tag := range tags {
118+
_, ok := mTags[tag]
119+
if !ok {
120+
return false
121+
}
122+
}
123+
return true
106124
}
107125

108126
func (r *Rule) IsDurationRule() bool {

internal/server/rest.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
200200
r.HandleFunc("/rules/validate", validateRuleHandler).Methods(http.MethodPost)
201201
r.HandleFunc("/rules/{name}/reset_state", ruleStateHandler).Methods(http.MethodPut)
202202
r.HandleFunc("/rules/{name}/explain", explainRuleHandler).Methods(http.MethodGet)
203+
r.HandleFunc("/rules/tags/match", rulesTagsHandler).Methods(http.MethodGet)
204+
r.HandleFunc("/rules/{name}/tags", ruleTagHandler).Methods(http.MethodPut, http.MethodDelete)
203205
r.HandleFunc("/ruleset/export", exportHandler).Methods(http.MethodPost)
204206
r.HandleFunc("/ruleset/import", importHandler).Methods(http.MethodPost)
205207
r.HandleFunc("/configs", configurationUpdateHandler).Methods(http.MethodPatch)

internal/server/rule_manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ type RuleRegistry struct {
4949

5050
//// registry and db level state change functions
5151

52+
func (rr *RuleRegistry) update(key string, ruleJson string, value *rule.State) error {
53+
rr.Lock()
54+
defer rr.Unlock()
55+
rr.internal[key] = value
56+
return ruleProcessor.ExecUpsert(key, ruleJson)
57+
}
58+
5259
// load the entry of a rule by id. It is used to get the current rule state
5360
// or send command to a running rule
5461
func (rr *RuleRegistry) load(key string) (value *rule.State, ok bool) {

0 commit comments

Comments
 (0)