forked from Upipe/upipe
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupipe_ts_demux.c
More file actions
4390 lines (3983 loc) · 167 KB
/
upipe_ts_demux.c
File metadata and controls
4390 lines (3983 loc) · 167 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2013-2018 OpenHeadend S.A.R.L.
* Copyright (C) 2023-2026 EasyTools S.A.S.
*
* Authors: Christophe Massiot
*
* SPDX-License-Identifier: LGPL-2.1-or-later
*/
/** @file
* @short Upipe higher-level module demuxing elementary streams of a TS
* Four parts in this file:
* @list
* @item psi_pid structure, which handles PSI demultiplexing from ts_split
* until ts_psi_split
* @item output source pipe, which is returned to the application, and
* represents an elementary stream; it sets up the ts_decaps, pes_decaps and
* framer inner pipes
* @item program split pipe, which is returned to the application, and
* represents a program; it sets up the ts_split_output and ts_pmtd inner pipes
* @item demux sink pipe which sets up the ts_split, ts_patd and optional input
* synchronizer inner pipes
*
* Normative references:
* - ISO/IEC 13818-1:2007(E) (MPEG-2 Systems)
* - ETSI EN 300 468 V1.13.1 (2012-08) (SI in DVB systems)
* - ETSI TR 101 211 V1.9.1 (2009-06) (Guidelines of SI in DVB systems)
*/
#include "config.h"
#include "upipe/ubase.h"
#include "upipe/ulist.h"
#include "upipe/uprobe.h"
#include "upipe/uprobe_prefix.h"
#include "upipe/uref.h"
#include "upipe/uref_flow.h"
#include "upipe/uref_clock.h"
#include "upipe/uclock.h"
#include "upipe/upipe.h"
#include "upipe/upipe_helper_upipe.h"
#include "upipe/upipe_helper_urefcount.h"
#include "upipe/upipe_helper_urefcount_real.h"
#include "upipe/upipe_helper_void.h"
#include "upipe/upipe_helper_uref_mgr.h"
#include "upipe/upipe_helper_flow.h"
#include "upipe/upipe_helper_inner.h"
#include "upipe/upipe_helper_uprobe.h"
#include "upipe/upipe_helper_bin_input.h"
#include "upipe/upipe_helper_bin_output.h"
#include "upipe/upipe_helper_sync.h"
#include "upipe/upipe_helper_output.h"
#include "upipe/upipe_helper_subpipe.h"
#include "upipe-modules/upipe_null.h"
#include "upipe-modules/upipe_setrap.h"
#include "upipe-modules/upipe_idem.h"
#include "upipe-modules/upipe_setflowdef.h"
#include "upipe-modules/upipe_probe_uref.h"
#include "upipe-ts/uref_ts_flow.h"
#include "upipe-ts/uref_ts_event.h"
#include "upipe-ts/upipe_ts_demux.h"
#include "upipe-ts/upipe_ts_split.h"
#include "upipe-ts/upipe_ts_sync.h"
#include "upipe-ts/upipe_ts_check.h"
#include "upipe-ts/upipe_ts_decaps.h"
#include "upipe-ts/upipe_ts_ait_decoder.h"
#include "upipe-ts/upipe_ts_eit_decoder.h"
#include "upipe-ts/upipe_ts_nit_decoder.h"
#include "upipe-ts/upipe_ts_psi_merge.h"
#include "upipe-ts/upipe_ts_psi_split.h"
#include "upipe-ts/upipe_ts_pat_decoder.h"
#include "upipe-ts/upipe_ts_cat_decoder.h"
#ifdef HAVE_TS_CRYPT
#include "upipe-ts/upipe_ts_emm_decoder.h"
#endif
#include "upipe-ts/upipe_ts_pmt_decoder.h"
#include "upipe-ts/upipe_ts_pes_decaps.h"
#include "upipe-ts/upipe_ts_scte35_decoder.h"
#include "upipe-ts/upipe_ts_sdt_decoder.h"
#include "upipe-ts/upipe_ts_tdt_decoder.h"
#include "upipe-ts/upipe_ts_tot_decoder.h"
#include <stdlib.h>
#include <stdbool.h>
#include <stdarg.h>
#include <string.h>
#include <assert.h>
#include <bitstream/mpeg/ts.h>
#include <bitstream/mpeg/psi.h>
#include <bitstream/dvb/si.h>
#include <bitstream/ebu/biss.h>
/** we only accept all kinds of blocks */
#define EXPECTED_FLOW_DEF "block."
/** but already sync'ed TS packets are better */
#define EXPECTED_FLOW_DEF_SYNC "block.mpegts."
/** or otherwise aligned TS packets to check */
#define EXPECTED_FLOW_DEF_CHECK "block.mpegtsaligned."
/** maximum number of PIDs */
#define MAX_PIDS 8192
/** 2^33 (max resolution of PCR, PTS and DTS) */
#define POW2_33 UINT64_C(8589934592)
/** max resolution of PCR, PTS and DTS */
#define TS_CLOCK_MAX (POW2_33 * UCLOCK_FREQ / 90000)
/** max interval between PCRs (ISO/IEC 13818-1 2.7.2) - could be 100 ms but
* allow higher tolerance */
#define MAX_PCR_INTERVAL UCLOCK_FREQ
/** max interval between PCRs according to ISO/IEC 13818-1 */
#define MAX_PCR_INTERVAL_ISO (100 * UCLOCK_FREQ / 1000) /* 100 ms */
/** max interval between PCRs according to DVB */
#define MAX_PCR_INTERVAL_DVB (40 * UCLOCK_FREQ / 1000) /* 40 ms */
/** max retention time for most streams (ISO/IEC 13818-1 2.4.2.6) */
#define MAX_DELAY UCLOCK_FREQ
/** number of EITs table IDs */
#define EITS_TABLEIDS 16
/** teletext frame rate */
#define TELX_FPS 25
#define UPIPE_TS_DEMUX_EMM_SIGNATURE UBASE_FOURCC('t','s','d','M')
/** @internal @This is the private context of a ts_demux manager. */
struct upipe_ts_demux_mgr {
/** refcount management structure */
struct urefcount urefcount;
/** pointer to null manager */
struct upipe_mgr *null_mgr;
/** pointer to setrap manager */
struct upipe_mgr *setrap_mgr;
/** pointer to idem manager */
struct upipe_mgr *idem_mgr;
/** pointer to probe_uref manager */
struct upipe_mgr *probe_uref_mgr;
/** pointer to ts_split manager */
struct upipe_mgr *ts_split_mgr;
/* inputs */
/** pointer to ts_sync manager */
struct upipe_mgr *ts_sync_mgr;
/** pointer to ts_check manager */
struct upipe_mgr *ts_check_mgr;
/** pointer to ts_decaps manager */
struct upipe_mgr *ts_decaps_mgr;
/* PSI */
/** pointer to ts_psim manager */
struct upipe_mgr *ts_psim_mgr;
/** pointer to ts_psi_split manager */
struct upipe_mgr *ts_psi_split_mgr;
/** pointer to ts_patd manager */
struct upipe_mgr *ts_patd_mgr;
/** pointer to ts_catd manager */
struct upipe_mgr *ts_catd_mgr;
/** pointer to ts_emmd manager */
struct upipe_mgr *ts_emmd_mgr;
/** pointer to ts_nitd manager */
struct upipe_mgr *ts_nitd_mgr;
/** pointer to ts_sdtd manager */
struct upipe_mgr *ts_sdtd_mgr;
/** pointer to ts_tdtd manager */
struct upipe_mgr *ts_tdtd_mgr;
/** pointer to ts_totd manager */
struct upipe_mgr *ts_totd_mgr;
/** pointer to ts_pmtd manager */
struct upipe_mgr *ts_pmtd_mgr;
/** pointer to ts_eitd manager */
struct upipe_mgr *ts_eitd_mgr;
/** pointer to ts_scte35d manager */
struct upipe_mgr *ts_scte35d_mgr;
/** pointer to ts_aitd manager */
struct upipe_mgr *ts_aitd_mgr;
/* ES */
/** pointer to ts_pesd manager */
struct upipe_mgr *ts_pesd_mgr;
/** pointer to autof manager */
struct upipe_mgr *autof_mgr;
/** public upipe_mgr structure */
struct upipe_mgr mgr;
};
UBASE_FROM_TO(upipe_ts_demux_mgr, upipe_mgr, upipe_mgr, mgr)
UBASE_FROM_TO(upipe_ts_demux_mgr, urefcount, urefcount, urefcount)
/** @hidden */
struct upipe_ts_demux_psi_pid;
/** @internal @This is the private context of a ts_demux pipe. */
struct upipe_ts_demux {
/** real refcount management structure */
struct urefcount urefcount_real;
/** refcount management structure exported to the public structure */
struct urefcount urefcount;
/** uref manager */
struct uref_mgr *uref_mgr;
/** uref manager request */
struct urequest uref_mgr_request;
/** pipe acting as output */
struct upipe *output;
/** output flow definition */
struct uref *flow_def;
/** output state */
enum upipe_helper_output_state output_state;
/** list of output requests */
struct uchain output_request_list;
/** list of input bin requests */
struct uchain input_request_list;
/** pointer to input inner pipe */
struct upipe *input;
/** true if we have thrown the sync_acquired event */
bool acquired;
/** flow definition of the input */
struct uref *flow_def_input;
/** pointer to null inner pipe */
struct upipe *null;
/** pointer to setrap inner pipe */
struct upipe *setrap;
/** pointer to ts_split inner pipe */
struct upipe *split;
/** psi_pid structure for PAT */
struct upipe_ts_demux_psi_pid *psi_pid_pat;
/** ts_psi_split_output inner pipe for PAT */
struct upipe *psi_split_output_pat;
/** pointer to ts_patd inner pipe */
struct upipe *patd;
/** list of available programs (from PAT) */
struct uchain pat_programs;
/** psi_pid structure for CAT */
struct upipe_ts_demux_psi_pid *psi_pid_cat;
/** ts_psi_split_output inner pipe for CAT */
struct upipe *psi_split_output_cat;
/** pointer to ts_catd inner pipe */
struct upipe *catd;
/** list of available entitlements (from CAT) */
struct uchain cat_bissca_entitlements;
/** RSA private key file */
char *private_key;
/** psi_pid structure for NIT */
struct upipe_ts_demux_psi_pid *psi_pid_nit;
/** ts_psi_split_output inner pipe for NIT */
struct upipe *psi_split_output_nit;
/** pointer to optional ts_nitd inner pipe */
struct upipe *nitd;
/** psi_pid structure for SDT */
struct upipe_ts_demux_psi_pid *psi_pid_sdt;
/** ts_psi_split_output inner pipe for SDT */
struct upipe *psi_split_output_sdt;
/** pointer to optional ts_sdtd inner pipe */
struct upipe *sdtd;
/** psi_pid structure for TDT/TOT */
struct upipe_ts_demux_psi_pid *psi_pid_tdttot;
/** ts_psi_split_output inner pipe for TDT */
struct upipe *psi_split_output_tdt;
/** pointer to optional ts_tdtd inner pipe */
struct upipe *tdtd;
/** ts_psi_split_output inner pipe for TOT */
struct upipe *psi_split_output_tot;
/** pointer to optional ts_totd inner pipe */
struct upipe *totd;
/** list of PIDs carrying PSI */
struct uchain psi_pids;
/** PID of the NIT */
uint64_t nit_pid;
/** true if the conformance is guessed from the stream */
bool auto_conformance;
/** current conformance */
enum upipe_ts_conformance conformance;
/** enable EITp/f decoder */
bool eit_enabled;
/** enable EITs table ID decoder */
bool eits_enabled;
/** maximum allowed interval between PCRs */
uint64_t max_pcr_interval;
/** probe to get new flow events from inner pipes created by psi_pid
* objects */
struct uprobe psi_pid_plumber;
/** probe to get events from ts_psim inner pipes created by psi_pid
* objects */
struct uprobe psim_probe;
/** probe to get events from ts_patd inner pipe */
struct uprobe patd_probe;
/** probe to get events from ts_catd inner pipe */
struct uprobe catd_probe;
/** probe to get events from ts_nitd inner pipe */
struct uprobe nitd_probe;
/** probe to get events from ts_sdtd inner pipe */
struct uprobe sdtd_probe;
/** probe to get events from ts_totd inner pipe */
struct uprobe totd_probe;
/** probe to get events from ts_sync or ts_check inner pipe */
struct uprobe input_probe;
/** probe to get events from ts_split inner pipe */
struct uprobe split_probe;
/** probe to proxify events from other pipes */
struct uprobe proxy_probe;
/** list of programs */
struct uchain programs;
/** list of EMMs */
struct uchain emms;
/** mamager to create EMMs */
struct upipe_mgr emm_mgr;
/** manager to create programs */
struct upipe_mgr program_mgr;
/** public upipe structure */
struct upipe upipe;
};
UPIPE_HELPER_UPIPE(upipe_ts_demux, upipe, UPIPE_TS_DEMUX_SIGNATURE)
UPIPE_HELPER_UREFCOUNT(upipe_ts_demux, urefcount, upipe_ts_demux_no_input)
UPIPE_HELPER_VOID(upipe_ts_demux)
UPIPE_HELPER_OUTPUT(upipe_ts_demux, output, flow_def, output_state,
output_request_list)
UPIPE_HELPER_SYNC(upipe_ts_demux, acquired)
UPIPE_HELPER_INNER(upipe_ts_demux, input)
UPIPE_HELPER_BIN_INPUT(upipe_ts_demux, input, input_request_list)
UPIPE_HELPER_UREF_MGR(upipe_ts_demux, uref_mgr, uref_mgr_request, NULL,
upipe_ts_demux_register_output_request,
upipe_ts_demux_unregister_output_request)
UBASE_FROM_TO(upipe_ts_demux, urefcount, urefcount_real, urefcount_real)
/** @hidden */
static void upipe_ts_demux_free(struct urefcount *urefcount_real);
/** @internal @This is the private context of a program of a ts_demux pipe. */
struct upipe_ts_demux_program {
/** real refcount management structure */
struct urefcount urefcount_real;
/** refcount management structure exported to the public structure */
struct urefcount urefcount;
/** structure for double-linked lists */
struct uchain uchain;
/** pipe acting as output */
struct upipe *output;
/** output flow definition */
struct uref *flow_def;
/** output state */
enum upipe_helper_output_state output_state;
/** list of output requests */
struct uchain output_request_list;
/** flow definition of the input */
struct uref *flow_def_input;
/** program number */
uint64_t program;
/** psi_pid structure for PMT */
struct upipe_ts_demux_psi_pid *psi_pid_pmt;
/** ts_psi_split_output inner pipe */
struct upipe *psi_split_output_pmt;
/** pointer to ts_pmtd inner pipe */
struct upipe *pmtd;
/** setflowdef to give SDT attributes to PMT */
struct upipe *setflowdef;
/** systime_rap of the last PMT */
uint64_t pmt_rap;
/** psi_pid structure for EITp/f */
struct upipe_ts_demux_psi_pid *psi_pid_eit;
/** ts_psi_split_output inner pipe for EITp/f */
struct upipe *psi_split_output_eit;
/** pointer to optional ts_eitd inner pipe for EITp/f */
struct upipe *eitd;
/** psi_pid structure for EITs */
struct upipe_ts_demux_psi_pid *psi_pid_eits[EITS_TABLEIDS];
/** ts_psi_split_output inner pipe for EITs */
struct upipe *psi_split_output_eits[EITS_TABLEIDS];
/** pointer to optional ts_eitd inner pipe for EITs */
struct upipe *eitsd[EITS_TABLEIDS];
/** psi_pid structure for ECM */
struct upipe_ts_demux_psi_pid *psi_pid_ecm;
/** ts_psi_split_output inner pipe for ECM */
struct upipe *psi_split_output_ecm;
/** pointer to ts_ecmd inner pipe */
struct upipe *ecmd;
/** PMT PID */
uint64_t pmt_pid;
/** PCR PID */
uint16_t pcr_pid;
/** PCR ts_split output inner pipe */
struct upipe *pcr_split_output;
/** offset between MPEG timestamps and Upipe timestamps */
int64_t timestamp_offset;
/** maximum allowed interval between PCRs */
uint64_t max_pcr_interval;
/** last MPEG clock reference */
uint64_t last_pcr;
/** highest Upipe timestamp given to a frame */
uint64_t timestamp_highest;
/** probe to get events from ts_pmtd inner pipe */
struct uprobe pmtd_probe;
/** probe to get events from ts_eitd inner pipes */
struct uprobe eitd_probe;
/** probe to get events from PCR ts_decaps inner pipe */
struct uprobe pcr_probe;
/** probe to proxify events from other pipes */
struct uprobe proxy_probe;
/** probe to get events from ts_ecmd inner pipe */
struct uprobe ecmd_probe;
/** list of outputs */
struct uchain outputs;
/** manager to create outputs */
struct upipe_mgr output_mgr;
/** public upipe structure */
struct upipe upipe;
};
UPIPE_HELPER_UPIPE(upipe_ts_demux_program, upipe,
UPIPE_TS_DEMUX_PROGRAM_SIGNATURE)
UPIPE_HELPER_UREFCOUNT(upipe_ts_demux_program, urefcount,
upipe_ts_demux_program_no_input)
UPIPE_HELPER_FLOW(upipe_ts_demux_program, "void.")
UPIPE_HELPER_OUTPUT(upipe_ts_demux_program, output, flow_def, output_state,
output_request_list)
UPIPE_HELPER_SUBPIPE(upipe_ts_demux, upipe_ts_demux_program, program,
program_mgr, programs, uchain)
UBASE_FROM_TO(upipe_ts_demux_program, urefcount, urefcount_real, urefcount_real)
/** @hidden */
static void upipe_ts_demux_program_free(struct urefcount *urefcount_real);
/** @internal @This is the private context of an output of a ts_demux_program
* inner pipe. */
struct upipe_ts_demux_output {
/** real refcount management structure */
struct urefcount urefcount_real;
/** refcount management structure */
struct urefcount urefcount;
/** structure for double-linked lists */
struct uchain uchain;
/** flow definition of the input */
struct uref *flow_def_input;
/** PID */
uint64_t pid;
/** true if the output is used for PCR */
bool pcr;
/** ts_split_output inner pipe */
struct upipe *split_output;
/** setrap inner pipe */
struct upipe *setrap;
/** decaps inner pipe */
struct upipe *decaps;
/** maximum retention time in the pipeline */
uint64_t max_delay;
/** last DTS orig (used for telx) */
uint64_t last_dts_orig;
/** probe to get events from probe_uref telx inner pipe */
struct uprobe telx_probe;
/** probe to get events from inner pipes */
struct uprobe probe;
/** probe to update highest timestamp after the framer */
struct uprobe timestamp_probe;
/** list of output bin requests */
struct uchain output_request_list;
/** probe for the last inner pipe */
struct uprobe last_inner_probe;
/** pointer to the last inner pipe */
struct upipe *last_inner;
/** pointer to the output of the last inner pipe */
struct upipe *output;
/** public upipe structure */
struct upipe upipe;
};
/** @hidden */
static int upipe_ts_demux_output_probe(struct uprobe *uprobe,
struct upipe *inner,
int event, va_list args);
/** @hidden */
static int upipe_ts_demux_output_telx_probe(struct uprobe *uprobe,
struct upipe *inner,
int event, va_list args);
/** @hidden */
static int upipe_ts_demux_output_timestamp_probe(struct uprobe *uprobe,
struct upipe *inner,
int event, va_list args);
UPIPE_HELPER_UPIPE(upipe_ts_demux_output, upipe,
UPIPE_TS_DEMUX_OUTPUT_SIGNATURE)
UPIPE_HELPER_UREFCOUNT(upipe_ts_demux_output, urefcount,
upipe_ts_demux_output_no_input)
UPIPE_HELPER_UREFCOUNT_REAL(upipe_ts_demux_output, urefcount_real,
upipe_ts_demux_output_free)
UPIPE_HELPER_FLOW(upipe_ts_demux_output, NULL)
UPIPE_HELPER_INNER(upipe_ts_demux_output, last_inner)
UPIPE_HELPER_UPROBE(upipe_ts_demux_output, urefcount_real, probe,
upipe_ts_demux_output_probe)
UPIPE_HELPER_UPROBE(upipe_ts_demux_output, urefcount_real, telx_probe,
upipe_ts_demux_output_telx_probe)
UPIPE_HELPER_UPROBE(upipe_ts_demux_output, urefcount_real, timestamp_probe,
upipe_ts_demux_output_timestamp_probe)
UPIPE_HELPER_UPROBE(upipe_ts_demux_output, urefcount_real, last_inner_probe,
NULL)
UPIPE_HELPER_BIN_OUTPUT(upipe_ts_demux_output, last_inner,
output, output_request_list)
UPIPE_HELPER_SUBPIPE(upipe_ts_demux_program, upipe_ts_demux_output, output,
output_mgr, outputs, uchain)
/** @internal @This is the private context of an EMM pipe. */
struct upipe_ts_demux_emm {
/** real refcount management structure */
struct urefcount urefcount_real;
/** refcount management structure */
struct urefcount urefcount;
/** structure for double-linked lists */
struct uchain uchain;
/** flow definition of the input */
struct uref *flow_def_input;
/** PID */
uint64_t pid;
/** psi_pid structure for EMM */
struct upipe_ts_demux_psi_pid *psi_pid_emm;
/** ts_psi_split_output inner pipe for EMM */
struct upipe *psi_split_output_emm;
/** pointer to ts_emmd inner pipe */
struct upipe *emmd;
/** probe to get events from inner pipes */
struct uprobe probe;
/** is up to date? */
bool updated;
/** public upipe structure */
struct upipe upipe;
};
/** @hidden */
static int upipe_ts_demux_emm_probe(struct uprobe *uprobe, struct upipe *upipe,
int event, va_list args);
UPIPE_HELPER_UPIPE(upipe_ts_demux_emm, upipe, UPIPE_TS_DEMUX_EMM_SIGNATURE)
UPIPE_HELPER_UREFCOUNT(upipe_ts_demux_emm, urefcount,
upipe_ts_demux_emm_no_input)
UPIPE_HELPER_UREFCOUNT_REAL(upipe_ts_demux_emm, urefcount_real,
upipe_ts_demux_emm_free)
UPIPE_HELPER_FLOW(upipe_ts_demux_emm, NULL)
UPIPE_HELPER_UPROBE(upipe_ts_demux_emm, urefcount_real, probe,
upipe_ts_demux_emm_probe)
UPIPE_HELPER_INNER(upipe_ts_demux_emm, psi_split_output_emm);
UPIPE_HELPER_INNER(upipe_ts_demux_emm, emmd);
UPIPE_HELPER_SUBPIPE(upipe_ts_demux, upipe_ts_demux_emm, emm,
emm_mgr, emms, uchain)
/*
* psi_pid structure handling
*/
/** @internal @This is the context of a PID carrying PSI of a ts_demux pipe. */
struct upipe_ts_demux_psi_pid {
/** structure for double-linked lists */
struct uchain uchain;
/** PID */
uint16_t pid;
/** pointer to psi_split inner pipe */
struct upipe *psi_split;
/** pointer to split_output inner pipe */
struct upipe *split_output;
/** reference count */
unsigned int refcount;
};
UBASE_FROM_TO(upipe_ts_demux_psi_pid, uchain, uchain, uchain)
/** @internal @This allocates and initializes a new PID-specific
* substructure.
*
* @param upipe description structure of the pipe
* @param pid PID
* @return pointer to allocated substructure
*/
static struct upipe_ts_demux_psi_pid *
upipe_ts_demux_psi_pid_alloc(struct upipe *upipe, uint16_t pid)
{
struct upipe_ts_demux *upipe_ts_demux = upipe_ts_demux_from_upipe(upipe);
struct upipe_ts_demux_mgr *ts_demux_mgr =
upipe_ts_demux_mgr_from_upipe_mgr(upipe->mgr);
struct upipe_ts_demux_psi_pid *psi_pid =
malloc(sizeof(struct upipe_ts_demux_psi_pid));
if (unlikely(psi_pid == NULL)) {
upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
return NULL;
}
psi_pid->pid = pid;
/* set PID filter on ts_split inner pipe */
if (unlikely((psi_pid->psi_split =
upipe_void_alloc(ts_demux_mgr->ts_psi_split_mgr,
uprobe_pfx_alloc_va(
uprobe_use(&upipe_ts_demux->proxy_probe),
UPROBE_LOG_VERBOSE,
"psi split %"PRIu16,
pid))) == NULL)) {
free(psi_pid);
upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
return NULL;
}
upipe_ts_demux_demand_uref_mgr(upipe);
struct uref *flow_def = uref_alloc_control(upipe_ts_demux->uref_mgr);
if (unlikely(flow_def == NULL ||
!ubase_check(uref_flow_set_def(flow_def, "block.mpegtspsi.")) ||
!ubase_check(uref_ts_flow_set_pid(flow_def, pid)) ||
!ubase_check(upipe_set_flow_def(psi_pid->psi_split,
flow_def)))) {
if (flow_def != NULL)
uref_free(flow_def);
free(psi_pid);
return NULL;
}
uref_flow_set_def(flow_def, "block.mpegts.mpegtspsi.");
psi_pid->split_output =
upipe_flow_alloc_sub(upipe_ts_demux->split,
uprobe_pfx_alloc_va(
uprobe_use(&upipe_ts_demux->psi_pid_plumber),
UPROBE_LOG_VERBOSE,
"split output %"PRIu16, pid),
flow_def);
uref_free(flow_def);
if (unlikely(psi_pid->split_output == NULL)) {
upipe_release(psi_pid->psi_split);
free(psi_pid);
return NULL;
}
psi_pid->refcount = 1;
uchain_init(upipe_ts_demux_psi_pid_to_uchain(psi_pid));
ulist_add(&upipe_ts_demux->psi_pids,
upipe_ts_demux_psi_pid_to_uchain(psi_pid));
return psi_pid;
}
/** @internal @This finds a psi_pid by its number.
*
* @param upipe description structure of the pipe
* @param pid PID
* @return pointer to substructure
*/
static struct upipe_ts_demux_psi_pid *
upipe_ts_demux_psi_pid_find(struct upipe *upipe, uint16_t pid)
{
struct upipe_ts_demux *upipe_ts_demux = upipe_ts_demux_from_upipe(upipe);
struct uchain *uchain;
ulist_foreach (&upipe_ts_demux->psi_pids, uchain) {
struct upipe_ts_demux_psi_pid *psi_pid =
upipe_ts_demux_psi_pid_from_uchain(uchain);
if (psi_pid->pid == pid)
return psi_pid;
}
return NULL;
}
/** @internal @This marks a PID as being used for PSI, optionally allocates
* the substructure, and increments the refcount.
*
* @param upipe description structure of the pipe
* @param pid PID
* @return pointer to substructure
*/
static struct upipe_ts_demux_psi_pid *
upipe_ts_demux_psi_pid_use(struct upipe *upipe, uint16_t pid)
{
struct upipe_ts_demux_psi_pid *psi_pid =
upipe_ts_demux_psi_pid_find(upipe, pid);
if (psi_pid == NULL)
return upipe_ts_demux_psi_pid_alloc(upipe, pid);
psi_pid->refcount++;
return psi_pid;
}
/** @internal @This releases a PID from being used for PSI, optionally
* freeing allocated resources.
*
* @param psi_pid psi_pid structure
*/
static void
upipe_ts_demux_psi_pid_release(struct upipe_ts_demux_psi_pid *psi_pid)
{
assert(psi_pid != NULL);
psi_pid->refcount--;
if (!psi_pid->refcount) {
ulist_delete(upipe_ts_demux_psi_pid_to_uchain(psi_pid));
upipe_release(psi_pid->split_output);
upipe_release(psi_pid->psi_split);
free(psi_pid);
}
}
/*
* upipe_ts_demux_output structure handling (derived from upipe structure)
*/
/** @hidden */
static void upipe_ts_demux_program_handle_pcr(struct upipe *upipe,
struct uref *uref,
uint64_t pcr_orig,
int discontinuity);
/** @hidden */
static void upipe_ts_demux_program_check_pcr(struct upipe *upipe);
/** @internal @This catches clock_ref events coming from output inner pipes.
*
* @param upipe description structure of the pipe
* @param inner pointer to the inner pipe
* @param event event triggered by the inner pipe
* @param args arguments of the event
* @return an error code
*/
static int upipe_ts_demux_output_clock_ref(struct upipe *upipe,
struct upipe *inner,
int event, va_list args)
{
struct upipe_ts_demux_output *upipe_ts_demux_output =
upipe_ts_demux_output_from_upipe(upipe);
struct upipe_ts_demux_program *program =
upipe_ts_demux_program_from_output_mgr(upipe->mgr);
if (upipe_ts_demux_output->pcr) {
struct uref *uref = va_arg(args, struct uref *);
uint64_t pcr_orig = va_arg(args, uint64_t);
int discontinuity = va_arg(args, int);
upipe_ts_demux_program_handle_pcr(
upipe_ts_demux_program_to_upipe(program),
uref, pcr_orig, discontinuity);
}
return UBASE_ERR_NONE;
}
/** @internal @This catches clock_ts events coming from output inner pipes.
*
* @param upipe description structure of the pipe
* @param inner pointer to the inner pipe
* @param event event triggered by the inner pipe
* @param args arguments of the event
* @return an error code
*/
static int upipe_ts_demux_output_clock_ts(struct upipe *upipe,
struct upipe *inner,
int event, va_list args)
{
struct upipe_ts_demux_output *output =
upipe_ts_demux_output_from_upipe(upipe);
struct upipe_ts_demux_program *program =
upipe_ts_demux_program_from_output_mgr(upipe->mgr);
struct upipe_ts_demux *demux = upipe_ts_demux_from_program_mgr(
upipe_ts_demux_program_to_upipe(program)->mgr);
struct uref *uref = va_arg(args, struct uref *);
uint64_t dts_orig;
if (ubase_check(uref_clock_get_dts_orig(uref, &dts_orig))) {
if (program->pcr_pid == 8191) {
/* No PCR, treat DTS as PCR. */
upipe_ts_demux_program_handle_pcr(
upipe_ts_demux_program_to_upipe(program),
uref, dts_orig, false);
}
uint64_t max_pcr_interval = MAX_PCR_INTERVAL_ISO;
if (demux->conformance == UPIPE_TS_CONFORMANCE_DVB)
max_pcr_interval = MAX_PCR_INTERVAL_DVB;
if (program->max_pcr_interval > max_pcr_interval)
max_pcr_interval = program->max_pcr_interval;
/* handle overflow */
if (output->max_delay + max_pcr_interval < output->max_delay)
max_pcr_interval = UINT64_MAX - output->max_delay;
/* handle 2^33 wrap-arounds */
uint64_t delta = (TS_CLOCK_MAX + dts_orig -
(program->last_pcr % TS_CLOCK_MAX)) % TS_CLOCK_MAX;
if (delta <= output->max_delay + max_pcr_interval) {
uint64_t dts = program->timestamp_offset +
program->last_pcr + delta;
uref_clock_set_dts_prog(uref, dts);
uint64_t dts_pts_delay = 0;
uref_clock_get_dts_pts_delay(uref, &dts_pts_delay);
if (dts + dts_pts_delay > program->timestamp_highest)
program->timestamp_highest = dts + dts_pts_delay;
upipe_verbose_va(upipe, "read DTS %"PRIu64" -> %"PRIu64" (pts delay %"PRIu64")",
dts_orig, dts, dts_pts_delay);
} else if (delta > TS_CLOCK_MAX / 2) {
upipe_warn_va(upipe, "late DTS %"PRIu64" (%"PRIu64" - %f ms)",
dts_orig, TS_CLOCK_MAX - delta,
(TS_CLOCK_MAX - delta) * 1000. / UCLOCK_FREQ);
} else
upipe_warn_va(upipe, "too long delay for DTS %"PRIu64" "
"(%"PRIu64" - %f ms)",
dts_orig, delta, delta * 1000. / UCLOCK_FREQ);
}
return upipe_throw(upipe, event, uref);
}
/** @internal @This catches need_output events coming from output inner pipes.
*
* @param upipe description structure of the pipe
* @param inner pointer to the inner pipe
* @param event event triggered by the inner pipe
* @param args arguments of the event
* @return an error code
*/
static int upipe_ts_demux_output_plumber(struct upipe *upipe,
struct upipe *inner,
int event, va_list args)
{
struct upipe_ts_demux_output *upipe_ts_demux_output =
upipe_ts_demux_output_from_upipe(upipe);
struct upipe_ts_demux_program *program =
upipe_ts_demux_program_from_output_mgr(upipe->mgr);
struct upipe_ts_demux *demux = upipe_ts_demux_from_program_mgr(
upipe_ts_demux_program_to_upipe(program)->mgr);
struct upipe_ts_demux_mgr *ts_demux_mgr =
upipe_ts_demux_mgr_from_upipe_mgr(upipe_ts_demux_to_upipe(demux)->mgr);
struct uref *flow_def;
const char *def;
if (!uprobe_plumber(event, args, &flow_def, &def))
return upipe_throw_proxy(upipe, inner, event, args);
if (!ubase_ncmp(def, "block.mpegts.")) {
/* allocate ts_decaps inner */
if (unlikely(upipe_ts_demux_output->decaps == NULL)) {
upipe_release(upipe_ts_demux_output->setrap);
upipe_ts_demux_output->setrap = NULL;
return UBASE_ERR_ALLOC;
}
upipe_set_output(inner, upipe_ts_demux_output->decaps);
return UBASE_ERR_NONE;
}
if (!ubase_ncmp(def, "block.mpegtspes.")) {
/* allocate ts_pesd inner */
struct upipe *output =
upipe_void_alloc_output(inner, ts_demux_mgr->ts_pesd_mgr,
uprobe_pfx_alloc(uprobe_use(&upipe_ts_demux_output->probe),
UPROBE_LOG_VERBOSE, "pesd"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
upipe_release(output);
return UBASE_ERR_NONE;
}
if (!ubase_ncmp(def, "block.mpegtspsi.mpegtsscte35.")) {
/* allocate ts_psim inner */
struct upipe *output =
upipe_void_alloc_output(inner, ts_demux_mgr->ts_psim_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->probe),
UPROBE_LOG_VERBOSE, "psim"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
int err =
upipe_void_spawn_output(output, ts_demux_mgr->ts_scte35d_mgr,
uprobe_pfx_alloc(uprobe_use(&upipe_ts_demux_output->probe),
UPROBE_LOG_VERBOSE, "scte35d"));
upipe_release(output);
if (unlikely(!ubase_check(err)))
return UBASE_ERR_ALLOC;
return UBASE_ERR_NONE;
}
if (!ubase_ncmp(def, "block.mpegtspsi.mpegtsait.")) {
/* allocate ts_psim inner */
struct upipe *output =
upipe_void_alloc_output(inner, ts_demux_mgr->ts_psim_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->probe),
UPROBE_LOG_VERBOSE, "psim"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
/** allocate AIT decoder */
output = upipe_void_chain_output(output, ts_demux_mgr->ts_aitd_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->last_inner_probe),
UPROBE_LOG_VERBOSE, "aitd"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
upipe_ts_demux_output_store_bin_output(upipe, output);
return UBASE_ERR_NONE;
}
if (!ubase_ncmp(def, "block.dvb_teletext.") &&
ts_demux_mgr->probe_uref_mgr != NULL) {
/* allocate probe_uref for teletext without PTS */
inner = upipe_void_alloc_output(inner, ts_demux_mgr->probe_uref_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->telx_probe),
UPROBE_LOG_VERBOSE, "telx_probe"));
if (unlikely(inner == NULL))
return UBASE_ERR_ALLOC;
upipe_release(inner);
}
if (ts_demux_mgr->autof_mgr != NULL) {
/* allocate autof inner */
struct upipe *output =
upipe_void_alloc_output(inner, ts_demux_mgr->autof_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->last_inner_probe),
UPROBE_LOG_VERBOSE, "autof"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
/* allocate probe_uref to watch pts */
output = upipe_void_chain_output(
output, ts_demux_mgr->probe_uref_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->timestamp_probe),
UPROBE_LOG_VERBOSE, "autof probe"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
upipe_ts_demux_output_store_bin_output(upipe, output);
return UBASE_ERR_NONE;
}
upipe_warn_va(upipe, "unframed output flow definition: %s", def);
/* allocate idem inner */
struct upipe *output =
upipe_void_alloc_output(inner, ts_demux_mgr->idem_mgr,
uprobe_pfx_alloc(
uprobe_use(&upipe_ts_demux_output->last_inner_probe),
UPROBE_LOG_VERBOSE, "idem"));
if (unlikely(output == NULL))
return UBASE_ERR_ALLOC;
upipe_ts_demux_output_store_bin_output(upipe, output);
return UBASE_ERR_NONE;
}
/** @internal @This catches events coming from output inner pipes.
*
* @param uprobe pointer to the probe in upipe_ts_demux_output
* @param inner pointer to the inner pipe
* @param event event triggered by the inner pipe
* @param args arguments of the event
* @return an error code
*/
static int upipe_ts_demux_output_probe(struct uprobe *uprobe,
struct upipe *inner,
int event, va_list args)
{
struct upipe_ts_demux_output *upipe_ts_demux_output =
container_of(uprobe, struct upipe_ts_demux_output, probe);
struct upipe *upipe = upipe_ts_demux_output_to_upipe(upipe_ts_demux_output);
switch (event) {
case UPROBE_CLOCK_REF:
return upipe_ts_demux_output_clock_ref(upipe, inner, event, args);
case UPROBE_CLOCK_TS:
return upipe_ts_demux_output_clock_ts(upipe, inner, event, args);
case UPROBE_NEED_OUTPUT:
return upipe_ts_demux_output_plumber(upipe, inner, event, args);
/* Do not proxy source_end because upipe_ts_split also throws
* source_end and this confuses uprobe_selflow. */
case UPROBE_SOURCE_END:
return UBASE_ERR_NONE;