16
16
#include < folly/coro/Task.h>
17
17
#include < folly/coro/UnboundedQueue.h>
18
18
#include < folly/logging/xlog.h>
19
+ #include < moxygen/MoQConsumers.h>
19
20
#include " moxygen/util/TimedBaton.h"
20
21
21
22
#include < boost/variant.hpp>
@@ -71,7 +72,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
71
72
if (maxConcurrent > maxConcurrentSubscribes_) {
72
73
auto delta = maxConcurrent - maxConcurrentSubscribes_;
73
74
maxSubscribeID_ += delta;
74
- sendMaxSubscribeID (/* signal =*/ true );
75
+ sendMaxSubscribeID (/* signalWriteLoop =*/ true );
75
76
}
76
77
}
77
78
@@ -191,9 +192,11 @@ class MoQSession : public MoQControlCodec::ControlCallback,
191
192
TrackHandle (
192
193
FullTrackName fullTrackName,
193
194
SubscribeID subscribeID,
195
+ folly::EventBase* evb,
194
196
folly::CancellationToken token)
195
197
: fullTrackName_(std::move(fullTrackName)),
196
198
subscribeID_ (subscribeID),
199
+ evb_(evb),
197
200
cancelToken_(std::move(token)) {
198
201
auto contract = folly::coro::makePromiseContract<
199
202
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>>();
@@ -217,10 +220,18 @@ class MoQSession : public MoQControlCodec::ControlCallback,
217
220
return subscribeID_;
218
221
}
219
222
223
+ void setNewObjectTimeout (std::chrono::milliseconds objectTimeout) {
224
+ objectTimeout_ = objectTimeout;
225
+ }
226
+
220
227
[[nodiscard]] folly::CancellationToken getCancelToken () const {
221
228
return cancelToken_;
222
229
}
223
230
231
+ void mergeReadCancelToken (folly::CancellationToken readToken) {
232
+ cancelToken_ = folly::CancellationToken::merge (cancelToken_, readToken);
233
+ }
234
+
224
235
void fin ();
225
236
226
237
folly::coro::Task<
@@ -327,6 +338,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,
327
338
private:
328
339
FullTrackName fullTrackName_;
329
340
SubscribeID subscribeID_;
341
+ folly::EventBase* evb_;
330
342
using SubscribeResult =
331
343
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>;
332
344
folly::coro::Promise<SubscribeResult> promise_;
@@ -343,21 +355,21 @@ class MoQSession : public MoQControlCodec::ControlCallback,
343
355
GroupOrder groupOrder_;
344
356
folly::Optional<AbsoluteLocation> latest_;
345
357
folly::CancellationToken cancelToken_;
358
+ std::chrono::milliseconds objectTimeout_{std::chrono::hours (24 )};
346
359
bool allDataReceived_{false };
347
360
};
348
361
349
362
folly::coro::Task<
350
363
folly::Expected<std::shared_ptr<TrackHandle>, SubscribeError>>
351
364
subscribe (SubscribeRequest sub);
352
- void subscribeOk (SubscribeOk subOk);
365
+ std::shared_ptr<TrackConsumer> subscribeOk (SubscribeOk subOk);
353
366
void subscribeError (SubscribeError subErr);
354
367
void unsubscribe (Unsubscribe unsubscribe);
355
- void subscribeDone (SubscribeDone subDone);
356
368
void subscribeUpdate (SubscribeUpdate subUpdate);
357
369
358
370
folly::coro::Task<folly::Expected<std::shared_ptr<TrackHandle>, FetchError>>
359
371
fetch (Fetch fetch);
360
- void fetchOk (FetchOk fetchOk);
372
+ std::shared_ptr<FetchConsumer> fetchOk (FetchOk fetchOk);
361
373
void fetchError (FetchError fetchError);
362
374
void fetchCancel (FetchCancel fetchCancel);
363
375
@@ -371,23 +383,54 @@ class MoQSession : public MoQControlCodec::ControlCallback,
371
383
proxygen::WebTransport::ErrorCode errorCode;
372
384
};
373
385
374
- // Publish this object.
375
- folly::SemiFuture<folly::Unit> publish (
376
- const ObjectHeader& objHeader,
377
- SubscribeID subscribeID,
378
- uint64_t payloadOffset,
379
- std::unique_ptr<folly::IOBuf> payload,
380
- bool eom);
381
- folly::SemiFuture<folly::Unit> publishStreamPerObject (
382
- const ObjectHeader& objHeader,
383
- SubscribeID subscribeID,
384
- uint64_t payloadOffset,
385
- std::unique_ptr<folly::IOBuf> payload,
386
- bool eom);
387
- folly::SemiFuture<folly::Unit> publishStatus (
388
- const ObjectHeader& objHeader,
389
- SubscribeID subscribeID);
390
- folly::Try<folly::Unit> closeFetchStream (SubscribeID subID);
386
+ class PublisherImpl {
387
+ public:
388
+ PublisherImpl (
389
+ MoQSession* session,
390
+ SubscribeID subscribeID,
391
+ Priority priority,
392
+ GroupOrder groupOrder)
393
+ : session_(session),
394
+ subscribeID_ (subscribeID),
395
+ priority_(priority),
396
+ groupOrder_(groupOrder) {}
397
+ virtual ~PublisherImpl () = default ;
398
+
399
+ SubscribeID subscribeID () const {
400
+ return subscribeID_;
401
+ }
402
+ uint8_t priority () const {
403
+ return priority_;
404
+ }
405
+ void setPriority (uint8_t priority) {
406
+ priority_ = priority;
407
+ }
408
+ void setGroupOrder (GroupOrder groupOrder) {
409
+ groupOrder_ = groupOrder;
410
+ }
411
+
412
+ virtual void reset (ResetStreamErrorCode error) = 0;
413
+
414
+ virtual void onStreamComplete (const ObjectHeader& finalHeader) = 0;
415
+
416
+ folly::Expected<folly::Unit, MoQPublishError> subscribeDone (
417
+ SubscribeDone subDone);
418
+
419
+ void fetchComplete ();
420
+
421
+ protected:
422
+ proxygen::WebTransport* getWebTransport () const {
423
+ if (session_) {
424
+ return session_->wt_ ;
425
+ }
426
+ return nullptr ;
427
+ }
428
+
429
+ MoQSession* session_{nullptr };
430
+ SubscribeID subscribeID_;
431
+ uint8_t priority_;
432
+ GroupOrder groupOrder_;
433
+ };
391
434
392
435
void onNewUniStream (proxygen::WebTransport::StreamReadHandle* rh) override ;
393
436
void onNewBidiStream (proxygen::WebTransport::BidiStreamHandle bh) override ;
@@ -398,13 +441,16 @@ class MoQSession : public MoQControlCodec::ControlCallback,
398
441
}
399
442
400
443
private:
444
+ void cleanup ();
445
+
401
446
folly::coro::Task<void > controlWriteLoop (
402
447
proxygen::WebTransport::StreamWriteHandle* writeHandle);
403
448
folly::coro::Task<void > readLoop (
404
449
StreamType streamType,
405
450
proxygen::WebTransport::StreamReadHandle* readHandle);
406
451
407
452
std::shared_ptr<TrackHandle> getTrack (TrackIdentifier trackidentifier);
453
+ void subscribeDone (SubscribeDone subDone);
408
454
409
455
void onClientSetup (ClientSetup clientSetup) override ;
410
456
void onServerSetup (ServerSetup setup) override ;
@@ -445,72 +491,9 @@ class MoQSession : public MoQControlCodec::ControlCallback,
445
491
void onConnectionError (ErrorCode error) override ;
446
492
void checkForCloseOnDrain ();
447
493
448
- folly::SemiFuture<folly::Unit> publishImpl (
449
- const ObjectHeader& objHeader,
450
- SubscribeID subscribeID,
451
- uint64_t payloadOffset,
452
- std::unique_ptr<folly::IOBuf> payload,
453
- bool eom,
454
- bool streamPerObject);
455
-
456
- uint64_t order (const ObjectHeader& objHeader, const SubscribeID subscribeID);
457
-
458
- void retireSubscribeId (bool signal);
459
- void sendMaxSubscribeID (bool signal);
460
-
461
- struct PublishKey {
462
- TrackIdentifier trackIdentifier;
463
- uint64_t group;
464
- uint64_t subgroup;
465
- ForwardPreference pref;
466
- uint64_t object;
467
-
468
- bool operator ==(const PublishKey& other) const {
469
- if (trackIdentifier != other.trackIdentifier || pref != other.pref ) {
470
- return false ;
471
- }
472
- if (pref == ForwardPreference::Datagram) {
473
- return object == other.object ;
474
- } else if (pref == ForwardPreference::Subgroup) {
475
- return group == other.group && subgroup == other.subgroup ;
476
- } else if (pref == ForwardPreference::Track) {
477
- return true ;
478
- } else if (pref == ForwardPreference::Fetch) {
479
- return true ;
480
- }
481
- return false ;
482
- }
483
-
484
- struct hash {
485
- size_t operator ()(const PublishKey& ook) const {
486
- if (ook.pref == ForwardPreference::Datagram) {
487
- return folly::hash::hash_combine (
488
- TrackIdentifierHash{}(ook.trackIdentifier ),
489
- ook.group ,
490
- ook.object );
491
- } else if (ook.pref == ForwardPreference::Subgroup) {
492
- return folly::hash::hash_combine (
493
- TrackIdentifierHash{}(ook.trackIdentifier ),
494
- ook.group ,
495
- ook.subgroup );
496
- }
497
- // Track or Fetch
498
- return folly::hash::hash_combine (
499
- TrackIdentifierHash{}(ook.trackIdentifier ));
500
- }
501
- };
502
- };
503
-
504
- struct PublishData {
505
- uint64_t streamID;
506
- uint64_t group;
507
- uint64_t subgroup;
508
- uint64_t objectID;
509
- folly::Optional<uint64_t > objectLength;
510
- uint64_t offset;
511
- bool streamPerObject;
512
- bool cancelled{false };
513
- };
494
+ void retireSubscribeId (bool signalWriteLoop);
495
+ void sendMaxSubscribeID (bool signalWriteLoop);
496
+ void fetchComplete (SubscribeID subscribeID);
514
497
515
498
// Get the max subscribe id from the setup params. If MAX_SUBSCRIBE_ID key is
516
499
// not present, we default to 0 as specified. 0 means that the peer MUST NOT
@@ -555,13 +538,10 @@ class MoQSession : public MoQControlCodec::ControlCallback,
555
538
TrackNamespace::hash>
556
539
pendingSubscribeAnnounces_;
557
540
558
- struct PubTrack {
559
- uint8_t priority;
560
- GroupOrder groupOrder;
561
- };
562
541
// Subscriber ID -> metadata about a publish track
563
- folly::F14FastMap<SubscribeID, PubTrack, SubscribeID::hash> pubTracks_;
564
- folly::F14FastMap<PublishKey, PublishData, PublishKey::hash> publishDataMap_;
542
+ folly::
543
+ F14FastMap<SubscribeID, std::shared_ptr<PublisherImpl>, SubscribeID::hash>
544
+ pubTracks_;
565
545
uint64_t nextTrackId_{0 };
566
546
uint64_t closedSubscribes_{0 };
567
547
// TODO: Make this value configurable. maxConcurrentSubscribes_ represents
0 commit comments