Skip to content

Commit cf53105

Browse files
Merge pull request #784 from amcmahon-rh/max_items
Reapply mirror release version alias writes and ensure batch size is correct [RHELDST-28333]
2 parents 074f6a9 + f953146 commit cf53105

File tree

5 files changed

+520
-33
lines changed

5 files changed

+520
-33
lines changed

exodus_gw/aws/dynamodb.py

+73-32
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
import functools
12
import gzip
23
import json
34
import logging
45
from datetime import datetime
5-
from itertools import islice
66
from threading import Lock
77
from typing import Any
88

@@ -25,12 +25,14 @@ def __init__(
2525
from_date: str,
2626
env_obj: Environment | None = None,
2727
deadline: datetime | None = None,
28+
mirror_writes: bool = False,
2829
):
2930
self.env = env
3031
self.settings = settings
3132
self.from_date = from_date
3233
self.env_obj = env_obj or get_environment(env)
3334
self.deadline = deadline
35+
self.mirror_writes = mirror_writes
3436
self.client = DynamoDBClientWrapper(self.env_obj.aws_profile).client
3537
self._lock = Lock()
3638
self._definitions = None
@@ -208,6 +210,31 @@ def query_definitions(self) -> dict[str, Any]:
208210
out = json.loads(item_json)
209211
return out
210212

213+
@functools.lru_cache(maxsize=2500)
214+
def uris_for_item(self, item) -> list[str]:
215+
"""Returns all URIs to be written for the given item.
216+
217+
In practice, always returns either one or two URIs depending on
218+
configured aliases and other settings, though the caller should
219+
assume any number of URIs.
220+
"""
221+
222+
# Resolve aliases. We only write to the deepest path
223+
# after all alias resolution, hence only using the
224+
# first result from uri_alias.
225+
uris = [uri_alias(item.web_uri, self.aliases_for_write)[0]]
226+
227+
# We only want to mirror writes for release ver aliases. Recalculating
228+
# the aliases completely is a bit inefficient, but I'd rather not
229+
# duplicate any alias logic.
230+
if (
231+
self.mirror_writes
232+
and uri_alias(item.web_uri, self._aliases(["releasever_alias"]))[0]
233+
!= item.web_uri
234+
):
235+
uris.append(item.web_uri)
236+
return uris
237+
211238
def create_request(
212239
self,
213240
items: list[models.Item],
@@ -216,8 +243,6 @@ def create_request(
216243
"""Create the dictionary structure expected by batch_write_item."""
217244
table_name = self.env_obj.table
218245
request: dict[str, list[Any]] = {table_name: []}
219-
uri_aliases = self.aliases_for_write
220-
221246
for item in items:
222247
# Items carry their own from_date. This effectively resolves
223248
# conflicts in the case of two publishes updating the same web_uri
@@ -226,35 +251,32 @@ def create_request(
226251
# updated timestamp.
227252
from_date = str(item.updated)
228253

229-
# Resolve aliases. We only write to the deepest path
230-
# after all alias resolution, hence only using the
231-
# first result from uri_alias.
232-
web_uri = uri_alias(item.web_uri, uri_aliases)[0]
233-
234-
if delete:
235-
request[table_name].append(
236-
{
237-
"DeleteRequest": {
238-
"Key": {
239-
"from_date": {"S": from_date},
240-
"web_uri": {"S": web_uri},
254+
for web_uri in self.uris_for_item(item):
255+
if delete:
256+
request[table_name].append(
257+
{
258+
"DeleteRequest": {
259+
"Key": {
260+
"from_date": {"S": from_date},
261+
"web_uri": {"S": web_uri},
262+
}
241263
}
242264
}
243-
}
244-
)
245-
else:
246-
request[table_name].append(
247-
{
248-
"PutRequest": {
249-
"Item": {
250-
"from_date": {"S": from_date},
251-
"web_uri": {"S": web_uri},
252-
"object_key": {"S": item.object_key},
253-
"content_type": {"S": item.content_type},
265+
)
266+
else:
267+
request[table_name].append(
268+
{
269+
"PutRequest": {
270+
"Item": {
271+
"from_date": {"S": from_date},
272+
"web_uri": {"S": web_uri},
273+
"object_key": {"S": item.object_key},
274+
"content_type": {"S": item.content_type},
275+
}
254276
}
255277
}
256-
}
257-
)
278+
)
279+
258280
return request
259281

260282
def create_config_request(self, config):
@@ -332,11 +354,30 @@ def _batch_write(req):
332354
return _batch_write(request)
333355

334356
def get_batches(self, items: list[models.Item]):
335-
"""Divide the publish items into batches of size 'write_batch_size'."""
357+
"""
358+
Divide the publish items into batches of size 'write_batch_size'.
359+
360+
Due to mirroring, an item might have multiple write requests. We need
361+
to account for this when splitting items into batches. We memoize the
362+
results of uri_for_item to avoid recalculating aliases.
363+
"""
336364
it = iter(items)
337-
batches = list(
338-
iter(lambda: tuple(islice(it, self.settings.write_batch_size)), ())
339-
)
365+
batches: list[list[models.Item]] = []
366+
current_batch: list[models.Item] = []
367+
current_batch_size = 0
368+
for item in it:
369+
item_weight = len(self.uris_for_item(item))
370+
if (
371+
current_batch_size + item_weight
372+
> self.settings.write_batch_size
373+
):
374+
batches.append(current_batch)
375+
current_batch = []
376+
current_batch_size = 0
377+
current_batch.append(item)
378+
current_batch_size += item_weight
379+
if current_batch:
380+
batches.append(current_batch)
340381
return batches
341382

342383
def write_batch(self, items: list[models.Item], delete: bool = False):

exodus_gw/settings.py

+4
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ class Settings(BaseSettings):
475475
s3_pool_size: int = 3
476476
"""Number of S3 clients to cache"""
477477

478+
mirror_writes_enabled: bool = True
479+
"""Whether both the original url and releasever alias are written during
480+
phase 1 commits."""
481+
478482
model_config = SettingsConfigDict(env_prefix="exodus_gw_")
479483

480484

exodus_gw/worker/publish.py

+9
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,14 @@ def dynamodb(self):
186186
self.from_date,
187187
self.env_obj,
188188
self.task.deadline,
189+
self.should_mirror_writes,
189190
)
190191
return self._dynamodb
191192

193+
@property
194+
def should_mirror_writes(self):
195+
return False
196+
192197
@property
193198
def task_ready(self) -> bool:
194199
task = self.task
@@ -446,6 +451,10 @@ class CommitPhase1(CommitBase):
446451
# phase1 commit is allowed to proceed in either of these states.
447452
PUBLISH_STATES = [PublishStates.committing, PublishStates.pending]
448453

454+
@property
455+
def should_mirror_writes(self):
456+
return self.settings.mirror_writes_enabled
457+
449458
@property
450459
def item_select(self):
451460
# Query for items to be handled by phase1 commit.

0 commit comments

Comments
 (0)