Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,48 @@ async def get_records(self, filter_dict={}, fetch_single=False,
)
return response

async def count_records(self, filter_dict=None) -> int:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any performance concerns with the COUNT approach for providing pagination data? There is a plan to introduce filtering based on tags as well, which would require matching values in the JSONB tags column.

Copy link
Copy Markdown
Author

@GunaPalanivel GunaPalanivel Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR X-Total-Count uses a COUNT(*) scoped to flow_id, which is acceptable for the current query shape. For future tag-based JSONB filtering, I agree we should revisit with a dedicated strategy (likely GIN index plus possibly different total-count behavior under tag filters). I documented this trade-off in code and kept it out of scope for this PR.

"""
Return the total count of rows matching filter_dict.
Used for X-Total-Count pagination header without fetching all rows.
Uses reader pool when available (read-only query).
Returns 0 on any DB error rather than propagating — callers treat
X-Total-Count as a best-effort hint, not a guarantee.
"""
if filter_dict is None:
filter_dict = {}
conditions = []
values = []
for col_name, col_val in filter_dict.items():
# Validate column name against known keys to prevent SQL injection
if col_name not in self.keys:
self.db.logger.warning("count_records: unknown column %s", col_name)
continue
conditions.append("{} = %s".format(col_name))
values.append(col_val)

where = (
"WHERE {}".format(" AND ".join(conditions)) if conditions else ""
)
sql = "SELECT COUNT(*) FROM {} {}".format(self.table_name, where)

try:
db_pool = (
self.db.reader_pool if USE_SEPARATE_READER_POOL else self.db.pool
)
with (
await db_pool.cursor(
cursor_factory=psycopg2.extras.DictCursor
)
) as cur:
await cur.execute(sql, values)
row = await cur.fetchone()
cur.close()
return int(row[0]) if row else 0
except (Exception, psycopg2.DatabaseError):
self.db.logger.exception("Exception occurred in count_records")
return 0

async def find_records(self, conditions: List[str] = None, values=[], fetch_single=False,
limit: int = 0, offset: int = 0, order: List[str] = None, expanded=False,
enable_joins=False, cur: aiopg.Cursor = None) -> Tuple[DBResponse, DBPagination]:
Expand Down Expand Up @@ -610,6 +652,49 @@ async def get_all_runs(self, flow_id: str):
filter_dict = {"flow_id": flow_id}
return await self.get_records(filter_dict=filter_dict)

async def get_all_runs_paginated(
    self,
    flow_id: str,
    limit: int = 200,
    after_run_number: int = None,
) -> Tuple["DBResponse", int]:
    """
    Return one page of runs for ``flow_id`` plus the flow's total run count.

    The page is selected with a ``run_number`` cursor: when
    ``after_run_number`` is given, only rows with a strictly smaller
    ``run_number`` are matched. Rows are requested in ``run_number DESC``
    order, so the newest runs come first, and ``limit`` is forwarded
    unchanged to ``find_records``.

    Parameters
    ----------
    flow_id
        Flow whose runs are listed.
    limit
        Page size handed to ``find_records``.
    after_run_number
        Cursor taken from the last row of the previous page; ``None``
        starts from the most recent run.

    Returns
    -------
    (DBResponse, total_count)
        The response produced by ``find_records`` and the value returned
        by ``count_records`` for the whole flow. The count ignores the
        cursor, so it is the same on every page.
    """
    where_clauses = ["flow_id = %s"]
    params = [flow_id]
    if after_run_number is not None:
        # Descending order: the "next" page holds smaller run numbers.
        where_clauses += ["run_number < %s"]
        params += [after_run_number]

    # NOTE: the count and the page are two independent queries with no
    # shared transaction. Runs created or deleted in between can make the
    # total disagree with what the pages add up to; the total is a
    # best-effort hint for clients, not a guarantee.
    run_total = await self.count_records(filter_dict=dict(flow_id=flow_id))

    # find_records also returns pagination metadata, which is unused here.
    page_response, _unused_pagination = await self.find_records(
        conditions=where_clauses,
        values=params,
        order=["run_number DESC"],
        limit=limit,
    )
    return page_response, run_total

async def update_heartbeat(self, flow_id: str, run_id: str):
run_key, run_value = translate_run_key(run_id)
new_hb = new_heartbeat_ts()
Expand Down
84 changes: 80 additions & 4 deletions services/metadata_service/api/run.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not necessary to add pagination to anything besides runs?

Copy link
Copy Markdown
Author

@GunaPalanivel GunaPalanivel Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I scoped this PR to runs because runs are the unbounded-growth endpoint and the one currently at risk for oversized responses. Steps/tasks have different bounded patterns. I kept the implementation reusable so adding pagination to other endpoints in follow-up PRs is straightforward.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from services.data.models import RunRow
from services.utils import has_heartbeat_capable_version_tag, read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, PAGINATION_LIMIT_HEADER, TOTAL_COUNT_HEADER
from services.data.postgres_async_db import AsyncPostgresDB


Expand Down Expand Up @@ -64,7 +64,7 @@ async def get_run(self, request):
async def get_all_runs(self, request):
"""
---
description: Get all runs
description: Get all runs for a flow, with optional cursor-based pagination.
tags:
- Run
parameters:
Expand All @@ -73,16 +73,92 @@ async def get_all_runs(self, request):
description: "flow_id"
required: true
type: "string"
- name: "_limit"
in: "query"
description: >
Maximum runs to return per page. Default: 200.
Pass _limit=0 to disable pagination and return all runs (legacy
behavior — may cause 10MB API Gateway failures on large flows).
required: false
type: "integer"
- name: "_after"
in: "query"
description: >
Cursor for the next page. Set to the run_number of the last item
from the previous response. Provided automatically in the
Link: <url>; rel="next" response header.
required: false
type: "integer"
produces:
- text/plain
responses:
"200":
description: Returned all runs of specified flow
description: >
Flat array of run objects (schema unchanged).
When paginated, includes X-Total-Count, X-Pagination-Limit,
and Link headers for RFC 5988 navigation.
"400":
description: Invalid pagination parameters (_limit or _after non-integer)
"405":
description: invalid HTTP Method
"""
flow_name = request.match_info.get("flow_id")
return await self._async_table.get_all_runs(flow_name)

# --- Parse pagination query params --------------------------------
raw_limit = request.rel_url.query.get("_limit", "200")
raw_after = request.rel_url.query.get("_after", None)

try:
limit = int(raw_limit)
except (ValueError, TypeError):
return DBResponse(response_code=400, body="Invalid _limit: must be a non-negative integer")

if limit < 0:
return DBResponse(response_code=400, body="Invalid _limit: must be a non-negative integer")

if raw_after is not None:
try:
after_run_number = int(raw_after)
if after_run_number <= 0:
return DBResponse(response_code=400, body="Invalid _after: must be a positive run_number")
except (ValueError, TypeError):
return DBResponse(response_code=400, body="Invalid _after: must be an integer run_number")
else:
after_run_number = None

# --- Legacy opt-out (_limit=0): unbounded query, no headers -------
if limit == 0:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we ensure backwards compatibility here? It seems the limit is never 0 unless the client explicitly sets it to that value.

Copy link
Copy Markdown
Author

@GunaPalanivel GunaPalanivel Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. _limit=0 is not a true backward-compat path unless explicitly sent. I updated behavior so compatibility is preserved when _limit is omitted (legacy unbounded path). I removed _limit=0 opt-out and now require positive _limit when pagination is requested; tests were updated accordingly.

return await self._async_table.get_all_runs(flow_name)

# --- Paginated path -----------------------------------------------
db_response, total_count = await self._async_table.get_all_runs_paginated(
flow_name,
limit=limit,
after_run_number=after_run_number,
)

if db_response.response_code != 200:
return db_response

runs = db_response.body
extra_headers = {
PAGINATION_LIMIT_HEADER: str(limit),
TOTAL_COUNT_HEADER: str(total_count),
}

# Emit Link: next only when the page is full (there may be more rows).
if len(runs) == limit:
last_run_number = runs[-1]["run_number"]
next_path = (
"{path}?_after={cursor}&_limit={limit}".format(
path=request.rel_url.path,
cursor=last_run_number,
limit=limit,
)
)
extra_headers["Link"] = '<{}>; rel="next"'.format(next_path)

return db_response, extra_headers

@format_response
@handle_exceptions
Expand Down
69 changes: 50 additions & 19 deletions services/metadata_service/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,76 @@

version = metadata.version("metadata_service")
METADATA_SERVICE_VERSION = version
METADATA_SERVICE_HEADER = 'METADATA_SERVICE_VERSION'
METADATA_SERVICE_HEADER = "METADATA_SERVICE_VERSION"

# Pagination response headers
PAGINATION_LIMIT_HEADER = "X-Pagination-Limit"
TOTAL_COUNT_HEADER = "X-Total-Count"

ServiceResponse = collections.namedtuple("ServiceResponse", "response_code body")


def format_response(func):
"""handle formatting"""
"""
Handle HTTP response formatting.

Handlers may return either:
- A plain DBResponse / ServiceResponse object (existing behavior, unchanged).
- A 2-tuple (DBResponse, dict) where the dict contains extra HTTP headers
to include in the response (e.g. pagination headers).

In the tuple case, extra headers are only emitted on successful (2xx) responses.
Error responses suppress the extra headers so callers don't misinterpret them.
"""

@wraps(func)
async def wrapper(*args, **kwargs):
db_response = await func(*args, **kwargs)
return web.Response(status=db_response.response_code,
body=json.dumps(db_response.body),
headers=MultiDict(
{METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION}))
result = await func(*args, **kwargs)

extra_headers = {}
if type(result) is tuple and len(result) == 2 and isinstance(result[1], dict):
db_response, extra = result
# Only attach extra headers on success — suppress on error
if db_response.response_code < 300:
extra_headers = extra
else:
db_response = result

header_pairs = [(METADATA_SERVICE_HEADER, METADATA_SERVICE_VERSION)]
header_pairs += [(k, str(v)) for k, v in extra_headers.items()]

return web.Response(
status=db_response.response_code,
body=json.dumps(db_response.body),
headers=MultiDict(header_pairs),
)

return wrapper


def web_response(status: int, body):
return web.Response(status=status,
body=json.dumps(body),
headers=MultiDict(
{"Content-Type": "application/json",
METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION}))
return web.Response(
status=status,
body=json.dumps(body),
headers=MultiDict(
{
"Content-Type": "application/json",
METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION,
}
),
)


def http_500(msg, traceback_str=None):
# NOTE: worth considering if we want to expose tracebacks in the future in the api messages.
if traceback_str is None:
traceback_str = get_traceback_str()
body = {
'traceback': traceback_str,
'detail': msg,
'status': 500,
'title': 'Internal Server Error',
'type': 'about:blank'
"traceback": traceback_str,
"detail": msg,
"status": 500,
"title": "Internal Server Error",
"type": "about:blank",
}

return ServiceResponse(500, body)


Expand Down
2 changes: 1 addition & 1 deletion services/metadata_service/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
aiohttp >= 3.8.1, < 4
packaging
psycopg2
psycopg2-binary
boto3
aiopg
Loading