Skip to content

Commit 4279488

Browse files
committed
ADBC only sink
1 parent dafe79e commit 4279488

12 files changed

Lines changed: 411 additions & 247 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/etl/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ name = "etl"
1717
path = "src/main.rs"
1818

1919
[dependencies]
20+
adbc_client = { path = "../adbc_client" }
2021
anyhow.workspace = true
22+
arrow.workspace = true
23+
async-trait.workspace = true
24+
chrono.workspace = true
2125
clap = { workspace = true, features = ["derive"] }
2226
data-generation = { path = "../data-generation" }
27+
serde_json.workspace = true
2328
system-adapter-protocol = { path = "../system-adapter-protocol" }
2429
tokio.workspace = true
2530
tokio-util.workspace = true

crates/etl/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# ETL Pipeline
22

33
```bash
4-
cargo run -p etl -- --bucket peasee-indexes --region us-west-2 --source-prefix raw --target-prefix rehydrated --dataset tpch --scale-factor 1.0 --num-steps 10
4+
cargo run -p etl -- --bucket peasee-indexes --region us-west-2 --source-prefix raw --dataset tpch --scale-factor 1.0 --num-steps 10 --adbc-driver databricks --adbc-uri "databricks://token:...@dbc-xxxx.cloud.databricks.com?http_path=..."
55
```

crates/etl/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@ use std::collections::{BTreeMap, HashSet};
2727
use std::sync::Arc as StdArc;
2828
use std::sync::atomic::{AtomicU64, Ordering};
2929
use std::time::Instant;
30-
use system_adapter_protocol::{DatasetConfig as ProtocolDatasetConfig, EtlType};
30+
use system_adapter_protocol::DatasetConfig as ProtocolDatasetConfig;
3131
use tokio::sync::watch;
3232
use tokio::task::{JoinHandle, JoinSet};
3333
use tokio_util::sync::CancellationToken;
3434
use tracing::{debug, error, info, warn};
3535

36+
pub mod sink;
37+
3638
type DynSource = Arc<dyn Source>;
3739
type DynTarget = Arc<dyn Target>;
3840

@@ -177,7 +179,7 @@ impl ETLPipeline {
177179
///
178180
/// Each entry maps a table name to its
179181
/// [`DatasetConfig`](system_adapter_protocol::DatasetConfig), which includes
180-
/// the rehydrated Arrow schema and the ETL type. This can be used to build a
182+
/// the rehydrated Arrow schema. This can be used to build a
181183
/// [`SetupRequest`](system_adapter_protocol::SetupRequest) for the system
182184
/// adapter.
183185
pub fn setup_request_datasets(&self) -> HashMap<String, ProtocolDatasetConfig> {
@@ -186,9 +188,7 @@ impl ETLPipeline {
186188
.into_iter()
187189
.map(|(name, table)| {
188190
let config = ProtocolDatasetConfig {
189-
etl_type: EtlType::S3,
190191
schema: table.rehydrated_schema(),
191-
params: self.target.table_params(&name),
192192
};
193193
(name, config)
194194
})

crates/etl/src/main.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ limitations under the License.
1616

1717
use std::sync::Arc;
1818

19+
use adbc_client::AdbcConnection;
1920
use clap::Parser;
2021
use data_generation::config::{DatasetConfig, TableFormat, TargetConfig};
2122
use data_generation::storage::s3::S3Storage;
23+
use etl::sink::adbc::AdbcSink;
2224
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
25+
use serde_json::Value;
2326
use tracing_subscriber::EnvFilter;
2427

2528
#[derive(Parser)]
26-
#[command(about = "Run an ETL pipeline that reads from S3, rehydrates data, and writes back to S3")]
29+
#[command(about = "Run an ETL pipeline that reads from S3, rehydrates data, and writes directly to a SUT via ADBC")]
2730
struct Cli {
2831
/// Dataset type: "tpch" or "simple_sequence"
2932
#[arg(long, default_value = "tpch")]
@@ -45,11 +48,6 @@ struct Cli {
4548
#[arg(long, default_value = "")]
4649
source_prefix: String,
4750

48-
/// Base S3 key prefix for target (rehydrated) data.
49-
/// A random suffix is appended automatically to create a unique destination per run.
50-
#[arg(long, default_value = "")]
51-
target_base_prefix: String,
52-
5351
/// Logical table format propagated to system adapters
5452
#[arg(long, value_enum, default_value = "parquet")]
5553
table_format: TableFormat,
@@ -65,6 +63,18 @@ struct Cli {
6563
/// S3 endpoint URL (for MinIO/LocalStack)
6664
#[arg(long)]
6765
endpoint: Option<String>,
66+
67+
/// ADBC driver name (for example: databricks, flightsql)
68+
#[arg(long)]
69+
adbc_driver: String,
70+
71+
/// ADBC connection URI passed as db option `uri`
72+
#[arg(long)]
73+
adbc_uri: String,
74+
75+
/// Optional schema name to prefix destination table names
76+
#[arg(long)]
77+
adbc_schema: Option<String>,
6878
}
6979

7080
impl Cli {
@@ -97,22 +107,6 @@ impl Cli {
97107
}
98108
}
99109

100-
fn target_config(&self) -> TargetConfig {
101-
let run_suffix = uuid::Uuid::new_v4().to_string();
102-
let prefix = if self.target_base_prefix.is_empty() {
103-
run_suffix
104-
} else {
105-
format!("{}/{run_suffix}", self.target_base_prefix)
106-
};
107-
TargetConfig {
108-
bucket: self.bucket.clone(),
109-
prefix,
110-
table_format: self.table_format.clone(),
111-
executor_instance_type: self.executor_instance_type.clone(),
112-
region: self.region.clone(),
113-
endpoint: self.endpoint.clone(),
114-
}
115-
}
116110
}
117111

118112
#[tokio::main]
@@ -127,15 +121,24 @@ async fn main() -> anyhow::Result<()> {
127121
let dataset_config = cli.dataset_config();
128122

129123
let source = Arc::new(S3Storage::new(&cli.source_config())?);
130-
let target = Arc::new(S3Storage::new(&cli.target_config())?);
124+
125+
let adbc_conn = AdbcConnection::create(
126+
&cli.adbc_driver,
127+
std::collections::HashMap::from([(
128+
"uri".to_string(),
129+
Value::String(cli.adbc_uri.clone()),
130+
)]),
131+
)?;
132+
let target = Arc::new(AdbcSink::new(adbc_conn, cli.adbc_schema.clone()));
131133

132134
let mut pipeline = ETLPipeline::new(dataset_source, &dataset_config, source, target)?;
133135

134136
tracing::info!(
135137
dataset = %cli.dataset,
136138
bucket = %cli.bucket,
137139
source_prefix = %cli.source_prefix,
138-
target_base_prefix = %cli.target_base_prefix,
140+
adbc_driver = %cli.adbc_driver,
141+
adbc_schema = ?cli.adbc_schema,
139142
scale_factor = cli.scale_factor,
140143
num_steps = cli.num_steps,
141144
"Starting ETL pipeline"

0 commit comments

Comments
 (0)