Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kernel/examples/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ delta_kernel = { path = "../../../kernel", features = [
"default-engine-rustls",
"internal-api",
] }
object_store = { version = "0.12.3", features = ["aws", "azure", "gcp", "http"] }
url = "2"
130 changes: 121 additions & 9 deletions kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! utilities
use std::{collections::HashMap, sync::Arc};

use clap::Args;
use clap::{Args, CommandFactory, FromArgMatches};
use delta_kernel::{
arrow::array::RecordBatch,
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
Expand All @@ -11,6 +11,10 @@ use delta_kernel::{
DeltaResult, SnapshotRef,
};

use object_store::{
aws::AmazonS3Builder, azure::MicrosoftAzureBuilder, gcp::GoogleCloudStorageBuilder,
DynObjectStore, ObjectStoreScheme,
};
use url::Url;

#[derive(Args)]
Expand All @@ -19,9 +23,27 @@ pub struct LocationArgs {
pub path: String,

/// Region to specify to the cloud access store (only applies to S3)
#[arg(long)]
#[arg(long, conflicts_with = "env_creds", conflicts_with = "option")]
pub region: Option<String>,

/// Extra key-value pairs to pass to the ObjectStore builder. Note different object stores
/// accept different configuration options, see the object_store types: AmazonS3Builder,
/// MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specify as "key=value", and pass
/// multiple times to set more than one option.
#[arg(long, conflicts_with = "env_creds", conflicts_with = "region")]
pub option: Vec<String>,

/// Get credentials from the environment. For details see the object_store types:
/// AmazonS3Builder, MicrosoftAzureBuilder, and GoogleCloudStorageBuilder. Specifically the
/// `from_env` method.
#[arg(
long,
conflicts_with = "region",
conflicts_with = "option",
default_value = "false"
)]
pub env_creds: bool,

/// Specify that the table is "public" (i.e. no cloud credentials are needed). This is required
/// for things like s3 public buckets, otherwise the kernel will try and authenticate by talking
/// to the aws metadata server, which will fail unless you're on an ec2 instance.
Expand All @@ -44,20 +66,110 @@ pub struct ScanArgs {
pub columns: Option<Vec<String>>,
}

pub trait ParseWithExamples<T> {
/// parse command line, and add examples in help
/// program_name - name of program
/// caps_action - actions the program performs (like read), capitalized for start of sentence
/// action - same as above, for middle of sentence
/// trailing_args - will be put at the end of each example, used if the command needs extra args
fn parse_with_examples(
program_name: &str,
caps_action: &str,
action: &str,
trailing_args: &str,
) -> T;
}

impl<T> ParseWithExamples<T> for T
where
T: clap::Parser,
{
fn parse_with_examples(
program_name: &str,
caps_action: &str,
action: &str,
trailing_args: &str,
) -> Self {
let examples = format!("Examples:
{caps_action} table at foo/bar/bazz, relative to where invoked:
{program_name} foo/bar/bazz {trailing_args}

Get S3 credentials, region, etc. from the environment, and {action} table on S3:
{program_name} --env_creds s3://path/to/table {trailing_args}

Specify azure credentials on the command line and {action} table in azure:
{program_name} --option AZURE_STORAGE_ACCOUNT_NAME=my_account --option AZURE_STORAGE_ACCOUNT_KEY=my_key [more --option args] az://account/container/path {trailing_args}

{caps_action} a table in a public S3 bucket in us-west-2 region:
{program_name} --region us-west-2 --public s3://my/public/table {trailing_args}");
let mut matches = <Self as CommandFactory>::command()
.after_help(examples)
.get_matches();
let res = <Self as FromArgMatches>::from_arg_matches_mut(&mut matches)
.map_err(|e| e.format(&mut Self::command()));
match res {
Ok(s) => s,
Err(e) => e.exit(),
}
}
}

/// Get an engine configured to read table at `url` and `LocationArgs`
pub fn get_engine(
url: &Url,
args: &LocationArgs,
) -> DeltaResult<DefaultEngine<TokioBackgroundExecutor>> {
let mut options = if let Some(ref region) = args.region {
HashMap::from([("region", region.clone())])
if args.env_creds {
let (scheme, _path) = ObjectStoreScheme::parse(url).map_err(|e| {
delta_kernel::Error::Generic(format!("Object store could not parse url: {}", e))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
delta_kernel::Error::Generic(format!("Object store could not parse url: {}", e))
delta_kernel::Error::generic(format!("Object store could not parse url: {e}"))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not, because it's already a String because I'm using format!. So we'll being doing an unneeded to_string() if we use generic.

})?;
use ObjectStoreScheme::*;
let url_str = url.to_string();
let store: Arc<DynObjectStore> = match scheme {
AmazonS3 => Arc::new(AmazonS3Builder::from_env().with_url(url_str).build()?),
GoogleCloudStorage => Arc::new(
GoogleCloudStorageBuilder::from_env()
.with_url(url_str)
.build()?,
),
MicrosoftAzure => Arc::new(
MicrosoftAzureBuilder::from_env()
.with_url(url_str)
.build()?,
),
Local | Memory | Http => {
return Err(delta_kernel::Error::Generic(format!(
"Scheme {scheme:?} doesn't support getting credentials from environment"
)));
}
_ => {
// scheme is non-exhaustive
return Err(delta_kernel::Error::Generic(format!(
"Unknown schema {scheme:?} doesn't support getting credentials from environment"
)));
}
};
Ok(DefaultEngine::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
))
} else if !args.option.is_empty() {
let opts = args.option.iter().map(|option| {
let parts: Vec<&str> = option.split("=").collect();
(parts[0].to_ascii_lowercase(), parts[1])
});
DefaultEngine::try_new(url, opts, Arc::new(TokioBackgroundExecutor::new()))
} else {
HashMap::new()
};
if args.public {
options.insert("skip_signature", "true".to_string());
let mut options = if let Some(ref region) = args.region {
HashMap::from([("region", region.clone())])
} else {
HashMap::new()
};
if args.public {
options.insert("skip_signature", "true".to_string());
}
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
}
DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new()))
}

/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::LocationArgs;
use common::{LocationArgs, ParseWithExamples};
use delta_kernel::actions::visitors::{
visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, RemoveVisitor,
SetTransactionVisitor,
Expand Down Expand Up @@ -178,7 +178,7 @@ fn print_scan_file(
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Inspect", "inspect", "<COMMAND>");

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
let engine = common::get_engine(&url, &cli.location_args)?;
Expand Down
9 changes: 7 additions & 2 deletions kernel/examples/read-table-changes/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use clap::Parser;
use common::LocationArgs;
use common::{LocationArgs, ParseWithExamples};
use delta_kernel::arrow::array::RecordBatch;
use delta_kernel::arrow::{compute::filter_record_batch, util::pretty::print_batches};
use delta_kernel::engine::arrow_data::ArrowEngineData;
Expand All @@ -25,7 +25,12 @@ struct Cli {
}

fn main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(
env!("CARGO_PKG_NAME"),
"Read changes in",
"read changes in",
"",
);
let url = delta_kernel::try_parse_uri(cli.location_args.path.as_str())?;
let engine = common::get_engine(&url, &cli.location_args)?;
let table_changes = TableChanges::try_new(url, &engine, cli.start_version, cli.end_version)?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::thread;
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use common::{LocationArgs, ScanArgs};
use common::{LocationArgs, ParseWithExamples, ScanArgs};
use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
Expand Down Expand Up @@ -94,7 +94,7 @@ struct ScanState {
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", "");

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
Expand Down
7 changes: 3 additions & 4 deletions kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use std::sync::Arc;
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use common::{LocationArgs, ScanArgs};
use common::{LocationArgs, ParseWithExamples, ScanArgs};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, Snapshot};

use clap::Parser;
use itertools::Itertools;

/// An example program that dumps out the data of a delta table. Struct and Map types are not
/// supported.
/// An example program that dumps out the data of a delta table.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
Expand All @@ -39,7 +38,7 @@ fn main() -> ExitCode {
}

fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Read", "read", "");
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
Expand Down
1 change: 1 addition & 0 deletions kernel/examples/write-table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = false
[dependencies]
arrow = { version = "56", features = ["prettyprint", "chrono-tz"] }
clap = { version = "4.5", features = ["derive"] }
common = { path = "../common" }
delta_kernel = { path = "../../../kernel", features = [
"arrow-56",
"default-engine-rustls",
Expand Down
19 changes: 11 additions & 8 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use arrow::array::{BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray};
use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
use itertools::Itertools;
use serde_json::{json, to_vec};
use url::Url;
Expand All @@ -26,9 +27,8 @@ use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
struct Cli {
/// Path to the table
#[arg(long, short = 'p')]
path: String,
#[command(flatten)]
location_args: LocationArgs,

/// Comma-separated schema specification of the form `field_name:data_type`
#[arg(
Expand Down Expand Up @@ -59,16 +59,19 @@ async fn main() -> ExitCode {

// TODO: Update the example once official write APIs are introduced (issue#1123)
async fn try_main() -> DeltaResult<()> {
let cli = Cli::parse();
let cli = Cli::parse_with_examples(env!("CARGO_PKG_NAME"), "Write", "write", "");

// Check if path is a directory and if not, create it
if !Path::new(&cli.path).exists() {
create_dir_all(&cli.path).map_err(|e| {
Error::generic(format!("Failed to create directory {}: {e}", &cli.path))
if !Path::new(&cli.location_args.path).exists() {
create_dir_all(&cli.location_args.path).map_err(|e| {
Error::generic(format!(
"Failed to create directory {}: {e}",
&cli.location_args.path
))
})?;
}

let url = delta_kernel::try_parse_uri(&cli.path)?;
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Using Delta table at: {url}");

// Get the engine for local filesystem
Expand Down
Loading