Skip to content

Commit e01492c

Browse files
miguelcmedeirosNeelansh-nsmatehat
authored
Add binary message decoding/encoding support for message and optional payload decoder. (braverhealth#105) (#12)
* add msgpack_dart decoding * refactor: replace useMessagePack with payloadDecoder in MessageSerializer and PhoenixSocketOptions * add tests for binry message encoding and decoding * fix ci --------- Co-authored-by: Neelansh Sethi <sethineelansh@gmail.com> Co-authored-by: matehat <mathieu@braver.health>
1 parent 05cce83 commit e01492c

8 files changed

Lines changed: 537 additions & 17 deletions

File tree

.github/workflows/test.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,19 @@ jobs:
1818
ports:
1919
- 4001:4001
2020
- 4002:4002
21-
21+
2222
proxy:
2323
image: ghcr.io/shopify/toxiproxy
2424
ports:
2525
- 8474:8474
2626
- 4004:4004
2727

2828
steps:
29-
3029
- name: Setup Dart
3130
uses: dart-lang/setup-dart@v1
3231
with:
3332
architecture: x64
34-
sdk: "3.1.2"
33+
sdk: "3.7.2"
3534

3635
- name: Fetch sources
3736
uses: actions/checkout@v2

lib/src/message.dart

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
import 'dart:typed_data';
12
import 'package:logging/logging.dart';
2-
33
import 'channel.dart';
44
import 'events.dart';
55
import 'socket.dart';
@@ -53,6 +53,23 @@ class Message {
5353
this.payload,
5454
});
5555

56+
/// Build a [Message] with binary payload.
57+
factory Message.binary({
58+
String? joinRef,
59+
String? ref,
60+
String? topic,
61+
required PhoenixChannelEvent event,
62+
required Uint8List payload,
63+
}) {
64+
return Message(
65+
joinRef: joinRef,
66+
ref: ref,
67+
topic: topic,
68+
event: event,
69+
payload: payload,
70+
);
71+
}
72+
5673
/// Reference of the channel on which the message is received.
5774
///
5875
/// Used by the [PhoenixSocket] to route the message on the proper
@@ -73,8 +90,8 @@ class Message {
7390

7491
/// The payload of this message.
7592
///
76-
/// This needs to be a JSON-encodable object.
77-
final Map<String, dynamic>? payload;
93+
/// This can be either a JSON-encodable Map or a Uint8List for binary data.
94+
final dynamic payload;
7895

7996
/// Encode a message to a JSON-encodable list of values.
8097
Object encode() {

lib/src/message_serializer.dart

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,86 @@
11
import 'dart:convert';
2+
import 'dart:typed_data';
23

3-
import 'message.dart';
4+
import 'package:phoenix_socket/phoenix_socket.dart';
5+
import 'package:phoenix_socket/src/utils/serializer.dart';
6+
import 'package:phoenix_socket/src/utils/map_utils.dart';
47

58
typedef DecoderCallback = dynamic Function(String rawData);
69
typedef EncoderCallback = String Function(Object? data);
10+
typedef PayloadDecoderCallback = dynamic Function(Uint8List payload);
711

812
/// Default class to serialize [Message] instances to JSON.
913
class MessageSerializer {
14+
static const int headerLength = 1;
15+
static const int metaLength = 4;
16+
17+
static const Map<String, int> kinds = {
18+
'push': 0,
19+
'reply': 1,
20+
'broadcast': 2,
21+
};
22+
1023
final DecoderCallback _decoder;
1124
final EncoderCallback _encoder;
25+
final PayloadDecoderCallback? _payloadDecoder;
1226

1327
/// Default constructor returning the singleton instance of this class.
1428
const MessageSerializer({
1529
DecoderCallback decoder = jsonDecode,
1630
EncoderCallback encoder = jsonEncode,
31+
PayloadDecoderCallback? payloadDecoder,
1732
}) : _decoder = decoder,
18-
_encoder = encoder;
33+
_encoder = encoder,
34+
_payloadDecoder = payloadDecoder;
35+
36+
/// Encode a [Message] into a raw string or a Uint8List.
37+
///
38+
/// If the message has a binary payload, it will be encoded using the
39+
/// [BinaryDecoder.binaryEncode] method. Otherwise, the message will be
40+
/// encoded using the [encoder] callback.
41+
/// Given a [Message], return the raw string that would be sent through
42+
/// a websocket.
43+
dynamic encode(Message message) {
44+
if (message.payload is Uint8List) {
45+
return BinaryDecoder.binaryEncode(message);
46+
}
47+
return _encoder(message.encode());
48+
}
1949

20-
/// Yield a [Message] from some raw string arriving from a websocket.
50+
/// Decode a [Message] from a raw string or a Uint8List.
51+
///
52+
/// If the message has a binary payload, it will be decoded using the
53+
/// [BinaryDecoder.binaryDecode] method. Otherwise, the message will be
54+
/// decoded using the [decoder] callback.
2155
Message decode(dynamic rawData) {
22-
if (rawData is String || rawData is List<int>) {
56+
if (rawData is String) {
2357
return Message.fromJson(_decoder(rawData));
58+
} else if (rawData is Uint8List) {
59+
final rawMap = BinaryDecoder.binaryDecode(rawData);
60+
return Message(
61+
joinRef: rawMap['join_ref'],
62+
ref: rawMap['ref'],
63+
topic: rawMap['topic'],
64+
event: PhoenixChannelEvent.custom(rawMap['event']),
65+
payload: _getPayload(rawMap['payload']),
66+
);
2467
} else {
2568
throw ArgumentError('Received a non-string or a non-list of integers');
2669
}
2770
}
2871

29-
/// Given a [Message], return the raw string that would be sent through
30-
/// a websocket.
31-
String encode(Message message) => _encoder(message.encode());
72+
dynamic _getPayload(dynamic payLoad) {
73+
if (_payloadDecoder != null && payLoad is Uint8List) {
74+
final deserializedPayload = _payloadDecoder!(payLoad);
75+
if (deserializedPayload is Map) {
76+
return MapUtils.deepConvertToStringDynamic(deserializedPayload);
77+
} else if (deserializedPayload is Uint8List) {
78+
return deserializedPayload;
79+
} else {
80+
return {'data': deserializedPayload};
81+
}
82+
} else {
83+
return payLoad;
84+
}
85+
}
3286
}

lib/src/socket.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22
import 'dart:core';
33
import 'dart:math';
4+
import 'dart:typed_data';
45

56
import 'package:logging/logging.dart';
67
import 'package:phoenix_socket/src/utils/iterable_extensions.dart';
@@ -86,7 +87,7 @@ class PhoenixSocket {
8687

8788
final BehaviorSubject<PhoenixSocketEvent> _stateStreamController =
8889
BehaviorSubject();
89-
final StreamController<String> _receiveStreamController =
90+
final StreamController<dynamic> _receiveStreamController =
9091
StreamController.broadcast();
9192
final String _endpoint;
9293
final StreamController<Message> _topicMessages = StreamController();
@@ -396,7 +397,7 @@ class PhoenixSocket {
396397
///
397398
/// Used to define a custom message type for proper data decoding
398399
onSocketDataCallback(message) {
399-
if (message is String) {
400+
if (message is String || message is Uint8List) {
400401
if (!_receiveStreamController.isClosed) {
401402
_receiveStreamController.add(message);
402403
}

lib/src/socket_options.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ import 'message_serializer.dart';
55
/// Provided durations are all in milliseconds.
66
class PhoenixSocketOptions {
77
/// Create a PhoenixSocketOptions
8-
const PhoenixSocketOptions({
8+
PhoenixSocketOptions({
99
/// The duration after which a connection attempt
1010
/// is considered failed
1111
Duration? timeout,
1212

1313
/// The interval between heartbeat roundtrips
1414
Duration? heartbeat,
1515

16+
/// Function to decode binary payloads
17+
PayloadDecoderCallback? payloadDecoder,
18+
1619
/// The list of delays between reconnection attempts.
1720
///
1821
/// The last duration will be repeated until it works.
@@ -38,7 +41,8 @@ class PhoenixSocketOptions {
3841
this.dynamicParams,
3942
MessageSerializer? serializer,
4043
}) : _timeout = timeout ?? const Duration(seconds: 10),
41-
serializer = serializer ?? const MessageSerializer(),
44+
serializer =
45+
serializer ?? MessageSerializer(payloadDecoder: payloadDecoder),
4246
_heartbeat = heartbeat ?? const Duration(seconds: 30),
4347
assert(!(params != null && dynamicParams != null),
4448
"Can't set both params and dynamicParams");

lib/src/utils/map_utils.dart

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/// Utility functions for handling map operations.
2+
class MapUtils {
3+
/// Recursively converts all keys in a map to strings and handles nested maps and lists.
4+
///
5+
/// [input] is the map to be converted.
6+
/// Returns a new map with all keys converted to strings and nested structures handled.
7+
static Map<String, dynamic> deepConvertToStringDynamic(
8+
Map<dynamic, dynamic> input) {
9+
return input.map((key, value) {
10+
if (value is Map) {
11+
return MapEntry(key.toString(), deepConvertToStringDynamic(value));
12+
} else if (value is List) {
13+
return MapEntry(
14+
key.toString(),
15+
value.map((element) {
16+
if (element is Map) {
17+
return deepConvertToStringDynamic(element);
18+
} else {
19+
return element;
20+
}
21+
}).toList());
22+
} else {
23+
return MapEntry(key.toString(), value);
24+
}
25+
});
26+
}
27+
}

0 commit comments

Comments
 (0)