Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
27e0793
feat: Add support for getting headers on event source connection.
kinyoklion Aug 29, 2025
ace28b8
Remove event source from melow workspace.
kinyoklion Sep 2, 2025
e885ac4
Add test implementation.
kinyoklion Sep 2, 2025
7865444
wip: Adding new testing proccess.
kinyoklion Sep 2, 2025
ed5eb10
Fix existing tests.
kinyoklion Sep 2, 2025
cfa03b4
Revert change from common client.
kinyoklion Sep 2, 2025
91984b8
Make sure everything uses Event and not MessageEvent.
kinyoklion Sep 2, 2025
9261669
Update parser and connected state to handle Event type.
kinyoklion Sep 2, 2025
4c2dc50
Tests
kinyoklion Sep 2, 2025
0a7a73a
Doc
kinyoklion Sep 2, 2025
117f9df
feat: Add internal environment ID support.
kinyoklion Sep 3, 2025
3883d52
Address PR feedback.
kinyoklion Sep 3, 2025
187e49c
Merge branch 'main' into rlamb/o11y-add-environment-id-support
kinyoklion Sep 3, 2025
22b6107
Namespace
kinyoklion Sep 3, 2025
b493ec8
Fix SSE test.
kinyoklion Sep 3, 2025
693a504
Merge branch 'rlamb/o11y-add-environment-id-support' into rlamb/updat…
kinyoklion Sep 3, 2025
91341e8
Post merge updates.
kinyoklion Sep 3, 2025
8196367
Polling data source web use client-side-id.
kinyoklion Sep 3, 2025
0a45970
Merge branch 'main' into rlamb/update-for-2.x-event-source
kinyoklion Sep 3, 2025
4ccc498
Format
kinyoklion Sep 3, 2025
38c0db3
Merge branch 'main' into rlamb/update-for-2.x-event-source
kinyoklion Sep 3, 2025
635ed95
Set 2.0.0
kinyoklion Sep 3, 2025
8cca976
Analysis.
kinyoklion Sep 3, 2025
884d177
Branch update changes.
kinyoklion Sep 3, 2025
8ddf361
Revert client.
kinyoklion Sep 3, 2025
4c4d2f4
Main updates.
kinyoklion Sep 3, 2025
575305e
Merge branch 'main' into rlamb/update-for-2.x-event-source
kinyoklion Sep 4, 2025
b867cff
2.0.1 event source.
kinyoklion Sep 4, 2025
ad86a37
Merge branch 'rlamb/update-for-2.x-event-source' of github.com:launch…
kinyoklion Sep 4, 2025
be04094
Merge branch 'main' into rlamb/update-for-2.x-event-source
kinyoklion Sep 4, 2025
284f6e7
Safeguard against multiple header values.
kinyoklion Sep 4, 2025
10b6448
Fix github?
kinyoklion Sep 4, 2025
e21f366
Re-add assertions.
kinyoklion Sep 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions apps/flutter_client_contract_test_service/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -391,33 +391,33 @@ packages:
dependency: transitive
description:
name: launchdarkly_common_client
sha256: e691f25676dfb659975843d7ef5e178297939bf3f4a53cab75381be07f27feec
sha256: a48e52d8aebf8a12fb8816b4776a75d68dffeaaeb3b9a9b35189467218fecd35
url: "https://pub.dev"
source: hosted
version: "1.6.1"
version: "1.6.2"
launchdarkly_dart_common:
dependency: transitive
description:
name: launchdarkly_dart_common
sha256: "323b0ae2bc756c7c83e95494983b72b190e012e090758a920a992358cbc025a2"
sha256: a5275ba48635364690181d7172d679889fffca501a7dbec74992c4057357ac6f
url: "https://pub.dev"
source: hosted
version: "1.6.0"
version: "1.6.1"
launchdarkly_event_source_client:
dependency: transitive
description:
name: launchdarkly_event_source_client
sha256: "3506de716320c80898e12b825063a69a9a7169042902cdd6eb164f46b3ec60e3"
sha256: "7931ffed3d38272db1bf768e351bc49bda8a391b6811a6e4a7004149c2aee1d0"
url: "https://pub.dev"
source: hosted
version: "1.2.0"
version: "1.2.1"
launchdarkly_flutter_client_sdk:
dependency: "direct main"
description:
path: "../../packages/flutter_client_sdk"
relative: true
source: path
version: "4.11.1"
version: "4.11.2"
lints:
dependency: "direct dev"
description:
Expand Down Expand Up @@ -462,10 +462,10 @@ packages:
dependency: transitive
description:
name: meta
sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c
sha256: "23f08335362185a5ea2ad3a4e597f1375e78bce8a040df5c600c8d3552ef2394"
url: "https://pub.dev"
source: hosted
version: "1.16.0"
version: "1.17.0"
mime:
dependency: transitive
description:
Expand Down
2 changes: 1 addition & 1 deletion apps/sse_contract_test_service/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ packages:
path: "../../packages/event_source_client"
relative: true
source: path
version: "1.2.1"
version: "2.0.0"
lints:
dependency: "direct dev"
description:
Expand Down
9 changes: 2 additions & 7 deletions melos.yaml
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing temporary changes from the major versioning of the event source client.

Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ environment:
sdk: '>=3.4.0 <4.0.0'

packages:
# Remove the event_source_client from the workspace temporarily to allow a breaking change.
- packages/common
- packages/common_client
- packages/*
- packages/flutter_client_sdk/example
- apps/*

Expand All @@ -21,10 +19,7 @@ scripts:
# Add more packages as more of them have tests.
# Tests are ran with flutter as it supports coverage. Some packages may also include flutter
# dependencies.
run: >
MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage &&
cd packages/event_source_client && dart test

run: MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage
merge-trace-files:
description: Merge all packages coverage trace files ignoring data related to generated files.
run: >
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ sealed class DataSourceEvent {}
final class DataEvent implements DataSourceEvent {
final String type;
final String data;
final String? environmentId;

DataEvent(this.type, this.data);
DataEvent(this.type, this.data, {this.environmentId});
}

final class StatusEvent implements DataSourceEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ final class DataSourceEventHandler {
final LDLogger _logger;

Future<MessageStatus> handleMessage(
LDContext context, String type, String data) async {
LDContext context, String type, String data,
{String? environmentId}) async {
switch (type) {
case 'put':
{
try {
final parsed = jsonDecode(data);
return _processPut(context, parsed);
return _processPut(context, parsed, environmentId);
} catch (err) {
_logger.error('put message contained invalid json: $err');
_statusManager.setErrorByKind(
Expand Down Expand Up @@ -95,12 +96,13 @@ final class DataSourceEventHandler {
}
}

Future<MessageStatus> _processPut(LDContext context, dynamic parsed) async {
Future<MessageStatus> _processPut(
LDContext context, dynamic parsed, String? environmentId) async {
try {
final putData = LDEvaluationResultsSerialization.fromJson(parsed).map(
(key, value) => MapEntry(
key, ItemDescriptor(version: value.version, flag: value)));
await _flagManager.init(context, putData);
await _flagManager.init(context, putData, environmentId: environmentId);
_statusManager.setValid();
return MessageStatus.messageHandled;
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ final class DataSourceManager {
switch (event) {
case DataEvent():
var handled = await _dataSourceEventHandler.handleMessage(
_activeContext!, event.type, event.data);
_activeContext!, event.type, event.data,
environmentId: event.environmentId);
if (handled == MessageStatus.messageHandled &&
_identifyCompleter != null) {
if (_identifyCompleter!.isCompleted) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const _envIdHeader = 'x-ld-envid';

final _splitRegex = RegExp(r'\s*,\s*');

/// Get the environment ID from headers.
String? getEnvironmentId(Map<String, String>? headers) {
// Headers will always be in lower case from the http response.
// If multiple headers are associated with a single key, then they will be
// in a comma separated list with potential whitespace.
final headerValue = headers?[_envIdHeader];
if (headerValue == null) {
return null;
}
return headerValue.split(_splitRegex).first;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
import 'dart:math';

import '../config/data_source_config.dart';
import '../config/defaults/credential_type.dart';
import '../config/defaults/default_config.dart';
import 'data_source.dart';
import 'data_source_status.dart';
import 'get_environment_id.dart';

HttpClient _defaultClientFactory(HttpProperties httpProperties) {
return HttpClient(httpProperties: httpProperties);
Expand Down Expand Up @@ -41,6 +44,8 @@ final class PollingDataSource implements DataSource {

final StreamController<DataSourceEvent> _eventController = StreamController();

late final String _credential;

@override
Stream<DataSourceEvent> get events => _eventController.stream;

Expand Down Expand Up @@ -68,7 +73,8 @@ final class PollingDataSource implements DataSource {
_defaultClientFactory})
: _endpoints = endpoints,
_logger = logger.subLogger('PollingDataSource'),
_dataSourceConfig = dataSourceConfig {
_dataSourceConfig = dataSourceConfig,
_credential = credential {
_pollingInterval = testingInterval ?? dataSourceConfig.pollingInterval;

if (_dataSourceConfig.useReport) {
Expand Down Expand Up @@ -134,7 +140,18 @@ final class PollingDataSource implements DataSource {
}
_lastEtag = etag;

_eventController.sink.add(DataEvent('put', res.body));
var environmentId = getEnvironmentId(res.headers);

if (environmentId == null &&
DefaultConfig.credentialConfig.credentialType ==
CredentialType.clientSideId) {
// When using a client-side ID we can use it to represent the
// environment.
environmentId = _credential;
}

_eventController.sink
.add(DataEvent('put', res.body, environmentId: environmentId));
} else {
if (isHttpGloballyRecoverable(res.statusCode)) {
_eventController.sink.add(StatusEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart';

import '../config/data_source_config.dart';
import '../config/defaults/credential_type.dart';
import '../config/defaults/default_config.dart';
import 'data_source.dart';
import 'data_source_status.dart';
import 'get_environment_id.dart';

typedef MessageHandler = void Function(MessageEvent);
typedef ErrorHandler = void Function(dynamic);
Expand Down Expand Up @@ -43,14 +45,18 @@ final class StreamingDataSource implements DataSource {
late final String _contextString;
bool _stopped = false;

StreamSubscription<MessageEvent>? _subscription;
StreamSubscription<Event>? _subscription;

final StreamController<DataSourceEvent> _dataController = StreamController();

late final bool _useReport;

SSEClient? _client;

String? _environmentId;

final String _credential;

@override
Stream<DataSourceEvent> get events => _dataController.stream;

Expand All @@ -73,7 +79,8 @@ final class StreamingDataSource implements DataSource {
_logger = logger.subLogger('StreamingDataSource'),
_dataSourceConfig = dataSourceConfig,
_clientFactory = clientFactory,
_httpProperties = httpProperties {
_httpProperties = httpProperties,
_credential = credential {
final plainContextString =
jsonEncode(LDContextSerialization.toJson(context, isEvent: false));

Expand Down Expand Up @@ -122,8 +129,22 @@ final class StreamingDataSource implements DataSource {
return;
}

_logger.debug('Received event, data: ${event.data}');
_dataController.sink.add(DataEvent(event.type, event.data));
switch (event) {
case MessageEvent():
_logger.debug('Received message event, data: ${event.data}');
_dataController.sink.add(
DataEvent(event.type, event.data, environmentId: _environmentId));
case OpenEvent():
_logger.debug('Received connect event, data: ${event.headers}');
if (event.headers != null) {
_environmentId = getEnvironmentId(event.headers);
} else if (DefaultConfig.credentialConfig.credentialType ==
CredentialType.clientSideId) {
// When using a client-side ID we can use it to represent the
// environment.
_environmentId = _credential;
}
}
})
..onError((err) {
if (_permanentShutdown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ final class FlagManager {
/// Gets all the current flags.
Map<String, ItemDescriptor> getAll() => _flagStore.getAll();

/// Gets the environment ID for the current flag set.
String? get environmentId => _flagStore.environmentId;

/// Initializes the flag manager with data from a data source.
/// Persistence initialization is handled by [FlagPersistence].
Future<void> init(LDContext context, Map<String, ItemDescriptor> newFlags) =>
_flagPersistence.init(context, newFlags);
Future<void> init(LDContext context, Map<String, ItemDescriptor> newFlags,
{String? environmentId}) =>
_flagPersistence.init(context, newFlags, environmentId: environmentId);

/// Attempt to update a flag. If the flag is for the wrong context, or
/// it is of an older version, then an update will not be performed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'flag_store.dart';

const String _globalNamespace = 'LaunchDarkly';
const String _indexKey = 'ContextIndex';
const String _envIdKey = 'EnvironmentId';

String _makeEnvironment(String sdkKey) {
return '${_globalNamespace}_${encodePersistenceKey(sdkKey)}';
Expand Down Expand Up @@ -45,9 +46,9 @@ final class FlagPersistence {
_logger = logger.subLogger('FlagPersistence'),
_stamper = stamper;

Future<void> init(
LDContext context, Map<String, ItemDescriptor> newFlags) async {
_updater.init(context, newFlags);
Future<void> init(LDContext context, Map<String, ItemDescriptor> newFlags,
{String? environmentId}) async {
_updater.init(context, newFlags, environmentId: environmentId);
return _storeCache(context);
}

Expand All @@ -70,14 +71,17 @@ final class FlagPersistence {
return false;
}

final environmentId = await _persistence?.read(_environmentKey, _envIdKey);

try {
final flagConfig =
LDEvaluationResultsSerialization.fromJson(jsonDecode(json));

_updater.initCached(
context,
flagConfig.map((key, value) => MapEntry(
key, ItemDescriptor(version: value.version, flag: value))));
key, ItemDescriptor(version: value.version, flag: value))),
environmentId: environmentId);
_logger.debug('Loaded a cached flag config from persistence.');
return true;
} catch (e) {
Expand Down Expand Up @@ -135,6 +139,12 @@ final class FlagPersistence {
// is always written.
if (maxCachedContexts > 0) {
await _persistence?.set(_environmentKey, contextPersistenceKey, jsonAll);
// There will be a singular environment ID for a given environment key
// (credential).
if (_store.environmentId != null) {
await _persistence?.set(
_environmentKey, _envIdKey, _store.environmentId!);
Copy link
Contributor

@tanderson-ld tanderson-ld Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this bang avoidable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is checked before, but the dart compiler still wanted it. I think because maybe it doesn't know the parameter evaluation happens before the async operation.

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import '../item_descriptor.dart';
/// it is just a key-value store.
final class FlagStore {
final Map<String, ItemDescriptor> _flags = {};
String? _environmentId;

void init(Map<String, ItemDescriptor> newFlags) {
void init(Map<String, ItemDescriptor> newFlags, {String? environmentId}) {
_flags.clear();
_flags.addAll(newFlags);
_environmentId = environmentId;
}

void insertOrUpdate(String key, ItemDescriptor update) {
Expand All @@ -18,6 +20,10 @@ final class FlagStore {
/// Attempts to get a flag by key from the current flags.
ItemDescriptor? get(String key) => _flags[key];

/// Get the environment ID for the flag set.
/// The ID may not always be available.
String? get environmentId => _environmentId;

/// Gets all the current flags.
Map<String, ItemDescriptor> getAll() {
// The map itself will not be modifiable, but the contents are references.
Expand Down
10 changes: 6 additions & 4 deletions packages/common_client/lib/src/flag_manager/flag_updater.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ final class FlagUpdater {
: _flagStore = flagStore,
_logger = logger.subLogger('FlagUpdater');

void init(LDContext context, Map<String, ItemDescriptor> newFlags) {
void init(LDContext context, Map<String, ItemDescriptor> newFlags,
{String? environmentId}) {
_activeContextKey = context.canonicalKey;
final oldFlags = _flagStore.getAll();
_flagStore.init(newFlags);
_flagStore.init(newFlags, environmentId: environmentId);
_handleChanges(oldFlags, newFlags);
}

void initCached(LDContext context, Map<String, ItemDescriptor> newFlags) {
void initCached(LDContext context, Map<String, ItemDescriptor> newFlags,
{String? environmentId}) {
// The store has been initialized from our data source for this context,
// so we can discard this update.
// This would happen because the network response was faster than loading
Expand All @@ -69,7 +71,7 @@ final class FlagUpdater {
return;
}

init(context, newFlags);
init(context, newFlags, environmentId: environmentId);
}

/// Create, update, or delete the item for the specific key. If the item
Expand Down
Loading