Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/testoperator_dispatch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ on:
required: false
type: boolean
default: false
validate:
description: 'Validate results?'
required: false
type: boolean
default: false

jobs:
dispatch-tests:
Expand Down Expand Up @@ -62,7 +67,8 @@ jobs:
testoperator dispatch \
${{ github.event.inputs.test_files_path }} \
--workflow ${{ github.event.inputs.workflow_type }} \
--update-snapshots ${{ github.event.inputs.update_snapshots }}
--update-snapshots ${{ github.event.inputs.update_snapshots }} \
--validate=${{ github.event.inputs.validate }} \
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SPICED_COMMIT: ${{ steps.setup-spiced.outputs.SPICED_COMMIT }}
Expand Down
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ itertools = "0.14"
mailparse = "0.15.0"
mysql_async = { version = "0.35.1", features = ["native-tls-tls", "chrono"] }
object_store = { version = "0.11" }
odbc-api = { version = "12" }
odbc-api = { version = "12.0.1" }
opentelemetry = { version = "0.27", default-features = false, features = [
"metrics",
"trace",
Expand Down Expand Up @@ -131,7 +131,7 @@ serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9.30"
snafu = "0.8.5"
snowflake-api = { git = "https://github.com/spiceai/snowflake-rs.git", rev = "e7237c22ebb7a94e11de7d77b3225247d24b3581" } # spiceai
snowflake-api = { git = "https://github.com/spiceai/snowflake-rs.git", rev = "0cd20ce6ebc72bec9f9c98ddb7d242bf48d6b8d3" } # spiceai
ssh2 = { version = "0.9.5" }
suppaftp = { version = "5.3.1", features = ["async"] }
tempfile = "3"
Expand Down Expand Up @@ -170,8 +170,6 @@ datafusion-table-providers = { git = "https://github.com/datafusion-contrib/data

duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "2e24b958e44ec7419290249e27a15f1a19703fff" } # spiceai-1.1.3-backported-arrow-54

odbc-api = { git = "https://github.com/spiceai/odbc-api.git", rev = "429b71a4644d8c4b144e5f3be5f076c8ddacd624" } # spiceai-12

rusqlite = { git = "https://github.com/spiceai/rusqlite.git", rev = "97054b6af725caf5d3e952e349746706e00d0ea5" }

# Tracking Issue: https://github.com/allan2/dotenvy/issues/113
Expand Down
9 changes: 9 additions & 0 deletions crates/test-framework/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl StatisticsCollector<BTreeMap<String, Duration>, BTreeMap<String, Vec<Durati
pub struct QueryMetrics<T: ExtendedMetrics, R: ExtendedMetrics> {
pub run_id: Uuid,
pub run_name: String,
pub spiced_version: String,
pub commit_sha: String,
pub branch_name: String,
pub test_type: TestType,
Expand Down Expand Up @@ -373,6 +374,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {

let mut base_fields = vec![
Field::new("run_id", DataType::Utf8, false),
Field::new("spiced_version", DataType::Utf8, false),
Field::new("run_name", DataType::Utf8, false),
Field::new("commit_sha", DataType::Utf8, false),
Field::new("branch_name", DataType::Utf8, false),
Expand All @@ -397,6 +399,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {

let mut base_fields = vec![
Field::new("run_id", DataType::Utf8, false),
Field::new("spiced_version", DataType::Utf8, false),
Field::new("started_at", DataType::Int64, false),
Field::new("finished_at", DataType::Int64, false),
Field::new("query_name", DataType::Utf8, false),
Expand Down Expand Up @@ -499,6 +502,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {
#[allow(clippy::cast_possible_wrap)]
pub fn build_records(&self) -> Result<Vec<RecordBatch>> {
let run_id = vec![self.run_id.to_string(); self.metrics.len()];
let spiced_version = vec![self.spiced_version.clone(); self.metrics.len()];

let started_at = extract_metric_values!(self.metrics, started_at, as_i64);
let finished_at = extract_metric_values!(self.metrics, finished_at, as_i64);
Expand All @@ -520,6 +524,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {

let mut columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(run_id)),
Arc::new(StringArray::from(spiced_version)),
Arc::new(Int64Array::from(started_at)),
Arc::new(Int64Array::from(finished_at)),
Arc::new(StringArray::from(query_name)),
Expand Down Expand Up @@ -569,6 +574,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {
/// The record batch is a single row, representing the run as a whole - which can pass or fail separately from individual queries
pub fn build_run(&self, status: QueryStatus) -> Result<Vec<RecordBatch>> {
let run_id = vec![self.run_id.to_string()];
let spiced_version = vec![self.spiced_version.to_string()];
let run_name = vec![self.run_name.clone()];
let commit_sha = vec![self.commit_sha.clone()];
let branch_name = vec![self.branch_name.clone()];
Expand Down Expand Up @@ -602,6 +608,7 @@ impl<T: ExtendedMetrics, R: ExtendedMetrics> QueryMetrics<T, R> {

let mut columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(run_id)),
Arc::new(StringArray::from(spiced_version)),
Arc::new(StringArray::from(run_name)),
Arc::new(StringArray::from(commit_sha)),
Arc::new(StringArray::from(branch_name)),
Expand Down Expand Up @@ -650,11 +657,13 @@ pub trait MetricCollector<T: ExtendedMetrics, R: ExtendedMetrics> {
fn start_time(&self) -> SystemTime;
fn end_time(&self) -> SystemTime;
fn name(&self) -> String;
fn spiced_version(&self) -> Result<&str>;
fn metrics(&self) -> Result<Vec<QueryMetric<T>>>;
fn collect(&self, test_type: TestType) -> Result<QueryMetrics<T, R>> {
Ok(QueryMetrics {
run_id: uuid::Uuid::new_v4(),
run_name: self.name(),
spiced_version: self.spiced_version()?.to_string(),
commit_sha: git::get_commit_sha(),
branch_name: git::get_branch_name(),
test_type,
Expand Down
48 changes: 47 additions & 1 deletion crates/test-framework/src/spiced/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
*/

use std::{
fmt::Display,
path::PathBuf,
process::{Child, Command},
time::Duration,
Expand All @@ -28,9 +29,25 @@ use tempfile::TempDir;

use crate::{process::Process, utils::wait_until_true};

#[derive(Debug, Clone)]
pub struct SpicedVersion(String);
impl SpicedVersion {
#[must_use]
pub fn new(version: String) -> Self {
Self(version)
}
}

impl Display for SpicedVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

pub struct SpicedInstance {
child: Child,
tempdir: TempDir,
version: SpicedVersion,
}

pub struct StartRequest {
Expand Down Expand Up @@ -113,13 +130,42 @@ impl SpicedInstance {

let tempdir = start_request.tempdir;

// Get spiced version
let version_cmd = Command::new(start_request.spiced_path.clone())
.arg("--version")
.output()?;

if !version_cmd.status.success() {
anyhow::bail!(
"Failed to get spiced version: {}",
String::from_utf8_lossy(&version_cmd.stderr)
);
}

let version = String::from_utf8_lossy(&version_cmd.stdout).to_string();
// take just the v1.0.0 part of the version
let version = match (version.contains('-'), version.contains('+')) {
(true, _) => version.split('-').next().unwrap_or(&version).to_string(),
(false, true) => version.split('+').next().unwrap_or(&version).to_string(),
(false, false) => version,
};

// Start the spiced instance
let mut cmd = Command::new(start_request.spiced_path);
cmd.current_dir(tempdir.path());
cmd.arg("--telemetry-enabled=false");
let child = cmd.spawn()?;

Ok(Self { child, tempdir })
Ok(Self {
child,
tempdir,
version: SpicedVersion::new(version),
})
}

#[must_use]
pub fn version(&self) -> &str {
self.version.0.as_str()
}

#[must_use]
Expand Down
18 changes: 18 additions & 0 deletions crates/test-framework/src/spicetest/datasets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,15 @@ impl MetricCollector<DatasetMetrics, NoExtendedMetrics> for SpiceTest<Completed>
self.name.clone()
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
}

fn metrics(&self) -> Result<Vec<QueryMetric<DatasetMetrics>>> {
self.get_query_durations()
.iter()
Expand Down Expand Up @@ -429,6 +438,15 @@ impl MetricCollector<NoExtendedMetrics, ThroughputMetrics> for SpiceTest<Complet
self.name.clone()
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
}

fn metrics(&self) -> Result<Vec<QueryMetric<NoExtendedMetrics>>> {
self.get_query_durations()
.iter()
Expand Down
9 changes: 9 additions & 0 deletions crates/test-framework/src/spicetest/http/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ impl MetricCollector<NoExtendedMetrics, NoExtendedMetrics> for SpiceTest<Complet
self.name.clone()
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
}

fn metrics(&self) -> Result<Vec<QueryMetric<NoExtendedMetrics>>> {
self.state
.result
Expand Down
9 changes: 9 additions & 0 deletions crates/test-framework/src/spicetest/http/overhead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ impl MetricCollector<NoExtendedMetrics, NoExtendedMetrics> for SpiceTest<Complet
self.name.clone()
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
}

fn metrics(&self) -> Result<Vec<QueryMetric<NoExtendedMetrics>>> {
let baseline = QueryMetric::new_from_durations(
"baseline",
Expand Down
9 changes: 9 additions & 0 deletions crates/test-framework/src/spicetest/vector_search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ impl MetricCollector<SearchScoreMetric, NoExtendedMetrics> for SpiceTest<Complet
self.name.clone()
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
}

fn metrics(&self) -> Result<Vec<QueryMetric<SearchScoreMetric>>> {
self.state
.results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tests:
spicepod_path: ./test/spicepods/tpch/accelerated/file_arrow.yaml
query_set: tpch
runner_type: spiceai-runners
validate: true
throughput:
spicepod_path: ./test/spicepods/tpch/accelerated/file_arrow.yaml
query_set: tpch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tests:
spicepod_path: ./test/spicepods/tpch/accelerated/file_duckdb_file.yaml
query_set: tpch
runner_type: spiceai-runners
validate: true
throughput:
spicepod_path: ./test/spicepods/tpch/accelerated/file_duckdb_file.yaml
query_set: tpch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tests:
spicepod_path: ./test/spicepods/tpch/accelerated/file_duckdb_memory.yaml
query_set: tpch
runner_type: spiceai-runners
validate: true
throughput:
spicepod_path: ./test/spicepods/tpch/accelerated/file_duckdb_memory.yaml
query_set: tpch
Expand Down
1 change: 1 addition & 0 deletions tools/testoperator/dispatch/tpch/duckdb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tests:
spicepod_path: ./test/spicepods/tpch/duckdb.yaml
query_set: tpch
runner_type: spiceai-runners
validate: true
throughput:
spicepod_path: ./test/spicepods/tpch/duckdb.yaml
query_set: tpch
Expand Down
1 change: 1 addition & 0 deletions tools/testoperator/dispatch/tpch/file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ tests:
spicepod_path: ./test/spicepods/tpch/file.yaml
query_set: tpch
runner_type: spiceai-runners
validate: true
throughput:
spicepod_path: ./test/spicepods/tpch/file.yaml
query_set: tpch
Expand Down
11 changes: 11 additions & 0 deletions tools/testoperator/src/args/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct DispatchArgs {

#[arg(long, default_value = "false", action = ArgAction::Set)]
pub(crate) update_snapshots: bool,

#[arg(long, action = ArgAction::Set, default_value_t = false, default_missing_value = "true", num_args = 0..=1, require_equals = false)]
pub(crate) validate: bool,
}

#[derive(Debug, Copy, Clone, ValueEnum)]
Expand Down Expand Up @@ -97,6 +100,8 @@ pub struct BenchArgs {
pub runner_type: RunnerType,
#[serde(skip_serializing_if = "Option::is_none")]
pub update_snapshots: Option<UpdateSnapshots>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate: Option<bool>,
}

impl BenchArgs {
Expand All @@ -105,6 +110,12 @@ impl BenchArgs {
self.update_snapshots = Some(update_snapshots);
self
}

#[must_use]
pub fn with_validate(mut self, validate: bool) -> Self {
self.validate = Some(validate);
self
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down
3 changes: 2 additions & 1 deletion tools/testoperator/src/commands/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ pub async fn dispatch(args: DispatchArgs) -> Result<()> {
serde_json::json!(WorkflowArgs {
specific_args: bench
.clone()
.with_update_snapshots(args.update_snapshots.into()),
.with_update_snapshots(args.update_snapshots.into())
.with_validate(args.validate),
spiced_commit: args.spiced_commit.clone(),
})
}
Expand Down
Loading