Skip to content

Commit 3af0256

Browse files
committed
refactor(state): encapsulate fields and extract room update logic
- Make `rooms`, `connections`, and `udp_routes` fields private - Extract stats update logic into `RoomState::update_state` method - Extract join validation logic into `RoomState::can_join` method - Broadcast room updates immediately upon room creation - Fix error message order for join failures
1 parent 37c1b18 commit 3af0256

File tree

2 files changed

+73
-74
lines changed

2 files changed

+73
-74
lines changed

src/connection.rs

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use crate::packet::{
44
FrameworkMessage, Message2Packet, MessagePacket, RoomId, RoomLinkPacket,
55
};
66
use crate::rate::AtomicRateLimiter;
7-
use crate::state::{AppState, ConnectionAction, RoomInit, RoomUpdate};
8-
use crate::utils::current_time_millis;
7+
use crate::state::{AppState, ConnectionAction, RoomInit};
98
use crate::writer::{TcpWriter, UdpWriter};
109
use anyhow::anyhow;
1110
use bytes::{Buf, Bytes, BytesMut};
@@ -235,33 +234,15 @@ impl ConnectionActor {
235234
}
236235
AppPacket::Stats(p) => {
237236
if let Some(ref room) = self.room {
238-
if let Some(mut r) = self.state.room_state.rooms.get_mut(&room.room_id) {
239-
if !room.is_host {
240-
warn!(
241-
"Connection {} tried to update stats but is not host",
242-
self.id
243-
);
244-
return Ok(());
245-
}
246-
247-
let sent_at = p.data.created_at;
248-
249-
r.stats = p.data;
250-
r.updated_at = current_time_millis();
251-
r.ping = current_time_millis() - sent_at;
252-
253-
if let Err(err) =
254-
self.state
255-
.room_state
256-
.broadcast_sender
257-
.send(RoomUpdate::Update {
258-
id: r.id.clone(),
259-
data: r.clone(),
260-
})
261-
{
262-
warn!("Fail to broadcast room update {}", err);
263-
}
237+
if !room.is_host {
238+
warn!(
239+
"Connection {} tried to update stats but is not host",
240+
self.id
241+
);
242+
return Ok(());
264243
}
244+
245+
self.state.room_state.update_state(&room.room_id, p);
265246
}
266247
}
267248
AppPacket::RoomJoin(p) => {
@@ -290,27 +271,17 @@ impl ConnectionActor {
290271
self.room = None;
291272
}
292273

293-
let (can_join, wrong_password) = (|| {
294-
let room = self.state.room_state.rooms.get(&p.room_id)?;
295-
296-
if let Some(ref pass) = room.password {
297-
if pass != &p.password {
298-
return Some((false, true));
299-
}
300-
}
301-
302-
Some((true, false))
303-
})()
304-
.unwrap_or((false, false));
274+
let (room_exists, can_join) =
275+
self.state.room_state.can_join(&p.room_id, &p.password);
305276

306-
if wrong_password {
277+
if !room_exists {
307278
warn!(
308-
"Connection {} tried to join room {} with wrong password.",
279+
"Connection {} tried to join a non-existent room {}.",
309280
self.id, p.room_id
310281
);
311282

312283
self.write_packet(AnyPacket::App(AppPacket::Message(MessagePacket {
313-
message: "@player-connect.wrong-password".to_string(),
284+
message: "@player-connect.room-not-found".to_string(),
314285
})))
315286
.await?;
316287

@@ -319,12 +290,12 @@ impl ConnectionActor {
319290

320291
if !can_join {
321292
warn!(
322-
"Connection {} tried to join a non-existent room {}.",
293+
"Connection {} tried to join room {} with wrong password.",
323294
self.id, p.room_id
324295
);
325296

326297
self.write_packet(AnyPacket::App(AppPacket::Message(MessagePacket {
327-
message: "@player-connect.room-not-found".to_string(),
298+
message: "@player-connect.wrong-password".to_string(),
328299
})))
329300
.await?;
330301

@@ -385,22 +356,6 @@ impl ConnectionActor {
385356
})))
386357
.await?;
387358

388-
let Some(room) = self.state.room_state.rooms.get(&room_id) else {
389-
return Err(anyhow!("Can not find room {}", room_id));
390-
};
391-
392-
if let Err(err) =
393-
self.state
394-
.room_state
395-
.broadcast_sender
396-
.send(RoomUpdate::Update {
397-
id: room.id.clone(),
398-
data: room.clone(),
399-
})
400-
{
401-
warn!("Fail to broadcast room update {}", err);
402-
}
403-
404359
info!("Room {} created by connection {}.", room_id, self.id);
405360
}
406361
}

src/state.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::error::AppError;
44
use crate::models::{RoomView, Stats};
55
use crate::packet::{
66
AnyPacket, AppPacket, ConnectionClosedPacket, ConnectionId, ConnectionIdlingPacket,
7-
ConnectionJoinPacket, RoomClosedPacket, RoomId,
7+
ConnectionJoinPacket, RoomClosedPacket, RoomId, StatsPacket,
88
};
99
use crate::rate::AtomicRateLimiter;
1010
use crate::utils::current_time_millis;
@@ -54,7 +54,7 @@ pub enum RoomUpdate {
5454
}
5555

5656
pub struct RoomState {
57-
pub rooms: DashMap<RoomId, Room>,
57+
rooms: DashMap<RoomId, Room>,
5858
pub broadcast_sender: tokio::sync::broadcast::Sender<RoomUpdate>,
5959
// Keep a receiver to prevent the channel from closing when all clients disconnect
6060
pub _broadcast_receiver: tokio::sync::broadcast::Receiver<RoomUpdate>,
@@ -126,6 +126,40 @@ impl RoomState {
126126
}
127127
}
128128

129+
pub fn update_state(&self, room_id: &RoomId, p: StatsPacket) {
130+
if let Some(mut r) = self.rooms.get_mut(&room_id) {
131+
let sent_at = p.data.created_at;
132+
133+
r.stats = p.data;
134+
r.updated_at = current_time_millis();
135+
r.ping = current_time_millis() - sent_at;
136+
137+
if let Err(err) = self
138+
.broadcast_sender
139+
.send(RoomUpdate::Update {
140+
id: r.id.clone(),
141+
data: r.clone(),
142+
})
143+
{
144+
warn!("Fail to broadcast room update {}", err);
145+
}
146+
} else {
147+
warn!("Room not found {}", room_id);
148+
}
149+
}
150+
151+
pub fn can_join(&self, room_id: &RoomId, password: &str) -> (bool, bool) {
152+
if let Some(room) = self.rooms.get(room_id) {
153+
if let Some(pwd) = room.password.as_deref() {
154+
(true, pwd == password)
155+
} else {
156+
(true, true)
157+
}
158+
} else {
159+
(false, false)
160+
}
161+
}
162+
129163
pub fn create(&self, init: RoomInit) -> RoomId {
130164
let RoomInit {
131165
password,
@@ -157,7 +191,14 @@ impl RoomState {
157191
ping: 0,
158192
};
159193

160-
self.rooms.insert(room_id.clone(), room);
194+
self.rooms.insert(room_id.clone(), room.clone());
195+
196+
if let Err(err) = self.broadcast_sender.send(RoomUpdate::Update {
197+
id: room_id.clone(),
198+
data: room,
199+
}) {
200+
warn!("Fail to broadcast room update {}", err);
201+
}
161202

162203
room_id
163204
}
@@ -167,13 +208,17 @@ impl RoomState {
167208

168209
if let Some((_, room)) = removed {
169210
info!("Room closed {}: {:?}", room_id, reason);
170-
171-
if let Err(e) = room.host_sender.try_send(ConnectionAction::SendTCP(AnyPacket::App(
172-
AppPacket::RoomClosed(RoomClosedPacket {
173-
reason,
174-
}),
175-
))) {
176-
warn!("Failed to send room closed packet to host {}: {}", room.host_connection_id, e);
211+
212+
if let Err(e) = room
213+
.host_sender
214+
.try_send(ConnectionAction::SendTCP(AnyPacket::App(
215+
AppPacket::RoomClosed(RoomClosedPacket { reason }),
216+
)))
217+
{
218+
warn!(
219+
"Failed to send room closed packet to host {}: {}",
220+
room.host_connection_id, e
221+
);
177222
}
178223

179224
for (id, sender) in room.members {
@@ -299,9 +344,8 @@ impl From<&Room> for RoomView {
299344
pub struct AppState {
300345
pub config: Config,
301346
pub room_state: RoomState,
302-
pub connections:
303-
DashMap<ConnectionId, (mpsc::Sender<ConnectionAction>, Arc<AtomicRateLimiter>)>,
304-
pub udp_routes: DashMap<SocketAddr, (mpsc::Sender<ConnectionAction>, Arc<AtomicRateLimiter>)>,
347+
connections: DashMap<ConnectionId, (mpsc::Sender<ConnectionAction>, Arc<AtomicRateLimiter>)>,
348+
udp_routes: DashMap<SocketAddr, (mpsc::Sender<ConnectionAction>, Arc<AtomicRateLimiter>)>,
305349
}
306350

307351
impl AppState {

0 commit comments

Comments
 (0)