Skip to content

Commit b745d56

Browse files
committed
fix: cross-node presence synchronization
The Presence GenServer now joins a pg group and forwards deltas to the Presence servers on remote nodes. When a delta is received from a remote node, it is applied to the local state, so presence data stays in sync across the cluster.

Changes:
- Presence server joins the pg group "presence:{name}" on startup
- broadcast_delta() forwards deltas to remote Presence servers via pg
- handle_info() applies incoming deltas from remote nodes to local state
- Channels subscribe to the presence topic and handle presence delta messages
1 parent 13d7edd commit b745d56

3 files changed

Lines changed: 168 additions & 16 deletions

File tree

crates/starlang/src/presence/server.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use super::types::{PresenceDiff, PresenceMessage, PresenceMeta, PresenceRef, PresenceState};
44
use crate::core::{Pid, RawTerm};
5+
use crate::dist::pg;
56
use crate::gen_server::{
67
CallResult, CastResult, ContinueArg, ContinueResult, From, GenServer, InfoResult, InitResult,
78
async_trait,
@@ -92,6 +93,8 @@ pub struct PresenceServerState {
9293
name: String,
9394
/// The PubSub server name.
9495
pubsub: String,
96+
/// The pg group for presence servers.
97+
pg_group: String,
9598
/// Presence state per topic: topic -> (key -> state).
9699
state: Arc<DashMap<String, HashMap<String, PresenceState>>>,
97100
/// Reverse lookup: PID -> set of (topic, key) pairs.
@@ -105,6 +108,7 @@ impl Default for PresenceServerState {
105108
Self {
106109
name: String::new(),
107110
pubsub: String::new(),
111+
pg_group: String::new(),
108112
state: Arc::new(DashMap::new()),
109113
pid_to_presence: Arc::new(DashMap::new()),
110114
self_pid: None,
@@ -124,9 +128,11 @@ impl GenServer for Presence {
124128
type Reply = PresenceReply;
125129

126130
async fn init(config: PresenceConfig) -> InitResult<PresenceServerState> {
131+
let pg_group = format!("presence:{}", config.name);
127132
let state = PresenceServerState {
128133
name: config.name,
129134
pubsub: config.pubsub,
135+
pg_group,
130136
state: Arc::new(DashMap::new()),
131137
pid_to_presence: Arc::new(DashMap::new()),
132138
self_pid: None,
@@ -165,6 +171,13 @@ impl GenServer for Presence {
165171
let mut topic_state = state.state.entry(topic.clone()).or_default();
166172
let key_state = topic_state.entry(key.clone()).or_default();
167173
key_state.metas.push(presence_meta.clone());
174+
tracing::debug!(
175+
topic = %topic,
176+
key = %key,
177+
pid = ?pid,
178+
total_keys = topic_state.len(),
179+
"Presence::track - added to state"
180+
);
168181
}
169182

170183
// Track reverse lookup
@@ -337,6 +350,12 @@ impl GenServer for Presence {
337350
.get(&topic)
338351
.map(|s| s.clone())
339352
.unwrap_or_default();
353+
tracing::debug!(
354+
topic = %topic,
355+
presence_count = presences.len(),
356+
keys = ?presences.keys().collect::<Vec<_>>(),
357+
"Presence::list called"
358+
);
340359
CallResult::Reply(PresenceReply::List(presences), std::mem::take(state))
341360
}
342361

@@ -370,16 +389,36 @@ impl GenServer for Presence {
370389
msg: RawTerm,
371390
state: &mut PresenceServerState,
372391
) -> InfoResult<PresenceServerState> {
373-
// Handle presence messages from PubSub
392+
// Handle presence messages from other Presence servers
374393
if let Ok(presence_msg) = postcard::from_bytes::<PresenceMessage>(msg.as_ref()) {
375394
match presence_msg {
376395
PresenceMessage::Delta { topic, diff } => {
396+
tracing::debug!(
397+
topic = %topic,
398+
joins = diff.joins.len(),
399+
leaves = diff.leaves.len(),
400+
"Presence received delta from remote node, applying"
401+
);
377402
apply_delta(&state.state, &topic, diff);
403+
404+
// Log state after applying delta
405+
if let Some(topic_state) = state.state.get(&topic) {
406+
tracing::debug!(
407+
topic = %topic,
408+
keys = ?topic_state.keys().collect::<Vec<_>>(),
409+
"Presence state after applying delta"
410+
);
411+
}
378412
}
379413
PresenceMessage::StateSync {
380414
topic,
381415
state: remote_state,
382416
} => {
417+
tracing::debug!(
418+
topic = %topic,
419+
remote_keys = remote_state.len(),
420+
"Presence received state sync from remote node, merging"
421+
);
383422
merge_state(&state.state, &topic, remote_state);
384423
}
385424
}
@@ -397,9 +436,13 @@ impl GenServer for Presence {
397436
// Register ourselves by name
398437
let _ = crate::register(&state.name, self_pid);
399438

439+
// Join the pg group so other Presence servers can find us
440+
pg::join(&state.pg_group, self_pid);
441+
400442
tracing::debug!(
401443
name = %state.name,
402444
pubsub = %state.pubsub,
445+
pg_group = %state.pg_group,
403446
pid = ?self_pid,
404447
"Presence started"
405448
);
@@ -408,20 +451,41 @@ impl GenServer for Presence {
408451
}
409452
}
410453

411-
/// Broadcast a delta to other nodes via PubSub.
454+
/// Broadcast a delta to other nodes via PubSub and to other Presence servers via pg.
412455
async fn broadcast_delta(pubsub: &str, topic: &str, diff: PresenceDiff) {
413456
if diff.is_empty() {
414457
return;
415458
}
416459

417460
let msg = PresenceMessage::Delta {
418461
topic: topic.to_string(),
419-
diff,
462+
diff: diff.clone(),
420463
};
421464

422-
// Broadcast to all subscribers of the presence topic
465+
// Broadcast to all subscribers of the presence topic (channels)
423466
let presence_topic = format!("presence:{}", topic);
424467
let _ = PubSub::broadcast(pubsub, &presence_topic, &msg).await;
468+
469+
// Also send to other Presence servers in the pg group so they can merge state
470+
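// The pg group name is "presence:{pubsub_name}"
// NOTE(review): init() joins "presence:{config.name}", so this lookup only finds
// peers when the presence server name equals the pubsub name; confirm which is intended.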
471+
let pg_group = format!("presence:{}", pubsub);
472+
let members = pg::get_members(&pg_group);
473+
let self_pid = crate::current_pid();
474+
let my_node = crate::core::node::node_name_atom();
475+
476+
if let Ok(msg_bytes) = postcard::to_allocvec(&msg) {
477+
for pid in members {
478+
// Skip ourselves and only send to remote nodes
479+
if pid != self_pid && pid.node() != my_node {
480+
tracing::trace!(
481+
target_pid = ?pid,
482+
topic = %topic,
483+
"Forwarding presence delta to remote Presence server"
484+
);
485+
let _ = crate::send_raw(pid, msg_bytes.clone());
486+
}
487+
}
488+
}
425489
}
426490

427491
/// Apply a delta to local state.

examples/chat/src/channel.rs

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ use starlang::RawTerm;
1212
use starlang::channel::{
1313
Channel, ChannelReply, HandleResult, JoinError, JoinResult, Socket, broadcast_from, push,
1414
};
15-
use starlang::presence::Presence;
15+
use starlang::presence::{Presence, PresenceMessage};
16+
use starlang::pubsub::PubSub;
1617

1718
/// The name of the chat presence server.
1819
const PRESENCE_NAME: &str = "chat_presence";
20+
/// The name of the chat pubsub server.
21+
const PUBSUB_NAME: &str = "chat_pubsub";
1922
use std::time::Duration;
2023

2124
/// Custom state stored in each socket's assigns.
@@ -140,6 +143,13 @@ impl Channel for RoomChannel {
140143
}
141144
tracing::debug!(topic = %topic, key = %presence_key, "Tracked presence");
142145

146+
// Subscribe to presence updates for this room
147+
let presence_topic = format!("presence:{}", topic);
148+
if let Err(e) = PubSub::subscribe_pid(PUBSUB_NAME, &presence_topic, socket.pid).await {
149+
tracing::warn!(error = %e, "Failed to subscribe to presence updates");
150+
}
151+
tracing::debug!(topic = %presence_topic, "Subscribed to presence updates");
152+
143153
// Send ourselves an :after_join message to trigger presence sync and history push
144154
if let Ok(msg) = postcard::to_allocvec(&ChannelInfo::AfterJoin) {
145155
let _ = starlang::send_raw(socket.pid, msg);
@@ -331,8 +341,64 @@ impl Channel for RoomChannel {
331341
}
332342
}
333343

334-
// Note: Presence messages are now handled by the Presence GenServer
335-
// via PubSub subscriptions, so we don't need to manually handle them here.
344+
// Handle presence delta messages from PubSub
345+
if let Some(presence_msg) = msg.decode::<PresenceMessage>() {
346+
match presence_msg {
347+
PresenceMessage::Delta { topic: _, diff } => {
348+
// Process joins - notify client of new users
349+
for (key, state) in &diff.joins {
350+
for meta in &state.metas {
351+
if let Some(user_meta) = meta.decode::<UserPresenceMeta>() {
352+
// Don't notify about ourselves
353+
if meta.pid != socket.pid {
354+
tracing::debug!(
355+
room = %socket.assigns.room_name,
356+
nick = %user_meta.nick,
357+
key = %key,
358+
"Presence join received, notifying client"
359+
);
360+
push(
361+
socket,
362+
"user_joined",
363+
&RoomOutEvent::UserJoined {
364+
nick: user_meta.nick,
365+
},
366+
);
367+
}
368+
}
369+
}
370+
}
371+
372+
// Process leaves - notify client of users leaving
373+
for (key, state) in &diff.leaves {
374+
for meta in &state.metas {
375+
if let Some(user_meta) = meta.decode::<UserPresenceMeta>() {
376+
// Don't notify about ourselves
377+
if meta.pid != socket.pid {
378+
tracing::debug!(
379+
room = %socket.assigns.room_name,
380+
nick = %user_meta.nick,
381+
key = %key,
382+
"Presence leave received, notifying client"
383+
);
384+
push(
385+
socket,
386+
"user_left",
387+
&RoomOutEvent::UserLeft {
388+
nick: user_meta.nick,
389+
},
390+
);
391+
}
392+
}
393+
}
394+
}
395+
return HandleResult::NoReply;
396+
}
397+
PresenceMessage::StateSync { .. } => {
398+
// Full state sync - we handle this via PushPresenceState
399+
}
400+
}
401+
}
336402

337403
HandleResult::NoReply
338404
}
@@ -363,6 +429,12 @@ impl Channel for RoomChannel {
363429
tracing::warn!(error = %e, "Failed to untrack presence");
364430
}
365431
tracing::debug!(topic = %topic, key = %presence_key, "Untracked presence");
432+
433+
// Unsubscribe from presence updates
434+
let presence_topic = format!("presence:{}", topic);
435+
if let Err(e) = PubSub::unsubscribe_pid(PUBSUB_NAME, &presence_topic, socket.pid).await {
436+
tracing::warn!(error = %e, "Failed to unsubscribe from presence updates");
437+
}
366438
}
367439
}
368440

examples/chat/tests/distribution_test.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -270,15 +270,31 @@ async fn test_single_node_chat() {
270270
)
271271
.await;
272272

273-
// Bob should receive the message
274-
let resp = timeout(Duration::from_secs(2), read_message(&mut bob)).await;
275-
assert!(resp.is_ok(), "Bob should receive alice's message");
276-
let resp = decode_event(&resp.unwrap());
277-
assert!(
278-
matches!(resp, ServerEvent::Message { ref from, ref text, .. } if from == "alice" && text == "hello from alice"),
279-
"Expected Message, got {:?}",
280-
resp
281-
);
273+
// Bob should receive the message (may receive UserJoined/UserList first due to presence sync)
274+
let mut message_received = false;
275+
for _ in 0..5 {
276+
let resp = timeout(Duration::from_secs(2), read_message(&mut bob)).await;
277+
if resp.is_err() {
278+
break;
279+
}
280+
let event = decode_event(&resp.unwrap());
281+
match event {
282+
ServerEvent::Message {
283+
ref from, ref text, ..
284+
} if from == "alice" && text == "hello from alice" => {
285+
message_received = true;
286+
break;
287+
}
288+
ServerEvent::UserJoined { .. } | ServerEvent::UserList { .. } => {
289+
// Skip presence-related events
290+
continue;
291+
}
292+
other => {
293+
panic!("Expected Message or presence event, got {:?}", other);
294+
}
295+
}
296+
}
297+
assert!(message_received, "Bob should have received alice's message");
282298

283299
// Server is automatically cleaned up when _server goes out of scope (start_link)
284300
}

0 commit comments

Comments
 (0)