Skip to content

Lack of backpressure when client code does not schedule read (httpaf-lwt-unix) #140

@Lupus

Description

@Lupus

/cc @aantron

I've modified lwt_echo_post example as follows:

open Base
open Lwt.Infix
module Arg = Caml.Arg

open Httpaf_lwt_unix
module Reqd = Httpaf.Reqd
module Request = Httpaf.Request
module Headers = Httpaf.Headers
module Response = Httpaf.Response
module Body = Httpaf.Body

let slow_echo_post reqd =
  match Reqd.request reqd  with
  | { Request.meth = `POST; headers; _ } ->
    let response =
      let content_type =
        match Headers.get headers "content-type" with
        | None   -> "application/octet-stream"
        | Some x -> x
      in
      Response.create ~headers:(Headers.of_list ["content-type", content_type; "connection", "close"]) `OK
    in
    let request_body  = Reqd.request_body reqd in
    let response_body = Reqd.respond_with_streaming reqd response in
    let rec on_read buffer ~off ~len =
      Lwt.async @@ fun () -> Lwt.Infix.(
        Lwt_unix.sleep(1.0) >>= fun () ->
        Body.schedule_bigstring response_body buffer ~off ~len;
        Body.flush response_body (fun () ->
          Body.schedule_read request_body ~on_eof ~on_read);
        Lwt.return ()
      );
    and on_eof () =
      Body.close_writer response_body
    in
    Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read
  | _ ->
    let headers = Headers.of_list [ "connection", "close" ] in
    Reqd.respond_with_string reqd (Response.create ~headers `Method_not_allowed) ""
;;

let request_handler (_ : Unix.sockaddr) = slow_echo_post (* Httpaf_examples.Server.echo_post *)
let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler

let main port =
  let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
  Lwt.async (fun () ->
    Lwt_io.establish_server_with_client_socket
      listen_address
      (Server.create_connection_handler ~request_handler ~error_handler)
    >|= fun _server ->
      Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
      Stdio.printf "To send a POST request, try one of the following\n\n";
      Stdio.printf "  echo \"Testing echo POST\" | dune exec examples/async/async_post.exe\n";
      Stdio.printf "  echo \"Testing echo POST\" | dune exec examples/lwt/lwt_post.exe\n";
      Stdio.printf "  echo \"Testing echo POST\" | curl -XPOST --data @- http://localhost:%d\n\n%!" port);
  let forever, _ = Lwt.wait () in
  Lwt_main.run forever
;;

let () =
  let port = ref 8080 in
  Arg.parse
    ["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
    ignore
    "Echoes POST requests. Runs forever.";
  main !port
;;

Upload large file to this app:

curl -H"Expect:" -XPOST -d @very_big_file -o/dev/null  http://0.0.0.0:8080/

Expected behavior would be to stop consuming data from the socket as there's nowhere to feed it, but that's not the case. I've instrumented read function from httpaf_lwt_unix.ml with a print like this:

image

During upload print statement executes constantly, memory footprint grows until full request is buffered in memory. Response is being sent back slowly due to delay as expected.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions