Skip to content

Commit 317175b

Browse files
committed
Add initial implementation of OpAMP
Just reports effective config at startup, but adds wiring for eventual dynamic config updates and reporting.
1 parent 6c95544 commit 317175b

File tree

12 files changed

+590
-8
lines changed

12 files changed

+590
-8
lines changed

.github/workflows/ci-main.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ jobs:
2727
steps:
2828
- uses: actions/checkout@v6
2929

30+
- name: Enable long paths (Windows)
31+
if: runner.os == 'Windows'
32+
run: git config --system core.longpaths true
33+
3034
- name: Set up Python ${{ matrix.python-version }}
3135
uses: actions/setup-python@v6
3236
with:

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ dependencies = [
3333
"opentelemetry-instrumentation-system-metrics==0.60b1",
3434
"opentelemetry-semantic-conventions==0.60b1",
3535
"protobuf>=6.31.1", # not our direct dep, prevents installing vulnerable proto versions (CVE‑2025‑4565)
36+
"opentelemetry-opamp-client @ git+https://github.com/open-telemetry/opentelemetry-python-contrib.git@88e5bfc630baa9a0789dd28694319afd506eae09#subdirectory=opamp/opentelemetry-opamp-client", # unreleased, pinned to merge commit of PR #3635
37+
"requests>=2.20.0", # OpAMP: HTTP transport (already transitive dep, making explicit)
3638
]
3739

3840
[project.urls]
@@ -46,6 +48,9 @@ configurator = "splunk_otel.configurator:SplunkConfigurator"
4648
[project.entry-points.opentelemetry_distro]
4749
splunk_distro = "splunk_otel.distro:SplunkDistro"
4850

51+
[tool.hatch.metadata]
52+
allow-direct-references = true
53+
4954
[tool.hatch.version]
5055
path = "src/splunk_otel/__about__.py"
5156

src/splunk_otel/callgraphs/__init__.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,30 @@
88
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL,
99
)
1010

11+
_DEFAULT_SNAPSHOT_SAMPLING_INTERVAL = 10
1112

12-
def _configure_callgraphs_if_enabled(env=None):
13+
14+
class CallgraphsState:
15+
"""Runtime state of the Callgraph (aka snapshot) profiler, for OpAMP reporting."""
16+
17+
def __init__(self, processor: "CallgraphsSpanProcessor | None", interval: int):
18+
self._processor = processor
19+
self._interval = interval
20+
21+
def is_enabled(self) -> bool:
22+
return self._processor is not None
23+
24+
def interval(self) -> int:
25+
if self._processor is not None:
26+
return self._processor.interval_millis()
27+
return self._interval
28+
29+
30+
def _configure_callgraphs_if_enabled(env=None) -> CallgraphsState:
1331
env = env or Env()
32+
interval = env.getint(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, _DEFAULT_SNAPSHOT_SAMPLING_INTERVAL)
1433
if env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED):
15-
trace.get_tracer_provider().add_span_processor(
16-
CallgraphsSpanProcessor(env.getval(OTEL_SERVICE_NAME), env.getint(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, 10))
17-
)
34+
processor = CallgraphsSpanProcessor(env.getval(OTEL_SERVICE_NAME), interval)
35+
trace.get_tracer_provider().add_span_processor(processor)
36+
return CallgraphsState(processor, interval)
37+
return CallgraphsState(None, interval)

src/splunk_otel/callgraphs/span_processor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def on_end(self, span: ReadableSpan) -> None:
7474
if len(self._span_id_to_trace_id) == 0:
7575
self._profiler.pause_after(60.0)
7676

77+
def interval_millis(self) -> int:
78+
return int(self._profiler.interval_seconds * 1000)
79+
7780
def shutdown(self) -> None:
7881
self._profiler.stop()
7982

src/splunk_otel/configurator.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,52 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from opentelemetry import trace
1516
from opentelemetry.sdk._configuration import _OTelSDKConfigurator
1617

1718
from splunk_otel.profile import _start_profiling_if_enabled
1819
from splunk_otel.callgraphs import _configure_callgraphs_if_enabled
20+
from splunk_otel.opamp import _start_opamp_if_enabled
21+
from splunk_otel.opamp.config_registry import ConfigRegistry
22+
from splunk_otel.env import (
23+
Env,
24+
SPLUNK_PROFILER_ENABLED,
25+
SPLUNK_PROFILER_CALL_STACK_INTERVAL,
26+
SPLUNK_SNAPSHOT_PROFILER_ENABLED,
27+
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL,
28+
OTEL_SERVICE_NAME,
29+
OTEL_EXPORTER_OTLP_ENDPOINT,
30+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
31+
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
32+
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
33+
)
1934

2035

2136
class SplunkConfigurator(_OTelSDKConfigurator):
2237
def _configure(self, **kwargs):
2338
super()._configure(**kwargs)
24-
_start_profiling_if_enabled()
25-
_configure_callgraphs_if_enabled()
39+
40+
env = Env()
41+
profiling = _start_profiling_if_enabled(env)
42+
callgraphs = _configure_callgraphs_if_enabled(env)
43+
44+
registry = self._build_registry(profiling, callgraphs, env)
45+
resource = trace.get_tracer_provider().resource
46+
_start_opamp_if_enabled(resource.attributes, registry, env)
47+
48+
def _build_registry(self, profiling_state, callgraphs_state, env) -> ConfigRegistry:
49+
registry = ConfigRegistry()
50+
registry.register(SPLUNK_PROFILER_ENABLED, getter=lambda: str(profiling_state.is_enabled()).lower())
51+
registry.register(SPLUNK_PROFILER_CALL_STACK_INTERVAL, getter=lambda: str(profiling_state.interval_millis()))
52+
registry.register(SPLUNK_SNAPSHOT_PROFILER_ENABLED, getter=lambda: str(callgraphs_state.is_enabled()).lower())
53+
registry.register(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, getter=lambda: str(callgraphs_state.interval()))
54+
55+
for key in (
56+
OTEL_SERVICE_NAME,
57+
OTEL_EXPORTER_OTLP_ENDPOINT,
58+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
59+
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
60+
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
61+
):
62+
registry.register(key, getter=lambda k=key: env.getval(k) or None)
63+
return registry

src/splunk_otel/env.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL = "SPLUNK_SNAPSHOT_SAMPLING_INTERVAL"
5656
SPLUNK_SNAPSHOT_SELECTION_PROBABILITY = "SPLUNK_SNAPSHOT_SELECTION_PROBABILITY"
5757
SPLUNK_REALM = "SPLUNK_REALM"
58+
SPLUNK_OPAMP_ENABLED = "SPLUNK_OPAMP_ENABLED"
59+
SPLUNK_OPAMP_ENDPOINT = "SPLUNK_OPAMP_ENDPOINT"
60+
SPLUNK_OPAMP_TOKEN = "SPLUNK_OPAMP_TOKEN" # noqa: S105
61+
SPLUNK_OPAMP_POLLING_INTERVAL = "SPLUNK_OPAMP_POLLING_INTERVAL"
62+
OTEL_SERVICE_NAME = "OTEL_SERVICE_NAME"
63+
OTEL_EXPORTER_OTLP_ENDPOINT = "OTEL_EXPORTER_OTLP_ENDPOINT"
64+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
65+
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"
66+
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
5867

5968
_pylogger = logging.getLogger(__name__)
6069

src/splunk_otel/opamp/__init__.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# Copyright Splunk Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
17+
from opentelemetry.sdk.resources import (
18+
TELEMETRY_SDK_LANGUAGE,
19+
TELEMETRY_SDK_NAME,
20+
TELEMETRY_SDK_VERSION,
21+
)
22+
from opentelemetry._opamp.agent import OpAMPAgent
23+
from opentelemetry._opamp.client import OpAMPClient
24+
from opentelemetry._opamp.proto import opamp_pb2
25+
26+
from splunk_otel.env import (
27+
Env,
28+
SPLUNK_ACCESS_TOKEN,
29+
SPLUNK_OPAMP_ENABLED,
30+
SPLUNK_OPAMP_ENDPOINT,
31+
SPLUNK_OPAMP_POLLING_INTERVAL,
32+
SPLUNK_OPAMP_TOKEN,
33+
)
34+
from splunk_otel.distro import _DISTRO_NAME
35+
from splunk_otel.opamp.config_registry import ConfigRegistry
36+
37+
logger = logging.getLogger(__name__)
38+
39+
_IDENTIFYING_RESOURCE_KEYS = (
40+
"service.name",
41+
"service.namespace",
42+
"service.instance.id",
43+
"service.version",
44+
)
45+
46+
_NON_IDENTIFYING_RESOURCE_KEYS = (
47+
"os.type",
48+
"os.name",
49+
"os.version",
50+
"host.name",
51+
"host.arch",
52+
"process.pid",
53+
"process.runtime.name",
54+
"process.runtime.version",
55+
# Note: deployment.environment.name may need to move: splunk-otel-java puts this in identifying
56+
"deployment.environment.name",
57+
)
58+
59+
_DEFAULT_POLLING_INTERVAL_MS = 30000
60+
61+
62+
def _start_opamp_if_enabled(resource_attrs, registry: ConfigRegistry, env: Env) -> None:
63+
if not env.is_true(SPLUNK_OPAMP_ENABLED):
64+
logger.debug("OpAMP disabled (SPLUNK_OPAMP_ENABLED not set to true)")
65+
return
66+
67+
endpoint = env.getval(SPLUNK_OPAMP_ENDPOINT)
68+
if not endpoint:
69+
logger.warning("SPLUNK_OPAMP_ENABLED=true but SPLUNK_OPAMP_ENDPOINT is not set; OpAMP disabled")
70+
return
71+
72+
token = env.getval(SPLUNK_OPAMP_TOKEN) or env.getval(SPLUNK_ACCESS_TOKEN)
73+
polling_interval_ms = env.getint(SPLUNK_OPAMP_POLLING_INTERVAL, _DEFAULT_POLLING_INTERVAL_MS)
74+
75+
logger.info("Starting OpAMP client: %s", endpoint)
76+
77+
try:
78+
_start_opamp(endpoint, token, polling_interval_ms, registry, resource_attrs)
79+
80+
except Exception:
81+
logger.exception("Failed to start OpAMP client")
82+
83+
84+
def _start_opamp(endpoint: str, token: str, polling_interval_ms: int, registry: ConfigRegistry, resource_attrs):
85+
headers = {}
86+
if token:
87+
headers["Authorization"] = f"Bearer {token}"
88+
89+
identifying_attrs, non_identifying_attrs = _build_agent_attributes(resource_attrs)
90+
client = OpAMPClient(
91+
endpoint=endpoint,
92+
headers=headers,
93+
agent_identifying_attributes=identifying_attrs,
94+
agent_non_identifying_attributes=non_identifying_attrs,
95+
)
96+
client.update_effective_config(
97+
{"": registry.get_all()},
98+
content_type="application/json",
99+
)
100+
agent = OpAMPAgent(
101+
interval=polling_interval_ms / 1000,
102+
message_handler=_handle_server_message,
103+
client=client,
104+
)
105+
agent.start()
106+
logger.info("OpAMP client started")
107+
108+
109+
def _build_agent_attributes(resource_attrs) -> tuple[dict, dict]:
110+
from splunk_otel.__about__ import __version__ as distro_version
111+
112+
identifying_attrs = {}
113+
for key in _IDENTIFYING_RESOURCE_KEYS:
114+
val = resource_attrs.get(key)
115+
if val is not None:
116+
identifying_attrs[key] = str(val)
117+
118+
identifying_attrs.update(
119+
{
120+
TELEMETRY_SDK_LANGUAGE: "python",
121+
TELEMETRY_SDK_NAME: "opentelemetry",
122+
TELEMETRY_SDK_VERSION: str(resource_attrs.get(TELEMETRY_SDK_VERSION, "unknown")),
123+
"telemetry.distro.name": _DISTRO_NAME,
124+
"telemetry.distro.version": distro_version,
125+
}
126+
)
127+
128+
non_identifying_attrs = {}
129+
for key in _NON_IDENTIFYING_RESOURCE_KEYS:
130+
val = resource_attrs.get(key)
131+
if val is not None:
132+
non_identifying_attrs[key] = str(val)
133+
134+
return identifying_attrs, non_identifying_attrs
135+
136+
137+
def _handle_server_message(
138+
_agent: OpAMPAgent,
139+
_client: OpAMPClient,
140+
message: opamp_pb2.ServerToAgent,
141+
) -> None:
142+
logger.debug("ServerToAgent: flags=%s", message.flags)
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Copyright Splunk Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Config registry for OpAMP EffectiveConfig reporting (and eventually remote updates).
17+
18+
Feature modules register the keys they own at startup. OpAMP reads all registered
19+
values when building EffectiveConfig, and will call write callbacks for remote
20+
config updates in Phase 2.
21+
22+
Usage:
23+
registry = ConfigRegistry()
24+
25+
# in profile.py
26+
registry.register("SPLUNK_PROFILER_ENABLED", getter=lambda: str(ctx.running))
27+
28+
# in opamp/__init__.py
29+
registry.get_all() # -> {"SPLUNK_PROFILER_ENABLED": "true", ...}
30+
"""
31+
32+
from __future__ import annotations
33+
34+
import logging
35+
from dataclasses import dataclass
36+
from typing import Callable
37+
38+
logger = logging.getLogger(__name__)
39+
40+
41+
@dataclass
42+
class _ConfigEntry:
43+
getter: Callable[[], str | None]
44+
setter: Callable[[str], None] | None = None
45+
46+
47+
class ConfigRegistry:
48+
def __init__(self):
49+
self._entries: dict[str, _ConfigEntry] = {}
50+
51+
def register(
52+
self,
53+
key: str,
54+
*,
55+
getter: Callable[[], str],
56+
setter: Callable[[str], None] | None = None,
57+
) -> None:
58+
"""
59+
Register a config key with read (and optionally write) callbacks.
60+
61+
Args:
62+
key: The config key name (e.g. "SPLUNK_PROFILER_ENABLED")
63+
getter: Returns the current string value of this key.
64+
setter: Applies an updated string value. None means read-only (restart required).
65+
"""
66+
self._entries[key] = _ConfigEntry(getter=getter, setter=setter)
67+
68+
def get_all(self) -> dict[str, str]:
69+
"""Return current values for all registered keys, omitting keys whose getter returns None."""
70+
out = {}
71+
for key, entry in self._entries.items():
72+
try:
73+
val = entry.getter()
74+
if val is not None:
75+
out[key] = val
76+
except Exception: # noqa: BLE001
77+
logger.warning("Failed to read config key %s", key, exc_info=True)
78+
return out
79+
80+
def update(self, updates: dict[str, str]) -> list[str]:
81+
"""
82+
Update registry with a dict of keys to values.
83+
"""
84+
updated_keys = []
85+
for key, value in updates.items():
86+
entry = self._entries.get(key)
87+
if entry is None:
88+
logger.warning("Ignoring unknown config key: %s", key)
89+
continue
90+
if entry.setter is None:
91+
logger.warning("Ignoring read-only config key: %s", key)
92+
continue
93+
try:
94+
entry.setter(value)
95+
updated_keys.append(key)
96+
logger.info("Applied config update: %s=%s", key, value)
97+
except Exception:
98+
logger.exception("Failed to apply config key %s=%s", key, value)
99+
return updated_keys

0 commit comments

Comments
 (0)