Skip to content

Commit 4a71f22

Browse files
committed
Switch to SOCK_SEQPACKET
1 parent f7c2539 commit 4a71f22

File tree

7 files changed

+166
-86
lines changed

7 files changed

+166
-86
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ camino = "1.0.9"
4949
cap-std-ext = "0.26"
5050
cap-std = { version = "0.25", features = ["fs_utf8"] }
5151
# Explicitly force on libc
52-
rustix = { version = "0.35", features = ["use-libc"] }
52+
rustix = { version = "0.35", features = ["use-libc", "net"] }
5353
cap-primitives = "0.25.2"
5454
cap-tempfile = "0.25.2"
5555
chrono = { version = "0.4.19", features = ["serde"] }

Makefile-daemon.am

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ systemdunit_other_files = \
6969
$(NULL)
7070

7171
if CLIENT_SOCKET
72-
systemdunit_other_files += $(srcdir)/src/daemon/rpm-ostreed.socket
72+
systemdunit_other_files += $(srcdir)/src/daemon/rpm-ostreed.socket
7373
endif
7474

7575
systemdunit_DATA = \

configure.ac

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ AC_ARG_ENABLE(featuresrs,
6969
AS_HELP_STRING([--enable-featuresrs],
7070
[Rust features, see Cargo.toml for more information]),,
7171
[enable_featuresrs=])
72-
AC_SUBST([RUST_FEATURES], $enable_featuresrs)
7372

7473
AC_ARG_ENABLE(client-socket,
7574
AS_HELP_STRING([--enable-client-socket],
@@ -78,6 +77,8 @@ AC_ARG_ENABLE(client-socket,
7877
AS_IF([test x$enable_client_socket = xyes], [enable_featuresrs="$enable_featuresrs client-socket"])
7978
AM_CONDITIONAL(CLIENT_SOCKET, [echo $enable_featuresrs | grep -q 'client-socket'])
8079

80+
AC_SUBST([RUST_FEATURES], $enable_featuresrs)
81+
8182
# Initialize libtool
8283
LT_PREREQ([2.2.4])
8384
LT_INIT([disable-static])

rust/src/client.rs

+36-15
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ use crate::core::OSTREE_BOOTED;
66
use crate::cxxrsutil::*;
77
use crate::ffi::SystemHostType;
88
use crate::utils;
9-
use anyhow::{anyhow, Result};
9+
use anyhow::{anyhow, Context, Result};
1010
use cap_std_ext::rustix;
1111
use gio::prelude::*;
1212
use ostree_ext::{gio, glib};
1313
use std::os::unix::io::IntoRawFd;
14-
use std::process::Command;
1514

1615
/// The well-known bus name.
1716
const BUS_NAME: &str = "org.projectatomic.rpmostree1";
@@ -48,7 +47,8 @@ impl ClientConnection {
4847
SYSROOT_PATH,
4948
"org.projectatomic.rpmostree1.Sysroot",
5049
gio::NONE_CANCELLABLE,
51-
)?;
50+
)
51+
.context("Initializing sysroot proxy")?;
5252
// Today the daemon mode requires running inside a booted deployment.
5353
let booted = sysroot_proxy
5454
.cached_property("Booted")
@@ -159,21 +159,40 @@ pub(crate) fn client_handle_fd_argument(
159159
/// returned cleanly.
160160
#[cfg(feature = "client-socket")]
161161
fn start_daemon_via_socket() -> Result<()> {
162-
let address = "/run/rpm-ostree/client.sock";
163-
tracing::debug!("Starting daemon via {address}");
164-
let s = std::os::unix::net::UnixStream::connect(address)
165-
.with_context(|| anyhow!("Failed to connect to {}", address))?;
166-
let mut s = std::io::BufReader::new(s);
167-
let mut r = String::new();
168-
s.read_to_string(&mut r)
169-
.context("Reading from client socket")?;
170-
if r.is_empty() {
171-
Ok(())
172-
} else {
173-
Err(anyhow!("{r}").into())
162+
use cap_std::io_lifetimes::IntoSocketlike;
163+
164+
let address = sockaddr()?;
165+
let socket = rustix::net::socket(
166+
rustix::net::AddressFamily::UNIX,
167+
rustix::net::SocketType::SEQPACKET,
168+
rustix::net::Protocol::from_raw(0),
169+
)?;
170+
let addr = crate::client::sockaddr()?;
171+
tracing::debug!("Starting daemon via {address:?}");
172+
rustix::net::connect_unix(&socket, &addr)
173+
.with_context(|| anyhow!("Failed to connect to {address:?}"))?;
174+
let socket = socket.into_socketlike();
175+
crate::daemon::write_message(
176+
&socket,
177+
crate::daemon::SocketMessage::ClientHello {
178+
selfid: crate::core::self_id()?,
179+
},
180+
)?;
181+
let resp = crate::daemon::recv_message(&socket)?;
182+
match resp {
183+
crate::daemon::SocketMessage::ServerOk => Ok(()),
184+
crate::daemon::SocketMessage::ServerError { msg } => {
185+
Err(anyhow!("server error: {msg}").into())
186+
}
187+
o => Err(anyhow!("unexpected message: {o:?}").into()),
174188
}
175189
}
176190

191+
/// Returns the address of the client socket.
192+
pub(crate) fn sockaddr() -> Result<rustix::net::SocketAddrUnix> {
193+
rustix::net::SocketAddrUnix::new("/run/rpm-ostree/client.sock").map_err(anyhow::Error::msg)
194+
}
195+
177196
/// Explicitly ensure the daemon is started via systemd, if possible.
178197
///
179198
/// This works around bugs from DBus activation, see
@@ -186,6 +205,8 @@ fn start_daemon_via_socket() -> Result<()> {
186205
/// What we really should do probably is use native socket activation.
187206
#[cfg(not(feature = "client-socket"))]
188207
fn start_daemon_via_systemctl() -> Result<()> {
208+
use std::process::Command;
209+
189210
let service = "rpm-ostreed.service";
190211
// Assume non-root can't use systemd right now.
191212
if rustix::process::getuid().as_raw() != 0 {

rust/src/core.rs

+8
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,14 @@ fn stage_container_rpm_files(rpms: Vec<File>) -> CxxResult<Vec<String>> {
329329
Ok(r)
330330
}
331331

332+
/// Return an opaque identifier for the current executing binary. This can
333+
/// be passed via IPC to verify that client and server are running the same code.
334+
pub(crate) fn self_id() -> Result<String> {
335+
use std::os::unix::fs::MetadataExt;
336+
let metadata = std::fs::metadata("/proc/self/exe").context("Failed to read /proc/self/exe")?;
337+
Ok(format!("dev={};inode={}", metadata.dev(), metadata.ino()))
338+
}
339+
332340
#[cfg(test)]
333341
mod test {
334342
use super::*;

rust/src/daemon.rs

+117-67
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::ffi::{
1010
};
1111
use anyhow::{anyhow, format_err, Context, Result};
1212
use cap_std::fs::Dir;
13+
use cap_std::io_lifetimes::{IntoSocketlike, OwnedFd, OwnedSocketlike};
1314
use cap_std_ext::dirext::CapStdExtDirExt;
1415
use cap_std_ext::{cap_std, rustix};
1516
use fn_error_context::context;
@@ -18,16 +19,29 @@ use once_cell::sync::Lazy;
1819
use ostree_ext::{gio, glib, ostree};
1920
use rustix::fd::{BorrowedFd, FromRawFd};
2021
use rustix::fs::MetadataExt;
22+
use serde::{Deserialize, Serialize};
2123
use std::collections::{BTreeMap, BTreeSet};
22-
use std::io::{Read, Write};
24+
use std::io::Read;
2325
use std::os::unix::fs::PermissionsExt;
2426
use std::path::Path;
2527
use std::sync::Mutex;
26-
use tokio::net::{UnixListener, UnixStream};
27-
use tokio::sync::oneshot::{Receiver, Sender};
28+
use tokio::sync::oneshot::Sender;
2829

2930
const RPM_OSTREED_COMMIT_VERIFICATION_CACHE: &str = "rpm-ostree/gpgcheck-cache";
3031

32+
// Messages sent across the socket
33+
#[derive(Debug, Serialize, Deserialize)]
34+
pub(crate) enum SocketMessage {
35+
ClientHello { selfid: String },
36+
ServerOk,
37+
ServerError { msg: String },
38+
}
39+
40+
impl SocketMessage {
41+
// Maximum size of a message.
42+
pub(crate) const BUFSIZE: usize = 8192;
43+
}
44+
3145
/// Validate basic assumptions on daemon startup.
3246
pub(crate) fn daemon_sanitycheck_environment(sysroot: &crate::FFIOstreeSysroot) -> CxxResult<()> {
3347
let sysroot = &sysroot.glib_reborrow();
@@ -135,57 +149,83 @@ fn deployment_populate_variant_origin(
135149
Ok(())
136150
}
137151

138-
async fn send_ok_result_to_client(_client: UnixStream) {
139-
// On success we close the stream without writing anything,
140-
// which acknowledges successful startup to the client.
141-
// In the future we may actually implement a protocol here, so this
142-
// is stubbed out as a full async fn in preparation for that.
152+
pub(crate) fn write_message(conn: &OwnedFd, message: SocketMessage) -> Result<()> {
153+
let sendbuf = serde_json::to_vec(&message)?;
154+
rustix::net::send(conn, &sendbuf, rustix::net::SendFlags::empty())?;
155+
Ok(())
156+
}
157+
158+
pub(crate) fn recv_message(conn: &OwnedFd) -> Result<SocketMessage> {
159+
let mut buf = [0u8; SocketMessage::BUFSIZE];
160+
let n = rustix::net::recv(&conn, &mut buf, rustix::net::RecvFlags::empty())?;
161+
if n == SocketMessage::BUFSIZE {
162+
anyhow::bail!("Buffer filled to {n} bytes when reading");
163+
}
164+
assert!(n < SocketMessage::BUFSIZE);
165+
let buf = &buf[0..n];
166+
let msg: SocketMessage =
167+
serde_json::from_slice(buf).context("Parsing client message")?;
168+
Ok(msg)
169+
}
170+
171+
fn client_hello(client: OwnedSocketlike, e: anyhow::Result<()>) -> Result<()> {
172+
let msg = recv_message(&client)?;
173+
let reply = match msg {
174+
SocketMessage::ClientHello { selfid } => {
175+
let myid = crate::core::self_id()?;
176+
if selfid != myid {
177+
// For now, make this not an error
178+
tracing::warn!("Client reported id: {selfid} different from mine: {myid}");
179+
}
180+
match e {
181+
Ok(()) => SocketMessage::ServerOk,
182+
Err(e) => SocketMessage::ServerError {
183+
msg: format!("{e}"),
184+
},
185+
}
186+
}
187+
o => SocketMessage::ServerError {
188+
msg: format!("Unexpected message: {o:?}"),
189+
},
190+
};
191+
write_message(&client, reply).context("Writing client reply")?;
143192
tracing::debug!("Acknowleged client");
193+
Ok(())
144194
}
145195

146196
static SHUTDOWN_SIGNAL: Lazy<Mutex<Option<Sender<()>>>> = Lazy::new(|| Mutex::new(None));
147197

148-
async fn process_clients_with_ok(listener: UnixListener, mut cancel: Receiver<()>) {
198+
fn run_acknowledgement_worker(listener: OwnedSocketlike) {
149199
tracing::debug!("Processing clients...");
150200
loop {
151-
tokio::select! {
152-
_ = &mut cancel => {
153-
tracing::debug!("Got cancellation event");
154-
return
201+
let sock = match rustix::net::accept(&listener) {
202+
Ok(s) => s,
203+
Err(e) => {
204+
tracing::warn!("Failed to accept client: {e}");
205+
continue;
155206
}
156-
r = listener.accept() => {
157-
match r {
158-
Ok((stream, _addr)) => {
159-
send_ok_result_to_client(stream).await;
160-
},
161-
Err(e) => {
162-
tracing::debug!("failed to accept client: {e}")
163-
}
164-
}
207+
};
208+
std::thread::spawn(move || {
209+
if let Err(e) = client_hello(sock.into_socketlike(), Ok(())) {
210+
tracing::warn!("error acknowledging client: {e}");
165211
}
166-
}
212+
});
167213
}
168214
}
169215

170216
/// Ensure all asynchronous tasks in this Rust half of the daemon code are stopped.
171217
/// Called from C++.
172218
pub(crate) fn daemon_terminate() {
173-
let chan = (*SHUTDOWN_SIGNAL).lock().unwrap().take().unwrap();
174-
let _ = chan.send(());
219+
if let Some(chan) = (*SHUTDOWN_SIGNAL).lock().unwrap().take() {
220+
let _ = chan.send(());
221+
}
175222
}
176223

177-
fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Error) -> Result<()> {
178-
let mut incoming = match listener.incoming().next() {
179-
Some(r) => r?,
180-
None => {
181-
anyhow::bail!("Expected to find client socket from activation");
182-
}
183-
};
184-
185-
let buf = format!("{e}");
186-
incoming.write_all(buf.as_bytes())?;
187-
188-
todo!()
224+
fn process_one_client(listener: OwnedSocketlike, e: anyhow::Error) -> Result<()> {
225+
let incoming = rustix::net::accept(&listener)?;
226+
client_hello(incoming.into_socketlike(), Err(e))?;
227+
// Now that we've acknowledged one client, exit the process with an error
228+
Ok(())
189229
}
190230

191231
/// Perform initialization steps required by systemd service activation.
@@ -195,7 +235,6 @@ fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Err
195235
pub(crate) fn daemon_main(debug: bool) -> Result<()> {
196236
let handle = tokio::runtime::Handle::current();
197237
let _tokio_guard = handle.enter();
198-
use std::os::unix::net::UnixListener as StdUnixListener;
199238
if !systemd::daemon::booted()? {
200239
return Err(anyhow!("not running as a systemd service"));
201240
}
@@ -204,54 +243,65 @@ pub(crate) fn daemon_main(debug: bool) -> Result<()> {
204243
tracing::debug!("Initialization result: {init_res:?}");
205244

206245
let mut fds = systemd::daemon::listen_fds(false)?.iter();
207-
let listener = match fds.next() {
246+
let (listener, init_res) = match fds.next() {
208247
None => {
209248
// If started directly via `systemctl start` or DBus activation, we
210-
// directly propagate the error back to our exit code.
249+
// directly propagate the error back to our exit code without even bothering
250+
// with a socket.
211251
init_res?;
212252
tracing::debug!("Initializing directly (not socket activated)");
213-
cfg!(feature = "client-socket")
214-
.then(|| StdUnixListener::bind("/run/rpm-ostree/client.sock"))
253+
let listener = cfg!(feature = "client-socket")
254+
.then(|| {
255+
let socket = rustix::net::socket(
256+
rustix::net::AddressFamily::UNIX,
257+
rustix::net::SocketType::SEQPACKET,
258+
rustix::net::Protocol::from_raw(0),
259+
)?;
260+
let addr = crate::client::sockaddr()?;
261+
rustix::net::bind_unix(&socket, &addr)?;
262+
Ok::<_, anyhow::Error>(socket.into_socketlike())
263+
})
215264
.transpose()
216-
.context("Binding to socket")?
265+
.context("Binding to socket")?;
266+
(listener, Ok(()))
217267
}
218268
Some(fd) => {
219269
if fds.next().is_some() {
220270
return Err(anyhow!("Expected exactly 1 fd from systemd activation"));
221271
}
222272
tracing::debug!("Initializing from socket activation; fd={fd}");
223-
let listener = unsafe { StdUnixListener::from_raw_fd(fd) };
224-
225-
match init_res {
226-
Ok(_) => Some(listener),
227-
Err(e) => {
228-
let err_copy = anyhow!("{e}");
229-
tracing::debug!("Reporting initialization error: {e}");
230-
match process_one_client(listener, err_copy) {
231-
Ok(()) => {
232-
tracing::debug!("Acknowleged initial client");
233-
}
234-
Err(e) => {
235-
tracing::debug!("Caught error while processing client {e}");
236-
}
237-
}
238-
return Err(e);
239-
}
240-
}
273+
let listener = unsafe { OwnedFd::from_raw_fd(fd) }.into_socketlike();
274+
// In the socket case, we will process the initialization error later.
275+
(Some(listener), init_res)
241276
}
242277
};
243278

244279
if let Some(listener) = listener {
245-
let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
246-
(*SHUTDOWN_SIGNAL).lock().unwrap().replace(shutdown_send);
247-
248-
let listener = UnixListener::from_std(listener)?;
249-
250280
// On success, we spawn a helper task that just responds with
251281
// sucess to clients that connect via the socket. In the future,
252282
// perhaps we'll expose an API here.
253283
tracing::debug!("Spawning acknowledgement task");
254-
tokio::task::spawn(async { process_clients_with_ok(listener, shutdown_recv).await });
284+
match init_res {
285+
Ok(()) => {
286+
std::thread::spawn(move || run_acknowledgement_worker(listener));
287+
}
288+
Err(e) => {
289+
let err_copy = anyhow::format_err!("{e}");
290+
let r = std::thread::spawn(move || {
291+
if let Err(suberr) = process_one_client(listener, err_copy) {
292+
tracing::warn!("Failed to respond to client: {suberr}")
293+
}
294+
});
295+
// Block until we've written the reply to the client;
296+
if let Err(e) = r.join() {
297+
tracing::warn!("Failed to join response thread: {e:?}");
298+
}
299+
// And finally propagate out the error
300+
return Err(e);
301+
}
302+
};
303+
} else {
304+
init_res?;
255305
}
256306

257307
tracing::debug!("Entering daemon mainloop");

0 commit comments

Comments
 (0)