diff --git a/kernel/examples/common/Cargo.toml b/kernel/examples/common/Cargo.toml index 5c8f75f94..2a6baae2d 100644 --- a/kernel/examples/common/Cargo.toml +++ b/kernel/examples/common/Cargo.toml @@ -17,4 +17,5 @@ delta_kernel = { path = "../../../kernel", features = [ "default-engine-rustls", "internal-api", ] } +object_store = { version = "0.12.3", features = ["aws", "azure", "gcp", "http"] } url = "2" diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index ab1a2f03a..b5976c6a8 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -2,7 +2,7 @@ //! utilities use std::{collections::HashMap, sync::Arc}; -use clap::Args; +use clap::{Args, CommandFactory, FromArgMatches}; use delta_kernel::{ arrow::array::RecordBatch, engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}, @@ -11,6 +11,10 @@ use delta_kernel::{ DeltaResult, SnapshotRef, }; +use object_store::{ + aws::AmazonS3Builder, azure::MicrosoftAzureBuilder, gcp::GoogleCloudStorageBuilder, + DynObjectStore, ObjectStoreScheme, +}; use url::Url; #[derive(Args)] @@ -19,9 +23,27 @@ pub struct LocationArgs { pub path: String, /// Region to specify to the cloud access store (only applies to S3) - #[arg(long)] + #[arg(long, conflicts_with = "env_creds", conflicts_with = "option")] pub region: Option, + /// Extra key-value pairs to pass to the ObjectStore builder. Note different object stores + /// accept different configuration options, see the object_store types: AmazonS3Builder, + /// MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specify as "key=value", and pass + /// multiple times to set more than one option. + #[arg(long, conflicts_with = "env_creds", conflicts_with = "region")] + pub option: Vec, + + /// Get credentials from the environment. For details see the object_store types: + /// AmazonS3Builder, MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specifically the + /// `from_env` method. + #[arg( + long, + conflicts_with = "region", + conflicts_with = "option", + default_value = "false" + )] + pub env_creds: bool, + /// Specify that the table is "public" (i.e. no cloud credentials are needed). This is required /// for things like s3 public buckets, otherwise the kernel will try and authenticate by talking /// to the aws metadata server, which will fail unless you're on an ec2 instance. @@ -44,20 +66,110 @@ pub struct ScanArgs { pub columns: Option>, } +pub trait ParseWithExamples { + /// parse command line, and add examples in help + /// program_name - name of program + /// caps_action - actions the program performs (like read), capitalized for start of sentence + /// action - same as above, for middle of sentence + /// trailing_args - will be put at the end of each example, used if the command needs extra args + fn parse_with_examples( + program_name: &str, + caps_action: &str, + action: &str, + trailing_args: &str, + ) -> T; +} + +impl ParseWithExamples for T +where + T: clap::Parser, +{ + fn parse_with_examples( + program_name: &str, + caps_action: &str, + action: &str, + trailing_args: &str, + ) -> Self { + let examples = format!("Examples: + {caps_action} table at foo/bar/bazz, relative to where invoked: + {program_name} foo/bar/bazz {trailing_args} + + Get S3 credentials, region, etc. from the environment, and {action} table on S3: + {program_name} --env_creds s3://path/to/table {trailing_args} + + Specify azure credentials on the command line and {action} table in azure: + {program_name} --option AZURE_STORAGE_ACCOUNT_NAME=my_account --option AZURE_STORAGE_ACCOUNT_KEY=my_key [more --option args] az://account/container/path {trailing_args} + + {caps_action} a table in a public S3 bucket in us-west-2 region: + {program_name} --region us-west-2 --public s3://my/public/table {trailing_args}"); + let mut matches = ::command() + .after_help(examples) + .get_matches(); + let res = ::from_arg_matches_mut(&mut matches) + .map_err(|e| e.format(&mut Self::command())); + match res { + Ok(s) => s, + Err(e) => e.exit(), + } + } +} + /// Get an engine configured to read table at `url` and `LocationArgs` pub fn get_engine( url: &Url, args: &LocationArgs, ) -> DeltaResult> { - let mut options = if let Some(ref region) = args.region { - HashMap::from([("region", region.clone())]) + if args.env_creds { + let (scheme, _path) = ObjectStoreScheme::parse(url).map_err(|e| { + delta_kernel::Error::Generic(format!("Object store could not parse url: {}", e)) + })?; + use ObjectStoreScheme::*; + let url_str = url.to_string(); + let store: Arc = match scheme { + AmazonS3 => Arc::new(AmazonS3Builder::from_env().with_url(url_str).build()?), + GoogleCloudStorage => Arc::new( + GoogleCloudStorageBuilder::from_env() + .with_url(url_str) + .build()?, + ), + MicrosoftAzure => Arc::new( + MicrosoftAzureBuilder::from_env() + .with_url(url_str) + .build()?, + ), + Local | Memory | Http => { + return Err(delta_kernel::Error::Generic(format!( + "Scheme {scheme:?} doesn't support getting credentials from environment" + ))); + } + _ => { + // scheme is non-exhaustive + return Err(delta_kernel::Error::Generic(format!( + "Unknown schema {scheme:?} doesn't support getting credentials from environment" + ))); + } + }; + Ok(DefaultEngine::new( + store, + Arc::new(TokioBackgroundExecutor::new()), + )) + } else if !args.option.is_empty() { + let opts = args.option.iter().map(|option| { + let parts: Vec<&str> = option.split("=").collect(); + (parts[0].to_ascii_lowercase(), parts[1]) + }); + DefaultEngine::try_new(url, opts, Arc::new(TokioBackgroundExecutor::new())) } else { - HashMap::new() - }; - if args.public { - options.insert("skip_signature", "true".to_string()); + let mut options = if let Some(ref region) = args.region { + HashMap::from([("region", region.clone())]) + } else { + HashMap::new() + }; + if args.public { + options.insert("skip_signature", "true".to_string()); + } + DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new())) } - DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new())) } /// Construct a scan at the latest snapshot. This is over the specified table and using the passed diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 67a651ae1..60d6146d9 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -1,4 +1,4 @@ -use common::LocationArgs; +use common::{LocationArgs, ParseWithExamples}; use delta_kernel::actions::visitors::{ visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, RemoveVisitor, SetTransactionVisitor, @@ -178,7 +178,7 @@ fn print_scan_file( } fn try_main() -> DeltaResult<()> { - let cli = Cli::parse(); + let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Inspect", "inspect", ""); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; let engine = common::get_engine(&url, &cli.location_args)?; diff --git a/kernel/examples/read-table-changes/src/main.rs b/kernel/examples/read-table-changes/src/main.rs index 8ffee30ff..db5e7a7be 100644 --- a/kernel/examples/read-table-changes/src/main.rs +++ b/kernel/examples/read-table-changes/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use clap::Parser; -use common::LocationArgs; +use common::{LocationArgs, ParseWithExamples}; use delta_kernel::arrow::array::RecordBatch; use delta_kernel::arrow::{compute::filter_record_batch, util::pretty::print_batches}; use delta_kernel::engine::arrow_data::ArrowEngineData; @@ -25,7 +25,12 @@ struct Cli { } fn main() -> DeltaResult<()> { - let cli = Cli::parse(); + let cli = Cli::parse_with_examples( + env!("CARGO_PKG_NAME"), + "Read changes in", + "read changes in", + "", + ); let url = delta_kernel::try_parse_uri(cli.location_args.path.as_str())?; let engine = common::get_engine(&url, &cli.location_args)?; let table_changes = TableChanges::try_new(url, &engine, cli.start_version, cli.end_version)?; diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index f52e4aa22..3effd8994 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -7,7 +7,7 @@ use std::thread; use arrow::compute::filter_record_batch; use arrow::record_batch::RecordBatch; use arrow::util::pretty::print_batches; -use common::{LocationArgs, ScanArgs}; +use common::{LocationArgs, ParseWithExamples, ScanArgs}; use delta_kernel::actions::deletion_vector::split_vector; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats}; @@ -94,7 +94,7 @@ struct ScanState { } fn try_main() -> DeltaResult<()> { - let cli = Cli::parse(); + let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", ""); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index b9ba43eeb..caa5dccf7 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -4,15 +4,14 @@ use std::sync::Arc; use arrow::compute::filter_record_batch; use arrow::record_batch::RecordBatch; use arrow::util::pretty::print_batches; -use common::{LocationArgs, ScanArgs}; +use common::{LocationArgs, ParseWithExamples, ScanArgs}; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::{DeltaResult, Snapshot}; use clap::Parser; use itertools::Itertools; -/// An example program that dumps out the data of a delta table. Struct and Map types are not -/// supported. +/// An example program that dumps out the data of a delta table. #[derive(Parser)] #[command(author, version, about, long_about = None)] #[command(propagate_version = true)] @@ -39,7 +38,7 @@ fn main() -> ExitCode { } fn try_main() -> DeltaResult<()> { - let cli = Cli::parse(); + let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", ""); let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; diff --git a/kernel/examples/write-table/Cargo.toml b/kernel/examples/write-table/Cargo.toml index 05552df51..3291944f6 100644 --- a/kernel/examples/write-table/Cargo.toml +++ b/kernel/examples/write-table/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] arrow = { version = "56", features = ["prettyprint", "chrono-tz"] } clap = { version = "4.5", features = ["derive"] } +common = { path = "../common" } delta_kernel = { path = "../../../kernel", features = [ "arrow-56", "default-engine-rustls", diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 138724ff4..729e9eabd 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use arrow::array::{BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray}; use arrow::util::pretty::print_batches; use clap::Parser; +use common::{LocationArgs, ParseWithExamples}; use itertools::Itertools; use serde_json::{json, to_vec}; use url::Url; @@ -26,9 +27,8 @@ use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef}; #[command(author, version, about, long_about = None)] #[command(propagate_version = true)] struct Cli { - /// Path to the table - #[arg(long, short = 'p')] - path: String, + #[command(flatten)] + location_args: LocationArgs, /// Comma-separated schema specification of the form `field_name:data_type` #[arg( @@ -59,16 +59,19 @@ async fn main() -> ExitCode { // TODO: Update the example once official write APIs are introduced (issue#1123) async fn try_main() -> DeltaResult<()> { - let cli = Cli::parse(); + let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Write", "write", ""); // Check if path is a directory and if not, create it - if !Path::new(&cli.path).exists() { - create_dir_all(&cli.path).map_err(|e| { - Error::generic(format!("Failed to create directory {}: {e}", &cli.path)) + if !Path::new(&cli.location_args.path).exists() { + create_dir_all(&cli.location_args.path).map_err(|e| { + Error::generic(format!( + "Failed to create directory {}: {e}", + &cli.location_args.path + )) })?; } - let url = delta_kernel::try_parse_uri(&cli.path)?; + let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Using Delta table at: {url}"); // Get the engine for local filesystem