Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 67 additions & 26 deletions lib/upipe-modules/upipe_aggregate.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2013-2015 OpenHeadend S.A.R.L.
* Copyright (C) 2024 EasyTools
*
* Authors: Benjamin Cohen
*
Expand Down Expand Up @@ -111,6 +112,61 @@ static struct upipe *upipe_agg_alloc(struct upipe_mgr *mgr,
return upipe;
}

/** @internal @This flushes the aggregated buffer.
*
* @param upipe description structure of the pipe
* @param upump_p reference to pump that generated the buffer
*/
static void upipe_agg_flush(struct upipe *upipe, struct upump **upump_p)
{
struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
struct uref *uref = upipe_agg->aggregated;
upipe_agg->aggregated = NULL;
if (uref)
upipe_agg_output(upipe, uref, upump_p);
}

/** @internal @This sets the input flow definition for real.
*
* @param upipe description structure of the pipe
* @param flow_def flow definition packet
* @return an error code
*/
static int upipe_agg_set_flow_def_real(struct upipe *upipe,
struct uref *flow_def)
{
struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
uint64_t size = 0;
uint64_t octetrate = 0;
uint64_t latency = 0;

uref_block_flow_get_size(flow_def, &size);
uref_block_flow_get_octetrate(flow_def, &octetrate);
uref_clock_get_latency(flow_def, &latency);

upipe_agg->input_size = size;

int err = uref_block_flow_set_size(flow_def, upipe_agg->output_size);
if (unlikely(!ubase_check(err))) {
uref_free(flow_def);
upipe_throw_fatal(upipe, err);
return err;
}

if (octetrate) {
latency += (uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate;
err = uref_clock_set_latency(flow_def, latency);
if (unlikely(!ubase_check(err))) {
uref_free(flow_def);
upipe_throw_fatal(upipe, err);
return err;
}
}
upipe_agg_store_flow_def(upipe, flow_def);
return UBASE_ERR_NONE;
}


/** @internal @This receives data.
*
* @param upipe description structure of the pipe
Expand All @@ -124,6 +180,12 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
size_t size = 0;
const size_t output_size = upipe_agg->output_size;

if (unlikely(ubase_check(uref_flow_get_def(uref, NULL)))) {
upipe_agg_flush(upipe, upump_p);
upipe_agg_set_flow_def_real(upipe, uref);
return;
}

uref_block_size(uref, &size);

/* check for invalid or too large size */
Expand All @@ -135,10 +197,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
}

/* flush if incoming packet makes aggregated overflow */
if (upipe_agg->size + size > output_size) {
upipe_agg_output(upipe, upipe_agg->aggregated, upump_p);
upipe_agg->aggregated = NULL;
}
if (upipe_agg->size + size > output_size)
upipe_agg_flush(upipe, upump_p);

/* keep or attach incoming packet */
if (unlikely(!upipe_agg->aggregated)) {
Expand All @@ -159,11 +219,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
/* anticipate next packet size and flush now if necessary */
if (upipe_agg->input_size)
size = upipe_agg->input_size;
if (unlikely(upipe_agg->size + size > output_size)) {
upipe_agg_output(upipe, upipe_agg->aggregated, upump_p);
upipe_agg->aggregated = NULL;
upipe_agg->size = 0;
}
if (unlikely(upipe_agg->size + size > output_size))
upipe_agg_flush(upipe, upump_p);
}

/** @internal @This sets the input flow definition.
Expand All @@ -178,26 +235,10 @@ static int upipe_agg_set_flow_def(struct upipe *upipe, struct uref *flow_def)
return UBASE_ERR_INVALID;
UBASE_RETURN(uref_flow_match_def(flow_def, EXPECTED_FLOW_DEF))

struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
uint64_t size = 0;
uref_block_flow_get_size(flow_def, &size);
upipe_agg->input_size = size;

uint64_t octetrate = 0;
uref_block_flow_get_octetrate(flow_def, &octetrate);
uint64_t latency = 0;
uref_clock_get_latency(flow_def, &latency);

struct uref *flow_def_dup;
if ((flow_def_dup = uref_dup(flow_def)) == NULL)
return UBASE_ERR_ALLOC;
UBASE_RETURN(uref_block_flow_set_size(flow_def_dup, upipe_agg->output_size))

if (octetrate) {
UBASE_RETURN(uref_clock_set_latency(flow_def_dup, latency +
(uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate))
}
upipe_agg_store_flow_def(upipe, flow_def_dup);
upipe_agg_input(upipe, flow_def_dup, NULL);
return UBASE_ERR_NONE;
}

Expand Down
Loading