Skip to content

Commit b5c3a11

Browse files
authored
[runtime] Change send and recv signatures (#929)
1 parent d824eca commit b5c3a11

14 files changed

Lines changed: 709 additions & 576 deletions

File tree

runtime/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
1818
//! expect breaking changes and occasional instability.
1919
20+
use commonware_utils::{StableBuf, StableBufMut};
2021
use prometheus_client::registry::Metric;
2122
use std::io::Error as IoError;
2223
use std::{
@@ -272,15 +273,15 @@ pub trait Listener: Sync + Send + 'static {
272273
/// messages over a network connection.
273274
pub trait Sink: Sync + Send + 'static {
274275
/// Send a message to the sink.
275-
fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
276+
fn send<B: StableBuf>(&mut self, msg: B) -> impl Future<Output = Result<(), Error>> + Send;
276277
}
277278

278279
/// Interface that any runtime must implement to receive
279280
/// messages over a network connection.
280281
pub trait Stream: Sync + Send + 'static {
281282
/// Receive a message from the stream, storing it in the given buffer.
282283
/// Reads exactly the number of bytes that fit in the buffer.
283-
fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
284+
fn recv<B: StableBufMut>(&mut self, buf: B) -> impl Future<Output = Result<B, Error>> + Send;
284285
}
285286

286287
/// Interface to interact with storage.
@@ -356,6 +357,7 @@ pub trait Blob: Clone + Send + Sync + 'static {
356357
#[cfg(test)]
357358
mod tests {
358359
use super::*;
360+
use bytes::Bytes;
359361
use commonware_macros::select;
360362
use futures::channel::oneshot;
361363
use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
@@ -1325,8 +1327,7 @@ mod tests {
13251327
async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
13261328
let mut line = Vec::new();
13271329
loop {
1328-
let mut byte = [0; 1];
1329-
stream.recv(&mut byte).await?;
1330+
let byte = stream.recv(vec![0; 1]).await?;
13301331
if byte[0] == b'\n' {
13311332
if line.last() == Some(&b'\r') {
13321333
line.pop(); // Remove trailing \r
@@ -1359,9 +1360,8 @@ mod tests {
13591360
stream: &mut St,
13601361
content_length: usize,
13611362
) -> Result<String, Error> {
1362-
let mut body = vec![0; content_length];
1363-
stream.recv(&mut body).await?;
1364-
String::from_utf8(body).map_err(|_| Error::ReadFailed)
1363+
let read = stream.recv(vec![0; content_length]).await?;
1364+
String::from_utf8(read).map_err(|_| Error::ReadFailed)
13651365
}
13661366

13671367
// Simulate a client connecting to the server
@@ -1384,7 +1384,7 @@ mod tests {
13841384
"GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
13851385
address
13861386
);
1387-
sink.send(request.as_bytes()).await.unwrap();
1387+
sink.send(Bytes::from(request)).await.unwrap();
13881388

13891389
// Read and verify the HTTP status line
13901390
let status_line = read_line(&mut stream).await.unwrap();

runtime/src/mocks.rs

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
44
use bytes::Bytes;
5+
use commonware_utils::{StableBuf, StableBufMut};
56
use futures::channel::oneshot;
67
use std::{
78
collections::VecDeque,
@@ -41,10 +42,10 @@ pub struct Sink {
4142
}
4243

4344
impl SinkTrait for Sink {
44-
async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
45+
async fn send<B: StableBuf>(&mut self, msg: B) -> Result<(), Error> {
4546
let (os_send, data) = {
4647
let mut channel = self.channel.lock().unwrap();
47-
channel.buffer.extend(msg);
48+
channel.buffer.extend(msg.as_ref());
4849

4950
// If there is a waiter and the buffer is large enough,
5051
// return the waiter (while clearing the waiter field).
@@ -74,16 +75,16 @@ pub struct Stream {
7475
}
7576

7677
impl StreamTrait for Stream {
77-
async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
78+
async fn recv<B: StableBufMut>(&mut self, mut buf: B) -> Result<B, Error> {
7879
let os_recv = {
7980
let mut channel = self.channel.lock().unwrap();
8081

8182
// If the message is fully available in the buffer,
8283
// drain the value into buf and return.
8384
if channel.buffer.len() >= buf.len() {
8485
let b: Vec<u8> = channel.buffer.drain(0..buf.len()).collect();
85-
buf.copy_from_slice(&b);
86-
return Ok(());
86+
buf.put_slice(&b);
87+
return Ok(buf);
8788
}
8889

8990
// Otherwise, populate the waiter.
@@ -95,8 +96,9 @@ impl StreamTrait for Stream {
9596

9697
// Wait for the waiter to be resolved.
9798
let data = os_recv.await.map_err(|_| Error::RecvFailed)?;
98-
buf.copy_from_slice(&data);
99-
Ok(())
99+
assert_eq!(data.len(), buf.len());
100+
buf.put_slice(&data);
101+
Ok(buf)
100102
}
101103
}
102104

@@ -111,52 +113,45 @@ mod tests {
111113
#[test]
112114
fn test_send_recv() {
113115
let (mut sink, mut stream) = Channel::init();
114-
115-
let data = b"hello world";
116-
let mut buf = vec![0; data.len()];
116+
let data = b"hello world".to_vec();
117117

118118
block_on(async {
119-
sink.send(data).await.unwrap();
120-
stream.recv(&mut buf).await.unwrap();
119+
sink.send(data.clone()).await.unwrap();
120+
let buf = stream.recv(vec![0; data.len()]).await.unwrap();
121+
assert_eq!(buf, data);
121122
});
122-
123-
assert_eq!(buf, data);
124123
}
125124

126125
#[test]
127126
fn test_send_recv_partial_multiple() {
128127
let (mut sink, mut stream) = Channel::init();
129-
130-
let data1 = b"hello";
131-
let data2 = b"world";
132-
let mut buf1 = vec![0; data1.len()];
133-
let mut buf2 = vec![0; data2.len()];
128+
let data = b"hello".to_vec();
129+
let data2 = b" world".to_vec();
134130

135131
block_on(async {
136-
sink.send(data1).await.unwrap();
132+
sink.send(data).await.unwrap();
137133
sink.send(data2).await.unwrap();
138-
stream.recv(&mut buf1[0..3]).await.unwrap();
139-
stream.recv(&mut buf1[3..]).await.unwrap();
140-
stream.recv(&mut buf2).await.unwrap();
134+
let buf = stream.recv(vec![0; 5]).await.unwrap();
135+
assert_eq!(buf, b"hello");
136+
let buf = stream.recv(buf).await.unwrap();
137+
assert_eq!(buf, b" worl");
138+
let buf = stream.recv(vec![0; 1]).await.unwrap();
139+
assert_eq!(buf, b"d");
141140
});
142-
143-
assert_eq!(buf1, data1);
144-
assert_eq!(buf2, data2);
145141
}
146142

147143
#[test]
148144
fn test_send_recv_async() {
149145
let (mut sink, mut stream) = Channel::init();
150146

151147
let data = b"hello world";
152-
let mut buf = vec![0; data.len()];
153-
154-
block_on(async {
155-
futures::try_join!(stream.recv(&mut buf), async {
148+
let buf = block_on(async {
149+
futures::try_join!(stream.recv(vec![0; data.len()]), async {
156150
sleep(Duration::from_millis(10_000));
157-
sink.send(data).await
151+
sink.send(data.to_vec()).await
158152
},)
159-
.unwrap();
153+
.unwrap()
154+
.0
160155
});
161156

162157
assert_eq!(buf, data);
@@ -170,8 +165,7 @@ mod tests {
170165
// If the oneshot sender is dropped before the oneshot receiver is resolved,
171166
// the recv function should return an error.
172167
executor.start(|_| async move {
173-
let mut buf = vec![0; 5];
174-
let (v, _) = join!(stream.recv(&mut buf), async {
168+
let (v, _) = join!(stream.recv(vec![0; 5]), async {
175169
// Take the waiter and drop it.
176170
sink.channel.lock().unwrap().waiter.take();
177171
},);
@@ -187,12 +181,10 @@ mod tests {
187181
// If the waiter value has a min, but the oneshot receiver is dropped,
188182
// the send function should return an error when attempting to send the data.
189183
executor.start(|context| async move {
190-
let mut buf = vec![0; 5];
191-
192184
// Create a waiter using a recv call.
193185
// But then drop the receiver.
194186
select! {
195-
v = stream.recv(&mut buf) => {
187+
v = stream.recv( vec![0;5]) => {
196188
panic!("unexpected value: {:?}", v);
197189
},
198190
_ = context.sleep(Duration::from_millis(100)) => {
@@ -202,7 +194,7 @@ mod tests {
202194
drop(stream);
203195

204196
// Try to send a message (longer than the requested amount), but the receiver is dropped.
205-
let result = sink.send(b"hello world").await;
197+
let result = sink.send(b"hello world".to_vec()).await;
206198
assert!(matches!(result, Err(Error::SendFailed)));
207199
});
208200
}
@@ -214,9 +206,8 @@ mod tests {
214206

215207
// If there is no data to read, test that the recv function just blocks. A timeout should return first.
216208
executor.start(|context| async move {
217-
let mut buf = vec![0; 5];
218209
select! {
219-
v = stream.recv(&mut buf) => {
210+
v = stream.recv(vec![0;5]) => {
220211
panic!("unexpected value: {:?}", v);
221212
},
222213
_ = context.sleep(Duration::from_millis(100)) => {

runtime/src/network/audited.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{deterministic::Auditor, Error, SinkOf, StreamOf};
2+
use commonware_utils::{StableBuf, StableBufMut};
23
use sha2::Digest;
34
use std::{net::SocketAddr, sync::Arc};
45

@@ -10,10 +11,10 @@ pub struct Sink<S: crate::Sink> {
1011
}
1112

1213
impl<S: crate::Sink> crate::Sink for Sink<S> {
13-
async fn send(&mut self, data: &[u8]) -> Result<(), Error> {
14+
async fn send<B: StableBuf>(&mut self, data: B) -> Result<(), Error> {
1415
self.auditor.event(b"send_attempt", |hasher| {
1516
hasher.update(self.remote_addr.to_string().as_bytes());
16-
hasher.update(data);
17+
hasher.update(data.as_ref());
1718
});
1819

1920
self.inner.send(data).await.inspect_err(|e| {
@@ -25,7 +26,6 @@ impl<S: crate::Sink> crate::Sink for Sink<S> {
2526

2627
self.auditor.event(b"send_success", |hasher| {
2728
hasher.update(self.remote_addr.to_string().as_bytes());
28-
hasher.update(data);
2929
});
3030
Ok(())
3131
}
@@ -39,12 +39,12 @@ pub struct Stream<S: crate::Stream> {
3939
}
4040

4141
impl<S: crate::Stream> crate::Stream for Stream<S> {
42-
async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
42+
async fn recv<B: StableBufMut>(&mut self, buf: B) -> Result<B, Error> {
4343
self.auditor.event(b"recv_attempt", |hasher| {
4444
hasher.update(self.remote_addr.to_string().as_bytes());
4545
});
4646

47-
self.inner.recv(buf).await.inspect_err(|e| {
47+
let buf = self.inner.recv(buf).await.inspect_err(|e| {
4848
self.auditor.event(b"recv_failure", |hasher| {
4949
hasher.update(self.remote_addr.to_string().as_bytes());
5050
hasher.update(e.to_string().as_bytes());
@@ -53,9 +53,9 @@ impl<S: crate::Stream> crate::Stream for Stream<S> {
5353

5454
self.auditor.event(b"recv_success", |hasher| {
5555
hasher.update(self.remote_addr.to_string().as_bytes());
56-
hasher.update(buf);
56+
hasher.update(buf.as_ref());
5757
});
58-
Ok(())
58+
Ok(buf)
5959
}
6060
}
6161

@@ -242,12 +242,11 @@ mod tests {
242242
let (_, mut sink, mut stream) = listener.accept().await.unwrap();
243243

244244
// Receive data from client
245-
let mut buf = [0u8; CLIENT_MSG.len()];
246-
stream.recv(&mut buf).await.unwrap();
245+
let buf = stream.recv(vec![0; CLIENT_MSG.len()]).await.unwrap();
247246
assert_eq!(&buf, CLIENT_MSG.as_bytes());
248247

249248
// Send response
250-
sink.send(SERVER_MSG.as_bytes()).await.unwrap();
249+
sink.send(Vec::from(SERVER_MSG)).await.unwrap();
251250
});
252251
server_handles.push(handle);
253252
}
@@ -261,11 +260,10 @@ mod tests {
261260
let (mut sink, mut stream) = network.dial(listener_addr).await.unwrap();
262261

263262
// Send data to server
264-
sink.send(CLIENT_MSG.as_bytes()).await.unwrap();
263+
sink.send(Vec::from(CLIENT_MSG)).await.unwrap();
265264

266265
// Receive response
267-
let mut buf = [0u8; SERVER_MSG.len()];
268-
stream.recv(&mut buf).await.unwrap();
266+
let buf = stream.recv(vec![0; SERVER_MSG.len()]).await.unwrap();
269267
assert_eq!(&buf, SERVER_MSG.as_bytes());
270268
});
271269
client_handles.push(handle);

runtime/src/network/deterministic.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{mocks, Error};
1+
use crate::{mocks, Error, StableBuf, StableBufMut};
22
use futures::{channel::mpsc, SinkExt as _, StreamExt as _};
33
use std::{
44
collections::HashMap,
@@ -16,7 +16,7 @@ pub struct Sink {
1616
}
1717

1818
impl crate::Sink for Sink {
19-
async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
19+
async fn send<B: StableBuf>(&mut self, msg: B) -> Result<(), Error> {
2020
self.sender.send(msg).await.map_err(|_| Error::SendFailed)
2121
}
2222
}
@@ -27,7 +27,7 @@ pub struct Stream {
2727
}
2828

2929
impl crate::Stream for Stream {
30-
async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
30+
async fn recv<B: StableBufMut>(&mut self, buf: B) -> Result<B, Error> {
3131
self.receiver.recv(buf).await.map_err(|_| Error::RecvFailed)
3232
}
3333
}

0 commit comments

Comments
 (0)