Skip to content

Commit 49c0591

Browse files
committed
Merge branch 'master' of github.com:anmonteiro/ocaml-h2
2 parents 10a06bd + 25645cc commit 49c0591

17 files changed

+2111
-59
lines changed

.ocamlformat-ignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
hpack/src/huffman_table.ml

CHANGES.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
0.13.0 2024-09-04
22
--------------
33

4-
- surface write failures through `Body.Writer.flush`
4+
- h2: surface (body) write failures through `flush`
55
([#247](https://github.com/anmonteiro/ocaml-h2/pull/247))
6+
- `Body.Writer.flush` now takes a callback of the type
7+
``([ `Written | ` Closed] -> unit)``, informing the caller whether the
8+
previous writes have been written or whether the output channel was
9+
closed.
610

711
0.12.0 2024-06-23
812
--------------

examples/lwt/lwt_echo_server2.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t =
8484
in
8585
(* let (finished, notify) = Lwt.wait () in *)
8686
let rec on_read _request_data ~off:_ ~len:_ =
87-
Body.Writer.flush response_body (fun () ->
87+
Body.Writer.flush response_body (fun _ ->
8888
Body.Reader.schedule_read request_body ~on_eof ~on_read)
8989
and on_eof () =
9090
set_interval
@@ -93,13 +93,13 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t =
9393
let _ =
9494
Body.Writer.write_string response_body "data: some data\n\n"
9595
in
96-
Body.Writer.flush response_body (fun () -> ());
96+
Body.Writer.flush response_body ignore;
9797
true)
9898
(fun () ->
9999
let _ =
100100
Body.Writer.write_string response_body "event: end\ndata: 1\n\n"
101101
in
102-
Body.Writer.flush response_body (fun () ->
102+
Body.Writer.flush response_body (fun _ ->
103103
Body.Writer.close response_body))
104104
in
105105
Body.Reader.schedule_read ~on_read ~on_eof request_body;

hpack/src/dune

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@
88
(action
99
(with-stdout-to
1010
%{targets}
11-
(run ../util/gen_huffman.exe %{deps}))))
11+
(run ../util/gen_huffman.exe %{deps})))
12+
(mode fallback))

hpack/src/huffman_table.ml

Lines changed: 1847 additions & 0 deletions
Large diffs are not rendered by default.

lib/body.ml

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ module Reader = struct
3838
; mutable read_scheduled : bool
3939
; mutable on_eof : unit -> unit
4040
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
41-
; buffered_bytes : int ref
4241
; done_reading : int -> unit
4342
}
4443

@@ -51,7 +50,6 @@ module Reader = struct
5150
; read_scheduled = false
5251
; on_eof = default_on_eof
5352
; on_read = default_on_read
54-
; buffered_bytes = ref 0
5553
; done_reading
5654
}
5755

@@ -110,12 +108,12 @@ module Writer = struct
110108

111109
type t =
112110
{ faraday : Faraday.t
113-
; buffered_bytes : int ref
111+
; mutable buffered_bytes : int
114112
; writer : Serialize.Writer.t
115113
}
116114

117115
let create buffer ~writer =
118-
{ faraday = Faraday.of_bigstring buffer; buffered_bytes = ref 0; writer }
116+
{ faraday = Faraday.of_bigstring buffer; buffered_bytes = 0; writer }
119117

120118
let create_empty ~writer =
121119
let t = create Bigstringaf.empty ~writer in
@@ -125,28 +123,41 @@ module Writer = struct
125123
let ready_to_write t = Serialize.Writer.wakeup t.writer
126124

127125
let write_char t c =
128-
Faraday.write_char t.faraday c;
126+
if not (Faraday.is_closed t.faraday) then Faraday.write_char t.faraday c;
129127
ready_to_write t
130128

131129
let write_string t ?off ?len s =
132-
Faraday.write_string ?off ?len t.faraday s;
130+
if not (Faraday.is_closed t.faraday)
131+
then Faraday.write_string ?off ?len t.faraday s;
133132
ready_to_write t
134133

135134
let write_bigstring t ?off ?len b =
136-
Faraday.write_bigstring ?off ?len t.faraday b;
135+
if not (Faraday.is_closed t.faraday)
136+
then Faraday.write_bigstring ?off ?len t.faraday b;
137137
ready_to_write t
138138

139139
let schedule_bigstring t ?off ?len (b : Bigstringaf.t) =
140-
Faraday.schedule_bigstring ?off ?len t.faraday b;
140+
if not (Faraday.is_closed t.faraday)
141+
then Faraday.schedule_bigstring ?off ?len t.faraday b;
141142
ready_to_write t
142143

143144
let flush t kontinue =
144-
Faraday.flush t.faraday kontinue;
145-
ready_to_write t
145+
if Serialize.Writer.is_closed t.writer
146+
then kontinue `Closed
147+
else (
148+
Faraday.flush_with_reason t.faraday (function
149+
| Drain -> kontinue `Closed
150+
| Nothing_pending | Shift -> kontinue `Written);
151+
ready_to_write t)
146152

147153
let is_closed t = Faraday.is_closed t.faraday
148154
let has_pending_output t = Faraday.has_pending_output t.faraday
149155

156+
let close_and_drain t =
157+
Faraday.close t.faraday;
158+
(* Resolve all pending flushes *)
159+
ignore (Faraday.drain t.faraday : int)
160+
150161
let close t =
151162
Serialize.Writer.unyield t.writer;
152163
Faraday.close t.faraday;
@@ -156,18 +167,24 @@ module Writer = struct
156167

157168
let transfer_to_writer t writer ~max_frame_size ~max_bytes stream_id =
158169
let faraday = t.faraday in
159-
match Faraday.operation faraday with
160-
| `Yield | `Close -> 0
161-
| `Writev iovecs ->
162-
let buffered = t.buffered_bytes in
163-
let iovecs = Httpun_types.IOVec.shiftv iovecs !buffered in
164-
let lengthv = Httpun_types.IOVec.lengthv iovecs in
165-
let writev_len = if max_bytes < lengthv then max_bytes else lengthv in
166-
buffered := !buffered + writev_len;
167-
let frame_info = Writer.make_frame_info ~max_frame_size stream_id in
168-
Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs;
169-
Writer.flush writer (fun () ->
170-
Faraday.shift faraday writev_len;
171-
buffered := !buffered - writev_len);
172-
writev_len
170+
if Serialize.Writer.is_closed t.writer
171+
then (
172+
close_and_drain t;
173+
0)
174+
else
175+
match Faraday.operation faraday with
176+
| `Yield | `Close -> 0
177+
| `Writev iovecs ->
178+
let iovecs = Httpun_types.IOVec.shiftv iovecs t.buffered_bytes in
179+
let lengthv = Httpun_types.IOVec.lengthv iovecs in
180+
let writev_len = if max_bytes < lengthv then max_bytes else lengthv in
181+
t.buffered_bytes <- t.buffered_bytes + writev_len;
182+
let frame_info = Writer.make_frame_info ~max_frame_size stream_id in
183+
Writer.schedule_iovecs writer frame_info ~len:writev_len iovecs;
184+
Writer.flush t.writer (function
185+
| `Closed -> close_and_drain t
186+
| `Written ->
187+
Faraday.shift faraday writev_len;
188+
t.buffered_bytes <- t.buffered_bytes - writev_len);
189+
writev_len
173190
end

lib/client_connection.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ let report_error =
135135
t.did_send_go_away <- true;
136136
if error <> Error_code.NoError
137137
then t.error_handler (`Protocol_error (error, data));
138-
Writer.flush t.writer (fun () ->
138+
Writer.flush t.writer (fun _reason ->
139139
(* XXX: We need to allow lower numbered streams to complete before
140140
* shutting down. *)
141141
shutdown_rw t);

lib/h2.mli

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434

3535
(** H2 is a high-performance, memory-efficient, and scalable HTTP/2
3636
implementation for OCaml. It is based on the concepts introduced http/af,
37-
and therefore uses the Angstrom and Faraday libraries to implement the
38-
parsing and serialization layers of the HTTP/2 standard. It preserves
39-
the same API as httpun wherever possible.
37+
and therefore uses the Angstrom and Faraday libraries to implement the
38+
parsing and serialization layers of the HTTP/2 standard. It preserves the
39+
same API as httpun wherever possible.
4040
41-
Not unlike httpun, the user should be familiar with HTTP, and the basic
42-
principles of memory management and vectorized IO in order to use this
43-
library. *)
41+
Not unlike httpun, the user should be familiar with HTTP, and the basic
42+
principles of memory management and vectorized IO in order to use this
43+
library. *)
4444

4545
(** {2 Basic HTTP Types} *)
4646

@@ -65,10 +65,10 @@ module Method : module type of Httpun_types.Method
6565
See {{:https://tools.ietf.org/html/rfc7231#section-6} RFC7231§6} for more
6666
details.
6767
68-
This module is a strict superset of [Httpun_types.Status]. Even though the HTTP/2
69-
specification removes support for the [Switching_protocols] status code, h2
70-
keeps it for the sake of higher level interaction between OCaml libraries
71-
that support both HTTP/1 and HTTP/2.
68+
This module is a strict superset of [Httpun_types.Status]. Even though the
69+
HTTP/2 specification removes support for the [Switching_protocols] status
70+
code, h2 keeps it for the sake of higher level interaction between OCaml
71+
libraries that support both HTTP/1 and HTTP/2.
7272
7373
See {{:https://tools.ietf.org/html/rfc7540#section-8.1.1} RFC7540§8.1.1} for
7474
more details. *)
@@ -373,7 +373,7 @@ module Body : sig
373373
the next opportunity without performing a copy. [bs] should not be
374374
modified until a subsequent call to {!flush} has successfully completed. *)
375375

376-
val flush : t -> (unit -> unit) -> unit
376+
val flush : t -> ([ `Written | `Closed ] -> unit) -> unit
377377
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
378378
output channel. Once those bytes have reached that output channel, [f]
379379
will be called.
@@ -449,8 +449,8 @@ module Response : sig
449449
-> Status.t
450450
-> t
451451
(** [create ?headers status] creates an HTTP response with the given
452-
parameters. Unlike the [Response] type in httpun, h2 does not define a
453-
way for responses to carry reason phrases or protocol version.
452+
parameters. Unlike the [Response] type in httpun, h2 does not define a way
453+
for responses to carry reason phrases or protocol version.
454454
455455
See
456456
{{:https://tools.ietf.org/html/rfc7540#section-8.1.2.4} RFC7540§8.1.2.4}

lib/reqd.ml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ let unsafe_respond_with_data (t : t) response data =
215215
* reserved (local): [...] In this state, only the following transitions
216216
* are possible: The endpoint can send a HEADERS frame. This causes the
217217
* stream to open in a "half-closed (remote)" state. *)
218-
Writer.flush t.writer (fun () ->
218+
Writer.flush t.writer (fun _reason ->
219+
(* TODO(anmonteiro): different if closed? *)
219220
t.state <- Active (HalfClosed request_info, stream))
220221
| Closed _ -> assert false
221222

@@ -268,7 +269,8 @@ let unsafe_respond_with_streaming (t : t) ~flush_headers_immediately response =
268269
* reserved (local): [...] In this state, only the following transitions
269270
* are possible: The endpoint can send a HEADERS frame. This causes the
270271
* stream to open in a "half-closed (remote)" state. *)
271-
Writer.flush t.writer (fun () ->
272+
Writer.flush t.writer (fun _reason ->
273+
(* TODO(anmonteiro): different if closed? *)
272274
t.state <- Active (HalfClosed request_info, stream));
273275
response_body
274276
| Closed _ -> assert false
@@ -444,7 +446,7 @@ let close_stream (t : t) =
444446
* flag). *)
445447
Stream.reset_stream t Error_code.NoError
446448
| Active (HalfClosed _, _) ->
447-
Writer.flush t.writer (fun () -> Stream.finish_stream t Finished)
449+
Writer.flush t.writer (fun _reason -> Stream.finish_stream t Finished)
448450
| _ -> assert false)
449451
| Exn _ -> Stream.reset_stream t InternalError
450452
| Other { code; _ } -> Stream.reset_stream t code

lib/serialize.ml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,12 +589,19 @@ module Writer = struct
589589
t.wakeup <- Optional_thunk.none;
590590
Optional_thunk.call_if_some f
591591

592-
let flush t f = flush t.encoder f
592+
let flush t f =
593+
flush_with_reason t.encoder (fun reason ->
594+
let result =
595+
match reason with
596+
| Nothing_pending | Shift -> `Written
597+
| Drain -> `Closed
598+
in
599+
f result)
593600

594601
let unyield t =
595602
(* Faraday doesn't have a function to take the serializer out of a yield
596603
state. In the meantime, `flush` does it. *)
597-
flush t (fun () -> ())
604+
flush t (fun _reason -> ())
598605

599606
let yield t = Faraday.yield t.encoder
600607
let close t = Faraday.close t.encoder

0 commit comments

Comments
 (0)