Skip to content

Commit 13b2abb

Browse files
afrindfacebook-github-bot
authored andcommitted
Request ID part 2 - increment by 2 (facebookexperimental#54)
Summary: Pull Request resolved: facebookexperimental#54 Differential Revision: D74530554
1 parent af19b21 commit 13b2abb

File tree

3 files changed

+77
-24
lines changed

3 files changed

+77
-24
lines changed

moxygen/MoQSession.cpp

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,7 +1559,8 @@ folly::coro::Task<ServerSetup> MoQSession::setup(ClientSetup setup) {
15591559
XLOG(ERR) << "writeClientSetup failed sess=" << this;
15601560
co_yield folly::coro::co_error(std::runtime_error("Failed to write setup"));
15611561
}
1562-
maxRequestID_ = maxConcurrentRequests_ = maxRequestID;
1562+
maxRequestID_ = maxRequestID;
1563+
maxConcurrentRequests_ = maxRequestID_ / getRequestIDMultiplier();
15631564
controlWriteEvent_.signal();
15641565

15651566
auto deletedToken = cancellationSource_.getToken();
@@ -1618,6 +1619,9 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
16181619
}
16191620
initializeNegotiatedVersion(serverSetup->selectedVersion);
16201621
auto maxRequestID = getMaxRequestIDIfPresent(serverSetup->params);
1622+
if (getDraftMajorVersion(serverSetup->selectedVersion) < 11) {
1623+
nextRequestID_ = 0;
1624+
}
16211625
// TODO: clamp egress max auth token cache size
16221626
controlCodec_.setMaxAuthTokenCacheSize(
16231627
getMaxAuthTokenCacheSizeIfPresent(serverSetup->params));
@@ -1628,7 +1632,8 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
16281632
close(SessionCloseErrorCode::VERSION_NEGOTIATION_FAILED);
16291633
return;
16301634
}
1631-
maxRequestID_ = maxConcurrentRequests_ = maxRequestID;
1635+
maxRequestID_ = maxRequestID;
1636+
maxConcurrentRequests_ = maxRequestID_ / getRequestIDMultiplier();
16321637
setupComplete_ = true;
16331638
controlWriteEvent_.signal();
16341639
}
@@ -2028,7 +2033,7 @@ void MoQSession::onSubscribe(SubscribeRequest subscribeRequest) {
20282033
XLOG(DBG1) << __func__ << " ftn=" << subscribeRequest.fullTrackName
20292034
<< " sess=" << this;
20302035
const auto requestID = subscribeRequest.requestID;
2031-
if (closeSessionIfRequestIDInvalid(requestID)) {
2036+
if (closeSessionIfRequestIDInvalid(requestID, false, true)) {
20322037
return;
20332038
}
20342039

@@ -2118,7 +2123,7 @@ void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
21182123
XLOG(ERR) << "No matching subscribe ID=" << requestID << " sess=" << this;
21192124
return;
21202125
}
2121-
if (closeSessionIfRequestIDInvalid(requestID)) {
2126+
if (closeSessionIfRequestIDInvalid(requestID, false, false)) {
21222127
return;
21232128
}
21242129

@@ -2142,7 +2147,7 @@ void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
21422147

21432148
void MoQSession::onUnsubscribe(Unsubscribe unsubscribe) {
21442149
XLOG(DBG1) << __func__ << " id=" << unsubscribe.requestID << " sess=" << this;
2145-
if (closeSessionIfRequestIDInvalid(unsubscribe.requestID)) {
2150+
if (closeSessionIfRequestIDInvalid(unsubscribe.requestID, false, false)) {
21462151
return;
21472152
}
21482153
if (!publishHandler_) {
@@ -2267,7 +2272,7 @@ void MoQSession::onRequestsBlocked(RequestsBlocked requestsBlocked) {
22672272
// Increment the maxRequestID_ by the number of pending closed subscribes
22682273
// and send a new MaxRequestID.
22692274
if (requestsBlocked.maxRequestID >= maxRequestID_ && closedRequests_ > 0) {
2270-
maxRequestID_ += closedRequests_;
2275+
maxRequestID_ += (closedRequests_ * getRequestIDMultiplier());
22712276
closedRequests_ = 0;
22722277
sendMaxRequestID(true);
22732278
}
@@ -2280,7 +2285,7 @@ void MoQSession::onFetch(Fetch fetch) {
22802285
: folly::to<std::string>("joining=", joining->joiningRequestID.value);
22812286
XLOG(DBG1) << __func__ << " (" << logStr << ") sess=" << this;
22822287
const auto requestID = fetch.requestID;
2283-
if (closeSessionIfRequestIDInvalid(requestID)) {
2288+
if (closeSessionIfRequestIDInvalid(requestID, false, true)) {
22842289
return;
22852290
}
22862291
if (standalone) {
@@ -2384,6 +2389,9 @@ folly::coro::Task<void> MoQSession::handleFetch(
23842389

23852390
void MoQSession::onFetchCancel(FetchCancel fetchCancel) {
23862391
XLOG(DBG1) << __func__ << " id=" << fetchCancel.requestID << " sess=" << this;
2392+
if (closeSessionIfRequestIDInvalid(fetchCancel.requestID, false, false)) {
2393+
return;
2394+
}
23872395
auto pubTrackIt = pubTracks_.find(fetchCancel.requestID);
23882396
if (pubTrackIt == pubTracks_.end()) {
23892397
XLOG(DBG4) << "No publish key for fetch id=" << fetchCancel.requestID
@@ -2436,7 +2444,9 @@ void MoQSession::onFetchError(FetchError fetchError) {
24362444
void MoQSession::onAnnounce(Announce ann) {
24372445
XLOG(DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this;
24382446
if (closeSessionIfRequestIDInvalid(
2439-
ann.requestID, getDraftMajorVersion(*getNegotiatedVersion()) < 11)) {
2447+
ann.requestID,
2448+
getDraftMajorVersion(*getNegotiatedVersion()) < 11,
2449+
true)) {
24402450
return;
24412451
}
24422452
if (!subscribeHandler_) {
@@ -2588,7 +2598,9 @@ void MoQSession::onSubscribeAnnounces(SubscribeAnnounces sa) {
25882598
XLOG(DBG1) << __func__ << " prefix=" << sa.trackNamespacePrefix
25892599
<< " sess=" << this;
25902600
if (closeSessionIfRequestIDInvalid(
2591-
sa.requestID, getDraftMajorVersion(*getNegotiatedVersion()) < 11)) {
2601+
sa.requestID,
2602+
getDraftMajorVersion(*getNegotiatedVersion()) < 11,
2603+
true)) {
25922604
return;
25932605
}
25942606
if (!publishHandler_) {
@@ -2701,7 +2713,8 @@ void MoQSession::onTrackStatusRequest(TrackStatusRequest trackStatusRequest) {
27012713
<< " sess=" << this;
27022714
if (closeSessionIfRequestIDInvalid(
27032715
trackStatusRequest.requestID,
2704-
getDraftMajorVersion(*getNegotiatedVersion()) < 11)) {
2716+
getDraftMajorVersion(*getNegotiatedVersion()) < 11,
2717+
true)) {
27052718
return;
27062719
}
27072720
if (!publishHandler_) {
@@ -3050,7 +3063,9 @@ RequestID MoQSession::getNextRequestID(bool legacyAction) {
30503063
<< nextRequestID_ << " peerMaxRequestID_=" << peerMaxRequestID_
30513064
<< " sess=" << this;
30523065
}
3053-
return nextRequestID_++;
3066+
auto ret = nextRequestID_;
3067+
nextRequestID_ += getRequestIDMultiplier();
3068+
return ret;
30543069
}
30553070

30563071
folly::coro::Task<Publisher::SubscribeResult> MoQSession::subscribe(
@@ -3224,7 +3239,7 @@ void MoQSession::retireRequestID(bool signalWriteLoop) {
32243239
// If # of closed requests is greater than 1/2 of max requests, then
32253240
// let's bump the maxRequestID by the number of closed requests.
32263241
if (++closedRequests_ >= maxConcurrentRequests_ / 2) {
3227-
maxRequestID_ += closedRequests_;
3242+
maxRequestID_ += (closedRequests_ * getRequestIDMultiplier());
32283243
closedRequests_ = 0;
32293244
sendMaxRequestID(signalWriteLoop);
32303245
}
@@ -3412,9 +3427,6 @@ void MoQSession::fetchError(const FetchError& fetchErr) {
34123427

34133428
void MoQSession::fetchCancel(const FetchCancel& fetchCan) {
34143429
XLOG(DBG1) << __func__ << " sess=" << this;
3415-
if (closeSessionIfRequestIDInvalid(fetchCan.requestID)) {
3416-
return;
3417-
}
34183430
auto trackIt = fetches_.find(fetchCan.requestID);
34193431
if (trackIt == fetches_.end()) {
34203432
XLOG(ERR) << "unknown subscribe ID=" << fetchCan.requestID
@@ -3544,12 +3556,39 @@ void MoQSession::onDatagram(std::unique_ptr<folly::IOBuf> datagram) {
35443556

35453557
bool MoQSession::closeSessionIfRequestIDInvalid(
35463558
RequestID requestID,
3547-
bool skipCheck) {
3548-
if (!skipCheck && maxRequestID_ <= requestID.value) {
3549-
XLOG(ERR) << "Invalid requestID: " << requestID << " sess=" << this;
3550-
close(SessionCloseErrorCode::TOO_MANY_REQUESTS);
3559+
bool skipCheck,
3560+
bool isNewRequest) {
3561+
if (skipCheck) {
3562+
return false;
3563+
}
3564+
3565+
if (getDraftMajorVersion(*getNegotiatedVersion()) >= 11 &&
3566+
((requestID.value % 2) == 1) !=
3567+
(dir_ == MoQControlCodec::Direction::CLIENT)) {
3568+
XLOG(ERR) << "Invalid requestID parity: " << requestID << " sess=" << this;
3569+
close(SessionCloseErrorCode::INVALID_REQUEST_ID);
35513570
return true;
35523571
}
3572+
if (isNewRequest) {
3573+
if (requestID.value >= maxRequestID_) {
3574+
XLOG(ERR) << "Too many requests requestID: " << requestID
3575+
<< " sess=" << this;
3576+
close(SessionCloseErrorCode::TOO_MANY_REQUESTS);
3577+
return true;
3578+
}
3579+
if (requestID.value != nextExpectedPeerRequestID_) {
3580+
XLOG(ERR) << "Invalid next requestID: " << requestID << " sess=" << this;
3581+
close(SessionCloseErrorCode::INVALID_REQUEST_ID);
3582+
return true;
3583+
}
3584+
nextExpectedPeerRequestID_ += getRequestIDMultiplier();
3585+
} else {
3586+
if (requestID.value >= maxRequestID_) {
3587+
XLOG(ERR) << "Invalid requestID: " << requestID << " sess=" << this;
3588+
close(SessionCloseErrorCode::INVALID_REQUEST_ID);
3589+
return true;
3590+
}
3591+
}
35533592
return false;
35543593
}
35553594

moxygen/MoQSession.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class MoQSession : public MoQControlCodec::ControlCallback,
7373
: dir_(MoQControlCodec::Direction::CLIENT),
7474
wt_(wt),
7575
evb_(evb),
76+
nextRequestID_(0),
77+
nextExpectedPeerRequestID_(1),
7678
controlCodec_(dir_, this) {}
7779

7880
explicit MoQSession(
@@ -82,6 +84,8 @@ class MoQSession : public MoQControlCodec::ControlCallback,
8284
: dir_(MoQControlCodec::Direction::SERVER),
8385
wt_(wt),
8486
evb_(evb),
87+
nextRequestID_(1),
88+
nextExpectedPeerRequestID_(0),
8589
serverSetupCallback_(&serverSetupCallback),
8690
controlCodec_(dir_, this) {}
8791

@@ -396,13 +400,17 @@ class MoQSession : public MoQControlCodec::ControlCallback,
396400
// requestID <= maxRequestID_;
397401
bool closeSessionIfRequestIDInvalid(
398402
RequestID requestID,
399-
bool skipCheck = false);
403+
bool skipCheck,
404+
bool isNewRequest);
400405

401406
void initializeNegotiatedVersion(uint64_t negotiatedVersion);
402407
void aliasifyAuthTokens(std::vector<TrackRequestParameter>& params);
403408
RequestID getRequestID(RequestID id, const FullTrackName& ftn);
404409
RequestID getNextRequestID(bool legacyAction = false);
405410
void addLegacyRequestIDMapping(const FullTrackName& ftn, RequestID id);
411+
uint8_t getRequestIDMultiplier() const {
412+
return (getDraftMajorVersion(*getNegotiatedVersion()) >= 11) ? 2 : 1;
413+
}
406414

407415
MoQControlCodec::Direction dir_;
408416
folly::MaybeManagedPtr<proxygen::WebTransport> wt_;
@@ -489,6 +497,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
489497
// RequestID must be a unique monotonically increasing number that is
490498
// less than maxRequestID.
491499
uint64_t nextRequestID_{0};
500+
uint64_t nextExpectedPeerRequestID_{0};
492501
// For request IDs for messages that didn't use subscribe ID in v<11
493502
uint64_t legacyNextRequestID_{quic::kEightByteLimit + 1};
494503
uint64_t maxRequestID_{0};

moxygen/test/MoQSessionTest.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ class MoQSessionTest : public testing::Test,
207207
return negotiatedVersion_;
208208
}
209209

210+
uint8_t getRequestIDMultiplier() const {
211+
return negotiatedVersion_ == kVersionDraft11 ? 2 : 1;
212+
}
213+
210214
using TestLogicFn = std::function<void(
211215
const SubscribeRequest& sub,
212216
std::shared_ptr<TrackConsumer> pub,
@@ -224,7 +228,7 @@ class MoQSessionTest : public testing::Test,
224228
std::shared_ptr<MockPublisher> serverPublisher{
225229
std::make_shared<MockPublisher>()};
226230
uint64_t negotiatedVersion_ = kVersionDraftCurrent;
227-
uint64_t initialMaxRequestID_{kTestMaxRequestID};
231+
uint64_t initialMaxRequestID_{kTestMaxRequestID * getRequestIDMultiplier()};
228232
bool failServerSetup_{false};
229233
bool invalidVersion_{false};
230234
std::shared_ptr<testing::StrictMock<MockFetchConsumer>> fetchCallback_;
@@ -453,6 +457,7 @@ class MoQSessionTestWithVersion11 : public MoQSessionTest {
453457
};
454458

455459
CO_TEST_F_X(MoQSessionTestWithVersion11, AbsoluteJoiningFetch) {
460+
initialMaxRequestID_ = 10;
456461
co_await setupMoQSession();
457462
expectSubscribe([](auto sub, auto pub) -> TaskSubscribeResult {
458463
for (uint32_t group = 6; group < 10; group++) {
@@ -933,9 +938,9 @@ CO_TEST_F_X(MoQSessionTest, MaxRequestID) {
933938
auto res = co_await clientSession_->subscribe(
934939
getSubscribe(kTestTrackName), trackPublisher1);
935940
co_await folly::coro::co_reschedule_on_current_executor;
936-
// This is true because initial is 2 in this test case and we grant credit
941+
// This is true because initial is 4 in this test case and we grant credit
937942
// every 50%.
938-
auto expectedSubId = 3;
943+
auto expectedSubId = 3 * getRequestIDMultiplier();
939944
EXPECT_EQ(serverSession_->maxRequestID(), expectedSubId);
940945

941946
// subscribe again but this time we get a DONE
@@ -945,7 +950,7 @@ CO_TEST_F_X(MoQSessionTest, MaxRequestID) {
945950
res = co_await clientSession_->subscribe(
946951
getSubscribe(kTestTrackName), trackPublisher2);
947952
co_await folly::coro::co_reschedule_on_current_executor;
948-
expectedSubId++;
953+
expectedSubId += getRequestIDMultiplier();
949954
EXPECT_EQ(serverSession_->maxRequestID(), expectedSubId);
950955

951956
// subscribe three more times, last one should fail, the first two will get

0 commit comments

Comments
 (0)