Skip to content

Commit b7aa08d

Browse files
committed
Add buffering of outgoing messages
Sending each message in its own TCP packet isn't very efficient.

Note: The Prometheus metrics `messages_outbound_sent_total` and `messages_outbound_dropped_total` have been removed. They weren't very useful, and we no longer know the number of messages by the time the connection is dropped (we could report dropped bytes instead if needed).

$ dune exec -- ./test-bin/echo/echo_bench.exe
echo_bench.exe: [INFO] rate = 18466.209374 # Before
echo_bench.exe: [INFO] rate = 59655.912455 # After
1 parent cd511b0 commit b7aa08d

File tree

10 files changed

+101
-118
lines changed

10 files changed

+101
-118
lines changed

capnp-rpc-net/capTP_capnp.ml

Lines changed: 3 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,5 @@
11
open Eio.Std
22

3-
module Metrics = struct
4-
open Prometheus
5-
6-
let namespace = "capnp"
7-
8-
let subsystem = "net"
9-
10-
let connections =
11-
let help = "Number of live capnp-rpc connections" in
12-
Gauge.v ~help ~namespace ~subsystem "connections"
13-
14-
let messages_inbound_received_total =
15-
let help = "Total number of messages received" in
16-
Counter.v ~help ~namespace ~subsystem "messages_inbound_received_total"
17-
18-
let messages_outbound_enqueued_total =
19-
let help = "Total number of messages enqueued to be transmitted" in
20-
Counter.v ~help ~namespace ~subsystem "messages_outbound_enqueued_total"
21-
22-
let messages_outbound_sent_total =
23-
let help = "Total number of messages transmitted" in
24-
Counter.v ~help ~namespace ~subsystem "messages_outbound_sent_total"
25-
26-
let messages_outbound_dropped_total =
27-
let help = "Total number of messages lost due to disconnections" in
28-
Counter.v ~help ~namespace ~subsystem "messages_outbound_dropped_total"
29-
end
30-
313
module Log = Capnp_rpc.Debug.Log
324

335
module Builder = Capnp_rpc.Private.Schema.Builder
@@ -44,7 +16,6 @@ module Make (Network : S.NETWORK) = struct
4416
type t = {
4517
endpoint : Endpoint.t;
4618
conn : Conn.t;
47-
xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Eio.Stream.t;
4819
mutable disconnecting : bool;
4920
}
5021

@@ -59,48 +30,12 @@ module Make (Network : S.NETWORK) = struct
5930

6031
let tags t = Conn.tags t.conn
6132

62-
let drop_queue q =
63-
let len = Eio.Stream.length q in
64-
Prometheus.Counter.inc Metrics.messages_outbound_dropped_total (float_of_int len)
65-
(* Queue.clear q -- could close stream here instead *)
66-
67-
(* [flush ~xmit_queue endpoint] writes each message in [xmit_queue] to [endpoint]. *)
68-
let rec flush ~xmit_queue endpoint =
69-
let next = Eio.Stream.take xmit_queue in
70-
match Endpoint.send endpoint next with
71-
| Error `Closed ->
72-
Endpoint.disconnect endpoint; (* We'll read a close soon *)
73-
drop_queue xmit_queue;
74-
`Stop_daemon
75-
| Error (`Msg msg) ->
76-
Log.warn (fun f -> f "Error sending messages: %s (will shutdown connection)" msg);
77-
Endpoint.disconnect endpoint;
78-
drop_queue xmit_queue;
79-
`Stop_daemon
80-
| Ok () ->
81-
Prometheus.Counter.inc_one Metrics.messages_outbound_sent_total;
82-
flush ~xmit_queue endpoint
83-
| exception ex ->
84-
drop_queue xmit_queue;
85-
raise ex
86-
87-
(* Enqueue [message] in [xmit_queue] and ensure the flush thread is running. *)
88-
let queue_send ~xmit_queue message =
89-
Log.debug (fun f ->
90-
let module M = Capnp_rpc.Private.Schema.MessageWrapper.Message in
91-
f "queue_send: %d/%d allocated bytes in %d segs"
92-
(M.total_size message)
93-
(M.total_alloc_size message)
94-
(M.num_segments message));
95-
Eio.Stream.add xmit_queue message;
96-
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total
97-
9833
let return_not_implemented t x =
9934
Log.debug (fun f -> f ~tags:(tags t) "Returning Unimplemented");
10035
let open Builder in
10136
let m = Message.init_root () in
10237
let _ : Builder.Message.t = Message.unimplemented_set_reader m x in
103-
queue_send ~xmit_queue:t.xmit_queue (Message.to_message m)
38+
Endpoint.send t.endpoint (Message.to_message m)
10439

10540
let listen t =
10641
let rec loop () =
@@ -109,7 +44,6 @@ module Make (Network : S.NETWORK) = struct
10944
| Ok msg ->
11045
let open Reader.Message in
11146
let msg = of_message msg in
112-
Prometheus.Counter.inc_one Metrics.messages_inbound_received_total;
11347
match Parse.message msg with
11448
| #Endpoint_types.In.t as msg ->
11549
Log.debug (fun f ->
@@ -139,7 +73,7 @@ module Make (Network : S.NETWORK) = struct
13973
loop ()
14074

14175
let send_abort t ex =
142-
queue_send ~xmit_queue:t.xmit_queue (Serialise.message (`Abort ex))
76+
Endpoint.send t.endpoint (Serialise.message (`Abort ex))
14377

14478
let disconnect t ex =
14579
if not t.disconnecting then (
@@ -152,21 +86,17 @@ module Make (Network : S.NETWORK) = struct
15286
let disconnecting t = t.disconnecting
15387

15488
let connect ~sw ~restore ?(tags=Logs.Tag.empty) endpoint =
155-
let xmit_queue = Eio.Stream.create 100 in (* todo: tune this? make it configurable? *)
156-
Fiber.fork_daemon ~sw (fun () -> flush ~xmit_queue endpoint);
157-
let queue_send msg = Eio.Stream.add xmit_queue (Serialise.message msg) in
89+
let queue_send msg = Endpoint.send endpoint (Serialise.message msg) in
15890
let restore = Restorer.fn restore in
15991
let fork = Fiber.fork ~sw in
16092
let conn = Conn.create ~restore ~tags ~fork ~queue_send in
16193
{
16294
conn;
16395
endpoint;
164-
xmit_queue;
16596
disconnecting = false;
16697
}
16798

16899
let listen t =
169-
Prometheus.Gauge.inc_one Metrics.connections;
170100
let tags = Conn.tags t.conn in
171101
begin
172102
match listen t with
@@ -180,7 +110,6 @@ module Make (Network : S.NETWORK) = struct
180110
send_abort t (Capnp_rpc.Exception.v ~ty:`Failed (Printexc.to_string ex))
181111
end;
182112
Log.info (fun f -> f ~tags "Connection closed");
183-
Prometheus.Gauge.dec_one Metrics.connections;
184113
Eio.Cancel.protect (fun () ->
185114
disconnect t (Capnp_rpc.Exception.v ~ty:`Disconnected "Connection closed")
186115
)

capnp-rpc-net/endpoint.ml

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,27 @@
11
open Eio.Std
22

3+
module Metrics = struct
4+
open Prometheus
5+
6+
let namespace = "capnp"
7+
8+
let subsystem = "net"
9+
10+
let connections =
11+
let help = "Number of live capnp-rpc connections" in
12+
Gauge.v ~help ~namespace ~subsystem "connections"
13+
14+
let messages_inbound_received_total =
15+
let help = "Total number of messages received" in
16+
Counter.v ~help ~namespace ~subsystem "messages_inbound_received_total"
17+
18+
let messages_outbound_enqueued_total =
19+
let help = "Total number of messages enqueued to be transmitted" in
20+
Counter.v ~help ~namespace ~subsystem "messages_outbound_enqueued_total"
21+
end
22+
23+
module Write = Eio.Buf_write
24+
325
let src = Logs.Src.create "endpoint" ~doc:"Send and receive Cap'n'Proto messages"
426
module Log = (val Logs.src_log src: Logs.LOG)
527

@@ -11,17 +33,13 @@ type flow = Eio.Flow.two_way_ty r
1133

1234
type t = {
1335
flow : flow;
36+
writer : Write.t;
1437
decoder : Capnp.Codecs.FramedStream.t;
1538
peer_id : Auth.Digest.t;
1639
}
1740

1841
let peer_id t = t.peer_id
1942

20-
let of_flow ~peer_id flow =
21-
let decoder = Capnp.Codecs.FramedStream.empty compression in
22-
let flow = (flow :> flow) in
23-
{ flow; decoder; peer_id }
24-
2543
let dump_msg =
2644
let next = ref 0 in
2745
fun data ->
@@ -32,25 +50,62 @@ let dump_msg =
3250
output_string ch data;
3351
close_out ch
3452

53+
let disconnect t =
54+
try
55+
Eio.Flow.shutdown t.flow `All
56+
with Eio.Io (Eio.Net.E Connection_reset _, _) ->
57+
(* TCP connection already shut down, so TLS shutdown failed. Ignore. *)
58+
()
59+
3560
let send t msg =
36-
let data = Capnp.Codecs.serialize ~compression msg in
37-
if record_sent_messages then dump_msg data;
38-
match Eio.Flow.copy_string data t.flow with
39-
| ()
40-
| exception End_of_file -> Ok ()
61+
Log.debug (fun f ->
62+
let module M = Capnp_rpc.Private.Schema.MessageWrapper.Message in
63+
f "queue_send: %d/%d allocated bytes in %d segs"
64+
(M.total_size msg)
65+
(M.total_alloc_size msg)
66+
(M.num_segments msg));
67+
Capnp.Codecs.serialize_iter_copyless ~compression msg ~f:(fun x len -> Write.string t.writer x ~len);
68+
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total;
69+
if record_sent_messages then dump_msg (Capnp.Codecs.serialize ~compression msg)
70+
71+
let rec run_writer t =
72+
let bufs = Write.await_batch t.writer in
73+
match Eio.Flow.single_write t.flow bufs with
74+
| n -> Write.shift t.writer n; run_writer t
4175
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
4276
Log.info (fun f -> f "%a" Eio.Exn.pp ex);
43-
Error `Closed
77+
disconnect t (* We'll read a close soon *)
4478
| exception ex ->
4579
Eio.Fiber.check ();
46-
Error (`Msg (Printexc.to_string ex))
80+
Log.warn (fun f -> f "Error sending messages: %a (will shutdown connection)" Fmt.exn ex);
81+
disconnect t
82+
83+
let of_flow ~sw ~peer_id flow =
84+
let decoder = Capnp.Codecs.FramedStream.empty compression in
85+
let flow = (flow :> flow) in
86+
let writer = Write.create 4096 in
87+
let t = { flow; writer; decoder; peer_id } in
88+
Prometheus.Gauge.inc_one Metrics.connections;
89+
Switch.on_release sw (fun () -> Prometheus.Gauge.dec_one Metrics.connections);
90+
Fiber.fork_daemon ~sw (fun () -> run_writer t; `Stop_daemon);
91+
t
4792

4893
let rec recv t =
4994
match Capnp.Codecs.FramedStream.get_next_frame t.decoder with
50-
| Ok msg -> Ok (Capnp.BytesMessage.Message.readonly msg)
95+
| Ok msg ->
96+
Prometheus.Counter.inc_one Metrics.messages_inbound_received_total;
97+
(* We often want to send multiple response messages while processing a batch of requests,
98+
so pause the writer to collect them. We'll unpause on the next read. *)
99+
Write.pause t.writer;
100+
Ok (Capnp.BytesMessage.Message.readonly msg)
51101
| Error Capnp.Codecs.FramingError.Unsupported -> failwith "Unsupported Cap'n'Proto frame received"
52102
| Error Capnp.Codecs.FramingError.Incomplete ->
53103
Log.debug (fun f -> f "Incomplete; waiting for more data...");
104+
(* We probably scheduled one or more application fibers to run while handling the last
105+
batch of messages. Given them a chance to run now while the writer is paused, because
106+
they might want to send more messages immediately. *)
107+
Fiber.yield ();
108+
Write.unpause t.writer;
54109
let buf = Cstruct.create 4096 in (* TODO: make this efficient *)
55110
match Eio.Flow.single_read t.flow buf with
56111
| got ->
@@ -62,10 +117,3 @@ let rec recv t =
62117
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
63118
Log.info (fun f -> f "%a" Eio.Exn.pp ex);
64119
Error `Closed
65-
66-
let disconnect t =
67-
try
68-
Eio.Flow.shutdown t.flow `All
69-
with Eio.Io (Eio.Net.E Connection_reset _, _) ->
70-
(* TCP connection already shut down, so TLS shutdown failed. Ignore. *)
71-
()

capnp-rpc-net/endpoint.mli

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
(** Send and receive capnp messages over a byte-stream. *)
22

3+
open Eio.Std
4+
35
val src : Logs.src
46
(** Control the log level. *)
57

68
type t
79
(** A wrapper for a byte-stream (flow). *)
810

9-
val send : t -> 'a Capnp.BytesMessage.Message.t -> (unit, [`Closed | `Msg of string]) result
10-
(** [send t msg] transmits [msg]. *)
11+
val send : t -> 'a Capnp.BytesMessage.Message.t -> unit
12+
(** [send t msg] enqueues [msg]. *)
1113

1214
val recv : t -> (Capnp.Message.ro Capnp.BytesMessage.Message.t, [> `Closed]) result
1315
(** [recv t] reads the next message from the remote peer.
1416
It returns [Error `Closed] if the connection to the peer is lost. *)
1517

16-
val of_flow : peer_id:Auth.Digest.t -> _ Eio.Flow.two_way -> t
17-
(** [of_flow ~peer_id flow] sends and receives on [flow]. *)
18+
val of_flow : sw:Switch.t -> peer_id:Auth.Digest.t -> _ Eio.Flow.two_way -> t
19+
(** [of_flow ~sw ~peer_id flow] sends and receives on [flow].
20+
21+
[sw] is used to run a fiber writing messages in batches. *)
1822

1923
val peer_id : t -> Auth.Digest.t
2024
(** [peer_id t] is the fingerprint of the peer's public key,

capnp-rpc-net/tls_wrapper.ml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ let error fmt =
66
fmt |> Fmt.kstr @@ fun msg ->
77
Error (`Msg msg)
88

9-
let plain_endpoint flow =
10-
Endpoint.of_flow ~peer_id:Auth.Digest.insecure flow
9+
let plain_endpoint ~sw flow =
10+
Endpoint.of_flow ~sw ~peer_id:Auth.Digest.insecure flow
1111

12-
let connect_as_server flow secret_key =
12+
let connect_as_server ~sw flow secret_key =
1313
match secret_key with
14-
| None -> Ok (plain_endpoint flow)
14+
| None -> Ok (plain_endpoint ~sw flow)
1515
| Some key ->
1616
Log.info (fun f -> f "Doing TLS server-side handshake...");
1717
let tls_config = Secret_key.tls_server_config key in
@@ -26,15 +26,15 @@ let connect_as_server flow secret_key =
2626
| None -> error "No client certificate found"
2727
| Some client_cert ->
2828
let peer_id = Digest.of_certificate client_cert in
29-
Ok (Endpoint.of_flow ~peer_id flow)
29+
Ok (Endpoint.of_flow ~sw ~peer_id flow)
3030

31-
let connect_as_client flow secret_key auth =
31+
let connect_as_client ~sw flow secret_key auth =
3232
match Digest.authenticator auth with
33-
| None -> Ok (plain_endpoint flow)
33+
| None -> Ok (plain_endpoint ~sw flow)
3434
| Some authenticator ->
3535
let tls_config = Secret_key.tls_client_config ~authenticator (Lazy.force secret_key) in
3636
Log.info (fun f -> f "Doing TLS client-side handshake...");
3737
match Tls_eio.client_of_flow tls_config flow with
3838
| exception (Failure msg) -> error "TLS connection failed: %s" msg
3939
| exception ex -> Eio.Fiber.check (); error "TLS connection failed: %a" Fmt.exn ex
40-
| flow -> Ok (Endpoint.of_flow ~peer_id:auth flow)
40+
| flow -> Ok (Endpoint.of_flow ~sw ~peer_id:auth flow)

capnp-rpc-net/tls_wrapper.mli

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ open Auth
22
open Eio.Std
33

44
val connect_as_server :
5+
sw:Switch.t ->
56
[> Eio.Flow.two_way_ty | Eio.Resource.close_ty] r -> Auth.Secret_key.t option ->
67
(Endpoint.t, [> `Msg of string]) result
78

89
val connect_as_client :
10+
sw:Switch.t ->
911
[> Eio.Flow.two_way_ty | Eio.Resource.close_ty] r -> Auth.Secret_key.t Lazy.t -> Digest.t ->
1012
(Endpoint.t, [> `Msg of string]) result
11-
(** [connect_as_client underlying key digest] is an endpoint using flow [underlying].
13+
(** [connect_as_client ~sw underlying key digest] is an endpoint using flow [underlying].
1214
If [digest] requires TLS, it performs a TLS handshake. It uses [key] as its private key
1315
and checks that the server is the one required by [digest]. *)

test-bin/calc_direct.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ module Parent = struct
3535
Switch.run @@ fun sw ->
3636
(* Run Cap'n Proto RPC protocol on [socket]: *)
3737
let p = Eio_unix.Net.import_socket_stream ~sw ~close_unix:true socket
38-
|> Capnp_rpc_net.Endpoint.of_flow
38+
|> Capnp_rpc_net.Endpoint.of_flow ~sw
3939
~peer_id:Capnp_rpc_net.Auth.Digest.insecure
4040
in
4141
Logs.info (fun f -> f "Connecting to child process...");
@@ -60,7 +60,7 @@ module Child = struct
6060
let service = Calc.local ~sw in
6161
let restore = Capnp_rpc_net.Restorer.single service_name service in
6262
(* Run Cap'n Proto RPC protocol on [socket]: *)
63-
let endpoint = Capnp_rpc_net.Endpoint.of_flow socket
63+
let endpoint = Capnp_rpc_net.Endpoint.of_flow socket ~sw
6464
~peer_id:Capnp_rpc_net.Auth.Digest.insecure
6565
in
6666
let conn = Capnp_rpc_unix.CapTP.connect ~sw ~restore endpoint in

test-bin/echo/echo_bench.ml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ let () =
77
Logs.set_reporter (Logs_fmt.reporter ())
88

99
let run_client service =
10-
(* let n = 100000 in *) (* XXX: improve speed *)
11-
let n = 1000 in
10+
let n = 100000 in
1211
let ops = List.init n (fun i ->
1312
let payload = Int.to_string i in
1413
let desired_result = "echo:" ^ payload in

unix/capnp_rpc_unix.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ let with_cap_exn ?progress sr f =
150150
| Error ex -> Fmt.failwith "%a" Capnp_rpc.Exception.pp ex
151151
| Ok x -> Capnp_rpc.Capability.with_ref x f
152152

153-
let handle_connection ?tags ~secret_key vat client =
154-
match Network.accept_connection ~secret_key client with
153+
let handle_connection ?tags ~sw ~secret_key vat client =
154+
match Network.accept_connection ~sw ~secret_key client with
155155
| Error (`Msg msg) ->
156156
Log.warn (fun f -> f ?tags "Rejecting new connection: %s" msg)
157157
| Ok ep ->
@@ -189,7 +189,7 @@ let listen ?tags ~sw (config, vat, socket) =
189189
let secret_key = if config.Vat_config.serve_tls then Some (Vat_config.secret_key config) else None in
190190
Fiber.fork ~sw (fun () ->
191191
(* We don't use [Net.accept_fork] here because this returns immediately after connecting. *)
192-
handle_connection ?tags ~secret_key vat client
192+
handle_connection ?tags ~sw ~secret_key vat client
193193
)
194194
done
195195

0 commit comments

Comments
 (0)