11import 'dart:async' ;
22import 'dart:convert' ;
3+ import 'dart:math' as math;
4+ import 'package:http/http.dart' as http;
35
46import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart' ;
57import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart' ;
8+ import 'package:launchdarkly_event_source_client/src/backoff.dart' ;
69
710import '../config/data_source_config.dart' ;
811import '../config/defaults/credential_type.dart' ;
@@ -11,8 +14,6 @@ import 'data_source.dart';
1114import 'data_source_status.dart' ;
1215import 'get_environment_id.dart' ;
1316
14- typedef MessageHandler = void Function (MessageEvent );
15- typedef ErrorHandler = void Function (dynamic );
1617typedef SseClientFactory = SSEClient Function (
1718 Uri uri,
1819 HttpProperties httpProperties,
@@ -22,51 +23,49 @@ typedef SseClientFactory = SSEClient Function(
2223
2324SSEClient _defaultClientFactory (Uri uri, HttpProperties httpProperties,
2425 String ? body, SseHttpMethod ? method, EventSourceLogger ? logger) {
25- return SSEClient (uri, {'put' , 'patch' , 'delete' },
26+ return SSEClient (uri, {'put' , 'patch' , 'delete' , 'ping' },
2627 headers: httpProperties.baseHeaders,
2728 body: body,
2829 httpMethod: method ?? SseHttpMethod .get ,
2930 logger: logger);
3031}
3132
3233final class StreamingDataSource implements DataSource {
33- final LDLogger _logger;
34+ static const int _httpOk = 200 ;
35+ static const int _httpNotModified = 304 ;
36+ static const String _ifNoneMatchHeader = 'if-none-match' ;
37+ static const String _etagHeader = 'etag' ;
38+ static const String _pingEventType = 'ping' ;
3439
40+ final LDLogger _logger;
3541 final ServiceEndpoints _endpoints;
36-
3742 final StreamingDataSourceConfig _dataSourceConfig;
38-
3943 final SseClientFactory _clientFactory;
44+ final HttpProperties _httpProperties;
45+ final String _credential;
46+ final Backoff _pollBackoff;
4047
4148 late final Uri _uri;
42-
43- late final HttpProperties _httpProperties;
44-
4549 late final String _contextString;
46- bool _stopped = false ;
47-
48- StreamSubscription <Event >? _subscription;
49-
50- final StreamController <DataSourceEvent > _dataController = StreamController ();
51-
5250 late final bool _useReport;
51+ late final HttpClient _pollingClient;
52+ late final Uri _pollingUri;
53+ late final RequestMethod _pollingMethod;
5354
55+ final StreamController <DataSourceEvent > _dataController = StreamController ();
5456 SSEClient ? _client;
55-
57+ StreamSubscription < Event > ? _subscription;
5658 String ? _environmentId;
59+ bool _stopped = false ;
60+ bool _permanentShutdown = false ;
5761
58- final String _credential;
62+ int _pollGeneration = 0 ;
63+ int ? _pollActiveSince;
64+ String ? _lastEtag;
5965
6066 @override
6167 Stream <DataSourceEvent > get events => _dataController.stream;
6268
63- /// Used to track if there has been an unrecoverable error.
64- bool _permanentShutdown = false ;
65-
66- /// The [clientFactory] parameter is primarily intended for testing, but it also
67- /// could be used for customized SSE clients which support functionality
68- /// our default client support does not, or for alternative implementations
69- /// which are not based on SSE.
7069 StreamingDataSource (
7170 {required String credential,
7271 required LDContext context,
@@ -80,7 +79,8 @@ final class StreamingDataSource implements DataSource {
8079 _dataSourceConfig = dataSourceConfig,
8180 _clientFactory = clientFactory,
8281 _httpProperties = httpProperties,
83- _credential = credential {
82+ _credential = credential,
83+ _pollBackoff = Backoff (math.Random ()) {
8484 final plainContextString =
8585 jsonEncode (LDContextSerialization .toJson (context, isEvent: false ));
8686
@@ -97,17 +97,9 @@ final class StreamingDataSource implements DataSource {
9797 ? plainContextString
9898 : base64UrlEncode (utf8.encode (plainContextString));
9999
100- final path = _useReport
101- ? _dataSourceConfig.streamingReportPath (credential, _contextString)
102- : _dataSourceConfig.streamingGetPath (credential, _contextString);
103-
104- String completeUrl = appendPath (_endpoints.streaming, path);
105-
106- if (_dataSourceConfig.withReasons) {
107- completeUrl = '$completeUrl ?withReasons=true' ;
108- }
109-
110- _uri = Uri .parse (completeUrl);
100+ _uri = _buildStreamingUri ();
101+ _setupPollingClient ();
102+ _pollingUri = _buildPollingUri ();
111103 }
112104
113105 @override
@@ -124,26 +116,20 @@ final class StreamingDataSource implements DataSource {
124116 _useReport ? SseHttpMethod .report : SseHttpMethod .get ,
125117 LDLoggerToEventSourceAdapter (_logger));
126118
127- _subscription = _client! .stream.listen ((event) async {
119+ _subscription = _client! .stream.listen ((event) {
128120 if (_stopped) {
129121 return ;
130122 }
131123
132124 switch (event) {
133125 case MessageEvent ():
134- _logger.debug ('Received message event, data: ${event .data }' );
135- _dataController.sink.add (
136- DataEvent (event.type, event.data, environmentId: _environmentId));
137- case OpenEvent ():
138- _logger.debug ('Received connect event, data: ${event .headers }' );
139- if (event.headers != null ) {
140- _environmentId = getEnvironmentId (event.headers);
141- } else if (DefaultConfig .credentialConfig.credentialType ==
142- CredentialType .clientSideId) {
143- // When using a client-side ID we can use it to represent the
144- // environment.
145- _environmentId = _credential;
126+ if (event.type == _pingEventType) {
127+ _handlePingEvent ();
128+ } else {
129+ _handleMessageEvent (event.type, event.data);
146130 }
131+ case OpenEvent ():
132+ _environmentId = _getEnvironmentIdFromHeaders (event.headers);
147133 }
148134 })
149135 ..onError ((err) {
@@ -166,13 +152,180 @@ final class StreamingDataSource implements DataSource {
166152
167153 @override
168154 void stop () {
169- // Cancel is async, but it should only be for the cleanup portion, according
170- // to the method documentation.
155+ if (_stopped) {
156+ return ;
157+ }
171158 _subscription? .cancel ();
172159 _subscription = null ;
173160 _stopped = true ;
161+ _pollGeneration++ ;
162+ _pollActiveSince = null ;
174163 _dataController.close ();
175164 }
165+
166+ void _handleMessageEvent (String type, String data) {
167+ _dataController.sink.add (
168+ DataEvent (type, data, environmentId: _environmentId));
169+ }
170+
171+ Future <void > _handlePingEvent () async {
172+ if (_stopped) {
173+ return ;
174+ }
175+
176+ final currentGeneration = ++ _pollGeneration;
177+ _updatePollActiveTime ();
178+ await _pollWithRetry (currentGeneration);
179+ }
180+
181+ Future <void > _pollWithRetry (int generation, {bool isRetry = false }) async {
182+ if (! _isValidGeneration (generation)) {
183+ return ;
184+ }
185+
186+ if (isRetry) {
187+ await _waitForBackoff ();
188+ if (! _isValidGeneration (generation)) {
189+ return ;
190+ }
191+ }
192+
193+ try {
194+ final res = await _makePollingRequest ();
195+ if (! _isValidGeneration (generation)) {
196+ return ;
197+ }
198+
199+ final shouldRetry = _handlePollingResponse (res, generation);
200+ if (shouldRetry) {
201+ await _pollWithRetry (generation, isRetry: true );
202+ }
203+ } catch (err) {
204+ if (! _isValidGeneration (generation)) {
205+ return ;
206+ }
207+ _logger.error ('encountered error with ping-triggered polling request: $err ' );
208+ await _pollWithRetry (generation, isRetry: true );
209+ }
210+ }
211+
212+ Future <http.Response > _makePollingRequest () {
213+ final body = _dataSourceConfig.useReport ? _contextString : null ;
214+ return _pollingClient.request (_pollingMethod, _pollingUri,
215+ additionalHeaders: _buildPollingHeaders (), body: body);
216+ }
217+
218+ bool _handlePollingResponse (http.Response res, int generation) {
219+ if (! _isValidGeneration (generation)) {
220+ return false ;
221+ }
222+
223+ final statusCode = res.statusCode;
224+ if (statusCode == _httpOk || statusCode == _httpNotModified) {
225+ if (statusCode == _httpOk) {
226+ _updateEtagFromResponse (res);
227+ final environmentId = _getEnvironmentIdFromHeaders (res.headers);
228+ _dataController.sink
229+ .add (DataEvent ('put' , res.body, environmentId: environmentId));
230+ }
231+ _updatePollActiveTime ();
232+ return false ;
233+ }
234+
235+ return _handlePollingError (statusCode);
236+ }
237+
238+ bool _handlePollingError (int statusCode) {
239+ if (isHttpGloballyRecoverable (statusCode)) {
240+ _logger.debug (
241+ 'received recoverable status code when polling in response to ping: $statusCode , will retry' );
242+ return true ;
243+ }
244+
245+ _logger.error (
246+ 'received unexpected status code when polling in response to ping: $statusCode ' );
247+ _dataController.sink.add (StatusEvent (
248+ ErrorKind .networkError,
249+ statusCode,
250+ 'Received unexpected status code: $statusCode ' ,
251+ shutdown: true ));
252+ _permanentShutdown = true ;
253+ stop ();
254+ return false ;
255+ }
256+
257+ bool _isValidGeneration (int generation) {
258+ return ! _stopped && generation == _pollGeneration;
259+ }
260+
261+ void _updatePollActiveTime () {
262+ _pollActiveSince = DateTime .now ().millisecondsSinceEpoch;
263+ }
264+
265+ Future <void > _waitForBackoff () async {
266+ final retryDelay = _pollBackoff.getRetryDelay (_pollActiveSince);
267+ await Future .delayed (Duration (milliseconds: retryDelay));
268+ }
269+
270+ String ? _getEnvironmentIdFromHeaders (Map <String , String >? headers) {
271+ var environmentId = getEnvironmentId (headers);
272+ if (environmentId == null &&
273+ DefaultConfig .credentialConfig.credentialType ==
274+ CredentialType .clientSideId) {
275+ environmentId = _credential;
276+ }
277+ return environmentId;
278+ }
279+
280+ Map <String , String >? _buildPollingHeaders () {
281+ if (_lastEtag == null ) {
282+ return null ;
283+ }
284+ return {_ifNoneMatchHeader: _lastEtag! };
285+ }
286+
287+ void _updateEtagFromResponse (http.Response res) {
288+ final etag = res.headers[_etagHeader];
289+ if (etag != null ) {
290+ _lastEtag = etag;
291+ }
292+ }
293+
294+ Uri _buildStreamingUri () {
295+ return _buildUri (_endpoints.streaming, _dataSourceConfig.streamingReportPath,
296+ _dataSourceConfig.streamingGetPath);
297+ }
298+
299+ Uri _buildPollingUri () {
300+ return _buildUri (_endpoints.polling,
301+ DefaultConfig .pollingPaths.pollingReportPath,
302+ DefaultConfig .pollingPaths.pollingGetPath);
303+ }
304+
305+ Uri _buildUri (String baseUrl, String Function (String , String ) reportPath,
306+ String Function (String , String ) getPath) {
307+ final path = _useReport
308+ ? reportPath (_credential, _contextString)
309+ : getPath (_credential, _contextString);
310+
311+ var url = appendPath (baseUrl, path);
312+ if (_dataSourceConfig.withReasons) {
313+ url = '$url ?withReasons=true' ;
314+ }
315+ return Uri .parse (url);
316+ }
317+
318+ void _setupPollingClient () {
319+ if (_dataSourceConfig.useReport) {
320+ final updatedProperties =
321+ _httpProperties.withHeaders ({'content-type' : 'application/json' });
322+ _pollingMethod = RequestMethod .report;
323+ _pollingClient = HttpClient (httpProperties: updatedProperties);
324+ } else {
325+ _pollingMethod = RequestMethod .get ;
326+ _pollingClient = HttpClient (httpProperties: _httpProperties);
327+ }
328+ }
176329}
177330
178331/// Adapter to convert LDLogger to EventSourceLogger
0 commit comments