Skip to content

Commit

Permalink
sdk: resolve memory leaks in exporter
Browse files Browse the repository at this point in the history
Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres committed Feb 3, 2025
1 parent 7b0d0ef commit 855f683
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 45 deletions.
76 changes: 46 additions & 30 deletions src/sdk/metrics/exporter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ pub const MetricExporter = struct {

allocator: std.mem.Allocator,
exporter: *ExporterIface,
hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);
// Lock helper to signal shutdown and/or export is in progress
hasShutDown: bool = false,
exportCompleted: std.Thread.Mutex = std.Thread.Mutex{},

pub fn new(allocator: std.mem.Allocator, exporter: *ExporterIface) !*Self {
const s = try allocator.create(Self);
Expand All @@ -42,14 +43,14 @@ pub const MetricExporter = struct {
/// ExportBatch exports a batch of metrics data by calling the exporter implementation.
/// The passed metrics data will be owned by the exporter implementation.
pub fn exportBatch(self: *Self, metrics: []Measurements) ExportResult {
if (self.hasShutDown.load(.acquire)) {
if (@atomicLoad(bool, &self.hasShutDown, .acquire)) {
// When shutdown has already been called, calling export should be a failure.
// https://opentelemetry.io/docs/specs/otel/metrics/sdk/#shutdown-2
return ExportResult.Failure;
}
// Acquire the lock to ensure that forceFlush is waiting for export to complete.
_ = exportCompleted.load(.acquire);
defer exportCompleted.store(true, .release);
// Acquire the lock to signal to forceFlush to wait for export to complete.
self.exportCompleted.lock();
defer self.exportCompleted.unlock();

// Call the exporter function to process metrics data.
self.exporter.exportBatch(metrics) catch |e| {
Expand All @@ -60,24 +61,31 @@ pub const MetricExporter = struct {
}

// Ensure that all the data is flushed to the destination.
pub fn forceFlush(_: Self, timeout_ms: u64) !void {
pub fn forceFlush(self: *Self, timeout_ms: u64) !void {
const start = std.time.milliTimestamp(); // Milliseconds
const timeout: i64 = @intCast(timeout_ms);
while (std.time.milliTimestamp() < start + timeout) {
if (exportCompleted.load(.acquire)) {
if (self.exportCompleted.tryLock()) {
self.exportCompleted.unlock();
return;
} else std.time.sleep(std.time.ns_per_ms);
} else {
std.time.sleep(std.time.ns_per_ms);
}
}
return MetricReadError.ForceFlushTimedOut;
}

pub fn shutdown(self: *Self) void {
if (self.hasShutDown.load(.acquire)) {
// When shutdown has already been called, calling shutdown again is a no-op.
if (@atomicRmw(bool, &self.hasShutDown, .Xchg, true, .acq_rel)) {
return;
}
self.hasShutDown.store(true, .release);
// if (@atomicLoad(bool, &self.hasShutDown, .acquire)) {
// // When shutdown has already been called, calling shutdown again is a no-op.
// return;
// } else {
// @atomicStore(bool, &self.hasShutDown, true, .release);
self.allocator.destroy(self);
// }
}
};

Expand Down Expand Up @@ -115,7 +123,7 @@ test "metric exporter no-op" {

var measure = [1]DataPoint(i64){.{ .value = 42 }};
const measurement: []DataPoint(i64) = measure[0..];
var metrics = [1]Measurements{Measurements{
var metrics = [1]Measurements{.{
.meterName = "my-meter",
.instrumentKind = .Counter,
.instrumentOptions = .{ .name = "my-counter" },
Expand Down Expand Up @@ -189,9 +197,8 @@ test "metric exporter force flush fails" {
backgroundRunner,
.{ me, &metrics },
);
bg.detach();
bg.join();

std.time.sleep(10 * std.time.ns_per_ms); // sleep for 10 ms to ensure the background thread completed
const e = me.forceFlush(0);
try std.testing.expectError(MetricReadError.ForceFlushTimedOut, e);
}
Expand All @@ -210,14 +217,16 @@ pub const ExporterIface = struct {
};

/// InMemoryExporter stores in memory the metrics data to be exported.
/// The memory representation uses the types defined in the library.
/// The metrics' representation in memory uses the types defined in the library.
pub const InMemoryExporter = struct {
const Self = @This();
allocator: std.mem.Allocator,
data: std.ArrayList(Measurements) = undefined,
// Implement the interface via @fieldParentPtr
exporter: ExporterIface,

mx: std.Thread.Mutex = std.Thread.Mutex{},

pub fn init(allocator: std.mem.Allocator) !*Self {
const s = try allocator.create(Self);
s.* = Self{
Expand All @@ -230,30 +239,35 @@ pub const InMemoryExporter = struct {
return s;
}
pub fn deinit(self: *Self) void {
for (self.data.items) |d| {
var data = d;
data.deinit(self.allocator);
self.mx.lock();
for (self.data.items) |*d| {
d.*.deinit(self.allocator);
}
self.data.deinit();
self.mx.unlock();

self.allocator.destroy(self);
}

// Implements the only method of the ExporterIface interface.
fn exportBatch(iface: *ExporterIface, metrics: []Measurements) MetricReadError!void {
// Get a pointer to the instance of the struct that implements the interface.
const self: *Self = @fieldParentPtr("exporter", iface);
self.mx.lock();
defer self.mx.unlock();

for (self.data.items) |d| {
var data = d;
data.deinit(self.allocator);
// Free up the allocated data points from the previous export.
for (self.data.items) |*d| {
d.*.deinit(self.allocator);
}
self.data.clearRetainingCapacity();
self.data.clearAndFree();
self.data = std.ArrayList(Measurements).fromOwnedSlice(self.allocator, metrics);
}

/// Read the metrics from the in memory exporter.
//FIXME might need a mutex in the exporter as the field might be accessed
// from a thread while it's being cleared in another (via exportBatch).
pub fn fetch(self: *Self) ![]Measurements {
self.mx.lock();
defer self.mx.unlock();
return self.data.items;
}
};
Expand Down Expand Up @@ -321,7 +335,7 @@ pub const PeriodicExportingReader = struct {
exportTimeoutMillis: u64,

// Lock helper to signal shutdown is in progress
shuttingDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
shuttingDown: bool = false,

// This reader will collect metrics data from the MeterProvider.
reader: *MetricReader,
Expand Down Expand Up @@ -361,8 +375,10 @@ pub const PeriodicExportingReader = struct {
}

pub fn shutdown(self: *Self) void {
self.shuttingDown.store(true, .release);
// First signal the background exporter to stop collecting, then close the reader.
@atomicStore(bool, &self.shuttingDown, true, .release);
self.reader.shutdown();
// Only when the background collector has stopped we can destroy.
self.allocator.destroy(self);
}
};
Expand All @@ -371,17 +387,17 @@ pub const PeriodicExportingReader = struct {
// FIXME there is not a timeout for the collect operation.
fn collectAndExport(
reader: *MetricReader,
shuttingDown: *std.atomic.Value(bool),
shuttingDown: *bool,
exportIntervalMillis: u64,
// TODO: add a timeout for the export operation
_: u64,
) void {
// The execution should continue until the reader is shutting down
while (!shuttingDown.*.load(.acquire)) {
while (!@atomicLoad(bool, shuttingDown, .acquire)) {
if (reader.meterProvider) |_| {
// This will also call exporter.exportBatch() every interval.
reader.collect() catch |e| {
std.debug.print("PeriodicExportingReader: reader collect failed: {?}\n", .{e});
std.debug.print("PeriodicExportingReader: collecting failed on reader: {?}\n", .{e});
};
} else {
std.debug.print("PeriodicExportingReader: no meter provider is registered with this MetricReader {any}\n", .{reader});
Expand Down
30 changes: 15 additions & 15 deletions src/sdk/metrics/reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub const MetricReader = struct {
temporality: TemporalitySelector = view.DefaultTemporalityFor,
aggregation: AggregationSelector = view.DefaultAggregationFor,
// Signal that shutdown has been called.
hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
hasShutDown: bool = false,
mx: std.Thread.Mutex = std.Thread.Mutex{},

const Self = @This();
Expand All @@ -74,28 +74,29 @@ pub const MetricReader = struct {
}

pub fn collect(self: *Self) !void {
if (@atomicLoad(bool, &self.hasShutDown, .acquire)) {
// When shutdown has already been called, collect is a no-op.
return;
}
if (!self.mx.tryLock()) {
return MetricReadError.ConcurrentCollectNotAllowed;
}
defer self.mx.unlock();

if (self.hasShutDown.load(.acquire)) {
// When shutdown has already been called, collect is a no-op.
return;
}
var toBeExported = std.ArrayList(Measurements).init(self.allocator);
defer toBeExported.deinit();

if (self.meterProvider) |mp| {
// Collect the data from each meter provider.
// TODO: extract MeasurementsData from all meters and accumulate them with Meter attributes.
// MeasurementsData can be ported much more easilty to protobuf structs during export.
// Measurements can be ported to protobuf structs during OTLP export.
var meters = mp.meters.valueIterator();
while (meters.next()) |meter| {
const measurements: []Measurements = try AggregatedMetrics.fetch(self.allocator, meter, self.aggregation);
defer self.allocator.free(measurements);

const measurements: []Measurements = AggregatedMetrics.fetch(self.allocator, meter, self.aggregation) catch |err| {
std.debug.print("MetricReader: error aggregating data points from meter {s}: {?}", .{ meter.name, err });
continue;
};
// this makes a copy of the measurements to the array list
try toBeExported.appendSlice(measurements);
self.allocator.free(measurements);
}

//TODO: apply temporality before exporting, optionally keeping state in the reader.
Expand All @@ -105,7 +106,7 @@ pub const MetricReader = struct {

// Export the metrics data through the exporter.
// The exporter will own the metrics and should free it
// by calling deinit() on the MeterMeasurements once done.
// by calling deinit() on the Measurements once done.
// MetricExporter must be built with the same allocator as MetricReader
// to ensure that the memory is managed correctly.
const owned = try toBeExported.toOwnedSlice();
Expand All @@ -120,10 +121,10 @@ pub const MetricReader = struct {
}

pub fn shutdown(self: *Self) void {
@atomicStore(bool, &self.hasShutDown, true, .release);
self.collect() catch |e| {
std.debug.print("MetricReader shutdown: error while collecting metrics: {?}\n", .{e});
};
self.hasShutDown.store(true, .release);
self.exporter.shutdown();
self.allocator.destroy(self);
}
Expand Down Expand Up @@ -164,8 +165,7 @@ test "metric reader collects data from meter provider" {

try reader.collect();

const data = try inMem.fetch();
defer std.testing.allocator.free(data);
_ = try inMem.fetch();
}

fn deltaTemporality(_: Kind) view.Temporality {
Expand Down

0 comments on commit 855f683

Please sign in to comment.