@@ -1007,6 +1007,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
10071007 , _in_sem(1 )
10081008 , _out_sem(1 )
10091009 , _options(std::move(options))
1010+ , _output_in_progress(false )
10101011 , _output_pending(make_ready_future<>())
10111012 , _ctx(make_ssl_context(t))
10121013 , _ssl([this ]() {
@@ -1065,7 +1066,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
10651066 : session(t, std::move(creds), net::get_impl::get(std::move(sock)), options) {}
10661067
10671068 ~session () {
1068- SEASTAR_ASSERT (_output_pending. available ());
1069+ SEASTAR_ASSERT (output_available ());
10691070 }
10701071
10711072 const char * get_type_string () const {
@@ -1081,6 +1082,8 @@ class session : public enable_shared_from_this<session>, public session_impl {
10811082 tls_log.debug (" {} wait_for_output error: {}" , *this , ep);
10821083 _error = ep;
10831084 return make_exception_future (ep);
1085+ }).finally ([this ] {
1086+ _output_in_progress = false ;
10841087 });
10851088 }
10861089
@@ -1166,7 +1169,16 @@ class session : public enable_shared_from_this<session>, public session_impl {
11661169 // any unprocessed part of the packet is returned.
11671170 future<> do_put (net::packet p) {
11681171 tls_log.trace (" {} do_put" , *this );
1169- SEASTAR_ASSERT (_output_pending.available ());
1172+
1173+ // the put path is protected from concurrent calls by the _out_sem semaphore,
1174+ // however it is possible via the read (get()) path that a renegotiation may
1175+ // occur, putting an outstanding write on the output path. Originally,
1176+ // there was an assert to ensure that _output_pending was available. This assert
1177+ // would correctly serve the purpose that no writes occurred outside of the protection
1178+ // of the _out_sem. However, since writes from OpenSSL can occur outside of this semaphore,
1179+ // we will attempt to perform a write even if there is an outstanding write pending.
1180+ // This will result in SSL_write_ex failing with SSL_WANT_WRITE as the error code.
1181+ // This will force a wait_for_output to occur.
11701182 return do_with (std::move (p),
11711183 [this ](net::packet& p) {
11721184 // This do_until runs until either a renegotiation occurs or the packet is empty
@@ -1655,7 +1667,9 @@ class session : public enable_shared_from_this<session>, public session_impl {
16551667 // handshake aqcuire, because in worst case, we get here while a reader is attempting
16561668 // re-handshake.
16571669 return with_semaphore (_in_sem, 1 , [this ] {
1658- return with_semaphore (_out_sem, 1 , [] { });
1670+ return with_semaphore (_out_sem, 1 , [this ] {
1671+ _output_in_progress = false ;
1672+ });
16591673 });
16601674 }).handle_exception ([me = shared_from_this ()](std::exception_ptr){
16611675 }).discard_result ());
@@ -1788,6 +1802,39 @@ class session : public enable_shared_from_this<session>, public session_impl {
17881802 return _remote_address;
17891803 }
17901804
1805+ future<>& output_pending () {
1806+ return _output_pending;
1807+ }
1808+
1809+ bool output_available () {
1810+ // the wait_for_output method exchanges _output_pending for a ready
1811+ // future and then awaits for the output to complete. Checking that
1812+ // _output_pending is not enough to ensure that the output has completed.
1813+ // The purpose of the output_in_progress flag is to await that that
1814+ // future has resolved before permitting another write to be enqueued.
1815+ return !_output_in_progress && _output_pending.available ();
1816+ }
1817+
1818+ void assign_output_pending (future<> f) {
1819+ SEASTAR_ASSERT (!_output_in_progress);
1820+ SEASTAR_ASSERT (_output_pending.available ());
1821+ _output_in_progress = true ;
1822+ _output_pending = std::move (f);
1823+ }
1824+
1825+ void assign_output_error (std::exception_ptr ep) {
1826+ _output_in_progress = false ;
1827+ _output_pending = make_exception_future<>(ep);
1828+ }
1829+
1830+ data_sink& out () {
1831+ return _out;
1832+ }
1833+
1834+ buf_type& input () {
1835+ return _input;
1836+ }
1837+
17911838private:
17921839 std::vector<subject_alt_name> do_get_alt_name_information (const x509_ptr &peer_cert,
17931840 const std::unordered_set<subject_alt_name_type> &types) const {
@@ -2152,6 +2199,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
21522199 semaphore _out_sem;
21532200 tls_options _options;
21542201
2202+ bool _output_in_progress{false };
21552203 future<> _output_pending;
21562204 buf_type _input;
21572205 // ALPN protocols in OPENSSL format
@@ -2165,9 +2213,6 @@ class session : public enable_shared_from_this<session>, public session_impl {
21652213 bool _eof = false ;
21662214 bool _shutdown = false ;
21672215
2168- friend int bio_write_ex (BIO* b, const char * data, size_t dlen, size_t * written);
2169- friend int bio_read_ex (BIO* b, char * data, size_t dlen, size_t *readbytes);
2170- friend long bio_ctrl (BIO * b, int ctrl, long num, void * data);
21712216 friend int session_ticket_cb (SSL*, unsigned char [16 ], unsigned char [EVP_MAX_IV_LENGTH],
21722217 EVP_CIPHER_CTX*, EVP_MAC_CTX*, int );
21732218};
@@ -2266,9 +2311,9 @@ long bio_ctrl(BIO * b, int ctrl, long num, void * data) {
22662311 case BIO_CTRL_EOF:
22672312 return BIO_test_flags (b, BIO_FLAGS_IN_EOF) != 0 ;
22682313 case BIO_CTRL_PENDING:
2269- return static_cast <long >(session->_input .size ());
2314+ return static_cast <long >(session->input () .size ());
22702315 case BIO_CTRL_WPENDING:
2271- return session->_output_pending . available () ? 0 : 1 ;
2316+ return session->output_available () ? 0 : 1 ;
22722317 default :
22732318 return 0 ;
22742319 }
@@ -2294,7 +2339,7 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
22942339 tls_log.trace (" {} bio_write_ex: dlen {}" , *session, dlen);
22952340 BIO_clear_retry_flags (b);
22962341
2297- if (!session->_output_pending . available ()) {
2342+ if (!session->output_available ()) {
22982343 tls_log.trace (" {} bio_write_ex: nothing pending in output" , *session);
22992344 BIO_set_retry_write (b);
23002345 return 0 ;
@@ -2303,17 +2348,20 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
23032348 try {
23042349 size_t n;
23052350
2306- if (!session->_output_pending .failed ()) {
2351+ if (!session->output_pending () .failed ()) {
23072352 scattered_message<char > msg;
23082353 msg.append (std::string_view (data, dlen));
23092354 n = msg.size ();
2310- session->_output_pending = session->_out .put (std::move (msg).release ());
2355+ // Important here to use the assign_output_pending function. This ensures
2356+ // that there are no outstanding writes in progress and sets the
2357+ // output_pending flag
2358+ session->assign_output_pending (session->out ().put (std::move (msg).release ()));
23112359 tls_log.trace (" {} bio_write_ex: Appended {} bytes to output pending" , *session, n);
23122360 }
23132361
2314- if (session->_output_pending .failed ()) {
2362+ if (session->output_pending () .failed ()) {
23152363 tls_log.debug (" {} bio_write_ex: output pending has error" , *session);
2316- std::rethrow_exception (session->_output_pending .get_exception ());
2364+ std::rethrow_exception (session->output_pending () .get_exception ());
23172365 }
23182366
23192367 if (written != nullptr ) {
@@ -2324,11 +2372,11 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
23242372 } catch (const std::system_error & e) {
23252373 tls_log.debug (" {} bio_write_ex: system error occurred: {}" , *session, e.what ());
23262374 ERR_raise_data (ERR_LIB_SYS, e.code ().value (), e.what ());
2327- session->_output_pending = make_exception_future<> (std::current_exception ());
2375+ session->assign_output_error (std::current_exception ());
23282376 } catch (...) {
23292377 tls_log.debug (" {} bio_write_ex: unknown error occurred" , *session);
23302378 ERR_raise (ERR_LIB_SYS, EIO);
2331- session->_output_pending = make_exception_future<> (std::current_exception ());
2379+ session->assign_output_error (std::current_exception ());
23322380 }
23332381
23342382 return 0 ;
@@ -2350,15 +2398,15 @@ int bio_read_ex(BIO* b, char * data, size_t dlen, size_t *readbytes) {
23502398 return 0 ;
23512399 }
23522400
2353- if (session->_input .empty ()) {
2401+ if (session->input () .empty ()) {
23542402 tls_log.trace (" {} bio_read_ex: input empty" , *session);
23552403 BIO_set_retry_read (b);
23562404 return 0 ;
23572405 }
23582406
2359- auto n = std::min (dlen, session->_input .size ());
2360- memcpy (data, session->_input .get (), n);
2361- session->_input .trim_front (n);
2407+ auto n = std::min (dlen, session->input () .size ());
2408+ memcpy (data, session->input () .get (), n);
2409+ session->input () .trim_front (n);
23622410 if (readbytes != nullptr ) {
23632411 *readbytes = n;
23642412 }
0 commit comments