Skip to content

Commit 1bd5d30

Browse files
committed
Add --hints argument to ci-storage cli tool
Hints will allow "load --slot-id=*" to choose a slot which is better than just the most recent one. E.g. we can use a SHA of package-lock.json as a hint.
1 parent c5ebf21 commit 1bd5d30

6 files changed

+229
-96
lines changed

ci-storage

Lines changed: 164 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ def main():
9090
required=True,
9191
help="local directory path",
9292
)
93+
parser.add_argument(
94+
"--hint",
95+
type=str,
96+
default=[],
97+
action="append",
98+
help='optional hints of the CI run to let slot-id="*" specifier find the best slot in the storage to load from; the leftmost matching hints have higher priority',
99+
)
93100
parser.add_argument(
94101
"--exclude",
95102
type=str,
@@ -124,6 +131,7 @@ def main():
124131
)
125132
slot_ids: list[str] = " ".join(args.slot_id).split()
126133
local_dir: str = re.sub(r"/+$", "", args.local_dir)
134+
hints: list[str] = " ".join(args.hint).split()
127135
exclude: list[str] = [
128136
line for line in "\n".join(args.exclude).splitlines() if line.strip()
129137
]
@@ -151,6 +159,7 @@ def main():
151159
storage_max_age_sec=storage_max_age_sec,
152160
slot_id=slot_ids[0],
153161
local_dir=local_dir,
162+
hints=hints,
154163
exclude=exclude,
155164
layer=layer,
156165
verbose=verbose,
@@ -160,15 +169,16 @@ def main():
160169
storage_dir=storage_dir,
161170
storage_max_age_sec=storage_max_age_sec,
162171
)
163-
if action == "load":
172+
elif action == "load":
164173
if not slot_ids:
165-
parser.error(f"for {action} action, at one or many --slot-id is required")
174+
parser.error(f"for {action} action, one or many --slot-id is required")
166175
action_load(
167176
storage_host=storage_host,
168177
storage_dir=storage_dir,
169178
storage_max_age_sec=storage_max_age_sec,
170179
slot_ids=slot_ids,
171180
local_dir=local_dir,
181+
hints=hints,
172182
exclude=exclude,
173183
layer=layer,
174184
verbose=verbose,
@@ -185,8 +195,10 @@ def main():
185195
# enough" slot from the storage:
186196
# - If we are loading a layer, then we try to use the slot id which "best
187197
# matches" the full (non-layer) snapshot loaded in the past.
188-
# - If we are loading a full snapshot, then just use the most recent slot in the
189-
# storage.
198+
# - If we are loading a full snapshot, then first try to use the slot which has
199+
# the largest number of the common hints with the passed hints list (leftmost
200+
# hints elements have higher priority). In worst case, just use the most
201+
# recent slot in the storage.
190202
#
191203
def action_load(
192204
*,
@@ -195,81 +207,61 @@ def action_load(
195207
storage_max_age_sec: int,
196208
slot_ids: list[str],
197209
local_dir: str,
210+
hints: list[str],
198211
exclude: list[str],
199212
layer: list[str],
200213
verbose: bool,
201214
):
202215
os.makedirs(local_dir, exist_ok=True)
203-
meta = SlotMeta.read_from(local_dir=local_dir)
204-
full_snapshot_history = meta.full_snapshot_history
205216

206217
slot_infos = list_slots(
207218
storage_host=storage_host,
208219
storage_dir=storage_dir,
209220
storage_max_age_sec=storage_max_age_sec,
210221
)
211222

212-
in_storage = "layer storage" if layer else "storage"
213-
slot_id = ""
223+
storage = "layer storage" if layer else "storage"
224+
slot_id: str | None = None
214225
for id in map(normalize_slot_id, slot_ids):
215226
prefix = f'Checking slot-id="{id}"...'
216227
if id == "*":
217228
if not slot_infos:
218229
if layer:
219-
print(
220-
f"{prefix} {in_storage} has no slots, so exiting with a no-op"
221-
)
230+
print(f"{prefix} {storage} has no slots, so exiting with a no-op")
222231
else:
223-
print(
224-
f"{prefix} {in_storage} has no slots, so cleaning f{local_dir}"
225-
)
232+
print(f"{prefix} {storage} has no slots, so cleaning f{local_dir}")
226233
action_clean(
227234
local_dir=local_dir,
228235
exclude=exclude,
229236
verbose=verbose,
230237
)
231238
return
232239
elif not layer:
233-
slot_id = list(slot_infos.keys())[0]
234-
print(
235-
f'{prefix} loading the most recent full (non-layer) slot-id="{slot_id}"'
240+
slot_id = infer_best_slot_to_load_full_from(
241+
prefix=prefix,
242+
slot_infos=list(slot_infos.values()),
243+
hints=hints,
236244
)
237245
break
238246
elif layer:
239-
if full_snapshot_history:
240-
print(
241-
f"{prefix} prioritizing layer slots mentioned in the past full snapshot loading history..."
242-
)
243-
for id in full_snapshot_history:
244-
prefix = f'Checking slot-id="{id}" from history...'
245-
if id in slot_infos:
246-
slot_id = id
247-
print(f"{prefix} found in the {in_storage}, using it")
248-
break
249-
else:
250-
print(f"{prefix} not found in the {in_storage}")
251-
if not slot_id:
252-
slot_id = list(slot_infos.keys())[0]
253-
print(
254-
f'No slots from past full snapshot loading history were found in the {in_storage}, so using just the most recent slot-id="{slot_id}"'
255-
)
256-
break
257-
else:
258-
slot_id = list(slot_infos.keys())[0]
259-
print(
260-
f'{prefix} no past loading history, so using just the most recent layer slot-id="{slot_id}"'
261-
)
262-
break
247+
slot_id = infer_best_slot_to_load_layer_from(
248+
prefix=prefix,
249+
slot_infos=list(slot_infos.values()),
250+
full_snapshot_history=SlotMeta.read_from(
251+
local_dir=local_dir
252+
).full_snapshot_history,
253+
)
254+
break
263255
elif id in slot_infos:
264256
slot_id = id
265-
print(f"{prefix} found in the {in_storage}, using it")
257+
print(f"{prefix} found in the {storage}, using it")
266258
break
267259
else:
268-
print(f"{prefix} not found in the {in_storage}")
260+
print(f"{prefix} not found in the {storage}")
269261

270262
if not slot_id:
271263
raise UserException(
272-
f"none of the provided slot id(s) were found in the {in_storage}, aborting"
264+
f"none of the provided slot id(s) were found in the {storage}, aborting"
273265
)
274266

275267
host, port = parse_host_port(storage_host)
@@ -296,42 +288,6 @@ def action_load(
296288
slot_info.meta.write_to(local_dir=local_dir)
297289

298290

299-
#
300-
# Removes everything in local_dir. We use rsync and not rm to keep the excludes
301-
# intact and compatible with the "load" action.
302-
#
303-
def action_clean(
304-
*,
305-
local_dir: str,
306-
exclude: list[str],
307-
verbose: bool,
308-
):
309-
empty_dir = f"{TEMP_DIR}/{EMPTY_DIR}.{normalize_slot_id(local_dir)}"
310-
os.makedirs(empty_dir, exist_ok=True)
311-
try:
312-
check_call(
313-
cmd=[
314-
"rsync",
315-
*build_rsync_args(
316-
host=None,
317-
port=None,
318-
action="load",
319-
exclude=exclude,
320-
layer=[],
321-
verbose=verbose,
322-
),
323-
f"{empty_dir}/",
324-
f"{local_dir}/",
325-
],
326-
print_elapsed=True,
327-
)
328-
finally:
329-
try:
330-
os.rmdir(empty_dir)
331-
except Exception:
332-
pass
333-
334-
335291
#
336292
# Stores the content of the local directory in the storage with the provided
337293
# slot id on a remote host.
@@ -343,13 +299,14 @@ def action_store(
343299
storage_max_age_sec: int,
344300
slot_id: str,
345301
local_dir: str,
302+
hints: list[str],
346303
exclude: list[str],
347304
layer: list[str],
348305
verbose: bool,
349306
):
350307
slot_id = normalize_slot_id(slot_id)
351308
if slot_id == "*":
352-
raise UserException(f'slot_id="{slot_id}" is not allowed for "store" action')
309+
raise UserException(f'slot-id="{slot_id}" is not allowed for "store" action')
353310

354311
meta = None
355312
slot_id_we_used_to_load_from = None
@@ -393,6 +350,7 @@ def action_store(
393350

394351
if meta:
395352
meta.full_snapshot_history.insert(0, slot_id)
353+
meta.hints = hints
396354
meta.write_to(local_dir=local_dir)
397355

398356
print(
@@ -411,6 +369,42 @@ def action_store(
411369
)
412370

413371

372+
#
373+
# Removes everything in local_dir. We use rsync and not rm to keep the excludes
374+
# intact and compatible with the "load" action.
375+
#
376+
def action_clean(
377+
*,
378+
local_dir: str,
379+
exclude: list[str],
380+
verbose: bool,
381+
):
382+
empty_dir = f"{TEMP_DIR}/{EMPTY_DIR}.{normalize_slot_id(local_dir)}"
383+
os.makedirs(empty_dir, exist_ok=True)
384+
try:
385+
check_call(
386+
cmd=[
387+
"rsync",
388+
*build_rsync_args(
389+
host=None,
390+
port=None,
391+
action="load",
392+
exclude=exclude,
393+
layer=[],
394+
verbose=verbose,
395+
),
396+
f"{empty_dir}/",
397+
f"{local_dir}/",
398+
],
399+
print_elapsed=True,
400+
)
401+
finally:
402+
try:
403+
os.rmdir(empty_dir)
404+
except Exception:
405+
pass
406+
407+
414408
#
415409
# Runs the maintenance script for the storage.
416410
#
@@ -431,6 +425,89 @@ def action_maintenance(
431425
)
432426

433427

428+
#
429+
# Given the list of slots in the storage, returns the one which we want the load
430+
# action with slot-id="*" to match.
431+
#
432+
def infer_best_slot_to_load_full_from(
433+
*,
434+
prefix: str,
435+
slot_infos: list[SlotInfo],
436+
hints: list[str],
437+
) -> str:
438+
if not hints:
439+
id = slot_infos[0].id
440+
print(f'{prefix} loading the most recent full (non-layer) slot-id="{id}"')
441+
return id
442+
443+
print(f"{prefix} prioritizing slots matching hints...")
444+
weights: list[tuple[int, int, str]] = []
445+
for slot_info in slot_infos:
446+
weight = ""
447+
matched_hints: list[str] = []
448+
for hint in hints:
449+
if hint in slot_info.meta.hints:
450+
weight += "1"
451+
matched_hints.append(hint)
452+
else:
453+
weight += "0"
454+
if matched_hints:
455+
print(
456+
f'Checking slot-id="{slot_info.id}" from the storage... weight: {weight}, matched hints: {", ".join(matched_hints)}, age: {slot_info.age_sec} sec'
457+
)
458+
weights.append((int(weight), -1 * slot_info.age_sec, slot_info.id))
459+
weights.sort(reverse=True)
460+
if weights:
461+
id = weights[0][2]
462+
print(f'Winner: slot-id="{id}"; loading it, since it has the highest weight')
463+
return id
464+
else:
465+
id = slot_infos[0].id
466+
print(
467+
f'No slots matching hints, so loading the most recent full (non-layer) slot-id="{id}"'
468+
)
469+
return id
470+
471+
472+
#
473+
# Given the list of slots in the storage, returns the one which we want the
474+
# layer load action with slot-id="*" to match.
475+
#
476+
def infer_best_slot_to_load_layer_from(
477+
*,
478+
prefix: str,
479+
slot_infos: list[SlotInfo],
480+
full_snapshot_history: list[str],
481+
) -> str:
482+
if not full_snapshot_history:
483+
id = slot_infos[0].id
484+
print(
485+
f'{prefix} no past loading history, so using just the most recent layer slot-id="{id}"'
486+
)
487+
return id
488+
489+
print(
490+
f"{prefix} prioritizing layer slots mentioned in the past full snapshot loading history..."
491+
)
492+
slot_info_ids = set(slot_info.id for slot_info in slot_infos)
493+
for id in full_snapshot_history:
494+
if id in slot_info_ids:
495+
print(
496+
f'Checking slot-id="{id}" from history... found in the layer storage, using it'
497+
)
498+
return id
499+
else:
500+
print(
501+
f'Checking slot-id="{id}" from history... not found in the layer storage'
502+
)
503+
504+
id = slot_infos[0].id
505+
print(
506+
f'No slots from past full snapshot loading history were found in the layer storage, so using just the most recent slot-id="{id}"'
507+
)
508+
return id
509+
510+
434511
#
435512
# Returns the list of existing slot ids and their ages in seconds, sorted by age
436513
# (i.e. most recently created slots on top of the list). Also, as a side effect,
@@ -700,10 +777,14 @@ class SlotMeta:
700777
# Each time a dir is loaded from some slot (using "*" or by a concrete slot
701778
# id), we prepend slot id to this property.
702779
full_snapshot_history: list[str] = dataclasses.field(default_factory=list[str])
780+
# Hints related to the content of the slot (e.g. commit hash, SHA of
781+
# package-lock.json or any other large-content defining file etc.).
782+
hints: list[str] = dataclasses.field(default_factory=list[str])
703783

704784
def serialize(self) -> str:
705785
serialized = ""
706786
serialized += f"full_snapshot_history={' '.join(unique(self.full_snapshot_history)[0:MAX_FULL_SNAPSHOT_HISTORY])}\n"
787+
serialized += f"hints={' '.join(unique(self.hints))}\n"
707788
return serialized
708789

709790
@staticmethod
@@ -717,6 +798,8 @@ class SlotMeta:
717798
value: str = match.group(2).strip()
718799
if key == "full_snapshot_history":
719800
self.full_snapshot_history = unique(value.split())
801+
elif key == "hints":
802+
self.hints = unique(value.split())
720803
return self
721804

722805
def write_to(self, *, local_dir: str) -> None:

tests/0005-history-is-updated-after-store.test.sh

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)