Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.IdempotencyPersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
Expand Down Expand Up @@ -136,6 +137,12 @@ public BasePersistence getOrCreateSession(RealmContext realmContext) {
return newPersistenceMetaStore(initializedRealmPersistence(realmContext.getRealmIdentifier()));
}

@Override
public IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext) {
throw new UnsupportedOperationException(
"NoSQL backend does not currently implement IdempotencyPersistence");
}

@Override
public PolarisMetaStoreManager getOrCreateMetaStoreManager(RealmContext realmContext) {
var realmId = realmContext.getRealmIdentifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public String getDisplayName() {
*/
public int getLatestSchemaVersion() {
return switch (this) {
case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4
case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 4; // H2 uses same schemas as PostgreSQL
case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok, but it might be best to wait until 1.5.0 is branched to avoid major schema changes just before the release... WDYT?

case COCKROACHDB -> 5; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 5; // H2 uses same schemas as PostgreSQL
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -33,13 +36,15 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.IdempotencyRecord;
import org.apache.polaris.core.entity.LocationBasedEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
Expand All @@ -54,6 +59,8 @@
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.EntityAlreadyExistsException;
import org.apache.polaris.core.persistence.IdempotencyPersistence;
import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
import org.apache.polaris.core.persistence.IntegrationPersistence;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
Expand All @@ -70,19 +77,22 @@
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.persistence.relational.jdbc.models.Converter;
import org.apache.polaris.persistence.relational.jdbc.models.EntityNameLookupRecordConverter;
import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEvent;
import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPolicyMappingRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData;
import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport;
import org.apache.polaris.persistence.relational.jdbc.models.SchemaVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPersistence {
public class JdbcBasePersistenceImpl
implements BasePersistence, IntegrationPersistence, IdempotencyPersistence {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBasePersistenceImpl.class);

Expand Down Expand Up @@ -1336,4 +1346,232 @@ private void writeCommitMetricsReport(@Nonnull ModelCommitMetricsReport report)
String.format("Failed to write commit metrics report due to %s", e.getMessage()), e);
}
}

// ============================================================================
// IdempotencyPersistence Implementation
// ============================================================================

@Override
public ReserveResult reserve(
String realmId,
String idempotencyKey,
String operationType,
String normalizedResourceId,
String principalHash,
Instant expiresAt,
String executorId,
Instant now) {
try {
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, principalHash);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));

List<Object> values = insertMap.values().stream().toList();
PreparedQuery insert =
QueryGenerator.generateInsertQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
values,
realmId);
datasourceOperations.executeUpdate(insert);
return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
} catch (SQLException e) {
if (datasourceOperations.isConstraintViolation(e)) {
return new ReserveResult(
ReserveResultType.DUPLICATE, loadIdempotencyRecord(realmId, idempotencyKey));
}
throw new IdempotencyPersistenceException("Failed to reserve idempotency key", e);
}
}

@Override
public Optional<IdempotencyRecord> loadIdempotencyRecord(String realmId, String idempotencyKey) {
try {
PreparedQuery query =
QueryGenerator.generateSelectQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey));
List<IdempotencyRecord> results =
datasourceOperations.executeSelect(
query,
new Converter<>() {
@Override
public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
return ModelIdempotencyRecord.fromRow(realmId, rs);
}

@Override
public Map<String, Object> toMap(DatabaseType databaseType) {
throw new UnsupportedOperationException("Not used for SELECT conversion");
}
});
if (results.isEmpty()) {
return Optional.empty();
}
if (results.size() > 1) {
throw new IllegalStateException(
"More than one idempotency record found for realm/key: "
+ realmId
+ "/"
+ idempotencyKey);
}
return Optional.of(results.getFirst());
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to load idempotency record", e);
}
}

@Override
public HeartbeatResult updateHeartbeat(
String realmId, String idempotencyKey, String executorId, Instant now) {
Optional<IdempotencyRecord> existing = loadIdempotencyRecord(realmId, idempotencyKey);
if (existing.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}

IdempotencyRecord record = existing.get();
if (record.httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
if (record.executorId() == null || !record.executorId().equals(executorId)) {
return HeartbeatResult.LOST_OWNERSHIP;
}

PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.HEARTBEAT_AT,
Timestamp.from(now),
ModelIdempotencyRecord.UPDATED_AT,
Timestamp.from(now)),
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey,
ModelIdempotencyRecord.EXECUTOR_ID,
executorId),
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());

try {
int updated = datasourceOperations.executeUpdate(update);
if (updated > 0) {
return HeartbeatResult.UPDATED;
}
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e);
}

Optional<IdempotencyRecord> after = loadIdempotencyRecord(realmId, idempotencyKey);
if (after.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}
if (after.get().httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
return HeartbeatResult.LOST_OWNERSHIP;
}

@Override
public boolean cancelInProgressReservation(
String realmId, String idempotencyKey, String executorId) {
try {
PreparedQuery delete =
QueryGenerator.generateDeleteQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey,
ModelIdempotencyRecord.EXECUTOR_ID,
executorId),
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());
return datasourceOperations.executeUpdate(delete) > 0;
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to cancel idempotency reservation", e);
}
}

@Override
public boolean finalizeRecord(
String realmId,
String idempotencyKey,
String executorId,
Integer httpStatus,
String errorSubtype,
String responseSummary,
Instant finalizedAt) {
Map<String, Object> setClause = new LinkedHashMap<>();
setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt));
setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt));

Map<String, Object> whereEquals = new HashMap<>();
whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
whereEquals.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);

PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
setClause,
whereEquals,
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());

try {
return datasourceOperations.executeUpdate(update) > 0;
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e);
}
}

@Override
public int purgeExpired(String realmId, Instant before) {
try {
PreparedQuery delete =
QueryGenerator.generateDeleteQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(ModelIdempotencyRecord.REALM_ID, realmId),
Map.of(),
Map.of(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(before)),
Set.of(),
Set.of(ModelIdempotencyRecord.EXPIRES_AT));
return datasourceOperations.executeUpdate(delete);
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to purge expired idempotency records", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.IdempotencyPersistence;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
Expand Down Expand Up @@ -117,7 +118,7 @@ private int getOrLoadSchemaVersion(String realmId, boolean fallbackOnDne) {
}

/** Creates a new stateless {@link JdbcBasePersistenceImpl} for the given realm. */
private BasePersistence createSession(
private JdbcBasePersistenceImpl createSession(
String realmId, @Nullable RootCredentialsSet rootCredentialsSet, boolean fallbackOnDne) {
int schemaVersion = getOrLoadSchemaVersion(realmId, fallbackOnDne);
return new JdbcBasePersistenceImpl(
Expand Down Expand Up @@ -233,6 +234,15 @@ public PolarisMetaStoreManager getOrCreateMetaStoreManager(RealmContext realmCon

@Override
public BasePersistence getOrCreateSession(RealmContext realmContext) {
return getOrCreateJdbcPersistence(realmContext);
}

@Override
public IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext) {
return getOrCreateJdbcPersistence(realmContext);
}

private JdbcBasePersistenceImpl getOrCreateJdbcPersistence(RealmContext realmContext) {
String realmId = realmContext.getRealmIdentifier();
RealmConfig realmConfig = new RealmConfigImpl(realmConfigurationSource, realmContext);
boolean fallbackOnDne =
Expand Down
Loading