Skip to content

Commit 2e5581c

Browse files
committed
fmt
1 parent e3495f9 commit 2e5581c

11 files changed

Lines changed: 238 additions & 210 deletions

File tree

crates/starlang/src/channel/core.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,10 +1161,11 @@ impl ChannelServer {
11611161

11621162
for topic in topics {
11631163
if let Some(handler) = self.find_handler(&topic).cloned()
1164-
&& let Some(socket) = self.channels.get_mut(&topic) {
1165-
let result = handler.handle_info(msg.clone(), socket).await;
1166-
results.push((topic, result));
1167-
}
1164+
&& let Some(socket) = self.channels.get_mut(&topic)
1165+
{
1166+
let result = handler.handle_info(msg.clone(), socket).await;
1167+
results.push((topic, result));
1168+
}
11681169
}
11691170

11701171
results

crates/starlang/src/distribution/global.rs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,13 @@ impl GlobalRegistry {
129129
.collect();
130130

131131
if let Some(manager) = DIST_MANAGER.get()
132-
&& let Some(tx) = manager.get_node_tx(from_node) {
133-
let msg = GlobalRegistryMessage::SyncResponse { entries };
134-
if let Ok(payload) = postcard::to_allocvec(&msg) {
135-
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
136-
}
132+
&& let Some(tx) = manager.get_node_tx(from_node)
133+
{
134+
let msg = GlobalRegistryMessage::SyncResponse { entries };
135+
if let Ok(payload) = postcard::to_allocvec(&msg) {
136+
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
137137
}
138+
}
138139
}
139140
GlobalRegistryMessage::SyncResponse { entries } => {
140141
tracing::debug!(count = entries.len(), from_node = %from_node, "Received global sync response");
@@ -168,40 +169,42 @@ impl GlobalRegistry {
168169
/// Broadcast a global registry message to all connected nodes.
169170
fn broadcast_global_message(&self, msg: &GlobalRegistryMessage) {
170171
if let Some(manager) = DIST_MANAGER.get()
171-
&& let Ok(payload) = postcard::to_allocvec(msg) {
172-
let dist_msg = DistMessage::GlobalRegistry { payload };
173-
for node_atom in manager.connected_nodes() {
174-
if let Some(tx) = manager.get_node_tx(node_atom) {
175-
let _ = tx.try_send(dist_msg.clone());
176-
}
172+
&& let Ok(payload) = postcard::to_allocvec(msg)
173+
{
174+
let dist_msg = DistMessage::GlobalRegistry { payload };
175+
for node_atom in manager.connected_nodes() {
176+
if let Some(tx) = manager.get_node_tx(node_atom) {
177+
let _ = tx.try_send(dist_msg.clone());
177178
}
178179
}
180+
}
179181
}
180182

181183
/// Request sync from a newly connected node.
182184
pub fn request_sync(&self, node_atom: Atom) {
183185
if let Some(manager) = DIST_MANAGER.get()
184-
&& let Some(tx) = manager.get_node_tx(node_atom) {
185-
// Ask them to send their registry to us
186-
let msg = GlobalRegistryMessage::SyncRequest;
187-
if let Ok(payload) = postcard::to_allocvec(&msg) {
188-
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
189-
}
186+
&& let Some(tx) = manager.get_node_tx(node_atom)
187+
{
188+
// Ask them to send their registry to us
189+
let msg = GlobalRegistryMessage::SyncRequest;
190+
if let Ok(payload) = postcard::to_allocvec(&msg) {
191+
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
192+
}
190193

191-
// Also push our registry to them
192-
let entries: Vec<(String, Pid)> = self
193-
.names
194-
.iter()
195-
.map(|r| (r.key().clone(), *r.value()))
196-
.collect();
194+
// Also push our registry to them
195+
let entries: Vec<(String, Pid)> = self
196+
.names
197+
.iter()
198+
.map(|r| (r.key().clone(), *r.value()))
199+
.collect();
197200

198-
if !entries.is_empty() {
199-
let response = GlobalRegistryMessage::SyncResponse { entries };
200-
if let Ok(payload) = postcard::to_allocvec(&response) {
201-
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
202-
}
201+
if !entries.is_empty() {
202+
let response = GlobalRegistryMessage::SyncResponse { entries };
203+
if let Ok(payload) = postcard::to_allocvec(&response) {
204+
let _ = tx.try_send(DistMessage::GlobalRegistry { payload });
203205
}
204206
}
207+
}
205208
}
206209
}
207210

crates/starlang/src/distribution/manager.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -393,16 +393,17 @@ async fn message_sender_loop(mut rx: mpsc::Receiver<DistMessage>, node_atom: Ato
393393

394394
// Connection closed or error - clean up
395395
if let Some(manager) = DIST_MANAGER.get()
396-
&& let Some((_, node)) = manager.nodes.remove(&node_atom) {
397-
if let Some(addr) = node.info.addr {
398-
manager.addr_to_node.remove(&addr);
399-
}
400-
manager
401-
.monitors
402-
.notify_node_down(node_atom, "connection closed".to_string());
403-
// Clean up pg memberships from this node
404-
super::pg::pg().remove_node_members(node_atom);
396+
&& let Some((_, node)) = manager.nodes.remove(&node_atom)
397+
{
398+
if let Some(addr) = node.info.addr {
399+
manager.addr_to_node.remove(&addr);
405400
}
401+
manager
402+
.monitors
403+
.notify_node_down(node_atom, "connection closed".to_string());
404+
// Clean up pg memberships from this node
405+
super::pg::pg().remove_node_members(node_atom);
406+
}
406407
}
407408

408409
/// Loop to receive messages from a remote node.
@@ -435,16 +436,17 @@ async fn message_receiver_loop(node_atom: Atom) {
435436

436437
// Clean up
437438
if let Some(manager) = DIST_MANAGER.get()
438-
&& let Some((_, node)) = manager.nodes.remove(&node_atom) {
439-
if let Some(addr) = node.info.addr {
440-
manager.addr_to_node.remove(&addr);
441-
}
442-
manager
443-
.monitors
444-
.notify_node_down(node_atom, "connection closed".to_string());
445-
// Clean up pg memberships from this node
446-
super::pg::pg().remove_node_members(node_atom);
439+
&& let Some((_, node)) = manager.nodes.remove(&node_atom)
440+
{
441+
if let Some(addr) = node.info.addr {
442+
manager.addr_to_node.remove(&addr);
447443
}
444+
manager
445+
.monitors
446+
.notify_node_down(node_atom, "connection closed".to_string());
447+
// Clean up pg memberships from this node
448+
super::pg::pg().remove_node_members(node_atom);
449+
}
448450
}
449451

450452
/// Handle an incoming message from a remote node.
@@ -470,9 +472,10 @@ async fn handle_incoming_message(from_node: Atom, msg: DistMessage) {
470472
DistMessage::Ping { seq } => {
471473
// Respond with pong
472474
if let Some(manager) = DIST_MANAGER.get()
473-
&& let Some(node) = manager.nodes.get(&from_node) {
474-
let _ = node.tx.try_send(DistMessage::Pong { seq });
475-
}
475+
&& let Some(node) = manager.nodes.get(&from_node)
476+
{
477+
let _ = node.tx.try_send(DistMessage::Pong { seq });
478+
}
476479
}
477480
DistMessage::Pong { seq } => {
478481
tracing::trace!(seq, from_node = %from_node, "Received pong");

crates/starlang/src/distribution/monitor.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ impl NodeMonitorRegistry {
100100
/// Remove a local monitor.
101101
pub fn remove_local_monitor(&self, monitor_ref: NodeMonitorRef) {
102102
if let Some((_, (node_atom, pid))) = self.monitor_refs.remove(&monitor_ref.0)
103-
&& let Some(mut monitors) = self.local_monitors.get_mut(&node_atom) {
104-
monitors.remove(&(pid, monitor_ref.0));
105-
}
103+
&& let Some(mut monitors) = self.local_monitors.get_mut(&node_atom)
104+
{
105+
monitors.remove(&(pid, monitor_ref.0));
106+
}
106107
}
107108

108109
/// Add a remote process monitoring this node.
@@ -135,9 +136,10 @@ impl NodeMonitorRegistry {
135136
};
136137

137138
if let Ok(payload) = postcard::to_allocvec(&msg)
138-
&& let Some(handle) = crate::process::global::try_handle() {
139-
let _ = handle.registry().send_raw(pid, payload);
140-
}
139+
&& let Some(handle) = crate::process::global::try_handle()
140+
{
141+
let _ = handle.registry().send_raw(pid, payload);
142+
}
141143
}
142144
}
143145

@@ -203,12 +205,13 @@ pub fn demonitor_node(monitor_ref: NodeMonitorRef) -> Result<(), DistError> {
203205

204206
// Send DemonitorNode message to remote
205207
if let Some(node_atom) = node_atom
206-
&& let Some(tx) = manager.get_node_tx(node_atom) {
207-
let msg = DistMessage::DemonitorNode {
208-
requesting_pid: crate::runtime::current_pid(),
209-
};
210-
let _ = tx.try_send(msg);
211-
}
208+
&& let Some(tx) = manager.get_node_tx(node_atom)
209+
{
210+
let msg = DistMessage::DemonitorNode {
211+
requesting_pid: crate::runtime::current_pid(),
212+
};
213+
let _ = tx.try_send(msg);
214+
}
212215

213216
Ok(())
214217
}

crates/starlang/src/distribution/pg.rs

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,13 @@ impl ProcessGroups {
249249
.collect();
250250

251251
if let Some(manager) = DIST_MANAGER.get()
252-
&& let Some(tx) = manager.get_node_tx(from_node) {
253-
let msg = PgMessage::SyncResponse { groups };
254-
if let Ok(payload) = postcard::to_allocvec(&msg) {
255-
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
256-
}
252+
&& let Some(tx) = manager.get_node_tx(from_node)
253+
{
254+
let msg = PgMessage::SyncResponse { groups };
255+
if let Ok(payload) = postcard::to_allocvec(&msg) {
256+
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
257257
}
258+
}
258259
}
259260
PgMessage::SyncResponse { groups } => {
260261
tracing::debug!(
@@ -310,32 +311,33 @@ impl ProcessGroups {
310311
/// Request sync from a newly connected node.
311312
pub fn request_sync(&self, node_atom: Atom) {
312313
if let Some(manager) = DIST_MANAGER.get()
313-
&& let Some(tx) = manager.get_node_tx(node_atom) {
314-
// Ask them to send their group memberships to us
315-
let msg = PgMessage::SyncRequest;
316-
if let Ok(payload) = postcard::to_allocvec(&msg) {
317-
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
318-
}
319-
320-
// Also push our local memberships to them
321-
let groups: Vec<(String, Vec<Pid>)> = self
322-
.groups
323-
.iter()
324-
.map(|r| {
325-
let local_members: Vec<Pid> =
326-
r.value().iter().filter(|p| p.is_local()).copied().collect();
327-
(r.key().clone(), local_members)
328-
})
329-
.filter(|(_, members)| !members.is_empty())
330-
.collect();
314+
&& let Some(tx) = manager.get_node_tx(node_atom)
315+
{
316+
// Ask them to send their group memberships to us
317+
let msg = PgMessage::SyncRequest;
318+
if let Ok(payload) = postcard::to_allocvec(&msg) {
319+
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
320+
}
331321

332-
if !groups.is_empty() {
333-
let response = PgMessage::SyncResponse { groups };
334-
if let Ok(payload) = postcard::to_allocvec(&response) {
335-
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
336-
}
322+
// Also push our local memberships to them
323+
let groups: Vec<(String, Vec<Pid>)> = self
324+
.groups
325+
.iter()
326+
.map(|r| {
327+
let local_members: Vec<Pid> =
328+
r.value().iter().filter(|p| p.is_local()).copied().collect();
329+
(r.key().clone(), local_members)
330+
})
331+
.filter(|(_, members)| !members.is_empty())
332+
.collect();
333+
334+
if !groups.is_empty() {
335+
let response = PgMessage::SyncResponse { groups };
336+
if let Ok(payload) = postcard::to_allocvec(&response) {
337+
let _ = tx.try_send(DistMessage::ProcessGroups { payload });
337338
}
338339
}
340+
}
339341
}
340342

341343
/// Broadcast a join to all connected nodes.
@@ -365,14 +367,15 @@ impl ProcessGroups {
365367
/// Broadcast a pg message to all connected nodes.
366368
fn broadcast_pg_message(&self, msg: &PgMessage) {
367369
if let Some(manager) = DIST_MANAGER.get()
368-
&& let Ok(payload) = postcard::to_allocvec(msg) {
369-
let dist_msg = DistMessage::ProcessGroups { payload };
370-
for node_atom in manager.connected_nodes() {
371-
if let Some(tx) = manager.get_node_tx(node_atom) {
372-
let _ = tx.try_send(dist_msg.clone());
373-
}
370+
&& let Ok(payload) = postcard::to_allocvec(msg)
371+
{
372+
let dist_msg = DistMessage::ProcessGroups { payload };
373+
for node_atom in manager.connected_nodes() {
374+
if let Some(tx) = manager.get_node_tx(node_atom) {
375+
let _ = tx.try_send(dist_msg.clone());
374376
}
375377
}
378+
}
376379
}
377380
}
378381

0 commit comments

Comments
 (0)