Skip to content

Commit 9dfe605

Browse files
authored
feat: add log handler that can send stuff to OTel handlers (#1)
- It's implemented as an [OTP log handler](https://hexdocs.pm/logger/Logger.html#module-erlang-otp-handlers) - It reuses [overload protection](https://www.erlang.org/doc/apps/kernel/logger_chapter#protecting-the-handler-from-overload) employed by the console logger - It batches log events before pushing them to OTel server, with 10000 event batches or 5 second debounce windows - It uses a Finch pool to send the events, with a configurable concurrency (of 10 by default) to send a batch but don't block until HTTP request is successful. At most 10 batches can be in flight, at which point the log handler blocks until at least one spot is opened - The overload protection is configured to not have `:sync` mode described in the aforementioned docs, because blocking a logging caller until HTTP call resolves is unreasonable. Thus it's employed in 2 modes: async, or flush - if message queue of the process reaches a configured boundary (200 messages), it will be flushed to not result in memory leaks.
1 parent a2f4c88 commit 9dfe605

13 files changed

+1033
-137
lines changed

lib/otel_metric_exporter.ex

+20-49
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
defmodule OtelMetricExporter do
22
use Supervisor
33
require Logger
4+
alias OtelMetricExporter.OtelApi
45
alias OtelMetricExporter.MetricStore
56
alias Telemetry.Metrics
67

@@ -44,55 +45,25 @@ defmodule OtelMetricExporter do
4445
]
4546

4647
@options_schema NimbleOptions.new!(
47-
metrics: [
48-
type: {:list, {:or, for(x <- @supported_metrics, do: {:struct, x})}},
49-
type_spec: quote(do: list(Metrics.t())),
50-
required: true,
51-
doc: "List of telemetry metrics to track."
52-
],
53-
otlp_endpoint: [
54-
type: :string,
55-
required: true,
56-
subsection: "OTLP transport",
57-
doc: "Endpoint to send metrics to."
58-
],
59-
otlp_protocol: [
60-
type: {:in, [:http_protobuf]},
61-
type_spec: quote(do: protocol()),
62-
default: :http_protobuf,
63-
subsection: "OTLP transport",
64-
doc:
65-
"Protocol to use for OTLP export. Currently only :http_protobuf and :http_json are supported."
66-
],
67-
otlp_headers: [
68-
type: {:map, :string, :string},
69-
default: %{},
70-
subsection: "OTLP transport",
71-
doc: "Headers to send with OTLP requests."
72-
],
73-
otlp_compression: [
74-
type: {:in, [:gzip, nil]},
75-
default: :gzip,
76-
type_spec: quote(do: compression()),
77-
subsection: "OTLP transport",
78-
doc:
79-
"Compression to use for OTLP requests. Allowed values are `:gzip` and `nil`."
80-
],
81-
resource: [
82-
type: :map,
83-
default: %{},
84-
doc: "Resource attributes to send with metrics."
85-
],
86-
export_period: [
87-
type: :pos_integer,
88-
default: :timer.minutes(1),
89-
doc: "Period in milliseconds between metric exports."
90-
],
91-
name: [
92-
type: :atom,
93-
default: :otel_metric_exporter,
94-
doc: "If you require multiple exporters, give each exporter a unique name."
95-
]
48+
[
49+
metrics: [
50+
type: {:list, {:or, for(x <- @supported_metrics, do: {:struct, x})}},
51+
type_spec: quote(do: list(Metrics.t())),
52+
required: true,
53+
doc: "List of telemetry metrics to track."
54+
],
55+
export_period: [
56+
type: :pos_integer,
57+
default: :timer.minutes(1),
58+
doc: "Period in milliseconds between metric exports."
59+
],
60+
name: [
61+
type: :atom,
62+
default: :otel_metric_exporter,
63+
doc:
64+
"If you require multiple exporters, give each exporter a unique name."
65+
]
66+
] ++ OtelApi.public_options()
9667
)
9768

9869
@type option() :: unquote(NimbleOptions.option_typespec(@options_schema))

lib/otel_metric_exporter/application.ex

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule OtelMetricExporter.Application do
22
use Application
3+
require Logger
34

45
@impl true
56
def start(_type, _args) do
+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
defmodule OtelMetricExporter.LogAccumulator do
2+
@moduledoc false
3+
require Logger
4+
alias OtelMetricExporter.Opentelemetry.Proto.Logs.V1.LogRecord
5+
alias OtelMetricExporter.OtelApi
6+
# This module is used as a callback for the logger_olp module.
7+
# `:logger_olp` expects certain callbacks, but doesn't define an explicit
8+
# behaviour for them, so we're using a GenServer behaviour as an approximation.
9+
#
10+
# On top of regular GenServer callbacks, `:logger_olp` expects:
11+
# - `handle_load(event, state) :: state` - required, this is a new log record
12+
# - `reset_state(state) :: state` - optional
13+
# - `notify(handler_event, state) :: state` - optional
14+
#
15+
# Handler events are defined in `:logger_olp` as:
16+
# ```elixir
17+
# handler_event ::
18+
# :idle
19+
# | :restart
20+
# | {:flushed, flushed :: non_neg_integer}
21+
# | {:mode_change, old_mode :: mode, new_mode :: mode}
22+
# mode :: :drop | :sync | :async
23+
# ```
24+
#
25+
# `logger_olp` will switch between sync and async modes depending on the message
26+
# queue length. Sync mode means the process that tries to send the logs will block
27+
# until the logs are sent. Async mode means the process will return immediately and
28+
# send the logs in the background. When this module is used as a callback,
29+
# `logger_olp` should be configured to never change into sync mode, as that will block
30+
# one of the processes emitting the log until an HTTP request is resolved, which
31+
# is not what we want.
32+
33+
@behaviour GenServer
34+
35+
@schema NimbleOptions.new!(
36+
[
37+
metadata: [
38+
type: {:list, :atom},
39+
default: [],
40+
doc: "A list of atoms from metadata to attach as attribute to the log event"
41+
],
42+
metadata_map: [
43+
type: {:map, :atom, :string},
44+
default: %{},
45+
doc: """
46+
Remapping of metadata keys to different attribute names.
47+
Example: Plug adds a `request_id` metadata key to log events, but
48+
semantic convention for OTel is to use `http.request.id`. This can be
49+
achieved by specifying this field to `%{request_id: "http.request.id"}`
50+
"""
51+
],
52+
debounce_ms: [
53+
type: :non_neg_integer,
54+
default: 5_000,
55+
doc: "Period to accumulate logs before sending them"
56+
],
57+
max_buffer_size: [
58+
type: :non_neg_integer,
59+
default: 10_000,
60+
doc: "Max amount of log events to store before sending them"
61+
]
62+
] ++ OtelApi.public_options()
63+
)
64+
65+
@doc false
66+
def options_schema(), do: @schema
67+
68+
defdelegate prepare_log_event(event, config), to: OtelMetricExporter.Protocol
69+
70+
def check_config(config) do
71+
with {:ok, validated} <-
72+
NimbleOptions.validate(Map.merge(config, OtelApi.defaults()), @schema) do
73+
{:ok, validated}
74+
end
75+
end
76+
77+
def init(config) do
78+
Process.flag(:trap_exit, true)
79+
# Store the handler-specific Finch name in the state
80+
{:ok, api, rest} = OtelApi.new(config)
81+
82+
{:ok,
83+
Map.merge(rest, %{
84+
api: api,
85+
event_queue: [],
86+
queue_len: 0,
87+
timer_ref: nil,
88+
pending_tasks: %{}
89+
})}
90+
end
91+
92+
def handle_load(event, state) do
93+
state
94+
|> add_event(event)
95+
|> send_schedule_or_block()
96+
end
97+
98+
def handle_cast({:config_changed, config}, state) do
99+
{:ok, api, rest} = OtelApi.new(config)
100+
101+
{:noreply, Map.merge(state, Map.merge(rest, %{api: api}))}
102+
end
103+
104+
# Do nothing on an empty queue
105+
def handle_info(:send_log_batch, %{event_queue: []} = state), do: {:noreply, state}
106+
107+
def handle_info(:send_log_batch, state)
108+
when map_size(state.pending_tasks) < state.api.otlp_concurrent_requests do
109+
# We can spin up a new task to send the logs
110+
{:noreply, send_events_via_task(state)}
111+
end
112+
113+
# No task budget, so we're blocking on this message
114+
def handle_info(:send_log_batch, state) do
115+
{:noreply, state |> block_until_any_task_ready() |> send_events_via_task()}
116+
end
117+
118+
def handle_info({ref, result}, state)
119+
when is_map_key(state.pending_tasks, ref) do
120+
if match?({:error, _}, result) do
121+
Logger.debug(
122+
"Error sending logs to #{state.api.otlp_endpoint}: #{inspect(elem(result, 1))}"
123+
)
124+
end
125+
126+
# Remove the task from the pending tasks map
127+
{:noreply, state}
128+
end
129+
130+
def handle_info({:DOWN, ref, :process, _, _}, state)
131+
when is_map_key(state.pending_tasks, ref) do
132+
# Remove the task from the pending tasks map
133+
{:noreply, %{state | pending_tasks: Map.delete(state.pending_tasks, ref)}}
134+
end
135+
136+
def terminate(_reason, state) do
137+
# Send any remaining logs if possible
138+
send_events_via_task(state)
139+
end
140+
141+
# We don't care about event order# If we have a here, because it's all timestamped
142+
# and up to the log display to order
143+
defp add_event(%{event_queue: queue, queue_len: len} = state, event)
144+
when is_struct(event, LogRecord),
145+
do: %{state | event_queue: [event | queue], queue_len: len + 1}
146+
147+
# If we have the maximum number of concurrent requests, block
148+
defp send_schedule_or_block(%{pending_tasks: pending_tasks} = state)
149+
when map_size(pending_tasks) == state.api.otlp_concurrent_requests do
150+
state
151+
|> block_until_any_task_ready()
152+
|> send_schedule_or_block()
153+
end
154+
155+
# If we have enough events to send, send immediately
156+
defp send_schedule_or_block(%{queue_len: len} = state)
157+
when len >= state.max_buffer_size do
158+
# Send the logs immediately
159+
send_events_via_task(state)
160+
end
161+
162+
# If we have a schedule already, do nothing
163+
defp send_schedule_or_block(%{timer_ref: ref} = state) when not is_nil(ref), do: state
164+
165+
defp send_schedule_or_block(%{debounce_ms: debounce_ms} = state) do
166+
timer_ref = Process.send_after(self(), :send_log_batch, debounce_ms)
167+
168+
%{state | timer_ref: timer_ref}
169+
end
170+
171+
defp block_until_any_task_ready(%{pending_tasks: pending_tasks} = state) do
172+
# Block via a receive, waiting for a completion message or a down message
173+
# from a task that we started
174+
receive do
175+
{ref, _result} when is_map_key(pending_tasks, ref) ->
176+
# Remove the task from the pending tasks map
177+
%{state | pending_tasks: Map.delete(pending_tasks, ref)}
178+
179+
{:DOWN, ref, :process, _, _} when is_map_key(pending_tasks, ref) ->
180+
# Remove the task from the pending tasks map
181+
%{state | pending_tasks: Map.delete(pending_tasks, ref)}
182+
end
183+
end
184+
185+
defp send_events_via_task(%{api: api, event_queue: queue} = state) do
186+
task =
187+
Task.Supervisor.async_nolink(state.task_supervisor, OtelApi, :send_log_events, [
188+
api,
189+
queue
190+
])
191+
192+
%{
193+
state
194+
| event_queue: [],
195+
queue_len: 0,
196+
pending_tasks: Map.put(state.pending_tasks, task.ref, :pending)
197+
}
198+
end
199+
end

0 commit comments

Comments
 (0)