Skip to content

Commit 17c5d55

Browse files
tabVersionclaude
andcommitted
feat(meta): enhance inject source offsets with better validation and feedback
- Validate split offsets against runtime actor assignments (SharedActorInfos) instead of enumerator to support adaptive splits correctly. - Return the list of actually applied split IDs in the RPC response. - Fail the executor explicitly if offset injection fails (e.g. invalid format) to ensure consistency. - Update CLI to show applied splits to the user. - Fix e2e test offset calculation to account for start_offset behavior. Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 7ee869c commit 17c5d55

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

e2e_test/source_inline/kafka/alter/alter_source_properties_safe.slt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ SELECT COUNT(*) FROM mv_test;
112112
9
113113

114114
# Get Kafka offsets with skip_last=4 to go back 4 messages
115-
# This will inject offset = high_watermark - 4, causing re-consumption of last 4 messages
115+
# The script calculates offset = high_watermark - 4 - 1.
116+
# Since inject-source-offsets treats the value as "last consumed offset",
117+
# consumption will resume from (high_watermark - 4 - 1) + 1 = high_watermark - 4.
118+
# This causes re-consumption of the last 4 messages.
116119
system ok
117120
python3 e2e_test/source_inline/kafka/alter/get_kafka_offsets.py \
118121
--topic test_safe_update \

e2e_test/source_inline/kafka/alter/get_kafka_offsets.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ def get_kafka_offsets(broker, topic, skip_last_n=2):
3131
tp = TopicPartition(topic, partition_id)
3232
low, high = consumer.get_watermark_offsets(tp)
3333
# Set offset to skip last N messages
34-
# This simulates moving forward in the stream
35-
new_offset = max(0, high - skip_last_n)
34+
# inject_source_offsets expects "last_seen_offset", so start_offset = offset + 1.
35+
# We subtract 1 to ensure we start reading from (high - skip_last_n).
36+
new_offset = high - skip_last_n - 1
3637
offsets[str(partition_id)] = str(new_offset)
3738

3839
return offsets

0 commit comments

Comments
 (0)