Skip to content

Commit 4b76805

Browse files
stackiaclaude
andcommitted
feat(rtp): add RTP packet reordering support
Implement sliding window reorder buffer to handle out-of-order RTP packets: - 64-slot ring buffer with O(1) insert/lookup using bitmask - Zero-delay forwarding for in-order packets (seq == base_seq) - Force flush mechanism when window is full to bound latency - Initial packet collection phase (8 packets) to handle first-packet reordering caused by software/hardware path split on upstream devices - Proper buffer reference counting for zero-copy memory management This fixes stream corruption issues caused by network jitter and upstream multicast device behavior (e.g., Huawei switches forwarding first packet via software path while subsequent packets use hardware fast-path). Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 24c94c0 commit 4b76805

File tree

11 files changed

+315
-124
lines changed

11 files changed

+315
-124
lines changed

src/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ rtp2httpd_SOURCES = \
1111
http_fetch.c \
1212
service.c \
1313
rtp.c \
14+
rtp_reorder.c \
1415
multicast.c \
1516
fcc.c \
1617
fcc_telecom.c \
@@ -40,6 +41,7 @@ noinst_HEADERS = \
4041
http_fetch.h \
4142
service.h \
4243
rtp.h \
44+
rtp_reorder.h \
4345
multicast.h \
4446
fcc.h \
4547
fcc_telecom.h \

src/fcc.c

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,8 @@ void fcc_session_cleanup(fcc_session_t *fcc, service_t *service, int epoll_fd) {
117117
fcc->fcc_server =
118118
NULL; /* This was pointing to service memory, safe to NULL */
119119
fcc->media_port = 0;
120-
fcc->current_seqn = 0;
121120
fcc->fcc_term_seqn = 0;
122121
fcc->fcc_term_sent = 0;
123-
fcc->not_first_packet = 0;
124122

125123
/* Clear client address structure */
126124
memset(&fcc->fcc_client, 0, sizeof(fcc->fcc_client));
@@ -333,21 +331,24 @@ int fcc_handle_unicast_media(stream_context_t *ctx, buffer_ref_t *buf_ref) {
333331
logger(LOG_INFO, "FCC: Unicast stream started successfully");
334332
}
335333

336-
/* Forward RTP payload to client (true zero-copy) or capture I-frame
337-
* (snapshot) */
338-
int processed_bytes = stream_process_rtp_payload(
339-
ctx, buf_ref, &fcc->current_seqn, &fcc->not_first_packet);
334+
/* Forward RTP payload to client (with reordering) */
335+
int processed_bytes = stream_process_rtp_payload(ctx, buf_ref);
340336
if (processed_bytes > 0) {
341337
ctx->total_bytes_sent += (uint64_t)processed_bytes;
342338
}
343339

344-
/* Check if we should terminate FCC based on sequence number */
345-
if (fcc->fcc_term_sent && fcc->current_seqn >= fcc->fcc_term_seqn - 1 &&
340+
/* Check if we should terminate FCC based on reorder's delivered sequence.
341+
* base_seq - 1 is the last sequence number successfully delivered.
342+
* Only check when reorder is in active phase (initialized == 2). */
343+
if (fcc->fcc_term_sent && ctx->reorder.initialized == 2 &&
346344
fcc->state != FCC_STATE_MCAST_ACTIVE) {
347-
logger(LOG_INFO,
348-
"FCC: Switching to multicast stream (reached termination sequence)");
349-
fcc_session_set_state(fcc, FCC_STATE_MCAST_ACTIVE,
350-
"Reached termination sequence");
345+
uint16_t last_delivered = ctx->reorder.base_seq - 1;
346+
if ((int16_t)(last_delivered - (fcc->fcc_term_seqn - 1)) >= 0) {
347+
logger(LOG_INFO,
348+
"FCC: Switching to multicast stream (reached termination sequence)");
349+
fcc_session_set_state(fcc, FCC_STATE_MCAST_ACTIVE,
350+
"Reached termination sequence");
351+
}
351352
}
352353

353354
return 0;
@@ -446,8 +447,7 @@ int fcc_handle_mcast_active(stream_context_t *ctx, buffer_ref_t *buf_ref) {
446447
while (node) {
447448
/* Queue each buffer for zero-copy send */
448449
buffer_ref_t *next = node->send_next;
449-
int processed_bytes = stream_process_rtp_payload(
450-
ctx, node, &fcc->current_seqn, &fcc->not_first_packet);
450+
int processed_bytes = stream_process_rtp_payload(ctx, node);
451451
if (likely(processed_bytes > 0)) {
452452
ctx->total_bytes_sent += (uint64_t)processed_bytes;
453453
flushed_bytes += (uint64_t)processed_bytes;
@@ -467,8 +467,7 @@ int fcc_handle_mcast_active(stream_context_t *ctx, buffer_ref_t *buf_ref) {
467467

468468
/* Forward multicast data to client (true zero-copy) or capture I-frame
469469
* (snapshot) */
470-
int processed_bytes = stream_process_rtp_payload(
471-
ctx, buf_ref, &fcc->current_seqn, &fcc->not_first_packet);
470+
int processed_bytes = stream_process_rtp_payload(ctx, buf_ref);
472471
if (likely(processed_bytes > 0)) {
473472
ctx->total_bytes_sent += (uint64_t)processed_bytes;
474473
}

src/fcc.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,8 @@ typedef struct {
5555
uint16_t media_port; /* RTP media port (network byte order, for direct
5656
comparison with sin_port) */
5757
bool verify_server_ip; /* Verify server ip before processing packets */
58-
uint16_t current_seqn;
5958
uint16_t fcc_term_seqn;
6059
int fcc_term_sent;
61-
uint16_t not_first_packet;
6260
int redirect_count; /* Number of redirects followed */
6361
int64_t unicast_start_time; /* Timestamp when unicast started (for sync wait
6462
timeout) */

src/rtp.c

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -77,74 +77,15 @@ int rtp_get_payload(uint8_t *buf, int recv_len, uint8_t **payload, int *size,
7777
}
7878
}
7979

80-
int rtp_queue_buf(connection_t *conn, buffer_ref_t *buf_ref, uint16_t *old_seqn,
81-
uint16_t *not_first) {
82-
int payloadlength;
83-
uint8_t *payload;
84-
uint16_t seqn;
85-
int is_rtp;
86-
uint8_t *data_ptr = (uint8_t *)buf_ref->data + buf_ref->data_offset;
87-
88-
/* Extract payload and sequence number - automatically handles RTP and non-RTP
89-
* packets */
90-
is_rtp = rtp_get_payload(data_ptr, buf_ref->data_size, &payload,
91-
&payloadlength, &seqn);
92-
if (unlikely(is_rtp < 0)) {
93-
return 0; /* Malformed packet, already logged */
94-
}
95-
96-
/* Perform sequence number tracking only for RTP packets (is_rtp == 1) */
97-
if (likely(is_rtp == 1)) {
98-
/* Sequence number validation - discard duplicate/backward/out-of-order
99-
* packets */
100-
if (unlikely(*not_first)) {
101-
/* Calculate sequence number difference (handling wrap-around) */
102-
int16_t seq_diff = (int16_t)(seqn - *old_seqn);
103-
104-
/* Discard duplicate/backward/out-of-order packets (seq_diff <= 0) */
105-
if (seq_diff <= 0) {
106-
logger(LOG_DEBUG,
107-
"Out-of-order RTP packet discarded - last sent seq %d, received "
108-
"%d (diff: %d)",
109-
*old_seqn, seqn, seq_diff);
110-
return 0;
111-
}
112-
113-
/* Forward packet but detect gaps for logging */
114-
uint16_t expected = (*old_seqn + 1) & 0xFFFF;
115-
if (seqn != expected) {
116-
/* This indicates upstream packet loss (network or source), NOT local
117-
* send congestion */
118-
logger(LOG_DEBUG,
119-
"RTP packet loss detected - expected seq %d, received %d (gap: "
120-
"%d packets)",
121-
expected, seqn, seq_diff - 1);
122-
}
123-
}
124-
125-
*old_seqn = seqn;
126-
*not_first = 1;
127-
}
128-
/* For non-RTP packets (is_rtp == 0), skip sequence number tracking */
129-
80+
int rtp_queue_buf_direct(connection_t *conn, buffer_ref_t *buf_ref) {
13081
/* Send headers lazily on first data packet */
13182
if (!conn->headers_sent) {
13283
send_http_headers(conn, STATUS_200, "video/mp2t", NULL);
13384
}
13485

135-
/* True zero-copy send - payload is in buffer pool, send directly without
136-
* memcpy */
137-
/* Calculate offset of payload in the buffer */
138-
buf_ref->data_offset = payload - (uint8_t *)buf_ref->data;
139-
buf_ref->data_size = (size_t)payloadlength;
140-
14186
/* Queue for zero-copy send */
142-
/* Note: zerocopy_queue_add() will automatically increment refcount */
14387
if (connection_queue_zerocopy(conn, buf_ref) == 0) {
144-
/* Successfully queued - send queue now holds a reference */
145-
return payloadlength;
146-
} else {
147-
/* Queue full - backpressure */
148-
return -1;
88+
return (int)buf_ref->data_size;
14989
}
90+
return -1;
15091
}

src/rtp.h

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,12 @@ int rtp_get_payload(uint8_t *buf, int recv_len, uint8_t **payload, int *size,
2727
uint16_t *seqn);
2828

2929
/**
30-
* Write RTP payload to client via connection output buffer, handling sequence
31-
* numbers and duplicates Uses true zero-copy by sending payload directly from
32-
* buffer pool without memcpy
30+
* Queue RTP payload directly to client (used by reorder module)
3331
*
3432
* @param conn Connection object for output buffering
35-
* @param buf_ref Buffer reference for the buffer containing the RTP packet
36-
* @param old_seqn Pointer to store/track previous sequence number
37-
* @param not_first Pointer to track if this is not the first packet
38-
* @return number of payload bytes queued to the client (>=0), or -1 if buffer
39-
* full
33+
* @param buf_ref Buffer reference (already pointing to payload)
34+
* @return number of payload bytes queued (>=0), or -1 if buffer full
4035
*/
41-
int rtp_queue_buf(connection_t *conn, buffer_ref_t *buf_ref, uint16_t *old_seqn,
42-
uint16_t *not_first);
36+
int rtp_queue_buf_direct(connection_t *conn, buffer_ref_t *buf_ref);
4337

4438
#endif /* __RTP_H__ */

src/rtp_reorder.c

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#include "rtp_reorder.h"
2+
#include "connection.h"
3+
#include "rtp.h"
4+
#include "snapshot.h"
5+
#include "stream.h"
6+
#include "utils.h"
7+
#include <string.h>
8+
9+
void rtp_reorder_init(rtp_reorder_t *r) { memset(r, 0, sizeof(*r)); }
10+
11+
void rtp_reorder_cleanup(rtp_reorder_t *r) {
12+
int pending = 0;
13+
for (int i = 0; i < RTP_REORDER_WINDOW_SIZE; i++) {
14+
if (r->slots[i]) {
15+
buffer_ref_put(r->slots[i]);
16+
r->slots[i] = NULL;
17+
pending++;
18+
}
19+
}
20+
if (pending > 0) {
21+
logger(LOG_DEBUG,
22+
"RTP reorder: Cleanup discarded %d pending packets (base_seq=%u)",
23+
pending, r->base_seq);
24+
}
25+
r->count = 0;
26+
r->initialized = 0;
27+
}
28+
29+
/* Deliver single packet */
30+
static int deliver_packet(buffer_ref_t *buf, connection_t *conn,
31+
int is_snapshot) {
32+
if (is_snapshot) {
33+
return snapshot_process_packet(&conn->stream.snapshot, buf->data_size,
34+
(uint8_t *)buf->data + buf->data_offset,
35+
conn);
36+
}
37+
return rtp_queue_buf_direct(conn, buf);
38+
}
39+
40+
/* Flush consecutive packets, stop at hole
41+
* log_recovery: if true, log "Recovered" message (for Phase 2 reordering) */
42+
static int flush_consecutive(rtp_reorder_t *r, connection_t *conn,
43+
int is_snapshot, int log_recovery) {
44+
int total_bytes = 0;
45+
int flushed = 0;
46+
uint16_t start_seq = r->base_seq;
47+
48+
while (r->count > 0) {
49+
int slot = r->base_seq & RTP_REORDER_WINDOW_MASK;
50+
buffer_ref_t *buf = r->slots[slot];
51+
52+
if (!buf)
53+
break; /* Hole, stop */
54+
55+
int bytes = deliver_packet(buf, conn, is_snapshot);
56+
if (bytes > 0)
57+
total_bytes += bytes;
58+
59+
buffer_ref_put(buf);
60+
r->slots[slot] = NULL;
61+
r->base_seq++;
62+
r->count--;
63+
flushed++;
64+
}
65+
66+
if (log_recovery && flushed > 0) {
67+
logger(LOG_DEBUG,
68+
"RTP reorder: Recovered %d out-of-order packets (seq %u-%u)", flushed,
69+
start_seq, (uint16_t)(r->base_seq - 1));
70+
}
71+
72+
return total_bytes;
73+
}
74+
75+
/* Force flush to make room */
76+
static int force_flush_until(rtp_reorder_t *r, uint16_t target_seq,
77+
connection_t *conn, int is_snapshot) {
78+
int total_bytes = 0;
79+
int lost_count = 0;
80+
uint16_t start_seq = r->base_seq;
81+
82+
while ((int16_t)(target_seq - r->base_seq) >= RTP_REORDER_WINDOW_SIZE) {
83+
int slot = r->base_seq & RTP_REORDER_WINDOW_MASK;
84+
buffer_ref_t *buf = r->slots[slot];
85+
86+
if (buf) {
87+
int bytes = deliver_packet(buf, conn, is_snapshot);
88+
if (bytes > 0)
89+
total_bytes += bytes;
90+
buffer_ref_put(buf);
91+
r->slots[slot] = NULL;
92+
r->count--;
93+
} else {
94+
lost_count++;
95+
}
96+
97+
r->base_seq++;
98+
}
99+
100+
if (lost_count > 0) {
101+
logger(LOG_DEBUG, "RTP reorder: Packet loss at seq %u (target=%u)",
102+
start_seq, target_seq);
103+
}
104+
105+
return total_bytes;
106+
}
107+
108+
int rtp_reorder_insert(rtp_reorder_t *r, buffer_ref_t *buf_ref, uint16_t seqn,
109+
connection_t *conn, int is_snapshot) {
110+
int total_bytes = 0;
111+
112+
/* Phase 0: First packet - start collecting */
113+
if (unlikely(r->initialized == 0)) {
114+
r->base_seq = seqn; /* Remember first seq as reference */
115+
r->initialized = 1; /* Enter collecting phase */
116+
117+
/* Store first packet */
118+
int slot = seqn & RTP_REORDER_WINDOW_MASK;
119+
buffer_ref_get(buf_ref);
120+
r->slots[slot] = buf_ref;
121+
r->count = 1;
122+
return 0;
123+
}
124+
125+
/* Phase 1: Collecting initial packets
126+
* base_seq dynamically tracks the minimum sequence seen so far */
127+
if (unlikely(r->initialized == 1)) {
128+
int slot = seqn & RTP_REORDER_WINDOW_MASK;
129+
130+
if (!r->slots[slot]) {
131+
buffer_ref_get(buf_ref);
132+
r->slots[slot] = buf_ref;
133+
r->count++;
134+
135+
/* Update min_seq: if this packet is earlier than current base_seq */
136+
if ((int16_t)(seqn - r->base_seq) < 0) {
137+
r->base_seq = seqn;
138+
}
139+
}
140+
141+
/* Collected enough? Start delivering from base_seq */
142+
if (r->count >= RTP_REORDER_INIT_COLLECT) {
143+
r->initialized = 2; /* Enter active phase */
144+
145+
logger(LOG_DEBUG,
146+
"RTP reorder: Init complete, base_seq=%u (%d packets collected)",
147+
r->base_seq, r->count);
148+
149+
/* Flush consecutive from base_seq (already the minimum)
150+
* Don't log "Recovered" - this is normal init, not reordering */
151+
total_bytes += flush_consecutive(r, conn, is_snapshot, 0);
152+
}
153+
return total_bytes;
154+
}
155+
156+
/* Phase 2: Active reordering (initialized == 2) */
157+
int16_t seq_diff = (int16_t)(seqn - r->base_seq);
158+
159+
/* Case 1: Expected sequence -> immediate delivery */
160+
if (likely(seq_diff == 0)) {
161+
int bytes = deliver_packet(buf_ref, conn, is_snapshot);
162+
if (bytes > 0)
163+
total_bytes = bytes;
164+
r->base_seq++;
165+
166+
total_bytes += flush_consecutive(r, conn, is_snapshot, 1);
167+
return total_bytes;
168+
}
169+
170+
/* Case 2: Late/duplicate packet -> drop */
171+
if (seq_diff < 0) {
172+
logger(LOG_DEBUG, "RTP reorder: Late packet dropped (seq=%u, base=%u)",
173+
seqn, r->base_seq);
174+
return 0;
175+
}
176+
177+
/* Case 3: Beyond window -> force flush */
178+
if (seq_diff >= RTP_REORDER_WINDOW_SIZE) {
179+
total_bytes += force_flush_until(r, seqn, conn, is_snapshot);
180+
}
181+
182+
/* Store in slot */
183+
int slot = seqn & RTP_REORDER_WINDOW_MASK;
184+
if (r->slots[slot]) {
185+
/* Slot occupied - duplicate packet, silently drop.
186+
* This is normal in some network environments where upstream devices
187+
* send redundant packets for reliability. */
188+
return total_bytes;
189+
}
190+
191+
buffer_ref_get(buf_ref);
192+
r->slots[slot] = buf_ref;
193+
r->count++;
194+
195+
return total_bytes;
196+
}

0 commit comments

Comments
 (0)