Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/flight_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ bytes.workspace = true
futures.workspace = true
rustls-native-certs = "0.8.1"
rustls-pemfile = "2.1.3"
secrecy.workspace = true
snafu.workspace = true
tonic = { workspace = true, features = ["transport", "tls", "tls-roots"] }
12 changes: 9 additions & 3 deletions crates/flight_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use base64::prelude::BASE64_STANDARD;
use bytes::Bytes;
use futures::StreamExt;
use futures::{TryStreamExt, ready, stream};
use secrecy::ExposeSecret;
use secrecy::SecretString;
use snafu::prelude::*;
use std::error::Error as StdError;
use tonic::IntoRequest;
Expand Down Expand Up @@ -175,14 +177,14 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub enum Credentials {
UsernamePassword {
username: Arc<str>,
password: Arc<str>,
password: Arc<SecretString>,
},
Anonymous,
}

impl Credentials {
#[must_use]
pub fn new(username: &str, password: &str) -> Self {
pub fn new(username: &str, password: SecretString) -> Self {
Credentials::UsernamePassword {
username: username.into(),
password: password.into(),
Expand Down Expand Up @@ -525,7 +527,11 @@ impl FlightClient {
payload: Bytes::default(),
};
let mut req = tonic::Request::new(stream::iter(vec![cmd]));
let val = BASE64_STANDARD.encode(format!("{username}:{password}"));
let val = BASE64_STANDARD.encode(format!(
"{username}:{password}",
password = password.expose_secret()
));

let val = format!("Basic {val}")
.parse()
.context(InvalidMetadataSnafu)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/runtime/src/dataconnector/dremio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ impl DataConnectorFactory for DremioFactory {
params
.parameters
.get("password")
.expose()
.ok()
.unwrap_or_default(),
.cloned()
.unwrap_or("".into()),
);
let flight_client = FlightClient::try_new(endpoint, credentials, None)
.await
Expand Down
3 changes: 1 addition & 2 deletions crates/runtime/src/dataconnector/spiceai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ impl DataConnectorFactory for SpiceAIFactory {
let api_key = params
.parameters
.get("api_key")
.expose()
.ok_or_else(|p| MissingRequiredParameterSnafu { parameter: p.0 }.build())?;
let credentials = Credentials::new("", api_key);
let credentials = Credentials::new("", api_key.clone());

let flight_client = FlightClient::try_new(url, credentials, None)
.await
Expand Down
4 changes: 2 additions & 2 deletions crates/runtime/src/http/v1/nsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ fn return_sql_only(accept: Option<&TypedHeader<Accept>>) -> bool {
accept.is_some_and(|a| accept_header_types(a).contains(&"application/sql".to_string()))
}

/// Text to SQL
/// Text-to-SQL (NSQL)
///
/// Generate and optionally execute an NSQL query.
/// Generate and optionally execute a natural-language text-to-SQL (NSQL) query.
///
/// This endpoint generates a SQL query using a natural language query (NSQL) and optionally executes it.
/// The SQL query is generated by the specified model and executed if the `Accept` header is not set to `application/sql`.
Expand Down
2 changes: 2 additions & 0 deletions crates/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ hostname = "0.4.0"
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
otel-arrow = { path = "../otel-arrow" }
secrecy.workspace = true
snafu.workspace = true
sha2 = "0.10.8"
tokio.workspace = true
tracing.workspace = true
Expand Down
66 changes: 20 additions & 46 deletions crates/telemetry/src/anonymous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ limitations under the License.
*/

use std::{
sync::{Arc, LazyLock, Weak},
sync::{Arc, LazyLock},
time::Duration,
};

use crate::exporter::AnonymousTelemetryExporter;
use crate::{exporter::TelemetryExporterBuilder, reader::InitialReader};
use opentelemetry::KeyValue;
use opentelemetry_sdk::{
Resource,
metrics::{
InstrumentKind, ManualReader, PeriodicReader, Pipeline, SdkMeterProvider, Temporality,
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
PeriodicReader, SdkMeterProvider, data::ResourceMetrics, exporter::PushMetricExporter,
reader::MetricReader,
},
runtime::Tokio,
};
Expand All @@ -34,16 +34,16 @@ use sha2::{Digest, Sha256};

const ENDPOINT_CONST: &str = "https://telemetry.spiceai.io";

/// How often to send telemetry data to the endpoint
const TELEMETRY_INTERVAL_SECONDS: u64 = 3600; // 1 hour
const TELEMETRY_TIMEOUT_SECONDS: u64 = 30;

static ENDPOINT: LazyLock<Arc<str>> = LazyLock::new(|| {
pub static ENDPOINT: LazyLock<Arc<str>> = LazyLock::new(|| {
std::env::var("SPICEAI_TELEMETRY_ENDPOINT")
.unwrap_or_else(|_| ENDPOINT_CONST.into())
.into()
});

/// How often to send telemetry data to the endpoint
const TELEMETRY_INTERVAL_SECONDS: u64 = 3600; // 1 hour
const TELEMETRY_TIMEOUT_SECONDS: u64 = 30;

fn resource(spicepod_name: &str, telemetry_properties: Vec<KeyValue>) -> Resource {
let hostname = hostname::get()
.unwrap_or_else(|_| "unknown".into())
Expand Down Expand Up @@ -72,8 +72,17 @@ fn resource(spicepod_name: &str, telemetry_properties: Vec<KeyValue>) -> Resourc
pub async fn start(spicepod_name: &str, telemetry_properties: Vec<KeyValue>) {
let resource = resource(spicepod_name, telemetry_properties);

let oss_telemetry_exporter =
OtelArrowExporter::new(AnonymousTelemetryExporter::new(Arc::clone(&ENDPOINT)).await);
let Ok(exporter) = TelemetryExporterBuilder::new()
.with_endpoint(Arc::clone(&ENDPOINT))
.with_service_name("oss_telemetry".into())
.build()
.await
else {
tracing::trace!("Failed to setup telemetry exporter - skipping telemetry");
return;
};

let oss_telemetry_exporter = OtelArrowExporter::new(exporter);

let periodic_reader = PeriodicReader::builder(oss_telemetry_exporter.clone(), Tokio)
.with_interval(Duration::from_secs(TELEMETRY_INTERVAL_SECONDS))
Expand Down Expand Up @@ -118,38 +127,3 @@ pub async fn start(spicepod_name: &str, telemetry_properties: Vec<KeyValue>) {

tracing::trace!("Started anonymous telemetry collection to {}", *ENDPOINT);
}

/// Metric reader that holds a shared [`ManualReader`].
///
/// Clones share the same underlying reader, because the reader is kept behind an `Arc`.
#[derive(Debug, Clone)]
struct InitialReader {
    /// The wrapped manual reader.
    reader: Arc<ManualReader>,
}

impl InitialReader {
    /// Creates an `InitialReader` around a `ManualReader` built from an
    /// unmodified `ManualReader::builder()`.
    pub fn new() -> Self {
        let manual = ManualReader::builder().build();
        Self {
            reader: Arc::new(manual),
        }
    }
}

/// `MetricReader` implementation that forwards every call, unchanged, to the
/// wrapped `ManualReader`.
impl MetricReader for InitialReader {
    /// Hands the pipeline reference to the wrapped reader.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.reader.register_pipeline(pipeline);
    }

    /// Collects metrics into `rm` through the wrapped reader and returns its result.
    fn collect(&self, rm: &mut ResourceMetrics) -> opentelemetry_sdk::metrics::MetricResult<()> {
        self.reader.collect(rm)
    }

    /// Returns the wrapped reader's `force_flush` result.
    fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
        self.reader.force_flush()
    }

    /// Returns the wrapped reader's `shutdown` result.
    fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
        self.reader.shutdown()
    }

    /// Reports the temporality the wrapped reader selects for `kind`.
    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        self.reader.temporality(kind)
    }
}
89 changes: 77 additions & 12 deletions crates/telemetry/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,100 @@ use arrow::array::RecordBatch;
use async_trait::async_trait;
use flight_client::{Credentials, FlightClient};
use opentelemetry_sdk::metrics::MetricError;
use secrecy::SecretString;
use snafu::prelude::*;

#[derive(Debug, Clone)]
pub struct AnonymousTelemetryExporter {
flight_client: Option<FlightClient>,
/// Errors produced while building a telemetry exporter.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The builder was asked to build without an endpoint.
    #[snafu(display(
        "An endpoint is required to connect to telemetry.\nSupply an endpoint to the telemetry builder.\nReport a bug on GitHub: https://github.com/spiceai/spiceai/issues"
    ))]
    MissingEndpoint,

    /// The builder was asked to build without a service name.
    #[snafu(display(
        "A service name is required to connect to telemetry.\nSupply a service name to the telemetry builder.\nReport a bug on GitHub: https://github.com/spiceai/spiceai/issues"
    ))]
    MissingServiceName,
}

impl AnonymousTelemetryExporter {
#[allow(dead_code)]
pub async fn new(url: Arc<str>) -> Self {
let flight_client = match FlightClient::try_new(url, Credentials::anonymous(), None).await {
/// Builder for a [`TelemetryExporter`].
///
/// Every field starts as `None` (via `Default`) and is filled in by the
/// corresponding `with_*` method.
#[derive(Debug, Default)]
pub struct TelemetryExporterBuilder {
    /// Optional API key. When present, `build` uses it as the password of
    /// username/password credentials with an empty username; when absent,
    /// anonymous credentials are used.
    api_key: Option<SecretString>,
    /// Service name; `build` fails with `Error::MissingServiceName` if unset.
    service_name: Option<Arc<str>>,
    /// Endpoint to connect to; `build` fails with `Error::MissingEndpoint` if unset.
    endpoint: Option<Arc<str>>,
}

impl TelemetryExporterBuilder {
/// Creates a builder with no API key, service name, or endpoint set.
#[must_use]
pub fn new() -> Self {
Self::default()
}

/// Sets the API key.
///
/// When an API key is set, `build` authenticates with
/// `Credentials::new("", api_key)`; otherwise it uses anonymous credentials.
#[must_use]
pub fn with_api_key(mut self, api_key: SecretString) -> Self {
self.api_key = Some(api_key);
self
}

/// Sets the service name (required by `build`).
#[must_use]
pub fn with_service_name(mut self, service_name: Arc<str>) -> Self {
self.service_name = Some(service_name);
self
}

/// Sets the endpoint the Flight client connects to (required by `build`).
#[must_use]
pub fn with_endpoint(mut self, endpoint: Arc<str>) -> Self {
self.endpoint = Some(endpoint);
self
}

/// Creates a new telemetry exporter.
///
/// A failure to create the Flight client is not an error: it is logged at
/// trace level and the exporter is built with no client.
///
/// # Errors
///
/// Returns `Error::MissingEndpoint` if the endpoint is not set, or
/// `Error::MissingServiceName` if the service name is not set.
pub async fn build(self) -> Result<TelemetryExporter, Error> {
// An API key, when supplied, is sent as the password with an empty username.
let credentials = if let Some(api_key) = self.api_key {
Credentials::new("", api_key)
} else {
Credentials::anonymous()
};

let endpoint = self.endpoint.ok_or(Error::MissingEndpoint)?;
// Connection failures are tolerated; the exporter then carries `None`.
let flight_client = match FlightClient::try_new(endpoint, credentials, None).await {
Ok(client) => Some(client),
Err(e) => {
// NOTE(review): this page is a rendered diff; the next line appears to be the
// pre-change version of the line that follows it - confirm against the real file.
tracing::trace!("Unable to initialize anonymous telemetry: {e}");
tracing::trace!("Unable to initialize telemetry: {e}");
None
}
};
// NOTE(review): the next line appears to be the removed side of the diff
// (old constructor return) - confirm against the real file.
Self { flight_client }

// The service name is validated only after the connection attempt above.
let service_name = self.service_name.ok_or(Error::MissingServiceName)?;

Ok(TelemetryExporter {
flight_client,
service_name,
})
}
}

/// Exporter that publishes metric record batches through a Flight client.
#[derive(Debug, Clone)]
pub struct TelemetryExporter {
    /// Flight client used for publishing; `None` when the client could not be
    /// created at build time.
    flight_client: Option<FlightClient>,
    /// Name supplied to the builder via `with_service_name`.
    service_name: Arc<str>,
}

#[async_trait]
impl otel_arrow::ArrowExporter for AnonymousTelemetryExporter {
impl otel_arrow::ArrowExporter for TelemetryExporter {
async fn export(&self, metrics: RecordBatch) -> Result<(), MetricError> {
let Some(mut flight_client) = self.flight_client.clone() else {
return Ok(());
};

if let Err(e) = flight_client.publish("oss_telemetry", vec![metrics]).await {
tracing::trace!("Unable to publish anonymous telemetry: {e}");
if let Err(e) = flight_client
.publish(&self.service_name, vec![metrics])
.await
{
tracing::trace!("Unable to publish telemetry: {e}");
};

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use std::{sync::LazyLock, time::Duration};

#[cfg(feature = "anonymous_telemetry")]
pub mod anonymous;
mod exporter;
mod meter;
pub mod exporter;
pub mod meter;
pub mod noop;
pub mod reader;

static QUERY_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
METER
Expand Down
7 changes: 3 additions & 4 deletions crates/telemetry/src/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use opentelemetry::metrics::{Meter, MeterProvider};

use crate::noop::NoopMeterProvider;

pub(crate) static METER_PROVIDER_ONCE: OnceLock<Arc<dyn MeterProvider + Send + Sync>> =
OnceLock::new();
pub static METER_PROVIDER_ONCE: OnceLock<Arc<dyn MeterProvider + Send + Sync>> = OnceLock::new();

/// If the meter provider isn't initialized for anonymous telemetry, use a `NoopMeterProvider`.
///
/// This allows the instrumented code to not require any changes when anonymous telemetry is disabled/compiled out.
static METER_PROVIDER: LazyLock<&'static Arc<dyn MeterProvider + Send + Sync>> =
pub static METER_PROVIDER: LazyLock<&'static Arc<dyn MeterProvider + Send + Sync>> =
LazyLock::new(|| METER_PROVIDER_ONCE.get_or_init(|| Arc::new(NoopMeterProvider::new())));

pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| METER_PROVIDER.meter("oss_telemetry"));
pub static METER: LazyLock<Meter> = LazyLock::new(|| METER_PROVIDER.meter("oss_telemetry"));
Loading
Loading