Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion src/lando/api/legacy/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import re
import subprocess
from abc import ABC, abstractmethod
from time import sleep
from time import monotonic, sleep
from typing import Callable, TypeVar

from celery import Task
Expand All @@ -31,6 +31,7 @@
from lando.main.scm.exceptions import (
NoDiffStartLine,
PatchConflict,
SCMException,
SCMInternalServerError,
)

Expand Down Expand Up @@ -91,6 +92,8 @@ def __init__(
if not self.treestatus_client.ping():
raise ConnectionError("Could not connect to Treestatus")

self.last_maintenance_at: dict[int, float] = {}

self.refresh_active_repos()

if with_ssh:
Expand Down Expand Up @@ -214,6 +217,7 @@ def loop(self):
job = self.job_type.next_job(repositories=self.active_repos).first()

if job is None:
self.run_idle_maintenance()
self.throttle(self.worker_instance.sleep_seconds)
return

Expand Down Expand Up @@ -284,6 +288,40 @@ def refresh_active_repos(self):
]
logger.info(f"{len(self.active_repos)} enabled repos: {self.active_repos}")

def run_idle_maintenance(self):
"""Call `scm.maintenance` on each enabled repo, throttled per repo.

Called when no job is available. Each repo is maintained at most once
per `worker_instance.maintenance_interval_seconds` to avoid unnecessary
cleanup.
"""
now = monotonic()
Comment thread
cgsheeh marked this conversation as resolved.
Outdated
repos_to_maintain = [
repo
for repo in self.enabled_repos
if now - self.last_maintenance_at.get(repo.id, float("-inf"))
>= self.worker_instance.maintenance_interval_seconds
]
if not repos_to_maintain:
return

count = len(repos_to_maintain)
repo_names = [repo.name for repo in repos_to_maintain]
logger.info(f"Starting idle maintenance for {count} repo(s): {repo_names}")
start_time = monotonic()
for repo in repos_to_maintain:
try:
repo.scm.maintenance()
except SCMException:
logger.exception(f"Idle maintenance failed for {repo.name}.")
# Update on success or failure so a broken repo doesn't get hammered
# every idle loop.
self.last_maintenance_at[repo.id] = monotonic()
logger.info(
f"Finished idle maintenance for {count} repo(s) "
f"in {monotonic() - start_time:.2f}s"
)

def update_repo(
self, repo: Repo, job: BaseJob, scm: AbstractSCM, target_cset: str | None
) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/lando/api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def set_repo_config(config):


@pytest.fixture
def hg_landing_worker(landing_worker_instance):
def hg_landing_worker(landing_worker_instance, treestatusdouble):
worker = landing_worker_instance(
name="test-hg-worker",
scm=SCMType.HG,
Expand All @@ -224,7 +224,7 @@ def hg_landing_worker(landing_worker_instance):


@pytest.fixture
def git_landing_worker(landing_worker_instance):
def git_landing_worker(landing_worker_instance, treestatusdouble):
worker = landing_worker_instance(
name="test-git-worker",
scm=SCMType.GIT,
Expand Down
39 changes: 17 additions & 22 deletions src/lando/api/tests/test_hg.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_integrated_hgrepo_clean_repo(hg_clone):
repo = HgSCM(hg_clone.strpath)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we rename this variable to scm while we're at it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repo = HgSCM() is the convention in this file, oddly enough. I updated this instance, but we should fix the others in a follow-up. :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It pre-dates the SCM split (;


with repo.for_pull(), hg_clone.as_cwd():
# Create a draft commits to clean.
# Create a draft commit to clean.
new_file = hg_clone.join("new-file.txt")
new_file.write("text", mode="w+")
repo.run_hg_cmds(
Expand All @@ -43,37 +43,32 @@ def test_integrated_hgrepo_clean_repo(hg_clone):
new_file.write("Extra data", mode="a")
assert repo.run_hg_cmds([["status"]])

# Can clean working directory without nuking commits
repo.clean_repo(strip_non_public_commits=False)
# `clean_repo` clears the working directory but leaves drafts in place;
# stripping is `maintenance`'s job.
repo.clean_repo()
assert repo.run_hg_cmds([["outgoing"]])
assert not repo.run_hg_cmds([["status"]])

# Dirty the working directory again.
new_file.write("Extra data", mode="a")
# Dirty the directory again before exiting the context manager.
new_file.write("extra data", mode="a")
assert repo.run_hg_cmds([["status"]])

# Cleaning should remove commit and clean working directory.
repo.clean_repo()
with pytest.raises(HgCommandError, match="no changes found"):
repo.run_hg_cmds([["outgoing"]])
assert not repo.run_hg_cmds([["status"]])
with repo.for_pull(), hg_clone.as_cwd():
Comment thread
cgsheeh marked this conversation as resolved.
Outdated
assert not repo.run_hg_cmds(
[["status"]]
), "Working directory should be clean after exiting and re-entering the context."
assert repo.run_hg_cmds(
[["outgoing"]]
), "Draft commits should persist across context exits; `maintenance` strips them."

# Create a commit and dirty the directory before exiting
# the context manager as entering a new context should
# provide a clean repo.
new_file.write("text", mode="w+")
repo.run_hg_cmds(
[["add", new_file.strpath], ["commit", "-m", "new draft commit"]]
)
new_file.write("extra data", mode="a")
assert repo.run_hg_cmds([["outgoing"]])
assert repo.run_hg_cmds([["status"]])
repo.maintenance()

with repo.for_pull(), hg_clone.as_cwd():
# New context should be clean.
with pytest.raises(HgCommandError, match="no changes found"):
repo.run_hg_cmds([["outgoing"]])
assert not repo.run_hg_cmds([["status"]])
assert not repo.run_hg_cmds(
[["status"]]
), "Working directory should be clean after `maintenance` runs."


def test_integrated_hgrepo_can_log(hg_clone):
Expand Down
71 changes: 71 additions & 0 deletions src/lando/api/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from lando.api.legacy.workers.landing_worker import LandingWorker
from lando.main.scm import SCMType
from lando.main.scm.exceptions import SCMException


@pytest.mark.parametrize(
Expand All @@ -29,3 +30,73 @@ def test_Worker__no_SSH_PRIVATE_KEY(

# It should complain, but continue.
assert LandingWorker.SSH_PRIVATE_KEY_ENV_KEY in caplog.text


@pytest.fixture
def mocked_enabled_repos(hg_landing_worker):
Comment thread
cgsheeh marked this conversation as resolved.
"""Pin `enabled_repos` to a single list with mocked SCMs.

`Worker.enabled_repos` returns a fresh QuerySet on each access, so a mock
set on `repo._scm` doesn't survive across calls. This fixture freezes the
list once and replaces each repo's lazy SCM with a `MagicMock`.
"""
repos = list(hg_landing_worker.enabled_repos)
for repo in repos:
repo._scm = mock.MagicMock()
with mock.patch.object(
type(hg_landing_worker), "enabled_repos", new_callable=mock.PropertyMock
) as mock_enabled:
mock_enabled.return_value = repos
yield repos


@pytest.mark.django_db
def test_Worker_run_idle_maintenance_throttles_repeat_calls(
hg_landing_worker, mocked_enabled_repos
):
hg_landing_worker.run_idle_maintenance()
hg_landing_worker.run_idle_maintenance()

for repo in mocked_enabled_repos:
assert (
repo._scm.maintenance.call_count == 1
), "Repeat calls inside `maintenance_interval_seconds` should be throttled."


@pytest.mark.django_db
def test_Worker_run_idle_maintenance_runs_again_after_interval(
hg_landing_worker, mocked_enabled_repos
):
hg_landing_worker.run_idle_maintenance()

# Pretend the previous run happened beyond the throttle window.
interval = hg_landing_worker.worker_instance.maintenance_interval_seconds
for repo in mocked_enabled_repos:
hg_landing_worker.last_maintenance_at[repo.id] -= interval + 1
hg_landing_worker.run_idle_maintenance()
Comment thread
cgsheeh marked this conversation as resolved.

for repo in mocked_enabled_repos:
assert (
repo._scm.maintenance.call_count == 2
), "`maintenance` should run again once `maintenance_interval_seconds` has elapsed."


@pytest.mark.django_db
def test_Worker_run_idle_maintenance_isolates_failures(
caplog, hg_landing_worker, mocked_enabled_repos
):
assert len(mocked_enabled_repos) >= 2, "Test requires at least two enabled repos."

failing_repo, *healthy_repos = mocked_enabled_repos
failing_repo._scm.maintenance.side_effect = SCMException("boom", "", "")
Comment thread
cgsheeh marked this conversation as resolved.

hg_landing_worker.run_idle_maintenance()

for repo in healthy_repos:
repo._scm.maintenance.assert_called_once_with()
assert (
f"Idle maintenance failed for {failing_repo.name}" in caplog.text
), "A failure in one repo's maintenance should be logged."
assert (
failing_repo.id in hg_landing_worker.last_maintenance_at
), "A failed run should still update the timestamp so we don't retry on every idle loop."
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 6.0.4 on 2026-05-06 01:38

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("main", "0051_alter_repo_hooks"),
]

operations = [
migrations.AddField(
model_name="worker",
name="maintenance_interval_seconds",
field=models.IntegerField(default=300),
),
]
1 change: 1 addition & 0 deletions src/lando/main/models/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Worker(BaseModel):

throttle_seconds = models.IntegerField(default=10)
sleep_seconds = models.IntegerField(default=10)
maintenance_interval_seconds = models.IntegerField(default=300)

type = models.CharField(
choices=WorkerType,
Expand Down
13 changes: 9 additions & 4 deletions src/lando/main/scm/abstract_scm.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,10 @@ def push(
def clean_repo(
self,
*,
strip_non_public_commits: bool = True,
attributes_override: str | None = None,
):
"""Clean the local working copy from all extraneous files.

If `strip_non_public_commits` is set, also rewind any commit not present on the
origin.

`attributes_override` is SCM-dependent.
"""

Expand Down Expand Up @@ -199,6 +195,15 @@ def describe_local_changes(self, base_cset: str = "") -> list[CommitData]:
If `base_cset` is passed, use it as the public base to find changes against.
"""

@abstractmethod
def maintenance(self) -> None:
"""Run idle-time maintenance tasks for the repo.

Called from the worker loop during idle periods so background cleanup
(e.g. stripping stale Mercurial drafts, deleting old Git work
branches) doesn't add to per-job latency.
"""

@abstractmethod
def for_pull(self) -> AbstractContextManager:
"""Context manager to prepare the repo with the correct environment variables set for pulling."""
Expand Down
30 changes: 25 additions & 5 deletions src/lando/main/scm/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,18 +488,38 @@ def update_repo(
)
return self.head_ref()

@override
def maintenance(self) -> None:
"""Delete leftover `lando-<timestamp>` work branches.

Each landing creates a fresh work branch in `update_repo`, and they
accumulate on disk indefinitely. Idle-time cleanup keeps the local
branch list small without affecting per-job latency.
"""
branches = self._git_run(
"for-each-ref",
"--format=%(refname:short)",
"refs/heads/lando-*",
cwd=self.path,
).splitlines()
if not branches:
return

# `git branch -D` refuses to delete the currently checked-out branch,
# so move off any `lando-*` branch first.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting bit here. In the future it might make more sense to ensure we are back on the default branch after a job is finished.

if self.get_current_branch().startswith("lando-"):
Comment thread
cgsheeh marked this conversation as resolved.
Outdated
self._git_run("checkout", "--force", self.default_branch, cwd=self.path)

self._git_run("branch", "-D", *branches, cwd=self.path)
Comment thread
cgsheeh marked this conversation as resolved.
Outdated

@override
def clean_repo(
self,
*,
strip_non_public_commits: bool = True,
attributes_override: str | None = None,
):
"""Reset the local repository to the origin"""
if strip_non_public_commits:
self._git_run(
"reset", "--hard", f"origin/{self.default_branch}", cwd=self.path
)
self._git_run("reset", "--hard", f"origin/{self.default_branch}", cwd=self.path)
Comment thread
cgsheeh marked this conversation as resolved.

# We need to differentiate between None and "" here, so we know when we were
# explicitly given an empty string.
Expand Down
31 changes: 22 additions & 9 deletions src/lando/main/scm/hg.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def _run_hg_patch(
# might have partially applied the patch.
logger.info("import failed, retrying with 'patch'", exc_info=exc)
import_cmd += ["--config", "ui.patch=patch"]
self.clean_repo(strip_non_public_commits=False)
self.clean_repo()

self._prevent_hg_modifications(patch_or_diff.name)

Expand Down Expand Up @@ -539,7 +539,7 @@ def update_repo(
if not target_cset:
target_cset = self._get_remote_head(source)

# Strip any lingering changes.
# Clean working directory.
self.clean_repo()

# Pull from "upstream".
Expand Down Expand Up @@ -748,7 +748,6 @@ def read_rejects_files(self) -> dict[str, str]:
def clean_repo(
self,
*,
strip_non_public_commits: bool = True,
attributes_override: str | None = None,
):
"""Clean the local working copy from all extraneous files.
Expand All @@ -768,12 +767,26 @@ def clean_repo(
except HgException:
pass

# Strip any lingering draft changesets.
if strip_non_public_commits:
try:
self.run_hg(["strip", "--no-backup", "-r", "not public()"])
except HgException:
pass
@override
def maintenance(self) -> None:
"""Strip draft commits left over from previous landings.

Runs during worker idle periods so the ~8s strip cost doesn't land in
the user-visible job latency window. This is the only place that
strips drafts; the per-job `clean_repo` calls leave them in place.
"""
try:
self._open()
except hglib.error.ServerError as exc:
raise SCMException(
"Failed to open hg server for maintenance.", "", str(exc)
) from exc
try:
self.run_hg(["strip", "--no-backup", "-r", "not public()"])
except HgException:
pass
finally:
self.hg_repo.close()
Comment thread
cgsheeh marked this conversation as resolved.

@override
def merge_onto(
Expand Down
Loading
Loading