Skip to content

Commit 1066e3b

Browse files
authored
Prevent InsertBuilder from taking ownership of record batches (#518)
1 parent 5bebce2 commit 1066e3b

5 files changed

Lines changed: 36 additions & 22 deletions

File tree

core/src/mysql.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,9 @@ impl MySQL {
382382
batch: RecordBatch,
383383
on_conflict: Option<OnConflict>,
384384
) -> Result<()> {
385+
let batches = vec![batch];
385386
let insert_table_builder =
386-
InsertBuilder::new(&TableReference::bare(self.table_name.clone()), vec![batch]);
387+
InsertBuilder::new(&TableReference::bare(self.table_name.clone()), &batches);
387388

388389
let sea_query_on_conflict =
389390
on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));

core/src/postgres.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,8 @@ impl Postgres {
419419
batch: RecordBatch,
420420
on_conflict: Option<OnConflict>,
421421
) -> Result<()> {
422-
let insert_table_builder = InsertBuilder::new(&self.table, vec![batch]);
422+
let batches = vec![batch];
423+
let insert_table_builder = InsertBuilder::new(&self.table, &batches);
423424

424425
let sea_query_on_conflict =
425426
on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));

core/src/sql/arrow_sql_gen/statement.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ macro_rules! push_list_values {
190190
}};
191191
}
192192

193-
pub struct InsertBuilder {
193+
pub struct InsertBuilder<'a> {
194194
table: TableReference,
195-
record_batches: Vec<RecordBatch>,
195+
record_batches: &'a Vec<RecordBatch>,
196196
}
197197

198198
pub fn use_json_insert_for_type<T: QueryBuilder + 'static>(
@@ -218,9 +218,9 @@ pub fn use_json_insert_for_type<T: QueryBuilder + 'static>(
218218
false
219219
}
220220

221-
impl InsertBuilder {
221+
impl<'a> InsertBuilder<'a> {
222222
#[must_use]
223-
pub fn new(table: &TableReference, record_batches: Vec<RecordBatch>) -> Self {
223+
pub fn new(table: &TableReference, record_batches: &'a Vec<RecordBatch>) -> Self {
224224
Self {
225225
table: table.clone(),
226226
record_batches,
@@ -1082,7 +1082,7 @@ impl InsertBuilder {
10821082
.columns(columns)
10831083
.to_owned();
10841084

1085-
for record_batch in &self.record_batches {
1085+
for record_batch in self.record_batches {
10861086
self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
10871087
}
10881088
if let Some(on_conflict) = on_conflict {
@@ -1500,7 +1500,7 @@ mod tests {
15001500
.expect("Unable to build record batch");
15011501
let record_batches = vec![batch1, batch2];
15021502

1503-
let sql = InsertBuilder::new(&TableReference::from("users"), record_batches)
1503+
let sql = InsertBuilder::new(&TableReference::from("users"), &record_batches)
15041504
.build_postgres(None)
15051505
.expect("Failed to build insert statement");
15061506
assert_eq!(sql, "INSERT INTO \"users\" (\"id\", \"name\", \"age\") VALUES (1, 'a', 10), (2, 'b', 20), (3, 'c', 30), (1, 'a', 10), (2, 'b', 20), (3, 'c', 30)");
@@ -1544,7 +1544,7 @@ mod tests {
15441544
.expect("Unable to build record batch");
15451545
let record_batches = vec![batch1, batch2];
15461546

1547-
let sql = InsertBuilder::new(&TableReference::from("schema.users"), record_batches)
1547+
let sql = InsertBuilder::new(&TableReference::from("schema.users"), &record_batches)
15481548
.build_postgres(None)
15491549
.expect("Failed to build insert statement");
15501550
assert_eq!(sql, "INSERT INTO \"schema\".\"users\" (\"id\", \"name\", \"age\") VALUES (1, 'a', 10), (2, 'b', 20), (3, 'c', 30), (1, 'a', 10), (2, 'b', 20), (3, 'c', 30)");
@@ -1595,7 +1595,7 @@ mod tests {
15951595
let batch = RecordBatch::try_new(Arc::new(schema1.clone()), vec![Arc::new(list_array)])
15961596
.expect("Unable to build record batch");
15971597

1598-
let sql = InsertBuilder::new(&TableReference::from("arrays"), vec![batch])
1598+
let sql = InsertBuilder::new(&TableReference::from("arrays"), &vec![batch])
15991599
.build_postgres(None)
16001600
.expect("Failed to build insert statement");
16011601
assert_eq!(

core/src/sqlite.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,17 @@ pub enum Error {
7171
},
7272

7373
#[snafu(display("Unable to create table in Sqlite: {source}"))]
74-
UnableToCreateTable { source: tokio_rusqlite::Error<rusqlite::Error> },
74+
UnableToCreateTable {
75+
source: tokio_rusqlite::Error<rusqlite::Error>,
76+
},
7577

7678
#[snafu(display("Unable to insert data into the Sqlite table: {source}"))]
7779
UnableToInsertIntoTable { source: rusqlite::Error },
7880

7981
#[snafu(display("Unable to insert data into the Sqlite table: {source}"))]
80-
UnableToInsertIntoTableAsync { source: tokio_rusqlite::Error<rusqlite::Error> },
82+
UnableToInsertIntoTableAsync {
83+
source: tokio_rusqlite::Error<rusqlite::Error>,
84+
},
8185

8286
#[snafu(display("Unable to insert data into the Sqlite table. The disk is full."))]
8387
DiskFull {},
@@ -549,7 +553,8 @@ impl Sqlite {
549553
batch: RecordBatch,
550554
on_conflict: Option<&OnConflict>,
551555
) -> rusqlite::Result<()> {
552-
let insert_table_builder = InsertBuilder::new(&self.table, vec![batch]);
556+
let batches = vec![batch];
557+
let insert_table_builder = InsertBuilder::new(&self.table, &batches);
553558

554559
let sea_query_on_conflict =
555560
on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));

core/tests/sqlite/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,10 @@ async fn arrow_sqlite_round_trip(
5252
// Create sqlite table from arrow records and insert arrow records
5353
let schema = Arc::clone(&arrow_record.schema());
5454
let create_table_stmts = CreateTableBuilder::new(schema, table_name).build_sqlite();
55-
let insert_table_stmt = InsertBuilder::new(
56-
&TableReference::from(table_name),
57-
vec![arrow_record.clone()],
58-
)
59-
.build_sqlite(None)
60-
.expect("SQLite insert statement should be constructed");
55+
let batches = vec![arrow_record.clone()];
56+
let insert_table_stmt = InsertBuilder::new(&TableReference::from(table_name), &batches)
57+
.build_sqlite(None)
58+
.expect("SQLite insert statement should be constructed");
6159

6260
// Test arrow -> Sqlite row coverage
6361
let _ = conn
@@ -211,9 +209,18 @@ fn create_comprehensive_test_data() -> (RecordBatch, SchemaRef) {
211209
None,
212210
Some(500000u64),
213211
]);
214-
let col_float32 = Float32Array::from(vec![Some(1.5), None, Some(-std::f32::consts::PI), Some(2.71)]);
215-
let col_float64 =
216-
Float64Array::from(vec![None, Some(std::f64::consts::E), Some(-1.414), Some(std::f64::consts::PI)]);
212+
let col_float32 = Float32Array::from(vec![
213+
Some(1.5),
214+
None,
215+
Some(-std::f32::consts::PI),
216+
Some(2.71),
217+
]);
218+
let col_float64 = Float64Array::from(vec![
219+
None,
220+
Some(std::f64::consts::E),
221+
Some(-1.414),
222+
Some(std::f64::consts::PI),
223+
]);
217224
let col_utf8 = StringArray::from(vec![Some("hello"), Some("world"), None, Some("test")]);
218225
let col_large_utf8 = LargeStringArray::from(vec![
219226
None,

0 commit comments

Comments
 (0)