Skip to content

Commit 07d3bf2

Browse files
martindukecopybara-github
authored andcommitted
Update PUBLISH_NAMESPACE to MOQT draft-16.
PiperOrigin-RevId: 868705687
1 parent 908d5e4 commit 07d3bf2

22 files changed

+432
-392
lines changed

quiche/quic/moqt/moqt_error.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ enum class QUICHE_EXPORT RequestErrorCode : uint64_t {
7171
kMalformedTrack = 0x9,
7272
kMalformedAuthToken = 0x10,
7373
kExpiredAuthToken = 0x12,
74+
kDuplicateSubscription = 0x19,
7475
kPrefixOverlap = 0x30,
7576
};
7677

7778
struct MoqtRequestErrorInfo {
7879
RequestErrorCode error_code;
7980
std::optional<quic::QuicTimeDelta> retry_interval;
8081
std::string reason_phrase;
82+
bool operator==(const MoqtRequestErrorInfo& other) const = default;
8183
};
8284

8385
RequestErrorCode StatusToRequestErrorCode(absl::Status status);

quiche/quic/moqt/moqt_framer.cc

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -573,21 +573,16 @@ quiche::QuicheBuffer MoqtFramer::SerializeRequestUpdate(
573573

574574
quiche::QuicheBuffer MoqtFramer::SerializePublishNamespace(
575575
const MoqtPublishNamespace& message) {
576-
KeyValuePairList parameters;
577-
if (!FillAndValidateVersionSpecificParameters(
578-
MoqtMessageType::kPublishNamespace, message.parameters, parameters)) {
579-
return quiche::QuicheBuffer();
580-
};
581-
return SerializeControlMessage(MoqtMessageType::kPublishNamespace,
582-
WireVarInt62(message.request_id),
583-
WireTrackNamespace(message.track_namespace),
584-
WireKeyValuePairList(parameters));
576+
return SerializeControlMessage(
577+
MoqtMessageType::kPublishNamespace, WireVarInt62(message.request_id),
578+
WireTrackNamespace(message.track_namespace),
579+
WireKeyValuePairList(message.parameters.ToKeyValuePairList()));
585580
}
586581

587582
quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceDone(
588583
const MoqtPublishNamespaceDone& message) {
589584
return SerializeControlMessage(MoqtMessageType::kPublishNamespaceDone,
590-
WireTrackNamespace(message.track_namespace));
585+
WireVarInt62(message.request_id));
591586
}
592587

593588
quiche::QuicheBuffer MoqtFramer::SerializeNamespace(
@@ -608,8 +603,7 @@ quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceCancel(
608603
const MoqtPublishNamespaceCancel& message) {
609604
return SerializeControlMessage(
610605
MoqtMessageType::kPublishNamespaceCancel,
611-
WireTrackNamespace(message.track_namespace),
612-
WireVarInt62(message.error_code),
606+
WireVarInt62(message.request_id), WireVarInt62(message.error_code),
613607
WireStringWithVarInt62Length(message.error_reason));
614608
}
615609

quiche/quic/moqt/moqt_integration_test.cc

Lines changed: 75 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
#include "quiche/quic/core/quic_time.h"
1919
#include "quiche/quic/core/quic_types.h"
2020
#include "quiche/quic/moqt/moqt_error.h"
21+
#include "quiche/quic/moqt/moqt_fetch_task.h"
2122
#include "quiche/quic/moqt/moqt_key_value_pair.h"
2223
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
2324
#include "quiche/quic/moqt/moqt_messages.h"
2425
#include "quiche/quic/moqt/moqt_names.h"
2526
#include "quiche/quic/moqt/moqt_object.h"
2627
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
27-
#include "quiche/quic/moqt/moqt_priority.h"
2828
#include "quiche/quic/moqt/moqt_probe_manager.h"
2929
#include "quiche/quic/moqt/moqt_publisher.h"
3030
#include "quiche/quic/moqt/moqt_session.h"
@@ -172,86 +172,73 @@ TEST_F(MoqtIntegrationTest, VersionMismatch) {
172172

173173
TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenPublishNamespaceDone) {
174174
EstablishSession();
175-
auto parameters = std::make_optional<VersionSpecificParameters>(
176-
AuthTokenType::kOutOfBand, "foo");
175+
MessageParameters parameters;
176+
parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand,
177+
"foo");
177178
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
178-
Call(TrackNamespace{"foo"}, parameters, _))
179+
Call(TrackNamespace{"foo"}, std::make_optional(parameters), _))
179180
.WillOnce([](const TrackNamespace&,
180-
const std::optional<VersionSpecificParameters>&,
181+
const std::optional<MessageParameters>&,
181182
MoqtResponseCallback callback) {
182183
std::move(callback)(std::nullopt);
183184
});
184-
testing::MockFunction<void(TrackNamespace track_namespace,
185-
std::optional<MoqtRequestErrorInfo> error_message)>
186-
publish_namespace_callback;
187-
client_->session()->PublishNamespace(
188-
TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(),
189-
*parameters);
185+
testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)>
186+
response_callback;
187+
client_->session()->PublishNamespace(TrackNamespace{"foo"}, parameters,
188+
response_callback.AsStdFunction(),
189+
[](MoqtRequestErrorInfo) {});
190190
bool matches = false;
191-
EXPECT_CALL(publish_namespace_callback, Call(_, _))
192-
.WillOnce([&](TrackNamespace track_namespace,
193-
std::optional<MoqtRequestErrorInfo> error) {
191+
EXPECT_CALL(response_callback, Call)
192+
.WillOnce([&](std::optional<MoqtRequestErrorInfo> error) {
194193
matches = true;
195-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
196194
EXPECT_FALSE(error.has_value());
197195
});
198196
bool success =
199197
test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
200198
EXPECT_TRUE(success);
201199
matches = false;
202-
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
203-
Call(TrackNamespace{"foo"},
204-
std::optional<VersionSpecificParameters>(), _))
205-
.WillOnce([&](const TrackNamespace& name,
206-
const std::optional<VersionSpecificParameters>& parameters,
207-
MoqtResponseCallback callback) {
208-
matches = true;
209-
EXPECT_EQ(callback, nullptr);
210-
});
211-
client_->session()->PublishNamespaceDone(TrackNamespace{"foo"});
200+
EXPECT_CALL(
201+
server_callbacks_.incoming_publish_namespace_callback,
202+
Call(TrackNamespace{"foo"}, std::optional<MessageParameters>(), nullptr))
203+
.WillOnce([&](const TrackNamespace&,
204+
const std::optional<MessageParameters>&,
205+
MoqtResponseCallback) { matches = true; });
206+
EXPECT_TRUE(client_->session()->PublishNamespaceDone(TrackNamespace{"foo"}));
212207
success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
213208
EXPECT_TRUE(success);
214209
}
215210

216211
TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenCancel) {
217212
EstablishSession();
218-
auto parameters = std::make_optional<VersionSpecificParameters>(
219-
AuthTokenType::kOutOfBand, "foo");
213+
MessageParameters parameters;
214+
parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand,
215+
"foo");
220216
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
221-
Call(TrackNamespace{"foo"}, parameters, _))
217+
Call(TrackNamespace{"foo"}, std::make_optional(parameters), _))
222218
.WillOnce([](const TrackNamespace&,
223-
const std::optional<VersionSpecificParameters>&,
219+
const std::optional<MessageParameters>&,
224220
MoqtResponseCallback callback) {
225221
std::move(callback)(std::nullopt);
226222
});
227-
testing::MockFunction<void(TrackNamespace track_namespace,
228-
std::optional<MoqtRequestErrorInfo> error_message)>
229-
publish_namespace_callback;
230-
client_->session()->PublishNamespace(
231-
TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(),
232-
*parameters);
223+
testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)>
224+
response_callback;
225+
testing::MockFunction<void(MoqtRequestErrorInfo)> cancel_callback;
226+
client_->session()->PublishNamespace(TrackNamespace{"foo"}, parameters,
227+
response_callback.AsStdFunction(),
228+
cancel_callback.AsStdFunction());
233229
bool matches = false;
234-
EXPECT_CALL(publish_namespace_callback, Call(_, _))
235-
.WillOnce([&](TrackNamespace track_namespace,
236-
std::optional<MoqtRequestErrorInfo> error) {
237-
matches = true;
238-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
239-
EXPECT_FALSE(error.has_value());
240-
});
230+
EXPECT_CALL(response_callback, Call(std::optional<MoqtRequestErrorInfo>()))
231+
.WillOnce(
232+
[&](std::optional<MoqtRequestErrorInfo> error) { matches = true; });
241233
bool success =
242234
test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
243235
EXPECT_TRUE(success);
244236
matches = false;
245-
EXPECT_CALL(publish_namespace_callback, Call(_, _))
246-
.WillOnce([&](TrackNamespace track_namespace,
247-
std::optional<MoqtRequestErrorInfo> error) {
248-
matches = true;
249-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
250-
ASSERT_TRUE(error.has_value());
251-
EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError);
252-
EXPECT_EQ(error->reason_phrase, "internal error");
253-
});
254-
server_->session()->CancelPublishNamespace(TrackNamespace{"foo"},
237+
EXPECT_CALL(cancel_callback,
238+
Call(MoqtRequestErrorInfo{RequestErrorCode::kInternalError,
239+
std::nullopt, "internal error"}))
240+
.WillOnce([&](std::optional<MoqtRequestErrorInfo>) { matches = true; });
241+
server_->session()->PublishNamespaceCancel(TrackNamespace{"foo"},
255242
RequestErrorCode::kInternalError,
256243
"internal error");
257244
success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
@@ -260,59 +247,52 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenCancel) {
260247

261248
TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSubscribeInResponse) {
262249
EstablishSession();
263-
auto parameters = std::make_optional<VersionSpecificParameters>(
264-
AuthTokenType::kOutOfBand, "foo");
250+
TrackNamespace prefix{"foo"};
251+
MessageParameters parameters;
252+
parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand,
253+
"foo");
265254
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
266-
Call(TrackNamespace{"foo"}, parameters, _))
267-
.WillOnce([](const TrackNamespace&,
268-
const std::optional<VersionSpecificParameters>&,
269-
MoqtResponseCallback callback) {
255+
Call(TrackNamespace{"foo"}, std::make_optional(parameters), _))
256+
.WillOnce([&](const TrackNamespace& track_namespace,
257+
const std::optional<MessageParameters>&,
258+
MoqtResponseCallback callback) {
270259
std::move(callback)(std::nullopt);
271-
});
272-
client_->session()->PublishNamespace(
273-
TrackNamespace{"foo"},
274-
outgoing_publish_namespace_callback_.AsStdFunction(), *parameters);
275-
bool matches = false;
276-
EXPECT_CALL(outgoing_publish_namespace_callback_, Call)
277-
.WillOnce([&](TrackNamespace track_namespace,
278-
std::optional<MoqtRequestErrorInfo> error) {
279-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
280260
FullTrackName track_name(track_namespace, "/catalog");
281-
EXPECT_FALSE(error.has_value());
282261
MessageParameters parameters(MoqtFilterType::kLargestObject);
283262
server_->session()->Subscribe(track_name, &subscribe_visitor_,
284263
parameters);
285-
})
286-
.WillOnce([&](TrackNamespace track_namespace,
287-
std::optional<MoqtRequestErrorInfo> error) {
288-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
289-
EXPECT_TRUE(error.has_value());
264+
});
265+
testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)>
266+
response_callback;
267+
client_->session()->PublishNamespace(prefix, parameters,
268+
response_callback.AsStdFunction(),
269+
[](MoqtRequestErrorInfo) {});
270+
bool matches = false;
271+
EXPECT_CALL(response_callback, Call)
272+
.WillOnce([&](std::optional<MoqtRequestErrorInfo> error) {
273+
EXPECT_FALSE(error.has_value());
290274
});
291275
EXPECT_CALL(subscribe_visitor_, OnReply).WillOnce([&]() { matches = true; });
292276
bool success =
293277
test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
294278
EXPECT_TRUE(success);
295279
// Session tears down PUBLISH_NAMESPACE.
296280
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
297-
Call(TrackNamespace{"foo"},
298-
std::optional<VersionSpecificParameters>(), _))
299-
.WillOnce(
300-
[](const TrackNamespace&,
301-
const std::optional<VersionSpecificParameters>&,
302-
MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); });
281+
Call(prefix, std::optional<MessageParameters>(), nullptr));
303282
}
304283

305284
TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSendDataInResponse) {
306285
EstablishSession();
307286

308287
// Set up the server to subscribe to "data" track for the namespace
309288
// publish_namespace it receives.
310-
auto parameters = std::make_optional<VersionSpecificParameters>(
311-
AuthTokenType::kOutOfBand, "foo");
289+
MessageParameters parameters;
290+
parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand,
291+
"foo");
312292
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
313-
Call(TrackNamespace{"test"}, parameters, _))
293+
Call(TrackNamespace{"test"}, std::make_optional(parameters), _))
314294
.WillOnce([&](const TrackNamespace& track_namespace,
315-
const std::optional<VersionSpecificParameters>&,
295+
const std::optional<MessageParameters>&,
316296
MoqtResponseCallback callback) {
317297
FullTrackName track_name(track_namespace, "data");
318298
std::move(callback)(std::nullopt);
@@ -331,8 +311,8 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSendDataInResponse) {
331311
received_subscribe_ok = true;
332312
});
333313
client_->session()->PublishNamespace(
334-
TrackNamespace{"test"},
335-
[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, *parameters);
314+
TrackNamespace{"test"}, parameters,
315+
[](std::optional<MoqtRequestErrorInfo>) {}, [](MoqtRequestErrorInfo) {});
336316
bool success = test_harness_.RunUntilWithDefaultTimeout(
337317
[&]() { return received_subscribe_ok; });
338318
EXPECT_TRUE(success);
@@ -357,12 +337,8 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSendDataInResponse) {
357337
EXPECT_TRUE(success);
358338
// Session tears down PUBLISH_NAMESPACE.
359339
EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback,
360-
Call(TrackNamespace{"test"},
361-
std::optional<VersionSpecificParameters>(), _))
362-
.WillOnce(
363-
[](const TrackNamespace&,
364-
const std::optional<VersionSpecificParameters>&,
365-
MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); });
340+
Call(TrackNamespace{"test"}, std::optional<MessageParameters>(),
341+
nullptr));
366342
}
367343

368344
TEST_F(MoqtIntegrationTest, SendMultipleGroups) {
@@ -531,18 +507,16 @@ TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
531507

532508
TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) {
533509
EstablishSession();
534-
testing::MockFunction<void(TrackNamespace track_namespace,
535-
std::optional<MoqtRequestErrorInfo> error_message)>
536-
publish_namespace_callback;
537-
client_->session()->PublishNamespace(
538-
TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(),
539-
VersionSpecificParameters());
510+
testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)>
511+
response_callback;
512+
client_->session()->PublishNamespace(TrackNamespace{"foo"},
513+
MessageParameters(),
514+
response_callback.AsStdFunction(),
515+
[](MoqtRequestErrorInfo error_info) {});
540516
bool matches = false;
541-
EXPECT_CALL(publish_namespace_callback, Call(_, _))
542-
.WillOnce([&](TrackNamespace track_namespace,
543-
std::optional<MoqtRequestErrorInfo> error) {
517+
EXPECT_CALL(response_callback, Call)
518+
.WillOnce([&](std::optional<MoqtRequestErrorInfo> error) {
544519
matches = true;
545-
EXPECT_EQ(track_namespace, TrackNamespace{"foo"});
546520
ASSERT_TRUE(error.has_value());
547521
EXPECT_EQ(error->error_code, RequestErrorCode::kNotSupported);
548522
});

quiche/quic/moqt/moqt_messages.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ struct QUICHE_EXPORT MoqtRequestUpdate {
401401
struct QUICHE_EXPORT MoqtPublishNamespace {
402402
uint64_t request_id;
403403
TrackNamespace track_namespace;
404-
VersionSpecificParameters parameters;
404+
MessageParameters parameters;
405405
};
406406

407407
struct QUICHE_EXPORT MoqtRequestOk {
@@ -410,11 +410,11 @@ struct QUICHE_EXPORT MoqtRequestOk {
410410
};
411411

412412
struct QUICHE_EXPORT MoqtPublishNamespaceDone {
413-
TrackNamespace track_namespace;
413+
uint64_t request_id;
414414
};
415415

416416
struct QUICHE_EXPORT MoqtPublishNamespaceCancel {
417-
TrackNamespace track_namespace;
417+
uint64_t request_id;
418418
RequestErrorCode error_code;
419419
std::string error_reason;
420420
};

quiche/quic/moqt/moqt_parser.cc

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -789,13 +789,7 @@ size_t MoqtControlParser::ProcessPublishNamespace(
789789
!ReadTrackNamespace(reader, publish_namespace.track_namespace)) {
790790
return 0;
791791
}
792-
KeyValuePairList parameters;
793-
if (!ParseKeyValuePairList(reader, parameters)) {
794-
return 0;
795-
}
796-
if (!FillAndValidateVersionSpecificParameters(
797-
parameters, publish_namespace.parameters,
798-
MoqtMessageType::kPublishNamespace)) {
792+
if (!FillAndValidateMessageParameters(reader, publish_namespace.parameters)) {
799793
return 0;
800794
}
801795
visitor_.OnPublishNamespaceMessage(publish_namespace);
@@ -834,22 +828,20 @@ size_t MoqtControlParser::ProcessRequestOk(quic::QuicDataReader& reader) {
834828

835829
size_t MoqtControlParser::ProcessPublishNamespaceDone(
836830
quic::QuicDataReader& reader) {
837-
MoqtPublishNamespaceDone unpublish_namespace;
838-
if (!ReadTrackNamespace(reader, unpublish_namespace.track_namespace)) {
831+
MoqtPublishNamespaceDone pn_done;
832+
if (!reader.ReadVarInt62(&pn_done.request_id)) {
839833
return 0;
840834
}
841-
visitor_.OnPublishNamespaceDoneMessage(unpublish_namespace);
835+
visitor_.OnPublishNamespaceDoneMessage(pn_done);
842836
return reader.PreviouslyReadPayload().length();
843837
}
844838

845839
size_t MoqtControlParser::ProcessPublishNamespaceCancel(
846840
quic::QuicDataReader& reader) {
847841
MoqtPublishNamespaceCancel publish_namespace_cancel;
848-
if (!ReadTrackNamespace(reader, publish_namespace_cancel.track_namespace)) {
849-
return 0;
850-
}
851842
uint64_t error_code;
852-
if (!reader.ReadVarInt62(&error_code) ||
843+
if (!reader.ReadVarInt62(&publish_namespace_cancel.request_id) ||
844+
!reader.ReadVarInt62(&error_code) ||
853845
!reader.ReadStringVarInt62(publish_namespace_cancel.error_reason)) {
854846
return 0;
855847
}

0 commit comments

Comments
 (0)