Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions apps/flutter_client_contract_test_service/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -388,33 +388,36 @@ packages:
source: hosted
version: "6.8.0"
launchdarkly_common_client:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/common_client"
relative: true
source: path
name: launchdarkly_common_client
sha256: e691f25676dfb659975843d7ef5e178297939bf3f4a53cab75381be07f27feec
url: "https://pub.dev"
source: hosted
version: "1.6.1"
launchdarkly_dart_common:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/common"
relative: true
source: path
name: launchdarkly_dart_common
sha256: "323b0ae2bc756c7c83e95494983b72b190e012e090758a920a992358cbc025a2"
url: "https://pub.dev"
source: hosted
version: "1.6.0"
launchdarkly_event_source_client:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/event_source_client"
relative: true
source: path
name: launchdarkly_event_source_client
sha256: "3506de716320c80898e12b825063a69a9a7169042902cdd6eb164f46b3ec60e3"
url: "https://pub.dev"
source: hosted
version: "1.2.0"
launchdarkly_flutter_client_sdk:
dependency: "direct main"
description:
path: "../../packages/flutter_client_sdk"
relative: true
source: path
version: "4.11.0"
version: "4.11.1"
lints:
dependency: "direct dev"
description:
Expand Down Expand Up @@ -872,10 +875,10 @@ packages:
dependency: transitive
description:
name: vector_math
sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803"
sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b
url: "https://pub.dev"
source: hosted
version: "2.1.4"
version: "2.2.0"
vm_service:
dependency: transitive
description:
Expand Down Expand Up @@ -957,5 +960,5 @@ packages:
source: hosted
version: "3.1.3"
sdks:
dart: ">=3.7.0-0 <4.0.0"
dart: ">=3.8.0-0 <4.0.0"
flutter: ">=3.22.0"
4 changes: 3 additions & 1 deletion melos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ environment:
sdk: '>=3.4.0 <4.0.0'

packages:
- packages/*
# Remove the event_source_client from the workspace temporarily to allow a breaking change.
- packages/common
- packages/common_client
- packages/flutter_client_sdk/example
- apps/*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
library launchdarkly_sse;

import 'dart:async';
import 'dart:collection';

import 'src/http_consts.dart';
import 'src/message_event.dart';
import 'src/events.dart';
import 'src/sse_client_stub.dart'
if (dart.library.io) 'src/sse_client_http.dart'
if (dart.library.js_interop) 'src/sse_client_html.dart';
import 'src/test_sse_client.dart';

export 'src/message_event.dart' show MessageEvent;
export 'src/events.dart' show Event, MessageEvent, OpenEvent;
export 'src/test_sse_client.dart' show TestSseClient;

/// HTTP methods supported by the event source client.
enum SseHttpMethod {
Expand All @@ -29,10 +32,11 @@ enum SseHttpMethod {

/// An [SSEClient] that works to maintain a SSE connection to a server.
///
/// You can receive [MessageEvent]s by listening to the [stream] object. The SSEClient will
/// connect when there is a nonzero number of subscribers on [stream] and will disconnect when
/// there are zero subscribers on [stream]. In certain cases, unrecoverable errors will be
/// reported on the [stream] at which point the stream will be done.
/// You can receive [Events]s by listening to the [stream] object. The SSEClient
/// will connect when there is a nonzero number of subscribers on the [stream]
/// and will disconnect when there are zero subscribers on the [stream].
/// In certain cases, unrecoverable errors will be reported on the [stream] at
/// which point the stream will be done.
///
/// The [SSEClient] will make best effort to maintain the streaming connection.
abstract class SSEClient {
Expand All @@ -44,9 +48,9 @@ abstract class SSEClient {
static const defaultConnectTimeout = Duration(seconds: 30);
static const defaultReadTimeout = Duration(minutes: 5);

/// Subscribe to this [stream] to receive events and sometimes errors. The first
/// Subscribe to this [stream] to receive events and sometimes errors.
/// subscribe triggers the connection, so expect network delay initially.
Stream<MessageEvent> get stream;
Stream<Event> get stream;

/// Closes the SSEClient and tears down connections and resources. Do not use the
/// SSEClient after close is called, behavior is undefined at that point.
Expand Down Expand Up @@ -90,4 +94,33 @@ abstract class SSEClient {
return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout,
readTimeout, body, httpMethod.toString());
}

/// Get an SSE client for use in unit tests.
///
/// Most parameters are the same as those of the main SSEClient factory, but
/// the test client supports an additional property which is the [sourceStream].
/// Events sent to the [sourceStream] will also be emitted by the event source
/// if the event source has listeners. When a user unsubscribes from the event
/// stream, then the test client will unsubscribe from the source stream.
///
/// This method is primarily for use the the LaunchDarkly SDK implementation.
/// Changes may be made to this API without following semantic conventions.
static TestSseClient testClient(
Uri uri,
Set<String> eventTypes, {
Map<String, String> headers = defaultHeaders,
Duration connectTimeout = defaultConnectTimeout,
Duration readTimeout = defaultReadTimeout,
String? body,
SseHttpMethod httpMethod = SseHttpMethod.get,
Stream<Event>? sourceStream,
}) {
return TestSseClient.internal(
headers: UnmodifiableMapView(headers),
connectTimeout: connectTimeout,
readTimeout: readTimeout,
body: body,
httpMethod: httpMethod,
sourceStream: sourceStream);
}
}
102 changes: 102 additions & 0 deletions packages/event_source_client/lib/src/events.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import 'dart:collection';

/// Base class for event source events.
final class Event {}

// Implementation note: Any new constructor parameters should be added as
// optional parameters unless added in a major version.

/// Represents a message that came across the SSE stream.
final class MessageEvent implements Event {
/// The type of the message.
final String type;

/// The data sent in the message.
final String data;

/// An optional message id that was provided.
final String? id;

/// Creates the message with the provided values.
const MessageEvent(this.type, this.data, this.id);

@override
String toString() {
return 'MessageEvent{type:$type,data:$data,id:$id}';
}

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is MessageEvent &&
runtimeType == other.runtimeType &&
type == other.type &&
data == other.data &&
id == other.id;

@override
int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode;
}

/// Event emitted when the SSE client connects.
final class OpenEvent implements Event {
/// Any headers associated with the connection.
final UnmodifiableMapView<String, String>? headers;

/// Create a connected event with the specified headers.
const OpenEvent({this.headers});

@override
String toString() {
return 'OpenEvent{headers:$headers}';
}

bool _compareHeaders(UnmodifiableMapView<String, String>? otherHeaders) {
if (headers == null && otherHeaders == null) {
return true;
}
if (headers != null && otherHeaders == null) {
return false;
}
if (headers == null && otherHeaders != null) {
return false;
}
var self = headers!;
var other = otherHeaders!;
if (self.length != other.length) {
return false;
}
for (var pair in self.entries) {
if (!other.containsKey(pair.key)) {
return false;
}
if (pair.value != other[pair.key]) {
return false;
}
}
return true;
}

@override
bool operator ==(Object other) {
return identical(this, other) ||
other is OpenEvent && _compareHeaders(other.headers);
}

@override
int get hashCode => headers != null
? Object.hashAllUnordered(
headers!.entries.map((item) => Object.hash(item.key, item.value)))
: null.hashCode;
}

bool isMessageEvent(Event event) {
{
switch (event) {
case MessageEvent():
return true;
default:
return false;
}
}
}
31 changes: 0 additions & 31 deletions packages/event_source_client/lib/src/message_event.dart

This file was deleted.

16 changes: 10 additions & 6 deletions packages/event_source_client/lib/src/sse_client_html.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import 'dart:math' as math;
import '../launchdarkly_event_source_client.dart';

import 'backoff.dart';
import 'message_event.dart' as ld_message_event;
import 'events.dart' as ld_message_event;

/// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support.
class HtmlSseClient implements SSEClient {
/// The underlying eventsource
web.EventSource? _eventSource;

/// This controller is for the events going to the subscribers of this client.
late final StreamController<ld_message_event.MessageEvent>
_messageEventsController;
late final StreamController<ld_message_event.Event> _messageEventsController;

Backoff _backoff = Backoff(math.Random());

Expand All @@ -31,7 +30,7 @@ class HtmlSseClient implements SSEClient {
: _uri = uri,
_eventTypes = eventTypes {
_messageEventsController =
StreamController<ld_message_event.MessageEvent>.broadcast(
StreamController<ld_message_event.Event>.broadcast(
onListen: () {
// this is triggered when first listener subscribes

Expand Down Expand Up @@ -60,6 +59,7 @@ class HtmlSseClient implements SSEClient {
_eventSource?.addEventListener(eventType, _handleMessageEvent.toJS);
}
_eventSource?.addEventListener('error', _handleError.toJS);
_eventSource?.addEventListener('open', _handleOpen.toJS);
}

void _handleError(web.Event event) {
Expand All @@ -69,6 +69,11 @@ class HtmlSseClient implements SSEClient {
restart();
}

void _handleOpen(web.Event event) {
// The browser event source doesn't have header support.
_messageEventsController.sink.add(OpenEvent());
}

void _handleMessageEvent(web.Event event) {
_activeSince = DateTime.now().millisecondsSinceEpoch;
final messageEvent = event as web.MessageEvent;
Expand All @@ -82,8 +87,7 @@ class HtmlSseClient implements SSEClient {
/// Subscribe to this [stream] to receive events and sometimes errors. The first
/// subscribe triggers the connection, so expect a network delay initially.
@override
Stream<ld_message_event.MessageEvent> get stream =>
_messageEventsController.stream;
Stream<Event> get stream => _messageEventsController.stream;

@override
Future close() => _messageEventsController.close();
Expand Down
7 changes: 4 additions & 3 deletions packages/event_source_client/lib/src/sse_client_http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:math' as math;
import 'package:http/http.dart' as http;

import '../launchdarkly_event_source_client.dart';
import 'events.dart' show isMessageEvent;
import 'state_idle.dart';
import 'state_value_object.dart';

Expand All @@ -18,7 +19,7 @@ class HttpSseClient implements SSEClient {
static const defaultReadTimeout = Duration(minutes: 5);

/// This controller is for the events going to the subscribers of this client.
late final StreamController<MessageEvent> _messageEventsController;
late final StreamController<Event> _messageEventsController;

/// This controller is for controlling the internal state machine when subscribers
/// subscribe / unsubscribe.
Expand Down Expand Up @@ -53,7 +54,7 @@ class HttpSseClient implements SSEClient {
math.Random random,
String? body,
String httpMethod) {
_messageEventsController = StreamController<MessageEvent>.broadcast(
_messageEventsController = StreamController<Event>.broadcast(
// this is triggered when first listener subscribes
onListen: () => _connectionDesiredStateController.add(true),
// this is triggered when last listener unsubscribes
Expand All @@ -79,7 +80,7 @@ class HttpSseClient implements SSEClient {
/// Subscribe to this [stream] to receive events and sometimes errors. The first
/// subscribe triggers the connection, so expect a network delay initially.
@override
Stream<MessageEvent> get stream => _messageEventsController.stream;
Stream<Event> get stream => _messageEventsController.stream;

@override
Future close() async {
Expand Down
Loading