Skip to content

Commit 6f64391

Browse files
make etl hive sink faster (#130)
* make etl hive sink faster * chore: auto-fix cargo fmt + clippy * Update s3_hive.rs --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8f86c00 commit 6f64391

3 files changed

Lines changed: 86 additions & 63 deletions

File tree

crates/etl/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,8 @@ impl ETLPipeline {
487487
prefix = config.prefix
488488
)
489489
}),
490+
time_column: Some(CREATED_AT_COLUMN.to_string()),
491+
partition_columns: self.dataset.partition_columns(&name),
490492
};
491493

492494
(name.clone(), config)

crates/etl/src/sink/s3_hive.rs

Lines changed: 80 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use object_store::path::Path as ObjectPath;
2727
use parquet::arrow::ArrowWriter;
2828
use parquet::basic::Compression;
2929
use parquet::file::properties::WriterProperties;
30+
use tokio::task::JoinSet;
3031

3132
use super::{InsertOp, Sink};
3233

@@ -93,70 +94,75 @@ impl S3HiveSink {
9394
partition_columns,
9495
})
9596
}
97+
}
9698

97-
/// Writes a single partition's batch to S3 as a Parquet file.
98-
async fn write_partition(
99-
&self,
100-
table_name: &str,
101-
batch_id: u64,
102-
partition_path: &str,
103-
batch: &RecordBatch,
104-
effective_partition_columns: &[String],
105-
partition_idx: usize,
106-
) -> anyhow::Result<()> {
107-
// Strip partition columns from written data — they are encoded in the path.
108-
let batch_without_partition = strip_columns(batch, effective_partition_columns)?;
109-
110-
if batch_without_partition.num_columns() == 0 {
111-
anyhow::bail!(
112-
"Cannot write table '{table_name}' with partition columns {:?}: no columns would remain in parquet output",
113-
effective_partition_columns
114-
);
115-
}
99+
/// Writes a single partition batch to S3 as a Parquet file.
100+
///
101+
/// Parquet encoding (CPU-bound) is offloaded to `spawn_blocking` so that the
102+
/// async executor is not blocked during compression. The resulting bytes are
103+
/// then uploaded with a single `PUT`.
104+
async fn write_partition_task(
105+
store: Arc<dyn ObjectStore>,
106+
prefix: String,
107+
table_name: String,
108+
batch_id: u64,
109+
partition_path: String,
110+
batch: RecordBatch,
111+
effective_partition_columns: Vec<String>,
112+
partition_idx: usize,
113+
) -> anyhow::Result<()> {
114+
// Strip partition columns — they are encoded in the path.
115+
let batch_without_partition = strip_columns(&batch, &effective_partition_columns)?;
116+
117+
if batch_without_partition.num_columns() == 0 {
118+
anyhow::bail!(
119+
"Cannot write table '{table_name}' with partition columns {effective_partition_columns:?}: \
120+
no columns would remain in parquet output"
121+
);
122+
}
116123

117-
let path = if self.prefix.is_empty() {
118-
if partition_path.is_empty() {
119-
ObjectPath::from(format!(
120-
"{table_name}/batch-{batch_id:06}-{partition_idx:04}.parquet"
121-
))
122-
} else {
123-
ObjectPath::from(format!(
124-
"{table_name}/{partition_path}/batch-{batch_id:06}-{partition_idx:04}.parquet"
125-
))
126-
}
124+
let path = if prefix.is_empty() {
125+
if partition_path.is_empty() {
126+
ObjectPath::from(format!(
127+
"{table_name}/batch-{batch_id:06}-{partition_idx:04}.parquet"
128+
))
127129
} else {
128-
if partition_path.is_empty() {
129-
ObjectPath::from(format!(
130-
"{}/{table_name}/batch-{batch_id:06}-{partition_idx:04}.parquet",
131-
self.prefix
132-
))
133-
} else {
134-
ObjectPath::from(format!(
135-
"{}/{table_name}/{partition_path}/batch-{batch_id:06}-{partition_idx:04}.parquet",
136-
self.prefix
137-
))
138-
}
139-
};
140-
130+
ObjectPath::from(format!(
131+
"{table_name}/{partition_path}/batch-{batch_id:06}-{partition_idx:04}.parquet"
132+
))
133+
}
134+
} else if partition_path.is_empty() {
135+
ObjectPath::from(format!(
136+
"{prefix}/{table_name}/batch-{batch_id:06}-{partition_idx:04}.parquet",
137+
))
138+
} else {
139+
ObjectPath::from(format!(
140+
"{prefix}/{table_name}/{partition_path}/batch-{batch_id:06}-{partition_idx:04}.parquet",
141+
))
142+
};
143+
144+
// Encode to Parquet + Snappy on a blocking thread so the async executor
145+
// is not stalled during CPU-intensive compression.
146+
let buf = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
141147
let props = WriterProperties::builder()
142148
.set_compression(Compression::SNAPPY)
143149
.build();
144-
145150
let mut buf = Vec::new();
146-
{
147-
let mut writer =
148-
ArrowWriter::try_new(&mut buf, batch_without_partition.schema(), Some(props))?;
149-
writer.write(&batch_without_partition)?;
150-
writer.close()?;
151-
}
152-
153-
self.store
154-
.put(&path, buf.into())
155-
.await
156-
.map_err(|e| anyhow::anyhow!("S3 PUT failed for {path}: {e}"))?;
157-
158-
Ok(())
159-
}
151+
let mut writer =
152+
ArrowWriter::try_new(&mut buf, batch_without_partition.schema(), Some(props))?;
153+
writer.write(&batch_without_partition)?;
154+
writer.close()?;
155+
Ok(buf)
156+
})
157+
.await
158+
.map_err(|e| anyhow::anyhow!("Parquet encoding task panicked: {e}"))??;
159+
160+
store
161+
.put(&path, buf.into())
162+
.await
163+
.map_err(|e| anyhow::anyhow!("S3 PUT failed for {path}: {e}"))?;
164+
165+
Ok(())
160166
}
161167

162168
#[async_trait]
@@ -204,16 +210,27 @@ impl Sink for S3HiveSink {
204210
// Group rows by distinct partition tuples in configured column order.
205211
let partitions = partition_batch(&batch, &partition_columns_with_idx)?;
206212

207-
for (idx, (partition_path, partition_batch)) in partitions.iter().enumerate() {
208-
self.write_partition(
209-
table_name,
213+
// Spawn all partition writes concurrently. Each task owns its data
214+
// so there is no contention, and S3 PUTs for different paths are
215+
// fully independent.
216+
let mut join_set: JoinSet<anyhow::Result<()>> = JoinSet::new();
217+
218+
for (idx, (partition_path, partition_batch)) in partitions.into_iter().enumerate() {
219+
join_set.spawn(write_partition_task(
220+
Arc::clone(&self.store),
221+
self.prefix.clone(),
222+
table_name.to_string(),
210223
batch_id,
211224
partition_path,
212225
partition_batch,
213-
&effective_partition_columns,
226+
effective_partition_columns.clone(),
214227
idx,
215-
)
216-
.await?;
228+
));
229+
}
230+
231+
// Collect results; propagate the first error encountered.
232+
while let Some(result) = join_set.join_next().await {
233+
result.map_err(|e| anyhow::anyhow!("Partition write task panicked: {e}"))??;
217234
}
218235

219236
Ok(())

crates/system-adapter-protocol/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ pub struct DatasetConfig {
135135
pub schema: SchemaRef,
136136
/// Dataset S3 location (e.g. "s3://my-bucket/path/to/data/")
137137
pub location: Option<String>,
138+
/// Optional column name to use as the ingestion time for metrics tracking
139+
pub time_column: Option<String>,
140+
/// Optional list of columns to use for partitioning the dataset in storage
141+
pub partition_columns: Vec<String>,
138142
}
139143

140144
/// Request to setup a benchmark run.

0 commit comments

Comments
 (0)