Skip to content

Commit 009038d

Browse files
committed
workers: revision worker implementation
WIP DO NOT MERGE Commit message TBD - add abstract Worker class (bug 1744327) - add main worker flag and capacity/throttle flags - add many to many fields + association to revisions/landing jobs - add method to parse diff and list affected files - add more test coverage for revision_worker.py - add mots integration (bug 1740107) - add new RevisionWorker that pre-processes revisions (bug 1788728) - add new RevisionWorker that pre-processes revisions (bug 1788728) - add new start/stop commands to manage workers - add new flags to stop workers gracefully (*_WORKER_STOPPED) - add patch caching on disk - add proper loop/process functionality to workers - add repo.use_revision_worker feature flag (bug 1788732) - add mots hashes check - improved edge search functionality - implement stack hashes to detect changes in revisions (via get_stack_hashes) - include new Lando revision info via API endpoint - refactor dependency and stack fetching and parsing using networkx - refactored revision worker and landing worker to use Worker class - remove s3/boto/etc. dependencies (bug 1753728) - rename old command lando-cli landing-worker to lando-cli start-landing-worker - run pre/post mots query - store mots output in revision model
1 parent 0839599 commit 009038d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2272
-370
lines changed

.flake8

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[flake8]
22
max-line-length = 88
33
select = C,E,F,W,B,B9
4-
ignore = E203, E501, W503, B006
4+
ignore = E203, E501, W503, B006, E712, E711
55
exclude =
66
.hg,
77
.git,

Dockerfile

+6-1
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,13 @@ RUN cd / && pip install --no-cache /app
5454
ENV PYTHONPATH /app
5555
RUN chown -R app:app /app
5656

57-
# Create repos directory for transplanting in landing-worker
57+
# Create repos directory for landing-worker and revision worker.
5858
RUN mkdir /repos
59+
RUN chown -R app:app /repos
60+
61+
# Create patches directory to cache patches.
62+
RUN mkdir /patches
63+
RUN chown -R app:app /patches
5964

6065
# Run as a non-privileged user
6166
USER app

Dockerfile-dev

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
2020
ENV FLASK_RUN_PORT=9000
2121
ENV FLASK_RUN_HOST=0.0.0.0
2222
ENV FLASK_DEBUG=1
23+
ENV HTTP_ALLOWED=1
2324

2425
ENTRYPOINT ["lando-cli"]
2526
CMD ["run"]
@@ -48,9 +49,14 @@ RUN cd / && pip install --no-cache /app
4849
ENV PYTHONPATH /app
4950
RUN chown -R app:app /app
5051

52+
# Create repos directory for landing worker and revision worker.
5153
RUN mkdir /repos
5254
RUN chown -R app:app /repos
5355

56+
# Create patches directory to store cached patches.
57+
RUN mkdir /patches
58+
RUN chown -R app:app /patches
59+
5460
# Run as a non-privileged user
5561
USER app
5662

docker-compose.yml

+13-12
Original file line numberDiff line numberDiff line change
@@ -131,25 +131,24 @@ services:
131131
- smtp
132132
lando-api.landing-worker:
133133
image: lando-api
134-
command: ["landing-worker"]
134+
command: ["start-landing-worker"]
135135
environment:
136-
- ENV=localdev
137-
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev
138-
- SENTRY_DSN=
139-
# See http://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#configuration
140-
# for the full URL format.
141-
- CELERY_BROKER_URL=redis://redis.queue/0
142-
- OIDC_IDENTIFIER=https://lando-api.test
143-
- OIDC_DOMAIN=https://auth0.test
144-
- LANDO_UI_URL=https://lando.test
145-
- REPO_CLONES_PATH=/repos
146-
- REPOS_TO_LAND=localdev
136+
CELERY_BROKER_URL: "redis://redis.queue/0"
137+
DATABASE_URL: "postgresql://postgres:[email protected]/lando_api_dev"
138+
ENV: "localdev"
139+
LANDO_UI_URL: "https://lando.test"
140+
OIDC_DOMAIN: "https://auth0.test"
141+
OIDC_IDENTIFIER: "https://lando-api.test"
142+
REPOS_TO_LAND: "localdev"
143+
REPO_CLONES_PATH: "/repos"
144+
SENTRY_DSN: ""
147145
user: root
148146
volumes:
149147
- ./:/app
150148
- ./migrations/:/migrations/
151149
# Prevent writing python cache to the host.
152150
- caches_cache:/app/.cache/
151+
- repos:/repos
153152
depends_on:
154153
- lando-api.db
155154
- redis.queue
@@ -177,3 +176,5 @@ volumes:
177176
caches_pycache:
178177
caches_cache:
179178
caches_pytest_cache:
179+
repos:
180+
patches:

landoapi/api/landing_jobs.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ def put(landing_job_id: str, data: dict):
6262
)
6363

6464
if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED):
65-
landing_job.transition_status(LandingJobAction.CANCEL)
66-
db.session.commit()
65+
landing_job.transition_status(LandingJobAction.CANCEL, commit=True, db=db)
6766
return {"id": landing_job.id}, 200
6867
else:
6968
raise ProblemException(

landoapi/api/revisions.py

+13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from landoapi.decorators import require_phabricator_api_key
1111
from landoapi.models import SecApprovalRequest
1212
from landoapi.phabricator import PhabricatorClient
13+
from landoapi.models.revisions import Revision
1314
from landoapi.projects import get_secure_project_phid
1415
from landoapi.revisions import revision_is_secure
1516
from landoapi.secapproval import send_sanitized_commit_message_for_review
@@ -88,3 +89,15 @@ def request_sec_approval(phab: PhabricatorClient, data: dict):
8889
db.session.commit()
8990

9091
return {}, 200
92+
93+
94+
def get_stack_hashes(revision_id: int) -> tuple:
95+
"""
96+
Given a revision, returns revision stack hashes.
97+
98+
A stack hash is used to detect a change in a revision.
99+
"""
100+
revision = Revision.query.filter(Revision.id == revision_id).one_or_none()
101+
if revision:
102+
return revision.stack_hashes, 200
103+
return {}, 404

landoapi/api/stacks.py

+20-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from flask import current_app
99
from landoapi.commit_message import format_commit_message
1010
from landoapi.decorators import require_phabricator_api_key
11+
from landoapi.models.revisions import Revision
1112
from landoapi.phabricator import PhabricatorClient
1213
from landoapi.projects import (
1314
get_release_managers,
@@ -116,19 +117,25 @@ def get(phab: PhabricatorClient, revision_id: str):
116117
}
117118

118119
revisions_response = []
119-
for revision_phid, revision in stack_data.revisions.items():
120-
fields = PhabricatorClient.expect(revision, "fields")
120+
for _phid, phab_revision in stack_data.revisions.items():
121+
lando_revision = Revision.query.filter(
122+
Revision.revision_id == phab_revision["id"]
123+
).one_or_none()
124+
revision_phid = PhabricatorClient.expect(phab_revision, "phid")
125+
fields = PhabricatorClient.expect(phab_revision, "fields")
121126
diff_phid = PhabricatorClient.expect(fields, "diffPHID")
122127
repo_phid = PhabricatorClient.expect(fields, "repositoryPHID")
123128
diff = stack_data.diffs[diff_phid]
124-
human_revision_id = "D{}".format(PhabricatorClient.expect(revision, "id"))
129+
human_revision_id = "D{}".format(PhabricatorClient.expect(phab_revision, "id"))
125130
revision_url = urllib.parse.urljoin(
126131
current_app.config["PHABRICATOR_URL"], human_revision_id
127132
)
128-
secure = revision_is_secure(revision, secure_project_phid)
129-
commit_description = find_title_and_summary_for_display(phab, revision, secure)
130-
bug_id = get_bugzilla_bug(revision)
131-
reviewers = get_collated_reviewers(revision)
133+
secure = revision_is_secure(phab_revision, secure_project_phid)
134+
commit_description = find_title_and_summary_for_display(
135+
phab, phab_revision, secure
136+
)
137+
bug_id = get_bugzilla_bug(phab_revision)
138+
reviewers = get_collated_reviewers(phab_revision)
132139
accepted_reviewers = reviewers_for_commit_message(
133140
reviewers, users, projects, sec_approval_project_phid
134141
)
@@ -163,16 +170,16 @@ def get(phab: PhabricatorClient, revision_id: str):
163170
{
164171
"id": human_revision_id,
165172
"phid": revision_phid,
166-
"status": serialize_status(revision),
173+
"status": serialize_status(phab_revision),
167174
"blocked_reason": blocked.get(revision_phid, ""),
168175
"bug_id": bug_id,
169176
"title": commit_description.title,
170177
"url": revision_url,
171178
"date_created": PhabricatorClient.to_datetime(
172-
PhabricatorClient.expect(revision, "fields", "dateCreated")
179+
PhabricatorClient.expect(phab_revision, "fields", "dateCreated")
173180
).isoformat(),
174181
"date_modified": PhabricatorClient.to_datetime(
175-
PhabricatorClient.expect(revision, "fields", "dateModified")
182+
PhabricatorClient.expect(phab_revision, "fields", "dateModified")
176183
).isoformat(),
177184
"summary": commit_description.summary,
178185
"commit_message_title": commit_message_title,
@@ -183,6 +190,9 @@ def get(phab: PhabricatorClient, revision_id: str):
183190
"reviewers": serialize_reviewers(reviewers, users, projects, diff_phid),
184191
"is_secure": secure,
185192
"is_using_secure_commit_message": commit_description.sanitized,
193+
"lando_revision": lando_revision.serialize()
194+
if lando_revision
195+
else None,
186196
}
187197
)
188198

landoapi/api/transplants.py

+3-10
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# This Source Code Form is subject to the terms of the Mozilla Public
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
from datetime import datetime
45
import logging
56
import urllib.parse
67

7-
from datetime import datetime
88
from typing import Optional
99

1010
import kombu
@@ -308,8 +308,6 @@ def post(phab: PhabricatorClient, data: dict):
308308
}
309309

310310
lando_revisions = []
311-
312-
# Build the patches to land.
313311
for revision, diff in to_land:
314312
reviewers = get_collated_reviewers(revision)
315313
accepted_reviewers = reviewers_for_commit_message(
@@ -395,6 +393,8 @@ def post(phab: PhabricatorClient, data: dict):
395393
)
396394

397395
db.session.add(job)
396+
397+
# Commit to get job ID.
398398
db.session.commit()
399399

400400
job.add_revisions(lando_revisions)
@@ -425,7 +425,6 @@ def post(phab: PhabricatorClient, data: dict):
425425
def get_list(phab: PhabricatorClient, stack_revision_id: str):
426426
"""Return a list of Transplant objects"""
427427
revision_id_int = revision_id_to_int(stack_revision_id)
428-
429428
revision = phab.call_conduit(
430429
"differential.revision.search", constraints={"ids": [revision_id_int]}
431430
)
@@ -444,12 +443,6 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
444443
constraints={"phids": revision_phids},
445444
limit=len(revision_phids),
446445
)
447-
448-
# Return both transplants and landing jobs, since for repos that were switched
449-
# both or either of these could be populated.
450-
451446
rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]
452-
453447
landing_jobs = LandingJob.revisions_query(rev_ids).all()
454-
455448
return [job.serialize() for job in landing_jobs], 200

landoapi/app.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ def load_config() -> dict[str, Any]:
6262
}
6363

6464
config_keys = (
65-
"AWS_ACCESS_KEY",
66-
"AWS_SECRET_KEY",
6765
"BUGZILLA_API_KEY",
6866
"BUGZILLA_URL",
6967
"CACHE_REDIS_DB",
@@ -82,15 +80,15 @@ def load_config() -> dict[str, Any]:
8280
"MAIL_USERNAME",
8381
"OIDC_DOMAIN",
8482
"OIDC_IDENTIFIER",
85-
"PATCH_BUCKET_NAME",
8683
"PHABRICATOR_ADMIN_API_KEY",
8784
"PHABRICATOR_UNPRIVILEGED_API_KEY",
8885
"PHABRICATOR_URL",
89-
"REPO_CLONES_PATH",
86+
"PINGBACK_ENABLED",
9087
"REPOS_TO_LAND",
88+
"REPO_CLONES_PATH",
9189
"SENTRY_DSN",
92-
"TRANSPLANT_PASSWORD",
9390
"TRANSPLANT_API_KEY",
91+
"TRANSPLANT_PASSWORD",
9492
"TRANSPLANT_URL",
9593
"TRANSPLANT_USERNAME",
9694
"TREESTATUS_URL",

landoapi/cache.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ class CacheSubsystem(Subsystem):
2626

2727
def init_app(self, app):
2828
super().init_app(app)
29-
3029
host = self.flask_app.config.get("CACHE_REDIS_HOST")
31-
if not host:
30+
if self.flask_app.config.get("CACHE_DISABLED"):
3231
# Default to not caching for testing.
33-
logger.warning("Cache initialized in null mode, caching disabled.")
34-
cache_config = {"CACHE_TYPE": "null", "CACHE_NO_NULL_WARNING": True}
32+
logger.warning("Cache initialized in null mode.")
33+
cache_config = {"CACHE_TYPE": "NullCache"}
34+
elif not host:
35+
logger.warning("Cache initialized in filesystem mode.")
36+
cache_config = {"CACHE_TYPE": "FileSystemCache", "CACHE_DIR": "/tmp/cache"}
3537
else:
3638
cache_config = {"CACHE_TYPE": "redis", "CACHE_REDIS_HOST": host}
3739
config_keys = ("CACHE_REDIS_PORT", "CACHE_REDIS_PASSWORD", "CACHE_REDIS_DB")

landoapi/cli.py

+47-3
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,64 @@ def worker(celery_arguments):
6868
celery.worker_main((sys.argv[0],) + celery_arguments)
6969

7070

71-
@cli.command(name="landing-worker")
72-
def landing_worker():
71+
@cli.command(name="start-landing-worker")
72+
def start_landing_worker():
7373
from landoapi.app import auth0_subsystem, lando_ui_subsystem
74+
from landoapi.workers.landing_worker import LandingWorker
7475

7576
exclusions = [auth0_subsystem, lando_ui_subsystem]
7677
for system in get_subsystems(exclude=exclusions):
7778
system.ensure_ready()
7879

79-
from landoapi.workers.landing_worker import LandingWorker
80+
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "0")
8081

8182
worker = LandingWorker()
8283
worker.start()
8384

8485

86+
@cli.command(name="stop-landing-worker")
87+
def stop_landing_worker():
88+
from landoapi.workers.landing_worker import LandingWorker
89+
from landoapi.storage import db_subsystem
90+
91+
db_subsystem.ensure_ready()
92+
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "1")
93+
94+
95+
@cli.command(name="start-revision-worker")
96+
@click.argument("role")
97+
def start_revision_worker(role):
98+
from landoapi.app import auth0_subsystem, lando_ui_subsystem
99+
from landoapi.workers.revision_worker import RevisionWorker, Supervisor, Processor
100+
101+
roles = {
102+
"processor": Processor,
103+
"supervisor": Supervisor,
104+
}
105+
106+
if role not in roles:
107+
raise ValueError(f"Unknown worker role specified ({role}).")
108+
109+
exclusions = [auth0_subsystem, lando_ui_subsystem]
110+
for system in get_subsystems(exclude=exclusions):
111+
system.ensure_ready()
112+
113+
ConfigurationVariable.set(RevisionWorker.STOP_KEY, VariableType.BOOL, "0")
114+
115+
worker = roles[role]()
116+
worker.start()
117+
118+
119+
@cli.command(name="stop-revision-worker")
120+
def stop_revision_worker():
121+
"""Stops all revision workers (supervisor and processors)."""
122+
from landoapi.workers.revision_worker import RevisionWorker
123+
from landoapi.storage import db_subsystem
124+
125+
db_subsystem.ensure_ready()
126+
RevisionWorker.stop()
127+
128+
85129
@cli.command(name="run-pre-deploy-sequence")
86130
def run_pre_deploy_sequence():
87131
"""Runs the sequence of commands required before a deployment."""

landoapi/commit_message.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
IRC_NICK = r"[a-zA-Z0-9\-\_.]*[a-zA-Z0-9\-\_]+"
4646

4747
# fmt: off
48-
REVIEWERS_RE = re.compile( # noqa: E131
48+
REVIEWERS_RE = re.compile(
4949
r"([\s\(\.\[;,])" # before "r" delimiter
5050
+ r"(" + SPECIFIER + r")" # flag
5151
+ r"(" # capture all reviewers
@@ -209,3 +209,6 @@ def bug_list_to_commit_string(bug_ids: Iterable[str]) -> str:
209209
return "No bug"
210210

211211
return f"Bug {', '.join(sorted(set(bug_ids)))}"
212+
213+
214+
# flake8: noqa: E131

0 commit comments

Comments
 (0)