Skip to content

Commit e599ad2

Browse files
martindukecopybara-github
authored andcommitted
Refresh MoQT integration test for FETCH and fix detected errors.
PiperOrigin-RevId: 712510813
1 parent 71111c7 commit e599ad2

File tree

3 files changed

+53
-62
lines changed

3 files changed

+53
-62
lines changed

quiche/quic/moqt/moqt_integration_test.cc

Lines changed: 43 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "quiche/quic/moqt/moqt_messages.h"
1717
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
1818
#include "quiche/quic/moqt/moqt_priority.h"
19+
#include "quiche/quic/moqt/moqt_publisher.h"
1920
#include "quiche/quic/moqt/moqt_session.h"
2021
#include "quiche/quic/moqt/moqt_track.h"
2122
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -336,68 +337,55 @@ TEST_F(MoqtIntegrationTest, SendMultipleGroups) {
336337
}
337338
}
338339

339-
// TODO(martinduke): Restore this test when FETCH is implemented.
340-
#if 0
341340
TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
342341
EstablishSession();
343342
MoqtKnownTrackPublisher publisher;
344343
server_->session()->set_publisher(&publisher);
345344

346-
for (MoqtForwardingPreference forwarding_preference :
347-
{MoqtForwardingPreference::kSubgroup,
348-
MoqtForwardingPreference::kDatagram}) {
349-
SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
350-
MockSubscribeRemoteTrackVisitor client_visitor;
351-
std::string name =
352-
absl::StrCat("pref_", static_cast<int>(forwarding_preference));
353-
auto queue = std::make_shared<MoqtOutgoingQueue>(
354-
FullTrackName{"test", name}, forwarding_preference);
355-
publisher.Add(queue);
356-
for (int i = 0; i < 100; ++i) {
357-
queue->AddObject(MemSliceFromString("object"), /*key=*/true);
358-
}
359-
360-
client_->session()->SubscribeAbsolute(FullTrackName("test", name), 0, 0,
361-
&client_visitor);
362-
int received = 0;
363-
// Those won't arrive since they have expired.
364-
EXPECT_CALL(client_visitor,
365-
OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
366-
.Times(0);
367-
EXPECT_CALL(client_visitor,
368-
OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
369-
.Times(0);
370-
EXPECT_CALL(client_visitor,
371-
OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
372-
.Times(0);
373-
EXPECT_CALL(client_visitor,
374-
OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
375-
.Times(0);
376-
// Those are within the "last three groups" window.
377-
EXPECT_CALL(client_visitor,
378-
OnObjectFragment(_, FullSequence{97, 0}, _, _, _, true))
379-
.WillOnce([&] { ++received; });
380-
EXPECT_CALL(client_visitor,
381-
OnObjectFragment(_, FullSequence{97, 1}, _, _, _, true))
382-
.WillOnce([&] { ++received; });
383-
EXPECT_CALL(client_visitor,
384-
OnObjectFragment(_, FullSequence{98, 0}, _, _, _, true))
385-
.WillOnce([&] { ++received; });
386-
EXPECT_CALL(client_visitor,
387-
OnObjectFragment(_, FullSequence{98, 1}, _, _, _, true))
388-
.WillOnce([&] { ++received; });
389-
EXPECT_CALL(client_visitor,
390-
OnObjectFragment(_, FullSequence{99, 0}, _, _, _, true))
391-
.WillOnce([&] { ++received; });
392-
EXPECT_CALL(client_visitor,
393-
OnObjectFragment(_, FullSequence{99, 1}, _, _, _, true))
394-
.Times(0); // The current group should not be closed yet.
395-
bool success = test_harness_.RunUntilWithDefaultTimeout(
396-
[&]() { return received >= 5; });
397-
EXPECT_TRUE(success);
345+
MockSubscribeRemoteTrackVisitor client_visitor;
346+
FullTrackName full_track_name("test", "fetch");
347+
auto queue = std::make_shared<MoqtOutgoingQueue>(
348+
full_track_name, MoqtForwardingPreference::kSubgroup);
349+
publisher.Add(queue);
350+
for (int i = 0; i < 100; ++i) {
351+
queue->AddObject(MemSliceFromString("object"), /*key=*/true);
398352
}
353+
std::unique_ptr<MoqtFetchTask> fetch;
354+
EXPECT_TRUE(client_->session()->Fetch(
355+
full_track_name,
356+
[&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); },
357+
FullSequence{0, 0}, 99, std::nullopt, 128, std::nullopt,
358+
MoqtSubscribeParameters()));
359+
// Run until we get FETCH_OK.
360+
bool success = test_harness_.RunUntilWithDefaultTimeout(
361+
[&]() { return fetch != nullptr; });
362+
EXPECT_TRUE(success);
363+
364+
EXPECT_TRUE(fetch->GetStatus().ok());
365+
EXPECT_EQ(fetch->GetLargestId(), FullSequence(99, 0));
366+
MoqtFetchTask::GetNextObjectResult result;
367+
PublishedObject object;
368+
FullSequence expected{97, 0};
369+
do {
370+
result = fetch->GetNextObject(object);
371+
if (result == MoqtFetchTask::GetNextObjectResult::kEof) {
372+
break;
373+
}
374+
EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
375+
EXPECT_EQ(object.sequence, expected);
376+
if (object.sequence.object == 1) {
377+
EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup);
378+
expected.object = 0;
379+
++expected.group;
380+
} else {
381+
EXPECT_EQ(object.status, MoqtObjectStatus::kNormal);
382+
EXPECT_EQ(object.payload.AsStringView(), "object");
383+
++expected.object;
384+
}
385+
} while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
386+
EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
387+
EXPECT_EQ(expected, FullSequence(99, 1));
399388
}
400-
#endif
401389

402390
TEST_F(MoqtIntegrationTest, AnnounceFailure) {
403391
EstablishSession();

quiche/quic/moqt/moqt_session.cc

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,11 +1415,9 @@ void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message,
14151415

14161416
MoqtSession::IncomingDataStream::~IncomingDataStream() {
14171417
if (parser_.track_alias().has_value() &&
1418-
parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
1419-
RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
1420-
if (track != nullptr && track->is_fetch()) {
1421-
session_->upstream_by_id_.erase(*parser_.track_alias());
1422-
}
1418+
parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch &&
1419+
track_.IsValid()) {
1420+
session_->upstream_by_id_.erase(*parser_.track_alias());
14231421
}
14241422
}
14251423

quiche/quic/moqt/moqt_track.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,19 @@ UpstreamFetch::UpstreamFetchTask::GetNextObject(PublishedObject& output) {
7575
need_object_available_callback_ = true;
7676
return kPending;
7777
}
78-
quiche::QuicheMemSlice message_slice(std::move(payload_));
78+
if (!payload_.empty()) {
79+
quiche::QuicheMemSlice message_slice(std::move(payload_));
80+
output.payload = std::move(message_slice);
81+
}
7982
output.sequence = FullSequence(next_object_->group_id,
8083
next_object_->subgroup_id.value_or(0),
8184
next_object_->object_id);
8285
output.status = next_object_->object_status;
8386
output.publisher_priority = next_object_->publisher_priority;
84-
output.payload = std::move(message_slice);
8587
output.fin_after_this = false;
88+
if (output.sequence == largest_id_) { // This is the last object.
89+
eof_ = true;
90+
}
8691
next_object_.reset();
8792
can_read_callback_();
8893
return kSuccess;

0 commit comments

Comments
 (0)