@@ -608,6 +608,11 @@ ServerResources::remove_view_on_delete_sub(
608
608
}
609
609
}
610
610
611
+ void
612
+ ServerResources::drop_view_on_delete_sub (const t_id& view_id) {
613
+ m_view_on_delete_subs.erase (view_id);
614
+ }
615
+
611
616
void
612
617
ServerResources::create_view_on_update_sub (
613
618
const t_id& view_id, Subscription sub
@@ -636,6 +641,53 @@ ServerResources::drop_view_on_update_sub(const t_id& view_id) {
636
641
m_view_on_update_subs.erase (view_id);
637
642
}
638
643
644
+ void
645
+ ServerResources::remove_view_on_update_sub (
646
+ const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id
647
+ ) {
648
+ if (m_view_on_update_subs.find (view_id) != m_view_on_update_subs.end ()) {
649
+ auto & subs = m_view_on_update_subs[view_id];
650
+ subs.erase (
651
+ std::remove_if (
652
+ subs.begin (),
653
+ subs.end (),
654
+ [sub_id, client_id](const Subscription& sub) {
655
+ return sub.id == sub_id && sub.client_id == client_id;
656
+ }
657
+ ),
658
+ subs.end ()
659
+ );
660
+ }
661
+ }
662
+
663
+ void
664
+ ServerResources::create_on_hosted_tables_update_sub (Subscription sub) {
665
+ PSP_WRITE_LOCK (m_write_lock);
666
+ m_on_hosted_tables_update_subs.push_back (sub);
667
+ }
668
+
669
+ std::vector<Subscription>
670
+ ServerResources::get_on_hosted_tables_update_sub () {
671
+ PSP_READ_LOCK (m_write_lock);
672
+ return m_on_hosted_tables_update_subs;
673
+ }
674
+
675
+ void
676
+ ServerResources::remove_on_hosted_tables_update_sub (
677
+ std::uint32_t sub_id, std::uint32_t client_id
678
+ ) {
679
+ m_on_hosted_tables_update_subs.erase (
680
+ std::remove_if (
681
+ m_on_hosted_tables_update_subs.begin (),
682
+ m_on_hosted_tables_update_subs.end (),
683
+ [sub_id, client_id](const Subscription& sub) {
684
+ return sub.id == sub_id && sub.client_id == client_id;
685
+ }
686
+ ),
687
+ m_on_hosted_tables_update_subs.end ()
688
+ );
689
+ }
690
+
639
691
std::vector<std::pair<std::shared_ptr<Table>, const ServerResources::t_id>>
640
692
ServerResources::get_dirty_tables () {
641
693
PSP_READ_LOCK (m_write_lock);
@@ -662,6 +714,18 @@ ServerResources::drop_client(const std::uint32_t client_id) {
662
714
delete_view (client_id, view_id);
663
715
}
664
716
}
717
+
718
+ std::vector<Subscription> subs;
719
+ std::remove_copy_if (
720
+ m_on_hosted_tables_update_subs.begin (),
721
+ m_on_hosted_tables_update_subs.end (),
722
+ std::back_inserter (subs),
723
+ [&client_id](const Subscription& item) {
724
+ return item.client_id == client_id;
725
+ }
726
+ );
727
+
728
+ m_on_hosted_tables_update_subs = subs;
665
729
}
666
730
667
731
std::uint32_t
@@ -876,6 +940,7 @@ needs_poll(const proto::Request::ClientReqCase proto_case) {
876
940
case ReqCase::kTableUpdateReq :
877
941
case ReqCase::kTableRemoveDeleteReq :
878
942
case ReqCase::kGetHostedTablesReq :
943
+ case ReqCase::kRemoveHostedTablesUpdateReq :
879
944
case ReqCase::kTableReplaceReq :
880
945
case ReqCase::kTableDeleteReq :
881
946
case ReqCase::kViewGetConfigReq :
@@ -932,6 +997,7 @@ entity_type_is_table(const proto::Request::ClientReqCase proto_case) {
932
997
case ReqCase::kViewDeleteReq :
933
998
case ReqCase::kViewExpressionSchemaReq :
934
999
case ReqCase::kViewRemoveOnUpdateReq :
1000
+ case ReqCase::kRemoveHostedTablesUpdateReq :
935
1001
return false ;
936
1002
case proto::Request::CLIENT_REQ_NOT_SET:
937
1003
throw std::runtime_error (" Unhandled request type 2" );
@@ -1228,24 +1294,41 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
1228
1294
break ;
1229
1295
}
1230
1296
case proto::Request::kGetHostedTablesReq : {
1231
- proto::Response resp;
1232
- const auto & tables = resp.mutable_get_hosted_tables_resp ();
1233
- const auto & infos = tables->mutable_table_infos ();
1234
- for (const auto & name : m_resources.get_table_ids ()) {
1235
- const auto & v = infos->Add ();
1297
+ const auto & r = req.get_hosted_tables_req ();
1298
+ if (!r.subscribe ()) {
1299
+ proto::Response resp;
1300
+ const auto & tables = resp.mutable_get_hosted_tables_resp ();
1301
+ const auto & infos = tables->mutable_table_infos ();
1302
+ for (const auto & name : m_resources.get_table_ids ()) {
1303
+ const auto & v = infos->Add ();
1236
1304
1237
- v->set_entity_id (name);
1238
- const auto tbl = m_resources.get_table (name);
1305
+ v->set_entity_id (name);
1306
+ const auto tbl = m_resources.get_table (name);
1239
1307
1240
- if (!tbl->get_index ().empty ()) {
1241
- v->set_index (tbl->get_index ());
1242
- }
1308
+ if (!tbl->get_index ().empty ()) {
1309
+ v->set_index (tbl->get_index ());
1310
+ }
1243
1311
1244
- if (tbl->get_limit () != std::numeric_limits<int >::max ()) {
1245
- v->set_limit (tbl->get_limit ());
1312
+ if (tbl->get_limit () != std::numeric_limits<int >::max ()) {
1313
+ v->set_limit (tbl->get_limit ());
1314
+ }
1246
1315
}
1316
+
1317
+ push_resp (std::move (resp));
1318
+ } else {
1319
+ Subscription sub_info;
1320
+ sub_info.id = req.msg_id ();
1321
+ sub_info.client_id = client_id;
1322
+ m_resources.create_on_hosted_tables_update_sub (sub_info);
1247
1323
}
1248
1324
1325
+ break ;
1326
+ }
1327
+ case proto::Request::kRemoveHostedTablesUpdateReq : {
1328
+ auto sub_id = req.remove_hosted_tables_update_req ().id ();
1329
+ m_resources.remove_on_hosted_tables_update_sub (sub_id, client_id);
1330
+ proto::Response resp;
1331
+ resp.mutable_remove_hosted_tables_update_resp ();
1249
1332
push_resp (std::move (resp));
1250
1333
break ;
1251
1334
}
@@ -1348,6 +1431,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
1348
1431
proto::Response resp;
1349
1432
resp.mutable_make_table_resp ();
1350
1433
push_resp (std::move (resp));
1434
+
1435
+ // Notify `on_thsoted_tables_update` listeners
1436
+ auto subscriptions = m_resources.get_on_hosted_tables_update_sub ();
1437
+ for (auto & subscription : subscriptions) {
1438
+ Response out;
1439
+ out.set_msg_id (subscription.id );
1440
+ ProtoServerResp<ProtoServer::Response> resp2;
1441
+ resp2.data = std::move (out);
1442
+ resp2.client_id = subscription.client_id ;
1443
+ proto_resp.emplace_back (std::move (resp2));
1444
+ }
1445
+
1351
1446
break ;
1352
1447
}
1353
1448
case proto::Request::kTableSizeReq : {
@@ -2272,6 +2367,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
2272
2367
proto::Response resp;
2273
2368
resp.mutable_table_delete_resp ();
2274
2369
push_resp (std::move (resp));
2370
+
2371
+ // notify `on_hosted_tables_update` listeners
2372
+ auto subscriptions = m_resources.get_on_hosted_tables_update_sub ();
2373
+ for (auto & subscription : subscriptions) {
2374
+ Response out;
2375
+ out.set_msg_id (subscription.id );
2376
+ ProtoServerResp<ProtoServer::Response> resp2;
2377
+ resp2.data = std::move (out);
2378
+ resp2.client_id = subscription.client_id ;
2379
+ proto_resp.emplace_back (std::move (resp2));
2380
+ }
2381
+
2275
2382
break ;
2276
2383
}
2277
2384
case proto::Request::kViewDeleteReq : {
@@ -2509,30 +2616,6 @@ ProtoServer::_process_table(
2509
2616
m_resources.mark_table_clean (table_id);
2510
2617
}
2511
2618
2512
- void
2513
- ServerResources::remove_view_on_update_sub (
2514
- const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id
2515
- ) {
2516
- if (m_view_on_update_subs.find (view_id) != m_view_on_update_subs.end ()) {
2517
- auto & subs = m_view_on_update_subs[view_id];
2518
- subs.erase (
2519
- std::remove_if (
2520
- subs.begin (),
2521
- subs.end (),
2522
- [sub_id, client_id](const Subscription& sub) {
2523
- return sub.id == sub_id && sub.client_id == client_id;
2524
- }
2525
- ),
2526
- subs.end ()
2527
- );
2528
- }
2529
- }
2530
-
2531
- void
2532
- ServerResources::drop_view_on_delete_sub (const t_id& view_id) {
2533
- m_view_on_delete_subs.erase (view_id);
2534
- }
2535
-
2536
2619
} // namespace perspective::server
2537
2620
2538
2621
const char *
0 commit comments