16
16
import contextlib
17
17
import logging
18
18
import os
19
+ import ssl
19
20
import threading
20
21
import time
21
- from typing import Any , Sequence
22
+ import urllib .request
23
+ from typing import Sequence
22
24
23
- from opentelemetry . exporter . otlp . proto . common . _internal import trace_encoder
24
- from opentelemetry . exporter . otlp . proto . http . trace_exporter import OTLPSpanExporter
25
+ # FIXME: single-file Python package can't be marked as py.typed
26
+ import otlp_json # type: ignore
25
27
from opentelemetry .instrumentation .urllib import URLLibInstrumentor
26
28
from opentelemetry .sdk .resources import Resource
27
29
from opentelemetry .sdk .trace import ReadableSpan , TracerProvider
33
35
import ops .jujucontext
34
36
import ops .log
35
37
38
+ # FIXME otlp_json is typed...
39
+ # https://github.com/python/typing/issues/1333
40
+
36
41
# Trace `urllib` usage when talking to Pebble
37
42
URLLibInstrumentor ().instrument ()
38
43
56
61
logger .addHandler (logging .StreamHandler ())
57
62
58
63
59
- def proto_to_json (data : Any ) -> str :
60
- """FIXME: move to own module and reimplement"""
61
- # xxx
62
- import base64
63
- import json
64
- from google .protobuf .json_format import MessageToDict
65
- dic = MessageToDict (data )
66
-
67
- for rs in dic ["resourceSpans" ]:
68
- for ss in rs ["scopeSpans" ]:
69
- for sp in ss ["spans" ]:
70
- for k in "parentSpanId spanId traceId" .split ():
71
- if k in sp :
72
- sp [k ] = base64 .b64decode (sp [k ]).hex ()
73
- sp ["kind" ] = {
74
- "SPAN_KIND_UNSPECIFIED" : 0 ,
75
- "SPAN_KIND_INTERNAL" : 1 ,
76
- "SPAN_KIND_SERVER" : 2 ,
77
- "SPAN_KIND_CLIENT" : 3 ,
78
- "SPAN_KIND_PRODUCER" : 4 ,
79
- "SPAN_KIND_CONSUMER" : 5 ,
80
- }[sp ["kind" ]]
81
-
82
- return json .dumps (dic )
83
-
84
-
85
-
86
64
class ProxySpanExporter (SpanExporter ):
87
- real_exporter : OTLPSpanExporter | None = None
88
65
settings : tuple [str | None , str | None ] = (None , None )
66
+ cache : dict [str | None , ssl .SSLContext ]
89
67
90
68
def __init__ (self , buffer_path : str ):
91
69
self .buffer = ops ._tracing .buffer .Buffer (buffer_path )
92
70
self .lock = threading .Lock ()
71
+ self .cache = {}
93
72
94
73
def export (self , spans : Sequence [ReadableSpan ]) -> SpanExportResult :
95
74
"""Export a batch of telemetry data.
@@ -110,15 +89,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
110
89
assert spans # the BatchSpanProcessor won't call us if there's no data
111
90
# TODO: this will change in the JSON experiment
112
91
# __import__("pdb").set_trace()
113
- data : bytes = proto_to_json (trace_encoder .encode_spans (spans )).encode ("utf-8" )
114
92
# FIXME can't use stock exporter, must DIY
115
- import requests
116
- r = requests .post ("http://localhost:4318/v1/traces" ,
117
- data = data ,
118
- headers = {"Content-Type" : "application/json" })
119
- r .raise_for_status ()
120
93
121
- rv = self .buffer .pump (data )
94
+ rv = self .buffer .pump (( otlp_json . encode_spans ( spans ), otlp_json . CONTENT_TYPE ) )
122
95
assert rv
123
96
self .do_export (* rv )
124
97
@@ -134,18 +107,65 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
134
107
logger .exception ('export' )
135
108
raise
136
109
137
- def do_export (self , buffered_id : int , data : bytes ) -> None :
110
+ def ssl_context (self , ca : str | None ) -> ssl .SSLContext :
111
+ if context := self .cache .get (ca ):
112
+ return context
113
+ context = self ._ssl_context (ca )
114
+ self .cache .clear ()
115
+ self .cache [ca ] = context
116
+ return context
117
+
118
+ def _ssl_context (self , ca : str | None ) -> ssl .SSLContext :
119
+ # FIXME: What should our protocol range be?
120
+ # this means TLS {v1, v1.1, v1.2}
121
+ context = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
122
+ # FIXME: we should probably allow ca=None
123
+ # and then we'd pick up system or certifi certs?
124
+ assert ca
125
+ context .load_verify_locations (cadata = ca )
126
+ # FIXME: what's recommended?
127
+ # context.set_ciphers(...)
128
+ # FIXME: we need to set NPN if we're setting ALPN?
129
+ # Does this work the same way across Py 3.8~3.13?
130
+ context .set_npn_protocols ('http/1.1' )
131
+ context .set_alpn_protocols ('http/1.1' )
132
+ # TODO: check that we don't need these:
133
+ # .set_sni_callback
134
+ return context
135
+
136
+ def do_export (self , buffered_id : int , data : bytes , mime : str ) -> None :
138
137
"""Export buffered data and remove it from the buffer on success."""
139
- return
140
- # FIXME: this will change in the JSON experiment
141
- exporter = self .real_exporter
142
- if exporter and exporter ._export (data ).ok :
143
- self .buffer .remove (buffered_id )
138
+ print (self .settings , len (data ), mime )
139
+ url , ca = self .settings
140
+ if not url :
141
+ return
142
+
143
+ # FIXME cache
144
+
145
+ # FIXME: is this custom code worth it?
146
+ # or would it be easier and safer to use `requests`?
147
+ assert url .startswith (('http://' , 'https://' ))
148
+ context = self .ssl_context (ca ) if url .startswith ('https://' ) else None
149
+
150
+ with urllib .request .urlopen ( # noqa: S310
151
+ urllib .request .Request ( # noqa: S310
152
+ url ,
153
+ data = data ,
154
+ headers = {'Content-Type' : mime },
155
+ method = 'POST' ,
156
+ ),
157
+ context = context ,
158
+ ) as rv :
159
+ # from typing_extensions import reveal_type
160
+ # reveal_type(rv.status) # Any
161
+ #
162
+ # FIXME: .status requires Python 3.9+, WAT?
163
+ if rv .status < 300 :
164
+ self .buffer .remove (buffered_id )
144
165
145
166
def shutdown (self ) -> None :
146
167
"""Shut down the exporter."""
147
- if exporter := self .real_exporter :
148
- exporter .shutdown ()
168
+ pass
149
169
150
170
def force_flush (self , timeout_millis : int = 30000 ) -> bool :
151
171
"""No-op, as the real exporter doesn't buffer."""
@@ -243,42 +263,7 @@ def set_tracing_destination(
243
263
raise ValueError (f'{ ca = } must be an absolute path' )
244
264
245
265
assert _exporter , 'tracing has not been set up'
246
- with _exporter .lock :
247
- if (url , ca ) != _exporter .settings :
248
- if url :
249
- # real exporter, hardcoded for now
250
- real_exporter = OTLPSpanExporter (url , timeout = EXPORTER_TIMEOUT )
251
- # FIXME: shouldn't be hardcoded...
252
- # FIXME API design: if it OK to force the protocol and endpoint
253
- # switch onto the charmers, our users?
254
- #
255
- # OTLP protobuf URL is host:4318/v1/traces
256
- # Zipkin v2 JSON URL is host:9411/api/v2/spans
257
- #
258
- # FIXME: on the other hand, Jaeger 2 should accept OTLP JSON too
259
- # https://www.jaegertracing.io/docs/2.3/apis/#opentelemetry-protocol
260
- #
261
- # The real question is what COS and COS-lite accept.
262
- #
263
- # json_url = 'http://localhost:9411/api/v2/spans'
264
- # TODO: session=<custom session that groks ca= better>
265
- # zipkin_exporter = ZipkinExporter(
266
- # endpoint=json_url, timeout=EXPORTER_TIMEOUT
267
- # )
268
- # This is actually the max delay value in the sequence 1, 2, ..., MAX
269
- # Set to 1 to disable sending live data (buffered data is still eventually sent)
270
- # Set to 2 (or more) to enable sending live data (after buffered)
271
- #
272
- # _MAX_RETRY_TIMEOUT = 2 with timeout=1 means:
273
- # - 1 attempt to send live, 1s sleep in the worst case
274
- # _MAX_RETRY_TIMEOUT = 3 or 4 with timeout=1 means:
275
- # - 1st attempt, 1s sleep, 2nd attempt, 1s sleep in the worst case
276
- real_exporter ._MAX_RETRY_TIMEOUT = 2 # pyright: ignore[reportAttributeAccessIssue]
277
- else :
278
- real_exporter = None
279
-
280
- _exporter .real_exporter = real_exporter
281
- _exporter .settings = (url , ca )
266
+ _exporter .settings = (url , ca )
282
267
283
268
_exporter .buffer .mark_observed ()
284
269
0 commit comments