-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Improve][Zeta] Decouple Hazelcast IMap via StateStore Abstraction #10256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
…e/imap-interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces a StateStore abstraction to decouple SeaTunnel from direct Hazelcast IMap dependencies, making it easier to swap out storage backends (e.g., to RocksDB) in the future.
Key changes:
- Introduces
StateStoreinterface andStateStoreFactoryto abstract storage operations - Implements
HazelcastStateStoreandHazelcastStateStoreFactoryas the current Hazelcast-backed implementation - Adds
DistributedStoreManagerto centralize StateStore instance creation - Migrates all IMap usages throughout the codebase to use StateStore abstraction
- Updates variable names, method names, and comments from "IMap" terminology to "StateStore" terminology
Reviewed changes
Copilot reviewed 31 out of 31 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
| StateStore.java | New interface defining storage abstraction with map-like operations |
| StateStoreFactory.java | Factory interface for creating StateStore instances |
| HazelcastStateStore.java | Hazelcast IMap implementation of StateStore interface |
| HazelcastStateStoreFactory.java | Factory for creating HazelcastStateStore instances |
| DistributedStoreManager.java | Manager class that provides StateStore instances via factory |
| JobMaster.java | Updated to use StateStore instead of IMap for job state management |
| CoordinatorService.java | Migrated IMap fields to StateStore for coordinator state |
| CheckpointCoordinator.java | Updated checkpoint state storage to use StateStore |
| SeaTunnelServer.java | Changed metrics storage from IMap to StateStore |
| JobHistoryService.java | Updated job history storage to use StateStore abstraction |
| PhysicalPlan.java, SubPlan.java, PhysicalVertex.java | Updated physical execution plan state storage to StateStore |
| Multiple test files | Updated tests to use StateStore and DistributedStoreManager |
| AbstractSeaTunnelServerTest.java | Added DistributedStoreManager field for test infrastructure |
| BaseService.java, JobInfoService.java | Updated REST service layer to use StateStore |
| SharedConnectorJarStorageStrategy.java | Migrated connector JAR ref counting to StateStore |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...l-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
Outdated
Show resolved
Hide resolved
...-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
Outdated
Show resolved
Hide resolved
...atunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
Outdated
Show resolved
Hide resolved
...atunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
Outdated
Show resolved
Hide resolved
...nel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
Show resolved
Hide resolved
...ine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
Outdated
Show resolved
Hide resolved
...ine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
Outdated
Show resolved
Hide resolved
...erver/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
Outdated
Show resolved
Hide resolved
...l-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
Outdated
Show resolved
Hide resolved
...l-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
Outdated
Show resolved
Hide resolved
…che/seatunnel/engine/server/CoordinatorServiceTest.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/CoordinatorServiceTest.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/dag/physical/SubPlan.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/master/JobHistoryService.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/master/JobMaster.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/execution/TaskExecutionContext.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/dag/physical/SubPlan.java Co-authored-by: Copilot <[email protected]>
…che/seatunnel/engine/server/CoordinatorServiceTest.java Co-authored-by: Copilot <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...unnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
Show resolved
Hide resolved
...ine-server/src/main/java/org/apache/seatunnel/engine/server/storage/HazelcastStateStore.java
Show resolved
Hide resolved
...server/src/main/java/org/apache/seatunnel/engine/server/storage/DistributedStoreManager.java
Show resolved
Hide resolved
...unnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
Show resolved
Hide resolved
...unnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
Show resolved
Hide resolved
…e/imap-interface # Conflicts: # seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
|
Hi @zhangshenghang , PTAL when you have a time. Thanks! |
Fixes: #10181
Purpose of this pull request
Background
Currently, SeaTunnel relies directly on Hazelcast IMap for internal state management. This tight coupling makes it difficult to:
Proposal
Introduce a
StateStoreabstraction to encapsulate all operations currently performed on Hazelcast IMap, along with aStateStoreFactoryto create instances ofStateStore.Components
StateStoreinstances.StateStorevia theNodeEngine.NodeEngineto theDistributedStoreManagerto obtain aStateStore.This design allows future replacement of Hazelcast IMap with RocksDB or any other storage engine by simply implementing
StateStoreandStateStoreFactoryfor the new engine. Additionally, creating a storage implementation using RocksDB (or others) becomes clear and straightforward.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Covered by existing tests.
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.