Skip to content

Commit 2b6ac38

Browse files
committed
wip: JSON exporter
1 parent 545008c commit 2b6ac38

File tree

6 files changed

+136
-62
lines changed

6 files changed

+136
-62
lines changed

dont-merge/fake-charm.py

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ def _on_setup_tracing(self, event: ops.SetupTracingEvent) -> None:
7070
def _on_start(self, event: ops.StartEvent) -> None:
7171
"""Dummy docstring."""
7272
self.dummy_load(event, 0.0025)
73-
# FIXME https://github.com/canonical/operator/issues/1561
7473
event.defer()
7574

7675
def _on_db_ready(self, event: DatabaseReadyEvent) -> None:

ops/_tracing/__init__.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,26 @@ def setup_tracing(charm_class_name: str) -> None:
4242
def set_tracing_destination(
4343
*,
4444
url: str | None,
45+
# FIXME: API design choice, decide on CA semantics:
46+
# - a local path to a file with CA data
47+
# - or the CA data itself?
48+
#
49+
# Sadly Requests `verify=` kwarg accepts only:
50+
# - bool: use local `certifi` certs if True
51+
# - str: path to a file (PEM) or a directory (processed with c_rehash)
52+
#
53+
# If we plan to go for our own exporter (JSON, etc.), we should design for the future.
54+
#
55+
# It's not that hard to convert one to another, and yet...
4556
ca: str | None = None,
4657
) -> None:
47-
"""Configure the destination service for tracing data."""
58+
"""Configure the destination service for tracing data.
59+
60+
Args:
61+
url: The URL of the telemetry service to send tracing data to.
62+
ca: The local path (?) to a CA bundle for the service above.
63+
Only in use if the URL is an HTTPS URL.
64+
"""
4865
if not export:
4966
return
5067
export.set_tracing_destination(url=url, ca=ca)

ops/_tracing/buffer.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class Buffer:
8181
"""tracing data ids buffered during this dispatch invocation."""
8282
observed = False
8383
"""Marks that data from this dispatch invocation has been marked observed."""
84-
stored: int|None = None
84+
stored: int | None = None
8585

8686
def __init__(self, path: str):
8787
self.path = path
@@ -252,7 +252,8 @@ def remove(self, id_: int) -> None:
252252
SELECT (length(data)+4095)/4096*4096
253253
FROM tracing
254254
WHERE id = ?
255-
"""
255+
""",
256+
(id_,),
256257
).fetchone()
257258

258259
if not row:

ops/_tracing/export.py

+109-58
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
import contextlib
1717
import logging
1818
import os
19+
import threading
1920
import time
2021
from typing import Sequence
2122

2223
from opentelemetry.exporter.otlp.proto.common._internal import trace_encoder
2324
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
25+
from opentelemetry.exporter.zipkin.json import ZipkinExporter # type: ignore
2426
from opentelemetry.instrumentation.urllib import URLLibInstrumentor
2527
from opentelemetry.sdk.resources import Resource
2628
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
@@ -35,78 +37,100 @@
3537
# Trace `urllib` usage when talking to Pebble
3638
URLLibInstrumentor().instrument()
3739

38-
_OTLP_SPAN_EXPORTER_TIMEOUT = 1 # seconds
40+
# NOTE: nominally int, although float would work just as well in practice
41+
EXPORTER_TIMEOUT: int = 1 # seconds
3942
"""How much time the OTLP span exporter has to push traces to the backend."""
4043

41-
SENDOUT_FACTOR = 2
42-
"""How much buffered chunks to send out for each incoming chunk."""
44+
SENDOUT_FACTOR: int = 2
45+
"""How many buffered chunks to send out for each incoming chunk."""
4346

44-
# FIXME: this creates a separate file next to the CHARM_STATE_FILE
45-
# We could stuff both kinds of data into the same file, I guess?
46-
BUFFER_FILE = '.tracing-data.db'
47-
# Currently ops.storage keeps one long transaction open for the duration of the
48-
# the dispatch, which means we can't use the same file from another thread.
49-
# BUFFER_FILE = '.unit-state.db'
47+
BUFFER_FILE: str = '.tracing-data.db'
48+
"""Name of the file whither data is buffered, located next to .unit-state.db."""
5049

5150

51+
logger = logging.getLogger(__name__)
5252
_exporter: ProxySpanExporter | None = None
5353

5454

55+
# NOTE: OTEL SDK suppresses errors while exporting data
56+
# TODO: decide if we need to remove this before going to prod
57+
logger.addHandler(logging.StreamHandler())
58+
59+
5560
class ProxySpanExporter(SpanExporter):
5661
real_exporter: OTLPSpanExporter | None = None
62+
zipkin_exporter: ZipkinExporter | None = None
63+
settings: tuple[str | None, str | None] = (None, None)
5764

5865
def __init__(self, buffer_path: str):
5966
self.buffer = ops._tracing.buffer.Buffer(buffer_path)
67+
self.lock = threading.Lock()
6068

6169
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
6270
"""Export a batch of telemetry data.
6371
6472
Note: to avoid data loops or recursion, this function cannot be instrumented.
6573
"""
66-
with suppress_juju_log_handler():
67-
# Note:
68-
# this is called in a helper thread, which is daemonic,
69-
# the MainThread will wait at most 10s for this thread.
70-
# Margins:
71-
# - 1s safety margin
72-
# - 1s for buffered data time overhang
73-
# - 2s for live data
74-
deadline = time.monotonic() + 6
75-
76-
assert spans # the BatchSpanProcessor won't call us if there's no data
77-
# TODO: this will change in the JSON experiment
78-
data: bytes = trace_encoder.encode_spans(spans).SerializePartialToString()
79-
rv = self.buffer.pump(data)
80-
assert rv
81-
self.do_export(*rv)
82-
83-
for _ in range(SENDOUT_FACTOR - 1):
84-
if time.monotonic() > deadline:
85-
break
86-
if not (rv := self.buffer.pump()):
87-
break
74+
try:
75+
with suppress_juju_log_handler():
76+
# Note:
77+
# this is called in a helper thread, which is daemonic,
78+
# the MainThread will wait at most 10s for this thread.
79+
# Margins:
80+
# - 1s safety margin
81+
# - 1s for buffered data time overhang
82+
# - 2s for live data
83+
deadline = time.monotonic() + 6
84+
85+
assert spans # the BatchSpanProcessor won't call us if there's no data
86+
# TODO: this will change in the JSON experiment
87+
data: bytes = trace_encoder.encode_spans(spans).SerializePartialToString()
88+
jsons = [s.to_json(indent=None) for s in spans] # type: ignore
89+
f'{{"resourceSpans": [{",".join(jsons)}]}}'
90+
rv = self.buffer.pump(data)
91+
assert rv
8892
self.do_export(*rv)
8993

90-
return SpanExportResult.SUCCESS
94+
for _ in range(SENDOUT_FACTOR - 1):
95+
if time.monotonic() > deadline:
96+
break
97+
if not (rv := self.buffer.pump()):
98+
break
99+
self.do_export(*rv)
100+
101+
url, ca = self.settings
102+
assert url
103+
url = url.replace('4318', '4317')
104+
print(url, ca)
105+
# rv = requests.post(
106+
# url,
107+
# data=json_payload,
108+
# headers= {"Content-Type": "application/json"}, verify=ca,)
109+
# print(rv)
110+
assert self.zipkin_exporter
111+
print(self.zipkin_exporter.export(spans))
112+
return SpanExportResult.SUCCESS
113+
except Exception:
114+
logger.exception('export')
115+
raise
91116

92117
def do_export(self, buffered_id: int, data: bytes) -> None:
93118
"""Export buffered data and remove it from the buffer on success."""
94119
# TODO: this will change in the JSON experiment
95-
if self.real_exporter and self.real_exporter._export(data).ok:
120+
exporter = self.real_exporter
121+
return
122+
if exporter and exporter._export(data).ok:
96123
self.buffer.remove(buffered_id)
97124

98125
def shutdown(self) -> None:
99126
"""Shut down the exporter."""
100-
if self.real_exporter:
101-
self.real_exporter.shutdown()
127+
if exporter := self.real_exporter:
128+
exporter.shutdown()
102129

103130
def force_flush(self, timeout_millis: int = 30000) -> bool:
104131
"""No-op, as the real exporter doesn't buffer."""
105132
return True
106133

107-
def set_real_exporter(self, exporter: OTLPSpanExporter) -> None:
108-
self.real_exporter = exporter
109-
110134

111135
@contextlib.contextmanager
112136
def suppress_juju_log_handler():
@@ -132,6 +156,16 @@ def setup_tracing(charm_class_name: str) -> None:
132156

133157
resource = Resource.create(
134158
attributes={
159+
# https://opentelemetry.io/docs/languages/sdk-configuration/general/
160+
# https://github.com/open-telemetry/semantic-conventions/tree/main/docs/resource#semantic-attributes-with-dedicated-environment-variable
161+
#
162+
# OTEL defines some standard-ish attributes:
163+
# service.name required
164+
# service.instance.id recommended
165+
# service.namespace recommended -- maybe model name?
166+
# service.version recommended
167+
# Following same attribute names as charm_tracing lib
168+
# FIXME: decide if it makes sense
135169
'service.name': service_name,
136170
'compose_service': service_name, # FIXME why is this copy needed?
137171
'charm_type': charm_class_name,
@@ -153,33 +187,50 @@ def setup_tracing(charm_class_name: str) -> None:
153187
set_tracer_provider(provider)
154188

155189

156-
# FIXME make it very cheap to call this method a second time with same arguments
157190
def set_tracing_destination(
158191
*,
159192
url: str | None,
160193
ca: str | None,
161194
) -> None:
162-
# FIXME needs a threading.Lock
163-
# or access to underlying BatchXXX lock
164-
#
165-
# - check if settings are exactly same, do nothing in that case
166-
# - replace current exported with a new exporter
195+
# FIXME only if it's a path, obv...
196+
# should we also check that this path exists?
167197
if ca is not None and not ca.startswith('/'):
168198
raise ValueError(f'{ca=} must be an absolute path')
169-
assert _exporter
170-
171-
# real exporter, hardcoded for now
172-
real_exporter = OTLPSpanExporter(url, timeout=1)
173-
# This is actually the max delay value in the sequence 1, 2, ..., MAX
174-
# Set to 1 to disable sending live data (buffered data is still eventually sent)
175-
# Set to 2 (or more) to enable sending live data (after buffered)
176-
#
177-
# _MAX_RETRY_TIMEOUT = 2 with timeout=1 means:
178-
# - 1 attempt to send live, 1s sleep in the worst case
179-
# _MAX_RETRY_TIMEOUT = 3 or 4 with timeout=1 means:
180-
# - 1st attempt, 1s sleep, 2nd attempt, 1s sleep in the worst case
181-
real_exporter._MAX_RETRY_TIMEOUT = 2 # pyright: ignore[reportAttributeAccessIssue]
182-
_exporter.set_real_exporter(real_exporter)
199+
200+
assert _exporter, 'tracing has not been set up'
201+
with _exporter.lock:
202+
if (url, ca) != _exporter.settings:
203+
if url:
204+
# real exporter, hardcoded for now
205+
real_exporter = OTLPSpanExporter(url, timeout=EXPORTER_TIMEOUT)
206+
# FIXME: shouldn't be hardcoded...
207+
# FIXME API design: is it OK to force the protocol and endpoint
208+
# switch onto the charmers, our users?
209+
#
210+
# OTLP protobuf URL is host:4318/v1/traces
211+
# Zipkin v2 JSON URL is host:9411/api/v2/spans
212+
#
213+
json_url = 'http://localhost:9411/api/v2/spans'
214+
# TODO: session=<custom session that groks ca= better>
215+
zipkin_exporter = ZipkinExporter(
216+
endpoint=json_url, timeout=EXPORTER_TIMEOUT
217+
) # FIXME timeout, etc
218+
# This is actually the max delay value in the sequence 1, 2, ..., MAX
219+
# Set to 1 to disable sending live data (buffered data is still eventually sent)
220+
# Set to 2 (or more) to enable sending live data (after buffered)
221+
#
222+
# _MAX_RETRY_TIMEOUT = 2 with timeout=1 means:
223+
# - 1 attempt to send live, 1s sleep in the worst case
224+
# _MAX_RETRY_TIMEOUT = 3 or 4 with timeout=1 means:
225+
# - 1st attempt, 1s sleep, 2nd attempt, 1s sleep in the worst case
226+
real_exporter._MAX_RETRY_TIMEOUT = 2 # pyright: ignore[reportAttributeAccessIssue]
227+
else:
228+
real_exporter = zipkin_exporter = None
229+
230+
_exporter.real_exporter = real_exporter
231+
_exporter.zipkin_exporter = zipkin_exporter
232+
_exporter.settings = (url, ca)
233+
183234
_exporter.buffer.mark_observed()
184235

185236

ops/charm.py

+5
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,11 @@ def add_status(self, status: model.StatusBase):
11801180
model_.unit._collected_statuses.append(status)
11811181

11821182

1183+
# FIXME: API design choice
1184+
# Should we make this more generic?
1185+
# - call this a TelemetryEvent / TelemetryConfigEvent / SetupTelemetryEvent
1186+
# - provide .set_tracing_destination() in this PR
1187+
# - later, perhaps .set_logging_xxx() and .set_metrics_yyy()?
11831188
class SetupTracingEvent(LifecycleEvent):
11841189
"""FIXME docstring."""
11851190

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ testing = [
4444
]
4545
tracing = [
4646
"opentelemetry-exporter-otlp-proto-http~=1.30",
47+
"opentelemetry-exporter-zipkin-json~=1.30.0",
4748
"opentelemetry-instrumentation-urllib~=0.51b0", # must match the above
4849
]
4950
# Empty for now, because Harness is bundled with the base install, but allow

0 commit comments

Comments
 (0)