Skip to content

Commit 93be3c3

Browse files
valayDavesaikonen
andauthored
[card-cache-service] optimize caching (#417)
* [card-cache-service] optimize caching --------- Co-authored-by: Sakari Ikonen <[email protected]>
1 parent 4c93e14 commit 93be3c3

File tree

10 files changed

+790
-287
lines changed

10 files changed

+790
-287
lines changed

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ ENV BUILD_TIMESTAMP=$BUILD_TIMESTAMP
1010
ENV BUILD_COMMIT_HASH=$BUILD_COMMIT_HASH
1111

1212
ARG UI_ENABLED="1"
13-
ARG UI_VERSION="v1.3.10"
13+
ARG UI_VERSION="v1.3.11"
1414
ENV UI_ENABLED=$UI_ENABLED
1515
ENV UI_VERSION=$UI_VERSION
1616

Dockerfile.ui_service

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM python:3.11.7-bookworm
22

33
ARG UI_ENABLED="1"
4-
ARG UI_VERSION="v1.3.10"
4+
ARG UI_VERSION="v1.3.11"
55
ENV UI_ENABLED=$UI_ENABLED
66
ENV UI_VERSION=$UI_VERSION
77

services/ui_backend_service/api/card.py

+64-65
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@
1010
DBPagination,
1111
DBResponse,
1212
)
13+
from services.ui_backend_service.data.cache.card_cache_manager import wait_until_card_is_ready, CARD_API_HTML_WAIT_TIME
14+
from services.ui_backend_service.data.cache.card_cache_manager import list_cards as list_cards_from_cache
15+
import time
1316
from aiohttp import web
17+
import asyncio
1418

1519

1620
class CardsApi(object):
1721
def __init__(self, app, db, cache=None):
1822
self.db = db
19-
self.cache = getattr(cache, "artifact_cache", None)
20-
23+
self.cache = getattr(cache, "card_cache", None)
2124
app.router.add_route(
2225
"GET",
2326
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
@@ -91,8 +94,7 @@ async def get_cards_list_for_task(self, request):
9194
if not task:
9295
return web_response(404, {"data": []})
9396

94-
invalidate_cache = query_param_enabled(request, "invalidate")
95-
cards = await get_cards_for_task(self.cache, task, invalidate_cache)
97+
cards = await get_card_list(self.cache, task, max_wait_time=1)
9698

9799
if cards is None:
98100
# Handle edge: Cache failed to return anything, even errors.
@@ -104,7 +106,6 @@ async def get_cards_list_for_task(self, request):
104106
{"id": data["id"], "hash": hash, "type": data["type"]}
105107
for hash, data in cards.items()
106108
]
107-
108109
# paginate list of cards
109110
limit, page, offset = get_pagination_params(request)
110111
_pages = max(len(card_hashes) // limit, 1)
@@ -141,31 +142,31 @@ async def get_card_content_by_hash(self, request):
141142
schema:
142143
$ref: '#/definitions/ResponsesError405'
143144
"""
144-
145145
hash = request.match_info.get("hash")
146146
task = await self.get_task_by_request(request)
147147
if not task:
148148
return web.Response(
149149
content_type="text/html", status=404, body="Task not found."
150150
)
151-
invalidate_cache = query_param_enabled(request, "invalidate")
152-
153-
cards = await get_cards_for_task(
154-
self.cache, task, invalidate_cache=invalidate_cache
151+
cards = await get_card_html_for_task_async(
152+
self.cache,
153+
task,
154+
hash,
155155
)
156-
157156
if cards is None:
158157
return web.Response(
159158
content_type="text/html",
160159
status=404,
161-
body="Card not found for task. Possibly still being processed.",
160+
body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
162161
)
163162

164163
if cards and hash in cards:
165164
return web.Response(content_type="text/html", body=cards[hash]["html"])
166165
else:
167166
return web.Response(
168-
content_type="text/html", status=404, body="Card not found for task."
167+
content_type="text/html",
168+
status=404,
169+
body="Card not found for task.",
169170
)
170171

171172
@handle_exceptions
@@ -194,16 +195,16 @@ async def get_card_data_by_hash(self, request):
194195
schema:
195196
$ref: '#/definitions/ResponsesError405'
196197
"""
197-
198198
_hash = request.match_info.get("hash")
199199
task = await self.get_task_by_request(request)
200200
if not task:
201201
return web.Response(
202202
content_type="text/html", status=404, body="Task not found."
203203
)
204-
invalidate_cache = query_param_enabled(request, "invalidate")
205-
data = await get_card_data_for_task(
206-
self.cache, task, _hash, invalidate_cache=invalidate_cache
204+
data = await get_card_data_for_task_async(
205+
self.cache,
206+
task,
207+
_hash,
207208
)
208209

209210
if data is None:
@@ -212,8 +213,21 @@ async def get_card_data_by_hash(self, request):
212213
return web_response(200, data)
213214

214215

215-
async def get_card_data_for_task(
216-
cache_client, task, card_hash, invalidate_cache=False
216+
def _card_data_from_cache(local_cache):
217+
data = local_cache.read_data()
218+
if data is None:
219+
return None
220+
return {
221+
"data": data["data"],
222+
"id": local_cache.card_id,
223+
"type": local_cache.card_type,
224+
}
225+
226+
227+
async def get_card_html_for_task_async(
228+
cache_client,
229+
task,
230+
card_hash,
217231
) -> Optional[Dict[str, Dict]]:
218232
"""
219233
Return the card-data from the cache, or nothing.
@@ -232,43 +246,27 @@ async def get_card_data_for_task(
232246
step_name=task.get("step_name"),
233247
task_id=task.get("task_name") or task.get("task_id"),
234248
)
235-
pathspec_with_hash = "{pathspec}/{card_hash}".format(
236-
pathspec=pathspec, card_hash=card_hash
249+
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
250+
_html = await wait_until_card_is_ready(
251+
cache_client.cache_manager, _local_cache, max_wait_time=CARD_API_HTML_WAIT_TIME
237252
)
238-
res = await cache_client.cache.GetCardData(pathspec, card_hash, invalidate_cache)
239-
240-
if res.has_pending_request():
241-
async for event in res.stream():
242-
if event["type"] == "error":
243-
# raise error, there was an exception during fetching.
244-
raise CardException(event["message"], event["id"], event["traceback"])
245-
await res.wait() # wait until results are ready
246-
data = res.get()
247-
if data and pathspec_with_hash in data:
248-
success, value, detail, trace = unpack_processed_value(data[pathspec_with_hash])
249-
if success:
250-
return value
251-
else:
252-
if value in ["card-not-present", "cannot-fetch-card"]:
253-
return None
254-
raise CardException(detail, value, trace)
255-
return None
253+
return _html
256254

257255

258-
async def get_cards_for_task(
259-
cache_client, task, invalidate_cache=False
256+
async def get_card_data_for_task_async(
257+
cache_client,
258+
task,
259+
card_hash,
260260
) -> Optional[Dict[str, Dict]]:
261261
"""
262-
Return a dictionary of cards from the cache, or nothing.
262+
Return the card-data from the cache, or nothing.
263263
264264
Example:
265265
--------
266266
{
267-
"abc123": {
268-
"id": 1,
269-
"hash": "abc123",
270-
"html": "htmlcontent"
271-
}
267+
"id": 1,
268+
"hash": "abc123",
269+
"data": {}
272270
}
273271
"""
274272
pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
@@ -277,25 +275,26 @@ async def get_cards_for_task(
277275
step_name=task.get("step_name"),
278276
task_id=task.get("task_name") or task.get("task_id"),
279277
)
280-
res = await cache_client.cache.GetCards([pathspec], invalidate_cache)
281-
282-
if res.has_pending_request():
283-
async for event in res.stream():
284-
if event["type"] == "error":
285-
# raise error, there was an exception during fetching.
286-
raise CardException(event["message"], event["id"], event["traceback"])
287-
await res.wait() # wait until results are ready
288-
data = res.get()
289-
if data and pathspec in data:
290-
success, value, detail, trace = unpack_processed_value(data[pathspec])
291-
if success:
292-
return value
293-
else:
294-
if value in ["card-not-present", "cannot-fetch-card"]:
295-
return None
296-
raise CardException(detail, value, trace)
278+
await cache_client.cache_manager.register(pathspec)
279+
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
280+
if not _local_cache.read_ready():
281+
# Since this is a data update call we can return a 404 and the client
282+
# should handle calling back so we only await at registration.
283+
return None
284+
285+
return _card_data_from_cache(_local_cache)
286+
297287

298-
return None
288+
async def get_card_list(
289+
cache_client, task, max_wait_time=3
290+
):
291+
pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
292+
flow_id=task.get("flow_id"),
293+
run_id=task.get("run_id") or task.get("run_number"),
294+
step_name=task.get("step_name"),
295+
task_id=task.get("task_name") or task.get("task_id"),
296+
)
297+
return await list_cards_from_cache(cache_client.cache_manager, pathspec, max_wait_time)
299298

300299

301300
def get_pagination_params(request):

0 commit comments

Comments
 (0)