Skip to content

Commit 4696607

Browse files
bretambroseBret Ambrose
and
Bret Ambrose
authored
Request response streams (#647)
Co-authored-by: Bret Ambrose <[email protected]>
1 parent af06aa5 commit 4696607

File tree

5 files changed

+469
-31
lines changed

5 files changed

+469
-31
lines changed

awscrt/mqtt_request_response.py

Lines changed: 160 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,95 @@
55
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
66
# SPDX-License-Identifier: Apache-2.0.
77

8+
from collections.abc import Sequence
9+
from enum import IntEnum
810
from dataclasses import dataclass
9-
from typing import Union
11+
from typing import Callable, Union, Optional
1012
from awscrt import NativeResource, mqtt5, mqtt, exceptions
1113
from concurrent.futures import Future
1214
import _awscrt
13-
import collections.abc
15+
16+
17+
class SubscriptionStatusEventType(IntEnum):
18+
"""
19+
The type of change to the state of a streaming operation subscription
20+
"""
21+
22+
SUBSCRIPTION_ESTABLISHED = 0
23+
"""
24+
The streaming operation is successfully subscribed to its topic (filter)
25+
"""
26+
27+
SUBSCRIPTION_LOST = 1
28+
"""
29+
The streaming operation has temporarily lost its subscription to its topic (filter)
30+
"""
31+
32+
SUBSCRIPTION_HALTED = 2
33+
"""
34+
The streaming operation has entered a terminal state where it has given up trying to subscribe
35+
to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
36+
"""
37+
38+
39+
@dataclass
40+
class SubscriptionStatusEvent:
41+
"""
42+
An event that describes a change in subscription status for a streaming operation.
43+
44+
Args:
45+
type (SubscriptionStatusEventType): The type of status change represented by the event
46+
error (Optional[Exception]): Describes an underlying reason for the event. Only set for SubscriptionLost and SubscriptionHalted.
47+
"""
48+
type: SubscriptionStatusEventType = None
49+
error: 'Optional[Exception]' = None
50+
51+
52+
@dataclass
53+
class IncomingPublishEvent:
54+
"""
55+
An event that describes an incoming message on a streaming operation.
56+
57+
Args:
58+
topic (str): MQTT Topic that the response was received on.
59+
payload (Optional[bytes]): The payload of the incoming message.
60+
"""
61+
topic: str
62+
payload: 'Optional[bytes]' = None
63+
64+
65+
"""
66+
Signature for a handler that listens to subscription status events.
67+
"""
68+
SubscriptionStatusListener = Callable[[SubscriptionStatusEvent], None]
69+
70+
"""
71+
Signature for a handler that listens to incoming publish events.
72+
"""
73+
IncomingPublishListener = Callable[[IncomingPublishEvent], None]
74+
75+
76+
@dataclass
77+
class StreamingOperationOptions:
78+
"""
79+
Configuration options for an MQTT-based streaming operation.
80+
81+
Args:
82+
subscription_topic_filter (str): Topic filter that the streaming operation should listen on
83+
subscription_status_listener (SubscriptionStatusListener): function object to invoke when the operation's subscription status changes
84+
incoming_publish_listener (IncomingPublishListener): function object to invoke when a publish packet arrives that matches the subscription topic filter
85+
"""
86+
subscription_topic_filter: str
87+
subscription_status_listener: 'Optional[SubscriptionStatusListener]' = None
88+
incoming_publish_listener: 'Optional[IncomingPublishListener]' = None
89+
90+
def validate(self):
91+
"""
92+
Stringently type-checks an instance's field values.
93+
"""
94+
assert isinstance(self.subscription_topic_filter, str)
95+
assert callable(self.subscription_status_listener) or self.subscription_status_listener is None
96+
assert callable(self.incoming_publish_listener) or self.incoming_publish_listener is None
1497

1598

1699
@dataclass
@@ -41,12 +124,15 @@ class ResponsePath:
41124
correlation_token_json_path: 'Optional[str]' = None
42125

43126
def validate(self):
127+
"""
128+
Stringently type-checks an instance's field values.
129+
"""
44130
assert isinstance(self.topic, str)
45131
assert isinstance(self.correlation_token_json_path, str) or self.correlation_token_json_path is None
46132

47133

48134
@dataclass
49-
class RequestResponseOperationOptions:
135+
class RequestOptions:
50136
"""
51137
Configuration options for an MQTT-based request-response operation.
52138
@@ -64,11 +150,14 @@ class RequestResponseOperationOptions:
64150
correlation_token: 'Optional[str]' = None
65151

66152
def validate(self):
67-
assert isinstance(self.subscription_topic_filters, collections.abc.Sequence)
153+
"""
154+
Stringently type-checks an instance's field values.
155+
"""
156+
assert isinstance(self.subscription_topic_filters, Sequence)
68157
for topic_filter in self.subscription_topic_filters:
69158
assert isinstance(topic_filter, str)
70159

71-
assert isinstance(self.response_paths, collections.abc.Sequence)
160+
assert isinstance(self.response_paths, Sequence)
72161
for response_path in self.response_paths:
73162
response_path.validate()
74163

@@ -78,18 +167,7 @@ def validate(self):
78167

79168

80169
@dataclass
81-
class StreamingOperationOptions:
82-
"""
83-
Configuration options for an MQTT-based streaming operation.
84-
85-
Args:
86-
subscription_topic_filter (str): Topic filter that the streaming operation should listen on
87-
"""
88-
subscription_topic_filter: str
89-
90-
91-
@dataclass
92-
class RequestResponseClientOptions:
170+
class ClientOptions:
93171
"""
94172
MQTT-based request-response client configuration options
95173
@@ -103,6 +181,9 @@ class RequestResponseClientOptions:
103181
operation_timeout_in_seconds: 'Optional[int]' = 60
104182

105183
def validate(self):
184+
"""
185+
Stringently type-checks an instance's field values.
186+
"""
106187
assert isinstance(self.max_request_response_subscriptions, int)
107188
assert isinstance(self.max_streaming_subscriptions, int)
108189
assert isinstance(self.operation_timeout_in_seconds, int)
@@ -123,10 +204,10 @@ class Client(NativeResource):
123204
"""
124205

125206
def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection],
126-
client_options: RequestResponseClientOptions):
207+
client_options: ClientOptions):
127208

128209
assert isinstance(protocol_client, mqtt5.Client) or isinstance(protocol_client, mqtt.Connection)
129-
assert isinstance(client_options, RequestResponseClientOptions)
210+
assert isinstance(client_options, ClientOptions)
130211
client_options.validate()
131212

132213
super().__init__()
@@ -136,7 +217,17 @@ def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection],
136217
else:
137218
self._binding = _awscrt.mqtt_request_response_client_new_from_311(protocol_client, client_options)
138219

139-
def make_request(self, options: RequestResponseOperationOptions):
220+
def make_request(self, options: RequestOptions):
221+
"""
222+
Initiate an MQTT-based request-response async workflow
223+
224+
Args:
225+
options (RequestOptions): Configuration options for the request to perform
226+
227+
Returns:
228+
concurrent.futures.Future: A Future whose result will contain the topic and payload of a response
229+
to the request. The future will contain an exception if the request fails.
230+
"""
140231
options.validate()
141232

142233
future = Future()
@@ -157,3 +248,52 @@ def on_request_complete(error_code, topic, payload):
157248
on_request_complete)
158249

159250
return future
251+
252+
def create_stream(self, options: StreamingOperationOptions):
253+
"""
254+
Creates a new streaming operation
255+
256+
Args:
257+
options (StreamingOperationOptions): Configuration options for the streaming operation
258+
259+
Returns:
260+
StreamingOperation: a new streaming operation. Opening the operation triggers the client to maintain
261+
an MQTT subscription for relevant events. Matching publishes and subscription status changes are
262+
communicated by invoking configuration-controlled callbacks.
263+
"""
264+
options.validate()
265+
266+
def on_subscription_status_event(event_type, error_code):
267+
if options.subscription_status_listener is not None:
268+
event = SubscriptionStatusEvent(event_type)
269+
if error_code != 0:
270+
event.error = exceptions.from_code(error_code)
271+
options.subscription_status_listener(event)
272+
273+
def on_incoming_publish_event(topic, payload):
274+
if options.incoming_publish_listener is not None:
275+
event = IncomingPublishEvent(topic, payload)
276+
options.incoming_publish_listener(event)
277+
278+
stream_binding = _awscrt.mqtt_request_response_client_create_stream(
279+
self._binding, options.subscription_topic_filter, on_subscription_status_event, on_incoming_publish_event)
280+
281+
return StreamingOperation(stream_binding)
282+
283+
284+
class StreamingOperation(NativeResource):
285+
"""
286+
An operation that represents a stream of events broadcast to an MQTT topic
287+
"""
288+
289+
def __init__(self, binding):
290+
super().__init__()
291+
292+
self._binding = binding
293+
294+
def open(self):
295+
"""
296+
Triggers the streaming operation to maintain an MQTT subscription for relevant events. Until a stream is
297+
opened, no events can be received.
298+
"""
299+
_awscrt.mqtt_streaming_operation_open(self._binding)

source/module.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,8 @@ static PyMethodDef s_module_methods[] = {
781781
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_5, METH_VARARGS),
782782
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_311, METH_VARARGS),
783783
AWS_PY_METHOD_DEF(mqtt_request_response_client_make_request, METH_VARARGS),
784+
AWS_PY_METHOD_DEF(mqtt_request_response_client_create_stream, METH_VARARGS),
785+
AWS_PY_METHOD_DEF(mqtt_streaming_operation_open, METH_VARARGS),
784786

785787
/* Cryptographic primitives */
786788
AWS_PY_METHOD_DEF(md5_new, METH_NOARGS),

0 commit comments

Comments
 (0)