Skip to content

Commit 9f0b1e4

Browse files
authored
feat!: Add support for getting headers on event source connection. (#208)
1 parent 823772e commit 9f0b1e4

18 files changed

+494
-100
lines changed

apps/flutter_client_contract_test_service/pubspec.lock

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -388,26 +388,29 @@ packages:
388388
source: hosted
389389
version: "6.8.0"
390390
launchdarkly_common_client:
391-
dependency: "direct overridden"
391+
dependency: transitive
392392
description:
393-
path: "../../packages/common_client"
394-
relative: true
395-
source: path
393+
name: launchdarkly_common_client
394+
sha256: e691f25676dfb659975843d7ef5e178297939bf3f4a53cab75381be07f27feec
395+
url: "https://pub.dev"
396+
source: hosted
396397
version: "1.6.1"
397398
launchdarkly_dart_common:
398-
dependency: "direct overridden"
399+
dependency: transitive
399400
description:
400-
path: "../../packages/common"
401-
relative: true
402-
source: path
403-
version: "1.6.1"
401+
name: launchdarkly_dart_common
402+
sha256: "323b0ae2bc756c7c83e95494983b72b190e012e090758a920a992358cbc025a2"
403+
url: "https://pub.dev"
404+
source: hosted
405+
version: "1.6.0"
404406
launchdarkly_event_source_client:
405-
dependency: "direct overridden"
407+
dependency: transitive
406408
description:
407-
path: "../../packages/event_source_client"
408-
relative: true
409-
source: path
410-
version: "1.2.1"
409+
name: launchdarkly_event_source_client
410+
sha256: "3506de716320c80898e12b825063a69a9a7169042902cdd6eb164f46b3ec60e3"
411+
url: "https://pub.dev"
412+
source: hosted
413+
version: "1.2.0"
411414
launchdarkly_flutter_client_sdk:
412415
dependency: "direct main"
413416
description:
@@ -872,10 +875,10 @@ packages:
872875
dependency: transitive
873876
description:
874877
name: vector_math
875-
sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803"
878+
sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b
876879
url: "https://pub.dev"
877880
source: hosted
878-
version: "2.1.4"
881+
version: "2.2.0"
879882
vm_service:
880883
dependency: transitive
881884
description:
@@ -957,5 +960,5 @@ packages:
957960
source: hosted
958961
version: "3.1.3"
959962
sdks:
960-
dart: ">=3.7.0-0 <4.0.0"
963+
dart: ">=3.8.0-0 <4.0.0"
961964
flutter: ">=3.22.0"

apps/sse_contract_test_service/bin/sse_contract_test_service.dart

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,18 @@ class TestApiImpl extends TestApi {
6565
httpMethod: method,
6666
headers: headers);
6767
final subscription = client.stream.listen((event) {
68-
callbackClient.callbackNumberPost(
69-
PostCallback(
70-
kind: 'event',
71-
event: PostCallbackEvent(
72-
type: event.type, data: event.data, id: event.id)),
73-
callbackNumber: callbackId);
74-
callbackId++;
68+
switch (event) {
69+
case MessageEvent():
70+
callbackClient.callbackNumberPost(
71+
PostCallback(
72+
kind: 'event',
73+
event: PostCallbackEvent(
74+
type: event.type, data: event.data, id: event.id)),
75+
callbackNumber: callbackId);
76+
callbackId++;
77+
default:
78+
break;
79+
}
7580
}, onError: (error) {
7681
callbackClient.callbackNumberPost(
7782
PostCallback(kind: 'error', comment: error.toString()),

melos.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ environment:
44
sdk: '>=3.4.0 <4.0.0'
55

66
packages:
7-
- packages/*
7+
# Remove the event_source_client from the workspace temporarily to allow a breaking change.
8+
- packages/common
9+
- packages/common_client
810
- packages/flutter_client_sdk/example
911
- apps/*
1012

@@ -19,7 +21,9 @@ scripts:
1921
# Add more packages as more of them have tests.
2022
# Tests are ran with flutter as it supports coverage. Some packages may also include flutter
2123
# dependencies.
22-
run: MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage
24+
run: >
25+
MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage &&
26+
cd packages/event_source_client && dart test
2327
2428
merge-trace-files:
2529
description: Merge all packages coverage trace files ignoring data related to generated files.

packages/event_source_client/lib/launchdarkly_event_source_client.dart

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22
library launchdarkly_sse;
33

44
import 'dart:async';
5+
import 'dart:collection';
56

67
import 'src/http_consts.dart';
78
import 'src/logging.dart';
8-
import 'src/message_event.dart';
9+
import 'src/events.dart';
910
import 'src/sse_client_stub.dart'
1011
if (dart.library.io) 'src/sse_client_http.dart'
1112
if (dart.library.js_interop) 'src/sse_client_html.dart';
13+
import 'src/test_sse_client.dart';
1214

13-
export 'src/message_event.dart' show MessageEvent;
15+
export 'src/events.dart' show Event, MessageEvent, OpenEvent;
16+
export 'src/test_sse_client.dart' show TestSseClient;
1417
export 'src/logging.dart'
1518
show EventSourceLogger, LogLevel, NoOpLogger, PrintLogger;
1619

@@ -32,10 +35,11 @@ enum SseHttpMethod {
3235

3336
/// An [SSEClient] that works to maintain a SSE connection to a server.
3437
///
35-
/// You can receive [MessageEvent]s by listening to the [stream] object. The SSEClient will
36-
/// connect when there is a nonzero number of subscribers on [stream] and will disconnect when
37-
/// there are zero subscribers on [stream]. In certain cases, unrecoverable errors will be
38-
/// reported on the [stream] at which point the stream will be done.
38+
/// You can receive [Events]s by listening to the [stream] object. The SSEClient
39+
/// will connect when there is a nonzero number of subscribers on the [stream]
40+
/// and will disconnect when there are zero subscribers on the [stream].
41+
/// In certain cases, unrecoverable errors will be reported on the [stream] at
42+
/// which point the stream will be done.
3943
///
4044
/// The [SSEClient] will make best effort to maintain the streaming connection.
4145
abstract class SSEClient {
@@ -47,9 +51,9 @@ abstract class SSEClient {
4751
static const defaultConnectTimeout = Duration(seconds: 30);
4852
static const defaultReadTimeout = Duration(minutes: 5);
4953

50-
/// Subscribe to this [stream] to receive events and sometimes errors. The first
54+
/// Subscribe to this [stream] to receive events and sometimes errors.
5155
/// subscribe triggers the connection, so expect network delay initially.
52-
Stream<MessageEvent> get stream;
56+
Stream<Event> get stream;
5357

5458
/// Closes the SSEClient and tears down connections and resources. Do not use the
5559
/// SSEClient after close is called, behavior is undefined at that point.
@@ -97,4 +101,33 @@ abstract class SSEClient {
97101
return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout,
98102
readTimeout, body, httpMethod.toString(), logger);
99103
}
104+
105+
/// Get an SSE client for use in unit tests.
106+
///
107+
/// Most parameters are the same as those of the main SSEClient factory, but
108+
/// the test client supports an additional property which is the [sourceStream].
109+
/// Events sent to the [sourceStream] will also be emitted by the event source
110+
/// if the event source has listeners. When a user unsubscribes from the event
111+
/// stream, then the test client will unsubscribe from the source stream.
112+
///
113+
/// This method is primarily for use the the LaunchDarkly SDK implementation.
114+
/// Changes may be made to this API without following semantic conventions.
115+
static TestSseClient testClient(
116+
Uri uri,
117+
Set<String> eventTypes, {
118+
Map<String, String> headers = defaultHeaders,
119+
Duration connectTimeout = defaultConnectTimeout,
120+
Duration readTimeout = defaultReadTimeout,
121+
String? body,
122+
SseHttpMethod httpMethod = SseHttpMethod.get,
123+
Stream<Event>? sourceStream,
124+
}) {
125+
return TestSseClient.internal(
126+
headers: UnmodifiableMapView(headers),
127+
connectTimeout: connectTimeout,
128+
readTimeout: readTimeout,
129+
body: body,
130+
httpMethod: httpMethod,
131+
sourceStream: sourceStream);
132+
}
100133
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import 'dart:collection';
2+
3+
/// Base class for event source events.
4+
final class Event {}
5+
6+
// Implementation note: Any new constructor parameters should be added as
7+
// optional parameters unless added in a major version.
8+
9+
/// Represents a message that came across the SSE stream.
10+
final class MessageEvent implements Event {
11+
/// The type of the message.
12+
final String type;
13+
14+
/// The data sent in the message.
15+
final String data;
16+
17+
/// An optional message id that was provided.
18+
final String? id;
19+
20+
/// Creates the message with the provided values.
21+
const MessageEvent(this.type, this.data, this.id);
22+
23+
@override
24+
String toString() {
25+
return 'MessageEvent{type:$type,data:$data,id:$id}';
26+
}
27+
28+
@override
29+
bool operator ==(Object other) =>
30+
identical(this, other) ||
31+
other is MessageEvent &&
32+
runtimeType == other.runtimeType &&
33+
type == other.type &&
34+
data == other.data &&
35+
id == other.id;
36+
37+
@override
38+
int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode;
39+
}
40+
41+
/// Event emitted when the SSE client connects.
42+
final class OpenEvent implements Event {
43+
/// Any headers associated with the connection.
44+
final UnmodifiableMapView<String, String>? headers;
45+
46+
/// Create a connected event with the specified headers.
47+
const OpenEvent({this.headers});
48+
49+
@override
50+
String toString() {
51+
return 'OpenEvent{headers:$headers}';
52+
}
53+
54+
bool _compareHeaders(UnmodifiableMapView<String, String>? otherHeaders) {
55+
if (headers == null && otherHeaders == null) {
56+
return true;
57+
}
58+
if (headers != null && otherHeaders == null) {
59+
return false;
60+
}
61+
if (headers == null && otherHeaders != null) {
62+
return false;
63+
}
64+
var self = headers!;
65+
var other = otherHeaders!;
66+
if (self.length != other.length) {
67+
return false;
68+
}
69+
for (var pair in self.entries) {
70+
if (!other.containsKey(pair.key)) {
71+
return false;
72+
}
73+
if (pair.value != other[pair.key]) {
74+
return false;
75+
}
76+
}
77+
return true;
78+
}
79+
80+
@override
81+
bool operator ==(Object other) {
82+
return identical(this, other) ||
83+
other is OpenEvent && _compareHeaders(other.headers);
84+
}
85+
86+
@override
87+
int get hashCode => headers != null
88+
? Object.hashAllUnordered(
89+
headers!.entries.map((item) => Object.hash(item.key, item.value)))
90+
: null.hashCode;
91+
}
92+
93+
bool isMessageEvent(Event event) {
94+
{
95+
switch (event) {
96+
case MessageEvent():
97+
return true;
98+
default:
99+
return false;
100+
}
101+
}
102+
}

packages/event_source_client/lib/src/message_event.dart

Lines changed: 0 additions & 31 deletions
This file was deleted.

packages/event_source_client/lib/src/sse_client_html.dart

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ import 'dart:math' as math;
66
import '../launchdarkly_event_source_client.dart';
77

88
import 'backoff.dart';
9-
import 'message_event.dart' as ld_message_event;
9+
import 'events.dart' as ld_message_event;
1010

1111
/// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support.
1212
class HtmlSseClient implements SSEClient {
1313
/// The underlying eventsource
1414
web.EventSource? _eventSource;
1515

1616
/// This controller is for the events going to the subscribers of this client.
17-
late final StreamController<ld_message_event.MessageEvent>
18-
_messageEventsController;
17+
late final StreamController<ld_message_event.Event> _messageEventsController;
1918

2019
late final EventSourceLogger _logger;
2120

@@ -34,7 +33,7 @@ class HtmlSseClient implements SSEClient {
3433
_eventTypes = eventTypes {
3534
_logger = logger ?? NoOpLogger();
3635
_messageEventsController =
37-
StreamController<ld_message_event.MessageEvent>.broadcast(
36+
StreamController<ld_message_event.Event>.broadcast(
3837
onListen: () {
3938
// this is triggered when first listener subscribes
4039

@@ -63,6 +62,7 @@ class HtmlSseClient implements SSEClient {
6362
_eventSource?.addEventListener(eventType, _handleMessageEvent.toJS);
6463
}
6564
_eventSource?.addEventListener('error', _handleError.toJS);
65+
_eventSource?.addEventListener('open', _handleOpen.toJS);
6666
}
6767

6868
void _handleError(web.Event event) {
@@ -72,6 +72,11 @@ class HtmlSseClient implements SSEClient {
7272
restart();
7373
}
7474

75+
void _handleOpen(web.Event event) {
76+
// The browser event source doesn't have header support.
77+
_messageEventsController.sink.add(OpenEvent());
78+
}
79+
7580
void _handleMessageEvent(web.Event event) {
7681
_activeSince = DateTime.now().millisecondsSinceEpoch;
7782
final messageEvent = event as web.MessageEvent;
@@ -85,8 +90,7 @@ class HtmlSseClient implements SSEClient {
8590
/// Subscribe to this [stream] to receive events and sometimes errors. The first
8691
/// subscribe triggers the connection, so expect a network delay initially.
8792
@override
88-
Stream<ld_message_event.MessageEvent> get stream =>
89-
_messageEventsController.stream;
93+
Stream<ld_message_event.Event> get stream => _messageEventsController.stream;
9094

9195
@override
9296
Future close() async {

0 commit comments

Comments
 (0)