Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 66 additions & 22 deletions src/net/ossl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
, _in_sem(1)
, _out_sem(1)
, _options(std::move(options))
, _output_in_progress(false)
, _output_pending(make_ready_future<>())
, _ctx(make_ssl_context(t))
, _ssl([this]() {
Expand Down Expand Up @@ -1065,7 +1066,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
: session(t, std::move(creds), net::get_impl::get(std::move(sock)), options) {}

~session() {
SEASTAR_ASSERT(_output_pending.available());
SEASTAR_ASSERT(output_available());
}

const char * get_type_string() const {
Expand Down Expand Up @@ -1166,7 +1167,16 @@ class session : public enable_shared_from_this<session>, public session_impl {
// any unprocessed part of the packet is returned.
future<> do_put(net::packet p) {
tls_log.trace("{} do_put", *this);
SEASTAR_ASSERT(_output_pending.available());

// the put path is protected from concurrent calls by the _out_sem semaphore,
// however it is possible via the read (get()) path that a renegotiation may
// occur, putting an outstanding write on the output path. Originally,
// there was an assert to ensure that _output_pending was available. This assert
// would correctly serve the purpose that no writes occurred outside of the protection
// of the _out_sem. However, since writes from OpenSSL can occur outside of this semaphore,
// we will attempt to perform a write even if there is an outstanding write pending.
// This will result in SSL_write_ex failing with SSL_WANT_WRITE as the error code.
// This will force a wait_for_output to occur.
return do_with(std::move(p),
[this](net::packet& p) {
// This do_until runs until either a renegotiation occurs or the packet is empty
Expand Down Expand Up @@ -1655,7 +1665,8 @@ class session : public enable_shared_from_this<session>, public session_impl {
// handshake aqcuire, because in worst case, we get here while a reader is attempting
// re-handshake.
return with_semaphore(_in_sem, 1, [this] {
return with_semaphore(_out_sem, 1, [] { });
return with_semaphore(_out_sem, 1, [] {
});
});
}).handle_exception([me = shared_from_this()](std::exception_ptr){
}).discard_result());
Expand Down Expand Up @@ -1788,6 +1799,40 @@ class session : public enable_shared_from_this<session>, public session_impl {
return _remote_address;
}

future<>& output_pending() {
return _output_pending;
}

bool output_available() {
// the wait_for_output method exchanges _output_pending for a ready
// future and then awaits for the output to complete. Checking that
// _output_pending is not enough to ensure that the output has completed.
// The purpose of the output_in_progress flag is to await that that
// future has resolved before permitting another write to be enqueued.
return !_output_in_progress && _output_pending.available();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should theoretically be sufficient to just check _output_in_progress, ya?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... I guess I'm just overdoing it a little bit

}

void assign_output_pending(future<> f) {
SEASTAR_ASSERT(!_output_in_progress);
SEASTAR_ASSERT(_output_pending.available());
_output_in_progress = true;
_output_pending = std::move(f).finally([this]{_output_in_progress = false;});
}

void assign_output_error(std::exception_ptr ep) {
SEASTAR_ASSERT(_output_pending.available());
_output_in_progress = false;
_output_pending = make_exception_future<>(ep);
}

data_sink& out() {
return _out;
}

buf_type& input() {
return _input;
}

private:
std::vector<subject_alt_name> do_get_alt_name_information(const x509_ptr &peer_cert,
const std::unordered_set<subject_alt_name_type> &types) const {
Expand Down Expand Up @@ -2152,6 +2197,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
semaphore _out_sem;
tls_options _options;

Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _output_in_progress flag is a critical concurrency control mechanism but lacks documentation. Add a comment explaining its purpose: tracking when output operations are actively being processed, distinct from when futures are pending, to prevent concurrent writes during renegotiation.

Suggested change
// Tracks when output operations are actively being processed.
// This is distinct from when futures are pending (_output_pending).
// Used as a concurrency control mechanism to prevent concurrent writes
// during renegotiation.

Copilot uses AI. Check for mistakes.
bool _output_in_progress{false};
future<> _output_pending;
buf_type _input;
// ALPN protocols in OPENSSL format
Expand All @@ -2165,9 +2211,6 @@ class session : public enable_shared_from_this<session>, public session_impl {
bool _eof = false;
bool _shutdown = false;

friend int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written);
friend int bio_read_ex(BIO* b, char * data, size_t dlen, size_t *readbytes);
friend long bio_ctrl(BIO * b, int ctrl, long num, void * data);
friend int session_ticket_cb(SSL*, unsigned char[16], unsigned char[EVP_MAX_IV_LENGTH],
EVP_CIPHER_CTX*, EVP_MAC_CTX*, int);
};
Expand Down Expand Up @@ -2266,9 +2309,9 @@ long bio_ctrl(BIO * b, int ctrl, long num, void * data) {
case BIO_CTRL_EOF:
return BIO_test_flags(b, BIO_FLAGS_IN_EOF) != 0;
case BIO_CTRL_PENDING:
return static_cast<long>(session->_input.size());
return static_cast<long>(session->input().size());
case BIO_CTRL_WPENDING:
return session->_output_pending.available() ? 0 : 1;
return session->output_available() ? 0 : 1;
default:
return 0;
}
Expand All @@ -2294,26 +2337,27 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
tls_log.trace("{} bio_write_ex: dlen {}", *session, dlen);
BIO_clear_retry_flags(b);

if (!session->_output_pending.available()) {
tls_log.trace("{} bio_write_ex: nothing pending in output", *session);
if (!session->output_available()) {
tls_log.trace("{} bio_write_ex: write pending", *session);
BIO_set_retry_write(b);
return 0;
}

try {
size_t n;

if (!session->_output_pending.failed()) {
if (!session->output_pending().failed()) {
scattered_message<char> msg;
msg.append(std::string_view(data, dlen));
n = msg.size();
session->_output_pending = session->_out.put(std::move(msg).release());
// Important here to use the assign_output_pending function. This ensures
// that there are no outstanding writes in progress and sets the
// output_pending flag
session->assign_output_pending(session->out().put(std::move(msg).release()));
tls_log.trace("{} bio_write_ex: Appended {} bytes to output pending", *session, n);
}

if (session->_output_pending.failed()) {
} else {
tls_log.debug("{} bio_write_ex: output pending has error", *session);
std::rethrow_exception(session->_output_pending.get_exception());
std::rethrow_exception(session->output_pending().get_exception());
}

if (written != nullptr) {
Expand All @@ -2324,11 +2368,11 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
} catch(const std::system_error & e) {
tls_log.debug("{} bio_write_ex: system error occurred: {}", *session, e.what());
ERR_raise_data(ERR_LIB_SYS, e.code().value(), e.what());
session->_output_pending = make_exception_future<>(std::current_exception());
session->assign_output_error(std::current_exception());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q:

so if i'm reading this correctly the source of errors is our put onto the network layer
which just gets stored in the future as an exception future,
and then all subsequent calls to bio_write_ex will just continually grab, rethrow, and re-store the exception back into the future?

Who is the ultimate upstream error handler?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caller to put() will handle the error. These are considered fatal so then the session should be destructed

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does get_exception extract the exception?
If thats the case we could have
put -> bio_write -> future emplaced -> exception -> put detects and throws
followed by an out of band write or another put

Wondering if we should check that the future remains errored for all subsequent callers as a signal of "this session is borked"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what happens is

  • PUT
  • BIO write (error occurs and error is put into _output_pending
  • wait_for_output() gets called and attemps to resolve _output_pending
  • Within wait_for_ouptut() handle_exception is triggered and that emplaces the error into _error
  • Any future calls to put() check _error and if that is set, returns _error

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PUT
BIO write -> error stored in _output_pending
wait_for_output -> moves the exception out?, regardless throws
read fiber performs an out of band write, will it see the exception?

} catch(...) {
tls_log.debug("{} bio_write_ex: unknown error occurred", *session);
ERR_raise(ERR_LIB_SYS, EIO);
session->_output_pending = make_exception_future<>(std::current_exception());
session->assign_output_error(std::current_exception());
}

return 0;
Expand All @@ -2350,15 +2394,15 @@ int bio_read_ex(BIO* b, char * data, size_t dlen, size_t *readbytes) {
return 0;
}

if (session->_input.empty()) {
if (session->input().empty()) {
tls_log.trace("{} bio_read_ex: input empty", *session);
BIO_set_retry_read(b);
return 0;
}

auto n = std::min(dlen, session->_input.size());
memcpy(data, session->_input.get(), n);
session->_input.trim_front(n);
auto n = std::min(dlen, session->input().size());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
the usages of input are either const (get size/empty)
or an extraction of a chunk.

the ref handing function breaks encapsulation, maybe just two member functions for get_input_size and extract_input(max_size)

memcpy(data, session->input().get(), n);
session->input().trim_front(n);
if (readbytes != nullptr) {
*readbytes = n;
}
Expand Down