Skip to content

Commit 762d195

Browse files
feat(executors): add workflow specific metrics to otel
* track workflow execution count and duration * track each step of workflow execution count and duration * export garf build info as a metric * add support for simulation in server endpoint
1 parent 481de96 commit 762d195

5 files changed

Lines changed: 115 additions & 11 deletions

File tree

libs/executors/garf/executors/entrypoints/server.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import typer
2929
import uvicorn
3030
import yaml
31+
from garf.executors import telemetry
3132
from garf.executors.entrypoints import tasks, utils
3233
from garf.executors.entrypoints.tracer import (
3334
initialize_logger,
@@ -158,6 +159,7 @@ def execute_workflow(
158159
config_path: str | pathlib.Path = None,
159160
selected_aliases: Optional[list[str]] = None,
160161
skipped_aliases: Optional[list[str]] = None,
162+
simulate: bool = False,
161163
) -> list[str]:
162164
"""Runs garf workflow till completion."""
163165
try:
@@ -170,6 +172,7 @@ def execute_workflow(
170172
execution_workflow=execution_workflow.model_dump(),
171173
selected_aliases=selected_aliases,
172174
skipped_aliases=skipped_aliases,
175+
simulate=simulate,
173176
)
174177

175178

@@ -252,7 +255,7 @@ def _init_workflow(
252255
else:
253256
raise workflow.GarfWorkflowError('Neither workflow path nor file provided')
254257
return workflow.Workflow(
255-
steps=workflow_data.get('steps'),
258+
**workflow_data,
256259
execution_config=config_data,
257260
)
258261

@@ -266,6 +269,14 @@ def main(
266269
int, typer.Option('--port', '-p', help='Port to start the server')
267270
] = 8000,
268271
):
272+
telemetry.executor_info.set(
273+
1,
274+
{
275+
'version_executors': garf.executors.__version__,
276+
'version_core': garf.core.__version__,
277+
'version_io': garf.io.__version__,
278+
},
279+
)
269280
uvicorn.run(app, host=host, port=port, log_config=None)
270281

271282

libs/executors/garf/executors/entrypoints/tasks.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,16 @@ def execute_workflow(
129129
execution_workflow: workflow.Workflow,
130130
selected_aliases: list[str],
131131
skipped_aliases: list[str],
132+
simulate: bool,
132133
):
133134
"""Executes a batch of queries."""
134135
return workflow_runner.WorkflowRunner(
135136
execution_workflow=execution_workflow
136-
).run(selected_aliases=selected_aliases, skipped_aliases=skipped_aliases)
137+
).run(
138+
selected_aliases=selected_aliases,
139+
skipped_aliases=skipped_aliases,
140+
simulate=simulate,
141+
)
137142

138143

139144
@app.task(pydantic=True)

libs/executors/garf/executors/executor.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
from typing import Optional
2222

2323
from garf.core import query_editor, report, report_fetcher
24-
from garf.executors import execution_context, query_processor, telemetry
24+
from garf.executors import (
25+
exceptions,
26+
execution_context,
27+
query_processor,
28+
telemetry,
29+
)
2530
from garf.executors.telemetry import tracer
2631
from garf.io.writers import abs_writer
2732
from opentelemetry import trace
@@ -78,7 +83,11 @@ def execute(
7883
context = query_processor.process_gquery(context)
7984
if self.preprocessors:
8085
_handle_processors(processors=self.preprocessors, context=context)
81-
results = self._execute(query=query_text, title=title, context=context)
86+
try:
87+
results = self._execute(query=query_text, title=title, context=context)
88+
except exceptions.GarfExecutorError as e:
89+
telemetry.executor_error_counter.add(1, executor_attributes)
90+
raise e
8291
if hasattr(self, 'fetcher'):
8392
fetcher_attributes = {
8493
'api.client.class': self.fetcher.api_client.__class__.__name__

libs/executors/garf/executors/telemetry.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,24 @@
2020
)
2121
meter = metrics.get_meter('garf.executors')
2222

23+
executor_info = meter.create_gauge(
24+
'garf_info',
25+
unit='',
26+
description='Build info of garf executor',
27+
)
28+
2329
executor_counter = meter.create_counter(
2430
'garf_execute_total',
2531
unit='1',
2632
description='Counts number of executor invocations',
2733
)
2834

35+
executor_error_counter = meter.create_counter(
36+
'garf_execute_errors_total',
37+
unit='1',
38+
description='Counts number of executor failures',
39+
)
40+
2941
executor_histogram = meter.create_histogram(
3042
'garf_execute_duration_seconds',
3143
unit='s',
@@ -36,3 +48,39 @@
3648
unit='s',
3749
description='Measures report writes duration in seconds',
3850
)
51+
52+
workflow_counter = meter.create_counter(
53+
'garf_workflow_run_total',
54+
unit='1',
55+
description='Counts number of workflow runs',
56+
)
57+
58+
workflow_histogram = meter.create_histogram(
59+
'garf_workflow_run_duration_seconds',
60+
unit='s',
61+
description='Measures workflow run duration in seconds',
62+
)
63+
64+
workflow_error_counter = meter.create_counter(
65+
'garf_workflow_run_errors_total',
66+
unit='1',
67+
description='Counts number of workflow failures',
68+
)
69+
70+
workflow_step_counter = meter.create_counter(
71+
'garf_workflow_step_run_total',
72+
unit='1',
73+
description='Counts number of runs of workflow step',
74+
)
75+
76+
workflow_step_histogram = meter.create_histogram(
77+
'garf_workflow_step_run_duration_seconds',
78+
unit='s',
79+
description='Measures run duration in seconds of workflow step',
80+
)
81+
82+
workflow_step_error_counter = meter.create_counter(
83+
'garf_workflow_step_run_errors_total',
84+
unit='1',
85+
description='Counts number of workflow step failures',
86+
)

libs/executors/garf/executors/workflows/workflow_runner.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
import logging
1919
import pathlib
20+
import time
2021
from typing import Final
2122

2223
import yaml
23-
from garf.executors import exceptions, setup
24+
from garf.executors import exceptions, setup, telemetry
2425
from garf.executors.telemetry import tracer
2526
from garf.executors.workflows import workflow
27+
from opentelemetry import trace
2628

2729
logger = logging.getLogger(__name__)
2830

@@ -65,6 +67,7 @@ def from_file(
6567
execution_workflow=execution_workflow, wf_parent=workflow_file.parent
6668
)
6769

70+
@tracer.start_as_current_span('workflow.run')
6871
def run(
6972
self,
7073
enable_cache: bool = False,
@@ -73,13 +76,23 @@ def run(
7376
skipped_aliases: list[str] | None = None,
7477
simulate: bool = False,
7578
) -> list[str]:
79+
span = trace.get_current_span()
80+
start_time = time.perf_counter()
81+
workflow_attributes = {}
82+
if name := self.workflow.name:
83+
workflow_attributes.update({'workflow.name': name})
84+
if version := self.workflow.metadata.version:
85+
workflow_attributes.update({'workflow.version': version})
86+
if workflow_attributes:
87+
span.set_attributes(workflow_attributes)
7688
self.workflow.compile()
7789
skipped_aliases = skipped_aliases or []
7890
selected_aliases = selected_aliases or []
7991
execution_results = []
8092
logger.info('Starting Garf Workflow...')
8193
for i, step in enumerate(self.workflow.steps, 1):
8294
step_name = f'{i}-{step.fetcher}'
95+
8396
if step.alias:
8497
step_name = f'{step_name}-{step.alias}'
8598
if step.alias in skipped_aliases:
@@ -98,6 +111,10 @@ def run(
98111
step.alias,
99112
)
100113
continue
114+
workflow_step_attributes = {
115+
**workflow_attributes,
116+
'workflow.step.name': step_name,
117+
}
101118
with tracer.start_as_current_span(step_name):
102119
logger.info(
103120
'Running step %d, fetcher: %s, alias: %s', i, step.fetcher, step.alias
@@ -123,13 +140,27 @@ def run(
123140
batch[q.title] = q.text
124141
else:
125142
batch[query.title] = query.text
126-
query_executor.execute_batch(
127-
batch,
128-
step.context,
129-
step.parallel_threshold or self.parallel_threshold,
130-
)
131-
execution_results.append(step_name)
143+
try:
144+
step_start_time = time.perf_counter()
145+
query_executor.execute_batch(
146+
batch,
147+
step.context,
148+
step.parallel_threshold or self.parallel_threshold,
149+
)
150+
execution_results.append(step_name)
151+
telemetry.workflow_step_counter.add(1, workflow_step_attributes)
152+
step_duration = time.perf_counter() - step_start_time
153+
telemetry.workflow_step_histogram.record(
154+
step_duration, workflow_step_attributes
155+
)
156+
except exceptions.GarfExecutorError as e:
157+
telemetry.workflow_step_error_counter.add(1, workflow_step_attributes)
158+
telemetry.workflow_error_counter.add(1, workflow_attributes)
159+
raise e
132160
logger.info('Garf Workflow completed.')
161+
telemetry.workflow_counter.add(1, workflow_attributes)
162+
duration = time.perf_counter() - start_time
163+
telemetry.workflow_histogram.record(duration, workflow_attributes)
133164
return execution_results
134165

135166
def compile(self, path: str | pathlib.Path) -> str:

0 commit comments

Comments
 (0)