Skip to content

Commit b63db7c

Browse files
committed
MQTT: allow custom timeout for JS API calls
1 parent 91a53d0 commit b63db7c

File tree

3 files changed

+68
-34
lines changed

3 files changed

+68
-34
lines changed

server/mqtt.go

+46-34
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ const (
189189
mqttProcessSubTooLong = 100 * time.Millisecond
190190
mqttDefaultRetainedCacheTTL = 2 * time.Minute
191191
mqttRetainedTransferTimeout = 10 * time.Second
192+
mqttDefaultJSAPITimeout = 5 * time.Second
192193
)
193194

194195
const (
@@ -209,30 +210,30 @@ var (
209210
mqttOldProtoName = []byte("MQIsdp")
210211
mqttSessJailDur = mqttSessFlappingJailDur
211212
mqttFlapCleanItvl = mqttSessFlappingCleanupInterval
212-
mqttJSAPITimeout = 4 * time.Second
213213
mqttRetainedCacheTTL = mqttDefaultRetainedCacheTTL
214214
)
215215

216216
var (
217-
errMQTTNotWebsocketPort = errors.New("MQTT clients over websocket must connect to the Websocket port, not the MQTT port")
218-
errMQTTTopicFilterCannotBeEmpty = errors.New("topic filter cannot be empty")
219-
errMQTTMalformedVarInt = errors.New("malformed variable int")
220-
errMQTTSecondConnectPacket = errors.New("received a second CONNECT packet")
221-
errMQTTServerNameMustBeSet = errors.New("mqtt requires server name to be explicitly set")
222-
errMQTTUserMixWithUsersNKeys = errors.New("mqtt authentication username not compatible with presence of users/nkeys")
223-
errMQTTTokenMixWIthUsersNKeys = errors.New("mqtt authentication token not compatible with presence of users/nkeys")
224-
errMQTTAckWaitMustBePositive = errors.New("ack wait must be a positive value")
225-
errMQTTStandaloneNeedsJetStream = errors.New("mqtt requires JetStream to be enabled if running in standalone mode")
226-
errMQTTConnFlagReserved = errors.New("connect flags reserved bit not set to 0")
227-
errMQTTWillAndRetainFlag = errors.New("if Will flag is set to 0, Will Retain flag must be 0 too")
228-
errMQTTPasswordFlagAndNoUser = errors.New("password flag set but username flag is not")
229-
errMQTTCIDEmptyNeedsCleanFlag = errors.New("when client ID is empty, clean session flag must be set to 1")
230-
errMQTTEmptyWillTopic = errors.New("empty Will topic not allowed")
231-
errMQTTEmptyUsername = errors.New("empty user name not allowed")
232-
errMQTTTopicIsEmpty = errors.New("topic cannot be empty")
233-
errMQTTPacketIdentifierIsZero = errors.New("packet identifier cannot be 0")
234-
errMQTTUnsupportedCharacters = errors.New("character ' ' not supported for MQTT topics")
235-
errMQTTInvalidSession = errors.New("invalid MQTT session")
217+
errMQTTNotWebsocketPort = errors.New("MQTT clients over websocket must connect to the Websocket port, not the MQTT port")
218+
errMQTTTopicFilterCannotBeEmpty = errors.New("topic filter cannot be empty")
219+
errMQTTMalformedVarInt = errors.New("malformed variable int")
220+
errMQTTSecondConnectPacket = errors.New("received a second CONNECT packet")
221+
errMQTTServerNameMustBeSet = errors.New("mqtt requires server name to be explicitly set")
222+
errMQTTUserMixWithUsersNKeys = errors.New("mqtt authentication username not compatible with presence of users/nkeys")
223+
errMQTTTokenMixWIthUsersNKeys = errors.New("mqtt authentication token not compatible with presence of users/nkeys")
224+
errMQTTAckWaitMustBePositive = errors.New("ack wait must be a positive value")
225+
errMQTTJSAPITimeoutMustBePositive = errors.New("JS API timeout must be a positive value")
226+
errMQTTStandaloneNeedsJetStream = errors.New("mqtt requires JetStream to be enabled if running in standalone mode")
227+
errMQTTConnFlagReserved = errors.New("connect flags reserved bit not set to 0")
228+
errMQTTWillAndRetainFlag = errors.New("if Will flag is set to 0, Will Retain flag must be 0 too")
229+
errMQTTPasswordFlagAndNoUser = errors.New("password flag set but username flag is not")
230+
errMQTTCIDEmptyNeedsCleanFlag = errors.New("when client ID is empty, clean session flag must be set to 1")
231+
errMQTTEmptyWillTopic = errors.New("empty Will topic not allowed")
232+
errMQTTEmptyUsername = errors.New("empty user name not allowed")
233+
errMQTTTopicIsEmpty = errors.New("topic cannot be empty")
234+
errMQTTPacketIdentifierIsZero = errors.New("packet identifier cannot be 0")
235+
errMQTTUnsupportedCharacters = errors.New("character ' ' not supported for MQTT topics")
236+
errMQTTInvalidSession = errors.New("invalid MQTT session")
236237
)
237238

238239
type srvMQTT struct {
@@ -281,6 +282,7 @@ type mqttJSA struct {
281282
quitCh chan struct{}
282283
domain string // Domain or possibly empty. This is added to session subject.
283284
domainSet bool // covers if domain was set, even to empty
285+
timeout time.Duration
284286
}
285287

286288
type mqttJSPubMsg struct {
@@ -696,6 +698,9 @@ func validateMQTTOptions(o *Options) error {
696698
if mo.AckWait < 0 {
697699
return errMQTTAckWaitMustBePositive
698700
}
701+
if mo.JSAPITimeout < 0 {
702+
return errMQTTJSAPITimeoutMustBePositive
703+
}
699704
// If strictly standalone and there is no JS enabled, then it won't work...
700705
// For leafnodes, we could either have remote(s) and it would be ok, or no
701706
// remote but accept from a remote side that has "hub" property set, which
@@ -1152,6 +1157,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
11521157
c.acc = acc
11531158

11541159
id := s.NodeName()
1160+
1161+
mqttJSAPITimeout := opts.MQTT.JSAPITimeout
1162+
if mqttJSAPITimeout == 0 {
1163+
mqttJSAPITimeout = mqttDefaultJSAPITimeout
1164+
}
1165+
11551166
replicas := opts.MQTT.StreamReplicas
11561167
if replicas <= 0 {
11571168
replicas = s.mqttDetermineReplicas()
@@ -1163,12 +1174,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
11631174
sessLocked: make(map[string]struct{}),
11641175
flappers: make(map[string]int64),
11651176
jsa: mqttJSA{
1166-
id: id,
1167-
c: c,
1168-
rplyr: mqttJSARepliesPrefix + id + ".",
1169-
sendq: newIPQueue[*mqttJSPubMsg](s, qname+"send"),
1170-
nuid: nuid.New(),
1171-
quitCh: quitCh,
1177+
id: id,
1178+
c: c,
1179+
rplyr: mqttJSARepliesPrefix + id + ".",
1180+
sendq: newIPQueue[*mqttJSPubMsg](s, qname+"send"),
1181+
nuid: nuid.New(),
1182+
quitCh: quitCh,
1183+
timeout: mqttJSAPITimeout,
11721184
},
11731185
}
11741186
if !testDisableRMSCache {
@@ -1546,7 +1558,7 @@ func (s *Server) mqttDetermineReplicas() int {
15461558
//////////////////////////////////////////////////////////////////////////////
15471559

15481560
func (jsa *mqttJSA) newRequest(kind, subject string, hdr int, msg []byte) (any, error) {
1549-
return jsa.newRequestEx(kind, subject, _EMPTY_, hdr, msg, mqttJSAPITimeout)
1561+
return jsa.newRequestEx(kind, subject, _EMPTY_, hdr, msg)
15501562
}
15511563

15521564
func (jsa *mqttJSA) prefixDomain(subject string) string {
@@ -1559,8 +1571,8 @@ func (jsa *mqttJSA) prefixDomain(subject string) string {
15591571
return subject
15601572
}
15611573

1562-
func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []byte, timeout time.Duration) (any, error) {
1563-
responses, err := jsa.newRequestExMulti(kind, subject, cidHash, []int{hdr}, [][]byte{msg}, timeout)
1574+
func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []byte) (any, error) {
1575+
responses, err := jsa.newRequestExMulti(kind, subject, cidHash, []int{hdr}, [][]byte{msg})
15641576
if err != nil {
15651577
return nil, err
15661578
}
@@ -1578,7 +1590,7 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b
15781590
//
15791591
// Note that each response may represent an error and should be inspected as
15801592
// such by the caller.
1581-
func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int, msgs [][]byte, timeout time.Duration) ([]*mqttJSAResponse, error) {
1593+
func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int, msgs [][]byte) ([]*mqttJSAResponse, error) {
15821594
if len(hdrs) != len(msgs) {
15831595
return nil, fmt.Errorf("unreachable: invalid number of messages (%d) or header offsets (%d)", len(msgs), len(hdrs))
15841596
}
@@ -1630,7 +1642,7 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int,
16301642
c := 0
16311643
responses := make([]*mqttJSAResponse, len(msgs))
16321644
start := time.Now()
1633-
t := time.NewTimer(timeout)
1645+
t := time.NewTimer(jsa.timeout)
16341646
defer t.Stop()
16351647
for {
16361648
select {
@@ -1789,7 +1801,7 @@ func (jsa *mqttJSA) loadLastMsgForMulti(streamName string, subjects []string) ([
17891801
headerBytes = append(headerBytes, 0)
17901802
}
17911803

1792-
all, err := jsa.newRequestExMulti(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), _EMPTY_, headerBytes, marshaled, mqttJSAPITimeout)
1804+
all, err := jsa.newRequestExMulti(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), _EMPTY_, headerBytes, marshaled)
17931805
// all has the same order as subjects, preserve it as we unmarshal
17941806
responses := make([]*JSApiMsgGetResponse, len(all))
17951807
for i, v := range all {
@@ -1847,7 +1859,7 @@ func (jsa *mqttJSA) storeSessionMsg(domainTk, cidHash string, hdr int, msg []byt
18471859

18481860
// Passing cidHash will add it to the JS reply subject, so that we can use
18491861
// it in processSessionPersist.
1850-
smri, err := jsa.newRequestEx(mqttJSASessPersist, subject, cidHash, hdr, msg, mqttJSAPITimeout)
1862+
smri, err := jsa.newRequestEx(mqttJSASessPersist, subject, cidHash, hdr, msg)
18511863
if err != nil {
18521864
return nil, err
18531865
}
@@ -2982,7 +2994,7 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
29822994
}()
29832995

29842996
jsa := &as.jsa
2985-
sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, _EMPTY_, 0, nil, 5*time.Second)
2997+
sni, err := jsa.newRequestEx(mqttJSAStreamNames, JSApiStreams, _EMPTY_, 0, nil)
29862998
if err != nil {
29872999
log.Errorf("Unable to transfer MQTT session streams: %v", err)
29883000
return

server/mqtt_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,11 @@ func TestMQTTValidateOptions(t *testing.T) {
469469
o.MQTT.AckWait = -10 * time.Second
470470
return o
471471
}, errMQTTAckWaitMustBePositive},
472+
{"js api timeout should be >=0", func() *Options {
473+
o := mqtto.Clone()
474+
o.MQTT.JSAPITimeout = -10 * time.Second
475+
return o
476+
}, errMQTTJSAPITimeoutMustBePositive},
472477
} {
473478
t.Run(test.name, func(t *testing.T) {
474479
err := validateMQTTOptions(test.getOpts())
@@ -627,6 +632,18 @@ func TestMQTTParseOptions(t *testing.T) {
627632
}
628633
return nil
629634
}, ""},
635+
{"js_api_timeout bad duration", `mqtt: {js_api_timeout: abc}`, nil, "invalid duration"},
636+
{"js_api_timeout",
637+
`
638+
mqtt {
639+
js_api_timeout: "60s"
640+
}
641+
`, func(o *MQTTOpts) error {
642+
if o.JSAPITimeout != 60*time.Second {
643+
return fmt.Errorf("Invalid JS API timeout: %v", o.JSAPITimeout)
644+
}
645+
return nil
646+
}, ""},
630647
} {
631648
t.Run(test.name, func(t *testing.T) {
632649
conf := createConfFile(t, []byte(test.content))

server/opts.go

+5
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,9 @@ type MQTTOpts struct {
616616
// PubRels).
617617
AckWait time.Duration
618618

619+
// JSAPITimeout defines timeout for JetStream api calls (default is 5 seconds)
620+
JSAPITimeout time.Duration
621+
619622
// MaxAckPending is the amount of QoS 1 and 2 messages (combined) the server
620623
// can send to a subscription without receiving any PUBACK for those
621624
// messages. The valid range is [0..65535].
@@ -5205,6 +5208,8 @@ func parseMQTT(v any, o *Options, errors *[]error, warnings *[]error) error {
52055208
o.MQTT.NoAuthUser = mv.(string)
52065209
case "ack_wait", "ackwait":
52075210
o.MQTT.AckWait = parseDuration("ack_wait", tk, mv, errors, warnings)
5211+
case "js_api_timeout", "api_timeout":
5212+
o.MQTT.JSAPITimeout = parseDuration("js_api_timeout", tk, mv, errors, warnings)
52085213
case "max_ack_pending", "max_pending", "max_inflight":
52095214
tmp := int(mv.(int64))
52105215
if tmp < 0 || tmp > 0xFFFF {

0 commit comments

Comments
 (0)