feat: support pending flow metadata with defer_on_missing_source#8124
feat: support pending flow metadata with defer_on_missing_source#8124discord9 wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a 'pending flow' mechanism, allowing users to create flows with missing source tables by specifying the defer_on_missing_source option. Key additions include the ActivatePendingFlowProcedure for transitioning flows to an active state and a background PendingFlowReconcileManager that periodically checks for available sources. Metadata structures were updated to track flow status and unresolved dependencies. Feedback suggests optimizing the reconciliation loop by removing redundant metadata fetches and addressing a limitation where pending flows are currently forced into batch mode, potentially preventing them from utilizing streaming capabilities upon activation.
| let current_flow_info = ddl_context | ||
| .flow_metadata_manager | ||
| .flow_info_manager() | ||
| .get_raw(flow_id) | ||
| .await; | ||
| let current_flow_info = match current_flow_info { | ||
| Ok(current_flow_info) => current_flow_info, | ||
| Err(e) => { | ||
| error!(e; "Failed to load flow metadata for pending flow {}", flow_id); | ||
| return Ok(()); | ||
| } | ||
| }; | ||
| let Some(current_flow_info) = current_flow_info else { | ||
| return Ok(()); | ||
| }; | ||
| if !current_flow_info.get_inner_ref().is_pending() { | ||
| return Ok(()); | ||
| } |
There was a problem hiding this comment.
The call to get_raw(flow_id) here is redundant and inefficient. The flow_infos() stream already provides the full FlowInfoValue for each flow. Since the manager only needs to decide whether to trigger the activation procedure (which will perform its own up-to-date checks and metadata updates), re-fetching the metadata from the KV store for every pending flow on every tick adds unnecessary overhead and latency to the reconciliation loop.
References
- Avoid redundant data fetching or processing within loops when the data is already available or can be retrieved once before the loop starts.
| "Flow `{}` defaults to batching because defer_on_missing_source=true and some source tables are not available yet", | ||
| flow_name | ||
| ); | ||
| return Ok(Some(FlowType::Batching)); |
There was a problem hiding this comment.
Forcing FlowType::Batching for all pending flows is a significant limitation. If a user creates a flow that would otherwise be a streaming flow (e.g., a simple filter without aggregations), it will be permanently stuck in batch mode after activation because the flow type is determined at creation time and stored in metadata. Since flow_type is currently rejected as a user-provided option, there is no way for a user to override this behavior for pending flows. Consider allowing the flow_type option in WITH when defer_on_missing_source is enabled, or re-evaluating the flow type during activation.
Add `defer_on_missing_source` flow option that allows creating flows even when source tables do not yet exist. The flow enters a pending state and is automatically activated when source tables become available. Key changes: - New `FlowStatus::PendingSources` and fields in `FlowInfoValue` for unresolved source table names and last activation error - `defer_on_missing_source` create-time-only option: stripped from runtime/flownode `CreateRequest` but preserved in metadata for SQL round-trip (`SHOW CREATE FLOW`, `information_schema.flows`) - `CreateFlowProcedure` creates pending metadata when sources are missing and `defer_on_missing_source=true`; falls back to `FlowType::Batching` for missing-source flows - `PendingFlowReconcileManager` in meta-srv periodically checks pending flows and activates them when source tables resolve - `ActivatePendingFlowProcedure` handles activation: allocates peers, creates flows on flownodes, updates metadata, invalidates cache - `OR REPLACE` properly handles pending<->active transitions, including peer allocation and flownode flow teardown - `FlowMetadataAllocator::alloc_peers` for peer allocation at activation time - Validated flow options: only `defer_on_missing_source` allowed; unknown options rejected - Known issue: standalone mode does not support flownodes, so pending flow flush/sink behavior covered only in distributed sqlness; operator and meta unit tests cover activation logic Tests: - operator `determine_flow_type_for_source_state` (3 passed) - common-meta `create_flow` (19 passed) including replacement - common-meta `activate_flow` (4 passed) - meta-srv `flow` (11 passed) - sqlness: `flow_pending` covers create/replace/round-trip Signed-off-by: discord9 <discord9@163.com>
Reduce PR GreptimeTeam#8124 to the metadata-only MVP after complexity review. Changes: - Remove automatic activation procedure and meta-srv reconcile wiring - Remove activation tests and activation-only metadata fields - Reject cross-state pending<->active `OR REPLACE` transitions for now - Keep pending metadata creation and SQL round-trip behavior - Allow `DROP FLOW` for pending flows without routes - Reduce flow_pending sqlness to metadata/round-trip/drop coverage only Deferred follow-ups are documented locally in `.tmp/tasks/pending-defer-semantics/deferred-followups.md` and intentionally not committed. Tests: - `cargo test -p operator determine_flow_type_for_source_state` - `cargo test -p common-meta create_flow` - `cargo test -p common-meta drop_flow` - `cargo sqlness bare --test-filter flow_pending --bins-dir /mnt/nvme_rust/rust-targets/pending_defer/debug`
Reduce PR GreptimeTeam#8124 to the metadata-only MVP after complexity review. Changes: - Remove automatic activation procedure and meta-srv reconcile wiring - Remove activation tests and activation-only metadata fields - Reject cross-state pending<->active `OR REPLACE` transitions for now - Keep pending metadata creation and SQL round-trip behavior - Allow `DROP FLOW` for pending flows without routes - Reduce flow_pending sqlness to metadata/round-trip/drop coverage only Deferred follow-ups are documented locally in `.tmp/tasks/pending-defer-semantics/deferred-followups.md` and intentionally not committed. Tests: - `cargo test -p operator determine_flow_type_for_source_state` - `cargo test -p common-meta create_flow` - `cargo test -p common-meta drop_flow` - `cargo sqlness bare --test-filter flow_pending --bins-dir /mnt/nvme_rust/rust-targets/pending_defer/debug` Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Summary
defer_on_missing_sourceflow option for metadata-only pending flow creation when source tables are missingdefer_on_missing_sourcein metadata forSHOW CREATE FLOW/information_schema.flows, while stripping it from runtime/flownodeCreateRequestScope
This draft is intentionally reduced to the metadata-only MVP after complexity review.
Included:
defer_on_missing_sourceis supportedFlowStatus::PendingSources, source names, unresolved source namesOR REPLACEtransitions for nowDeferred follow-ups:
Tests
cargo test -p operator determine_flow_type_for_source_statecargo test -p common-meta create_flowcargo test -p common-meta drop_flowcargo sqlness bare --test-filter flow_pending --bins-dir /mnt/nvme_rust/rust-targets/pending_defer/debug