Skip to content

Commit 84982d4

Browse files
authored
feat(trial): replace websocket with sse (#3991)
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 85d4f19 commit 84982d4

File tree

12 files changed

+634
-52
lines changed

12 files changed

+634
-52
lines changed

docs/en_US/api/restapi/ruletest.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@ and it will automatically stop and clear after the time is exceeded. The general
1010
follows:
1111

1212
1. [Create a test rule](#create-a-test-rule), get the id and port of the test rule.
13-
2. Use the id and port of the test rule to connect and listen to the WebSocket service. Its service address
13+
2. Use the id and port of the test rule to connect and listen to the SSE service. Its service address
1414
is `http://locahost:10081/test/myid` where `10081` is the port value returned in step 1, and myid is the id of the
15-
test rule.
15+
test rule. Server-Sent Events (SSE) allows the server to push updates to the client. Connect via HTTP GET with header `Accept: text/event-stream`.
1616
3. [Start the test rule](#start-the-test-rule), wait for the test rule to run. The rule running result will be returned
17-
through the WebSocket service.
18-
4. After the rule trial run ends, [delete the test rule](#delete-the-test-rule), and close the WebSocket service.
17+
through the SSE service.
18+
4. After the rule trial run ends, [delete the test rule](#delete-the-test-rule), and close the SSE service.
1919

2020
::: tip
2121

22-
The WebSocket service defaults to port 10081, which can be modified by the `httpServerPort` field in the `kuiper.yaml`
22+
The SSE service defaults to port 10081, which can be modified by the `httpServerPort` field in the `kuiper.yaml`
2323
configuration file. Before using the test rule, please make sure that this port is accessible.
2424

2525
:::
@@ -93,7 +93,7 @@ If created successfully, the return example is as follows:
9393
}
9494
```
9595

96-
After the rule is created successfully, the websocket endpoint starts. Users can listen to the websocket
96+
After the rule is created successfully, the SSE endpoint starts. Users can listen to the SSE
9797
address `http://locahost:10081/test/uuid` to get the result output. Among them, the port and id are the above return
9898
values.
9999

@@ -111,12 +111,12 @@ If creation fails, the status code is 400, return error information, an example
111111
POST /ruletest/{id}/start
112112
```
113113

114-
Start the trial run rule, WebSocket will be able to receive the data output after the rule runs.
114+
Start the trial run rule, SSE will be able to receive the data output after the rule runs.
115115

116116
## Delete the Test Rule
117117

118118
```shell
119119
DELETE /ruletest/{id}
120120
```
121121

122-
Delete the trial run rule, WebSocket will stop the service.
122+
Delete the trial run rule, SSE will stop the service.

docs/zh_CN/api/restapi/ruletest.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
10 分钟),超过时间后会自动停止并清除。使用规则试运行的一般步骤如下:
88

99
1. [创建测试规则](#创建测试规则),获得测试规则的 id 和端口。
10-
2. 使用测试规则的 id 和端口,连接并监听 WebSocket 服务。其服务地址为 `http://locahost:10081/test/myid` 其中 `10081` 为步骤1
11-
返回的端口值,myid 为测试规则的 id。
12-
3. [启动测试规则](#启动测试规则),等待测试规则运行。规则运行结果将通过 WebSocket 服务返回。
13-
4. 规则试运行结束后,[删除测试规则](#删除测试规则),关闭 WebSocket 服务。
10+
2. 使用测试规则的 id 和端口,连接并监听 SSE 服务。其服务地址为 `http://locahost:10081/test/myid` 其中 `10081` 为步骤1
11+
返回的端口值,myid 为测试规则的 id。Server-Sent Events (SSE) 允许服务器向客户端推送更新。请使用带有请求头 `Accept: text/event-stream` 的 HTTP GET 请求进行连接。
12+
3. [启动测试规则](#启动测试规则),等待测试规则运行。规则运行结果将通过 SSE 服务返回。
13+
4. 规则试运行结束后,[删除测试规则](#删除测试规则),关闭 SSE 服务。
1414

1515
::: tip
1616

17-
WebSocket 服务默认采用 10081 端口,可通过配置文件 `kuiper.yaml` 中的 `httpServerPort` 字段修改。使用测试规则前,请确保该端口可访问。
17+
SSE 服务默认采用 10081 端口,可通过配置文件 `kuiper.yaml` 中的 `httpServerPort` 字段修改。使用测试规则前,请确保该端口可访问。
1818

1919
:::
2020

@@ -84,7 +84,7 @@ POST /ruletest
8484
}
8585
```
8686

87-
规则创建成功后,websocket endpoint 启动。用户可通过监听 websocket 地址 `http://locahost:10081/test/uuid` 获取结果输出。其中,端口和
87+
规则创建成功后,SSE endpoint 启动。用户可通过监听 SSE 地址 `http://locahost:10081/test/uuid` 获取结果输出。其中,端口和
8888
id 为上述返回值。
8989

9090
若创建失败,状态码为 400,返回错误信息,示例如下:
@@ -101,12 +101,12 @@ id 为上述返回值。
101101
POST /ruletest/{id}/start
102102
```
103103

104-
启动试运行规则,WebSocket 将可接收到规则运行后输出的数据。
104+
启动试运行规则,SSE 将可接收到规则运行后输出的数据。
105105

106106
## 删除测试规则
107107

108108
```shell
109109
DELETE /ruletest/{id}
110110
```
111111

112-
删除试运行规则,WebSocket 将停止服务。
112+
删除试运行规则,SSE 将停止服务。

internal/binder/io/builtin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/lf-edge/ekuiper/v2/internal/io/nexmark"
2828
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
2929
"github.com/lf-edge/ekuiper/v2/internal/io/sink"
30+
"github.com/lf-edge/ekuiper/v2/internal/io/sse"
3031
"github.com/lf-edge/ekuiper/v2/internal/io/websocket"
3132
plugin2 "github.com/lf-edge/ekuiper/v2/internal/plugin"
3233
"github.com/lf-edge/ekuiper/v2/pkg/modules"
@@ -53,6 +54,7 @@ func init() {
5354
modules.RegisterSink("neuron", neuron.GetSink)
5455
modules.RegisterSink("file", file.GetSink)
5556
modules.RegisterSink("websocket", func() api.Sink { return websocket.GetSink() })
57+
modules.RegisterSink("sse", func() api.Sink { return sse.GetSink() })
5658

5759
modules.RegisterLookupSource("memory", memory.GetLookupSource)
5860
modules.RegisterLookupSource("httppull", http.GetLookUpSource)
@@ -62,6 +64,7 @@ func init() {
6264
modules.RegisterConnection("nng", nng.CreateConnection)
6365
modules.RegisterConnection("httppush", httpserver.CreateConnection)
6466
modules.RegisterConnection("websocket", httpserver.CreateWebsocketConnection)
67+
modules.RegisterConnection("sse", httpserver.CreateSSEConnection)
6568
}
6669

6770
type Manager struct{}

internal/io/http/httpserver/data_server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type GlobalServerManager struct {
4343
routes map[string]http.HandlerFunc
4444
upgrader websocket.Upgrader
4545
websocketEndpoint map[string]*websocketEndpointContext
46+
sseEndpoint map[string]*sseEndpointContext
4647
}
4748

4849
var (
@@ -77,6 +78,7 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
7778
router: r,
7879
routes: map[string]http.HandlerFunc{},
7980
upgrader: upgrader,
81+
sseEndpoint: map[string]*sseEndpointContext{},
8082
}
8183
go func(m *GlobalServerManager) {
8284
if tlsConf == nil {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2024 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package httpserver
16+
17+
import (
18+
"github.com/lf-edge/ekuiper/contract/v2/api"
19+
20+
"github.com/lf-edge/ekuiper/v2/pkg/cast"
21+
"github.com/lf-edge/ekuiper/v2/pkg/modules"
22+
)
23+
24+
type SSEConnection struct {
25+
RecvTopic string
26+
SendTopic string
27+
id string
28+
props map[string]any
29+
cfg *sseConfig
30+
}
31+
32+
type sseConfig struct {
33+
Path string `json:"path"`
34+
Datasource string `json:"datasource"`
35+
}
36+
37+
func (s *SSEConnection) GetId(ctx api.StreamContext) string {
38+
return s.id
39+
}
40+
41+
func (s *SSEConnection) Provision(ctx api.StreamContext, conId string, props map[string]any) error {
42+
cfg := &sseConfig{}
43+
if err := cast.MapToStruct(props, cfg); err != nil {
44+
return err
45+
}
46+
if cfg.Path == "" && len(cfg.Datasource) > 0 {
47+
cfg.Path = cfg.Datasource
48+
}
49+
s.cfg = cfg
50+
s.id = conId
51+
s.props = props
52+
return nil
53+
}
54+
55+
func (s *SSEConnection) Dial(ctx api.StreamContext) error {
56+
rTopic, sTopic, err := RegisterSSEEndpoint(ctx, s.cfg.Datasource)
57+
if err != nil {
58+
return err
59+
}
60+
s.RecvTopic = rTopic
61+
s.SendTopic = sTopic
62+
return nil
63+
}
64+
65+
func (s *SSEConnection) Ping(ctx api.StreamContext) error {
66+
return nil
67+
}
68+
69+
func (s *SSEConnection) Close(ctx api.StreamContext) error {
70+
if s.cfg != nil {
71+
UnRegisterSSEEndpoint(s.cfg.Datasource)
72+
}
73+
return nil
74+
}
75+
76+
func CreateSSEConnection(ctx api.StreamContext) modules.Connection {
77+
return &SSEConnection{}
78+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright 2024 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package httpserver
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"net/http"
21+
"sync"
22+
23+
"github.com/lf-edge/ekuiper/contract/v2/api"
24+
25+
"github.com/lf-edge/ekuiper/v2/internal/conf"
26+
"github.com/lf-edge/ekuiper/v2/internal/io/memory/pubsub"
27+
)
28+
29+
const (
30+
SseTopicPrefix = "$$sse/"
31+
)
32+
33+
type sseEndpointContext struct {
34+
wg *sync.WaitGroup
35+
conns map[int64]context.CancelFunc
36+
}
37+
38+
func recvSseTopic(endpoint string) string {
39+
return fmt.Sprintf("%s/server/recv/%s", SseTopicPrefix, endpoint)
40+
}
41+
42+
func sendSseTopic(endpoint string) string {
43+
return fmt.Sprintf("%s/server/send/%s", SseTopicPrefix, endpoint)
44+
}
45+
46+
func RegisterSSEEndpoint(ctx api.StreamContext, endpoint string) (string, string, error) {
47+
managerLock.RLock()
48+
m := manager
49+
managerLock.RUnlock()
50+
if m == nil {
51+
return "", "", fmt.Errorf("http server is not running")
52+
}
53+
return m.RegisterSSEEndpoint(ctx, endpoint)
54+
}
55+
56+
func UnRegisterSSEEndpoint(endpoint string) {
57+
managerLock.RLock()
58+
m := manager
59+
managerLock.RUnlock()
60+
if m == nil {
61+
return
62+
}
63+
sctx := m.UnRegisterSSEEndpoint(endpoint)
64+
if sctx != nil {
65+
// wait all connections to close
66+
sctx.wg.Wait()
67+
}
68+
}
69+
70+
func (m *GlobalServerManager) RegisterSSEEndpoint(ctx api.StreamContext, endpoint string) (string, string, error) {
71+
conf.Log.Infof("sse endpoint %v register", endpoint)
72+
m.Lock()
73+
defer m.Unlock()
74+
rTopic := recvSseTopic(endpoint)
75+
sTopic := sendSseTopic(endpoint)
76+
pubsub.CreatePub(rTopic)
77+
78+
m.routes[endpoint] = func(w http.ResponseWriter, r *http.Request) {
79+
flusher, ok := w.(http.Flusher)
80+
if !ok {
81+
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
82+
return
83+
}
84+
85+
w.Header().Set("Content-Type", "text/event-stream")
86+
w.Header().Set("Cache-Control", "no-cache")
87+
w.Header().Set("Connection", "keep-alive")
88+
w.Header().Set("Access-Control-Allow-Origin", "*")
89+
flusher.Flush()
90+
91+
// Create a cancel context for this specific connection
92+
connCtx, cancel := context.WithCancel(r.Context())
93+
connID := int64(m.FetchInstanceID())
94+
wg, ok := m.AddSSEConnection(endpoint, connID, cancel)
95+
if !ok {
96+
return
97+
}
98+
defer func() {
99+
m.CloseSSEConnection(endpoint, connID)
100+
wg.Done()
101+
}()
102+
103+
// Create a subscription to the send topic
104+
// The sourceID must be unique for each connection to ensure all clients receive the message
105+
sourceID := fmt.Sprintf("sse/send/%v", connID)
106+
ch := pubsub.CreateSub(sTopic, nil, sourceID, 1024)
107+
defer pubsub.CloseSourceConsumerChannel(sTopic, sourceID)
108+
109+
conf.Log.Infof("sse client connected to %s", endpoint)
110+
111+
for {
112+
select {
113+
case <-connCtx.Done():
114+
conf.Log.Infof("sse client disconnected from %s", endpoint)
115+
return
116+
case d, ok := <-ch:
117+
if !ok {
118+
conf.Log.Infof("sse channel closed for %s", endpoint)
119+
return
120+
}
121+
data, ok := d.([]byte)
122+
if !ok || data == nil {
123+
continue
124+
}
125+
fmt.Fprintf(w, "data: %s\n\n", string(data))
126+
flusher.Flush()
127+
}
128+
}
129+
}
130+
131+
m.sseEndpoint[endpoint] = &sseEndpointContext{
132+
wg: &sync.WaitGroup{},
133+
conns: make(map[int64]context.CancelFunc),
134+
}
135+
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
136+
m.RLock()
137+
h, ok := m.routes[endpoint]
138+
m.RUnlock()
139+
if ok {
140+
h(w, r)
141+
} else {
142+
w.WriteHeader(http.StatusNotFound)
143+
}
144+
})
145+
146+
conf.Log.Infof("sse endpoint %v registered success", endpoint)
147+
return rTopic, sTopic, nil
148+
}
149+
150+
func (m *GlobalServerManager) AddSSEConnection(endpoint string, connID int64, cancel context.CancelFunc) (*sync.WaitGroup, bool) {
151+
m.Lock()
152+
defer m.Unlock()
153+
sctx, ok := m.sseEndpoint[endpoint]
154+
if !ok {
155+
return nil, false
156+
}
157+
sctx.conns[connID] = cancel
158+
sctx.wg.Add(1)
159+
return sctx.wg, true
160+
}
161+
162+
func (m *GlobalServerManager) CloseSSEConnection(endpoint string, connID int64) {
163+
m.Lock()
164+
defer m.Unlock()
165+
sctx, ok := m.sseEndpoint[endpoint]
166+
if !ok {
167+
return
168+
}
169+
delete(sctx.conns, connID)
170+
}
171+
172+
func (m *GlobalServerManager) UnRegisterSSEEndpoint(endpoint string) *sseEndpointContext {
173+
conf.Log.Infof("sse endpoint %v unregister", endpoint)
174+
pubsub.RemovePub(recvSseTopic(endpoint))
175+
m.Lock()
176+
defer m.Unlock()
177+
178+
sctx, ok := m.sseEndpoint[endpoint]
179+
if !ok {
180+
delete(m.routes, endpoint)
181+
return nil
182+
}
183+
// Cancel all active connections
184+
for _, cancel := range sctx.conns {
185+
cancel()
186+
}
187+
delete(m.sseEndpoint, endpoint)
188+
delete(m.routes, endpoint)
189+
return sctx
190+
}

0 commit comments

Comments
 (0)