Implements snapshot-pinned catalog to guarantee query consistency #6

Merged
shefeek-jinnah merged 2 commits into main from shefeek/query_catalog_pinned on Dec 16, 2025
Conversation

@shefeek-jinnah
Collaborator

Summary

Implements snapshot-pinned catalog to guarantee query consistency by binding each catalog instance to a specific snapshot ID at creation time.

Problem

DataFusion's CatalogProvider trait methods (schema(), schema_names()) are context-free and can be called multiple times during query planning. Previously, DuckLakeCatalog called get_current_snapshot() on every metadata lookup, which could result in:

  • Inconsistent data if the snapshot changed between lookups during a single query
  • Race conditions in concurrent query scenarios
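
To make the failure mode concrete, here is a minimal std-only sketch of the drift. `MockProvider`, `commit()`, and `unpinned_lookups()` are hypothetical stand-ins invented for illustration, not types or functions from this crate; the atomic counter merely simulates a metadata store whose current snapshot advances when another writer commits.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical stand-in for the metadata store (not the crate's provider).
struct MockProvider {
    current: AtomicI64,
}

impl MockProvider {
    fn new(start: i64) -> Self {
        Self { current: AtomicI64::new(start) }
    }

    /// Returns whatever snapshot is current at the moment of the call.
    fn get_current_snapshot(&self) -> i64 {
        self.current.load(Ordering::SeqCst)
    }

    /// Simulates another writer committing a new snapshot.
    fn commit(&self) {
        self.current.fetch_add(1, Ordering::SeqCst);
    }
}

/// Mimics the old behavior: every metadata lookup re-reads the current
/// snapshot, so two lookups in one planning pass can disagree.
fn unpinned_lookups(provider: &MockProvider) -> (i64, i64) {
    let seen_by_schema_names = provider.get_current_snapshot();
    provider.commit(); // a commit lands between the two lookups
    let seen_by_schema = provider.get_current_snapshot();
    (seen_by_schema_names, seen_by_schema)
}

fn main() {
    let provider = MockProvider::new(7);
    let (first, second) = unpinned_lookups(&provider);
    println!("schema_names() saw snapshot {first}, schema() saw snapshot {second}");
}
```

With a starting snapshot of 7, the two lookups observe 7 and 8: a single query would be planned against two different versions of the metadata.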

Solution

Bind each catalog instance to a specific snapshot ID at creation time:

  • Fetch snapshot ID once in constructor
  • Store as immutable field
  • All metadata operations use the pinned snapshot
  • Snapshot propagates through Catalog → Schema → Table hierarchy
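
The steps above can be sketched roughly as follows. This is an illustrative std-only model, not the code in src/catalog.rs: `MockProvider`, `PinnedCatalog`, `PinnedSchema`, `PinnedTable`, and `list_schemas()` are hypothetical names, and the rule that snapshot N exposes schema_1 through schema_N is an assumption made only so the effect of pinning is observable.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical metadata store: each commit adds one schema, so
/// snapshot N exposes schema_1 ..= schema_N.
struct MockProvider {
    current: AtomicI64,
}

impl MockProvider {
    fn get_current_snapshot(&self) -> i64 {
        self.current.load(Ordering::SeqCst)
    }

    fn commit(&self) {
        self.current.fetch_add(1, Ordering::SeqCst);
    }

    /// Lookups take the snapshot explicitly instead of reading "current".
    fn list_schemas(&self, snapshot_id: i64) -> Vec<String> {
        (1..=snapshot_id).map(|i| format!("schema_{i}")).collect()
    }
}

/// Catalog bound to one snapshot for its whole lifetime.
struct PinnedCatalog {
    provider: Arc<MockProvider>,
    snapshot_id: i64, // set once in the constructor, never mutated
}

impl PinnedCatalog {
    /// Fetches the snapshot once, then delegates to the explicit constructor.
    fn new(provider: Arc<MockProvider>) -> Self {
        let snapshot_id = provider.get_current_snapshot();
        Self::with_snapshot(provider, snapshot_id)
    }

    fn with_snapshot(provider: Arc<MockProvider>, snapshot_id: i64) -> Self {
        Self { provider, snapshot_id }
    }

    fn schema_names(&self) -> Vec<String> {
        self.provider.list_schemas(self.snapshot_id)
    }

    /// The schema inherits the catalog's snapshot.
    fn schema(&self, name: &str) -> Option<PinnedSchema> {
        self.schema_names().iter().any(|n| n == name).then(|| PinnedSchema {
            name: name.to_string(),
            snapshot_id: self.snapshot_id,
        })
    }
}

struct PinnedSchema {
    name: String,
    snapshot_id: i64,
}

impl PinnedSchema {
    /// The table inherits the schema's snapshot.
    fn table(&self, table: &str) -> PinnedTable {
        PinnedTable {
            qualified_name: format!("{}.{}", self.name, table),
            snapshot_id: self.snapshot_id,
        }
    }
}

struct PinnedTable {
    qualified_name: String,
    snapshot_id: i64,
}

fn main() {
    let provider = Arc::new(MockProvider { current: AtomicI64::new(2) });
    let catalog = PinnedCatalog::new(Arc::clone(&provider));
    provider.commit(); // snapshot 3 appears after the catalog was created

    let schema = catalog.schema("schema_2").expect("visible at snapshot 2");
    let table = schema.table("events");
    println!("{} @ snapshot {}", table.qualified_name, table.snapshot_id);
    println!("schemas: {:?}", catalog.schema_names());
}
```

Because the ID is copied down the hierarchy rather than re-read, a commit that lands after construction is invisible to the catalog and to everything derived from it.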

Changes

Production Code

  • Modified: src/catalog.rs
    • Added snapshot_id: i64 field to DuckLakeCatalog
    • Added with_snapshot(provider, snapshot_id) constructor for explicit snapshot control
    • Updated new() to fetch snapshot once and pin to it
    • Updated schema_names() and schema() to use pinned self.snapshot_id
    • Removed dynamic get_current_snapshot_id() calls

API

Backward Compatible - Existing code works unchanged:

let catalog = DuckLakeCatalog::new(provider)?;

New Explicit API - For time-travel queries or explicit snapshot control:

let provider = Arc::new(DuckdbMetadataProvider::new("catalog.db")?);
let snapshot_id = provider.get_current_snapshot()?;
let catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?;

Usage Pattern (Recommended)

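// Create fresh catalog per query/session
// Only std and DataFusion imports are listed; DuckLakeCatalog, MetadataProvider,
// and the Result alias are assumed to come from this crate.
use std::sync::Arc;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;

async fn handle_query(provider: Arc<dyn MetadataProvider>, sql: &str) -> Result<Vec<RecordBatch>> {
    // Pin the snapshot once; every metadata lookup for this query uses it
    let snapshot_id = provider.get_current_snapshot()?;
    let catalog = Arc::new(DuckLakeCatalog::with_snapshot(provider, snapshot_id)?);

    let ctx = SessionContext::new();
    ctx.register_catalog("ducklake", catalog);

    let df = ctx.sql(sql).await?;
    df.collect().await
}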
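
As a rough check of why the per-query pattern holds up under concurrent writes, the following std-only sketch runs a writer thread alongside one query. Everything here is hypothetical: `MockProvider`, `schema_count()`, and `run_concurrently()` are invented for illustration, and this `handle_query` is a synchronous toy with a different signature from the one above.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical metadata store; snapshot N is assumed to expose N schemas.
struct MockProvider {
    current: AtomicI64,
}

impl MockProvider {
    fn get_current_snapshot(&self) -> i64 {
        self.current.load(Ordering::SeqCst)
    }

    fn commit(&self) {
        self.current.fetch_add(1, Ordering::SeqCst);
    }

    /// A lookup whose answer depends only on the snapshot it is given.
    fn schema_count(&self, snapshot_id: i64) -> i64 {
        snapshot_id
    }
}

/// Toy per-query handler: pin once, then perform several lookups.
/// Returns the pinned ID and the result of each lookup.
fn handle_query(provider: &MockProvider, lookups: usize) -> (i64, Vec<i64>) {
    let snapshot_id = provider.get_current_snapshot(); // fetched once per query
    let results = (0..lookups)
        .map(|_| provider.schema_count(snapshot_id))
        .collect();
    (snapshot_id, results)
}

/// Runs one query while a writer thread commits `commits` new snapshots.
/// Returns (pinned ID, lookup results, final current snapshot).
fn run_concurrently(commits: i64, lookups: usize) -> (i64, Vec<i64>, i64) {
    let provider = Arc::new(MockProvider { current: AtomicI64::new(1) });

    let writer = {
        let provider = Arc::clone(&provider);
        thread::spawn(move || {
            for _ in 0..commits {
                provider.commit();
            }
        })
    };

    let (pinned, results) = handle_query(&provider, lookups);
    writer.join().expect("writer thread panicked");
    (pinned, results, provider.get_current_snapshot())
}

fn main() {
    let (pinned, results, latest) = run_concurrently(1000, 100);
    let consistent = results.iter().all(|&r| r == pinned);
    println!("pinned={pinned} latest={latest} consistent={consistent}");
}
```

Which snapshot the query pins depends on thread scheduling, but every lookup inside the query agrees with it. A later query would build its own catalog and pick up a newer snapshot.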

@zfarrell zfarrell marked this pull request as ready for review December 15, 2025 17:38
Collaborator

@zfarrell zfarrell left a comment

LGTM. It might be nice to add some tests to assert the implications of this change, but not blocking.

@shefeek-jinnah shefeek-jinnah merged commit 304ed9b into main Dec 16, 2025
3 checks passed
@shefeek-jinnah shefeek-jinnah deleted the shefeek/query_catalog_pinned branch December 16, 2025 04:53
2 participants