feat(kinesis): Kinesis Data Streams Library #6516
packages/kinesis/aws_kinesis_datastreams/lib/src/impl/record_storage.dart
packages/kinesis/aws_kinesis_datastreams/lib/src/impl/kinesis_sender.dart
packages/kinesis/aws_kinesis_datastreams/lib/src/exception/kinesis_exception.dart
| final Future<void> Function() _onFlush; | ||
| Timer? _timer; | ||
| bool _enabled = true; | ||
| bool _closed = false; |
What is the benefit of having both _enabled and _closed?
_closed lets other code tell whether the client itself has been shut down, as opposed to flushing merely being disabled. In our implementation the two flags behave similarly, but I feel it is useful for getters to expose the internal state of the client. What do you think?
I think both approaches are fine, but personally, I recommend merging them for the auto flush scheduler, since _closed is not really used.
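To illustrate the merge suggested above, here is a minimal sketch of a scheduler that keeps a single flag. The class shape, the `isEnabled` getter, and the disabled-by-default start are assumptions for illustration, not the code under review.

```dart
import 'dart:async';

/// Hypothetical sketch: one `_enabled` flag covers both "disabled" and
/// "closed", because the scheduler behaves the same way in either state.
class AutoFlushScheduler {
  AutoFlushScheduler(this._interval, this._onFlush);

  final Duration _interval;
  final Future<void> Function() _onFlush;
  Timer? _timer;
  bool _enabled = false;

  bool get isEnabled => _enabled;

  /// Starts the periodic timer; calling it again while enabled is a no-op.
  void enable() {
    if (_enabled) return;
    _enabled = true;
    _timer = Timer.periodic(_interval, (_) => unawaited(_onFlush()));
  }

  /// Cancels the timer. A client shutdown path can call this as well.
  void disable() {
    _enabled = false;
    _timer?.cancel();
    _timer = null;
  }
}
```

If the owning client still needs to report that it was shut down, that state can live on the client rather than on the scheduler.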
packages/kinesis/aws_kinesis_datastreams_dart/lib/src/impl/kinesis_sender.dart
packages/kinesis/aws_kinesis_datastreams/test/e2e/kinesis_e2e_test.dart
packages/kinesis/aws_kinesis_datastreams/test/e2e/kinesis_e2e_test.dart
packages/kinesis/aws_kinesis_datastreams_dart/test/e2e/kinesis_e2e_test.dart
packages/kinesis/aws_kinesis_datastreams/test/e2e/kinesis_e2e_test.dart
| }); | ||
| }); | ||
| }); | ||
| } |
Can we ensure concurrent access to the record storage works via a test?
a21d70d to ff90989
bb714ba to 6ee5d6d
packages/kinesis/aws_kinesis_datastreams/lib/src/impl/kinesis_sender.dart
| import 'package:amplify_foundation_dart/amplify_foundation_dart.dart'; | ||
| import 'package:aws_kinesis_datastreams/aws_kinesis_datastreams.dart' show KinesisLimitExceededException; | ||
| import 'package:aws_kinesis_datastreams/src/db/kinesis_record_database.dart'; | ||
| import 'package:aws_kinesis_datastreams/src/exception/amplify_kinesis_exception.dart' show KinesisLimitExceededException; |
This is redundant with
import 'package:aws_kinesis_datastreams/aws_kinesis_datastreams.dart' show KinesisLimitExceededException;
| } | ||
| } | ||
|
|
||
| static const int _maxCacheBytes = 4 * 1024 * 1024 * 1024; // 4GB |
Not sure, can 4GB be a problem on 32-bit systems? @jvh-aws How do the other platforms treat that?
However, this is probably a nit considering how uncommon 32-bit has become.
Yes, let's change it to 5MB to align with the other implementations.
packages/kinesis/aws_kinesis_datastreams_dart/lib/src/kinesis_data_streams_options.dart
Why do we have this empty file (https://github.com/aws-amplify/amplify-flutter/blob/feat/kinesis-data-streams-library/packages/kinesis/aws_kinesis_datastreams/lib/src/models/flush_data.dart / https://github.com/aws-amplify/amplify-flutter/blob/d9ec0471dc40b3cbcead0133e3be08558f39a347/packages/kinesis/aws_kinesis_datastreams/lib/src/models/flush_data.dart)?
| final batch = await _storage.getRecordsBatch(); | ||
| final count = batch.length; |
We fetch all the data just to count it. Shouldn't we use a count query?
@jvh-aws What do you think?
Agreed. We can probably use the ORM to count.
| await (_db.update(_db.kinesisRecords) | ||
| ..where((t) => t.id.equals(id))) | ||
| .write( | ||
| KinesisRecordsCompanion.custom( | ||
| retryCount: _db.kinesisRecords.retryCount + const Constant(1), | ||
| ), | ||
| ); | ||
| } |
I am not that familiar with this, but don't we run more update queries than needed here? Shouldn't we use something like
UPDATE ... WHERE id IN (...)
Maybe @jvh-aws knows more here.
agreed, this could be one query, good catch
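As a rough illustration of the single-query idea, the sketch below builds one parameterized UPDATE for a whole batch of ids. The helper name is hypothetical, and the table and column names are assumed to match the Android/Swift query quoted in this thread rather than the Dart schema under review.

```dart
/// Hypothetical helper: builds one UPDATE statement for a batch of ids.
/// Table and column names are assumptions taken from the Android/Swift query.
String buildIncrementRetryCountSql(int idCount) {
  if (idCount <= 0) {
    throw ArgumentError.value(idCount, 'idCount', 'must be positive');
  }
  // One `?` per id keeps the values bound as parameters instead of
  // interpolating them into the SQL text.
  final placeholders = List.filled(idCount, '?').join(', ');
  return 'UPDATE records SET retry_count = retry_count + 1 '
      'WHERE id IN ($placeholders)';
}
```

The ids themselves would then be passed as bound variables in the same order, so a batch costs one round trip instead of one per record.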
This is what we use in Android/Swift:
UPDATE records SET retry_count = retry_count + 1 WHERE id IN ($placeholders)
| void enable() { _enabled = true; _scheduler.enable(); } | ||
| void disable() { _enabled = false; _scheduler.disable(); } | ||
| void enableAutoFlush() { _scheduler.enable(); } | ||
| void disableAutoFlush() { _scheduler.disable(); } |
Nit: I don't think this matches our code style.
| /// {@template aws_kinesis_datastreams.amplify_kinesis_exception} | ||
| /// Base exception for Amplify Kinesis Data Streams errors. | ||
| /// {@endtemplate} | ||
| sealed class AmplifyKinesisException extends AmplifyException { |
Since this is sealed, shouldn't the classes below extending this be final to keep the hierarchy sealed?
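A small sketch of what the question above is getting at, using a simplified hierarchy. The constructors and the `describe` function are assumptions for illustration; the real base class extends AmplifyException.

```dart
/// Hypothetical, simplified hierarchy for illustration only.
sealed class AmplifyKinesisException implements Exception {
  const AmplifyKinesisException(this.message);
  final String message;
}

/// `final` prevents other libraries from extending or implementing the leaf.
final class KinesisRecordTooLargeException extends AmplifyKinesisException {
  const KinesisRecordTooLargeException(super.message);
}

final class KinesisLimitExceededException extends AmplifyKinesisException {
  const KinesisLimitExceededException(super.message);
}

/// The switch needs no default branch: the sealed supertype makes the set
/// of direct subtypes known to the compiler.
String describe(AmplifyKinesisException e) => switch (e) {
      KinesisRecordTooLargeException() => 'too large: ${e.message}',
      KinesisLimitExceededException() => 'limit: ${e.message}',
    };
```

With the leaves marked `final`, callers can rely on the listed cases being the only concrete types they will ever receive.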
| sdk: ^3.9.0 | ||
|
|
||
| dependencies: | ||
| amplify_core: ">=2.7.0 <2.8.0" |
This is not compatible with main, we are on a newer version.
| @@ -0,0 +1 @@ | |||
| {"info":"This is a generated file; do not edit or check into version control.","plugins":{"ios":[{"name":"amplify_auth_cognito","path":"/Users/ekjotm/Development/amplify-flutter/packages/auth/amplify_auth_cognito/","shared_darwin_source":true,"native_build":true,"dependencies":["amplify_secure_storage"],"dev_dependency":false},{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/device_info_plus-12.3.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"path_provider_foundation","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/path_provider_foundation-2.5.1/","shared_darwin_source":true,"native_build":true,"dependencies":[],"dev_dependency":false}],"android":[{"name":"amplify_analytics_pinpoint","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/amplify_analytics_pinpoint-2.7.0/","native_build":true,"dependencies":["amplify_db_common","amplify_secure_storage","device_info_plus","package_info_plus"],"dev_dependency":false},{"name":"amplify_auth_cognito","path":"/Users/ekjotm/Development/amplify-flutter/packages/auth/amplify_auth_cognito/","native_build":true,"dependencies":["amplify_analytics_pinpoint","amplify_secure_storage"],"dev_dependency":false},{"name":"amplify_db_common","path":"/Users/ekjotm/Development/amplify-flutter/packages/common/amplify_db_common/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/ho
sted/pub.dev/device_info_plus-12.3.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"path_provider_android","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/path_provider_android-2.2.22/","native_build":true,"dependencies":[],"dev_dependency":false}],"macos":[{"name":"amplify_auth_cognito","path":"/Users/ekjotm/Development/amplify-flutter/packages/auth/amplify_auth_cognito/","shared_darwin_source":true,"native_build":true,"dependencies":["amplify_secure_storage"],"dev_dependency":false},{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/device_info_plus-12.3.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"path_provider_foundation","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/path_provider_foundation-2.5.1/","shared_darwin_source":true,"native_build":true,"dependencies":[],"dev_dependency":false}],"linux":[{"name":"amplify_db_common","path":"/Users/ekjotm/Development/amplify-flutter/packages/common/amplify_db_common/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","native_build":false,"dependencies":[],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/device_info_plus-12.3.0/","native_build":false,"dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjot
m/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","native_build":false,"dependencies":[],"dev_dependency":false},{"name":"path_provider_linux","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/path_provider_linux-2.2.1/","native_build":false,"dependencies":[],"dev_dependency":false}],"windows":[{"name":"amplify_db_common","path":"/Users/ekjotm/Development/amplify-flutter/packages/common/amplify_db_common/","native_build":true,"dependencies":[],"dev_dependency":false},{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","native_build":false,"dependencies":["path_provider_windows"],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/device_info_plus-12.3.0/","native_build":false,"dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","native_build":false,"dependencies":[],"dev_dependency":false},{"name":"path_provider_windows","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/path_provider_windows-2.3.0/","native_build":false,"dependencies":[],"dev_dependency":false}],"web":[{"name":"amplify_secure_storage","path":"/Users/ekjotm/Development/amplify-flutter/packages/secure_storage/amplify_secure_storage/","dependencies":[],"dev_dependency":false},{"name":"device_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/device_info_plus-12.3.0/","dependencies":[],"dev_dependency":false},{"name":"package_info_plus","path":"/Users/ekjotm/.pub-cache/hosted/pub.dev/package_info_plus-9.0.0/","dependencies":[],"dev_dependency":false}]},"dependencyGraph":[{"name":"amplify_analytics_pinpoint","dependencies":["amplify_db_common","amplify_secure_storage","device_info_plus","package_info_plus","path_provider"]},{"name":"amplify_auth_cognito","dependencies":["amplify_analytics_pinpoint","amplify_secure_storage"]},{"name":"amplify_db_common","dependencies":["path_provider
"]},{"name":"amplify_secure_storage","dependencies":["path_provider","path_provider_windows"]},{"name":"device_info_plus","dependencies":[]},{"name":"package_info_plus","dependencies":[]},{"name":"path_provider","dependencies":["path_provider_android","path_provider_foundation","path_provider_linux","path_provider_windows"]},{"name":"path_provider_android","dependencies":[]},{"name":"path_provider_foundation","dependencies":[]},{"name":"path_provider_linux","dependencies":[]},{"name":"path_provider_windows","dependencies":[]}],"date_created":"2026-01-26 14:26:14.510543","version":"3.35.4","swift_package_manager_enabled":{"ios":false,"macos":false}} No newline at end of file | |||
Thank you! Where I have not replied to your other comments in the review, consider them addressed.
d9ec047 to 8ec0ec5
packages/kinesis/aws_kinesis_datastreams_dart/lib/src/db/kinesis_record_database.dart
| /// {@template aws_kinesis_datastreams.kinesis_service_exception} | ||
| /// Thrown when a Kinesis SDK/API error occurs. Inspect [sdkException] for details. | ||
| /// {@endtemplate} | ||
| final class KinesisServiceException extends AmplifyKinesisException { |
I think we discussed that those errors are silently swallowed and not exposed.
| /// Thrown when a single record exceeds the Kinesis per-record size limit | ||
| /// (10 MiB, partition key + data blob combined). | ||
| /// {@endtemplate} | ||
| final class KinesisRecordTooLargeException extends AmplifyKinesisException { |
If we have a specific type for records being too large, we should also have one for the partition key being too long.
agreed, added a corresponding type
| if (_closed) throw ClientClosedException(); | ||
| if (!_enabled) return; | ||
|
|
||
| if (record.dataSize > kKinesisMaxRecordBytes) { |
We should also check the partition key length here. It should be in (0, 256]
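A possible shape for the check requested above, as a minimal sketch. The constant and function names are hypothetical, and counting the length in Unicode code points rather than UTF-16 code units is an assumption that should be confirmed against the service limits.

```dart
/// Upper bound taken from the review comment: the length must be in (0, 256].
const int kMaxPartitionKeyLength = 256; // hypothetical constant name

/// Returns true when [partitionKey] has between 1 and 256 characters.
/// Assumption: length is counted in Unicode code points (`runes`), not in
/// UTF-16 code units, which is what `String.length` would return.
bool isValidPartitionKey(String partitionKey) {
  final length = partitionKey.runes.length;
  return length > 0 && length <= kMaxPartitionKeyLength;
}
```

The record path could run this next to the existing data size check and throw the dedicated partition key exception when it returns false.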
packages/kinesis/aws_kinesis_datastreams/lib/src/impl/record_client.dart
| ''', | ||
| variables: [Variable.withInt(maxCount), Variable.withInt(maxBytes)], | ||
| readsFrom: {_db.kinesisRecords}, | ||
| ).get(); |
What happens if this query fails? In my understanding, currently, we'd swallow the error in flush's try-catch block. However, I think we would like to expose such errors as storage errors. Can we add logic to expose such errors as Kinesis* errors? Or am I missing something?
no, good call, wrapped the failures in error object
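One way the wrapping described above could look, sketched with hypothetical names (`KinesisStorageException`, `wrapStorageErrors`). It is not the change that was actually made in the PR, only an illustration of surfacing database failures as a typed error.

```dart
/// Hypothetical exception type for failures in the local record store.
final class KinesisStorageException implements Exception {
  const KinesisStorageException(this.message, {this.underlyingException});
  final String message;
  final Object? underlyingException;

  @override
  String toString() => 'KinesisStorageException: $message';
}

/// Runs [operation] and rethrows any failure as a [KinesisStorageException],
/// keeping the original error for debugging.
Future<T> wrapStorageErrors<T>(
  String description,
  Future<T> Function() operation,
) async {
  try {
    return await operation();
  } on KinesisStorageException {
    rethrow; // already wrapped, do not nest
  } on Object catch (e) {
    throw KinesisStorageException(
      'Failed to $description',
      underlyingException: e,
    );
  }
}
```

A storage method would wrap its query in `wrapStorageErrors('read record batch', ...)`, so the flush path can distinguish storage failures from send failures instead of swallowing both.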
|
|
||
| /// Returns the current total size of cached data in bytes. | ||
| Future<int> getCurrentCacheSize() async { | ||
| final query = _db.selectOnly(_db.kinesisRecords) |
I think I commented on this before: we can keep an integer for the cache size and update and check against it when we add new records. We would only need to run a DB query after removing records. This avoids the extra query when recording, which is most likely the most frequent operation.
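The idea above can be sketched as a small in-memory tracker. The class and method names are hypothetical; the only behavior taken from the comment is that recording checks a cached integer and the database is consulted again only after deletions.

```dart
/// Hypothetical sketch of a size tracker kept next to the record storage.
class CacheSizeTracker {
  CacheSizeTracker({required this.maxCacheBytes, int initialBytes = 0})
      : _currentBytes = initialBytes;

  final int maxCacheBytes;
  int _currentBytes;

  int get currentBytes => _currentBytes;

  /// Reserves room for a record of [dataSize] bytes without touching the
  /// database. Returns false when the record would exceed the limit.
  bool tryReserve(int dataSize) {
    if (_currentBytes + dataSize > maxCacheBytes) return false;
    _currentBytes += dataSize;
    return true;
  }

  /// Called after records were deleted, with the size freshly read from
  /// the database.
  void resync(int bytesFromDatabase) {
    _currentBytes = bytesFromDatabase;
  }
}
```

The tracker would need to be seeded from the database once at startup, since records can survive across app launches.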
| // --------------------------------------------------------------------------- | ||
| // Deprecated alias for backward compatibility | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
| /// Deprecated: Use [AmplifyKinesisClient] instead. | ||
| @Deprecated('Use AmplifyKinesisClient instead') | ||
| typedef KinesisDataStreams = AmplifyKinesisClient; |
Why do we need this in a brand new package?
leftover from a previous generation, thanks
| final currentSize = await _storage.getCurrentCacheSize(); | ||
| if (currentSize + record.dataSize > _storage.maxCacheBytes) { | ||
| throw KinesisLimitExceededException(); | ||
| } | ||
|
|
||
| await _storage.saveRecord(record); |
Don't we need a mutex/lock here? What happens if we call record() async two times?
good call, added a Completer for the method
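To make the race concrete, here is a minimal sketch of serializing the check-then-save sequence with a Completer-based lock. `AsyncLock` and `RecordBuffer` are hypothetical stand-ins, and the integer counter replaces the real database calls; the actual fix in the PR may be structured differently.

```dart
import 'dart:async';

/// Hypothetical minimal lock: each call waits for the previous one to finish.
class AsyncLock {
  Future<void> _tail = Future<void>.value();

  Future<T> synchronized<T>(Future<T> Function() action) async {
    // Runs synchronously up to the first await, so callers queue in order.
    final previous = _tail;
    final done = Completer<void>();
    _tail = done.future;
    await previous;
    try {
      return await action();
    } finally {
      done.complete();
    }
  }
}

/// Hypothetical usage: the size check and the save form one critical section.
class RecordBuffer {
  RecordBuffer(this.maxBytes);

  final int maxBytes;
  final _lock = AsyncLock();
  int _usedBytes = 0;

  int get usedBytes => _usedBytes;

  Future<bool> record(int dataSize) => _lock.synchronized(() async {
        final current = _usedBytes; // stands in for the cache size query
        await Future<void>.delayed(Duration.zero); // simulated async gap
        if (current + dataSize > maxBytes) return false;
        _usedBytes = current + dataSize; // stands in for saveRecord
        return true;
      });
}
```

Without the lock, two concurrent `record()` calls could both read the same size during the async gap and together exceed the limit.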
| while (true) { | ||
| final batch = await _storage.getRecordsBatch( | ||
| maxCount: _maxRecords, | ||
| maxBytes: maxBatchSizeBytes, | ||
| ); | ||
|
|
||
| if (batch.isEmpty) break; | ||
|
|
||
| final recordsByStream = <String, List<StoredRecord>>{}; | ||
| for (final record in batch) { | ||
| recordsByStream.putIfAbsent(record.streamName, () => []).add(record); | ||
| } | ||
|
|
||
| for (final entry in recordsByStream.entries) { | ||
| final flushed = await _sendStreamBatch(entry.key, entry.value); | ||
| totalFlushed += flushed; | ||
| } | ||
|
|
||
| await _storage.deleteRecordsExceedingRetries(_maxRetries); | ||
| } |
Can't we get into an endless loop here? Is there any detection of a situation where batch does not get smaller?
Yes, I've added a hard limit and a counter for retries in which the batch size does not decrease. If maxRetries+2 iterations pass without the batch getting smaller, we break.
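The guard described above might look roughly like the following sketch. The function, its `sendBatch` callback returning the number of still-pending records, and the `maxIterations` default are all assumptions; only the `maxRetries + 2` stall threshold comes from the reply.

```dart
/// Hypothetical sketch: [sendBatch] sends what it can and returns how many
/// records are still pending in storage afterwards.
/// Returns the number of loop iterations that were executed.
Future<int> flushUntilStalled({
  required Future<int> Function() sendBatch,
  required int maxRetries,
  int maxIterations = 1000, // hard upper bound; the value is an assumption
}) async {
  var iterations = 0;
  var stalledIterations = 0;
  int? previousPending;

  while (iterations < maxIterations) {
    iterations++;
    final pending = await sendBatch();
    if (pending == 0) break;

    if (previousPending != null && pending >= previousPending) {
      // No progress since the last iteration.
      stalledIterations++;
      if (stalledIterations >= maxRetries + 2) break;
    } else {
      stalledIterations = 0;
    }
    previousPending = pending;
  }
  return iterations;
}
```

The two exits are independent: the stall counter stops a loop that makes no progress, while the hard limit bounds a loop that shrinks very slowly.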
| /// {@template aws_kinesis_datastreams.interval_flush_strategy} | ||
| /// A flush strategy that triggers automatic flushes at a fixed interval. | ||
| /// {@endtemplate} | ||
| class KinesisDataStreamsInterval extends KinesisDataStreamsFlushStrategy { |
I think this should be final. We don't want to extend it, right?
| /// {@template aws_kinesis_datastreams.none_flush_strategy} | ||
| /// A flush strategy that disables automatic flushing. | ||
| /// {@endtemplate} | ||
| class KinesisDataStreamsNone extends KinesisDataStreamsFlushStrategy { |
I think this should be final. We don't want to extend it, right?
| /// Orchestrates record operations, managing the flow between storage, | ||
| /// scheduling, and sending. | ||
| /// {@endtemplate} | ||
| class RecordClient { |
can't make it final because MockRecordClient extends Mock implements RecordClient in tests. Added doc comment explaining why.
| /// {@template aws_kinesis_datastreams.record_storage} | ||
| /// Manages SQLite database operations for record persistence. | ||
| /// {@endtemplate} | ||
| class RecordStorage { |
packages/kinesis/aws_kinesis_datastreams/lib/src/impl/auto_flush_scheduler.dart
| /// Uses window functions to efficiently limit at the database level. | ||
| Future<List<StoredRecord>> getRecordsBatch({ | ||
| int maxCount = 500, | ||
| int maxBytes = 10 * 1024 * 1024, // 10 MiB for Kinesis PutRecords |
Don't we already have a constant for this somewhere? Maybe we should move it to a constants file and use it everywhere.
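A shared constants file along the lines suggested above could be as small as the sketch below. The constant names are hypothetical, and the values are simply the defaults quoted in the reviewed snippet.

```dart
/// Hypothetical shared constants file for batch limits.
/// Values are the defaults quoted in the reviewed `getRecordsBatch` snippet.

/// Maximum number of records fetched for one batch.
const int kKinesisMaxBatchRecords = 500;

/// Maximum total size of one batch: 10 MiB.
const int kKinesisMaxBatchBytes = 10 * 1024 * 1024;
```

Default parameter values such as `maxCount` and `maxBytes` could then reference these constants, so the limits are defined in one place.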
5ffac53 to 11bb6e0
1f9c2b1 to 93c5331
Adds the foundational package with minimal constructs needed for the Kinesis client libraries:
- AmplifyException base class
- AWSCredentialsProvider<T> and AWSCredentials sealed hierarchy
- Logger interface with AmplifyLogger implementation
- LogLevel enum
- Result<T, E> sealed type
Provides V2CredentialsProviderBridge to bridge aws_common (V2) credentials to amplify_foundation_dart (V3) credentials. Shared by kinesis client packages to avoid duplicating bridge logic.
refactor(kinesis): rename kinesis_data_streams_options.dart to amplify_kinesis_client_options.dart Align the options file name with the client class name for consistency. Co-authored-by: jvh-aws <jvhoff@amazon.de>
* Reapply "Align exceptions with Amplify v2 AmplifyException pattern": Switch AmplifyKinesisException to extend amplify_core's AmplifyException instead of amplify_foundation_dart's. Adopt positional message, optional recoverySuggestion, underlyingException (was cause), const constructors, and runtimeTypeName overrides. This reverts commit f660c5f. * Add const to ClientClosedException() call sites --------- Co-authored-by: jvh-aws <jvhoff@amazon.de>
Expose library version in foundation, use it in kinesis user agent
* Expose library version in foundation, use it in kinesis user agent * Fix flush strategy naming * Apply autoformat * Apply autoformat
* Expose library version in foundation, use it in kinesis user agent * Fix flush strategy naming * Apply autoformat * Apply autoformat * Rename package from aws_kinesis_datastreams to amplify_kinesis * Fix analyzer
This reverts commit c3cfb6c.
Strip RecordData to an empty wrapper for now. Fields can be added later once cross-platform alignment on the return shape is finalized.
* Minor doc fixes and remove reexport * Adapt public facing docstrings
…6795) * refactor: remove Category.kinesis enum, use string categories in deploy script Decouple deploy_gen2.dart from the amplify_core Category enum by changing AmplifyBackendGroup.category to a plain String. This allows removing Category.kinesis from the enum and its dead switch case in AmplifyClassImpl, since the kinesis client is standalone and does not participate in the Amplify plugin registration system. The --category CLI flag now derives allowed values from infraConfig instead of Category.values. * fix: match original Category.api.name casing ('API' not 'Api')
Add Kinesis Data Streams Client --------- Co-authored-by: Jan Vincent Hoffbauer <jvhoff@amazon.de> Co-authored-by: Jonas Greifenhain <jonasgre@amazon.de>
Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.