Skip to content

Commit 42faf08

Browse files
afrindfacebook-github-bot
authored andcommitted
Use Consumer interface for MoQSession reads (facebookexperimental#13)
Summary: This is the second half of the MoQSession rewrite. subscribe and fetch callers now supply a Consumer which the library drives as a callback. To make the consumer API work required changing the codec callbacks. The relay now connects a Forwarder (Consumer) to the upstream subscription directly. Differential Revision: D66881617
1 parent b21fed2 commit 42faf08

15 files changed

+1067
-589
lines changed

moxygen/MoQClient.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ void MoQClient::HTTPHandler::onError(
190190
void MoQClient::onSessionEnd(folly::Optional<uint32_t>) {
191191
// TODO: cleanup?
192192
XLOG(DBG1) << "resetting moqSession_";
193-
moqSession_.reset();
193+
auto moqSession = std::move(moqSession_);
194+
moqSession.reset();
194195
CHECK(!moqSession_);
195196
}
196197

moxygen/MoQCodec.cpp

+80-46
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,13 @@ void MoQCodec::onIngressEnd(
103103

104104
void MoQObjectStreamCodec::onIngress(
105105
std::unique_ptr<folly::IOBuf> data,
106-
bool eom) {
106+
bool endOfStream) {
107107
onIngressStart(std::move(data));
108108
folly::io::Cursor cursor(ingress_.front());
109+
bool isFetch = std::get_if<SubscribeID>(&curObjectHeader_.trackIdentifier);
109110
while (!connError_ &&
110111
((ingress_.chainLength() > 0 && !cursor.isAtEnd())/* ||
111-
(eom && parseState_ == ParseState::OBJECT_PAYLOAD_NO_LENGTH)*/)) {
112+
(endOfStream && parseState_ == ParseState::OBJECT_PAYLOAD_NO_LENGTH)*/)) {
112113
switch (parseState_) {
113114
case ParseState::STREAM_HEADER_TYPE: {
114115
auto newCursor = cursor;
@@ -146,7 +147,12 @@ void MoQObjectStreamCodec::onIngress(
146147
connError_ = res.error();
147148
break;
148149
}
149-
curObjectHeader_.trackIdentifier = SubscribeID(res.value());
150+
auto subscribeID = SubscribeID(res.value());
151+
curObjectHeader_.trackIdentifier = subscribeID;
152+
isFetch = true;
153+
if (callback_) {
154+
callback_->onFetchHeader(subscribeID);
155+
}
150156
parseState_ = ParseState::MULTI_OBJECT_HEADER;
151157
cursor = newCursor;
152158
break;
@@ -160,6 +166,16 @@ void MoQObjectStreamCodec::onIngress(
160166
break;
161167
}
162168
curObjectHeader_ = res.value();
169+
auto trackAlias =
170+
std::get_if<TrackAlias>(&curObjectHeader_.trackIdentifier);
171+
XCHECK(trackAlias);
172+
if (callback_) {
173+
callback_->onSubgroup(
174+
*trackAlias,
175+
curObjectHeader_.group,
176+
curObjectHeader_.subgroup,
177+
curObjectHeader_.priority);
178+
}
163179
parseState_ = ParseState::MULTI_OBJECT_HEADER;
164180
cursor = newCursor;
165181
[[fallthrough]];
@@ -174,84 +190,102 @@ void MoQObjectStreamCodec::onIngress(
174190
break;
175191
}
176192
curObjectHeader_ = res.value();
177-
if (callback_) {
178-
callback_->onObjectHeader(std::move(res.value()));
179-
}
180193
cursor = newCursor;
181194
if (curObjectHeader_.status == ObjectStatus::NORMAL) {
182-
parseState_ = ParseState::OBJECT_PAYLOAD;
195+
XLOG(DBG2) << "Parsing object with length, need="
196+
<< *curObjectHeader_.length
197+
<< " have=" << cursor.totalLength();
198+
std::unique_ptr<folly::IOBuf> payload;
199+
auto chunkLen = cursor.cloneAtMost(payload, *curObjectHeader_.length);
200+
auto endOfObject = chunkLen == *curObjectHeader_.length;
201+
if (endOfStream && !endOfObject) {
202+
connError_ = ErrorCode::PARSE_ERROR;
203+
XLOG(DBG4) << __func__ << " " << uint32_t(*connError_);
204+
break;
205+
}
206+
if (callback_) {
207+
callback_->onObjectBegin(
208+
curObjectHeader_.group,
209+
curObjectHeader_.subgroup,
210+
curObjectHeader_.id,
211+
*curObjectHeader_.length,
212+
std::move(payload),
213+
endOfObject,
214+
endOfStream && cursor.isAtEnd());
215+
}
216+
*curObjectHeader_.length -= chunkLen;
217+
if (endOfObject) {
218+
if (endOfStream && cursor.isAtEnd()) {
219+
parseState_ = ParseState::STREAM_FIN_DELIVERED;
220+
} else {
221+
parseState_ = ParseState::MULTI_OBJECT_HEADER;
222+
}
223+
break;
224+
} else {
225+
parseState_ = ParseState::OBJECT_PAYLOAD;
226+
}
183227
} else {
184-
parseState_ = ParseState::MULTI_OBJECT_HEADER;
228+
if (callback_) {
229+
callback_->onObjectStatus(
230+
curObjectHeader_.group,
231+
curObjectHeader_.subgroup,
232+
curObjectHeader_.id,
233+
curObjectHeader_.status);
234+
}
235+
if (curObjectHeader_.status == ObjectStatus::END_OF_TRACK_AND_GROUP ||
236+
(!isFetch &&
237+
curObjectHeader_.status == ObjectStatus::END_OF_GROUP)) {
238+
parseState_ = ParseState::STREAM_FIN_DELIVERED;
239+
} else {
240+
parseState_ = ParseState::MULTI_OBJECT_HEADER;
241+
}
185242
break;
186243
}
187244
[[fallthrough]];
188245
}
189246
case ParseState::OBJECT_PAYLOAD: {
190-
auto newCursor = cursor;
191247
// need to check for bufLen == 0?
192248
std::unique_ptr<folly::IOBuf> payload;
193249
// TODO: skip clone and do split
194250
uint64_t chunkLen = 0;
195251
XCHECK(curObjectHeader_.length);
196252
XLOG(DBG2) << "Parsing object with length, need="
197253
<< *curObjectHeader_.length;
198-
if (ingress_.chainLength() > 0 && newCursor.canAdvance(1)) {
199-
chunkLen = newCursor.cloneAtMost(payload, *curObjectHeader_.length);
254+
if (ingress_.chainLength() > 0 && cursor.canAdvance(1)) {
255+
chunkLen = cursor.cloneAtMost(payload, *curObjectHeader_.length);
200256
}
201257
*curObjectHeader_.length -= chunkLen;
202-
if (eom && *curObjectHeader_.length != 0) {
258+
if (endOfStream && *curObjectHeader_.length != 0) {
203259
connError_ = ErrorCode::PARSE_ERROR;
204260
XLOG(DBG4) << __func__ << " " << uint32_t(*connError_);
205261
break;
206262
}
207263
bool endOfObject = (*curObjectHeader_.length == 0);
208264
if (callback_ && (payload || endOfObject)) {
209-
callback_->onObjectPayload(
210-
curObjectHeader_.trackIdentifier,
211-
curObjectHeader_.group,
212-
curObjectHeader_.id,
213-
std::move(payload),
214-
endOfObject);
265+
callback_->onObjectPayload(std::move(payload), endOfObject);
215266
}
216267
if (endOfObject) {
217268
parseState_ = ParseState::MULTI_OBJECT_HEADER;
218269
}
219-
cursor = newCursor;
220270
break;
221271
}
222-
#if 0
223-
// This code is no longer reachable, but I'm leaving it here in case
224-
// the wire format changes back
225-
case ParseState::OBJECT_PAYLOAD_NO_LENGTH: {
226-
auto newCursor = cursor;
227-
// need to check for bufLen == 0?
228-
std::unique_ptr<folly::IOBuf> payload;
229-
// TODO: skip clone and do split
230-
if (ingress_.chainLength() > 0 && newCursor.canAdvance(1)) {
231-
newCursor.cloneAtMost(payload, std::numeric_limits<uint64_t>::max());
232-
}
233-
XCHECK(!curObjectHeader_.length);
234-
if (callback_ && (payload || eom)) {
235-
callback_->onObjectPayload(
236-
curObjectHeader_.trackIdentifier,
237-
curObjectHeader_.group,
238-
curObjectHeader_.id,
239-
std::move(payload),
240-
eom);
241-
}
242-
if (eom) {
243-
parseState_ = ParseState::FRAME_HEADER_TYPE;
244-
}
245-
cursor = newCursor;
272+
case ParseState::STREAM_FIN_DELIVERED: {
273+
XLOG(DBG2) << "Bytes=" << cursor.totalLength()
274+
<< " remaining in STREAM_FIN_DELIVERED";
275+
connError_ = ErrorCode::PARSE_ERROR;
276+
break;
246277
}
247-
#endif
248278
}
249279
}
250280
size_t remainingLength = 0;
251-
if (!eom && !cursor.isAtEnd()) {
281+
if (!endOfStream && !cursor.isAtEnd()) {
252282
remainingLength = cursor.totalLength(); // must be less than 1 message
253283
}
254-
onIngressEnd(remainingLength, eom, callback_);
284+
if (endOfStream && parseState_ != ParseState::STREAM_FIN_DELIVERED &&
285+
!connError_ && callback_) {
286+
callback_->onEndOfStream();
287+
}
288+
onIngressEnd(remainingLength, endOfStream, callback_);
255289
}
256290

257291
folly::Expected<folly::Unit, ErrorCode> MoQControlCodec::parseFrame(

moxygen/MoQCodec.h

+22-13
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,27 @@ class MoQObjectStreamCodec : public MoQCodec {
141141
public:
142142
~ObjectCallback() override = default;
143143

144-
virtual void onFetchHeader(uint64_t subscribeID) = 0;
145-
virtual void onObjectHeader(ObjectHeader objectHeader) = 0;
146-
147-
virtual void onObjectPayload(
148-
TrackIdentifier trackIdentifier,
149-
uint64_t groupID,
150-
uint64_t id,
151-
std::unique_ptr<folly::IOBuf> payload,
152-
bool eom) = 0;
144+
virtual void onFetchHeader(SubscribeID subscribeID) = 0;
145+
virtual void onSubgroup(
146+
TrackAlias alias,
147+
uint64_t group,
148+
uint64_t subgroup,
149+
uint8_t priority) = 0;
150+
virtual void onObjectBegin(
151+
uint64_t group,
152+
uint64_t subgroup,
153+
uint64_t objectID,
154+
uint64_t length,
155+
Payload initialPayload,
156+
bool objectComplete,
157+
bool subgroupComplete) = 0;
158+
virtual void onObjectStatus(
159+
uint64_t group,
160+
uint64_t subgroup,
161+
uint64_t objectID,
162+
ObjectStatus status) = 0;
163+
virtual void onObjectPayload(Payload payload, bool objectComplete) = 0;
164+
virtual void onEndOfStream() = 0;
153165
};
154166

155167
MoQObjectStreamCodec(ObjectCallback* callback) : callback_(callback) {}
@@ -160,17 +172,14 @@ class MoQObjectStreamCodec : public MoQCodec {
160172

161173
void onIngress(std::unique_ptr<folly::IOBuf> data, bool eom) override;
162174

163-
TrackIdentifier getTrackIdentifier() const {
164-
return curObjectHeader_.trackIdentifier;
165-
}
166-
167175
private:
168176
enum class ParseState {
169177
STREAM_HEADER_TYPE,
170178
OBJECT_STREAM,
171179
FETCH_HEADER,
172180
MULTI_OBJECT_HEADER,
173181
OBJECT_PAYLOAD,
182+
STREAM_FIN_DELIVERED,
174183
// OBJECT_PAYLOAD_NO_LENGTH
175184
};
176185
ParseState parseState_{ParseState::STREAM_HEADER_TYPE};

moxygen/MoQServer.cpp

-6
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,6 @@ void MoQServer::ControlVisitor::operator()(Fetch fetch) const {
9292
XLOG(INFO) << "Fetch id=" << fetch.subscribeID;
9393
}
9494

95-
void MoQServer::ControlVisitor::operator()(SubscribeDone subscribeDone) const {
96-
XLOG(INFO) << "SubscribeDone id=" << subscribeDone.subscribeID
97-
<< " code=" << folly::to_underlying(subscribeDone.statusCode)
98-
<< " reason=" << subscribeDone.reasonPhrase;
99-
}
100-
10195
void MoQServer::ControlVisitor::operator()(Unsubscribe unsubscribe) const {
10296
XLOG(INFO) << "Unsubscribe id=" << unsubscribe.subscribeID;
10397
}

moxygen/MoQServer.h

-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ class MoQServer : public MoQSession::ServerSetupCallback {
4343
void operator()(AnnounceCancel announceCancel) const override;
4444
void operator()(SubscribeAnnounces subscribeAnnounces) const override;
4545
void operator()(UnsubscribeAnnounces unsubscribeAnnounces) const override;
46-
void operator()(SubscribeDone subscribeDone) const override;
4746
void operator()(Unsubscribe unsubscribe) const override;
4847
void operator()(TrackStatusRequest trackStatusRequest) const override;
4948
void operator()(TrackStatus trackStatus) const override;

0 commit comments

Comments
 (0)