From fff76e0a8557fe4822b942928b32617575b17dbf Mon Sep 17 00:00:00 2001 From: Youssef-SH Date: Tue, 24 Mar 2026 01:37:05 +0100 Subject: [PATCH 1/2] feat: add opt-in DBQueryTracer for DB query tracing execute_sql() is the single path for all database access. This adds an opt-in tracer that records query count, rows returned, and execution time. Tracing is context-local via ContextVar, ensuring isolation per async request and avoiding global state. No overhead when tracing is disabled. --- services/data/postgres_async_db.py | 78 ++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index dbde13e60..42e08f326 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -7,6 +7,7 @@ import math import re import time +from contextvars import ContextVar from services.utils import logging, DBType from typing import List, Tuple @@ -37,12 +38,42 @@ DB_SCHEMA_NAME = os.environ.get("DB_SCHEMA_NAME", "public") operator_match = re.compile('([^:]*):([=><]+)$') +active_db_tracer = ContextVar("active_db_tracer", default=None) # use a ddmmyyy timestamp as the version for triggers TRIGGER_VERSION = "05092024" TRIGGER_NAME_PREFIX = "notify_ui" +# Tracing is scoped via ContextVar, ensuring metrics are isolated per request context +class DBQueryTracer: + def __init__(self): + self._queries = [] + self._tokens = [] + + def __enter__(self): + self._tokens.append(active_db_tracer.set(self)) + return self + + def __exit__(self, exc_type, exc_value, traceback): + active_db_tracer.reset(self._tokens.pop()) + + def _record(self, execution_time, row_count): + self._queries.append( + {"execution_time": execution_time, "row_count": row_count} + ) + + def summary(self): + query_count = len(self._queries) + total_rows = sum(query["row_count"] for query in self._queries) + total_time = sum(query["execution_time"] for query in self._queries) + return { + "query_count": query_count, + "total_time": total_time, + "total_rows": total_rows, + } + + class _AsyncPostgresDB(object): connection = None flow_table_postgres = None @@ -249,27 +280,46 @@ async def execute_sql(self, select_sql: str, values=[], fetch_single=False, expanded=False, limit: int = 0, offset: int = 0, cur: aiopg.Cursor = None, serialize: bool = True) -> Tuple[DBResponse, DBPagination]: async def _execute_on_cursor(_cur): - await _cur.execute(select_sql, values) - - rows = [] - records = await _cur.fetchall() - if serialize: - for record in records: - # pylint-initial-ignore: Lack of __init__ makes this too hard for pylint - # pylint: disable=not-callable - row = self._row_type(**record) - rows.append(row.serialize(expanded)) - else: - rows = records + tracer = active_db_tracer.get() + if tracer: + start_time = time.perf_counter() - count = len(rows) + try: + await _cur.execute(select_sql, values) + + rows = [] + records = await _cur.fetchall() + row_count = len(records) + if serialize: + for record in records: + # pylint-initial-ignore: Lack of __init__ makes this too hard for pylint + # pylint: disable=not-callable + row = self._row_type(**record) + rows.append(row.serialize(expanded)) + else: + rows = records + except Exception: + if tracer: + elapsed = time.perf_counter() - start_time + tracer._record( + execution_time=elapsed, + row_count=0, + ) + raise + + if tracer: + elapsed = time.perf_counter() - start_time + tracer._record( + execution_time=elapsed, + row_count=row_count, + ) # Will raise IndexError in case fetch_single=True and there's no results body = rows[0] if fetch_single else rows pagination = DBPagination( limit=limit, offset=offset, - count=count, + count=row_count, page=math.floor(int(offset) / max(int(limit), 1)) + 1, ) return body, pagination From f188f3d5a42a0f027bfb51d19662261d66112418 Mon Sep 17 00:00:00 2001 From: Youssef-SH Date: Thu, 2 Apr 2026 16:11:02 +0100 Subject: [PATCH 2/2] fix: guard ContextVar reset in DBQueryTracer --- services/data/postgres_async_db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index 42e08f326..eaf8a5fe4 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -56,7 +56,8 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - active_db_tracer.reset(self._tokens.pop()) + if self._tokens: + active_db_tracer.reset(self._tokens.pop()) def _record(self, execution_time, row_count): self._queries.append(