Skip to content

Commit 157fadd

Browse files
committed
fine-tuning steps to properly cancel or exit (CLOSING > DRAINING) and finally closing a connection. Still not sure why __aexit__ is not called for a client connection used as an async context manager but adding the finally-block to QuicClient.connect is the last resort. Closing from server side needs to be investigated when implementing the test case where endpoint switches to no-DATAGRAM after establishing the connection and then the peer sends a DATAGRAM, which leads to PROTOCOL_VIOLATION and closing the connection from one side (namely server). See details in comment to Issue #16
1 parent 059cebc commit 157fadd

4 files changed

Lines changed: 33 additions & 34 deletions

File tree

examples/echo_quic_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async def parent(num: int = 0):
6666
data = await client_conn.receive()
6767
print(f"receiver: got data {data!r}")
6868

69-
print(f'Stopping client {num}')
69+
print(f'Stopping client {num} in state={client_conn.state}')
7070

7171

7272
async def two_clients():

quicly/connection.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,14 @@ def close(self) -> None:
174174
if self._closed:
175175
return
176176
self._closed = True
177-
# TODO: cancel timers?
177+
# close and clean up resources:
178178
self.on_tx_recv.close()
179179
self.sending_ch.close()
180-
# TODO: how to communicate to Endpoint?
181-
# if self.endpoint.connections.get(self.remote_address) is self:
182-
# del self.endpoint.connections[self.remote_address]
183-
# Will wake any tasks waiting on self.q.r.receive() with a ClosedResourceError:
184180
self._stream_q.r.close()
185181
self._datagram_q.r.close()
182+
# TODO: how to communicate to Endpoint so that they can clean up?
183+
# if self.endpoint.connections.get(self.remote_address) is self:
184+
# del self.endpoint.connections[self.remote_address]
186185

187186
async def aclose(self) -> None:
188187
"""
@@ -209,7 +208,7 @@ async def __aexit__(
209208

210209
async def __anext__(self):
211210
# using this object to iterate asynchronously uses ReceiveChannel semantics (DATAGRAM frames).
212-
# `iter_stream_chunks()` is the async iterator for ReceiveStream semantics (STREAM frames).
211+
# `aiter_stream_chunks()` is the async iterator for ReceiveStream semantics (STREAM frames).
213212
# TODO: we could also switch to ReceiveStream if DATAGRAM are not supported by local transport parameters?
214213
try:
215214
return await self.receive() # ← Channel semantics instead of Stream!
@@ -219,7 +218,7 @@ async def __anext__(self):
219218
# TODO: this is only needed while Connection inherits both, ReceiveChannel and ReceiveStream:
220219
# TODO: use @as_safe_channel decorator?
221220
@trio.as_safe_channel
222-
async def iter_stream_chunks(self, max_bytes: int | None = None):
221+
async def aiter_stream_chunks(self, max_bytes: int | None = None):
223222
# since we direct `__anext__` above to use the channel receive semantics, if one wanted to use STREAM method
224223
# receive_some() in an async for loop, use this generator function
225224
while True:
@@ -445,7 +444,7 @@ async def do_handshake(self, hello_payload: bytes, remote_address: NetworkAddres
445444
async def on_tx(self, qpkt: QuicPacket) -> None:
446445
if self.is_closed:
447446
self._qlog.debug(f"Cannot send QUIC packet during connection closed - dropping.", packet=qpkt)
448-
return
447+
# return
449448

450449
possible_ack_frame = self._pn_space.to_ack_frame(QuicFrameType.ACK, now=trio.current_time())
451450
if possible_ack_frame is not None:

quicly/endpoint.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,16 @@ def __del__(self) -> None:
178178
)
179179

180180
def close(self) -> None:
181-
"""Close this socket, and all associated QUIC connections.
182-
This object can also be used as a context manager.
181+
"""
182+
Close this socket, and all associated QUIC connections.
183+
This object can also be used as a context manager as it implements __enter__ and __exit__.
183184
"""
184185
self._closed = True
185-
self.socket.close()
186-
for stream in list(self._connections.values()):
187-
stream.close()
186+
for connection in list(self._connections.values()):
187+
connection.close()
188188
self._new_connections_q.r.close() # alerts anyone waiting on receive(), e.g., the server or client handshake
189189
self._send_q.r.close()
190+
self.socket.close()
190191
self.dump_qlog()
191192

192193
def __enter__(self) -> Self:
@@ -217,13 +218,13 @@ async def _datagram_received(self, udp_payload: bytes, remote_address: Any) -> N
217218
# connection for this DCID not yet established:
218219
await self._new_connections_q.s.send((udp_payload, remote_address, destination_cid))
219220
else:
220-
self._qlog.debug(f"UDP datagram from known CID={hexdump(destination_cid)}", size=len(udp_payload))
221+
# self._qlog.debug(f"UDP datagram from known CID={hexdump(destination_cid)}", size=len(udp_payload))
221222
await destination.on_rx(list(decode_udp_packet(udp_payload, destination_cid)), remote_address)
222223

223224
def _config_logger(self, level: str | int):
224225
for h in self._qlog.handlers:
225226
if h.name == 'structlog-console':
226-
h.setLevel("DEBUG") # TODO: level)
227+
h.setLevel("DEBUG") # TODO: level) Linda: replace!
227228

228229
def dump_qlog(self):
229230
if isinstance(self._mem_qlog, QlogMemoryCollector):
@@ -301,7 +302,7 @@ async def handler(quic_connection):
301302
async def handle_connection(new_connection: SimpleQuicConnection) -> None:
302303
async with new_connection:
303304
await handler(new_connection, *args)
304-
# TODO: if we end up here, we should remove the connection from the endpoint's list!
305+
# TODO: if we end up here, we should remove the connection from the endpoint's list!?
305306

306307
async with trio.open_nursery() as nursery:
307308
self.start_endpoint(nursery)
@@ -435,6 +436,12 @@ async def watch_state():
435436
yield connection # give caller control; loops continue running in the nursery
436437
finally:
437438
# on context exit: cancel everything cleanly
439+
await connection.enter_closing(
440+
QuicFrame(
441+
QuicFrameType.APPLICATION_CLOSE,
442+
content=ConnectionCloseFrame(
443+
QuicErrorCode.NO_ERROR,
444+
reason=b'finally block'))
445+
if connection.state == ConnectionState.ESTABLISHED else None)
438446
nursery.cancel_scope.cancel()
439-
await connection.aclose()
440447
self.dump_qlog()

tests/test_endpoints.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async def echo_handler(server_channel: SimpleQuicConnection) -> None:
8686
await trio.sleep(delay)
8787
await server_channel.send(payload)
8888
else:
89-
async with server_channel.iter_stream_chunks() as recv_chan:
89+
async with server_channel.aiter_stream_chunks() as recv_chan:
9090
async for payload in recv_chan:
9191
await trio.sleep(delay)
9292
await server_channel.send_all(payload)
@@ -102,8 +102,8 @@ async def echo_handler(server_channel: SimpleQuicConnection) -> None:
102102
if autocancel:
103103
nursery.cancel_scope.cancel()
104104

105-
# TODO: @parametrize_ipv6, also remove = False default below
106-
async def test_smoke_datagram(ipv6: bool = False) -> None:
105+
@parametrize_ipv6
106+
async def test_smoke_datagram(ipv6: bool) -> None:
107107
transport_parameters = {"max_datagram_frame_size" : 1200} # add DATAGRAM support
108108
async with quic_echo_server(True, ipv6=ipv6, delay=0,
109109
transport_parameters=transport_parameters) as (_server_endpoint, address):
@@ -121,9 +121,11 @@ async def test_smoke_datagram(ipv6: bool = False) -> None:
121121
await client_channel.send(b"goodbye")
122122
answer = await client_channel.receive()
123123
assert answer == b"goodbye"
124+
assert client_channel.state == ConnectionState.DRAINING
124125

125126
# TODO: test_fast_start (sending bytes with INITIAL...)
126127

128+
# TODO: @parametrize_ipv6, also remove = False default below
127129
async def test_one_sided_datagram1(ipv6: bool = False) -> None:
128130
transport_parameters = {"max_datagram_frame_size" : 2400} # add DATAGRAM support
129131
async with quic_echo_server(True, ipv6=ipv6, delay=0,
@@ -135,7 +137,6 @@ async def test_one_sided_datagram1(ipv6: bool = False) -> None:
135137
async with client.connect((get_localhost(ipv6, use_wildcard=False),) + address[1:],
136138
client_config) as connection:
137139
assert connection.state == ConnectionState.ESTABLISHED
138-
await trio.sleep(0.1) # let handshake finalize for server
139140
# server changes to not supporting DATAGRAM:
140141
server = cast(QuicServer, _server_endpoint)
141142
assert connection.host_cid in server._connections.keys()
@@ -146,6 +147,7 @@ async def test_one_sided_datagram1(ipv6: bool = False) -> None:
146147
# TODO: PROTOCOL_VIOLATION!
147148
await trio.sleep_forever() # let server close connection with PROTOCOL_VIOLATION
148149

150+
# TODO: @parametrize_ipv6, also remove = False default below
149151
async def test_one_sided_datagram2(ipv6: bool = False) -> None:
150152
# server does not support DATAGRAM:
151153
async with quic_echo_server(True, ipv6=ipv6, delay=0) as (_server_endpoint, address):
@@ -158,8 +160,8 @@ async def test_one_sided_datagram2(ipv6: bool = False) -> None:
158160
await connection.send(b"hello")
159161
await trio.sleep(1) # let server close connection with PROTOCOL_VIOLATION
160162

161-
# TODO: @parametrize_ipv6, also remove = False default below
162-
async def test_handshake_datagram(ipv6: bool = False) -> None:
163+
@parametrize_ipv6
164+
async def test_handshake_datagram(ipv6: bool) -> None:
163165
transport_parameters = {"max_datagram_frame_size" : 2400} # add DATAGRAM support
164166
async with quic_echo_server(True, ipv6=ipv6, delay=0,
165167
transport_parameters=transport_parameters) as (_server_endpoint, address):
@@ -175,16 +177,7 @@ async def test_handshake_datagram(ipv6: bool = False) -> None:
175177
assert connection.host_cid in server._connections.keys()
176178
server_connection = server._connections[connection.host_cid]
177179
assert server_connection.state == ConnectionState.ESTABLISHED
178-
# Linda: why not called upon leaving async context manager???
179-
# await connection.aclose()
180-
181-
# await trio.sleep(0) # let closing() commence?
182-
print(f"1st client state: {connection.state}")
183-
await trio.sleep(1) # let closing() commence?
184-
print(f"2nd client state: {connection.state}")
185-
await trio.sleep(0) # let closing() commence?
186-
187-
# TODO: check cleanly shutdown?
180+
assert connection.state == ConnectionState.DRAINING
188181

189182
@parametrize_ipv6
190183
async def test_handshake(ipv6: bool) -> None:

0 commit comments

Comments
 (0)