Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/testoperator_run_throughput.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ on:
- 'postgres-catalog'
- 'mysql-catalog'
- 'mssql-catalog'
- 'turso'
- 'bigquery'
- 'scylladb'
ready_wait:
description: 'How long (in seconds) to wait for spiced to start'
required: true
Expand Down Expand Up @@ -142,6 +145,22 @@ jobs:
echo "MYSQL_DB=tpch_sf1" >> $GITHUB_ENV
fi

- name: Install ADBC BigQuery Driver
if: ${{ contains(github.event.inputs.spicepod_path, 'adbc[bigquery]') }}
uses: ./.github/actions/setup-adbc-bigquery

- name: Set up Python
if: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') }}
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
with:
python-version: '3.12'

- name: Setup ScyllaDB
if: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') }}
uses: ./.github/actions/setup-scylladb
with:
keyspace: tpch_sf1

- name: Run the throughput test - ${{ github.event.inputs.spicepod_path }}
if: ${{ github.event.inputs.query_overrides == '' }}
run: |
Expand Down Expand Up @@ -199,6 +218,10 @@ jobs:
SPICEAI_BENCHMARK_METRICS_KEY: ${{ secrets.SPICEAI_BENCHMARK_METRICS_KEY }}
MONGODB_HOST: ${{ vars.TEST_MONGODB_HOST }}
MONGODB_PASSWORD: ${{ secrets.TEST_MONGODB_PASSWORD }}
BIGQUERY_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_SERVICE_ACCOUNT_JSON }}
SCYLLADB_HOST: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && '127.0.0.1' || '' }}
SCYLLADB_PORT: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && '9042' || '' }}
SCYLLADB_KEYSPACE: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && 'tpch_sf1' || '' }}

- name: Run the throughput test (with overrides) - ${{ github.event.inputs.spicepod_path }}
if: ${{ github.event.inputs.query_overrides != '' }}
Expand Down Expand Up @@ -258,3 +281,7 @@ jobs:
SPICEAI_BENCHMARK_METRICS_KEY: ${{ secrets.SPICEAI_BENCHMARK_METRICS_KEY }}
MONGODB_HOST: ${{ vars.TEST_MONGODB_HOST }}
MONGODB_PASSWORD: ${{ secrets.TEST_MONGODB_PASSWORD }}
BIGQUERY_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_SERVICE_ACCOUNT_JSON }}
SCYLLADB_HOST: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && '127.0.0.1' || '' }}
SCYLLADB_PORT: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && '9042' || '' }}
SCYLLADB_KEYSPACE: ${{ contains(github.event.inputs.spicepod_path, 'scylladb') && 'tpch_sf1' || '' }}
38 changes: 19 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ datafusion-substrait = { git = "https://github.com/spiceai/datafusion.git", rev

datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "29f66085875f1d4d7b80fc1947e68c6835302241" } # spiceai-52

ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "47e2b4946762c834d4a11532a25cc99c9e8a0b9d" } # spiceai-52.5
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "47e2b4946762c834d4a11532a25cc99c9e8a0b9d" } # spiceai-52.5
ballista-scheduler = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "47e2b4946762c834d4a11532a25cc99c9e8a0b9d" } # spiceai-52.5
ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "07be66a8efff7e20c2abadaaba6ef62b02fc1ffc" } # spiceai-52.5
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "07be66a8efff7e20c2abadaaba6ef62b02fc1ffc" } # spiceai-52.5
ballista-scheduler = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "07be66a8efff7e20c2abadaaba6ef62b02fc1ffc" } # spiceai-52.5

delta_kernel = { git = "https://github.com/spiceai/delta-kernel-rs.git", rev = "47034733a0477f72e4f6abbbf6a27d0da069860a" } # spiceai-0.18.2

Expand Down
45 changes: 24 additions & 21 deletions crates/runtime/src/accelerated_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub struct Builder {
append_stream: Option<ChangesStream>,
disable_federation: bool,
write_to_accelerator_only: bool,
write_through: bool,
dual_write: bool,
write_back: bool,
refresh_semaphore: Option<Arc<Semaphore>>,
checkpointer: Option<Arc<dyn DatasetCheckpointer>>,
Expand Down Expand Up @@ -396,7 +396,7 @@ impl Builder {
synchronize_with: None,
disable_federation: false,
write_to_accelerator_only: false,
write_through: false,
dual_write: false,
write_back: false,
initial_load_complete: false,
refresh_semaphore: None,
Expand Down Expand Up @@ -490,10 +490,12 @@ impl Builder {
self
}

/// Enable write-through mode: writes go simultaneously to both the federated source
/// Enable dual-write mode: writes go simultaneously to both the federated source
/// and the local Cayenne accelerator using staged append/commit/rollback semantics.
pub fn write_through(&mut self) -> &mut Self {
self.write_through = true;
/// Reserved for the Iceberg federated catalog cache path — not driven by the
/// user-facing `write_mode: write_through` setting.
pub fn dual_write(&mut self) -> &mut Self {
self.dual_write = true;
self
}

Expand Down Expand Up @@ -1047,14 +1049,14 @@ impl Builder {
}
}

let write_mode = if self.write_through {
WriteMode::resolve_write_through(&self.accelerator, &self.federated)?
let write_mode = if self.dual_write {
WriteMode::resolve_dual_write(&self.accelerator, &self.federated)?
} else if self.write_back {
WriteMode::WriteBack
} else if self.write_to_accelerator_only {
WriteMode::AcceleratorOnly
} else {
WriteMode::FederatedOnly
WriteMode::WriteThrough
};

Ok(AcceleratedTable {
Expand Down Expand Up @@ -1170,8 +1172,8 @@ impl AcceleratedTable {
}

#[must_use]
pub fn is_write_through(&self) -> bool {
self.write_mode.is_write_through()
pub fn is_dual_write(&self) -> bool {
self.write_mode.is_dual_write()
}

#[must_use]
Expand Down Expand Up @@ -1645,9 +1647,10 @@ impl TableProvider for AcceleratedTable {
self.refresher().set_initial_load_completed(true);
Ok(accelerated_insert_plan)
}
WriteMode::FederatedOnly => {
// Writes go to the federated source. The acceleration refresh
// mechanism will pick up the new data on its next cycle.
WriteMode::WriteThrough => {
// Writes go to the federated source synchronously. The acceleration
// refresh mechanism (CDC for refresh_mode: changes, otherwise the
// periodic refresh cycle) propagates the change to the accelerator.
let federated_table = self.federated.table_provider().await;
federated_table.insert_into(state, input, overwrite).await
}
Expand All @@ -1663,10 +1666,10 @@ impl TableProvider for AcceleratedTable {
self.schema(),
)
}
WriteMode::WriteThrough {
WriteMode::DualWrite {
cayenne_target,
federated_provider,
} => write::write_through::insert_write_through(
} => write::dual_write::insert_dual_write(
input,
overwrite,
cayenne_target.as_ref(),
Expand All @@ -1693,7 +1696,7 @@ impl TableProvider for AcceleratedTable {

match &self.write_mode {
WriteMode::AcceleratorOnly => self.accelerator.delete_from(state, filters).await,
WriteMode::FederatedOnly => {
WriteMode::WriteThrough => {
let federated_table = self.federated.table_provider().await;
federated_table.delete_from(state, filters).await
}
Expand All @@ -1706,11 +1709,11 @@ impl TableProvider for AcceleratedTable {
)
.await
}
WriteMode::WriteThrough {
WriteMode::DualWrite {
cayenne_target,
federated_provider,
} => {
write::write_through::delete_write_through(
write::dual_write::delete_dual_write(
state,
filters,
cayenne_target.as_ref(),
Expand Down Expand Up @@ -1740,7 +1743,7 @@ impl TableProvider for AcceleratedTable {
WriteMode::AcceleratorOnly => {
self.accelerator.update(state, assignments, filters).await
}
WriteMode::FederatedOnly => {
WriteMode::WriteThrough => {
let federated_table = self.federated.table_provider().await;
federated_table.update(state, assignments, filters).await
}
Expand All @@ -1754,11 +1757,11 @@ impl TableProvider for AcceleratedTable {
)
.await
}
WriteMode::WriteThrough {
WriteMode::DualWrite {
cayenne_target,
federated_provider,
} => {
write::write_through::update_write_through(
write::dual_write::update_dual_write(
state,
assignments,
filters,
Expand Down
Loading
Loading