-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession.rs
More file actions
100 lines (85 loc) · 3.56 KB
/
session.rs
File metadata and controls
100 lines (85 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//! Registry of active connection push handles.
//!
//! `SessionRegistry` stores non-owning weak references to [`PushHandle`]s,
//! allowing asynchronous tasks to send frames to live connections without
//! preventing their cleanup. Dead entries can be pruned opportunistically or
//! lazily at lookup time.
use std::sync::{Arc, Weak};
use dashmap::DashMap;
use crate::push::{FrameLike, PushHandle, PushHandleInner};
/// Identifier assigned to a connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionId(u64);
impl From<u64> for ConnectionId {
fn from(value: u64) -> Self { Self(value) }
}
impl ConnectionId {
/// Create a new [`ConnectionId`] with the provided value.
#[must_use]
pub fn new(id: u64) -> Self { Self(id) }
/// Return the inner `u64` representation.
#[must_use]
pub fn as_u64(&self) -> u64 { self.0 }
}
impl std::fmt::Display for ConnectionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ConnectionId({})", self.0)
}
}
/// Concurrent registry of push handles keyed by [`ConnectionId`].
#[derive(Default)]
pub struct SessionRegistry<F>(DashMap<ConnectionId, Weak<PushHandleInner<F>>>);
impl<F: FrameLike> SessionRegistry<F> {
/// Retain live entries and collect data from each upgraded handle.
fn retain_and_collect<T>(
&self,
mut map: impl FnMut(ConnectionId, Arc<PushHandleInner<F>>) -> T,
) -> Vec<T> {
let mut out = Vec::with_capacity(self.0.len());
self.0.retain(|id, weak| match weak.upgrade() {
Some(inner) => {
out.push(map(*id, inner));
true
}
None => false,
});
out
}
/// Retrieve a `PushHandle` for `id` if the connection is still alive.
pub fn get(&self, id: &ConnectionId) -> Option<PushHandle<F>> {
let guard = self.0.get(id);
let handle = guard.as_ref().and_then(|weak| weak.upgrade());
drop(guard);
if handle.is_none() {
self.0.remove_if(id, |_, weak| weak.strong_count() == 0);
}
handle.map(PushHandle::from_arc)
}
/// Insert a handle for a newly established connection.
pub fn insert(&self, id: ConnectionId, handle: &PushHandle<F>) {
self.0.insert(id, handle.downgrade());
}
/// Remove a handle, typically on connection teardown.
pub fn remove(&self, id: &ConnectionId) { self.0.remove(id); }
/// Remove all stale weak references without returning any handles.
///
/// `DashMap::retain` acquires per-bucket write locks, so other operations
/// may contend briefly while the registry is pruned.
pub fn prune(&self) { self.0.retain(|_, weak| weak.strong_count() > 0); }
/// Prune stale weak references, then collect the remaining live handles.
///
/// This method mutates the registry. Use [`Self::prune`] from a maintenance task
/// to clean up without collecting handles. `DashMap::retain` holds
/// per-bucket write locks while iterating.
#[must_use]
pub fn active_handles(&self) -> Vec<(ConnectionId, PushHandle<F>)> {
self.retain_and_collect(|id, inner| (id, PushHandle::from_arc(inner)))
}
/// Prune stale weak references, then return the IDs of the live connections.
///
/// This method mutates the registry. Use [`Self::prune`] from a maintenance task
/// to clean up without collecting handles. `DashMap::retain` holds
/// per-bucket write locks while iterating.
#[must_use]
pub fn active_ids(&self) -> Vec<ConnectionId> { self.retain_and_collect(|id, _| id) }
}