@@ -1559,7 +1559,8 @@ folly::coro::Task<ServerSetup> MoQSession::setup(ClientSetup setup) {
15591559 XLOG (ERR) << " writeClientSetup failed sess=" << this ;
15601560 co_yield folly::coro::co_error (std::runtime_error (" Failed to write setup" ));
15611561 }
1562- maxRequestID_ = maxConcurrentRequests_ = maxRequestID;
1562+ maxRequestID_ = maxRequestID;
1563+ maxConcurrentRequests_ = maxRequestID_ / getRequestIDMultiplier ();
15631564 controlWriteEvent_.signal ();
15641565
15651566 auto deletedToken = cancellationSource_.getToken ();
@@ -1618,6 +1619,9 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
16181619 }
16191620 initializeNegotiatedVersion (serverSetup->selectedVersion );
16201621 auto maxRequestID = getMaxRequestIDIfPresent (serverSetup->params );
1622+ if (getDraftMajorVersion (serverSetup->selectedVersion ) < 11 ) {
1623+ nextRequestID_ = 0 ;
1624+ }
16211625 // TODO: clamp egress max auth token cache size
16221626 controlCodec_.setMaxAuthTokenCacheSize (
16231627 getMaxAuthTokenCacheSizeIfPresent (serverSetup->params ));
@@ -1628,7 +1632,8 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
16281632 close (SessionCloseErrorCode::VERSION_NEGOTIATION_FAILED);
16291633 return ;
16301634 }
1631- maxRequestID_ = maxConcurrentRequests_ = maxRequestID;
1635+ maxRequestID_ = maxRequestID;
1636+ maxConcurrentRequests_ = maxRequestID_ / getRequestIDMultiplier ();
16321637 setupComplete_ = true ;
16331638 controlWriteEvent_.signal ();
16341639}
@@ -2028,7 +2033,7 @@ void MoQSession::onSubscribe(SubscribeRequest subscribeRequest) {
20282033 XLOG (DBG1) << __func__ << " ftn=" << subscribeRequest.fullTrackName
20292034 << " sess=" << this ;
20302035 const auto requestID = subscribeRequest.requestID ;
2031- if (closeSessionIfRequestIDInvalid (requestID)) {
2036+ if (closeSessionIfRequestIDInvalid (requestID, false , true )) {
20322037 return ;
20332038 }
20342039
@@ -2118,7 +2123,7 @@ void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
21182123 XLOG (ERR) << " No matching subscribe ID=" << requestID << " sess=" << this ;
21192124 return ;
21202125 }
2121- if (closeSessionIfRequestIDInvalid (requestID)) {
2126+ if (closeSessionIfRequestIDInvalid (requestID, false , false )) {
21222127 return ;
21232128 }
21242129
@@ -2142,7 +2147,7 @@ void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
21422147
21432148void MoQSession::onUnsubscribe (Unsubscribe unsubscribe) {
21442149 XLOG (DBG1) << __func__ << " id=" << unsubscribe.requestID << " sess=" << this ;
2145- if (closeSessionIfRequestIDInvalid (unsubscribe.requestID )) {
2150+ if (closeSessionIfRequestIDInvalid (unsubscribe.requestID , false , false )) {
21462151 return ;
21472152 }
21482153 if (!publishHandler_) {
@@ -2267,7 +2272,7 @@ void MoQSession::onRequestsBlocked(RequestsBlocked requestsBlocked) {
22672272 // Increment the maxRequestID_ by the number of pending closed subscribes
22682273 // and send a new MaxRequestID.
22692274 if (requestsBlocked.maxRequestID >= maxRequestID_ && closedRequests_ > 0 ) {
2270- maxRequestID_ += closedRequests_;
2275+ maxRequestID_ += ( closedRequests_ * getRequestIDMultiplier ()) ;
22712276 closedRequests_ = 0 ;
22722277 sendMaxRequestID (true );
22732278 }
@@ -2280,7 +2285,7 @@ void MoQSession::onFetch(Fetch fetch) {
22802285 : folly::to<std::string>(" joining=" , joining->joiningRequestID .value );
22812286 XLOG (DBG1) << __func__ << " (" << logStr << " ) sess=" << this ;
22822287 const auto requestID = fetch.requestID ;
2283- if (closeSessionIfRequestIDInvalid (requestID)) {
2288+ if (closeSessionIfRequestIDInvalid (requestID, false , true )) {
22842289 return ;
22852290 }
22862291 if (standalone) {
@@ -2384,6 +2389,9 @@ folly::coro::Task<void> MoQSession::handleFetch(
23842389
23852390void MoQSession::onFetchCancel (FetchCancel fetchCancel) {
23862391 XLOG (DBG1) << __func__ << " id=" << fetchCancel.requestID << " sess=" << this ;
2392+ if (closeSessionIfRequestIDInvalid (fetchCancel.requestID , false , false )) {
2393+ return ;
2394+ }
23872395 auto pubTrackIt = pubTracks_.find (fetchCancel.requestID );
23882396 if (pubTrackIt == pubTracks_.end ()) {
23892397 XLOG (DBG4) << " No publish key for fetch id=" << fetchCancel.requestID
@@ -2436,7 +2444,9 @@ void MoQSession::onFetchError(FetchError fetchError) {
24362444void MoQSession::onAnnounce (Announce ann) {
24372445 XLOG (DBG1) << __func__ << " ns=" << ann.trackNamespace << " sess=" << this ;
24382446 if (closeSessionIfRequestIDInvalid (
2439- ann.requestID , getDraftMajorVersion (*getNegotiatedVersion ()) < 11 )) {
2447+ ann.requestID ,
2448+ getDraftMajorVersion (*getNegotiatedVersion ()) < 11 ,
2449+ true )) {
24402450 return ;
24412451 }
24422452 if (!subscribeHandler_) {
@@ -2588,7 +2598,9 @@ void MoQSession::onSubscribeAnnounces(SubscribeAnnounces sa) {
25882598 XLOG (DBG1) << __func__ << " prefix=" << sa.trackNamespacePrefix
25892599 << " sess=" << this ;
25902600 if (closeSessionIfRequestIDInvalid (
2591- sa.requestID , getDraftMajorVersion (*getNegotiatedVersion ()) < 11 )) {
2601+ sa.requestID ,
2602+ getDraftMajorVersion (*getNegotiatedVersion ()) < 11 ,
2603+ true )) {
25922604 return ;
25932605 }
25942606 if (!publishHandler_) {
@@ -2701,7 +2713,8 @@ void MoQSession::onTrackStatusRequest(TrackStatusRequest trackStatusRequest) {
27012713 << " sess=" << this ;
27022714 if (closeSessionIfRequestIDInvalid (
27032715 trackStatusRequest.requestID ,
2704- getDraftMajorVersion (*getNegotiatedVersion ()) < 11 )) {
2716+ getDraftMajorVersion (*getNegotiatedVersion ()) < 11 ,
2717+ true )) {
27052718 return ;
27062719 }
27072720 if (!publishHandler_) {
@@ -3050,7 +3063,9 @@ RequestID MoQSession::getNextRequestID(bool legacyAction) {
30503063 << nextRequestID_ << " peerMaxRequestID_=" << peerMaxRequestID_
30513064 << " sess=" << this ;
30523065 }
3053- return nextRequestID_++;
3066+ auto ret = nextRequestID_;
3067+ nextRequestID_ += getRequestIDMultiplier ();
3068+ return ret;
30543069}
30553070
30563071folly::coro::Task<Publisher::SubscribeResult> MoQSession::subscribe (
@@ -3224,7 +3239,7 @@ void MoQSession::retireRequestID(bool signalWriteLoop) {
32243239 // If # of closed requests is greater than 1/2 of max requests, then
32253240 // let's bump the maxRequestID by the number of closed requests.
32263241 if (++closedRequests_ >= maxConcurrentRequests_ / 2 ) {
3227- maxRequestID_ += closedRequests_;
3242+ maxRequestID_ += ( closedRequests_ * getRequestIDMultiplier ()) ;
32283243 closedRequests_ = 0 ;
32293244 sendMaxRequestID (signalWriteLoop);
32303245 }
@@ -3412,9 +3427,6 @@ void MoQSession::fetchError(const FetchError& fetchErr) {
34123427
34133428void MoQSession::fetchCancel (const FetchCancel& fetchCan) {
34143429 XLOG (DBG1) << __func__ << " sess=" << this ;
3415- if (closeSessionIfRequestIDInvalid (fetchCan.requestID )) {
3416- return ;
3417- }
34183430 auto trackIt = fetches_.find (fetchCan.requestID );
34193431 if (trackIt == fetches_.end ()) {
34203432 XLOG (ERR) << " unknown subscribe ID=" << fetchCan.requestID
@@ -3544,12 +3556,39 @@ void MoQSession::onDatagram(std::unique_ptr<folly::IOBuf> datagram) {
35443556
35453557bool MoQSession::closeSessionIfRequestIDInvalid (
35463558 RequestID requestID,
3547- bool skipCheck) {
3548- if (!skipCheck && maxRequestID_ <= requestID.value ) {
3549- XLOG (ERR) << " Invalid requestID: " << requestID << " sess=" << this ;
3550- close (SessionCloseErrorCode::TOO_MANY_REQUESTS);
3559+ bool skipCheck,
3560+ bool isNewRequest) {
3561+ if (skipCheck) {
3562+ return false ;
3563+ }
3564+
3565+ if (getDraftMajorVersion (*getNegotiatedVersion ()) >= 11 &&
3566+ ((requestID.value % 2 ) == 1 ) !=
3567+ (dir_ == MoQControlCodec::Direction::CLIENT)) {
3568+ XLOG (ERR) << " Invalid requestID parity: " << requestID << " sess=" << this ;
3569+ close (SessionCloseErrorCode::INVALID_REQUEST_ID);
35513570 return true ;
35523571 }
3572+ if (isNewRequest) {
3573+ if (requestID.value >= maxRequestID_) {
3574+ XLOG (ERR) << " Too many requests requestID: " << requestID
3575+ << " sess=" << this ;
3576+ close (SessionCloseErrorCode::TOO_MANY_REQUESTS);
3577+ return true ;
3578+ }
3579+ if (requestID.value != nextExpectedPeerRequestID_) {
3580+ XLOG (ERR) << " Invalid next requestID: " << requestID << " sess=" << this ;
3581+ close (SessionCloseErrorCode::INVALID_REQUEST_ID);
3582+ return true ;
3583+ }
3584+ nextExpectedPeerRequestID_ += getRequestIDMultiplier ();
3585+ } else {
3586+ if (requestID.value >= maxRequestID_) {
3587+ XLOG (ERR) << " Invalid requestID: " << requestID << " sess=" << this ;
3588+ close (SessionCloseErrorCode::INVALID_REQUEST_ID);
3589+ return true ;
3590+ }
3591+ }
35533592 return false ;
35543593}
35553594
0 commit comments