Skip to content

Commit fb5d547

Browse files
Aman Sharmameta-codesync[bot]
authored andcommitted
Add allDataReceived callback to ObjectReceiver
Summary: The `allDataReceived` callback that I'm adding in is going to be called when: 1. SUBSCRIBE_DONE has been received 2. All subgroups have been closed Reviewed By: afrind Differential Revision: D88871876 fbshipit-source-id: abf9feebe7b9db4a6c9cc0735fc3ddd26d3639c3
1 parent ac71e30 commit fb5d547

File tree

2 files changed

+150
-2
lines changed

2 files changed

+150
-2
lines changed

moxygen/ObjectReceiver.h

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,21 @@ class ObjectReceiverCallback {
2626
virtual void onEndOfStream() = 0;
2727
virtual void onError(ResetStreamErrorCode) = 0;
2828
virtual void onSubscribeDone(SubscribeDone done) = 0;
29+
// Called when SUBSCRIBE_DONE has arrived AND all outstanding subgroup
30+
// streams have closed. Only fires for subscriptions, not fetches.
31+
virtual void onAllDataReceived() {}
2932
};
3033

34+
class ObjectReceiver;
35+
3136
class ObjectSubgroupReceiver : public SubgroupConsumer {
3237
std::shared_ptr<ObjectReceiverCallback> callback_{nullptr};
38+
std::shared_ptr<ObjectReceiver> parent_{nullptr};
3339
StreamType streamType_;
3440
ObjectHeader header_;
3541
folly::IOBufQueue payload_{folly::IOBufQueue::cacheChainLength()};
3642
folly::Optional<TrackAlias> trackAlias_;
43+
bool finished_{false};
3744

3845
public:
3946
explicit ObjectSubgroupReceiver(
@@ -47,6 +54,10 @@ class ObjectSubgroupReceiver : public SubgroupConsumer {
4754
header_(groupID, subgroupID, 0, priority),
4855
trackAlias_(trackAlias) {}
4956

57+
void setParent(std::shared_ptr<ObjectReceiver> parent) {
58+
parent_ = std::move(parent);
59+
}
60+
5061
void setFetchGroupAndSubgroup(uint64_t groupID, uint64_t subgroupID) {
5162
streamType_ = StreamType::FETCH_HEADER;
5263
header_.group = groupID;
@@ -138,18 +149,29 @@ class ObjectSubgroupReceiver : public SubgroupConsumer {
138149

139150
folly::Expected<folly::Unit, MoQPublishError> endOfSubgroup() override {
140151
callback_->onEndOfStream();
152+
notifyParentFinished();
141153
return folly::unit;
142154
}
143155

144156
void reset(ResetStreamErrorCode error) override {
145157
callback_->onError(error);
158+
notifyParentFinished();
146159
}
160+
161+
private:
162+
void notifyParentFinished();
147163
};
148164

149-
class ObjectReceiver : public TrackConsumer, public FetchConsumer {
165+
class ObjectReceiver : public TrackConsumer,
166+
public FetchConsumer,
167+
public std::enable_shared_from_this<ObjectReceiver> {
150168
std::shared_ptr<ObjectReceiverCallback> callback_{nullptr};
151169
folly::Optional<ObjectSubgroupReceiver> fetchPublisher_;
152170
folly::Optional<TrackAlias> trackAlias_;
171+
// Tracking for onAllDataReceived callback (subscription mode only)
172+
size_t openSubgroups_{0};
173+
bool subscribeDoneDelivered_{false};
174+
bool allDataCallbackSent_{false};
153175

154176
public:
155177
enum Type { SUBSCRIBE, FETCH };
@@ -171,8 +193,30 @@ class ObjectReceiver : public TrackConsumer, public FetchConsumer {
171193
folly::Expected<std::shared_ptr<SubgroupConsumer>, MoQPublishError>
172194
beginSubgroup(uint64_t groupID, uint64_t subgroupID, Priority priority)
173195
override {
174-
return std::make_shared<ObjectSubgroupReceiver>(
196+
++openSubgroups_;
197+
auto receiver = std::make_shared<ObjectSubgroupReceiver>(
175198
callback_, trackAlias_, groupID, subgroupID, priority);
199+
receiver->setParent(shared_from_this());
200+
return receiver;
201+
}
202+
203+
// Called when a subgroup stream finishes (via endOfSubgroup or reset)
204+
void onSubgroupFinished() {
205+
if (openSubgroups_ > 0) {
206+
--openSubgroups_;
207+
}
208+
maybeFireAllDataReceived();
209+
}
210+
211+
// Fire onAllDataReceived callback once when both conditions are met:
212+
// 1. SUBSCRIBE_DONE has been received
213+
// 2. All subgroup streams have closed
214+
void maybeFireAllDataReceived() {
215+
if (!allDataCallbackSent_ && subscribeDoneDelivered_ &&
216+
openSubgroups_ == 0) {
217+
allDataCallbackSent_ = true;
218+
callback_->onAllDataReceived();
219+
}
176220
}
177221

178222
folly::Expected<folly::SemiFuture<folly::Unit>, MoQPublishError>
@@ -222,6 +266,8 @@ class ObjectReceiver : public TrackConsumer, public FetchConsumer {
222266
folly::Expected<folly::Unit, MoQPublishError> subscribeDone(
223267
SubscribeDone subDone) override {
224268
callback_->onSubscribeDone(std::move(subDone));
269+
subscribeDoneDelivered_ = true;
270+
maybeFireAllDataReceived();
225271
return folly::unit;
226272
}
227273

@@ -311,4 +357,13 @@ class ObjectReceiver : public TrackConsumer, public FetchConsumer {
311357
}
312358
};
313359

360+
// Definition of ObjectSubgroupReceiver::notifyParentFinished() - needs full
361+
// ObjectReceiver definition
362+
inline void ObjectSubgroupReceiver::notifyParentFinished() {
363+
if (parent_ && !finished_) {
364+
finished_ = true;
365+
parent_->onSubgroupFinished();
366+
}
367+
}
368+
314369
} // namespace moxygen
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* This source code is licensed under the MIT license found in the
4+
* LICENSE file in the root directory of this source tree.
5+
*/
6+
7+
#include <folly/portability/GMock.h>
8+
#include <folly/portability/GTest.h>
9+
#include <moxygen/ObjectReceiver.h>
10+
11+
using namespace moxygen;
12+
using namespace testing;
13+
14+
namespace {
15+
16+
class MockObjectReceiverCallback : public ObjectReceiverCallback {
17+
public:
18+
MOCK_METHOD(
19+
FlowControlState,
20+
onObject,
21+
(folly::Optional<TrackAlias>, const ObjectHeader&, Payload),
22+
(override));
23+
MOCK_METHOD(
24+
void,
25+
onObjectStatus,
26+
(folly::Optional<TrackAlias>, const ObjectHeader&),
27+
(override));
28+
MOCK_METHOD(void, onEndOfStream, (), (override));
29+
MOCK_METHOD(void, onError, (ResetStreamErrorCode), (override));
30+
MOCK_METHOD(void, onSubscribeDone, (SubscribeDone), (override));
31+
MOCK_METHOD(void, onAllDataReceived, (), (override));
32+
};
33+
34+
Payload makePayload(const std::string& str) {
35+
return folly::IOBuf::copyBuffer(str);
36+
}
37+
38+
} // namespace
39+
40+
class ObjectReceiverTest : public Test {
41+
protected:
42+
void SetUp() override {
43+
callback_ = std::make_shared<MockObjectReceiverCallback>();
44+
}
45+
46+
std::shared_ptr<MockObjectReceiverCallback> callback_;
47+
};
48+
49+
TEST_F(ObjectReceiverTest, SubscribeDoneDelivery) {
50+
auto receiver = std::make_shared<ObjectReceiver>(
51+
ObjectReceiver::Type::SUBSCRIBE, callback_);
52+
53+
SubscribeDone done;
54+
done.requestID = RequestID(1);
55+
done.statusCode = SubscribeDoneStatusCode::SUBSCRIPTION_ENDED;
56+
57+
EXPECT_CALL(*callback_, onSubscribeDone(_)).Times(1);
58+
// onAllDataReceived should be called since no subgroups are open
59+
EXPECT_CALL(*callback_, onAllDataReceived()).Times(1);
60+
61+
auto result = receiver->subscribeDone(std::move(done));
62+
EXPECT_TRUE(result.hasValue());
63+
}
64+
65+
TEST_F(ObjectReceiverTest, AllDataReceivedAfterSubgroupClose) {
66+
auto receiver = std::make_shared<ObjectReceiver>(
67+
ObjectReceiver::Type::SUBSCRIBE, callback_);
68+
receiver->setTrackAlias(TrackAlias(1));
69+
70+
// Start a subgroup
71+
EXPECT_CALL(*callback_, onEndOfStream()).Times(1);
72+
auto subgroupResult =
73+
receiver->beginSubgroup(/*groupID=*/0, /*subgroupID=*/0, /*priority=*/0);
74+
ASSERT_TRUE(subgroupResult.hasValue());
75+
auto subgroup = *subgroupResult;
76+
77+
// Deliver subscribeDone while subgroup is open
78+
SubscribeDone done;
79+
done.requestID = RequestID(1);
80+
done.statusCode = SubscribeDoneStatusCode::SUBSCRIPTION_ENDED;
81+
82+
EXPECT_CALL(*callback_, onSubscribeDone(_)).Times(1);
83+
// onAllDataReceived should NOT be called yet because subgroup is open
84+
EXPECT_CALL(*callback_, onAllDataReceived()).Times(0);
85+
86+
auto subDoneResult = receiver->subscribeDone(std::move(done));
87+
EXPECT_TRUE(subDoneResult.hasValue());
88+
89+
// Now close the subgroup - onAllDataReceived should fire
90+
EXPECT_CALL(*callback_, onAllDataReceived()).Times(1);
91+
auto endResult = subgroup->endOfSubgroup();
92+
EXPECT_TRUE(endResult.hasValue());
93+
}

0 commit comments

Comments
 (0)