Skip to content

Commit 2910254

Browse files
committed
cleaned up the otel grpc fw proxy api
1 parent fedbed8 commit 2910254

File tree

3 files changed

+35
-58
lines changed

3 files changed

+35
-58
lines changed

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_forward_proxy.h

+10-10
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,23 @@ class OPENTELEMETRY_EXPORT_TYPE OtlpGrpcForwardProxy
2222
{
2323
struct Impl;
2424
std::unique_ptr<Impl> impl;
25+
2526
OtlpGrpcForwardProxy() = delete;
27+
2628
public:
27-
enum class ExportMode {
28-
Unknown = -1,
29-
Sync = 0,
30-
// For async to work, options.max_concurrent_requests > 1.
31-
AsyncBlockOnFull = 1,
32-
AsyncDropOnFull = 2,
33-
};
29+
3430
explicit OtlpGrpcForwardProxy(const OtlpGrpcClientOptions& options);
3531
~OtlpGrpcForwardProxy();
32+
3633
void SetActive(bool active);
3734
bool IsActive() const;
35+
3836
void AddListenAddress(const std::string& listenAddress);
39-
void RegisterTraceExporter( ExportMode exportMode );
40-
void RegisterMetricExporter( ExportMode exportMode );
41-
void RegisterLogRecordExporter( ExportMode exportMode );
37+
38+
void RegisterTraceExporter( );
39+
void RegisterMetricExporter( );
40+
void RegisterLogRecordExporter( );
41+
4242
void Start();
4343
void Wait();
4444
void Shutdown();

exporters/otlp/src/otlp_grpc_forward_proxy.cc

+22-45
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ struct OtlpGrpcForwardProxy::Impl
4242
{
4343
bool active{ false };
4444

45-
OtlpGrpcClientOptions clientOptions;
45+
OtlpGrpcClientOptions clientOptions{};
4646
std::shared_ptr<OtlpGrpcClient> client;
4747

4848
grpc::ServerBuilder serverBuilder;
@@ -65,10 +65,6 @@ struct OtlpGrpcForwardProxy::Impl
6565
std::unique_ptr<MetricsService::StubInterface> metricExporterStub;
6666

6767
#if defined(ENABLE_ASYNC_EXPORT)
68-
ExportMode logExporterMode{ ExportMode::Unknown };
69-
ExportMode traceExporterMode{ ExportMode::Unknown };;
70-
ExportMode metricExporterMode{ ExportMode::Unknown };;
71-
7268
struct LogExporterProxyAsync;
7369
struct TraceExporterProxyAsync;
7470
struct MetricExporterProxyAsync;
@@ -123,11 +119,8 @@ struct OtlpGrpcForwardProxy::Impl
123119
OTEL_INTERNAL_LOG_ERROR("[otlp_grpc_forward_proxy] Loop detected!");
124120
return true;
125121
}
126-
else
127-
{
128-
// Add what we've got so far
129-
context->AddMetadata(kFwProxyRidHeader, value );
130-
}
122+
// Add what we've got so far
123+
context->AddMetadata(kFwProxyRidHeader, value );
131124
}
132125
// Add ourselves too.
133126
context->AddMetadata(kFwProxyRidHeader, fw_proxy_id);
@@ -154,7 +147,6 @@ OtlpGrpcForwardProxy::Impl::Impl(const OtlpGrpcClientOptions& options)
154147
uint64_t p1 = (uint64_t(rd())<<32) | uint64_t(rd());
155148
sprintf_s(buf, "%08.8lx%08.8lx", p0, p1);
156149
fw_proxy_id = buf;
157-
158150
}
159151

160152
#if defined(ENABLE_ASYNC_EXPORT)
@@ -222,9 +214,8 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe
222214
}
223215
#endif
224216

225-
// This is bit ugly, but still easier to understand (at least by me) than template<>'s
226217
#if defined(ENABLE_ASYNC_EXPORT)
227-
#define MAKE_PROXY_ASYNC(NAME, STUB, SERVICE, REQUEST, RESPONSE, TEXT, MODE) \
218+
#define MAKE_PROXY_ASYNC(NAME, STUB, SERVICE, REQUEST, RESPONSE, TEXT) \
228219
struct OtlpGrpcForwardProxy::Impl::NAME final : public SERVICE::CallbackService { \
229220
OtlpGrpcForwardProxy::Impl& impl; \
230221
explicit NAME(OtlpGrpcForwardProxy::Impl& impl_): impl(impl_){} \
@@ -234,22 +225,16 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe
234225
auto context{ impl.client->MakeClientContext(impl.clientOptions) }; \
235226
if( impl.CheckForLoop(context.get(), cbServerContext->client_metadata()) ) \
236227
return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy detected loop.")); \
237-
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export"); \
238-
auto syncStatus{ grpc::Status(grpc::Status(grpc::StatusCode::DO_NOT_USE, "")) }; \
228+
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] async " TEXT " export"); \
239229
auto arena{ std::make_unique<google::protobuf::Arena>(impl.arenaOptions) }; \
240-
auto request{ *req }; \
241-
exportResult = impl.client->DelegateAsyncExport( impl.clientOptions, impl.STUB.get(), std::move(context), std::move(arena), std::move(request), \
230+
auto reqCopy{ *req }; \
231+
exportResult = impl.client->DelegateAsyncExport( impl.clientOptions, impl.STUB.get(), std::move(context), std::move(arena), std::move(reqCopy), \
242232
[implPtr = &impl] (sdk::common::ExportResult r, std::unique_ptr<google::protobuf::Arena>&&, const REQUEST&, RESPONSE* ) -> bool { \
243233
return implPtr->HandleExportResult(r); \
244234
}); \
245-
if( exportResult == sdk::common::ExportResult::kFailureFull && impl.MODE == OtlpGrpcForwardProxy::ExportMode::AsyncBlockOnFull ) { \
246-
OTEL_INTERNAL_LOG_DEBUG("[otlp_grpc_forward_proxy] " TEXT " export (blocking)"); \
247-
auto syncContext{ impl.client->MakeClientContext(impl.clientOptions ) }; \
248-
syncStatus = impl.STUB->Export( syncContext.get(), *req, resp ); \
249-
} \
250-
return impl.Finish(cbServerContext, exportResult, syncStatus); \
235+
return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::DO_NOT_USE, "")); \
251236
} \
252-
return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "OtlpGrpcForwardProxy is not active.")); \
237+
return impl.Finish(cbServerContext, exportResult, grpc::Status(grpc::StatusCode::DO_NOT_USE, "")); \
253238
}};
254239
#endif
255240

@@ -269,18 +254,19 @@ grpc::ServerUnaryReactor* OtlpGrpcForwardProxy::Impl::Finish(grpc::CallbackServe
269254
}};
270255

271256
#if defined(ENABLE_ASYNC_EXPORT)
272-
MAKE_PROXY_ASYNC(MetricExporterProxyAsync, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "Async Metric", metricExporterMode)
273-
MAKE_PROXY_ASYNC(TraceExporterProxyAsync, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "Async Trace", traceExporterMode)
274-
MAKE_PROXY_ASYNC(LogExporterProxyAsync, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "Async Log", logExporterMode)
257+
MAKE_PROXY_ASYNC(MetricExporterProxyAsync, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "metric")
258+
MAKE_PROXY_ASYNC(TraceExporterProxyAsync, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "trace")
259+
MAKE_PROXY_ASYNC(LogExporterProxyAsync, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "log")
275260
#endif
276261

277-
MAKE_PROXY(MetricExporterProxy, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "Metric")
278-
MAKE_PROXY(TraceExporterProxy, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "Trace")
279-
MAKE_PROXY(LogExporterProxy, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "Log")
262+
MAKE_PROXY(MetricExporterProxy, metricExporterStub, MetricsService, ExportMetricsServiceRequest, ExportMetricsServiceResponse, "metric")
263+
MAKE_PROXY(TraceExporterProxy, traceExporterStub, TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, "trace")
264+
MAKE_PROXY(LogExporterProxy, logExporterStub, LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse, "log")
280265

281266
OtlpGrpcForwardProxy::OtlpGrpcForwardProxy(const OtlpGrpcClientOptions& options_)
282267
: impl(std::make_unique<Impl>(options_))
283268
{
269+
assert(impl != nullptr);
284270
}
285271

286272
OtlpGrpcForwardProxy::~OtlpGrpcForwardProxy()
@@ -306,67 +292,58 @@ void OtlpGrpcForwardProxy::AddListenAddress(const std::string& listenAddress)
306292
impl->serverBuilder.AddListeningPort(listenAddress, grpc::InsecureServerCredentials(), &( pair.port ));
307293
}
308294

309-
void OtlpGrpcForwardProxy::RegisterMetricExporter( ExportMode exportMode )
295+
void OtlpGrpcForwardProxy::RegisterMetricExporter( )
310296
{
311297
assert(impl != nullptr);
312298
assert(impl->metricExporterStub == nullptr);
313299
assert(impl->metricExporterProxy == nullptr);
314300
impl->metricExporterStub = impl->client->MakeMetricsServiceStub();
315301
#if defined(ENABLE_ASYNC_EXPORT)
316302
assert(impl->metricExporterProxyAsync == nullptr);
317-
assert(impl->metricExporterMode == ExportMode::Unknown);
318-
if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 )
303+
if( impl->clientOptions.max_concurrent_requests > 1 )
319304
{
320-
impl->metricExporterMode = exportMode;
321305
impl->metricExporterProxyAsync = std::make_unique<Impl::MetricExporterProxyAsync>(*impl);
322306
impl->serverBuilder.RegisterService(impl->metricExporterProxyAsync.get());
323307
return;
324308
}
325-
impl->metricExporterMode = ExportMode::Sync;
326309
#endif
327310
impl->metricExporterProxy = std::make_unique<Impl::MetricExporterProxy>(*impl);
328311
impl->serverBuilder.RegisterService(impl->metricExporterProxy.get());
329312
}
330313

331-
void OtlpGrpcForwardProxy::RegisterTraceExporter( ExportMode exportMode )
314+
void OtlpGrpcForwardProxy::RegisterTraceExporter( )
332315
{
333316
assert(impl != nullptr);
334317
assert(impl->traceExporterStub == nullptr);
335318
assert(impl->traceExporterProxy == nullptr);
336319
impl->traceExporterStub = impl->client->MakeTraceServiceStub();
337320
#if defined(ENABLE_ASYNC_EXPORT)
338321
assert(impl->traceExporterProxyAsync == nullptr);
339-
assert(impl->traceExporterMode == ExportMode::Unknown);
340-
if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 )
322+
if( impl->clientOptions.max_concurrent_requests > 1 )
341323
{
342-
impl->traceExporterMode = exportMode;
343324
impl->traceExporterProxyAsync = std::make_unique<Impl::TraceExporterProxyAsync>(*impl);
344325
impl->serverBuilder.RegisterService(impl->traceExporterProxyAsync.get());
345326
return;
346327
}
347-
impl->traceExporterMode = ExportMode::Sync;
348328
#endif
349329
impl->traceExporterProxy = std::make_unique<Impl::TraceExporterProxy>(*impl);
350330
impl->serverBuilder.RegisterService(impl->traceExporterProxy.get());
351331
}
352332

353-
void OtlpGrpcForwardProxy::RegisterLogRecordExporter( ExportMode exportMode )
333+
void OtlpGrpcForwardProxy::RegisterLogRecordExporter( )
354334
{
355335
assert(impl != nullptr);
356336
assert(impl->logExporterStub == nullptr);
357337
assert(impl->logExporterProxy == nullptr);
358338
impl->logExporterStub = impl->client->MakeLogsServiceStub();
359339
#if defined(ENABLE_ASYNC_EXPORT)
360340
assert(impl->logExporterProxyAsync == nullptr);
361-
assert(impl->logExporterMode == ExportMode::Unknown);
362-
if( exportMode != ExportMode::Sync && impl->clientOptions.max_concurrent_requests > 1 )
341+
if( impl->clientOptions.max_concurrent_requests > 1 )
363342
{
364-
impl->logExporterMode = exportMode;
365343
impl->logExporterProxyAsync = std::make_unique<Impl::LogExporterProxyAsync>(*impl);
366344
impl->serverBuilder.RegisterService(impl->logExporterProxyAsync.get());
367345
return;
368346
}
369-
impl->logExporterMode = ExportMode::Sync;
370347
#endif
371348
impl->logExporterProxy = std::make_unique<Impl::LogExporterProxy>(*impl);
372349
impl->serverBuilder.RegisterService(impl->logExporterProxy.get());

x/x.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,9 @@ struct proxy_thread
400400
proxy->SetActive(true);
401401

402402
proxy->AddListenAddress(listenAddress);
403-
proxy->RegisterMetricExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
404-
proxy->RegisterTraceExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
405-
proxy->RegisterLogRecordExporter(OtlpGrpcForwardProxy::ExportMode::AsyncDropOnFull);
403+
proxy->RegisterMetricExporter();
404+
proxy->RegisterTraceExporter();
405+
proxy->RegisterLogRecordExporter();
406406
proxy->Start();
407407
{
408408
std::unique_lock<std::mutex> lock(mu);

0 commit comments

Comments
 (0)