Skip to content

Commit 6ca92b2

Browse files
jordicenzanofacebook-github-bot
authored andcommitted
Changes to MoQFlvStreamerClient to work with refectored MoQSession + MoQFlvReceiverClient (to test)
Summary: 1- Audio track works (group/subg per obj)!!!! 2- Video track crashes the server (different err than before). See below: ``` ... V1213 09:08:36.445933 925420 MoQSession.cpp:1134] subscribeOk sess=0x51a000013900 V1213 09:08:36.446077 925420 MoQLocation.h:24] m=2 V1213 09:08:36.446148 925420 MoQLocation.h:55] g=0 o=0g=18446744073709551615 o=18446744073709551615 V1213 09:08:37.516494 925420 MoQSession.cpp:1953] onNewUniStream sess=0x51a000011b00 V1213 09:08:37.516966 925420 MoQSession.cpp:643] unidirectionalReadLoop sess=0x51a000011b00 F1213 09:08:37.548959 925420 MoQForwarder.h:270] Check failed: subgroupConsumerIt != sub->subgroups.end() *** Check failure stack trace: *** *** Aborted at 1734109717 (Unix time, try 'date -d 1734109717') *** *** Signal 6 (SIGABRT) (0x3113a000e1e3c) received by PID 925244 (pthread TID 0x7ff6e3600640) (linux TID 925420) (maybe from PID 925244, UID 201018) (code: -6), stack trace: *** ================================================================= ==925244==ERROR: AddressSanitizer: stack-use-after-return on address 0x7ff6e2403a50 at pc 0x7ff6eb667c02 bp 0x7ff6e8e90010 sp 0x7ff6e8e90008 READ of size 8 at 0x7ff6e2403a50 thread T1 SCARINESS: 61 (8-byte-read-stack-use-after-return) #0 0x7ff6eb667c01 in folly::AsyncStackRoot::getStackFramePointer() const fbcode/folly/tracing/AsyncStack-inl.h:135 #1 0x7ff6eb6677d6 in folly::symbolizer::getAsyncStackTraceSafe(unsigned long*, unsigned long) fbcode/folly/debugging/symbolizer/StackTrace.cpp:311 facebookexperimental#2 0x7ff6ed483896 in folly::symbolizer::SafeStackTracePrinter::printStackTrace(bool) fbcode/folly/debugging/symbolizer/Symbolizer.h:91 facebookexperimental#3 0x7ff6ef0b0607 in folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*) fbcode/folly/debugging/symbolizer/SignalHandler.cpp:453 facebookexperimental#4 0x7ff6ef0aed81 in folly::symbolizer::(anonymous namespace)::signalHandler(int, siginfo_t*, void*) fbcode/folly/debugging/symbolizer/SignalHandler.cpp:474 ... ``` Differential Revision: D66176642
1 parent 42faf08 commit 6ca92b2

File tree

3 files changed

+359
-98
lines changed

3 files changed

+359
-98
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# MoQFlvReceiverClient
8+
add_executable(
9+
moqflvreceiverclient
10+
MoQFlvReceiverClient.cpp
11+
)
12+
set_target_properties(
13+
moqflvreceiverclient
14+
PROPERTIES
15+
BUILD_RPATH ${DEPS_LIBRARIES_DIR}
16+
INSTALL_RPATH ${DEPS_LIBRARIES_DIR}
17+
)
18+
target_include_directories(
19+
moqflvreceiverclient PUBLIC $<BUILD_INTERFACE:${MOXYGEN_FBCODE_ROOT}>
20+
)
21+
target_compile_options(
22+
moqflvreceiverclient PRIVATE
23+
${_MOXYGEN_COMMON_COMPILE_OPTIONS}
24+
)
25+
target_link_libraries(
26+
moqflvreceiverclient PUBLIC
27+
Folly::folly
28+
moxygen
29+
)
30+
31+
install(
32+
TARGETS moqflvreceiverclient
33+
EXPORT moxygen-exports
34+
ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
35+
LIBRARY DESTINATION ${LIB_INSTALL_DIR}
36+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* This source code is licensed under the MIT license found in the
4+
* LICENSE file in the root directory of this source tree.
5+
*/
6+
7+
#include <folly/portability/GFlags.h>
8+
#include "moxygen/MoQClient.h"
9+
#include "moxygen/ObjectReceiver.h"
10+
11+
#include <folly/init/Init.h>
12+
#include <folly/io/async/AsyncSignalHandler.h>
13+
#include <signal.h>
14+
15+
DEFINE_string(
16+
connect_url,
17+
"https://localhost:4433/moq",
18+
"URL for webtransport server");
19+
DEFINE_string(track_namespace, "flvstreamer", "Track Namespace");
20+
DEFINE_string(track_namespace_delimiter, "/", "Track Namespace Delimiter");
21+
// TODO DEFINE_string(video_track_name, "video0", "Video track Name");
22+
// TODO: Fix and add proper audo & video parsing. This is set to video0 on
23+
// purpose to test the video track
24+
DEFINE_string(track_name, "video0", "Track Name");
25+
DEFINE_int32(connect_timeout, 1000, "Connect timeout (ms)");
26+
DEFINE_int32(transaction_timeout, 120, "Transaction timeout (s)");
27+
DEFINE_bool(quic_transport, false, "Use raw QUIC transport");
28+
DEFINE_bool(fetch, false, "Use fetch rather than subscribe");
29+
30+
namespace {
31+
using namespace moxygen;
32+
33+
struct SubParams {
34+
LocationType locType;
35+
folly::Optional<AbsoluteLocation> start;
36+
folly::Optional<AbsoluteLocation> end;
37+
};
38+
39+
class TrackReceiverHandler : public ObjectReceiverCallback {
40+
public:
41+
~TrackReceiverHandler() override = default;
42+
FlowControlState onObject(const ObjectHeader&, Payload payload) override {
43+
if (payload) {
44+
std::cout << "Received payload. Size="
45+
<< payload->computeChainDataLength() << std::endl;
46+
}
47+
return FlowControlState::UNBLOCKED;
48+
}
49+
void onObjectStatus(const ObjectHeader& objHeader) override {
50+
std::cout << "ObjectStatus=" << uint32_t(objHeader.status) << std::endl;
51+
}
52+
void onEndOfStream() override {}
53+
void onError(ResetStreamErrorCode error) override {
54+
std::cout << "Stream Error=" << folly::to_underlying(error) << std::endl;
55+
}
56+
57+
void subscribeDone(SubscribeDone) override {
58+
baton.post();
59+
}
60+
61+
folly::coro::Baton baton;
62+
};
63+
64+
class MoQFlvReceiverClient {
65+
public:
66+
MoQFlvReceiverClient(
67+
folly::EventBase* evb,
68+
proxygen::URL url,
69+
FullTrackName ftn)
70+
: moqClient_(
71+
evb,
72+
std::move(url),
73+
(FLAGS_quic_transport ? MoQClient::TransportType::QUIC
74+
: MoQClient::TransportType::H3_WEBTRANSPORT)),
75+
fullTrackName_(std::move(ftn)) {}
76+
77+
folly::coro::Task<void> run(SubscribeRequest sub) noexcept {
78+
XLOG(INFO) << __func__;
79+
auto g =
80+
folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; });
81+
try {
82+
co_await moqClient_.setupMoQSession(
83+
std::chrono::milliseconds(FLAGS_connect_timeout),
84+
std::chrono::seconds(FLAGS_transaction_timeout),
85+
Role::SUBSCRIBER);
86+
auto exec = co_await folly::coro::co_current_executor;
87+
controlReadLoop().scheduleOn(exec).start();
88+
89+
SubParams subParams{sub.locType, sub.start, sub.end};
90+
sub.locType = LocationType::LatestObject;
91+
sub.start = folly::none;
92+
sub.end = folly::none;
93+
subRxHandler_ = std::make_shared<ObjectReceiver>(
94+
ObjectReceiver::SUBSCRIBE, &trackReceiverHandler_);
95+
auto track =
96+
co_await moqClient_.moqSession_->subscribe(sub, subRxHandler_);
97+
if (track.hasValue()) {
98+
subscribeID_ = track->subscribeID;
99+
XLOG(DBG1) << "subscribeID=" << subscribeID_;
100+
auto latest = track->latest;
101+
if (latest) {
102+
XLOG(INFO) << "Latest={" << latest->group << ", " << latest->object
103+
<< "}";
104+
}
105+
} else {
106+
XLOG(INFO) << "SubscribeError id=" << track.error().subscribeID
107+
<< " code=" << track.error().errorCode
108+
<< " reason=" << track.error().reasonPhrase;
109+
}
110+
moqClient_.moqSession_->drain();
111+
} catch (const std::exception& ex) {
112+
XLOG(ERR) << ex.what();
113+
co_return;
114+
}
115+
co_await trackReceiverHandler_.baton;
116+
XLOG(INFO) << __func__ << " done";
117+
}
118+
119+
void stop() {
120+
moqClient_.moqSession_->unsubscribe({subscribeID_});
121+
moqClient_.moqSession_->close();
122+
}
123+
124+
folly::coro::Task<void> controlReadLoop() {
125+
class ControlVisitor : public MoQSession::ControlVisitor {
126+
public:
127+
explicit ControlVisitor(MoQFlvReceiverClient& client) : client_(client) {}
128+
129+
void operator()(Announce announce) const override {
130+
XLOG(WARN) << "Announce ns=" << announce.trackNamespace;
131+
// text client doesn't expect server or relay to announce anything,
132+
// but announce OK anyways
133+
client_.moqClient_.moqSession_->announceOk({announce.trackNamespace});
134+
}
135+
136+
void operator()(SubscribeRequest subscribeReq) const override {
137+
XLOG(INFO) << "SubscribeRequest";
138+
client_.moqClient_.moqSession_->subscribeError(
139+
{subscribeReq.subscribeID, 404, "don't care"});
140+
}
141+
142+
void operator()(Goaway) const override {
143+
XLOG(INFO) << "Goaway";
144+
client_.moqClient_.moqSession_->unsubscribe({client_.subscribeID_});
145+
}
146+
147+
private:
148+
MoQFlvReceiverClient& client_;
149+
};
150+
XLOG(INFO) << __func__;
151+
auto g =
152+
folly::makeGuard([func = __func__] { XLOG(INFO) << "exit " << func; });
153+
ControlVisitor visitor(*this);
154+
MoQSession::ControlVisitor* vptr(&visitor);
155+
while (auto msg =
156+
co_await moqClient_.moqSession_->controlMessages().next()) {
157+
boost::apply_visitor(*vptr, msg.value());
158+
}
159+
}
160+
161+
MoQClient moqClient_;
162+
FullTrackName fullTrackName_;
163+
SubscribeID subscribeID_{0};
164+
TrackReceiverHandler trackReceiverHandler_;
165+
std::shared_ptr<ObjectReceiver> subRxHandler_;
166+
};
167+
} // namespace
168+
169+
using namespace moxygen;
170+
171+
int main(int argc, char* argv[]) {
172+
folly::Init init(&argc, &argv, false);
173+
folly::EventBase eventBase;
174+
proxygen::URL url(FLAGS_connect_url);
175+
if (!url.isValid() || !url.hasHost()) {
176+
XLOG(ERR) << "Invalid url: " << FLAGS_connect_url;
177+
}
178+
TrackNamespace ns =
179+
TrackNamespace(FLAGS_track_namespace, FLAGS_track_namespace_delimiter);
180+
MoQFlvReceiverClient flvReceiverClient(
181+
&eventBase,
182+
std::move(url),
183+
moxygen::FullTrackName({ns, FLAGS_track_name}));
184+
class SigHandler : public folly::AsyncSignalHandler {
185+
public:
186+
explicit SigHandler(folly::EventBase* evb, std::function<void(int)> fn)
187+
: folly::AsyncSignalHandler(evb), fn_(std::move(fn)) {
188+
registerSignalHandler(SIGTERM);
189+
registerSignalHandler(SIGINT);
190+
}
191+
void signalReceived(int signum) noexcept override {
192+
fn_(signum);
193+
unreg();
194+
}
195+
196+
void unreg() {
197+
unregisterSignalHandler(SIGTERM);
198+
unregisterSignalHandler(SIGINT);
199+
}
200+
201+
private:
202+
std::function<void(int)> fn_;
203+
};
204+
SigHandler handler(&eventBase, [&flvReceiverClient](int) mutable {
205+
flvReceiverClient.stop();
206+
});
207+
auto subParams =
208+
SubParams{LocationType::LatestObject, folly::none, folly::none};
209+
const auto subscribeID = 0;
210+
const auto trackAlias = 1;
211+
flvReceiverClient
212+
.run(
213+
{subscribeID,
214+
trackAlias,
215+
moxygen::FullTrackName({std::move(ns), FLAGS_track_name}),
216+
0,
217+
GroupOrder::OldestFirst,
218+
subParams.locType,
219+
subParams.start,
220+
subParams.end,
221+
{}})
222+
.scheduleOn(&eventBase)
223+
.start()
224+
.via(&eventBase)
225+
.thenTry([&handler](auto) { handler.unreg(); });
226+
eventBase.loop();
227+
}

0 commit comments

Comments
 (0)