Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions .github/workflows/testoperator_dispatch_htap.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Manually triggered workflow that dispatches the HTAP (CH-BenCHmark) testoperator run.
name: testoperator dispatch htap

# The only trigger declared here is workflow_dispatch, i.e. the workflow runs on demand.
on:
workflow_dispatch:
inputs:
# Optional commit hash; forwarded unchanged to the "Setup spiced" step.
spiced_commit:
description: 'An optional commit hash to use for spiced'
required: false
type: string
# Required choice; the selected value is used in the job name and as the final path
# segment of the dispatch directory under ./tools/testoperator/dispatch/chbench/.
scale_factor:
description: 'Scale factor to run'
required: true
type: choice
options:
- 'sf1'
- 'sf10'
- 'sf100'
- 'sf1000'

jobs:
# Single job: sets up MinIO and spiced, builds testoperator and the spicepod validator,
# validates the CH-BenCHmark spicepods, then dispatches the HTAP workflow.
dispatch-htap:
name: Dispatch HTAP - ${{ github.event.inputs.scale_factor }}
runs-on: spiceai-dev-runners
# One concurrency group per triggering ref; a newer run does not cancel one already in progress.
concurrency:
group: testoperator-dispatch-htap-${{ github.event.ref }}
cancel-in-progress: false
steps:
# Checkout action is pinned to a full commit SHA; credentials are not persisted in the checkout.
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
persist-credentials: false

# Repository-local action; the MinIO endpoint and keys come from repository secrets.
- name: Install MinIO
uses: ./.github/actions/setup-minio
with:
minio_endpoint: ${{ secrets.TEST_MINIO_ENDPOINT }}
minio_access_key: ${{ secrets.TEST_MINIO_ACCESS_KEY }}
minio_secret_key: ${{ secrets.TEST_MINIO_SECRET_KEY }}

# Receives the optional spiced_commit input and the triggering ref; its SPICED_COMMIT
# output is read by later steps through the `setup-spiced` id.
- name: Setup spiced
uses: ./.github/actions/setup-spiced
id: setup-spiced
with:
spiced_commit: ${{ github.event.inputs.spiced_commit }}
ref: ${{ github.event.ref }}

# Prints the SPICED_COMMIT output of the setup step to the job log.
- name: Display spiced commit
run: echo "SPICED_COMMIT=${{ steps.setup-spiced.outputs.SPICED_COMMIT }}"

# Repository-local build action; receives the same MinIO secrets as the install step.
- name: Build Testoperator
uses: ./.github/actions/build-testoperator
with:
minio_endpoint: ${{ secrets.TEST_MINIO_ENDPOINT }}
minio_access_key: ${{ secrets.TEST_MINIO_ACCESS_KEY }}
minio_secret_key: ${{ secrets.TEST_MINIO_SECRET_KEY }}

# Repository-local build action; its `validator-path` output is consumed by the next step.
- name: Build spicepod validator
id: build-spicepod-validator
uses: ./.github/actions/build-spicepod-validator
with:
minio_endpoint: ${{ secrets.TEST_MINIO_ENDPOINT }}
minio_access_key: ${{ secrets.TEST_MINIO_ACCESS_KEY }}
minio_secret_key: ${{ secrets.TEST_MINIO_SECRET_KEY }}

# Appends SPICEPOD_VALIDATOR=<validator-path> to GITHUB_ENV so that later steps can read it.
- name: Set spicepod validator path
run: echo "SPICEPOD_VALIDATOR=${{ steps.build-spicepod-validator.outputs.validator-path }}" >> $GITHUB_ENV

# Runs the validator on every .yaml file under ./test/spicepods/chbench, recursively (globstar);
# nullglob makes the loop body run zero times when no file matches.
- name: Validate spicepods - CH-BenCHmark
run: |
shopt -s globstar nullglob
for file in ./test/spicepods/chbench/**/*.yaml; do
echo "Validating $file"
"$SPICEPOD_VALIDATOR" "$file"
done

# Dispatches the `htap` workflow from the chbench directory matching the selected scale factor.
- name: Dispatch Testoperator - HTAP - CH-BenCHmark - ${{ github.event.inputs.scale_factor }}
run: |
testoperator dispatch ./tools/testoperator/dispatch/chbench/${{ github.event.inputs.scale_factor }} \
--workflow htap
env:
# Environment handed to the testoperator invocation above.
# NOTE(review): WORKFLOW_COMMIT is filled from github.event.ref, which looks like a ref
# rather than a commit SHA - confirm this is what testoperator expects.
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SPICED_COMMIT: ${{ steps.setup-spiced.outputs.SPICED_COMMIT }}
WORKFLOW_COMMIT: ${{ github.event.ref }}
10 changes: 7 additions & 3 deletions bin/spice/src/commands/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,13 @@ EXAMPLES
# Add a SQL view
spice view add recent_orders --sql "select * from orders limit 100"

# Add an MCP-backed tool with a secret-bound token
spice tool add lookup --from mcp:server \
--env TOKEN='${ secrets:TOKEN }'
# Add an HTTP MCP-backed tool with a secret-bound bearer token
spice tool add lookup --from mcp:https://example.com/v1/mcp \
--param mcp_auth_token='${ secrets:TOKEN }'

# Or pass custom HTTP headers using the same format as HTTP datasets
spice tool add lookup --from mcp:https://example.com/v1/mcp \
--param mcp_headers='X-API-Key: ${ secrets:API_KEY }'

# Reference an external component definition file
spice model add --ref models/llm.yaml
Expand Down
1 change: 1 addition & 0 deletions crates/cayenne/tests/small_files_compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ async fn compaction_collapses_tiny_protected_snapshots(
let config = VortexConfig {
target_vortex_file_size_mb: 128,
compaction_trigger_files: 4,
compaction_trigger_protected_snapshots: 4,
compaction_background_interval_ms: 0,
..Default::default()
};
Expand Down
1 change: 0 additions & 1 deletion crates/runtime/src/dataaccelerator/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub(crate) fn create_factory() -> DuckDBTableProviderFactory {
.with_function_support(deny_spice_functions_for_duckdb().as_ref().clone())
}

pub(crate) const DEFAULT_MIN_IDLE_CONNECTIONS: u32 = 10;
pub(crate) const DEFAULT_CONNECTION_POOL_SIZE: u32 = 10;
pub(crate) const DEFAULT_EBS_CONNECTION_POOL_SIZE: u32 = 4;
pub(crate) const SPICE_ACCELERATOR_METADATA_KEY: &str = "spice.accelerator";
Expand Down
1 change: 0 additions & 1 deletion crates/runtime/src/dataaccelerator/partitioned_duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use super::{
use crate::{
component::dataset::acceleration::{Engine, Mode},
dataaccelerator::{FilePathError, storage::resolve_acceleration_storage_async},
datafusion::{dialect::new_duckdb_dialect, udf::deny_spice_functions_for_duckdb},
parameters::ParameterSpec,
register_data_accelerator, spice_data_base_path,
};
Expand Down
6 changes: 3 additions & 3 deletions crates/runtime/src/datafusion/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,12 +1132,12 @@ mod tests {
.build();

let state = df.ctx.state();
let rule_names: Vec<&str> = state.optimizers().iter().map(|r| r.name()).collect();

assert!(
!rule_names
!state
.optimizers()
.iter()
.any(|name| *name == "cayenne_propagate_filter_across_equi_join_keys"),
.any(|r| r.name() == "cayenne_propagate_filter_across_equi_join_keys"),
"Cayenne logical filter propagation should be disabled by default"
);
}
Expand Down
20 changes: 17 additions & 3 deletions crates/runtime/src/tools/mcp/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ use rmcp::{
},
serve_client,
service::{RunningService, ServiceError},
transport::{ConfigureCommandExt, StreamableHttpClientTransport, TokioChildProcess},
transport::{
ConfigureCommandExt, StreamableHttpClientTransport, TokioChildProcess,
streamable_http_client::StreamableHttpClientTransportConfig,
},
};
use secrecy::ExposeSecret;
use snafu::ResultExt;
use std::{
sync::{Arc, LazyLock},
Expand Down Expand Up @@ -189,7 +193,11 @@ impl McpToolCatalog {
.context(UnderlyingTransportSnafu)?,
))
}
MCPConfig::StreamableHttp { url } => {
MCPConfig::StreamableHttp {
url,
auth_token,
headers,
} => {
// Security: Validate URL scheme (only https allowed, http for localhost testing)
if url.scheme() != "https" && url.scheme() != "http" {
return Err(Error::CouldNotConstructTool {
Expand All @@ -211,7 +219,13 @@ impl McpToolCatalog {
);
}

let transport = StreamableHttpClientTransport::from_uri(url.to_string());
let mut transport_config =
StreamableHttpClientTransportConfig::with_uri(url.to_string())
.custom_headers(headers.clone());
if let Some(auth_token) = auth_token {
transport_config = transport_config.auth_header(auth_token.expose_secret());
}
let transport = StreamableHttpClientTransport::from_config(transport_config);

let client_info = InitializeRequestParams::new(
ClientCapabilities::default(),
Expand Down
161 changes: 158 additions & 3 deletions crates/runtime/src/tools/mcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod tool;

use std::{collections::HashMap, str::FromStr};

use http::{HeaderName, HeaderValue, header::AUTHORIZATION};
use rmcp::ErrorData as McpError;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,6 +54,9 @@ pub enum Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Name of the tool parameter that carries the auth token for a streamable-HTTP MCP
/// server. When it is set, any `authorization` entry supplied through
/// [`MCP_HEADERS_PARAM`] is dropped with a warning, so the token takes precedence.
const MCP_AUTH_TOKEN_PARAM: &str = "mcp_auth_token";
/// Name of the tool parameter that carries custom HTTP headers for a streamable-HTTP
/// MCP server, written as `Name: Value` entries separated by `,` or `;`.
const MCP_HEADERS_PARAM: &str = "mcp_headers";

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MCPType {
Expand All @@ -74,7 +78,7 @@ impl FromStr for MCPType {
}
}

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[derive(Clone)]
pub(crate) enum MCPConfig {
Stdio {
command: String,
Expand All @@ -83,6 +87,8 @@ pub(crate) enum MCPConfig {
},
StreamableHttp {
url: url::Url,
auth_token: Option<SecretString>,
headers: HashMap<HeaderName, HeaderValue>,
},
}
impl MCPConfig {
Expand All @@ -108,11 +114,66 @@ impl MCPConfig {

Self::Stdio { command, args, env }
}
MCPType::StreamableHttp(url) => Self::StreamableHttp { url },
MCPType::StreamableHttp(url) => {
let auth_token = params.get(MCP_AUTH_TOKEN_PARAM).cloned();
let mut headers = parse_custom_headers(params);
if auth_token.is_some() && headers.remove(&AUTHORIZATION).is_some() {
tracing::warn!(
"Ignoring 'authorization' header from MCP custom headers because '{MCP_AUTH_TOKEN_PARAM}' is configured"
);
}
Self::StreamableHttp {
url,
auth_token,
headers,
}
}
}
}
}

/// Builds the custom HTTP header map for a streamable-HTTP MCP server from the
/// `mcp_headers` tool parameter.
///
/// The parameter value is a list of `Name: Value` entries. Entries are separated by
/// `;` when the value contains at least one `;`, otherwise by `,`. Each entry is split
/// at its first `:`, and both the name and the value are trimmed of surrounding
/// whitespace.
///
/// Parsing is best-effort: an entry without a `:`, or with an invalid header name or
/// value, is reported through `tracing::warn!` and skipped instead of failing the
/// whole configuration. Blank entries (for example the one left behind by a trailing
/// delimiter) are skipped silently. When a header name occurs more than once, the
/// last entry wins.
///
/// Every parsed value is marked as sensitive, and the warnings only ever mention the
/// header name, never its value, since the value originates from a `SecretString`.
///
/// Returns an empty map when the parameter is not present in `params`.
fn parse_custom_headers(
    params: &HashMap<String, SecretString>,
) -> HashMap<HeaderName, HeaderValue> {
    let mut custom_headers = HashMap::new();
    let Some(headers) = params.get(MCP_HEADERS_PARAM) else {
        return custom_headers;
    };
    let param_name = MCP_HEADERS_PARAM;

    // Same UX as the HTTP connector's `http_headers` parameter:
    // `Header1: Value1, Header2: Value2` (or semicolon-delimited).
    let headers_str = headers.expose_secret();
    let delimiter = if headers_str.contains(';') { ';' } else { ',' };
    for header in headers_str.split(delimiter) {
        // A trailing or doubled delimiter leaves an empty segment behind; that is not a
        // malformed header, so skip it without emitting a misleading warning.
        if header.trim().is_empty() {
            continue;
        }

        let Some((name, value)) = header.split_once(':') else {
            tracing::warn!(
                "Malformed MCP HTTP header in '{param_name}'. Expected format 'Name: Value'. Skipping this header."
            );
            continue;
        };

        let name = name.trim();
        let value = value.trim();
        let Ok(header_name) = HeaderName::try_from(name) else {
            tracing::warn!(
                "Invalid MCP HTTP header name in '{param_name}': '{name}'. Skipping this header."
            );
            continue;
        };
        let Ok(mut header_value) = HeaderValue::from_str(value) else {
            tracing::warn!(
                "Invalid MCP HTTP header value for '{name}' in '{param_name}'. Skipping this header."
            );
            continue;
        };
        // Keep the value out of `Debug` output and header-compression tables.
        header_value.set_sensitive(true);
        custom_headers.insert(header_name, header_value);
    }

    custom_headers
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -180,7 +241,101 @@ mod tests {
let mcp_type = MCPType::StreamableHttp(url.clone());
let cfg = MCPConfig::from_type(&mcp_type, &HashMap::new(), &HashMap::new());
match cfg {
MCPConfig::StreamableHttp { url: u } => assert_eq!(u, url),
MCPConfig::StreamableHttp {
url: u,
auth_token,
headers,
} => {
assert_eq!(u, url);
assert!(auth_token.is_none());
assert!(headers.is_empty());
}
MCPConfig::Stdio { .. } => panic!("expected https config"),
}
}

#[test]
fn mcp_config_from_https_collects_auth_token() {
    // An HTTPS MCP endpoint configured with only the auth-token parameter.
    let endpoint = url::Url::parse("https://example.com/v1/mcp").expect("valid url");
    let params = HashMap::from([(
        "mcp_auth_token".to_string(),
        SecretString::from("test-api-key"),
    )]);

    let MCPConfig::StreamableHttp {
        auth_token,
        headers,
        ..
    } = MCPConfig::from_type(
        &MCPType::StreamableHttp(endpoint),
        &params,
        &HashMap::new(),
    )
    else {
        panic!("expected https config");
    };

    // The token is carried through verbatim and no custom headers are invented.
    let token = auth_token.as_ref().map(ExposeSecret::expose_secret);
    assert_eq!(token, Some("test-api-key"));
    assert!(headers.is_empty());
}

#[test]
fn mcp_config_from_https_collects_custom_headers() {
    // An HTTPS MCP endpoint configured with two comma-separated custom headers.
    let endpoint = url::Url::parse("https://example.com/v1/mcp").expect("valid url");
    let params = HashMap::from([(
        MCP_HEADERS_PARAM.to_string(),
        SecretString::from("X-API-Key: test-api-key, X-Tenant: acme"),
    )]);

    let MCPConfig::StreamableHttp { headers, .. } = MCPConfig::from_type(
        &MCPType::StreamableHttp(endpoint),
        &params,
        &HashMap::new(),
    ) else {
        panic!("expected https config");
    };

    // Looks a header up by its lower-case name and returns its value as text, if any.
    let header_text = |name: &'static str| {
        headers
            .get(&HeaderName::from_static(name))
            .and_then(|value| value.to_str().ok())
    };
    assert_eq!(header_text("x-api-key"), Some("test-api-key"));
    assert_eq!(header_text("x-tenant"), Some("acme"));
}

#[test]
fn mcp_auth_token_removes_custom_authorization_header() {
    let endpoint = url::Url::parse("https://example.com/v1/mcp").expect("valid url");
    // Supply both the dedicated token and a conflicting `Authorization` custom header.
    let params = HashMap::from([
        (
            MCP_AUTH_TOKEN_PARAM.to_string(),
            SecretString::from("test-api-key"),
        ),
        (
            MCP_HEADERS_PARAM.to_string(),
            SecretString::from("Authorization: Basic abc, X-Tenant: acme"),
        ),
    ]);

    let MCPConfig::StreamableHttp { headers, .. } = MCPConfig::from_type(
        &MCPType::StreamableHttp(endpoint),
        &params,
        &HashMap::new(),
    ) else {
        panic!("expected https config");
    };

    // The custom `Authorization` header is dropped; unrelated headers are kept.
    assert!(!headers.contains_key(&AUTHORIZATION));
    let tenant = headers
        .get(&HeaderName::from_static("x-tenant"))
        .and_then(|value| value.to_str().ok());
    assert_eq!(tenant, Some("acme"));
}
Expand Down
Loading
Loading