Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/common/lib/src/network/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ final class HttpClient {
return await http.Response.fromStream(streamedResponse)
.timeout(_httpProperties.readTimeout);
}

/// Close the underlying HTTP client and free its resources.
void close() {
_client.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import 'dart:async';
import 'package:http/http.dart' as http;
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
import '../config/defaults/credential_type.dart';
import '../config/defaults/default_config.dart';
import 'data_source.dart';
import 'data_source_status.dart';
import 'get_environment_id.dart';

class RequestResult {
final DataSourceEvent? event;
final bool shouldRetry;
final bool shutdown;

RequestResult({this.event, this.shouldRetry = false, this.shutdown = false});
}

class DataSourceRequestor {
final HttpClient _client;
final LDLogger _logger;
final String _credential;

int _currentChainId = 0;
String? _lastEtag;

DataSourceRequestor({
required HttpClient client,
required LDLogger logger,
required String credential,
}) : _client = client,
_logger = logger,
_credential = credential;

int startRequestChain() {
return ++_currentChainId;
}

bool isValidChain(int chainId) {
return chainId == _currentChainId;
}

Future<http.Response> makeRequest(
int chainId,
Uri uri,
RequestMethod method, {
String? body,
Map<String, String>? additionalHeaders,
}) async {
if (!isValidChain(chainId)) {
throw Exception('Request chain $chainId is no longer valid');
}

final headers = _buildHeaders(additionalHeaders);
_logger.debug(
'Making request for chain $chainId, method: $method, uri: $uri, etag: $_lastEtag');

return await _client.request(method, uri,
additionalHeaders: headers, body: body);
}

RequestResult? processResponse(
http.Response res,
int chainId, {
required bool Function(int) isRecoverableStatus,
}) {
if (!isValidChain(chainId)) {
_logger.debug('Discarding response from stale request chain $chainId');
return null;
}

final statusCode = res.statusCode;

if (statusCode == 200 || statusCode == 304) {
if (statusCode == 200) {
_updateEtagFromResponse(res);
final environmentId = _getEnvironmentIdFromHeaders(res.headers);
return RequestResult(
event: DataEvent('put', res.body, environmentId: environmentId),
);
}
return RequestResult();
}

if (isRecoverableStatus(statusCode)) {
_logger.debug(
'Received recoverable status code $statusCode for chain $chainId, will retry');
return RequestResult(shouldRetry: true);
}

_logger.error(
'Received unexpected status code $statusCode for chain $chainId, shutting down');
return RequestResult(
event: StatusEvent(
ErrorKind.networkError,
statusCode,
'Received unexpected status code: $statusCode',
shutdown: true,
),
shutdown: true,
);
}

Map<String, String>? _buildHeaders(Map<String, String>? additionalHeaders) {
if (_lastEtag == null && additionalHeaders == null) {
return null;
}

final headers = <String, String>{};
if (_lastEtag != null) {
headers['if-none-match'] = _lastEtag!;
}
if (additionalHeaders != null) {
headers.addAll(additionalHeaders);
}
return headers;
}

void _updateEtagFromResponse(http.Response res) {
final etag = res.headers['etag'];
if (etag != null) {
_lastEtag = etag;
}
}

String? _getEnvironmentIdFromHeaders(Map<String, String>? headers) {
var environmentId = getEnvironmentId(headers);
if (environmentId == null &&
DefaultConfig.credentialConfig.credentialType ==
CredentialType.clientSideId) {
environmentId = _credential;
}
return environmentId;
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
import 'dart:math';

import '../config/data_source_config.dart';
import '../config/defaults/credential_type.dart';
import '../config/defaults/default_config.dart';
import 'data_source.dart';
import 'data_source_requestor.dart';
import 'data_source_status.dart';
import 'get_environment_id.dart';

HttpClient _defaultClientFactory(HttpProperties httpProperties) {
return HttpClient(httpProperties: httpProperties);
Expand All @@ -28,6 +25,8 @@ final class PollingDataSource implements DataSource {

late final HttpClient _client;

late final DataSourceRequestor _requestor;

Timer? _pollTimer;

final Stopwatch _pollStopwatch = Stopwatch();
Expand All @@ -44,13 +43,9 @@ final class PollingDataSource implements DataSource {

final StreamController<DataSourceEvent> _eventController = StreamController();

late final String _credential;

@override
Stream<DataSourceEvent> get events => _eventController.stream;

String? _lastEtag;

/// Used to track if there has been an unrecoverable error.
bool _permanentShutdown = false;

Expand All @@ -73,8 +68,7 @@ final class PollingDataSource implements DataSource {
_defaultClientFactory})
: _endpoints = endpoints,
_logger = logger.subLogger('PollingDataSource'),
_dataSourceConfig = dataSourceConfig,
_credential = credential {
_dataSourceConfig = dataSourceConfig {
_pollingInterval = testingInterval ?? dataSourceConfig.pollingInterval;

if (_dataSourceConfig.useReport) {
Expand All @@ -87,6 +81,9 @@ final class PollingDataSource implements DataSource {
_client = clientFactory(httpProperties);
}

_requestor = DataSourceRequestor(
client: _client, logger: _logger, credential: credential);

final plainContextString =
jsonEncode(LDContextSerialization.toJson(context, isEvent: false));
if (dataSourceConfig.useReport) {
Expand All @@ -111,66 +108,42 @@ final class PollingDataSource implements DataSource {
}

Future<void> _makeRequest() async {
try {
_logger.debug(
'Making polling request, method: $_method, uri: $_uri, etag: $_lastEtag');
final res = await _client.request(_method, _uri,
additionalHeaders: _lastEtag != null ? {'etag': _lastEtag!} : null,
body: _dataSourceConfig.useReport ? _contextString : null);
await _handleResponse(res);
} catch (err) {
_logger.error('encountered error with polling request: $err, will retry');
_eventController.sink
.add(StatusEvent(ErrorKind.networkError, null, err.toString()));
}
}

Future<void> _handleResponse(http.Response res) async {
// The data source has been instructed to stop, so we discard the response.
if (_stopped) {
return;
}

if (res.statusCode == 200 || res.statusCode == 304) {
final etag = res.headers['etag'];
if (etag != null && etag == _lastEtag) {
// The response has not changed, so we don't need to do the work of
// updating the store, calculating changes, or persisting the payload.
final chainId = _requestor.startRequestChain();

try {
final res = await _requestor.makeRequest(
chainId,
_uri,
_method,
body: _dataSourceConfig.useReport ? _contextString : null,
);

final result = _requestor.processResponse(
res,
chainId,
isRecoverableStatus: isHttpGloballyRecoverable,
);

if (result == null) {
return;
}
_lastEtag = etag;

var environmentId = getEnvironmentId(res.headers);

if (environmentId == null &&
DefaultConfig.credentialConfig.credentialType ==
CredentialType.clientSideId) {
// When using a client-side ID we can use it to represent the
// environment.
environmentId = _credential;
if (result.event != null) {
_eventController.sink.add(result.event!);
}

_eventController.sink
.add(DataEvent('put', res.body, environmentId: environmentId));
} else {
if (isHttpGloballyRecoverable(res.statusCode)) {
_eventController.sink.add(StatusEvent(
ErrorKind.networkError,
res.statusCode,
'Received unexpected status code: ${res.statusCode}'));
_logger.error(
'received unexpected status code when polling: ${res.statusCode}, will retry');
} else {
_logger.error(
'received unexpected status code when polling: ${res.statusCode}, stopping polling');
_eventController.sink.add(StatusEvent(
ErrorKind.networkError,
res.statusCode,
'Received unexpected status code: ${res.statusCode}',
shutdown: true));
if (result.shutdown) {
_permanentShutdown = true;
stop();
}
} catch (err) {
_logger.error('encountered error with polling request: $err, will retry');
_eventController.sink
.add(StatusEvent(ErrorKind.networkError, null, err.toString()));
}
}

Expand Down
Loading
Loading