Skip to content

Commit 6d97262

Browse files
committed
Fix Postgres metadata passthrough regressions
1 parent cf7dbba commit 6d97262

5 files changed

Lines changed: 125 additions & 19 deletions

File tree

src/include/metadata_manager/postgres_metadata_manager.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class PostgresMetadataManager : public DuckLakeMetadataManager {
4242

4343
protected:
4444
string GetLatestSnapshotQuery() const override;
45+
bool InlinedDeletionTableExists(TableIndex table_id, DuckLakeSnapshot snapshot, const string &table_name) override;
4546
string CastValueToTarget(const Value &val, const LogicalType &type) override;
4647
string CastStatsToTarget(const string &stats, const LogicalType &type) override;
4748
string GenerateConstantFilter(const ConstantFilter &constant_filter, const LogicalType &type,
@@ -53,6 +54,8 @@ class PostgresMetadataManager : public DuckLakeMetadataManager {
5354
string GetPostgresIndexStatements();
5455
string GetPostgresStatsType(const LogicalType &type);
5556
bool CanCastStatsForValueComparison(const LogicalType &type);
57+
bool IsPostgresTemporalStatsType(const LogicalType &type);
58+
bool CanCastTemporalValueForValueComparison(const Value &val, const LogicalType &type);
5659
};
5760

5861
} // namespace duckdb

src/include/storage/ducklake_metadata_manager.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ class DuckLakeMetadataManager {
264264

265265
protected:
266266
virtual string GetLatestSnapshotQuery() const;
267+
virtual bool InlinedDeletionTableExists(TableIndex table_id, DuckLakeSnapshot snapshot, const string &table_name);
267268

268269
//! Wrap field selections with list aggregation of struct objects (DBMS-specific)
269270
//! For DuckDB: LIST({'key1': val1, 'key2': val2, ...})

src/metadata_manager/postgres_metadata_manager.cpp

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@
77

88
namespace duckdb {
99

10+
//! Returns true when c is one of the ASCII characters '0'..'9'.
//! Plain range comparison, so the result does not depend on the C locale.
static bool IsDigit(char c) {
	return c >= '0' && c <= '9';
}

//! Returns true when value starts with ten characters shaped like "DDDD-DD-DD"
//! (four digits, '-', two digits, '-', two digits).
//! Only the shape is checked: month and day ranges are not validated, and anything
//! after the first ten characters is ignored.
static bool HasFourDigitDatePrefix(const string &value) {
	if (value.size() < 10) {
		return false;
	}
	for (size_t pos = 0; pos < 10; pos++) {
		// positions 4 and 7 hold the separators, every other position must be a digit
		const bool expect_dash = pos == 4 || pos == 7;
		if (expect_dash ? value[pos] != '-' : !IsDigit(value[pos])) {
			return false;
		}
	}
	return true;
}
19+
1020
PostgresMetadataManager::PostgresMetadataManager(DuckLakeTransaction &transaction)
1121
: DuckLakeMetadataManager(transaction) {
1222
}
@@ -120,13 +130,42 @@ string PostgresMetadataManager::GetPostgresStatsType(const LogicalType &type) {
120130
return "REAL";
121131
case LogicalTypeId::DOUBLE:
122132
return "DOUBLE PRECISION";
133+
case LogicalTypeId::DATE:
134+
return "DATE";
135+
case LogicalTypeId::TIMESTAMP:
136+
case LogicalTypeId::TIMESTAMP_SEC:
137+
case LogicalTypeId::TIMESTAMP_MS:
138+
return "TIMESTAMP";
123139
default:
124140
return type.ToString();
125141
}
126142
}
127143

144+
bool PostgresMetadataManager::IsPostgresTemporalStatsType(const LogicalType &type) {
145+
switch (type.id()) {
146+
case LogicalTypeId::DATE:
147+
case LogicalTypeId::TIMESTAMP:
148+
case LogicalTypeId::TIMESTAMP_SEC:
149+
case LogicalTypeId::TIMESTAMP_MS:
150+
return true;
151+
default:
152+
return false;
153+
}
154+
}
155+
156+
bool PostgresMetadataManager::CanCastTemporalValueForValueComparison(const Value &val, const LogicalType &type) {
157+
auto value = val.ToString();
158+
if (!HasFourDigitDatePrefix(value)) {
159+
return false;
160+
}
161+
if (type.id() == LogicalTypeId::DATE) {
162+
return value.size() == 10;
163+
}
164+
return IsPostgresTemporalStatsType(type);
165+
}
166+
128167
bool PostgresMetadataManager::CanCastStatsForValueComparison(const LogicalType &type) {
129-
return type.IsNumeric() || type.id() == LogicalTypeId::BOOLEAN;
168+
return type.IsNumeric() || type.id() == LogicalTypeId::BOOLEAN || IsPostgresTemporalStatsType(type);
130169
}
131170

132171
string PostgresMetadataManager::CastValueToTarget(const Value &val, const LogicalType &type) {
@@ -138,13 +177,27 @@ string PostgresMetadataManager::CastValueToTarget(const Value &val, const Logica
138177
return val.ToString();
139178
}
140179
auto literal = DuckLakeUtil::SQLLiteralToString(val.ToString());
180+
if (IsPostgresTemporalStatsType(type) && CanCastTemporalValueForValueComparison(val, type)) {
181+
return literal + "::" + GetPostgresStatsType(type);
182+
}
141183
if (RequiresValueComparison(type) && CanCastStatsForValueComparison(type)) {
142184
return literal + "::" + GetPostgresStatsType(type);
143185
}
144186
return literal;
145187
}
146188

147189
string PostgresMetadataManager::CastStatsToTarget(const string &stats, const LogicalType &type) {
190+
if (IsPostgresTemporalStatsType(type)) {
191+
string regex;
192+
if (type.id() == LogicalTypeId::DATE) {
193+
regex = "'^[0-9]{4}-(0[1-9]|1[0-2])-([0][1-9]|[12][0-9]|3[01])$'";
194+
} else {
195+
regex =
196+
"'^[0-9]{4}-(0[1-9]|1[0-2])-([0][1-9]|[12][0-9]|3[01])( [0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]{1,6})?)?$'";
197+
}
198+
return StringUtil::Format("(CASE WHEN %s ~ %s THEN %s::%s END)", stats, regex, stats,
199+
GetPostgresStatsType(type));
200+
}
148201
if (RequiresValueComparison(type) && CanCastStatsForValueComparison(type)) {
149202
return stats + "::" + GetPostgresStatsType(type);
150203
}
@@ -156,9 +209,17 @@ string PostgresMetadataManager::GenerateConstantFilter(const ConstantFilter &con
156209
if (RequiresValueComparison(type) && !CanCastStatsForValueComparison(type)) {
157210
return string();
158211
}
212+
if (IsPostgresTemporalStatsType(type) && !CanCastTemporalValueForValueComparison(constant_filter.constant, type)) {
213+
return string();
214+
}
159215
auto constant_str = CastValueToTarget(constant_filter.constant, type);
160216
auto min_value = CastStatsToTarget("min_value", type);
161217
auto max_value = CastStatsToTarget("max_value", type);
218+
if (IsPostgresTemporalStatsType(type)) {
219+
auto postgres_type = GetPostgresStatsType(type);
220+
min_value = StringUtil::Format("COALESCE(%s, '-infinity'::%s)", min_value, postgres_type);
221+
max_value = StringUtil::Format("COALESCE(%s, 'infinity'::%s)", max_value, postgres_type);
222+
}
162223
switch (constant_filter.comparison_type) {
163224
case ExpressionType::COMPARE_EQUAL:
164225
referenced_stats.insert("min_value");
@@ -215,7 +276,12 @@ unique_ptr<QueryResult> PostgresMetadataManager::ExecuteQuery(DuckLakeSnapshot s
215276
query = StringUtil::Replace(query, "{DATA_PATH}", data_path);
216277

217278
auto passthrough_query = StringUtil::Format("CALL %s(%s, %s)", command, catalog_literal, SQLString(query));
218-
return transaction.Query(passthrough_query);
279+
auto result = transaction.Query(passthrough_query);
280+
if (command == "postgres_execute" && !result->HasError()) {
281+
while (result->Fetch()) {
282+
}
283+
}
284+
return result;
219285
}
220286

221287
unique_ptr<QueryResult> PostgresMetadataManager::ExecuteQuery(string &query, string command) {
@@ -241,14 +307,34 @@ unique_ptr<QueryResult> PostgresMetadataManager::Query(string &query) {
241307

242308
//! Returns the query text used to read the latest snapshot row.
//! The inner SELECT is passed to postgres_query and picks snapshot_id, schema_version,
//! next_catalog_id and next_file_id from the ducklake_snapshot row with the highest snapshot_id.
//! The {METADATA_CATALOG_NAME_LITERAL} and {METADATA_SCHEMA_ESCAPED} placeholders are returned
//! as-is; presumably the caller substitutes them before execution (not visible here).
string PostgresMetadataManager::GetLatestSnapshotQuery() const {
	return R"(
SELECT * FROM postgres_query({METADATA_CATALOG_NAME_LITERAL},
'SELECT snapshot_id, schema_version, next_catalog_id, next_file_id
FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot WHERE snapshot_id = (
SELECT MAX(snapshot_id) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot
);')
)";
}
251317

318+
//! Checks whether the inlined deletion table exists by looking it up in
//! information_schema.tables instead of probing the table itself.
//! The table id parameter is unused; the lookup is by table_name within the metadata schema.
//! Returns false when the query reports an error or yields no row, so a failed lookup
//! cannot be told apart from a missing table.
bool PostgresMetadataManager::InlinedDeletionTableExists(TableIndex, DuckLakeSnapshot snapshot,
                                                         const string &table_name) {
	// table_name is embedded as a quoted SQL literal, {METADATA_SCHEMA_NAME_LITERAL} is left as a placeholder
	auto query = StringUtil::Format(R"(
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = {METADATA_SCHEMA_NAME_LITERAL}
AND table_name = %s
))",
	                                DuckLakeUtil::SQLLiteralToString(table_name));
	auto result = Query(snapshot, query);
	if (result->HasError()) {
		return false;
	}
	// only the first row is read: the loop body returns on its first iteration
	for (auto &row : *result) {
		return row.GetValue<bool>(0);
	}
	return false;
}
337+
252338
// We need a specialized function here to do a reinterpret for postgres from BLOB to VARCHAR
253339
shared_ptr<DuckLakeInlinedData>
254340
PostgresMetadataManager::TransformInlinedData(QueryResult &result, const vector<LogicalType> &expected_types) {

src/storage/ducklake_initializer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ void DuckLakeInitializer::Initialize() {
6464
error_obj.Throw("Failed to attach DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" +
6565
catalog.MetadataPath() + "\"");
6666
}
67+
if (catalog.MetadataType() == "postgres" || catalog.MetadataType() == "postgres_scanner") {
68+
transaction.Query("SET pg_connection_cache=false");
69+
}
6770
// explicitly load all secrets - work-around to secret initialization bug
6871
transaction.Query("FROM duckdb_secrets()");
6972

src/storage/ducklake_metadata_manager.cpp

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,13 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot
513513
return 0;
514514
}
515515

516+
//! Default existence check for an inlined deletion table: run a one-row probe query
//! against {METADATA_CATALOG}.<table_name> and treat "no error" as "table exists".
//! The table id parameter is unused here.
//! NOTE: every query error is reported as "does not exist", not only a missing table.
bool DuckLakeMetadataManager::InlinedDeletionTableExists(TableIndex, DuckLakeSnapshot snapshot,
                                                         const string &table_name) {
	auto query = StringUtil::Format("SELECT NULL FROM {METADATA_CATALOG}.%s LIMIT 1", table_name);
	auto result = Query(snapshot, query);
	return !result->HasError();
}
522+
516523
DuckLakeCatalogInfo DuckLakeMetadataManager::GetCatalogForSnapshot(DuckLakeSnapshot snapshot) {
517524
auto &ducklake_catalog = transaction.GetCatalog();
518525
auto &base_data_path = ducklake_catalog.DataPath();
@@ -901,7 +908,7 @@ LEFT JOIN {METADATA_CATALOG}.ducklake_table_column_stats USING (table_id)
901908
WHERE record_count IS NOT NULL AND file_size_bytes IS NOT NULL
902909
ORDER BY table_id;
903910
)";
904-
auto result = Query(snapshot, query);
911+
auto result = transaction.Query(snapshot, query);
905912
return TransformGlobalStats(*result);
906913
}
907914

@@ -1654,20 +1661,28 @@ LEFT JOIN LATERAL (
16541661
FROM {METADATA_CATALOG}.ducklake_delete_file
16551662
WHERE table_id = %d AND begin_snapshot < data.end_snapshot
16561663
ORDER BY data_file_id, begin_snapshot DESC
1657-
) AS previous_delete
1658-
USING (data_file_id), (
1659-
SELECT NULL path, NULL path_is_relative, NULL file_size_bytes, NULL footer_size, NULL encryption_key, NULL format
1660-
) current_delete
1661-
)",
1664+
) AS previous_delete
1665+
USING (data_file_id), (
1666+
SELECT
1667+
CAST(NULL AS VARCHAR) path,
1668+
CAST(NULL AS BOOLEAN) path_is_relative,
1669+
CAST(NULL AS BIGINT) file_size_bytes,
1670+
CAST(NULL AS BIGINT) footer_size,
1671+
CAST(NULL AS VARCHAR) encryption_key,
1672+
CAST(NULL AS VARCHAR) format
1673+
) current_delete
1674+
)",
16621675
select_list, table_id.index, table_id.index, start_snapshot.snapshot_id, table_id.index,
16631676
select_list, table_id.index, start_snapshot.snapshot_id, table_id.index);
16641677

16651678
if (has_inlined_table) {
1666-
string null_file_cols = "NULL path, NULL path_is_relative, NULL file_size_bytes, NULL footer_size";
1679+
string null_file_cols =
1680+
"CAST(NULL AS VARCHAR) path, CAST(NULL AS BOOLEAN) path_is_relative, CAST(NULL AS BIGINT) file_size_bytes, "
1681+
"CAST(NULL AS BIGINT) footer_size";
16671682
if (IsEncrypted()) {
1668-
null_file_cols += ", NULL encryption_key";
1683+
null_file_cols += ", CAST(NULL AS VARCHAR) encryption_key";
16691684
}
1670-
null_file_cols += ", NULL format";
1685+
null_file_cols += ", CAST(NULL AS VARCHAR) format";
16711686
query += StringUtil::Format(R"(
16721687
UNION ALL
16731688
@@ -2676,12 +2691,10 @@ string DuckLakeMetadataManager::GetInlinedDeletionTableName(TableIndex table_id,
26762691
}
26772692

26782693
// Read path: table visibility implies it was committed, safe to cache at catalog level
2679-
auto query = StringUtil::Format("SELECT NULL FROM {METADATA_CATALOG}.%s LIMIT 1", table_name);
2680-
auto result = Query(snapshot, query);
26812694
// TODO: Using the error state to check for existence here is fragile.
26822695
// Even if the table exists, a transient error in the catalog query would lead us to assume it does not exist.
26832696
// Maybe persist the existence of the deletion inlining table on the table metadata instead?
2684-
if (!result->HasError()) {
2697+
if (InlinedDeletionTableExists(table_id, snapshot, table_name)) {
26852698
delete_inlined_table_cache.insert(table_id.index);
26862699
catalog.CacheInlinedDeletionTableResult(table_id, snapshot, true);
26872700
return table_name;

0 commit comments

Comments
 (0)