Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kernel/examples/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ delta_kernel = { path = "../../../kernel", features = [
"default-engine-rustls",
"internal-api",
] }
object_store = { version = "0.12.3", features = ["aws", "azure", "gcp", "http"] }
url = "2"
130 changes: 121 additions & 9 deletions kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! utilities
use std::{collections::HashMap, sync::Arc};

use clap::Args;
use clap::{Args, CommandFactory, FromArgMatches};
use delta_kernel::{
arrow::array::RecordBatch,
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
Expand All @@ -11,6 +11,10 @@ use delta_kernel::{
DeltaResult, SnapshotRef,
};

use object_store::{
aws::AmazonS3Builder, azure::MicrosoftAzureBuilder, gcp::GoogleCloudStorageBuilder,
DynObjectStore, ObjectStoreScheme,
};
use url::Url;

#[derive(Args)]
Expand All @@ -19,9 +23,27 @@ pub struct LocationArgs {
pub path: String,

/// Region to specify to the cloud access store (only applies to S3)
#[arg(long)]
#[arg(long, conflicts_with = "env_creds", conflicts_with = "option")]
pub region: Option<String>,

/// Extra key-value pairs to pass to the ObjectStore builder. Note different object stores
/// accept different configuration options, see the object_store types: AmazonS3Builder,
/// MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specify as "key=value", and pass
/// multiple times to set more than one option.
#[arg(long, conflicts_with = "env_creds", conflicts_with = "region")]
pub option: Vec<String>,

/// Get credentials from the environment. For details see the object_store types:
/// AmazonS3Builder, MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specifically the
/// `from_env` method.
#[arg(
long,
conflicts_with = "region",
conflicts_with = "option",
default_value = "false"
)]
pub env_creds: bool,

/// Specify that the table is "public" (i.e. no cloud credentials are needed). This is required
/// for things like s3 public buckets, otherwise the kernel will try and authenticate by talking
/// to the aws metadata server, which will fail unless you're on an ec2 instance.
Expand All @@ -44,20 +66,110 @@ pub struct ScanArgs {
pub columns: Option<Vec<String>>,
}

pub trait ParseWithExamples<T> {
/// parse command line, and add examples in help
/// program_name - name of program
/// caps_action - actions the program performs (like read), capitalized for start of sentence
/// action - same as above, for middle of sentence
/// trailing_args - will be put at the end of each example, used if the command needs extra args
fn parse_with_examples(
program_name: &str,
caps_action: &str,
action: &str,
trailing_args: &str,
) -> T;
}

impl<T> ParseWithExamples<T> for T
where
T: clap::Parser,
{
fn parse_with_examples(
program_name: &str,
caps_action: &str,
action: &str,
trailing_args: &str,
) -> Self {
let examples = format!("Examples:
{caps_action} table at foo/bar/bazz, relative to where invoked:
{program_name} foo/bar/bazz {trailing_args}

Get S3 credentials, region, etc. from the environment, and {action} table on S3:
{program_name} --env_creds s3://path/to/table {trailing_args}

Specify azure credentials on the command line and {action} table in azure:
{program_name} --option AZURE_STORAGE_ACCOUNT_NAME=my_account --option AZURE_STORAGE_ACCOUNT_KEY=my_key [more --option args] az://account/container/path {trailing_args}

{caps_action} a table in a public S3 bucket in us-west-2 region:
{program_name} --region us-west-2 --public s3://my/public/table {trailing_args}");
let mut matches = <Self as CommandFactory>::command()
.after_help(examples)
.get_matches();
let res = <Self as FromArgMatches>::from_arg_matches_mut(&mut matches)
.map_err(|e| e.format(&mut Self::command()));
match res {
Ok(s) => s,
Err(e) => e.exit(),
}
}
}

/// Get an engine configured to read table at `url` and `LocationArgs`
pub fn get_engine(
url: &Url,
args: &LocationArgs,
) -> DeltaResult<DefaultEngine<TokioBackgroundExecutor>> {
let mut options = if let Some(ref region) = args.region {
HashMap::from([("region", region.clone())])
if args.env_creds {
let (scheme, _path) = ObjectStoreScheme::parse(url).map_err(|e| {
delta_kernel::Error::Generic(format!("Object store could not parse url: {}", e))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
delta_kernel::Error::Generic(format!("Object store could not parse url: {}", e))
delta_kernel::Error::generic(format!("Object store could not parse url: {e}"))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not, because it's already a String because I'm using format!. So we'll being doing an unneeded to_string() if we use generic.

})?;
use ObjectStoreScheme::*;
let url_str = url.to_string();
let store: Arc<DynObjectStore> = match scheme {
AmazonS3 => Arc::new(AmazonS3Builder::from_env().with_url(url_str).build()?),
GoogleCloudStorage => Arc::new(
GoogleCloudStorageBuilder::from_env()
.with_url(url_str)
.build()?,
),
MicrosoftAzure => Arc::new(
MicrosoftAzureBuilder::from_env()
.with_url(url_str)
.build()?,
),
Local | Memory | Http => {
return Err(delta_kernel::Error::Generic(format!(
"Scheme {scheme:?} doesn't support getting credentials from environment"
)));
}
_ => {
// scheme is non-exhaustive
return Err(delta_kernel::Error::Generic(format!(
"Unknown schema {scheme:?} doesn't support getting credentials from environment"
)));
}
};
Ok(DefaultEngine::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
))
} else if !args.option.is_empty() {
let opts = args.option.iter().map(|option| {
let parts: Vec<&str> = option.split("=").collect();
(parts[0].to_ascii_lowercase(), parts[1])
});
DefaultEngine::try_new(url, opts, Arc::new(TokioBackgroundExecutor::new()))
} else {
HashMap::new()
};
if args.public {
options.insert("skip_signature", "true".to_string());
let mut options = if let Some(ref region) = args.region {
HashMap::from([("region", region.clone())])
} else {
HashMap::new()
};
if args.public {
options.insert("skip_signature", "true".to_string());
}
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
}
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
}

/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::LocationArgs;
use common::{LocationArgs, ParseWithExamples};
use delta_kernel::actions::visitors::{
visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, RemoveVisitor,
SetTransactionVisitor,
Expand Down Expand Up @@ -178,7 +178,7 @@ fn print_scan_file(
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Inspect", "inspect", "<COMMAND>");

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
let engine = common::get_engine(&url, &cli.location_args)?;
Expand Down
9 changes: 7 additions & 2 deletions kernel/examples/read-table-changes/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use clap::Parser;
use common::LocationArgs;
use common::{LocationArgs, ParseWithExamples};
use delta_kernel::arrow::array::RecordBatch;
use delta_kernel::arrow::{compute::filter_record_batch, util::pretty::print_batches};
use delta_kernel::engine::arrow_data::ArrowEngineData;
Expand All @@ -25,7 +25,12 @@ struct Cli {
}

fn main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(
env!("CARGO_PKG_NAME"),
"Read changes in",
"read changes in",
"",
);
let url = delta_kernel::try_parse_uri(cli.location_args.path.as_str())?;
let engine = common::get_engine(&url, &cli.location_args)?;
let table_changes = TableChanges::try_new(url, &engine, cli.start_version, cli.end_version)?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::thread;
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use common::{LocationArgs, ScanArgs};
use common::{LocationArgs, ParseWithExamples, ScanArgs};
use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
Expand Down Expand Up @@ -94,7 +94,7 @@ struct ScanState {
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", "");

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
Expand Down
7 changes: 3 additions & 4 deletions kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use std::sync::Arc;
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use common::{LocationArgs, ScanArgs};
use common::{LocationArgs, ParseWithExamples, ScanArgs};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, Snapshot};

use clap::Parser;
use itertools::Itertools;

/// An example program that dumps out the data of a delta table. Struct and Map types are not
/// supported.
/// An example program that dumps out the data of a delta table.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
Expand All @@ -39,7 +38,7 @@ fn main() -> ExitCode {
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", "");
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
Expand Down
1 change: 1 addition & 0 deletions kernel/examples/write-table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = false
[dependencies]
arrow = { version = "56", features = ["prettyprint", "chrono-tz"] }
clap = { version = "4.5", features = ["derive"] }
common = { path = "../common" }
delta_kernel = { path = "../../../kernel", features = [
"arrow-56",
"default-engine-rustls",
Expand Down
19 changes: 11 additions & 8 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use arrow::array::{BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray};
use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
use itertools::Itertools;
use serde_json::{json, to_vec};
use url::Url;
Expand All @@ -26,9 +27,8 @@ use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
struct Cli {
/// Path to the table
#[arg(long, short = 'p')]
path: String,
#[command(flatten)]
location_args: LocationArgs,

/// Comma-separated schema specification of the form `field_name:data_type`
#[arg(
Expand Down Expand Up @@ -59,16 +59,19 @@ async fn main() -> ExitCode {

// TODO: Update the example once official write APIs are introduced (issue#1123)
async fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Write", "write", "");

// Check if path is a directory and if not, create it
if !Path::new(&cli.path).exists() {
create_dir_all(&cli.path).map_err(|e| {
Error::generic(format!("Failed to create directory {}: {e}", &cli.path))
if !Path::new(&cli.location_args.path).exists() {
create_dir_all(&cli.location_args.path).map_err(|e| {
Error::generic(format!(
"Failed to create directory {}: {e}",
&cli.location_args.path
))
})?;
}

let url = delta_kernel::try_parse_uri(&cli.path)?;
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Using Delta table at: {url}");

// Get the engine for local filesystem
Expand Down
Loading