Skip to content

Commit dae40cb

Browse files
authored
Merge pull request #49 from inhabitedtype/simplify-api
Zero-copy read API
2 parents 13f0cb5 + 5784ec9 commit dae40cb

File tree

7 files changed

+260
-208
lines changed

7 files changed

+260
-208
lines changed

async/httpaf_async.ml

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,56 @@
11
open Core
22
open Async
33

4+
(** XXX(seliopou): Replace Angstrom.Buffered with a module like this, while
5+
also supporting growing the buffer. Clients can use this to buffer and the
6+
use the unbuffered interface for actually running the parser. *)
7+
module Buffer : sig
8+
type t
9+
10+
val create : int -> t
11+
12+
val get : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int
13+
val put : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int
14+
end= struct
15+
type t =
16+
{ buffer : Bigstring.t
17+
; mutable off : int
18+
; mutable len : int }
19+
20+
let create size =
21+
let buffer = Bigstring.create size in
22+
{ buffer; off = 0; len = 0 }
23+
;;
24+
25+
let compress t =
26+
if t.len = 0
27+
then begin
28+
t.off <- 0;
29+
t.len <- 0;
30+
end else if t.off > 0
31+
then begin
32+
Bigstring.blit ~src:t.buffer ~src_pos:t.off ~dst:t.buffer ~dst_pos:0 ~len:t.len;
33+
t.off <- 0;
34+
end
35+
;;
36+
37+
let get t ~f =
38+
let n = f t.buffer ~off:t.off ~len:t.len in
39+
t.off <- t.off + n;
40+
t.len <- t.len - n;
41+
if t.len = 0
42+
then t.off <- 0;
43+
n
44+
;;
45+
46+
let put t ~f =
47+
compress t;
48+
let n = f t.buffer ~off:(t.off + t.len) ~len:(Bigstring.length t.buffer - t.len) in
49+
t.len <- t.len + n;
50+
n
51+
;;
52+
end
53+
454
let read fd buffer =
555
let badfd fd = failwithf "read got back fd: %s" (Fd.to_string fd) () in
656
let rec finish fd buffer result =
@@ -25,16 +75,18 @@ let read fd buffer =
2575
finish fd buffer
2676
(Fd.syscall fd ~nonblocking:true
2777
(fun file_descr ->
28-
Unix.Syscall_result.Int.ok_or_unix_error_exn ~syscall_name:"read"
29-
(Bigstring.read_assume_fd_is_nonblocking file_descr buffer)))
78+
Buffer.put buffer ~f:(fun bigstring ~off ~len ->
79+
Unix.Syscall_result.Int.ok_or_unix_error_exn ~syscall_name:"read"
80+
(Bigstring.read_assume_fd_is_nonblocking file_descr bigstring ~pos:off ~len))))
3081
else
3182
Fd.syscall_in_thread fd ~name:"read"
32-
(fun file_descr -> Bigstring.read file_descr buffer)
83+
(fun file_descr ->
84+
Buffer.put buffer ~f:(fun bigstring ~off ~len ->
85+
Bigstring.read file_descr bigstring ~pos:off ~len))
3386
>>= fun result -> finish fd buffer result
3487
in
3588
go fd buffer
3689

37-
3890
open Httpaf
3991

4092
module Server = struct
@@ -46,13 +98,23 @@ module Server = struct
4698
let error_handler = error_handler client_addr in
4799
let conn = Server_connection.create ?config ~error_handler request_handler in
48100
let read_complete = Ivar.create () in
101+
(* XXX(seliopou): Make this configurable *)
102+
let buffer = Buffer.create 0x1000 in
49103
let rec reader_thread () =
50104
match Server_connection.next_read_operation conn with
51-
| `Read buffer ->
105+
| `Read ->
52106
(* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *)
53-
read fd buffer >>> fun result ->
54-
Server_connection.report_read_result conn result;
55-
reader_thread ()
107+
read fd buffer
108+
>>> begin function
109+
| `Eof ->
110+
Server_connection.shutdown_reader conn;
111+
reader_thread ()
112+
| `Ok _ ->
113+
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
114+
Server_connection.read conn bigstring ~off ~len)
115+
|> ignore;
116+
reader_thread ()
117+
end
56118
| `Yield ->
57119
(* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *)
58120
Server_connection.yield_reader conn reader_thread
@@ -100,13 +162,22 @@ module Client = struct
100162
let request_body, conn =
101163
Client_connection.request request ~error_handler ~response_handler in
102164
let read_complete = Ivar.create () in
165+
let buffer = Buffer.create 0x1000 in
103166
let rec reader_thread () =
104167
match Client_connection.next_read_operation conn with
105-
| `Read buffer ->
168+
| `Read ->
106169
(* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *)
107-
read fd buffer >>> fun result ->
108-
Client_connection.report_read_result conn result;
109-
reader_thread ()
170+
read fd buffer
171+
>>> begin function
172+
| `Eof ->
173+
Client_connection.shutdown_reader conn;
174+
reader_thread ()
175+
| `Ok _ ->
176+
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
177+
Client_connection.read conn bigstring ~off ~len)
178+
|> ignore;
179+
reader_thread ()
180+
end
110181
| `Close ->
111182
(* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *)
112183
Ivar.fill read_complete ();

benchmarks/wrk_async_benchmark.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ let error_handler _ ?request error start_response =
1717
| #Status.standard as error ->
1818
Body.write_string response_body (Status.default_reason_phrase error)
1919
end;
20-
Body.close response_body
20+
Body.close_writer response_body
2121
;;
2222

2323
let request_handler _ reqd =
2424
let { Request.target } = Reqd.request reqd in
2525
let request_body = Reqd.request_body reqd in
26-
Body.close request_body;
26+
Body.close_reader request_body;
2727
match target with
2828
| "/" -> Reqd.respond_with_bigstring reqd (Response.create ~headers `OK) text;
2929
| _ -> Reqd.respond_with_string reqd (Response.create `Not_found) "Route not found"

lib/client_connection.ml

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,8 @@ module Oneshot = struct
9191
Body.transfer_to_writer_with_encoding t.request_body ~encoding t.writer
9292
;;
9393

94-
let shutdown t =
95-
flush_request_body t;
94+
let shutdown_reader t =
9695
Reader.close t.reader;
97-
Writer.close t.writer;
98-
Body.close_writer t.request_body;
9996
begin match !(t.state) with
10097
| Awaiting_response | Closed -> ()
10198
| Received_response(_, response_body) ->
@@ -104,6 +101,17 @@ module Oneshot = struct
104101
end;
105102
;;
106103

104+
let shutdown_writer t =
105+
flush_request_body t;
106+
Writer.close t.writer;
107+
Body.close_writer t.request_body;
108+
;;
109+
110+
let shutdown t =
111+
shutdown_reader t;
112+
shutdown_writer t;
113+
;;
114+
107115
let set_error_and_handle t error =
108116
shutdown t;
109117
t.state := Closed;
@@ -144,12 +152,13 @@ module Oneshot = struct
144152
| `Error (`Invalid_response_body_length _ as error) ->
145153
set_error_and_handle t error;
146154
`Close
147-
| (`Read _ | `Close) as operation -> operation
155+
| (`Read | `Close) as operation -> operation
148156
;;
149157

150-
let report_read_result t result =
151-
Reader.report_result t.reader result;
158+
let read t bs ~off ~len =
159+
let consumed = Reader.read t.reader bs ~off ~len in
152160
flush_response_body t;
161+
consumed
153162
;;
154163

155164
let next_write_operation t =

lib/httpaf.mli

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -695,8 +695,7 @@ end
695695
module Server_connection : sig
696696
module Config : sig
697697
type t =
698-
{ read_buffer_size : int (** Default is [4096] *)
699-
; response_buffer_size : int (** Default is [1024] *)
698+
{ response_buffer_size : int (** Default is [1024] *)
700699
; response_body_buffer_size : int (** Default is [4096] *)
701700
}
702701

@@ -723,27 +722,29 @@ module Server_connection : sig
723722
(** [create ?config ?error_handler ~request_handler] creates a connection
724723
handler that will service individual requests with [request_handler]. *)
725724

726-
val next_read_operation : _ t -> [ `Read of Bigstring.t | `Yield | `Close ]
725+
val next_read_operation : _ t -> [ `Read | `Yield | `Close ]
727726
(** [next_read_operation t] returns a value describing the next operation
728727
that the caller should conduct on behalf of the connection. *)
729728

730-
val report_read_result : _ t -> [`Ok of int | `Eof] -> unit
731-
(** [report_read_result t result] reports the result of the latest read
732-
attempt to the connection. {report_read_result} should be called after a
733-
call to {next_read_operation} that returns a [`Read buffer] value.
734-
735-
{ul
736-
{- [`Ok n] indicates that the caller successfully received [n] bytes of
737-
input and wrote them into the the read buffer that the caller was
738-
provided by {next_read_operation}. }
739-
{- [`Eof] indicates that the input source will no longer provide any
740-
bytes to the read processor. }} *)
729+
val read : _ t -> Bigstring.t -> off:int -> len:int -> int
730+
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
731+
of [bigstring] and returns the number of bytes consumed by the
732+
connection. {!read} should be called after {!next_read_operation}
733+
returns a [`Read] value and additional input is available for the
734+
connection to consume. *)
741735

742736
val yield_reader : _ t -> (unit -> unit) -> unit
743737
(** [yield_reader t continue] registers with the connection to call
744738
[continue] when reading should resume. {!yield_reader} should be called
745739
after {next_read_operation} returns a [`Yield] value. *)
746740

741+
val shutdown_reader : _ t -> unit
742+
(** [shutdown_reader t] shutds own the read processor for the connection. All
743+
subsequent calls to {!next_read_operations} will return [`Close].
744+
{!shutdown_reader} should be called after {!next_read_operation} returns
745+
a [`Read] value and there is no further input available for the
746+
connection to consume. *)
747+
747748
val next_write_operation : _ t -> [
748749
| `Write of Bigstring.t IOVec.t list
749750
| `Yield
@@ -808,21 +809,23 @@ module Client_connection : sig
808809
-> response_handler:response_handler
809810
-> [`write] Body.t * t
810811

811-
val next_read_operation : t -> [ `Read of Bigstring.t | `Close ]
812+
val next_read_operation : t -> [ `Read | `Close ]
812813
(** [next_read_operation t] returns a value describing the next operation
813814
that the caller should conduct on behalf of the connection. *)
814815

815-
val report_read_result : t -> [`Ok of int | `Eof] -> unit
816-
(** [report_read_result t result] reports the result of the latest read
817-
attempt to the connection. {report_read_result} should be called after a
818-
call to {next_read_operation} that returns a [`Read buffer] value.
819-
820-
{ul
821-
{- [`Ok n] indicates that the caller successfully received [n] bytes of
822-
input and wrote them into the the read buffer that the caller was
823-
provided by {next_read_operation}. }
824-
{- [`Eof] indicates that the input source will no longer provide any
825-
bytes to the read processor. }} *)
816+
val read : t -> Bigstring.t -> off:int -> len:int -> int
817+
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
818+
of [bigstring] and returns the number of bytes consumed by the
819+
connection. {!read} should be called after {!next_read_operation}
820+
returns a [`Read] value and additional input is available for the
821+
connection to consume. *)
822+
823+
val shutdown_reader : t -> unit
824+
(** [shutdown_reader t] shutds own the read processor for the connection. All
825+
subsequent calls to {!next_read_operations} will return [`Close].
826+
{!shutdown_reader} should be called after {!next_read_operation} returns
827+
a [`Read] value and there is no further input available for the
828+
connection to consume. *)
826829

827830
val next_write_operation : t -> [
828831
| `Write of Bigstring.t IOVec.t list

0 commit comments

Comments
 (0)