Skip to content

Commit c150989

Browse files
committed
Refactoring in preparation of dtls-server
None of this should alter working behavior, but several error cases are now handled more cleanly or at all.
2 parents 9bd1211 + 774a0c2 commit c150989

File tree

7 files changed

+113
-59
lines changed

7 files changed

+113
-59
lines changed

aiocoap/defaults.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,7 @@ def get_default_clienttransports(*, loop=None, use_env=True):
4646
if not oscore_missing_modules():
4747
yield 'oscore'
4848

49-
try:
50-
from DTLSSocket import dtls # noqa: F401
51-
except ImportError:
52-
pass
53-
else:
49+
if not dtls_missing_modules():
5450
yield 'tinydtls'
5551

5652
yield 'tcpclient'

aiocoap/messagemanager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ def dispatch_message(self, message):
124124
self.log.warning("Received a message with code %s and type %s (those don't fit) from %s, ignoring it.", message.code, message.mtype, message.remote)
125125

126126
def dispatch_error(self, error, remote):
127+
if self._active_exchanges is None:
128+
# Not entirely sure where it is so far; better just raise a warning
129+
# than an exception later, nothing terminally bad should come of
130+
# this error.
131+
self.log.warning("Internal shutdown sequence msismatch: error dispatched through messagemanager after shutown")
132+
return
133+
127134
self.log.debug("Incoming error %s from %r", error, remote)
128135

129136
# cancel requests first, and then exchanges: cancelling the pending

aiocoap/tokenmanager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,13 @@ def next_token(self):
6767
#
6868

6969
def dispatch_error(self, exception, remote):
70-
keys_for_removal = []
70+
if self.outgoing_requests is None:
71+
# Not entirely sure where it is so far; better just raise a warning
72+
# than an exception later, nothing terminally bad should come of
73+
# this error.
74+
self.log.warning("Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutown")
75+
return
76+
7177
# NetworkError is what we promise users to raise from request etc; if
7278
# it's already a NetworkError and possibly more descriptive (eg. a
7379
# TimeoutError), we'll just let it through (and thus allow

aiocoap/transports/generic_udp.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,24 @@
1212
class GenericMessageInterface(interfaces.MessageInterface):
1313
"""GenericMessageInterface is not a standalone implementation of a
1414
message inteface. It does implement everything between the MessageInterface
15-
and a not yet fully specified interface of "bound UDP sockets"."""
15+
and a not yet fully specified interface of "bound UDP sockets".
1616
17-
def __init__(self, ctx: interfaces.MessageManager, log, loop):
18-
self._ctx = ctx
19-
self._log = log
20-
self._loop = loop
17+
It delegates sending through the address objects (which persist through
18+
some time, given this is some kind of bound-socket scenario).
2119
22-
async def determine_remote(self, request):
23-
if request.requested_scheme not in ('coap', None):
24-
return None
20+
The user must:
21+
* set up a ._pool after construction with a shutdown and a connect method
22+
* provide their addresses with a send(bytes) method
23+
* pass incoming data to the _received_datagram and _received_exception methods
24+
"""
2525

26-
if request.unresolved_remote is not None:
27-
host, port = util.hostportsplit(request.unresolved_remote)
28-
port = port or COAP_PORT
29-
elif request.opt.uri_host:
30-
host = request.opt.uri_host
31-
port = request.opt.uri_port or COAP_PORT
32-
else:
33-
raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)")
26+
def __init__(self, mman: interfaces.MessageManager, log, loop):
27+
self._mman = mman
28+
self._log = log
29+
self._loop = loop
3430

35-
return await self._pool.connect((host, port))
31+
# Callbacks to be hooked up by the user of the class; feed data on to the
32+
# message manager
3633

3734
def _received_datagram(self, address, datagram):
3835
try:
@@ -41,17 +38,34 @@ def _received_datagram(self, address, datagram):
4138
self._log.warning("Ignoring unparsable message from %s", address)
4239
return
4340

44-
self._ctx.dispatch_message(message)
41+
self._mman.dispatch_message(message)
4542

4643
def _received_exception(self, address, exception):
47-
self._ctx.dispatch_error(exception, address)
44+
self._mman.dispatch_error(exception, address)
45+
46+
# Implementations of MessageInterface
4847

4948
def send(self, message):
50-
if self._ctx is None:
49+
if self._mman is None:
5150
self._log.info("Not sending message %r: transport is already shutting down.", message)
5251
else:
5352
message.remote.send(message.encode())
5453

5554
async def shutdown(self):
5655
await self._pool.shutdown()
57-
self._ctx = None
56+
self._mman = None
57+
58+
async def determine_remote(self, request):
59+
if request.requested_scheme not in ('coap', None):
60+
return None
61+
62+
if request.unresolved_remote is not None:
63+
host, port = util.hostportsplit(request.unresolved_remote)
64+
port = port or COAP_PORT
65+
elif request.opt.uri_host:
66+
host = request.opt.uri_host
67+
port = request.opt.uri_port or COAP_PORT
68+
else:
69+
raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)")
70+
71+
return await self._pool.connect((host, port))

aiocoap/transports/simple6.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,9 @@
5050
from .generic_udp import GenericMessageInterface
5151

5252
class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress):
53-
def __init__(self, ready_callback, new_message_callback, new_error_callback, stored_sockaddr):
53+
def __init__(self, ready_callback, message_interface: "GenericMessageInterface", stored_sockaddr):
5454
self._ready_callback = ready_callback
55-
self._new_message_callback = new_message_callback
56-
self._new_error_callback = new_error_callback
55+
self._message_interface = message_interface
5756

5857
# This gets stored in the _Connection because not all implementations
5958
# of datagram transports will expose the get_extra_info('socket')
@@ -128,10 +127,10 @@ def connection_made(self, transport):
128127
del self._ready_callback
129128

130129
def datagram_received(self, data, address):
131-
self._new_message_callback(self, data)
130+
self._message_interface._received_datagram(self, data)
132131

133132
def error_received(self, exception):
134-
self._new_error_callback(self, exception)
133+
self._message_interface._received_exception(self, exception)
135134

136135
def connection_lost(self, exception):
137136
if exception is None:
@@ -141,14 +140,14 @@ def connection_lost(self, exception):
141140

142141
# whatever it is _DatagramClientSocketpoolSimple6 expects
143142

143+
# ... because generic_udp expects it from _DatagramClientSocketpoolSimple6
144144
def send(self, data):
145145
self._transport.sendto(data, None)
146146

147147
async def shutdown(self):
148148
self._stage = "shutting down"
149149
self._transport.abort()
150-
del self._new_message_callback
151-
del self._new_error_callback
150+
del self._message_interface
152151
self._stage = "destroyed"
153152

154153
class _DatagramClientSocketpoolSimple6:
@@ -174,13 +173,12 @@ class _DatagramClientSocketpoolSimple6:
174173
# which MessageInterface to talk to for sending), or we move the
175174
# MessageInterface out completely and have that object be the Protocol,
176175
# and the Protocol can even send new packages via the address
177-
def __init__(self, loop, new_message_callback, new_error_callback):
176+
def __init__(self, loop, mi: "GenericMessageInterface"):
178177
# using an OrderedDict to implement an LRU cache as it's suitable for that purpose according to its documentation
179178
self._sockets = OrderedDict()
180179

181180
self._loop = loop
182-
self._new_message_callback = new_message_callback
183-
self._new_error_callback = new_error_callback
181+
self._message_interface = mi
184182

185183
async def _maybe_purge_sockets(self):
186184
while len(self._sockets) >= self.max_sockets: # more of an if
@@ -209,7 +207,7 @@ async def connect(self, sockaddr):
209207

210208
ready = asyncio.get_running_loop().create_future()
211209
transport, protocol = await self._loop.create_datagram_endpoint(
212-
lambda: _Connection(lambda: ready.set_result(None), self._new_message_callback, self._new_error_callback, sockaddr),
210+
lambda: _Connection(lambda: ready.set_result(None), self._message_interface, sockaddr),
213211
remote_addr=sockaddr)
214212
await ready
215213

@@ -225,6 +223,10 @@ async def connect(self, sockaddr):
225223
return protocol
226224

227225
async def shutdown(self):
226+
# preventing the creation of new sockets early on, and generally
227+
# breaking cycles
228+
del self._message_interface
229+
228230
if self._sockets:
229231
done, pending = await asyncio.wait([
230232
asyncio.create_task(s.shutdown())
@@ -239,7 +241,8 @@ class MessageInterfaceSimple6(GenericMessageInterface):
239241
async def create_client_transport_endpoint(cls, ctx, log, loop):
240242
self = cls(ctx, log, loop)
241243

242-
self._pool = _DatagramClientSocketpoolSimple6(self._loop, self._received_datagram, self._received_exception)
244+
# Cyclic reference broken during shutdown
245+
self._pool = _DatagramClientSocketpoolSimple6(self._loop, self)
243246
return self
244247

245248
async def recognize_remote(self, remote):

aiocoap/transports/simplesocketserver.py

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import asyncio
4141
from collections import namedtuple
4242

43+
from .. import error
4344
from ..numbers import COAP_PORT
4445
from .. import interfaces
4546
from .generic_udp import GenericMessageInterface
@@ -87,7 +88,7 @@ class _DatagramServerSocketSimple(asyncio.DatagramProtocol):
8788
_Address = _Address
8889

8990
@classmethod
90-
async def create(cls, bind, log, loop, new_message_callback, new_error_callback):
91+
async def create(cls, bind, log, loop, message_interface: "GenericMessageInterface"):
9192
if bind is None or bind[0] in ('::', '0.0.0.0', '', None):
9293
# If you feel tempted to remove this check, think about what
9394
# happens if two configured addresses can both route to a
@@ -100,7 +101,7 @@ async def create(cls, bind, log, loop, new_message_callback, new_error_callback)
100101
ready = asyncio.get_running_loop().create_future()
101102

102103
transport, protocol = await loop.create_datagram_endpoint(
103-
lambda: cls(ready.set_result, new_message_callback, new_error_callback, log),
104+
lambda: cls(ready.set_result, message_interface, log),
104105
local_addr=bind,
105106
reuse_port=defaults.has_reuse_port(),
106107
)
@@ -110,12 +111,13 @@ async def create(cls, bind, log, loop, new_message_callback, new_error_callback)
110111
# hostinfo), and can thus store the local hostinfo without distinction
111112
protocol.hostinfo_local = hostportjoin(bind[0], bind[1] if bind[1] != COAP_PORT else None)
112113

113-
return await ready
114+
self = await ready
115+
self._loop = loop
116+
return self
114117

115-
def __init__(self, ready_callback, new_message_callback, new_error_callback, log):
118+
def __init__(self, ready_callback, message_interface: "GenericMessageInterface", log):
116119
self._ready_callback = ready_callback
117-
self._new_message_callback = new_message_callback
118-
self._new_error_callback = new_error_callback
120+
self._message_interface = message_interface
119121
self.log = log
120122

121123
async def shutdown(self):
@@ -124,10 +126,26 @@ async def shutdown(self):
124126
# interface like _DatagramClientSocketpoolSimple6
125127

126128
async def connect(self, sockaddr):
127-
# FIXME it might be necessary to resolve the address now to get a
128-
# canonical form that can be recognized later when a package comes back
129+
# FIXME this is not regularly tested either
130+
129131
self.log.warning("Sending initial messages via a server socket is not recommended")
130-
return self._Address(self, sockaddr)
132+
# A legitimate case is when something stores return addresses as
133+
# URI(part)s and not as remotes. (In similar transports this'd also be
134+
# the case if the address's connection is dropped from the pool, but
135+
# that doesn't happen here since there is no pooling as there is no
136+
# per-connection state).
137+
138+
# getaddrinfo is not only to needed to resolve any host names (which
139+
# would not be recognized otherwise), but also to get a complete (host,
140+
# port, zoneinfo, whatwasthefourth) tuple from what is passed in as a
141+
# (host, port) tuple.
142+
addresses = await self._loop.getaddrinfo(*sockaddr, family=self._transport.get_extra_info('socket').family)
143+
if not addresses:
144+
raise error.NetworkError("No addresses found for %s" % sockaddr[0])
145+
# FIXME could do happy eyebals
146+
address = addresses[0][4]
147+
address = self._Address(self, address)
148+
return address
131149

132150
# datagram protocol interface
133151

@@ -137,19 +155,23 @@ def connection_made(self, transport):
137155
del self._ready_callback
138156

139157
def datagram_received(self, data, sockaddr):
140-
self._new_message_callback(self._Address(self, sockaddr), data)
158+
self._message_interface._received_datagram(self._Address(self, sockaddr), data)
141159

142160
def error_received(self, exception):
143161
# This is why this whole implementation is a bad idea (but still the best we got on some platforms)
144162
self.log.warning("Ignoring error because it can not be mapped to any connection: %s", exception)
145163

146164
def connection_lost(self, exception):
147165
if exception is None:
148-
pass
166+
pass # regular shutdown
149167
else:
150168
self.log.error("Received unexpected connection loss: %s", exception)
151169

152170
class MessageInterfaceSimpleServer(GenericMessageInterface):
171+
# for alteration by tinydtls_server
172+
_default_port = COAP_PORT
173+
_serversocket = _DatagramServerSocketSimple
174+
153175
@classmethod
154176
async def create_server(cls, bind, ctx: interfaces.MessageManager, log, loop):
155177
self = cls(ctx, log, loop)
@@ -158,11 +180,17 @@ async def create_server(cls, bind, ctx: interfaces.MessageManager, log, loop):
158180
# servers that want a random port (eg. when the service URLs are
159181
# advertised out-of-band anyway). LwM2M clients should use simple6
160182
# instead as outlined there.
161-
bind = (bind[0], COAP_PORT if bind[1] is None else bind[1])
183+
bind = (bind[0], self._default_port if bind[1] is None else bind[1] + (self._default_port - COAP_PORT))
162184

163-
self._pool = await _DatagramServerSocketSimple.create(bind, log, self._loop, self._received_datagram, self._received_exception)
185+
# Cyclic reference broken during shutdown
186+
self._pool = await self._serversocket.create(bind, log, self._loop, self)
164187

165188
return self
166189

167190
async def recognize_remote(self, remote):
168-
return isinstance(remote, _Address) and remote in remote.serversocket is self._pool
191+
# FIXME: This is never tested (as is the connect method) because all
192+
# tests create client contexts client-side (which don't build a
193+
# simplesocketserver), and because even when a server context is
194+
# created, there's a simple6 that grabs such addresses before a request
195+
# is sent out
196+
return isinstance(remote, _Address) and remote.serversocket is self._pool

aiocoap/transports/tinydtls.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,18 @@ def send(self, message):
151151

152152
log = property(lambda self: self.coaptransport.log)
153153

154-
def _build_accessor(self, method):
154+
def _build_accessor(self, method, deadvalue):
155155
"""Think self._build_accessor('_write')() == self._write(), just that
156156
it's returning a weak wrapper that allows refcounting-based GC to
157157
happen when the remote falls out of use"""
158158
weakself = weakref.ref(self)
159-
def wrapper(*args, __weakself=weakself, __method=method):
159+
def wrapper(*args, __weakself=weakself, __method=method, __deadvalue=deadvalue):
160160
self = __weakself()
161161
if self is None:
162162
warnings.warn("DTLS module did not shut down the DTLSSocket "
163163
"perfectly; it still tried to call %s in vain" %
164164
__method)
165-
return
165+
return __deadvalue
166166
return getattr(self, __method)(*args)
167167
wrapper.__name__ = "_build_accessor(%s)" % method
168168
return wrapper
@@ -181,9 +181,9 @@ async def _start(self):
181181
)
182182

183183
self._dtls_socket = dtls.DTLS(
184-
read=self._build_accessor("_read"),
185-
write=self._build_accessor("_write"),
186-
event=self._build_accessor("_event"),
184+
read=self._build_accessor("_read", 0),
185+
write=self._build_accessor("_write", 0),
186+
event=self._build_accessor("_event", 0),
187187
pskId=self._pskId,
188188
pskStore={self._pskId: self._psk},
189189
)

0 commit comments

Comments
 (0)