Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HTTP client proxy support #742

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 110 additions & 106 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@
(.setConnectTimeoutMillis ^ProxyHandler handler connection-timeout))
handler))

(defn pending-proxy-connection-handler [response-stream]
(defn pending-proxy-connection-handler [early-response-d]
(netty/channel-inbound-handler
:exception-caught
([_ ctx cause]
Expand All @@ -396,7 +396,7 @@

:else
cause)]
(s/put! response-stream response)
(d/error! early-response-d response)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, so far, I was unable to even reach this code path and have it result in an error response anyway. It would probably be a good idea to come up with a bunch of test cases for the proxy functionality (currently there are none).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given how untested proxy code is, I'd strongly suggest writing tests before approving.

Maybe whoever needed the proxies could lend a hand?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to come up with a test that fails as expected without the patch - sanity check welcome 🙏

Using the test as a basis, I should be able to see how we can reach the error case referenced above. Stay tuned.

;; client handler should take care of the rest
(netty/close ctx))))

Expand All @@ -408,7 +408,7 @@

(defn- add-proxy-handlers
"Inserts handlers for proxying through a server"
[^ChannelPipeline p response-stream proxy-options ssl?]
[^ChannelPipeline p early-response-d proxy-options ssl?]
(when (some? proxy-options)
(let [proxy (proxy-handler (assoc proxy-options :ssl? ssl?))]
(.addFirst p "proxy" ^ChannelHandler proxy)
Expand All @@ -420,7 +420,7 @@
"proxy"
"pending-proxy-connection"
^ChannelHandler
(pending-proxy-connection-handler response-stream)))))
(pending-proxy-connection-handler early-response-d)))))
p)

(defn- setup-http1-pipeline
Expand Down Expand Up @@ -458,7 +458,6 @@
false))
(.addLast "streamer" ^ChannelHandler (ChunkedWriteHandler.))
(.addLast "handler" ^ChannelHandler handler)
(add-proxy-handlers responses proxy-options ssl?)
(common/add-non-http-handlers logger pipeline-transform))

(log/debug "http1 client pipeline" pipeline)
Expand All @@ -473,9 +472,10 @@

Can't use an ApnHandler/ApplicationProtocolNegotiationHandler here,
because it's tricky to run Manifold code on Netty threads."
[{:keys [ssl? remote-address ssl-context ssl-endpoint-id-alg]}]
[{:keys [ssl? remote-address ssl-context ssl-endpoint-id-alg proxy-options early-response-d]}]
(fn pipeline-builder*
[^ChannelPipeline pipeline]
(add-proxy-handlers pipeline early-response-d proxy-options ssl?)
(when ssl?
(do
(.addLast pipeline
Expand Down Expand Up @@ -803,16 +803,18 @@
(some? log-activity) (netty/activity-logger "aleph-client" log-activity)
:else nil)

early-response-d (d/deferred)
pipeline-builder (make-pipeline-builder
(assoc opts
:ssl? ssl?
:ssl-context ssl-context
:ssl-endpoint-id-alg ssl-endpoint-id-alg
:remote-address remote-address
:raw-stream? raw-stream?
:response-buffer-size response-buffer-size
:logger logger
:pipeline-transform pipeline-transform))
(assoc opts
:early-response-d early-response-d
:ssl? ssl?
:ssl-context ssl-context
:ssl-endpoint-id-alg ssl-endpoint-id-alg
:remote-address remote-address
:raw-stream? raw-stream?
:response-buffer-size response-buffer-size
:logger logger
:pipeline-transform pipeline-transform))

ch-d (netty/create-client-chan
{:pipeline-builder pipeline-builder
Expand All @@ -825,108 +827,110 @@

(attach-on-close-handler ch-d on-closed)

(d/chain' ch-d
(fn setup-client
[^Channel ch]
(log/debug "Channel:" ch)

;; We know the SSL handshake must be complete because create-client wraps the
;; future with maybe-ssl-handshake-future, so we can get the negotiated
;; protocol, falling back to HTTP/1.1 by default.
(let [pipeline (.pipeline ch)
protocol (cond
ssl?
(or (-> pipeline
^SslHandler (.get ^Class SslHandler)
(.applicationProtocol))
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed

force-h2c?
(do
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
ApplicationProtocolNames/HTTP_2)

:else
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
setup-opts (assoc opts
:authority authority
:ch ch
:server? false
:keep-alive? keep-alive?
:keep-alive?' keep-alive?'
:logger logger
:non-tun-proxy? non-tun-proxy?
:pipeline pipeline
:pipeline-transform pipeline-transform
:raw-stream? raw-stream?
:remote-address remote-address
:response-buffer-size response-buffer-size
:ssl-context ssl-context
:ssl? ssl?)]

(log/debug (str "Using HTTP protocol: " protocol)
{:authority authority
:ssl? ssl?
:force-h2c? force-h2c?})

;; can't use ApnHandler, because we need to coordinate with Manifold code
(let [http-req-handler
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
(setup-http1-client setup-opts)

(.equals ApplicationProtocolNames/HTTP_2 protocol)
(do
(http2/setup-conn-pipeline setup-opts)
(http2-req-handler setup-opts))

:else
(do
(let [msg (str "Unknown protocol: " protocol)
e (IllegalStateException. msg)]
(log/error e msg)
(netty/close ch)
(throw e))))]

;; Both Netty and Aleph are set up, unpause the pipeline
(when (.get pipeline "pause-handler")
(log/debug "Unpausing pipeline")
(.remove pipeline "pause-handler"))

(fn http-req-fn
[req]
(log/trace "http-req-fn fired")
(log/debug "client request:" (pr-str req))

;; If :aleph/close is set in the req, closes the channel and
;; returns a deferred containing the result.
(if (or (contains? req :aleph/close)
(contains? req ::close))
(-> ch (netty/close) (netty/wrap-future))

(let [t0 (System/nanoTime)
;; I suspect the below is an error for http1
;; since the shared handler might not match.
;; Should work for HTTP2, though
raw-stream? (get req :raw-stream? raw-stream?)]

(if (or (not (.isActive ch))
(not (.isOpen ch)))

(d/error-deferred
(d/alt'
early-response-d
(d/chain' ch-d
(fn setup-client
[^Channel ch]
(log/debug "Channel:" ch)

;; We know the SSL handshake must be complete because create-client wraps the
;; future with maybe-ssl-handshake-future, so we can get the negotiated
;; protocol, falling back to HTTP/1.1 by default.
(let [pipeline (.pipeline ch)
protocol (cond
ssl?
(or (-> pipeline
^SslHandler (.get ^Class SslHandler)
(.applicationProtocol))
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed

force-h2c?
(do
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
ApplicationProtocolNames/HTTP_2)

:else
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
setup-opts (assoc opts
:authority authority
:ch ch
:server? false
:keep-alive? keep-alive?
:keep-alive?' keep-alive?'
:logger logger
:non-tun-proxy? non-tun-proxy?
:pipeline pipeline
:pipeline-transform pipeline-transform
:raw-stream? raw-stream?
:remote-address remote-address
:response-buffer-size response-buffer-size
:ssl-context ssl-context
:ssl? ssl?)]

(log/debug (str "Using HTTP protocol: " protocol)
{:authority authority
:ssl? ssl?
:force-h2c? force-h2c?})

;; can't use ApnHandler, because we need to coordinate with Manifold code
(let [http-req-handler
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
(setup-http1-client setup-opts)

(.equals ApplicationProtocolNames/HTTP_2 protocol)
(do
(http2/setup-conn-pipeline setup-opts)
(http2-req-handler setup-opts))

:else
(do
(let [msg (str "Unknown protocol: " protocol)
e (IllegalStateException. msg)]
(log/error e msg)
(netty/close ch)
(throw e))))]

;; Both Netty and Aleph are set up, unpause the pipeline
(when (.get pipeline "pause-handler")
(log/debug "Unpausing pipeline")
(.remove pipeline "pause-handler"))

(fn http-req-fn
[req]
(log/trace "http-req-fn fired")
(log/debug "client request:" (pr-str req))

;; If :aleph/close is set in the req, closes the channel and
;; returns a deferred containing the result.
(if (or (contains? req :aleph/close)
(contains? req ::close))
(-> ch (netty/close) (netty/wrap-future))

(let [t0 (System/nanoTime)
;; I suspect the below is an error for http1
;; since the shared handler might not match.
;; Should work for HTTP2, though
raw-stream? (get req :raw-stream? raw-stream?)]

(if (or (not (.isActive ch))
(not (.isOpen ch)))

(d/error-deferred
(ex-info "Channel is inactive/closed."
{:req req
:ch ch
:open? (.isOpen ch)
:active? (.isActive ch)}))

(-> (http-req-handler req)
(d/chain' (rsp-handler
(-> (http-req-handler req)
(d/chain' (rsp-handler
{:ch ch
:keep-alive? keep-alive? ; why not keep-alive?'
:raw-stream? raw-stream?
:req req
:response-buffer-size response-buffer-size
:t0 t0})))))))))))))
:t0 t0}))))))))))))))



Expand Down
4 changes: 0 additions & 4 deletions src/aleph/http/http2.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1251,17 +1251,13 @@
[{:keys [^ChannelPipeline pipeline
handler
server?
proxy-options
logger
http2-stream-pipeline-transform
max-request-body-size
compression?
compression-options]}]
(log/trace "setup-stream-pipeline called")

(when (some? proxy-options)
(throw (IllegalArgumentException. "Proxying HTTP/2 messages not supported yet")))

(cond-> pipeline

;; necessary for multipart support in HTTP/2?
Expand Down
53 changes: 52 additions & 1 deletion test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
StandardCompressionOptions)
(io.netty.handler.codec.http
HttpMessage
HttpObjectAggregator)
HttpObjectAggregator
HttpRequestDecoder
LastHttpContent)
(io.netty.handler.codec.http2 Http2HeadersFrame)
(java.io
Closeable
Expand Down Expand Up @@ -1656,6 +1658,55 @@
(is (= true (deref conn-closed 1000 :timeout)))
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

(deftest test-client-proxy-support
(with-http-ssl-servers basic-handler {}
(let [proxy-request (d/deferred)
proxy-server (tcp/start-server
(fn [stream _]
(d/future
(try
(let [stream-ch (:aleph/channel (meta stream))
req (http.core/netty-request->ring-request
@(d/timeout! (s/take! stream) 1000)
false
stream-ch
nil)]
(assert (instance? LastHttpContent @(d/timeout! (s/take! stream) 1000)))
(-> stream-ch .pipeline (.remove "http-request-decoder"))
(when (= :connect (:request-method req))
(when-let [[host port] (some-> req :uri (str/split #":" 2))]
(when-let [client (-> (tcp/client {:host host
:port (parse-long port)
:connect-timeout 1000})
(d/timeout! 2000)
deref)]
(s/put! stream (netty/to-byte-buf "HTTP/1.0 200 OK\r\n\r\n"))
(s/connect stream client {:upstream? true :downstream? true})
(s/connect client stream {:upstream? true :downstream? true}))))
(d/success! proxy-request req))
(catch Throwable e
(d/error! proxy-request e)))))
{:port 0
:shutdown-timeout 0
:raw-stream? true
:pipeline-transform (fn [^ChannelPipeline p]
(.addFirst p "http-request-decoder" (HttpRequestDecoder.)))})]
(binding [*connection-options* {:proxy-options {:host "localhost"
:port (netty/port proxy-server)}}]
(with-server proxy-server
(is (= string-response (some-> (http-get "/string")
(d/timeout! 1000)
deref
:body
bs/to-string)))
(is (= {:request-method :connect
:uri (str "localhost:" port)
:headers {"host" (str "localhost:" port)
"proxy-connection" "Keep-Alive"}
:body nil}
(select-keys @(d/timeout! proxy-request 1000)
[:request-method :uri :headers :body]))))))))

(deftest ^:leak test-leak-in-raw-stream-handler
;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and
;; http2. It would be nicer to put this assertion into the body but the http1 server seems to
Expand Down