99#include < folly/coro/FutureUtil.h>
1010#include < folly/io/async/EventBase.h>
1111#include < quic/common/CircularDeque.h>
12+ #include < quic/priority/HTTPPriorityQueue.h>
1213#include < moxygen/events/MoQDeliveryTimeoutManager.h>
1314
1415#include < folly/logging/xlog.h>
@@ -19,20 +20,35 @@ namespace {
1920using namespace moxygen ;
2021constexpr uint64_t kMaxSendTokenCacheSize (1024 );
2122
22- constexpr uint32_t IdMask = 0x1FFFFF ;
23+ // Bit allocations for 32-bit order: sub=5, pub=8, group=14, subgroup=5
24+ constexpr uint32_t kSubPriBits = 5 ;
25+ constexpr uint32_t kPubPriBits = 8 ;
26+ constexpr uint32_t kGroupBits = 14 ;
27+ constexpr uint32_t kSubgroupBits = 5 ;
28+
29+ constexpr uint32_t kSubPriMask = (1 << kSubPriBits ) - 1 ; // 0x1F
30+ constexpr uint32_t kPubPriMask = (1 << kPubPriBits ) - 1 ; // 0xFF
31+ constexpr uint32_t kGroupMask = (1 << kGroupBits ) - 1 ; // 0x3FFF
32+ constexpr uint32_t kSubgroupMask = (1 << kSubgroupBits ) - 1 ; // 0x1F
33+
2334uint32_t groupPriorityBits (GroupOrder groupOrder, uint64_t group) {
2435 // If the group order is oldest first, we want to give lower group
2536 // ids a higher precedence. Otherwise, if it is newest first, we want
2637 // to give higher group ids a higher precedence.
27- uint32_t truncGroup = static_cast <uint32_t >(group) & IdMask ;
38+ uint32_t truncGroup = static_cast <uint32_t >(group) & kGroupMask ;
2839 return groupOrder == GroupOrder::OldestFirst ? truncGroup
29- : (IdMask - truncGroup);
40+ : (kGroupMask - truncGroup);
3041}
3142
3243uint32_t subgroupPriorityBits (uint32_t subgroupID) {
33- return static_cast <uint32_t >(subgroupID) & IdMask ;
44+ return static_cast <uint32_t >(subgroupID) & kSubgroupMask ;
3445}
3546
47+ struct StreamPriority {
48+ uint8_t urgency;
49+ uint32_t order;
50+ };
51+
3652/*
3753 * The spec mentions that scheduling should go as per
3854 * the following precedence list:
@@ -45,19 +61,27 @@ uint32_t subgroupPriorityBits(uint32_t subgroupID) {
4561 * priority so that we respect the aforementioned precedence order when we are
4662 * sending objects.
4763 */
48- uint64_t getStreamPriority (
64+ StreamPriority getStreamPriority (
4965 uint64_t groupID,
5066 uint64_t subgroupID,
5167 uint8_t subPri,
5268 uint8_t pubPri,
5369 GroupOrder pubGroupOrder) {
54- // 6 reserved bits | 58 bit order
55- // 6 reserved | 8 sub pri | 8 pub pri | 21 group order | 21 obj order
70+ // 32-bit order: 5 sub pri | 8 pub pri | 14 group order | 5 subgroup
71+ // Urgency: 3 MSB of subPri
72+ uint8_t urgency = subPri >> 5 ;
73+ uint32_t subPriBits = subPri & kSubPriMask ;
74+ uint32_t pubPriBits = pubPri & kPubPriMask ;
5675 uint32_t groupBits = groupPriorityBits (pubGroupOrder, groupID);
5776 uint32_t subgroupBits = subgroupPriorityBits (subgroupID);
58- return (
59- (uint64_t (subPri) << 50 ) | (uint64_t (pubPri) << 42 ) | (groupBits << 21 ) |
60- subgroupBits);
77+ uint32_t order = (subPriBits << (kPubPriBits + kGroupBits + kSubgroupBits )) |
78+ (pubPriBits << (kGroupBits + kSubgroupBits )) |
79+ (groupBits << kSubgroupBits ) | subgroupBits;
80+ // Never use urgency=0, order=0 except for control stream
81+ if (urgency == 0 && order == 0 ) {
82+ order = 1 ;
83+ }
84+ return {urgency, order};
6185}
6286
6387// Helper function to validate priority from application is set.
@@ -951,11 +975,10 @@ StreamPublisherImpl::ensureWriteHandle() {
951975 << " tp=" << this ;
952976 // publisher group order is not known here, but it shouldn't matter
953977 // Currently sets group=0 for FETCH priority bits
978+ auto pri = getStreamPriority (
979+ 0 , 0 , publisher_->subPriority (), 0 , GroupOrder::OldestFirst);
954980 stream.value ()->setPriority (
955- 1 ,
956- getStreamPriority (
957- 0 , 0 , publisher_->subPriority (), 0 , GroupOrder::OldestFirst),
958- false );
981+ quic::HTTPPriorityQueue::Priority (pri.urgency , false , pri.order ));
959982 setWriteHandle (*stream);
960983 return folly::unit;
961984}
@@ -1392,11 +1415,10 @@ MoQSession::TrackPublisherImpl::beginSubgroup(
13921415 XLOG (DBG4) << " New stream created, id: " << stream.value ()->getID ()
13931416 << " tp=" << this ;
13941417 session_->onSubscriptionStreamOpened ();
1418+ auto pri = getStreamPriority (
1419+ groupID, subgroupID, subPriority_, pubPriority, groupOrder_);
13951420 stream.value ()->setPriority (
1396- 1 ,
1397- getStreamPriority (
1398- groupID, subgroupID, subPriority_, pubPriority, groupOrder_),
1399- false );
1421+ quic::HTTPPriorityQueue::Priority (pri.urgency , false , pri.order ));
14001422
14011423 // Get effective timeout to pass to StreamPublisherImpl
14021424 auto effectiveTimeout = deliveryTimeoutManager_.getEffectiveTimeout ();
@@ -2113,7 +2135,8 @@ void MoQSession::start() {
21132135 return ;
21142136 }
21152137 auto controlStream = cs.value ();
2116- controlStream.writeHandle ->setPriority (0 , 0 , false );
2138+ controlStream.writeHandle ->setPriority (
2139+ quic::HTTPPriorityQueue::Priority (0 , false , 0 ));
21172140
21182141 if (logger_) {
21192142 logger_->logStreamTypeSet (
@@ -4828,7 +4851,7 @@ void MoQSession::onNewBidiStream(
48284851 bh.readHandle ->getID (), MOQTStreamType::CONTROL, Owner::REMOTE);
48294852 }
48304853
4831- bh.writeHandle ->setPriority (0 , 0 , false );
4854+ bh.writeHandle ->setPriority (quic::HTTPPriorityQueue::Priority ( 0 , false , 0 ) );
48324855 co_withExecutor (
48334856 exec_.get (),
48344857 co_withCancellation (
0 commit comments