diff --git a/src/lando/api/legacy/workers/base.py b/src/lando/api/legacy/workers/base.py index 09a7ab4fb..a0924478b 100644 --- a/src/lando/api/legacy/workers/base.py +++ b/src/lando/api/legacy/workers/base.py @@ -5,6 +5,7 @@ import re import subprocess from abc import ABC, abstractmethod +from datetime import datetime, timedelta from time import sleep from typing import Callable, TypeVar @@ -31,6 +32,7 @@ from lando.main.scm.exceptions import ( NoDiffStartLine, PatchConflict, + SCMException, SCMInternalServerError, ) @@ -91,6 +93,8 @@ def __init__( if not self.treestatus_client.ping(): raise ConnectionError("Could not connect to Treestatus") + self.last_maintenance_at: dict[int, datetime] = {} + self.refresh_active_repos() if with_ssh: @@ -214,7 +218,7 @@ def loop(self): job = self.job_type.next_job(repositories=self.active_repos).first() if job is None: - self.throttle(self.worker_instance.sleep_seconds) + self.run_idle_maintenance() return with job.processing(): @@ -284,6 +288,67 @@ def refresh_active_repos(self): ] logger.info(f"{len(self.active_repos)} enabled repos: {self.active_repos}") + def run_idle_maintenance(self): + """Call `scm.maintenance` on each enabled repo, throttled per repo. + + Called when no job is available. Each repo is maintained at most once + per `worker_instance.maintenance_interval_seconds` to avoid unnecessary + cleanup. Repos are processed oldest-first by last maintenance time so + the repo that has been waiting longest goes first. Maintenance stops + early once total elapsed time meets or exceeds `sleep_seconds`, so the + worker can promptly check the job queue again. After maintenance + finishes (or is cut short), sleeps for any time remaining in the + `sleep_seconds` interval. + """ + sleep_seconds = self.worker_instance.sleep_seconds + start_time = datetime.now() + interval = timedelta(seconds=self.worker_instance.maintenance_interval_seconds) + + repos_to_maintain = sorted( + ( + repo + for repo in self.enabled_repos + if start_time - self.last_maintenance_at.get(repo.id, datetime.min) + >= interval + ), + key=lambda repo: self.last_maintenance_at.get(repo.id, datetime.min), + ) + + if not repos_to_maintain: + self.throttle(sleep_seconds) + return + + count = len(repos_to_maintain) + repo_names = [repo.name for repo in repos_to_maintain] + logger.info(f"Starting idle maintenance for {count} repo(s): {repo_names}") + + for repo_index, repo in enumerate(repos_to_maintain): + try: + repo.scm.maintenance() + except SCMException: + logger.exception(f"Idle maintenance failed for {repo.name}.") + # Update on success or failure so a broken repo doesn't get hammered + # every idle loop. + self.last_maintenance_at[repo.id] = datetime.now() + elapsed_seconds = (datetime.now() - start_time).total_seconds() + if elapsed_seconds >= sleep_seconds: + logger.info( + f"Idle maintenance budget ({sleep_seconds}s) reached after " + f"{repo_index + 1} of {count} repo(s); stopping early." + ) + break + + maintained_count = repo_index + 1 + duration_seconds = (datetime.now() - start_time).total_seconds() + logger.info( + f"Finished idle maintenance for {maintained_count} of {count} repo(s) " + f"in {duration_seconds:.2f}s" + ) + + remaining_seconds = sleep_seconds - duration_seconds + if remaining_seconds > 0: + self.throttle(int(remaining_seconds)) + def update_repo( self, repo: Repo, job: BaseJob, scm: AbstractSCM, target_cset: str | None ) -> str: diff --git a/src/lando/api/tests/conftest.py b/src/lando/api/tests/conftest.py index 688cea73c..033ad5894 100644 --- a/src/lando/api/tests/conftest.py +++ b/src/lando/api/tests/conftest.py @@ -215,7 +215,7 @@ def set_repo_config(config): @pytest.fixture -def hg_landing_worker(landing_worker_instance): +def hg_landing_worker(landing_worker_instance, treestatusdouble): worker = landing_worker_instance( name="test-hg-worker", scm=SCMType.HG, @@ -224,7 +224,7 @@ def hg_landing_worker(landing_worker_instance): @pytest.fixture -def git_landing_worker(landing_worker_instance): +def git_landing_worker(landing_worker_instance, treestatusdouble): worker = landing_worker_instance( name="test-git-worker", scm=SCMType.GIT, diff --git a/src/lando/api/tests/test_hg.py b/src/lando/api/tests/test_hg.py index fdb04bdf8..a8931b840 100644 --- a/src/lando/api/tests/test_hg.py +++ b/src/lando/api/tests/test_hg.py @@ -28,52 +28,47 @@ def test_integrated_hgrepo_clean_repo(hg_clone): # Test is long and checks various repo cleaning cases as the startup # time for anything using `hg_clone` fixture is very long. - repo = HgSCM(hg_clone.strpath) + scm = HgSCM(hg_clone.strpath) - with repo.for_pull(), hg_clone.as_cwd(): - # Create a draft commits to clean. + with scm.for_pull(), hg_clone.as_cwd(): + # Create a draft commit to clean. new_file = hg_clone.join("new-file.txt") new_file.write("text", mode="w+") - repo.run_hg_cmds( + scm.run_hg_cmds( [["add", new_file.strpath], ["commit", "-m", "new draft commit"]] ) - assert repo.run_hg_cmds([["outgoing"]]) + assert scm.run_hg_cmds([["outgoing"]]) # Dirty the working directory. new_file.write("Extra data", mode="a") - assert repo.run_hg_cmds([["status"]]) + assert scm.run_hg_cmds([["status"]]) - # Can clean working directory without nuking commits - repo.clean_repo(strip_non_public_commits=False) - assert repo.run_hg_cmds([["outgoing"]]) - assert not repo.run_hg_cmds([["status"]]) + # `clean_repo` clears the working directory but leaves drafts in place; + # stripping is `maintenance`'s job. + scm.clean_repo() + assert scm.run_hg_cmds([["outgoing"]]) + assert not scm.run_hg_cmds([["status"]]) - # Dirty the working directory again. - new_file.write("Extra data", mode="a") - assert repo.run_hg_cmds([["status"]]) - - # Cleaning should remove commit and clean working directory. - repo.clean_repo() - with pytest.raises(HgCommandError, match="no changes found"): - repo.run_hg_cmds([["outgoing"]]) - assert not repo.run_hg_cmds([["status"]]) + # Dirty the directory again before exiting the context manager. + new_file.write("extra data", mode="a") + assert scm.run_hg_cmds([["status"]]) - # Create a commit and dirty the directory before exiting - # the context manager as entering a new context should - # provide a clean repo. - new_file.write("text", mode="w+") - repo.run_hg_cmds( - [["add", new_file.strpath], ["commit", "-m", "new draft commit"]] + with scm.for_pull(): + assert not scm.run_hg_cmds([["status"]]), ( + "Working directory should be clean after exiting and re-entering the context." + ) + assert scm.run_hg_cmds([["outgoing"]]), ( + "Draft commits should persist across context exits; `maintenance` strips them." ) - new_file.write("extra data", mode="a") - assert repo.run_hg_cmds([["outgoing"]]) - assert repo.run_hg_cmds([["status"]]) - with repo.for_pull(), hg_clone.as_cwd(): - # New context should be clean. + scm.maintenance() + + with scm.for_pull(), hg_clone.as_cwd(): with pytest.raises(HgCommandError, match="no changes found"): - repo.run_hg_cmds([["outgoing"]]) - assert not repo.run_hg_cmds([["status"]]) + scm.run_hg_cmds([["outgoing"]]) + assert not scm.run_hg_cmds([["status"]]), ( + "Working directory should be clean after `maintenance` runs." + ) def test_integrated_hgrepo_can_log(hg_clone): diff --git a/src/lando/api/tests/test_worker.py b/src/lando/api/tests/test_worker.py index f2fc10746..f864ab886 100644 --- a/src/lando/api/tests/test_worker.py +++ b/src/lando/api/tests/test_worker.py @@ -1,10 +1,12 @@ import os +from datetime import datetime, timedelta from unittest import mock import pytest from lando.api.legacy.workers.landing_worker import LandingWorker from lando.main.scm import SCMType +from lando.main.scm.exceptions import SCMException @pytest.mark.parametrize( @@ -29,3 +31,119 @@ def test_Worker__no_SSH_PRIVATE_KEY( # It should complain, but continue. assert LandingWorker.SSH_PRIVATE_KEY_ENV_KEY in caplog.text + + +@pytest.fixture +def mocked_enabled_repos(hg_landing_worker): + """Configure `hg_landing_worker` for `run_idle_maintenance` tests. + + `Worker.enabled_repos` returns a fresh QuerySet on each access, so a mock + set on `repo._scm` doesn't survive across calls. This fixture freezes the + list once and replaces each repo's lazy SCM with a `MagicMock`. + + Also raises `sleep_seconds` so the per-call maintenance time budget isn't + tripped by fast mocked calls, and patches `throttle` so the post-maintenance + sleep doesn't slow the test. Individual tests may lower `sleep_seconds` + to exercise the budget directly. + """ + repos = list(hg_landing_worker.enabled_repos) + for repo in repos: + repo._scm = mock.MagicMock() + hg_landing_worker.worker_instance.sleep_seconds = 60 + with ( + mock.patch.object( + type(hg_landing_worker), "enabled_repos", new_callable=mock.PropertyMock + ) as mock_enabled, + mock.patch.object(hg_landing_worker, "throttle"), + ): + mock_enabled.return_value = repos + yield repos + + +@pytest.mark.django_db +def test_Worker_run_idle_maintenance_throttles_repeat_calls( + hg_landing_worker, mocked_enabled_repos +): + hg_landing_worker.run_idle_maintenance() + hg_landing_worker.run_idle_maintenance() + + for repo in mocked_enabled_repos: + assert repo._scm.maintenance.call_count == 1, ( + "Repeat calls inside `maintenance_interval_seconds` should be throttled." + ) + + +@pytest.mark.django_db +def test_Worker_run_idle_maintenance_runs_again_after_interval( + hg_landing_worker, mocked_enabled_repos +): + hg_landing_worker.run_idle_maintenance() + + # Pretend the previous run happened beyond the throttle window. + interval = timedelta( + seconds=hg_landing_worker.worker_instance.maintenance_interval_seconds + 1 + ) + for repo in mocked_enabled_repos: + hg_landing_worker.last_maintenance_at[repo.id] -= interval + + hg_landing_worker.run_idle_maintenance() + + for repo in mocked_enabled_repos: + assert repo._scm.maintenance.call_count == 2, ( + "`maintenance` should run again once `maintenance_interval_seconds` has elapsed." + ) + + +@pytest.mark.django_db +def test_Worker_run_idle_maintenance_stops_at_budget_and_prefers_oldest( + hg_landing_worker, mocked_enabled_repos +): + """When the time budget is exhausted, stop early after processing the repo + that has been waiting longest for maintenance.""" + assert len(mocked_enabled_repos) >= 2, "Test requires at least two enabled repos." + + # A `sleep_seconds` budget of 0 means we stop after the very first repo. + hg_landing_worker.worker_instance.sleep_seconds = 0 + + # Make every repo eligible (well past the interval) and pin one repo as the oldest. + interval = timedelta( + seconds=hg_landing_worker.worker_instance.maintenance_interval_seconds + 10 + ) + now = datetime.now() + oldest_repo = mocked_enabled_repos[-1] + for repo in mocked_enabled_repos: + hg_landing_worker.last_maintenance_at[repo.id] = now - interval + hg_landing_worker.last_maintenance_at[oldest_repo.id] = now - (interval * 2) + + hg_landing_worker.run_idle_maintenance() + + assert oldest_repo._scm.maintenance.call_count == 1, ( + "The repo waiting longest should run first when the budget is tight." + ) + for repo in mocked_enabled_repos: + if repo is oldest_repo: + continue + assert repo._scm.maintenance.call_count == 0, ( + "Other repos should be skipped once the budget is exhausted." + ) + + +@pytest.mark.django_db +def test_Worker_run_idle_maintenance_isolates_failures( + caplog, hg_landing_worker, mocked_enabled_repos +): + assert len(mocked_enabled_repos) >= 2, "Test requires at least two enabled repos." + + failing_repo, *healthy_repos = mocked_enabled_repos + failing_repo._scm.maintenance.side_effect = SCMException("boom", "", "") + + hg_landing_worker.run_idle_maintenance() + + for repo in healthy_repos: + repo._scm.maintenance.assert_called_once_with() + assert f"Idle maintenance failed for {failing_repo.name}" in caplog.text, ( + "A failure in one repo's maintenance should be logged." + ) + assert failing_repo.id in hg_landing_worker.last_maintenance_at, ( + "A failed run should still update the timestamp so we don't retry on every idle loop." + ) diff --git a/src/lando/main/migrations/0052_worker_maintenance_interval_seconds.py b/src/lando/main/migrations/0052_worker_maintenance_interval_seconds.py new file mode 100644 index 000000000..afc000654 --- /dev/null +++ b/src/lando/main/migrations/0052_worker_maintenance_interval_seconds.py @@ -0,0 +1,18 @@ +# Generated by Django 6.0.4 on 2026-05-06 01:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("main", "0051_alter_repo_hooks"), + ] + + operations = [ + migrations.AddField( + model_name="worker", + name="maintenance_interval_seconds", + field=models.IntegerField(default=300), + ), + ] diff --git a/src/lando/main/models/worker.py b/src/lando/main/models/worker.py index 219092359..4249f5459 100644 --- a/src/lando/main/models/worker.py +++ b/src/lando/main/models/worker.py @@ -26,6 +26,7 @@ class Worker(BaseModel): throttle_seconds = models.IntegerField(default=10) sleep_seconds = models.IntegerField(default=10) + maintenance_interval_seconds = models.IntegerField(default=300) type = models.CharField( choices=WorkerType, diff --git a/src/lando/main/scm/abstract_scm.py b/src/lando/main/scm/abstract_scm.py index 5cfbe5922..1a2567550 100644 --- a/src/lando/main/scm/abstract_scm.py +++ b/src/lando/main/scm/abstract_scm.py @@ -70,14 +70,10 @@ def push( def clean_repo( self, *, - strip_non_public_commits: bool = True, attributes_override: str | None = None, ): """Clean the local working copy from all extraneous files. - If `strip_non_public_commits` is set, also rewind any commit not present on the - origin. - `attributes_override` is SCM-dependent. """ @@ -199,6 +195,15 @@ def describe_local_changes(self, base_cset: str = "") -> list[CommitData]: If `base_cset` is passed, use it as the public base to find changes against. """ + @abstractmethod + def maintenance(self) -> None: + """Perform various maintenance tasks while the worker is idling. + + Called from the worker loop during idle periods so background cleanup + (e.g. stripping stale Mercurial drafts, deleting old Git work + branches) doesn't add to per-job latency. + """ + @abstractmethod def for_pull(self) -> AbstractContextManager: """Context manager to prepare the repo with the correct environment variables set for pulling.""" diff --git a/src/lando/main/scm/git.py b/src/lando/main/scm/git.py index 016d8188a..da27b47f1 100644 --- a/src/lando/main/scm/git.py +++ b/src/lando/main/scm/git.py @@ -499,18 +499,38 @@ def update_repo( ) return self.head_ref() + @override + def maintenance(self) -> None: + """Perform various maintenance tasks while the worker is idling. + + Currently this method cleans up leftover `lando-` work + branches. Each landing creates a fresh work branch in `update_repo`, + and they accumulate on disk indefinitely. Idle-time cleanup keeps the + local branch list small without affecting per-job latency. + """ + branches = self._git_run( + "for-each-ref", + "--format=%(refname:short)", + "refs/heads/lando-*", + cwd=self.path, + ).splitlines() + if not branches: + return + + # `git branch -D` refuses to delete the currently checked-out branch, + # and we are always on a `lando-*` branch at this point. + self._git_run("checkout", "--force", self.default_branch, cwd=self.path) + + self._git_run("branch", "-D", *branches, cwd=self.path) + @override def clean_repo( self, *, - strip_non_public_commits: bool = True, attributes_override: str | None = None, ): """Reset the local repository to the origin""" - if strip_non_public_commits: - self._git_run( - "reset", "--hard", f"origin/{self.default_branch}", cwd=self.path - ) + self._git_run("reset", "--hard", f"origin/{self.default_branch}", cwd=self.path) # We need to differentiate between None and "" here, so we know when we were # explicitly given an empty string. diff --git a/src/lando/main/scm/hg.py b/src/lando/main/scm/hg.py index 86a9cb912..dd607662c 100644 --- a/src/lando/main/scm/hg.py +++ b/src/lando/main/scm/hg.py @@ -328,7 +328,7 @@ def _run_hg_patch( # might have partially applied the patch. logger.info("import failed, retrying with 'patch'", exc_info=exc) import_cmd += ["--config", "ui.patch=patch"] - self.clean_repo(strip_non_public_commits=False) + self.clean_repo() self._prevent_hg_modifications(patch_or_diff.name) @@ -539,7 +539,7 @@ def update_repo( if not target_cset: target_cset = self._get_remote_head(source) - # Strip any lingering changes. + # Clean working directory. self.clean_repo() # Pull from "upstream". @@ -748,7 +748,6 @@ def read_rejects_files(self) -> dict[str, str]: def clean_repo( self, *, - strip_non_public_commits: bool = True, attributes_override: str | None = None, ): """Clean the local working copy from all extraneous files. @@ -768,12 +767,28 @@ def clean_repo( except HgException: pass - # Strip any lingering draft changesets. - if strip_non_public_commits: - try: - self.run_hg(["strip", "--no-backup", "-r", "not public()"]) - except HgException: - pass + @override + def maintenance(self) -> None: + """Perform various maintenance tasks while the worker is idling. + + Currently this method strips draft commits left over from previous + landings. Running during worker idle periods keeps the ~8s strip cost + out of the user-visible job latency window. This is the only place + that strips drafts; the per-job `clean_repo` calls leave them in + place. + """ + try: + self._open() + except hglib.error.ServerError as exc: + raise SCMException( + "Failed to open hg server for maintenance.", "", str(exc) + ) from exc + try: + self.run_hg(["strip", "--no-backup", "-r", "not public()"]) + except HgException: + pass + finally: + self.hg_repo.close() @override def merge_onto( diff --git a/src/lando/main/tests/test_git.py b/src/lando/main/tests/test_git.py index 6a8e5fe1d..3cdeae663 100644 --- a/src/lando/main/tests/test_git.py +++ b/src/lando/main/tests/test_git.py @@ -81,10 +81,6 @@ def test_GitSCM_clone( ) -@pytest.mark.parametrize( - "strip_non_public_commits", - [True, False], -) def test_GitSCM_clean_repo( git_repo: Path, git_setup_user: Callable, @@ -92,7 +88,6 @@ def test_GitSCM_clean_repo( request: pytest.FixtureRequest, tmp_path: Path, create_git_commit: Callable, - strip_non_public_commits: bool, ): clone_path = tmp_path / request.node.name clone_path.mkdir() @@ -125,22 +120,20 @@ def test_GitSCM_clean_repo( mock_git_run = active_mock(scm, "_git_run") - scm.clean_repo(strip_non_public_commits=strip_non_public_commits) + scm.clean_repo() mock_git_run.assert_called_with("clean", "-fdx", cwd=str(clone_path)) - if strip_non_public_commits: - mock_git_run.assert_any_call( - "reset", "--hard", f"origin/{scm.default_branch}", cwd=str(clone_path) - ) - current_commit = subprocess.run( - ["git", "rev-parse", "HEAD"], cwd=str(clone_path), capture_output=True - ).stdout - assert current_commit == original_commit, ( - f"Not on original_commit {original_commit} after using strip_non_public_commits: {current_commit}" - ) - - assert strip_non_public_commits != new_file.exists(), ( - f"strip_non_public_commits not honoured for {new_file}" + mock_git_run.assert_any_call( + "reset", "--hard", f"origin/{scm.default_branch}", cwd=str(clone_path) + ) + current_commit = subprocess.run( + ["git", "rev-parse", "HEAD"], cwd=str(clone_path), capture_output=True + ).stdout + assert current_commit == original_commit, ( + f"`clean_repo` should rewind to {original_commit}, got {current_commit}." + ) + assert not new_file.exists(), ( + f"`clean_repo` should remove the new commit's file: {new_file}" ) @@ -197,6 +190,80 @@ def test_GitSCM_clean_repo_gitattributes( ) +def _list_branches(clone_path: Path) -> list[str]: + """Return all local branch names in `clone_path`.""" + result = subprocess.run( + ["git", "for-each-ref", "--format=%(refname:short)", "refs/heads/"], + cwd=str(clone_path), + capture_output=True, + check=True, + text=True, + ) + return result.stdout.splitlines() + + +@pytest.mark.parametrize("checked_out_branch", ["lando-2025-01-01T120000", "main"]) +def test_GitSCM_maintenance( + git_repo: Path, + git_setup_user: Callable, + request: pytest.FixtureRequest, + tmp_path: Path, + checked_out_branch: str, +): + clone_path = tmp_path / request.node.name + clone_path.mkdir() + scm = GitSCM(str(clone_path)) + scm.clone(str(git_repo)) + git_setup_user(str(clone_path)) + + lando_branches = [ + "lando-2025-01-01T120000", + "lando-2025-04-30T135037", + "lando-2025-05-01T022525", + ] + for branch in lando_branches: + subprocess.run(["git", "branch", branch], cwd=str(clone_path), check=True) + subprocess.run( + ["git", "branch", "feature-keep-me"], cwd=str(clone_path), check=True + ) + + subprocess.run( + ["git", "checkout", checked_out_branch], cwd=str(clone_path), check=True + ) + + scm.maintenance() + + remaining = _list_branches(clone_path) + assert all(not branch.startswith("lando-") for branch in remaining), ( + f"All `lando-*` branches should have been deleted; got {remaining}." + ) + assert "feature-keep-me" in remaining, ( + "Non-`lando-*` branches should be left alone." + ) + assert scm.default_branch in remaining, "Default branch should still exist." + + +def test_GitSCM_maintenance_noop_without_lando_branches( + git_repo: Path, + git_setup_user: Callable, + request: pytest.FixtureRequest, + tmp_path: Path, +): + clone_path = tmp_path / request.node.name + clone_path.mkdir() + scm = GitSCM(str(clone_path)) + scm.clone(str(git_repo)) + git_setup_user(str(clone_path)) + + branches_before = _list_branches(clone_path) + + scm.maintenance() + + assert _list_branches(clone_path) == branches_before, ( + "`maintenance` should be a no-op when there are no `lando-*` branches." + ) + + def remove_git_version_from_patch(patch: str) -> str: """Return a patch with the Git version stripped.""" return re.sub(r"\d+(\.\d+)+$", "", patch)