Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import re
import time
from contextvars import ContextVar
from services.utils import logging, DBType
from typing import List, Tuple

Expand Down Expand Up @@ -37,12 +38,42 @@
DB_SCHEMA_NAME = os.environ.get("DB_SCHEMA_NAME", "public")

operator_match = re.compile('([^:]*):([=><]+)$')
Comment thread
Youssef-SH marked this conversation as resolved.
active_db_tracer = ContextVar("active_db_tracer", default=None)

# use a ddmmyyy timestamp as the version for triggers
TRIGGER_VERSION = "05092024"
TRIGGER_NAME_PREFIX = "notify_ui"


# Tracing is scoped via ContextVar, ensuring metrics are isolated per request context
class DBQueryTracer:
    """Context manager that collects per-query execution metrics.

    While a ``DBQueryTracer`` is active (entered), ``execute_sql`` looks it
    up through the module-level ``active_db_tracer`` ContextVar and reports
    each query's wall-clock time and row count via ``_record``.  Because the
    scoping is done with a ContextVar, concurrent asyncio tasks each see
    only their own tracer.
    """

    def __init__(self):
        # Recorded queries: one {"execution_time": float, "row_count": int}
        # dict per executed statement.
        self._queries = []
        # Reset tokens from ContextVar.set(), stacked to support nested
        # (re-entrant) use of the same tracer instance.
        self._tokens = []

    def __enter__(self):
        # Install self as the active tracer for the current context and
        # remember the token so __exit__ can restore the previous value.
        self._tokens.append(active_db_tracer.set(self))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Restore whatever tracer (possibly None) was active before __enter__.
        active_db_tracer.reset(self._tokens.pop())

    def _record(self, execution_time, row_count):
        """Append one query's metrics (seconds elapsed, rows returned)."""
        self._queries.append(
            {"execution_time": execution_time, "row_count": row_count}
        )

    def summary(self):
        """Return aggregate metrics for all recorded queries.

        Returns:
            dict with ``query_count`` (int), ``total_time`` (float, seconds)
            and ``total_rows`` (int).
        """
        return {
            "query_count": len(self._queries),
            "total_time": sum(q["execution_time"] for q in self._queries),
            "total_rows": sum(q["row_count"] for q in self._queries),
        }


class _AsyncPostgresDB(object):
connection = None
flow_table_postgres = None
Expand Down Expand Up @@ -249,27 +280,46 @@ async def execute_sql(self, select_sql: str, values=[], fetch_single=False,
expanded=False, limit: int = 0, offset: int = 0,
Comment thread
Youssef-SH marked this conversation as resolved.
Comment thread
Youssef-SH marked this conversation as resolved.
cur: aiopg.Cursor = None, serialize: bool = True) -> Tuple[DBResponse, DBPagination]:
async def _execute_on_cursor(_cur):
    """Run ``select_sql`` on the given cursor and build the response.

    Closure over the enclosing ``execute_sql`` scope: uses ``select_sql``,
    ``values``, ``serialize``, ``expanded``, ``fetch_single``, ``limit``,
    ``offset`` and ``self._row_type``.

    If a DBQueryTracer is active in the current context, the query's
    wall-clock time and row count are recorded — on failure as well
    (with row_count=0), before the exception is re-raised.

    Returns:
        Tuple of (body, DBPagination); ``body`` is a single row when
        ``fetch_single`` is truthy, else a list of rows.
    Raises:
        IndexError: when ``fetch_single`` is truthy and no rows matched.
    """
    tracer = active_db_tracer.get()
    if tracer:
        # perf_counter: monotonic, highest-resolution clock for elapsed time.
        start_time = time.perf_counter()

    try:
        await _cur.execute(select_sql, values)

        records = await _cur.fetchall()
        row_count = len(records)
        if serialize:
            # pylint-initial-ignore: Lack of __init__ makes this too hard for pylint
            # pylint: disable=not-callable
            rows = [
                self._row_type(**record).serialize(expanded)
                for record in records
            ]
        else:
            rows = records
    except Exception:
        # Record the failed query too (no rows), then propagate.
        if tracer:
            tracer._record(
                execution_time=time.perf_counter() - start_time,
                row_count=0,
            )
        raise

    if tracer:
        tracer._record(
            execution_time=time.perf_counter() - start_time,
            row_count=row_count,
        )

    # Will raise IndexError in case fetch_single=True and there's no results
    body = rows[0] if fetch_single else rows
    pagination = DBPagination(
        limit=limit,
        offset=offset,
        count=row_count,
        # Page numbers are 1-based; max(..., 1) guards against limit == 0.
        page=math.floor(int(offset) / max(int(limit), 1)) + 1,
    )
    return body, pagination
Expand Down