14
14
import abc
15
15
import asyncio
16
16
import base64
17
+ import functools
17
18
import hashlib
19
+ import logging
18
20
import os
19
21
import sys
20
22
import struct
@@ -97,6 +99,9 @@ def log_exception(
97
99
98
100
_default_max_message_size = 10 * 1024 * 1024
99
101
102
+ # log to "gen_log" but suppress duplicate log messages
103
+ de_dupe_gen_log = functools .lru_cache (gen_log .log )
104
+
100
105
101
106
class WebSocketError (Exception ):
102
107
pass
@@ -274,17 +279,40 @@ async def get(self, *args: Any, **kwargs: Any) -> None:
274
279
275
280
@property
276
281
def ping_interval (self ) -> Optional [float ]:
277
- """The interval for websocket keep-alive pings.
282
+ """Send periodic pings down the websocket.
283
+
284
+ This will send a ping every ``wesocket_ping_interval`` seconds.
285
+ The client will respond with a "pong". The connection can be configured
286
+ to timeout on late pong delivery using ``websocket_ping_timeout``.
278
287
279
- Set websocket_ping_interval = 0 to disable pings.
288
+ Set ``websocket_ping_interval = 0`` to disable pings.
289
+
290
+ Default: ``0``
280
291
"""
281
292
return self .settings .get ("websocket_ping_interval" , None )
282
293
283
294
@property
284
295
def ping_timeout (self ) -> Optional [float ]:
285
- """If no ping is received in this many seconds,
286
- close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
287
- Default is max of 3 pings or 30 seconds.
296
+ """Timeout if no pong is received in this many seconds.
297
+
298
+ To be used in combination with ``websocket_ping_interval > 0``.
299
+ If a ping response (a "pong") is not received within
300
+ ``websocket_ping_timeout`` seconds, then the websocket connection
301
+ will be closed.
302
+
303
+ This can help to clean up clients which have disconnected without
304
+ cleanly closing the websocket connection.
305
+
306
+ Note, the ping timeout cannot be longer than the ping interval.
307
+
308
+ Set ``websocket_ping_timeout = 0`` to disable the ping timeout.
309
+
310
+ Default: ``min(ping_interval, 30)``
311
+
312
+ .. versionchanged:: 6.5.0
313
+ Default changed from the max of 3 pings or 30 seconds.
314
+ The ping timeout can no longer be configured longer than the
315
+ ping interval.
288
316
"""
289
317
return self .settings .get ("websocket_ping_timeout" , None )
290
318
@@ -832,10 +860,10 @@ def __init__(
832
860
self ._wire_bytes_in = 0
833
861
self ._wire_bytes_out = 0
834
862
self .ping_callback = None # type: Optional[PeriodicCallback]
835
- self .last_ping = 0.0
836
863
self .last_pong = 0.0
837
864
self .close_code = None # type: Optional[int]
838
865
self .close_reason = None # type: Optional[str]
866
+ self ._ping_coroutine = None # type: Optional[asyncio.Task]
839
867
840
868
# Use a property for this to satisfy the abc.
841
869
@property
@@ -1266,9 +1294,9 @@ def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> Non
1266
1294
self ._waiting = self .stream .io_loop .add_timeout (
1267
1295
self .stream .io_loop .time () + 5 , self ._abort
1268
1296
)
1269
- if self .ping_callback :
1270
- self .ping_callback . stop ()
1271
- self .ping_callback = None
1297
+ if self ._ping_coroutine :
1298
+ self ._ping_coroutine . cancel ()
1299
+ self ._ping_coroutine = None
1272
1300
1273
1301
def is_closing (self ) -> bool :
1274
1302
"""Return ``True`` if this connection is closing.
@@ -1279,60 +1307,76 @@ def is_closing(self) -> bool:
1279
1307
"""
1280
1308
return self .stream .closed () or self .client_terminated or self .server_terminated
1281
1309
1310
+ def set_nodelay (self , x : bool ) -> None :
1311
+ self .stream .set_nodelay (x )
1312
+
1282
1313
@property
1283
- def ping_interval (self ) -> Optional [ float ] :
1314
+ def ping_interval (self ) -> float :
1284
1315
interval = self .params .ping_interval
1285
1316
if interval is not None :
1286
1317
return interval
1287
1318
return 0
1288
1319
1289
1320
@property
1290
- def ping_timeout (self ) -> Optional [ float ] :
1321
+ def ping_timeout (self ) -> float :
1291
1322
timeout = self .params .ping_timeout
1292
1323
if timeout is not None :
1324
+ if self .ping_interval and timeout > self .ping_interval :
1325
+ de_dupe_gen_log (
1326
+ # Note: using de_dupe_gen_log to prevent this message from
1327
+ # being duplicated for each connection
1328
+ logging .WARNING ,
1329
+ f'The websocket_ping_timeout ({ timeout } ) cannot be longer'
1330
+ f' than the websocket_ping_interval ({ self .ping_interval } ).'
1331
+ f'\n Setting websocket_ping_timeout={ self .ping_interval } '
1332
+ )
1333
+ return self .ping_interval
1293
1334
return timeout
1294
- assert self .ping_interval is not None
1295
- return max (3 * self .ping_interval , 30 )
1335
+ return max (self .ping_interval , 30 )
1296
1336
1297
1337
def start_pinging (self ) -> None :
1298
1338
"""Start sending periodic pings to keep the connection alive"""
1299
- assert self .ping_interval is not None
1300
- if self .ping_interval > 0 :
1301
- self .last_ping = self .last_pong = IOLoop .current ().time ()
1302
- self .ping_callback = PeriodicCallback (
1303
- self .periodic_ping , self .ping_interval * 1000
1304
- )
1305
- self .ping_callback .start ()
1339
+ if not self ._ping_coroutine :
1340
+ self ._ping_coroutine = asyncio .create_task (self .periodic_ping ())
1306
1341
1307
- def periodic_ping (self ) -> None :
1308
- """Send a ping to keep the websocket alive
1342
+ async def periodic_ping (self ) -> None :
1343
+ """Send a ping and wait for a pong if ping_timeout is configured.
1309
1344
1310
1345
Called periodically if the websocket_ping_interval is set and non-zero.
1311
1346
"""
1312
- if self .is_closing () and self .ping_callback is not None :
1313
- self .ping_callback .stop ()
1314
- return
1347
+ interval = self .ping_interval
1348
+ timeout = self .ping_timeout
1315
1349
1316
- # Check for timeout on pong. Make sure that we really have
1317
- # sent a recent ping in case the machine with both server and
1318
- # client has been suspended since the last ping.
1319
- now = IOLoop .current ().time ()
1320
- since_last_pong = now - self .last_pong
1321
- since_last_ping = now - self .last_ping
1322
- assert self .ping_interval is not None
1323
- assert self .ping_timeout is not None
1324
- if (
1325
- since_last_ping < 2 * self .ping_interval
1326
- and since_last_pong > self .ping_timeout
1327
- ):
1328
- self .close ()
1350
+ if interval <= 0 :
1351
+ # no pings configured
1329
1352
return
1330
1353
1331
- self .write_ping (b"" )
1332
- self .last_ping = now
1354
+ await asyncio .sleep (interval )
1333
1355
1334
- def set_nodelay (self , x : bool ) -> None :
1335
- self .stream .set_nodelay (x )
1356
+ while True :
1357
+ # send a ping
1358
+ ping_time = IOLoop .current ().time ()
1359
+ self .write_ping (b"" )
1360
+
1361
+ # wait until the ping timeout
1362
+ await asyncio .sleep (timeout )
1363
+
1364
+ # make sure we received a pong within the timeout
1365
+ if (
1366
+ timeout > 0
1367
+ # and ping_time - self.last_pong >= timeout
1368
+ and (
1369
+ # pong took longer than the timeout
1370
+ self .last_pong - ping_time > timeout
1371
+ # pong was not received
1372
+ or self .last_pong - ping_time < 0
1373
+ )
1374
+ ):
1375
+ self .close (reason = 'ping timed out' )
1376
+ return
1377
+
1378
+ # wait until the next scheduled ping
1379
+ await asyncio .sleep (interval - timeout )
1336
1380
1337
1381
1338
1382
class WebSocketClientConnection (simple_httpclient ._HTTPConnection ):
0 commit comments