diff --git a/Cargo.lock b/Cargo.lock index 84815db15f..05eccc901e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2105,6 +2105,7 @@ dependencies = [ "tracing-subscriber", "ts-rs", "wasm-bindgen", + "wasm-bindgen-derive", "wasm-bindgen-futures", "wasm-bindgen-test", "web-sys", @@ -2198,6 +2199,7 @@ dependencies = [ "tracing-subscriber", "ts-rs", "wasm-bindgen", + "wasm-bindgen-derive", "wasm-bindgen-futures", "wasm-bindgen-test", "web-sys", @@ -3759,6 +3761,28 @@ dependencies = [ "wasm-bindgen-wasm-interpreter", ] +[[package]] +name = "wasm-bindgen-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ab8fdb87408dab27d43267c311c89135b35d13f8b3081e88451ddeff742c93a" +dependencies = [ + "js-sys", + "wasm-bindgen", + "wasm-bindgen-derive-macro", +] + +[[package]] +name = "wasm-bindgen-derive-macro" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbc080f15cb38f447d52bbae64630c2d4925a9ecb5d140d56c0910b69b4cc7" +dependencies = [ + "proc-macro2 1.0.94", + "quote 1.0.39", + "syn 1.0.109", +] + [[package]] name = "wasm-bindgen-externref-xform" version = "0.2.100" diff --git a/cpp/perspective/src/cpp/server.cpp b/cpp/perspective/src/cpp/server.cpp index eef950c80e..5bea64dcee 100644 --- a/cpp/perspective/src/cpp/server.cpp +++ b/cpp/perspective/src/cpp/server.cpp @@ -608,6 +608,11 @@ ServerResources::remove_view_on_delete_sub( } } +void +ServerResources::drop_view_on_delete_sub(const t_id& view_id) { + m_view_on_delete_subs.erase(view_id); +} + void ServerResources::create_view_on_update_sub( const t_id& view_id, Subscription sub @@ -636,6 +641,53 @@ ServerResources::drop_view_on_update_sub(const t_id& view_id) { m_view_on_update_subs.erase(view_id); } +void +ServerResources::remove_view_on_update_sub( + const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id +) { + if (m_view_on_update_subs.find(view_id) != m_view_on_update_subs.end()) { + auto& subs = m_view_on_update_subs[view_id]; + subs.erase( + std::remove_if( + subs.begin(), + subs.end(), + [sub_id, client_id](const Subscription& sub) { + return sub.id == sub_id && sub.client_id == client_id; + } + ), + subs.end() + ); + } +} + +void +ServerResources::create_on_hosted_tables_update_sub(Subscription sub) { + PSP_WRITE_LOCK(m_write_lock); + m_on_hosted_tables_update_subs.push_back(sub); +} + +std::vector +ServerResources::get_on_hosted_tables_update_sub() { + PSP_READ_LOCK(m_write_lock); + return m_on_hosted_tables_update_subs; +} + +void +ServerResources::remove_on_hosted_tables_update_sub( + std::uint32_t sub_id, std::uint32_t client_id +) { + m_on_hosted_tables_update_subs.erase( + std::remove_if( + m_on_hosted_tables_update_subs.begin(), + m_on_hosted_tables_update_subs.end(), + [sub_id, client_id](const Subscription& sub) { + return sub.id == sub_id && sub.client_id == client_id; + } + ), + m_on_hosted_tables_update_subs.end() + ); +} + std::vector, const ServerResources::t_id>> ServerResources::get_dirty_tables() { PSP_READ_LOCK(m_write_lock); @@ -662,6 +714,18 @@ ServerResources::drop_client(const std::uint32_t client_id) { delete_view(client_id, view_id); } } + + std::vector subs; + std::remove_copy_if( + m_on_hosted_tables_update_subs.begin(), + m_on_hosted_tables_update_subs.end(), + std::back_inserter(subs), + [&client_id](const Subscription& item) { + return item.client_id == client_id; + } + ); + + m_on_hosted_tables_update_subs = subs; } std::uint32_t @@ -876,6 +940,7 @@ needs_poll(const proto::Request::ClientReqCase proto_case) { case ReqCase::kTableUpdateReq: case ReqCase::kTableRemoveDeleteReq: case ReqCase::kGetHostedTablesReq: + case ReqCase::kRemoveHostedTablesUpdateReq: case ReqCase::kTableReplaceReq: case ReqCase::kTableDeleteReq: case ReqCase::kViewGetConfigReq: @@ -932,6 +997,7 @@ entity_type_is_table(const proto::Request::ClientReqCase proto_case) { case ReqCase::kViewDeleteReq: case ReqCase::kViewExpressionSchemaReq: case ReqCase::kViewRemoveOnUpdateReq: + case ReqCase::kRemoveHostedTablesUpdateReq: return false; case proto::Request::CLIENT_REQ_NOT_SET: throw std::runtime_error("Unhandled request type 2"); @@ -1228,24 +1294,41 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { break; } case proto::Request::kGetHostedTablesReq: { - proto::Response resp; - const auto& tables = resp.mutable_get_hosted_tables_resp(); - const auto& infos = tables->mutable_table_infos(); - for (const auto& name : m_resources.get_table_ids()) { - const auto& v = infos->Add(); + const auto& r = req.get_hosted_tables_req(); + if (!r.subscribe()) { + proto::Response resp; + const auto& tables = resp.mutable_get_hosted_tables_resp(); + const auto& infos = tables->mutable_table_infos(); + for (const auto& name : m_resources.get_table_ids()) { + const auto& v = infos->Add(); - v->set_entity_id(name); - const auto tbl = m_resources.get_table(name); + v->set_entity_id(name); + const auto tbl = m_resources.get_table(name); - if (!tbl->get_index().empty()) { - v->set_index(tbl->get_index()); - } + if (!tbl->get_index().empty()) { + v->set_index(tbl->get_index()); + } - if (tbl->get_limit() != std::numeric_limits::max()) { - v->set_limit(tbl->get_limit()); + if (tbl->get_limit() != std::numeric_limits::max()) { + v->set_limit(tbl->get_limit()); + } } + + push_resp(std::move(resp)); + } else { + Subscription sub_info; + sub_info.id = req.msg_id(); + sub_info.client_id = client_id; + m_resources.create_on_hosted_tables_update_sub(sub_info); } + break; + } + case proto::Request::kRemoveHostedTablesUpdateReq: { + auto sub_id = req.remove_hosted_tables_update_req().id(); + m_resources.remove_on_hosted_tables_update_sub(sub_id, client_id); + proto::Response resp; + resp.mutable_remove_hosted_tables_update_resp(); push_resp(std::move(resp)); break; } @@ -1348,6 +1431,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { proto::Response resp; resp.mutable_make_table_resp(); push_resp(std::move(resp)); + + // Notify `on_thsoted_tables_update` listeners + auto subscriptions = m_resources.get_on_hosted_tables_update_sub(); + for (auto& subscription : subscriptions) { + Response out; + out.set_msg_id(subscription.id); + ProtoServerResp resp2; + resp2.data = std::move(out); + resp2.client_id = subscription.client_id; + proto_resp.emplace_back(std::move(resp2)); + } + break; } case proto::Request::kTableSizeReq: { @@ -2272,6 +2367,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { proto::Response resp; resp.mutable_table_delete_resp(); push_resp(std::move(resp)); + + // notify `on_hosted_tables_update` listeners + auto subscriptions = m_resources.get_on_hosted_tables_update_sub(); + for (auto& subscription : subscriptions) { + Response out; + out.set_msg_id(subscription.id); + ProtoServerResp resp2; + resp2.data = std::move(out); + resp2.client_id = subscription.client_id; + proto_resp.emplace_back(std::move(resp2)); + } + break; } case proto::Request::kViewDeleteReq: { @@ -2509,30 +2616,6 @@ ProtoServer::_process_table( m_resources.mark_table_clean(table_id); } -void -ServerResources::remove_view_on_update_sub( - const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id -) { - if (m_view_on_update_subs.find(view_id) != m_view_on_update_subs.end()) { - auto& subs = m_view_on_update_subs[view_id]; - subs.erase( - std::remove_if( - subs.begin(), - subs.end(), - [sub_id, client_id](const Subscription& sub) { - return sub.id == sub_id && sub.client_id == client_id; - } - ), - subs.end() - ); - } -} - -void -ServerResources::drop_view_on_delete_sub(const t_id& view_id) { - m_view_on_delete_subs.erase(view_id); -} - } // namespace perspective::server const char* diff --git a/cpp/perspective/src/include/perspective/server.h b/cpp/perspective/src/include/perspective/server.h index 9f2efb18ac..8d36ddc15a 100644 --- a/cpp/perspective/src/include/perspective/server.h +++ b/cpp/perspective/src/include/perspective/server.h @@ -569,6 +569,13 @@ namespace server { ); void drop_view_on_delete_sub(const t_id& view_id); + // `on_hosted_tables_update()` + void create_on_hosted_tables_update_sub(Subscription sub); + std::vector get_on_hosted_tables_update_sub(); + void remove_on_hosted_tables_update_sub( + std::uint32_t sub_id, std::uint32_t client_id + ); + void mark_table_dirty(const t_id& id); void mark_table_clean(const t_id& id); void mark_all_tables_clean(); @@ -594,6 +601,8 @@ namespace server { tsl::hopscotch_map> m_table_on_delete_subs; + std::vector m_on_hosted_tables_update_subs; + tsl::hopscotch_set m_dirty_tables; #ifdef PSP_PARALLEL_FOR diff --git a/cpp/protos/perspective.proto b/cpp/protos/perspective.proto index b13098ce07..5c0d63aa3f 100644 --- a/cpp/protos/perspective.proto +++ b/cpp/protos/perspective.proto @@ -118,6 +118,7 @@ message Request { // Minimum Virtual API (theoretical). GetFeaturesReq get_features_req = 3; GetHostedTablesReq get_hosted_tables_req = 4; + RemoveHostedTablesUpdateReq remove_hosted_tables_update_req = 37; TableMakePortReq table_make_port_req = 5; TableMakeViewReq table_make_view_req = 6; TableSchemaReq table_schema_req = 7; @@ -164,6 +165,7 @@ message Response { oneof client_resp { GetFeaturesResp get_features_resp = 3; GetHostedTablesResp get_hosted_tables_resp = 4; + RemoveHostedTablesUpdateResp remove_hosted_tables_update_resp = 37; TableMakePortResp table_make_port_resp = 5; TableMakeViewResp table_make_view_resp = 6; TableSchemaResp table_schema_resp = 7; @@ -219,7 +221,10 @@ message GetFeaturesResp { } // `Client::get_hosted_tables` -message GetHostedTablesReq {} +message GetHostedTablesReq { + bool subscribe = 1; +} + message GetHostedTablesResp { repeated HostedTable table_infos = 1; } @@ -230,6 +235,11 @@ message HostedTable { optional uint32 limit = 3; } +message RemoveHostedTablesUpdateReq { + uint32 id = 1; +} +message RemoveHostedTablesUpdateResp {} + // `Table::size` message TableSizeReq {} message TableSizeResp { diff --git a/examples/blocks/src/editable/README.md b/examples/blocks/src/editable/README.md index b5c308502f..67ac27cc0b 100644 --- a/examples/blocks/src/editable/README.md +++ b/examples/blocks/src/editable/README.md @@ -1,7 +1,4 @@ -A simple example of [Perspective](https://github.com/finos/perspective) with -superstore data, and editability enabled (not default but easy to toggle at -runtime). - -An explcit `SharedWorker` is used to create the Perspective engine, allowing -edits to be shared live when this example is opened across multiple browser -tabs. +A simple example of [Perspective](https://github.com/finos/perspective) with superstore +data, and editability enabled (not default but easy to toggle at runtime). This +example has no server component, and the edits occur only within the browser session; +refreshing the page will forget any edits and revert to the original dataset. diff --git a/examples/blocks/src/editable/index.html b/examples/blocks/src/editable/index.html index 6fad615870..91af59f54d 100644 --- a/examples/blocks/src/editable/index.html +++ b/examples/blocks/src/editable/index.html @@ -13,23 +13,13 @@ import perspective from "/node_modules/@finos/perspective/dist/cdn/perspective.js"; + const worker = await perspective.worker(); + const resp = await fetch("/node_modules/superstore-arrow/superstore.lz4.arrow"); + const arrow = await resp.arrayBuffer(); const viewer = document.getElementsByTagName("perspective-viewer")[0]; - - const worker = new SharedWorker("/node_modules/@finos/perspective/dist/cdn/perspective-server.worker.js"); - const client = await perspective.worker(worker); - const tables = await client.get_hosted_table_names(); - - if (tables.length > 0) { - const table = client.open_table(tables[0]); - viewer.load(table); - } else { - const resp = await fetch("/node_modules/superstore-arrow/superstore.lz4.arrow"); - const arrow = await resp.arrayBuffer(); - const table = client.table(arrow); - viewer.load(table); - } - - viewer.restore({ plugin_config: { edit_mode: "EDIT" }, settings: true }); + const table = worker.table(arrow); + viewer.load(table); + viewer.restore({ settings: true, plugin_config: { edit_mode: "EDIT" } });