Skip to content

[Data] refactor checkpointfilter with anti-join #60764

@my-vegetable-has-exploded

Description

Description

Currently, each ReadTask initializes a checkpointer to filter rows whose primary key already exists in checkpoint_id (similar to a broadcast join). This is a typical anti-join pattern. I think that we refactor the checkpoint filter using an anti-join approach.

By adopting anti-join, the checkpoint filter may benefit from optimizer rules:
1.The optimizer can choose between broadcast join or distributed hash join based on dataset statistics, potentially resolving efficiency or memory issues (such as #60294).
2.Once the runtimefilter/dynamic optimizer is supported, the checkpoint filter may be pushed down to the source side.

However, currently the hash join is based on ShuffleAggregation, which remains a pipeline broker. This architecture blocks the streaming execution pipeline, as all data must be completely shuffled and materialized before downstream operators can begin processing, thereby preventing true pipelined execution.

ptal @wxwmd @owenowenisme

Metadata

Metadata

Assignees

No one assigned

    Labels

    community-backlogenhancementRequest for new feature and/or capabilitytriageNeeds triage (eg: priority, bug/not-bug, and owning component)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions