Open
Description
Using the High-level SDK, a consumer currently running is not replaying offsets after an update via the CLI.
iggy --username xxxx--password xxxxx --tcp-server-address localhost:5100 o s <CONSUMER_GROUP> <STREAM_ID> <TOPIC_ID> <PARTITION_ID> 0
High-level SDK code:
let mut consumer = site_management
.client
.consumer_group("neo-all-in-one", "neo", "site_events")?
.auto_commit(AutoCommit::IntervalOrWhen(
IggyDuration::from_str("1s").map_err(|_e| IggyError::Error)?,
AutoCommitWhen::ConsumingAllMessages,
))
.create_consumer_group_if_not_exists()
.auto_join_consumer_group()
.polling_strategy(PollingStrategy::next())
.poll_interval(IggyDuration::from_str("1ms").map_err(|_e| IggyError::Error)?)
.batch_size(1000)
.build();
consumer.init().await?;
//Start consuming the messages
while let Some(message) = consumer.next().await {
let received_message = message?;
info!("message received: {:?}", received_message.message.payload);
handle_message(&received_message.message).await.map_err(|_| IggyError::Error)?;
consumer.store_offset(received_message.current_offset,Some(received_message.partition_id)).await?;
}
Doing the following in code works:
consumer.store_offset(0,Some(1),).await?;
Metadata
Assignees
Labels
No labels