Skip to content

Commit d8369f7

Browse files
committed
feat: OTLPLogExporter
1 parent 343bd07 commit d8369f7

File tree

3 files changed

+420
-6
lines changed

3 files changed

+420
-6
lines changed

Diff for: lib/src/experimental_sdk.dart

+7-6
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ library experimental_sdk;
66

77
import 'package:meta/meta.dart';
88

9-
export 'sdk/logs/log_record.dart' show ReadableLogRecord, ReadWriteLogRecord, LogRecord;
10-
export 'sdk/logs/logger.dart' show Logger;
11-
export 'sdk/logs/logger_provider.dart' show LoggerProvider;
129
export 'sdk/logs/export_result.dart' show ExportResult, ExportResultCode;
13-
export 'sdk/logs/processors/log_record_processor.dart' show LogRecordProcessor;
14-
export 'sdk/logs/processors/noop_log_processor.dart' show NoopLogRecordProcessor;
15-
export 'sdk/logs/processors/batch_log_record_processor.dart' show BatchLogRecordProcessor;
1610
export 'sdk/logs/exporters/console_log_record_exporter.dart' show ConsoleLogRecordExporter;
1711
export 'sdk/logs/exporters/inmemory_log_record_exporter.dart' show InMemoryLogRecordExporter;
1812
export 'sdk/logs/exporters/log_record_exporter.dart' show LogRecordExporter;
13+
export 'sdk/logs/exporters/otlp_log_exporter.dart' show OTLPLogExporter;
14+
export 'sdk/logs/log_record.dart' show ReadableLogRecord, ReadWriteLogRecord, LogRecord;
1915
export 'sdk/logs/log_record_limit.dart' show LogRecordLimits;
16+
export 'sdk/logs/logger.dart' show Logger;
17+
export 'sdk/logs/logger_provider.dart' show LoggerProvider;
18+
export 'sdk/logs/processors/batch_log_record_processor.dart' show BatchLogRecordProcessor;
19+
export 'sdk/logs/processors/log_record_processor.dart' show LogRecordProcessor;
20+
export 'sdk/logs/processors/noop_log_processor.dart' show NoopLogRecordProcessor;
2021
export 'sdk/metrics/counter.dart' show Counter;
2122
export 'sdk/metrics/meter.dart' show Meter;
2223
export 'sdk/metrics/meter_provider.dart' show MeterProvider;

Diff for: lib/src/sdk/logs/exporters/otlp_log_exporter.dart

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2021-2022 Workiva.
2+
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information
3+
4+
import 'dart:async';
5+
6+
import 'package:fixnum/fixnum.dart';
7+
import 'package:http/http.dart' as http;
8+
import 'package:logging/logging.dart';
9+
import 'package:opentelemetry/api.dart' as api;
10+
import 'package:opentelemetry/sdk.dart' as sdk;
11+
import 'package:opentelemetry/src/experimental_api.dart' as api;
12+
import 'package:opentelemetry/src/experimental_sdk.dart' as sdk;
13+
14+
import '../../proto/opentelemetry/proto/collector/logs/v1/logs_service.pb.dart' as pb_logs_service;
15+
import '../../proto/opentelemetry/proto/common/v1/common.pb.dart' as pb_common;
16+
import '../../proto/opentelemetry/proto/logs/v1/logs.pb.dart' as pb_logs;
17+
import '../../proto/opentelemetry/proto/logs/v1/logs.pbenum.dart' as pg_logs_enum;
18+
import '../../proto/opentelemetry/proto/resource/v1/resource.pb.dart' as pb_resource;
19+
20+
class OTLPLogExporter implements sdk.LogRecordExporter {
21+
final Logger _log = Logger('opentelemetry.LogCollectorExporter');
22+
23+
final Uri uri;
24+
final http.Client client;
25+
final Map<String, String> headers;
26+
var _isShutdown = false;
27+
28+
OTLPLogExporter(
29+
this.uri, {
30+
http.Client? httpClient,
31+
this.headers = const {},
32+
}) : client = httpClient ?? http.Client();
33+
34+
@override
35+
Future<sdk.ExportResult> export(List<sdk.ReadableLogRecord> logRecords) async {
36+
if (_isShutdown) {
37+
return sdk.ExportResult(
38+
code: sdk.ExportResultCode.failed,
39+
);
40+
}
41+
42+
if (logRecords.isEmpty) {
43+
return sdk.ExportResult(
44+
code: sdk.ExportResultCode.success,
45+
);
46+
}
47+
48+
return _send(uri, logRecords).then((_) {
49+
return sdk.ExportResult(
50+
code: sdk.ExportResultCode.success,
51+
);
52+
}).catchError((e, st) {
53+
return sdk.ExportResult(
54+
code: sdk.ExportResultCode.failed,
55+
error: e,
56+
stackTrace: st,
57+
);
58+
});
59+
}
60+
61+
Future<void> _send(
62+
Uri uri,
63+
List<sdk.ReadableLogRecord> logRecords,
64+
) async {
65+
try {
66+
final body = pb_logs_service.ExportLogsServiceRequest(resourceLogs: _logsToProtobuf(logRecords));
67+
68+
final headers = {'Content-Type': 'application/x-protobuf'}..addAll(this.headers);
69+
70+
await client.post(uri, body: body.writeToBuffer(), headers: headers);
71+
} catch (e) {
72+
_log.warning('Failed to export ${logRecords.length} logs.', e);
73+
}
74+
}
75+
76+
@override
77+
Future<void> shutdown() async {
78+
_isShutdown = true;
79+
client.close();
80+
}
81+
82+
/// Group and construct the protobuf equivalent of the given list of [api.LogRecord]s.
83+
/// Logs are grouped by a trace provider's [sdk.Resource] and a tracer's
84+
/// [sdk.InstrumentationScope].
85+
Iterable<pb_logs.ResourceLogs> _logsToProtobuf(List<sdk.ReadableLogRecord> logRecords) {
86+
// use a map of maps to group spans by resource and instrumentation library
87+
final rsm = <sdk.Resource, Map<sdk.InstrumentationScope, List<pb_logs.LogRecord>>>{};
88+
for (final logRecord in logRecords) {
89+
final il = rsm[logRecord.resource] ?? <sdk.InstrumentationScope, List<pb_logs.LogRecord>>{};
90+
91+
if (logRecord.instrumentationScope != null) {
92+
il[logRecord.instrumentationScope!] = il[logRecord.instrumentationScope] ?? <pb_logs.LogRecord>[]
93+
..add(_logToProtobuf(logRecord));
94+
}
95+
if (logRecord.resource != null) {
96+
rsm[logRecord.resource!] = il;
97+
}
98+
}
99+
100+
final rss = <pb_logs.ResourceLogs>[];
101+
for (final il in rsm.entries) {
102+
// for each distinct resource, construct the protobuf equivalent
103+
final attrs = <pb_common.KeyValue>[];
104+
for (final attr in il.key.attributes.keys) {
105+
attrs.add(pb_common.KeyValue(key: attr, value: _attributeValueToProtobuf(il.key.attributes.get(attr)!)));
106+
}
107+
108+
final rs = pb_logs.ResourceLogs(resource: pb_resource.Resource(attributes: attrs));
109+
// for each distinct instrumentation library, construct the protobuf equivalent
110+
for (final ils in il.value.entries) {
111+
rs.scopeLogs.add(pb_logs.ScopeLogs(
112+
logRecords: ils.value,
113+
scope: pb_common.InstrumentationScope(name: ils.key.name, version: ils.key.version)));
114+
}
115+
rss.add(rs);
116+
}
117+
return rss;
118+
}
119+
120+
pb_logs.LogRecord _logToProtobuf(sdk.ReadableLogRecord log) {
121+
var spanId = <int>[];
122+
var traceId = <int>[];
123+
if (log.spanContext != null) {
124+
spanId = log.spanContext!.spanId.get();
125+
traceId = log.spanContext!.traceId.get();
126+
}
127+
return pb_logs.LogRecord(
128+
timeUnixNano: log.timeStamp,
129+
severityNumber:
130+
log.severityNumber != null ? pg_logs_enum.SeverityNumber.valueOf(log.severityNumber!.index) : null,
131+
severityText: log.severityText,
132+
droppedAttributesCount: log.droppedAttributesCount,
133+
body: _attributeONEValueToProtobuf(log.body),
134+
attributes: (log.attributes?.keys ?? [])
135+
.map((key) => pb_common.KeyValue(key: key, value: _attributeValueToProtobuf(log.attributes!.get(key)!))),
136+
spanId: spanId,
137+
traceId: traceId,
138+
observedTimeUnixNano: log.observedTimestamp);
139+
}
140+
141+
pb_common.AnyValue _attributeONEValueToProtobuf(Object value) {
142+
switch (value.runtimeType) {
143+
case String:
144+
return pb_common.AnyValue(stringValue: value as String);
145+
case bool:
146+
return pb_common.AnyValue(boolValue: value as bool);
147+
case double:
148+
return pb_common.AnyValue(doubleValue: value as double);
149+
case int:
150+
return pb_common.AnyValue(intValue: Int64(value as int));
151+
}
152+
return pb_common.AnyValue();
153+
}
154+
155+
pb_common.AnyValue _attributeValueToProtobuf(Object value) {
156+
if (value is String) {
157+
return pb_common.AnyValue(stringValue: value);
158+
}
159+
if (value is bool) {
160+
return pb_common.AnyValue(boolValue: value);
161+
}
162+
if (value is double) {
163+
return pb_common.AnyValue(doubleValue: value);
164+
}
165+
if (value is int) {
166+
return pb_common.AnyValue(intValue: Int64(value));
167+
}
168+
if (value is List<String> || value is List<bool> || value is List<double> || value is List<int>) {
169+
final output = <pb_common.AnyValue>[];
170+
final values = value as List;
171+
for (final i in values) {
172+
output.add(_attributeValueToProtobuf(i));
173+
}
174+
return pb_common.AnyValue(arrayValue: pb_common.ArrayValue(values: output));
175+
}
176+
return pb_common.AnyValue();
177+
}
178+
}

0 commit comments

Comments
 (0)