Skip to content

Commit 0a8100a

Browse files
feat(executors): capture metrics for all executors
1 parent c7a4c4d commit 0a8100a

5 files changed

Lines changed: 63 additions & 33 deletions

File tree

libs/core/garf/core/api_clients.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,14 @@
2828
import pydantic
2929
import requests
3030
import smart_open
31-
from garf.core import exceptions, query_editor
31+
from garf.core import exceptions, query_editor, telemetry
3232
from garf.core.telemetry import tracer
3333
from opentelemetry import metrics, trace
3434
from typing_extensions import TypeAlias, override
3535

3636
ApiRowElement: TypeAlias = Union[int, float, str, bool, list, dict, None]
3737
ApiResponseRow: TypeAlias = dict[str, ApiRowElement]
3838

39-
meter = metrics.get_meter('garf.core')
40-
41-
api_counter = meter.create_counter(
42-
'garf_api_call_total',
43-
unit='1',
44-
description='Counts number of requests to API',
45-
)
46-
4739

4840
class GarfApiResponse(pydantic.BaseModel):
4941
"""Base class for specifying response."""
@@ -71,12 +63,12 @@ class GarfApiError(exceptions.GarfError):
7163
ipaddress.ip_network(n)
7264
for n in (
7365
'169.254.0.0/16', # link-local — AWS IMDS & GCE metadata endpoint
74-
'127.0.0.0/8', # IPv4 loopback
75-
'10.0.0.0/8', # RFC-1918 private
76-
'172.16.0.0/12', # RFC-1918 private
66+
'127.0.0.0/8', # IPv4 loopback
67+
'10.0.0.0/8', # RFC-1918 private
68+
'172.16.0.0/12', # RFC-1918 private
7769
'192.168.0.0/16', # RFC-1918 private
78-
'::1/128', # IPv6 loopback
79-
'fe80::/10', # IPv6 link-local
70+
'::1/128', # IPv6 loopback
71+
'fe80::/10', # IPv6 link-local
8072
)
8173
]
8274

@@ -126,7 +118,7 @@ def call_api(
126118
span = trace.get_current_span()
127119
response = self.get_response(request, **kwargs)
128120
span.set_attribute('num_rows_api_response', len(response.results))
129-
api_counter.add(1, {'api.client.class': self.__class__.__name__})
121+
telemetry.api_counter.add(1, {'api.client.class': self.__class__.__name__})
130122
return response
131123

132124
@abc.abstractmethod
@@ -283,4 +275,4 @@ def _field_converter(field: str):
283275
return int(field)
284276
with contextlib.suppress(ValueError):
285277
return float(field)
286-
return field
278+
return field

libs/core/garf/core/telemetry.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,20 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=C0330, g-bad-import-order, g-multiple-import
16-
from opentelemetry import trace
16+
from opentelemetry import metrics, trace
1717

1818
tracer = trace.get_tracer(
1919
instrumenting_module_name='garf.core',
2020
)
21+
meter = metrics.get_meter('garf.core')
22+
23+
api_counter = meter.create_counter(
24+
'garf_api_call_total',
25+
unit='1',
26+
description='Counts number of requests to API',
27+
)
28+
29+
30+
cache_size_meter = meter.create_gauge(
31+
'garf_cache_size_bytes', unit='By', description='Size of garf cache in bytes'
32+
)

libs/executors/garf/executors/api_executor.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,9 @@
3333
)
3434
from garf.executors.telemetry import tracer
3535
from garf.io.writers import abs_writer
36-
from opentelemetry import metrics, trace
36+
from opentelemetry import trace
3737

3838
logger = logging.getLogger(__name__)
39-
meter = metrics.get_meter('garf.executors')
40-
41-
api_counter = meter.create_counter(
42-
'garf_api_execute_total',
43-
unit='1',
44-
description='Counts number of API executions',
45-
)
4639

4740

4841
class ApiQueryExecutor(executor.Executor):
@@ -121,16 +114,12 @@ def _execute(
121114
logger.debug('starting query %s', query)
122115
title = pathlib.Path(title).name.split('.')[0]
123116
try:
124-
results = self.fetcher.fetch(
117+
return self.fetcher.fetch(
125118
query_specification=query,
126119
args=context.query_parameters,
127120
title=title,
128121
**context.fetcher_parameters,
129122
)
130-
api_counter.add(
131-
1, {'api.client.class': self.fetcher.api_client.__class__.__name__}
132-
)
133-
return results
134123
except Exception as e:
135124
logger.error('%s generated an exception: %s', title, str(e))
136125
raise exceptions.GarfExecutorError(

libs/executors/garf/executors/executor.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
import asyncio
1818
import inspect
1919
import logging
20+
import time
2021
from typing import Optional
2122

2223
from garf.core import query_editor, report, report_fetcher
23-
from garf.executors import execution_context, query_processor
24+
from garf.executors import execution_context, query_processor, telemetry
2425
from garf.executors.telemetry import tracer
2526
from garf.io.writers import abs_writer
2627
from opentelemetry import trace
@@ -58,7 +59,9 @@ def execute(
5859
Returns:
5960
Report with data if query returns some data otherwise empty Report.
6061
"""
62+
start_time = time.perf_counter()
6163
span = trace.get_current_span()
64+
executor_attributes = {'executor.class': self.__class__.__name__}
6265
query_spec = (
6366
query_editor.QuerySpecification(
6467
text=query, title=title, args=context.query_parameters
@@ -76,14 +79,25 @@ def execute(
7679
if self.preprocessors:
7780
_handle_processors(processors=self.preprocessors, context=context)
7881
results = self._execute(query=query_text, title=title, context=context)
82+
if hasattr(self, 'fetcher'):
83+
fetcher_attributes = {
84+
'api.client.class': self.fetcher.api_client.__class__.__name__
85+
}
86+
executor_attributes.update(fetcher_attributes)
87+
telemetry.executor_counter.add(1, executor_attributes)
7988
if (results or results.results_placeholder) and (
8089
self.writers or context.writer
8190
):
8291
writer_clients = self.writers or context.writer_clients
83-
return write_many(writer_clients, results, title)
92+
write_outputs = write_many(writer_clients, results, title)
93+
duration = time.perf_counter() - start_time
94+
telemetry.executor_histogram.record(duration, executor_attributes)
95+
return write_outputs
8496
span.set_attribute('execute.num_results', len(results))
8597
if self.postprocessors:
8698
_handle_processors(processors=self.postprocessors, context=context)
99+
duration = time.perf_counter() - start_time
100+
telemetry.executor_histogram.record(duration, executor_attributes)
87101
return results
88102

89103
def _execute(
@@ -207,12 +221,17 @@ def write_many(
207221
) -> Optional[str]:
208222
writing_results = []
209223
for writer_client in writer_clients:
224+
start_time = time.perf_counter()
210225
logger.debug(
211226
'Start writing data for query %s via %s writer',
212227
title,
213228
type(writer_client),
214229
)
215230
writing_result = writer_client.write(results, title)
231+
duration = time.perf_counter() - start_time
232+
telemetry.write_histogram.record(
233+
duration, {'writer_class': writer_client.__class__.__name__}
234+
)
216235
logger.debug(
217236
'Finish writing data for query %s via %s writer',
218237
title,

libs/executors/garf/executors/telemetry.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,26 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=C0330, g-bad-import-order, g-multiple-import
16-
from opentelemetry import trace
16+
from opentelemetry import metrics, trace
1717

1818
tracer = trace.get_tracer(
1919
instrumenting_module_name='garf.executors',
2020
)
21+
meter = metrics.get_meter('garf.executors')
22+
23+
executor_counter = meter.create_counter(
24+
'garf_execute_total',
25+
unit='1',
26+
description='Counts number of executor invocations',
27+
)
28+
29+
executor_histogram = meter.create_histogram(
30+
'garf_execute_duration_seconds',
31+
unit='s',
32+
description='Measures execution duration in seconds',
33+
)
34+
write_histogram = meter.create_histogram(
35+
'garf_write_duration_seconds',
36+
unit='s',
37+
description='Measures report writes duration in seconds',
38+
)

0 commit comments

Comments
 (0)