Skip to content

Commit ab69675

Browse files
hanidamlajmeta-codesync[bot]
authored andcommitted
add WebTransport http/2 support to proxygen/lib
Summary: This diff adds support for WebTransport over http/2 in proxygen/lib for parity with proxygen::coro (recently added). Designed to reuse as much code as possible with proxygen::coro (WtUtils, WtIngressCb, WtEgressCb, etc.) Diff is admittedly a bit messy, but the abstractions in legacy proxygen/lib aren't as clean as proxygen::coro (pretty much no way around it). Unit tests ported over from proxygen::coro Reviewed By: joanna-jo, afrind Differential Revision: D90333392 fbshipit-source-id: 7566379a6b6e74ba7f610398eb105d0bcc9fc4d6
1 parent 503942b commit ab69675

13 files changed

Lines changed: 1493 additions & 30 deletions

proxygen/lib/http/session/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ proxygen_add_library(proxygen_http_session_session
1313
DEPS
1414
proxygen_http_codec_http1x_codec
1515
proxygen_http_session_stats
16+
proxygen_http_webtransport_webtransport_session
1617
fizz::fizz
1718
Folly::folly_conv
1819
Folly::folly_cpp_attributes
@@ -184,6 +185,8 @@ proxygen_add_library(proxygen_http_session_http_upstream_session
184185
SRCS
185186
HTTPUpstreamSession.cpp
186187
DEPS
188+
proxygen_http_webtransport_httpwebtransport
189+
proxygen_http_webtransport_wt_util
187190
wangle::wangle_acceptor_managed
188191
Folly::folly_io_async_async_ssl_socket
189192
EXPORTED_DEPS

proxygen/lib/http/session/HTTPSession.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <proxygen/lib/http/codec/HTTPChecks.h>
2222
#include <proxygen/lib/http/session/HTTPSessionController.h>
2323
#include <proxygen/lib/http/session/HTTPSessionStats.h>
24+
#include <proxygen/lib/http/webtransport/WebTransportSession.h>
25+
#include <proxygen/lib/http/webtransport/WtUtils.h>
2426
#include <wangle/acceptor/ConnectionManager.h>
2527

2628
using fizz::AsyncFizzBase;
@@ -46,6 +48,7 @@ static constexpr folly::StringPiece kClientLabel =
4648
"EXPORTER HTTP CERTIFICATE client";
4749
static constexpr folly::StringPiece kServerLabel =
4850
"EXPORTER HTTP CERTIFICATE server";
51+
4952
} // anonymous namespace
5053

5154
namespace proxygen {
@@ -275,6 +278,8 @@ std::chrono::milliseconds HTTPSession::getDrainTimeout() const {
275278
void HTTPSession::startNow() {
276279
CHECK(!started_);
277280
started_ = true;
281+
detail::setEgressWtHttpSettings(codec_->getEgressSettings());
282+
278283
codec_->generateSettings(writeBuf_);
279284
if (connFlowControl_) {
280285
connFlowControl_->setReceiveWindowSize(writeBuf_,
@@ -1449,6 +1454,44 @@ void HTTPSession::sendHeaders(HTTPTransaction* txn,
14491454
observer->requestStarted(observed, event);
14501455
});
14511456
}
1457+
// terminate WebTransport if connect stream, wtHandler_ set and upstream or
1458+
// 2xx resp
1459+
auto& wtCtx = txn->wtCtx_;
1460+
const bool upgraded = txn->isWebTransportConnectStream() &&
1461+
(headers.isRequest() || headers.is2xxResponse());
1462+
const bool makeWtSession = !includeEOM && upgraded && wtCtx.hasWtHandler();
1463+
VLOG(6) << "eom=" << includeEOM << "; upgraded=" << upgraded
1464+
<< "; wtConnect=" << txn->isWebTransportConnectStream()
1465+
<< "; supportsWebTransport=" << supportsWebTransport()
1466+
<< "; wtHandler=" << wtCtx.hasWtHandler();
1467+
if (!makeWtSession) {
1468+
return;
1469+
}
1470+
/**
1471+
* for downstream:
1472+
* Detach existing txn handler with a custom WtHandler to transparently
1473+
* enable wt.
1474+
*
1475+
* for upstream:
1476+
* Implementation detail, but all wt requests should go thru
1477+
* HTTPUpstreamSession::sendWtReq. The existing handler still needs to be
1478+
* notified of ::onHeadersComplete or ::onError (whichever happens first)
1479+
* to resolve the Promise/Future returned to the application.
1480+
*/
1481+
if (auto* prevHandler = txn->getHandler()) {
1482+
prevHandler->detachTransaction();
1483+
}
1484+
txn->setHandler(nullptr); // clear existing handler
1485+
// make wt session
1486+
detail::WtDir dir =
1487+
isUpstream() ? detail::WtDir::Client : detail::WtDir::Server;
1488+
auto wtConfig = detail::getWtConfig(codec_->getIngressSettings(),
1489+
codec_->getEgressSettings());
1490+
auto wtSession = detail::H2WtSession::make(
1491+
getEventBase(), dir, wtConfig, wtCtx.moveWtHandler());
1492+
// ::init notifies WtHandler of WtSession
1493+
wtSession->init(wtSession, isUpstream() ? wtCtx.moveWtCallback() : nullptr);
1494+
txn->setHandler(&wtSession->getTxnHandler());
14521495
}
14531496

14541497
void HTTPSession::commonEom(HTTPTransaction* txn,
@@ -2946,4 +2989,10 @@ void HTTPSession::invokeOnAllTransactions(
29462989
}
29472990
}
29482991

2992+
bool HTTPSession::supportsWebTransport() const noexcept {
2993+
const auto& codec = getCodec();
2994+
return detail::supportsH2Wt(
2995+
{codec.getIngressSettings(), codec.getEgressSettings()});
2996+
}
2997+
29492998
} // namespace proxygen

proxygen/lib/http/session/HTTPSession.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,7 @@ class HTTPSession
501501
const HTTPCodec& getCodec() const noexcept override {
502502
return codec_.getChainEnd();
503503
}
504+
bool supportsWebTransport() const noexcept final;
504505

505506
/**
506507
* Returns the underlying AsyncTransport.

proxygen/lib/http/session/HTTPTransaction.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ inline ErrorCode getDefaultAbortErrorCode(bool isUpstream) {
4747
return isUpstream ? ErrorCode::CANCEL : ErrorCode::INTERNAL_ERROR;
4848
}
4949

50+
bool isConnectUdp(const HTTPMessage& msg) noexcept {
51+
return msg.getMethod() == HTTPMethod::CONNECT && msg.getUpgradeProtocol() &&
52+
*msg.getUpgradeProtocol() == "connect-udp";
53+
}
54+
5055
} // namespace
5156

5257
#define INVARIANT_RETURN(X, Y) \
@@ -217,9 +222,7 @@ void HTTPTransaction::onIngressHeadersComplete(
217222
headRequest_ = (method == HTTPMethod::HEAD);
218223
upgraded_ = (method == HTTPMethod::CONNECT);
219224
wtConnectStream_ = HTTPWebTransport::isConnectMessage(*msg);
220-
connectUdpStream_ = msg->getMethod() == HTTPMethod::CONNECT &&
221-
msg->getUpgradeProtocol() &&
222-
*msg->getUpgradeProtocol() == "connect-udp";
225+
connectUdpStream_ = isConnectUdp(*msg);
223226
}
224227

225228
if ((msg->isRequest() && msg->getMethod() != HTTPMethod::CONNECT) ||
@@ -1026,12 +1029,11 @@ void HTTPTransaction::sendHeadersWithOptionalEOM(const HTTPMessage& headers,
10261029
if (!headers.isRequest() && !isPushed()) {
10271030
lastResponseStatus_ = headers.getStatusCode();
10281031
}
1032+
10291033
if (headers.isRequest()) {
10301034
headRequest_ = (headers.getMethod() == HTTPMethod::HEAD);
10311035
wtConnectStream_ = HTTPWebTransport::isConnectMessage(headers);
1032-
connectUdpStream_ = headers.getMethod() == HTTPMethod::CONNECT &&
1033-
headers.getUpgradeProtocol() &&
1034-
*headers.getUpgradeProtocol() == "connect-udp";
1036+
connectUdpStream_ = isConnectUdp(headers);
10351037
} else {
10361038
has1xxResponse_ = headers.is1xxResponse();
10371039
}
@@ -1101,6 +1103,16 @@ void HTTPTransaction::sendHeaders(const HTTPMessage& header) {
11011103
sendHeadersWithOptionalEOM(header, false);
11021104
}
11031105

1106+
void HTTPTransaction::sendWtHeaders(
1107+
const HTTPMessage& headers,
1108+
WebTransportHandler::Ptr wtHandler,
1109+
HttpWtClientCallbackPtr wtClientCb) noexcept {
1110+
wtCtx_.wtHandler_ = std::move(wtHandler);
1111+
wtCtx_.upstreamWtCb_ = std::move(wtClientCb);
1112+
CHECK(wtCtx_.wtHandler_);
1113+
sendHeadersWithOptionalEOM(headers, /*eom=*/false);
1114+
}
1115+
11041116
void HTTPTransaction::sendBody(std::unique_ptr<folly::IOBuf> body) {
11051117
DestructorGuard guard(this);
11061118
bool chunking =

proxygen/lib/http/session/HTTPTransaction.h

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ class HTTPTransactionTransportCallback {
430430
};
431431

432432
class HTTPSessionBase;
433+
struct HttpWtClientCallbackIf;
434+
using HttpWtClientCallbackPtr = std::unique_ptr<HttpWtClientCallbackIf>;
435+
433436
class HTTPTransaction
434437
: public folly::HHWheelTimer::Callback
435438
, public folly::DelayedDestructionBase
@@ -1170,6 +1173,26 @@ class HTTPTransaction
11701173
virtual void sendHeadersWithEOM(const HTTPMessage& headers);
11711174
virtual void sendHeadersWithOptionalEOM(const HTTPMessage& headers, bool eom);
11721175

1176+
/**
1177+
* Variant of send headers specifically for webtransport requests/responses
1178+
*
1179+
* This api will terminate WebTransport. If 200/ok resp is
1180+
* sent, the existing application's HTTPTransactionHandler will be detached
1181+
* and replaced internally. Applications should solely interact with the
1182+
* WebTransport session via the WebTransportHandler & WebTransportSession
1183+
* (passed into WebTransportHandler::onWebTransportSession) pair.
1184+
*
1185+
* If an endpoint/proxy wants to bypass parsing WebTransport bits (e.g. to
1186+
* enable http/2 end-to-end CONNECT tunneling), it can use the non-wt
1187+
* HttpTxn::sendHeaders
1188+
*
1189+
* HttpWtCallbackPtr is solely for upstream -- it's a mechanism to be notified
1190+
* of ingress headers/error (whichever occurs first).
1191+
*/
1192+
void sendWtHeaders(const HTTPMessage& headers,
1193+
WebTransportHandler::Ptr wtHandler,
1194+
HttpWtClientCallbackPtr wtClientCb = nullptr) noexcept;
1195+
11731196
/**
11741197
* Send part or all of the egress message body to the Transport. If flow
11751198
* control is enabled, the chunk boundaries may not be respected.
@@ -1576,7 +1599,7 @@ class HTTPTransaction
15761599

15771600
folly::Optional<ConnectionToken> getConnectionToken() const noexcept;
15781601

1579-
bool isWebTransportConnectStream() {
1602+
bool isWebTransportConnectStream() const {
15801603
return transport_.supportsWebTransport() && wtConnectStream_;
15811604
}
15821605

@@ -1634,6 +1657,29 @@ class HTTPTransaction
16341657
return &txnObserverAccessor_;
16351658
}
16361659

1660+
// only for consumption by the backing sessions (e.g. HTTPSession, HQSession)
1661+
// avoids applications messing with wt state
1662+
struct WtCtx {
1663+
friend class HTTPTransaction;
1664+
friend class HTTPSession;
1665+
friend class HQSession;
1666+
1667+
private:
1668+
bool hasWtHandler() const noexcept {
1669+
return bool(wtHandler_);
1670+
}
1671+
WebTransportHandler::Ptr moveWtHandler() noexcept {
1672+
return std::move(wtHandler_);
1673+
}
1674+
1675+
HttpWtClientCallbackPtr moveWtCallback() noexcept {
1676+
return std::move(upstreamWtCb_);
1677+
}
1678+
1679+
WebTransportHandler::Ptr wtHandler_;
1680+
HttpWtClientCallbackPtr upstreamWtCb_;
1681+
} wtCtx_;
1682+
16371683
private:
16381684
HTTPTransaction(const HTTPTransaction&) = delete;
16391685
HTTPTransaction& operator=(const HTTPTransaction&) = delete;
@@ -2013,6 +2059,14 @@ class HTTPTransaction
20132059
HTTPTransactionObserverContainer txnObserverContainer_;
20142060
};
20152061

2062+
struct HttpWtClientCallbackIf {
2063+
virtual ~HttpWtClientCallbackIf() noexcept = default;
2064+
virtual void onHeaders(std::unique_ptr<HTTPMessage>) noexcept {
2065+
}
2066+
virtual void onErr(const HTTPException&) noexcept {
2067+
}
2068+
};
2069+
20162070
/**
20172071
* Write a description of an HTTPTransaction to an ostream
20182072
*/

proxygen/lib/http/session/HTTPUpstreamSession.cpp

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
#include <proxygen/lib/http/session/HTTPUpstreamSession.h>
1010

11+
#include <proxygen/lib/http/webtransport/HTTPWebTransport.h>
12+
#include <proxygen/lib/http/webtransport/WtUtils.h>
13+
1114
#include <folly/io/async/AsyncSSLSocket.h>
1215
#include <wangle/acceptor/ConnectionManager.h>
1316

@@ -103,10 +106,7 @@ void HTTPUpstreamSession::startNow() {
103106
HTTPTransaction* HTTPUpstreamSession::newTransaction(
104107
HTTPTransaction::Handler* handler) {
105108
auto txn = newTransactionWithError(handler);
106-
if (txn.hasError()) {
107-
return nullptr;
108-
}
109-
return txn.value();
109+
return txn.value_or(nullptr);
110110
}
111111

112112
folly::Expected<HTTPTransaction*, HTTPUpstreamSession::NewTransactionError>
@@ -224,6 +224,109 @@ void HTTPUpstreamSession::detachThreadLocals(bool detachSSLContext) {
224224
}
225225
}
226226

227+
namespace {
228+
229+
constexpr std::string_view kWtNotSupported = "WebTransport not supported";
230+
constexpr std::string_view kInvalidWtReq = "Invalid WebTransport request";
231+
constexpr std::string_view kStreamFailed = "Failed to create stream";
232+
233+
folly::exception_wrapper makeHttpEx(const std::string& err) noexcept {
234+
constexpr auto kExDir = HTTPException::Direction::INGRESS_AND_EGRESS;
235+
return folly::make_exception_wrapper<HTTPException>(kExDir, err);
236+
}
237+
238+
using WtReqResult = std::unique_ptr<HTTPMessage>;
239+
using WtReqResultPromise = folly::Promise<WtReqResult>;
240+
folly::Promise<WtReqResult> emptyWtReqPromise() noexcept {
241+
return folly::Promise<WtReqResult>::makeEmpty();
242+
}
243+
244+
class WtClientCallback final
245+
: public HttpWtClientCallbackIf
246+
, public HTTPTransactionHandler {
247+
private:
248+
folly::Promise<WtReqResult> promise{emptyWtReqPromise()};
249+
250+
public:
251+
explicit WtClientCallback(WtReqResultPromise p) noexcept
252+
: promise(std::move(p)) {
253+
}
254+
~WtClientCallback() noexcept override = default;
255+
folly::Promise<WtReqResult> resetPromise() noexcept {
256+
return std::exchange(promise, emptyWtReqPromise());
257+
}
258+
/**
259+
* Either ::onHeaders or ::onErr can be invoked first
260+
*
261+
* When ::onHeadersComplete is invoked, resolve the promise with the non-final
262+
* http headers
263+
*
264+
* When ::onError is invoked, resolve the promise with HTTPException
265+
*/
266+
void onHeaders(std::unique_ptr<HTTPMessage> msg) noexcept override {
267+
if (msg->isFinal()) {
268+
auto p = resetPromise();
269+
CHECK(p.valid());
270+
p.setValue(std::move(msg));
271+
}
272+
}
273+
void onErr(const HTTPException& ex) noexcept override {
274+
auto p = resetPromise();
275+
CHECK(p.valid());
276+
p.setException(ex);
277+
}
278+
279+
// **ignored**
280+
void onHeadersComplete(std::unique_ptr<HTTPMessage>) noexcept override {
281+
}
282+
void onError(const HTTPException&) noexcept override {
283+
}
284+
void detachTransaction() noexcept override {
285+
}
286+
void setTransaction(HTTPTransaction*) noexcept override {
287+
}
288+
void onBody(std::unique_ptr<folly::IOBuf>) noexcept override {
289+
}
290+
void onTrailers(std::unique_ptr<HTTPHeaders>) noexcept override {
291+
}
292+
void onEOM() noexcept override {
293+
}
294+
void onUpgrade(UpgradeProtocol) noexcept override {
295+
}
296+
void onEgressPaused() noexcept override {
297+
}
298+
void onEgressResumed() noexcept override {
299+
}
300+
};
301+
302+
} // namespace
303+
304+
folly::SemiFuture<WtReqResult> HTTPUpstreamSession::sendWebTransportRequest(
305+
const HTTPMessage& req, WebTransportHandler::Ptr wtHandler) noexcept {
306+
// both self and peer must indicate support for WebTransport
307+
const bool supportsWt = proxygen::detail::supportsH2Wt(
308+
{codec_->getIngressSettings(), codec_->getEgressSettings()});
309+
const bool validWtReq = HTTPWebTransport::isConnectMessage(req);
310+
if (!(supportsWt && validWtReq)) {
311+
auto err = !validWtReq ? kInvalidWtReq : kWtNotSupported;
312+
VLOG(6) << __func__ << " err=" << err << "; sess=" << *this;
313+
return makeHttpEx(std::string(err));
314+
}
315+
316+
auto [p, f] = folly::makePromiseContract<WtReqResult>();
317+
auto wtClientCb = std::make_unique<WtClientCallback>(std::move(p));
318+
319+
auto* txn = newTransaction(wtClientCb.get());
320+
if (!txn) {
321+
return makeHttpEx(std::string(kStreamFailed));
322+
}
323+
324+
// send wt upgrade req
325+
txn->setHandler(nullptr); // clear handler, will be replaced by sendWtHeaders
326+
txn->sendWtHeaders(req, std::move(wtHandler), std::move(wtClientCb));
327+
return std::move(f);
328+
}
329+
227330
void HTTPUpstreamSession::maybeDetachSSLContext() const {
228331
#ifndef NO_ASYNCSSLSOCKET
229332
auto sslSocket = sock_->getUnderlyingTransport<folly::AsyncSSLSocket>();

0 commit comments

Comments
 (0)