Skip to content

Commit 18abe99

Browse files
committed
workers: revision worker implementation
PREVIOUS versions (most recent first): - 8e73e5a - 403e760 - 262f7a5 WIP DO NOT MERGE Commit message TBD - add main worker flag and capacity/throttle flags - add method to parse diff and list affected files - add more test coverage for revision_worker.py - add mots integration (bug 1740107) - add new RevisionWorker that pre-processes revisions (bug 1788728) - add new start/stop commands to manage workers - add new flags to stop workers gracefully (*_WORKER_STOPPED) - add repo.use_revision_worker feature flag (bug 1788732) - add mots hashes check - include new Lando revision info via API endpoint - refactor dependency and stack fetching and parsing using networkx - rename old command lando-cli landing-worker to lando-cli start-landing-worker - run pre/post mots query - store mots output in revision model TODO: - detect stack change on page load - add tests for new warnings
1 parent e53bf92 commit 18abe99

32 files changed

+1711
-136
lines changed

.flake8

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[flake8]
22
max-line-length = 88
33
select = C,E,F,W,B,B9
4-
ignore = E203, E501, W503, B006
4+
ignore = E203, E501, W503, B006, E712, E711
55
exclude =
66
.hg,
77
.git,

Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app
5454
ENV PYTHONPATH /app
5555
RUN chown -R app:app /app
5656

57-
# Create repos directory for transplanting in landing-worker
57+
# Create repos directory for landing-worker and revision worker.
5858
RUN mkdir /repos
59+
RUN chown -R app:app /repos
5960

6061
# Run as a non-privileged user
6162
USER app

Dockerfile-dev

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
2020
ENV FLASK_RUN_PORT=9000
2121
ENV FLASK_RUN_HOST=0.0.0.0
2222
ENV FLASK_DEBUG=1
23+
ENV HTTP_ALLOWED=1
2324

2425
ENTRYPOINT ["lando-cli"]
2526
CMD ["run"]
@@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app
4849
ENV PYTHONPATH /app
4950
RUN chown -R app:app /app
5051

52+
# Create repos directory for landing worker and revision worker.
5153
RUN mkdir /repos
5254
RUN chown -R app:app /repos
5355

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ services:
111111
- smtp
112112
lando-api.landing-worker:
113113
image: lando-api
114-
command: ["landing-worker"]
114+
command: ["start-landing-worker"]
115115
environment:
116116
- ENV=localdev
117117
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev

landoapi/api/landing_jobs.py

+3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ def put(landing_job_id: str, data: dict):
6363

6464
if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED):
6565
landing_job.transition_status(LandingJobAction.CANCEL)
66+
for revision in landing_job.revisions:
67+
# Unlock patches so they can be modified in the future.
68+
revision.patch_locked = False
6669
db.session.commit()
6770
return {"id": landing_job.id}, 200
6871
else:

landoapi/api/transplants.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ def post(phab: PhabricatorClient, data: dict):
365365
}
366366

367367
raw_diff = phab.call_conduit("differential.getrawdiff", diffID=diff["id"])
368-
lando_revision.set_patch(raw_diff, patch_data)
368+
lando_revision.set_patch(raw_diff, patch_data, final=True)
369369
db.session.commit()
370370
lando_revisions.append(lando_revision)
371371

@@ -446,11 +446,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
446446
limit=len(revision_phids),
447447
)
448448

449-
# Return both transplants and landing jobs, since for repos that were switched
450-
# both or either of these could be populated.
451-
452449
rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]
453450

451+
# Find landing jobs based on related revisions or legacy revision_to_diff_id field.
454452
landing_jobs = LandingJob.revisions_query(rev_ids).all()
455453

456454
return [job.serialize() for job in landing_jobs], 200

landoapi/cli.py

+47-3
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,64 @@ def worker(celery_arguments):
6868
celery.worker_main((sys.argv[0],) + celery_arguments)
6969

7070

71-
@cli.command(name="landing-worker")
72-
def landing_worker():
71+
@cli.command(name="start-landing-worker")
72+
def start_landing_worker():
7373
from landoapi.app import auth0_subsystem, lando_ui_subsystem
74+
from landoapi.workers.landing_worker import LandingWorker
7475

7576
exclusions = [auth0_subsystem, lando_ui_subsystem]
7677
for system in get_subsystems(exclude=exclusions):
7778
system.ensure_ready()
7879

79-
from landoapi.workers.landing_worker import LandingWorker
80+
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "0")
8081

8182
worker = LandingWorker()
8283
worker.start()
8384

8485

86+
@cli.command(name="stop-landing-worker")
87+
def stop_landing_worker():
88+
from landoapi.workers.landing_worker import LandingWorker
89+
from landoapi.storage import db_subsystem
90+
91+
db_subsystem.ensure_ready()
92+
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "1")
93+
94+
95+
@cli.command(name="start-revision-worker")
96+
@click.argument("role")
97+
def start_revision_worker(role):
98+
from landoapi.app import auth0_subsystem, lando_ui_subsystem
99+
from landoapi.workers.revision_worker import RevisionWorker, Supervisor, Processor
100+
101+
roles = {
102+
"processor": Processor,
103+
"supervisor": Supervisor,
104+
}
105+
106+
if role not in roles:
107+
raise ValueError(f"Unknown worker role specified ({role}).")
108+
109+
exclusions = [auth0_subsystem, lando_ui_subsystem]
110+
for system in get_subsystems(exclude=exclusions):
111+
system.ensure_ready()
112+
113+
ConfigurationVariable.set(RevisionWorker.STOP_KEY, VariableType.BOOL, "0")
114+
115+
worker = roles[role]()
116+
worker.start()
117+
118+
119+
@cli.command(name="stop-revision-worker")
120+
def stop_revision_worker():
121+
"""Stops all revision workers (supervisor and processors)."""
122+
from landoapi.workers.revision_worker import RevisionWorker
123+
from landoapi.storage import db_subsystem
124+
125+
db_subsystem.ensure_ready()
126+
RevisionWorker.stop()
127+
128+
85129
@cli.command(name="run-pre-deploy-sequence")
86130
def run_pre_deploy_sequence():
87131
"""Runs the sequence of commands required before a deployment."""

landoapi/commit_message.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
IRC_NICK = r"[a-zA-Z0-9\-\_.]*[a-zA-Z0-9\-\_]+"
4646

4747
# fmt: off
48-
REVIEWERS_RE = re.compile( # noqa: E131
48+
REVIEWERS_RE = re.compile(
4949
r"([\s\(\.\[;,])" # before "r" delimiter
5050
+ r"(" + SPECIFIER + r")" # flag
5151
+ r"(" # capture all reviewers
@@ -209,3 +209,6 @@ def bug_list_to_commit_string(bug_ids: Iterable[str]) -> str:
209209
return "No bug"
210210

211211
return f"Bug {', '.join(sorted(set(bug_ids)))}"
212+
213+
214+
# flake8: noqa: E131

landoapi/hg.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# This Source Code Form is subject to the terms of the Mozilla Public
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4-
import copy
54
import configparser
5+
import copy
66
import logging
77
import os
88
import shlex
@@ -650,3 +650,13 @@ def read_checkout_file(self, path: str) -> str:
650650

651651
with checkout_file_path.open() as f:
652652
return f.read()
653+
654+
def has_incoming(self, source: str) -> bool:
655+
"""Check if there are any incoming changes from the remote repo."""
656+
try:
657+
self.run_hg(["incoming", source, "--limit", "1"])
658+
except hglib.error.CommandError as e:
659+
if b"no changes found" not in e.out:
660+
logger.error(e)
661+
return False
662+
return True

landoapi/models/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
from landoapi.models.landing_job import LandingJob
6-
from landoapi.models.revisions import Revision
76
from landoapi.models.secapproval import SecApprovalRequest
87
from landoapi.models.transplant import Transplant
98
from landoapi.models.configuration import ConfigurationVariable
10-
from landoapi.models.revisions import DiffWarning
9+
from landoapi.models.revisions import DiffWarning, Revision
1110

1211
__all__ = [
1312
"LandingJob",

landoapi/models/configuration.py

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class ConfigurationKey(enum.Enum):
2424

2525
LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED"
2626
LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED"
27+
REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED"
28+
REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED"
29+
REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY"
2730
API_IN_MAINTENANCE = "API_IN_MAINTENANCE"
2831
WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS"
2932

landoapi/models/landing_job.py

+31-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from sqlalchemy.dialects.postgresql.json import JSONB
2020

2121
from landoapi.models.base import Base
22-
from landoapi.models.revisions import Revision, revision_landing_job
22+
from landoapi.models.revisions import Revision, RevisionStatus, revision_landing_job
2323
from landoapi.storage import db
2424

2525
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ class LandingJobStatus(enum.Enum):
3838
column of `LandingJob`.
3939
"""
4040

41-
# Initial creation state.
41+
# Ready to be picked up state.
4242
SUBMITTED = "SUBMITTED"
4343

4444
# Actively being processed.
@@ -274,6 +274,14 @@ def set_landed_revision_diffs(self):
274274
.values(diff_id=revision.diff_id)
275275
)
276276

277+
def has_non_ready_revisions(self) -> bool:
278+
"""Return whether any of the revisions are in a non-ready state or not."""
279+
return bool(
280+
set(r.status for r in self.revisions).intersection(
281+
RevisionStatus.NON_READY_STATES
282+
)
283+
)
284+
277285
def transition_status(
278286
self,
279287
action: LandingJobAction,
@@ -323,21 +331,42 @@ def transition_status(
323331

324332
self.status = actions[action]["status"]
325333

334+
if action == LandingJobAction.CANCEL:
335+
self.ready_revisions()
336+
326337
if action in (LandingJobAction.FAIL, LandingJobAction.DEFER):
327338
self.error = kwargs["message"]
339+
self.fail_revisions()
328340

329341
if action == LandingJobAction.LAND:
330342
self.landed_commit_id = kwargs["commit_id"]
343+
self.land_revisions()
331344

332345
if commit:
333346
db.session.commit()
334347

348+
def fail_revisions(self):
349+
"""Mark all revisions in landing jobs as failed."""
350+
for revision in self.revisions:
351+
revision.fail()
352+
353+
def land_revisions(self):
354+
"""Mark all revisions in landing jobs as landed."""
355+
for revision in self.revisions:
356+
revision.land()
357+
358+
def ready_revisions(self):
359+
"""Mark all revisions in landing jobs as ready."""
360+
for revision in self.revisions:
361+
revision.ready()
362+
335363
def serialize(self) -> dict[str, Any]:
336364
"""Return a JSON compatible dictionary."""
337365
return {
338366
"id": self.id,
339367
"status": self.status.value,
340368
"landing_path": self.serialized_landing_path,
369+
"duration_seconds": self.duration_seconds,
341370
"error_breakdown": self.error_breakdown,
342371
"details": (
343372
self.error or self.landed_commit_id

0 commit comments

Comments
 (0)