@@ -131,6 +131,8 @@ struct upipe_srt_handshake {
131131
132132 uint64_t last_hs_sent ;
133133
134+ bool end ;
135+
134136 /** public upipe structure */
135137 struct upipe upipe ;
136138};
@@ -303,28 +305,66 @@ static void upipe_srt_handshake_kmreq(struct upump *upump)
303305 upipe_srt_handshake_output (& upipe_srt_handshake -> upipe , uref_dup (kmreq ), NULL );
304306}
305307
306- static void upipe_srt_handshake_keepalive_timeout (struct upump * upump )
308+ static int upipe_srt_handshake_set_flow_def (struct upipe * upipe , struct uref * flow_def );
309+ static void upipe_srt_handshake_timeout (struct upump * upump );
310+
311+ static void upipe_srt_handshake_disconnect (struct upipe * upipe , bool end , bool blacklist )
307312{
308- struct upipe * upipe = upump_get_opaque (upump , struct upipe * );
309313 struct upipe_srt_handshake * upipe_srt_handshake = upipe_srt_handshake_from_upipe (upipe );
310314
311- upipe_err (upipe , "No data in 10s" );
312- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
313- upipe_throw_source_end (upipe );
315+ /* Connection has just been aborted already */
316+
317+ /* No need to keep waiting for keepalives */
318+ upipe_srt_handshake_set_upump_keepalive_timeout (upipe , NULL );
319+ /* No need to keep sending KMREQ packets */
320+ upipe_srt_handshake_set_upump_kmreq (upipe , NULL );
321+ /* No need to keep sending handshake packets */
322+ upipe_srt_handshake_set_upump_handshake_send (upipe , NULL );
323+
324+ if (upipe_srt_handshake -> upump_handshake_timeout ) /* if timeout was running we die */
325+ end = true;
314326
327+ upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false, blacklist , end );
315328 upipe_srt_handshake -> expect_conclusion = false;
329+
330+ if (end ) {
331+ upipe_throw_source_end (upipe );
332+ upipe_srt_handshake -> end = true;
333+ /* No need to wait for a timeout */
334+ upipe_srt_handshake_set_upump_handshake_timeout (upipe , NULL );
335+ } else {
336+ /* (new) connection has to succeed within 3 seconds */
337+ struct upump * upump =
338+ upump_alloc_timer (upipe_srt_handshake -> upump_mgr ,
339+ upipe_srt_handshake_timeout ,
340+ upipe , upipe -> refcount ,
341+ 3 * UCLOCK_FREQ , 0 );
342+ upump_start (upump );
343+ upipe_srt_handshake_set_upump_handshake_timeout (upipe , upump );
344+ }
345+
346+ struct uref * flow_def = uref_block_flow_alloc_def (upipe_srt_handshake -> uref_mgr , "" );
347+ if (flow_def ) {
348+ upipe_srt_handshake_set_flow_def (upipe , flow_def );
349+ /* force sending flow definition immediately */
350+ upipe_srt_handshake_output (upipe , NULL , NULL );
351+ }
352+ }
353+
354+ static void upipe_srt_handshake_keepalive_timeout (struct upump * upump )
355+ {
356+ struct upipe * upipe = upump_get_opaque (upump , struct upipe * );
357+
358+ upipe_err (upipe , "No data in 10s" );
359+ upipe_srt_handshake_disconnect (upipe , true, false);
316360}
317361
318362static void upipe_srt_handshake_timeout (struct upump * upump )
319363{
320364 struct upipe * upipe = upump_get_opaque (upump , struct upipe * );
321- struct upipe_srt_handshake * upipe_srt_handshake = upipe_srt_handshake_from_upipe (upipe );
322365
323366 upipe_err (upipe , "Connection timed out" );
324- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
325- upipe_srt_handshake_set_upump_handshake_timeout (upipe , NULL );
326- upipe_srt_handshake_set_upump_handshake_send (upipe , NULL );
327- upipe_srt_handshake -> expect_conclusion = false;
367+ upipe_srt_handshake_disconnect (upipe , false, false);
328368}
329369
330370static void upipe_srt_handshake_send_timer (struct upump * upump )
@@ -460,6 +500,7 @@ static struct upipe *upipe_srt_handshake_alloc(struct upipe_mgr *mgr,
460500 upipe_srt_handshake -> password = NULL ;
461501
462502 upipe_srt_handshake -> establish_time = 0 ;
503+ upipe_srt_handshake -> end = false;
463504
464505 upipe_throw_ready (upipe );
465506 return upipe ;
@@ -1161,7 +1202,7 @@ static struct uref *upipe_srt_handshake_handle_hs_caller_conclusion(struct upipe
11611202 size -= ext_len ;
11621203 }
11631204
1164- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , true);
1205+ upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , true, false, false );
11651206
11661207 upipe_srt_handshake_finalize (upipe );
11671208 return NULL ;
@@ -1254,17 +1295,15 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
12541295
12551296 if (hs_packet -> syn_cookie != upipe_srt_handshake -> syn_cookie ) {
12561297 upipe_err (upipe , "Malformed conclusion handshake (invalid syn cookie)" );
1257- upipe_srt_handshake -> expect_conclusion = false;
1258- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1298+ upipe_srt_handshake_disconnect (upipe , false, false);
12591299 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
12601300 hs_packet -> remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_UNKNOWN );
12611301 }
12621302
12631303 /* At least HSREQ is expected */
12641304 if (hs_packet -> version == SRT_HANDSHAKE_VERSION && size < SRT_HANDSHAKE_CIF_EXTENSION_MIN_SIZE + SRT_HANDSHAKE_HSREQ_SIZE ) {
12651305 upipe_err (upipe , "Malformed conclusion handshake (size)" );
1266- upipe_srt_handshake -> expect_conclusion = false;
1267- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1306+ upipe_srt_handshake_disconnect (upipe , false, false);
12681307 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
12691308 hs_packet -> remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_UNKNOWN );
12701309 }
@@ -1297,6 +1336,13 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
12971336 upipe_err_va (upipe , "Malformed HSREQ: %u < %u\n" , ext_len ,
12981337 SRT_HANDSHAKE_HSREQ_SIZE );
12991338 } else if (ext_type == SRT_HANDSHAKE_EXT_TYPE_KMREQ ) {
1339+ if (!upipe_srt_handshake -> password ) {
1340+ upipe_err (upipe , "Password not specified but remote requested encryption." );
1341+ upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false, true, false);
1342+ upipe_srt_handshake -> expect_conclusion = false;
1343+ return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
1344+ hs_packet -> remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_BADSECRET );
1345+ }
13001346 got_key = upipe_srt_handshake_parse_kmreq (upipe , ext , ext_len , & wrap , & wrap_len );
13011347 }
13021348
@@ -1306,8 +1352,7 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
13061352
13071353 if (upipe_srt_handshake -> password && !got_key ) {
13081354 upipe_err (upipe , "Password specified but could not get streaming key" );
1309- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1310- upipe_srt_handshake -> expect_conclusion = false;
1355+ upipe_srt_handshake_disconnect (upipe , false, true);
13111356 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
13121357 hs_packet -> remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_BADSECRET );
13131358 }
@@ -1374,7 +1419,7 @@ static struct uref *upipe_srt_handshake_handle_hs_listener_conclusion(struct upi
13741419#endif
13751420 }
13761421
1377- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , true);
1422+ upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , true, false );
13781423 upipe_srt_handshake_finalize (upipe );
13791424
13801425 uref_block_unmap (uref , 0 );
@@ -1426,9 +1471,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14261471 upipe_err_va (upipe , "Remote rejected handshake (%s)" , get_hs_error (hs_type ));
14271472 if (!upipe_srt_handshake -> listener )
14281473 upipe_srt_handshake -> syn_cookie = 0 ;
1429- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1430- upipe_throw_source_end (upipe );
1431- upipe_srt_handshake -> expect_conclusion = false;
1474+ upipe_srt_handshake_disconnect (upipe , true, false);
14321475 return NULL ;
14331476 }
14341477
@@ -1456,8 +1499,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14561499
14571500 if (hs_type != SRT_HANDSHAKE_TYPE_INDUCTION ) {
14581501 upipe_err_va (upipe , "Expected induction, ignore hs type %s" , get_hs_type (hs_type ));
1459- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1460- upipe_srt_handshake_set_upump_handshake_send (upipe , NULL );
1502+ upipe_srt_handshake_disconnect (upipe , false, false);
14611503 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
14621504 hs_packet .remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_UNKNOWN );
14631505 }
@@ -1480,8 +1522,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14801522 if (hs_packet .version != SRT_HANDSHAKE_VERSION || hs_packet .dst_socket_id != upipe_srt_handshake -> socket_id ) {
14811523 upipe_err_va (upipe , "Malformed handshake (%08x != %08x)" ,
14821524 hs_packet .dst_socket_id , upipe_srt_handshake -> socket_id );
1483- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1484- upipe_srt_handshake_set_upump_handshake_send (upipe , NULL );
1525+ upipe_srt_handshake_disconnect (upipe , false, false);
14851526 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
14861527 hs_packet .remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_UNKNOWN );
14871528 }
@@ -1498,7 +1539,7 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
14981539 hs_packet .extension != SRT_HANDSHAKE_EXT_KMREQ ||
14991540 hs_packet .syn_cookie != 0 || hs_packet .dst_socket_id != 0 ) {
15001541 upipe_err_va (upipe , "Malformed first handshake syn %u dst_id %u" , hs_packet .syn_cookie , hs_packet .dst_socket_id );
1501- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1542+ upipe_srt_handshake_disconnect (upipe , false , false);
15021543 return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
15031544 hs_packet .remote_socket_id , SRT_HANDSHAKE_TYPE_REJ_UNKNOWN );
15041545 }
@@ -1544,6 +1585,14 @@ static struct uref *upipe_srt_handshake_handle_user(struct upipe *upipe, const u
15441585 const uint8_t * cif = srt_get_control_packet_cif (buf );
15451586 size -= SRT_HEADER_SIZE ;
15461587
1588+ if (!upipe_srt_handshake -> password ) {
1589+ upipe_err (upipe , "Password not specified but remote requested encryption in user packet." );
1590+ upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false, true, false);
1591+ upipe_srt_handshake -> expect_conclusion = false;
1592+ return upipe_srt_handshake_alloc_hs_reject (upipe , timestamp ,
1593+ srt_get_packet_dst_socket_id (buf ), SRT_HANDSHAKE_TYPE_REJ_BADSECRET );
1594+ }
1595+
15471596 if (!upipe_srt_handshake_parse_kmreq (upipe , cif , size , & wrap , & wrap_len )) {
15481597 return NULL ;
15491598 }
@@ -1653,8 +1702,7 @@ static struct uref *upipe_srt_handshake_input_control(struct upipe *upipe, const
16531702
16541703 case SRT_CONTROL_TYPE_SHUTDOWN :
16551704 upipe_err_va (upipe , "shutdown requested" );
1656- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1657- upipe_throw_source_end (upipe );
1705+ upipe_srt_handshake_disconnect (upipe , true, false);
16581706 break ;
16591707
16601708 case SRT_CONTROL_TYPE_DROPREQ :
@@ -1686,14 +1734,21 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
16861734 upipe_warn (upipe , "No upump mgr" );
16871735 upipe_srt_handshake_check (upipe , NULL );
16881736 uref_free (uref );
1737+ return ;
16891738 }
16901739
16911740 if (!upipe_srt_handshake -> uclock ) {
16921741 upipe_warn (upipe , "No uclock" );
16931742 upipe_srt_handshake_check (upipe , NULL );
16941743 uref_free (uref );
1744+ return ;
16951745 }
16961746
1747+ /* Pipe is gonna die soon */
1748+ if (upipe_srt_handshake -> end ) {
1749+ uref_free (uref );
1750+ return ;
1751+ }
16971752 size_t total_size ;
16981753 ubase_assert (uref_block_size (uref , & total_size ));
16991754
@@ -1716,11 +1771,14 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
17161771 if (dst_socket_id == 0 ) {
17171772 uint16_t type = srt_get_control_packet_type (buf );
17181773 if (!control || type != SRT_CONTROL_TYPE_HANDSHAKE ) {
1719- upipe_dbg (upipe , "dst socket id unset" );
1774+ upipe_dbg_va (upipe , "dst socket id unset (%s)" ,
1775+ control ? get_ctrl_type (type ) : "data" );
17201776 ubase_assert (uref_block_unmap (uref , 0 ));
17211777 uref_free (uref );
1722- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1723- upipe_throw_source_end (upipe );
1778+ // blacklist?
1779+ // XXX: find out who is sending shutdown without dst socket set
1780+ // XXX: sometimes the legitimate caller does it too
1781+ upipe_srt_handshake_disconnect (upipe , true, false);
17241782 return ;
17251783 }
17261784 }
@@ -1730,8 +1788,7 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
17301788 upipe_srt_handshake -> socket_id );
17311789 ubase_assert (uref_block_unmap (uref , 0 ));
17321790 uref_free (uref );
1733- upipe_throw (upipe , UPROBE_SRT_HANDSHAKE_CONNECTED , UPIPE_SRT_HANDSHAKE_SIGNATURE , false);
1734- upipe_throw_source_end (upipe );
1791+ upipe_srt_handshake_disconnect (upipe , true, false);
17351792 return ;
17361793 }
17371794
0 commit comments