Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

155 changes: 119 additions & 36 deletions cpp/perspective/src/cpp/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ ServerResources::remove_view_on_delete_sub(
}
}

void
ServerResources::drop_view_on_delete_sub(const t_id& view_id) {
m_view_on_delete_subs.erase(view_id);
}

void
ServerResources::create_view_on_update_sub(
const t_id& view_id, Subscription sub
Expand Down Expand Up @@ -636,6 +641,53 @@ ServerResources::drop_view_on_update_sub(const t_id& view_id) {
m_view_on_update_subs.erase(view_id);
}

void
ServerResources::remove_view_on_update_sub(
const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id
) {
if (m_view_on_update_subs.find(view_id) != m_view_on_update_subs.end()) {
auto& subs = m_view_on_update_subs[view_id];
subs.erase(
std::remove_if(
subs.begin(),
subs.end(),
[sub_id, client_id](const Subscription& sub) {
return sub.id == sub_id && sub.client_id == client_id;
}
),
subs.end()
);
}
}

void
ServerResources::create_on_hosted_tables_update_sub(Subscription sub) {
PSP_WRITE_LOCK(m_write_lock);
m_on_hosted_tables_update_subs.push_back(sub);
}

std::vector<Subscription>
ServerResources::get_on_hosted_tables_update_sub() {
PSP_READ_LOCK(m_write_lock);
return m_on_hosted_tables_update_subs;
}

void
ServerResources::remove_on_hosted_tables_update_sub(
std::uint32_t sub_id, std::uint32_t client_id
) {
m_on_hosted_tables_update_subs.erase(
std::remove_if(
m_on_hosted_tables_update_subs.begin(),
m_on_hosted_tables_update_subs.end(),
[sub_id, client_id](const Subscription& sub) {
return sub.id == sub_id && sub.client_id == client_id;
}
),
m_on_hosted_tables_update_subs.end()
);
}

std::vector<std::pair<std::shared_ptr<Table>, const ServerResources::t_id>>
ServerResources::get_dirty_tables() {
PSP_READ_LOCK(m_write_lock);
Expand All @@ -662,6 +714,18 @@ ServerResources::drop_client(const std::uint32_t client_id) {
delete_view(client_id, view_id);
}
}

std::vector<Subscription> subs;
std::remove_copy_if(
m_on_hosted_tables_update_subs.begin(),
m_on_hosted_tables_update_subs.end(),
std::back_inserter(subs),
[&client_id](const Subscription& item) {
return item.client_id == client_id;
}
);

m_on_hosted_tables_update_subs = subs;
}

std::uint32_t
Expand Down Expand Up @@ -876,6 +940,7 @@ needs_poll(const proto::Request::ClientReqCase proto_case) {
case ReqCase::kTableUpdateReq:
case ReqCase::kTableRemoveDeleteReq:
case ReqCase::kGetHostedTablesReq:
case ReqCase::kRemoveHostedTablesUpdateReq:
case ReqCase::kTableReplaceReq:
case ReqCase::kTableDeleteReq:
case ReqCase::kViewGetConfigReq:
Expand Down Expand Up @@ -932,6 +997,7 @@ entity_type_is_table(const proto::Request::ClientReqCase proto_case) {
case ReqCase::kViewDeleteReq:
case ReqCase::kViewExpressionSchemaReq:
case ReqCase::kViewRemoveOnUpdateReq:
case ReqCase::kRemoveHostedTablesUpdateReq:
return false;
case proto::Request::CLIENT_REQ_NOT_SET:
throw std::runtime_error("Unhandled request type 2");
Expand Down Expand Up @@ -1228,24 +1294,41 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
break;
}
case proto::Request::kGetHostedTablesReq: {
proto::Response resp;
const auto& tables = resp.mutable_get_hosted_tables_resp();
const auto& infos = tables->mutable_table_infos();
for (const auto& name : m_resources.get_table_ids()) {
const auto& v = infos->Add();
const auto& r = req.get_hosted_tables_req();
if (!r.subscribe()) {
proto::Response resp;
const auto& tables = resp.mutable_get_hosted_tables_resp();
const auto& infos = tables->mutable_table_infos();
for (const auto& name : m_resources.get_table_ids()) {
const auto& v = infos->Add();

v->set_entity_id(name);
const auto tbl = m_resources.get_table(name);
v->set_entity_id(name);
const auto tbl = m_resources.get_table(name);

if (!tbl->get_index().empty()) {
v->set_index(tbl->get_index());
}
if (!tbl->get_index().empty()) {
v->set_index(tbl->get_index());
}

if (tbl->get_limit() != std::numeric_limits<int>::max()) {
v->set_limit(tbl->get_limit());
if (tbl->get_limit() != std::numeric_limits<int>::max()) {
v->set_limit(tbl->get_limit());
}
}

push_resp(std::move(resp));
} else {
Subscription sub_info;
sub_info.id = req.msg_id();
sub_info.client_id = client_id;
m_resources.create_on_hosted_tables_update_sub(sub_info);
}

break;
}
case proto::Request::kRemoveHostedTablesUpdateReq: {
auto sub_id = req.remove_hosted_tables_update_req().id();
m_resources.remove_on_hosted_tables_update_sub(sub_id, client_id);
proto::Response resp;
resp.mutable_remove_hosted_tables_update_resp();
push_resp(std::move(resp));
break;
}
Expand Down Expand Up @@ -1348,6 +1431,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
proto::Response resp;
resp.mutable_make_table_resp();
push_resp(std::move(resp));

// Notify `on_thsoted_tables_update` listeners
auto subscriptions = m_resources.get_on_hosted_tables_update_sub();
for (auto& subscription : subscriptions) {
Response out;
out.set_msg_id(subscription.id);
ProtoServerResp<ProtoServer::Response> resp2;
resp2.data = std::move(out);
resp2.client_id = subscription.client_id;
proto_resp.emplace_back(std::move(resp2));
}

break;
}
case proto::Request::kTableSizeReq: {
Expand Down Expand Up @@ -2272,6 +2367,18 @@ ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
proto::Response resp;
resp.mutable_table_delete_resp();
push_resp(std::move(resp));

// notify `on_hosted_tables_update` listeners
auto subscriptions = m_resources.get_on_hosted_tables_update_sub();
for (auto& subscription : subscriptions) {
Response out;
out.set_msg_id(subscription.id);
ProtoServerResp<ProtoServer::Response> resp2;
resp2.data = std::move(out);
resp2.client_id = subscription.client_id;
proto_resp.emplace_back(std::move(resp2));
}

break;
}
case proto::Request::kViewDeleteReq: {
Expand Down Expand Up @@ -2509,30 +2616,6 @@ ProtoServer::_process_table(
m_resources.mark_table_clean(table_id);
}

void
ServerResources::remove_view_on_update_sub(
const t_id& view_id, std::uint32_t sub_id, std::uint32_t client_id
) {
if (m_view_on_update_subs.find(view_id) != m_view_on_update_subs.end()) {
auto& subs = m_view_on_update_subs[view_id];
subs.erase(
std::remove_if(
subs.begin(),
subs.end(),
[sub_id, client_id](const Subscription& sub) {
return sub.id == sub_id && sub.client_id == client_id;
}
),
subs.end()
);
}
}

void
ServerResources::drop_view_on_delete_sub(const t_id& view_id) {
m_view_on_delete_subs.erase(view_id);
}

} // namespace perspective::server

const char*
Expand Down
9 changes: 9 additions & 0 deletions cpp/perspective/src/include/perspective/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,13 @@ namespace server {
);
void drop_view_on_delete_sub(const t_id& view_id);

// `on_hosted_tables_update()`
void create_on_hosted_tables_update_sub(Subscription sub);
std::vector<Subscription> get_on_hosted_tables_update_sub();
void remove_on_hosted_tables_update_sub(
std::uint32_t sub_id, std::uint32_t client_id
);

void mark_table_dirty(const t_id& id);
void mark_table_clean(const t_id& id);
void mark_all_tables_clean();
Expand All @@ -594,6 +601,8 @@ namespace server {
tsl::hopscotch_map<t_id, std::vector<Subscription>>
m_table_on_delete_subs;

std::vector<Subscription> m_on_hosted_tables_update_subs;

tsl::hopscotch_set<t_id> m_dirty_tables;

#ifdef PSP_PARALLEL_FOR
Expand Down
12 changes: 11 additions & 1 deletion cpp/protos/perspective.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ message Request {
// Minimum Virtual API (theoretical).
GetFeaturesReq get_features_req = 3;
GetHostedTablesReq get_hosted_tables_req = 4;
RemoveHostedTablesUpdateReq remove_hosted_tables_update_req = 37;
TableMakePortReq table_make_port_req = 5;
TableMakeViewReq table_make_view_req = 6;
TableSchemaReq table_schema_req = 7;
Expand Down Expand Up @@ -164,6 +165,7 @@ message Response {
oneof client_resp {
GetFeaturesResp get_features_resp = 3;
GetHostedTablesResp get_hosted_tables_resp = 4;
RemoveHostedTablesUpdateResp remove_hosted_tables_update_resp = 37;
TableMakePortResp table_make_port_resp = 5;
TableMakeViewResp table_make_view_resp = 6;
TableSchemaResp table_schema_resp = 7;
Expand Down Expand Up @@ -219,7 +221,10 @@ message GetFeaturesResp {
}

// `Client::get_hosted_tables`
message GetHostedTablesReq {}
message GetHostedTablesReq {
bool subscribe = 1;
}

message GetHostedTablesResp {
repeated HostedTable table_infos = 1;
}
Expand All @@ -230,6 +235,11 @@ message HostedTable {
optional uint32 limit = 3;
}

message RemoveHostedTablesUpdateReq {
uint32 id = 1;
}
message RemoveHostedTablesUpdateResp {}

// `Table::size`
message TableSizeReq {}
message TableSizeResp {
Expand Down
11 changes: 4 additions & 7 deletions examples/blocks/src/editable/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
A simple example of [Perspective](https://github.com/finos/perspective) with
superstore data, and editability enabled (not default but easy to toggle at
runtime).

An explcit `SharedWorker` is used to create the Perspective engine, allowing
edits to be shared live when this example is opened across multiple browser
tabs.
A simple example of [Perspective](https://github.com/finos/perspective) with superstore
data, and editability enabled (not default but easy to toggle at runtime). This
example has no server component, and the edits occur only within the browser session;
refreshing the page will forget any edits and revert to the original dataset.
22 changes: 6 additions & 16 deletions examples/blocks/src/editable/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,13 @@

import perspective from "/node_modules/@finos/perspective/dist/cdn/perspective.js";

const worker = await perspective.worker();
const resp = await fetch("/node_modules/superstore-arrow/superstore.lz4.arrow");
const arrow = await resp.arrayBuffer();
const viewer = document.getElementsByTagName("perspective-viewer")[0];

const worker = new SharedWorker("/node_modules/@finos/perspective/dist/cdn/perspective-server.worker.js");
const client = await perspective.worker(worker);
const tables = await client.get_hosted_table_names();

if (tables.length > 0) {
const table = client.open_table(tables[0]);
viewer.load(table);
} else {
const resp = await fetch("/node_modules/superstore-arrow/superstore.lz4.arrow");
const arrow = await resp.arrayBuffer();
const table = client.table(arrow);
viewer.load(table);
}

viewer.restore({ plugin_config: { edit_mode: "EDIT" }, settings: true });
const table = worker.table(arrow);
viewer.load(table);
viewer.restore({ settings: true, plugin_config: { edit_mode: "EDIT" } });
</script>
<style>
perspective-viewer {
Expand Down
4 changes: 2 additions & 2 deletions packages/perspective-workspace/src/ts/workspace/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const createCommands = (
args.widget_name as string
)!;

menu.unsafe_set_model(await widget.viewer.unsafe_get_model());
menu.set_model(widget.viewer.get_model());
menu.open(indicator);
workspace.get_context_menu()?.init_overlay?.();
menu.addEventListener("blur", () => {
Expand Down Expand Up @@ -80,7 +80,7 @@ export const createCommands = (
const widget = workspace.getWidgetByName(
args.widget_name as string
)!;
menu.unsafe_set_model(await widget.viewer.unsafe_get_model());
menu.set_model(widget.viewer.get_model());

menu.open(indicator);
workspace.get_context_menu()?.init_overlay?.();
Expand Down
Loading
Loading