Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions docs/en_US/api/restapi/ruletest.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ and it will automatically stop and clear after the time is exceeded. The general
follows:

1. [Create a test rule](#create-a-test-rule), get the id and port of the test rule.
2. Use the id and port of the test rule to connect and listen to the WebSocket service. Its service address
2. Use the id and port of the test rule to connect and listen to the SSE service. Its service address
is `http://locahost:10081/test/myid` where `10081` is the port value returned in step 1, and myid is the id of the
test rule.
test rule. Server-Sent Events (SSE) allows the server to push updates to the client. Connect via HTTP GET with header `Accept: text/event-stream`.
3. [Start the test rule](#start-the-test-rule), wait for the test rule to run. The rule running result will be returned
through the WebSocket service.
4. After the rule trial run ends, [delete the test rule](#delete-the-test-rule), and close the WebSocket service.
through the SSE service.
4. After the rule trial run ends, [delete the test rule](#delete-the-test-rule), and close the SSE service.

::: tip

The WebSocket service defaults to port 10081, which can be modified by the `httpServerPort` field in the `kuiper.yaml`
The SSE service defaults to port 10081, which can be modified by the `httpServerPort` field in the `kuiper.yaml`
configuration file. Before using the test rule, please make sure that this port is accessible.

:::
Expand Down Expand Up @@ -93,7 +93,7 @@ If created successfully, the return example is as follows:
}
```

After the rule is created successfully, the websocket endpoint starts. Users can listen to the websocket
After the rule is created successfully, the SSE endpoint starts. Users can listen to the SSE
address `http://locahost:10081/test/uuid` to get the result output. Among them, the port and id are the above return
values.

Expand All @@ -111,12 +111,12 @@ If creation fails, the status code is 400, return error information, an example
POST /ruletest/{id}/start
```

Start the trial run rule, WebSocket will be able to receive the data output after the rule runs.
Start the trial run rule, SSE will be able to receive the data output after the rule runs.

## Delete the Test Rule

```shell
DELETE /ruletest/{id}
```

Delete the trial run rule, WebSocket will stop the service.
Delete the trial run rule, SSE will stop the service.
16 changes: 8 additions & 8 deletions docs/zh_CN/api/restapi/ruletest.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
10 分钟),超过时间后会自动停止并清除。使用规则试运行的一般步骤如下:

1. [创建测试规则](#创建测试规则),获得测试规则的 id 和端口。
2. 使用测试规则的 id 和端口,连接并监听 WebSocket 服务。其服务地址为 `http://locahost:10081/test/myid` 其中 `10081` 为步骤1
返回的端口值,myid 为测试规则的 id。
3. [启动测试规则](#启动测试规则),等待测试规则运行。规则运行结果将通过 WebSocket 服务返回。
4. 规则试运行结束后,[删除测试规则](#删除测试规则),关闭 WebSocket 服务。
2. 使用测试规则的 id 和端口,连接并监听 SSE 服务。其服务地址为 `http://locahost:10081/test/myid` 其中 `10081` 为步骤1
返回的端口值,myid 为测试规则的 id。Server-Sent Events (SSE) 允许服务器向客户端推送更新。请使用带有请求头 `Accept: text/event-stream` 的 HTTP GET 请求进行连接。
3. [启动测试规则](#启动测试规则),等待测试规则运行。规则运行结果将通过 SSE 服务返回。
4. 规则试运行结束后,[删除测试规则](#删除测试规则),关闭 SSE 服务。

::: tip

WebSocket 服务默认采用 10081 端口,可通过配置文件 `kuiper.yaml` 中的 `httpServerPort` 字段修改。使用测试规则前,请确保该端口可访问。
SSE 服务默认采用 10081 端口,可通过配置文件 `kuiper.yaml` 中的 `httpServerPort` 字段修改。使用测试规则前,请确保该端口可访问。

:::

Expand Down Expand Up @@ -84,7 +84,7 @@ POST /ruletest
}
```

规则创建成功后,websocket endpoint 启动。用户可通过监听 websocket 地址 `http://locahost:10081/test/uuid` 获取结果输出。其中,端口和
规则创建成功后,SSE endpoint 启动。用户可通过监听 SSE 地址 `http://locahost:10081/test/uuid` 获取结果输出。其中,端口和
id 为上述返回值。

若创建失败,状态码为 400,返回错误信息,示例如下:
Expand All @@ -101,12 +101,12 @@ id 为上述返回值。
POST /ruletest/{id}/start
```

启动试运行规则,WebSocket 将可接收到规则运行后输出的数据。
启动试运行规则,SSE 将可接收到规则运行后输出的数据。

## 删除测试规则

```shell
DELETE /ruletest/{id}
```

删除试运行规则,WebSocket 将停止服务。
删除试运行规则,SSE 将停止服务。
3 changes: 3 additions & 0 deletions internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/io/nexmark"
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
"github.com/lf-edge/ekuiper/v2/internal/io/sink"
"github.com/lf-edge/ekuiper/v2/internal/io/sse"
"github.com/lf-edge/ekuiper/v2/internal/io/websocket"
plugin2 "github.com/lf-edge/ekuiper/v2/internal/plugin"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
Expand All @@ -53,6 +54,7 @@ func init() {
modules.RegisterSink("neuron", neuron.GetSink)
modules.RegisterSink("file", file.GetSink)
modules.RegisterSink("websocket", func() api.Sink { return websocket.GetSink() })
modules.RegisterSink("sse", func() api.Sink { return sse.GetSink() })

modules.RegisterLookupSource("memory", memory.GetLookupSource)
modules.RegisterLookupSource("httppull", http.GetLookUpSource)
Expand All @@ -62,6 +64,7 @@ func init() {
modules.RegisterConnection("nng", nng.CreateConnection)
modules.RegisterConnection("httppush", httpserver.CreateConnection)
modules.RegisterConnection("websocket", httpserver.CreateWebsocketConnection)
modules.RegisterConnection("sse", httpserver.CreateSSEConnection)
}

type Manager struct{}
Expand Down
2 changes: 2 additions & 0 deletions internal/io/http/httpserver/data_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type GlobalServerManager struct {
routes map[string]http.HandlerFunc
upgrader websocket.Upgrader
websocketEndpoint map[string]*websocketEndpointContext
sseEndpoint map[string]*sseEndpointContext
}

var (
Expand Down Expand Up @@ -77,6 +78,7 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
router: r,
routes: map[string]http.HandlerFunc{},
upgrader: upgrader,
sseEndpoint: map[string]*sseEndpointContext{},
}
go func(m *GlobalServerManager) {
if tlsConf == nil {
Expand Down
78 changes: 78 additions & 0 deletions internal/io/http/httpserver/sse_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

type SSEConnection struct {
RecvTopic string
SendTopic string
id string
props map[string]any
cfg *sseConfig
}

type sseConfig struct {
Path string `json:"path"`
Datasource string `json:"datasource"`
}

func (s *SSEConnection) GetId(ctx api.StreamContext) string {
return s.id
}

func (s *SSEConnection) Provision(ctx api.StreamContext, conId string, props map[string]any) error {
cfg := &sseConfig{}
if err := cast.MapToStruct(props, cfg); err != nil {
return err
}
if cfg.Path == "" && len(cfg.Datasource) > 0 {
cfg.Path = cfg.Datasource
}
s.cfg = cfg
s.id = conId
s.props = props
return nil
}

func (s *SSEConnection) Dial(ctx api.StreamContext) error {
rTopic, sTopic, err := RegisterSSEEndpoint(ctx, s.cfg.Datasource)
if err != nil {
return err
}
s.RecvTopic = rTopic
s.SendTopic = sTopic
return nil
}

func (s *SSEConnection) Ping(ctx api.StreamContext) error {
return nil
}

func (s *SSEConnection) Close(ctx api.StreamContext) error {
if s.cfg != nil {
UnRegisterSSEEndpoint(s.cfg.Datasource)
}
return nil
}

func CreateSSEConnection(ctx api.StreamContext) modules.Connection {
return &SSEConnection{}
}
190 changes: 190 additions & 0 deletions internal/io/http/httpserver/sse_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"context"
"fmt"
"net/http"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/io/memory/pubsub"
)

const (
SseTopicPrefix = "$$sse/"
)

type sseEndpointContext struct {
wg *sync.WaitGroup
conns map[int64]context.CancelFunc
}

func recvSseTopic(endpoint string) string {
return fmt.Sprintf("%s/server/recv/%s", SseTopicPrefix, endpoint)
}

func sendSseTopic(endpoint string) string {
return fmt.Sprintf("%s/server/send/%s", SseTopicPrefix, endpoint)
}

func RegisterSSEEndpoint(ctx api.StreamContext, endpoint string) (string, string, error) {
managerLock.RLock()
m := manager
managerLock.RUnlock()
if m == nil {
return "", "", fmt.Errorf("http server is not running")
}
return m.RegisterSSEEndpoint(ctx, endpoint)
}

func UnRegisterSSEEndpoint(endpoint string) {
managerLock.RLock()
m := manager
managerLock.RUnlock()
if m == nil {
return
}
sctx := m.UnRegisterSSEEndpoint(endpoint)
if sctx != nil {
// wait all connections to close
sctx.wg.Wait()
}
}

func (m *GlobalServerManager) RegisterSSEEndpoint(ctx api.StreamContext, endpoint string) (string, string, error) {
conf.Log.Infof("sse endpoint %v register", endpoint)
m.Lock()
defer m.Unlock()
rTopic := recvSseTopic(endpoint)
sTopic := sendSseTopic(endpoint)
pubsub.CreatePub(rTopic)

m.routes[endpoint] = func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher.Flush()

// Create a cancel context for this specific connection
connCtx, cancel := context.WithCancel(r.Context())
connID := int64(m.FetchInstanceID())
wg, ok := m.AddSSEConnection(endpoint, connID, cancel)
if !ok {
return
}
defer func() {
m.CloseSSEConnection(endpoint, connID)
wg.Done()
}()

// Create a subscription to the send topic
// The sourceID must be unique for each connection to ensure all clients receive the message
sourceID := fmt.Sprintf("sse/send/%v", connID)
ch := pubsub.CreateSub(sTopic, nil, sourceID, 1024)
defer pubsub.CloseSourceConsumerChannel(sTopic, sourceID)

conf.Log.Infof("sse client connected to %s", endpoint)

for {
select {
case <-connCtx.Done():
conf.Log.Infof("sse client disconnected from %s", endpoint)
return
case d, ok := <-ch:
if !ok {
conf.Log.Infof("sse channel closed for %s", endpoint)
return
}
data, ok := d.([]byte)
if !ok || data == nil {
continue
}
fmt.Fprintf(w, "data: %s\n\n", string(data))
flusher.Flush()
}
}
}

m.sseEndpoint[endpoint] = &sseEndpointContext{
wg: &sync.WaitGroup{},
conns: make(map[int64]context.CancelFunc),
}
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
m.RLock()
h, ok := m.routes[endpoint]
m.RUnlock()
if ok {
h(w, r)
} else {
w.WriteHeader(http.StatusNotFound)
}
})

conf.Log.Infof("sse endpoint %v registered success", endpoint)
return rTopic, sTopic, nil
}

func (m *GlobalServerManager) AddSSEConnection(endpoint string, connID int64, cancel context.CancelFunc) (*sync.WaitGroup, bool) {
m.Lock()
defer m.Unlock()
sctx, ok := m.sseEndpoint[endpoint]
if !ok {
return nil, false
}
sctx.conns[connID] = cancel
sctx.wg.Add(1)
return sctx.wg, true
}

func (m *GlobalServerManager) CloseSSEConnection(endpoint string, connID int64) {
m.Lock()
defer m.Unlock()
sctx, ok := m.sseEndpoint[endpoint]
if !ok {
return
}
delete(sctx.conns, connID)
}

func (m *GlobalServerManager) UnRegisterSSEEndpoint(endpoint string) *sseEndpointContext {
conf.Log.Infof("sse endpoint %v unregister", endpoint)
pubsub.RemovePub(recvSseTopic(endpoint))
m.Lock()
defer m.Unlock()

sctx, ok := m.sseEndpoint[endpoint]
if !ok {
delete(m.routes, endpoint)
return nil
}
// Cancel all active connections
for _, cancel := range sctx.conns {
cancel()
}
delete(m.sseEndpoint, endpoint)
delete(m.routes, endpoint)
return sctx
}
Loading
Loading