Skip to content

Commit de81ad3

Browse files
Remove permanent errors from DynamoDB Streams (spiceai#10794)
* Remote permanent errors from DynamoDB Streams * Address feedback * Lint * Fix max backoff * Apply suggestions from code review Co-authored-by: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com> --------- Co-authored-by: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com>
1 parent da237ad commit de81ad3

6 files changed

Lines changed: 213 additions & 325 deletions

File tree

crates/data_components/src/dynamodb/stream.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use aws_sdk_dynamodb::types::AttributeValue as DynamoDbAttributeValue;
2727
use aws_sdk_dynamodbstreams::types::AttributeValue as StreamsAttributeValue;
2828
use aws_sdk_dynamodbstreams::types::OperationType;
2929
use datafusion::error::DataFusionError;
30-
use dynamodb_streams::StreamResult;
30+
use dynamodb_streams::DynamoDBStreamBatch;
3131
use dynamodb_streams::checkpoint::Checkpoint;
3232
use snafu::prelude::*;
3333
use std::collections::HashMap;
@@ -119,14 +119,13 @@ fn get_primary_keys_array(primary_keys: &[String], row_count: usize) -> ListArra
119119
// This function is used for processing stream batches which have checkpoints.
120120
// Incoming batches are from DynamoDB Streams.
121121
pub fn process_batch(
122-
batch: StreamResult,
122+
batch: DynamoDBStreamBatch,
123123
table_schema: &Arc<Schema>,
124124
primary_keys: &[String],
125125
unnest_depth: Option<usize>,
126126
time_format: &str,
127127
json_nesting: Option<&JsonNesting>,
128128
) -> Result<(ChangeBatch, Checkpoint, Option<SystemTime>), StreamError> {
129-
let batch = batch.context(FailedToReceiveMessageSnafu)?;
130129
let records = batch.records;
131130

132131
let changes_schema = changes_schema(table_schema);
@@ -304,15 +303,14 @@ mod tests {
304303
.build()
305304
}
306305

307-
#[expect(clippy::unnecessary_wraps)]
308-
fn create_stream_result(records: Vec<Record>) -> StreamResult {
309-
Ok(DynamoDBStreamBatch {
306+
fn create_stream_result(records: Vec<Record>) -> DynamoDBStreamBatch {
307+
DynamoDBStreamBatch {
310308
records,
311309
checkpoint: Checkpoint {
312310
shards: HashMap::default(),
313311
},
314312
watermark: None,
315-
})
313+
}
316314
}
317315

318316
mod process_batch {

crates/dynamodb-streams/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ impl Client {
121121
let retry_strategy = RetryBackoffBuilder::new()
122122
.method(BackoffMethod::Fibonacci)
123123
.max_retries(None)
124+
.max_duration(Some(Duration::from_secs(60)))
124125
.build();
125126

126127
let producer = DynamodbStreamProducer {

crates/dynamodb-streams/src/lib.rs

Lines changed: 0 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ pub use client::Client;
3333
pub use metrics::{Metrics, MetricsCollector};
3434
pub use stream::{DynamoDBStreamBatch, DynamodbStream};
3535

36-
pub type StreamResult = Result<DynamoDBStreamBatch, Error>;
37-
3836
pub type Result<T, E = Error> = std::result::Result<T, E>;
3937

4038
#[derive(Debug, Snafu)]
@@ -109,14 +107,6 @@ fn is_throttling_error_code(code: Option<&str>) -> bool {
109107
}
110108

111109
impl Error {
112-
#[must_use]
113-
pub fn is_retriable(&self) -> bool {
114-
matches!(
115-
self,
116-
Error::Timeout | Error::ConnectionFailure | Error::Throttled
117-
)
118-
}
119-
120110
pub fn from_describe_table(err: SdkError<DescribeTableError>) -> Self {
121111
match err {
122112
SdkError::TimeoutError(_) => Error::Timeout,
@@ -203,96 +193,6 @@ impl Error {
203193
mod tests {
204194
use super::*;
205195

206-
mod error_is_retriable {
207-
use super::*;
208-
209-
#[test]
210-
fn test_timeout_is_retriable() {
211-
let err = Error::Timeout;
212-
assert!(err.is_retriable());
213-
}
214-
215-
#[test]
216-
fn test_connection_failure_is_retriable() {
217-
let err = Error::ConnectionFailure;
218-
assert!(err.is_retriable());
219-
}
220-
221-
#[test]
222-
fn test_throttled_is_retriable() {
223-
let err = Error::Throttled;
224-
assert!(err.is_retriable());
225-
}
226-
227-
#[test]
228-
fn test_table_not_found_is_not_retriable() {
229-
let err = Error::TableNotFound;
230-
assert!(!err.is_retriable());
231-
}
232-
233-
#[test]
234-
fn test_stream_not_found_is_not_retriable() {
235-
let err = Error::StreamNotFound;
236-
assert!(!err.is_retriable());
237-
}
238-
239-
#[test]
240-
fn test_stream_description_not_found_is_not_retriable() {
241-
let err = Error::StreamDescriptionNotFound {
242-
stream_arn: "arn:aws:dynamodb:region:account:table/name/stream/time".to_string(),
243-
};
244-
assert!(!err.is_retriable());
245-
}
246-
247-
#[test]
248-
fn test_shard_iterator_not_found_is_not_retriable() {
249-
let err = Error::ShardIteratorNotFound {
250-
shard_id: "shard-123".to_string(),
251-
};
252-
assert!(!err.is_retriable());
253-
}
254-
255-
#[test]
256-
fn test_failed_to_initialize_checkpoint_is_not_retriable() {
257-
let err = Error::FailedToInitializeCheckpoint;
258-
assert!(!err.is_retriable());
259-
}
260-
261-
#[test]
262-
fn test_stream_beyond_retention_is_not_retriable() {
263-
let err = Error::StreamBeyondRetention;
264-
assert!(!err.is_retriable());
265-
}
266-
267-
#[test]
268-
fn test_shard_not_found_is_not_retriable() {
269-
let err = Error::ShardNotFound;
270-
assert!(!err.is_retriable());
271-
}
272-
273-
#[test]
274-
fn test_missing_starting_sequence_number_is_not_retriable() {
275-
let err = Error::MissingStaringSequenceNumber;
276-
assert!(!err.is_retriable());
277-
}
278-
279-
#[test]
280-
fn test_unexpected_shard_id_is_not_retriable() {
281-
let err = Error::UnexpectedShardId {
282-
shard_id: "shard-xyz".to_string(),
283-
};
284-
assert!(!err.is_retriable());
285-
}
286-
287-
#[test]
288-
fn test_iterator_expired_is_not_retriable() {
289-
// Iterator expired is handled specially (reinitialization)
290-
// but should NOT be in the retriable category
291-
let err = Error::IteratorExpired;
292-
assert!(!err.is_retriable());
293-
}
294-
}
295-
296196
mod error_display {
297197
use super::*;
298198

crates/dynamodb-streams/src/stream.rs

Lines changed: 61 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use crate::checkpoint::{Checkpoint, CheckpointPosition};
1717
use crate::client_sdk::SDKClient;
1818
use crate::metrics::MetricsCollector;
1919
use crate::stream_state::{InitializingShard, PollOutcome, ShardPollResult, StreamState};
20-
use crate::{Result, StreamResult};
2120
use aws_sdk_dynamodbstreams::types::{Record, ShardIteratorType};
2221
use futures::{Stream, future::join_all};
2322
use std::collections::HashMap;
@@ -37,7 +36,7 @@ pub struct DynamodbStreamProducer {
3736
pub stream_arn: String,
3837
pub state: StreamState,
3938
pub interval: Option<Duration>,
40-
pub sender: mpsc::Sender<StreamResult>,
39+
pub sender: mpsc::Sender<DynamoDBStreamBatch>,
4140
pub client: Arc<SDKClient>,
4241
pub retry_strategy: RetryBackoff,
4342
pub metrics_collector: Arc<MetricsCollector>,
@@ -52,13 +51,12 @@ pub struct DynamoDBStreamBatch {
5251
const DEFAULT_SLEEP_DURATION: Duration = Duration::from_millis(500);
5352

5453
impl DynamodbStreamProducer {
55-
async fn collect(&mut self) -> Result<(DynamoDBStreamBatch, bool)> {
54+
async fn collect(&mut self) -> (DynamoDBStreamBatch, bool) {
5655
let mut poll_results = Vec::new();
57-
let mut had_transient_error = false;
56+
let mut had_error = false;
5857

5958
// 1. Initialize shards that require iterators
60-
// If permanent error is encountered, it is surfaced to the client.
61-
self.initialize_shards_iterators().await?;
59+
had_error |= self.initialize_shards_iterators().await;
6260

6361
// 2. Poll active shards
6462
let futures = self.state.get_active_shards().map(|shard| {
@@ -76,37 +74,36 @@ impl DynamodbStreamProducer {
7674
// 3. Process poll results
7775
for (shard_id, result) in results {
7876
let poll_result = match result {
79-
Ok((next_iter, records)) => self
80-
.state
81-
.handle_poll_result(&shard_id, next_iter, records)?,
77+
Ok((next_iter, records)) => {
78+
self.state.handle_poll_result(&shard_id, next_iter, records)
79+
}
8280
Err(e) => {
83-
had_transient_error = true;
84-
self.state.handle_poll_error(&shard_id, e)?
81+
had_error = true;
82+
self.state.handle_poll_error(&shard_id, &e)
8583
}
8684
};
87-
poll_results.push(poll_result);
85+
if let Some(poll_result) = poll_result {
86+
poll_results.push(poll_result);
87+
}
8888
}
8989

9090
// 4. Discover new shards
91-
// If permanent error is encountered, it is surfaced to the client.
9291
match self.client.get_all_shards(&self.stream_arn).await {
93-
Ok(shards) => self.state.add_discovered(&shards)?,
92+
Ok(shards) => self.state.add_discovered(&shards),
9493
Err(e) => {
95-
if !e.is_retriable() {
96-
return Err(e);
97-
}
98-
had_transient_error = true;
94+
had_error = true;
9995
tracing::warn!("Failed to discover new shards. Will retry on next iteration: {e}");
10096
}
10197
}
10298

103-
Ok((combine_shard_batches(&poll_results), had_transient_error))
99+
(combine_shard_batches(&poll_results), had_error)
104100
}
105101

106-
async fn initialize_shards_iterators(&mut self) -> Result<()> {
102+
async fn initialize_shards_iterators(&mut self) -> bool {
107103
let shards: Vec<InitializingShard> =
108104
self.state.get_initializing_shards().cloned().collect();
109105

106+
let mut had_error = false;
110107
for shard in shards {
111108
// Shards that were already polled use `After`.
112109
// Newly discovered shards use `At`.
@@ -129,18 +126,15 @@ impl DynamodbStreamProducer {
129126
self.state.mark_active(shard.shard_id, iterator);
130127
}
131128
Err(e) => {
132-
if !e.is_retriable() {
133-
return Err(e);
134-
}
129+
had_error = true;
135130
tracing::warn!(
136-
"Failed to initialize shard. Will retry on next iteration : {}",
131+
"Failed to initialize shard. Will retry on next iteration: {}",
137132
e
138133
);
139134
}
140135
}
141136
}
142-
143-
Ok(())
137+
had_error
144138
}
145139

146140
pub async fn streaming(mut self) {
@@ -151,65 +145,46 @@ impl DynamodbStreamProducer {
151145
*guard = self.state.get_active_shards().count();
152146
}
153147

154-
match self.collect().await {
155-
Ok((batch, had_transient_error)) => {
156-
let is_batch_empty = batch.records.is_empty();
157-
158-
self.metrics_collector
159-
.records
160-
.fetch_add(batch.records.len(), Ordering::Relaxed);
161-
162-
if let Some(watermark) = batch.watermark
163-
&& let Ok(mut wm) = self.metrics_collector.watermark.write()
164-
{
165-
*wm = Some(watermark);
166-
}
167-
168-
if had_transient_error {
169-
self.metrics_collector
170-
.transient_errors
171-
.fetch_add(1, Ordering::Relaxed);
172-
}
173-
174-
// Send batch even if it's empty
175-
if self.sender.send(Ok(batch)).await.is_err() {
176-
return;
177-
}
178-
179-
if had_transient_error {
180-
// Transient error occurred during collection - apply backoff
181-
if let Some(mut duration) = backoff.next_backoff() {
182-
// Avoid sleeping for more than 60 seconds
183-
if duration > Duration::from_secs(60) {
184-
duration = Duration::from_secs(1);
185-
backoff.reset();
186-
}
187-
tokio::time::sleep(duration).await;
188-
} else {
189-
// Backoff exhausted - transient errors persisted too long
190-
// Shouldn't happen as we should have infinite retries.
191-
return;
192-
}
193-
} else {
194-
// Clean success - reset backoff and use normal interval
195-
backoff = self.retry_strategy.clone();
196-
197-
if is_batch_empty {
198-
// To avoid throttling - wait at least 500ms before polling again
199-
sleep(
200-
DEFAULT_SLEEP_DURATION
201-
.max(self.interval.unwrap_or(Duration::from_secs(0))),
202-
)
203-
.await;
204-
} else if let Some(duration) = self.interval {
205-
sleep(duration).await;
206-
}
207-
}
148+
let (batch, had_error) = self.collect().await;
149+
let is_batch_empty = batch.records.is_empty();
150+
151+
self.metrics_collector
152+
.records
153+
.fetch_add(batch.records.len(), Ordering::Relaxed);
154+
155+
if let Some(watermark) = batch.watermark
156+
&& let Ok(mut wm) = self.metrics_collector.watermark.write()
157+
{
158+
*wm = Some(watermark);
159+
}
160+
161+
if had_error {
162+
self.metrics_collector
163+
.transient_errors
164+
.fetch_add(1, Ordering::Relaxed);
165+
}
166+
167+
// Send batch even if it's empty
168+
if self.sender.send(batch).await.is_err() {
169+
return;
170+
}
171+
172+
if had_error {
173+
if let Some(duration) = backoff.next_backoff() {
174+
tokio::time::sleep(duration).await;
208175
}
209-
Err(e) => {
210-
// Permanent error - return immediately without retry
211-
let _ = self.sender.send(Err(e)).await;
212-
return;
176+
} else {
177+
// Clean cycle — reset backoff and use normal interval
178+
backoff = self.retry_strategy.clone();
179+
180+
if is_batch_empty {
181+
// To avoid throttling - wait at least 500ms before polling again
182+
sleep(
183+
DEFAULT_SLEEP_DURATION.max(self.interval.unwrap_or(Duration::from_secs(0))),
184+
)
185+
.await;
186+
} else if let Some(duration) = self.interval {
187+
sleep(duration).await;
213188
}
214189
}
215190
}
@@ -290,11 +265,11 @@ fn combine_shard_batches(poll_results: &[ShardPollResult]) -> DynamoDBStreamBatc
290265

291266
#[derive(Debug)]
292267
pub struct DynamodbStream {
293-
pub receiver: mpsc::Receiver<StreamResult>,
268+
pub receiver: mpsc::Receiver<DynamoDBStreamBatch>,
294269
}
295270

296271
impl Stream for DynamodbStream {
297-
type Item = StreamResult;
272+
type Item = DynamoDBStreamBatch;
298273

299274
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
300275
self.receiver.poll_recv(cx)

0 commit comments

Comments
 (0)