diff --git a/persistence/relational-jdbc/build.gradle.kts b/persistence/relational-jdbc/build.gradle.kts index 39fe5614f4..f5c5597a4b 100644 --- a/persistence/relational-jdbc/build.gradle.kts +++ b/persistence/relational-jdbc/build.gradle.kts @@ -42,4 +42,9 @@ dependencies { testImplementation(libs.mockito.junit.jupiter) testImplementation(libs.h2) testImplementation(testFixtures(project(":polaris-core"))) + + testImplementation(platform(libs.testcontainers.bom)) + + testImplementation("org.testcontainers:junit-jupiter:1.20.3") + testImplementation("org.testcontainers:postgresql:1.20.3") } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java new file mode 100644 index 0000000000..09781f1b30 --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import jakarta.annotation.Nonnull; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatabaseType; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.apache.polaris.persistence.relational.jdbc.models.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Postgres implementation of IdempotencyStore. */ +public final class PostgresIdempotencyStore implements IdempotencyStore { + private static final Logger LOG = LoggerFactory.getLogger(PostgresIdempotencyStore.class); + + private static final String TABLE = "POLARIS_SCHEMA.idempotency_records"; + + private final DatasourceOperations ops; + + public PostgresIdempotencyStore( + @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg) + throws SQLException { + this.ops = new DatasourceOperations(dataSource, cfg); + } + + @Override + public ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now) { + String sql = + "INSERT INTO " + + TABLE + + " (realm_id, idempotency_key, operation_type, resource_id," + + " http_status, error_subtype, response_summary, response_headers, finalized_at," + + " created_at, updated_at, heartbeat_at, executor_id, expires_at)" + + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)" + + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING"; + List params = + List.of( + realmId, + idempotencyKey, + operationType, + normalizedResourceId, + Timestamp.from(now), + Timestamp.from(now), + Timestamp.from(now), + executorId, + Timestamp.from(expiresAt)); + try { + int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, params)); + if (updated == 1) { + return new ReserveResult(ReserveResultType.OWNED, Optional.empty()); + } else { + // Load existing to return to caller + return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey)); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to reserve idempotency key", e); + } + } + + @Override + public Optional load(String realmId, String idempotencyKey) { + String sql = + "SELECT realm_id, idempotency_key, operation_type, resource_id, http_status, error_subtype," + + " response_summary, response_headers, created_at, updated_at, finalized_at, heartbeat_at," + + " executor_id, expires_at" + + " FROM " + + TABLE + + " WHERE realm_id = ? AND idempotency_key = ?"; + try { + final IdempotencyRecord[] holder = new IdempotencyRecord[1]; + ops.executeSelectOverStream( + new QueryGenerator.PreparedQuery(sql, List.of(realmId, idempotencyKey)), + new Converter() { + @Override + public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { + return convert(rs); + } + + @Override + public Map toMap(DatabaseType databaseType) { + return Map.of(); + } + }, + stream -> stream.findFirst().ifPresent(r -> holder[0] = r)); + return Optional.ofNullable(holder[0]); + } catch (SQLException e) { + throw new RuntimeException("Failed to load idempotency record", e); + } + } + + @Override + public boolean updateHeartbeat( + String realmId, String idempotencyKey, String executorId, Instant now) { + String sql = + "UPDATE " + + TABLE + + " SET heartbeat_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ?" + + " AND http_status IS NULL" + + " AND (executor_id IS NULL OR executor_id = ?)"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + List.of( + Timestamp.from(now), + Timestamp.from(now), + realmId, + idempotencyKey, + executorId))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to update heartbeat", e); + } + } + + @Override + public boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt) { + String sql = + "UPDATE " + + TABLE + + " SET http_status = ?, error_subtype = ?, response_summary = ?, response_headers = ?," + + " finalized_at = ?, updated_at = ?" + + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS NULL"; + try { + int rows = + ops.executeUpdate( + new QueryGenerator.PreparedQuery( + sql, + Arrays.asList( + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + Timestamp.from(finalizedAt), + Timestamp.from(finalizedAt), + realmId, + idempotencyKey))); + return rows > 0; + } catch (SQLException e) { + throw new RuntimeException("Failed to finalize idempotency record", e); + } + } + + @Override + public int purgeExpired(Instant before) { + String sql = "DELETE FROM " + TABLE + " WHERE expires_at < ?"; + try { + return ops.executeUpdate( + new QueryGenerator.PreparedQuery(sql, List.of(Timestamp.from(before)))); + } catch (SQLException e) { + throw new RuntimeException("Failed to purge expired idempotency records", e); + } + } + + private static IdempotencyRecord convert(ResultSet rs) { + try { + String realmId = rs.getString("realm_id"); + String idempotencyKey = rs.getString("idempotency_key"); + String operationType = rs.getString("operation_type"); + String resourceId = rs.getString("resource_id"); + Integer httpStatus = (Integer) rs.getObject("http_status"); + String errorSubtype = rs.getString("error_subtype"); + String responseSummary = rs.getString("response_summary"); + String responseHeaders = rs.getString("response_headers"); + Instant createdAt = rs.getTimestamp("created_at").toInstant(); + Instant updatedAt = rs.getTimestamp("updated_at").toInstant(); + Timestamp fts = rs.getTimestamp("finalized_at"); + Instant finalizedAt = fts == null ? null : fts.toInstant(); + Timestamp hb = rs.getTimestamp("heartbeat_at"); + Instant heartbeatAt = hb == null ? null : hb.toInstant(); + String executorId = rs.getString("executor_id"); + Instant expiresAt = rs.getTimestamp("expires_at").toInstant(); + return new IdempotencyRecord( + realmId, + idempotencyKey, + operationType, + resourceId, + httpStatus, + errorSubtype, + responseSummary, + responseHeaders, + createdAt, + updatedAt, + finalizedAt, + heartbeatAt, + executorId, + expiresAt); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql index 96897f5106..3f35cf66f0 100644 --- a/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql @@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events ( additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, PRIMARY KEY (event_id) ); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + + -- Finalization/replay + http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string) + response_headers TEXT, -- small whitelisted headers to replay (JSON string) + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at); +CREATE INDEX IF NOT EXISTS idx_idemp_active ON idempotency_records (http_status, heartbeat_at); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java new file mode 100644 index 0000000000..483af2d5ac --- /dev/null +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.idempotency; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.InputStream; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import javax.sql.DataSource; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.idempotency.IdempotencyRecord; +import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; +import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class PostgresIdempotencyStoreIT { + + @Container + private static final PostgreSQLContainer POSTGRES = + new PostgreSQLContainer<>("postgres:17.5-alpine"); + + private static DataSource dataSource; + private static PostgresIdempotencyStore store; + + @BeforeAll + static void setup() throws Exception { + POSTGRES.start(); + PGSimpleDataSource ds = new PGSimpleDataSource(); + ds.setURL(POSTGRES.getJdbcUrl()); + ds.setUser(POSTGRES.getUsername()); + ds.setPassword(POSTGRES.getPassword()); + dataSource = ds; + + // Apply schema + RelationalJdbcConfiguration cfg = + new RelationalJdbcConfiguration() { + @Override + public Optional maxRetries() { + return Optional.of(3); + } + + @Override + public Optional maxDurationInMs() { + return Optional.of(5000L); + } + + @Override + public Optional initialDelayInMs() { + return Optional.of(100L); + } + }; + DatasourceOperations ops = new DatasourceOperations(dataSource, cfg); + try (InputStream is = + Thread.currentThread() + .getContextClassLoader() + .getResourceAsStream("postgres/schema-v3.sql")) { + if (is == null) { + throw new IllegalStateException("schema-v3.sql not found on classpath"); + } + ops.executeScript(is); + } + + store = new PostgresIdempotencyStore(dataSource, cfg); + } + + @AfterAll + static void teardown() { + POSTGRES.stop(); + } + + @Test + void reserveSingleWinnerAndDuplicate() { + String realm = "test-realm"; + String key = "K1"; + String op = "commit-table"; + String rid = "tables/ns.tbl"; + Instant now = Instant.now(); + Instant exp = now.plus(Duration.ofMinutes(5)); + + IdempotencyStore.ReserveResult r1 = store.reserve(realm, key, op, rid, exp, "A", now); + assertThat(r1.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED); + + IdempotencyStore.ReserveResult r2 = store.reserve(realm, key, op, rid, exp, "B", now); + assertThat(r2.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE); + assertThat(r2.getExisting()).isPresent(); + IdempotencyRecord rec = r2.getExisting().get(); + assertThat(rec.getRealmId()).isEqualTo(realm); + assertThat(rec.getIdempotencyKey()).isEqualTo(key); + assertThat(rec.getOperationType()).isEqualTo(op); + assertThat(rec.getNormalizedResourceId()).isEqualTo(rid); + assertThat(rec.getHttpStatus()).isNull(); + } + + @Test + void heartbeatAndFinalize() { + String realm = "test-realm"; + String key = "K2"; + String op = "commit-table"; + String rid = "tables/ns.tbl2"; + Instant now = Instant.now(); + Instant exp = now.plus(Duration.ofMinutes(5)); + + store.reserve(realm, key, op, rid, exp, "A", now); + boolean hb = store.updateHeartbeat(realm, key, "A", now.plusSeconds(1)); + assertThat(hb).isTrue(); + + boolean fin = + store.finalizeRecord( + realm, + key, + 201, + null, + "{\"ok\":true}", + "{\"Content-Type\":\"application/json\"}", + now.plusSeconds(2)); + assertThat(fin).isTrue(); + + // finalize again should be a no-op + boolean fin2 = + store.finalizeRecord( + realm, + key, + 201, + null, + "{\"ok\":true}", + "{\"Content-Type\":\"application/json\"}", + now.plusSeconds(3)); + assertThat(fin2).isFalse(); + + Optional rec = store.load(realm, key); + assertThat(rec).isPresent(); + assertThat(rec.get().isFinalized()).isTrue(); + assertThat(rec.get().getHttpStatus()).isEqualTo(201); + } + + @Test + void purgeExpired() { + String realm = "test-realm"; + String key = "K3"; + String op = "drop-table"; + String rid = "tables/ns.tbl3"; + Instant now = Instant.now(); + Instant expPast = now.minus(Duration.ofMinutes(1)); + + store.reserve(realm, key, op, rid, expPast, "A", now); + int purged = store.purgeExpired(Instant.now()); + assertThat(purged).isEqualTo(1); + } + + @Test + void duplicateReturnsExistingBindingForMismatch() { + String realm = "test-realm"; + String key = "K4"; + String op1 = "commit-table"; + String rid1 = "tables/ns.tbl4"; + String op2 = "drop-table"; // different binding + String rid2 = "tables/ns.tbl4"; // same resource, different op + Instant now = Instant.now(); + Instant exp = now.plus(Duration.ofMinutes(5)); + + IdempotencyStore.ReserveResult r1 = store.reserve(realm, key, op1, rid1, exp, "A", now); + assertThat(r1.getType()).isEqualTo(IdempotencyStore.ReserveResultType.OWNED); + + // Second reserve with different op/resource should *not* overwrite the original binding. + // The store must return DUPLICATE with the *original* (op1, rid1); the HTTP layer + // (IdempotencyFilter) + // will detect the mismatch and return 422. + IdempotencyStore.ReserveResult r2 = store.reserve(realm, key, op2, rid2, exp, "B", now); + assertThat(r2.getType()).isEqualTo(IdempotencyStore.ReserveResultType.DUPLICATE); + assertThat(r2.getExisting()).isPresent(); + IdempotencyRecord rec = r2.getExisting().get(); + assertThat(rec.getOperationType()).isEqualTo(op1); + assertThat(rec.getNormalizedResourceId()).isEqualTo(rid1); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java new file mode 100644 index 0000000000..03d7b59875 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.core.persistence; + +import java.time.Instant; +import java.util.Optional; +import org.apache.polaris.idempotency.IdempotencyRecord; + +/** + * Abstraction for persisting and querying idempotency records. + * + *

An {@link IdempotencyStore} is responsible for: + * + *

    + *
  • Reserving an idempotency key for a particular operation and resource + *
  • Recording completion status and response metadata + *
  • Allowing callers to look up existing records to detect duplicates + *
  • Expiring and purging old reservations + *
+ * + *

Implementations must be thread-safe if used concurrently. + */ +public interface IdempotencyStore { + + /** High-level outcome of attempting to reserve an idempotency key. */ + enum ReserveResultType { + /** The caller successfully acquired ownership of the idempotency key. */ + OWNED, + /** A reservation already exists for the key; the caller does not own it. */ + DUPLICATE + } + + /** + * Result of a {@link #reserve(String, String, String, String, Instant, String, Instant)} call, + * including the outcome and, when applicable, the existing idempotency record. + */ + final class ReserveResult { + private final ReserveResultType type; + private final Optional existing; + + public ReserveResult(ReserveResultType type, Optional existing) { + this.type = type; + this.existing = existing == null ? Optional.empty() : existing; + } + + /** + * Returns the outcome of the reservation attempt. + * + * @return the {@link ReserveResultType} + */ + public ReserveResultType getType() { + return type; + } + + /** + * Returns the existing idempotency record when {@link #getType()} is {@link + * ReserveResultType#DUPLICATE}, otherwise {@link Optional#empty()}. + * + * @return the existing {@link IdempotencyRecord}, if present + */ + public Optional getExisting() { + return existing; + } + } + + /** + * Attempts to reserve an idempotency key for a given operation and resource. + * + *

If no record exists yet, the implementation should create a new reservation owned by {@code + * executorId}. If a record already exists, the implementation should return {@link + * ReserveResultType#DUPLICATE} along with the existing record. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @param operationType logical operation name (e.g., {@code "commit-table"}) + * @param normalizedResourceId normalized identifier of the affected resource + * @param expiresAt timestamp after which the reservation is considered expired + * @param executorId identifier of the caller attempting the reservation + * @param now timestamp representing the current time + * @return {@link ReserveResult} describing whether the caller owns the reservation or hit a + * duplicate + */ + ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now); + + /** + * Loads an existing idempotency record for the given realm and key, if present. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @return the corresponding {@link IdempotencyRecord}, if it exists + */ + Optional load(String realmId, String idempotencyKey); + + /** + * Updates the heartbeat for an in-progress reservation to indicate that the executor is still + * actively processing. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @param executorId identifier of the executor that owns the reservation + * @param now timestamp representing the current time + * @return {@code true} if the heartbeat was updated, {@code false} otherwise + */ + boolean updateHeartbeat(String realmId, String idempotencyKey, String executorId, Instant now); + + /** + * Marks an idempotency record as finalized, recording HTTP status and response metadata. + * + *

Implementations should be tolerant of idempotent re-finalization attempts and typically + * return {@code false} when a record was already finalized. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @param httpStatus HTTP status code returned to the client, or {@code null} if not applicable + * @param errorSubtype optional error subtype or code, if the operation failed + * @param responseSummary short, serialized representation of the response body + * @param responseHeaders serialized representation of response headers + * @param finalizedAt timestamp when the operation completed + * @return {@code true} if the record was transitioned to a finalized state, {@code false} + * otherwise + */ + boolean finalizeRecord( + String realmId, + String idempotencyKey, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant finalizedAt); + + /** + * Purges records whose expiration time is strictly before the given instant. + * + * @param before cutoff instant; records expiring before this time may be removed + * @return number of records that were purged + */ + int purgeExpired(Instant before); +} diff --git a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java new file mode 100644 index 0000000000..5212796433 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.idempotency; + +import java.time.Instant; + +public final class IdempotencyRecord { + private final String realmId; + private final String idempotencyKey; + private final String operationType; + private final String normalizedResourceId; + + private final Integer httpStatus; + private final String errorSubtype; + private final String responseSummary; + private final String responseHeaders; + private final Instant finalizedAt; + + private final Instant createdAt; + private final Instant updatedAt; + private final Instant heartbeatAt; + private final String executorId; + private final Instant expiresAt; + + public IdempotencyRecord( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Integer httpStatus, + String errorSubtype, + String responseSummary, + String responseHeaders, + Instant createdAt, + Instant updatedAt, + Instant finalizedAt, + Instant heartbeatAt, + String executorId, + Instant expiresAt) { + this.realmId = realmId; + this.idempotencyKey = idempotencyKey; + this.operationType = operationType; + this.normalizedResourceId = normalizedResourceId; + this.httpStatus = httpStatus; + this.errorSubtype = errorSubtype; + this.responseSummary = responseSummary; + this.responseHeaders = responseHeaders; + this.createdAt = createdAt; + this.updatedAt = updatedAt; + this.finalizedAt = finalizedAt; + this.heartbeatAt = heartbeatAt; + this.executorId = executorId; + this.expiresAt = expiresAt; + } + + public String getRealmId() { + return realmId; + } + + public String getIdempotencyKey() { + return idempotencyKey; + } + + public String getOperationType() { + return operationType; + } + + public String getNormalizedResourceId() { + return normalizedResourceId; + } + + public Integer getHttpStatus() { + return httpStatus; + } + + public String getErrorSubtype() { + return errorSubtype; + } + + public String getResponseSummary() { + return responseSummary; + } + + public String getResponseHeaders() { + return responseHeaders; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public Instant getUpdatedAt() { + return updatedAt; + } + + public Instant getFinalizedAt() { + return finalizedAt; + } + + public Instant getHeartbeatAt() { + return heartbeatAt; + } + + public String getExecutorId() { + return executorId; + } + + public Instant getExpiresAt() { + return expiresAt; + } + + public boolean isFinalized() { + return httpStatus != null; + } +}