Skip to content

Commit 05d3c3a

Browse files
refactor: Extract request logic into DataSourceRequestor
- Create shared DataSourceRequestor class for HTTP request handling - Replace pollGeneration counter with request chain tracking - Refactor PollingDataSource to use requestor - Refactor StreamingDataSource to use requestor for ping-triggered polling - Consolidate duplicate request/response handling code This provides better request lifecycle management by tracking request chains instead of generation counters, automatically discarding responses from stale request chains. Co-Authored-By: [email protected] <[email protected]>
1 parent 5ce77b1 commit 05d3c3a

File tree

3 files changed

+200
-142
lines changed

3 files changed

+200
-142
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import 'dart:async';
2+
import 'package:http/http.dart' as http;
3+
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';
4+
import '../config/defaults/credential_type.dart';
5+
import '../config/defaults/default_config.dart';
6+
import 'data_source_status.dart';
7+
import 'get_environment_id.dart';
8+
9+
class RequestResult {
10+
final DataSourceEvent? event;
11+
final bool shouldRetry;
12+
final bool shutdown;
13+
14+
RequestResult({this.event, this.shouldRetry = false, this.shutdown = false});
15+
}
16+
17+
class DataSourceRequestor {
18+
final HttpClient _client;
19+
final LDLogger _logger;
20+
final String _credential;
21+
22+
int _currentChainId = 0;
23+
String? _lastEtag;
24+
25+
DataSourceRequestor({
26+
required HttpClient client,
27+
required LDLogger logger,
28+
required String credential,
29+
}) : _client = client,
30+
_logger = logger,
31+
_credential = credential;
32+
33+
int startRequestChain() {
34+
return ++_currentChainId;
35+
}
36+
37+
bool isValidChain(int chainId) {
38+
return chainId == _currentChainId;
39+
}
40+
41+
Future<http.Response> makeRequest(
42+
int chainId,
43+
Uri uri,
44+
RequestMethod method, {
45+
String? body,
46+
Map<String, String>? additionalHeaders,
47+
}) async {
48+
if (!isValidChain(chainId)) {
49+
throw Exception('Request chain $chainId is no longer valid');
50+
}
51+
52+
final headers = _buildHeaders(additionalHeaders);
53+
_logger.debug(
54+
'Making request for chain $chainId, method: $method, uri: $uri, etag: $_lastEtag');
55+
56+
return await _client.request(method, uri,
57+
additionalHeaders: headers, body: body);
58+
}
59+
60+
RequestResult? processResponse(
61+
http.Response res,
62+
int chainId, {
63+
required bool Function(int) isRecoverableStatus,
64+
}) {
65+
if (!isValidChain(chainId)) {
66+
_logger.debug('Discarding response from stale request chain $chainId');
67+
return null;
68+
}
69+
70+
final statusCode = res.statusCode;
71+
72+
if (statusCode == 200 || statusCode == 304) {
73+
if (statusCode == 200) {
74+
_updateEtagFromResponse(res);
75+
final environmentId = _getEnvironmentIdFromHeaders(res.headers);
76+
return RequestResult(
77+
event: DataEvent('put', res.body, environmentId: environmentId),
78+
);
79+
}
80+
return RequestResult();
81+
}
82+
83+
if (isRecoverableStatus(statusCode)) {
84+
_logger.debug(
85+
'Received recoverable status code $statusCode for chain $chainId, will retry');
86+
return RequestResult(shouldRetry: true);
87+
}
88+
89+
_logger.error(
90+
'Received unexpected status code $statusCode for chain $chainId, shutting down');
91+
return RequestResult(
92+
event: StatusEvent(
93+
ErrorKind.networkError,
94+
statusCode,
95+
'Received unexpected status code: $statusCode',
96+
shutdown: true,
97+
),
98+
shutdown: true,
99+
);
100+
}
101+
102+
Map<String, String>? _buildHeaders(Map<String, String>? additionalHeaders) {
103+
if (_lastEtag == null && additionalHeaders == null) {
104+
return null;
105+
}
106+
107+
final headers = <String, String>{};
108+
if (_lastEtag != null) {
109+
headers['if-none-match'] = _lastEtag!;
110+
}
111+
if (additionalHeaders != null) {
112+
headers.addAll(additionalHeaders);
113+
}
114+
return headers;
115+
}
116+
117+
void _updateEtagFromResponse(http.Response res) {
118+
final etag = res.headers['etag'];
119+
if (etag != null) {
120+
_lastEtag = etag;
121+
}
122+
}
123+
124+
String? _getEnvironmentIdFromHeaders(Map<String, String>? headers) {
125+
var environmentId = getEnvironmentId(headers);
126+
if (environmentId == null &&
127+
DefaultConfig.credentialConfig.credentialType ==
128+
CredentialType.clientSideId) {
129+
environmentId = _credential;
130+
}
131+
return environmentId;
132+
}
133+
}

packages/common_client/lib/src/data_sources/polling_data_source.dart

Lines changed: 30 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import '../config/defaults/credential_type.dart';
99
import '../config/defaults/default_config.dart';
1010
import 'data_source.dart';
1111
import 'data_source_status.dart';
12+
import 'data_source_requestor.dart';
1213
import 'get_environment_id.dart';
1314

1415
HttpClient _defaultClientFactory(HttpProperties httpProperties) {
@@ -28,6 +29,8 @@ final class PollingDataSource implements DataSource {
2829

2930
late final HttpClient _client;
3031

32+
late final DataSourceRequestor _requestor;
33+
3134
Timer? _pollTimer;
3235

3336
final Stopwatch _pollStopwatch = Stopwatch();
@@ -49,8 +52,6 @@ final class PollingDataSource implements DataSource {
4952
@override
5053
Stream<DataSourceEvent> get events => _eventController.stream;
5154

52-
String? _lastEtag;
53-
5455
/// Used to track if there has been an unrecoverable error.
5556
bool _permanentShutdown = false;
5657

@@ -87,6 +88,9 @@ final class PollingDataSource implements DataSource {
8788
_client = clientFactory(httpProperties);
8889
}
8990

91+
_requestor = DataSourceRequestor(
92+
client: _client, logger: _logger, credential: credential);
93+
9094
final plainContextString =
9195
jsonEncode(LDContextSerialization.toJson(context, isEvent: false));
9296
if (dataSourceConfig.useReport) {
@@ -111,66 +115,42 @@ final class PollingDataSource implements DataSource {
111115
}
112116

113117
Future<void> _makeRequest() async {
114-
try {
115-
_logger.debug(
116-
'Making polling request, method: $_method, uri: $_uri, etag: $_lastEtag');
117-
final res = await _client.request(_method, _uri,
118-
additionalHeaders: _lastEtag != null ? {'etag': _lastEtag!} : null,
119-
body: _dataSourceConfig.useReport ? _contextString : null);
120-
await _handleResponse(res);
121-
} catch (err) {
122-
_logger.error('encountered error with polling request: $err, will retry');
123-
_eventController.sink
124-
.add(StatusEvent(ErrorKind.networkError, null, err.toString()));
125-
}
126-
}
127-
128-
Future<void> _handleResponse(http.Response res) async {
129-
// The data source has been instructed to stop, so we discard the response.
130118
if (_stopped) {
131119
return;
132120
}
133121

134-
if (res.statusCode == 200 || res.statusCode == 304) {
135-
final etag = res.headers['etag'];
136-
if (etag != null && etag == _lastEtag) {
137-
// The response has not changed, so we don't need to do the work of
138-
// updating the store, calculating changes, or persisting the payload.
122+
final chainId = _requestor.startRequestChain();
123+
124+
try {
125+
final res = await _requestor.makeRequest(
126+
chainId,
127+
_uri,
128+
_method,
129+
body: _dataSourceConfig.useReport ? _contextString : null,
130+
);
131+
132+
final result = _requestor.processResponse(
133+
res,
134+
chainId,
135+
isRecoverableStatus: isHttpGloballyRecoverable,
136+
);
137+
138+
if (result == null) {
139139
return;
140140
}
141-
_lastEtag = etag;
142141

143-
var environmentId = getEnvironmentId(res.headers);
144-
145-
if (environmentId == null &&
146-
DefaultConfig.credentialConfig.credentialType ==
147-
CredentialType.clientSideId) {
148-
// When using a client-side ID we can use it to represent the
149-
// environment.
150-
environmentId = _credential;
142+
if (result.event != null) {
143+
_eventController.sink.add(result.event!);
151144
}
152145

153-
_eventController.sink
154-
.add(DataEvent('put', res.body, environmentId: environmentId));
155-
} else {
156-
if (isHttpGloballyRecoverable(res.statusCode)) {
157-
_eventController.sink.add(StatusEvent(
158-
ErrorKind.networkError,
159-
res.statusCode,
160-
'Received unexpected status code: ${res.statusCode}'));
161-
_logger.error(
162-
'received unexpected status code when polling: ${res.statusCode}, will retry');
163-
} else {
164-
_logger.error(
165-
'received unexpected status code when polling: ${res.statusCode}, stopping polling');
166-
_eventController.sink.add(StatusEvent(
167-
ErrorKind.networkError,
168-
res.statusCode,
169-
'Received unexpected status code: ${res.statusCode}',
170-
shutdown: true));
146+
if (result.shutdown) {
171147
_permanentShutdown = true;
172148
stop();
173149
}
150+
} catch (err) {
151+
_logger.error('encountered error with polling request: $err, will retry');
152+
_eventController.sink
153+
.add(StatusEvent(ErrorKind.networkError, null, err.toString()));
174154
}
175155
}
176156

0 commit comments

Comments
 (0)