Skip to content

Commit 73372c0

Browse files
authored
fix(io): add websocket scheme props (#3874)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent f09af7c commit 73372c0

File tree

7 files changed

+28
-16
lines changed

7 files changed

+28
-16
lines changed

docs/en_US/guide/sinks/builtin/websocket.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The action is used for publishing output message into websocket channel.
77
| Property name | Optional | Description |
88
| addr | false | The address of the websocket sink server, like: 127.0.0.1:8080 |
99
| path | true | The url path of the websocket sink server, like: /api/data |
10+
| scheme | true | The url scheme of the websocket sink server, like: ws or wss |
1011
| insecureSkipVerify | false | whether to ignore SSL verification |
1112
| certificationPath | true | websocket client ssl verification crt file path |
1213
| privateKeyPath | true | Key file path for websocket client SSL verification |

docs/en_US/guide/sources/builtin/websocket.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ When you need eKuiper as a websocket client, you need to specify the server addr
1515
```yaml
1616
default:
1717
addr: 127.0.0.1:8080
18+
scheme: ws
1819
```
1920
2021
```sql

docs/zh_CN/guide/sinks/builtin/websocket.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@
22

33
## 属性
44

5-
| 属性名称 | 是否必填 | 说明 |
6-
|--------------|------|------------------------------------------|
7-
| addr || websocket server 的地址,如: 127.0.0.1:8080 |
8-
| path || websocket server 的 url path,如: /api/data |
9-
| insecureSkipVerify || 是否忽略 SSL 验证 |
10-
| certificationPath || websocket 客户端 ssl 验证的 crt 文件路径 |
11-
| privateKeyPath || websocket 客户端 ssl 验证的 key 文件路径 |
12-
| rootCaPath || websocket 客户端 ssl 验证的 ca 证书文件路径 |
13-
| certficationRaw || websocket 客户端 ssl 验证,经过 base64 编码过的 crt 原文, 如果同时定义了 `certificationPath` 将会先用该参数。 |
14-
| privateKeyRaw || websocket 客户端 ssl 验证,经过 base64 编码过的的 key 原文, 如果同时定义了 `privateKeyPath` 将会先用该参数。 |
5+
| 属性名称 | 是否必填 | 说明 |
6+
|--------------------|------|-----------------------------------------------------------------------------------|
7+
| addr || websocket server 的地址,如: 127.0.0.1:8080 |
8+
| path || websocket server 的 url path,如: /api/data |
9+
| scheme || websocket server 的 url scheme,如: ws 或者 wss |
10+
| insecureSkipVerify || 是否忽略 SSL 验证 |
11+
| certificationPath || websocket 客户端 ssl 验证的 crt 文件路径 |
12+
| privateKeyPath || websocket 客户端 ssl 验证的 key 文件路径 |
13+
| rootCaPath || websocket 客户端 ssl 验证的 ca 证书文件路径 |
14+
| certficationRaw || websocket 客户端 ssl 验证,经过 base64 编码过的 crt 原文, 如果同时定义了 `certificationPath` 将会先用该参数。 |
15+
| privateKeyRaw || websocket 客户端 ssl 验证,经过 base64 编码过的的 key 原文, 如果同时定义了 `privateKeyPath` 将会先用该参数。 |
1516
| rootCARaw || websocket 客户端 ssl 验证,经过 base64 编码过的的 ca 原文, 如果同时定义了 `rootCAPath` 将会先用该参数。 |
16-
| checkConnection || 是否检查 websocket endpoint 已经存在连接 |
17+
| checkConnection || 是否检查 websocket endpoint 已经存在连接 |
1718

1819
其他通用的 sink 属性也支持,请参阅[公共属性](../overview.md#公共属性)
1920

docs/zh_CN/guide/sources/builtin/websocket.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ eKuiper 可以作为 websocket 客户端,向远端的 websocket 服务器发
1515
```yaml
1616
default:
1717
addr: 127.0.0.1:8080
18+
scheme: ws
1819
```
1920
2021
```sql

internal/io/http/httpserver/websocketConn.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ func (w *WebsocketConnection) GetId(ctx api.StreamContext) string {
3737
}
3838

3939
func (w *WebsocketConnection) Provision(ctx api.StreamContext, conId string, props map[string]any) error {
40-
cfg := &wscConfig{}
40+
cfg := &wscConfig{
41+
Scheme: "ws",
42+
}
4143
if err := cast.MapToStruct(props, cfg); err != nil {
4244
return err
4345
}
@@ -64,7 +66,7 @@ func (w *WebsocketConnection) Dial(ctx api.StreamContext) error {
6466
if err != nil {
6567
return err
6668
}
67-
c := NewWebsocketClient(w.cfg.Addr, w.cfg.Path, tlsConfig)
69+
c := NewWebsocketClient(w.cfg.Scheme, w.cfg.Addr, w.cfg.Path, tlsConfig)
6870
if err := c.Connect(); err != nil {
6971
return err
7072
}
@@ -78,6 +80,7 @@ type wscConfig struct {
7880
Path string `json:"path"`
7981
Datasource string `json:"datasource"`
8082
Addr string `json:"addr"`
83+
Scheme string `json:"scheme"`
8184
}
8285

8386
func (w *WebsocketConnection) Ping(ctx api.StreamContext) error {

internal/io/http/httpserver/websocket_client.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type WebsocketClient struct {
3232
RecvTopic string
3333
SendTopic string
3434

35+
scheme string
3536
addr string
3637
path string
3738
tlsConfig *tls.Config
@@ -40,8 +41,12 @@ type WebsocketClient struct {
4041
cancel context.CancelFunc
4142
}
4243

43-
func NewWebsocketClient(addr, path string, tlsConfig *tls.Config) *WebsocketClient {
44+
func NewWebsocketClient(scheme, addr, path string, tlsConfig *tls.Config) *WebsocketClient {
45+
if scheme == "" {
46+
scheme = "ws"
47+
}
4448
return &WebsocketClient{
49+
scheme: scheme,
4550
addr: addr,
4651
path: path,
4752
tlsConfig: tlsConfig,
@@ -57,7 +62,7 @@ func (c *WebsocketClient) Connect() error {
5762
if len(c.addr) < 1 {
5863
return fmt.Errorf("addr should be defined")
5964
}
60-
u := url.URL{Scheme: "ws", Host: c.addr, Path: c.path}
65+
u := url.URL{Scheme: c.scheme, Host: c.addr, Path: c.path}
6166
conn, _, err := d.Dial(u.String(), nil)
6267
if err != nil {
6368
return err

internal/io/http/httpserver/websocket_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestWebsocketClient(t *testing.T) {
3131
s.Close()
3232
}()
3333
ctx := mockContext.NewMockContext("1", "2")
34-
wc := NewWebsocketClient(s.URL[len("http://"):], "/ws", nil)
34+
wc := NewWebsocketClient("ws", s.URL[len("http://"):], "/ws", nil)
3535
require.NoError(t, wc.Connect())
3636
rt, st := wc.Run(ctx)
3737
pubsub.CreatePub(st)

0 commit comments

Comments
 (0)