feat: flownode inc query checkpoint#8132
Signed-off-by: discord9 <discord9@163.com>
Code Review
This pull request implements incremental read support for batching flows by introducing a state machine that manages transitions between full snapshot and incremental modes based on region watermarks. Key changes include logic to analyze and rewrite aggregate plans into delta-left-join-sink merges, extensions to the FrontendClient for handling terminal metrics, and the addition of comprehensive integration tests. Feedback from the review focuses on improving the robustness of error handling by replacing brittle string parsing with structured error variants and simplifying the logic for validating incremental checkpoint advancement to remove redundant checks.
pub fn inspect_query_error(err: &Error) -> FlowQueryFailure {
    let debug = format!("{err:?}");
    let stale_cursor = parse_stale_cursor_detail(&debug);
    FlowQueryFailure { stale_cursor }
}
Parsing the debug representation of an error (format!("{err:?}")) is brittle and can easily break if the error formatting changes in the future. It would be more robust to introduce a specific error variant for "stale cursor" and propagate it as a structured error. This is especially important for correctly identifying connection-related errors, as only those should trigger a connection reset. While the TODO comment acknowledges this, this should be prioritized to improve the robustness of error handling.
References
- Only reset a client connection for connection-related errors. Robust error identification is necessary to follow this rule.
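A minimal sketch of what the structured approach could look like, assuming a dedicated stale-cursor variant on the error enum. The `StaleCursorDetail` fields, the `Connection` and `Other` variants, and the `should_reset_connection` helper are hypothetical names chosen for illustration; the real `Error` type in this PR may be shaped differently (for example, defined with `snafu`).

```rust
use std::fmt;

/// Hypothetical payload for a stale-cursor failure; the field names are
/// assumptions for this sketch, not taken from the PR.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StaleCursorDetail {
    pub region_id: u64,
    pub requested_seq: u64,
    pub min_readable_seq: u64,
}

/// Hypothetical error enum with a dedicated variant per failure class.
#[derive(Debug)]
pub enum Error {
    StaleCursor(StaleCursorDetail),
    Connection { addr: String },
    Other(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::StaleCursor(d) => write!(
                f,
                "stale cursor on region {}: requested {}, minimum readable {}",
                d.region_id, d.requested_seq, d.min_readable_seq
            ),
            Error::Connection { addr } => write!(f, "connection to {addr} failed"),
            Error::Other(msg) => write!(f, "{msg}"),
        }
    }
}

impl std::error::Error for Error {}

#[derive(Debug, Default, PartialEq, Eq)]
pub struct FlowQueryFailure {
    pub stale_cursor: Option<StaleCursorDetail>,
}

/// Classifies the error by matching on its variant, so a change to the
/// `Debug` or `Display` output cannot affect the result.
pub fn inspect_query_error(err: &Error) -> FlowQueryFailure {
    let stale_cursor = match err {
        Error::StaleCursor(detail) => Some(detail.clone()),
        _ => None,
    };
    FlowQueryFailure { stale_cursor }
}

/// Only connection-related errors should trigger a client connection reset.
pub fn should_reset_connection(err: &Error) -> bool {
    matches!(err, Error::Connection { .. })
}
```

With this shape, a stale cursor and a connection failure are told apart by the type system rather than by the text of the error message.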
pub fn can_advance_incremental_checkpoints_with_participation(
    &self,
    participating_regions: &BTreeSet<u64>,
    watermark_map: &HashMap<u64, u64>,
) -> bool {
    !self.checkpoints.is_empty()
        && !participating_regions.is_empty()
        && participating_regions.len() == watermark_map.len()
        && participating_regions
            .iter()
            .all(|region_id| self.checkpoints.contains_key(region_id))
        && participating_regions.iter().all(|region_id| {
            let checkpoint = self.checkpoints.get(region_id);
            watermark_map
                .get(region_id)
                .zip(checkpoint)
                .is_some_and(|(seq, checkpoint)| seq >= checkpoint)
        })
}
This function's logic is a bit complex and can be simplified for better readability and maintainability. The check participating_regions.iter().all(|region_id| self.checkpoints.contains_key(region_id)) is redundant, as the subsequent check already covers this. Using a match statement can also make the logic more explicit and easier to understand than zip().is_some_and().
pub fn can_advance_incremental_checkpoints_with_participation(
    &self,
    participating_regions: &BTreeSet<u64>,
    watermark_map: &HashMap<u64, u64>,
) -> bool {
    !self.checkpoints.is_empty()
        && !participating_regions.is_empty()
        && participating_regions.len() == watermark_map.len()
        && participating_regions.iter().all(|region_id| {
            match (self.checkpoints.get(region_id), watermark_map.get(region_id)) {
                (Some(checkpoint), Some(seq)) => seq >= checkpoint,
                _ => false,
            }
        })
}
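To back up the claim that the `contains_key` pass is redundant, here is a small self-contained check. It is only a sketch: `CheckpointState` is a hypothetical stand-in for whatever type owns `checkpoints` in the PR, and the method names `can_advance_original` and `can_advance_simplified` are made up for the comparison. It enumerates every combination of two regions with absent or small sequence values and confirms that both forms return the same answer.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical stand-in for the type that owns `checkpoints`.
pub struct CheckpointState {
    pub checkpoints: HashMap<u64, u64>,
}

impl CheckpointState {
    /// The form under review, including the separate `contains_key` pass.
    pub fn can_advance_original(
        &self,
        participating_regions: &BTreeSet<u64>,
        watermark_map: &HashMap<u64, u64>,
    ) -> bool {
        !self.checkpoints.is_empty()
            && !participating_regions.is_empty()
            && participating_regions.len() == watermark_map.len()
            && participating_regions
                .iter()
                .all(|region_id| self.checkpoints.contains_key(region_id))
            && participating_regions.iter().all(|region_id| {
                let checkpoint = self.checkpoints.get(region_id);
                watermark_map
                    .get(region_id)
                    .zip(checkpoint)
                    .is_some_and(|(seq, checkpoint)| seq >= checkpoint)
            })
    }

    /// The suggested form: one pass with an explicit `match`.
    pub fn can_advance_simplified(
        &self,
        participating_regions: &BTreeSet<u64>,
        watermark_map: &HashMap<u64, u64>,
    ) -> bool {
        !self.checkpoints.is_empty()
            && !participating_regions.is_empty()
            && participating_regions.len() == watermark_map.len()
            && participating_regions.iter().all(|region_id| {
                match (self.checkpoints.get(region_id), watermark_map.get(region_id)) {
                    (Some(checkpoint), Some(seq)) => seq >= checkpoint,
                    _ => false,
                }
            })
    }
}

/// Decodes a base-3 number into a map over regions 0 and 1:
/// digit 0 means the region is absent, digit 1 or 2 is its sequence value.
fn build_map(code: u32) -> HashMap<u64, u64> {
    let mut map = HashMap::new();
    for region in 0..2u64 {
        let digit = (code / 3u32.pow(region as u32)) % 3;
        if digit > 0 {
            map.insert(region, u64::from(digit));
        }
    }
    map
}

/// Returns true if both forms agree on all 9 * 9 * 4 small inputs.
pub fn forms_agree_on_small_inputs() -> bool {
    for cp in 0..9 {
        let state = CheckpointState { checkpoints: build_map(cp) };
        for wm in 0..9 {
            let watermarks = build_map(wm);
            for mask in 0..4u64 {
                // Each bit of `mask` selects whether a region participates.
                let regions: BTreeSet<u64> =
                    (0..2u64).filter(|&r| (mask >> r) & 1 == 1).collect();
                if state.can_advance_original(&regions, &watermarks)
                    != state.can_advance_simplified(&regions, &watermarks)
                {
                    return false;
                }
            }
        }
    }
    true
}
```

The two forms agree because `zip` already yields `None` whenever a region has no checkpoint, so the extra `contains_key` pass can never change the result.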
02071a4 to 4824025
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
PR Checklist
Please convert it to a draft if some of the following conditions are not met.