From 855f68305a3b8bc3f3e1376057d1f69ae911ce43 Mon Sep 17 00:00:00 2001 From: inge4pres Date: Mon, 3 Feb 2025 23:36:54 +0100 Subject: [PATCH] sdk: resolve memory leaks in exporter Signed-off-by: inge4pres --- src/sdk/metrics/exporter.zig | 76 ++++++++++++++++++++++-------------- src/sdk/metrics/reader.zig | 30 +++++++------- 2 files changed, 61 insertions(+), 45 deletions(-) diff --git a/src/sdk/metrics/exporter.zig b/src/sdk/metrics/exporter.zig index 0677f15..fe73e9d 100644 --- a/src/sdk/metrics/exporter.zig +++ b/src/sdk/metrics/exporter.zig @@ -26,9 +26,10 @@ pub const MetricExporter = struct { allocator: std.mem.Allocator, exporter: *ExporterIface, - hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), - var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false); + // Lock helper to signal shutdown and/or export is in progress + hasShutDown: bool = false, + exportCompleted: std.Thread.Mutex = std.Thread.Mutex{}, pub fn new(allocator: std.mem.Allocator, exporter: *ExporterIface) !*Self { const s = try allocator.create(Self); @@ -42,14 +43,14 @@ pub const MetricExporter = struct { /// ExportBatch exports a batch of metrics data by calling the exporter implementation. /// The passed metrics data will be owned by the exporter implementation. pub fn exportBatch(self: *Self, metrics: []Measurements) ExportResult { - if (self.hasShutDown.load(.acquire)) { + if (@atomicLoad(bool, &self.hasShutDown, .acquire)) { // When shutdown has already been called, calling export should be a failure. // https://opentelemetry.io/docs/specs/otel/metrics/sdk/#shutdown-2 return ExportResult.Failure; } - // Acquire the lock to ensure that forceFlush is waiting for export to complete. - _ = exportCompleted.load(.acquire); - defer exportCompleted.store(true, .release); + // Acquire the lock to signal to forceFlush to wait for export to complete. + self.exportCompleted.lock(); + defer self.exportCompleted.unlock(); // Call the exporter function to process metrics data. self.exporter.exportBatch(metrics) catch |e| { @@ -60,24 +61,31 @@ pub const MetricExporter = struct { } // Ensure that all the data is flushed to the destination. - pub fn forceFlush(_: Self, timeout_ms: u64) !void { + pub fn forceFlush(self: *Self, timeout_ms: u64) !void { const start = std.time.milliTimestamp(); // Milliseconds const timeout: i64 = @intCast(timeout_ms); while (std.time.milliTimestamp() < start + timeout) { - if (exportCompleted.load(.acquire)) { + if (self.exportCompleted.tryLock()) { + self.exportCompleted.unlock(); return; - } else std.time.sleep(std.time.ns_per_ms); + } else { + std.time.sleep(std.time.ns_per_ms); + } } return MetricReadError.ForceFlushTimedOut; } pub fn shutdown(self: *Self) void { - if (self.hasShutDown.load(.acquire)) { - // When shutdown has already been called, calling shutdown again is a no-op. + if (@atomicRmw(bool, &self.hasShutDown, .Xchg, true, .acq_rel)) { return; } - self.hasShutDown.store(true, .release); + // if (@atomicLoad(bool, &self.hasShutDown, .acquire)) { + // // When shutdown has already been called, calling shutdown again is a no-op. + // return; + // } else { + // @atomicStore(bool, &self.hasShutDown, true, .release); self.allocator.destroy(self); + // } } }; @@ -115,7 +123,7 @@ test "metric exporter no-op" { var measure = [1]DataPoint(i64){.{ .value = 42 }}; const measurement: []DataPoint(i64) = measure[0..]; - var metrics = [1]Measurements{Measurements{ + var metrics = [1]Measurements{.{ .meterName = "my-meter", .instrumentKind = .Counter, .instrumentOptions = .{ .name = "my-counter" }, @@ -189,9 +197,8 @@ test "metric exporter force flush fails" { backgroundRunner, .{ me, &metrics }, ); - bg.detach(); + bg.join(); - std.time.sleep(10 * std.time.ns_per_ms); // sleep for 10 ms to ensure the background thread completed const e = me.forceFlush(0); try std.testing.expectError(MetricReadError.ForceFlushTimedOut, e); } @@ -210,7 +217,7 @@ pub const ExporterIface = struct { }; /// InMemoryExporter stores in memory the metrics data to be exported. -/// The memory representation uses the types defined in the library. +/// The metics' representation in memory uses the types defined in the library. pub const InMemoryExporter = struct { const Self = @This(); allocator: std.mem.Allocator, @@ -218,6 +225,8 @@ pub const InMemoryExporter = struct { // Implement the interface via @fieldParentPtr exporter: ExporterIface, + mx: std.Thread.Mutex = std.Thread.Mutex{}, + pub fn init(allocator: std.mem.Allocator) !*Self { const s = try allocator.create(Self); s.* = Self{ @@ -230,30 +239,35 @@ pub const InMemoryExporter = struct { return s; } pub fn deinit(self: *Self) void { - for (self.data.items) |d| { - var data = d; - data.deinit(self.allocator); + self.mx.lock(); + for (self.data.items) |*d| { + d.*.deinit(self.allocator); } self.data.deinit(); + self.mx.unlock(); + self.allocator.destroy(self); } + // Implements the ExportIFace interface only method. fn exportBatch(iface: *ExporterIface, metrics: []Measurements) MetricReadError!void { // Get a pointer to the instance of the struct that implements the interface. const self: *Self = @fieldParentPtr("exporter", iface); + self.mx.lock(); + defer self.mx.unlock(); - for (self.data.items) |d| { - var data = d; - data.deinit(self.allocator); + // Free up the allocated data points from the previous export. + for (self.data.items) |*d| { + d.*.deinit(self.allocator); } - self.data.clearRetainingCapacity(); + self.data.clearAndFree(); self.data = std.ArrayList(Measurements).fromOwnedSlice(self.allocator, metrics); } /// Read the metrics from the in memory exporter. - //FIXME might need a mutex in the exporter as the field might be accessed - // from a thread while it's being cleared in another (via exportBatch). pub fn fetch(self: *Self) ![]Measurements { + self.mx.lock(); + defer self.mx.unlock(); return self.data.items; } }; @@ -321,7 +335,7 @@ pub const PeriodicExportingReader = struct { exportTimeoutMillis: u64, // Lock helper to signal shutdown is in progress - shuttingDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + shuttingDown: bool = false, // This reader will collect metrics data from the MeterProvider. reader: *MetricReader, @@ -361,8 +375,10 @@ pub const PeriodicExportingReader = struct { } pub fn shutdown(self: *Self) void { - self.shuttingDown.store(true, .release); + // First signal the background exporter to stop collecting, then close the reader. + @atomicStore(bool, &self.shuttingDown, true, .release); self.reader.shutdown(); + // Only when the background collector has stopped we can destroy. self.allocator.destroy(self); } }; @@ -371,17 +387,17 @@ pub const PeriodicExportingReader = struct { // FIXME there is not a timeout for the collect operation. fn collectAndExport( reader: *MetricReader, - shuttingDown: *std.atomic.Value(bool), + shuttingDown: *bool, exportIntervalMillis: u64, // TODO: add a timeout for the export operation _: u64, ) void { // The execution should continue until the reader is shutting down - while (!shuttingDown.*.load(.acquire)) { + while (!@atomicLoad(bool, shuttingDown, .acquire)) { if (reader.meterProvider) |_| { // This will also call exporter.exportBatch() every interval. reader.collect() catch |e| { - std.debug.print("PeriodicExportingReader: reader collect failed: {?}\n", .{e}); + std.debug.print("PeriodicExportingReader: collecting failed on reader: {?}\n", .{e}); }; } else { std.debug.print("PeriodicExportingReader: no meter provider is registered with this MetricReader {any}\n", .{reader}); diff --git a/src/sdk/metrics/reader.zig b/src/sdk/metrics/reader.zig index 5d43355..e7154a1 100644 --- a/src/sdk/metrics/reader.zig +++ b/src/sdk/metrics/reader.zig @@ -49,7 +49,7 @@ pub const MetricReader = struct { temporality: TemporalitySelector = view.DefaultTemporalityFor, aggregation: AggregationSelector = view.DefaultAggregationFor, // Signal that shutdown has been called. - hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + hasShutDown: bool = false, mx: std.Thread.Mutex = std.Thread.Mutex{}, const Self = @This(); @@ -74,28 +74,29 @@ pub const MetricReader = struct { } pub fn collect(self: *Self) !void { + if (@atomicLoad(bool, &self.hasShutDown, .acquire)) { + // When shutdown has already been called, collect is a no-op. + return; + } if (!self.mx.tryLock()) { return MetricReadError.ConcurrentCollectNotAllowed; } defer self.mx.unlock(); - - if (self.hasShutDown.load(.acquire)) { - // When shutdown has already been called, collect is a no-op. - return; - } var toBeExported = std.ArrayList(Measurements).init(self.allocator); defer toBeExported.deinit(); if (self.meterProvider) |mp| { // Collect the data from each meter provider. - // TODO: extract MeasurmentsData from all meters and accumulate them with Meter attributes. - // MeasurementsData can be ported much more easilty to protobuf structs during export. + // Measurements can be ported to protobuf structs during OTLP export. var meters = mp.meters.valueIterator(); while (meters.next()) |meter| { - const measurements: []Measurements = try AggregatedMetrics.fetch(self.allocator, meter, self.aggregation); - defer self.allocator.free(measurements); - + const measurements: []Measurements = AggregatedMetrics.fetch(self.allocator, meter, self.aggregation) catch |err| { + std.debug.print("MetricReader: error aggregating data points from meter {s}: {?}", .{ meter.name, err }); + continue; + }; + // this makes a copy of the measurements to the array list try toBeExported.appendSlice(measurements); + self.allocator.free(measurements); } //TODO: apply temporality before exporting, optionally keeping state in the reader. @@ -105,7 +106,7 @@ pub const MetricReader = struct { // Export the metrics data through the exporter. // The exporter will own the metrics and should free it - // by calling deinit() on the MeterMeasurements once done. + // by calling deinit() on the Measurements once done. // MetricExporter must be built with the same allocator as MetricReader // to ensure that the memory is managed correctly. const owned = try toBeExported.toOwnedSlice(); @@ -120,10 +121,10 @@ pub const MetricReader = struct { } pub fn shutdown(self: *Self) void { + @atomicStore(bool, &self.hasShutDown, true, .release); self.collect() catch |e| { std.debug.print("MetricReader shutdown: error while collecting metrics: {?}\n", .{e}); }; - self.hasShutDown.store(true, .release); self.exporter.shutdown(); self.allocator.destroy(self); } @@ -164,8 +165,7 @@ test "metric reader collects data from meter provider" { try reader.collect(); - const data = try inMem.fetch(); - defer std.testing.allocator.free(data); + _ = try inMem.fetch(); } fn deltaTemporality(_: Kind) view.Temporality {