Skip to content

Commit c7375f4

Browse files
committed
Add topic to request-response stream client
1 parent b19fb66 commit c7375f4

File tree

4 files changed

+27
-4
lines changed

4 files changed

+27
-4
lines changed

lib/browser/mqtt_request_response.ts

+1
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
808808

809809
let streamingOperation = operation as StreamingOperation;
810810
streamingOperation.operation.triggerIncomingPublishEvent({
811+
topic: event.topic,
811812
payload: event.payload
812813
});
813814
}

lib/common/mqtt_request_response.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ export interface SubscriptionStatusEvent {
5757
*/
5858
export interface IncomingPublishEvent {
5959

60+
/**
61+
* MQTT Topic that the response was received on.
62+
*/
63+
topic: string
64+
6065
/**
6166
* The payload of the incoming message.
6267
*/
@@ -235,4 +240,3 @@ export interface IRequestResponseClient {
235240
*/
236241
submitRequest(requestOptions: RequestResponseOperationOptions): Promise<Response>;
237242
}
238-

source/mqtt_request_response.c

+19-2
Original file line numberDiff line numberDiff line change
@@ -1270,6 +1270,7 @@ struct on_incoming_publish_user_data {
12701270
struct aws_allocator *allocator;
12711271

12721272
struct aws_request_response_streaming_operation_binding *binding_ref;
1273+
struct aws_byte_buf topic;
12731274
struct aws_byte_buf *payload;
12741275
};
12751276

@@ -1280,6 +1281,8 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u
12801281

12811282
user_data->binding_ref = s_aws_request_response_streaming_operation_binding_release(user_data->binding_ref);
12821283

1284+
aws_byte_buf_clean_up(&user_data->topic);
1285+
12831286
if (user_data->payload != NULL) {
12841287
aws_byte_buf_clean_up(user_data->payload);
12851288
aws_mem_release(user_data->allocator, user_data->payload);
@@ -1290,12 +1293,17 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u
12901293

12911294
static struct on_incoming_publish_user_data *s_on_incoming_publish_user_data_new(
12921295
struct aws_request_response_streaming_operation_binding *binding,
1296+
struct aws_byte_cursor topic,
12931297
struct aws_byte_cursor payload) {
12941298

12951299
struct on_incoming_publish_user_data *user_data =
12961300
aws_mem_calloc(binding->allocator, 1, sizeof(struct on_incoming_publish_user_data));
12971301
user_data->allocator = binding->allocator;
12981302

1303+
if (aws_byte_buf_init_copy_from_cursor(&user_data->topic, binding->allocator, topic)) {
1304+
goto error;
1305+
}
1306+
12991307
user_data->payload = aws_mem_calloc(binding->allocator, 1, sizeof(struct aws_byte_buf));
13001308
if (aws_byte_buf_init_copy_from_cursor(user_data->payload, binding->allocator, payload)) {
13011309
goto error;
@@ -1325,6 +1333,11 @@ static int s_aws_create_napi_value_from_incoming_publish_event(
13251333
AWS_NAPI_CALL(
13261334
env, napi_create_object(env, &napi_event), { return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE); });
13271335

1336+
struct aws_byte_cursor topic_cursor = aws_byte_cursor_from_buf(&publish_event->topic);
1337+
if (aws_napi_attach_object_property_string(napi_event, env, AWS_NAPI_KEY_TOPIC, topic_cursor)) {
1338+
return AWS_OP_ERR;
1339+
}
1340+
13281341
if (aws_napi_attach_object_property_binary_as_finalizable_external(
13291342
napi_event, env, AWS_NAPI_KEY_PAYLOAD, publish_event->payload)) {
13301343
return AWS_OP_ERR;
@@ -1386,10 +1399,14 @@ static void s_napi_mqtt_streaming_operation_on_incoming_publish(
13861399
s_on_incoming_publish_user_data_destroy(publish_event);
13871400
}
13881401

1389-
static void s_mqtt_streaming_operation_on_incoming_publish(struct aws_byte_cursor payload, void *user_data) {
1402+
static void s_mqtt_streaming_operation_on_incoming_publish(
1403+
struct aws_byte_cursor payload,
1404+
struct aws_byte_cursor topic,
1405+
void *user_data) {
13901406
struct aws_request_response_streaming_operation_binding *binding = user_data;
13911407

1392-
struct on_incoming_publish_user_data *incoming_publish_ud = s_on_incoming_publish_user_data_new(binding, payload);
1408+
struct on_incoming_publish_user_data *incoming_publish_ud =
1409+
s_on_incoming_publish_user_data_new(binding, topic, payload);
13931410
if (incoming_publish_ud == NULL) {
13941411
return;
13951412
}

test/mqtt_request_response.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ export async function do_streaming_operation_incoming_publish_test(version: Prot
467467

468468
let incoming_publish : mqtt_request_response.IncomingPublishEvent = (await publish_received_promise)[0];
469469

470+
expect(incoming_publish.topic).toEqual(topic_filter);
470471
expect(Buffer.from(incoming_publish.payload as ArrayBuffer)).toEqual(payload);
471472

472473
stream.close();
@@ -539,4 +540,4 @@ export async function do_invalid_streaming_operation_config_test(config: Streami
539540
}).toThrow(expected_error);
540541

541542
await context.close();
542-
}
543+
}

0 commit comments

Comments
 (0)