Skip to content

Commit fc8f448

Browse files
authored
fix(mqtt): fix mqtt session config (#3864)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent e3c029f commit fc8f448

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

internal/io/mqtt/v4client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func Provision(ctx api.StreamContext, props map[string]any, onConnect client.Con
7777
}
7878

7979
cli := pahoMqtt.NewClient(opts)
80-
return &Client{cli: cli}, nil
80+
return &Client{cli: cli, EnableClientSession: c.EnableClientSession}, nil
8181
}
8282

8383
func (c *Client) Connect(_ api.StreamContext) error {
@@ -136,7 +136,7 @@ func (c *Client) Unsubscribe(_ api.StreamContext, topic string) error {
136136
}
137137

138138
func (c *Client) Disconnect(_ api.StreamContext) {
139-
c.cli.Disconnect(1)
139+
c.cli.Disconnect(1000)
140140
}
141141

142142
func ValidateConfig(ctx api.StreamContext, props map[string]any) (*ConnectionConfig, error) {

internal/io/mqtt/v5client/client.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package v5client
1616

1717
import (
18+
"context"
1819
"crypto/tls"
1920
"errors"
2021
"fmt"
@@ -47,14 +48,15 @@ type Client struct {
4748
}
4849

4950
type ConnectionConfig struct {
50-
Server string `json:"server"`
51-
ClientId string `json:"clientid"`
52-
Uname string `json:"username"`
53-
Password string `json:"password"`
54-
EnableClientSession bool `json:"enableClientSession"`
55-
ClientStatePath string `json:"clientStatePath"`
56-
serverUrl *url.URL
57-
tls *tls.Config
51+
Server string `json:"server"`
52+
ClientId string `json:"clientid"`
53+
Uname string `json:"username"`
54+
Password string `json:"password"`
55+
EnableClientSession bool `json:"enableClientSession"`
56+
ClientStatePath string `json:"clientStatePath"`
57+
SessionExpiryIntervalSeconds int `json:"sessionExpiryIntervalSeconds"`
58+
serverUrl *url.URL
59+
tls *tls.Config
5860
}
5961

6062
func Provision(ctx api.StreamContext, props map[string]any, onConnect client.ConnectHandler, onConnectLost client.ConnectErrorHandler, _ client.ConnectHandler) (*Client, error) {
@@ -79,7 +81,7 @@ func Provision(ctx api.StreamContext, props map[string]any, onConnect client.Con
7981
// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
8082
// the server will not queue messages while it is down. The specific setting will depend upon your needs
8183
// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
82-
SessionExpiryInterval: 60,
84+
SessionExpiryInterval: uint32(cc.SessionExpiryIntervalSeconds),
8385
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
8486
onConnect(ctx)
8587
},
@@ -235,7 +237,9 @@ func (c *Client) Unsubscribe(ctx api.StreamContext, topic string) error {
235237
}
236238

237239
func (c *Client) Disconnect(ctx api.StreamContext) {
238-
err := c.cm.Disconnect(ctx)
240+
dctx, dcancel := context.WithTimeout(context.Background(), time.Second*5)
241+
defer dcancel()
242+
err := c.cm.Disconnect(dctx)
239243
if err != nil {
240244
ctx.GetLogger().Warnf("disconnect error: %s", err)
241245
}
@@ -263,7 +267,9 @@ func (c *Client) ParseMsg(ctx api.StreamContext, msg any) ([]byte, map[string]an
263267
}
264268

265269
func ValidateConfig(ctx api.StreamContext, props map[string]any) (*ConnectionConfig, error) {
266-
c := &ConnectionConfig{}
270+
c := &ConnectionConfig{
271+
SessionExpiryIntervalSeconds: 60,
272+
}
267273
err := cast.MapToStruct(props, c)
268274
if err != nil {
269275
return nil, err

0 commit comments

Comments
 (0)