Skip to content

Commit cfefe78

Browse files
authored
fix: handle kinesis unhandled error: InternalError (#15979) (#16001)
1 parent 4772af4 commit cfefe78

File tree

1 file changed

+11
-1
lines changed
  • src/connector/src/source/kinesis/source

1 file changed

+11
-1
lines changed

src/connector/src/source/kinesis/source/reader.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::time::Duration;
1616

1717
use anyhow::{anyhow, Result};
1818
use async_trait::async_trait;
19-
use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError};
19+
use aws_sdk_kinesis::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError};
2020
use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput};
2121
use aws_sdk_kinesis::primitives::DateTime;
2222
use aws_sdk_kinesis::types::ShardIteratorType;
@@ -188,6 +188,16 @@ impl CommonSplitReader for KinesisSplitReader {
188188
self.new_shard_iter().await?;
189189
continue;
190190
}
191+
Err(e) if e.code() == Some("InternalFailure") => {
192+
tracing::warn!(
193+
"stream {:?} shard {:?} met internal failure, retrying",
194+
self.stream_name,
195+
self.shard_id
196+
);
197+
self.new_shard_iter().await?;
198+
tokio::time::sleep(Duration::from_millis(200)).await;
199+
continue;
200+
}
191201
Err(e) => {
192202
let error_msg = format!(
193203
"Kinesis got a unhandled error: {:?}, stream {:?}, shard {:?}",

0 commit comments

Comments
 (0)