Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/src/xstudio/api/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def subscribe_to_event_group(self, event_source, callback_method):
raise Exception("Actor has no event group.")

return self.connection.link.add_message_callback(
event_group, callback_method
event_group, callback_method, event_source.remote
)

def unsubscribe_from_event_group(self, uuid):
Expand Down
2 changes: 1 addition & 1 deletion python/src/xstudio/plugin/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def subscribe_to_playhead_events(self, playhead, callback_method, auto_cancel=Tr
self.playhead_subscriptions = []

subscription_id = self.connection.link.add_message_callback(
event_group, callback_method
event_group, callback_method, playhead
)

self.playhead_subscriptions.append(subscription_id)
Expand Down
70 changes: 55 additions & 15 deletions src/python_module/src/py_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,31 @@ class EventToPythonThreadLockerActor : public caf::event_based_actor {
[=](const xstudio::utility::Uuid &callback_id) {
// stop watching

// find actor that has its events forwarded to the callback with
// the given ID
// 1. Send leave broadcast if needed
auto eq_it = callback_to_event_group_.find(callback_id);
if (eq_it != callback_to_event_group_.end()) {
auto events_source = eq_it->second;
bool event_source_still_needed = false;
for (const auto &pair : actor_to_callback_uuid_) {
if (pair.first == caf::actor_cast<caf::actor_addr>(events_source)) {
for (const auto &cid : pair.second) {
if (cid != callback_id) {
event_source_still_needed = true;
break;
}
}
}
}
if (!event_source_still_needed && events_source) {
mail(
xstudio::broadcast::leave_broadcast_atom_v,
caf::actor_cast<caf::actor>(this))
.send(events_source);
}
callback_to_event_group_.erase(eq_it);
}

// 2. Erase from actor_to_callback_uuid_
auto p = actor_to_callback_uuid_.begin();
while (p != actor_to_callback_uuid_.end()) {

Expand All @@ -44,14 +67,6 @@ class EventToPythonThreadLockerActor : public caf::event_based_actor {
}

if (p->second.empty()) {
// don't need to get events from this actor any more
auto events_source = caf::actor_cast<caf::actor>(p->first);
if (events_source) {
mail(
xstudio::broadcast::leave_broadcast_atom_v,
caf::actor_cast<caf::actor>(this))
.send(events_source);
}
p = actor_to_callback_uuid_.erase(p);
} else {
p++;
Expand All @@ -61,6 +76,21 @@ class EventToPythonThreadLockerActor : public caf::event_based_actor {
[=](caf::actor events_source, const xstudio::utility::Uuid &callback_id) {
actor_to_callback_uuid_[caf::actor_cast<caf::actor_addr>(events_source)]
.push_back(callback_id);
callback_to_event_group_[callback_id] = events_source;
// join the events broadcast
mail(
xstudio::broadcast::join_broadcast_atom_v,
caf::actor_cast<caf::actor>(this))
.send(events_source);
},
[=](caf::actor events_source, caf::actor owner_actor, const xstudio::utility::Uuid &callback_id) {
actor_to_callback_uuid_[caf::actor_cast<caf::actor_addr>(events_source)]
.push_back(callback_id);
if (owner_actor) {
actor_to_callback_uuid_[caf::actor_cast<caf::actor_addr>(owner_actor)]
.push_back(callback_id);
}
callback_to_event_group_[callback_id] = events_source;
// join the events broadcast
mail(
xstudio::broadcast::join_broadcast_atom_v,
Expand All @@ -87,6 +117,7 @@ class EventToPythonThreadLockerActor : public caf::event_based_actor {
caf::behavior behavior_;
py_context *context_;
std::map<caf::actor_addr, std::vector<xstudio::utility::Uuid>> actor_to_callback_uuid_;
std::map<xstudio::utility::Uuid, caf::actor> callback_to_event_group_;

caf::behavior make_behavior() override { return behavior_; }
};
Expand Down Expand Up @@ -424,14 +455,19 @@ xstudio::utility::Uuid py_context::py_add_message_callback(const py::args &xs) {

xstudio::utility::Uuid uuid = xstudio::utility::Uuid::generate();

if (xs.size() == 2) {
if (xs.size() == 2 || xs.size() == 3) {

auto i = xs.begin();
// this is the xstudio actor that
auto remote_actor = (*i).cast<caf::actor>();
i++;
auto callback_func = (*i).cast<py::function>();
auto addr = caf::actor_cast<caf::actor_addr>(remote_actor);

caf::actor owner_actor;
if (xs.size() == 3) {
i++;
owner_actor = (*i).cast<caf::actor>();
}

// spawn a listener actor to receive the event messages. THis will
// run the Python callback (after acquiring the GIL)
Expand All @@ -447,12 +483,16 @@ xstudio::utility::Uuid py_context::py_add_message_callback(const py::args &xs) {
// here we message our 'watcher'. It will join the event group of
// the 'remote_actor' and when it gets event messages from that actor
// it will run the py_callback
anon_mail(remote_actor, uuid).send(message_callback_handler_actor_);
if (owner_actor) {
anon_mail(remote_actor, owner_actor, uuid).send(message_callback_handler_actor_);
} else {
anon_mail(remote_actor, uuid).send(message_callback_handler_actor_);
}

} else {
throw std::runtime_error(
"Set message callback expecting tuple of size 2 "
"(remote_event_group_actor, callack_func).");
"Set message callback expecting tuple of size 2 or 3 "
"(remote_event_group_actor, callback_func, [optional_owner_actor]).");
}
return uuid;
}
Expand Down
Loading