Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ public ReserveResult reserve(
String idempotencyKey,
String operationType,
String normalizedResourceId,
String principalHash,
Instant expiresAt,
String executorId,
Instant now) {
try {
// Build insert values directly to avoid requiring an Immutables-generated model type.
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, principalHash);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
Expand Down Expand Up @@ -179,7 +179,6 @@ public HeartbeatResult updateHeartbeat(
throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e);
}

// Raced with finalize/ownership loss; re-check to return a meaningful result.
Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
if (after.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
Expand All @@ -190,27 +189,51 @@ public HeartbeatResult updateHeartbeat(
return HeartbeatResult.LOST_OWNERSHIP;
}

@Override
public boolean cancelInProgressReservation(
String realmId, String idempotencyKey, String executorId) {
try {
QueryGenerator.PreparedQuery delete =
QueryGenerator.generateDeleteQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey,
ModelIdempotencyRecord.EXECUTOR_ID,
executorId),
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());
return datasourceOperations.executeUpdate(delete) > 0;
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to cancel idempotency reservation", e);
}
}

@Override
public boolean finalizeRecord(
String realmId,
String idempotencyKey,
String executorId,
Integer httpStatus,
String errorSubtype,
String responseSummary,
String responseHeaders,
Instant finalizedAt) {
// Use ordered/set maps so we can include nullable values (Map.of disallows nulls).
Map<String, Object> setClause = new LinkedHashMap<>();
setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt));
setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt));

Map<String, Object> whereEquals = new HashMap<>();
whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
whereEquals.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);

QueryGenerator.PreparedQuery update =
QueryGenerator.generateUpdateQuery(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polaris.persistence.relational.jdbc.idempotency;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import javax.sql.DataSource;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Produces an {@link IdempotencyStore} backed by the JDBC datasource. Qualified with
* {@code @Identifier("relational-jdbc")} so the runtime can select it based on {@code
* polaris.persistence.type}.
*/
@ApplicationScoped
public class RelationalJdbcIdempotencyStoreProducer {

private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalJdbcIdempotencyStoreProducer.class);

@Inject Instance<DataSource> dataSource;
@Inject RelationalJdbcConfiguration relationalJdbcConfiguration;

@Produces
@ApplicationScoped
@Identifier("relational-jdbc")
IdempotencyStore idempotencyStore() {
if (dataSource.isUnsatisfied()) {
throw new IllegalStateException(
"relational-jdbc IdempotencyStore requested but no DataSource is wired");
}
try {
LOGGER.info("Wiring RelationalJdbcIdempotencyStore");
return new RelationalJdbcIdempotencyStore(dataSource.get(), relationalJdbcConfiguration);
} catch (Exception e) {
throw new IllegalStateException("Failed to construct RelationalJdbcIdempotencyStore", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
Expand Down Expand Up @@ -42,46 +42,38 @@ public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> {

String TABLE_NAME = "idempotency_records";

// Logical tenant / realm identifier.
String REALM_ID = "realm_id";
// Client-provided idempotency key.
String IDEMPOTENCY_KEY = "idempotency_key";
// Logical operation type (e.g. commit-table).
String OPERATION_TYPE = "operation_type";
// Normalized identifier of the affected resource.
String RESOURCE_ID = "resource_id";

// Final HTTP status code once the operation is completed (null while in-progress).
// Hash of caller principal identity bound to the reservation.
// Compared on replay to prevent cross-principal cache hits.
String PRINCIPAL_HASH = "principal_hash";

String HTTP_STATUS = "http_status";
// Optional error subtype for failures.
String ERROR_SUBTYPE = "error_subtype";
// Short serialized representation of the response body.

// Minimal response body (JSON) for replay; null for credential-bearing mutations,
// which re-derive the response on replay rather than serving stored content.
String RESPONSE_SUMMARY = "response_summary";
// Serialized representation of response headers.
String RESPONSE_HEADERS = "response_headers";
// Timestamp when the operation was finalized (null while in-progress).
String FINALIZED_AT = "finalized_at";

// Timestamp when the record was created.
String CREATED_AT = "created_at";
// Timestamp when the record was last updated.
String UPDATED_AT = "updated_at";
// Timestamp for the last heartbeat update (null if no heartbeat recorded).
String HEARTBEAT_AT = "heartbeat_at";
// Identifier of the executor that owns the in-progress record (null if not owned).
String EXECUTOR_ID = "executor_id";
// Expiration timestamp after which the record can be considered stale/purgeable.
String EXPIRES_AT = "expires_at";

List<String> ALL_COLUMNS =
List.of(
IDEMPOTENCY_KEY,
OPERATION_TYPE,
RESOURCE_ID,
PRINCIPAL_HASH,
HTTP_STATUS,
ERROR_SUBTYPE,
RESPONSE_SUMMARY,
RESPONSE_HEADERS,
FINALIZED_AT,
CREATED_AT,
UPDATED_AT,
Expand All @@ -97,6 +89,8 @@ public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> {

String getResourceId();

String getPrincipalHash();

@Nullable
Integer getHttpStatus();

Expand All @@ -106,9 +100,6 @@ public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> {
@Nullable
String getResponseSummary();

@Nullable
String getResponseHeaders();

@Nullable
Instant getFinalizedAt();

Expand All @@ -131,7 +122,6 @@ default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {

/** Convert the current ResultSet row into an {@link IdempotencyRecord}. */
static IdempotencyRecord fromRow(ResultSet rs) throws SQLException {
// Requires realm_id to be projected in the ResultSet.
return fromRow(rs.getString(REALM_ID), rs);
}

Expand All @@ -143,11 +133,11 @@ static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLExcepti
String idempotencyKey = rs.getString(IDEMPOTENCY_KEY);
String operationType = rs.getString(OPERATION_TYPE);
String resourceId = rs.getString(RESOURCE_ID);
String principalHash = rs.getString(PRINCIPAL_HASH);

Integer httpStatus = (Integer) rs.getObject(HTTP_STATUS);
String errorSubtype = rs.getString(ERROR_SUBTYPE);
String responseSummary = rs.getString(RESPONSE_SUMMARY);
String responseHeaders = rs.getString(RESPONSE_HEADERS);

Instant createdAt = rs.getTimestamp(CREATED_AT).toInstant();
Instant updatedAt = rs.getTimestamp(UPDATED_AT).toInstant();
Expand All @@ -166,10 +156,10 @@ static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLExcepti
idempotencyKey,
operationType,
resourceId,
principalHash,
httpStatus,
errorSubtype,
responseSummary,
responseHeaders,
createdAt,
updatedAt,
finalizedAt,
Expand All @@ -184,10 +174,10 @@ default Map<String, Object> toMap(DatabaseType databaseType) {
map.put(IDEMPOTENCY_KEY, getIdempotencyKey());
map.put(OPERATION_TYPE, getOperationType());
map.put(RESOURCE_ID, getResourceId());
map.put(PRINCIPAL_HASH, getPrincipalHash());
map.put(HTTP_STATUS, getHttpStatus());
map.put(ERROR_SUBTYPE, getErrorSubtype());
map.put(RESPONSE_SUMMARY, getResponseSummary());
map.put(RESPONSE_HEADERS, getResponseHeaders());
map.put(FINALIZED_AT, getFinalizedAt() == null ? null : Timestamp.from(getFinalizedAt()));
map.put(CREATED_AT, Timestamp.from(getCreatedAt()));
map.put(UPDATED_AT, Timestamp.from(getUpdatedAt()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ CREATE TABLE IF NOT EXISTS idempotency_records (
idempotency_key TEXT NOT NULL,
operation_type TEXT NOT NULL,
resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id)
principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay to prevent cross-principal cache hits

-- Finalization/replay
http_status INT4, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string)
response_headers TEXT, -- small whitelisted headers to replay (JSON string)
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations
finalized_at TIMESTAMP, -- when http_status was written

-- Liveness/ops
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ CREATE TABLE IF NOT EXISTS idempotency_records (
idempotency_key TEXT NOT NULL,
operation_type TEXT NOT NULL,
resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id)
principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay

-- Finalization/replay
http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string)
response_headers TEXT, -- small whitelisted headers to replay (JSON string)
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations
finalized_at TIMESTAMP, -- when http_status was written

-- Liveness/ops
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ CREATE TABLE IF NOT EXISTS idempotency_records (
idempotency_key TEXT NOT NULL,
operation_type TEXT NOT NULL,
resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id)
principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay to prevent cross-principal cache hits

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding principal_hash TEXT NOT NULL in-place is only safe if schema-v4 has not been released. Given v4 is already released with 1.4.0. This feature is not enabled, however, users should already create the table, I'd suggest to bump to schema-v5.sql with ALTER TABLE idempotency_records ADD COLUMN principal_hash TEXT, then ALTER COLUMN ... SET NOT NULL. Same applies to h2/schema-v4.sql:150 and cockroachdb/schema-v4.sql:153. Backfill isn't needed as the feature is not available in 1.4.0.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, this applies to removal of response_headers TEXT as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Reverted schema-v4.sql (Postgres / H2 / CockroachDB), and added schema-v5.sql.


-- Finalization/replay
http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string)
response_headers TEXT, -- small whitelisted headers to replay (JSON string)
response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations
finalized_at TIMESTAMP, -- when http_status was written

-- Liveness/ops
Expand Down
Loading
Loading