Skip to content

Commit 11bb6e0

Browse files
committed
fix: address KDS PR review comments (round 2)

- Remove Category.kinesis from amplify_categories (KDS is independent of plugin system)
- Remove KinesisServiceException (SDK errors are silently retried, not exposed)
- Add KinesisPartitionKeyInvalidException and partition key validation (1-256 chars)
- Add in-memory cache size tracking to avoid DB query on every record() call
- Add async lock on record() to prevent concurrent cache size races
- Add maxIterations safety bound on flush loop
- Wrap storage query failures in KinesisStorageException in flush
- Scope sender error catch in _sendStreamBatch to only the putRecords call
- Make KinesisDataStreamsInterval, KinesisDataStreamsNone, RecordStorage final
- Use kKinesisMaxBatchBytes constant in getRecordsBatch defaults
- Fix Timer.periodic arrow syntax style nit
1 parent c27e40b commit 11bb6e0

File tree

8 files changed

+158
-47
lines changed

8 files changed

+158
-47
lines changed

packages/amplify_core/lib/src/amplify_class_impl.dart

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ class AmplifyClassImpl extends AmplifyClass {
4646
plugin.cast(),
4747
authProviderRepo: authProviderRepo,
4848
);
49-
case Category.kinesis:
50-
throw UnimplementedError('Kinesis is not an Amplify category plugin');
5149
}
5250
}
5351
}

packages/amplify_core/lib/src/category/amplify_categories.dart

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ enum Category {
5050
storage,
5151

5252
/// Push Notifications
53-
pushNotifications,
54-
55-
/// Kinesis (Data Streams & Firehose)
56-
kinesis;
53+
pushNotifications;
5754

5855
String get name => switch (this) {
5956
Category.analytics => 'Analytics',
@@ -63,7 +60,6 @@ enum Category {
6360
Category.hub => 'Hub',
6461
Category.storage => 'Storage',
6562
Category.pushNotifications => 'PushNotifications',
66-
Category.kinesis => 'kinesis',
6763
};
6864
}
6965

packages/kinesis/aws_kinesis_datastreams/lib/src/exception/amplify_kinesis_exception.dart

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,6 @@ final class KinesisStorageException extends AmplifyKinesisException {
3737
);
3838
}
3939

40-
/// {@template aws_kinesis_datastreams.kinesis_service_exception}
41-
/// Thrown when a Kinesis SDK/API error occurs. Inspect [sdkException] for details.
42-
/// {@endtemplate}
43-
final class KinesisServiceException extends AmplifyKinesisException {
44-
/// {@macro aws_kinesis_datastreams.kinesis_service_exception}
45-
KinesisServiceException(String message, {this.sdkException, super.cause})
46-
: super(
47-
message: message,
48-
recoverySuggestion: 'Check sdkException for details.',
49-
);
50-
51-
/// The underlying SDK exception, if any.
52-
final Object? sdkException;
53-
}
54-
5540
/// {@template aws_kinesis_datastreams.kinesis_limit_exceeded_exception}
5641
/// Thrown when the local cache is full.
5742
/// {@endtemplate}
@@ -107,6 +92,22 @@ final class KinesisRecordTooLargeException extends AmplifyKinesisException {
10792
);
10893
}
10994

95+
/// {@template aws_kinesis_datastreams.kinesis_partition_key_invalid_exception}
96+
/// Thrown when a partition key is empty or exceeds the Kinesis limit (256 characters).
97+
/// {@endtemplate}
98+
final class KinesisPartitionKeyInvalidException extends AmplifyKinesisException {
99+
/// {@macro aws_kinesis_datastreams.kinesis_partition_key_invalid_exception}
100+
KinesisPartitionKeyInvalidException({
101+
required int keyLength,
102+
}) : super(
103+
message:
104+
'Partition key length ($keyLength) is invalid. '
105+
'Kinesis requires partition keys to be between 1 and 256 characters.',
106+
recoverySuggestion:
107+
'Use a partition key between 1 and 256 characters.',
108+
);
109+
}
110+
110111
/// {@template aws_kinesis_datastreams.client_closed_exception}
111112
/// Thrown when an operation is attempted on a closed client.
112113
/// {@endtemplate}

packages/kinesis/aws_kinesis_datastreams/lib/src/flush_strategy/flush_strategy.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ sealed class KinesisDataStreamsFlushStrategy {
1616
/// {@template aws_kinesis_datastreams.interval_flush_strategy}
1717
/// A flush strategy that triggers automatic flushes at a fixed interval.
1818
/// {@endtemplate}
19-
class KinesisDataStreamsInterval extends KinesisDataStreamsFlushStrategy {
19+
final class KinesisDataStreamsInterval extends KinesisDataStreamsFlushStrategy {
2020
/// {@macro aws_kinesis_datastreams.interval_flush_strategy}
2121
const KinesisDataStreamsInterval({
2222
this.interval = const Duration(seconds: 30),
@@ -29,7 +29,7 @@ class KinesisDataStreamsInterval extends KinesisDataStreamsFlushStrategy {
2929
/// {@template aws_kinesis_datastreams.none_flush_strategy}
3030
/// A flush strategy that disables automatic flushing.
3131
/// {@endtemplate}
32-
class KinesisDataStreamsNone extends KinesisDataStreamsFlushStrategy {
32+
final class KinesisDataStreamsNone extends KinesisDataStreamsFlushStrategy {
3333
/// {@macro aws_kinesis_datastreams.none_flush_strategy}
3434
const KinesisDataStreamsNone();
3535
}

packages/kinesis/aws_kinesis_datastreams/lib/src/impl/auto_flush_scheduler.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class AutoFlushScheduler {
3131

3232
switch (_strategy) {
3333
case KinesisDataStreamsInterval(:final interval):
34-
_timer = Timer.periodic(interval, (_) { _handleTimerTick(); });
34+
_timer = Timer.periodic(interval, (_) => _handleTimerTick());
3535
case KinesisDataStreamsNone():
3636
break;
3737
}

packages/kinesis/aws_kinesis_datastreams/lib/src/impl/record_client.dart

Lines changed: 130 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import 'dart:async';
55

6+
import 'package:aws_kinesis_datastreams/aws_kinesis_datastreams.dart' show AmplifyKinesisClient;
7+
import 'package:aws_kinesis_datastreams/src/amplify_kinesis_client.dart' show AmplifyKinesisClient;
68
import 'package:aws_kinesis_datastreams/src/db/kinesis_record_database.dart';
79
import 'package:aws_kinesis_datastreams/src/exception/amplify_kinesis_exception.dart';
810
import 'package:aws_kinesis_datastreams/src/impl/auto_flush_scheduler.dart';
@@ -16,6 +18,8 @@ import 'package:aws_kinesis_datastreams/src/model/flush_data.dart';
1618
/// {@template aws_kinesis_datastreams.record_client}
1719
/// Orchestrates record operations, managing the flow between storage,
1820
/// scheduling, and sending.
21+
///
22+
/// Not `final` to allow mocking in tests via [AmplifyKinesisClient.withRecordClient].
1923
/// {@endtemplate}
2024
class RecordClient {
2125
/// {@macro aws_kinesis_datastreams.record_client}
@@ -41,38 +45,79 @@ class RecordClient {
4145
bool _closed = false;
4246
bool _flushing = false;
4347

48+
/// In-memory cache size tracker to avoid a DB query on every record() call.
49+
/// Recalculated from DB after deletes (flush/clearCache).
50+
int _cachedSize = 0;
51+
bool _cacheSizeInitialized = false;
52+
53+
/// Simple async lock to prevent concurrent record() calls from
54+
/// racing on cache size checks.
55+
Completer<void>? _recordLock;
56+
4457
/// Maximum batch size in bytes (10 MiB Kinesis PutRecords limit).
45-
static const int maxBatchSizeBytes = 10 * 1024 * 1024;
58+
static const int maxBatchSizeBytes = kKinesisMaxBatchBytes;
4659

4760
/// Whether the client is currently enabled.
4861
bool get isEnabled => _enabled;
4962

5063
/// Whether the client has been closed.
5164
bool get isClosed => _closed;
5265

66+
/// Ensures the in-memory cache size is initialized from the database.
67+
Future<void> _ensureCacheSizeInitialized() async {
68+
if (!_cacheSizeInitialized) {
69+
_cachedSize = await _storage.getCurrentCacheSize();
70+
_cacheSizeInitialized = true;
71+
}
72+
}
73+
5374
/// Records data to the local cache.
5475
///
5576
/// Throws [ClientClosedException] if the client has been closed.
77+
/// Throws [KinesisPartitionKeyInvalidException] if the partition key is
78+
/// empty or exceeds 256 characters.
5679
/// Throws [KinesisRecordTooLargeException] if the record exceeds the
5780
/// per-record size limit (10 MiB, partition key + data blob).
5881
/// Throws [KinesisLimitExceededException] if the cache is full.
5982
Future<void> record(KinesisRecord record) async {
6083
if (_closed) throw ClientClosedException();
6184
if (!_enabled) return;
6285

86+
// Validate partition key length (Kinesis requires 1-256 characters).
87+
if (record.partitionKey.isEmpty ||
88+
record.partitionKey.length > kKinesisMaxPartitionKeyLength) {
89+
throw KinesisPartitionKeyInvalidException(
90+
keyLength: record.partitionKey.length,
91+
);
92+
}
93+
6394
if (record.dataSize > kKinesisMaxRecordBytes) {
6495
throw KinesisRecordTooLargeException(
6596
recordBytes: record.dataSize,
6697
maxBytes: kKinesisMaxRecordBytes,
6798
);
6899
}
69100

70-
final currentSize = await _storage.getCurrentCacheSize();
71-
if (currentSize + record.dataSize > _storage.maxCacheBytes) {
72-
throw KinesisLimitExceededException();
101+
// Acquire async lock to prevent concurrent cache size races.
102+
while (_recordLock != null) {
103+
await _recordLock!.future;
73104
}
105+
_recordLock = Completer<void>();
106+
107+
try {
108+
await _ensureCacheSizeInitialized();
74109

75-
await _storage.saveRecord(record);
110+
if (_cachedSize + record.dataSize > _storage.maxCacheBytes) {
111+
throw KinesisLimitExceededException();
112+
}
113+
114+
await _storage.saveRecord(record);
115+
_cachedSize += record.dataSize;
116+
} finally {
117+
final lock = _recordLock!;
118+
_recordLock = null;
119+
lock.complete();
120+
}
76121
}
77122

78123
/// Flushes all cached records to Kinesis.
@@ -86,14 +131,36 @@ class RecordClient {
86131
var totalFlushed = 0;
87132

88133
try {
89-
while (true) {
90-
final batch = await _storage.getRecordsBatch(
91-
maxCount: _maxRecords,
92-
maxBytes: maxBatchSizeBytes,
93-
);
134+
// Safety bound: limit iterations to prevent infinite loops if records
135+
// keep failing but never exceed retries within a single flush cycle.
136+
var iterations = 0;
137+
const maxIterations = 100;
138+
139+
var consecutiveNoProgress = 0;
140+
// Allow enough no-progress iterations for records to exhaust their
141+
// retries before considering the batch stuck.
142+
final maxConsecutiveNoProgress = _maxRetries + 2;
143+
144+
while (iterations < maxIterations) {
145+
iterations++;
146+
147+
List<StoredRecord> batch;
148+
try {
149+
batch = await _storage.getRecordsBatch(
150+
maxCount: _maxRecords,
151+
maxBytes: maxBatchSizeBytes,
152+
);
153+
} on Exception catch (e) {
154+
throw KinesisStorageException(
155+
'Failed to retrieve records batch',
156+
cause: e,
157+
);
158+
}
94159

95160
if (batch.isEmpty) break;
96161

162+
final countBefore = await _storage.getRecordCount();
163+
97164
final recordsByStream = <String, List<StoredRecord>>{};
98165
for (final record in batch) {
99166
recordsByStream.putIfAbsent(record.streamName, () => []).add(record);
@@ -105,58 +172,102 @@ class RecordClient {
105172
}
106173

107174
await _storage.deleteRecordsExceedingRetries(_maxRetries);
175+
176+
// Track whether the batch is making progress. If the record count
177+
// hasn't decreased for several consecutive iterations, the batch
178+
// is stuck (e.g. all records are retryable but haven't exceeded
179+
// max retries yet) — break to avoid spinning.
180+
final countAfter = await _storage.getRecordCount();
181+
if (countAfter < countBefore) {
182+
consecutiveNoProgress = 0;
183+
} else {
184+
consecutiveNoProgress++;
185+
if (consecutiveNoProgress >= maxConsecutiveNoProgress) break;
186+
}
108187
}
188+
189+
// Recalculate in-memory cache size from DB after deletes.
190+
_cachedSize = await _storage.getCurrentCacheSize();
109191
} finally {
110192
_flushing = false;
111193
}
112194

113195
return FlushData(recordsFlushed: totalFlushed);
114196
}
115197

116-
Future<int> _sendStreamBatch(String streamName, List<StoredRecord> records) async {
198+
Future<int> _sendStreamBatch(
199+
String streamName,
200+
List<StoredRecord> records,
201+
) async {
117202
final senderRecords = records
118-
.map((r) => KinesisSenderRecord(data: r.data, partitionKey: r.partitionKey))
203+
.map(
204+
(r) => KinesisSenderRecord(
205+
data: r.data,
206+
partitionKey: r.partitionKey,
207+
),
208+
)
119209
.toList();
120210

211+
PutRecordsResult result;
121212
try {
122-
final result = await _sender.putRecords(streamName: streamName, records: senderRecords);
123-
124-
await _storage.deleteRecords(result.successfulRecordIndices.map((i) => records[i].id));
125-
await _storage.incrementRetryCount(result.retryableRecordIndices.map((i) => records[i].id));
126-
await _storage.deleteRecords(result.failedRecordIndices.map((i) => records[i].id));
127-
128-
return result.successfulRecordIndices.length;
213+
result = await _sender.putRecords(
214+
streamName: streamName,
215+
records: senderRecords,
216+
);
129217
} on Exception {
218+
// Sender/SDK errors — increment retry count and continue.
219+
// Non-SDK exceptions (e.g. storage errors) are not caught here
220+
// because they originate from _storage calls below, not _sender.
130221
await _storage.incrementRetryCount(records.map((r) => r.id));
131222
return 0;
132223
}
224+
225+
// Storage operations after a successful send propagate errors to caller.
226+
await _storage.deleteRecords(
227+
result.successfulRecordIndices.map((i) => records[i].id),
228+
);
229+
await _storage.incrementRetryCount(
230+
result.retryableRecordIndices.map((i) => records[i].id),
231+
);
232+
await _storage.deleteRecords(
233+
result.failedRecordIndices.map((i) => records[i].id),
234+
);
235+
236+
return result.successfulRecordIndices.length;
133237
}
134238

135239
/// Clears all cached records.
136240
Future<ClearCacheData> clearCache() async {
137241
final count = await _storage.getRecordCount();
138242
await _storage.clear();
243+
// Reset in-memory cache size after clearing.
244+
_cachedSize = 0;
139245
return ClearCacheData(recordsCleared: count);
140246
}
141247

248+
/// Enables the client to accept and flush records.
142249
void enable() {
143250
_enabled = true;
144251
_scheduler.enable();
145252
}
146253

254+
/// Disables the client from accepting and flushing records.
147255
void disable() {
148256
_enabled = false;
149257
_scheduler.disable();
150258
}
151259

260+
/// Enables automatic flush operations.
152261
void enableAutoFlush() {
153262
_scheduler.enable();
154263
}
155264

265+
/// Disables automatic flush operations.
156266
void disableAutoFlush() {
157267
_scheduler.disable();
158268
}
159269

270+
/// Closes the client and releases all resources.
160271
Future<void> close() async {
161272
_closed = true;
162273
await _scheduler.close();

packages/kinesis/aws_kinesis_datastreams/lib/src/impl/record_storage.dart

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import 'dart:async';
55

66
import 'package:aws_kinesis_datastreams/src/db/kinesis_record_database.dart';
77
import 'package:aws_kinesis_datastreams/src/impl/kinesis_record.dart';
8+
import 'package:aws_kinesis_datastreams/src/kinesis_data_streams_options.dart'
9+
show kKinesisMaxBatchBytes, kKinesisMaxRecordsPerBatch;
810
import 'package:drift/drift.dart';
911

1012
/// {@template aws_kinesis_datastreams.record_storage}
1113
/// Manages SQLite database operations for record persistence.
1214
/// {@endtemplate}
13-
class RecordStorage {
15+
final class RecordStorage {
1416
/// {@macro aws_kinesis_datastreams.record_storage}
1517
RecordStorage({
1618
required KinesisRecordDatabase database,
@@ -42,8 +44,8 @@ class RecordStorage {
4244
/// Returns records up to [maxCount] records and [maxBytes] total size.
4345
/// Uses window functions to efficiently limit at the database level.
4446
Future<List<StoredRecord>> getRecordsBatch({
45-
int maxCount = 500,
46-
int maxBytes = 10 * 1024 * 1024, // 10 MiB for Kinesis PutRecords
47+
int maxCount = kKinesisMaxRecordsPerBatch,
48+
int maxBytes = kKinesisMaxBatchBytes,
4749
}) async {
4850
// Use window functions to compute row number and running size,
4951
// then filter to get records within both limits.

packages/kinesis/aws_kinesis_datastreams/lib/src/kinesis_data_streams_options.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ const int kKinesisMaxBatchBytes = 10 * 1024 * 1024;
2121
/// partition key and data blob."
2222
const int kKinesisMaxRecordBytes = 10 * 1024 * 1024;
2323

24+
/// Maximum length of a Kinesis partition key (256 characters).
25+
const int kKinesisMaxPartitionKeyLength = 256;
26+
2427
/// {@template aws_kinesis_datastreams.amplify_kinesis_client_options}
2528
/// Configuration options for [AmplifyKinesisClient].
2629
///

0 commit comments

Comments
 (0)