Skip to content

Commit fd80e27

Browse files
committed
Functionality added
1 parent 1c9c43a commit fd80e27

File tree

8 files changed

+563
-227
lines changed

8 files changed

+563
-227
lines changed

src/cpp/ext/otel/otel_client_call_tracer.cc

Lines changed: 101 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "opentelemetry/context/context.h"
4343
#include "opentelemetry/metrics/sync_instruments.h"
4444
#include "opentelemetry/trace/context.h"
45+
#include "opentelemetry/trace/tracer.h"
4546
#include "src/core/client_channel/client_channel_filter.h"
4647
#include "src/core/lib/channel/channel_stack.h"
4748
#include "src/core/lib/channel/status_util.h"
@@ -50,6 +51,7 @@
5051
#include "src/core/lib/resource_quota/arena.h"
5152
#include "src/core/lib/slice/slice.h"
5253
#include "src/core/lib/slice/slice_buffer.h"
54+
#include "src/core/lib/surface/call.h"
5355
#include "src/core/lib/transport/metadata_batch.h"
5456
#include "src/core/telemetry/tcp_tracer.h"
5557
#include "src/core/util/sync.h"
@@ -65,7 +67,7 @@ namespace internal {
6567

6668
OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
6769
const OpenTelemetryPluginImpl::ClientCallTracer* parent,
68-
bool arena_allocated)
70+
uint64_t attempt_num, bool is_transparent_retry, bool arena_allocated)
6971
: parent_(parent),
7072
arena_allocated_(arena_allocated),
7173
start_time_(absl::Now()) {
@@ -84,6 +86,16 @@ OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
8486
/*optional_labels=*/{},
8587
/*is_client=*/true, parent_->otel_plugin_));
8688
}
89+
if (parent_->otel_plugin_->tracer_ != nullptr) {
90+
std::array<
91+
std::pair<absl::string_view, opentelemetry::common::AttributeValue>, 2>
92+
attributes = {
93+
std::make_pair("previous-rpc-attempts", attempt_num),
94+
std::make_pair("transparent-retry", is_transparent_retry)};
95+
span_ = parent_->otel_plugin_->tracer_->StartSpan(
96+
absl::StrCat("Attempt.", GetMethodFromPath(parent_->path_)),
97+
attributes);
98+
}
8799
}
88100

89101
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
@@ -109,40 +121,71 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
109121
return true;
110122
},
111123
parent_->otel_plugin_);
112-
if (parent_->otel_plugin_->tracer_ != nullptr) {
124+
if (span_ != nullptr) {
113125
GrpcTextMapCarrier carrier(send_initial_metadata);
114-
opentelemetry::context::Context empty_context;
115-
auto context_with_span =
116-
opentelemetry::trace::SetSpan(empty_context, span_);
117-
parent_->otel_plugin_->text_map_propagator_->Inject(
118-
carrier, context_with_span /*TODO:fixme*/);
126+
opentelemetry::context::Context context;
127+
context = opentelemetry::trace::SetSpan(context, span_);
128+
parent_->otel_plugin_->text_map_propagator_->Inject(carrier, context);
119129
}
120130
}
121131

122132
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
123133
RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
124-
RecordAnnotation(
125-
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
134+
if (span_ != nullptr) {
135+
std::array<
136+
std::pair<absl::string_view, opentelemetry::common::AttributeValue>, 2>
137+
attributes = {std::make_pair("sequence-number", send_seq_num_++),
138+
std::make_pair("message-size", send_message.Length())};
139+
span_->AddEvent("Outbound message", attributes);
140+
}
126141
}
127142

128143
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
129144
RecordSendCompressedMessage(
130145
const grpc_core::SliceBuffer& send_compressed_message) {
131-
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
132-
send_compressed_message.Length()));
146+
if (span_ != nullptr) {
147+
std::array<
148+
std::pair<absl::string_view, opentelemetry::common::AttributeValue>, 2>
149+
attributes = {std::make_pair("sequence-number",
150+
opentelemetry::common::AttributeValue(
151+
send_seq_num_ - 1)),
152+
std::make_pair("message-size",
153+
opentelemetry::common::AttributeValue(
154+
send_compressed_message.Length()))};
155+
span_->AddEvent("Outbound message compressed", attributes);
156+
}
133157
}
134158

135159
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
136160
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
137-
RecordAnnotation(
138-
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
161+
if (span_ != nullptr) {
162+
std::array<
163+
std::pair<absl::string_view, opentelemetry::common::AttributeValue>, 2>
164+
attributes = {
165+
std::make_pair(
166+
absl::string_view("sequence-number"),
167+
opentelemetry::common::AttributeValue(recv_seq_num_++)),
168+
std::make_pair(
169+
"message-size",
170+
opentelemetry::common::AttributeValue(recv_message.Length()))};
171+
span_->AddEvent("Inbound message", attributes);
172+
}
139173
}
140174

141175
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
142176
RecordReceivedDecompressedMessage(
143177
const grpc_core::SliceBuffer& recv_decompressed_message) {
144-
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
145-
recv_decompressed_message.Length()));
178+
if (span_ != nullptr) {
179+
std::array<
180+
std::pair<absl::string_view, opentelemetry::common::AttributeValue>, 2>
181+
attributes = {std::make_pair(absl::string_view("sequence-number"),
182+
opentelemetry::common::AttributeValue(
183+
recv_seq_num_ - 1)),
184+
std::make_pair("message-size",
185+
opentelemetry::common::AttributeValue(
186+
recv_decompressed_message.Length()))};
187+
span_->AddEvent("Inbound message decompressed", attributes);
188+
}
146189
}
147190

148191
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
@@ -188,6 +231,14 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
188231
parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
189232
->Record(incoming_bytes, labels, opentelemetry::context::Context{});
190233
}
234+
if (span_ != nullptr) {
235+
if (status.ok()) {
236+
span_->SetStatus(opentelemetry::trace::StatusCode::kOk);
237+
} else {
238+
span_->SetStatus(opentelemetry::trace::StatusCode::kError,
239+
status.ToString());
240+
}
241+
}
191242
}
192243

193244
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
@@ -205,6 +256,9 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel(
205256

206257
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordEnd(
207258
const gpr_timespec& /*latency*/) {
259+
if (span_ != nullptr) {
260+
span_->End();
261+
}
208262
if (arena_allocated_) {
209263
this->~CallAttemptTracer();
210264
} else {
@@ -213,8 +267,10 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordEnd(
213267
}
214268

215269
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
216-
RecordAnnotation(absl::string_view /*annotation*/) {
217-
// Not implemented
270+
RecordAnnotation(absl::string_view annotation) {
271+
if (span_ != nullptr) {
272+
span_->AddEvent(annotation);
273+
}
218274
}
219275

220276
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
@@ -264,11 +320,31 @@ OpenTelemetryPluginImpl::ClientCallTracer::ClientCallTracer(
264320
otel_plugin_(otel_plugin),
265321
scope_config_(std::move(scope_config)) {
266322
if (otel_plugin_->tracer_ != nullptr) {
267-
span_ = otel_plugin_->tracer_->StartSpan("blah");
323+
opentelemetry::trace::StartSpanOptions options;
324+
// Get the parent span from the parent call if available, otherwise fall
325+
// back to the threadlocal span.
326+
// We are intentionally reusing census_context to save opentelemetry's Span
327+
// on the context to avoid introducing a new type for opentelemetry inside
328+
// gRPC Core. There's no risk of collisions since we do not allow multiple
329+
// tracing systems active for the same call.
330+
auto* parent_span = reinterpret_cast<opentelemetry::trace::Span*>(
331+
arena->GetContext<census_context>());
332+
if (parent_span != nullptr) {
333+
options.parent = parent_span->GetContext();
334+
} else {
335+
options.parent =
336+
opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
337+
}
338+
span_ = otel_plugin_->tracer_->StartSpan(
339+
absl::StrCat("Sent.", GetMethodFromPath(path_)), options);
268340
}
269341
}
270342

271-
OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {}
343+
OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {
344+
if (span_ != nullptr) {
345+
span_->End();
346+
}
347+
}
272348

273349
OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer*
274350
OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
@@ -277,21 +353,25 @@ OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
277353
// on the heap, so that in the common case we don't require a heap
278354
// allocation, nor do we unnecessarily grow the arena.
279355
bool is_first_attempt = true;
356+
uint64_t attempt_num;
280357
{
281358
grpc_core::MutexLock lock(&mu_);
282359
if (transparent_retries_ != 0 || retries_ != 0) {
283360
is_first_attempt = false;
284361
}
362+
attempt_num = retries_;
285363
if (is_transparent_retry) {
286364
++transparent_retries_;
287365
} else {
288366
++retries_;
289367
}
290368
}
291369
if (is_first_attempt) {
292-
return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true);
370+
return arena_->New<CallAttemptTracer>(
371+
this, attempt_num, is_transparent_retry, /*arena_allocated=*/true);
293372
}
294-
return new CallAttemptTracer(this, /*arena_allocated=*/false);
373+
return new CallAttemptTracer(this, attempt_num, is_transparent_retry,
374+
/*arena_allocated=*/false);
295375
}
296376

297377
absl::string_view OpenTelemetryPluginImpl::ClientCallTracer::MethodForStats()

src/cpp/ext/otel/otel_client_call_tracer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class OpenTelemetryPluginImpl::ClientCallTracer
5151
: public grpc_core::ClientCallTracer::CallAttemptTracer {
5252
public:
5353
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer* parent,
54+
uint64_t attempt_num, bool is_transparent_retry,
5455
bool arena_allocated);
5556

5657
std::string TraceId() override {
@@ -118,6 +119,8 @@ class OpenTelemetryPluginImpl::ClientCallTracer
118119
std::atomic<uint64_t> incoming_bytes_{0};
119120
std::atomic<uint64_t> outgoing_bytes_{0};
120121
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
122+
uint64_t send_seq_num_ = 0;
123+
uint64_t recv_seq_num_ = 0;
121124
};
122125

123126
ClientCallTracer(

0 commit comments

Comments
 (0)