Skip to content

Commit bc8fa7f

Browse files
committed
realtime output
1 parent c317138 commit bc8fa7f

File tree

2 files changed

+71
-18
lines changed

2 files changed

+71
-18
lines changed

examples/pcap.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
#include "upipe/uprobe_stdio.h"
2929
#include "upipe/uprobe_prefix.h"
3030
#include "upipe/uprobe_uref_mgr.h"
31+
#include "upipe/uprobe_uclock.h"
3132
#include "upipe/uprobe_upump_mgr.h"
3233
#include "upipe/uprobe_ubuf_mem.h"
3334
#include "upipe/umem.h"
35+
#include "upipe/uclock.h"
36+
#include "upipe/uclock_std.h"
3437
#include "upipe/umem_pool.h"
3538
#include "upipe/udict.h"
3639
#include "upipe/udict_inline.h"
@@ -71,6 +74,8 @@ int main(int argc, char **argv)
7174
uref_std_mgr_alloc(UREF_POOL_DEPTH, udict_mgr, 0);
7275
udict_mgr_release(udict_mgr);
7376

77+
struct uclock *uclock = uclock_std_alloc(0);
78+
7479
/* probes */
7580
struct uprobe *uprobe;
7681
uprobe = uprobe_stdio_alloc(NULL, stderr, UPROBE_LOG_DEBUG);
@@ -82,6 +87,9 @@ int main(int argc, char **argv)
8287
uprobe = uprobe_ubuf_mem_alloc(uprobe, umem_mgr, UBUF_POOL_DEPTH,
8388
UBUF_SHARED_POOL_DEPTH);
8489
assert(uprobe != NULL);
90+
uprobe = uprobe_uclock_alloc(uprobe, uclock);
91+
assert(uprobe != NULL);
92+
8593
uref_mgr_release(uref_mgr);
8694
upump_mgr_release(upump_mgr);
8795
umem_mgr_release(umem_mgr);
@@ -93,6 +101,8 @@ int main(int argc, char **argv)
93101
uprobe_pfx_alloc(uprobe_use(uprobe), UPROBE_LOG_DEBUG, "pcap"));
94102
upipe_mgr_release(upipe_pcap_src_mgr);
95103

104+
upipe_attach_uclock(upipe_src);
105+
96106
struct upipe_mgr *upipe_null_mgr = upipe_null_mgr_alloc();
97107
struct upipe *upipe = upipe_void_alloc_output(upipe_src, upipe_null_mgr,
98108
uprobe_pfx_alloc(uprobe_use(uprobe), UPROBE_LOG_DEBUG, "null"));
@@ -109,6 +119,7 @@ int main(int argc, char **argv)
109119
/* main loop */
110120
upump_mgr_run(upump_mgr, NULL);
111121

122+
uclock_release(uclock);
112123
upipe_release(upipe_src);
113124

114125
return 0;

lib/upipe-modules/upipe_pcap_src.c

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,26 @@
3131
- avoid memcpy? custom ubuf_mgr to delay pcap_close?
3232
- filtering? pcap_setfilter() or something else
3333
- set uref source ip?
34-
- sleep to release packets in real time
3534
*/
3635

3736
#include <stdlib.h>
3837

39-
#include <upipe/upipe.h>
40-
#include <upipe/upump.h>
41-
#include <upipe/uref_block.h>
42-
#include <upipe/uref_clock.h>
43-
#include <upipe/uclock.h>
44-
#include <upipe/uref_block_flow.h>
45-
#include <upipe/upipe_helper_upipe.h>
46-
#include <upipe/upipe_helper_urefcount.h>
47-
#include <upipe/upipe_helper_void.h>
48-
#include <upipe/upipe_helper_upump_mgr.h>
49-
#include <upipe/upipe_helper_upump.h>
50-
#include <upipe/upipe_helper_output.h>
51-
#include <upipe/upipe_helper_uref_mgr.h>
52-
#include <upipe/upipe_helper_ubuf_mgr.h>
53-
#include <upipe-modules/upipe_pcap_src.h>
38+
#include "upipe/upipe.h"
39+
#include "upipe/upump.h"
40+
#include "upipe/uref_block.h"
41+
#include "upipe/uref_clock.h"
42+
#include "upipe/uclock.h"
43+
#include "upipe/uref_block_flow.h"
44+
#include "upipe/upipe_helper_upipe.h"
45+
#include "upipe/upipe_helper_urefcount.h"
46+
#include "upipe/upipe_helper_void.h"
47+
#include "upipe/upipe_helper_upump_mgr.h"
48+
#include "upipe/upipe_helper_upump.h"
49+
#include "upipe/upipe_helper_output.h"
50+
#include "upipe/upipe_helper_uref_mgr.h"
51+
#include "upipe/upipe_helper_ubuf_mgr.h"
52+
#include "upipe/upipe_helper_uclock.h"
53+
#include "upipe-modules/upipe_pcap_src.h"
5454

5555
#include <pcap/pcap.h>
5656

@@ -89,8 +89,15 @@ struct upipe_pcap_src {
8989
/** ubuf manager request */
9090
struct urequest ubuf_mgr_request;
9191

92+
/** uclock structure, if not NULL we are in live mode */
93+
struct uclock *uclock;
94+
/** uclock request */
95+
struct urequest uclock_request;
96+
9297
pcap_t *pcap;
9398
char errbuf[PCAP_ERRBUF_SIZE];
99+
uint64_t cr_offset;
100+
struct uref *uref;
94101

95102
/** public upipe structure */
96103
struct upipe upipe;
@@ -113,14 +120,17 @@ UPIPE_HELPER_UBUF_MGR(upipe_pcap_src, ubuf_mgr, flow_format, ubuf_mgr_request,
113120
upipe_pcap_src_unregister_output_request)
114121
UPIPE_HELPER_UPUMP_MGR(upipe_pcap_src, upump_mgr)
115122
UPIPE_HELPER_UPUMP(upipe_pcap_src, upump, upump_mgr)
123+
UPIPE_HELPER_UCLOCK(upipe_pcap_src, uclock, uclock_request, upipe_pcap_src_check,
124+
upipe_pcap_src_register_output_request,
125+
upipe_pcap_src_unregister_output_request)
116126

117127
/* Skip straight to UDP data */
118128
static size_t upipe_pcap_skip(struct upipe *upipe, const uint8_t *buf, size_t len)
119129
{
120130
if (len < ETHERNET_HEADER_LEN + ETHERNET_VLAN_LEN)
121131
return 0;
122132

123-
const uint8_t *ip = ethernet_payload(buf);
133+
const uint8_t *ip = ethernet_payload((uint8_t*)buf);
124134

125135
len -= ip - buf;
126136

@@ -147,6 +157,11 @@ static void upipe_pcap_src_worker(struct upump *upump)
147157

148158
pcap_t *pcap = upipe_pcap_src->pcap;
149159

160+
if (upipe_pcap_src->uref) {
161+
upipe_pcap_src_output(upipe, upipe_pcap_src->uref, &upipe_pcap_src->upump);
162+
upipe_pcap_src->uref = NULL;
163+
}
164+
150165
struct pcap_pkthdr *hdr;
151166
const u_char *data;
152167
switch (pcap_next_ex(pcap, &hdr, &data)) {
@@ -201,9 +216,27 @@ static void upipe_pcap_src_worker(struct upump *upump)
201216

202217
uref_block_unmap(uref, 0);
203218

204-
/* XXX: rebase to start from uclock_now ? */
205219
uint64_t ts = hdr->ts.tv_sec * UCLOCK_FREQ + hdr->ts.tv_usec * (UCLOCK_FREQ / 1000000);
206220

221+
if (upipe_pcap_src->uclock) {
222+
uint64_t now = uclock_now(upipe_pcap_src->uclock);
223+
224+
if (upipe_pcap_src->cr_offset == 0)
225+
upipe_pcap_src->cr_offset = now - ts;
226+
227+
ts += upipe_pcap_src->cr_offset;
228+
229+
uref_clock_set_cr_sys(uref, ts);
230+
upipe_pcap_src->uref = uref;
231+
232+
uint64_t ticks = 0;
233+
if (now < ts)
234+
ticks = ts - now;
235+
236+
upipe_pcap_src_wait_upump(upipe, ticks, upipe_pcap_src_worker);
237+
return;
238+
}
239+
207240
uref_clock_set_cr_sys(uref, ts);
208241

209242
upipe_pcap_src_output(upipe, uref, &upipe_pcap_src->upump);
@@ -260,6 +293,9 @@ static int _upipe_pcap_src_control(struct upipe *upipe, int command,
260293
switch (command) {
261294
case UPIPE_ATTACH_UPUMP_MGR:
262295
return upipe_pcap_src_attach_upump_mgr(upipe);
296+
case UPIPE_ATTACH_UCLOCK:
297+
upipe_pcap_src_require_uclock(upipe);
298+
return UBASE_ERR_NONE;
263299
case UPIPE_REGISTER_REQUEST: {
264300
struct urequest *request = va_arg(args, struct urequest *);
265301
if (request->type == UREQUEST_FLOW_FORMAT ||
@@ -307,13 +343,16 @@ static void upipe_pcap_src_free(struct upipe *upipe)
307343

308344
if (upipe_pcap_src->pcap)
309345
pcap_close(upipe_pcap_src->pcap);
346+
if (upipe_pcap_src->uref)
347+
uref_free(upipe_pcap_src->uref);
310348

311349
upipe_pcap_src_clean_output(upipe);
312350
upipe_pcap_src_clean_urefcount(upipe);
313351
upipe_pcap_src_clean_ubuf_mgr(upipe);
314352
upipe_pcap_src_clean_uref_mgr(upipe);
315353
upipe_pcap_src_clean_upump_mgr(upipe);
316354
upipe_pcap_src_clean_upump(upipe);
355+
upipe_pcap_src_clean_uclock(upipe);
317356
upipe_pcap_src_free_void(upipe);
318357
}
319358

@@ -329,13 +368,16 @@ static struct upipe *upipe_pcap_src_alloc(struct upipe_mgr *mgr,
329368

330369
struct upipe_pcap_src *upipe_pcap_src = upipe_pcap_src_from_upipe(upipe);
331370
upipe_pcap_src->pcap = NULL;
371+
upipe_pcap_src->uref = NULL;
372+
upipe_pcap_src->cr_offset = 0;
332373

333374
upipe_pcap_src_init_urefcount(upipe);
334375
upipe_pcap_src_init_ubuf_mgr(upipe);
335376
upipe_pcap_src_init_uref_mgr(upipe);
336377
upipe_pcap_src_init_output(upipe);
337378
upipe_pcap_src_init_upump_mgr(upipe);
338379
upipe_pcap_src_init_upump(upipe);
380+
upipe_pcap_src_init_uclock(upipe);
339381

340382
upipe_throw_ready(upipe);
341383

0 commit comments

Comments
 (0)