Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ jobs:
cargo nextest archive -p runtime --test integration --features ${FEATURES} --archive-file integration.tar.zst
cargo nextest archive -p runtime --test retention_oom --features ${FEATURES} --archive-file retention_oom.tar.zst
cargo nextest archive -p runtime --test integration_aws_sdk --features databricks,delta_lake --archive-file integration_aws_sdk.tar.zst
cargo nextest archive -p runtime --test integration_snapshot_refresh --features duckdb,sqlite,turso,snapshots --archive-file integration_snapshot_refresh.tar.zst
cargo nextest archive -p aws-sdk-credential-bridge --test credential_provider --archive-file integration_aws_sdk_credential_bridge.tar.zst
cargo nextest archive -p runtime-table-partition --test partition_table_provider --archive-file partition_table_test.tar.zst
cargo nextest archive -p data_components --test hadoop_catalog_test --archive-file data_components_hadoop.tar.zst
Expand Down Expand Up @@ -175,6 +176,16 @@ jobs:
# Archive is already zstd-compressed; use minimal artifact zip compression.
compression-level: 1

- name: Upload snapshot refresh test archive
if: needs.check_changes.outputs.relevant_changes == 'true'
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7
with:
name: integration-snapshot-refresh-test-archive
path: ./integration_snapshot_refresh.tar.zst
retention-days: 3
# Archive is already zstd-compressed; use minimal artifact zip compression.
compression-level: 1

- name: Upload AWS SDK credential bridge test archive
if: needs.check_changes.outputs.relevant_changes == 'true'
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7
Expand Down Expand Up @@ -499,6 +510,45 @@ jobs:
run: |
INSTA_WORKSPACE_ROOT="${PWD}" CARGO_MANIFEST_DIR="${PWD}" cargo nextest run --workspace-remap "${PWD}" --archive-file ./integration_aws_sdk_credential_bridge_test/integration_aws_sdk_credential_bridge.tar.zst

test-snapshot-refresh:
name: Snapshot Refresh Mode Integration Tests
needs: [build, check_changes]
if: needs.check_changes.outputs.relevant_changes == 'true'
permissions: read-all
runs-on: spiceai-dev-runners
steps:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6
with:
fetch-depth: 1

- name: Set up Rust
uses: ./.github/actions/setup-rust

- name: Download snapshot refresh test archive
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8
with:
name: integration-snapshot-refresh-test-archive
path: ./integration_snapshot_refresh_test

- name: Set up Nextest
uses: ./.github/actions/setup-nextest

- name: Login to ACR
uses: docker/login-action@4907a6ddec9925e35a0a9e82d7399ccc52663121 # v4.1.0
if: github.repository == 'spiceai/spiceai'
with:
registry: spiceaitestimages.azurecr.io
username: spiceai-repo-pull
password: ${{ secrets.AZCR_PASSWORD }}

- name: Run snapshot refresh integration tests
env:
AWS_EC2_METADATA_DISABLED: true
AWS_SNAPSHOT_KEY: ${{ secrets.AWS_ICEBERG_ACCESS_KEY_ID }}
AWS_SNAPSHOT_SECRET: ${{ secrets.AWS_ICEBERG_SECRET_ACCESS_KEY }}
run: |
INSTA_WORKSPACE_ROOT="${PWD}" CARGO_MANIFEST_DIR="${PWD}" cargo nextest run --workspace-remap "${PWD}" --archive-file ./integration_snapshot_refresh_test/integration_snapshot_refresh.tar.zst

test-data-components:
name: Data Components Integration Tests
needs: [build, check_changes]
Expand Down
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ datafusion-physical-optimizer = { git = "https://github.com/spiceai/datafusion.g
datafusion-spark = { git = "https://github.com/spiceai/datafusion.git", rev = "06e4b624c6073c40c7b2127ce620e281ec1979ae" } # spiceai-52.5
datafusion-substrait = { git = "https://github.com/spiceai/datafusion.git", rev = "06e4b624c6073c40c7b2127ce620e281ec1979ae" } # spiceai-52.5

datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "915f03870eff972dab671aa3481a3b55a289d2b9" } # spiceai-52
datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "97ecd0059bd49297b956b6dd51c7047547cc97e0" } # spiceai-52 with invalidate_instance / invalidate_file_instance (datafusion-contrib/datafusion-table-providers#635)

ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "383e165a080d648c313a2530a3a53eae6077fdba" } # spiceai-52.5
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "383e165a080d648c313a2530a3a53eae6077fdba" } # spiceai-52.5
Expand Down
43 changes: 43 additions & 0 deletions crates/cayenne/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,49 @@ pub trait MetadataCatalog: Send + Sync {
///
/// Returns `Ok(true)` if the table was dropped, `Ok(false)` if the table didn't exist.
async fn drop_table(&self, table_name: &str) -> CatalogResult<bool>;

/// Export the metastore rows for `dataset_name` as a portable, versioned
/// slice with path columns rewritten relative to `data_dir_anchor`.
///
/// Default implementation returns [`CatalogError::InvalidOperation`].
/// `CayenneCatalog` overrides this with a real implementation.
///
/// # Errors
///
/// Returns an error if the dataset is not present or the underlying metastore
/// query fails.
async fn export_dataset_slice(
&self,
dataset_name: &str,
data_dir_anchor: &std::path::Path,
) -> CatalogResult<crate::metastore::snapshot::DatasetMetastoreSlice> {
let _ = (dataset_name, data_dir_anchor);
Err(CatalogError::InvalidOperationNoSource {
message: "export_dataset_slice is not supported by this catalog".to_string(),
})
}

/// Atomically import a dataset slice into the metastore, replacing any
/// prior rows for the same `dataset_name`. Path columns are re-anchored
/// at `data_dir_anchor`.
///
/// Default implementation returns [`CatalogError::InvalidOperation`].
/// `CayenneCatalog` overrides this with a real implementation.
///
/// # Errors
///
/// Returns an error if the slice format is unsupported, the engine identifier
/// mismatches, or any DML in the underlying transaction fails.
async fn import_dataset_slice(
&self,
slice: &crate::metastore::snapshot::DatasetMetastoreSlice,
data_dir_anchor: &std::path::Path,
) -> CatalogResult<()> {
let _ = (slice, data_dir_anchor);
Err(CatalogError::InvalidOperationNoSource {
message: "import_dataset_slice is not supported by this catalog".to_string(),
})
}
}

/// Factory trait for creating catalog instances.
Expand Down
32 changes: 32 additions & 0 deletions crates/cayenne/src/cayenne_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,38 @@ impl MetadataCatalog for CayenneCatalog {

Ok(true)
}

async fn export_dataset_slice(
&self,
dataset_name: &str,
data_dir_anchor: &std::path::Path,
) -> CatalogResult<crate::metastore::snapshot::DatasetMetastoreSlice> {
match &self.metastore {
MetastoreImpl::Sqlite(m) => {
crate::metastore::snapshot::export_dataset(m, dataset_name, data_dir_anchor).await
}
#[cfg(feature = "turso")]
MetastoreImpl::Turso(m) => {
crate::metastore::snapshot::export_dataset(m, dataset_name, data_dir_anchor).await
}
}
}

async fn import_dataset_slice(
&self,
slice: &crate::metastore::snapshot::DatasetMetastoreSlice,
data_dir_anchor: &std::path::Path,
) -> CatalogResult<()> {
match &self.metastore {
MetastoreImpl::Sqlite(m) => {
crate::metastore::snapshot::import_dataset(m, slice, data_dir_anchor).await
}
#[cfg(feature = "turso")]
MetastoreImpl::Turso(m) => {
crate::metastore::snapshot::import_dataset(m, slice, data_dir_anchor).await
}
}
}
}

fn is_retryable_write_conflict(error: &CatalogError) -> bool {
Expand Down
1 change: 1 addition & 0 deletions crates/cayenne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub(crate) mod schema;
pub mod stats;

pub use catalog::MetadataCatalog;
pub use catalog::{CatalogError, CatalogResult};
pub use catalog_provider::{
CayenneCatalogProvider, CayenneCatalogProviderConfig, CayenneSchemaProvider,
};
Expand Down
9 changes: 9 additions & 0 deletions crates/cayenne/src/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
//! that can be used to store Cayenne metadata. This allows swapping between `SQLite`,
//! Turso, or other storage implementations.

pub mod snapshot;
pub mod sqlite;

#[cfg(feature = "turso")]
Expand Down Expand Up @@ -281,6 +282,14 @@ impl<T: Into<MetastoreValue>> From<Option<T>> for MetastoreValue {

/// A row returned from a query.
pub trait MetastoreRow: Send {
/// Get the raw `MetastoreValue` for a column by index. Used by
/// generic export/import logic that does not know the column types.
///
/// # Errors
///
/// Returns an error if the column index is out of bounds.
fn get_value(&self, index: usize) -> CatalogResult<MetastoreValue>;

/// Get an i64 value from the row by column index.
///
/// # Errors
Expand Down
Loading
Loading