feat: add RDF bloom payload model#8354
Conversation
There was a problem hiding this comment.
Code Review
This pull request transitions the project's DataFusion dependency to a new fork and introduces a join-hash Bloom filter payload model (JoinHashBloomPayload) along with its corresponding probe expression (JoinHashBloomProbeExpr) to optimize remote dynamic filter updates. It also adds robust encoding, decoding, and fallback mechanisms, supported by extensive unit tests. The code review feedback suggests restoring the deleted documentation comments for DynFilterUpdate and QueryRequest to preserve code clarity, and simplifying the ceiling division math in num_bits_ceil_bytes using a more standard integer arithmetic pattern.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| /// A remote dynamic filter update sent from a query coordinator to region servers. | ||
| #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] | ||
| pub struct DynFilterUpdate { | ||
| /// Protocol version used by this update payload. | ||
| pub protocol_version: u32, | ||
| /// Internal query identifier that owns this dynamic filter lifecycle. | ||
| pub query_id: String, | ||
| /// Identifier of the dynamic filter within the query. | ||
| pub filter_id: String, | ||
| /// Monotonic update generation for this filter. | ||
| pub generation: u64, | ||
| /// Whether this update completes the dynamic filter stream. | ||
| pub is_complete: bool, | ||
| /// Serialized predicate payload carried by this update. | ||
| pub payload: DynFilterPayload, | ||
| } |
There was a problem hiding this comment.
The detailed documentation for DynFilterUpdate and its fields has been removed. This reduces code clarity and makes it harder for future developers to understand the purpose of each field. Please consider restoring the documentation.
/// A remote dynamic filter update sent from a query coordinator to region servers.
///
/// `generation` is monotonic within a `query_id`/`filter_id` pair and matches the
/// gRPC field name used by `RemoteDynFilterUpdate`. Receivers use it to ignore
/// stale updates while `is_complete` marks the final payload for the filter.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DynFilterUpdate {
/// Protocol version used by this update payload.
pub protocol_version: u32,
/// Internal query identifier that owns this dynamic filter lifecycle.
pub query_id: String,
/// Identifier of the dynamic filter within the query.
pub filter_id: String,
/// Monotonic update generation for this filter.
pub generation: u64,
/// Whether this update completes the dynamic filter stream.
pub is_complete: bool,
/// Serialized predicate payload carried by this update.
pub payload: DynFilterPayload,
}| /// The query request to be handled by the RegionServer (Datanode). | ||
| #[derive(Clone, Debug)] | ||
| pub struct QueryRequest { | ||
| /// The header of this request. Often to store some context of the query. None means all to defaults. | ||
| pub header: Option<RegionRequestHeader>, | ||
|
|
||
| /// The id of the region to be queried. | ||
| pub region_id: RegionId, | ||
|
|
||
| /// The form of the query: a logical plan. | ||
| pub plan: LogicalPlan, | ||
| } |
There was a problem hiding this comment.
The doc comments for the fields of QueryRequest have been removed. To maintain code clarity and help other developers understand the structure, please consider adding them back.
/// The query request to be handled by the RegionServer (Datanode).
#[derive(Clone, Debug)]
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,
/// The id of the region to be queried.
pub region_id: RegionId,
/// The form of the query: a logical plan.
pub plan: LogicalPlan,
}| fn num_bits_ceil_bytes(num_bits: usize) -> usize { | ||
| (num_bits / 8) + usize::from(!num_bits.is_multiple_of(8)) | ||
| } |
There was a problem hiding this comment.
The implementation of num_bits_ceil_bytes is a bit obscure. For better readability and maintainability, consider using the more conventional integer arithmetic pattern (num_bits + 7) / 8 for ceiling division.
| fn num_bits_ceil_bytes(num_bits: usize) -> usize { | |
| (num_bits / 8) + usize::from(!num_bits.is_multiple_of(8)) | |
| } | |
| fn num_bits_ceil_bytes(num_bits: usize) -> usize { | |
| (num_bits + 7) / 8 | |
| } |
7768ea3 to
7b323f3
Compare
c49d164 to
8aaf683
Compare
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
8aaf683 to
532e9f9
Compare
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
This is PR 1a of the RDF Bloom stack. Follow-up fork-only stack PRs:
Depends on greptime-proto payload PR: GreptimeTeam/greptime-proto#325
Part of original umbrella draft PR: #8342
What's changed and what's your intention?
This PR introduces only the typed RDF Bloom payload model and serialization plumbing:
DynFilterPayload::JoinHashBloomas a typed remote dynamic filter payload variantJoinHashBloomPayloadmodel/proto conversion/validationJoinHashKind, noBloomHashAlgorithm, and no serializeddistinct_hash_count;df_seed0..3remain for receiver-side DataFusion hash reconstructionBloom probing and HashTableLookupExpr encoding are intentionally split into follow-up PR1b/PR1c to keep this review bounded.
Validation run locally:
cargo fmt --all -- --checkgit diff --checkCARGO_PROFILE_DEV_DEBUG=0 CARGO_PROFILE_TEST_DEBUG=0 cargo test -p common-query request --libPR Checklist
Please convert it to a draft if some of the following conditions are not met.