2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
### Highlights

### Upgrade notes
- The relational-JDBC metastore schema is bumped to v5, which reshapes the (previously unused) `idempotency_records` table for the optimistic-commit idempotency model. New installations bootstrap at v5 automatically. Existing v4 installations are not migrated in place; enabling idempotency requires bootstrapping at v5 or applying the equivalent DDL from `schema-v5.sql`. The v4 `idempotency_records` table was never wired to request handling, so no data migration is needed.
- Event listeners are now executed on a dedicated executor. **This executor does not propagate the original request's CDI context**; listeners that were improperly relying on that should instead manage their own CDI request scope from now on. Furthermore, two new configuration options were introduced to configure the executor:
- `polaris.event-listener.executor.pool-size` configures the thread pool size.
- `polaris.event-listener.executor.queue-size` configures the queue size for pending events when all threads are busy.
@@ -52,6 +53,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- Added support for `register table` overwrite semantics in the Iceberg REST catalog flow (`overwrite=true`) for internal Polaris catalogs. With overwrite enabled, existing table pointers can be updated to a new metadata location while preserving default behavior for `overwrite=false`.
- Added `REGISTER_TABLE_OVERWRITE` authorization operation mapped to `TABLE_FULL_METADATA` for deterministic overwrite authorization.
- Added Polaris Spark 4.0 client.
- Added handler-level support for the Iceberg REST `Idempotency-Key` header on `createTable`, using an optimistic-commit model: the terminal outcome is recorded only after a successful (2xx) response and retries replay an equivalent response rebuilt from current catalog state (no response body is stored). The key is bound to the request-derived resource (operation, namespace, name and access-delegation modes) and the caller principal, so reusing a key for a different resource or by a different caller is rejected with HTTP 422. A retry that loses a concurrent create race replays the winning request instead of returning 409, and a replay returns 422 if the table has advanced beyond the originally-created state. Only successful outcomes are recorded — a retry after a failure simply re-runs the operation. Idempotency is disabled by default and configured under `polaris.idempotency`; records are kept in a standalone idempotency store decoupled from the metastore persistence.
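
The optimistic-commit behaviour described in the last entry can be sketched with an in-memory stand-in. This is a minimal illustration, not the Polaris implementation: `OptimisticCommitSketch`, `InMemoryStore`, `Entry`, `Result` and `replayStatus` are hypothetical names, `putIfAbsent` stands in for the unique constraint on the realm and key, and the check that the table has not advanced beyond the originally created state is omitted.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only; every type here is hypothetical. */
public class OptimisticCommitSketch {

  enum Outcome { OWNED, DUPLICATE }

  /** Kept per key: the binding and a pointer to the result, never the response body. */
  record Entry(String bindingHash, int httpStatus, String metadataLocation, Instant expiresAt) {}

  record Result(Outcome outcome, Optional<Entry> existing) {}

  static final class InMemoryStore {
    private final Map<String, Entry> rows = new ConcurrentHashMap<>();

    /** Called only after a successful operation; putIfAbsent plays the role of the unique key. */
    Result recordIfAbsent(String realmId, UUID key, Entry entry) {
      Entry previous = rows.putIfAbsent(realmId + "/" + key, entry);
      return previous == null
          ? new Result(Outcome.OWNED, Optional.empty())
          : new Result(Outcome.DUPLICATE, Optional.of(previous));
    }
  }

  /** Replays the recorded status when the binding matches, otherwise rejects with 422. */
  static int replayStatus(Entry existing, String requestBindingHash) {
    return existing.bindingHash().equals(requestBindingHash) ? existing.httpStatus() : 422;
  }

  public static void main(String[] args) {
    InMemoryStore store = new InMemoryStore();
    UUID key = UUID.randomUUID();
    Entry created = new Entry("hash-A", 200, "metadata-v1.json", Instant.now().plusSeconds(3600));

    Result first = store.recordIfAbsent("realm-1", key, created);
    Result retry = store.recordIfAbsent("realm-1", key, created);

    System.out.println(first.outcome()); // OWNED
    System.out.println(retry.outcome()); // DUPLICATE
    System.out.println(replayStatus(retry.existing().orElseThrow(), "hash-A")); // 200
    System.out.println(replayStatus(retry.existing().orElseThrow(), "hash-B")); // 422
  }
}
```

Because nothing is written before the operation succeeds, a failed attempt leaves no row behind, which is why a retry after a failure simply runs the operation again.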

### Changes
- Added REPL support to Polaris CLI.
@@ -53,9 +53,9 @@ public String getDisplayName() {
*/
public int getLatestSchemaVersion() {
return switch (this) {
case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4
case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 4; // H2 uses same schemas as PostgreSQL
case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5
case COCKROACHDB -> 5; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 5; // H2 uses same schemas as PostgreSQL
};
}

@@ -20,78 +20,47 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.sql.DataSource;
import java.util.UUID;
import org.apache.polaris.core.entity.IdempotencyRecord;
import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
import org.apache.polaris.persistence.relational.jdbc.models.Converter;
import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
import org.jspecify.annotations.NonNull;

/**
* JDBC-backed {@link IdempotencyStore}.
*
* <p>Implements the "optimistic commit" model: a row is inserted only after the originating
* operation has finalized. Race conditions between concurrent retries are detected via the table's
* {@code (realm_id, idempotency_key)} primary key — a duplicate INSERT surfaces as a constraint
* violation, which we translate into a {@link RecordResultType#DUPLICATE} along with the existing
* row.
*
* <p>Following the {@code JdbcBasePersistenceImpl} pattern, an instance is bound to a single realm
* at construction; realm scoping is then applied to every query via the {@code realm_id} column.
*/
public class RelationalJdbcIdempotencyStore implements IdempotencyStore {

private final DatasourceOperations datasourceOperations;
private final String realmId;

public RelationalJdbcIdempotencyStore(
@NonNull DataSource dataSource, @NonNull RelationalJdbcConfiguration cfg)
throws SQLException {
this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
}

@Override
public ReserveResult reserve(
String realmId,
String idempotencyKey,
String operationType,
String normalizedResourceId,
Instant expiresAt,
String executorId,
Instant now) {
try {
// Build insert values directly to avoid requiring an Immutables-generated model type.
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));

List<Object> values = insertMap.values().stream().toList();
QueryGenerator.PreparedQuery insert =
QueryGenerator.generateInsertQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
values,
realmId);
datasourceOperations.executeUpdate(insert);
return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
} catch (SQLException e) {
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey));
}
throw new IdempotencyPersistenceException("Failed to reserve idempotency key", e);
}
@NonNull DatasourceOperations datasourceOperations, @NonNull String realmId) {
this.datasourceOperations = datasourceOperations;
this.realmId = realmId;
}

@Override
public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) {
public Optional<IdempotencyRecord> load(UUID idempotencyKey) {
try {
QueryGenerator.PreparedQuery query =
QueryGenerator.generateSelectQuery(
@@ -101,7 +70,7 @@ public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) {
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey));
idempotencyKey.toString()));
List<IdempotencyRecord> results =
datasourceOperations.executeSelect(
query,
@@ -112,8 +81,7 @@ public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
}

@Override
public Map<String, Object> toMap(
org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) {
public Map<String, Object> toMap(DatabaseType databaseType) {
throw new UnsupportedOperationException("Not used for SELECT conversion");
}
});
@@ -134,104 +102,58 @@ public Map<String, Object> toMap(
}

@Override
public HeartbeatResult updateHeartbeat(
String realmId, String idempotencyKey, String executorId, Instant now) {
Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
if (existing.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}

IdempotencyRecord record = existing.get();
if (record.httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
if (record.executorId() == null || !record.executorId().equals(executorId)) {
return HeartbeatResult.LOST_OWNERSHIP;
}

QueryGenerator.PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.HEARTBEAT_AT,
Timestamp.from(now),
ModelIdempotencyRecord.UPDATED_AT,
Timestamp.from(now)),
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey,
ModelIdempotencyRecord.EXECUTOR_ID,
executorId),
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());

public RecordResult recordIfAbsent(
UUID idempotencyKey,
String operationType,
String bindingHash,
int httpStatus,
String metadataLocation,
Instant createdAt,
Instant expiresAt) {
try {
int updated = datasourceOperations.executeUpdate(update);
if (updated > 0) {
return HeartbeatResult.UPDATED;
}
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e);
}

// Raced with finalize/ownership loss; re-check to return a meaningful result.
Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
if (after.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}
if (after.get().httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
return HeartbeatResult.LOST_OWNERSHIP;
}

@Override
public boolean finalizeRecord(
String realmId,
String idempotencyKey,
Integer httpStatus,
String errorSubtype,
String responseSummary,
String responseHeaders,
Instant finalizedAt) {
// Use ordered/set maps so we can include nullable values (Map.of disallows nulls).
Map<String, Object> setClause = new LinkedHashMap<>();
setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt));
setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt));

Map<String, Object> whereEquals = new HashMap<>();
whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);

QueryGenerator.PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
setClause,
whereEquals,
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey.toString());
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.BINDING_HASH, bindingHash);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
insertMap.put(ModelIdempotencyRecord.METADATA_LOCATION, metadataLocation);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(createdAt));
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));

try {
return datasourceOperations.executeUpdate(update) > 0;
List<Object> values = insertMap.values().stream().toList();
QueryGenerator.PreparedQuery insert =
QueryGenerator.generateInsertQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
values,
realmId);
datasourceOperations.executeUpdate(insert);
return new RecordResult(RecordResultType.OWNED, Optional.empty());
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e);
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
Optional<IdempotencyRecord> existing = load(idempotencyKey);
if (existing.isEmpty()) {
// The insert lost the race on the (realm_id, idempotency_key) constraint, yet the winning
// row is no longer visible (e.g. purged or rolled back between the conflict and this
// reload). Surface a persistence error rather than a DUPLICATE without a record, which
// the handler layer treats as an invariant violation.
throw new IdempotencyPersistenceException(
"Insert for realm/key "
+ realmId
+ "/"
+ idempotencyKey
+ " conflicted on the unique constraint but the existing record could not be"
+ " reloaded");
}
return new RecordResult(RecordResultType.DUPLICATE, existing);
}
throw new IdempotencyPersistenceException("Failed to record idempotency entry", e);
}
}

@Override
public int purgeExpired(String realmId, Instant before) {
public int purgeExpired(Instant before) {
try {
QueryGenerator.PreparedQuery delete =
QueryGenerator.generateDeleteQuery(
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polaris.persistence.relational.jdbc.idempotency;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;

/**
* {@link IdempotencyStoreFactory} backed by the same JDBC {@link DatasourceOperations} used by the
* primary metastore.
*
* <p>Each call vends a lightweight {@link RelationalJdbcIdempotencyStore} bound to the requested
* realm (mirroring {@code JdbcBasePersistenceImpl}); realm scoping is enforced inside SQL via the
* {@code realm_id} column. The realm arrives as a method argument, so this bean has no {@link
* RealmContext} dependency and stays deployable wherever the JDBC persistence is (including the
* Admin Tool, which has no request scope).
*/
@ApplicationScoped
@Identifier("relational-jdbc")
public class RelationalJdbcIdempotencyStoreFactory implements IdempotencyStoreFactory {

private final DatasourceOperations datasourceOperations;

@Inject
public RelationalJdbcIdempotencyStoreFactory(DatasourceOperations datasourceOperations) {
this.datasourceOperations = datasourceOperations;
}

@Override
public IdempotencyStore getOrCreateIdempotencyStore(RealmContext realmContext) {
return new RelationalJdbcIdempotencyStore(
datasourceOperations, realmContext.getRealmIdentifier());
}
}
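
The realm-binding pattern in the factory above can be illustrated with a self-contained sketch. It assumes simplified, hypothetical types (`SharedTable`, `RealmStore`, `StoreFactory`) rather than the Polaris interfaces, and uses a concurrent map in place of the JDBC table; the point is only that a store bound to one realm at construction cannot collide with the same key in another realm.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only; these are not the Polaris types. */
public class RealmBoundStoreSketch {

  /** Shared backing table keyed by realm and idempotency key, like the SQL primary key. */
  static final class SharedTable {
    final Map<String, String> rows = new ConcurrentHashMap<>();
  }

  /** A lightweight view over the shared table, bound to one realm at construction. */
  static final class RealmStore {
    private final SharedTable table;
    private final String realmId;

    RealmStore(SharedTable table, String realmId) {
      this.table = table;
      this.realmId = realmId;
    }

    /** Returns true if this call inserted the row, false if the key already existed. */
    boolean recordIfAbsent(UUID key, String metadataLocation) {
      return table.rows.putIfAbsent(realmId + "/" + key, metadataLocation) == null;
    }

    Optional<String> load(UUID key) {
      return Optional.ofNullable(table.rows.get(realmId + "/" + key));
    }
  }

  /** Application-wide factory: owns the shared resource and takes the realm as an argument. */
  static final class StoreFactory {
    private final SharedTable table = new SharedTable();

    RealmStore storeFor(String realmId) {
      return new RealmStore(table, realmId);
    }
  }

  public static void main(String[] args) {
    StoreFactory factory = new StoreFactory();
    UUID key = UUID.randomUUID();
    RealmStore a = factory.storeFor("realm-a");
    RealmStore b = factory.storeFor("realm-b");

    System.out.println(a.recordIfAbsent(key, "loc-a")); // true
    System.out.println(b.recordIfAbsent(key, "loc-b")); // true: same key, different realm
    System.out.println(a.recordIfAbsent(key, "loc-x")); // false: duplicate within realm-a
    System.out.println(a.load(key).orElseThrow()); // loc-a
  }
}
```

Since the realm reaches the factory as a plain argument, a factory of this shape needs no request-scoped state of its own.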