Add Idempotency-Key support for Iceberg REST createTable#4659
Add Idempotency-Key support for Iceberg REST createTable#4659huaxingao wants to merge 9 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds handler-level idempotency support for Iceberg REST createTable, backed by a pluggable per-realm idempotency store and optional background purging.
Changes:
- Introduces
IdempotencyHandlerSupport,IdempotencyConfiguration, CDI wiring, and a purge maintenance task. - Integrates idempotency into
IcebergCatalogAdapter→IcebergCatalogHandler#createTableDirectflow and updates test fixtures. - Adds in-memory/JDBC store implementations and new unit/integration tests validating behavior.
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java | Allows tests to opt into idempotency wiring and passes support into adapter/handler. |
| runtime/service/src/test/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupportTest.java | Unit tests for key validation and binding/conflict behavior. |
| runtime/service/src/test/java/org/apache/polaris/service/idempotency/IdempotencyCreateTableTest.java | End-to-end idempotency tests for createTable via REST adapter. |
| runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerTest.java | Updates handler construction to include idempotency support. |
| runtime/service/src/main/java/org/apache/polaris/service/idempotency/InMemoryCdiIdempotencyStoreFactory.java | CDI wrapper for selecting the in-memory store backend by identifier. |
| runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyStoreFactoryProducer.java | Produces the configured IdempotencyStoreFactory backend via @Identifier selection. |
| runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyMaintenance.java | Adds periodic purge of expired idempotency records. |
| runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java | Core handler-side idempotency helper (validation, hashing, preflight/record). |
| runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyConfiguration.java | Quarkus config mapping for enabling and tuning idempotency. |
| runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandlerFactory.java | Injects and supplies idempotency support into handlers. |
| runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java | Implements createTable idempotency flow with replay and binding checks. |
| runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java | Validates the Idempotency-Key param and forwards it to the handler. |
| polaris-core/src/test/java/org/apache/polaris/core/persistence/InMemoryIdempotencyStoreTest.java | Tests for the in-memory store semantics (record/load/purge). |
| polaris-core/src/main/java/org/apache/polaris/core/persistence/InMemoryIdempotencyStoreFactory.java | New core factory vending per-realm in-memory stores. |
| polaris-core/src/main/java/org/apache/polaris/core/persistence/InMemoryIdempotencyStore.java | New core in-memory IdempotencyStore implementation. |
| polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStoreFactory.java | New SPI for per-realm store factories. |
| polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java | Simplifies SPI to optimistic-commit model (load + recordIfAbsent + purge). |
| polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java | Updates record model to finalized-only fields (principal hash, status, ttl). |
| persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java | Updates JDBC store ITs to match new SPI and schema. |
| persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql | Updates schema for finalized-only idempotency records + indexes for purge. |
| persistence/relational-jdbc/src/main/resources/h2/schema-v4.sql | Same finalized-only schema changes for H2. |
| persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4.sql | Same finalized-only schema changes for CockroachDB. |
| persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java | Updates JDBC model mapping to new record columns. |
| persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStoreFactory.java | Adds CDI factory for JDBC-backed idempotency store. |
| persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java | Reworks JDBC store to recordIfAbsent optimistic-insert semantics. |
| CHANGELOG.md | Documents new handler-level idempotency behavior and configuration. |
| @Produces | ||
| @Singleton | ||
| public IdempotencyStoreFactory idempotencyStoreFactory( | ||
| IdempotencyConfiguration configuration, @Any Instance<IdempotencyStoreFactory> factories) { | ||
| return factories.select(Identifier.Literal.of(configuration.type())).get(); | ||
| } |
| } catch (SQLException e) { | ||
| throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e); | ||
| if (datasourceOperations.isUniquenessConstraintViolation(e)) { | ||
| return new RecordResult(RecordResultType.DUPLICATE, load(realmId, idempotencyKey)); | ||
| } | ||
| throw new IdempotencyPersistenceException("Failed to record idempotency entry", e); | ||
| } |
| String resourceId = | ||
| idempotencySupport() | ||
| .resourceHash( | ||
| "create-table:" | ||
| + namespace.toString() | ||
| + ":" | ||
| + request.name() | ||
| + ":" | ||
| + delegationModesToken(delegationModes)); |
| public Builder idempotencySupport(IdempotencyHandlerSupport idempotencySupport) { | ||
| this.idempotencySupport = idempotencySupport; | ||
| return this; | ||
| } |
| void onStart(@Observes StartupEvent event) { | ||
| if (!configuration.enabled() || !configuration.purgeEnabled()) { | ||
| return; | ||
| } | ||
| long intervalMs = configuration.purgeInterval().toMillis(); | ||
| purgeTimerId = | ||
| vertx.setPeriodic( | ||
| intervalMs, | ||
| ignored -> { | ||
| if (!purgeRunning.compareAndSet(false, true)) { | ||
| return; | ||
| } | ||
| Infrastructure.getDefaultWorkerPool() | ||
| .execute( | ||
| () -> { | ||
| try { | ||
| purgeOnce(); | ||
| } finally { | ||
| purgeRunning.set(false); | ||
| } | ||
| }); | ||
| }); | ||
| } |
99c53b6 to
b983f61
Compare
b983f61 to
58edf0c
Compare
dimas-b
left a comment
There was a problem hiding this comment.
Posting a couple of preliminary comments about CDI... will review in depth later.
dimas-b
left a comment
There was a problem hiding this comment.
Posting some more comments... This is a complex PR, I hope you do not mind multiple review rounds.
| sb.append("realm=").append(realmId).append('|'); | ||
| sb.append("roles="); | ||
| new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(',')); | ||
| return sha256Hex(sb.toString()); |
There was a problem hiding this comment.
Should this be performed by PolarisPrincipal itself with the contract that materially different principals yield different hashes?
@adutra : WDYT?
There was a problem hiding this comment.
(posting this comment just to group unresolved issues under the latest review).
| if (!expectedPrincipalHash.equals(existing.principalHash())) { | ||
| throw new ConflictException( | ||
| "Idempotency-Key already used by a different caller for the same key"); | ||
| } | ||
| if (!expectedResourceHash.equals(existing.resourceHash()) | ||
| || !expectedOperationType.equals(existing.operationType())) { | ||
| throw new ConflictException( | ||
| "Idempotency-Key already used for a different operation or resource"); | ||
| } |
There was a problem hiding this comment.
nit: Is it so critical to distinguish these two cases?
There was a problem hiding this comment.
collapsed them into a single ConflictException
There was a problem hiding this comment.
I meant this to be more of a persistence thing - we could store only one hash value (not a blocker) .... WDYT?
There was a problem hiding this comment.
Makes sense as a persistence simplification. I'd lean to keep them as separate columns, though: operation_type stays human-readable for debugging/observability, and the two extra hash columns are cheap. Happy to revisit as a follow-up if you feel strongly.
dimas-b
left a comment
There was a problem hiding this comment.
@huaxingao : Thanks for the previous updates and for bearing with me. More incremental review comments below.
| sb.append("realm=").append(realmId).append('|'); | ||
| sb.append("roles="); | ||
| new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(',')); | ||
| return sha256Hex(sb.toString()); |
There was a problem hiding this comment.
(posting this comment just to group unresolved issues under the latest review).
dimas-b
left a comment
There was a problem hiding this comment.
This round of comments concludes my first end-to-end review. Thanks for bearing with me @huaxingao !
The PR LGTM overall 👍
My biggest concern ATM is the maintenance / cleanup work. I'd propose to split it into a separate PR to enable the API call path changes to merge sooner.
| if (preflight instanceof IdempotencyOutcome.Owned owned) { | ||
| String metadataLocation = response.tableMetadata().metadataFileLocation(); | ||
| IdempotencyOutcome recordOutcome = | ||
| idempotencySupport().recordOutcome(owned, 200, metadataLocation); |
There was a problem hiding this comment.
suggestion: same here: let's send the whole preflight and response into recordOutcome() and let the latter make the decision whether to write or not based on the preflight value.
There was a problem hiding this comment.
Done. recordOutcome(preflight, …) now takes the outcome and decides internally (no-op unless New). I pass metadataLocation (a String) instead of the full LoadTableResponse to keep the idempotency helper decoupled from the Iceberg response type, let me know if you'd prefer passing response.
| */ | ||
| public static IdempotencyHandlerSupport disabled() { | ||
| IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport(); | ||
| instance.configuration = DisabledConfiguration.INSTANCE; |
There was a problem hiding this comment.
This is not used on "prod" call paths, it seems 🤔 Can it be moved to test code?
There was a problem hiding this comment.
Agreed it's test-only. Moving it cleanly needs a test-side helper (the configuration field + DisabledConfiguration are package-private and TestServices is in another package/source set), so I'll handle it in the follow-up cleanup PR.
| // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too | ||
| // tight. | ||
| private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; | ||
| private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; |
There was a problem hiding this comment.
Should we make them configurable via IdempotencyConfiguration?
There was a problem hiding this comment.
Done. Exposed both via IdempotencyConfiguration (concurrentReplayMaxAttempts, default 5; concurrentReplayInitialBackoff, default PT0.005S). Defaults match the previous hardcoded values.
| IdempotencyOutcome recordOutcome = | ||
| idempotencySupport().recordOutcome(owned, 200, metadataLocation); | ||
| if (recordOutcome instanceof IdempotencyOutcome.Duplicate dup) { | ||
| // Another caller raced ahead and recorded first. Replay so the response is the same shape |
There was a problem hiding this comment.
How is that possible with createTable? 🤔 Only one of the actors should be able to go past lines 550-560, I'd think 🤔
If the key got reused for another request type, recordOutcome would throw, right?
There was a problem hiding this comment.
You are right, the Duplicate-replay branch here was unreachable. Removed.
|
|
||
| // Final HTTP status code once the operation is completed (null while in-progress). | ||
| String RESOURCE_HASH = "resource_hash"; | ||
| String PRINCIPAL_HASH = "principal_hash"; |
There was a problem hiding this comment.
just wondering: do we really need two hashes here? It is not sufficient to hash all relevant inputs into one persisted value?
There was a problem hiding this comment.
Good point — consolidated into a single binding_hash over (principal, operation, resource).
| .execute( | ||
| () -> { | ||
| try { | ||
| purgeOnce(); |
There was a problem hiding this comment.
Should this be an Admin Tool command perhaps? Similar to NoSQL and Metrics maintenance?
Cf. https://lists.apache.org/thread/5nst0f2ygnl2gj3j910q7m8nk2fvokc7
There was a problem hiding this comment.
Agreed — I've dropped the scheduled maintenance task (and its purge config) from this PR entirely. The store keeps a purgeExpired(...) primitive, but how purging is driven (Admin Tool command, per the linked thread) will be a separate follow-up. This PR is now just the request-path idempotency logic.
Adds support for the Iceberg REST
Idempotency-Keyheader oncreateTable,using an optimistic-commit model (a.k.a. Model B).
Idempotency-Key(UUIDv7) header oncreateTable.is no "reserve a slot first" step.
catalog state instead of returning a conflict. No HTTP response body is stored.
Checklist
CHANGELOG.md(if needed)site/content/in-dev/unreleased(if needed)