@@ -31,22 +31,17 @@ namespace pg {
3131
3232namespace impl {
3333
34- inline void append_row (std::shared_ptr<deeplake_api::dataset> dataset, icm::string_map<nd::array> row)
34+ inline async::promise<void >
35+ append_rows (std::shared_ptr<deeplake_api::dataset> dataset, icm::string_map<nd::array> rows, int32_t num_rows)
3536{
36- async::run_on_main ([dataset = std::move (dataset), row = std::move (row)]() mutable {
37- return dataset->append_row (std::move (row));
38- })
39- .get_future ()
40- .get ();
41- }
37+ ASSERT (num_rows > 0 );
4238
43- inline void append_rows (std::shared_ptr<deeplake_api::dataset> dataset, icm::string_map<nd::array> rows)
44- {
45- async::run_on_main ([dataset = std::move (dataset), rows = std::move (rows)]() mutable {
46- return dataset->append_rows (std::move (rows));
47- })
48- .get_future ()
49- .get ();
39+ return async::run_on_main ([ds = std::move (dataset), rows = std::move (rows), num_rows]() mutable {
40+ if (num_rows == 1 ) {
41+ return ds->append_row (std::move (rows));
42+ }
43+ return ds->append_rows (std::move (rows));
44+ });
5045}
5146
5247inline void commit_dataset (std::shared_ptr<deeplake_api::dataset> dataset, bool show_progress)
@@ -112,6 +107,7 @@ inline void table_data::open_dataset(bool create)
112107 dataset_ = deeplake_api::open (dataset_path_, std::move (creds)).get_future ().get ();
113108 }
114109 ASSERT (dataset_ != nullptr );
110+ num_total_rows_ = dataset_->num_rows ();
115111
116112 // Enable logging if GUC parameter is set
117113 if (pg::enable_dataset_logging && dataset_ && !dataset_->is_logging_enabled ()) {
@@ -150,6 +146,7 @@ inline void table_data::refresh()
150146 })
151147 .get_future ()
152148 .get ();
149+ num_total_rows_ = dataset_->num_rows ();
153150 }
154151 } else {
155152 ASSERT (refreshing_dataset_ != nullptr );
@@ -163,6 +160,7 @@ inline void table_data::refresh()
163160 std::swap (dataset_, refreshing_dataset_);
164161 dataset_->set_indexing_mode (ds_indexing_mode);
165162 refreshing_dataset_->set_indexing_mode (deeplake::indexing_mode::off);
163+ num_total_rows_ = dataset_->num_rows ();
166164 }
167165 }
168166 // trigger new refresh
@@ -264,41 +262,47 @@ inline int64_t table_data::num_rows() const noexcept
264262 return get_read_only_dataset ()->num_rows ();
265263}
266264
267- inline int64_t table_data::num_uncommitted_rows () const noexcept
268- {
269- if (insert_rows_.empty ()) {
270- return 0 ;
271- }
272- return insert_rows_.begin ()->second .size ();
273- }
274-
275265inline int64_t table_data::num_total_rows () const noexcept
276266{
277- return num_rows () + num_uncommitted_rows () ;
267+ return num_total_rows_ ;
278268}
279269
280270inline void table_data::reset_insert_rows () noexcept
281271{
282- insert_rows_.clear ();
272+ for (auto & p : insert_promises_) {
273+ p.cancel ();
274+ }
275+ insert_promises_.clear ();
276+ if (dataset_) {
277+ num_total_rows_ = dataset_->num_rows ();
278+ }
283279}
284280
285281inline void table_data::add_insert_slots (int32_t nslots, TupleTableSlot** slots)
286282{
287283 for (auto k = 0 ; k < nslots; ++k) {
288284 auto slot = slots[k];
289285 slot_getallattrs (slot);
290- for (int32_t i = 0 ; i < num_columns (); ++i) {
286+ }
287+ for (int32_t i = 0 ; i < num_columns (); ++i) {
288+ auto & column_values = insert_rows_[get_atttypename (i)];
289+ const auto dt = get_column_view (i)->dtype ();
290+ for (auto k = 0 ; k < nslots; ++k) {
291+ auto slot = slots[k];
291292 nd::array val;
292293 if (slot->tts_isnull [i]) {
293- auto dt = get_column_view (i)->dtype ();
294294 val = (nd::dtype_is_numeric (dt) ? nd::adapt (0 ) : nd::none (dt, 0 ));
295295 } else {
296296 val = pg::utils::datum_to_nd (slot->tts_values [i], get_base_atttypid (i), get_atttypmod (i));
297297 }
298- std::string column_name = get_atttypename (i);
299- insert_rows_[std::move (column_name)].push_back (std::move (val));
298+ column_values.push_back (std::move (val));
300299 }
301300 }
301+ num_total_rows_ += nslots;
302+ const auto num_inserts = insert_rows_.begin ()->second .size ();
303+ if (num_inserts >= 512 ) {
304+ flush_inserts ();
305+ }
302306}
303307
304308inline void table_data::add_delete_row (int64_t row_id)
@@ -391,43 +395,41 @@ inline bool table_data::can_stream_column(int32_t column_number) const noexcept
391395 return column_width > 0 && column_width < pg::max_streamable_column_width;
392396}
393397
394- inline bool table_data::flush_inserts ()
398+ inline bool table_data::flush_inserts (bool full_flush )
395399{
396- const auto num_inserts = num_uncommitted_rows ();
397- if (num_inserts == 0 ) {
398- return true ;
399- }
400-
401- // Flush the insert tuples to the dataset
402- try {
403- streamers_.reset ();
404- elog (DEBUG1, " Flushing %zu insert rows to dataset '%s'" , num_inserts, table_name_.c_str ());
405- const auto start = std::chrono::high_resolution_clock::now ();
406-
407- icm::string_map<nd::array> merged;
400+ if (!insert_rows_.empty ()) {
401+ icm::string_map<nd::array> deeplake_rows;
402+ const auto num_inserts = insert_rows_.begin ()->second .size ();
408403 if (num_inserts == 1 ) {
409404 for (auto & [column_name, values] : insert_rows_) {
410- merged [column_name] = std::move (values.front ());
405+ deeplake_rows [column_name] = std::move (values.front ());
411406 }
412- impl::append_row (get_dataset (), std::move (merged));
413407 } else {
414408 for (auto & [column_name, values] : insert_rows_) {
415- merged[column_name] = nd::dynamic (std::move (values));
409+ deeplake_rows[column_name] = nd::dynamic (std::move (values));
410+ }
411+ }
412+ insert_rows_.clear ();
413+ streamers_.reset ();
414+ insert_promises_.push_back (impl::append_rows (get_dataset (), std::move (deeplake_rows), num_inserts));
415+ }
416+ try {
417+ constexpr size_t max_pending_insert_promises = 1024 ;
418+ while (!insert_promises_.empty () && (full_flush || insert_promises_.size () >= max_pending_insert_promises)) {
419+ auto p = std::move (insert_promises_.front ());
420+ insert_promises_.pop_front ();
421+ if (p.is_ready ()) {
422+ std::move (p).get ();
423+ } else {
424+ p.get_future ().get ();
416425 }
417- impl::append_rows (get_dataset (), std::move (merged));
418426 }
419- const auto end = std::chrono::high_resolution_clock::now ();
420- elog (DEBUG1,
421- " Flushed insert rows to dataset '%s' in %.2f seconds" ,
422- table_name_.c_str (),
423- std::chrono::duration<double >(end - start).count ());
424427 } catch (const base::exception& e) {
425428 elog (WARNING, " Failed to flush inserts: %s" , e.what ());
426- insert_rows_. clear ();
429+ reset_insert_rows ();
427430 return false ;
428431 }
429432
430- insert_rows_.clear ();
431433 return true ;
432434}
433435
@@ -524,6 +526,7 @@ inline std::pair<int64_t, int64_t> table_data::get_row_range(int32_t worker_id)
524526
525527inline void table_data::create_streamer (int32_t idx, int32_t worker_id)
526528{
529+ return ;
527530 if (streamers_.streamers .empty ()) {
528531 const auto s = num_columns ();
529532 streamers_.streamers .resize (s);
@@ -636,7 +639,7 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number,
636639
637640inline bool table_data::flush ()
638641{
639- const bool s1 = flush_inserts ();
642+ const bool s1 = flush_inserts (true );
640643 const bool s2 = flush_deletes ();
641644 const bool s3 = flush_updates ();
642645 return s1 && s2 && s3;
0 commit comments