Skip to content

Commit 8b932e6

Browse files
Sujay Patelfacebook-github-bot
authored andcommitted
Full Publisher For Forwarding Preference One
Summary: Fully implemented Subscribe call for MoQTestServer for TrackNamespaces that have a Forwarding Preference of 1. Reviewed By: sharmafb Differential Revision: D75614231 fbshipit-source-id: c1bba81dccd9bc7a29b467b4da8f934b2d1a9875
1 parent 214ed61 commit 8b932e6

File tree

3 files changed

+113
-8
lines changed

3 files changed

+113
-8
lines changed

moxygen/moqtest/MoQTestServer.cpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,13 @@ folly::coro::Task<void> MoQTestServer::onSubscribe(
7070
// Switch based on forwarding preference
7171
switch (params.forwardingPreference) {
7272
case (ForwardingPreference::ONE_SUBGROUP_PER_GROUP): {
73-
co_await sendObjectsForForwardPreferenceZero(params, callback);
73+
co_await sendOneSubgroupPerGroup(params, callback);
74+
break;
75+
}
76+
77+
case (ForwardingPreference::ONE_SUBGROUP_PER_OBJECT): {
78+
co_await sendOneSubgroupPerObject(params, callback);
79+
7480
break;
7581
}
7682

@@ -89,7 +95,7 @@ folly::coro::Task<void> MoQTestServer::onSubscribe(
8995
co_return;
9096
}
9197

92-
folly::coro::Task<void> MoQTestServer::sendObjectsForForwardPreferenceZero(
98+
folly::coro::Task<void> MoQTestServer::sendOneSubgroupPerGroup(
9399
MoQTestParameters params,
94100
std::shared_ptr<TrackConsumer> callback) {
95101
// Iterate through Groups
@@ -136,4 +142,51 @@ folly::coro::Task<void> MoQTestServer::sendObjectsForForwardPreferenceZero(
136142
}
137143
}
138144

145+
folly::coro::Task<void> MoQTestServer::sendOneSubgroupPerObject(
146+
MoQTestParameters params,
147+
std::shared_ptr<TrackConsumer> callback) {
148+
// Iterate through Objects
149+
for (int groupNum = params.startGroup; groupNum <= params.lastGroupInTrack;
150+
groupNum += params.groupIncrement) {
151+
// Iterate Through Objects in SubGroup
152+
for (int objectId = params.startObject;
153+
objectId <= params.lastObjectInTrack;
154+
objectId += params.objectIncrement) {
155+
// Begin a New Subgroup per object (Default Priority)
156+
auto maybeSubConsumer =
157+
callback->beginSubgroup(groupNum, objectId, kDefaultPriority);
158+
auto subConsumer = maybeSubConsumer->get();
159+
// Find Object Size
160+
int objectSize = moxygen::getObjectSize(objectId, &params);
161+
162+
// Add Integer/Variable Extensions if needed
163+
std::vector<Extension> extensions = getExtensions(
164+
params.testIntegerExtension, params.testVariableExtension);
165+
166+
// If there are send end of group markers and j == lastObjectID, send
167+
// the end of group
168+
if (objectId < params.lastObjectInTrack ||
169+
!params.sendEndOfGroupMarkers) {
170+
// Begin Delivering Object With Payload
171+
std::string p = std::string(objectSize, 't');
172+
auto objectPayload = folly::IOBuf::copyBuffer(p);
173+
subConsumer->object(
174+
objectId, std::move(objectPayload), extensions, false);
175+
} else {
176+
subConsumer->endOfGroup(objectId);
177+
}
178+
179+
// If SubGroup Hasn't Been Ended Already
180+
if (!params.sendEndOfGroupMarkers) {
181+
subConsumer->endOfSubgroup();
182+
}
183+
184+
// Set Delay Based on Object Frequency
185+
co_await folly::coro::sleep(
186+
std::chrono::milliseconds(params.objectFrequency));
187+
}
188+
}
189+
co_return;
190+
}
191+
139192
} // namespace moxygen

moxygen/moqtest/MoQTestServer.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ class MoQTestServer : public moxygen::Publisher,
3939
SubscribeRequest sub,
4040
std::shared_ptr<TrackConsumer> callback);
4141

42-
folly::coro::Task<void> sendObjectsForForwardPreferenceZero(
42+
folly::coro::Task<void> sendOneSubgroupPerGroup(
43+
MoQTestParameters params,
44+
std::shared_ptr<TrackConsumer> callback);
45+
46+
folly::coro::Task<void> sendOneSubgroupPerObject(
4347
MoQTestParameters params,
4448
std::shared_ptr<TrackConsumer> callback);
4549

moxygen/test/MoQTrackServerTest.cpp

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class MoQTrackServerTest : public testing::Test {
5959

6060
} // namespace
6161

62-
// Subscription Testing - ForwardPreference=0
62+
// Subscription Testing
6363
TEST_F(MoQTrackServerTest, ValidateSubscribeWithForwardPreferenceZero) {
6464
MoQTrackServerTest::CreateDefaultMoQTestParameters();
6565

@@ -102,8 +102,7 @@ TEST_F(MoQTrackServerTest, ValidateSubscribeWithForwardPreferenceZero) {
102102
}
103103

104104
// Call the onSubscribe method
105-
auto task =
106-
server_.sendObjectsForForwardPreferenceZero(params_, mockConsumer);
105+
auto task = server_.sendOneSubgroupPerGroup(params_, mockConsumer);
107106

108107
// Wait for the coroutine to complete
109108
folly::coro::blockingWait(std::move(task));
@@ -176,8 +175,57 @@ TEST_F(
176175
}
177176

178177
// Call the onSubscribe method
179-
auto task =
180-
server_.sendObjectsForForwardPreferenceZero(params_, mockConsumer);
178+
auto task = server_.sendOneSubgroupPerGroup(params_, mockConsumer);
179+
180+
// Wait for the coroutine to complete
181+
folly::coro::blockingWait(std::move(task));
182+
}
183+
184+
TEST_F(MoQTrackServerTest, ValidateSubscribeWithForwardPreferenceOne) {
185+
MoQTrackServerTest::CreateDefaultMoQTestParameters();
186+
params_.forwardingPreference = moxygen::ForwardingPreference(1);
187+
188+
// Create a mock track consumer
189+
auto mockConsumer = std::make_shared<moxygen::MockTrackConsumer>();
190+
191+
// Create a mock subgroup consumer
192+
auto mockSubgroupConsumer = std::make_shared<moxygen::MockSubgroupConsumer>();
193+
194+
// Set expectations for beginSubgroup
195+
for (int groupId = 0; groupId <= 10; groupId++) {
196+
for (int objectId = 0; objectId <= params_.lastObjectInTrack; objectId++) {
197+
// Set expectations for beginObject
198+
int objectSize = moxygen::getObjectSize(objectId, &params_);
199+
// Create a mock subgroup consumer
200+
auto mockSubgroupConsumer =
201+
std::make_shared<moxygen::MockSubgroupConsumer>();
202+
EXPECT_CALL(*mockConsumer, beginSubgroup(groupId, objectId, testing::_))
203+
.Times(1)
204+
.WillOnce(testing::Return(mockSubgroupConsumer));
205+
EXPECT_CALL(
206+
*mockSubgroupConsumer,
207+
object(objectId, testing::_, testing::_, testing::_))
208+
.Times(1)
209+
.WillOnce(testing::Invoke(
210+
[objectSize](
211+
auto, std::unique_ptr<folly::IOBuf> payload, auto, auto) {
212+
int payloadLength = (*payload).length();
213+
EXPECT_EQ(payloadLength, objectSize);
214+
return folly::Expected<folly::Unit, moxygen::MoQPublishError>(
215+
{});
216+
}))
217+
.WillOnce(::testing::Return(
218+
folly::Expected<folly::Unit, moxygen::MoQPublishError>({})));
219+
// Set expectations for endOfSubgroup
220+
EXPECT_CALL(*mockSubgroupConsumer, endOfSubgroup())
221+
.Times(1)
222+
.WillOnce(::testing::Return(
223+
folly::Expected<folly::Unit, moxygen::MoQPublishError>({})));
224+
}
225+
}
226+
227+
// Call the onSubscribe method
228+
auto task = server_.sendOneSubgroupPerObject(params_, mockConsumer);
181229

182230
// Wait for the coroutine to complete
183231
folly::coro::blockingWait(std::move(task));

0 commit comments

Comments
 (0)