Skip to content

Commit 01fa65c

Browse files
ctillercopybara-github
authored andcommitted
Fix secure endpoint
Fixing a few bugs that slipped into the new secure endpoint implementation PiperOrigin-RevId: 746145618
1 parent 098b50f commit 01fa65c

File tree

1 file changed

+51
-35
lines changed

1 file changed

+51
-35
lines changed

src/core/handshaker/security/secure_endpoint.cc

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class FrameProtector : public RefCounted<FrameProtector> {
7979
->memory_quota()
8080
->CreateMemoryOwner()),
8181
self_reservation_(memory_owner_.MakeReservation(sizeof(*this))) {
82+
GRPC_TRACE_LOG(secure_endpoint, INFO)
83+
<< "FrameProtector: " << this << " protector: " << protector_
84+
<< " zero_copy_protector: " << zero_copy_protector_
85+
<< " leftover_nslices: " << leftover_nslices;
8286
if (leftover_nslices > 0) {
8387
leftover_bytes_ = std::make_unique<SliceBuffer>();
8488
for (size_t i = 0; i < leftover_nslices; i++) {
@@ -106,6 +110,18 @@ class FrameProtector : public RefCounted<FrameProtector> {
106110
Mutex* read_mu() ABSL_LOCK_RETURNED(read_mu_) { return &read_mu_; }
107111
Mutex* write_mu() ABSL_LOCK_RETURNED(write_mu_) { return &write_mu_; }
108112

113+
void TraceOp(absl::string_view op, grpc_slice_buffer* slices) {
114+
if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
115+
size_t i;
116+
for (i = 0; i < slices->count; i++) {
117+
char* data =
118+
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
119+
VLOG(2) << op << " " << this << ": " << data;
120+
gpr_free(data);
121+
}
122+
}
123+
}
124+
109125
void MaybePostReclaimer() {
110126
if (!has_posted_reclaimer_.exchange(true, std::memory_order_relaxed)) {
111127
memory_owner_.PostReclaimer(
@@ -146,15 +162,7 @@ class FrameProtector : public RefCounted<FrameProtector> {
146162
}
147163

148164
void FinishRead(bool ok) {
149-
if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
150-
size_t i;
151-
for (i = 0; i < read_buffer_->count; i++) {
152-
char* data = grpc_dump_slice(read_buffer_->slices[i],
153-
GPR_DUMP_HEX | GPR_DUMP_ASCII);
154-
VLOG(2) << "READ " << this << ": " << data;
155-
gpr_free(data);
156-
}
157-
}
165+
TraceOp("FinishRead", read_buffer_);
158166
// TODO(yangg) experiment with moving this block after read_cb to see if it
159167
// helps latency
160168
source_buffer_.Clear();
@@ -243,6 +251,9 @@ class FrameProtector : public RefCounted<FrameProtector> {
243251
absl::StrCat("Unwrap failed (", tsi_result_to_string(result), ")"));
244252
}
245253

254+
GRPC_TRACE_LOG(secure_endpoint, INFO)
255+
<< "Unprotect: " << this << " read_status: " << read_status;
256+
246257
return read_status;
247258
}
248259

@@ -252,6 +263,9 @@ class FrameProtector : public RefCounted<FrameProtector> {
252263
}
253264

254265
bool MaybeCompleteReadImmediately() {
266+
GRPC_TRACE_LOG(secure_endpoint, INFO)
267+
<< "MaybeCompleteReadImmediately: " << this
268+
<< " leftover_bytes_: " << leftover_bytes_.get();
255269
if (leftover_bytes_ != nullptr) {
256270
grpc_slice_buffer_swap(leftover_bytes_->c_slice_buffer(),
257271
source_buffer_.c_slice_buffer());
@@ -285,14 +299,7 @@ class FrameProtector : public RefCounted<FrameProtector> {
285299

286300
output_buffer_.Clear();
287301

288-
if (GRPC_TRACE_FLAG_ENABLED(secure_endpoint) && ABSL_VLOG_IS_ON(2)) {
289-
for (size_t i = 0; i < slices->count; i++) {
290-
char* data =
291-
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
292-
VLOG(2) << "WRITE " << this << ": " << data;
293-
gpr_free(data);
294-
}
295-
}
302+
TraceOp("Protect", slices);
296303

297304
tsi_result result = TSI_OK;
298305
if (zero_copy_protector_ != nullptr) {
@@ -678,24 +685,28 @@ class SecureEndpoint final : public EventEngine::Endpoint {
678685
}
679686
ReadArgs args;
680687
args.read_hint_bytes = frame_protector_.min_progress_size();
681-
return wrapped_ep_->Read(
682-
[impl = Ref()](absl::Status status) mutable {
683-
{
684-
grpc_core::MutexLock lock(
685-
impl->frame_protector_.read_mu());
686-
if (status.ok() && impl->wrapped_ep_ == nullptr) {
687-
status =
688-
absl::CancelledError("secure endpoint shutdown");
689-
}
690-
status =
691-
impl->frame_protector_.Unprotect(std::move(status));
692-
}
693-
auto on_read = std::move(impl->on_read_);
694-
impl.reset();
695-
on_read(status);
696-
},
697-
frame_protector_.source_buffer(), &args) &&
698-
MaybeFinishReadImmediately();
688+
bool read_completed_immediately = wrapped_ep_->Read(
689+
[impl = Ref()](absl::Status status) mutable {
690+
{
691+
grpc_core::MutexLock lock(impl->frame_protector_.read_mu());
692+
if (status.ok() && impl->wrapped_ep_ == nullptr) {
693+
status = absl::CancelledError("secure endpoint shutdown");
694+
}
695+
status = impl->frame_protector_.Unprotect(std::move(status));
696+
}
697+
if (status.ok()) {
698+
impl->frame_protector_.TraceOp(
699+
"Read",
700+
impl->frame_protector_.source_buffer()->c_slice_buffer());
701+
}
702+
auto on_read = std::move(impl->on_read_);
703+
impl->frame_protector_.FinishRead(status.ok());
704+
impl.reset();
705+
on_read(status);
706+
},
707+
frame_protector_.source_buffer(), &args);
708+
if (read_completed_immediately) return MaybeFinishReadImmediately();
709+
return false;
699710
}
700711

701712
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
@@ -717,6 +728,8 @@ class SecureEndpoint final : public EventEngine::Endpoint {
717728
return false;
718729
}
719730
on_write_ = std::move(on_writable);
731+
frame_protector_.TraceOp(
732+
"Write", frame_protector_.output_buffer()->c_slice_buffer());
720733
return wrapped_ep_->Write(
721734
[impl = Ref()](absl::Status status) mutable {
722735
auto on_write = std::move(impl->on_write_);
@@ -747,7 +760,10 @@ class SecureEndpoint final : public EventEngine::Endpoint {
747760
private:
748761
bool MaybeFinishReadImmediately() {
749762
grpc_core::MutexLock lock(frame_protector_.read_mu());
763+
frame_protector_.TraceOp(
764+
"Read(Imm)", frame_protector_.source_buffer()->c_slice_buffer());
750765
auto status = frame_protector_.Unprotect(absl::OkStatus());
766+
frame_protector_.FinishRead(status.ok());
751767
if (status.ok()) return true;
752768
event_engine_->Run([impl = Ref(), status = std::move(status)]() mutable {
753769
auto on_read = std::move(impl->on_read_);

0 commit comments

Comments
 (0)