diff --git a/packages/common/lib/src/network/http_client.dart b/packages/common/lib/src/network/http_client.dart index 60f54da..c265a12 100644 --- a/packages/common/lib/src/network/http_client.dart +++ b/packages/common/lib/src/network/http_client.dart @@ -82,4 +82,9 @@ final class HttpClient { return await http.Response.fromStream(streamedResponse) .timeout(_httpProperties.readTimeout); } + + /// Close the underlying HTTP client and free its resources. + void close() { + _client.close(); + } } diff --git a/packages/common_client/lib/src/data_sources/data_source_requestor.dart b/packages/common_client/lib/src/data_sources/data_source_requestor.dart new file mode 100644 index 0000000..77c04b9 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/data_source_requestor.dart @@ -0,0 +1,134 @@ +import 'dart:async'; +import 'package:http/http.dart' as http; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import '../config/defaults/credential_type.dart'; +import '../config/defaults/default_config.dart'; +import 'data_source.dart'; +import 'data_source_status.dart'; +import 'get_environment_id.dart'; + +class RequestResult { + final DataSourceEvent? event; + final bool shouldRetry; + final bool shutdown; + + RequestResult({this.event, this.shouldRetry = false, this.shutdown = false}); +} + +class DataSourceRequestor { + final HttpClient _client; + final LDLogger _logger; + final String _credential; + + int _currentChainId = 0; + String? _lastEtag; + + DataSourceRequestor({ + required HttpClient client, + required LDLogger logger, + required String credential, + }) : _client = client, + _logger = logger, + _credential = credential; + + int startRequestChain() { + return ++_currentChainId; + } + + bool isValidChain(int chainId) { + return chainId == _currentChainId; + } + + Future makeRequest( + int chainId, + Uri uri, + RequestMethod method, { + String? body, + Map? additionalHeaders, + }) async { + if (!isValidChain(chainId)) { + throw Exception('Request chain $chainId is no longer valid'); + } + + final headers = _buildHeaders(additionalHeaders); + _logger.debug( + 'Making request for chain $chainId, method: $method, uri: $uri, etag: $_lastEtag'); + + return await _client.request(method, uri, + additionalHeaders: headers, body: body); + } + + RequestResult? processResponse( + http.Response res, + int chainId, { + required bool Function(int) isRecoverableStatus, + }) { + if (!isValidChain(chainId)) { + _logger.debug('Discarding response from stale request chain $chainId'); + return null; + } + + final statusCode = res.statusCode; + + if (statusCode == 200 || statusCode == 304) { + if (statusCode == 200) { + _updateEtagFromResponse(res); + final environmentId = _getEnvironmentIdFromHeaders(res.headers); + return RequestResult( + event: DataEvent('put', res.body, environmentId: environmentId), + ); + } + return RequestResult(); + } + + if (isRecoverableStatus(statusCode)) { + _logger.debug( + 'Received recoverable status code $statusCode for chain $chainId, will retry'); + return RequestResult(shouldRetry: true); + } + + _logger.error( + 'Received unexpected status code $statusCode for chain $chainId, shutting down'); + return RequestResult( + event: StatusEvent( + ErrorKind.networkError, + statusCode, + 'Received unexpected status code: $statusCode', + shutdown: true, + ), + shutdown: true, + ); + } + + Map? _buildHeaders(Map? additionalHeaders) { + if (_lastEtag == null && additionalHeaders == null) { + return null; + } + + final headers = {}; + if (_lastEtag != null) { + headers['if-none-match'] = _lastEtag!; + } + if (additionalHeaders != null) { + headers.addAll(additionalHeaders); + } + return headers; + } + + void _updateEtagFromResponse(http.Response res) { + final etag = res.headers['etag']; + if (etag != null) { + _lastEtag = etag; + } + } + + String? _getEnvironmentIdFromHeaders(Map? headers) { + var environmentId = getEnvironmentId(headers); + if (environmentId == null && + DefaultConfig.credentialConfig.credentialType == + CredentialType.clientSideId) { + environmentId = _credential; + } + return environmentId; + } +} diff --git a/packages/common_client/lib/src/data_sources/polling_data_source.dart b/packages/common_client/lib/src/data_sources/polling_data_source.dart index a36a8f4..7b113c0 100644 --- a/packages/common_client/lib/src/data_sources/polling_data_source.dart +++ b/packages/common_client/lib/src/data_sources/polling_data_source.dart @@ -1,15 +1,12 @@ import 'dart:async'; import 'dart:convert'; -import 'package:http/http.dart' as http; import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'dart:math'; import '../config/data_source_config.dart'; -import '../config/defaults/credential_type.dart'; -import '../config/defaults/default_config.dart'; import 'data_source.dart'; +import 'data_source_requestor.dart'; import 'data_source_status.dart'; -import 'get_environment_id.dart'; HttpClient _defaultClientFactory(HttpProperties httpProperties) { return HttpClient(httpProperties: httpProperties); @@ -28,6 +25,8 @@ final class PollingDataSource implements DataSource { late final HttpClient _client; + late final DataSourceRequestor _requestor; + Timer? _pollTimer; final Stopwatch _pollStopwatch = Stopwatch(); @@ -44,13 +43,9 @@ final class PollingDataSource implements DataSource { final StreamController _eventController = StreamController(); - late final String _credential; - @override Stream get events => _eventController.stream; - String? _lastEtag; - /// Used to track if there has been an unrecoverable error. bool _permanentShutdown = false; @@ -73,8 +68,7 @@ final class PollingDataSource implements DataSource { _defaultClientFactory}) : _endpoints = endpoints, _logger = logger.subLogger('PollingDataSource'), - _dataSourceConfig = dataSourceConfig, - _credential = credential { + _dataSourceConfig = dataSourceConfig { _pollingInterval = testingInterval ?? dataSourceConfig.pollingInterval; if (_dataSourceConfig.useReport) { @@ -87,6 +81,9 @@ final class PollingDataSource implements DataSource { _client = clientFactory(httpProperties); } + _requestor = DataSourceRequestor( + client: _client, logger: _logger, credential: credential); + final plainContextString = jsonEncode(LDContextSerialization.toJson(context, isEvent: false)); if (dataSourceConfig.useReport) { @@ -111,66 +108,42 @@ final class PollingDataSource implements DataSource { } Future _makeRequest() async { - try { - _logger.debug( - 'Making polling request, method: $_method, uri: $_uri, etag: $_lastEtag'); - final res = await _client.request(_method, _uri, - additionalHeaders: _lastEtag != null ? {'etag': _lastEtag!} : null, - body: _dataSourceConfig.useReport ? _contextString : null); - await _handleResponse(res); - } catch (err) { - _logger.error('encountered error with polling request: $err, will retry'); - _eventController.sink - .add(StatusEvent(ErrorKind.networkError, null, err.toString())); - } - } - - Future _handleResponse(http.Response res) async { - // The data source has been instructed to stop, so we discard the response. if (_stopped) { return; } - if (res.statusCode == 200 || res.statusCode == 304) { - final etag = res.headers['etag']; - if (etag != null && etag == _lastEtag) { - // The response has not changed, so we don't need to do the work of - // updating the store, calculating changes, or persisting the payload. + final chainId = _requestor.startRequestChain(); + + try { + final res = await _requestor.makeRequest( + chainId, + _uri, + _method, + body: _dataSourceConfig.useReport ? _contextString : null, + ); + + final result = _requestor.processResponse( + res, + chainId, + isRecoverableStatus: isHttpGloballyRecoverable, + ); + + if (result == null) { return; } - _lastEtag = etag; - var environmentId = getEnvironmentId(res.headers); - - if (environmentId == null && - DefaultConfig.credentialConfig.credentialType == - CredentialType.clientSideId) { - // When using a client-side ID we can use it to represent the - // environment. - environmentId = _credential; + if (result.event != null) { + _eventController.sink.add(result.event!); } - _eventController.sink - .add(DataEvent('put', res.body, environmentId: environmentId)); - } else { - if (isHttpGloballyRecoverable(res.statusCode)) { - _eventController.sink.add(StatusEvent( - ErrorKind.networkError, - res.statusCode, - 'Received unexpected status code: ${res.statusCode}')); - _logger.error( - 'received unexpected status code when polling: ${res.statusCode}, will retry'); - } else { - _logger.error( - 'received unexpected status code when polling: ${res.statusCode}, stopping polling'); - _eventController.sink.add(StatusEvent( - ErrorKind.networkError, - res.statusCode, - 'Received unexpected status code: ${res.statusCode}', - shutdown: true)); + if (result.shutdown) { _permanentShutdown = true; stop(); } + } catch (err) { + _logger.error('encountered error with polling request: $err, will retry'); + _eventController.sink + .add(StatusEvent(ErrorKind.networkError, null, err.toString())); } } diff --git a/packages/common_client/lib/src/data_sources/streaming_data_source.dart b/packages/common_client/lib/src/data_sources/streaming_data_source.dart index dc8bb0c..d87e940 100644 --- a/packages/common_client/lib/src/data_sources/streaming_data_source.dart +++ b/packages/common_client/lib/src/data_sources/streaming_data_source.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:math' as math; import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; @@ -8,11 +9,10 @@ import '../config/data_source_config.dart'; import '../config/defaults/credential_type.dart'; import '../config/defaults/default_config.dart'; import 'data_source.dart'; +import 'data_source_requestor.dart'; import 'data_source_status.dart'; import 'get_environment_id.dart'; -typedef MessageHandler = void Function(MessageEvent); -typedef ErrorHandler = void Function(dynamic); typedef SseClientFactory = SSEClient Function( Uri uri, HttpProperties httpProperties, @@ -22,7 +22,7 @@ typedef SseClientFactory = SSEClient Function( SSEClient _defaultClientFactory(Uri uri, HttpProperties httpProperties, String? body, SseHttpMethod? method, EventSourceLogger? logger) { - return SSEClient(uri, {'put', 'patch', 'delete'}, + return SSEClient(uri, {'put', 'patch', 'delete', 'ping'}, headers: httpProperties.baseHeaders, body: body, httpMethod: method ?? SseHttpMethod.get, @@ -30,43 +30,36 @@ SSEClient _defaultClientFactory(Uri uri, HttpProperties httpProperties, } final class StreamingDataSource implements DataSource { - final LDLogger _logger; + static const String _pingEventType = 'ping'; + final LDLogger _logger; final ServiceEndpoints _endpoints; - final StreamingDataSourceConfig _dataSourceConfig; - final SseClientFactory _clientFactory; + final HttpProperties _httpProperties; + final String _credential; + final Backoff _pollBackoff; late final Uri _uri; - - late final HttpProperties _httpProperties; - late final String _contextString; - bool _stopped = false; - - StreamSubscription? _subscription; - - final StreamController _dataController = StreamController(); - late final bool _useReport; + late final HttpClient _pollingClient; + late final DataSourceRequestor _requestor; + late final Uri _pollingUri; + late final RequestMethod _pollingMethod; + final StreamController _dataController = StreamController(); SSEClient? _client; - + StreamSubscription? _subscription; String? _environmentId; + bool _stopped = false; + bool _permanentShutdown = false; - final String _credential; + int? _pollActiveSince; @override Stream get events => _dataController.stream; - /// Used to track if there has been an unrecoverable error. - bool _permanentShutdown = false; - - /// The [clientFactory] parameter is primarily intended for testing, but it also - /// could be used for customized SSE clients which support functionality - /// our default client support does not, or for alternative implementations - /// which are not based on SSE. StreamingDataSource( {required String credential, required LDContext context, @@ -80,7 +73,8 @@ final class StreamingDataSource implements DataSource { _dataSourceConfig = dataSourceConfig, _clientFactory = clientFactory, _httpProperties = httpProperties, - _credential = credential { + _credential = credential, + _pollBackoff = Backoff(math.Random()) { final plainContextString = jsonEncode(LDContextSerialization.toJson(context, isEvent: false)); @@ -97,17 +91,11 @@ final class StreamingDataSource implements DataSource { ? plainContextString : base64UrlEncode(utf8.encode(plainContextString)); - final path = _useReport - ? _dataSourceConfig.streamingReportPath(credential, _contextString) - : _dataSourceConfig.streamingGetPath(credential, _contextString); - - String completeUrl = appendPath(_endpoints.streaming, path); - - if (_dataSourceConfig.withReasons) { - completeUrl = '$completeUrl?withReasons=true'; - } - - _uri = Uri.parse(completeUrl); + _uri = _buildStreamingUri(); + _setupPollingClient(); + _requestor = DataSourceRequestor( + client: _pollingClient, logger: _logger, credential: credential); + _pollingUri = _buildPollingUri(); } @override @@ -124,26 +112,20 @@ final class StreamingDataSource implements DataSource { _useReport ? SseHttpMethod.report : SseHttpMethod.get, LDLoggerToEventSourceAdapter(_logger)); - _subscription = _client!.stream.listen((event) async { + _subscription = _client!.stream.listen((event) { if (_stopped) { return; } switch (event) { case MessageEvent(): - _logger.debug('Received message event, data: ${event.data}'); - _dataController.sink.add( - DataEvent(event.type, event.data, environmentId: _environmentId)); - case OpenEvent(): - _logger.debug('Received connect event, data: ${event.headers}'); - if (event.headers != null) { - _environmentId = getEnvironmentId(event.headers); - } else if (DefaultConfig.credentialConfig.credentialType == - CredentialType.clientSideId) { - // When using a client-side ID we can use it to represent the - // environment. - _environmentId = _credential; + if (event.type == _pingEventType) { + _handlePingEvent(); + } else { + _handleMessageEvent(event.type, event.data); } + case OpenEvent(): + _environmentId = _getEnvironmentIdFromHeaders(event.headers); } }) ..onError((err) { @@ -166,13 +148,141 @@ final class StreamingDataSource implements DataSource { @override void stop() { - // Cancel is async, but it should only be for the cleanup portion, according - // to the method documentation. + if (_stopped) { + return; + } _subscription?.cancel(); _subscription = null; _stopped = true; + _pollActiveSince = null; + _pollingClient.close(); _dataController.close(); } + + void _handleMessageEvent(String type, String data) { + _dataController.sink.add( + DataEvent(type, data, environmentId: _environmentId)); + } + + Future _handlePingEvent() async { + if (_stopped) { + return; + } + + final chainId = _requestor.startRequestChain(); + _updatePollActiveTime(); + await _pollWithRetry(chainId); + } + + Future _pollWithRetry(int chainId, {bool isRetry = false}) async { + if (!_requestor.isValidChain(chainId)) { + return; + } + + if (isRetry) { + await _waitForBackoff(); + if (!_requestor.isValidChain(chainId)) { + return; + } + } + + try { + final res = await _requestor.makeRequest( + chainId, + _pollingUri, + _pollingMethod, + body: _dataSourceConfig.useReport ? _contextString : null, + ); + + final result = _requestor.processResponse( + res, + chainId, + isRecoverableStatus: isHttpGloballyRecoverable, + ); + + if (result == null) { + return; + } + + if (result.event != null) { + _dataController.sink.add(result.event!); + if (result.event is DataEvent) { + _updatePollActiveTime(); + } + } + + if (result.shouldRetry) { + await _pollWithRetry(chainId, isRetry: true); + } else if (result.shutdown) { + _permanentShutdown = true; + stop(); + } + } catch (err) { + if (!_requestor.isValidChain(chainId)) { + return; + } + _logger + .error('encountered error with ping-triggered polling request: $err'); + await _pollWithRetry(chainId, isRetry: true); + } + } + + void _updatePollActiveTime() { + _pollActiveSince = DateTime.now().millisecondsSinceEpoch; + } + + Future _waitForBackoff() async { + final retryDelay = _pollBackoff.getRetryDelay(_pollActiveSince); + await Future.delayed(Duration(milliseconds: retryDelay)); + } + + String? _getEnvironmentIdFromHeaders(Map? headers) { + var environmentId = getEnvironmentId(headers); + if (environmentId == null && + DefaultConfig.credentialConfig.credentialType == + CredentialType.clientSideId) { + environmentId = _credential; + } + return environmentId; + } + + Uri _buildStreamingUri() { + return _buildUri( + _endpoints.streaming, + _dataSourceConfig.streamingReportPath, + _dataSourceConfig.streamingGetPath); + } + + Uri _buildPollingUri() { + return _buildUri(_endpoints.polling, + DefaultConfig.pollingPaths.pollingReportPath, + DefaultConfig.pollingPaths.pollingGetPath); + } + + Uri _buildUri(String baseUrl, String Function(String, String) reportPath, + String Function(String, String) getPath) { + final path = _useReport + ? reportPath(_credential, _contextString) + : getPath(_credential, _contextString); + + var url = appendPath(baseUrl, path); + if (_dataSourceConfig.withReasons) { + url = '$url?withReasons=true'; + } + return Uri.parse(url); + } + + void _setupPollingClient() { + if (_dataSourceConfig.useReport) { + final updatedProperties = + _httpProperties.withHeaders({'content-type': 'application/json'}); + _pollingMethod = RequestMethod.report; + _pollingClient = HttpClient(httpProperties: updatedProperties); + } else { + _pollingMethod = RequestMethod.get; + _pollingClient = HttpClient(httpProperties: _httpProperties); + } + } } /// Adapter to convert LDLogger to EventSourceLogger diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index ab92bcd..548c526 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -16,6 +16,7 @@ export 'src/events.dart' show Event, MessageEvent, OpenEvent; export 'src/test_sse_client.dart' show TestSseClient; export 'src/logging.dart' show EventSourceLogger, LogLevel, NoOpLogger, PrintLogger; +export 'src/backoff.dart' show Backoff; /// HTTP methods supported by the event source client. enum SseHttpMethod { diff --git a/packages/event_source_client/lib/src/sse_client_html.dart b/packages/event_source_client/lib/src/sse_client_html.dart index 55c91b9..8e34147 100644 --- a/packages/event_source_client/lib/src/sse_client_html.dart +++ b/packages/event_source_client/lib/src/sse_client_html.dart @@ -5,7 +5,6 @@ import 'dart:math' as math; import '../launchdarkly_event_source_client.dart'; -import 'backoff.dart'; import 'events.dart' as ld_message_event; /// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support.