Skip to content

Commit dd677b6

Browse files
committed
workers: revision worker implementation
NOTE: original branch name references the wrong bug. WIP DO NOT MERGE - implement base RevisionWorker (bug 1788728) - implement Supervisor worker (bug 1835861) - implement Processor worker (bug 1835862) - add repo.use_revision_worker feature flag (bug 1788732) - add main worker flag and capacity/throttle flags - add method to parse diff and list affected files <******* - add test coverage for revision_worker.py - add new start/stop commands to manage workers - add new flags to stop workers gracefully (*_WORKER_STOPPED) - refactor dependency and stack fetching and parsing using networkx <******** - rename old command lando-cli landing-worker to lando-cli start-landing-worker TODO: - detect stack change on page load - add tests for new warnings
1 parent ba50223 commit dd677b6

24 files changed

+1596
-127
lines changed

Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app
5454
ENV PYTHONPATH /app
5555
RUN chown -R app:app /app
5656

57-
# Create repos directory for transplanting in landing-worker
57+
# Create repos directory for landing-worker and revision worker.
5858
RUN mkdir /repos
59+
RUN chown -R app:app /repos
5960

6061
# Run as a non-privileged user
6162
USER app

Dockerfile-dev

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
2020
ENV FLASK_RUN_PORT=9000
2121
ENV FLASK_RUN_HOST=0.0.0.0
2222
ENV FLASK_DEBUG=1
23+
ENV HTTP_ALLOWED=1
2324

2425
ENTRYPOINT ["lando-cli"]
2526
CMD ["run"]
@@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app
4849
ENV PYTHONPATH /app
4950
RUN chown -R app:app /app
5051

52+
# Create repos directory for landing worker and revision worker.
5153
RUN mkdir /repos
5254
RUN chown -R app:app /repos
5355

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ services:
111111
- smtp
112112
lando-api.landing-worker:
113113
image: lando-api
114-
command: ["landing-worker"]
114+
command: ["start-landing-worker"]
115115
environment:
116116
- ENV=localdev
117117
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev

landoapi/api/landing_jobs.py

+3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ def put(landing_job_id: str, data: dict):
6363

6464
if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED):
6565
landing_job.transition_status(LandingJobAction.CANCEL)
66+
for revision in landing_job.revisions:
67+
# Unlock patches so they can be modified in the future.
68+
revision.patch_locked = False
6669
db.session.commit()
6770
return {"id": landing_job.id}, 200
6871
else:

landoapi/api/transplants.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ def post(phab: PhabricatorClient, data: dict):
364364
}
365365

366366
raw_diff = phab.call_conduit("differential.getrawdiff", diffID=diff["id"])
367-
lando_revision.set_patch(raw_diff, patch_data)
367+
lando_revision.set_patch(raw_diff, patch_data, final=True)
368368
db.session.commit()
369369
lando_revisions.append(lando_revision)
370370

@@ -445,11 +445,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
445445
limit=len(revision_phids),
446446
)
447447

448-
# Return both transplants and landing jobs, since for repos that were switched
449-
# both or either of these could be populated.
450-
451448
rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]
452449

450+
# Find landing jobs based on related revisions or legacy revision_to_diff_id field.
453451
landing_jobs = LandingJob.revisions_query(rev_ids).all()
454452

455453
return [job.serialize() for job in landing_jobs], 200

landoapi/cli.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,64 @@ def worker(celery_arguments):
6565
celery.worker_main((sys.argv[0],) + celery_arguments)
6666

6767

68-
@cli.command(name="landing-worker")
69-
def landing_worker():
68+
@cli.command(name="start-landing-worker")
69+
def start_landing_worker():
7070
from landoapi.app import auth0_subsystem, lando_ui_subsystem
71+
from landoapi.workers.landing_worker import LandingWorker
7172

7273
exclusions = [auth0_subsystem, lando_ui_subsystem]
7374
for system in get_subsystems(exclude=exclusions):
7475
system.ensure_ready()
7576

77+
worker = LandingWorker()
78+
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0")
79+
worker.start()
80+
81+
82+
@cli.command(name="stop-landing-worker")
83+
def stop_landing_worker():
84+
from landoapi.storage import db_subsystem
7685
from landoapi.workers.landing_worker import LandingWorker
7786

87+
db_subsystem.ensure_ready()
7888
worker = LandingWorker()
89+
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1")
90+
91+
92+
@cli.command(name="start-revision-worker")
93+
@click.argument("role")
94+
def start_revision_worker(role):
95+
from landoapi.app import auth0_subsystem, lando_ui_subsystem
96+
from landoapi.workers.revision_worker import Processor, Supervisor
97+
98+
roles = {
99+
"processor": Processor,
100+
"supervisor": Supervisor,
101+
}
102+
103+
if role not in roles:
104+
raise ValueError(f"Unknown worker role specified ({role}).")
105+
106+
exclusions = [auth0_subsystem, lando_ui_subsystem]
107+
for system in get_subsystems(exclude=exclusions):
108+
system.ensure_ready()
109+
110+
worker = roles[role]()
111+
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0")
79112
worker.start()
80113

81114

115+
@cli.command(name="stop-revision-worker")
116+
def stop_revision_worker():
117+
"""Stops all revision workers (supervisor and processors)."""
118+
from landoapi.storage import db_subsystem
119+
from landoapi.workers.revision_worker import RevisionWorker
120+
121+
db_subsystem.ensure_ready()
122+
worker = RevisionWorker()
123+
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1")
124+
125+
82126
@cli.command(name="run-pre-deploy-sequence")
83127
def run_pre_deploy_sequence():
84128
"""Runs the sequence of commands required before a deployment."""

landoapi/hg.py

+10
Original file line numberDiff line numberDiff line change
@@ -649,3 +649,13 @@ def read_checkout_file(self, path: str) -> str:
649649

650650
with checkout_file_path.open() as f:
651651
return f.read()
652+
653+
def has_incoming(self, source: str) -> bool:
654+
"""Check if there are any incoming changes from the remote repo."""
655+
try:
656+
self.run_hg(["incoming", source, "--limit", "1"])
657+
except hglib.error.CommandError as e:
658+
if b"no changes found" not in e.out:
659+
logger.error(e)
660+
return False
661+
return True

landoapi/models/configuration.py

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ class ConfigurationKey(enum.Enum):
2323

2424
LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED"
2525
LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED"
26+
REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED"
27+
REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED"
28+
REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY"
2629
API_IN_MAINTENANCE = "API_IN_MAINTENANCE"
2730
WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS"
2831

landoapi/models/landing_job.py

+31-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sqlalchemy.dialects.postgresql.json import JSONB
1818

1919
from landoapi.models.base import Base
20-
from landoapi.models.revisions import Revision, revision_landing_job
20+
from landoapi.models.revisions import Revision, RevisionStatus, revision_landing_job
2121
from landoapi.storage import db
2222

2323
logger = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ class LandingJobStatus(enum.Enum):
3636
column of `LandingJob`.
3737
"""
3838

39-
# Initial creation state.
39+
# Ready to be picked up state.
4040
SUBMITTED = "SUBMITTED"
4141

4242
# Actively being processed.
@@ -272,6 +272,14 @@ def set_landed_revision_diffs(self):
272272
.values(diff_id=revision.diff_id)
273273
)
274274

275+
def has_non_ready_revisions(self) -> bool:
276+
"""Return whether any of the revisions are in a non-ready state or not."""
277+
return bool(
278+
{r.status for r in self.revisions}.intersection(
279+
RevisionStatus.NON_READY_STATES
280+
)
281+
)
282+
275283
def transition_status(
276284
self,
277285
action: LandingJobAction,
@@ -321,21 +329,42 @@ def transition_status(
321329

322330
self.status = actions[action]["status"]
323331

332+
if action == LandingJobAction.CANCEL:
333+
self.ready_revisions()
334+
324335
if action in (LandingJobAction.FAIL, LandingJobAction.DEFER):
325336
self.error = kwargs["message"]
337+
self.fail_revisions()
326338

327339
if action == LandingJobAction.LAND:
328340
self.landed_commit_id = kwargs["commit_id"]
341+
self.land_revisions()
329342

330343
if commit:
331344
db.session.commit()
332345

346+
def fail_revisions(self):
347+
"""Mark all revisions in landing jobs as failed."""
348+
for revision in self.revisions:
349+
revision.fail()
350+
351+
def land_revisions(self):
352+
"""Mark all revisions in landing jobs as landed."""
353+
for revision in self.revisions:
354+
revision.land()
355+
356+
def ready_revisions(self):
357+
"""Mark all revisions in landing jobs as ready."""
358+
for revision in self.revisions:
359+
revision.ready()
360+
333361
def serialize(self) -> dict[str, Any]:
334362
"""Return a JSON compatible dictionary."""
335363
return {
336364
"id": self.id,
337365
"status": self.status.value,
338366
"landing_path": self.serialized_landing_path,
367+
"duration_seconds": self.duration_seconds,
339368
"error_breakdown": self.error_breakdown,
340369
"details": (
341370
self.error or self.landed_commit_id

0 commit comments

Comments
 (0)