1
+ //! # A minimal RPC library for use with [iroh].
2
+ //!
3
+ //! ## Goals
4
+ //!
5
+ //! The main goal of this library is to provide an rpc framework that is so
6
+ //! lightweight that it can be also used for async boundaries within a single
7
+ //! process without any overhead, instead of the usual practice of a mpsc channel
8
+ //! with a giant message enum where each enum case contains mpsc or oneshot
9
+ //! backchannels.
10
+ //!
11
+ //! The second goal is to lightly abstract over remote and local communication,
12
+ //! so that a system can be interacted with cross process or even across networks.
13
+ //!
14
+ //! ## Non-goals
15
+ //!
16
+ //! - Cross language interop. This is for talking from rust to rust
17
+ //! - Any kind of versioning. You have to do this yourself
18
+ //! - Making remote message passing look like local async function calls
19
+ //! - Being runtime agnostic. This is for tokio
20
+ //!
21
+ //! ## Interaction patterns
22
+ //!
23
+ //! For each request, there can be a response and update channel. Each channel
24
+ //! can be either oneshot, carry multiple messages, or be disabled. This enables
25
+ //! the typical interaction patterns known from libraries like grpc:
26
+ //!
27
+ //! - rpc: 1 request, 1 response
28
+ //! - server streaming: 1 request, multiple responses
29
+ //! - client streaming: multiple requests, 1 response
30
+ //! - bidi streaming: multiple requests, multiple responses
31
+ //!
32
+ //! as well as more complex patterns. It is however not possible to have multiple
33
+ //! differently typed tx channels for a single message type.
34
+ //!
35
+ //! ## Transports
36
+ //!
37
+ //! We don't abstract over the send and receive stream. These must always be
38
+ //! quinn streams, specifically streams from the [iroh quinn fork].
39
+ //!
40
+ //! This restricts the possible rpc transports to quinn (QUIC with dial by
41
+ //! socket address) and iroh (QUIC with dial by node id).
42
+ //!
43
+ //! An upside of this is that the quinn streams can be tuned for each rpc
44
+ //! request, e.g. by setting the stream priority or by directy using more
45
+ //! advanced part of the quinn SendStream and RecvStream APIs such as out of
46
+ //! order receiving.
47
+ //!
48
+ //! ## Serialization
49
+ //!
50
+ //! Serialization is currently done using [postcard]. Messages are always
51
+ //! length prefixed with postcard varints, even in the case of oneshot
52
+ //! channels.
53
+ //!
54
+ //! ## Features
55
+ //!
56
+ //! - `rpc`: Enable the rpc features. Enabled by default.
57
+ //! By disabling this feature, all rpc related dependencies are removed.
58
+ //! The remaining dependencies are just serde, tokio and tokio-util.
59
+ //! - `message_spans`: Enable tracing spans for messages. Enabled by default.
60
+ //! This is useful even without rpc, to not lose tracing context when message
61
+ //! passing. This is frequently done manually. This obviously requires
62
+ //! a dependency on tracing.
63
+ //! - `test`: Test features. Mostly easy way to create two connected quinn endpoints.
64
+ //!
65
+ //! - [iroh]: https://docs.rs/iroh/latest/iroh/index.html
66
+ //! - [quinn]: https://docs.rs/quinn/latest/quinn/index.html
67
+ //! - [bytes]: https://docs.rs/bytes/latest/bytes/index.html
68
+ //! - [iroh quinn fork]: https://docs.rs/iroh-quinn/latest/iroh-quinn/index.html
1
69
#![ cfg_attr( quicrpc_docsrs, feature( doc_cfg) ) ]
2
70
use std:: { fmt:: Debug , future:: Future , io, marker:: PhantomData , ops:: Deref } ;
3
71
@@ -28,6 +96,9 @@ impl<T> RpcMessage for T where
28
96
///
29
97
/// This is usually implemented by a zero-sized struct.
30
98
/// It has various bounds to make derives easier.
99
+ ///
100
+ /// A service acts as a scope for defining the tx and rx channels for each
101
+ /// message type, and provides some type safety when sending messages.
31
102
pub trait Service : Send + Sync + Debug + Clone + ' static { }
32
103
33
104
mod sealed {
@@ -40,7 +111,7 @@ pub trait Sender: Debug + Sealed {}
40
111
/// Sealed marker trait for a receiver
41
112
pub trait Receiver : Debug + Sealed { }
42
113
43
- /// Channels to be used for a message and service
114
+ /// Trait to specify channels for a message and service
44
115
pub trait Channels < S : Service > {
45
116
/// The sender type, can be either spsc, oneshot or none
46
117
type Tx : Sender ;
@@ -76,17 +147,35 @@ pub mod channel {
76
147
use super :: { RecvError , SendError } ;
77
148
use crate :: util:: FusedOneshotReceiver ;
78
149
150
+ /// Create a local oneshot sender and receiver pair.
151
+ ///
152
+ /// This is currently using a tokio channel pair internally.
79
153
pub fn channel < T > ( ) -> ( Sender < T > , Receiver < T > ) {
80
154
let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
81
155
( tx. into ( ) , rx. into ( ) )
82
156
}
83
157
158
+ /// A generic boxed sender.
159
+ ///
160
+ /// Remote senders are always boxed, since for remote communication the boxing
161
+ /// overhead is negligible. However, boxing can also be used for local communication,
162
+ /// e.g. when applying a transform or filter to the message before sending it.
84
163
pub type BoxedSender < T > = Box <
85
164
dyn FnOnce ( T ) -> crate :: BoxedFuture < ' static , io:: Result < ( ) > > + Send + Sync + ' static ,
86
165
> ;
87
166
167
+ /// A generic boxed receiver
168
+ ///
169
+ /// Remote receivers are always boxed, since for remote communication the boxing
170
+ /// overhead is negligible. However, boxing can also be used for local communication,
171
+ /// e.g. when applying a transform or filter to the message before receiving it.
88
172
pub type BoxedReceiver < T > = crate :: BoxedFuture < ' static , io:: Result < T > > ;
89
173
174
+ /// A oneshot sender.
175
+ ///
176
+ /// Compared to a local onehsot sender, sending a message is async since in the case
177
+ /// of remote communication, sending over the wire is async. Other than that it
178
+ /// behaves like a local oneshot sender and has no overhead in the local case.
90
179
pub enum Sender < T > {
91
180
Tokio ( tokio:: sync:: oneshot:: Sender < T > ) ,
92
181
Boxed ( BoxedSender < T > ) ,
@@ -119,6 +208,10 @@ pub mod channel {
119
208
}
120
209
121
210
impl < T > Sender < T > {
211
+ /// Send a message
212
+ ///
213
+ /// If this is a boxed sender that represents a remote connection, sending may yield or fail with an io error.
214
+ /// Local senders will never yield, but can fail if the receiver has been closed.
122
215
pub async fn send ( self , value : T ) -> std:: result:: Result < ( ) , SendError > {
123
216
match self {
124
217
Sender :: Tokio ( tx) => tx. send ( value) . map_err ( |_| SendError :: ReceiverClosed ) ,
@@ -128,6 +221,7 @@ pub mod channel {
128
221
}
129
222
130
223
impl < T > Sender < T > {
224
+ /// Check if this is a remote sender
131
225
pub fn is_rpc ( & self ) -> bool
132
226
where
133
227
T : ' static ,
@@ -142,6 +236,10 @@ pub mod channel {
142
236
impl < T > crate :: sealed:: Sealed for Sender < T > { }
143
237
impl < T > crate :: Sender for Sender < T > { }
144
238
239
+ /// A oneshot receiver.
240
+ ///
241
+ /// Compared to a local oneshot receiver, receiving a message can fail not just
242
+ /// when the sender has been closed, but also when the remote connection fails.
145
243
pub enum Receiver < T > {
146
244
Tokio ( FusedOneshotReceiver < T > ) ,
147
245
Boxed ( BoxedReceiver < T > ) ,
@@ -209,11 +307,22 @@ pub mod channel {
209
307
use super :: { RecvError , SendError } ;
210
308
use crate :: RpcMessage ;
211
309
310
+ /// Create a local spsc sender and receiver pair, with the given buffer size.
311
+ ///
312
+ /// This is currently using a tokio channel pair internally.
212
313
pub fn channel < T > ( buffer : usize ) -> ( Sender < T > , Receiver < T > ) {
213
314
let ( tx, rx) = tokio:: sync:: mpsc:: channel ( buffer) ;
214
315
( tx. into ( ) , rx. into ( ) )
215
316
}
216
317
318
+ /// Single producer, single consumer sender.
319
+ ///
320
+ /// For the local case, this wraps a tokio::sync::mpsc::Sender. However,
321
+ /// due to the fact that a stream to a remote service can not be cloned,
322
+ /// this can also not be cloned.
323
+ ///
324
+ /// This forces you to use senders in a linear way, passing out references
325
+ /// to the sender to other tasks instead of cloning it.
217
326
pub enum Sender < T > {
218
327
Tokio ( tokio:: sync:: mpsc:: Sender < T > ) ,
219
328
Boxed ( Box < dyn BoxedSender < T > > ) ,
@@ -249,16 +358,26 @@ pub mod channel {
249
358
}
250
359
251
360
pub trait BoxedSender < T > : Debug + Send + Sync + ' static {
361
+ /// Send a message.
362
+ ///
363
+ /// For the remote case, if the message can not be completely sent,
364
+ /// this must return an error and disable the channel.
252
365
fn send (
253
366
& mut self ,
254
367
value : T ,
255
368
) -> Pin < Box < dyn Future < Output = io:: Result < ( ) > > + Send + ' _ > > ;
256
369
370
+ /// Try to send a message, returning as fast as possible if sending
371
+ /// is not currently possible.
372
+ ///
373
+ /// For the remote case, it must be guaranteed that the message is
374
+ /// either completely sent or not at all.
257
375
fn try_send (
258
376
& mut self ,
259
377
value : T ,
260
378
) -> Pin < Box < dyn Future < Output = io:: Result < bool > > + Send + ' _ > > ;
261
379
380
+ /// True if this is a remote sender
262
381
fn is_rpc ( & self ) -> bool ;
263
382
}
264
383
@@ -271,13 +390,18 @@ pub mod channel {
271
390
impl < T > Debug for Sender < T > {
272
391
fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
273
392
match self {
274
- Self :: Tokio ( _) => f. debug_tuple ( "Tokio" ) . finish ( ) ,
275
- Self :: Boxed ( _) => f. debug_tuple ( "Boxed" ) . finish ( ) ,
393
+ Self :: Tokio ( x) => f
394
+ . debug_struct ( "Tokio" )
395
+ . field ( "avail" , & x. capacity ( ) )
396
+ . field ( "cap" , & x. max_capacity ( ) )
397
+ . finish ( ) ,
398
+ Self :: Boxed ( inner) => f. debug_tuple ( "Boxed" ) . field ( & inner) . finish ( ) ,
276
399
}
277
400
}
278
401
}
279
402
280
403
impl < T : RpcMessage > Sender < T > {
404
+ /// Send a message and yield until either it is sent or an error occurs.
281
405
pub async fn send ( & mut self , value : T ) -> std:: result:: Result < ( ) , SendError > {
282
406
match self {
283
407
Sender :: Tokio ( tx) => {
@@ -287,6 +411,20 @@ pub mod channel {
287
411
}
288
412
}
289
413
414
+ /// Try to send a message, returning as fast as possible if sending
415
+ /// is not currently possible. This can be used to send ephemeral
416
+ /// messages.
417
+ ///
418
+ /// For the local case, this will immediately return false if the
419
+ /// channel is full.
420
+ ///
421
+ /// For the remote case, it will attempt to send the message and
422
+ /// return false if sending the first byte fails, otherwise yield
423
+ /// until the message is completely sent or an error occurs. This
424
+ /// guarantees that the message is sent either completely or not at
425
+ /// all.
426
+ ///
427
+ /// Returns true if the message was sent.
290
428
pub async fn try_send ( & mut self , value : T ) -> std:: result:: Result < ( ) , SendError > {
291
429
match self {
292
430
Sender :: Tokio ( tx) => match tx. try_send ( value) {
@@ -347,8 +485,12 @@ pub mod channel {
347
485
impl < T > Debug for Receiver < T > {
348
486
fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
349
487
match self {
350
- Self :: Tokio ( _) => f. debug_tuple ( "Tokio" ) . finish ( ) ,
351
- Self :: Boxed ( _) => f. debug_tuple ( "Boxed" ) . finish ( ) ,
488
+ Self :: Tokio ( inner) => f
489
+ . debug_struct ( "Tokio" )
490
+ . field ( "avail" , & inner. capacity ( ) )
491
+ . field ( "cap" , & inner. max_capacity ( ) )
492
+ . finish ( ) ,
493
+ Self :: Boxed ( inner) => f. debug_tuple ( "Boxed" ) . field ( & inner) . finish ( ) ,
352
494
}
353
495
}
354
496
}
@@ -361,11 +503,13 @@ pub mod channel {
361
503
pub mod none {
362
504
use crate :: sealed:: Sealed ;
363
505
506
+ /// A sender that does nothing. This is used when no communication is needed.
364
507
#[ derive( Debug ) ]
365
508
pub struct NoSender ;
366
509
impl Sealed for NoSender { }
367
510
impl crate :: Sender for NoSender { }
368
511
512
+ /// A receiver that does nothing. This is used when no communication is needed.
369
513
#[ derive( Debug ) ]
370
514
pub struct NoReceiver ;
371
515
@@ -380,8 +524,12 @@ pub mod channel {
380
524
/// generic io error.
381
525
#[ derive( Debug , thiserror:: Error ) ]
382
526
pub enum SendError {
527
+ /// The receiver has been closed. This is the only error that can occur
528
+ /// for local communication.
383
529
#[ error( "receiver closed" ) ]
384
530
ReceiverClosed ,
531
+ /// The underlying io error. This can occur for remote communication,
532
+ /// due to a network error or serialization error.
385
533
#[ error( "io error: {0}" ) ]
386
534
Io ( #[ from] io:: Error ) ,
387
535
}
@@ -402,8 +550,12 @@ pub mod channel {
402
550
/// generic io error.
403
551
#[ derive( Debug , thiserror:: Error ) ]
404
552
pub enum RecvError {
553
+ /// The sender has been closed. This is the only error that can occur
554
+ /// for local communication.
405
555
#[ error( "sender closed" ) ]
406
556
SenderClosed ,
557
+ /// An io error occurred. This can occur for remote communication,
558
+ /// due to a network error or deserialization error.
407
559
#[ error( "io error: {0}" ) ]
408
560
Io ( #[ from] io:: Error ) ,
409
561
}
@@ -422,7 +574,11 @@ pub mod channel {
422
574
/// This expands the protocol message to a full message that includes the
423
575
/// active and unserializable channels.
424
576
///
425
- /// rx and tx can be set to an appropriate channel kind.
577
+ /// The channel kind for rx and tx is defined by implementing the `Channels`
578
+ /// trait, either manually or using a macro.
579
+ ///
580
+ /// When the `message_spans` feature is enabled, this also includes a tracing
581
+ /// span to carry the tracing context during message passing.
426
582
pub struct WithChannels < I : Channels < S > , S : Service > {
427
583
/// The inner message.
428
584
pub inner : I ,
@@ -447,6 +603,7 @@ impl<I: Channels<S> + Debug, S: Service> Debug for WithChannels<I, S> {
447
603
}
448
604
449
605
impl < I : Channels < S > , S : Service > WithChannels < I , S > {
606
+ /// Get the parent span
450
607
#[ cfg( feature = "message_spans" ) ]
451
608
pub fn parent_span_opt ( & self ) -> Option < & tracing:: Span > {
452
609
Some ( & self . span )
@@ -497,7 +654,10 @@ where
497
654
}
498
655
}
499
656
500
- /// Deref so you can access the inner fields directly
657
+ /// Deref so you can access the inner fields directly.
658
+ ///
659
+ /// If the inner message has fields named `tx`, `rx` or `span`, you need to use the
660
+ /// `inner` field to access them.
501
661
impl < I : Channels < S > , S : Service > Deref for WithChannels < I , S > {
502
662
type Target = I ;
503
663
@@ -568,7 +728,8 @@ impl<M, R, S> Client<M, R, S> {
568
728
}
569
729
}
570
730
571
- /// Create a sender that allows sending messages to the service.
731
+ /// Start a request by creating a sender that can be used to send the initial
732
+ /// message to the local or remote service.
572
733
///
573
734
/// In the local case, this is just a clone which has almost zero overhead.
574
735
/// Creating a local sender can not fail.
@@ -644,14 +805,17 @@ impl<M> Clone for ClientInner<M> {
644
805
/// an empty enum since local requests can not fail.
645
806
#[ derive( Debug , thiserror:: Error ) ]
646
807
pub enum RequestError {
808
+ /// Error in quinn during connect
647
809
#[ cfg( feature = "rpc" ) ]
648
810
#[ cfg_attr( quicrpc_docsrs, doc( cfg( feature = "rpc" ) ) ) ]
649
811
#[ error( "error establishing connection: {0}" ) ]
650
812
Connect ( #[ from] quinn:: ConnectError ) ,
813
+ /// Error in quinn when the connection already exists, when opening a stream pair
651
814
#[ cfg( feature = "rpc" ) ]
652
815
#[ cfg_attr( quicrpc_docsrs, doc( cfg( feature = "rpc" ) ) ) ]
653
816
#[ error( "error opening stream: {0}" ) ]
654
817
Connection ( #[ from] quinn:: ConnectionError ) ,
818
+ /// Generic error for non-quinn transports
655
819
#[ cfg( feature = "rpc" ) ]
656
820
#[ cfg_attr( quicrpc_docsrs, doc( cfg( feature = "rpc" ) ) ) ]
657
821
#[ error( "error opening stream: {0}" ) ]
0 commit comments