@@ -280,15 +280,12 @@ inline void table_data::reset_insert_rows() noexcept
280280
281281inline void table_data::add_insert_slots (int32_t nslots, TupleTableSlot** slots)
282282{
283- streamers_.reset ();
284283 for (auto k = 0 ; k < nslots; ++k) {
285284 auto slot = slots[k];
286285 slot_getallattrs (slot);
287286 }
288- icm::string_map<nd::array> deeplake_rows;
289287 for (int32_t i = 0 ; i < num_columns (); ++i) {
290- std::vector<nd::array> column_values;
291- column_values.reserve (nslots);
288+ auto & column_values = insert_rows_[get_atttypename (i)];
292289 const auto dt = get_column_view (i)->dtype ();
293290 for (auto k = 0 ; k < nslots; ++k) {
294291 auto slot = slots[k];
@@ -300,24 +297,11 @@ inline void table_data::add_insert_slots(int32_t nslots, TupleTableSlot** slots)
300297 }
301298 column_values.push_back (std::move (val));
302299 }
303- std::string column_name = get_atttypename (i);
304- if (nslots == 1 ) {
305- deeplake_rows[std::move (column_name)] = std::move (column_values.front ());
306- } else {
307- deeplake_rows[std::move (column_name)] = nd::dynamic (std::move (column_values));
308- }
309300 }
310301 num_total_rows_ += nslots;
311- insert_promises_.push_back (impl::append_rows (get_dataset (), std::move (deeplake_rows), nslots));
312- constexpr size_t max_pending_insert_promises = 1024 ;
313- if (insert_promises_.size () >= max_pending_insert_promises) {
314- auto p = std::move (insert_promises_.front ());
315- insert_promises_.pop_front ();
316- if (p.is_ready ()) {
317- std::move (p).get ();
318- } else {
319- p.get_future ().get ();
320- }
302+ const auto num_inserts = insert_rows_.begin ()->second .size ();
303+ if (num_inserts >= 512 ) {
304+ flush_inserts ();
321305 }
322306}
323307
@@ -411,22 +395,35 @@ inline bool table_data::can_stream_column(int32_t column_number) const noexcept
411395 return column_width > 0 && column_width < pg::max_streamable_column_width;
412396}
413397
414- inline bool table_data::flush_inserts ()
398+ inline bool table_data::flush_inserts (bool full_flush )
415399{
416- if (insert_promises_.empty ()) {
417- return true ;
400+ if (!insert_rows_.empty ()) {
401+ icm::string_map<nd::array> deeplake_rows;
402+ const auto num_inserts = insert_rows_.begin ()->second .size ();
403+ if (num_inserts == 1 ) {
404+ for (auto & [column_name, values] : insert_rows_) {
405+ deeplake_rows[column_name] = std::move (values.front ());
406+ }
407+ } else {
408+ for (auto & [column_name, values] : insert_rows_) {
409+ deeplake_rows[column_name] = nd::dynamic (std::move (values));
410+ }
411+ }
412+ insert_rows_.clear ();
413+ streamers_.reset ();
414+ insert_promises_.push_back (impl::append_rows (get_dataset (), std::move (deeplake_rows), num_inserts));
418415 }
419- // Flush the insert tuples to the dataset
420416 try {
421- for (auto & p : insert_promises_) {
417+ constexpr size_t max_pending_insert_promises = 1024 ;
418+ while (!insert_promises_.empty () && (full_flush || insert_promises_.size () >= max_pending_insert_promises)) {
419+ auto p = std::move (insert_promises_.front ());
420+ insert_promises_.pop_front ();
422421 if (p.is_ready ()) {
423422 std::move (p).get ();
424423 } else {
425424 p.get_future ().get ();
426425 }
427426 }
428- insert_promises_.clear ();
429- num_total_rows_ = dataset_->num_rows ();
430427 } catch (const base::exception& e) {
431428 elog (WARNING, " Failed to flush inserts: %s" , e.what ());
432429 reset_insert_rows ();
@@ -642,7 +639,7 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number,
642639
643640inline bool table_data::flush ()
644641{
645- const bool s1 = flush_inserts ();
642+ const bool s1 = flush_inserts (true );
646643 const bool s2 = flush_deletes ();
647644 const bool s3 = flush_updates ();
648645 return s1 && s2 && s3;
0 commit comments