feat: add remote dynamic filter frontend registration#8148
Conversation
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
There was a problem hiding this comment.
Code Review
This pull request implements remote dynamic filter propagation and registration within the distributed query engine. It introduces InitialDynFilterRegs for serializing filter state, a query-scoped RemoteDynFilterRegistry for lifecycle management, and updates to MergeScanExec to capture and bridge filters to remote regions. Review feedback identifies several areas for improvement, including addressing a potential memory leak in the datanode's registration map, replacing silent failures with explicit error propagation during registration decoding, and optimizing DashMap access to reduce lock contention. Additionally, suggestions were made to ensure structured logging for query identifiers and to improve the efficiency of subscriber registration.
| mito_engine: RwLock<Option<MitoEngine>>, | ||
| /// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel | ||
| /// and later fold it into the real remote dyn filter runtime state lifecycle. | ||
| initial_remote_dyn_filter_registrations: DashMap<String, DashMap<String, RegisteredDynFilter>>, |
There was a problem hiding this comment.
The initial_remote_dyn_filter_registrations map is a potential memory leak as entries are only removed on error in handle_read. Successful queries will leave their registrations in the map indefinitely. While the TODO acknowledges this, it should be addressed to prevent unbounded memory growth in production environments.
| pub(super) fn initial_dyn_filter_regs_from_query_ctx( | ||
| query_ctx: &QueryContextRef, | ||
| ) -> Option<InitialDynFilterRegs> { | ||
| let registrations = | ||
| query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?; | ||
| match InitialDynFilterRegs::from_extension_value(registrations) { | ||
| Ok(registrations) => match registrations.validate_default_bounds() { | ||
| Ok(()) => Some(registrations), | ||
| Err(error) => { | ||
| warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds"); | ||
| None | ||
| } | ||
| }, | ||
| Err(error) => { | ||
| warn!(error; "Failed to decode initial remote dyn filter registrations from query context"); | ||
| None | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
If decoding or validation of the initial registrations fails, the function returns None and the query proceeds without these filters. This silent failure can lead to performance degradation or unexpected behavior that is difficult to debug. In accordance with repository practices to avoid silent failures and prefer error propagation, consider returning a Result and using methods like .with_context(...)? to fail the query early if the provided registrations are invalid.
References
- In Rust, avoid using .unwrap() on Result types. Instead, use error propagation methods like .with_context(...)? (e.g., using snafu) to handle errors gracefully and prevent panics, particularly when dealing with complex expression builders that return a Result.
| return; | ||
| } | ||
|
|
||
| let query_regs = regs_by_query.entry(query_id.to_string()).or_default(); |
There was a problem hiding this comment.
The use of regs_by_query.entry(...).or_default() acquires a write lock on the shard of the outer DashMap. Since all regions for the same query share the same query_id, they will all contend for the same shard lock, effectively serializing the registration process. It is more efficient to use get first to allow concurrent access to the inner map.
let query_regs = if let Some(regs) = regs_by_query.get(query_id) {
regs
} else {
regs_by_query.entry(query_id.to_string()).or_default();
regs_by_query.get(query_id).expect("entry just created")
};| warn!( | ||
| query_id, | ||
| filter_id = reg.filter_id, | ||
| region_id = %region_id, | ||
| "Duplicate initial dyn filter reg ignored" | ||
| ); |
There was a problem hiding this comment.
The positional argument query_id in the warn! macro is likely intended to be a structured field. In many logging frameworks (like tracing), positional arguments are treated as the message. Using %query_id or query_id = %query_id ensures it is correctly captured as a structured field.
| warn!( | |
| query_id, | |
| filter_id = reg.filter_id, | |
| region_id = %region_id, | |
| "Duplicate initial dyn filter reg ignored" | |
| ); | |
| warn!( | |
| %query_id, | |
| filter_id = reg.filter_id, | |
| region_id = %region_id, | |
| "Duplicate initial dyn filter reg ignored" | |
| ); |
| pub fn register_subscriber(&self, subscriber: Subscriber) -> bool { | ||
| let mut subscribers = self.subscribers.write().unwrap(); | ||
| if subscribers.contains(&subscriber) { | ||
| return false; | ||
| } | ||
|
|
||
| subscribers.push(subscriber); | ||
| true | ||
| } |
There was a problem hiding this comment.
Using Vec::contains followed by push results in O(N) complexity for each subscriber registration. For queries involving a large number of regions, the total registration time becomes O(N^2). Consider using a HashSet<RegionId> or keeping the Vec sorted to improve efficiency for queries with high fan-out.
Summary
Scope
This is remote dynamic filter Task 03. It reuses the Task 01/02 remote dyn filter ABI and intentionally leaves fanout/runtime/apply/remap/full lifecycle cleanup to later tasks.
Validation