Skip to content

Commit 74779af

Browse files
committed
Add ViaRegistry trait, Name enum, and opts builders for named process registration
Brings Elixir-style `name:` option to GenServer, Supervisor, and DynamicSupervisor start functions. Processes are auto-registered after init and auto-unregistered after terminate. - ViaRegistry trait for pluggable registry backends (bound-name pattern) - ServerRef::Via variant for addressing via custom registries - Registry::via() method to produce Arc<dyn ViaRegistry> from a Registry - StartOpts builder for GenServer (.name(), .link(), .start::<G>()) - SupervisorStartOpts builder for Supervisor (.name(), .link(), .start::<S>()) - DynamicSupervisorOpts.name() builder method - Error::AlreadyStarted variant for duplicate name registration - Job queue example updated to use builders instead of manual register
1 parent 0ab093b commit 74779af

13 files changed

Lines changed: 499 additions & 80 deletions

File tree

crates/ambitious/src/gen_server/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,14 @@ mod server_ref;
8888
mod traits;
8989
mod types;
9090

91+
/// Pluggable registry support and named process registration.
92+
pub mod via;
93+
9194
// Primary exports
92-
pub use server::{Error, call, cast, reply, start, start_link, stop};
95+
pub use server::{Error, StartOpts, call, cast, reply, start, start_link, stop};
9396
pub use server_ref::ServerRef;
9497
pub use traits::GenServer;
98+
pub use via::{AlreadyRegistered, Name, ViaRegistry};
9599

96100
pub use async_trait::async_trait;
97101
pub use protocol::From;
@@ -108,8 +112,8 @@ pub use crate::core::{ExitReason, Pid, Ref, Term};
108112
/// ```
109113
pub mod prelude {
110114
pub use super::{
111-
Error, ExitReason, From, GenServer, Init, Pid, Reply, ServerRef, Status, async_trait, call,
112-
cast, reply, start, start_link, stop,
115+
Error, ExitReason, From, GenServer, Init, Name, Pid, Reply, ServerRef, StartOpts, Status,
116+
ViaRegistry, async_trait, call, cast, reply, start, start_link, stop,
113117
};
114118
}
115119

crates/ambitious/src/gen_server/server.rs

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use super::protocol::{
1111
};
1212
use super::traits::GenServer;
1313
use super::types::{Init, Reply, Status};
14+
use super::via::{Name, register_name, unregister_name};
1415
use crate::core::{DecodeError, ExitReason, Pid, Ref, Term};
1516
use crate::message::Message;
1617
use crate::process::global;
@@ -37,6 +38,58 @@ pub enum Error {
3738
/// Decode error.
3839
#[error("decode error: {0}")]
3940
Decode(#[from] DecodeError),
41+
/// The process name is already registered.
42+
#[error("already started")]
43+
AlreadyStarted,
44+
}
45+
46+
/// Options for starting a GenServer.
47+
///
48+
/// # Example
49+
///
50+
/// ```ignore
51+
/// use ambitious::gen_server::{StartOpts, Name};
52+
///
53+
/// let pid = StartOpts::new(42)
54+
/// .name(Name::local("counter"))
55+
/// .link()
56+
/// .start::<Counter>()
57+
/// .await?;
58+
/// ```
59+
pub struct StartOpts<A> {
60+
args: A,
61+
name: Option<Name>,
62+
link: bool,
63+
}
64+
65+
impl<A: Send + 'static> StartOpts<A> {
66+
/// Create start options with the given args.
67+
pub fn new(args: A) -> Self {
68+
Self {
69+
args,
70+
name: None,
71+
link: false,
72+
}
73+
}
74+
75+
/// Register the process under the given name after init succeeds.
76+
pub fn name(mut self, name: Name) -> Self {
77+
self.name = Some(name);
78+
self
79+
}
80+
81+
/// Link the spawned process to the caller.
82+
pub fn link(mut self) -> Self {
83+
self.link = true;
84+
self
85+
}
86+
87+
/// Start the GenServer with these options.
88+
///
89+
/// Returns `Error::AlreadyStarted` if a `name` was set and is already taken.
90+
pub async fn start<G: GenServer<Args = A>>(self) -> Result<Pid, Error> {
91+
start_impl::<G>(self.args, self.name, self.link).await
92+
}
4093
}
4194

4295
/// Start a GenServer process (not linked to caller).
@@ -47,17 +100,21 @@ pub enum Error {
47100
/// let pid = start::<Counter>(42).await?;
48101
/// ```
49102
pub async fn start<G: GenServer>(args: G::Args) -> Result<Pid, Error> {
50-
start_impl::<G>(args, false).await
103+
start_impl::<G>(args, None, false).await
51104
}
52105

53106
/// Start a GenServer process linked to the caller.
54107
///
55108
/// If the GenServer crashes, the caller will also crash (unless trapping exits).
56109
pub async fn start_link<G: GenServer>(args: G::Args) -> Result<Pid, Error> {
57-
start_impl::<G>(args, true).await
110+
start_impl::<G>(args, None, true).await
58111
}
59112

60-
async fn start_impl<G: GenServer>(args: G::Args, link: bool) -> Result<Pid, Error> {
113+
async fn start_impl<G: GenServer>(
114+
args: G::Args,
115+
name: Option<Name>,
116+
link: bool,
117+
) -> Result<Pid, Error> {
61118
use tokio::sync::oneshot;
62119
let (init_tx, init_rx) = oneshot::channel();
63120

@@ -71,11 +128,11 @@ async fn start_impl<G: GenServer>(args: G::Args, link: bool) -> Result<Pid, Erro
71128
))
72129
})?;
73130
handle.spawn_link(parent, move || async move {
74-
gen_server_main::<G>(args, Some(init_tx)).await;
131+
gen_server_main::<G>(args, name, Some(init_tx)).await;
75132
})
76133
} else {
77134
handle.spawn(move || async move {
78-
gen_server_main::<G>(args, Some(init_tx)).await;
135+
gen_server_main::<G>(args, name, Some(init_tx)).await;
79136
})
80137
};
81138

@@ -91,30 +148,64 @@ async fn start_impl<G: GenServer>(args: G::Args, link: bool) -> Result<Pid, Erro
91148
/// Main entry point for a GenServer process.
92149
async fn gen_server_main<G: GenServer>(
93150
args: G::Args,
151+
name: Option<Name>,
94152
init_tx: Option<tokio::sync::oneshot::Sender<Result<(), Error>>>,
95153
) {
96154
let init_result = G::init(args).await;
97155

98156
match init_result {
99157
Init::Ok(server) => {
158+
if let Some(ref name) = name
159+
&& register_name(name, crate::current_pid()).is_err()
160+
{
161+
if let Some(tx) = init_tx {
162+
let _ = tx.send(Err(Error::AlreadyStarted));
163+
}
164+
return;
165+
}
100166
if let Some(tx) = init_tx {
101167
let _ = tx.send(Ok(()));
102168
}
103169
gen_server_loop(server).await;
170+
if let Some(ref name) = name {
171+
unregister_name(name);
172+
}
104173
}
105174
Init::Continue(server, continue_arg) => {
175+
if let Some(ref name) = name
176+
&& register_name(name, crate::current_pid()).is_err()
177+
{
178+
if let Some(tx) = init_tx {
179+
let _ = tx.send(Err(Error::AlreadyStarted));
180+
}
181+
return;
182+
}
106183
if let Some(tx) = init_tx {
107184
let _ = tx.send(Ok(()));
108185
}
109186
schedule_continue(&continue_arg);
110187
gen_server_loop(server).await;
188+
if let Some(ref name) = name {
189+
unregister_name(name);
190+
}
111191
}
112192
Init::Timeout(server, duration) => {
193+
if let Some(ref name) = name
194+
&& register_name(name, crate::current_pid()).is_err()
195+
{
196+
if let Some(tx) = init_tx {
197+
let _ = tx.send(Err(Error::AlreadyStarted));
198+
}
199+
return;
200+
}
113201
if let Some(tx) = init_tx {
114202
let _ = tx.send(Ok(()));
115203
}
116204
schedule_timeout(duration);
117205
gen_server_loop(server).await;
206+
if let Some(ref name) = name {
207+
unregister_name(name);
208+
}
118209
}
119210
Init::Ignore => {
120211
if let Some(tx) = init_tx {

crates/ambitious/src/gen_server/server_ref.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
//! ServerRef - unified way to address a GenServer.
22
33
use crate::core::Pid;
4+
use std::sync::Arc;
45

5-
/// A reference to a GenServer, either by PID or registered name.
6+
use super::via::ViaRegistry;
7+
8+
/// A reference to a GenServer, either by PID, registered name, or via a
9+
/// pluggable registry.
610
///
711
/// This allows GenServer client functions (`call`, `cast`, `stop`) to
8-
/// accept either a `Pid` or a name string.
12+
/// accept either a `Pid`, a name string, or a `Via` registry reference.
913
///
1014
/// # Examples
1115
///
@@ -18,12 +22,13 @@ use crate::core::Pid;
1822
/// // By registered name
1923
/// let reply = call::<MyServer, _>("my_server", msg, timeout).await?;
2024
/// ```
21-
#[derive(Debug, Clone)]
2225
pub enum ServerRef {
2326
/// Reference by process ID.
2427
Pid(Pid),
2528
/// Reference by registered name.
2629
Name(String),
30+
/// Reference via a pluggable registry.
31+
Via(Arc<dyn ViaRegistry>),
2732
}
2833

2934
impl ServerRef {
@@ -34,6 +39,27 @@ impl ServerRef {
3439
match self {
3540
ServerRef::Pid(pid) => Some(*pid),
3641
ServerRef::Name(name) => crate::whereis(name),
42+
ServerRef::Via(via) => via.whereis_name(),
43+
}
44+
}
45+
}
46+
47+
impl Clone for ServerRef {
48+
fn clone(&self) -> Self {
49+
match self {
50+
ServerRef::Pid(pid) => ServerRef::Pid(*pid),
51+
ServerRef::Name(name) => ServerRef::Name(name.clone()),
52+
ServerRef::Via(via) => ServerRef::Via(Arc::clone(via)),
53+
}
54+
}
55+
}
56+
57+
impl std::fmt::Debug for ServerRef {
58+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59+
match self {
60+
ServerRef::Pid(pid) => f.debug_tuple("Pid").field(pid).finish(),
61+
ServerRef::Name(name) => f.debug_tuple("Name").field(name).finish(),
62+
ServerRef::Via(_) => f.debug_tuple("Via").field(&"...").finish(),
3763
}
3864
}
3965
}
@@ -56,11 +82,18 @@ impl From<String> for ServerRef {
5682
}
5783
}
5884

85+
impl From<Arc<dyn ViaRegistry>> for ServerRef {
86+
fn from(via: Arc<dyn ViaRegistry>) -> Self {
87+
ServerRef::Via(via)
88+
}
89+
}
90+
5991
impl std::fmt::Display for ServerRef {
6092
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6193
match self {
6294
ServerRef::Pid(pid) => write!(f, "{:?}", pid),
6395
ServerRef::Name(name) => write!(f, "{}", name),
96+
ServerRef::Via(_) => write!(f, "<via>"),
6497
}
6598
}
6699
}

0 commit comments

Comments
 (0)