@@ -189,6 +189,7 @@ const (
189
189
mqttProcessSubTooLong = 100 * time .Millisecond
190
190
mqttDefaultRetainedCacheTTL = 2 * time .Minute
191
191
mqttRetainedTransferTimeout = 10 * time .Second
192
+ mqttDefaultJSAPITimeout = 5 * time .Second
192
193
)
193
194
194
195
const (
@@ -209,30 +210,30 @@ var (
209
210
mqttOldProtoName = []byte ("MQIsdp" )
210
211
mqttSessJailDur = mqttSessFlappingJailDur
211
212
mqttFlapCleanItvl = mqttSessFlappingCleanupInterval
212
- mqttJSAPITimeout = 4 * time .Second
213
213
mqttRetainedCacheTTL = mqttDefaultRetainedCacheTTL
214
214
)
215
215
216
216
var (
217
- errMQTTNotWebsocketPort = errors .New ("MQTT clients over websocket must connect to the Websocket port, not the MQTT port" )
218
- errMQTTTopicFilterCannotBeEmpty = errors .New ("topic filter cannot be empty" )
219
- errMQTTMalformedVarInt = errors .New ("malformed variable int" )
220
- errMQTTSecondConnectPacket = errors .New ("received a second CONNECT packet" )
221
- errMQTTServerNameMustBeSet = errors .New ("mqtt requires server name to be explicitly set" )
222
- errMQTTUserMixWithUsersNKeys = errors .New ("mqtt authentication username not compatible with presence of users/nkeys" )
223
- errMQTTTokenMixWIthUsersNKeys = errors .New ("mqtt authentication token not compatible with presence of users/nkeys" )
224
- errMQTTAckWaitMustBePositive = errors .New ("ack wait must be a positive value" )
225
- errMQTTStandaloneNeedsJetStream = errors .New ("mqtt requires JetStream to be enabled if running in standalone mode" )
226
- errMQTTConnFlagReserved = errors .New ("connect flags reserved bit not set to 0" )
227
- errMQTTWillAndRetainFlag = errors .New ("if Will flag is set to 0, Will Retain flag must be 0 too" )
228
- errMQTTPasswordFlagAndNoUser = errors .New ("password flag set but username flag is not" )
229
- errMQTTCIDEmptyNeedsCleanFlag = errors .New ("when client ID is empty, clean session flag must be set to 1" )
230
- errMQTTEmptyWillTopic = errors .New ("empty Will topic not allowed" )
231
- errMQTTEmptyUsername = errors .New ("empty user name not allowed" )
232
- errMQTTTopicIsEmpty = errors .New ("topic cannot be empty" )
233
- errMQTTPacketIdentifierIsZero = errors .New ("packet identifier cannot be 0" )
234
- errMQTTUnsupportedCharacters = errors .New ("character ' ' not supported for MQTT topics" )
235
- errMQTTInvalidSession = errors .New ("invalid MQTT session" )
217
+ errMQTTNotWebsocketPort = errors .New ("MQTT clients over websocket must connect to the Websocket port, not the MQTT port" )
218
+ errMQTTTopicFilterCannotBeEmpty = errors .New ("topic filter cannot be empty" )
219
+ errMQTTMalformedVarInt = errors .New ("malformed variable int" )
220
+ errMQTTSecondConnectPacket = errors .New ("received a second CONNECT packet" )
221
+ errMQTTServerNameMustBeSet = errors .New ("mqtt requires server name to be explicitly set" )
222
+ errMQTTUserMixWithUsersNKeys = errors .New ("mqtt authentication username not compatible with presence of users/nkeys" )
223
+ errMQTTTokenMixWIthUsersNKeys = errors .New ("mqtt authentication token not compatible with presence of users/nkeys" )
224
+ errMQTTAckWaitMustBePositive = errors .New ("ack wait must be a positive value" )
225
+ errMQTTJSAPITimeoutMustBePositive = errors .New ("JS API timeout must be a positive value" )
226
+ errMQTTStandaloneNeedsJetStream = errors .New ("mqtt requires JetStream to be enabled if running in standalone mode" )
227
+ errMQTTConnFlagReserved = errors .New ("connect flags reserved bit not set to 0" )
228
+ errMQTTWillAndRetainFlag = errors .New ("if Will flag is set to 0, Will Retain flag must be 0 too" )
229
+ errMQTTPasswordFlagAndNoUser = errors .New ("password flag set but username flag is not" )
230
+ errMQTTCIDEmptyNeedsCleanFlag = errors .New ("when client ID is empty, clean session flag must be set to 1" )
231
+ errMQTTEmptyWillTopic = errors .New ("empty Will topic not allowed" )
232
+ errMQTTEmptyUsername = errors .New ("empty user name not allowed" )
233
+ errMQTTTopicIsEmpty = errors .New ("topic cannot be empty" )
234
+ errMQTTPacketIdentifierIsZero = errors .New ("packet identifier cannot be 0" )
235
+ errMQTTUnsupportedCharacters = errors .New ("character ' ' not supported for MQTT topics" )
236
+ errMQTTInvalidSession = errors .New ("invalid MQTT session" )
236
237
)
237
238
238
239
type srvMQTT struct {
@@ -281,6 +282,7 @@ type mqttJSA struct {
281
282
quitCh chan struct {}
282
283
domain string // Domain or possibly empty. This is added to session subject.
283
284
domainSet bool // covers if domain was set, even to empty
285
+ timeout time.Duration
284
286
}
285
287
286
288
type mqttJSPubMsg struct {
@@ -696,6 +698,9 @@ func validateMQTTOptions(o *Options) error {
696
698
if mo .AckWait < 0 {
697
699
return errMQTTAckWaitMustBePositive
698
700
}
701
+ if mo .JSAPITimeout < 0 {
702
+ return errMQTTJSAPITimeoutMustBePositive
703
+ }
699
704
// If strictly standalone and there is no JS enabled, then it won't work...
700
705
// For leafnodes, we could either have remote(s) and it would be ok, or no
701
706
// remote but accept from a remote side that has "hub" property set, which
@@ -1152,6 +1157,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
1152
1157
c .acc = acc
1153
1158
1154
1159
id := s .NodeName ()
1160
+
1161
+ mqttJSAPITimeout := opts .MQTT .JSAPITimeout
1162
+ if mqttJSAPITimeout == 0 {
1163
+ mqttJSAPITimeout = mqttDefaultJSAPITimeout
1164
+ }
1165
+
1155
1166
replicas := opts .MQTT .StreamReplicas
1156
1167
if replicas <= 0 {
1157
1168
replicas = s .mqttDetermineReplicas ()
@@ -1163,12 +1174,13 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
1163
1174
sessLocked : make (map [string ]struct {}),
1164
1175
flappers : make (map [string ]int64 ),
1165
1176
jsa : mqttJSA {
1166
- id : id ,
1167
- c : c ,
1168
- rplyr : mqttJSARepliesPrefix + id + "." ,
1169
- sendq : newIPQueue [* mqttJSPubMsg ](s , qname + "send" ),
1170
- nuid : nuid .New (),
1171
- quitCh : quitCh ,
1177
+ id : id ,
1178
+ c : c ,
1179
+ rplyr : mqttJSARepliesPrefix + id + "." ,
1180
+ sendq : newIPQueue [* mqttJSPubMsg ](s , qname + "send" ),
1181
+ nuid : nuid .New (),
1182
+ quitCh : quitCh ,
1183
+ timeout : mqttJSAPITimeout ,
1172
1184
},
1173
1185
}
1174
1186
if ! testDisableRMSCache {
@@ -1546,7 +1558,7 @@ func (s *Server) mqttDetermineReplicas() int {
1546
1558
//////////////////////////////////////////////////////////////////////////////
1547
1559
1548
1560
func (jsa * mqttJSA ) newRequest (kind , subject string , hdr int , msg []byte ) (any , error ) {
1549
- return jsa .newRequestEx (kind , subject , _EMPTY_ , hdr , msg , mqttJSAPITimeout )
1561
+ return jsa .newRequestEx (kind , subject , _EMPTY_ , hdr , msg )
1550
1562
}
1551
1563
1552
1564
func (jsa * mqttJSA ) prefixDomain (subject string ) string {
@@ -1559,8 +1571,8 @@ func (jsa *mqttJSA) prefixDomain(subject string) string {
1559
1571
return subject
1560
1572
}
1561
1573
1562
- func (jsa * mqttJSA ) newRequestEx (kind , subject , cidHash string , hdr int , msg []byte , timeout time. Duration ) (any , error ) {
1563
- responses , err := jsa .newRequestExMulti (kind , subject , cidHash , []int {hdr }, [][]byte {msg }, timeout )
1574
+ func (jsa * mqttJSA ) newRequestEx (kind , subject , cidHash string , hdr int , msg []byte ) (any , error ) {
1575
+ responses , err := jsa .newRequestExMulti (kind , subject , cidHash , []int {hdr }, [][]byte {msg })
1564
1576
if err != nil {
1565
1577
return nil , err
1566
1578
}
@@ -1578,7 +1590,7 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b
1578
1590
//
1579
1591
// Note that each response may represent an error and should be inspected as
1580
1592
// such by the caller.
1581
- func (jsa * mqttJSA ) newRequestExMulti (kind , subject , cidHash string , hdrs []int , msgs [][]byte , timeout time. Duration ) ([]* mqttJSAResponse , error ) {
1593
+ func (jsa * mqttJSA ) newRequestExMulti (kind , subject , cidHash string , hdrs []int , msgs [][]byte ) ([]* mqttJSAResponse , error ) {
1582
1594
if len (hdrs ) != len (msgs ) {
1583
1595
return nil , fmt .Errorf ("unreachable: invalid number of messages (%d) or header offsets (%d)" , len (msgs ), len (hdrs ))
1584
1596
}
@@ -1630,7 +1642,7 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int,
1630
1642
c := 0
1631
1643
responses := make ([]* mqttJSAResponse , len (msgs ))
1632
1644
start := time .Now ()
1633
- t := time .NewTimer (timeout )
1645
+ t := time .NewTimer (jsa . timeout )
1634
1646
defer t .Stop ()
1635
1647
for {
1636
1648
select {
@@ -1789,7 +1801,7 @@ func (jsa *mqttJSA) loadLastMsgForMulti(streamName string, subjects []string) ([
1789
1801
headerBytes = append (headerBytes , 0 )
1790
1802
}
1791
1803
1792
- all , err := jsa .newRequestExMulti (mqttJSAMsgLoad , fmt .Sprintf (JSApiMsgGetT , streamName ), _EMPTY_ , headerBytes , marshaled , mqttJSAPITimeout )
1804
+ all , err := jsa .newRequestExMulti (mqttJSAMsgLoad , fmt .Sprintf (JSApiMsgGetT , streamName ), _EMPTY_ , headerBytes , marshaled )
1793
1805
// all has the same order as subjects, preserve it as we unmarshal
1794
1806
responses := make ([]* JSApiMsgGetResponse , len (all ))
1795
1807
for i , v := range all {
@@ -1847,7 +1859,7 @@ func (jsa *mqttJSA) storeSessionMsg(domainTk, cidHash string, hdr int, msg []byt
1847
1859
1848
1860
// Passing cidHash will add it to the JS reply subject, so that we can use
1849
1861
// it in processSessionPersist.
1850
- smri , err := jsa .newRequestEx (mqttJSASessPersist , subject , cidHash , hdr , msg , mqttJSAPITimeout )
1862
+ smri , err := jsa .newRequestEx (mqttJSASessPersist , subject , cidHash , hdr , msg )
1851
1863
if err != nil {
1852
1864
return nil , err
1853
1865
}
@@ -2982,7 +2994,7 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
2982
2994
}()
2983
2995
2984
2996
jsa := & as .jsa
2985
- sni , err := jsa .newRequestEx (mqttJSAStreamNames , JSApiStreams , _EMPTY_ , 0 , nil , 5 * time . Second )
2997
+ sni , err := jsa .newRequestEx (mqttJSAStreamNames , JSApiStreams , _EMPTY_ , 0 , nil )
2986
2998
if err != nil {
2987
2999
log .Errorf ("Unable to transfer MQTT session streams: %v" , err )
2988
3000
return
0 commit comments