Skip to content

Commit 8816c0f

Browse files
quariumcmassiot
authored andcommitted
upipe_avfilt: rework latency
1 parent afc2d5b commit 8816c0f

File tree

1 file changed

+109
-40
lines changed

1 file changed

+109
-40
lines changed

lib/upipe-av/upipe_avfilter.c

Lines changed: 109 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ struct upipe_avfilt {
173173
AVFilterGraph *filter_graph;
174174
/** filter graph is configured? */
175175
bool configured;
176+
/** filter graph is not a source only? */
177+
bool has_input;
176178

177179
/** reference to hardware device context for filters */
178180
AVBufferRef *hw_device_ctx;
@@ -187,6 +189,13 @@ struct upipe_avfilt {
187189
/** uref from input */
188190
struct uref *uref;
189191

192+
/** input latency */
193+
uint64_t input_latency;
194+
/** last input pts prog */
195+
uint64_t last_input_pts_prog;
196+
/** last input pts sys */
197+
uint64_t last_input_pts_sys;
198+
190199
/** public upipe structure */
191200
struct upipe upipe;
192201
};
@@ -196,7 +205,8 @@ static int upipe_avfilt_init_filters(struct upipe *upipe);
196205
/** @hidden */
197206
static void upipe_avfilt_clean_filters(struct upipe *upipe);
198207
/** @hidden */
199-
static void upipe_avfilt_update_outputs(struct upipe *upipe);
208+
static void upipe_avfilt_update_outputs(struct upipe *upipe,
209+
struct upump **upump_p);
200210

201211
UPIPE_HELPER_UPIPE(upipe_avfilt, upipe, UPIPE_AVFILT_SIGNATURE)
202212
UPIPE_HELPER_VOID(upipe_avfilt)
@@ -410,6 +420,7 @@ static struct uref *upipe_avfilt_sub_build_flow_def(struct upipe *upipe,
410420
{
411421
struct upipe_avfilt_sub *upipe_avfilt_sub =
412422
upipe_avfilt_sub_from_upipe(upipe);
423+
struct upipe_avfilt *upipe_avfilt = upipe_avfilt_from_sub_mgr(upipe->mgr);
413424
AVFilterContext *ctx = upipe_avfilt_sub->buffer_ctx;
414425

415426
struct uref *flow_def =
@@ -423,7 +434,33 @@ static struct uref *upipe_avfilt_sub_build_flow_def(struct upipe *upipe,
423434
upipe_err_va(upipe, "unknown buffersink type");
424435
return NULL;
425436
}
437+
438+
uint64_t latency = 0;
439+
if (frame->pts != AV_NOPTS_VALUE) {
440+
AVRational time_base = av_buffersink_get_time_base(ctx);
441+
442+
uint64_t pts =
443+
av_rescale_q(frame->pts, time_base, av_make_q(1, UCLOCK_FREQ));
444+
if (upipe_avfilt->last_input_pts_prog != UINT64_MAX &&
445+
upipe_avfilt->last_input_pts_prog >= pts) {
446+
latency = upipe_avfilt->last_input_pts_prog - pts;
447+
} else if (upipe_avfilt->last_input_pts_prog != UINT64_MAX) {
448+
upipe_warn_va(upipe, "pts in the past %.2f ms",
449+
(pts - upipe_avfilt->last_input_pts_prog) * 1000. /
450+
UCLOCK_FREQ);
451+
}
452+
} else {
453+
upipe_warn(upipe, "no pts");
454+
}
455+
if (upipe_avfilt->input_latency + latency > upipe_avfilt_sub->latency) {
456+
upipe_notice_va(upipe, "increase latency %.2fms to %.2fms",
457+
(upipe_avfilt_sub->latency * 1000.) / UCLOCK_FREQ,
458+
((upipe_avfilt->input_latency + latency) * 1000.) /
459+
UCLOCK_FREQ);
460+
upipe_avfilt_sub->latency = upipe_avfilt->input_latency + latency;
461+
}
426462
uref_clock_set_latency(flow_def, upipe_avfilt_sub->latency);
463+
427464
return flow_def;
428465
}
429466

@@ -453,6 +490,7 @@ upipe_avfilt_sub_frame_to_uref(struct upipe *upipe, AVFrame *frame)
453490
{
454491
struct upipe_avfilt_sub *upipe_avfilt_sub =
455492
upipe_avfilt_sub_from_upipe(upipe);
493+
struct upipe_avfilt *upipe_avfilt = upipe_avfilt_from_sub_mgr(upipe->mgr);
456494

457495
struct uref *flow_def = upipe_avfilt_sub_build_flow_def(upipe, frame);
458496
if (unlikely(flow_def == NULL)) {
@@ -498,9 +536,6 @@ upipe_avfilt_sub_frame_to_uref(struct upipe *upipe, AVFrame *frame)
498536
}
499537
uref_attach_ubuf(uref, ubuf);
500538

501-
/* get system time */
502-
uint64_t now = upipe_avfilt_sub_now(upipe);
503-
504539
/* set pts orig */
505540
AVRational time_base = av_buffersink_get_time_base(
506541
upipe_avfilt_sub->buffer_ctx);
@@ -518,23 +553,18 @@ upipe_avfilt_sub_frame_to_uref(struct upipe *upipe, AVFrame *frame)
518553
}
519554
upipe_avfilt_sub->last_pts_prog = pts_prog;
520555

521-
if (upipe_avfilt_sub->pts_sys_offset == UINT64_MAX) {
522-
upipe_avfilt_sub->pts_sys_offset = now;
523-
upipe_avfilt_sub->first_pts_prog = pts_prog;
524-
}
525556
uint64_t pts_sys = UINT64_MAX;
526-
if (upipe_avfilt_sub->pts_sys_offset != UINT64_MAX) {
557+
if (upipe_avfilt->last_input_pts_sys != UINT64_MAX &&
558+
upipe_avfilt->last_input_pts_prog != UINT64_MAX)
559+
pts_sys = upipe_avfilt->last_input_pts_sys +
560+
(upipe_avfilt->last_input_pts_prog - pts_prog);
561+
else if (upipe_avfilt_sub->pts_sys_offset == UINT64_MAX) {
562+
upipe_avfilt_sub->pts_sys_offset = upipe_avfilt_sub_now(upipe);
563+
upipe_avfilt_sub->first_pts_prog = pts_prog;
564+
pts_sys = upipe_avfilt_sub->pts_sys_offset;
565+
} else {
527566
pts_sys = upipe_avfilt_sub->pts_sys_offset +
528-
pts_prog - upipe_avfilt_sub->first_pts_prog;
529-
}
530-
531-
if (pts_sys < now && now - pts_sys > upipe_avfilt_sub->latency) {
532-
upipe_avfilt_sub->latency = now - pts_sys;
533-
uref_clock_set_latency(upipe_avfilt_sub->flow_def,
534-
upipe_avfilt_sub->latency);
535-
struct uref *flow_def = upipe_avfilt_sub->flow_def;
536-
upipe_avfilt_sub->flow_def = NULL;
537-
upipe_avfilt_sub_store_flow_def(upipe, flow_def);
567+
(pts_prog - upipe_avfilt_sub->first_pts_prog);
538568
}
539569

540570
uref_clock_set_pts_orig(uref, pts_orig);
@@ -1099,9 +1129,8 @@ static int upipe_avfilt_avframe_from_uref(struct upipe *upipe,
10991129
* @param uref input buffer to handle
11001130
* @param upump_p reference to the pump that generated the buffer
11011131
*/
1102-
static void upipe_avfilt_sub_input(struct upipe *upipe,
1103-
struct uref *uref,
1104-
struct upump **upump_p)
1132+
static void upipe_avfilt_sub_input(struct upipe *upipe, struct uref *uref,
1133+
struct upump **upump_p)
11051134
{
11061135
struct upipe_avfilt_sub *upipe_avfilt_sub =
11071136
upipe_avfilt_sub_from_upipe(upipe);
@@ -1141,6 +1170,17 @@ static void upipe_avfilt_sub_input(struct upipe *upipe,
11411170
return;
11421171
}
11431172

1173+
uint64_t pts_sys = UINT64_MAX;
1174+
uint64_t pts_prog = UINT64_MAX;
1175+
if (ubase_check(uref_clock_get_pts_sys(uref, &pts_sys)) &&
1176+
ubase_check(uref_clock_get_pts_prog(uref, &pts_prog))) {
1177+
if (upipe_avfilt->last_input_pts_prog == UINT64_MAX ||
1178+
pts_prog > upipe_avfilt->last_input_pts_prog) {
1179+
upipe_avfilt->last_input_pts_prog = pts_prog;
1180+
upipe_avfilt->last_input_pts_sys = pts_sys;
1181+
}
1182+
}
1183+
11441184
if (!ubase_check(ubuf_av_get_avframe(uref->ubuf, frame))) {
11451185
int ret = upipe_avfilt_avframe_from_uref(upipe, uref,
11461186
upipe_avfilt_sub->flow_def_input,
@@ -1173,7 +1213,7 @@ static void upipe_avfilt_sub_input(struct upipe *upipe,
11731213
return;
11741214
}
11751215

1176-
upipe_avfilt_update_outputs(upipe_avfilt_to_upipe(upipe_avfilt));
1216+
upipe_avfilt_update_outputs(upipe_avfilt_to_upipe(upipe_avfilt), upump_p);
11771217
}
11781218

11791219
/** @internal @This sets the input sub pipe flow definition.
@@ -1198,6 +1238,10 @@ static int upipe_avfilt_sub_set_flow_def(struct upipe *upipe,
11981238
UBASE_ALLOC_RETURN(flow_def_dup);
11991239

12001240
upipe_avfilt_clean_filters(upipe_avfilt_to_upipe(upipe_avfilt));
1241+
uint64_t input_latency = 0;
1242+
uref_clock_get_latency(flow_def_dup, &input_latency);
1243+
if (input_latency > upipe_avfilt->input_latency)
1244+
upipe_avfilt->input_latency = input_latency;
12011245
upipe_avfilt_sub_store_flow_def_input(upipe, flow_def_dup);
12021246
upipe_avfilt_init_filters(upipe_avfilt_to_upipe(upipe_avfilt));
12031247

@@ -1253,7 +1297,7 @@ static int upipe_avfilt_sub_check(struct upipe *upipe)
12531297
if (!upipe_avfilt_sub->upump_mgr)
12541298
return UBASE_ERR_NONE;
12551299

1256-
if (upipe_avfilt->filter_graph)
1300+
if (upipe_avfilt->configured && !upipe_avfilt->has_input)
12571301
upipe_avfilt_sub_wait(upipe, 0);
12581302

12591303
return UBASE_ERR_NONE;
@@ -1278,9 +1322,11 @@ static int upipe_avfilt_sub_control(struct upipe *upipe, int cmd, va_list args)
12781322
*
12791323
* @param upipe description structure of the pipe
12801324
* @param configured true if the filter is configured, false otherwise
1325+
* @param has_input true if the filter has an input
12811326
* @return an error code
12821327
*/
1283-
static int upipe_avfilt_set_configured(struct upipe *upipe, bool configured)
1328+
static int upipe_avfilt_set_configured(struct upipe *upipe, bool configured,
1329+
bool has_input)
12841330
{
12851331
struct upipe_avfilt *upipe_avfilt = upipe_avfilt_from_upipe(upipe);
12861332

@@ -1291,6 +1337,7 @@ static int upipe_avfilt_set_configured(struct upipe *upipe, bool configured)
12911337
upipe_avfilt->filters_desc ?: "(none)",
12921338
configured ? "configured" :
12931339
"not configured");
1340+
upipe_avfilt->has_input = has_input;
12941341
if (configured)
12951342
return upipe_avfilt_sync_acquired(upipe);
12961343
return upipe_avfilt_sync_lost(upipe);
@@ -1299,15 +1346,28 @@ static int upipe_avfilt_set_configured(struct upipe *upipe, bool configured)
12991346
/** @internal @This updates the outputs if needed.
13001347
*
13011348
* @param upipe description structure of the pipe
1349+
* @param upump_p reference to the pump that generated the last input buffer
13021350
*/
1303-
static void upipe_avfilt_update_outputs(struct upipe *upipe)
1351+
static void upipe_avfilt_update_outputs(struct upipe *upipe,
1352+
struct upump **upump_p)
13041353
{
13051354
struct upipe_avfilt *upipe_avfilt = upipe_avfilt_from_upipe(upipe);
13061355
struct uchain *uchain;
13071356
ulist_foreach(&upipe_avfilt->subs, uchain) {
13081357
struct upipe_avfilt_sub *sub = upipe_avfilt_sub_from_uchain(uchain);
1309-
if (!sub->flow_def_input)
1310-
upipe_avfilt_sub_wait(upipe_avfilt_sub_to_upipe(sub), 0);
1358+
struct upipe *sub_pipe = upipe_avfilt_sub_to_upipe(sub);
1359+
1360+
if (sub->flow_def_input)
1361+
continue;
1362+
if (!upipe_avfilt->has_input)
1363+
upipe_avfilt_sub_wait(sub_pipe, 0);
1364+
else {
1365+
struct uref *uref;
1366+
upipe_use(sub_pipe);
1367+
while ((uref = upipe_avfilt_sub_pop(sub_pipe)))
1368+
upipe_avfilt_sub_output(sub_pipe, uref, upump_p);
1369+
upipe_release(sub_pipe);
1370+
}
13111371
}
13121372
}
13131373

@@ -1329,7 +1389,7 @@ static void upipe_avfilt_clean_filters(struct upipe *upipe)
13291389
}
13301390
avfilter_graph_free(&upipe_avfilt->filter_graph);
13311391
upipe_avfilt->buffer_ctx = NULL;
1332-
upipe_avfilt_set_configured(upipe, false);
1392+
upipe_avfilt_set_configured(upipe, false, false);
13331393
}
13341394

13351395
/** @internal @This initializes the avfilter graph.
@@ -1353,6 +1413,8 @@ static int upipe_avfilt_init_filters(struct upipe *upipe)
13531413
return UBASE_ERR_NONE;
13541414

13551415
upipe_avfilt->filter_graph = avfilter_graph_alloc();
1416+
upipe_avfilt->last_input_pts_prog = UINT64_MAX;
1417+
upipe_avfilt->last_input_pts_sys = UINT64_MAX;
13561418

13571419
AVDictionaryEntry *option = NULL;
13581420
while ((option = av_dict_get(upipe_avfilt->options,
@@ -1367,6 +1429,7 @@ static int upipe_avfilt_init_filters(struct upipe *upipe)
13671429
}
13681430
}
13691431

1432+
bool has_input = false;
13701433
ulist_foreach(&upipe_avfilt->subs, uchain) {
13711434
struct upipe_avfilt_sub *sub = upipe_avfilt_sub_from_uchain(uchain);
13721435
if (!sub->flow_def_input)
@@ -1377,6 +1440,7 @@ static int upipe_avfilt_init_filters(struct upipe *upipe)
13771440
upipe_err_va(upipe, "create filter for input %s failed", sub->name);
13781441
goto end;
13791442
}
1443+
has_input = true;
13801444
}
13811445

13821446
ret = UBASE_ERR_EXTERNAL;
@@ -1509,9 +1573,10 @@ static int upipe_avfilt_init_filters(struct upipe *upipe)
15091573
}
15101574

15111575
ret = UBASE_ERR_NONE;
1512-
upipe_avfilt_set_configured(upipe, true);
1576+
upipe_avfilt_set_configured(upipe, true, has_input);
15131577

1514-
upipe_avfilt_update_outputs(upipe_avfilt_to_upipe(upipe_avfilt));
1578+
if (has_input)
1579+
upipe_avfilt_update_outputs(upipe_avfilt_to_upipe(upipe_avfilt), NULL);
15151580

15161581
end:
15171582
if (!ubase_check(ret))
@@ -1634,6 +1699,8 @@ static int upipe_avfilt_set_flow_def(struct upipe *upipe,
16341699
upipe_avfilt_clean_filters(upipe_avfilt_to_upipe(upipe_avfilt));
16351700
upipe_avfilt_store_flow_def_input(upipe, NULL);
16361701

1702+
upipe_avfilt->input_latency = 0;
1703+
uref_clock_get_latency(flow_def_dup, &upipe_avfilt->input_latency);
16371704
struct uref *uref = upipe_avfilt_store_flow_def_input(upipe, flow_def_dup);
16381705
if (uref != NULL)
16391706
/* output flow definition will be set when outputting frames */
@@ -1844,18 +1911,14 @@ static void upipe_avfilt_output_frame(struct upipe *upipe,
18441911
if (frame->pts != AV_NOPTS_VALUE) {
18451912
uint64_t pts = av_rescale_q(frame->pts, time_base,
18461913
av_make_q(1, UCLOCK_FREQ));
1847-
uint64_t input_pts;
1848-
if (ubase_check(uref_clock_get_pts_prog(
1849-
upipe_avfilt->uref, &input_pts)) &&
1850-
input_pts >= pts) {
1851-
latency = input_pts - pts;
1914+
if (upipe_avfilt->last_input_pts_prog != UINT64_MAX &&
1915+
upipe_avfilt->last_input_pts_prog >= pts) {
1916+
latency = upipe_avfilt->last_input_pts_prog - pts;
18521917
upipe_notice_va(upipe, "latency: %" PRIu64 " ms",
18531918
1000 * latency / UCLOCK_FREQ);
18541919
}
18551920
}
1856-
uint64_t input_latency = 0;
1857-
uref_clock_get_latency(upipe_avfilt->flow_def_input, &input_latency);
1858-
uref_clock_set_latency(flow_def, input_latency + latency);
1921+
uref_clock_set_latency(flow_def, upipe_avfilt->input_latency + latency);
18591922

18601923
upipe_avfilt_store_flow_def(upipe, flow_def);
18611924
} else {
@@ -1969,6 +2032,8 @@ static void upipe_avfilt_input(struct upipe *upipe,
19692032
}
19702033

19712034
uref_free(upipe_avfilt->uref);
2035+
upipe_avfilt->last_input_pts_prog = UINT64_MAX;
2036+
uref_clock_get_pts_prog(uref, &upipe_avfilt->last_input_pts_prog);
19722037
upipe_avfilt->uref = uref;
19732038

19742039
if (!ubase_check(ubuf_av_get_avframe(uref->ubuf, frame))) {
@@ -2006,7 +2071,7 @@ static void upipe_avfilt_input(struct upipe *upipe,
20062071
upipe_throw_error(upipe, UBASE_ERR_EXTERNAL);
20072072
goto end;
20082073
}
2009-
upipe_avfilt_set_configured(upipe, true);
2074+
upipe_avfilt_set_configured(upipe, true, true);
20102075
}
20112076

20122077
/* push incoming frame to the filtergraph */
@@ -2166,6 +2231,10 @@ static struct upipe *upipe_avfilt_alloc(struct upipe_mgr *mgr,
21662231
upipe_avfilt->buffersink_ctx = NULL;
21672232
upipe_avfilt->uref = NULL;
21682233
upipe_avfilt->options = NULL;
2234+
upipe_avfilt->input_latency = 0;
2235+
upipe_avfilt->last_input_pts_prog = UINT64_MAX;
2236+
upipe_avfilt->configured = false;
2237+
upipe_avfilt->has_input = false;
21692238

21702239
upipe_throw_ready(upipe);
21712240

0 commit comments

Comments
 (0)