Skip to content

Commit b983f61

Browse files
committed
Add Idempotency-Key support for Iceberg REST createTable
1 parent 3c1fdfc commit b983f61

28 files changed

Lines changed: 2974 additions & 546 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
3030
### Highlights
3131

3232
### Upgrade notes
33+
- The relational-JDBC metastore schema is bumped to v5, which reshapes the (previously unused) `idempotency_records` table for the optimistic-commit idempotency model. New installations bootstrap at v5 automatically. Existing v4 installations are not migrated in place; enabling idempotency requires bootstrapping at v5 or applying the equivalent DDL from `schema-v5.sql`. The v4 `idempotency_records` table was never wired to request handling, so no data migration is needed.
3334
- Event listeners are now executed on a dedicated executor. **This executor does not propagate the original request's CDI context**; listeners that were improperly relying on that should instead manage their own CDI request scope from now on. Furthermore, two new configuration options were introduced to configure the executor:
3435
- `polaris.event-listener.executor.pool-size` configures the thread pool size.
3536
- `polaris.event-listener.executor.queue-size` configures the queue size for pending events when all threads are busy.
@@ -52,6 +53,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
5253
- Added support for `register table` overwrite semantics in the Iceberg REST catalog flow (`overwrite=true`) for internal Polaris catalogs. With overwrite enabled, existing table pointers can be updated to a new metadata location while preserving default behavior for `overwrite=false`.
5354
- Added `REGISTER_TABLE_OVERWRITE` authorization operation mapped to `TABLE_FULL_METADATA` for deterministic overwrite authorization.
5455
- Added Polaris Spark 4.0 client.
56+
- Added handler-level support for the Iceberg REST `Idempotency-Key` header on `createTable`, using an optimistic-commit model: the terminal outcome is recorded only after a successful (2xx) response and retries replay an equivalent response rebuilt from current catalog state (no response body is stored). The key is bound to the full canonicalized request (operation, namespace, name, schema, partition spec, sort order, properties, location and access-delegation modes) and the caller principal, so reusing a key for a different request or by a different caller is rejected with HTTP 422. A retry that loses a concurrent create race replays the winning request instead of returning 409, and a replay returns 422 if the table has advanced beyond the originally-created state. Only successful outcomes are recorded — a retry after a failure simply re-runs the operation. Idempotency is disabled by default and configured under `polaris.idempotency`; records are kept in a standalone idempotency store decoupled from the metastore persistence.
5557

5658
### Changes
5759
- Added REPL support to Polaris CLI.

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public String getDisplayName() {
5353
*/
5454
public int getLatestSchemaVersion() {
5555
return switch (this) {
56-
case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4
57-
case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL
58-
case H2 -> 4; // H2 uses same schemas as PostgreSQL
56+
case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5
57+
case COCKROACHDB -> 5; // CockroachDB schema version kept in sync with PostgreSQL
58+
case H2 -> 5; // H2 uses same schemas as PostgreSQL
5959
};
6060
}
6161

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java

Lines changed: 61 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -20,74 +20,36 @@
2020
import java.sql.SQLException;
2121
import java.sql.Timestamp;
2222
import java.time.Instant;
23-
import java.util.HashMap;
2423
import java.util.LinkedHashMap;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.Optional;
2827
import java.util.Set;
29-
import javax.sql.DataSource;
3028
import org.apache.polaris.core.entity.IdempotencyRecord;
3129
import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
3230
import org.apache.polaris.core.persistence.IdempotencyStore;
31+
import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
3332
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
3433
import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
35-
import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
3634
import org.apache.polaris.persistence.relational.jdbc.models.Converter;
3735
import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
3836
import org.jspecify.annotations.NonNull;
3937

38+
/**
39+
* JDBC-backed {@link IdempotencyStore}.
40+
*
41+
* <p>Implements the "optimistic commit" model: a row is inserted only after the originating
42+
* operation has finalized. Race conditions between concurrent retries are detected via the table's
43+
* {@code (realm_id, idempotency_key)} primary key — a duplicate INSERT surfaces as a constraint
44+
* violation, which we translate into a {@link RecordResultType#DUPLICATE} along with the existing
45+
* row.
46+
*/
4047
public class RelationalJdbcIdempotencyStore implements IdempotencyStore {
4148

4249
private final DatasourceOperations datasourceOperations;
4350

44-
public RelationalJdbcIdempotencyStore(
45-
@NonNull DataSource dataSource, @NonNull RelationalJdbcConfiguration cfg)
46-
throws SQLException {
47-
this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
48-
}
49-
50-
@Override
51-
public ReserveResult reserve(
52-
String realmId,
53-
String idempotencyKey,
54-
String operationType,
55-
String normalizedResourceId,
56-
Instant expiresAt,
57-
String executorId,
58-
Instant now) {
59-
try {
60-
// Build insert values directly to avoid requiring an Immutables-generated model type.
61-
Map<String, Object> insertMap = new LinkedHashMap<>();
62-
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
63-
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
64-
insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
65-
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
66-
insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
67-
insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
68-
insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
69-
insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
70-
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
71-
insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
72-
insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
73-
insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
74-
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));
75-
76-
List<Object> values = insertMap.values().stream().toList();
77-
QueryGenerator.PreparedQuery insert =
78-
QueryGenerator.generateInsertQuery(
79-
ModelIdempotencyRecord.ALL_COLUMNS,
80-
ModelIdempotencyRecord.TABLE_NAME,
81-
values,
82-
realmId);
83-
datasourceOperations.executeUpdate(insert);
84-
return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
85-
} catch (SQLException e) {
86-
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
87-
return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey));
88-
}
89-
throw new IdempotencyPersistenceException("Failed to reserve idempotency key", e);
90-
}
51+
public RelationalJdbcIdempotencyStore(@NonNull DatasourceOperations datasourceOperations) {
52+
this.datasourceOperations = datasourceOperations;
9153
}
9254

9355
@Override
@@ -98,10 +60,8 @@ public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) {
9860
ModelIdempotencyRecord.ALL_COLUMNS,
9961
ModelIdempotencyRecord.TABLE_NAME,
10062
Map.of(
101-
ModelIdempotencyRecord.REALM_ID,
102-
realmId,
103-
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
104-
idempotencyKey));
63+
ModelIdempotencyRecord.REALM_ID, realmId,
64+
ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey));
10565
List<IdempotencyRecord> results =
10666
datasourceOperations.executeSelect(
10767
query,
@@ -112,8 +72,7 @@ public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
11272
}
11373

11474
@Override
115-
public Map<String, Object> toMap(
116-
org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) {
75+
public Map<String, Object> toMap(DatabaseType databaseType) {
11776
throw new UnsupportedOperationException("Not used for SELECT conversion");
11877
}
11978
});
@@ -134,99 +93,56 @@ public Map<String, Object> toMap(
13493
}
13594

13695
@Override
137-
public HeartbeatResult updateHeartbeat(
138-
String realmId, String idempotencyKey, String executorId, Instant now) {
139-
Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
140-
if (existing.isEmpty()) {
141-
return HeartbeatResult.NOT_FOUND;
142-
}
143-
144-
IdempotencyRecord record = existing.get();
145-
if (record.httpStatus() != null) {
146-
return HeartbeatResult.FINALIZED;
147-
}
148-
if (record.executorId() == null || !record.executorId().equals(executorId)) {
149-
return HeartbeatResult.LOST_OWNERSHIP;
150-
}
151-
152-
QueryGenerator.PreparedQuery update =
153-
QueryGenerator.generateUpdateQuery(
154-
ModelIdempotencyRecord.ALL_COLUMNS,
155-
ModelIdempotencyRecord.TABLE_NAME,
156-
Map.of(
157-
ModelIdempotencyRecord.HEARTBEAT_AT,
158-
Timestamp.from(now),
159-
ModelIdempotencyRecord.UPDATED_AT,
160-
Timestamp.from(now)),
161-
Map.of(
162-
ModelIdempotencyRecord.REALM_ID,
163-
realmId,
164-
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
165-
idempotencyKey,
166-
ModelIdempotencyRecord.EXECUTOR_ID,
167-
executorId),
168-
Map.of(),
169-
Map.of(),
170-
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
171-
Set.of());
172-
173-
try {
174-
int updated = datasourceOperations.executeUpdate(update);
175-
if (updated > 0) {
176-
return HeartbeatResult.UPDATED;
177-
}
178-
} catch (SQLException e) {
179-
throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e);
180-
}
181-
182-
// Raced with finalize/ownership loss; re-check to return a meaningful result.
183-
Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
184-
if (after.isEmpty()) {
185-
return HeartbeatResult.NOT_FOUND;
186-
}
187-
if (after.get().httpStatus() != null) {
188-
return HeartbeatResult.FINALIZED;
189-
}
190-
return HeartbeatResult.LOST_OWNERSHIP;
191-
}
192-
193-
@Override
194-
public boolean finalizeRecord(
96+
public RecordResult recordIfAbsent(
19597
String realmId,
19698
String idempotencyKey,
197-
Integer httpStatus,
198-
String errorSubtype,
199-
String responseSummary,
200-
String responseHeaders,
201-
Instant finalizedAt) {
202-
// Use ordered/set maps so we can include nullable values (Map.of disallows nulls).
203-
Map<String, Object> setClause = new LinkedHashMap<>();
204-
setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
205-
setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
206-
setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
207-
setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
208-
setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt));
209-
setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt));
210-
211-
Map<String, Object> whereEquals = new HashMap<>();
212-
whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
213-
whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
214-
215-
QueryGenerator.PreparedQuery update =
216-
QueryGenerator.generateUpdateQuery(
217-
ModelIdempotencyRecord.ALL_COLUMNS,
218-
ModelIdempotencyRecord.TABLE_NAME,
219-
setClause,
220-
whereEquals,
221-
Map.of(),
222-
Map.of(),
223-
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
224-
Set.of());
225-
99+
String operationType,
100+
String requestHash,
101+
String principalHash,
102+
int httpStatus,
103+
String metadataLocation,
104+
Instant createdAt,
105+
Instant expiresAt) {
226106
try {
227-
return datasourceOperations.executeUpdate(update) > 0;
107+
Map<String, Object> insertMap = new LinkedHashMap<>();
108+
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
109+
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
110+
insertMap.put(ModelIdempotencyRecord.REQUEST_HASH, requestHash);
111+
insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, principalHash);
112+
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
113+
insertMap.put(ModelIdempotencyRecord.METADATA_LOCATION, metadataLocation);
114+
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(createdAt));
115+
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));
116+
117+
List<Object> values = insertMap.values().stream().toList();
118+
QueryGenerator.PreparedQuery insert =
119+
QueryGenerator.generateInsertQuery(
120+
ModelIdempotencyRecord.ALL_COLUMNS,
121+
ModelIdempotencyRecord.TABLE_NAME,
122+
values,
123+
realmId);
124+
datasourceOperations.executeUpdate(insert);
125+
return new RecordResult(RecordResultType.OWNED, Optional.empty());
228126
} catch (SQLException e) {
229-
throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e);
127+
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
128+
Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
129+
if (existing.isEmpty()) {
130+
// The insert lost the race on the (realm_id, idempotency_key) constraint, yet the winning
131+
// row is no longer visible (e.g. purged or rolled back between the conflict and this
132+
// reload). Surface a persistence error rather than a DUPLICATE without a record, which
133+
// the
134+
// handler layer treats as an invariant violation.
135+
throw new IdempotencyPersistenceException(
136+
"Insert for realm/key "
137+
+ realmId
138+
+ "/"
139+
+ idempotencyKey
140+
+ " conflicted on the unique constraint but the existing record could not be"
141+
+ " reloaded");
142+
}
143+
return new RecordResult(RecordResultType.DUPLICATE, existing);
144+
}
145+
throw new IdempotencyPersistenceException("Failed to record idempotency entry", e);
230146
}
231147
}
232148

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.polaris.persistence.relational.jdbc.idempotency;
18+
19+
import io.smallrye.common.annotation.Identifier;
20+
import jakarta.enterprise.context.ApplicationScoped;
21+
import jakarta.inject.Inject;
22+
import org.apache.polaris.core.context.RealmContext;
23+
import org.apache.polaris.core.persistence.IdempotencyStore;
24+
import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
25+
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
26+
27+
/**
28+
* {@link IdempotencyStoreFactory} backed by the same JDBC {@link DatasourceOperations} used by the
29+
* primary metastore.
30+
*
31+
* <p>The store is stateless and shared across realms — realm scoping is enforced inside SQL via the
32+
* {@code realm_id} column.
33+
*/
34+
@ApplicationScoped
35+
@Identifier("relational-jdbc")
36+
public class RelationalJdbcIdempotencyStoreFactory implements IdempotencyStoreFactory {
37+
38+
private final RelationalJdbcIdempotencyStore store;
39+
40+
@Inject
41+
public RelationalJdbcIdempotencyStoreFactory(DatasourceOperations datasourceOperations) {
42+
this.store = new RelationalJdbcIdempotencyStore(datasourceOperations);
43+
}
44+
45+
@Override
46+
public IdempotencyStore getOrCreateIdempotencyStore(RealmContext realmContext) {
47+
return store;
48+
}
49+
}

0 commit comments

Comments
 (0)