diff --git a/lib_eio/buf_write.ml b/lib_eio/buf_write.ml index cf5e290e..b35e79bd 100644 --- a/lib_eio/buf_write.ml +++ b/lib_eio/buf_write.ml @@ -515,21 +515,16 @@ let copy t flow = try aux () with End_of_file -> () -let with_flow ?(initial_size=0x1000) flow fn = - Switch.run ~name:"Buf_write.with_flow" @@ fun sw -> +let of_flow ~sw ?(initial_size=0x1000) flow = let t = create ~sw initial_size in + Switch.on_release sw (fun () -> close t); Fiber.fork ~sw (fun () -> copy t flow); - match fn t with - | x -> - close t; - x - | exception ex -> - close t; - (* Raising the exception will cancel the writer thread, so do a flush first. - We don't want to flush if cancelled, but in that case the switch will - end the writer thread itself (and [flush] will raise). *) - flush t; - raise ex + t + +let with_flow ?initial_size flow fn = + Switch.run ~name:"Buf_write.with_flow" @@ fun sw -> + let t = of_flow ~sw ?initial_size flow in + fn t let rec serialize t writev = match await_batch t with