diff --git a/lib/src/api/logs/logger.dart b/lib/src/api/logs/logger.dart index 927338b8..df9e77e7 100644 --- a/lib/src/api/logs/logger.dart +++ b/lib/src/api/logs/logger.dart @@ -2,12 +2,11 @@ // Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information import '../../../api.dart' as api; -import '../../../sdk.dart' as sdk; import 'log_record.dart'; abstract class Logger { void emit({ - sdk.Attributes? attributes, + List attributes = const [], api.Context? context, dynamic body, DateTime? observedTimestamp, diff --git a/lib/src/api/logs/logger_provider.dart b/lib/src/api/logs/logger_provider.dart index ae7be6ba..4c1977f6 100644 --- a/lib/src/api/logs/logger_provider.dart +++ b/lib/src/api/logs/logger_provider.dart @@ -7,7 +7,7 @@ import 'package:opentelemetry/src/api/logs/logger.dart'; abstract class LoggerProvider { /// Gets or creates a [Logger] instance. /// - /// The meter is identified by the combination of [name], [version], + /// The logger is identified by the combination of [name], [version], /// [schemaUrl] and [attributes]. The [name] SHOULD uniquely identify the /// instrumentation scope, such as the instrumentation library /// (e.g. io.opentelemetry.contrib.mongodb), package, module or class name. diff --git a/lib/src/api/logs/noop/noop_logger.dart b/lib/src/api/logs/noop/noop_logger.dart index 4c057448..57c33469 100644 --- a/lib/src/api/logs/noop/noop_logger.dart +++ b/lib/src/api/logs/noop/noop_logger.dart @@ -1,17 +1,17 @@ // Copyright 2021-2022 Workiva. // Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information +import 'package:opentelemetry/src/api/common/attribute.dart'; import 'package:opentelemetry/src/api/context/context.dart'; import 'package:opentelemetry/src/api/logs/log_record.dart'; import 'package:opentelemetry/src/api/logs/logger.dart'; -import 'package:opentelemetry/src/sdk/common/attributes.dart'; class NoopLogger implements Logger { const NoopLogger(); @override void emit({ - Attributes? attributes, + List attributes = const [], Context? context, dynamic body, DateTime? observedTimestamp, diff --git a/lib/src/experimental_api.dart b/lib/src/experimental_api.dart index f2933e07..1f24c4c5 100644 --- a/lib/src/experimental_api.dart +++ b/lib/src/experimental_api.dart @@ -10,13 +10,13 @@ export 'api/context/context_manager.dart' show ContextManager; export 'api/context/noop_context_manager.dart' show NoopContextManager; export 'api/context/zone_context.dart' show ZoneContext; export 'api/context/zone_context_manager.dart' show ZoneContextManager; -export 'api/metrics/counter.dart' show Counter; -export 'api/metrics/meter_provider.dart' show MeterProvider; -export 'api/metrics/meter.dart' show Meter; -export 'api/metrics/noop/noop_meter.dart' show NoopMeter; -export 'api/trace/nonrecording_span.dart' show NonRecordingSpan; -export 'api/logs/logger.dart' show Logger; export 'api/logs/log_record.dart' show Severity; +export 'api/logs/logger.dart' show Logger; export 'api/logs/logger_provider.dart' show LoggerProvider; export 'api/logs/noop/noop_logger.dart' show NoopLogger; export 'api/logs/noop/noop_logger_provider.dart' show NoopLoggerProvider; +export 'api/metrics/counter.dart' show Counter; +export 'api/metrics/meter.dart' show Meter; +export 'api/metrics/meter_provider.dart' show MeterProvider; +export 'api/metrics/noop/noop_meter.dart' show NoopMeter; +export 'api/trace/nonrecording_span.dart' show NonRecordingSpan; diff --git a/lib/src/experimental_sdk.dart b/lib/src/experimental_sdk.dart index 5536a699..654c5317 100644 --- a/lib/src/experimental_sdk.dart +++ b/lib/src/experimental_sdk.dart @@ -6,7 +6,17 @@ library experimental_sdk; import 'package:meta/meta.dart'; +export 'sdk/logs/export_result.dart' show ExportResult, ExportResultCode; +export 'sdk/logs/exporters/console_log_record_exporter.dart' show ConsoleLogRecordExporter; +export 'sdk/logs/exporters/inmemory_log_record_exporter.dart' show InMemoryLogRecordExporter; +export 'sdk/logs/exporters/log_record_exporter.dart' show LogRecordExporter; +export 'sdk/logs/log_record.dart' show ReadableLogRecord, ReadWriteLogRecord, LogRecord; +export 'sdk/logs/log_record_limit.dart' show LogRecordLimits; +export 'sdk/logs/logger.dart' show Logger; +export 'sdk/logs/logger_provider.dart' show LoggerProvider; +export 'sdk/logs/processors/batch_log_record_processor.dart' show BatchLogRecordProcessor; +export 'sdk/logs/processors/log_record_processor.dart' show LogRecordProcessor; export 'sdk/metrics/counter.dart' show Counter; -export 'sdk/metrics/meter_provider.dart' show MeterProvider; export 'sdk/metrics/meter.dart' show Meter; +export 'sdk/metrics/meter_provider.dart' show MeterProvider; export 'sdk/resource/resource.dart' show Resource; diff --git a/lib/src/sdk/common/limits.dart b/lib/src/sdk/common/limits.dart index 506c50ff..4923b88b 100644 --- a/lib/src/sdk/common/limits.dart +++ b/lib/src/sdk/common/limits.dart @@ -4,11 +4,11 @@ import 'package:meta/meta.dart'; import '../../../api.dart' as api; import '../../../sdk.dart' as sdk; +import '../../experimental_sdk.dart' as sdk; /// Applies given [sdk.SpanLimits] to a list of [api.SpanLink]s. @protected -List applyLinkLimits( - List links, sdk.SpanLimits limits) { +List applyLinkLimits(List links, sdk.SpanLimits limits) { final spanLink = []; for (final link in links) { @@ -27,8 +27,7 @@ List applyLinkLimits( for (final attr in link.attributes) { // if attributes num is already greater than maxNumAttributesPerLink // and this key doesn't exist in the list, drop it. - if (attributeMap.length >= limits.maxNumAttributesPerLink && - !attributeMap.containsKey(attr.key)) { + if (attributeMap.length >= limits.maxNumAttributesPerLink && !attributeMap.containsKey(attr.key)) { droppedAttributes++; continue; } @@ -49,8 +48,7 @@ List applyLinkLimits( } } - spanLink.add(api.SpanLink(link.context, - attributes: linkAttributes, droppedAttributes: droppedAttributes)); + spanLink.add(api.SpanLink(link.context, attributes: linkAttributes, droppedAttributes: droppedAttributes)); } return spanLink; } @@ -63,20 +61,46 @@ api.Attribute applyAttributeLimits(api.Attribute attr, sdk.SpanLimits limits) { if (attr.value is String) { attr = api.Attribute.fromString( - attr.key, - applyAttributeLengthLimit( - attr.value as String, limits.maxNumAttributeLength)); + attr.key, applyAttributeLengthLimit(attr.value as String, limits.maxNumAttributeLength)); } else if (attr.value is List) { final listString = attr.value as List; for (var j = 0; j < listString.length; j++) { - listString[j] = applyAttributeLengthLimit( - listString[j], limits.maxNumAttributeLength); + listString[j] = applyAttributeLengthLimit(listString[j], limits.maxNumAttributeLength); } attr = api.Attribute.fromStringList(attr.key, listString); } return attr; } +@protected +api.Attribute applyAttributeLimitsForLog( + api.Attribute attr, + sdk.LogRecordLimits limits, +) { + // if maxNumAttributeLength is less than zero, then it has unlimited length. + if (limits.attributeValueLengthLimit < 0) return attr; + + if (attr.value is String) { + return (attr.value as String).length > limits.attributeValueLengthLimit + ? api.Attribute.fromString(attr.key, (attr.value as String).substring(0, limits.attributeValueLengthLimit)) + : attr; + } else if (attr.value is List) { + final list = (attr.value as List); + List? truncated; + for (int i = 0; i < list.length; i++) { + final s = list[i]; + if (s.length > limits.attributeValueLengthLimit) { + truncated ??= List.from(list, growable: false); + truncated[i] = s.substring(0, limits.attributeValueLengthLimit); + } + } + if (truncated != null) { + return api.Attribute.fromStringList(attr.key, truncated); + } + } + return attr; +} + /// Truncate just strings which length is longer than configuration. /// Reference: https://github.com/open-telemetry/opentelemetry-java/blob/14ffacd1cdd22f5aa556eeda4a569c7f144eadf2/sdk/common/src/main/java/io/opentelemetry/sdk/internal/AttributeUtil.java#L80 @protected diff --git a/lib/src/sdk/logs/export_result.dart b/lib/src/sdk/logs/export_result.dart new file mode 100644 index 00000000..3807b095 --- /dev/null +++ b/lib/src/sdk/logs/export_result.dart @@ -0,0 +1,15 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +class ExportResult { + final ExportResultCode code; + final Exception? error; + final StackTrace? stackTrace; + + ExportResult({required this.code, this.error, this.stackTrace}); +} + +enum ExportResultCode { + success, + failed, +} diff --git a/lib/src/sdk/logs/exporters/console_log_record_exporter.dart b/lib/src/sdk/logs/exporters/console_log_record_exporter.dart new file mode 100644 index 00000000..b1650aa9 --- /dev/null +++ b/lib/src/sdk/logs/exporters/console_log_record_exporter.dart @@ -0,0 +1,61 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +/// This is implementation of [sdk.ReadWriteLogRecordExporter] that prints LogRecords to the +/// console. This class can be used for diagnostic purposes. +/// +/// NOTE: This [sdk.LogRecordExporter] is intended for diagnostics use only, output rendered to the console may change at any time. +class ConsoleLogRecordExporter implements sdk.LogRecordExporter { + @override + Future export(List logs) async { + return _sendLogRecords(logs); + } + + /// Shutdown the exporter. + @override + Future shutdown() async {} + + /// Showing logs in console + sdk.ExportResult _sendLogRecords(List logs) { + for (final log in logs) { + print(_makeObject(log)); + } + return sdk.ExportResult(code: sdk.ExportResultCode.success); + } + + /// converts logRecord info into more readable format + Map _makeObject(sdk.ReadableLogRecord log) { + final contextInfo = {}; + contextInfo.addAll({ + 'traceId': log.spanContext.traceId, + 'spanId': log.spanContext.spanId, + 'traceFlags': log.spanContext.traceFlags, + }); + return { + 'resource': { + 'attributes': { + for (final attribute in log.resource.attributes.keys) + attribute: log.resource.attributes.get(attribute), + }, + }, + 'instrumentationScope': { + 'name': log.instrumentationScope.name, + 'version': log.instrumentationScope.version, + 'schemaUrl': log.instrumentationScope.schemaUrl, + 'attributes': { + for (final attribute in log.instrumentationScope.attributes) attribute.key: attribute.value, + } + }, + 'timestamp': log.timeStamp, + 'severityText': log.severityText, + 'severityNumber': log.severityNumber, + 'body': log.body, + 'attributes': { + for (final attribute in log.attributes.keys) attribute: log.resource.attributes.get(attribute), + }, + ...contextInfo, + }; + } +} diff --git a/lib/src/sdk/logs/exporters/inmemory_log_record_exporter.dart b/lib/src/sdk/logs/exporters/inmemory_log_record_exporter.dart new file mode 100644 index 00000000..f2da176c --- /dev/null +++ b/lib/src/sdk/logs/exporters/inmemory_log_record_exporter.dart @@ -0,0 +1,43 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:meta/meta.dart'; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +/// This class can be used for testing purposes. It stores the exported LogRecords +/// in a list in memory that can be retrieved using the `getFinishedLogRecords()` +/// method. +class InMemoryLogRecordExporter implements sdk.LogRecordExporter { + var _finishedLogRecords = []; + + /// Indicates if the exporter has been "shutdown." + /// When false, exported log records will not be stored in-memory. + @protected + bool _stopped = false; + + @override + Future export(List logs) async { + if (_stopped) { + return sdk.ExportResult( + code: sdk.ExportResultCode.failed, + error: Exception('Exporter has been stopped'), + stackTrace: StackTrace.current, + ); + } + _finishedLogRecords.addAll(logs); + + return sdk.ExportResult(code: sdk.ExportResultCode.success); + } + + @override + Future shutdown() async { + _stopped = true; + reset(); + } + + List get finishedLogRecords => List.unmodifiable(_finishedLogRecords); + + void reset() { + _finishedLogRecords = []; + } +} diff --git a/lib/src/sdk/logs/exporters/log_record_exporter.dart b/lib/src/sdk/logs/exporters/log_record_exporter.dart new file mode 100644 index 00000000..aebafd34 --- /dev/null +++ b/lib/src/sdk/logs/exporters/log_record_exporter.dart @@ -0,0 +1,10 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +abstract class LogRecordExporter { + Future export(List logs); + + Future shutdown(); +} diff --git a/lib/src/sdk/logs/log_record.dart b/lib/src/sdk/logs/log_record.dart new file mode 100644 index 00000000..8aca2f7f --- /dev/null +++ b/lib/src/sdk/logs/log_record.dart @@ -0,0 +1,152 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:meta/meta.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/common/limits.dart'; + +/// https://opentelemetry.io/docs/specs/otel/logs/sdk/#readwritelogrecord +abstract class ReadableLogRecord { + DateTime get timeStamp; + + DateTime get observedTimestamp; + + String get severityText; + + api.Severity get severityNumber; + + dynamic get body; + + sdk.Attributes get attributes; + + api.SpanContext get spanContext; + + sdk.Resource get resource; + + sdk.InstrumentationScope get instrumentationScope; + + int get droppedAttributesCount; +} + +abstract class ReadWriteLogRecord extends ReadableLogRecord { + set body(dynamic severity); + + set severityText(String severity); + + set severityNumber(api.Severity severity); +} + +class LogRecord implements ReadWriteLogRecord { + @override + final sdk.InstrumentationScope instrumentationScope; + + final sdk.Resource _resource; + + final sdk.TimeProvider _timeProvider; + final api.Context _context; + final sdk.LogRecordLimits logRecordLimits; + final DateTime? _timeStamp; + final DateTime? _observedTimestamp; + + bool _isReadonly = false; + String _severityText; + api.Severity _severityNumber; + dynamic _body; + int _totalAttributesCount = 0; + + final sdk.Attributes _attributes; + + @protected + LogRecord({ + required this.instrumentationScope, + required this.logRecordLimits, + api.Severity? severityNumber, + String? severityText, + List attributes = const [], + DateTime? timeStamp, + DateTime? observedTimestamp, + api.Context? context, + dynamic body, + sdk.Resource? resource, + sdk.TimeProvider? timeProvider, + }) : _severityText = severityText ?? api.Severity.unspecified.name, + _resource = resource ?? sdk.Resource([]), + _context = context ?? api.Context.current, + _body = body, + _attributes = sdk.Attributes.empty(), + _severityNumber = severityNumber ?? api.Severity.unspecified, + _timeStamp = timeStamp, + _observedTimestamp = observedTimestamp, + _timeProvider = timeProvider ?? sdk.DateTimeTimeProvider() { + if (attributes.isNotEmpty) setAttributes(attributes); + } + + + @override + sdk.Resource get resource => _resource; + + @override + sdk.Attributes get attributes => _attributes; + + @override + dynamic get body => _body; + + @override + set body(dynamic body) { + if (_isReadonly) return; + _body = body; + } + + @override + api.SpanContext get spanContext => api.spanContextFromContext(_context); + + @override + int get droppedAttributesCount => _totalAttributesCount - attributes.length; + + @override + DateTime get timeStamp => _timeStamp ?? DateTime.fromMicrosecondsSinceEpoch((_timeProvider.now ~/ 1000).toInt()); + + @override + DateTime get observedTimestamp => + _observedTimestamp ?? DateTime.fromMicrosecondsSinceEpoch((_timeProvider.now ~/ 1000).toInt()); + + @override + api.Severity get severityNumber => _severityNumber; + + @override + set severityNumber(api.Severity severity) { + if (_isReadonly) return; + _severityNumber = severity; + } + + @override + String get severityText => _severityText; + + @override + set severityText(String severity) { + if (_isReadonly) return; + _severityText = severity; + } + + void setAttributes(List attributes) { + attributes.forEach(setAttribute); + } + + void setAttribute(api.Attribute attribute) { + if (_isReadonly) return; + if (attribute.key.isEmpty) return; + if (logRecordLimits.attributeCountLimit == 0) return; + _totalAttributesCount += 1; + _attributes.add(applyAttributeLimitsForLog(attribute, logRecordLimits)); + } + + /// A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. + /// If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted. + @internal + void makeReadonly() { + _isReadonly = true; + } +} diff --git a/lib/src/sdk/logs/log_record_limit.dart b/lib/src/sdk/logs/log_record_limit.dart new file mode 100644 index 00000000..2fb07d73 --- /dev/null +++ b/lib/src/sdk/logs/log_record_limit.dart @@ -0,0 +1,17 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +class LogRecordLimits { + final int _attributeCountLimit; + final int _attributeValueLengthLimit; + + const LogRecordLimits({ + int attributeCountLimit = 128, + int attributeValueLengthLimit = -1, + }) : _attributeCountLimit = attributeCountLimit, + _attributeValueLengthLimit = attributeValueLengthLimit; + + int get attributeCountLimit => _attributeCountLimit; + + int get attributeValueLengthLimit => _attributeValueLengthLimit; +} diff --git a/lib/src/sdk/logs/logger.dart b/lib/src/sdk/logs/logger.dart new file mode 100644 index 00000000..8276f5fe --- /dev/null +++ b/lib/src/sdk/logs/logger.dart @@ -0,0 +1,53 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:meta/meta.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/api/context/context.dart'; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +class Logger extends api.Logger { + final sdk.InstrumentationScope instrumentationScope; + final sdk.Resource _resource; + final sdk.LogRecordLimits logRecordLimits; + final sdk.TimeProvider timeProvider; + final List processors; + + @protected + Logger({ + required this.instrumentationScope, + required this.logRecordLimits, + required this.timeProvider, + this.processors = const [], + sdk.Resource? resource, + }) : _resource = resource ?? sdk.Resource([]); + + @override + void emit({ + List attributes = const [], + Context? context, + dynamic body, + DateTime? observedTimestamp, + api.Severity? severityNumber, + String? severityText, + DateTime? timeStamp, + }) { + final log = sdk.LogRecord( + logRecordLimits: logRecordLimits, + resource: _resource, + instrumentationScope: instrumentationScope, + context: context, + severityText: severityText, + severityNumber: severityNumber, + attributes: attributes, + body: body, + timeProvider: timeProvider, + ); + for (final processor in processors) { + processor.onEmit(log); + } + log.makeReadonly(); + } +} diff --git a/lib/src/sdk/logs/logger_config.dart b/lib/src/sdk/logs/logger_config.dart new file mode 100644 index 00000000..e54d4f47 --- /dev/null +++ b/lib/src/sdk/logs/logger_config.dart @@ -0,0 +1,14 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +// https://opentelemetry.io/docs/specs/otel/logs/sdk/#loggerconfig +class LoggerConfig { + /// If not explicitly set, + /// the disabled parameter SHOULD default to false ( i.e. Loggers are enabled by default). + /// If a Logger is disabled, it MUST behave equivalently to No-op Logger. + final bool disabled; + + const LoggerConfig({ + this.disabled = false, + }); +} diff --git a/lib/src/sdk/logs/logger_provider.dart b/lib/src/sdk/logs/logger_provider.dart new file mode 100644 index 00000000..c2fb0267 --- /dev/null +++ b/lib/src/sdk/logs/logger_provider.dart @@ -0,0 +1,71 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:opentelemetry/src/sdk/logs/logger_config.dart'; +import 'package:quiver/core.dart'; + +const defaultLoggerName = 'opentelemetry'; + +// https://opentelemetry.io/docs/specs/otel/logs/sdk/#loggerprovider +class LoggerProvider implements api.LoggerProvider { + final Map _loggers = {}; + + final LoggerConfig _config; + + final List _processors; + + final sdk.Resource _resource; + + final sdk.LogRecordLimits _logRecordLimits; + + final sdk.TimeProvider _timeProvider; + + LoggerProvider({ + LoggerConfig config = const LoggerConfig(), + sdk.LogRecordLimits logRecordLimits = const LogRecordLimits(), + sdk.Resource? resource, + List? processors, + sdk.TimeProvider? timeProvider, + }) : _processors = processors ?? const [], + _config = config, + _logRecordLimits = logRecordLimits, + _resource = resource ?? sdk.Resource([]), + _timeProvider = timeProvider ?? sdk.DateTimeTimeProvider(); + + @override + api.Logger get( + String name, { + String version = '', + String schemaUrl = '', + List attributes = const [], + }) { + final loggerName = name.isNotEmpty ? name : defaultLoggerName; + final key = hash3(loggerName, version, schemaUrl); + if (_config.disabled) { + return api.NoopLogger(); + } + return _loggers.putIfAbsent( + key, + () => sdk.Logger( + logRecordLimits: _logRecordLimits, + resource: _resource, + instrumentationScope: sdk.InstrumentationScope(loggerName, version, schemaUrl, attributes), + timeProvider: _timeProvider, + processors: _processors + ), + ); + } + + void forceFlush() { + return _processors.forEach((e) => e.forceFlush()); + } + + void shutdown() { + return _processors.forEach((e) => e.shutdown()); + } +} diff --git a/lib/src/sdk/logs/processors/batch_log_record_processor.dart b/lib/src/sdk/logs/processors/batch_log_record_processor.dart new file mode 100644 index 00000000..3a7c76dd --- /dev/null +++ b/lib/src/sdk/logs/processors/batch_log_record_processor.dart @@ -0,0 +1,101 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'dart:async'; +import 'dart:math'; + +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +class BatchLogRecordProcessor extends sdk.LogRecordProcessor { + static const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE = 512; + static const OTEL_BLRP_MAX_QUEUE_SIZE = 2048; + static const OTEL_BLRP_SCHEDULE_DELAY = 5000; + static const OTEL_BLRP_EXPORT_TIMEOUT = 30000; + + final int maxExportBatchSize; + final int scheduledDelayMillis; + final int exportTimeoutMillis; + final int maxQueueSize; + final sdk.LogRecordExporter exporter; + + final _finishedLogRecords = []; + + bool _isShutDown = false; + Timer? _timer; + + BatchLogRecordProcessor({ + required this.exporter, + this.maxExportBatchSize = OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + this.scheduledDelayMillis = OTEL_BLRP_SCHEDULE_DELAY, + this.exportTimeoutMillis = OTEL_BLRP_EXPORT_TIMEOUT, + this.maxQueueSize = OTEL_BLRP_MAX_QUEUE_SIZE, + }) : assert( + maxQueueSize >= maxExportBatchSize, + 'BatchLogRecordProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize', + ); + + @override + void onEmit(sdk.ReadableLogRecord logRecord) { + if (_isShutDown) return; + _addToBuffer(logRecord); + } + + @override + Future forceFlush() async { + if (_isShutDown) return; + await _flushAll(); + } + + @override + Future shutdown() async { + _isShutDown = true; + await _flushAll(); + await exporter.shutdown(); + } + + void _addToBuffer(sdk.ReadableLogRecord logRecord) { + if (_finishedLogRecords.length >= maxQueueSize) { + return; + } + _finishedLogRecords.add(logRecord); + _maybeStartTimer(); + } + + Future _flushAll() { + return Future(() async { + final promises = >[]; + final batchCount = (_finishedLogRecords.length / maxExportBatchSize).ceil(); + + for (var i = 0; i < batchCount; i++) { + promises.add(_flushOneBatch()); + } + + try { + await Future.wait(promises); + } catch (e) { + rethrow; + } + }); + } + + Future _flushOneBatch() async { + _clearTimer(); + if (_finishedLogRecords.isEmpty) return; + final result = _finishedLogRecords.sublist(0, min(maxExportBatchSize, _finishedLogRecords.length)); + _finishedLogRecords.removeRange(0, result.length); + await exporter.export(result); + } + + void _maybeStartTimer() { + if (_timer != null) return; + _timer = Timer(Duration(milliseconds: scheduledDelayMillis), () async { + await _flushOneBatch(); + _clearTimer(); + _maybeStartTimer(); + }); + } + + void _clearTimer() { + _timer = null; + } +} diff --git a/lib/src/sdk/logs/processors/log_record_processor.dart b/lib/src/sdk/logs/processors/log_record_processor.dart new file mode 100644 index 00000000..dfe2b714 --- /dev/null +++ b/lib/src/sdk/logs/processors/log_record_processor.dart @@ -0,0 +1,13 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +/// https://opentelemetry.io/docs/specs/otel/logs/sdk/#logrecordprocessor +abstract class LogRecordProcessor { + void onEmit(sdk.ReadWriteLogRecord logRecord); + + Future forceFlush(); + + Future shutdown(); +} diff --git a/lib/src/sdk/logs/processors/simple_log_record_processor.dart b/lib/src/sdk/logs/processors/simple_log_record_processor.dart new file mode 100644 index 00000000..63fa7914 --- /dev/null +++ b/lib/src/sdk/logs/processors/simple_log_record_processor.dart @@ -0,0 +1,49 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'dart:async'; + +import 'package:logging/logging.dart' as logging; +import 'package:meta/meta.dart'; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; + +class SimpleLogRecordProcessor implements sdk.LogRecordProcessor { + final logger = logging.Logger('opentelemetry.sdk.logs.simplelogrecordprocessor'); + final sdk.LogRecordExporter exporter; + bool _shutDownOnce = false; + + @visibleForTesting + final exportsCompletion = []; + + SimpleLogRecordProcessor({required this.exporter}); + + bool _isForcedFlushed = false; + + @override + void onEmit(sdk.ReadableLogRecord logRecord) { + if (_shutDownOnce) return; + final completer = Completer(); + exportsCompletion.add(completer); + exporter.export([logRecord]).then((result) { + if (result.code != sdk.ExportResultCode.success) { + logger.shout('SimpleLogRecordProcessor: log record export failed', result.error, result.stackTrace); + } + }).whenComplete(() { + completer.complete(); + if (_isForcedFlushed) return; + exportsCompletion.remove(completer); + }); + } + + @override + Future forceFlush() async { + _isForcedFlushed = true; + await Future.forEach(exportsCompletion, (completer) => completer.future); + } + + @override + Future shutdown() async { + _shutDownOnce = true; + await exporter.shutdown(); + } +} diff --git a/lib/src/sdk/platforms/web/time_providers/web_time_provider.dart b/lib/src/sdk/platforms/web/time_providers/web_time_provider.dart index ff4a0e32..4bb5375c 100644 --- a/lib/src/sdk/platforms/web/time_providers/web_time_provider.dart +++ b/lib/src/sdk/platforms/web/time_providers/web_time_provider.dart @@ -51,4 +51,7 @@ class WebTimeProvider implements TimeProvider { /// for more information. @override Int64 get now => fromDOMHighResTimeStamp(window.performance.now()); + + @override + double get nowNanoseconds => window.performance.now(); } diff --git a/lib/src/sdk/time_providers/datetime_time_provider.dart b/lib/src/sdk/time_providers/datetime_time_provider.dart index e543890a..8a79fc08 100644 --- a/lib/src/sdk/time_providers/datetime_time_provider.dart +++ b/lib/src/sdk/time_providers/datetime_time_provider.dart @@ -8,4 +8,7 @@ import 'time_provider.dart'; class DateTimeTimeProvider implements TimeProvider { @override Int64 get now => Int64(DateTime.now().microsecondsSinceEpoch) * 1000; + + @override + double get nowNanoseconds => DateTime.now().microsecondsSinceEpoch * 1000.0; } diff --git a/lib/src/sdk/time_providers/time_provider.dart b/lib/src/sdk/time_providers/time_provider.dart index 360a8248..10096b82 100644 --- a/lib/src/sdk/time_providers/time_provider.dart +++ b/lib/src/sdk/time_providers/time_provider.dart @@ -15,5 +15,8 @@ abstract class TimeProvider { static const int nanosecondsPerMillisecond = 1000000; /// The current time, in nanoseconds since Unix Epoch. + @Deprecated('This getter will be removed in future without replacement.') Int64 get now; + + double get nowNanoseconds; } diff --git a/test/unit/mocks.dart b/test/unit/mocks.dart index f53b4f6c..8b8ab42a 100644 --- a/test/unit/mocks.dart +++ b/test/unit/mocks.dart @@ -1,12 +1,15 @@ // Copyright 2021-2022 Workiva. // Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information +import 'dart:async'; + +import 'package:fixnum/fixnum.dart'; import 'package:http/http.dart' as http; import 'package:mocktail/mocktail.dart'; +import 'package:opentelemetry/sdk.dart'; import 'package:opentelemetry/src/api/context/context.dart'; import 'package:opentelemetry/src/api/trace/span.dart'; -import 'package:opentelemetry/src/sdk/trace/read_only_span.dart'; -import 'package:opentelemetry/src/sdk/trace/span_processors/span_processor.dart'; +import 'package:opentelemetry/src/experimental_sdk.dart'; class MockContext extends Mock implements Context {} @@ -17,3 +20,24 @@ class MockSpan extends Mock implements Span {} class MockReadOnlySpan extends Mock implements ReadOnlySpan {} class MockSpanProcessor extends Mock implements SpanProcessor {} + +class MockLogRecordProcessor extends Mock implements LogRecordProcessor {} + +class MockLogRecordExporter extends Mock implements LogRecordExporter {} + +class FakeTimeProvider extends Mock implements TimeProvider { + FakeTimeProvider({required Int64 now}) : _now = now; + final Int64 _now; + + @override + Int64 get now => _now; +} + +// reference: https://stackoverflow.com/a/38709440/7676003 +void Function() overridePrint(void Function() testFn, Function(String msg) onPrint) => () { + final spec = ZoneSpecification(print: (_, __, ___, msg) { + // Add to log instead of printing to stdout + onPrint(msg); + }); + return Zone.current.fork(specification: spec).run(testFn); + }; diff --git a/test/unit/sdk/common/limits_test.dart b/test/unit/sdk/common/limits_test.dart new file mode 100644 index 00000000..6365522b --- /dev/null +++ b/test/unit/sdk/common/limits_test.dart @@ -0,0 +1,27 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/src/sdk/common/limits.dart'; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +void main() { + test('test log record limit', () { + final logLimit = applyAttributeLimitsForLog( + api.Attribute.fromString('key', 'value'), + LogRecordLimits(attributeValueLengthLimit: 2), + ); + + expect(logLimit.value, 'va'); + }); + + test('test log record limit list', () { + final logLimit = applyAttributeLimitsForLog( + api.Attribute.fromStringList('key', ['value1', 'value2']), + LogRecordLimits(attributeValueLengthLimit: 2), + ); + + expect(logLimit.value, ['va', 'va']); + }); +} diff --git a/test/unit/sdk/logs/exporters/console_log_record_exporter_test.dart b/test/unit/sdk/logs/exporters/console_log_record_exporter_test.dart new file mode 100644 index 00000000..a0bdf8fb --- /dev/null +++ b/test/unit/sdk/logs/exporters/console_log_record_exporter_test.dart @@ -0,0 +1,53 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/fixnum.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../../../mocks.dart'; + +void main() { + List log = []; + + tearDown(() { + log = []; + }); + + test( + 'Test exporter', + overridePrint( + () async { + final timeProvider = FakeTimeProvider(now: Int64(123)); + final severityDefault = api.Severity.unspecified; + final exporter = sdk.ConsoleLogRecordExporter(); + final tracer = sdk.TracerProviderBase().getTracer('test'); + final parent = tracer.startSpan('parent'); + final context = api.contextWithSpan(api.Context.current, parent); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + context: context, + logRecordLimits: LogRecordLimits(), + timeProvider: timeProvider, + resource: sdk.Resource([api.Attribute.fromString('resource.name', 'test')]), + ) + ..makeReadonly() + ..body = 'Log Message'; + final spanContext = parent.spanContext; + + await exporter.export([logRecord]); + + expect(log, [ + '{resource: {attributes: {resource.name: test}}, instrumentationScope: {name: library_name, version: library_version, schemaUrl: url://schema, attributes: {}}, timestamp: ${DateTime.fromMicrosecondsSinceEpoch(timeProvider.now.toInt() ~/ 1000)}, severityText: ${severityDefault.name}, severityNumber: $severityDefault, body: null, attributes: {}, traceId: ${spanContext.traceId}, spanId: ${spanContext.spanId}, traceFlags: ${spanContext.traceFlags}}' + ]); + + await exporter.shutdown(); + }, + log.add, + )); +} diff --git a/test/unit/sdk/logs/exporters/inmemory_log_record_exporter_test.dart b/test/unit/sdk/logs/exporters/inmemory_log_record_exporter_test.dart new file mode 100644 index 00000000..87ad915a --- /dev/null +++ b/test/unit/sdk/logs/exporters/inmemory_log_record_exporter_test.dart @@ -0,0 +1,35 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/fixnum.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../../../mocks.dart'; + +void main() { + test('Test exporter', () async { + final exporter = sdk.InMemoryLogRecordExporter(); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + timeProvider: FakeTimeProvider(now: Int64(123)), + resource: sdk.Resource([api.Attribute.fromString('resource.name', 'test')]), + ) + ..makeReadonly() + ..body = 'Log Message'; + + await exporter.export([logRecord]); + + expect(exporter.finishedLogRecords.length, 1); + expect(exporter.finishedLogRecords.first.instrumentationScope.name, 'library_name'); + + await exporter.shutdown(); + + expect(exporter.finishedLogRecords.length, 0); + }); +} diff --git a/test/unit/sdk/logs/log_record_test.dart b/test/unit/sdk/logs/log_record_test.dart new file mode 100644 index 00000000..3f274755 --- /dev/null +++ b/test/unit/sdk/logs/log_record_test.dart @@ -0,0 +1,132 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/fixnum.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../trace_provider_test.dart'; + +void main() { + test('set readonly will block values from being set', () { + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + timeProvider: FakeTimeProvider(now: Int64(123))) + ..makeReadonly() + ..body = 'Log Message' + ..severityNumber = api.Severity.debug + ..severityText = 'DEBUG' + ..setAttributes([api.Attribute.fromString('key', 'value')]) + ..setAttribute(api.Attribute.fromString('key2', 'value2')); + + expect(logRecord.body, null); + expect(logRecord.severityNumber, api.Severity.unspecified); + expect(logRecord.severityText, api.Severity.unspecified.name); + expect(logRecord.attributes.keys, const []); + expect(logRecord.droppedAttributesCount, 0); + expect(logRecord.timeStamp, + DateTime.fromMicrosecondsSinceEpoch(Int64(123).toInt() ~/ 1000)); + expect(logRecord.observedTimestamp, + DateTime.fromMicrosecondsSinceEpoch(Int64(123).toInt() ~/ 1000)); + }); + + test('logRecord call setter', () { + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + timeProvider: FakeTimeProvider(now: Int64(123))) + ..body = 'Log Message' + ..severityNumber = api.Severity.debug + ..severityText = 'DEBUG' + ..setAttributes([api.Attribute.fromString('key', 'value')]) + ..setAttribute(api.Attribute.fromString('key2', 'value2')); + + expect(logRecord.body, 'Log Message'); + expect(logRecord.severityNumber, api.Severity.debug); + expect(logRecord.severityText, 'DEBUG'); + expect(logRecord.attributes.keys, const ['key', 'key2']); + expect(logRecord.droppedAttributesCount, 0); + expect(logRecord.timeStamp, + DateTime.fromMicrosecondsSinceEpoch(Int64(123).toInt() ~/ 1000)); + expect(logRecord.observedTimestamp, + DateTime.fromMicrosecondsSinceEpoch(Int64(123).toInt() ~/ 1000)); + }); + + test('logRecord update same attribute will create attributesCount diff', () { + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + ) + ..setAttributes([api.Attribute.fromString('key2', 'value')]) + ..setAttribute(api.Attribute.fromString('key2', 'value2')); + + expect(logRecord.droppedAttributesCount, 1); + }); + + test('logRecord time stamp will be converted to Int64', () { + final now = DateTime.now(); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + timeStamp: now, + observedTimestamp: now, + logRecordLimits: LogRecordLimits(), + ) + ..setAttributes([api.Attribute.fromString('key2', 'value')]) + ..setAttribute(api.Attribute.fromString('key2', 'value2')); + + expect(logRecord.timeStamp, now); + expect(logRecord.observedTimestamp, now); + }); + + test('logRecord set attribute', () { + final now = DateTime.now(); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + timeStamp: now, + observedTimestamp: now, + logRecordLimits: LogRecordLimits(attributeValueLengthLimit: 2), + ) + ..setAttribute(api.Attribute.fromString('key', 'value')) + ..setAttribute(api.Attribute.fromBoolean('key2', true)) + ..setAttribute(api.Attribute.fromInt('key3', 1)) + ..setAttribute(api.Attribute.fromDouble('key4', 1.1)) + ..setAttribute(api.Attribute.fromStringList('key5', ['value2'])) + ..setAttribute(api.Attribute.fromBooleanList('key6', [true])) + ..setAttribute(api.Attribute.fromIntList('key7', [1])) + ..setAttribute(api.Attribute.fromDoubleList('key8', [1.1])); + + expect(logRecord.droppedAttributesCount, 0); + expect( + logRecord.attributes.keys, + const ['key', 'key2', 'key3', 'key4', 'key5', 'key6', 'key7', 'key8'], + ); + expect(logRecord.attributes.get('key'), 'va'); + }); + + test('logRecord set attribute with limit', () { + final now = DateTime.now(); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope( + 'library_name', 'library_version', 'url://schema', []), + timeStamp: now, + observedTimestamp: now, + logRecordLimits: LogRecordLimits(attributeValueLengthLimit: 2), + ) + ..setAttribute(api.Attribute.fromString('key', 'value')) + ..setAttribute(api.Attribute.fromStringList('key2', ['value2'])); + + expect(logRecord.attributes.get('key'), 'va'); + expect(logRecord.attributes.get('key2'), const ['va']); + }); +} diff --git a/test/unit/sdk/logs/logger_provider_test.dart b/test/unit/sdk/logs/logger_provider_test.dart new file mode 100644 index 00000000..a6f1f90d --- /dev/null +++ b/test/unit/sdk/logs/logger_provider_test.dart @@ -0,0 +1,57 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/src/int64.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../../mocks.dart'; + +void main() { + setUpAll(() { + registerFallbackValue(sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + )); + }); + + test('traceProvider custom timeProvider', () { + final mockTimeProvider = FakeTimeProvider(now: Int64(123)); + final mockProcessor1 = MockLogRecordProcessor(); + final provider = sdk.LoggerProvider(timeProvider: mockTimeProvider, processors: [mockProcessor1]); + provider.get('foo').emit(); + verify(() => mockProcessor1.onEmit(any( + that: predicate((a) { + if (a is! sdk.ReadWriteLogRecord) return false; + return a.timeStamp == DateTime.fromMicrosecondsSinceEpoch(123 ~/ 1000) && + a.observedTimestamp == DateTime.fromMicrosecondsSinceEpoch(123 ~/ 1000); + }), + ))).called(1); + }); + + test('loggerProvider force flushes all processors', () async { + final mockProcessor1 = MockLogRecordProcessor(); + final mockProcessor2 = MockLogRecordProcessor(); + when(mockProcessor1.forceFlush).thenAnswer((_) async => Future.value()); + when(mockProcessor2.forceFlush).thenAnswer((_) async => Future.value()); + sdk.LoggerProvider(processors: [mockProcessor1, mockProcessor2]).forceFlush(); + + verify(mockProcessor1.forceFlush).called(1); + verify(mockProcessor2.forceFlush).called(1); + }); + + test('loggerProvider shuts down all processors', () async { + final mockProcessor1 = MockLogRecordProcessor(); + final mockProcessor2 = MockLogRecordProcessor(); + when(mockProcessor1.shutdown).thenAnswer((_) async => Future.value()); + when(mockProcessor2.shutdown).thenAnswer((_) async => Future.value()); + sdk.LoggerProvider(processors: [mockProcessor1, mockProcessor2]).shutdown(); + + verify(mockProcessor1.shutdown).called(1); + verify(mockProcessor2.shutdown).called(1); + }); +} diff --git a/test/unit/sdk/logs/logger_test.dart b/test/unit/sdk/logs/logger_test.dart new file mode 100644 index 00000000..250ff1b8 --- /dev/null +++ b/test/unit/sdk/logs/logger_test.dart @@ -0,0 +1,47 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/fixnum.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../../mocks.dart'; + +class MockLockRecordProcessor extends Mock implements sdk.LogRecordProcessor {} + +void main() { + setUpAll(() { + registerFallbackValue(sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + )); + }); + + test('emit new log', () { + final processor = MockLockRecordProcessor(); + sdk.Logger( + logRecordLimits: LogRecordLimits(), + instrumentationScope: sdk.InstrumentationScope( + 'library_name', + 'library_version', + 'url://schema', + [], + ), + resource: sdk.Resource([]), + processors: [processor], + timeProvider: FakeTimeProvider(now: Int64(60)), + ).emit(body: 'TEST!'); + + verify(() => processor.onEmit(any(that: predicate((it) { + return it.attributes.keys.isEmpty == true && + it.instrumentationScope.name == 'library_name' && + it.instrumentationScope.version == 'library_version' && + it.instrumentationScope.schemaUrl == 'url://schema' && + it.resource.attributes.keys.isEmpty == true; + })))).called(1); + }); +} diff --git a/test/unit/sdk/logs/processors/batch_log_record_processor_test.dart b/test/unit/sdk/logs/processors/batch_log_record_processor_test.dart new file mode 100644 index 00000000..6d71ec93 --- /dev/null +++ b/test/unit/sdk/logs/processors/batch_log_record_processor_test.dart @@ -0,0 +1,114 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'package:fixnum/fixnum.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:opentelemetry/api.dart' as api; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_api.dart' as api; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:test/test.dart'; + +import '../../../mocks.dart'; + +void main() { + test('test processor', () async { + final exporter = MockLogRecordExporter(); + when(() => exporter.export(any())).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.success)); + final processor = sdk.BatchLogRecordProcessor( + exporter: exporter, + scheduledDelayMillis: Duration.zero.inMilliseconds, + ); + + final tracer = sdk.TracerProviderBase().getTracer('test'); + final parent = tracer.startSpan('parent'); + final context = api.contextWithSpan(api.Context.current, parent); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + context: context, + timeProvider: FakeTimeProvider(now: Int64(123))); + final logRecordA = logRecord + ..body = 'test log' + ..severityNumber = api.Severity.fatal3; + processor.onEmit(logRecordA); + + await Future.delayed(const Duration(milliseconds: 100)); + verify(() => exporter.export(any>( + that: predicate>((a) { + final first = a.first; + return first.body == 'test log' && + first.spanContext.spanId == parent.spanContext.spanId && + first.severityNumber == api.Severity.fatal3; + }), + ))).called(1); + }); + + test('processor shut down', () async { + final exporter = MockLogRecordExporter(); + when(exporter.shutdown).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.success)); + + final processor = sdk.BatchLogRecordProcessor( + exporter: exporter, + scheduledDelayMillis: Duration.zero.inMilliseconds, + ); + + await processor.shutdown(); + + verify(exporter.shutdown).called(1); + }); + + test('processor shut down will not emit log', () async { + final exporter = MockLogRecordExporter(); + when(exporter.shutdown).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.success)); + + final processor = sdk.BatchLogRecordProcessor( + exporter: exporter, + scheduledDelayMillis: Duration.zero.inMilliseconds, + ); + + await processor.shutdown(); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + ); + final logRecordA = logRecord + ..body = 'test log' + ..severityNumber = api.Severity.fatal3; + processor.onEmit(logRecordA); + + await Future.delayed(const Duration(milliseconds: 100)); + + verifyNever(() => exporter.export(any())); + }); + + test('processor force flush', () async { + final exporter = MockLogRecordExporter(); + when(() => exporter.export(any())).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.success)); + + final processor = sdk.BatchLogRecordProcessor( + exporter: exporter, + scheduledDelayMillis: Duration(seconds: 5).inMilliseconds, + ); + + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + ); + final logRecordA = logRecord + ..body = 'test log' + ..severityNumber = api.Severity.fatal3; + processor.onEmit(logRecordA); + + await processor.forceFlush(); + + verify(() => exporter.export(any>( + that: predicate>((a) { + final first = a.first; + return first.body == 'test log' && first.severityNumber == api.Severity.fatal3; + }), + ))).called(1); + }); +} diff --git a/test/unit/sdk/logs/processors/simple_log_record_processor_test.dart b/test/unit/sdk/logs/processors/simple_log_record_processor_test.dart new file mode 100644 index 00000000..1a6ba544 --- /dev/null +++ b/test/unit/sdk/logs/processors/simple_log_record_processor_test.dart @@ -0,0 +1,90 @@ +// Copyright 2021-2022 Workiva. +// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information + +@TestOn('vm') +import 'dart:async'; + +import 'package:fixnum/fixnum.dart'; +import 'package:logging/logging.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:opentelemetry/sdk.dart' as sdk; +import 'package:opentelemetry/src/experimental_sdk.dart' as sdk; +import 'package:opentelemetry/src/sdk/logs/log_record_limit.dart'; +import 'package:opentelemetry/src/sdk/logs/processors/simple_log_record_processor.dart'; +import 'package:test/test.dart'; + +import '../../../mocks.dart'; + +void main() { + late sdk.LogRecordExporter exporter; + late sdk.LogRecordProcessor processor; + + setUp(() { + exporter = MockLogRecordExporter(); + processor = SimpleLogRecordProcessor(exporter: exporter); + when(() => exporter.export(any())).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.success)); + when(() => exporter.shutdown()).thenAnswer((_) => Future.value()); + }); + + test('executes export', () { + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + ); + + processor.onEmit(logRecord); + + verify(() => exporter.export([logRecord])).called(1); + }); + + test('executes export and fail', () async { + var errorMessage = ''; + Logger.root.onRecord.listen((data) { + errorMessage = data.message; + }); + final logRecord = sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + ); + + when(() => exporter.export(any())).thenAnswer((_) async => sdk.ExportResult(code: sdk.ExportResultCode.failed)); + + processor.onEmit(logRecord); + + await Future.delayed(const Duration(milliseconds: 50)); + + expect(errorMessage, 'SimpleLogRecordProcessor: log record export failed'); + }); + + test('shutdown exporters on forced flush', () async { + await processor.shutdown(); + + verify(exporter.shutdown).called(1); + }); + + test('forceFlush waits for all pending exports to complete', () async { + when(() => exporter.export(any())).thenAnswer((_) async { + await Future.delayed(const Duration(seconds: 1)); + return sdk.ExportResult(code: sdk.ExportResultCode.success); + }); + + // Emit two log records, creating two pending exports. + processor.onEmit( + sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + timeProvider: FakeTimeProvider(now: Int64(123))), + ); + await Future.delayed(Duration(milliseconds: 50)); + processor.onEmit( + sdk.LogRecord( + instrumentationScope: sdk.InstrumentationScope('library_name', 'library_version', 'url://schema', []), + logRecordLimits: LogRecordLimits(), + timeProvider: FakeTimeProvider(now: Int64(123))), + ); + expect((processor as SimpleLogRecordProcessor).exportsCompletion.length, 2); + // Ensure the exports are pending. + final flushFuture = processor.forceFlush(); + expect(flushFuture, completes); + }); +}