Skip to content

Commit 45f1ae5

Browse files
committed
refactor(kinesis): extract shared record cache into amplify_record_cache_dart
Create amplify_record_cache_dart package with shared caching infrastructure: - RecordCacheException hierarchy (const constructors) - Record/RecordInput models (partitionKey optional, dataSize caller-computed) - RecordStorage base + SqliteRecordStorage, InMemoryRecordStorage, IndexedDbRecordStorage - RecordCacheDatabase (Drift, parameterized dbPrefix) - Sender interface + SendResult (replaces KDS-specific PutRecordsResult) - RecordClient, AutoFlushScheduler, FlushStrategy, FlushData, RecordData, ClearCacheData - Platform resolution (VM/web/stub conditional exports) Update amplify_kinesis_dart to depend on shared package: - KinesisSender implements Sender interface (sendBatch replaces putRecords) - Partition key validation moved from RecordStorage to AmplifyKinesisClient - createKinesisRecordInputNow computes dataSize with partition key - All test imports updated, zero behavioral change
1 parent deab741 commit 45f1ae5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+690
-804
lines changed

packages/kinesis/amplify_kinesis_dart/lib/amplify_kinesis_dart.dart

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,22 @@
44
/// Amplify Kinesis Data Streams client for Dart.
55
library;
66

7+
// Re-export shared types used in the public API
8+
export 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart'
9+
show
10+
FlushStrategy,
11+
FlushInterval,
12+
FlushNone,
13+
FlushData,
14+
RecordData,
15+
ClearCacheData;
16+
717
// Main client
818
export 'src/amplify_kinesis_client.dart';
919
// Options
1020
export 'src/amplify_kinesis_client_options.dart';
1121
// Exceptions
1222
export 'src/exception/amplify_kinesis_exception.dart';
13-
// Flush strategies
14-
export 'src/flush_strategy/flush_strategy.dart';
15-
// Return types
16-
export 'src/model/clear_cache_data.dart';
17-
export 'src/model/flush_data.dart';
18-
export 'src/model/record_data.dart';
1923
// SDK client (for escape hatch)
2024
export 'src/sdk/kinesis.dart'
2125
show

packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client.dart

Lines changed: 22 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,12 @@ import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'
1212
import 'package:amplify_foundation_dart_bridge/amplify_foundation_dart_bridge.dart';
1313
import 'package:amplify_kinesis_dart/src/amplify_kinesis_client_options.dart';
1414
import 'package:amplify_kinesis_dart/src/exception/amplify_kinesis_exception.dart';
15-
import 'package:amplify_kinesis_dart/src/flush_strategy/flush_strategy.dart';
16-
import 'package:amplify_kinesis_dart/src/impl/auto_flush_scheduler.dart';
1715
import 'package:amplify_kinesis_dart/src/impl/kinesis_record.dart';
1816
import 'package:amplify_kinesis_dart/src/impl/kinesis_sender.dart';
19-
import 'package:amplify_kinesis_dart/src/impl/record_client.dart';
20-
import 'package:amplify_kinesis_dart/src/impl/storage/platform/record_storage_platform.dart';
21-
import 'package:amplify_kinesis_dart/src/model/clear_cache_data.dart';
22-
import 'package:amplify_kinesis_dart/src/model/flush_data.dart';
23-
import 'package:amplify_kinesis_dart/src/model/record_data.dart';
17+
import 'package:amplify_kinesis_dart/src/kinesis_limits.dart' as limits;
2418
import 'package:amplify_kinesis_dart/src/sdk/kinesis.dart';
2519
import 'package:amplify_kinesis_dart/src/version.dart';
20+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
2621
import 'package:smithy/smithy.dart' show WithUserAgent;
2722

2823
/// User agent component identifying this library.
@@ -96,12 +91,6 @@ class AmplifyKinesisClient {
9691
_logger = AmplifyLogging.logger('AmplifyKinesisClient');
9792

9893
/// {@macro amplify_kinesis.amplify_kinesis_client}
99-
///
100-
/// [storagePath] is the directory path for the database file on IO
101-
/// platforms. On web, pass `null` (the path is unused; IndexedDB storage
102-
/// is used instead, with an in-memory fallback).
103-
/// The [region] is used as the database identifier to namespace
104-
/// the database file (e.g. `kinesis_records_us-east-1`).
10594
static Future<AmplifyKinesisClient> create({
10695
required String region,
10796
required AWSCredentialsProvider credentialsProvider,
@@ -114,6 +103,11 @@ class AmplifyKinesisClient {
114103
identifier: region,
115104
storagePath: storagePath,
116105
maxCacheBytes: opts.cacheMaxBytes,
106+
maxRecordsPerBatch: limits.maxRecordsPerStream,
107+
maxBytesPerBatch: limits.maxPutRecordsSizeBytes,
108+
maxRecordSizeBytes: limits.maxRecordSizeBytes,
109+
dbPrefix: 'kinesis_records',
110+
storeName: 'kinesis_records',
117111
);
118112

119113
final kinesisClient = KinesisClient(
@@ -170,11 +164,6 @@ class AmplifyKinesisClient {
170164
bool get isClosed => _closed;
171165

172166
/// Direct access to the underlying Kinesis SDK client.
173-
///
174-
/// Use this for advanced operations not covered by this client's API.
175-
///
176-
/// Note: This getter is only available when the client was created with
177-
/// [create] (not [AmplifyKinesisClient.withRecordClient]).
178167
KinesisClient get kinesisClient {
179168
final client = _kinesisClient;
180169
if (client == null) {
@@ -188,16 +177,10 @@ class AmplifyKinesisClient {
188177

189178
/// Records data to be sent to a Kinesis Data Stream.
190179
///
191-
/// The record is persisted to local storage and will be sent during
192-
/// the next flush operation (automatic or manual).
193-
///
194180
/// Returns [Result.ok] with [RecordData] on success, or [Result.error] with:
195-
/// - [KinesisValidationException] for invalid input (e.g. oversized record,
196-
/// empty or too-long partition key)
181+
/// - [KinesisValidationException] for invalid input
197182
/// - [KinesisLimitExceededException] if the cache is full
198183
/// - [KinesisStorageException] for database errors
199-
///
200-
/// Returns [Result.ok] silently if the client is disabled.
201184
Future<Result<RecordData>> record({
202185
required Uint8List data,
203186
required String partitionKey,
@@ -208,8 +191,21 @@ class AmplifyKinesisClient {
208191
_logger.debug('Record collection is disabled, dropping record');
209192
return const Result.ok(RecordData());
210193
}
194+
// KDS-specific partition key validation
195+
final codePoints = partitionKey.runes.length;
196+
if (codePoints == 0 || codePoints > limits.maxPartitionKeyLength) {
197+
return Result.error(
198+
KinesisValidationException(
199+
'Partition key length ($codePoints) is outside the allowed '
200+
'range of 1-${limits.maxPartitionKeyLength} characters.',
201+
recoverySuggestion:
202+
'Use a partition key between 1 and '
203+
'${limits.maxPartitionKeyLength} characters.',
204+
),
205+
);
206+
}
211207
_logger.verbose('Recording to stream: $streamName');
212-
final kinesisRecord = RecordInput.now(
208+
final kinesisRecord = createKinesisRecordInputNow(
213209
data: data,
214210
partitionKey: partitionKey,
215211
streamName: streamName,
@@ -218,38 +214,13 @@ class AmplifyKinesisClient {
218214
}
219215

220216
/// Flushes cached records to their respective Kinesis streams.
221-
///
222-
/// Each invocation sends at most one batch per stream, limited by the Kinesis
223-
/// `PutRecords` constraints (up to 500 records or 5 MB per stream). If the
224-
/// cache contains more records than a single batch can hold, the remaining
225-
/// records are sent on subsequent flush invocations — either manually or via
226-
/// the auto-flush scheduler.
227-
///
228-
/// Records that fail within a batch are marked for retry on the next flush.
229-
/// Records that exceed [AmplifyKinesisClientOptions.maxRetries] are removed
230-
/// from the cache.
231-
///
232-
/// SDK Kinesis errors (throttling, invalid stream, etc.) are logged and
233-
/// skipped so other streams can still flush. Non-SDK errors (e.g. network,
234-
/// storage) abort the flush and are returned as [Result.error].
235-
///
236-
/// If a flush is already in progress, the call returns immediately with
237-
/// `FlushData(recordsFlushed: 0, flushInProgress: true)`.
238-
///
239-
/// Manual flushes are allowed even when the client is disabled, so that
240-
/// callers can drain cached records without re-enabling collection.
241-
/// Only the automatic flush scheduler is paused when disabled.
242217
Future<Result<FlushData>> flush() async {
243218
if (_closed) return const Result.error(ClientClosedException());
244219
_logger.verbose('Starting flush');
245220
return _wrapError(_recordClient.flush);
246221
}
247222

248223
/// Clears all cached records from local storage.
249-
///
250-
/// Returns [Result.ok] with [ClearCacheData] containing the count of
251-
/// records cleared, or [Result.error] with:
252-
/// - [KinesisStorageException] for database errors
253224
Future<Result<ClearCacheData>> clearCache() async {
254225
if (_closed) return const Result.error(ClientClosedException());
255226
_logger.verbose('Clearing cache');
@@ -264,28 +235,19 @@ class AmplifyKinesisClient {
264235
}
265236

266237
/// Disables record collection and automatic flushing.
267-
///
268-
/// Records submitted while disabled are silently dropped. Already-cached
269-
/// records remain in storage and will be sent on the next flush after
270-
/// re-enabling.
271238
void disable() {
272239
_logger.info('Disabling record collection and automatic flushing');
273240
_enabled = false;
274241
_scheduler?.stop();
275242
}
276243

277244
/// Closes the client and releases all resources.
278-
///
279-
/// The client cannot be reused after closing.
280245
Future<void> close() async {
281246
_closed = true;
282247
_scheduler?.stop();
283248
await _recordClient.close();
284249
}
285250

286-
/// Wraps an async operation, catching any exceptions and returning them
287-
/// as [Result.error] with the appropriate [AmplifyKinesisException]
288-
/// subtype.
289251
Future<Result<T>> _wrapError<T>(Future<T> Function() operation) async {
290252
try {
291253
final value = await operation();

packages/kinesis/amplify_kinesis_dart/lib/src/amplify_kinesis_client_options.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import 'package:amplify_kinesis_dart/src/amplify_kinesis_client.dart'
55
show AmplifyKinesisClient;
6-
import 'package:amplify_kinesis_dart/src/flush_strategy/flush_strategy.dart';
6+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
77

88
/// {@template amplify_kinesis.amplify_kinesis_client_options}
99
/// Configuration options for [AmplifyKinesisClient].

packages/kinesis/amplify_kinesis_dart/lib/src/exception/amplify_kinesis_exception.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
import 'package:amplify_core/amplify_core.dart';
5-
import 'package:amplify_kinesis_dart/src/exception/record_cache_exception.dart';
5+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
66

77
/// Default recovery suggestion for errors.
88
const String defaultRecoverySuggestion =

packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_record.dart

Lines changed: 34 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,39 @@
44
import 'dart:convert';
55
import 'dart:typed_data';
66

7-
/// Internal representation of a record to be sent to Kinesis.
8-
final class RecordInput {
9-
/// Creates a new Kinesis record.
10-
RecordInput({
11-
required this.data,
12-
required this.partitionKey,
13-
required this.streamName,
14-
required this.createdAt,
15-
}) : dataSize = data.length + utf8.encode(partitionKey).length;
16-
17-
/// Creates a Kinesis record with the current timestamp.
18-
factory RecordInput.now({
19-
required Uint8List data,
20-
required String partitionKey,
21-
required String streamName,
22-
}) {
23-
return RecordInput(
24-
data: data,
25-
partitionKey: partitionKey,
26-
streamName: streamName,
27-
createdAt: DateTime.now(),
28-
);
29-
}
30-
31-
/// The data blob to send to Kinesis.
32-
final Uint8List data;
33-
34-
/// The partition key for the record.
35-
final String partitionKey;
36-
37-
/// The name of the Kinesis Data Stream.
38-
final String streamName;
39-
40-
/// The size of the record in bytes (data blob + partition key).
41-
///
42-
/// Per AWS docs, the record size limit applies to the total size of the
43-
/// partition key and data blob combined. Computed once at construction
44-
/// to avoid repeated UTF-8 encoding of the partition key.
45-
final int dataSize;
7+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
8+
9+
/// Creates a [RecordInput] for Kinesis Data Streams.
10+
///
11+
/// Unlike the generic `RecordInput`, this factory computes `dataSize`
12+
/// as `data.length + utf8.encode(partitionKey).length` per the KDS
13+
/// PutRecords API spec.
14+
RecordInput createKinesisRecordInput({
15+
required Uint8List data,
16+
required String partitionKey,
17+
required String streamName,
18+
required DateTime createdAt,
19+
}) {
20+
return RecordInput(
21+
data: data,
22+
streamName: streamName,
23+
partitionKey: partitionKey,
24+
dataSize: data.length + utf8.encode(partitionKey).length,
25+
createdAt: createdAt,
26+
);
27+
}
4628

47-
/// Timestamp of when the record was created.
48-
final DateTime createdAt;
29+
/// Creates a [RecordInput] for Kinesis Data Streams with the current
30+
/// timestamp.
31+
RecordInput createKinesisRecordInputNow({
32+
required Uint8List data,
33+
required String partitionKey,
34+
required String streamName,
35+
}) {
36+
return createKinesisRecordInput(
37+
data: data,
38+
partitionKey: partitionKey,
39+
streamName: streamName,
40+
createdAt: DateTime.now(),
41+
);
4942
}

packages/kinesis/amplify_kinesis_dart/lib/src/impl/kinesis_sender.dart

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,8 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
import 'package:amplify_kinesis_dart/src/model/record.dart';
54
import 'package:amplify_kinesis_dart/src/sdk/kinesis.dart';
6-
7-
/// Result of a PutRecords operation.
8-
///
9-
/// Records are categorized into three buckets:
10-
/// - [successfulIds]: records that were accepted by Kinesis.
11-
/// - [retryableIds]: records that failed with any error code but have not
12-
/// yet exceeded the retry limit. These will be retried in the next flush.
13-
/// - [failedIds]: records that have exceeded the retry limit and should be
14-
/// deleted from the cache.
15-
final class PutRecordsResult {
16-
/// Creates a new [PutRecordsResult].
17-
const PutRecordsResult({
18-
required this.successfulIds,
19-
required this.retryableIds,
20-
required this.failedIds,
21-
});
22-
23-
/// IDs of records that were successfully sent.
24-
final List<int> successfulIds;
25-
26-
/// IDs of records that failed but can be retried (retry count < max).
27-
final List<int> retryableIds;
28-
29-
/// IDs of records that exceeded the retry limit and should be deleted.
30-
final List<int> failedIds;
31-
}
5+
import 'package:amplify_record_cache_dart/amplify_record_cache_dart.dart';
326

337
/// {@template amplify_kinesis.kinesis_sender}
348
/// Handles communication with AWS Kinesis Data Streams.
@@ -37,7 +11,7 @@ final class PutRecordsResult {
3711
/// categorization so that all error codes are treated as retryable
3812
/// until the record exceeds `maxRetries`.
3913
/// {@endtemplate}
40-
class KinesisSender {
14+
class KinesisSender implements Sender {
4115
/// {@macro amplify_kinesis.kinesis_sender}
4216
KinesisSender({required KinesisClient kinesisClient, required int maxRetries})
4317
: _kinesisClient = kinesisClient,
@@ -46,18 +20,13 @@ class KinesisSender {
4620
final KinesisClient _kinesisClient;
4721
final int _maxRetries;
4822

49-
/// Sends records to a Kinesis stream and categorizes the response.
50-
///
51-
/// Each record in the response is categorized as:
52-
/// - successful: no error code
53-
/// - failed: has an error code AND retry count >= [_maxRetries]
54-
/// - retryable: has an error code AND retry count < [_maxRetries]
55-
Future<PutRecordsResult> putRecords({
23+
@override
24+
Future<SendResult> sendBatch({
5625
required String streamName,
5726
required List<Record> records,
5827
}) async {
5928
if (records.isEmpty) {
60-
return const PutRecordsResult(
29+
return const SendResult(
6130
successfulIds: [],
6231
retryableIds: [],
6332
failedIds: [],
@@ -84,10 +53,7 @@ class KinesisSender {
8453

8554
/// Splits the PutRecords response into successful, retryable, and failed
8655
/// record IDs based on error codes and retry counts.
87-
PutRecordsResult _splitResponse(
88-
PutRecordsResponse response,
89-
List<Record> records,
90-
) {
56+
SendResult _splitResponse(PutRecordsResponse response, List<Record> records) {
9157
final successfulIds = <int>[];
9258
final retryableIds = <int>[];
9359
final failedIds = <int>[];
@@ -104,14 +70,11 @@ class KinesisSender {
10470
} else if (retryCount >= _maxRetries) {
10571
failedIds.add(recordId);
10672
} else {
107-
// Error codes can be: ProvisionedThroughputExceededException or
108-
// InternalFailure. All are treated as retryable until the retry
109-
// limit is reached.
11073
retryableIds.add(recordId);
11174
}
11275
}
11376

114-
return PutRecordsResult(
77+
return SendResult(
11578
successfulIds: successfulIds,
11679
retryableIds: retryableIds,
11780
failedIds: failedIds,

0 commit comments

Comments
 (0)