From 6f16899499686a3f6c9da0cf83762294f97e89a4 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Sun, 14 Apr 2024 22:35:12 +0000 Subject: [PATCH 1/6] Wrapping into Arc, plus other --- Cargo.lock | 80 +++++++++++++++++--------------------------- Cargo.toml | 6 ++-- src/catalog.rs | 6 ++-- src/client/mod.rs | 11 +++--- src/dataframe.rs | 21 ++++++------ src/errors.rs | 3 ++ src/readwriter.rs | 5 +-- src/session.rs | 21 +++++++----- src/streaming/mod.rs | 23 +++++++------ 9 files changed, 87 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4cc56f7..fea0957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,9 +62,9 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "arrow" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658" +checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" dependencies = [ "arrow-arith", "arrow-array", @@ -83,9 +83,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" dependencies = [ "arrow-array", "arrow-buffer", @@ -98,9 +98,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" dependencies = [ "ahash", "arrow-buffer", @@ -114,9 +114,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" dependencies = [ "bytes", "half", @@ -125,30 +125,28 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "atoi", - "base64 0.22.0", + "base64", "chrono", "comfy-table", "half", "lexical-core", "num", - "ryu", ] [[package]] name = "arrow-csv" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" +checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" dependencies = [ "arrow-array", "arrow-buffer", @@ -165,9 +163,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" dependencies = [ "arrow-buffer", "arrow-schema", @@ -177,9 +175,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "51.0.0" +version = "50.0.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" dependencies = [ "arrow-array", "arrow-buffer", @@ -191,9 +189,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" +checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -211,9 +209,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" dependencies = [ "arrow-array", "arrow-buffer", @@ -226,9 +224,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" dependencies = [ "ahash", "arrow-array", @@ -241,15 +239,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" [[package]] name = "arrow-select" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" dependencies = [ "ahash", "arrow-array", @@ -261,16 +259,15 @@ dependencies = [ [[package]] name = "arrow-string" -version = "51.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" +checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "memchr", "num", "regex", "regex-syntax 0.8.2", @@ -309,15 +306,6 @@ dependencies = [ "syn 2.0.57", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -416,12 +404,6 @@ version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" -[[package]] -name = "base64" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" - [[package]] name = "bindgen" version = "0.69.4" @@ -1602,7 +1584,7 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab" dependencies = [ - "base64 0.21.3", + "base64", 
"rustls-pki-types", ] @@ -1968,7 +1950,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.3", + "base64", "bytes", "h2", "http", diff --git a/Cargo.toml b/Cargo.toml index 5eef5e9..eb1063a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,12 +22,12 @@ tonic = { version ="0.11.0"} tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } tokio-rustls = "0.26.0" -arrow = { version = "51.0.0", features = ["prettyprint"] } -arrow-ipc = "51.0.0" +arrow = { version = "50.0.0", features = ["prettyprint"] } +arrow-ipc = "50.0.0" serde_json = "1.0.115" -parking_lot = "0.12.1" +parking_lot = { version="0.12.1" , features = ["send_guard"]} prost = "0.12.0" prost-types = "0.12.0" diff --git a/src/catalog.rs b/src/catalog.rs index 14a5e9e..7d57dd7 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -1,5 +1,7 @@ //! Spark Catalog representation through which the user may create, drop, alter or query underlying databases, tables, functions, etc. +use std::sync::Arc; + use arrow::array::RecordBatch; use crate::errors::SparkError; @@ -9,11 +11,11 @@ use crate::spark; #[derive(Debug, Clone)] pub struct Catalog { - spark_session: SparkSession, + spark_session: Arc, } impl Catalog { - pub fn new(spark_session: SparkSession) -> Self { + pub fn new(spark_session: Arc) -> Self { Self { spark_session } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 8a8b3f6..4dc9364 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -127,15 +127,13 @@ impl ChannelBuilder { channel_builder.use_ssl = true } }; - channel_builder.headers = Some(metadata_builder(&headers)); - Ok(channel_builder) } async fn create_client(&self) -> Result { let endpoint = format!("https://{}:{}", self.host, self.port); - + println!("endpoint {:?}",endpoint.as_str()); let channel = Endpoint::from_shared(endpoint)?.connect().await?; let service_client = SparkConnectServiceClient::with_interceptor( @@ -302,6 +300,8 @@ impl ResponseHandler { } impl AnalyzeHandler { + + fn new() -> Self { Self { schema: None, @@ -317,6 +317,7 @@ impl AnalyzeHandler { get_storage_level: None, } } + } #[derive(Clone, Debug)] @@ -327,6 +328,9 @@ pub struct SparkConnectClient { pub analyzer: AnalyzeHandler, } +unsafe impl Send for SparkConnectClient {} +unsafe impl Sync for SparkConnectClient {} + impl SparkConnectClient where T: tonic::client::GrpcService, @@ -413,7 +417,6 @@ where self.handle_analyze(resp) } - fn handle_response(&mut self, resp: spark::ExecutePlanResponse) -> Result<(), SparkError> { self.validate_session(&resp.session_id)?; diff --git a/src/dataframe.rs b/src/dataframe.rs index 7ea899f..bbe1236 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -1,5 +1,7 @@ //! 
DataFrame representation for Spark Connection +use std::sync::Arc; + use crate::column::Column; use crate::errors::SparkError; use crate::expressions::{ToExpr, ToFilterExpr, ToVecExpr}; @@ -66,7 +68,7 @@ use arrow::util::pretty; #[derive(Clone, Debug)] pub struct DataFrame { /// Global [SparkSession] connecting to the remote cluster - pub spark_session: SparkSession, + pub spark_session: Arc, /// Logical Plan representing the unresolved Relation /// which will be submitted to the remote cluster @@ -75,7 +77,7 @@ pub struct DataFrame { impl DataFrame { /// create default DataFrame based on a spark session and initial logical plan - pub fn new(spark_session: SparkSession, logical_plan: LogicalPlanBuilder) -> DataFrame { + pub fn new(spark_session: Arc, logical_plan: LogicalPlanBuilder) -> DataFrame { DataFrame { spark_session, logical_plan, @@ -658,15 +660,14 @@ impl DataFrame { spark::analyze_plan_request::Analyze::Schema(spark::analyze_plan_request::Schema { plan: Some(plan), }); - - let data_type = self - .spark_session - .client() + let session = self.spark_session.clone(); + let mut client = session.client(); + let data_type = client .analyze(schema) - .await? - .schema()?; + .await?; + let schema = data_type.schema()?; + Ok(schema.clone()) - Ok(data_type) } /// Projects a set of expressions and returns a new [DataFrame] @@ -757,7 +758,7 @@ impl DataFrame { } #[allow(non_snake_case)] - pub fn sparkSession(self) -> SparkSession { + pub fn sparkSession(self) -> Arc { self.spark_session } diff --git a/src/errors.rs b/src/errors.rs index 9a25c9e..f92cab8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -79,6 +79,9 @@ impl Display for SparkError { } } +unsafe impl Send for SparkError {} +unsafe impl Sync for SparkError {} + impl Error for SparkError { fn source(&self) -> Option<&(dyn Error + 'static)> { if let Self::ExternalError(e) = self { diff --git a/src/readwriter.rs b/src/readwriter.rs index 615f34d..ea70c64 100644 --- a/src/readwriter.rs +++ b/src/readwriter.rs @@ -1,6 +1,7 @@ //! DataFrameReader & DataFrameWriter representations use std::collections::HashMap; +use std::sync::Arc; use crate::errors::SparkError; use crate::plan::LogicalPlanBuilder; @@ -14,14 +15,14 @@ use spark::write_operation::SaveMode; /// from a specific file format. #[derive(Clone, Debug)] pub struct DataFrameReader { - spark_session: SparkSession, + spark_session: Arc, format: Option, read_options: HashMap, } impl DataFrameReader { /// Create a new DataFrameReader with a [SparkSession] - pub fn new(spark_session: SparkSession) -> Self { + pub fn new(spark_session: Arc) -> Self { Self { spark_session, format: None, diff --git a/src/session.rs b/src/session.rs index 05cf905..62296dc 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,6 +1,7 @@ //! 
Spark Session containing the remote gRPC client use std::collections::HashMap; +use std::sync::Arc; use crate::catalog::Catalog; pub use crate::client::SparkSessionBuilder; @@ -39,7 +40,7 @@ impl SparkSession { /// `end` (exclusive) with a step value `step`, and control the number /// of partitions with `num_partitions` pub fn range( - self, + self: Arc, start: Option, end: i64, step: i64, @@ -56,28 +57,28 @@ impl SparkSession { } /// Returns a [DataFrameReader] that can be used to read datra in as a [DataFrame] - pub fn read(self) -> DataFrameReader { + pub fn read(self: Arc) -> DataFrameReader { DataFrameReader::new(self) } /// Returns a [DataFrameReader] that can be used to read datra in as a [DataFrame] #[allow(non_snake_case)] - pub fn readStream(self) -> DataStreamReader { + pub fn readStream(self: Arc) -> DataStreamReader { DataStreamReader::new(self) } - pub fn table(self, name: &str) -> Result { + pub fn table(self: Arc, name: &str) -> Result { DataFrameReader::new(self).table(name, None) } /// Interface through which the user may create, drop, alter or query underlying databases, /// tables, functions, etc. - pub fn catalog(self) -> Catalog { + pub fn catalog(self: Arc) -> Catalog { Catalog::new(self) } /// Returns a [DataFrame] representing the result of the given query - pub async fn sql(self, sql_query: &str) -> Result { + pub async fn sql(self: Arc, sql_query: &str) -> Result { let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand { sql: sql_query.to_string(), args: HashMap::default(), @@ -100,7 +101,7 @@ impl SparkSession { } #[allow(non_snake_case)] - pub fn createDataFrame(self, data: &RecordBatch) -> Result { + pub fn createDataFrame(self: Arc, data: &RecordBatch) -> Result { let logical_plan = LogicalPlanBuilder::local_relation(data)?; Ok(DataFrame::new(self, logical_plan)) } @@ -111,7 +112,9 @@ impl SparkSession { } /// Spark Connection gRPC client interface - pub fn client(self) -> SparkConnectClient> { - self.client + pub fn client( + self: Arc, + ) -> SparkConnectClient> { + self.client.clone() } } diff --git a/src/streaming/mod.rs b/src/streaming/mod.rs index 6d66031..3e79dad 100644 --- a/src/streaming/mod.rs +++ b/src/streaming/mod.rs @@ -1,6 +1,7 @@ //! 
Streaming implementation for the Spark Connect Client use std::collections::HashMap; +use std::sync::Arc; use crate::plan::LogicalPlanBuilder; use crate::session::SparkSession; @@ -13,14 +14,14 @@ use crate::errors::SparkError; /// DataStreamReader represents the entrypoint to create a streaming DataFrame #[derive(Clone, Debug)] pub struct DataStreamReader { - spark_session: SparkSession, + spark_session: Arc, format: Option, schema: Option, read_options: HashMap, } impl DataStreamReader { - pub fn new(spark_session: SparkSession) -> Self { + pub fn new(spark_session: Arc) -> Self { Self { spark_session, format: None, @@ -238,7 +239,7 @@ impl DataStreamWriter { .write_stream_operation_start_result; Ok(StreamingQuery::new( - self.dataframe.spark_session, + self.dataframe.spark_session.clone(), res.unwrap(), )) } @@ -270,7 +271,7 @@ impl DataStreamWriter { /// This object is used to control and monitor the active stream #[derive(Clone, Debug)] pub struct StreamingQuery { - spark_session: SparkSession, + spark_session: Arc, query_instance: spark::StreamingQueryInstanceId, query_id: String, run_id: String, @@ -279,7 +280,7 @@ pub struct StreamingQuery { impl StreamingQuery { pub fn new( - spark_session: SparkSession, + spark_session: Arc, write_stream: spark::WriteStreamOperationStartResult, ) -> Self { let query_instance = write_stream.query_id.unwrap(); @@ -475,15 +476,17 @@ mod tests { use crate::errors::SparkError; use crate::SparkSessionBuilder; - async fn setup() -> SparkSession { + async fn setup() -> Arc { println!("SparkSession Setup"); let connection = "sc://127.0.0.1:15002/;user_id=rust_stream"; - SparkSessionBuilder::remote(connection) - .build() - .await - .unwrap() + Arc::new( + SparkSessionBuilder::remote(connection) + .build() + .await + .unwrap(), + ) } #[tokio::test] From fe53515ada9bfa96a8fdfe5b03fe9ad5b845b012 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 15 Apr 2024 12:36:19 +0000 Subject: [PATCH 2/6] Reverting arrow change --- Cargo.lock | 80 +++++++++++++++++++++++++++++++++--------------------- Cargo.toml | 4 +-- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fea0957..4cc56f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,9 +62,9 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "arrow" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" +checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658" dependencies = [ "arrow-arith", "arrow-array", @@ -83,9 +83,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" +checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0" dependencies = [ "arrow-array", "arrow-buffer", @@ -98,9 +98,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" +checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" dependencies = [ "ahash", "arrow-buffer", @@ -114,9 +114,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "50.0.0" +version = "51.0.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" +checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" dependencies = [ "bytes", "half", @@ -125,28 +125,30 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" +checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "base64", + "atoi", + "base64 0.22.0", "chrono", "comfy-table", "half", "lexical-core", "num", + "ryu", ] [[package]] name = "arrow-csv" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" +checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" dependencies = [ "arrow-array", "arrow-buffer", @@ -163,9 +165,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" +checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -175,9 +177,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" +checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" dependencies = [ "arrow-array", "arrow-buffer", @@ -189,9 +191,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" +checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" dependencies = [ "arrow-array", "arrow-buffer", @@ -209,9 +211,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" +checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" dependencies = [ "arrow-array", "arrow-buffer", @@ -224,9 +226,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" +checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" dependencies = [ "ahash", "arrow-array", @@ -239,15 +241,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" +checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" [[package]] name = "arrow-select" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" +checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" dependencies = [ "ahash", "arrow-array", @@ -259,15 +261,16 @@ dependencies = [ [[package]] name = "arrow-string" -version = "50.0.0" +version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" +checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "memchr", "num", "regex", "regex-syntax 0.8.2", @@ -306,6 +309,15 @@ dependencies = [ "syn 2.0.57", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -404,6 +416,12 @@ version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "bindgen" version = "0.69.4" @@ -1584,7 +1602,7 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab" dependencies = [ - "base64", + "base64 0.21.3", "rustls-pki-types", ] @@ -1950,7 +1968,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.21.3", "bytes", "h2", "http", diff --git a/Cargo.toml b/Cargo.toml index eb1063a..590a290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,8 @@ tonic = { version ="0.11.0"} tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } tokio-rustls = "0.26.0" -arrow = { version = "50.0.0", features = ["prettyprint"] } -arrow-ipc = "50.0.0" +arrow = { version = "51.0.0", features = ["prettyprint"] } +arrow-ipc = "51.0.0" serde_json = "1.0.115" From 1483bcfae6546a71bf9db9808c8ff55df2776635 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 15 Apr 2024 12:40:42 +0000 Subject: [PATCH 3/6] Revert dummy print --- src/client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 4dc9364..189f087 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -133,7 +133,6 @@ impl ChannelBuilder { async fn create_client(&self) -> Result { let endpoint = format!("https://{}:{}", self.host, self.port); - println!("endpoint {:?}",endpoint.as_str()); let channel = Endpoint::from_shared(endpoint)?.connect().await?; let service_client = SparkConnectServiceClient::with_interceptor( From d1e656b91c50f66f15c5ed9bedfcecbb5b11abe1 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 15 Apr 2024 13:08:50 +0000 Subject: [PATCH 4/6] Updated examples --- examples/databricks.rs | 6 ++++-- examples/delta.rs | 4 +++- examples/reader.rs | 4 +++- examples/readstream.rs | 9 ++++++--- examples/sql.rs | 10 +++++++--- examples/writer.rs | 4 +++- 6 files changed, 26 insertions(+), 11 deletions(-) diff --git a/examples/databricks.rs b/examples/databricks.rs index 372bed5..2343171 100644 --- a/examples/databricks.rs +++ b/examples/databricks.rs @@ -11,9 +11,11 @@ use spark_connect_rs::{SparkSession, 
SparkSessionBuilder}; #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::remote("sc://:443/;token=;x-databricks-cluster-id=") + let spark:Arc = Arc::new( + SparkSessionBuilder::remote("sc://:443/;token=;x-databricks-cluster-id=") .build() - .await?; + .await? + ); spark .range(None, 10, 1, Some(1)) diff --git a/examples/delta.rs b/examples/delta.rs index 425befa..e51395b 100644 --- a/examples/delta.rs +++ b/examples/delta.rs @@ -6,13 +6,15 @@ // The remote spark session must have the spark package `io.delta:delta-spark_2.12:{DELTA_VERSION}` enabled. // Where the `DELTA_VERSION` is the specified Delta Lake version. +use std::sync::Arc; + use spark_connect_rs::{SparkSession, SparkSessionBuilder}; use spark_connect_rs::dataframe::SaveMode; #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::default().build().await?; + let spark:Arc = Arc::new(SparkSessionBuilder::default().build().await?); let paths = ["/opt/spark/examples/src/main/resources/people.csv"]; diff --git a/examples/reader.rs b/examples/reader.rs index 550469a..05d73e8 100644 --- a/examples/reader.rs +++ b/examples/reader.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use spark_connect_rs::{SparkSession, SparkSessionBuilder}; use spark_connect_rs::functions as F; @@ -7,7 +9,7 @@ use spark_connect_rs::functions as F; // printing the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::default().build().await?; + let spark:Arc = Arc::new(SparkSessionBuilder::default().build().await?); let path = ["/opt/spark/examples/src/main/resources/people.csv"]; diff --git a/examples/readstream.rs b/examples/readstream.rs index 025098a..9accf4e 100644 --- a/examples/readstream.rs +++ b/examples/readstream.rs @@ -3,15 +3,18 @@ use spark_connect_rs; use spark_connect_rs::streaming::{OutputMode, Trigger}; use spark_connect_rs::{SparkSession, SparkSessionBuilder}; +use std::sync::Arc; use std::{thread, time}; // This example demonstrates creating a Spark Stream and monitoring the progress #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") + let spark: Arc = + Arc::new( + SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") .build() - .await?; + .await? + ); let df = spark .readStream() diff --git a/examples/sql.rs b/examples/sql.rs index 1284a5c..3692ebb 100644 --- a/examples/sql.rs +++ b/examples/sql.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use spark_connect_rs; use spark_connect_rs::{SparkSession, SparkSessionBuilder}; @@ -7,10 +9,12 @@ use spark_connect_rs::{SparkSession, SparkSessionBuilder}; // Displaying the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") + let spark: Arc = + Arc::new( + SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") .build() - .await?; + .await? 
+ ); let df = spark .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`") diff --git a/examples/writer.rs b/examples/writer.rs index 42e9708..e0312da 100644 --- a/examples/writer.rs +++ b/examples/writer.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use spark_connect_rs; use spark_connect_rs::{SparkSession, SparkSessionBuilder}; @@ -11,7 +13,7 @@ use spark_connect_rs::dataframe::SaveMode; // then reading the csv file back #[tokio::main] async fn main() -> Result<(), Box> { - let spark: SparkSession = SparkSessionBuilder::default().build().await?; + let spark: Arc = Arc::new(SparkSessionBuilder::default().build().await?); let df = spark .clone() From cff3e9bfcec878d42553c949f7d8317ced1cbcdd Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 15 Apr 2024 13:10:13 +0000 Subject: [PATCH 5/6] Fixing format --- examples/delta.rs | 2 +- examples/reader.rs | 2 +- examples/readstream.rs | 9 ++++----- examples/sql.rs | 9 ++++----- src/client/mod.rs | 6 ------ src/dataframe.rs | 5 +---- 6 files changed, 11 insertions(+), 22 deletions(-) diff --git a/examples/delta.rs b/examples/delta.rs index e51395b..a9ca902 100644 --- a/examples/delta.rs +++ b/examples/delta.rs @@ -14,7 +14,7 @@ use spark_connect_rs::dataframe::SaveMode; #[tokio::main] async fn main() -> Result<(), Box> { - let spark:Arc = Arc::new(SparkSessionBuilder::default().build().await?); + let spark: Arc = Arc::new(SparkSessionBuilder::default().build().await?); let paths = ["/opt/spark/examples/src/main/resources/people.csv"]; diff --git a/examples/reader.rs b/examples/reader.rs index 05d73e8..118a55d 100644 --- a/examples/reader.rs +++ b/examples/reader.rs @@ -9,7 +9,7 @@ use spark_connect_rs::functions as F; // printing the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { - let spark:Arc = Arc::new(SparkSessionBuilder::default().build().await?); + let spark: Arc = Arc::new(SparkSessionBuilder::default().build().await?); let path = ["/opt/spark/examples/src/main/resources/people.csv"]; diff --git a/examples/readstream.rs b/examples/readstream.rs index 9accf4e..7020ec8 100644 --- a/examples/readstream.rs +++ b/examples/readstream.rs @@ -9,12 +9,11 @@ use std::{thread, time}; // This example demonstrates creating a Spark Stream and monitoring the progress #[tokio::main] async fn main() -> Result<(), Box> { - let spark: Arc = - Arc::new( - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") + let spark: Arc = Arc::new( + SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") .build() - .await? - ); + .await?, + ); let df = spark .readStream() diff --git a/examples/sql.rs b/examples/sql.rs index 3692ebb..38aa4bf 100644 --- a/examples/sql.rs +++ b/examples/sql.rs @@ -9,12 +9,11 @@ use spark_connect_rs::{SparkSession, SparkSessionBuilder}; // Displaying the results as "show(...)" #[tokio::main] async fn main() -> Result<(), Box> { - let spark: Arc = - Arc::new( - SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") + let spark: Arc = Arc::new( + SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs") .build() - .await? 
- ); + .await?, + ); let df = spark .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`") diff --git a/src/client/mod.rs b/src/client/mod.rs index 189f087..5dd2f18 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -299,8 +299,6 @@ impl ResponseHandler { } impl AnalyzeHandler { - - fn new() -> Self { Self { schema: None, @@ -316,7 +314,6 @@ impl AnalyzeHandler { get_storage_level: None, } } - } #[derive(Clone, Debug)] @@ -327,9 +324,6 @@ pub struct SparkConnectClient { pub analyzer: AnalyzeHandler, } -unsafe impl Send for SparkConnectClient {} -unsafe impl Sync for SparkConnectClient {} - impl SparkConnectClient where T: tonic::client::GrpcService, diff --git a/src/dataframe.rs b/src/dataframe.rs index bbe1236..c23001f 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -662,12 +662,9 @@ impl DataFrame { }); let session = self.spark_session.clone(); let mut client = session.client(); - let data_type = client - .analyze(schema) - .await?; + let data_type = client.analyze(schema).await?; let schema = data_type.schema()?; Ok(schema.clone()) - } /// Projects a set of expressions and returns a new [DataFrame] From ed5c679570d86d5625ebe0b26dde78ea060e5f41 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 17 Apr 2024 18:14:59 -0700 Subject: [PATCH 6/6] Add setCatalog and setDatabase to SparkSession --- src/session.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/session.rs b/src/session.rs index 62296dc..4f1a5c1 100644 --- a/src/session.rs +++ b/src/session.rs @@ -56,6 +56,34 @@ impl SparkSession { DataFrame::new(self, LogicalPlanBuilder::from(range_relation)) } + pub fn setCatalog(self: Arc, catalog: &str) -> DataFrame { + let catalog_relation = spark::relation::RelType::Catalog(spark::Catalog { + cat_type: Some(spark::catalog::CatType::SetCurrentCatalog( + spark::SetCurrentCatalog { + catalog_name: catalog.to_string(), + }, + )), + }); + + let logical_plan = LogicalPlanBuilder::from(catalog_relation); + + DataFrame::new(self, logical_plan) + } + + pub fn setDatabase(self: Arc, database: &str) -> DataFrame { + let catalog_relation = spark::relation::RelType::Catalog(spark::Catalog { + cat_type: Some(spark::catalog::CatType::SetCurrentDatabase( + spark::SetCurrentDatabase { + db_name: database.to_string(), + }, + )), + }); + + let logical_plan = LogicalPlanBuilder::from(catalog_relation); + + DataFrame::new(self, logical_plan) + } + /// Returns a [DataFrameReader] that can be used to read datra in as a [DataFrame] pub fn read(self: Arc) -> DataFrameReader { DataFrameReader::new(self)
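
---

A note on the Arc<SparkSession> pattern this series lands: patches 1 and 6
convert every consumer of the session (DataFrame, Catalog, DataFrameReader,
DataStreamReader, StreamingQuery) to hold an Arc<SparkSession>, and move the
session's entry points to `self: Arc<Self>` receivers, so one session can back
many frames without being moved or deep-cloned per call. Below is a minimal,
self-contained sketch of that receiver pattern; `Session` and `Frame` are
illustrative stand-ins for this note, not the crate's real types.

    use std::sync::Arc;

    // Stand-in for SparkSession; the real type also carries the gRPC client.
    struct Session {
        id: String,
    }

    // Stand-in for DataFrame: it keeps the session alive through the Arc.
    struct Frame {
        session: Arc<Session>,
        plan: String,
    }

    impl Session {
        // `self: Arc<Self>` is the receiver shape patch 1 adopts for
        // `sql`, `read`, `table`, `catalog`, and friends.
        fn sql(self: Arc<Self>, query: &str) -> Frame {
            Frame {
                session: self, // moves the Arc, shares the Session
                plan: format!("SQL({query})"),
            }
        }
    }

    fn main() {
        let session = Arc::new(Session { id: "example".into() });

        // Each clone only bumps a refcount; both frames share one Session.
        let df1 = session.clone().sql("SELECT 1");
        let df2 = session.clone().sql("SELECT 2");
        assert!(Arc::ptr_eq(&df1.session, &df2.session));

        // Arc<Session> is Send + Sync (Session holds only a String here),
        // so a clone can cross a thread boundary intact.
        let s = session.clone();
        let plan = std::thread::spawn(move || s.sql("SELECT 3").plan)
            .join()
            .unwrap();
        println!("{} {} {}", df1.plan, df2.plan, plan);
    }

The `send_guard` feature enabled on parking_lot in patch 1 serves the same
goal from the locking side: per the parking_lot docs it makes lock guards
Send, so holding one does not pin the shared client to a single thread.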