Merged

Changes from 3 commits
19 changes: 10 additions & 9 deletions ARCHITECTURE.md
@@ -221,14 +221,15 @@ The `sources` parameter specifies the content to be sent to Dynatrace and can be

The `params` parameter is an object whose keys control the behavior of the `DTAGENT_DB.APP.SEND_TELEMETRY` procedure:

| Param Name | Default Value | Description |
|----------------|---------------|-------------|
| `auto_mode` | `true` | If not set to `false`, Dynatrace Snowflake Observability Agent expects that data delivered in the `source` follows Dynatrace Snowflake Observability Agent data structure. |
| `metrics` | `true` | Should we send metrics based on `METRICS` (auto-mode only). |
| `logs` | `true` | `false` will disable sending telemetry as logs. |
| `events` | `$auto_mode` | `false` will disable sending events based on `EVENT_TIMESTAMPS` (auto-mode); otherwise, `true` will enable sending custom objects as events. |
| `biz_events` | `false` | `true` will enable sending custom objects as bizevents. |
| `davis_events` | `false` | `true` will enable sending custom objects as Davis events. |
| Param Name | Default Value | Description |
|----------------|--------------------|-------------|
| `auto_mode` | `true` | Unless set to `false`, Dynatrace Snowflake Observability Agent expects the data delivered in `source` to follow the Dynatrace Snowflake Observability Agent data structure. |
| `context` | `telemetry_sender` | Name of the context used to identify the data source. This makes it possible to differentiate between data delivered via `SEND_TELEMETRY` calls, or even to use `F_LAST_PROCESSED_TS()`. |
| `metrics` | `true` | `false` will disable sending metrics based on `METRICS` (auto-mode only). |
| `logs` | `true` | `false` will disable sending telemetry as logs. |
| `events` | `$auto_mode` | `false` will disable sending events based on `EVENT_TIMESTAMPS` (auto-mode); otherwise, `true` will enable sending custom objects as events. |
| `biz_events` | `false` | `true` will enable sending custom objects as bizevents. |
| `davis_events` | `false` | `true` will enable sending custom objects as Davis events. |
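The defaults above can be summarized as a small resolution routine. The following is a hypothetical sketch (the helper name `resolve_params` is made up, and this is not the procedure's actual implementation); the `biz_events`/`bizevents` alias handling mirrors the connector change in this pull request:

```python
from typing import Optional


def resolve_params(params: Optional[dict]) -> dict:
    """Sketch of how SEND_TELEMETRY's `params` object is interpreted,
    based on the defaults documented in the table above."""
    params = params or {}
    auto_mode = params.get("auto_mode", True)
    return {
        "auto_mode": auto_mode,
        "context": params.get("context", "telemetry_sender"),
        "metrics": params.get("metrics", True),
        "logs": params.get("logs", True),
        # `events` defaults to the value of `auto_mode` ($auto_mode)
        "events": params.get("events", auto_mode),
        # both spellings are accepted; `biz_events` takes precedence
        "biz_events": next((params[k] for k in ("biz_events", "bizevents") if k in params), False),
        "davis_events": params.get("davis_events", False),
    }
```

For example, calling with `{"auto_mode": false, "bizevents": true}` disables auto-mode and events while enabling bizevents, and the context falls back to `telemetry_sender`.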

This stored procedure returns a tuple with the number of objects sent:

@@ -319,7 +320,7 @@ call APP.SEND_TELEMETRY(ARRAY_CONSTRUCT(
'value.list', ARRAY_CONSTRUCT(1, 3),
'value.dict', OBJECT_CONSTRUCT('k', false, 'k2', true)
)
)::variant, OBJECT_CONSTRUCT('auto_mode', false, 'events', true, 'bizevents', true));
)::variant, OBJECT_CONSTRUCT('auto_mode', false, 'context', 'example', 'events', true, 'bizevents', true));
```

## Dynatrace Snowflake Observability Agent self-monitoring
2 changes: 1 addition & 1 deletion SEMANTICS.md
@@ -26,6 +26,7 @@
| db.​system | The database management system (DBMS) product being used. It is always 'snowflake' | snowflake |
| deployment.​environment | The deployment environment, e.g., production, staging, or development. | PROD |
| deployment.​environment.​tag | Optional tag for the deployment environment in multitenancy mode | SA080 |
| dsoa.​run.​context | The name of the Dynatrace Snowflake Observability Agent plugin (or part of plugin) used to produce the telemetry (logs, traces, metrics, or events). | query_history |
| host.​name | The name of the host. | mysnowflake.us-east-1.snowflakecomputing.com |
| service.​name | The name of the service. | mysnowflake.us-east-1 |
| telemetry.​exporter.​name | The name of the telemetry exporter. It is always 'dynatrace.snowagent' | dynatrace.snowagent |
@@ -35,7 +36,6 @@

| Identifier | Description | Example |
|------------|-------------|---------|
| dsoa.​run.​context | The name of the Dynatrace Snowflake Observability Agent plugin (or part of plugin) used to produce the telemetry (logs, traces, metrics, or events). | query_history |
| dsoa.​run.​id | Unique ID of each execution of the Dynatrace Snowflake Observability Agent plugin. It can be used to differentiate between telemetry produced between two executions, e.g., to calculate the change in the system. | 4aa7c76c-e98c-4b8b-a5b3-a8a721bbde2d |
| snowflake.​event.​type | Type of (timestamp based) event | snowflake.table.update |

6 changes: 3 additions & 3 deletions src/dtagent.conf/instruments-def.yml
@@ -36,14 +36,14 @@ dimensions:
telemetry.exporter.version:
__description: "The version of the telemetry exporter."
__example: "0.8.0.17308403933"
dsoa.run.context:
__description: "The name of the Dynatrace Snowflake Observability Agent plugin (or part of plugin) used to produce the telemetry (logs, traces, metrics, or events)."
__example: "query_history"
# * Attribute definitions
# * KEY - will be matched against ATTRIBUTES object with measurements prepared for sending
# * VALUE[description] - description of the attributes - in the future we should rather update semantic dictionary instead of pushing it constantly
# * VALUE[example] - the example of attribute value
attributes:
dsoa.run.context:
__description: "The name of the Dynatrace Snowflake Observability Agent plugin (or part of plugin) used to produce the telemetry (logs, traces, metrics, or events)."
__example: "query_history"
dsoa.run.id:
__description: "Unique ID of each execution of the Dynatrace Snowflake Observability Agent plugin. It can be used to differentiate between telemetry produced between two executions, e.g., to calculate the change in the system."
__example: "4aa7c76c-e98c-4b8b-a5b3-a8a721bbde2d"
4 changes: 2 additions & 2 deletions src/dtagent/__init__.py
@@ -38,7 +38,7 @@
from dtagent.otel.events.generic import GenericEvents
from dtagent.otel.events.davis import DavisEvents
from dtagent.otel.events.bizevents import BizEvents
from dtagent.context import get_context_by_name
from dtagent.context import get_context_name_and_run_id
from dtagent.util import get_now_timestamp_formatted, is_regular_mode

##endregion COMPILE_REMOVE
@@ -153,7 +153,7 @@ def report_execution_status(self, status: str, task_name: str, exec_id: str, det
bizevents_sent = self._biz_events.report_via_api(
query_data=[data_dict | (details_dict or {})],
event_type="dsoa.task",
context=get_context_by_name("self-monitoring"),
context=get_context_name_and_run_id("self-monitoring"),
is_data_structured=False,
)
bizevents_sent += self._biz_events.flush_events()
14 changes: 7 additions & 7 deletions src/dtagent/connector.py
@@ -111,14 +111,11 @@ def __init__(self, session: snowpark.Session, params: dict):
"""
Initialization for TelemetrySender class.
"""
from dtagent.context import get_context_by_name # COMPILE_REMOVE
from dtagent.context import get_context_name_and_run_id # COMPILE_REMOVE

Plugin.__init__(self, session=session)
AbstractDynatraceSnowAgentConnector.__init__(self, session)

self.__context_name = "telemetry_sender"
self.__context = get_context_by_name(self.__context_name)

self._params = params or {}
# if not turned off we expect that data delivered in source follows Dynatrace Snowflake Observability Agent data structure
self._auto_mode = self._params.get("auto_mode", True)
@@ -135,6 +132,9 @@ def __init__(self, session: snowpark.Session, params: dict):
# in case of auto-mode disabled we can send the source as bizevents
self._send_biz_events = next((self._params[key] for key in ["biz_events", "bizevents"] if key in self._params), False)

self.__context_name = self._params.get("context", "telemetry_sender")
self.__context = get_context_name_and_run_id(self.__context_name)

def process(self, run_proc: bool = True) -> None:
"""we don't use it but Plugin marks it as abstract"""

@@ -177,7 +177,7 @@ def send_data(
"""
from dtagent.otel.events import EventType # COMPILE_REMOVE

self.report_execution_status(status="STARTED", task_name="telemetry_sender", exec_id=exec_id)
self.report_execution_status(status="STARTED", task_name=self.__context_name, exec_id=exec_id)

entries_cnt, logs_cnt, metrics_cnt, events_cnt, bizevents_cnt, davis_events_cnt = (0, 0, 0, 0, 0, 0)
if self._auto_mode:
@@ -223,7 +223,7 @@ def send_data(
except ValueError as e:
from dtagent import LOG # COMPILE_REMOVE

self.report_execution_status(status="FAILED", task_name="telemetry_sender", exec_id=exec_id)
self.report_execution_status(status="FAILED", task_name=self.__context_name, exec_id=exec_id)
LOG.error("Could not send event due to %s", e)

entries_cnt += 1
@@ -258,7 +258,7 @@ def send_data(
if self._send_davis_events:
davis_events_cnt += self._davis_events.flush_events()

self.report_execution_status(status="FINISHED", task_name="telemetry_sender", exec_id=exec_id)
self.report_execution_status(status="FINISHED", task_name=self.__context_name, exec_id=exec_id)

self._report_execution(
self.__context_name,
7 changes: 6 additions & 1 deletion src/dtagent/context.py
@@ -33,11 +33,16 @@
RUN_ID_NAME = "dsoa.run.id"


def get_context_by_name(context_name: str, run_id: Optional[str] = None) -> Dict[str, str]:
def get_context_name_and_run_id(context_name: str, run_id: Optional[str] = None) -> Dict[str, str]:
"""Generates the complete context dictionary based on the given name and optional run ID"""
import uuid

return {
CONTEXT_NAME: context_name,
RUN_ID_NAME: run_id or str(uuid.uuid4().hex),
}


def get_context_name(context_name: Optional[str] = None) -> Dict[str, str]:
"""Generates the context dictionary based on the given context name if provided, otherwise returns empty dict"""
return {CONTEXT_NAME: context_name} if context_name else {}
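Taken together, the renamed helper and the new name-only helper in `dtagent.context` have a small, easily testable contract. A self-contained sketch (function bodies copied from this diff; `str(uuid.uuid4().hex)` is simplified to the equivalent `uuid.uuid4().hex`):

```python
import uuid
from typing import Dict, Optional

CONTEXT_NAME = "dsoa.run.context"
RUN_ID_NAME = "dsoa.run.id"


def get_context_name_and_run_id(context_name: str, run_id: Optional[str] = None) -> Dict[str, str]:
    """Complete context: plugin/context name plus a (possibly generated) run ID."""
    return {
        CONTEXT_NAME: context_name,
        RUN_ID_NAME: run_id or uuid.uuid4().hex,
    }


def get_context_name(context_name: Optional[str] = None) -> Dict[str, str]:
    """Name-only context; an empty dict when no name is given."""
    return {CONTEXT_NAME: context_name} if context_name else {}
```

The split matters for metrics: `get_context_name_and_run_id` tags logs, spans, and events with a per-execution `dsoa.run.id`, while `get_context_name` contributes only the low-cardinality `dsoa.run.context` dimension.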
9 changes: 5 additions & 4 deletions src/dtagent/otel/metrics.py
@@ -136,11 +136,12 @@ def flush_metrics(self) -> bool:
"""
return self._send_metrics()

def report_via_metrics_api(self, query_data: Dict, start_time: str = "START_TIME") -> bool:
def report_via_metrics_api(self, query_data: Dict, start_time: str = "START_TIME", context_name: Optional[str] = None) -> bool:
"""
Generates payload with Metrics v2 API
"""
from dtagent import LOG, LL_TRACE # COMPILE_REMOVE
from dtagent.context import get_context_name # COMPILE_REMOVE
from dtagent.util import _unpack_json_dict, _esc, _check_timestamp_ms, _is_not_blank # COMPILE_REMOVE

local_metrics_def = _unpack_json_dict(query_data, ["_INSTRUMENTS_DEF"])
@@ -191,7 +192,7 @@ def __payload_lines(dimensions: str, metric_name: str, metric_value: Union[str,

payload_lines = []
# list all dimensions with their values from the provided data
all_dimensions = {**self._resattr_dims, **_unpack_json_dict(query_data, ["DIMENSIONS"])}
all_dimensions = {**self._resattr_dims, **get_context_name(context_name), **_unpack_json_dict(query_data, ["DIMENSIONS"])}
LOG.log(LL_TRACE, "all_dimensions = %r", all_dimensions)

# prepare dimensions for metrics
@@ -209,13 +210,13 @@ def __payload_lines(dimensions: str, metric_name: str, metric_value: Union[str,

return self._send_metrics(payload)

def discover_report_metrics(self, query_data: Dict, start_time: str = "START_TIME") -> bool:
def discover_report_metrics(self, query_data: Dict, start_time: str = "START_TIME", context_name: Optional[str] = None) -> bool:
"""
Checks if METRICS section is defined in query data, returns false if not
otherwise reports metrics and returns result of report_via_metrics_api
"""
if "METRICS" in query_data:
return self.report_via_metrics_api(query_data, start_time)
return self.report_via_metrics_api(query_data, start_time, context_name=context_name)
return False
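The new `context_name` parameter is folded into the metric dimensions with a plain dict merge, so later sources win: per-row `DIMENSIONS` override the passed-in context, which overrides resource attributes. A minimal sketch of that precedence (the `get_context_name` helper and the `dsoa.run.context` key are taken from this diff; the dimension values are invented for illustration):

```python
from typing import Dict, Optional

CONTEXT_NAME = "dsoa.run.context"


def get_context_name(context_name: Optional[str] = None) -> Dict[str, str]:
    return {CONTEXT_NAME: context_name} if context_name else {}


# mirrors: {**self._resattr_dims, **get_context_name(context_name),
#          **_unpack_json_dict(query_data, ["DIMENSIONS"])}
resattr_dims = {"host.name": "acme.snowflakecomputing.com"}          # resource attributes
row_dimensions = {"warehouse": "WH_XS", CONTEXT_NAME: "query_history"}  # per-row DIMENSIONS

all_dimensions = {**resattr_dims, **get_context_name("telemetry_sender"), **row_dimensions}
```

Here the row's own `dsoa.run.context` value (`query_history`) survives the merge, while rows without one inherit the context passed into `report_via_metrics_api`; passing `context_name=None` adds nothing at all.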


12 changes: 6 additions & 6 deletions src/dtagent/plugins/__init__.py
@@ -53,7 +53,7 @@
from dtagent.otel.logs import Logs
from dtagent.otel.spans import Spans
from dtagent.otel.metrics import Metrics
from dtagent.context import CONTEXT_NAME, get_context_by_name
from dtagent.context import CONTEXT_NAME, get_context_name_and_run_id

##endregion COMPILE_REMOVE

@@ -129,7 +129,7 @@ def _get_table_rows(self, t_data: str) -> Generator[Dict, None, None]:
yield row_dict

def _report_execution(self, measurements_source: str, last_timestamp, last_id, entries_count: dict):
__context = get_context_by_name("self_monitoring")
__context = get_context_name_and_run_id("self_monitoring")

# we cannot use last timestamp when sending logs to DT, because when it is set to snowpark.current_timestamp, the value is taken from a snowflake table
# for DT it would look like 'Column[current_timestamp]'
@@ -192,7 +192,7 @@ def _process_span_rows(
processing_errors: list[str] = []
span_events_added = 0

__context = get_context_by_name(context_name, run_uuid)
__context = get_context_name_and_run_id(context_name, run_uuid)

for row_dict in f_entry_generator():
query_id = row_dict.get(query_id_col_name, None)
@@ -284,7 +284,7 @@ def _process_row(
row_id = row.get(row_id_col, None)
LOG.log(LL_TRACE, "Processing row with id = %s", row_id)

if not self._metrics.report_via_metrics_api(row):
if not self._metrics.report_via_metrics_api(row, context_name=context.get(CONTEXT_NAME, None)):
processing_errors.append(f"Problem sending row {row_id} as metric")

span_events_added = 0
@@ -423,7 +423,7 @@ def _log_entries( # pylint: disable=R0913
if f_event_timestamp_payload_prepare is None:
f_event_timestamp_payload_prepare = self.prepare_timestamp_event

__context = get_context_by_name(context_name, run_uuid)
__context = get_context_name_and_run_id(context_name, run_uuid)

self.processed_last_timestamp = None
processed_entries_cnt = 0
@@ -435,7 +435,7 @@ def _log_entries( # pylint: disable=R0913

for row_dict in f_entry_generator():

if report_metrics and self._metrics.discover_report_metrics(row_dict, start_time):
if report_metrics and self._metrics.discover_report_metrics(row_dict, start_time, context_name):
processed_metrics_cnt += 1

self.processed_last_timestamp = row_dict.get("TIMESTAMP", None)
4 changes: 2 additions & 2 deletions src/dtagent/plugins/query_history.py
@@ -37,7 +37,7 @@
_pack_values_to_json_strings,
)
from dtagent.plugins import Plugin
from dtagent.context import get_context_by_name
from dtagent.context import get_context_name_and_run_id

##endregion COMPILE_REMOVE

@@ -58,7 +58,7 @@ def process(self, run_proc: bool = True) -> Tuple[int, int, int]:
- number of problems
- number of span events created
"""
__context = get_context_by_name("query_history")
__context = get_context_name_and_run_id("query_history")

def __get_query_operator_event_name(operator: Dict) -> str:
"""Returns string with query operator event."""
4 changes: 2 additions & 2 deletions src/dtagent/plugins/resource_monitors.py
@@ -32,7 +32,7 @@
from snowflake.snowpark.functions import current_timestamp
from dtagent.util import _unpack_json_dict
from dtagent.plugins import Plugin
from dtagent.context import get_context_by_name
from dtagent.context import get_context_name_and_run_id
from dtagent.otel.events import EventType

##endregion COMPILE_REMOVE
@@ -126,7 +126,7 @@ def process(self, run_proc: bool = True) -> Tuple[int, int, int, int]:
self._logs.send_log(
"There is no ACCOUNT level resource monitor setup",
log_level=logging.ERROR,
context=get_context_by_name(context_name, run_id),
context=get_context_name_and_run_id(context_name, run_id),
)

_, _, processed_wh, _ = self._log_entries(