diff --git a/build.zig b/build.zig index 6225fcd..4708e6c 100644 --- a/build.zig +++ b/build.zig @@ -1,20 +1,8 @@ const std = @import("std"); const protobuf = @import("protobuf"); -// Although this function looks imperative, note that its job is to -// declaratively construct a build graph that will be executed by an external -// runner. pub fn build(b: *std.Build) void { - - // Standard target options allows the person running `zig build` to choose - // what target to build for. Here we do not override the defaults, which - // means any target is allowed, and the default is native. Other options - // for restricting supported target set are available. const target = b.standardTargetOptions(.{}); - - // Standard optimization options allow the person running `zig build` to select - // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not - // set a preferred release mode, allowing the user to decide how to optimize. const optimize = b.standardOptimizeOption(.{}); // Protobuf code generation from the OpenTelemetry proto files. @@ -40,7 +28,7 @@ pub fn build(b: *std.Build) void { }, }); - // debug protoc generation + // Debug protoc generation in all builds protoc_step.verbose = true; const gen_proto = b.step("gen-proto", "Generates Zig files from protobuf definitions"); @@ -57,9 +45,6 @@ pub fn build(b: *std.Build) void { sdk_lib.root_module.addImport("protobuf", protobuf_dep.module("protobuf")); - // This declares intent for the library to be installed into the standard - // location when the user invokes the "install" step (the default step when - // running `zig build`). b.installArtifact(sdk_lib); // Providing a way for the user to request running the unit tests. @@ -71,6 +56,7 @@ pub fn build(b: *std.Build) void { .root_source_file = b.path("src/sdk.zig"), .target = target, .optimize = optimize, + // Allow passing test filter using the build args. .filters = b.args orelse &[0][]const u8{}, }); sdk_unit_tests.root_module.addImport("protobuf", protobuf_dep.module("protobuf")); diff --git a/build.zig.zon b/build.zig.zon index 2ab5639..a112077 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -32,8 +32,8 @@ "build.zig", "build.zig.zon", "src", - // For example... 
- //"LICENSE", - //"README.md", + "examples", + "LICENSE", + "README.md", }, } diff --git a/examples/metrics/basic.zig b/examples/metrics/basic.zig index c6b66fc..0b52339 100644 --- a/examples/metrics/basic.zig +++ b/examples/metrics/basic.zig @@ -18,8 +18,7 @@ pub fn main() !void { var in_mem = try sdk.InMemoryExporter.init(fba.allocator()); // Create an exporter and a a metric reader to aggregate the metrics - const exporter = try sdk.MetricExporter.new(fba.allocator(), &in_mem.exporter); - const mr = try sdk.MetricReader.init(fba.allocator(), exporter); + const mr = try sdk.MetricReader.init(fba.allocator(), &in_mem.exporter); defer mr.shutdown(); // Register the metric reader to the meter provider @@ -40,7 +39,7 @@ pub fn main() !void { // Print the metrics const stored_metrics = try in_mem.fetch(); - defer stored_metrics.deinit(); + defer fba.allocator().free(stored_metrics); std.debug.print("metric: {any}\n", .{stored_metrics}); } diff --git a/examples/metrics/http_server.zig b/examples/metrics/http_server.zig index 8616c15..807240c 100644 --- a/examples/metrics/http_server.zig +++ b/examples/metrics/http_server.zig @@ -92,10 +92,7 @@ fn setupTelemetry(allocator: std.mem.Allocator) !OTel { var in_mem = try sdk.InMemoryExporter.init(allocator); errdefer in_mem.deinit(); - const exporter = try sdk.MetricExporter.new(allocator, &in_mem.exporter); - errdefer exporter.shutdown(); - - const mr = try sdk.MetricReader.init(allocator, exporter); + const mr = try sdk.MetricReader.init(allocator, &in_mem.exporter); try mp.addReader(mr); diff --git a/src/api/metrics/instrument.zig b/src/api/metrics/instrument.zig index 2a275c9..79642f2 100644 --- a/src/api/metrics/instrument.zig +++ b/src/api/metrics/instrument.zig @@ -3,6 +3,8 @@ const std = @import("std"); const spec = @import("spec.zig"); const Attribute = @import("../../attributes.zig").Attribute; const Attributes = @import("../../attributes.zig").Attributes; +const DataPoint = @import("measurement.zig").DataPoint; +const MeasurementsData = @import("measurement.zig").MeasurementsData; pub const Kind = enum { Counter, @@ -39,8 +41,8 @@ const instrumentData = union(enum) { Gauge_f64: *Gauge(f64), }; -/// Instrument contains all supported instruments. -/// When the Meter wants to create a new instrument, it calls the Get() method. +/// Instrument is a container of all supported instrument types. +/// When the Meter wants to create a new instrument, it calls the new() function. pub const Instrument = struct { const Self = @This(); @@ -49,7 +51,7 @@ pub const Instrument = struct { opts: InstrumentOptions, data: instrumentData, - pub fn Get(kind: Kind, opts: InstrumentOptions, allocator: std.mem.Allocator) !*Self { + pub fn new(kind: Kind, opts: InstrumentOptions, allocator: std.mem.Allocator) !*Self { // Validate name, unit anddescription, optionally throwing an error if non conformant. 
// See https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-name-syntax try spec.validateInstrumentOptions(opts); @@ -66,6 +68,7 @@ pub const Instrument = struct { pub fn counter(self: *Self, comptime T: type) !*Counter(T) { const c = try self.allocator.create(Counter(T)); c.* = Counter(T).init(self.allocator); + errdefer self.allocator.destroy(c); self.data = switch (T) { u16 => .{ .Counter_u16 = c }, u32 => .{ .Counter_u32 = c }, @@ -81,6 +84,7 @@ pub const Instrument = struct { pub fn upDownCounter(self: *Self, comptime T: type) !*Counter(T) { const c = try self.allocator.create(Counter(T)); c.* = Counter(T).init(self.allocator); + errdefer self.allocator.destroy(c); self.data = switch (T) { i16 => .{ .UpDownCounter_i16 = c }, i32 => .{ .UpDownCounter_i32 = c }, @@ -96,6 +100,7 @@ pub const Instrument = struct { pub fn histogram(self: *Self, comptime T: type) !*Histogram(T) { const h = try self.allocator.create(Histogram(T)); h.* = try Histogram(T).init(self.allocator, self.opts.histogramOpts); + errdefer self.allocator.destroy(h); self.data = switch (T) { u16 => .{ .Histogram_u16 = h }, u32 => .{ .Histogram_u32 = h }, @@ -113,6 +118,7 @@ pub const Instrument = struct { pub fn gauge(self: *Self, comptime T: type) !*Gauge(T) { const g = try self.allocator.create(Gauge(T)); g.* = Gauge(T).init(self.allocator); + errdefer self.allocator.destroy(g); self.data = switch (T) { i16 => .{ .Gauge_i16 = g }, i32 => .{ .Gauge_i32 = g }, @@ -136,6 +142,14 @@ pub const Instrument = struct { } self.allocator.destroy(self); } + + pub fn getInstrumentsData(self: Self, allocator: std.mem.Allocator) !MeasurementsData { + switch (self.data) { + inline else => |i| { + return i.measurementsData(allocator); + }, + } + } }; /// InstrumentOptions is used to configure the instrument. @@ -166,37 +180,43 @@ pub fn Counter(comptime T: type) type { const Self = @This(); allocator: std.mem.Allocator, - // We should keep track of the current value of the counter for each unique comibination of attribute. - // At the same time, we don't want to allocate memory for each attribute set that comes in. - // So we store all the counters in a single array and keep track of the state of each counter. - cumulative: std.AutoHashMap(?[]Attribute, T), + /// Record the measurements for the counter. + /// The list of measurements will be used when reading the data during a collection cycle. + /// The list is cleared after each collection cycle. + measurements: std.ArrayList(DataPoint(T)), fn init(allocator: std.mem.Allocator) Self { return Self{ - .cumulative = std.AutoHashMap(?[]Attribute, T).init(allocator), + .measurements = std.ArrayList(DataPoint(T)).init(allocator), .allocator = allocator, }; } fn deinit(self: *Self) void { - if (self.cumulative.count() > 0) { - var keyIter = self.cumulative.keyIterator(); - while (keyIter.next()) |key| { - if (key.*) |k| { - self.allocator.free(k); - } + for (self.measurements.items) |m| { + if (m.attributes) |attrs| { + self.allocator.free(attrs); } } - self.cumulative.deinit(); + self.measurements.deinit(); } /// Add the given delta to the counter, using the provided attributes. 
pub fn add(self: *Self, delta: T, attributes: anytype) !void { const attrs = try Attributes.from(self.allocator, attributes); - if (self.cumulative.getEntry(attrs)) |c| { - c.value_ptr.* += delta; - } else { - try self.cumulative.put(attrs, delta); + try self.measurements.append(DataPoint(T){ .value = delta, .attributes = attrs }); + } + + fn measurementsData(self: Self, allocator: std.mem.Allocator) !MeasurementsData { + switch (T) { + u16, u32, u64, i16, i32, i64 => { + var data = try allocator.alloc(DataPoint(i64), self.measurements.items.len); + for (self.measurements.items, 0..) |m, idx| { + data[idx] = .{ .attributes = m.attributes, .value = @intCast(m.value) }; + } + return .{ .int = data }; + }, + else => unreachable, } } }; @@ -213,10 +233,9 @@ pub fn Histogram(comptime T: type) type { allocator: std.mem.Allocator, options: HistogramOptions, - // Keep track of the current value of the counter for each unique comibination of attribute. - // At the same time, don't want allocate memory for each attribute set that comes in. - // Store all the counters in a single array and keep track of the state of each counter. - cumulative: std.AutoHashMap(?[]Attribute, T), + /// Keeps track of the recorded values for each set of attributes. + /// The measurements are cleared after each collection cycle. + dataPoints: std.ArrayList(DataPoint(T)), // Keeps track of how many values are summed for each set of attributes. counts: std.AutoHashMap(?[]Attribute, usize), @@ -239,7 +258,7 @@ pub fn Histogram(comptime T: type) type { return Self{ .allocator = allocator, .options = opts, - .cumulative = std.AutoHashMap(?[]Attribute, T).init(allocator), + .dataPoints = std.ArrayList(DataPoint(T)).init(allocator), .counts = std.AutoHashMap(?[]Attribute, usize).init(allocator), .buckets = buckets, .bucket_counts = std.AutoHashMap(?[]Attribute, []usize).init(allocator), @@ -247,16 +266,13 @@ pub fn Histogram(comptime T: type) type { } fn deinit(self: *Self) void { - // Cleanup the cumulative hashmap and the keys. - if (self.cumulative.count() > 0) { - var keyIter = self.cumulative.keyIterator(); - while (keyIter.next()) |key1| { - if (key1.*) |k| { - self.allocator.free(k); - } + // Cleanup the arraylist or measures and their attributes. + for (self.dataPoints.items) |m| { + if (m.attributes) |attrs| { + self.allocator.free(attrs); } } - self.cumulative.deinit(); + self.dataPoints.deinit(); // We don't need to free the counts or bucket_counts keys, // because the keys are pointers to the same optional // KeyValueList used in cumulative. @@ -267,12 +283,8 @@ pub fn Histogram(comptime T: type) type { /// Add the given value to the histogram, using the provided attributes. pub fn record(self: *Self, value: T, attributes: anytype) !void { const attrs = try Attributes.from(self.allocator, attributes); + try self.dataPoints.append(DataPoint(T){ .value = value, .attributes = attrs }); - if (self.cumulative.getEntry(attrs)) |c| { - c.value_ptr.* += value; - } else { - try self.cumulative.put(attrs, value); - } // Find the value for the bucket that the value falls in. // If the value is greater than the last bucket, it goes in the last bucket. // If the value is less than the first bucket, it goes in the first bucket. @@ -326,6 +338,26 @@ pub fn Histogram(comptime T: type) type { // The last bucket is returned if the value is greater than it. 
return self.buckets.len - 1; } + + fn measurementsData(self: Self, allocator: std.mem.Allocator) !MeasurementsData { + switch (T) { + u16, u32, u64, i16, i32, i64 => { + var data = try allocator.alloc(DataPoint(i64), self.dataPoints.items.len); + for (self.dataPoints.items, 0..) |m, idx| { + data[idx] = .{ .attributes = m.attributes, .value = @intCast(m.value) }; + } + return .{ .int = data }; + }, + f32, f64 => { + var data = try allocator.alloc(DataPoint(f64), self.dataPoints.items.len); + for (self.dataPoints.items, 0..) |m, idx| { + data[idx] = .{ .attributes = m.attributes, .value = @floatCast(m.value) }; + } + return .{ .double = data }; + }, + else => unreachable, + } + } }; } @@ -334,37 +366,62 @@ pub fn Gauge(comptime T: type) type { const Self = @This(); allocator: std.mem.Allocator, - values: std.AutoHashMap(?[]Attribute, T), + dataPoints: std.ArrayList(DataPoint(T)), fn init(allocator: std.mem.Allocator) Self { return Self{ .allocator = allocator, - .values = std.AutoHashMap(?[]Attribute, T).init(allocator), + .dataPoints = std.ArrayList(DataPoint(T)).init(allocator), }; } fn deinit(self: *Self) void { - if (self.values.count() > 0) { - var keyIter = self.values.keyIterator(); - while (keyIter.next()) |key| { - if (key.*) |k| { - self.allocator.free(k); - } + for (self.dataPoints.items) |m| { + if (m.attributes) |attrs| { + self.allocator.free(attrs); } } - self.values.deinit(); + self.dataPoints.deinit(); } /// Record the given value to the gauge, using the provided attributes. pub fn record(self: *Self, value: T, attributes: anytype) !void { const attrs = try Attributes.from(self.allocator, attributes); - try self.values.put(attrs, value); + try self.dataPoints.append(DataPoint(T){ .value = value, .attributes = attrs }); + } + + fn measurementsData(self: Self, allocator: std.mem.Allocator) !MeasurementsData { + switch (T) { + i16, i32, i64 => { + var data = try allocator.alloc(DataPoint(i64), self.dataPoints.items.len); + for (self.dataPoints.items, 0..) |m, idx| { + data[idx] = .{ .attributes = m.attributes, .value = @intCast(m.value) }; + } + return .{ .int = data }; + }, + f32, f64 => { + var data = try allocator.alloc(DataPoint(f64), self.dataPoints.items.len); + for (self.dataPoints.items, 0..) 
|m, idx| { + data[idx] = .{ .attributes = m.attributes, .value = @floatCast(m.value) }; + } + return .{ .double = data }; + }, + else => unreachable, + } } }; } const MeterProvider = @import("meter.zig").MeterProvider; +test "counter with unsupported type does not leak" { + const mp = try MeterProvider.default(); + defer mp.shutdown(); + const meter = try mp.getMeter(.{ .name = "my-meter" }); + const err = meter.createCounter(u1, .{ .name = "a-counter" }); + try std.testing.expectError(spec.FormatError.UnsupportedValueType, err); +} + test "meter can create counter instrument and record increase without attributes" { const mp = try MeterProvider.default(); defer mp.shutdown(); @@ -372,7 +429,7 @@ test "meter can create counter instrument and record increase without attributes var counter = try meter.createCounter(u32, .{ .name = "a-counter" }); try counter.add(10, .{}); - std.debug.assert(counter.cumulative.count() == 1); + std.debug.assert(counter.measurements.items.len == 1); } test "meter can create counter instrument and record increase with attributes" { @@ -387,14 +444,17 @@ test "meter can create counter instrument and record increase with attributes" { try counter.add(100, .{}); try counter.add(1000, .{}); - std.debug.assert(counter.cumulative.count() == 1); - std.debug.assert(counter.cumulative.get(null).? == 1100); + + std.debug.assert(counter.measurements.items.len == 2); + std.debug.assert(counter.measurements.items[0].value == 100); + std.debug.assert(counter.measurements.items[1].value == 1000); const val1: []const u8 = "some-value"; const val2: []const u8 = "another-value"; try counter.add(2, .{ "some-key", val1, "another-key", val2 }); - std.debug.assert(counter.cumulative.count() == 2); + std.debug.assert(counter.measurements.items.len == 3); + std.debug.assert(counter.measurements.items[2].value == 2); } test "meter can create histogram instrument and record value without explicit buckets" { @@ -408,7 +468,8 @@ test "meter can create histogram instrument and record value without explicit bu try histogram.record(15, .{}); try std.testing.expectEqual(.{ 1, 15 }, .{ histogram.min.?, histogram.max.? }); - std.debug.assert(histogram.cumulative.count() == 1); + std.debug.assert(histogram.dataPoints.items.len == 3); + const counts = histogram.bucket_counts.get(null).?; std.debug.assert(counts.len == spec.defaultHistogramBucketBoundaries.len); const expected_counts = &[_]usize{ 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; @@ -426,7 +487,8 @@ test "meter can create histogram instrument and record value with explicit bucke try histogram.record(15, .{}); try std.testing.expectEqual(.{ 1, 15 }, .{ histogram.min.?, histogram.max.? 
}); - std.debug.assert(histogram.cumulative.count() == 1); + std.debug.assert(histogram.dataPoints.items.len == 3); + const counts = histogram.bucket_counts.get(null).?; std.debug.assert(counts.len == 4); const expected_counts = &[_]usize{ 1, 1, 1, 0 }; @@ -441,8 +503,8 @@ test "meter can create gauge instrument and record value without attributes" { try gauge.record(42, .{}); try gauge.record(-42, .{}); - std.debug.assert(gauge.values.count() == 1); - std.debug.assert(gauge.values.get(null) == -42); + std.debug.assert(gauge.dataPoints.items.len == 2); + std.debug.assert(gauge.dataPoints.pop().value == -42); } test "meter creates upDownCounter and stores value" { @@ -454,24 +516,19 @@ test "meter creates upDownCounter and stores value" { try counter.add(10, .{}); try counter.add(-5, .{}); try counter.add(-4, .{}); - std.debug.assert(counter.cumulative.count() == 1); + std.debug.assert(counter.measurements.items.len == 3); // Validate the number stored is correct. // Empty attributes produce a null key. - if (counter.cumulative.get(null)) |c| { - std.debug.assert(c == 1); - } else { - std.debug.panic("Counter not found", .{}); + var summed: i32 = 0; + for (counter.measurements.items) |m| { + summed += m.value; } + std.debug.assert(summed == 1); try counter.add(1, .{ "some-key", @as(i64, 42) }); - std.debug.assert(counter.cumulative.count() == 2); - - var iter = counter.cumulative.valueIterator(); - while (iter.next()) |v| { - std.debug.assert(v.* == 1); - } + std.debug.assert(counter.measurements.items.len == 4); } test "instrument in meter and instrument in data are the same" { @@ -497,8 +554,41 @@ test "instrument in meter and instrument in data are the same" { if (meter.instruments.get(id)) |instrument| { std.debug.assert(instrument.kind == Kind.Counter); - const counter_value = instrument.data.Counter_u64.cumulative.get(null) orelse unreachable; - try std.testing.expectEqual(100, counter_value); + const counter_value = instrument.data.Counter_u64.measurements.popOrNull() orelse unreachable; + try std.testing.expectEqual(100, counter_value.value); + } else { + std.debug.panic("Counter {s} not found in meter {s} after creation", .{ name, meter.name }); + } +} + +test "instrument fetches measurements from inner" { + const mp = try MeterProvider.default(); + defer mp.shutdown(); + + const name = "test-instrument"; + + const meter = try mp.getMeter(.{ .name = "test-meter" }); + + var c = try meter.createCounter(u64, .{ .name = name }); + try c.add(100, .{}); + + const id = try spec.instrumentIdentifier( + std.testing.allocator, + name, + Kind.Counter.toString(), + "", + "", + ); + defer std.testing.allocator.free(id); + + if (meter.instruments.get(id)) |instrument| { + const measurements = try instrument.getInstrumentsData(std.testing.allocator); + defer switch (measurements) { + inline else => |list| std.testing.allocator.free(list), + }; + + std.debug.assert(measurements.int.len == 1); + try std.testing.expectEqual(@as(i64, 100), measurements.int[0].value); } else { std.debug.panic("Counter {s} not found in meter {s} after creation", .{ name, meter.name }); } diff --git a/src/api/metrics/measurement.zig b/src/api/metrics/measurement.zig index c962956..40bd202 100644 --- a/src/api/metrics/measurement.zig +++ b/src/api/metrics/measurement.zig @@ -1,26 +1,54 @@ const std = @import("std"); -const a = @import("../../attributes.zig"); -const Attribute = a.Attribute; -const Attributes = a.Attributes; +const ArrayList = std.ArrayList; -/// A measurement is a value recorded with an optional set of 
attributes. +const Attribute = @import("../../attributes.zig").Attribute; +const Attributes = @import("../../attributes.zig").Attributes; +const Kind = @import("instrument.zig").Kind; +const InstrumentOptions = @import("instrument.zig").InstrumentOptions; + +/// A value recorded with an optional set of attributes. /// It represents a single data point collected from an instrument. -pub fn Measurement(comptime T: type) type { +pub fn DataPoint(comptime T: type) type { return struct { const Self = @This(); value: T, attributes: ?[]Attribute = null, + // TODO: consider adding a timestamp field }; } -test "measurement with attributes" { +test "datapoint with attributes" { const key = "name"; const attrs = try Attributes.from(std.testing.allocator, .{ key, true }); defer std.testing.allocator.free(attrs.?); - const m = Measurement(u32){ .value = 42, .attributes = attrs }; + const m = DataPoint(u32){ .value = 42, .attributes = attrs }; try std.testing.expect(m.value == 42); } -test "measurement lists are paired with attributes" {} +/// A union of measurements with either integer or double values. +/// This is used to represent the data collected by a meter. +pub const MeasurementsData = union(enum) { + int: []DataPoint(i64), + double: []DataPoint(f64), +}; + +/// A set of data points with a series of metadata coming from the meter and the instrument. +/// Holds the data collected by a single instrument inside a meter. +pub const Measurements = struct { + meterName: []const u8, + meterAttributes: ?[]Attribute = null, + meterSchemaUrl: ?[]const u8 = null, + + instrumentKind: Kind, + instrumentOptions: InstrumentOptions, + + data: MeasurementsData, + + pub fn deinit(self: *Measurements, allocator: std.mem.Allocator) void { + switch (self.data) { + inline else => |list| allocator.free(list), + } + } +}; diff --git a/src/api/metrics/meter.zig b/src/api/metrics/meter.zig index 7052aba..786e0db 100644 --- a/src/api/metrics/meter.zig +++ b/src/api/metrics/meter.zig @@ -3,6 +3,10 @@ const std = @import("std"); const spec = @import("spec.zig"); const Attribute = @import("../../attributes.zig").Attribute; const Attributes = @import("../../attributes.zig").Attributes; +const DataPoint = @import("measurement.zig").DataPoint; + +const MeasurementsData = @import("measurement.zig").MeasurementsData; +const Measurements = @import("measurement.zig").Measurements; const Instrument = @import("instrument.zig").Instrument; const Kind = @import("instrument.zig").Kind; @@ -122,12 +126,14 @@ const Meter = struct { instruments: std.StringHashMap(*Instrument), allocator: std.mem.Allocator, + mx: std.Thread.Mutex = std.Thread.Mutex{}, + const Self = @This(); /// Create a new Counter instrument using the specified type as the value type. /// This is a monotonic counter that can only be incremented. pub fn createCounter(self: *Self, comptime T: type, options: InstrumentOptions) !*Counter(T) { - var i = try Instrument.Get(.Counter, options, self.allocator); + var i = try Instrument.new(.Counter, options, self.allocator); const c = try i.counter(T); errdefer self.allocator.destroy(c); try self.registerInstrument(i); @@ -138,7 +144,7 @@ const Meter = struct { /// Create a new UpDownCounter instrument using the specified type as the value type. /// This is a counter that can be incremented and decremented. 
pub fn createUpDownCounter(self: *Self, comptime T: type, options: InstrumentOptions) !*Counter(T) { - var i = try Instrument.Get(.UpDownCounter, options, self.allocator); + var i = try Instrument.new(.UpDownCounter, options, self.allocator); const c = try i.upDownCounter(T); errdefer self.allocator.destroy(c); try self.registerInstrument(i); @@ -149,7 +155,7 @@ const Meter = struct { /// Create a new Histogram instrument using the specified type as the value type. /// A histogram is a metric that samples observations and counts them in different buckets. pub fn createHistogram(self: *Self, comptime T: type, options: InstrumentOptions) !*Histogram(T) { - var i = try Instrument.Get(.Histogram, options, self.allocator); + var i = try Instrument.new(.Histogram, options, self.allocator); const h = try i.histogram(T); errdefer self.allocator.destroy(h); try self.registerInstrument(i); @@ -161,7 +167,7 @@ const Meter = struct { /// A gauge is a metric that represents a single numerical value that can arbitrarily go up and down, /// and represents a point-in-time value. pub fn createGauge(self: *Self, comptime T: type, options: InstrumentOptions) !*Gauge(T) { - var i = try Instrument.Get(.Gauge, options, self.allocator); + var i = try Instrument.new(.Gauge, options, self.allocator); const g = try i.gauge(T); errdefer self.allocator.destroy(g); try self.registerInstrument(i); @@ -173,6 +179,9 @@ const Meter = struct { // Name is case-insensitive. // The remaining are also forming the identifier. fn registerInstrument(self: *Self, instrument: *Instrument) !void { + self.mx.lock(); + defer self.mx.unlock(); + const id = try spec.instrumentIdentifier( self.allocator, instrument.opts.name, @@ -342,7 +351,7 @@ test "meter provider adds multiple metric readers" { std.debug.assert(mp.readers.items.len == 2); } -test "same metric reader cannot be registered with multiple providers" { +test "metric reader cannot be registered with multiple providers" { const mp1 = try MeterProvider.init(std.testing.allocator); defer mp1.shutdown(); @@ -356,7 +365,7 @@ test "same metric reader cannot be registered with multiple providers" { try std.testing.expectError(spec.ResourceError.MetricReaderAlreadyAttached, err); } -test "same metric reader cannot be registered twice on same meter provider" { +test "metric reader cannot be registered twice on same meter provider" { const mp1 = try MeterProvider.init(std.testing.allocator); defer mp1.shutdown(); @@ -411,3 +420,200 @@ test "meter provider with arena allocator" { try counter.add(1, .{ "author", meVal }); } + +const view = @import("../../sdk/metrics/view.zig"); + +/// AggregatedMetrics is a collection of metrics that have been aggregated using the +/// MetricReader's temporality and aggregation functions. +pub const AggregatedMetrics = struct { + fn deduplicate(allocator: std.mem.Allocator, instr: *Instrument, aggregation: view.Aggregation) !MeasurementsData { + // This function is only called on read/export + // which is much less frequent than other SDK operations (e.g. counter add). 
+        // TODO: update to @branchHint in 0.14+
+        @setCold(true);
+
+        const allMeasurements: MeasurementsData = try instr.getInstrumentsData(allocator);
+        defer {
+            switch (allMeasurements) {
+                inline else => |list| allocator.free(list),
+            }
+        }
+
+        switch (allMeasurements) {
+            .int => {
+                var deduped = std.ArrayList(DataPoint(i64)).init(allocator);
+
+                var temp = std.HashMap(
+                    Attributes,
+                    i64,
+                    Attributes.HashContext,
+                    std.hash_map.default_max_load_percentage,
+                ).init(allocator);
+                defer temp.deinit();
+
+                // Iterate over all measurements and deduplicate them by applying the aggregation function,
+                // using the attributes as the key.
+                for (allMeasurements.int) |measure| {
+                    switch (aggregation) {
+                        .Drop => return MeasurementsData{ .int = &[_]DataPoint(i64){} },
+                        .Sum, .ExplicitBucketHistogram => {
+                            const key = Attributes.with(measure.attributes);
+                            const value = measure.value;
+                            if (temp.get(key)) |v| {
+                                const newValue = v + value;
+                                try temp.put(key, newValue);
+                            } else {
+                                try temp.put(key, value);
+                            }
+                        },
+                        .LastValue => {
+                            const key = Attributes.with(measure.attributes);
+                            try temp.put(key, measure.value);
+                        },
+                    }
+                }
+
+                var iter = temp.iterator();
+                while (iter.next()) |entry| {
+                    try deduped.append(DataPoint(i64){ .attributes = entry.key_ptr.*.attributes, .value = entry.value_ptr.* });
+                }
+                return .{ .int = try deduped.toOwnedSlice() };
+            },
+            .double => {
+                var deduped = std.ArrayList(DataPoint(f64)).init(allocator);
+
+                // Use the attribute-aware HashContext here as well: an AutoHashMap cannot hash the slice-backed Attributes key.
+                var temp = std.HashMap(
+                    Attributes,
+                    f64,
+                    Attributes.HashContext,
+                    std.hash_map.default_max_load_percentage,
+                ).init(allocator);
+                defer temp.deinit();
+
+                for (allMeasurements.double) |measure| {
+                    switch (aggregation) {
+                        .Drop => return MeasurementsData{ .double = &[_]DataPoint(f64){} },
+                        .Sum, .ExplicitBucketHistogram => {
+                            const key = Attributes.with(measure.attributes);
+                            const value = measure.value;
+                            if (temp.get(key)) |v| {
+                                const newValue = v + value;
+                                try temp.put(key, newValue);
+                            } else {
+                                try temp.put(key, value);
+                            }
+                        },
+                        .LastValue => {
+                            const key = Attributes.with(measure.attributes);
+                            try temp.put(key, measure.value);
+                        },
+                    }
+                }
+
+                var iter = temp.iterator();
+                while (iter.next()) |entry| {
+                    try deduped.append(DataPoint(f64){ .attributes = entry.key_ptr.*.attributes, .value = entry.value_ptr.* });
+                }
+                return .{ .double = try deduped.toOwnedSlice() };
+            },
+        }
+    }
+
+    /// Fetch the aggregated metrics from the meter.
+    /// Each instrument is an entry of the slice.
+    /// Caller owns the returned memory and should free it using the provided allocator.
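+    /// Note: the meter is locked for the duration of the collection.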
+ pub fn fetch(allocator: std.mem.Allocator, meter: *Meter, aggregation: view.AggregationSelector) ![]Measurements { + @setCold(true); + + meter.mx.lock(); + defer meter.mx.unlock(); + + var result = try allocator.alloc(Measurements, meter.instruments.count()); + + var iter = meter.instruments.valueIterator(); + var i: usize = 0; + while (iter.next()) |instr| { + result[i] = Measurements{ + .meterName = meter.name, + .meterSchemaUrl = meter.schema_url, + .meterAttributes = meter.attributes, + .instrumentKind = instr.*.kind, + .instrumentOptions = instr.*.opts, + .data = try deduplicate(allocator, instr.*, aggregation(instr.*.kind)), + }; + i += 1; + } + return result; + } +}; + +test "aggregated metrics deduplicated from meter without attributes" { + const mp = try MeterProvider.init(std.testing.allocator); + defer mp.shutdown(); + const meter = try mp.getMeter(.{ .name = "test", .schema_url = "http://example.com" }); + var counter = try meter.createCounter(u64, .{ .name = "test-counter" }); + try counter.add(1, .{}); + try counter.add(3, .{}); + + var iter = meter.instruments.valueIterator(); + const instr = iter.next() orelse unreachable; + + const deduped = try AggregatedMetrics.deduplicate(std.testing.allocator, instr.*, .Sum); + defer switch (deduped) { + inline else => |m| std.testing.allocator.free(m), + }; + + try std.testing.expectEqualDeep(DataPoint(i64){ .value = 4 }, deduped.int[0]); +} + +test "aggregated metrics deduplicated from meter with attributes" { + const mp = try MeterProvider.init(std.testing.allocator); + defer mp.shutdown(); + + const meterVal: []const u8 = "meter_val"; + const meter = try mp.getMeter(.{ + .name = "test", + .schema_url = "http://example.com", + .attributes = try Attributes.from(std.testing.allocator, .{ "meter_attr", meterVal }), + }); + var counter = try meter.createCounter(u64, .{ .name = "test-counter" }); + const val: []const u8 = "test"; + try counter.add(1, .{ "key", val }); + try counter.add(3, .{ "key", val }); + + var iter = meter.instruments.valueIterator(); + const instr = iter.next() orelse unreachable; + + const deduped = try AggregatedMetrics.deduplicate(std.testing.allocator, instr.*, .Sum); + defer switch (deduped) { + inline else => |m| std.testing.allocator.free(m), + }; + + const attrs = try Attributes.from(std.testing.allocator, .{ "key", val }); + defer if (attrs) |a| std.testing.allocator.free(a); + + try std.testing.expectEqualDeep(DataPoint(i64){ + .attributes = attrs, + .value = 4, + }, deduped.int[0]); +} + +test "aggregated metrics fetch to owned slice" { + const mp = try MeterProvider.init(std.testing.allocator); + defer mp.shutdown(); + + const meter = try mp.getMeter(.{ .name = "test", .schema_url = "http://example.com" }); + var counter = try meter.createCounter(u64, .{ .name = "test-counter" }); + try counter.add(1, .{}); + try counter.add(3, .{}); + + const result = try AggregatedMetrics.fetch(std.testing.allocator, meter, view.DefaultAggregationFor); + defer { + for (result) |m| { + var data = m; + data.deinit(std.testing.allocator); + } + std.testing.allocator.free(result); + } + + try std.testing.expectEqual(1, result.len); + try std.testing.expectEqualStrings(meter.name, result[0].meterName); + try std.testing.expectEqualStrings(meter.schema_url.?, result[0].meterSchemaUrl.?); + try std.testing.expectEqualStrings("test-counter", result[0].instrumentOptions.name); + try std.testing.expectEqual(4, result[0].data.int[0].value); +} diff --git a/src/attributes.zig b/src/attributes.zig index bc3d0f6..531708a 100644 
--- a/src/attributes.zig
+++ b/src/attributes.zig
@@ -9,13 +9,13 @@ fn keyValue(comptime T: type) type {
         fn resolve(self: keyValue(T)) Attribute {
             return Attribute{
-                .name = self.key,
+                .key = self.key,
                 .value = switch (@TypeOf(self.value)) {
                     bool => .{ .bool = self.value },
                     []const u8, [:0]const u8, *[:0]const u8 => .{ .string = self.value },
                     []u8, [:0]u8, *const [:0]u8 => .{ .string = self.value },
-                    i64 => .{ .int = self.value },
-                    f64 => .{ .double = self.value },
+                    i16, i32, i64, u16, u32, u64 => .{ .int = @intCast(self.value) },
+                    f32, f64 => .{ .double = @floatCast(self.value) },
                     else => @compileError("unsupported value type for attribute " ++ @typeName(@TypeOf(self.value))),
                 },
             };
@@ -23,22 +23,130 @@ fn keyValue(comptime T: type) type {
     };
 }
 
+/// Represents a value that can be stored in an Attribute.
 pub const AttributeValue = union(enum) {
     bool: bool,
     string: []const u8,
     int: i64,
     double: f64,
+
+    fn toString(self: AttributeValue, allocator: std.mem.Allocator) ![]const u8 {
+        switch (self) {
+            .bool => {
+                const ret: []const u8 = if (self.bool) "1" else "0";
+                return allocator.dupe(u8, ret);
+            },
+            .string => return allocator.dupe(u8, self.string),
+            .int => {
+                var buf = [_]u8{0} ** 64;
+                const printed = std.fmt.formatIntBuf(&buf, self.int, 10, .lower, .{});
+                return allocator.dupe(u8, buf[0..printed]);
+            },
+            .double => {
+                var buf = [_]u8{0} ** 64;
+                const printed = try std.fmt.formatFloat(&buf, self.double, .{});
+                return allocator.dupe(u8, printed[0..printed.len]);
+            },
+        }
+    }
+
+    fn toStringNoAlloc(self: AttributeValue) []const u8 {
+        switch (self) {
+            .bool => return if (self.bool) "1" else "0",
+            .string => return self.string,
+            .int => {
+                var buf = [_]u8{0} ** 64;
+                const printed = std.fmt.formatIntBuf(&buf, self.int, 10, .lower, .{});
+                return buf[0..printed];
+            },
+            .double => {
+                var buf = [_]u8{0} ** 64;
+                const printed = std.fmt.formatFloat(&buf, self.double, .{ .mode = .decimal, .precision = 6 }) catch "NaN";
+                return printed[0..printed.len];
+            },
+        }
+    }
+
+    fn hash(self: AttributeValue) u64 {
+        var h = std.hash.Wyhash.init(0);
+        switch (self) {
+            .bool => {
+                const ret: []const u8 = if (self.bool) "1" else "0";
+                h.update(ret);
+            },
+            .string => h.update(self.string),
+            .int => {
+                var buf = [_]u8{0} ** 64;
+                const printed = std.fmt.formatIntBuf(&buf, self.int, 10, .lower, .{});
+                h.update(buf[0..printed]);
+            },
+            .double => {
+                var buf = [_]u8{0} ** 64;
+                const printed = std.fmt.formatFloat(&buf, self.double, .{}) catch "NaN";
+                h.update(printed[0..printed.len]);
+            },
+        }
+        return h.final();
+    }
 };
 
+/// Represents a key-value pair.
 pub const Attribute = struct {
-    name: []const u8,
+    key: []const u8,
     value: AttributeValue,
+
+    // Caller owns the memory returned by this function and should free it.
+    fn toString(self: Attribute, allocator: std.mem.Allocator) ![]const u8 {
+        var buf = [_]u8{0} ** 1024;
+        const value = try self.value.toString(allocator);
+        defer allocator.free(value);
+
+        const ret = try std.fmt.bufPrint(&buf, "{s}:{s}", .{ self.key, value });
+        return allocator.dupe(u8, ret[0..ret.len]);
+    }
 };
 
 /// Creates a slice of attributes from a list of key-value pairs.
 /// Caller owns the returned memory and should free the slice when done
 /// through the same allocator.
pub const Attributes = struct { + const Self = @This(); + attributes: ?[]Attribute = null, + + pub fn with(attributes: ?[]Attribute) Self { + return Self{ .attributes = attributes }; + } + + pub const HashContext = struct { + pub fn hash(_: HashContext, self: Self) u64 { + var h = std.hash.Wyhash.init(0); + const attrs = self.attributes orelse &[_]Attribute{}; + for (attrs) |attr| { + h.update(attr.key); + h.update(attr.value.toStringNoAlloc()); + } + return h.final(); + } + pub fn eql(_: HashContext, a: Self, b: Self) bool { + const aAttrs = a.attributes orelse &[_]Attribute{}; + const bAttrs = b.attributes orelse &[_]Attribute{}; + if (aAttrs.len != bAttrs.len) { + return false; + } + var compared: usize = 0; + for (aAttrs) |aAttr| { + for (bAttrs) |bAttr| { + if (std.mem.eql(u8, aAttr.key, bAttr.key) and std.meta.eql(aAttr.value, bAttr.value)) { + compared += 1; + } + } + } + return compared == aAttrs.len; + } + }; + + /// Creates a slice of attributes from a list of key-value pairs. + /// Caller owns the returned memory and should free the slice when done via the same allocator. pub fn from(allocator: std.mem.Allocator, keyValues: anytype) !?[]Attribute { // Straight copied from the zig std library: std.fmt. // Check if the argument is a tuple. @@ -74,21 +182,38 @@ pub const Attributes = struct { } }; +test "attribute to string" { + const attr = Attribute{ .key = "name", .value = .{ .string = "value" } }; + const str = try attr.toString(std.testing.allocator); + defer std.testing.allocator.free(str); + + try std.testing.expectEqualStrings("name:value", str); +} + +test "attribute empty string to string" { + const attrs = &[_]Attribute{}; + for (attrs) |attr| { + const str = try attr.toString(std.testing.allocator); + defer std.testing.allocator.free(str); + try std.testing.expectEqualStrings("", str); + } +} + test "attributes are read from list of strings" { const val1: []const u8 = "value1"; const val2: []const u8 = "value2"; const attributes = try Attributes.from(std.testing.allocator, .{ "name", val1, "name2", val2, - "name3", @as(i64, 456), + "name3", @as(u64, 456), "name4", false, }); defer if (attributes) |a| std.testing.allocator.free(a); try std.testing.expect(attributes.?.len == 4); - try std.testing.expectEqualStrings("name", attributes.?[0].name); + try std.testing.expectEqualStrings("name", attributes.?[0].key); try std.testing.expectEqualStrings("value1", attributes.?[0].value.string); - try std.testing.expectEqualStrings("name2", attributes.?[1].name); + try std.testing.expectEqualStrings("name2", attributes.?[1].key); try std.testing.expectEqualStrings("value2", attributes.?[1].value.string); try std.testing.expectEqual(@as(i64, 456), attributes.?[2].value.int); try std.testing.expectEqual(false, attributes.?[3].value.bool); @@ -99,3 +224,49 @@ test "attributes from unit return null" { defer if (attributes) |a| std.testing.allocator.free(a); try std.testing.expectEqual(null, attributes); } + +test "attributes built for slice" { + const val1: []const u8 = "value1"; + const val2: []const u8 = "value2"; + + var list = [_]Attribute{ + .{ .key = "name", .value = .{ .string = val1 } }, + .{ .key = "name2", .value = .{ .string = val2 } }, + .{ .key = "name3", .value = .{ .int = @as(u64, 456) } }, + .{ .key = "name4", .value = .{ .bool = false } }, + }; + + const attrs = Attributes.with(&list); + try std.testing.expect(attrs.attributes.?.len == 4); + try std.testing.expectEqualStrings("name", attrs.attributes.?[0].key); + try std.testing.expectEqualStrings("value1", 
attrs.attributes.?[0].value.string); + try std.testing.expectEqualStrings("name2", attrs.attributes.?[1].key); + try std.testing.expectEqualStrings("value2", attrs.attributes.?[1].value.string); + try std.testing.expectEqual(@as(i64, 456), attrs.attributes.?[2].value.int); + try std.testing.expectEqual(false, attrs.attributes.?[3].value.bool); +} + +test "attributes equality" { + const val1: []const u8 = "value1"; + const val2: []const u8 = "value2"; + + var list1 = [_]Attribute{ + .{ .key = "name", .value = .{ .string = val1 } }, + .{ .key = "name2", .value = .{ .string = val2 } }, + .{ .key = "name3", .value = .{ .int = @as(u64, 456) } }, + .{ .key = "name4", .value = .{ .bool = false } }, + }; + + var list2 = [_]Attribute{ + .{ .key = "name", .value = .{ .string = val1 } }, + .{ .key = "name2", .value = .{ .string = val2 } }, + .{ .key = "name3", .value = .{ .int = @as(u64, 456) } }, + .{ .key = "name4", .value = .{ .bool = false } }, + }; + + const a1 = Attributes.with(&list1); + const a2 = Attributes.with(&list2); + + try std.testing.expectEqualDeep(a1, a2); + try std.testing.expect(Attributes.HashContext.eql(Attributes.HashContext{}, a1, a2)); +} diff --git a/src/sdk.zig b/src/sdk.zig index 717be00..5f9fc9c 100644 --- a/src/sdk.zig +++ b/src/sdk.zig @@ -12,7 +12,7 @@ test { pub const MeterProvider = @import("api/metrics/meter.zig").MeterProvider; pub const MetricReader = @import("sdk/metrics/reader.zig").MetricReader; pub const MetricExporter = @import("sdk/metrics/exporter.zig").MetricExporter; -pub const InMemoryExporter = @import("sdk/metrics/exporter.zig").ImMemoryExporter; +pub const InMemoryExporter = @import("sdk/metrics/exporter.zig").InMemoryExporter; pub const Counter = @import("api/metrics/instrument.zig").Counter; pub const UpDownCounter = @import("api/metrics/instrument.zig").Counter; diff --git a/src/sdk/metrics/exporter.zig b/src/sdk/metrics/exporter.zig index 071d58f..fe73e9d 100644 --- a/src/sdk/metrics/exporter.zig +++ b/src/sdk/metrics/exporter.zig @@ -4,10 +4,17 @@ const protobuf = @import("protobuf"); const ManagedString = protobuf.ManagedString; const pbmetrics = @import("../../opentelemetry/proto/metrics/v1.pb.zig"); const pbcommon = @import("../../opentelemetry/proto/common/v1.pb.zig"); +const spec = @import("../../api/metrics/spec.zig"); const MeterProvider = @import("../../api/metrics/meter.zig").MeterProvider; -const reader = @import("reader.zig"); -const MetricReadError = reader.MetricReadError; +const MetricReadError = @import("reader.zig").MetricReadError; +const MetricReader = @import("reader.zig").MetricReader; + +const DataPoint = @import("../../api/metrics/measurement.zig").DataPoint; +const MeasurementsData = @import("../../api/metrics/measurement.zig").MeasurementsData; +const Measurements = @import("../../api/metrics/measurement.zig").Measurements; + +const Attributes = @import("../../attributes.zig").Attributes; pub const ExportResult = enum { Success, @@ -19,9 +26,10 @@ pub const MetricExporter = struct { allocator: std.mem.Allocator, exporter: *ExporterIface, - hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false); + // Lock helper to signal shutdown and/or export is in progress + hasShutDown: bool = false, + exportCompleted: std.Thread.Mutex = std.Thread.Mutex{}, pub fn new(allocator: std.mem.Allocator, exporter: *ExporterIface) !*Self { const s = try allocator.create(Self); @@ -34,15 +42,15 @@ pub const MetricExporter = struct { /// 
ExportBatch exports a batch of metrics data by calling the exporter implementation.
     /// The passed metrics data will be owned by the exporter implementation.
-    pub fn exportBatch(self: *Self, metrics: pbmetrics.MetricsData) ExportResult {
+    pub fn exportBatch(self: *Self, metrics: []Measurements) ExportResult {
-        if (self.hasShutDown.load(.acquire)) {
+        if (@atomicLoad(bool, &self.hasShutDown, .acquire)) {
             // When shutdown has already been called, calling export should be a failure.
             // https://opentelemetry.io/docs/specs/otel/metrics/sdk/#shutdown-2
             return ExportResult.Failure;
         }
-        // Acquire the lock to ensure that forceFlush is waiting for export to complete.
-        _ = exportCompleted.load(.acquire);
-        defer exportCompleted.store(true, .release);
+        // Acquire the lock to signal to forceFlush to wait for export to complete.
+        self.exportCompleted.lock();
+        defer self.exportCompleted.unlock();
 
         // Call the exporter function to process metrics data.
         self.exporter.exportBatch(metrics) catch |e| {
@@ -53,40 +61,56 @@
     }
 
     // Ensure that all the data is flushed to the destination.
-    pub fn forceFlush(_: Self, timeout_ms: u64) !void {
+    pub fn forceFlush(self: *Self, timeout_ms: u64) !void {
         const start = std.time.milliTimestamp(); // Milliseconds
         const timeout: i64 = @intCast(timeout_ms);
         while (std.time.milliTimestamp() < start + timeout) {
-            if (exportCompleted.load(.acquire)) {
+            if (self.exportCompleted.tryLock()) {
+                self.exportCompleted.unlock();
                 return;
-            } else std.time.sleep(std.time.ns_per_ms);
+            } else {
+                std.time.sleep(std.time.ns_per_ms);
+            }
         }
         return MetricReadError.ForceFlushTimedOut;
     }
 
     pub fn shutdown(self: *Self) void {
-        self.hasShutDown.store(true, .monotonic);
+        if (@atomicRmw(bool, &self.hasShutDown, .Xchg, true, .acq_rel)) {
+            return;
+        }
         self.allocator.destroy(self);
     }
 };
 
 // test harness to build a noop exporter.
 // marked as pub only for testing purposes.
-pub fn noopExporter(_: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
-    defer metrics.deinit();
+pub fn noopExporter(_: *ExporterIface, _: []Measurements) MetricReadError!void {
     return;
 }
 
 // mocked metric exporter to assert metrics data are read once exported.
-fn mockExporter(_: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
-    defer metrics.deinit();
-    if (metrics.resource_metrics.items.len != 1) {
+fn mockExporter(_: *ExporterIface, metrics: []Measurements) MetricReadError!void {
+    defer {
+        for (metrics) |m| {
+            var d = m;
+            d.deinit(std.testing.allocator);
+        }
+        std.testing.allocator.free(metrics);
+    }
+    if (metrics.len != 1) {
+        std.debug.print("expected just one metric, got {d}\n{any}\n", .{ metrics.len, metrics });
         return MetricReadError.ExportFailed;
-    } // only one resource metrics is expected in this mock
+    } // only one instrument from a single meter is expected in this mock
 }
 
 // test harness to build an exporter that times out.
-fn waiterExporter(_: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
-    defer metrics.deinit();
+fn waiterExporter(_: *ExporterIface, _: []Measurements) MetricReadError!void {
     // Sleep for 1 second to simulate a slow exporter.
std.time.sleep(std.time.ns_per_ms * 1000); return; @@ -97,11 +121,16 @@ test "metric exporter no-op" { var me = try MetricExporter.new(std.testing.allocator, &noop); defer me.shutdown(); - const metrics = pbmetrics.MetricsData{ - .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), - }; - defer metrics.deinit(); - const result = me.exportBatch(metrics); + var measure = [1]DataPoint(i64){.{ .value = 42 }}; + const measurement: []DataPoint(i64) = measure[0..]; + var metrics = [1]Measurements{.{ + .meterName = "my-meter", + .instrumentKind = .Counter, + .instrumentOptions = .{ .name = "my-counter" }, + .data = .{ .int = measurement }, + }}; + + const result = me.exportBatch(&metrics); try std.testing.expectEqual(ExportResult.Success, result); } @@ -111,10 +140,7 @@ test "metric exporter is called by metric reader" { var mock = ExporterIface{ .exportFn = mockExporter }; - var rdr = try reader.MetricReader.init( - std.testing.allocator, - try MetricExporter.new(std.testing.allocator, &mock), - ); + var rdr = try MetricReader.init(std.testing.allocator, &mock); defer rdr.shutdown(); try mp.addReader(rdr); @@ -133,19 +159,23 @@ test "metric exporter force flush succeeds" { var me = try MetricExporter.new(std.testing.allocator, &noop); defer me.shutdown(); - const metrics = pbmetrics.MetricsData{ - .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), - }; - defer metrics.deinit(); - const result = me.exportBatch(metrics); + var measure = [1]DataPoint(i64){.{ .value = 42 }}; + const dataPoints: []DataPoint(i64) = measure[0..]; + var metrics = [1]Measurements{Measurements{ + .meterName = "my-meter", + .instrumentKind = .Counter, + .instrumentOptions = .{ .name = "my-counter" }, + .data = .{ .int = dataPoints }, + }}; + + const result = me.exportBatch(&metrics); try std.testing.expectEqual(ExportResult.Success, result); try me.forceFlush(1000); } -fn backgroundRunner(me: *MetricExporter, metrics: pbmetrics.MetricsData) !void { +fn backgroundRunner(me: *MetricExporter, metrics: []Measurements) !void { _ = me.exportBatch(metrics); - metrics.deinit(); } test "metric exporter force flush fails" { @@ -153,19 +183,22 @@ test "metric exporter force flush fails" { var me = try MetricExporter.new(std.testing.allocator, &wait); defer me.shutdown(); - const metrics = pbmetrics.MetricsData{ - .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), - }; - defer metrics.deinit(); + var measure = [1]DataPoint(i64){.{ .value = 42 }}; + const dataPoints: []DataPoint(i64) = measure[0..]; + var metrics = [1]Measurements{Measurements{ + .meterName = "my-meter", + .instrumentKind = .Counter, + .instrumentOptions = .{ .name = "my-counter" }, + .data = .{ .int = dataPoints }, + }}; var bg = try std.Thread.spawn( .{}, backgroundRunner, - .{ me, metrics }, + .{ me, &metrics }, ); - bg.detach(); + bg.join(); - std.time.sleep(10 * std.time.ns_per_ms); // sleep for 10 ms to ensure the background thread completed const e = me.forceFlush(0); try std.testing.expectError(MetricReadError.ForceFlushTimedOut, e); } @@ -174,29 +207,31 @@ test "metric exporter force flush fails" { /// Implementations can be satisfied by any type by having a member field of type /// ExporterIface and a member function exportBatch with the correct signature. 
 pub const ExporterIface = struct {
-    exportFn: *const fn (*ExporterIface, pbmetrics.MetricsData) MetricReadError!void,
+    exportFn: *const fn (*ExporterIface, []Measurements) MetricReadError!void,
 
     /// ExportBatch defines the behavior that metric exporters will implement.
     /// Each metric exporter owns the metrics data passed to it.
-    pub fn exportBatch(self: *ExporterIface, data: pbmetrics.MetricsData) MetricReadError!void {
+    pub fn exportBatch(self: *ExporterIface, data: []Measurements) MetricReadError!void {
         return self.exportFn(self, data);
     }
 };
 
-/// ImMemoryExporter stores in memory the metrics data to be exported.
-/// The memory representation uses the types defined in the library.
-pub const ImMemoryExporter = struct {
+/// InMemoryExporter stores in memory the metrics data to be exported.
+/// The metrics' representation in memory uses the types defined in the library.
+pub const InMemoryExporter = struct {
     const Self = @This();
     allocator: std.mem.Allocator,
-    data: pbmetrics.MetricsData,
+    data: std.ArrayList(Measurements) = undefined,
     // Implement the interface via @fieldParentPtr
     exporter: ExporterIface,
+    mx: std.Thread.Mutex = std.Thread.Mutex{},
+
     pub fn init(allocator: std.mem.Allocator) !*Self {
         const s = try allocator.create(Self);
         s.* = Self{
             .allocator = allocator,
-            .data = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator) },
+            .data = std.ArrayList(Measurements).init(allocator),
             .exporter = ExporterIface{
                 .exportFn = exportBatch,
             },
@@ -204,89 +239,215 @@ pub const ImMemoryExporter = struct {
         return s;
     }
     pub fn deinit(self: *Self) void {
+        self.mx.lock();
+        for (self.data.items) |*d| {
+            d.*.deinit(self.allocator);
+        }
         self.data.deinit();
+        self.mx.unlock();
+
         self.allocator.destroy(self);
     }
 
-    fn exportBatch(iface: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
+    // Implements the only method of the ExporterIface interface.
+    fn exportBatch(iface: *ExporterIface, metrics: []Measurements) MetricReadError!void {
         // Get a pointer to the instance of the struct that implements the interface.
         const self: *Self = @fieldParentPtr("exporter", iface);
+        self.mx.lock();
+        defer self.mx.unlock();
-        self.data.deinit();
-        self.data = metrics;
+        // Free up the allocated data points from the previous export.
+        for (self.data.items) |*d| {
+            d.*.deinit(self.allocator);
+        }
+        self.data.clearAndFree();
+        self.data = std.ArrayList(Measurements).fromOwnedSlice(self.allocator, metrics);
     }
 
-    /// Copy the metrics from the in memory exporter.
-    /// Caller owns the memory and must call deinit() once done.
-    pub fn fetch(self: *Self) !pbmetrics.MetricsData {
-        return self.data.dupe(self.allocator);
+    /// Read the metrics from the in memory exporter.
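+    /// The returned slice points to the exporter's internal storage and remains valid
+    /// only until the next exportBatch() or deinit() call.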
+ pub fn fetch(self: *Self) ![]Measurements { + self.mx.lock(); + defer self.mx.unlock(); + return self.data.items; } }; test "in memory exporter stores data" { - var inMemExporter = try ImMemoryExporter.init(std.testing.allocator); + const allocator = std.testing.allocator; + + var inMemExporter = try InMemoryExporter.init(allocator); defer inMemExporter.deinit(); - const exporter = try MetricExporter.new(std.testing.allocator, &inMemExporter.exporter); + const exporter = try MetricExporter.new(allocator, &inMemExporter.exporter); defer exporter.shutdown(); const howMany: usize = 2; - const dp = try std.testing.allocator.alloc(pbmetrics.NumberDataPoint, howMany); - dp[0] = pbmetrics.NumberDataPoint{ - .attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), - .exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator), - .value = .{ .as_int = @as(i64, 1) }, - }; - dp[1] = pbmetrics.NumberDataPoint{ - .attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), - .exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator), - .value = .{ .as_int = @as(i64, 2) }, - }; - - const metric = pbmetrics.Metric{ - .metadata = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), - .name = ManagedString.managed("test_metric"), - .unit = ManagedString.managed("count"), - .data = .{ .sum = pbmetrics.Sum{ - .data_points = std.ArrayList(pbmetrics.NumberDataPoint).fromOwnedSlice(std.testing.allocator, dp), - .aggregation_temporality = .AGGREGATION_TEMPORALITY_CUMULATIVE, - } }, - }; - - var sm = pbmetrics.ScopeMetrics{ - .metrics = std.ArrayList(pbmetrics.Metric).init(std.testing.allocator), - }; - try sm.metrics.append(metric); - - var resource = pbmetrics.ResourceMetrics{ - .scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).init(std.testing.allocator), - }; - try resource.scope_metrics.append(sm); - var metricsData = pbmetrics.MetricsData{ - .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), - }; - try metricsData.resource_metrics.append(resource); + const val = @as(u64, 42); + const attrs = try Attributes.from(allocator, .{ "key", val }); + defer std.testing.allocator.free(attrs.?); + + var counterMeasure = try allocator.alloc(DataPoint(i64), 1); + counterMeasure[0] = .{ .value = @as(i64, 1), .attributes = attrs }; + + var histMeasure = try allocator.alloc(DataPoint(f64), 1); + histMeasure[0] = .{ .value = @as(f64, 2.0), .attributes = attrs }; + + var underTest = std.ArrayList(Measurements).init(allocator); + + try underTest.append(Measurements{ + .meterName = "first-meter", + .meterAttributes = null, + .instrumentKind = .Counter, + .instrumentOptions = .{ .name = "counter-abc" }, + .data = .{ .int = counterMeasure }, + }); + try underTest.append(Measurements{ + .meterName = "another-meter", + .meterAttributes = null, + .instrumentKind = .Histogram, + .instrumentOptions = .{ .name = "histogram-abc" }, + .data = .{ .double = histMeasure }, + }); // MetricReader.collect() does a copy of the metrics data, // then calls the exportBatch implementation passing it in. 
-    const ownedData = try metricsData.dupe(std.testing.allocator);
-    defer metricsData.deinit();
-    const result = exporter.exportBatch(ownedData);
+    const result = exporter.exportBatch(try underTest.toOwnedSlice());
     std.debug.assert(result == .Success);
     const data = try inMemExporter.fetch();
-    defer data.deinit();
-    std.debug.assert(data.resource_metrics.items.len == 1);
-    const entry = data.resource_metrics.items[0];
+    std.debug.assert(data.len == howMany);
+
+    try std.testing.expectEqualDeep(counterMeasure[0], data[0].data.int[0]);
+}
+
+/// A periodic exporting reader is a specialization of MetricReader
+/// that periodically exports metrics data to a destination.
+/// The exporter configured in init() should be a push-based exporter.
+/// See https://opentelemetry.io/docs/specs/otel/metrics/sdk/#periodic-exporting-metricreader
+pub const PeriodicExportingReader = struct {
+    const Self = @This();
+
+    allocator: std.mem.Allocator,
+    exportIntervalMillis: u64,
+    exportTimeoutMillis: u64,
+
+    // Signals that shutdown is in progress.
+    shuttingDown: bool = false,
+
+    // This reader will collect metrics data from the MeterProvider.
+    reader: *MetricReader,
+
+    // The intervals at which the reader should export metrics data
+    // and wait for each operation to complete.
+    // Default values are dictated by the OpenTelemetry specification.
+    const defaultExportIntervalMillis: u64 = 60000;
+    const defaultExportTimeoutMillis: u64 = 30000;
+
+    pub fn init(
+        allocator: std.mem.Allocator,
+        mp: *MeterProvider,
+        exporter: *ExporterIface,
+        exportIntervalMs: ?u64,
+        exportTimeoutMs: ?u64,
+    ) !*Self {
+        const s = try allocator.create(Self);
+        s.* = Self{
+            .allocator = allocator,
+            .reader = try MetricReader.init(
+                allocator,
+                exporter,
+            ),
+            .exportIntervalMillis = exportIntervalMs orelse defaultExportIntervalMillis,
+            .exportTimeoutMillis = exportTimeoutMs orelse defaultExportTimeoutMillis,
+        };
+        try mp.addReader(s.reader);
+
+        const th = try std.Thread.spawn(
+            .{},
+            collectAndExport,
+            .{ s.reader, &s.shuttingDown, s.exportIntervalMillis, s.exportTimeoutMillis },
+        );
+        th.detach();
+        return s;
+    }
+
+    pub fn shutdown(self: *Self) void {
+        // First signal the background exporter to stop collecting, then close the reader.
+        @atomicStore(bool, &self.shuttingDown, true, .release);
+        self.reader.shutdown();
+        // Only destroy once the background collector has stopped.
+        self.allocator.destroy(self);
+    }
+};
+
+// Function that collects metrics from the reader and exports them to the destination.
+// FIXME: there is no timeout for the collect operation.
+fn collectAndExport(
+    reader: *MetricReader,
+    shuttingDown: *bool,
+    exportIntervalMillis: u64,
+    // TODO: add a timeout for the export operation
+    _: u64,
+) void {
+    // The execution should continue until the reader is shutting down.
+    while (!@atomicLoad(bool, shuttingDown, .acquire)) {
+        if (reader.meterProvider) |_| {
+            // This will also call exporter.exportBatch() every interval.
+ reader.collect() catch |e| { + std.debug.print("PeriodicExportingReader: collecting failed on reader: {?}\n", .{e}); + }; + } else { + std.debug.print("PeriodicExportingReader: no meter provider is registered with this MetricReader {any}\n", .{reader}); + } + + std.time.sleep(exportIntervalMillis * std.time.ns_per_ms); + } +} + +test "e2e periodic exporting metric reader" { + const mp = try MeterProvider.init(std.testing.allocator); + defer mp.shutdown(); + + const waiting: u64 = 100; + + var inMem = try InMemoryExporter.init(std.testing.allocator); + defer inMem.deinit(); + + var per = try PeriodicExportingReader.init( + std.testing.allocator, + mp, + &inMem.exporter, + waiting, + null, + ); + defer per.shutdown(); + + var meter = try mp.getMeter(.{ .name = "test-reader" }); + var counter = try meter.createCounter(u64, .{ + .name = "requests", + .description = "a test counter", + }); + try counter.add(10, .{}); + + var histogram = try meter.createHistogram(f64, .{ + .name = "latency", + .description = "a test histogram", + .histogramOpts = .{ .explicitBuckets = &.{ + 1.0, + 10.0, + 100.0, + } }, + }); + try histogram.record(1.4, .{}); + try histogram.record(10.4, .{}); - std.debug.assert(entry.scope_metrics.items.len == 1); - std.debug.assert(entry.scope_metrics.items[0].metrics.items[0].data.?.sum.data_points.items.len == 2); + std.time.sleep(waiting * 4 * std.time.ns_per_ms); - try std.testing.expectEqual(pbmetrics.Sum, @TypeOf(entry.scope_metrics.items[0].metrics.items[0].data.?.sum)); - const sum: pbmetrics.Sum = entry.scope_metrics.items[0].metrics.items[0].data.?.sum; + const data = try inMem.fetch(); - try std.testing.expectEqual(sum.data_points.items[0].value.?.as_int, 1); + try std.testing.expect(data.len == 2); + //TODO add more assertions } diff --git a/src/sdk/metrics/exporters/otlp.zig b/src/sdk/metrics/exporters/otlp.zig new file mode 100644 index 0000000..d355273 --- /dev/null +++ b/src/sdk/metrics/exporters/otlp.zig @@ -0,0 +1,141 @@ +const std = @import("std"); +const Kind = @import("../instrument.zig").Kind; +const Attribute = @import("../attributes.zig").Attribute; +const instrument = @import("../instrument.zig"); +const Instrument = instrument.Instrument; +const view = @import("../view.zig"); +const protobuf = @import("protobuf"); +const ManagedString = protobuf.ManagedString; +const pbcommon = @import("../../opentelemetry/proto/common/v1.pb.zig"); +const pbmetrics = @import("../../opentelemetry/proto/metrics/v1.pb.zig"); + +pub fn toProtobufMetric( + allocator: std.mem.Allocator, + temporality: *const fn (Kind) view.Temporality, + i: *Instrument, +) !pbmetrics.Metric { + return pbmetrics.Metric{ + .name = ManagedString.managed(i.opts.name), + .description = if (i.opts.description) |d| ManagedString.managed(d) else .Empty, + .unit = if (i.opts.unit) |u| ManagedString.managed(u) else .Empty, + .data = switch (i.data) { + .Counter_u16 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{ + .data_points = try sumDataPoints(allocator, u16, i.data.Counter_u16), + .aggregation_temporality = temporality(i.kind).toProto(), + .is_monotonic = true, + } }, + .Counter_u32 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{ + .data_points = try sumDataPoints(allocator, u32, i.data.Counter_u32), + .aggregation_temporality = temporality(i.kind).toProto(), + .is_monotonic = true, + } }, + + .Counter_u64 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{ + .data_points = try sumDataPoints(allocator, u64, i.data.Counter_u64), + .aggregation_temporality = 
temporality(i.kind).toProto(),
+                .is_monotonic = true,
+            } },
+            .Histogram_u16 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
+                .data_points = try histogramDataPoints(allocator, u16, i.data.Histogram_u16),
+                .aggregation_temporality = temporality(i.kind).toProto(),
+            } },
+
+            .Histogram_u32 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
+                .data_points = try histogramDataPoints(allocator, u32, i.data.Histogram_u32),
+                .aggregation_temporality = temporality(i.kind).toProto(),
+            } },
+
+            .Histogram_u64 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
+                .data_points = try histogramDataPoints(allocator, u64, i.data.Histogram_u64),
+                .aggregation_temporality = temporality(i.kind).toProto(),
+            } },
+
+            .Histogram_f32 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
+                .data_points = try histogramDataPoints(allocator, f32, i.data.Histogram_f32),
+                .aggregation_temporality = temporality(i.kind).toProto(),
+            } },
+            .Histogram_f64 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
+                .data_points = try histogramDataPoints(allocator, f64, i.data.Histogram_f64),
+                .aggregation_temporality = temporality(i.kind).toProto(),
+            } },
+            // TODO: add other metrics types.
+            else => unreachable,
+        },
+        // Metadata is used for internal translations and can be discarded for now.
+        // Consumers of the SDK should not rely on this field.
+        .metadata = std.ArrayList(pbcommon.KeyValue).init(allocator),
+    };
+}
+
+fn attributeToProtobuf(attribute: Attribute) pbcommon.KeyValue {
+    return pbcommon.KeyValue{
+        .key = ManagedString.managed(attribute.key),
+        .value = switch (attribute.value) {
+            .bool => pbcommon.AnyValue{ .value = .{ .bool_value = attribute.value.bool } },
+            .string => pbcommon.AnyValue{ .value = .{ .string_value = ManagedString.managed(attribute.value.string) } },
+            .int => pbcommon.AnyValue{ .value = .{ .int_value = attribute.value.int } },
+            .double => pbcommon.AnyValue{ .value = .{ .double_value = attribute.value.double } },
+            // TODO include nested Attribute values
+        },
+    };
+}
+
+fn attributesToProtobufKeyValueList(allocator: std.mem.Allocator, attributes: ?[]Attribute) !pbcommon.KeyValueList {
+    if (attributes) |attrs| {
+        var kvs = pbcommon.KeyValueList{ .values = std.ArrayList(pbcommon.KeyValue).init(allocator) };
+        for (attrs) |a| {
+            try kvs.values.append(attributeToProtobuf(a));
+        }
+        return kvs;
+    } else {
+        return pbcommon.KeyValueList{ .values = std.ArrayList(pbcommon.KeyValue).init(allocator) };
+    }
+}
+
+fn sumDataPoints(allocator: std.mem.Allocator, comptime T: type, c: *instrument.Counter(T)) !std.ArrayList(pbmetrics.NumberDataPoint) {
+    var dataPoints = std.ArrayList(pbmetrics.NumberDataPoint).init(allocator);
+    for (c.measurements.items) |measure| {
+        const attrs = try attributesToProtobufKeyValueList(allocator, measure.attributes);
+        const dp = pbmetrics.NumberDataPoint{
+            .attributes = attrs.values,
+            // FIXME add a timestamp to Measurement in order to get it here.
+            .time_unix_nano = @intCast(std.time.nanoTimestamp()),
+            // FIXME reader's temporality is not applied here.
+            .value = .{ .as_int = @intCast(measure.value) },
+
+            // TODO: support exemplars.
+            .exemplars = std.ArrayList(pbmetrics.Exemplar).init(allocator),
+        };
+        try dataPoints.append(dp);
+    }
+    return dataPoints;
+}
+
+fn histogramDataPoints(allocator: std.mem.Allocator, comptime T: type, h: *instrument.Histogram(T)) !std.ArrayList(pbmetrics.HistogramDataPoint) {
+    var dataPoints = std.ArrayList(pbmetrics.HistogramDataPoint).init(allocator);
+    for (h.dataPoints.items) |measure| {
+        const attrs = try attributesToProtobufKeyValueList(allocator, measure.attributes);
+        var dp = pbmetrics.HistogramDataPoint{
+            .attributes = attrs.values,
+            .time_unix_nano = @intCast(std.time.nanoTimestamp()),
+            // FIXME reader's temporality is not applied here.
+            .count = h.counts.get(measure.attributes) orelse 0,
+            .sum = switch (@TypeOf(h.*)) {
+                instrument.Histogram(u16), instrument.Histogram(u32), instrument.Histogram(u64) => @as(f64, @floatFromInt(measure.value)),
+                instrument.Histogram(f32), instrument.Histogram(f64) => @as(f64, @floatCast(measure.value)),
+                else => unreachable,
+            },
+            .bucket_counts = std.ArrayList(u64).init(allocator),
+            .explicit_bounds = std.ArrayList(f64).init(allocator),
+            // TODO support exemplars
+            .exemplars = std.ArrayList(pbmetrics.Exemplar).init(allocator),
+        };
+        if (h.bucket_counts.get(measure.attributes)) |b| {
+            try dp.bucket_counts.appendSlice(b);
+        }
+        try dp.explicit_bounds.appendSlice(h.buckets);
+
+        try dataPoints.append(dp);
+    }
+    return dataPoints;
+}
diff --git a/src/sdk/metrics/reader.zig b/src/sdk/metrics/reader.zig
index 878090b..e7154a1 100644
--- a/src/sdk/metrics/reader.zig
+++ b/src/sdk/metrics/reader.zig
@@ -5,17 +5,26 @@ const pbcommon = @import("../../opentelemetry/proto/common/v1.pb.zig");
 const pbresource = @import("../../opentelemetry/proto/resource/v1.pb.zig");
 const pbmetrics = @import("../../opentelemetry/proto/metrics/v1.pb.zig");
 const pbutils = @import("../../pbutils.zig");
-const instr = @import("../../api/metrics/instrument.zig");
-const Instrument = instr.Instrument;
-const Kind = instr.Kind;
+const instrument = @import("../../api/metrics/instrument.zig");
+const Instrument = instrument.Instrument;
+const Kind = instrument.Kind;
 const MeterProvider = @import("../../api/metrics/meter.zig").MeterProvider;
+const AggregatedMetrics = @import("../../api/metrics/meter.zig").AggregatedMetrics;
+
 const Attribute = @import("../../attributes.zig").Attribute;
+const Attributes = @import("../../attributes.zig").Attributes;
+const Measurements = @import("../../api/metrics/measurement.zig").Measurements;
+const MeasurementsData = @import("../../api/metrics/measurement.zig").MeasurementsData;
+
 const view = @import("view.zig");
+const TemporalitySelector = view.TemporalitySelector;
+const AggregationSelector = view.AggregationSelector;
+
 const exporter = @import("exporter.zig");
 const MetricExporter = exporter.MetricExporter;
-const Exporter = exporter.ExporterIface;
+const ExporterIface = exporter.ExporterIface;
 const ExportResult = exporter.ExportResult;
-const InMemoryExporter = exporter.ImMemoryExporter;
+const InMemoryExporter = exporter.InMemoryExporter;
 
 /// ExportError represents the failure to export data points
 /// to a destination.
@@ -23,6 +32,7 @@ pub const MetricReadError = error{
     CollectFailedOnMissingMeterProvider,
     ExportFailed,
     ForceFlushTimedOut,
+    ConcurrentCollectNotAllowed,
 };
 
 /// MetricReader reads metrics' data from a MeterProvider.
@@ -30,24 +40,25 @@ pub const MetricReadError = error{
 pub const MetricReader = struct {
     allocator: std.mem.Allocator,
     // Exporter is the destination of the metrics data.
- // MetricReader takes oenwrship of the exporter. + // It takes ownership of the collected metrics. exporter: *MetricExporter = undefined, // We can read the instruments' data points from the meters // stored in meterProvider. meterProvider: ?*MeterProvider = null, - temporality: *const fn (Kind) view.Temporality = view.DefaultTemporalityFor, - aggregation: *const fn (Kind) view.Aggregation = view.DefaultAggregationFor, + temporality: TemporalitySelector = view.DefaultTemporalityFor, + aggregation: AggregationSelector = view.DefaultAggregationFor, // Signal that shutdown has been called. - hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + hasShutDown: bool = false, + mx: std.Thread.Mutex = std.Thread.Mutex{}, const Self = @This(); - pub fn init(allocator: std.mem.Allocator, metricExporter: *MetricExporter) !*Self { + pub fn init(allocator: std.mem.Allocator, exporterImpl: *ExporterIface) !*Self { const s = try allocator.create(Self); s.* = Self{ .allocator = allocator, - .exporter = metricExporter, + .exporter = try MetricExporter.new(allocator, exporterImpl), }; return s; } @@ -63,42 +74,42 @@ pub const MetricReader = struct { } pub fn collect(self: *Self) !void { - if (self.hasShutDown.load(.acquire)) { + if (@atomicLoad(bool, &self.hasShutDown, .acquire)) { // When shutdown has already been called, collect is a no-op. return; } - var metricsData = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(self.allocator) }; - defer metricsData.deinit(); + if (!self.mx.tryLock()) { + return MetricReadError.ConcurrentCollectNotAllowed; + } + defer self.mx.unlock(); + var toBeExported = std.ArrayList(Measurements).init(self.allocator); + defer toBeExported.deinit(); if (self.meterProvider) |mp| { // Collect the data from each meter provider. - var mpIter = mp.meters.valueIterator(); - while (mpIter.next()) |meter| { - // Create a resourceMetric for each Meter. - const attrs = try attributesToProtobufKeyValueList(self.allocator, meter.attributes); - var rm = pbmetrics.ResourceMetrics{ - .resource = pbresource.Resource{ .attributes = attrs.values }, - .scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).init(self.allocator), - }; - // We only use a single ScopeMetric for each ResourceMetric. - var sm = pbmetrics.ScopeMetrics{ - .metrics = std.ArrayList(pbmetrics.Metric).init(self.allocator), + // Measurements can be ported to protobuf structs during OTLP export. + var meters = mp.meters.valueIterator(); + while (meters.next()) |meter| { + const measurements: []Measurements = AggregatedMetrics.fetch(self.allocator, meter, self.aggregation) catch |err| { + std.debug.print("MetricReader: error aggregating data points from meter {s}: {?}", .{ meter.name, err }); + continue; }; - var instrIter = meter.instruments.valueIterator(); - while (instrIter.next()) |i| { - if (toProtobufMetric(self.allocator, self.temporality, i.*)) |metric| { - try sm.metrics.append(metric); - } else |err| { - std.debug.print("MetricReader collect: failed conversion to proto Metric: {?}\n", .{err}); - } - } - try rm.scope_metrics.append(sm); - try metricsData.resource_metrics.append(rm); + // this makes a copy of the measurements to the array list + try toBeExported.appendSlice(measurements); + self.allocator.free(measurements); } - // Finally, export the metrics data through the exporter. - // Copy the data to the exporter's memory and each exporter should own it and free it - // by calling deinit() on the MetricsData once done. 
-            const owned = try metricsData.dupe(self.allocator);
+
+            // TODO: apply temporality before exporting, optionally keeping state in the reader.
+            // When .Delta temporality is used, it will report the difference between the value
+            // previously collected and the currently collected value.
+            // This requires keeping state in the reader to store the previous value.
+
+            // Export the metrics data through the exporter.
+            // The exporter will own the metrics and should free them
+            // by calling deinit() on the Measurements once done.
+            // MetricExporter must be built with the same allocator as MetricReader
+            // to ensure that the memory is managed correctly.
+            const owned = try toBeExported.toOwnedSlice();
             switch (self.exporter.exportBatch(owned)) {
                 ExportResult.Success => return,
                 ExportResult.Failure => return MetricReadError.ExportFailed,
@@ -110,163 +121,18 @@ pub const MetricReader = struct {
     }
 
     pub fn shutdown(self: *Self) void {
+        @atomicStore(bool, &self.hasShutDown, true, .release);
         self.collect() catch |e| {
             std.debug.print("MetricReader shutdown: error while collecting metrics: {?}\n", .{e});
         };
-        self.hasShutDown.store(true, .release);
         self.exporter.shutdown();
         self.allocator.destroy(self);
     }
 };
 
-fn toProtobufMetric(
-    allocator: std.mem.Allocator,
-    temporality: *const fn (Kind) view.Temporality,
-    i: *Instrument,
-) !pbmetrics.Metric {
-    return pbmetrics.Metric{
-        .name = ManagedString.managed(i.opts.name),
-        .description = if (i.opts.description) |d| ManagedString.managed(d) else .Empty,
-        .unit = if (i.opts.unit) |u| ManagedString.managed(u) else .Empty,
-        .data = switch (i.data) {
-            .Counter_u16 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{
-                .data_points = try sumDataPoints(allocator, u16, i.data.Counter_u16),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-                .is_monotonic = true,
-            } },
-            .Counter_u32 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{
-                .data_points = try sumDataPoints(allocator, u32, i.data.Counter_u32),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-                .is_monotonic = true,
-            } },
-
-            .Counter_u64 => pbmetrics.Metric.data_union{ .sum = pbmetrics.Sum{
-                .data_points = try sumDataPoints(allocator, u64, i.data.Counter_u64),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-                .is_monotonic = true,
-            } },
-            .Histogram_u16 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
-                .data_points = try histogramDataPoints(allocator, u16, i.data.Histogram_u16),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-            } },
-
-            .Histogram_u32 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
-                .data_points = try histogramDataPoints(allocator, u32, i.data.Histogram_u32),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-            } },
-
-            .Histogram_u64 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
-                .data_points = try histogramDataPoints(allocator, u64, i.data.Histogram_u64),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-            } },
-
-            .Histogram_f32 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
-                .data_points = try histogramDataPoints(allocator, f32, i.data.Histogram_f32),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-            } },
-            .Histogram_f64 => pbmetrics.Metric.data_union{ .histogram = pbmetrics.Histogram{
-                .data_points = try histogramDataPoints(allocator, f64, i.data.Histogram_f64),
-                .aggregation_temporality = temporality(i.kind).toProto(),
-            } },
-            // TODO: add other metrics types.
- else => unreachable, - }, - // Metadata used for internal translations and we can discard for now. - // Consumers of SDK should not rely on this field. - .metadata = std.ArrayList(pbcommon.KeyValue).init(allocator), - }; -} - -fn attributeToProtobuf(attribute: Attribute) pbcommon.KeyValue { - return pbcommon.KeyValue{ - .key = ManagedString.managed(attribute.name), - .value = switch (attribute.value) { - .bool => pbcommon.AnyValue{ .value = .{ .bool_value = attribute.value.bool } }, - .string => pbcommon.AnyValue{ .value = .{ .string_value = ManagedString.managed(attribute.value.string) } }, - .int => pbcommon.AnyValue{ .value = .{ .int_value = attribute.value.int } }, - .double => pbcommon.AnyValue{ .value = .{ .double_value = attribute.value.double } }, - // TODO include nested Attribute values - }, - }; -} - -fn attributesToProtobufKeyValueList(allocator: std.mem.Allocator, attributes: ?[]Attribute) !pbcommon.KeyValueList { - if (attributes) |attrs| { - var kvs = pbcommon.KeyValueList{ .values = std.ArrayList(pbcommon.KeyValue).init(allocator) }; - for (attrs) |a| { - try kvs.values.append(attributeToProtobuf(a)); - } - return kvs; - } else { - return pbcommon.KeyValueList{ .values = std.ArrayList(pbcommon.KeyValue).init(allocator) }; - } -} - -fn sumDataPoints(allocator: std.mem.Allocator, comptime T: type, c: *instr.Counter(T)) !std.ArrayList(pbmetrics.NumberDataPoint) { - var dataPoints = std.ArrayList(pbmetrics.NumberDataPoint).init(allocator); - var iter = c.cumulative.iterator(); - while (iter.next()) |measure| { - var attrs = std.ArrayList(pbcommon.KeyValue).init(allocator); - // Attributes are stored as key of the hasmap. - if (measure.key_ptr.*) |kv| { - for (kv) |a| { - try attrs.append(attributeToProtobuf(a)); - } - } - const dp = pbmetrics.NumberDataPoint{ - .attributes = attrs, - .time_unix_nano = @intCast(std.time.nanoTimestamp()), - // FIXME reader's temporailty is not applied here. - .value = .{ .as_int = @intCast(measure.value_ptr.*) }, - - // TODO: support exemplars. - .exemplars = std.ArrayList(pbmetrics.Exemplar).init(allocator), - }; - try dataPoints.append(dp); - } - return dataPoints; -} - -fn histogramDataPoints(allocator: std.mem.Allocator, comptime T: type, h: *instr.Histogram(T)) !std.ArrayList(pbmetrics.HistogramDataPoint) { - var dataPoints = std.ArrayList(pbmetrics.HistogramDataPoint).init(allocator); - var iter = h.cumulative.iterator(); - while (iter.next()) |measure| { - var attrs = std.ArrayList(pbcommon.KeyValue).init(allocator); - // Attributes are stored as key of the hashmap. - if (measure.key_ptr.*) |kv| { - for (kv) |a| { - try attrs.append(attributeToProtobuf(a)); - } - } - var dp = pbmetrics.HistogramDataPoint{ - .attributes = attrs, - .time_unix_nano = @intCast(std.time.nanoTimestamp()), - // FIXME reader's temporailty is not applied here. 
- .count = h.counts.get(measure.key_ptr.*) orelse 0, - .sum = switch (@TypeOf(h.*)) { - instr.Histogram(u16), instr.Histogram(u32), instr.Histogram(u64) => @as(f64, @floatFromInt(measure.value_ptr.*)), - instr.Histogram(f32), instr.Histogram(f64) => @as(f64, @floatCast(measure.value_ptr.*)), - else => unreachable, - }, - .bucket_counts = std.ArrayList(u64).init(allocator), - .explicit_bounds = std.ArrayList(f64).init(allocator), - // TODO support exemplars - .exemplars = std.ArrayList(pbmetrics.Exemplar).init(allocator), - }; - if (h.bucket_counts.get(measure.key_ptr.*)) |b| { - try dp.bucket_counts.appendSlice(b); - } - try dp.explicit_bounds.appendSlice(h.buckets); - - try dataPoints.append(dp); - } - return dataPoints; -} - test "metric reader shutdown prevents collect() to execute" { var noop = exporter.ExporterIface{ .exportFn = exporter.noopExporter }; - const me = try MetricExporter.new(std.testing.allocator, &noop); - var reader = try MetricReader.init(std.testing.allocator, me); + var reader = try MetricReader.init(std.testing.allocator, &noop); const e = reader.collect(); try std.testing.expectEqual(MetricReadError.CollectFailedOnMissingMeterProvider, e); reader.shutdown(); @@ -279,10 +145,7 @@ test "metric reader collects data from meter provider" { var inMem = try InMemoryExporter.init(std.testing.allocator); defer inMem.deinit(); - var reader = try MetricReader.init( - std.testing.allocator, - try MetricExporter.new(std.testing.allocator, &inMem.exporter), - ); + var reader = try MetricReader.init(std.testing.allocator, &inMem.exporter); defer reader.shutdown(); try mp.addReader(reader); @@ -301,6 +164,8 @@ test "metric reader collects data from meter provider" { try histFloat.record(10.0, .{ "wonderful", v }); try reader.collect(); + + _ = try inMem.fetch(); } fn deltaTemporality(_: Kind) view.Temporality { @@ -314,10 +179,7 @@ test "metric reader custom temporality" { var inMem = try InMemoryExporter.init(std.testing.allocator); defer inMem.deinit(); - var reader = try MetricReader.init( - std.testing.allocator, - try MetricExporter.new(std.testing.allocator, &inMem.exporter), - ); + var reader = try MetricReader.init(std.testing.allocator, &inMem.exporter); reader = reader.withTemporality(deltaTemporality); defer reader.shutdown(); @@ -332,134 +194,6 @@ test "metric reader custom temporality" { try reader.collect(); const data = try inMem.fetch(); - defer data.deinit(); - - std.debug.assert(data.resource_metrics.items.len == 1); - std.debug.assert(data.resource_metrics.items[0].scope_metrics.items[0].metrics.items.len == 1); -} - -/// A periodic exporting metric reader is a specialization of MetricReader -/// that periodically exports metrics data to a destination. -/// The exporter should be a push-based exporter. -/// See https://opentelemetry.io/docs/specs/otel/metrics/sdk/#periodic-exporting-metricreader -pub const PeriodicExportingMetricReader = struct { - const Self = @This(); - - allocator: std.mem.Allocator, - exportIntervalMillis: u64, - exportTimeoutMillis: u64, - - // Lock helper to signal shutdown is in progress - shuttingDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - - // This reader will collect metrics data from the MeterProvider. - reader: *MetricReader, - - // The intervals at which the reader should export metrics data - // and wait for each operation to complete. - // Default values are dicated by the OpenTelemetry specification. 
- const defaultExportIntervalMillis: u64 = 60000; - const defaultExportTimeoutMillis: u64 = 30000; - - pub fn init( - allocator: std.mem.Allocator, - reader: *MetricReader, - exportIntervalMs: ?u64, - exportTimeoutMs: ?u64, - ) !*Self { - const s = try allocator.create(Self); - s.* = Self{ - .allocator = allocator, - .reader = reader, - .exportIntervalMillis = exportIntervalMs orelse defaultExportIntervalMillis, - .exportTimeoutMillis = exportTimeoutMs orelse defaultExportTimeoutMillis, - }; - try s.start(); - return s; - } - - fn start(self: *Self) !void { - const th = try std.Thread.spawn( - .{}, - collectAndExport, - .{self}, - ); - th.detach(); - return; - } - - pub fn shutdown(self: *Self) void { - self.shuttingDown.store(true, .release); - self.allocator.destroy(self); - } -}; - -// Function that collects metrics from the reader and exports it to the destination. -// FIXME there is not a timeout for the collect operation. -fn collectAndExport(periodicExp: *PeriodicExportingMetricReader) void { - // The execution should continue until the reader is shutting down - while (periodicExp.shuttingDown.load(.acquire) == false) { - if (periodicExp.reader.meterProvider) |_| { - // This will also call exporter.exportBatch() every interval. - periodicExp.reader.collect() catch |e| { - std.debug.print("PeriodicExportingReader: reader collect failed: {?}\n", .{e}); - }; - } else { - std.debug.print("PeriodicExportingReader: no meter provider is registered with this MetricReader {any}\n", .{periodicExp.reader}); - } - - std.time.sleep(periodicExp.exportIntervalMillis * std.time.ns_per_ms); - } -} - -test "e2e periodic exporting metric reader" { - const mp = try MeterProvider.init(std.testing.allocator); - defer mp.shutdown(); - - const waiting: u64 = 10; - - var inMem = try InMemoryExporter.init(std.testing.allocator); - defer inMem.deinit(); - - var reader = try MetricReader.init( - std.testing.allocator, - try MetricExporter.new(std.testing.allocator, &inMem.exporter), - ); - defer reader.shutdown(); - - var pemr = try PeriodicExportingMetricReader.init( - std.testing.allocator, - reader, - waiting, - null, - ); - defer pemr.shutdown(); - - try mp.addReader(pemr.reader); - - var meter = try mp.getMeter(.{ .name = "test-reader" }); - var counter = try meter.createCounter(u64, .{ - .name = "requests", - .description = "a test counter", - }); - try counter.add(10, .{}); - - var histogram = try meter.createHistogram(u64, .{ - .name = "latency", - .description = "a test histogram", - .histogramOpts = .{ .explicitBuckets = &.{ - 1.0, - 10.0, - 100.0, - } }, - }); - try histogram.record(10, .{}); - - std.time.sleep(waiting * 2 * std.time.ns_per_ms); - - const data = try inMem.fetch(); - defer data.deinit(); - std.debug.assert(data.resource_metrics.items.len == 1); - std.debug.assert(data.resource_metrics.items[0].scope_metrics.items[0].metrics.items.len == 2); + std.debug.assert(data.len == 1); } diff --git a/src/sdk/metrics/view.zig b/src/sdk/metrics/view.zig index 5052e8a..e6e5d4e 100644 --- a/src/sdk/metrics/view.zig +++ b/src/sdk/metrics/view.zig @@ -1,18 +1,17 @@ const pbmetrics = @import("../../opentelemetry/proto/metrics/v1.pb.zig"); -const Instrument = @import("../../api/metrics/instrument.zig"); +const instrument = @import("../../api/metrics/instrument.zig"); /// Defines the ways and means to compute aggregated metrics. 
/// See https://opentelemetry.io/docs/specs/otel/metrics/sdk/#aggregation pub const Aggregation = enum { Drop, - Default, Sum, LastValue, ExplicitBucketHistogram, }; /// Default aggregation for a given kind of instrument. -pub fn DefaultAggregationFor(kind: Instrument.Kind) Aggregation { +pub fn DefaultAggregationFor(kind: instrument.Kind) Aggregation { return switch (kind) { .Counter => Aggregation.Sum, .UpDownCounter => Aggregation.Sum, @@ -21,20 +20,22 @@ pub fn DefaultAggregationFor(kind: Instrument.Kind) Aggregation { }; } -// Temporality +/// Temporality describes how the value should be used. pub const Temporality = enum { Cumulative, Delta, + Unspecified, pub fn toProto(self: Temporality) pbmetrics.AggregationTemporality { return switch (self) { .Cumulative => .AGGREGATION_TEMPORALITY_CUMULATIVE, .Delta => .AGGREGATION_TEMPORALITY_DELTA, + .Unspecified => .AGGREGATION_TEMPORALITY_UNSPECIFIED, }; } }; -pub fn DefaultTemporalityFor(kind: Instrument.Kind) Temporality { +pub fn DefaultTemporalityFor(kind: instrument.Kind) Temporality { return switch (kind) { .Counter => Temporality.Cumulative, .UpDownCounter => Temporality.Cumulative, @@ -42,3 +43,7 @@ pub fn DefaultTemporalityFor(kind: Instrument.Kind) Temporality { .Histogram => Temporality.Cumulative, }; } + +pub const TemporalitySelector = *const fn (instrument.Kind) Temporality; + +pub const AggregationSelector = *const fn (instrument.Kind) Aggregation;
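
Usage sketch (not part of the patch): the `TemporalitySelector` and `AggregationSelector` aliases above are the function-pointer types stored in `MetricReader` (its `temporality` and `aggregation` fields), and `MetricReader.withTemporality` is exercised in the custom-temporality test in this diff. The example below shows how a consumer might wire a Delta selector into the new `MetricReader.init(allocator, *ExporterIface)` signature. The module name `opentelemetry-sdk` and the assumption that `Kind`, `view`, `MeterProvider`, `InMemoryExporter`, and `MetricReader` are re-exported from the SDK root are illustrative only; adjust the imports to the actual module layout.

const std = @import("std");
// Assumed module name and re-exports; adjust to how this SDK is exposed in build.zig.zon.
const sdk = @import("opentelemetry-sdk");

// A TemporalitySelector: report counters as deltas, everything else cumulatively.
fn deltaForCounters(kind: sdk.Kind) sdk.view.Temporality {
    return switch (kind) {
        .Counter => .Delta,
        else => .Cumulative,
    };
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const mp = try sdk.MeterProvider.init(allocator);
    defer mp.shutdown();

    var in_mem = try sdk.InMemoryExporter.init(allocator);
    defer in_mem.deinit();

    // The reader now wraps the exporter interface directly and builds its own MetricExporter.
    var reader = try sdk.MetricReader.init(allocator, &in_mem.exporter);
    reader = reader.withTemporality(deltaForCounters);
    defer reader.shutdown();

    try mp.addReader(reader);

    var meter = try mp.getMeter(.{ .name = "example" });
    var counter = try meter.createCounter(u64, .{ .name = "requests" });
    try counter.add(1, .{});

    try reader.collect();

    // fetch() exposes the measurements currently stored by the in-memory exporter.
    const data = try in_mem.fetch();
    std.debug.print("collected {d} measurements\n", .{data.len});
}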