@@ -75,7 +75,7 @@ void PhysicalInsert::GetInsertInfo(const BoundCreateTableInfo &info, vector<Logi
7575
7676InsertGlobalState::InsertGlobalState (ClientContext &context, const vector<LogicalType> &return_types,
7777 DuckTableEntry &table)
78- : table(table), insert_count(0 ), initialized( false ), return_collection(context, return_types) {
78+ : table(table), insert_count(0 ), return_collection(context, return_types) {
7979}
8080
8181InsertLocalState::InsertLocalState (ClientContext &context, const vector<LogicalType> &types,
@@ -275,11 +275,6 @@ static idx_t PerformOnConflictAction(InsertLocalState &lstate, InsertGlobalState
275275 return update_chunk.size ();
276276 }
277277 auto &local_storage = LocalStorage::Get (context.client , data_table.db );
278- if (gstate.initialized ) {
279- // Flush any local appends that could be referenced by the UPDATE.
280- data_table.FinalizeLocalAppend (gstate.append_state );
281- gstate.initialized = false ;
282- }
283278 local_storage.Update (data_table, row_ids, set_columns, update_chunk);
284279 return update_chunk.size ();
285280 }
@@ -289,11 +284,6 @@ static idx_t PerformOnConflictAction(InsertLocalState &lstate, InsertGlobalState
289284 data_table.Delete (delete_state, context.client , row_ids, update_chunk.size ());
290285 } else {
291286 auto &local_storage = LocalStorage::Get (context.client , data_table.db );
292- if (gstate.initialized ) {
293- // Flush any local appends that could be referenced by the DELETE.
294- data_table.FinalizeLocalAppend (gstate.append_state );
295- gstate.initialized = false ;
296- }
297287 local_storage.Delete (data_table, row_ids, update_chunk.size ());
298288 }
299289
@@ -427,7 +417,7 @@ static void VerifyOnConflictCondition(ExecutionContext &context, DataChunk &comb
427417 throw InternalException (" VerifyAppendConstraints was expected to throw but didn't" );
428418 }
429419
430- auto &indexes = local_storage.GetIndexes (data_table);
420+ auto &indexes = local_storage.GetIndexes (context. client , data_table);
431421 auto storage = local_storage.GetStorage (data_table);
432422 DataTable::VerifyUniqueIndexes (indexes, storage, tuples, nullptr );
433423 throw InternalException (" VerifyUniqueIndexes was expected to throw but didn't" );
@@ -451,7 +441,7 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
451441 auto &constraint_state = lstate.GetConstraintState (data_table, table);
452442 data_table.VerifyAppendConstraints (constraint_state, context.client , tuples, storage, &conflict_manager);
453443 } else {
454- auto &indexes = local_storage.GetIndexes (data_table);
444+ auto &indexes = local_storage.GetIndexes (context. client , data_table);
455445 DataTable::VerifyUniqueIndexes (indexes, storage, tuples, &conflict_manager);
456446 }
457447
@@ -529,7 +519,7 @@ idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionCont
529519 ConflictInfo conflict_info (conflict_target);
530520
531521 auto &global_indexes = data_table.GetDataTableInfo ()->GetIndexes ();
532- auto &local_indexes = local_storage.GetIndexes (data_table);
522+ auto &local_indexes = local_storage.GetIndexes (context. client , data_table);
533523
534524 unordered_set<BoundIndex *> matched_indexes;
535525 if (conflict_info.column_ids .empty ()) {
@@ -623,19 +613,14 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &insert
623613 insert_chunk.Flatten ();
624614
625615 if (!parallel) {
626- if (!gstate.initialized ) {
627- storage.InitializeLocalAppend (gstate.append_state , table, context.client , bound_constraints);
628- gstate.initialized = true ;
629- }
630-
631616 idx_t updated_tuples = OnConflictHandling (table, context, gstate, lstate, insert_chunk);
632617
633618 gstate.insert_count += insert_chunk.size ();
634619 gstate.insert_count += updated_tuples;
635620 if (return_chunk) {
636621 gstate.return_collection .Append (insert_chunk);
637622 }
638- storage.LocalAppend (gstate. append_state , context.client , insert_chunk, true );
623+ storage.LocalAppend (table , context.client , insert_chunk, bound_constraints );
639624 if (action_type == OnConflictAction::UPDATE && lstate.update_chunk .size () != 0 ) {
640625 (void )HandleInsertConflicts<true >(table, context, lstate, gstate, lstate.update_chunk , *this );
641626 (void )HandleInsertConflicts<false >(table, context, lstate, gstate, lstate.update_chunk , *this );
@@ -701,13 +686,14 @@ SinkCombineResultType PhysicalInsert::Combine(ExecutionContext &context, Operato
701686 gstate.insert_count += append_count;
702687 if (append_count < row_group_size) {
703688 // we have few rows - append to the local storage directly
704- storage.InitializeLocalAppend (gstate.append_state , table, context.client , bound_constraints);
689+ LocalAppendState append_state;
690+ storage.InitializeLocalAppend (append_state, table, context.client , bound_constraints);
705691 auto &transaction = DuckTransaction::Get (context.client , table.catalog );
706692 collection.Scan (transaction, [&](DataChunk &insert_chunk) {
707- storage.LocalAppend (gstate. append_state , context.client , insert_chunk, false );
693+ storage.LocalAppend (append_state, context.client , insert_chunk, false );
708694 return true ;
709695 });
710- storage.FinalizeLocalAppend (gstate. append_state );
696+ storage.FinalizeLocalAppend (append_state);
711697 } else {
712698 // we have written rows to disk optimistically - merge directly into the transaction-local storage
713699 lstate.optimistic_writer ->WriteLastRowGroup (collection);
@@ -722,12 +708,6 @@ SinkCombineResultType PhysicalInsert::Combine(ExecutionContext &context, Operato
722708
723709SinkFinalizeType PhysicalInsert::Finalize (Pipeline &pipeline, Event &event, ClientContext &context,
724710 OperatorSinkFinalizeInput &input) const {
725- auto &gstate = input.global_state .Cast <InsertGlobalState>();
726- if (!parallel && gstate.initialized ) {
727- auto &table = gstate.table ;
728- auto &storage = table.GetStorage ();
729- storage.FinalizeLocalAppend (gstate.append_state );
730- }
731711 return SinkFinalizeType::READY;
732712}
733713
0 commit comments