Skip to content

Commit 4c4acd8

Browse files
committed
Add queries for mysql and postgres provider
1 parent ffc8688 commit 4c4acd8

2 files changed

Lines changed: 246 additions & 14 deletions

File tree

src/metadata_provider_mysql.rs

Lines changed: 127 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,112 @@ impl MetadataProvider for MySqlMetadataProvider {
529529
end_snapshot: i64,
530530
) -> Result<Vec<DeleteFileChange>> {
531531
block_on(async {
532+
// MySQL equivalent of DuckDB's SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS
533+
// Uses LATERAL (supported in MySQL 8.0.14+) for previous delete file lookup
532534
let rows = sqlx::query(
533-
"SELECT del.begin_snapshot
534-
FROM ducklake_delete_file AS del
535-
WHERE del.table_id = ?
536-
AND del.begin_snapshot > ?
537-
AND del.begin_snapshot <= ?
538-
ORDER BY del.begin_snapshot",
535+
r#"
536+
WITH current_delete AS (
537+
SELECT
538+
ddf.data_file_id,
539+
ddf.begin_snapshot,
540+
ddf.path,
541+
ddf.path_is_relative,
542+
ddf.file_size_bytes,
543+
ddf.footer_size,
544+
ddf.encryption_key
545+
FROM ducklake_delete_file ddf
546+
WHERE ddf.table_id = ?
547+
AND ddf.begin_snapshot > ?
548+
AND ddf.begin_snapshot <= ?
549+
),
550+
551+
data_files AS (
552+
SELECT df.*
553+
FROM ducklake_data_file df
554+
WHERE df.table_id = ?
555+
)
556+
557+
-- Part 1: Incremental deletes
558+
SELECT
559+
data.path,
560+
data.path_is_relative,
561+
data.file_size_bytes,
562+
data.footer_size,
563+
data.row_id_start,
564+
data.record_count,
565+
data.mapping_id,
566+
current_delete.path,
567+
current_delete.path_is_relative,
568+
current_delete.file_size_bytes,
569+
current_delete.footer_size,
570+
prev.path,
571+
prev.path_is_relative,
572+
prev.file_size_bytes,
573+
prev.footer_size,
574+
current_delete.begin_snapshot
575+
FROM current_delete
576+
JOIN data_files data USING (data_file_id)
577+
LEFT JOIN LATERAL (
578+
SELECT
579+
ddf.path,
580+
ddf.path_is_relative,
581+
ddf.file_size_bytes,
582+
ddf.footer_size
583+
FROM ducklake_delete_file ddf
584+
WHERE ddf.table_id = ?
585+
AND ddf.data_file_id = current_delete.data_file_id
586+
AND ddf.begin_snapshot < current_delete.begin_snapshot
587+
ORDER BY ddf.begin_snapshot DESC
588+
LIMIT 1
589+
) prev ON true
590+
591+
UNION ALL
592+
593+
-- Part 2: Full file deletes
594+
SELECT
595+
data.path,
596+
data.path_is_relative,
597+
data.file_size_bytes,
598+
data.footer_size,
599+
data.row_id_start,
600+
data.record_count,
601+
data.mapping_id,
602+
NULL,
603+
NULL,
604+
NULL,
605+
NULL,
606+
prev.path,
607+
prev.path_is_relative,
608+
prev.file_size_bytes,
609+
prev.footer_size,
610+
data.end_snapshot
611+
FROM ducklake_data_file data
612+
LEFT JOIN LATERAL (
613+
SELECT
614+
ddf.path,
615+
ddf.path_is_relative,
616+
ddf.file_size_bytes,
617+
ddf.footer_size
618+
FROM ducklake_delete_file ddf
619+
WHERE ddf.table_id = ?
620+
AND ddf.data_file_id = data.data_file_id
621+
AND ddf.begin_snapshot < data.end_snapshot
622+
ORDER BY ddf.begin_snapshot DESC
623+
LIMIT 1
624+
) prev ON true
625+
WHERE data.table_id = ?
626+
AND data.end_snapshot > ?
627+
AND data.end_snapshot <= ?
628+
"#,
539629
)
630+
// Part 1 bindings: table_id (current_delete), start_snapshot, end_snapshot, table_id (data_files), table_id (prev lateral)
631+
.bind(table_id)
632+
.bind(start_snapshot)
633+
.bind(end_snapshot)
634+
.bind(table_id)
635+
.bind(table_id)
636+
// Part 2 bindings: table_id (prev lateral), table_id (data), start_snapshot, end_snapshot
637+
.bind(table_id)
540638
.bind(table_id)
541639
.bind(start_snapshot)
542640
.bind(end_snapshot)
@@ -546,7 +644,29 @@ impl MetadataProvider for MySqlMetadataProvider {
546644
rows.into_iter()
547645
.map(|row| {
548646
Ok(DeleteFileChange {
549-
begin_snapshot: row.try_get(0)?,
647+
// data file
648+
data_file_path: row.try_get(0)?,
649+
data_file_path_is_relative: row.try_get(1)?,
650+
data_file_size_bytes: row.try_get(2)?,
651+
data_file_footer_size: row.try_get(3)?,
652+
data_row_id_start: row.try_get(4)?,
653+
data_record_count: row.try_get(5)?,
654+
data_mapping_id: row.try_get(6)?,
655+
656+
// current delete
657+
current_delete_path: row.try_get(7)?,
658+
current_delete_path_is_relative: row.try_get(8)?,
659+
current_delete_file_size_bytes: row.try_get(9)?,
660+
current_delete_footer_size: row.try_get(10)?,
661+
662+
// previous delete
663+
previous_delete_path: row.try_get(11)?,
664+
previous_delete_path_is_relative: row.try_get(12)?,
665+
previous_delete_file_size_bytes: row.try_get(13)?,
666+
previous_delete_footer_size: row.try_get(14)?,
667+
668+
// snapshot
669+
snapshot_id: row.try_get(15)?,
550670
})
551671
})
552672
.collect()

src/metadata_provider_postgres.rs

Lines changed: 119 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -562,13 +562,103 @@ impl MetadataProvider for PostgresMetadataProvider {
562562
end_snapshot: i64,
563563
) -> Result<Vec<DeleteFileChange>> {
564564
block_on(async {
565+
// PostgreSQL equivalent of DuckDB's SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS
566+
// Uses LATERAL joins instead of MAX_BY/COLUMNS
565567
let rows = sqlx::query(
566-
"SELECT del.begin_snapshot
567-
FROM ducklake_delete_file AS del
568-
WHERE del.table_id = $1
569-
AND del.begin_snapshot > $2
570-
AND del.begin_snapshot <= $3
571-
ORDER BY del.begin_snapshot",
568+
r#"
569+
WITH current_delete AS (
570+
SELECT
571+
ddf.data_file_id,
572+
ddf.begin_snapshot,
573+
ddf.path,
574+
ddf.path_is_relative,
575+
ddf.file_size_bytes,
576+
ddf.footer_size,
577+
ddf.encryption_key
578+
FROM ducklake_delete_file ddf
579+
WHERE ddf.table_id = $1
580+
AND ddf.begin_snapshot > $2
581+
AND ddf.begin_snapshot <= $3
582+
),
583+
584+
data_files AS (
585+
SELECT df.*
586+
FROM ducklake_data_file df
587+
WHERE df.table_id = $1
588+
)
589+
590+
-- Part 1: Incremental deletes
591+
SELECT
592+
data.path,
593+
data.path_is_relative,
594+
data.file_size_bytes,
595+
data.footer_size,
596+
data.row_id_start,
597+
data.record_count,
598+
data.mapping_id,
599+
current_delete.path,
600+
current_delete.path_is_relative,
601+
current_delete.file_size_bytes,
602+
current_delete.footer_size,
603+
prev.path,
604+
prev.path_is_relative,
605+
prev.file_size_bytes,
606+
prev.footer_size,
607+
current_delete.begin_snapshot
608+
FROM current_delete
609+
JOIN data_files data USING (data_file_id)
610+
LEFT JOIN LATERAL (
611+
SELECT
612+
ddf.path,
613+
ddf.path_is_relative,
614+
ddf.file_size_bytes,
615+
ddf.footer_size
616+
FROM ducklake_delete_file ddf
617+
WHERE ddf.table_id = $1
618+
AND ddf.data_file_id = current_delete.data_file_id
619+
AND ddf.begin_snapshot < current_delete.begin_snapshot
620+
ORDER BY ddf.begin_snapshot DESC
621+
LIMIT 1
622+
) prev ON true
623+
624+
UNION ALL
625+
626+
-- Part 2: Full file deletes
627+
SELECT
628+
data.path,
629+
data.path_is_relative,
630+
data.file_size_bytes,
631+
data.footer_size,
632+
data.row_id_start,
633+
data.record_count,
634+
data.mapping_id,
635+
NULL::VARCHAR,
636+
NULL::BOOLEAN,
637+
NULL::BIGINT,
638+
NULL::BIGINT,
639+
prev.path,
640+
prev.path_is_relative,
641+
prev.file_size_bytes,
642+
prev.footer_size,
643+
data.end_snapshot
644+
FROM ducklake_data_file data
645+
LEFT JOIN LATERAL (
646+
SELECT
647+
ddf.path,
648+
ddf.path_is_relative,
649+
ddf.file_size_bytes,
650+
ddf.footer_size
651+
FROM ducklake_delete_file ddf
652+
WHERE ddf.table_id = $1
653+
AND ddf.data_file_id = data.data_file_id
654+
AND ddf.begin_snapshot < data.end_snapshot
655+
ORDER BY ddf.begin_snapshot DESC
656+
LIMIT 1
657+
) prev ON true
658+
WHERE data.table_id = $1
659+
AND data.end_snapshot > $2
660+
AND data.end_snapshot <= $3
661+
"#,
572662
)
573663
.bind(table_id)
574664
.bind(start_snapshot)
@@ -579,7 +669,29 @@ impl MetadataProvider for PostgresMetadataProvider {
579669
rows.into_iter()
580670
.map(|row| {
581671
Ok(DeleteFileChange {
582-
begin_snapshot: row.try_get(0)?,
672+
// data file
673+
data_file_path: row.try_get(0)?,
674+
data_file_path_is_relative: row.try_get(1)?,
675+
data_file_size_bytes: row.try_get(2)?,
676+
data_file_footer_size: row.try_get(3)?,
677+
data_row_id_start: row.try_get(4)?,
678+
data_record_count: row.try_get(5)?,
679+
data_mapping_id: row.try_get(6)?,
680+
681+
// current delete
682+
current_delete_path: row.try_get(7)?,
683+
current_delete_path_is_relative: row.try_get(8)?,
684+
current_delete_file_size_bytes: row.try_get(9)?,
685+
current_delete_footer_size: row.try_get(10)?,
686+
687+
// previous delete
688+
previous_delete_path: row.try_get(11)?,
689+
previous_delete_path_is_relative: row.try_get(12)?,
690+
previous_delete_file_size_bytes: row.try_get(13)?,
691+
previous_delete_footer_size: row.try_get(14)?,
692+
693+
// snapshot
694+
snapshot_id: row.try_get(15)?,
583695
})
584696
})
585697
.collect()

0 commit comments

Comments
 (0)