Skip to content

Commit dde5d88

Browse files
flxoFelix Obenhuberclaude
authored
anng: adapt to NNG v2.0.0-alpha.7 API (#42)
* anng: adapt to NNG v2.0.0-alpha.7 API Refactor address handling to use URL strings instead of a complex enum: - Update nng-sys to 0.4.0-v2pre.2 (path dependency) - Replace `Addr` enum (Inproc, Ipc, Inet, Inet6, Zt, Abstract) with `Url` wrapper around String, matching NNG's URL-based addressing model. Use `nng_url_sprintf` for URL formatting in `Listener::local_addr()` - Use `nng_pipe_peer_addr` instead of deprecated `nng_pipe_get_addr` - Retrieve URL scheme from pipe's listener/dialer endpoint - Check pipe validity with `nng_pipe_id()` instead of internal field - Enable full feature-powerset CI checks for anng * anng: use unreachable! for validated pipe in remote_addr The NNG_ENOENT branch in nng_pipe_peer_addr cannot be reached because self.pipe() already validates the pipe before this point. * anng: use unreachable! for invalid dialer/listener error paths Replace verbose error-logging-and-debug-assert patterns with unreachable! in remote_addr URL retrieval, consistent with the approach taken for the validated pipe path. * anng: make log message capitalization consistent * anng: add TODO for upstream NNG issue #2228 * anng: accept &CStr in Url::from_nng to avoid caller conversion Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * anng: handle stale pipe in remote_addr gracefully nng_pipe_peer_addr can return NNG_ENOENT when the pipe was closed after self.pipe() returned a stale ID. Return None instead of panicking. * anng: replace unwrap with expect in Url::from_nng write! calls * anng: use SocketAddrV6 Display for IPv6 URL formatting * anng: restore lifetime nuance in CStr SAFETY comments * anng: use split_once for URL scheme parsing in parse_tcp_url * anng: use unreachable! for NNG_ENOENT in Listener::local_addr &self guarantees the listener cannot be closed, so NNG_ENOENT is unreachable. * anng: take &nng_sockaddr by reference in Url::from_nng * anng: validate UTF-8 on stack before allocating in local_addr * anng: add TODO * anng: preserve IPv6 scope_id in parse_tcp_url SocketAddr::from_str() supports numeric IPv6 scope IDs, so there's no need to strip them during parsing. This fixes data loss and maintains consistency with Url::from_nng() which already preserves scope_id. * anng: fix MSRV issue (core vs std) --------- Co-authored-by: Felix Obenhuber <felix.obenhuber@helsing.ai> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 42a73c1 commit dde5d88

File tree

6 files changed

+542
-398
lines changed

6 files changed

+542
-398
lines changed

.github/workflows/check.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ jobs:
127127
# The effect of the failure (when running `hack` on the workspace) is here:
128128
# https://github.com/nanomsg/nng-rs/actions/runs/20988550343/job/60328143824
129129
#
130-
# TODO: Remove this workaround once nng and anng are migrated to nng-sys v0.4/NNG v2
130+
# TODO: Remove this workaround once nng is migrated to nng-sys v0.4/NNG v2
131131
# and revert to:
132132
# - name: cargo hack
133133
# run: cargo hack --feature-powerset check --skip source-update-bindings
@@ -141,9 +141,7 @@ jobs:
141141
- name: build and install NNG v2 for nng-sys
142142
run: .github/scripts/install-nng.sh
143143
- name: cargo hack anng
144-
# TODO(flxo): check anng just with feature vendored enabled because the needed nng version
145-
# alpha.6 is not available
146-
run: cargo hack -p anng --features vendored check
144+
run: cargo hack -p anng --feature-powerset check
147145
env:
148146
# add /usr/local/include to the C compiler search path for NNG headers
149147
CFLAGS: -I/usr/local/include

Cargo.lock

Lines changed: 1 addition & 63 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anng/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ tracing = "0.1.41"
2727
lazy_static = { version = "1.5.0", optional = true }
2828

2929
[dependencies.nng-sys]
30-
version = "0.4.0-v2pre.1"
31-
# TODO(flxo): make anng use the nng-sys as path dependency from the workspace when updated
32-
# path = "../nng-sys"
30+
version = "0.4.0-v2pre.2"
31+
path = "../nng-sys"
3332
default-features = false
3433
features = ["std"]
3534

@@ -40,6 +39,5 @@ tokio = ["tokio/rt-multi-thread"]
4039

4140
[dev-dependencies]
4241
lexopt = "0.3"
43-
rstest = { version = "0.26.1", default-features = false }
4442
tokio = { version = "1.47.1", features = ["full"] }
4543
tracing-subscriber = "0.3.20"

anng/src/message.rs

Lines changed: 112 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::{AioError, pipes::Addr};
1+
use crate::{AioError, pipes::Url};
22
use bytes::{Buf, BufMut};
33
use core::{
44
cmp::max,
5-
ffi::{c_char, c_void},
5+
ffi::c_void,
66
fmt,
77
mem::MaybeUninit,
88
ops::{Deref, DerefMut},
@@ -386,44 +386,124 @@ impl Message {
386386
}
387387
}
388388

389-
/// Returns the remote address of the message (ie, where it was received from), if available.
390-
pub fn remote_addr(&self) -> Option<Addr> {
389+
/// Returns the remote URL of the message (ie, where it was received from), if available.
390+
pub fn remote_addr(&self) -> Option<Url> {
391+
use core::ffi::CStr;
392+
391393
let pipe = self.pipe()?;
392394

393-
// SAFETY: arg and addr are both valid, and on success, nng_pipe_get_addr initializes
394-
let addr = unsafe {
395-
let mut addr = MaybeUninit::<nng_sys::nng_sockaddr>::uninit();
396-
let errno = nng_sys::nng_pipe_get_addr(
397-
pipe,
398-
nng_sys::NNG_OPT_REMADDR as *const _ as *const c_char,
399-
addr.as_mut_ptr(),
400-
);
401-
match errno {
402-
nng_err::NNG_OK => addr.assume_init(),
403-
nng_err::NNG_ENOTSUP => {
404-
tracing::warn!("Message pipe does not support REMADDR");
405-
return None;
406-
}
407-
nng_err::NNG_ENOENT => {
408-
tracing::warn!("Message does not have a pipe");
409-
return None;
410-
}
411-
err if err.0 & nng_err::NNG_ESYSERR.0 != 0 => {
412-
tracing::warn!(
413-
"nng_pipe_get_addr returned a system error: {}",
414-
io::Error::from_raw_os_error((err.0 & !(nng_err::NNG_ESYSERR.0)) as i32)
415-
);
395+
// Get the peer address from the pipe.
396+
let mut addr = MaybeUninit::<nng_sys::nng_sockaddr>::uninit();
397+
// SAFETY: pipe is valid (from self.pipe()), addr is valid for writing.
398+
let errno = unsafe { nng_sys::nng_pipe_peer_addr(pipe, addr.as_mut_ptr()) };
399+
let addr = match errno {
400+
// SAFETY: nng_pipe_peer_addr initializes addr on success.
401+
nng_err::NNG_OK => unsafe { addr.assume_init() },
402+
nng_err::NNG_ENOTSUP => {
403+
tracing::warn!("Message pipe does not support peer address");
404+
return None;
405+
}
406+
nng_err::NNG_ENOENT => {
407+
// The pipe ID stored in the message is stale - the pipe was closed
408+
// between self.pipe() (which only checks the stored ID) and the
409+
// nng_pipe_peer_addr call.
410+
return None;
411+
}
412+
err if err.0 & nng_err::NNG_ESYSERR.0 != 0 => {
413+
tracing::warn!(
414+
"nng_pipe_peer_addr returned a system error: {}",
415+
io::Error::from_raw_os_error((err.0 & !(nng_err::NNG_ESYSERR.0)) as i32)
416+
);
417+
return None;
418+
}
419+
_ => {
420+
unreachable!(
421+
"nng_pipe_peer_addr documentation claims err \"{errno}\" is never returned"
422+
);
423+
}
424+
};
425+
426+
// Get the URL scheme from the pipe's originating endpoint (listener or dialer).
427+
//
428+
// NNG does not provide a direct API to get a URL or transport type from a pipe.
429+
// There's no `nng_pipe_get_url()` or transport type accessor. The peer address
430+
// (`nng_sockaddr`) only contains address family, not the URL scheme. We must
431+
// retrieve the scheme from the endpoint that created this pipe.
432+
//
433+
// TODO(flxo): once upstream fixes [2228](https://github.com/nanomsg/nng/issues/2228),
434+
// simplify this.
435+
//
436+
// In NNG, a pipe represents a single connection and is created by exactly one of:
437+
// - A listener: when accepting an incoming connection
438+
// - A dialer: when initiating an outgoing connection
439+
//
440+
// We use nng_listener_id/nng_dialer_id to determine which endpoint created this pipe.
441+
// These functions return a positive ID for valid endpoints, or -1 for invalid ones.
442+
// Only one will be valid for any given pipe.
443+
//
444+
// Note on URL ownership: nng_listener_get_url/nng_dialer_get_url return a borrowed
445+
// pointer to NNG's internal URL structure (hence `const nng_url **`). This URL is
446+
// owned by the listener/dialer and will be freed when that endpoint closes.
447+
// We do NOT call nng_url_free here - that is only for URLs created by nng_url_parse().
448+
let mut urlp: *const nng_sys::nng_url = core::ptr::null();
449+
450+
// Try to get URL from listener first, then dialer.
451+
// Note: The endpoint could be closed between checking the ID and getting the URL.
452+
// nng_listener_get_url and nng_dialer_get_url can only return:
453+
// - 0 (NNG_OK): success
454+
// - NNG_ENOENT: endpoint is invalid or was closed
455+
// See nni_listener_find/nni_dialer_find in nng/src/core/listener.c and dialer.c.
456+
const NNG_ENOENT: i32 = nng_sys::nng_err::NNG_ENOENT.0 as i32;
457+
458+
// SAFETY: pipe is valid (from self.pipe()).
459+
let listener = unsafe { nng_sys::nng_pipe_listener(pipe) };
460+
// SAFETY: listener is a stack value returned from nng_pipe_listener.
461+
if unsafe { nng_sys::nng_listener_id(listener) } > 0 {
462+
// SAFETY: listener is valid (positive ID), urlp is valid for writing.
463+
let result = unsafe { nng_sys::nng_listener_get_url(listener, &mut urlp) };
464+
match result {
465+
0 => {} // success, urlp is now set
466+
NNG_ENOENT => {
467+
tracing::debug!("listener was closed while retrieving URL");
416468
return None;
417469
}
418-
_ => {
419-
unreachable!(
420-
"nng_pipe_get_addr documentation claims err \"{errno}\" is never returned"
421-
);
470+
err => unreachable!("unexpected error from nng_listener_get_url: {err}"),
471+
}
472+
} else {
473+
// SAFETY: pipe is valid (from self.pipe()).
474+
let dialer = unsafe { nng_sys::nng_pipe_dialer(pipe) };
475+
// SAFETY: dialer is a stack value returned from nng_pipe_dialer.
476+
if unsafe { nng_sys::nng_dialer_id(dialer) } > 0 {
477+
// SAFETY: dialer is valid (positive ID), urlp is valid for writing.
478+
let result = unsafe { nng_sys::nng_dialer_get_url(dialer, &mut urlp) };
479+
match result {
480+
0 => {} // success, urlp is now set
481+
NNG_ENOENT => {
482+
tracing::debug!("dialer was closed while retrieving URL");
483+
return None;
484+
}
485+
err => unreachable!("unexpected error from nng_dialer_get_url: {err}"),
422486
}
487+
} else {
488+
// Neither listener nor dialer is valid - pipe may have been closed
489+
return None;
490+
}
491+
}
492+
493+
let scheme = if !urlp.is_null() {
494+
// SAFETY: urlp is non-null and was set by nng_listener_get_url or nng_dialer_get_url.
495+
let scheme_ptr = unsafe { nng_sys::nng_url_scheme(urlp) };
496+
if !scheme_ptr.is_null() {
497+
// SAFETY: scheme_ptr is non-null and points to a valid C string owned by NNG.
498+
unsafe { CStr::from_ptr(scheme_ptr) }
499+
} else {
500+
return None;
423501
}
502+
} else {
503+
return None;
424504
};
425505

426-
Addr::from_nng(addr)
506+
Url::from_nng(&addr, scheme)
427507
}
428508

429509
/// Reserves the minimum capacity for at least additional more bytes to be

0 commit comments

Comments
 (0)