Skip to content

Commit 7f53feb

Browse files
committed
srth: throw an event when we get connected or disconnected
srt_tx: use this event to "disconnect" from remote peer
1 parent 5bebb60 commit 7f53feb

File tree

3 files changed

+33
-0
lines changed

3 files changed

+33
-0
lines changed

examples/srt_tx.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,16 @@ static int catch_hs(struct uprobe *uprobe, struct upipe *upipe,
143143
uint16_t latency_ms;
144144

145145
switch (event) {
146+
case UPROBE_SRT_HANDSHAKE_CONNECTED:
147+
if (ubase_get_signature(args) != UPIPE_SRT_HANDSHAKE_SIGNATURE) {
148+
return uprobe_throw_next(uprobe, upipe, event, args);
149+
}
150+
va_arg(args, unsigned int); // signature
151+
bool connected = va_arg(args, int );
152+
upipe_notice_va(upipe, "%sCONNECTED", connected ? "" : "DIS");
153+
if (!connected)
154+
ubase_assert(upipe_set_uri(upipe_udpsink, NULL));
155+
return UBASE_ERR_NONE;
146156
case UPROBE_SOURCE_END:
147157
upipe_warn(upipe, "Remote shutdown");
148158
struct upump *u = upump_alloc_timer(upump_mgr, stop, upipe_udpsrc,

include/upipe-srt/upipe_srt_handshake.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ enum upipe_srt_handshake_command {
5555
UPIPE_SRT_HANDSHAKE_GET_LATENCY,
5656
};
5757

58+
/** @This extends uprobe_throw with specific events. */
59+
enum uprobe_srt_handshake_event {
60+
UPROBE_SRT_HANDSHAKE_SENTINEL = UPROBE_LOCAL,
61+
62+
/** connection status changed (bool) */
63+
UPROBE_SRT_HANDSHAKE_CONNECTED,
64+
};
65+
5866
/** @This sets the peer address
5967
*
6068
* @param upipe description structure of the pipe

lib/upipe-srt/upipe_srt_handshake.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ static void upipe_srt_handshake_keepalive_timeout(struct upump *upump)
309309
struct upipe_srt_handshake *upipe_srt_handshake = upipe_srt_handshake_from_upipe(upipe);
310310

311311
upipe_err(upipe, "No data in 10s");
312+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
312313
upipe_throw_source_end(upipe);
313314

314315
upipe_srt_handshake->expect_conclusion = false;
@@ -1158,6 +1159,8 @@ static struct uref *upipe_srt_handshake_handle_hs_caller_conclusion(struct upipe
11581159
size -= ext_len;
11591160
}
11601161

1162+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, true);
1163+
11611164
upipe_srt_handshake_finalize(upipe);
11621165
return NULL;
11631166
}
@@ -1250,13 +1253,15 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
12501253
if (hs_packet->dst_socket_id != 0) {
12511254
upipe_err_va(upipe, "Malformed conclusion handshake (dst_socket_id 0x%08x)", hs_packet->dst_socket_id);
12521255
upipe_srt_handshake->expect_conclusion = false;
1256+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
12531257
upipe_throw_source_end(upipe);
12541258
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
12551259
hs_packet->remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
12561260
}
12571261
if (hs_packet->syn_cookie != upipe_srt_handshake->syn_cookie) {
12581262
upipe_err(upipe, "Malformed conclusion handshake (invalid syn cookie)");
12591263
upipe_srt_handshake->expect_conclusion = false;
1264+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
12601265
upipe_throw_source_end(upipe);
12611266
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
12621267
hs_packet->remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
@@ -1266,6 +1271,7 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
12661271
if (hs_packet->version == SRT_HANDSHAKE_VERSION && size < SRT_HANDSHAKE_CIF_EXTENSION_MIN_SIZE + SRT_HANDSHAKE_HSREQ_SIZE) {
12671272
upipe_err(upipe, "Malformed conclusion handshake (size)");
12681273
upipe_srt_handshake->expect_conclusion = false;
1274+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
12691275
upipe_throw_source_end(upipe);
12701276
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
12711277
hs_packet->remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
@@ -1308,6 +1314,7 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
13081314

13091315
if (upipe_srt_handshake->password && !got_key) {
13101316
upipe_err(upipe, "Password specified but could not get streaming key");
1317+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
13111318
upipe_throw_source_end(upipe);
13121319
upipe_srt_handshake->expect_conclusion = false;
13131320
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
@@ -1376,6 +1383,7 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
13761383
#endif
13771384
}
13781385

1386+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, true);
13791387
upipe_srt_handshake_finalize(upipe);
13801388

13811389
uref_block_unmap(uref, 0);
@@ -1427,6 +1435,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14271435
upipe_err_va(upipe, "Remote rejected handshake (%s)", get_hs_error(hs_type));
14281436
if (!upipe_srt_handshake->listener)
14291437
upipe_srt_handshake->syn_cookie = 0;
1438+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
14301439
upipe_srt_handshake->expect_conclusion = false;
14311440
return NULL;
14321441
}
@@ -1447,6 +1456,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14471456
} else {
14481457
if (hs_type != SRT_HANDSHAKE_TYPE_INDUCTION) {
14491458
upipe_err_va(upipe, "Expected induction, ignore hs type %s", get_hs_type(hs_type));
1459+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
14501460
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
14511461
hs_packet.remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
14521462
}
@@ -1476,6 +1486,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14761486
if (hs_packet.version != SRT_HANDSHAKE_VERSION || hs_packet.dst_socket_id != upipe_srt_handshake->socket_id) {
14771487
upipe_err_va(upipe, "Malformed handshake (%08x != %08x)",
14781488
hs_packet.dst_socket_id, upipe_srt_handshake->socket_id);
1489+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
14791490
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
14801491
hs_packet.remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
14811492
}
@@ -1492,6 +1503,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14921503
hs_packet.extension != SRT_HANDSHAKE_EXT_KMREQ ||
14931504
hs_packet.syn_cookie != 0 || hs_packet.dst_socket_id != 0) {
14941505
upipe_err_va(upipe, "Malformed first handshake syn %u dst_id %u", hs_packet.syn_cookie, hs_packet.dst_socket_id);
1506+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
14951507
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
14961508
hs_packet.remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
14971509
}
@@ -1646,6 +1658,7 @@ static struct uref *upipe_srt_handshake_input_control(struct upipe *upipe, const
16461658

16471659
case SRT_CONTROL_TYPE_SHUTDOWN:
16481660
upipe_err_va(upipe, "shutdown requested");
1661+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
16491662
upipe_throw_source_end(upipe);
16501663
break;
16511664

@@ -1711,6 +1724,7 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
17111724
upipe_dbg(upipe, "dst socket id unset");
17121725
ubase_assert(uref_block_unmap(uref, 0));
17131726
uref_free(uref);
1727+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
17141728
upipe_throw_source_end(upipe);
17151729
return;
17161730
}
@@ -1721,6 +1735,7 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
17211735
upipe_srt_handshake->socket_id);
17221736
ubase_assert(uref_block_unmap(uref, 0));
17231737
uref_free(uref);
1738+
upipe_throw(upipe, UPROBE_SRT_HANDSHAKE_CONNECTED, UPIPE_SRT_HANDSHAKE_SIGNATURE, false);
17241739
upipe_throw_source_end(upipe);
17251740
return;
17261741
}

0 commit comments

Comments
 (0)