Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions crates/oxbow/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::arrow::error::ArrowError;
use deltalake::arrow::json::reader::ReaderBuilder;
use deltalake::kernel::Action;
use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
use deltalake::{DeltaResult, DeltaTable};

use std::io::Cursor;
use std::sync::Arc;
use tracing::log::*;

use super::commit_to_table;

///
/// Append an iterator which yields [RecordBatch] onto the given [DeltaTable]
///
Expand All @@ -31,7 +34,7 @@ pub async fn append_batches(
}

if written {
let version = writer.flush_and_commit(&mut table).await?;
let version = flush_and_commit(&mut writer, &mut table).await?;
info!("Successfully flushed v{version} via append_batches to Delta table");
} else {
error!("Failed to write any data files! Cowardly avoiding a Delta commit");
Expand Down Expand Up @@ -75,7 +78,7 @@ pub async fn append_values(
}

if written {
let version = writer.flush_and_commit(&mut table).await?;
let version = flush_and_commit(&mut writer, &mut table).await?;
info!("Successfully flushed v{version} to Delta table");
} else {
warn!("Failed to write any data files! Cowardly avoiding a Delta commit");
Expand Down Expand Up @@ -124,6 +127,16 @@ pub fn augment_with_ds(batch: RecordBatch) -> DeltaResult<RecordBatch> {
Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

async fn flush_and_commit(
writer: &mut RecordBatchWriter,
table: &mut DeltaTable,
) -> DeltaResult<i64> {
let actions: Vec<_> = writer.flush().await?.drain(..).map(Action::Add).collect();
let version = commit_to_table(&actions, table).await?;
table.update().await?;
Ok(version)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading