Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Makefile.local
*.parquet
spicepod.yaml
!test/tpc-bench/tpch-spicepod/spicepod.yaml
!examples/cosmosdb-connector/spicepod.yaml

target/

Expand Down
27 changes: 23 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ check-rust-features:
cargo check $(CARGO_PROFILE) --no-default-features --features delta_lake
cargo check $(CARGO_PROFILE) --no-default-features --features dremio
cargo check $(CARGO_PROFILE) --no-default-features --features clickhouse
cargo check $(CARGO_PROFILE) --no-default-features --features cosmosdb
cargo check $(CARGO_PROFILE) --no-default-features --features debezium
cargo check $(CARGO_PROFILE) --no-default-features --features runtime/openapi
cargo check $(CARGO_PROFILE) --no-default-features --features dynamodb
Expand Down Expand Up @@ -253,7 +254,7 @@ display-deps:
# Default install includes models. Use -data suffix variants to build without models.
# Data-only features (default features minus models)
# Note: postgres-accel enables the PostgreSQL data accelerator (separate from postgres connector)
SPICED_DATA_FEATURES := duckdb,postgres,postgres-accel,sqlite,mysql,flightsql,delta_lake,databricks,dremio,clickhouse,sharepoint,snapshots,snowflake,spark,ftp,sftp,debezium,kafka,anonymous_telemetry,mssql,dynamodb,imap,alloc-snmalloc,oracle,runtime/s3_vectors,mongodb,iceberg-write,turso,smb,pingora,scylladb
SPICED_DATA_FEATURES := duckdb,postgres,postgres-accel,sqlite,mysql,flightsql,delta_lake,databricks,dremio,clickhouse,cosmosdb,sharepoint,snapshots,snowflake,spark,ftp,sftp,debezium,kafka,anonymous_telemetry,mssql,dynamodb,imap,alloc-snmalloc,oracle,runtime/s3_vectors,mongodb,iceberg-write,turso,smb,pingora,scylladb

.PHONY: install
install: build
Expand Down
2 changes: 2 additions & 0 deletions bin/spiced/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ alloc-system = []
anonymous_telemetry = ["telemetry/anonymous_telemetry"]
aws-secrets-manager = ["runtime/aws-secrets-manager"]
clickhouse = ["connector-clickhouse", "runtime/clickhouse"]
cosmosdb = ["runtime/cosmosdb"]
cuda = ["runtime/cuda"]
databricks = ["connector-databricks", "runtime/databricks"]
debezium = ["runtime/debezium"]
Expand All @@ -88,6 +89,7 @@ default = [
"databricks",
"dremio",
"clickhouse",
"cosmosdb",
"sharepoint",
"snapshots",
"snowflake",
Expand Down
30 changes: 30 additions & 0 deletions crates/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ pub use utils::filter_transient_error_responses;
pub use utils::get_logical_plan_input_tables;
pub use utils::to_cached_record_batch_stream;

/// Stable [`datafusion::logical_expr::UserDefinedLogicalNodeCore::name`] values for
/// every Spice logical-plan extension node that performs (or dispatches) a write,
/// a schema mutation, or any other side-effect that must not be reachable via a
/// read-only SQL path and must not be served from or populated into the SQL
/// results cache.
///
/// Entries are plain string literals, so nothing ties them to the node types at
/// compile time: renaming a node's `name()` without updating this list silently
/// drops that node from the list.
///
/// Keep this list in sync with:
/// - `datafusion_ddl::DdlExtensionNode` → `"DdlExtension"`
/// - `datafusion_dml::DmlExtensionNode` → `"DmlExtension"`
/// - `cayenne::ddl::logical_nodes::CayenneMergeNode` → `"CayenneMerge"`
/// - `runtime::datafusion::cayenne_ddl::logical_nodes::DistributedCayenne{Insert,Update,Delete}Node`
///   → `"CayenneInsert"` / `"CayenneUpdate"` / `"CayenneDelete"` (they reuse the
///   non-distributed names by design)
/// - `runtime::datafusion::cayenne_ddl::logical_nodes::DistributedCayenneMergeNode`
///   → `"DistributedCayenneMerge"`
pub const WRITE_CAPABLE_EXTENSION_NAMES: &[&str] = &[
    // `datafusion_ddl::DdlExtensionNode`
    "DdlExtension",
    // `datafusion_dml::DmlExtensionNode`
    "DmlExtension",
    // Distributed insert/update/delete nodes, which share these names with the
    // non-distributed nodes.
    "CayenneInsert",
    "CayenneUpdate",
    "CayenneDelete",
    // `cayenne::ddl::logical_nodes::CayenneMergeNode`
    "CayenneMerge",
    // `DistributedCayenneMergeNode`; unlike the three above it has its own name.
    "DistributedCayenneMerge",
];

use crate::result::embeddings::CachedEmbeddingResult;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -551,6 +576,11 @@ impl QueryResultsCacheProvider {
| LogicalPlan::Dml(..)
| LogicalPlan::Copy { .. }
| LogicalPlan::Statement(..) => return false,
LogicalPlan::Extension(ext)
if WRITE_CAPABLE_EXTENSION_NAMES.contains(&ext.node.name()) =>
{
return false;
}
_ => {}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/data_components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ aws-sdk-credential-bridge = { path = "../aws-sdk-credential-bridge" }
aws-sdk-dynamodb = { workspace = true, optional = true }
aws-sdk-dynamodbstreams = { workspace = true, optional = true }
aws-smithy-async = { workspace = true, optional = true }
azure_core = { version = "0.31.0", optional = true }
azure_data_cosmos = { version = "0.30.0", default-features = false, features = [
"hmac_rust",
"key_auth",
], optional = true }
base64.workspace = true
bb8 = { workspace = true, optional = true }
bb8-oracle = { version = "0.3", features = ["chrono"], optional = true }
Expand Down Expand Up @@ -121,6 +126,7 @@ rdkafka = { workspace = true, features = ["cmake-build"], optional = true }
bench = [] # Feature for benchmarking that exposes internal functions
adbc = ["datafusion-table-providers/adbc-federation"]
clickhouse = ["dep:clickhouse-rs", "datafusion-table-providers/federation"]
cosmosdb = ["dep:azure_core", "dep:azure_data_cosmos"]
databricks = [
"delta_lake",
"spark_connect",
Expand Down
134 changes: 134 additions & 0 deletions crates/data_components/src/cosmosdb/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024-2026 The Spice.ai OSS Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//! Build a [`ContainerClient`] for a specific `(database, container)` from a
//! user-supplied credential. Each [`CosmosDBTableProvider`] is pinned to one
//! container, so we construct the `ContainerClient` once at connector setup
//! and reuse it for schema inference and every subsequent scan.
//!
//! [`CosmosDBTableProvider`]: super::provider::CosmosDBTableProvider

use std::sync::Arc;

use azure_core::credentials::Secret;
use azure_data_cosmos::{ConnectionString, CosmosClient, clients::ContainerClient};
use snafu::ResultExt;

use super::{BuildClientSnafu, Error, InvalidConnectionStringSnafu};

/// Credential material from which a Cosmos client is built.
///
/// Both variants hold secret text (an account key, or a connection string
/// that embeds one). `Debug` is therefore implemented by hand below and
/// prints a `<redacted>` placeholder in place of the secret, so tracing
/// output and panic dumps never surface it.
#[derive(Clone)]
pub enum CosmosDBCredential {
    /// A full `AccountEndpoint=https://...;AccountKey=...;` connection string.
    ConnectionString(String),
    /// An explicit account endpoint URL together with a primary/secondary key.
    Key { endpoint: String, key: String },
}

impl std::fmt::Debug for CosmosDBCredential {
    /// Formats the credential with every secret replaced by `<redacted>`.
    ///
    /// The connection string is hidden entirely; for the key variant only the
    /// `key` field is hidden and the endpoint is printed as-is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder emitted wherever secret text would otherwise be printed.
        const REDACTED: &str = "<redacted>";

        match self {
            Self::Key { endpoint, key: _ } => {
                let mut out = f.debug_struct("Key");
                out.field("endpoint", endpoint);
                out.field("key", &REDACTED);
                out.finish()
            }
            Self::ConnectionString(_) => {
                let mut out = f.debug_tuple("ConnectionString");
                out.field(&REDACTED);
                out.finish()
            }
        }
    }
}

/// Construct the [`ContainerClient`] for one `(database, container)` pair.
///
/// The account endpoint is returned next to the client because callers need
/// it for resilience keying and for error messages. It is passed through
/// `normalize_endpoint` first, so the returned value is trimmed, has no
/// trailing slash, and is ASCII-lowercased.
///
/// # Errors
/// - `InvalidConnectionString` if a connection-string credential does not
///   parse.
/// - `BuildClient` (carrying the un-normalized endpoint) if the underlying
///   Azure SDK client cannot be constructed.
pub fn build_container_client(
    credential: CosmosDBCredential,
    database: &str,
    container: &str,
) -> Result<(ContainerClient, Arc<str>), Error> {
    let (cosmos, account_endpoint) = match credential {
        CosmosDBCredential::Key { endpoint, key } => {
            let built = CosmosClient::with_key(&endpoint, Secret::from(key), None);
            let cosmos = built.map_err(boxed_err).context(BuildClientSnafu {
                endpoint: endpoint.clone(),
            })?;

            (cosmos, endpoint)
        }
        CosmosDBCredential::ConnectionString(raw) => {
            // Parse up front purely to learn the account endpoint; the SDK
            // constructor below still receives the raw connection string.
            let parsed = raw
                .parse::<ConnectionString>()
                .map_err(boxed_err)
                .context(InvalidConnectionStringSnafu)?;
            let account_endpoint = parsed.account_endpoint;

            let built = CosmosClient::with_connection_string(Secret::from(raw), None);
            let cosmos = built.map_err(boxed_err).context(BuildClientSnafu {
                endpoint: account_endpoint.clone(),
            })?;

            (cosmos, account_endpoint)
        }
    };

    let scoped = cosmos
        .database_client(database)
        .container_client(container);
    let normalized: Arc<str> = Arc::from(normalize_endpoint(&account_endpoint));

    Ok((scoped, normalized))
}

/// Canonicalize a Cosmos DB account endpoint.
///
/// Surrounding whitespace is dropped, then any run of trailing `/`, and the
/// remainder is ASCII-lowercased. This keeps benign URL-formatting
/// differences (trailing slash, casing) from splitting the shared
/// per-account concurrency budget across datasets that target the same
/// account.
fn normalize_endpoint(endpoint: &str) -> String {
    let unpadded = endpoint.trim();
    let without_trailing_slash = unpadded.trim_end_matches('/');
    without_trailing_slash.to_ascii_lowercase()
}

/// Erase a concrete error into a boxed, thread-safe trait object.
///
/// Used as a `map_err` adapter so differently typed errors can all be handed
/// to the same context selectors.
fn boxed_err<E: std::error::Error + Send + Sync + 'static>(
    source: E,
) -> Box<dyn std::error::Error + Send + Sync> {
    source.into()
}

#[cfg(test)]
mod tests {
    use super::normalize_endpoint;

    /// Trailing slash, host casing and surrounding whitespace must all map
    /// to the same canonical endpoint string.
    #[test]
    fn normalize_endpoint_collapses_benign_variants() {
        let canonical = "https://myaccount.documents.azure.com:443";
        let variants = [
            "https://myaccount.documents.azure.com:443",
            "https://myaccount.documents.azure.com:443/",
            "https://MYACCOUNT.documents.azure.com:443/",
            "  https://myaccount.documents.azure.com:443/  ",
        ];

        variants.iter().for_each(|variant| {
            assert_eq!(normalize_endpoint(variant), canonical, "input: {variant:?}");
        });
    }
}
Loading
Loading