diff --git a/jobs/webcompat-kb/tests/test_bugzilla.py b/jobs/webcompat-kb/tests/test_bugzilla.py index 62935e34..50c12837 100644 --- a/jobs/webcompat-kb/tests/test_bugzilla.py +++ b/jobs/webcompat-kb/tests/test_bugzilla.py @@ -10,7 +10,6 @@ from webcompat_kb.bugzilla import BugzillaToBigQuery from webcompat_kb.bugzilla import extract_int_from_field from webcompat_kb.bugzilla import parse_string_to_json -from webcompat_kb.bugzilla import RELATION_CONFIG, LINK_FIELDS, ETP_RELATION_CONFIG def to_history(data: list[dict[str, Any]]) -> Mapping[int, list[History]]: @@ -999,13 +998,13 @@ def test_extract_int_from_field(): field = extract_int_from_field("P3") assert field == 3 - field = extract_int_from_field("critical") + field = extract_int_from_field("critical", value_map={"critical": 1}) assert field == 1 field = extract_int_from_field("--") assert field is None - field = extract_int_from_field("N/A") + field = extract_int_from_field("N/A", value_map={"n/a": None}) assert field is None field = extract_int_from_field("") diff --git a/jobs/webcompat-kb/webcompat_kb/bqhelpers.py b/jobs/webcompat-kb/webcompat_kb/bqhelpers.py new file mode 100644 index 00000000..98510f56 --- /dev/null +++ b/jobs/webcompat-kb/webcompat_kb/bqhelpers.py @@ -0,0 +1,19 @@ +from google.cloud import bigquery + + +def ensure_table( + client: bigquery.Client, bq_dataset_id: str, table_id: str, recreate: bool +) -> None: + table = bigquery.Table( + f"{client.project}.{bq_dataset_id}.{table_id}", + schema=[ + bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("who", "STRING", mode="REQUIRED"), + bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("score_delta", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("reasons", "STRING", mode="REPEATED"), + ], + ) + if recreate: + client.delete_table(table, not_found_ok=True) + client.create_table(table, exists_ok=True) diff --git a/jobs/webcompat-kb/webcompat_kb/bugzilla.py b/jobs/webcompat-kb/webcompat_kb/bugzilla.py index 0d19472d..5a93741e 100644 --- a/jobs/webcompat-kb/webcompat_kb/bugzilla.py +++ b/jobs/webcompat-kb/webcompat_kb/bugzilla.py @@ -1,4 +1,6 @@ import argparse +import enum +import json import logging import os import re @@ -7,14 +9,13 @@ Any, Iterable, Iterator, - Mapping, MutableMapping, - NamedTuple, Optional, - Sequence, - Union, + Self, cast, ) +from collections import defaultdict +from collections.abc import Sequence, Mapping from dataclasses import dataclass from datetime import datetime, timedelta @@ -22,22 +23,168 @@ from google.cloud import bigquery from .base import EtlJob +from .bqhelpers import ensure_table -Bug = Mapping[str, Any] -BugsById = Mapping[int, Bug] -MutBugsById = MutableMapping[int, Bug] -Relations = Mapping[str, list[Mapping[str, Any]]] -RelationConfig = Mapping[str, Mapping[str, Any]] +class BugFetchError(Exception): + pass -@dataclass + +@dataclass(frozen=True) +class Bug: + id: int + summary: str + status: str + resolution: str + product: str + component: str + creator: str + see_also: list[str] + depends_on: list[int] + blocks: list[int] + priority: Optional[int] + severity: Optional[int] + creation_time: datetime + assigned_to: Optional[str] + keywords: list[str] + url: str + user_story: str + last_resolved: Optional[datetime] + last_change_time: datetime + whiteboard: str + webcompat_priority: Optional[str] + webcompat_score: Optional[int] + + @property + def parsed_user_story(self) -> Mapping[str, Any]: + return parse_user_story(self.user_story) + + @property + 
def resolved(self) -> Optional[datetime]: + if self.status in {"RESOLVED", "VERIFIED"} and self.last_resolved: + return self.last_resolved + return None + + @classmethod + def from_bugzilla(cls, bug: bugdantic.bugzilla.Bug) -> Self: + assert bug.id is not None + assert bug.summary is not None + assert bug.status is not None + assert bug.resolution is not None + assert bug.product is not None + assert bug.component is not None + assert bug.creator is not None + assert bug.see_also is not None + assert bug.depends_on is not None + assert bug.blocks is not None + assert bug.priority is not None + assert bug.severity is not None + assert bug.creation_time is not None + assert bug.assigned_to is not None + assert bug.keywords is not None + assert bug.url is not None + assert bug.last_change_time is not None + assert bug.whiteboard is not None + assert bug.cf_user_story is not None + + return cls( + id=bug.id, + summary=bug.summary, + status=bug.status, + resolution=bug.resolution, + product=bug.product, + component=bug.component, + see_also=bug.see_also, + depends_on=bug.depends_on, + blocks=bug.blocks, + priority=extract_int_from_field( + bug.priority, + value_map={ + "--": None, + }, + ), + severity=extract_int_from_field( + bug.severity, + value_map={ + "n/a": None, + "--": None, + "blocker": 1, + "critical": 1, + "major": 2, + "normal": 3, + "minor": 4, + "trivial": 4, + "enhancement": 4, + }, + ), + creation_time=bug.creation_time, + assigned_to=bug.assigned_to + if bug.assigned_to != "nobody@mozilla.org" + else None, + keywords=bug.keywords, + url=bug.url, + user_story=bug.cf_user_story, + last_resolved=bug.cf_last_resolved, + last_change_time=bug.last_change_time, + whiteboard=bug.whiteboard, + creator=bug.creator, + webcompat_priority=( + bug.cf_webcompat_priority + if bug.cf_webcompat_priority != "---" + else None + ), + webcompat_score=extract_int_from_field( + bug.cf_webcompat_score, + value_map={ + "---": None, + "?": None, + }, + ), + ) + + def to_json(self) -> Mapping[str, Any]: + fields = {**vars(self)} + for key in fields: + if isinstance(fields[key], datetime): + fields[key] = fields[key].isoformat() + return fields + + @classmethod + def from_json(cls, bug_data: Mapping[str, Any]) -> Self: + return cls( + id=bug_data["id"], + summary=bug_data["summary"], + status=bug_data["status"], + resolution=bug_data["resolution"], + product=bug_data["product"], + component=bug_data["component"], + see_also=bug_data["see_also"], + depends_on=bug_data["depends_on"], + blocks=bug_data["blocks"], + priority=bug_data["priority"], + severity=bug_data["severity"], + creation_time=datetime.fromisoformat(bug_data["creation_time"]), + assigned_to=bug_data["assigned_to"], + keywords=bug_data["keywords"], + url=bug_data["url"], + user_story=bug_data["user_story"], + last_resolved=datetime.fromisoformat(bug_data["last_resolved"]) if bug_data["last_resolved"] is not None else None , + last_change_time=datetime.fromisoformat(bug_data["last_change_time"]), + whiteboard=bug_data["whiteboard"], + creator=bug_data["creator"], + webcompat_priority=bug_data["webcompat_priority"], + webcompat_score=bug_data["webcompat_score"], + ) + + +@dataclass(frozen=True) class BugHistoryChange: field_name: str added: str removed: str -@dataclass +@dataclass(frozen=True) class BugHistoryEntry: number: int who: str @@ -45,7 +192,8 @@ class BugHistoryEntry: changes: list[BugHistoryChange] -class HistoryRow(NamedTuple): +@dataclass(frozen=True) +class HistoryChange: number: int who: str change_time: datetime @@ -54,29 +202,43 
@@ class HistoryRow(NamedTuple): removed: str -class BugFetchError(Exception): - pass +class PropertyChange(enum.StrEnum): + added = "added" + removed = "removed" -BUGZILLA_URL = "https://bugzilla.mozilla.org/" - -OTHER_BROWSER = ["bugs.chromium.org", "bugs.webkit.org", "crbug.com"] -STANDARDS_ISSUES = ["github.com/w3c", "github.com/whatwg", "github.com/wicg"] -STANDARDS_POSITIONS = ["standards-positions"] -INTERVENTIONS = ["github.com/mozilla-extensions/webcompat-addon"] -FIELD_MAP = { - "blocker": 1, - "critical": 1, - "major": 2, - "normal": 3, - "minor": 4, - "trivial": 4, - "enhancement": 4, - "n/a": None, - "--": None, -} +@dataclass(frozen=True) +class PropertyHistoryItem: + change_time: datetime + change: PropertyChange + + +class PropertyHistory: + """Representation of the history of a specific boolean property + (i.e. one that can be present or not)""" + + def __init__(self) -> None: + self.data: list[PropertyHistoryItem] = [] + + def __len__(self) -> int: + return len(self.data) + + def add(self, change_time: datetime, change: PropertyChange) -> None: + self.data.append(PropertyHistoryItem(change_time=change_time, change=change)) + + def missing_initial_add(self) -> bool: + """Check if the property was initially added""" + self.data.sort(key=lambda x: x.change_time) + return len(self.data) == 0 or self.data[0].change == PropertyChange.added + + +BugId = int +BugsById = Mapping[BugId, Bug] +MutBugsById = MutableMapping[BugId, Bug] + +HistoryByBug = Mapping[BugId, Sequence[BugHistoryEntry]] -FILTER_CONFIG = { +BUG_QUERIES: Mapping[str, dict[str, str]] = { "site_reports_wc": { "product": "Web Compatibility", "component": "Site Reports", @@ -147,91 +309,44 @@ class BugFetchError(Exception): }, } -RELATION_CONFIG = { - "core_bugs": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "core_bug", "type": "INTEGER", "mode": "REQUIRED"}, - ], - "source": "depends_on", - "store_id": "core", - }, - "breakage_reports": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "breakage_bug", "type": "INTEGER", "mode": "REQUIRED"}, - ], - "source": "blocks", - "store_id": "breakage", - }, - "interventions": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "code_url", "type": "STRING", "mode": "REQUIRED"}, - ], - "source": "see_also", - "condition": INTERVENTIONS, - }, - "other_browser_issues": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "issue_url", "type": "STRING", "mode": "REQUIRED"}, - ], - "source": "see_also", - "condition": OTHER_BROWSER, - }, - "standards_issues": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "issue_url", "type": "STRING", "mode": "REQUIRED"}, - ], - "source": "see_also", - "condition": STANDARDS_ISSUES, - }, - "standards_positions": { - "fields": [ - {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "discussion_url", "type": "STRING", "mode": "REQUIRED"}, - ], - "source": "see_also", - "condition": STANDARDS_POSITIONS, - }, -} -ETP_RELATION_CONFIG = { - "etp_breakage_reports": { - "fields": [ - {"name": "breakage_bug", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "etp_meta_bug", "type": "INTEGER", "mode": "REQUIRED"}, - ], - "source": "depends_on", - "store_id": "breakage", - }, -} +@dataclass +class BugLinkConfig: + table_name: str + from_field_name: str + to_field_name: str + 
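Usage note (not part of the patch): the typed BugLinkConfig above and the ExternalLinkConfig dataclass defined just below replace the old RELATION_CONFIG / LINK_FIELDS dicts. A minimal sketch of how they are constructed, mirroring the values used in run() later in this diff:

```python
from webcompat_kb.bugzilla import BugLinkConfig, ExternalLinkConfig

# Same values as the instances built in run(); shown here only to illustrate
# the shape of the new config objects.
breakage_links = BugLinkConfig(
    table_name="breakage_reports",
    from_field_name="knowledge_base_bug",
    to_field_name="breakage_bug",
)
standards_issues = ExternalLinkConfig(
    table_name="standards_issues",
    field_name="issue_url",
    match_substrs=["github.com/w3c", "github.com/whatwg", "github.com/wicg"],
)
```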
-LINK_FIELDS = ["other_browser_issues", "standards_issues", "standards_positions"] -PLATFORM_RELATION_CONFIG = {key: RELATION_CONFIG[key] for key in LINK_FIELDS} +@dataclass +class ExternalLinkConfig: + table_name: str + field_name: str + match_substrs: list[str] -def extract_int_from_field(field: Optional[str]) -> Optional[int]: - if field: - if field.lower() in FIELD_MAP: - return FIELD_MAP[field.lower()] +def extract_int_from_field( + field_value: Optional[str], value_map: Optional[Mapping[str, Optional[int]]] = None +) -> Optional[int]: + if field_value: + if value_map and field_value.lower() in value_map: + return value_map[field_value.lower()] - match = re.search(r"\d+", field) + match = re.search(r"\d+", field_value) if match: return int(match.group()) - + logging.warning( + f"Unexpected field value '{field_value}', could not convert to integer" + ) return None -def parse_string_to_json(input_string: str) -> Union[str, Mapping[str, Any]]: +def parse_user_story(input_string: str) -> Mapping[str, str | list[str]]: if not input_string: - return "" + return {} lines = input_string.splitlines() - result_dict: dict[str, Any] = {} + result_dict: dict[str, str | list[str]] = {} for line in lines: if line: @@ -239,39 +354,34 @@ def parse_string_to_json(input_string: str) -> Union[str, Mapping[str, Any]]: if len(key_value) == 2: key, value = key_value if key in result_dict: - if isinstance(result_dict[key], list): - result_dict[key].append(value) + current_value = result_dict[key] + if isinstance(current_value, list): + current_value.append(value) else: - result_dict[key] = [result_dict[key], value] + result_dict[key] = [current_value, value] else: result_dict[key] = value if not result_dict: - return "" + return {} return result_dict -class BugzillaToBigQuery: - def __init__( - self, - client: bigquery.Client, - bq_dataset_id: str, - bugzilla_api_key: Optional[str], - write: bool, - include_history: bool, - recreate_history: bool, - ): - bz_config = bugdantic.BugzillaConfig( - BUGZILLA_URL, bugzilla_api_key, allow_writes=write - ) - self.bz_client = bugdantic.Bugzilla(bz_config) - self.client = client - self.bq_dataset_id = bq_dataset_id - self.write = write - self.include_history = include_history - self.recreate_history = recreate_history +class BugCache(Mapping): + def __init__(self, bz_client: bugdantic.Bugzilla): + self.bz_client = bz_client + self.bugs: MutBugsById = {} + + def __getitem__(self, key: BugId) -> Bug: + return self.bugs[key] - def fetch_bugs(self, params: dict[str, str]) -> tuple[bool, MutBugsById]: + def __len__(self) -> int: + return len(self.bugs) + + def __iter__(self) -> Iterator[BugId]: + yield from self.bugs + + def bz_fetch_bugs(self, params: dict[str, str]) -> None: fields = [ "id", "summary", @@ -293,504 +403,293 @@ def fetch_bugs(self, params: dict[str, str]) -> tuple[bool, MutBugsById]: "cf_last_resolved", "last_change_time", "whiteboard", - "creator", "cf_webcompat_priority", "cf_webcompat_score", ] try: - bugs = self.bz_client.search(query=params, include_fields=fields) - data: MutBugsById = {} + bugs = self.bz_client.search( + query=params, include_fields=fields, page_size=200 + ) for bug in bugs: assert bug.id is not None - data[bug.id] = bug.to_dict() - fetch_completed = True + self.bugs[bug.id] = Bug.from_bugzilla(bug) except Exception as e: logging.error(f"Error: {e}") - fetch_completed = False - data = {} - - return fetch_completed, data - - def kb_bugs_from_platform_bugs( - self, - platform_bugs: BugsById, - kb_ids: set[int], - site_report_ids: 
set[int], - ) -> BugsById: - """Get a list of platform bugs that should also be considered knowledge base bugs - - These are platform bugs that aren't blocking an existing kb entry - """ - - filtered = {} - - for bug_id, source_bug in platform_bugs.items(): - # Check if the platform bug already has a kb entry and skip if so - if any(blocked_id in kb_ids for blocked_id in source_bug["blocks"]): - continue - - bug = {**source_bug} - - # Only store a breakage bug as it's the relation we care about - bug["blocks"] = [ - blocked_id - for blocked_id in bug["blocks"] - if blocked_id in site_report_ids - ] - - # Don't store bugs that platform bug depends on - bug["depends_on"] = [] - - filtered[bug_id] = bug - - return filtered - - def unify_etp_dependencies( - self, - etp_reports: BugsById, - etp_dependencies: BugsById, - ) -> BugsById: - """Unify blocked and depends_on for each ETP bug as their dependencies are inconsistent, - keep only ETP meta bugs and store them only in depends_on field.""" - - filtered = {} - - for bug_id, source_bug in etp_reports.items(): - bug = {**source_bug} - - blocks = [ - blocked_id - for blocked_id in bug["blocks"] - if blocked_id in etp_dependencies - and "meta" in etp_dependencies[blocked_id]["keywords"] - ] - - depends = [ - depends_id - for depends_id in bug["depends_on"] - if depends_id in etp_dependencies - and "meta" in etp_dependencies[depends_id]["keywords"] - ] - - bug["depends_on"] = blocks + depends - bug["blocks"] = [] - - filtered[bug_id] = bug - - return filtered - - def chunked_list(self, data: list[int], size: int) -> Iterator[list[int]]: - for i in range(0, len(data), size): - yield data[i : i + size] - - def fetch_by_id(self, bug_ids: set[int]) -> tuple[bool, MutBugsById]: - chunk_size = 400 - all_bugs: dict[int, Bug] = {} - all_completed = True + raise - for chunk in self.chunked_list(list(bug_ids), chunk_size): - logging.info(f"Fetching {len(chunk)} bugs") - - completed, bugs = self.fetch_bugs({"id": ",".join(map(str, chunk))}) - if completed: - all_bugs.update(bugs) - else: - all_completed = False - break - - return all_completed, all_bugs - - def fetch_related_bugs( - self, bugs: BugsById, relations: list[str], all_bugs: MutBugsById - ) -> tuple[bool, MutBugsById]: - related_ids: set[int] = set() - related_bugs: dict[int, Bug] = {} + def fetch_missing_relations(self, bugs: BugsById, relation: str) -> int: + related_ids: set[str] = set() for bug in bugs.values(): - for relation_property in relations: - related_ids |= set( - bug_id - for bug_id in bug[relation_property] - if bug_id not in all_bugs - ) - related_bugs.update( - { - bug_id: all_bugs[bug_id] - for bug_id in bug[relation_property] - if bug_id in all_bugs - } - ) - - completed, fetched_bugs = self.fetch_by_id(related_ids) - all_bugs.update(fetched_bugs) - related_bugs.update(fetched_bugs) - return completed, related_bugs + related_ids |= { + str(bug_id) for bug_id in getattr(bug, relation) if bug_id not in self + } - def filter_core_bugs(self, bugs: BugsById) -> BugsById: - return { - bug_id: bug - for bug_id, bug in bugs.items() - if bug["product"] != "Web Compatibility" - } + if related_ids: + self.bz_fetch_bugs({"id": ",".join(related_ids)}) + return len(related_ids) - def fetch_all_bugs( - self, - ) -> Optional[ - tuple[ - MutBugsById, MutBugsById, MutBugsById, MutBugsById, MutBugsById, MutBugsById - ] - ]: - """Get all the bugs that should be imported into BigQuery. 
- - :returns: A tuple of (all bugs, site report bugs, knowledge base bugs, - core bugs, ETP report bugs, ETP dependencies).""" - fetched_bugs = {} - all_bugs: dict[int, Bug] = {} - - for category, filter_config in FILTER_CONFIG.items(): - logging.info(f"Fetching {category} bugs") - completed, fetched_bugs[category] = self.fetch_bugs(filter_config) - all_bugs.update(fetched_bugs[category]) - if not completed: - return None - - site_reports = fetched_bugs["site_reports_wc"] - site_reports.update(fetched_bugs["site_reports_other"]) - - logging.info("Fetching site-report blocking bugs") - completed_site_report_deps, site_reports_deps = self.fetch_related_bugs( - site_reports, ["depends_on"], all_bugs - ) + def into_mapping(self) -> BugsById: + """Convert the data into a plain dict. - if not completed_site_report_deps: - logging.error("Failed to fetch site report blocking bugs") - return None + Also reset this object, so we aren't sharing the state between multiple places + """ + bugs = self.bugs + self.bugs = {} + return bugs + + +def is_site_report(bug: Bug) -> bool: + return (bug.product == "Web Compatibility" and bug.component == "Site Reports") or ( + bug.product != "Web Compatibility" and "webcompat:site-report" in bug.keywords + ) + + +def is_etp_report(bug: Bug) -> bool: + return ( + bug.product == "Web Compatibility" and bug.component == "Privacy: Site Reports" + ) + + +def is_kb_entry(bug: Bug) -> bool: + """Get things that are directly in the knowledge base. + + This doesn't include core bugs that should be considered part of the knowledge base + because they directly block a platform bug.""" + return bug.product == "Web Compatibility" and bug.component == "Knowledge Base" + + +def is_webcompat_platform_bug(bug: Bug) -> bool: + """Check if a bug is a platform bug . 
+ + These are only actually in the kb if they also block a site report""" + return ( + bug.product != "Web Compatibility" and "webcompat:platform-bug" in bug.keywords + ) + + +def is_parity_bug(bug: Bug) -> bool: + return any(item.startswith("parity-") for item in bug.keywords) + + +def get_links( + all_bugs: BugsById, source_bugs: set[BugId], config: ExternalLinkConfig +) -> Mapping[BugId, list[str]]: + rv: defaultdict[int, list[str]] = defaultdict(list) + for bug_id in source_bugs: + bug = all_bugs[bug_id] + for entry in bug.see_also: + if any(substr in entry for substr in config.match_substrs): + rv[bug_id].append(entry) + return rv + + +def get_kb_bug_core_bugs( + all_bugs: BugsById, kb_bugs: set[BugId], platform_bugs: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = defaultdict(set) + for kb_id in kb_bugs: + if kb_id not in platform_bugs: + for bug_id in all_bugs[kb_id].depends_on: + if bug_id in platform_bugs: + rv[kb_id].add(bug_id) + return rv + + +def get_kb_bug_site_report( + all_bugs: BugsById, kb_bugs: set[BugId], site_report_bugs: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = defaultdict(set) + for kb_id in kb_bugs: + if kb_id in site_report_bugs: + rv[kb_id].add(kb_id) + for bug_id in all_bugs[kb_id].blocks: + if bug_id in site_report_bugs: + rv[kb_id].add(bug_id) + return rv + + +def get_etp_breakage_reports( + all_bugs: BugsById, etp_reports: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = {} + for bug_id in etp_reports: + report_bug = all_bugs[bug_id] + meta_bugs = { + meta_id + for meta_id in report_bug.depends_on + report_bug.blocks + if "meta" in all_bugs[meta_id].keywords + } + if meta_bugs: + rv[bug_id] = meta_bugs + return rv - kb_bugs = fetched_bugs["knowledge_base"] - logging.info("Fetching blocking bugs for KB bugs") - completed_platform_bugs, kb_deps = self.fetch_related_bugs( - kb_bugs, ["depends_on"], all_bugs - ) +def fetch_all_bugs( + bz_client: bugdantic.Bugzilla, +) -> BugsById: + """Get all the bugs that should be imported into BigQuery. 
- if not completed: - logging.error("Failed to fetch blocking bugs") - return None + :returns: A tuple of (all bugs, site report bugs, knowledge base bugs, + core bugs, ETP report bugs, ETP dependencies).""" - platform_bugs = fetched_bugs["platform_bugs"] - platform_bugs.update(self.filter_core_bugs(site_reports_deps)) - platform_bugs.update(self.filter_core_bugs(kb_deps)) + bug_cache = BugCache(bz_client) - etp_reports = fetched_bugs["site_reports_etp"] + for category, filter_config in BUG_QUERIES.items(): + logging.info(f"Fetching {category} bugs") + bug_cache.bz_fetch_bugs(filter_config) - etp_completed, etp_dependencies = self.fetch_related_bugs( - etp_reports, ["depends_on", "blocks"], all_bugs + fetch_count = -1 + while fetch_count != 0: + # Get all blocking bugs for site reports or kb entries or etp site reports + # This can take more than one iteration if dependencies themselves turn out + # to be site reports that were excluded by a the date cutoff + fetch_count = bug_cache.fetch_missing_relations( + { + bug_id: bug + for bug_id, bug in bug_cache.items() + if is_site_report(bug) or is_kb_entry(bug) or is_etp_report(bug) + }, + "depends_on", ) - - if not etp_completed: - logging.error("Failed to fetch etp blocking bugs") - return None - - return ( - all_bugs, - site_reports, - kb_bugs, - platform_bugs, - etp_reports, - etp_dependencies, + fetch_count += bug_cache.fetch_missing_relations( + {bug_id: bug for bug_id, bug in bug_cache.items() if is_etp_report(bug)}, + "blocks", ) + if fetch_count: + logging.info(f"Fetched {fetch_count} related bugs") - def process_relations( - self, bugs: BugsById, relation_config: RelationConfig - ) -> tuple[Mapping[int, Mapping[str, list[int | str]]], Mapping[str, set[int]]]: - """Build relationship tables based on information in the bugs. + return bug_cache.into_mapping() - :returns: A mapping {bug_id: {relationship name: [related items]}} and - a mapping {store id: {bug ids}} - """ - # The types here are wrong; the return values are lists of ints or lists of strings but not both. 
- # However enforcing that property is hard without building specific types for the two cases - relations: dict[int, dict[str, list[int | str]]] = {} - related_bug_ids: dict[str, set[int]] = {} - for config in relation_config.values(): - if "store_id" in config: - related_bug_ids[config["store_id"]] = set() - - for bug_id, bug in bugs.items(): - relations[bug_id] = {rel: [] for rel in relation_config.keys()} +class BugHistoryUpdater: + def __init__( + self, + bq_client: bigquery.Client, + bq_dataset_id: str, + bz_client: bugdantic.Bugzilla, + recreate: bool = False, + ): + self.bq_client = bq_client + self.bq_dataset_id = bq_dataset_id + self.bz_client = bz_client + self.recreate = recreate - for rel, config in relation_config.items(): - related_items = bug[config["source"]] + def run(self, all_bugs: BugsById) -> HistoryByBug: + existing_records = self.bigquery_fetch_history(all_bugs.keys()) - for item in related_items: - if "condition" in config and not any( - c in item for c in config["condition"] - ): - continue + new_bugs, existing_bugs = self.group_bugs(all_bugs) - relations[bug_id][rel].append(item) + new_bugs_history = self.missing_records( + existing_records, self.new_bugs_history(new_bugs) + ) + existing_bugs_history = self.missing_records( + existing_records, self.existing_bugs_history(existing_bugs) + ) - if config.get("store_id"): - assert isinstance(item, int) - related_bug_ids[config["store_id"]].add(item) + if not (new_bugs_history or existing_bugs_history): + logging.info("No relevant history updates") + return {} - return relations, related_bug_ids + return self.merge_history(existing_bugs_history, new_bugs_history) - def add_kb_entry_breakage( - self, - kb_data: Mapping[int, Mapping[str, list[int | str]]], - kb_dep_ids: Mapping[str, set[int]], - site_reports: BugsById, - ) -> None: - """Add breakage relations for bugs that are both kb entries and also site reports - - If a core bug has the webcompat:platform-bug keyword it's a kb entry. - If it also has the webcompat:site-report keyword it's a site report. - In this case we want the bug to reference itself in the breakage_reports table.""" - for bug_id in set(kb_data.keys()) & set(site_reports.keys()): - if bug_id not in kb_dep_ids["breakage"]: - kb_data[bug_id]["breakage_reports"].append(bug_id) - kb_dep_ids["breakage"].add(bug_id) - - def fetch_missing_deps( - self, all_bugs: BugsById, kb_dep_ids: Mapping[str, set[int]] - ) -> Optional[tuple[BugsById, BugsById]]: - dep_ids = {item for sublist in kb_dep_ids.values() for item in sublist} - - # Check for missing bugs - missing_ids = dep_ids - set(all_bugs.keys()) - - if missing_ids: - logging.info( - "Fetching missing core bugs and breakage reports from Bugzilla" - ) - completed, missing_bugs = self.fetch_bugs( - {"id": ",".join(map(str, missing_ids))} - ) - if not completed: - return None - - # Separate core bugs for updating relations. 
- core_dependenies = set(kb_dep_ids.get("core", set())) - core_missing = { - bug_id: bug - for bug_id, bug in missing_bugs.items() - if bug_id in core_dependenies - } + def group_bugs(self, all_bugs: BugsById) -> tuple[BugsById, BugsById]: + all_ids = set(all_bugs.keys()) + if not self.recreate: + existing_ids = self.bigquery_fetch_imported_ids() + new_ids = all_ids - existing_ids else: - missing_bugs, core_missing = {}, {} - - return missing_bugs, core_missing - - def add_links( - self, - kb_processed: Mapping[int, Mapping[str, list[int | str]]], - dep_processed: Mapping[int, Mapping[str, list[int | str]]], - ) -> Mapping[int, Mapping[str, list[int | str]]]: - """Create links between kb entries and external data such - as standards issues.""" - result = {**kb_processed} - - for kb_bug_id in result: - for core_bug_id in result[kb_bug_id]["core_bugs"]: - assert isinstance(core_bug_id, int) - for sub_key in LINK_FIELDS: - if sub_key in result[kb_bug_id] and sub_key in dep_processed.get( - core_bug_id, {} - ): - for link_item in dep_processed[core_bug_id][sub_key]: - if link_item not in result[kb_bug_id][sub_key]: - result[kb_bug_id][sub_key].append(link_item) - - return result + new_ids = all_ids - def build_relations( - self, bugs: BugsById, relation_config: RelationConfig - ) -> Relations: - relations: dict[str, list[Mapping[str, Any]]] = { - key: [] for key in relation_config.keys() + new_bugs = { + bug_id: bug for bug_id, bug in all_bugs.items() if bug_id in new_ids } - - for bug_id, bug in bugs.items(): - for field_key, items in bug.items(): - fields = relation_config[field_key]["fields"] - - for row in items: - relation_row = {fields[0]["name"]: bug_id, fields[1]["name"]: row} - relations[field_key].append(relation_row) - - return relations - - def convert_bug_data(self, bug: Bug) -> dict[str, Any]: - resolved = None - if bug["status"] in ["RESOLVED", "VERIFIED"] and bug["cf_last_resolved"]: - resolved = bug["cf_last_resolved"] - - user_story = parse_string_to_json(bug["cf_user_story"]) - - assigned_to = ( - bug["assigned_to"] if bug["assigned_to"] != "nobody@mozilla.org" else None - ) - webcompat_priority = ( - bug.get("cf_webcompat_priority") - if bug.get("cf_webcompat_priority") != "---" - else None - ) - - return { - "number": bug["id"], - "title": bug["summary"], - "status": bug["status"], - "resolution": bug["resolution"], - "product": bug["product"], - "component": bug["component"], - "creator": bug["creator"], - "severity": extract_int_from_field(bug["severity"]), - "priority": extract_int_from_field(bug["priority"]), - "creation_time": bug["creation_time"].isoformat(), - "assigned_to": assigned_to, - "keywords": bug["keywords"], - "url": bug["url"], - "user_story": user_story, - "user_story_raw": bug["cf_user_story"], - "resolved_time": resolved.isoformat() if resolved is not None else None, - "whiteboard": bug["whiteboard"], - "webcompat_priority": webcompat_priority, - "webcompat_score": extract_int_from_field(bug.get("cf_webcompat_score")), + existing_bugs = { + bug_id: bug for bug_id, bug in all_bugs.items() if bug_id not in new_ids } + return new_bugs, existing_bugs - def update_bugs(self, bugs: BugsById) -> None: - res = [self.convert_bug_data(bug) for bug in bugs.values()] - - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("title", "STRING", mode="REQUIRED"), - bigquery.SchemaField("status", "STRING", mode="REQUIRED"), - 
bigquery.SchemaField("resolution", "STRING", mode="REQUIRED"), - bigquery.SchemaField("product", "STRING", mode="REQUIRED"), - bigquery.SchemaField("component", "STRING", mode="REQUIRED"), - bigquery.SchemaField("creator", "STRING", mode="REQUIRED"), - bigquery.SchemaField("severity", "INTEGER"), - bigquery.SchemaField("priority", "INTEGER"), - bigquery.SchemaField("creation_time", "TIMESTAMP", mode="REQUIRED"), - bigquery.SchemaField("assigned_to", "STRING"), - bigquery.SchemaField("keywords", "STRING", mode="REPEATED"), - bigquery.SchemaField("url", "STRING"), - bigquery.SchemaField("user_story", "JSON"), - bigquery.SchemaField("user_story_raw", "STRING"), - bigquery.SchemaField("resolved_time", "TIMESTAMP"), - bigquery.SchemaField("whiteboard", "STRING"), - bigquery.SchemaField("webcompat_priority", "STRING"), - bigquery.SchemaField("webcompat_score", "INTEGER"), - ], - write_disposition="WRITE_TRUNCATE", - ) - - bugs_table = f"{self.bq_dataset_id}.bugzilla_bugs" + def merge_history(self, *sources: HistoryByBug) -> HistoryByBug: + history: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) + for source in sources: + for bug_id, changes in source.items(): + history[bug_id].extend(changes) + return history - job = self.client.load_table_from_json( - res, - bugs_table, - job_config=job_config, - ) + def new_bugs_history(self, new_bugs: BugsById) -> HistoryByBug: + history = self.bugzilla_fetch_history(new_bugs.keys()) + synthetic_history = self.create_synthetic_history(new_bugs, history) + return self.merge_history(history, synthetic_history) - logging.info("Writing to `bugzilla_bugs` table") + def existing_bugs_history(self, existing_bugs: BugsById) -> HistoryByBug: + last_import_time = self.bigquery_last_import() - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: - for error in job.errors: - logging.error(error) + if last_import_time is None: + logging.info("No previous history update found") + return {} - table = self.client.get_table(bugs_table) - logging.info(f"Loaded {table.num_rows} rows into {table}") + updated_bugs = { + bug_id + for bug_id, bug in existing_bugs.items() + if bug.last_change_time > last_import_time + } - def update_kb_ids(self, ids: Iterable[int]) -> None: - res = [{"number": kb_id} for kb_id in ids] + if not updated_bugs: + logging.info(f"No updated bygs since {last_import_time.isoformat()}") + return {} - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - ], - write_disposition="WRITE_TRUNCATE", + logging.info( + f"Fetching bugs {updated_bugs} updated since {last_import_time.isoformat()}" ) - kb_bugs_table = f"{self.bq_dataset_id}.kb_bugs" + bugs_full_history = self.bugzilla_fetch_history(updated_bugs) + # Filter down to only recent updates, since we always get the full history + bugs_history = {} + for bug_id, bug_full_history in bugs_full_history.items(): + bug_history = [ + item for item in bug_full_history if item.change_time > last_import_time + ] + if bug_history: + bugs_history[bug_id] = bug_history - job = self.client.load_table_from_json( - res, - kb_bugs_table, - job_config=job_config, - ) + return bugs_history - logging.info("Writing to `kb_bugs` table") + def missing_records( + self, existing_records: HistoryByBug, updates: HistoryByBug + ) -> HistoryByBug: + if not existing_records: + return updates - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: 
- for error in job.errors: - logging.error(error) + existing_history = set(self.flatten_history(existing_records)) + new_history = set(self.flatten_history(updates)) - table = self.client.get_table(kb_bugs_table) - logging.info(f"Loaded {table.num_rows} rows into {table}") + diff = new_history - existing_history - def update_relations( - self, relations: Relations, relation_config: RelationConfig - ) -> None: - for key, value in relations.items(): - if value: - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField( - item["name"], item["type"], mode=item["mode"] - ) - for item in relation_config[key]["fields"] - ], - write_disposition="WRITE_TRUNCATE", - ) + return self.unflatten_history(item for item in new_history if item in diff) - relation_table = f"{self.bq_dataset_id}.{key}" - job = self.client.load_table_from_json( - cast(Iterable[dict[str, Any]], value), - relation_table, - job_config=job_config, - ) - - logging.info(f"Writing to `{relation_table}` table") + def bigquery_fetch_imported_ids(self) -> set[int]: + query = f""" + SELECT number + FROM `{self.bq_dataset_id}.bugzilla_bugs` + """ + res = self.bq_client.query(query).result() + rows = list(res) - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: - for error in job.errors: - logging.error(error) + imported_ids = {bug["number"] for bug in rows} - table = self.client.get_table(relation_table) - logging.info(f"Loaded {table.num_rows} rows into {table}") + return imported_ids - def get_last_import_datetime(self) -> Optional[datetime]: + def bigquery_last_import(self) -> Optional[datetime]: query = f""" SELECT MAX(run_at) AS last_run_at FROM `{self.bq_dataset_id}.import_runs` WHERE is_history_fetch_completed = TRUE """ - res = self.client.query(query).result() + res = self.bq_client.query(query).result() row = list(res)[0] return row["last_run_at"] - def fetch_bugs_history( - self, ids: Iterable[int] - ) -> tuple[Mapping[int, list[bugdantic.bugzilla.History]], bool]: + def bugzilla_fetch_history(self, ids: Iterable[int]) -> HistoryByBug: history: dict[int, list[bugdantic.bugzilla.History]] = {} chunk_size = 100 ids_list = list(ids) @@ -836,161 +735,19 @@ def fetch_bugs_history( else: break - completed = len(ids_list) == 0 - return history, completed - - def serialize_history_entry(self, entry: BugHistoryEntry) -> dict[str, Any]: - return { - "number": entry.number, - "who": entry.who, - "change_time": entry.change_time.isoformat(), - "changes": [ - { - "field_name": change.field_name, - "added": change.added, - "removed": change.removed, - } - for change in entry.changes - ], - } - - def update_history( - self, records: list[BugHistoryEntry], recreate: bool = False - ) -> None: - if not records and not recreate: - logging.info("No history records to update") - return - - write_disposition = "WRITE_APPEND" if not recreate else "WRITE_TRUNCATE" - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("who", "STRING", mode="REQUIRED"), - bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), - bigquery.SchemaField( - "changes", - "RECORD", - mode="REPEATED", - fields=[ - bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("added", "STRING", mode="REQUIRED"), - bigquery.SchemaField("removed", "STRING", mode="REQUIRED"), - ], - ), - ], - 
write_disposition=write_disposition, - ) - - history_table = f"{self.bq_dataset_id}.bugs_history" - - job = self.client.load_table_from_json( - (self.serialize_history_entry(item) for item in records), - history_table, - job_config=job_config, - ) - - logging.info("Writing to `bugs_history` table") - - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: - for error in job.errors: - logging.error(error) - - table = self.client.get_table(history_table) - logging.info(f"Loaded {len(records)} rows into {table}") - - def get_existing_history_records_by_ids( - self, bug_ids: Iterable[int] - ) -> list[BugHistoryEntry]: - formatted_numbers = ", ".join(str(bug_id) for bug_id in bug_ids) - - query = f""" - SELECT * - FROM `{self.bq_dataset_id}.bugs_history` - WHERE number IN ({formatted_numbers}) - """ - result = self.client.query(query).result() - return [ - BugHistoryEntry( - row["number"], - row["who"], - row["change_time"], - changes=[ - BugHistoryChange( - change["field_name"], change["added"], change["removed"] - ) - for change in row["changes"] - ], - ) - for row in result - ] - - def flatten_history(self, records: Iterable[BugHistoryEntry]) -> list[HistoryRow]: - history = [] - for record in records: - for change in record.changes: - history_row = HistoryRow( - record.number, - record.who, - record.change_time, - change.field_name, - change.added, - change.removed, - ) - history.append(history_row) - - return history - - def unflatten_history(self, diff: Sequence[HistoryRow]) -> list[BugHistoryEntry]: - changes: dict[tuple[int, str, datetime], BugHistoryEntry] = {} - for item in diff: - key = (item.number, item.who, item.change_time) - - if key not in changes: - changes[key] = BugHistoryEntry( - number=item.number, - who=item.who, - change_time=item.change_time, - changes=[], - ) - changes[key].changes.append( - BugHistoryChange( - field_name=item.field_name, - added=item.added, - removed=item.removed, - ) + if len(ids_list) != 0: + raise BugFetchError( + f"Failed to fetch bug history for {','.join(str(item) for item in ids_list)}" ) - return list(changes.values()) + return self.bugzilla_to_history_entry(history) - def filter_only_unsaved_changes( - self, history_updates: list[BugHistoryEntry], bug_ids: set[int] - ) -> list[BugHistoryEntry]: - existing_records = self.get_existing_history_records_by_ids(bug_ids) - - if not existing_records: - return history_updates - - existing_history = self.flatten_history(existing_records) - new_history = self.flatten_history(history_updates) - - diff = set(new_history) - set(existing_history) - - return self.unflatten_history([item for item in new_history if item in diff]) - - def extract_history_fields( + def bugzilla_to_history_entry( self, updated_history: Mapping[int, list[bugdantic.bugzilla.History]] - ) -> tuple[list[BugHistoryEntry], set[int]]: - result = [] - bug_ids = set() + ) -> HistoryByBug: + rv: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) for bug_id, history in updated_history.items(): - filtered_changes = [] - for record in history: relevant_changes = [ BugHistoryChange( @@ -1010,257 +767,340 @@ def extract_history_fields( change_time=record.when, changes=relevant_changes, ) - filtered_changes.append(filtered_record) - bug_ids.add(bug_id) - - if filtered_changes: - result.extend(filtered_changes) - - return result, bug_ids - - def filter_relevant_history( - self, updated_history: Mapping[int, list[bugdantic.bugzilla.History]] - ) -> list[BugHistoryEntry]: - only_unsaved_changes = [] - result, 
bug_ids = self.extract_history_fields(updated_history) - - if result: - only_unsaved_changes = self.filter_only_unsaved_changes(result, bug_ids) - - return only_unsaved_changes + rv[bug_id].append(filtered_record) - def get_bugs_updated_since_last_import( - self, all_bugs: BugsById, last_import_time: datetime - ) -> set[int]: - return { - bug["id"] - for bug in all_bugs.values() - if bug["last_change_time"] > last_import_time - } + return rv - def get_imported_ids(self) -> set[int]: + def bigquery_fetch_history(self, bug_ids: Iterable[int]) -> HistoryByBug: + rv: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) + formatted_numbers = ", ".join(str(bug_id) for bug_id in bug_ids) query = f""" - SELECT number - FROM `{self.bq_dataset_id}.bugzilla_bugs` - """ - res = self.client.query(query).result() - rows = list(res) - - imported_ids = {bug["number"] for bug in rows} - - return imported_ids - - def create_keyword_map( - self, history: list[BugHistoryEntry] - ) -> Mapping[int, Mapping[str, Mapping[str, list[datetime]]]]: - keyword_history: dict[int, dict[str, dict[str, list[datetime]]]] = {} - - for record in history: - bug_id = record.number - timestamp = record.change_time - - for change in record.changes: - if change.field_name == "keywords": - if bug_id not in keyword_history: - keyword_history[bug_id] = {"added": {}, "removed": {}} - - keyword_records = keyword_history[bug_id] - - for action in ["added", "removed"]: - keywords = getattr(change, action) - if keywords: - for keyword in keywords.split(", "): - if keyword not in keyword_records[action]: - keyword_records[action][keyword] = [] + SELECT * + FROM `{self.bq_dataset_id}.bugs_history` + WHERE number IN ({formatted_numbers}) + """ + result = self.bq_client.query(query).result() + for row in result: + rv[row["number"]].append( + BugHistoryEntry( + row["number"], + row["who"], + row["change_time"], + changes=[ + BugHistoryChange( + change["field_name"], change["added"], change["removed"] + ) + for change in row["changes"] + ], + ) + ) + return rv + + def keyword_history( + self, history: HistoryByBug + ) -> Mapping[BugId, Mapping[str, PropertyHistory]]: + """Get the time each keyword has been added and removed from each bug""" + keyword_history: defaultdict[int, dict[str, PropertyHistory]] = defaultdict( + dict + ) - keyword_records[action][keyword].append(timestamp) + for bug_id, records in history.items(): + for record in records: + for change in record.changes: + if change.field_name == "keywords": + for src, change_type in [ + (change.added, PropertyChange.added), + (change.removed, PropertyChange.removed), + ]: + for keyword in src.split(", "): + if keyword not in keyword_history[bug_id]: + keyword_history[bug_id][keyword] = PropertyHistory() + keyword_history[bug_id][keyword].add( + change_time=record.change_time, change=change_type + ) return keyword_history - def is_removed_earliest( - self, added_times: list[datetime], removed_times: list[datetime] - ) -> bool: - events = [(at, "added") for at in added_times] + [ - (rt, "removed") for rt in removed_times - ] - events.sort() - - if not events: - return False - - return events[0][1] == "removed" - def get_missing_keywords( self, bug_id: int, current_keywords: list[str], - keyword_history: Mapping[int, Mapping[str, Mapping[str, list[datetime]]]], - ) -> list[str]: - missing_keywords = [] + keyword_history: Mapping[BugId, Mapping[str, PropertyHistory]], + ) -> set[str]: + missing_keywords = set() # Check if keyword exists, but is not in "added" history for keyword in 
current_keywords: - if bug_id not in keyword_history or keyword not in keyword_history[ - bug_id - ].get("added", {}): - if keyword not in missing_keywords: - missing_keywords.append(keyword) + if bug_id not in keyword_history or keyword not in keyword_history[bug_id]: + missing_keywords.add(keyword) # Check for keywords that have "removed" record as the earliest # event in the sorted timeline if bug_id in keyword_history: - for keyword, removed_times in ( - keyword_history[bug_id].get("removed", {}).items() - ): - added_times = keyword_history[bug_id].get("added", {}).get(keyword, []) - - removed_earliest = self.is_removed_earliest(added_times, removed_times) - - if removed_earliest and keyword not in missing_keywords: - missing_keywords.append(keyword) + for keyword, history in keyword_history[bug_id].items(): + if history.missing_initial_add(): + missing_keywords.add(keyword) return missing_keywords - def build_missing_history( - self, bugs_without_history: Iterable[tuple[Bug, list[str]]] - ) -> list[BugHistoryEntry]: - result: list[BugHistoryEntry] = [] - for bug, missing_keywords in bugs_without_history: - record = BugHistoryEntry( - number=bug["id"], - who=bug["creator"], - change_time=bug["creation_time"], - changes=[ - BugHistoryChange( - added=", ".join(missing_keywords), - field_name="keywords", - removed="", - ) - ], - ) - result.append(record) - return result - def create_synthetic_history( - self, bugs: BugsById, history: list[BugHistoryEntry] - ) -> list[BugHistoryEntry]: - keyword_history = self.create_keyword_map(history) + self, all_bugs: BugsById, history: HistoryByBug + ) -> HistoryByBug: + """Backfill history entries for bug creation. - bugs_without_history = [] - - for bug_id, bug in bugs.items(): - current_keywords = bug["keywords"] + If a bug has keywords set, but there isn't a history entry corresponding + to the keyword being added, we assume they were set on bug creation, and + create a history entry to represent that.""" + result: dict[int, list[BugHistoryEntry]] = {} + keyword_history = self.keyword_history(history) + for bug_id, bug in all_bugs.items(): missing_keywords = self.get_missing_keywords( - bug_id, current_keywords, keyword_history + bug_id, bug.keywords, keyword_history ) if missing_keywords: - bugs_without_history.append((bug, missing_keywords)) + record = BugHistoryEntry( + number=bug.id, + who=bug.creator, + change_time=bug.creation_time, + changes=[ + BugHistoryChange( + added=", ".join(missing_keywords), + field_name="keywords", + removed="", + ) + ], + ) + result[bug.id] = [record] + return result - return self.build_missing_history(bugs_without_history) + def flatten_history(self, history: HistoryByBug) -> Iterable[HistoryChange]: + for records in history.values(): + for record in records: + for change in record.changes: + yield HistoryChange( + record.number, + record.who, + record.change_time, + change.field_name, + change.added, + change.removed, + ) - def fetch_history_for_new_bugs( - self, all_bugs: BugsById, recreate: bool = False - ) -> tuple[list[BugHistoryEntry], set[int], bool]: - only_unsaved_changes: list[BugHistoryEntry] = [] + def unflatten_history(self, diff: Iterable[HistoryChange]) -> HistoryByBug: + changes: dict[tuple[int, str, datetime], BugHistoryEntry] = {} + for item in diff: + key = (item.number, item.who, item.change_time) - all_ids = set(all_bugs.keys()) - if not recreate: - existing_ids = self.get_imported_ids() - new_ids = all_ids - existing_ids - else: - new_ids = all_ids + if key not in changes: + changes[key] = 
BugHistoryEntry( + number=item.number, + who=item.who, + change_time=item.change_time, + changes=[], + ) + changes[key].changes.append( + BugHistoryChange( + field_name=item.field_name, + added=item.added, + removed=item.removed, + ) + ) - logging.info(f"Fetching new bugs history: {list(new_ids)}") + rv: dict[int, list[BugHistoryEntry]] = {} + for change in changes.values(): + if change.number not in rv: + rv[change.number] = [] + rv[change.number].append(change) + return rv - new_bugs = { - bug_id: bug for bug_id, bug in all_bugs.items() if bug_id in new_ids - } - bugs_history, completed = self.fetch_bugs_history(new_bugs.keys()) - if not completed: - return only_unsaved_changes, new_ids, False +class BigQueryImporter: + """Class to handle all writes to BigQuery""" - history, _ = self.extract_history_fields(bugs_history) + def __init__(self, client: bigquery.Client, bq_dataset_id: str, write: bool): + self.client = client + self.bq_dataset_id = bq_dataset_id + self.write = write - synthetic_history = self.create_synthetic_history(new_bugs, history) + def write_table( + self, + table: str, + schema: list[bigquery.SchemaField], + rows: Sequence[Mapping[str, Any]], + overwrite: bool, + ) -> None: + ensure_table(self.client, self.bq_dataset_id, table, False) + table = f"{self.bq_dataset_id}.{table}" - new_bugs_history = history + synthetic_history + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + schema=schema, + write_disposition="WRITE_APPEND" if not overwrite else "WRITE_TRUNCATE", + ) - if new_bugs_history: - only_unsaved_changes = self.filter_only_unsaved_changes( - new_bugs_history, new_ids + if self.write: + job = self.client.load_table_from_json( + cast(Iterable[dict[str, Any]], rows), + table, + job_config=job_config, ) + job.result() + logging.info(f"Wrote {len(rows)} records into {table}") + else: + logging.info(f"Skipping writes, would have written {len(rows)} to {table}") + for row in rows: + logging.debug(f" {row}") - return only_unsaved_changes, new_ids, True - - def fetch_history_updates( - self, all_existing_bugs: BugsById - ) -> tuple[Mapping[int, list[bugdantic.bugzilla.History]], bool]: - last_import_time = self.get_last_import_datetime() + def convert_bug(self, bug: Bug) -> Mapping[str, Any]: + return { + "number": bug.id, + "title": bug.summary, + "status": bug.status, + "resolution": bug.resolution, + "product": bug.product, + "component": bug.component, + "creator": bug.creator, + "severity": bug.severity, + "priority": bug.priority, + "creation_time": bug.creation_time.isoformat(), + "assigned_to": bug.assigned_to, + "keywords": bug.keywords, + "url": bug.url, + "user_story": bug.parsed_user_story, + "user_story_raw": bug.user_story, + "resolved_time": bug.resolved.isoformat() + if bug.resolved is not None + else None, + "whiteboard": bug.whiteboard, + "webcompat_priority": bug.webcompat_priority, + "webcompat_score": bug.webcompat_score, + "depends_on": bug.depends_on, + "blocks": bug.blocks + } - if last_import_time is not None: - updated_bug_ids = self.get_bugs_updated_since_last_import( - all_existing_bugs, last_import_time - ) + def convert_history_entry(self, entry: BugHistoryEntry) -> Mapping[str, Any]: + return { + "number": entry.number, + "who": entry.who, + "change_time": entry.change_time.isoformat(), + "changes": [ + { + "field_name": change.field_name, + "added": change.added, + "removed": change.removed, + } + for change in entry.changes + ], + } - logging.info( - f"Fetching bugs updated after last 
import: {updated_bug_ids} at {last_import_time.strftime('%Y-%m-%dT%H:%M:%SZ')}" # noqa - ) + def insert_bugs(self, all_bugs: BugsById) -> None: + table = "bugzilla_bugs" + schema = [ + bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("title", "STRING", mode="REQUIRED"), + bigquery.SchemaField("status", "STRING", mode="REQUIRED"), + bigquery.SchemaField("resolution", "STRING", mode="REQUIRED"), + bigquery.SchemaField("product", "STRING", mode="REQUIRED"), + bigquery.SchemaField("component", "STRING", mode="REQUIRED"), + bigquery.SchemaField("creator", "STRING", mode="REQUIRED"), + bigquery.SchemaField("severity", "INTEGER"), + bigquery.SchemaField("priority", "INTEGER"), + bigquery.SchemaField("creation_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("assigned_to", "STRING"), + bigquery.SchemaField("keywords", "STRING", mode="REPEATED"), + bigquery.SchemaField("url", "STRING"), + bigquery.SchemaField("user_story", "JSON"), + bigquery.SchemaField("user_story_raw", "STRING"), + bigquery.SchemaField("resolved_time", "TIMESTAMP"), + bigquery.SchemaField("whiteboard", "STRING"), + bigquery.SchemaField("webcompat_priority", "STRING"), + bigquery.SchemaField("webcompat_score", "INTEGER"), + bigquery.SchemaField("depends_on", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("blocks", "INTEGER", mode="REPEATED"), + ] + rows = [self.convert_bug(bug) for bug in all_bugs.values()] + self.write_table(table, schema, rows, overwrite=True) - if updated_bug_ids: - bugs_full_history, completed = self.fetch_bugs_history(updated_bug_ids) - # Filter down to only recent updates, since we always get the full history - bugs_history = {} - for bug_id, bug_full_history in bugs_full_history.items(): - bug_history = [ - item - for item in bug_full_history - if item.when > last_import_time - ] - if bug_history: - bugs_history[bug_id] = bug_history - - return bugs_history, completed - - logging.warning("No previous history update found") - - return {}, True - - def fetch_bug_history( - self, all_bugs: BugsById, recreate: bool = False - ) -> tuple[list[BugHistoryEntry], bool]: - filtered_new_history, new_ids, completed = self.fetch_history_for_new_bugs( - all_bugs, recreate - ) - if not completed: - return [], False + def insert_history_changes( + self, history_entries: HistoryByBug, recreate: bool + ) -> None: + schema = [ + bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("who", "STRING", mode="REQUIRED"), + bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField( + "changes", + "RECORD", + mode="REPEATED", + fields=[ + bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("added", "STRING", mode="REQUIRED"), + bigquery.SchemaField("removed", "STRING", mode="REQUIRED"), + ], + ), + ] - existing_bugs = { - bug_id: bug for bug_id, bug in all_bugs.items() if bug_id not in new_ids - } + rows = [ + self.convert_history_entry(entry) + for entries in history_entries.values() + for entry in entries + ] + self.write_table("bugs_history", schema, rows, overwrite=recreate) - existing_bugs_history, completed = self.fetch_history_updates(existing_bugs) - if not completed: - return [], False + def insert_bug_list( + self, table_name: str, field_name: str, bugs: Iterable[BugId] + ) -> None: + schema = [bigquery.SchemaField(field_name, "INTEGER", mode="REQUIRED")] + rows = [{field_name: bug_id} for bug_id in bugs] + self.write_table(table_name, schema, rows, overwrite=True) - if 
filtered_new_history or existing_bugs_history: - filtered_existing = self.filter_relevant_history(existing_bugs_history) - filtered_records = filtered_existing + filtered_new_history - return filtered_records, True + def insert_bug_links( + self, link_config: BugLinkConfig, links_by_bug: Mapping[BugId, Iterable[BugId]] + ) -> None: + schema = [ + bigquery.SchemaField( + link_config.from_field_name, "INTEGER", mode="REQUIRED" + ), + bigquery.SchemaField(link_config.to_field_name, "INTEGER", mode="REQUIRED"), + ] + rows = [ + { + link_config.from_field_name: from_bug_id, + link_config.to_field_name: to_bug_id, + } + for from_bug_id, to_bug_ids in links_by_bug.items() + for to_bug_id in to_bug_ids + ] + self.write_table(link_config.table_name, schema, rows, overwrite=True) - logging.info("No relevant history updates") - return [], True + def insert_external_links( + self, + link_config: ExternalLinkConfig, + links_by_bug: Mapping[BugId, Iterable[str]], + ) -> None: + schema = [ + bigquery.SchemaField("knowledge_base_bug", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField(link_config.field_name, "STRING", mode="REQUIRED"), + ] + rows = [ + {"knowledge_base_bug": bug_id, link_config.field_name: link_text} + for bug_id, links in links_by_bug.items() + for link_text in links + ] + self.write_table(link_config.table_name, schema, rows, overwrite=True) def record_import_run( self, start_time: float, - history_fetch_completed: bool, count: int, - history_count: int, + history_count: Optional[int], last_change_time: datetime, ) -> None: + if not self.write: + return + elapsed_time = time.monotonic() - start_time elapsed_time_delta = timedelta(seconds=elapsed_time) run_at = last_change_time - elapsed_time_delta @@ -1270,8 +1110,10 @@ def record_import_run( { "run_at": formatted_time, "bugs_imported": count, - "bugs_history_updated": history_count, - "is_history_fetch_completed": history_fetch_completed, + "bugs_history_updated": history_count + if history_count is not None + else 0, + "is_history_fetch_completed": history_count is not None, }, ] bugbug_runs_table = f"{self.bq_dataset_id}.import_runs" @@ -1281,103 +1123,195 @@ def record_import_run( else: logging.info("Last import run recorded") - def run(self) -> None: - start_time = time.monotonic() - fetch_all_result = self.fetch_all_bugs() +def get_kb_entries(all_bugs: BugsById, site_report_blockers: set[BugId]) -> set[BugId]: + direct_kb_entries = {bug_id for bug_id, bug in all_bugs.items() if is_kb_entry(bug)} + kb_blockers = { + dependency + for bug_id in direct_kb_entries + for dependency in all_bugs[bug_id].depends_on + } + # We include any bug that's blocking a site report but isn't in Web Compatibility + platform_site_report_blockers = { + bug_id + for bug_id in site_report_blockers + if bug_id not in kb_blockers and all_bugs[bug_id].product != "Web Compatibility" + } + # We also include all other bugs that are platform bugs but don't depend on a kb entry + # TODO: This is probably too many bugs; platform bugs that don't block any site reports + # should likely be excluded + platform_kb_entries = { + bug_id + for bug_id in all_bugs + if bug_id not in kb_blockers and is_webcompat_platform_bug(all_bugs[bug_id]) + } + return direct_kb_entries | platform_site_report_blockers | platform_kb_entries + + +def group_bugs( + all_bugs: BugsById, +) -> tuple[set[BugId], set[BugId], set[BugId], set[BugId]]: + """Extract groups of bugs according to their types""" + site_reports = {bug_id for bug_id, bug in all_bugs.items() if is_site_report(bug)} + 
+    etp_reports = {bug_id for bug_id, bug in all_bugs.items() if is_etp_report(bug)}
+    site_report_blockers = {
+        dependency
+        for bug_id in site_reports
+        for dependency in all_bugs[bug_id].depends_on
+    }
+    assert site_report_blockers.issubset(all_bugs.keys())
+    kb_bugs = get_kb_entries(all_bugs, site_report_blockers)
+    platform_bugs = {
+        bug_id
+        for bug_id in all_bugs
+        if all_bugs[bug_id].product != "Web Compatibility"
+    }
+    return site_reports, etp_reports, kb_bugs, platform_bugs
+
+
+def write_data(
+    path: str,
+    all_bugs: BugsById,
+    site_reports: set[BugId],
+    etp_reports: set[BugId],
+    kb_bugs: set[BugId],
+    platform_bugs: set[BugId],
+    bug_links: Iterable[tuple[BugLinkConfig, Mapping[BugId, set[BugId]]]],
+    external_links: Iterable[tuple[ExternalLinkConfig, Mapping[BugId, list[str]]]],
+) -> None:
+    data: dict[str, Any] = {}
+    data["all_bugs"] = {bug_id: bug.to_json() for bug_id, bug in all_bugs.items()}
+    data["site_report"] = list(site_reports)
+    data["etp_reports"] = list(etp_reports)
+    data["kb_bugs"] = list(kb_bugs)
+    data["platform_bugs"] = list(platform_bugs)
+    for bug_link_config, link_data in bug_links:
+        data[bug_link_config.table_name] = {
+            bug_id: list(values) for bug_id, values in link_data.items()
+        }
+    for external_link_config, external_link_data in external_links:
+        data[external_link_config.table_name] = {
+            bug_id: list(values) for bug_id, values in external_link_data.items()
+        }
+
+    with open(path, "w") as f:
+        json.dump(data, f)
+
+
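+# run() drives the import end to end: fetch bugs from Bugzilla (or reload them
+# from a JSON file written by write_data, e.g. one produced by a previous run
+# with --bugzilla-write-bug-data), group them, optionally update bug history,
+# and write the resulting tables to BigQuery.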
+def run(
+    client: bigquery.Client,
+    bq_dataset_id: str,
+    bz_client: bugdantic.Bugzilla,
+    write: bool,
+    include_history: bool,
+    recreate_history: bool,
+    write_bug_data_path: Optional[str],
+    load_bug_data_path: Optional[str],
+) -> None:
+    start_time = time.monotonic()
+
+    try:
+        if load_bug_data_path:
+            with open(load_bug_data_path) as f:
+                data = json.load(f)
+            all_bugs: Mapping[int, Bug] = {
+                int(bug_id): Bug.from_json(bug_data)
+                for bug_id, bug_data in data["all_bugs"].items()
+            }
+        else:
+            all_bugs = fetch_all_bugs(bz_client)
+    except Exception as e:
+        raise BugFetchError(
+            "Fetching bugs from Bugzilla was not completed due to an error, aborting."
+        ) from e
+
+    history_changes = None
+    if include_history:
+        history_updater = BugHistoryUpdater(
+            client, bq_dataset_id, bz_client, recreate_history
+        )
+        try:
+            history_changes = history_updater.run(all_bugs)
+        except Exception as e:
+            logging.error(f"Exception updating history: {e}")
+            raise
+    else:
+        logging.info("Not updating bug history")
-
-        if fetch_all_result is None:
-            raise BugFetchError(
-                "Fetching bugs from Bugzilla was not completed due to an error, aborting."
-            )
+
+    site_reports, etp_reports, kb_bugs, platform_bugs = group_bugs(all_bugs)
+
+    # Links between different kinds of bugs
+    bug_links = [
+        (
+            BugLinkConfig("breakage_reports", "knowledge_base_bug", "breakage_bug"),
+            get_kb_bug_site_report(all_bugs, kb_bugs, site_reports),
+        ),
+        (
+            BugLinkConfig("core_bugs", "knowledge_base_bug", "core_bug"),
+            get_kb_bug_core_bugs(all_bugs, kb_bugs, platform_bugs),
+        ),
+        (
+            BugLinkConfig("etp_breakage_reports", "breakage_bug", "etp_meta_bug"),
+            get_etp_breakage_reports(all_bugs, etp_reports),
+        ),
+    ]
+
+    # Links between bugs and external data sources
+    external_links = [
+        (config, get_links(all_bugs, kb_bugs, config))
+        for config in [
+            ExternalLinkConfig(
+                "interventions",
+                "code_url",
+                ["github.com/mozilla-extensions/webcompat-addon"],
+            ),
+            ExternalLinkConfig(
+                "other_browser_issues",
+                "issue_url",
+                ["bugs.chromium.org", "bugs.webkit.org", "crbug.com"],
+            ),
+            ExternalLinkConfig(
+                "standards_issues",
+                "issue_url",
+                ["github.com/w3c", "github.com/whatwg", "github.com/wicg"],
+            ),
+            ExternalLinkConfig(
+                "standards_positions", "discussion_url", ["standards-positions"]
+            ),
+        ]
+    ]
+
+    if write_bug_data_path:
+        write_data(
+            write_bug_data_path,
+            all_bugs,
+            site_reports,
             etp_reports,
             kb_bugs,
             platform_bugs,
-            etp_reports,
-            etp_dependencies,
-        ) = fetch_all_result
-
-        # Add platform bugs that should be imported as knowledge base bugs (with some
-        # modifications to their dependencies)
-        kb_bugs.update(
-            self.kb_bugs_from_platform_bugs(
-                platform_bugs, set(kb_bugs.keys()), set(site_reports.keys())
-            )
+            bug_links,
+            external_links,
         )
-        # Process KB bugs fields and get their dependant core/breakage bugs ids.
-        kb_data, kb_dep_ids = self.process_relations(kb_bugs, RELATION_CONFIG)
-        self.add_kb_entry_breakage(kb_data, kb_dep_ids, site_reports)
+    last_change_time_max = max(bug.last_change_time for bug in all_bugs.values())
-        fetch_missing_result = self.fetch_missing_deps(all_bugs, kb_dep_ids)
-        if fetch_missing_result is None:
-            raise BugFetchError(
-                "Fetching missing dependencies from Bugzilla was not completed due to an error, aborting."
-            )
+    # Finally do the actual import
+    importer = BigQueryImporter(client, bq_dataset_id, write)
+    importer.insert_bugs(all_bugs)
+    if history_changes is not None:
+        importer.insert_history_changes(history_changes, recreate=recreate_history)
-
-        missing_bugs, core_missing = fetch_missing_result
-
-        platform_bugs.update(core_missing)
-        all_bugs.update(missing_bugs)
-
-        # Process core bugs and update KB data with missing links from core bugs.
-        if platform_bugs:
-            core_data, _ = self.process_relations(
-                platform_bugs, PLATFORM_RELATION_CONFIG
-            )
-            kb_data = self.add_links(kb_data, core_data)
+    importer.insert_bug_list("kb_bugs", "number", kb_bugs)
-        # Build relations for BQ tables.
- rels = self.build_relations(kb_data, RELATION_CONFIG) - - kb_ids = list(kb_data.keys()) - - if self.include_history: - history_changes, history_fetch_completed = self.fetch_bug_history( - all_bugs, self.recreate_history - ) - else: - logging.info("Not updating bug history") - history_changes = [] - history_fetch_completed = False - - etp_rels: Mapping[str, list[Mapping[str, Any]]] = {} - if etp_reports: - etp_reports_unified = self.unify_etp_dependencies( - etp_reports, etp_dependencies - ) - etp_data, _ = self.process_relations( - etp_reports_unified, ETP_RELATION_CONFIG - ) + for bug_link_config, data in bug_links: + importer.insert_bug_links(bug_link_config, data) - etp_rels = self.build_relations(etp_data, ETP_RELATION_CONFIG) + for external_link_config, links_by_bug in external_links: + importer.insert_external_links(external_link_config, links_by_bug) - if self.write: - if history_fetch_completed: - self.update_history(history_changes, self.recreate_history) - elif self.include_history: - logging.warning("Failed to fetch bug history, not updating") - self.update_bugs(all_bugs) - self.update_kb_ids(kb_ids) - self.update_relations(rels, RELATION_CONFIG) - self.update_relations(etp_rels, ETP_RELATION_CONFIG) - - last_change_time_max = max( - all_bugs.values(), key=lambda x: x["last_change_time"] - )["last_change_time"] - - self.record_import_run( - start_time, - history_fetch_completed, - len(all_bugs), - len(history_changes), - last_change_time_max, - ) - else: - logging.info("Skipping writes") + importer.record_import_run( + start_time, + len(all_bugs), + len(history_changes) if history_changes is not None else None, + last_change_time_max, + ) class BugzillaJob(EtlJob): @@ -1405,15 +1339,32 @@ def add_arguments(cls, parser: argparse.ArgumentParser) -> None: action="store_true", help="Re-read bug history from scratch", ) + group.add_argument( + "--bugzilla-write-bug-data", + action="store", + help="Path to write bug data as a JSON file", + ) + group.add_argument( + "--bugzilla-load-bug-data", + action="store", + help="Path to JSON file to load bug data from", + ) def main(self, client: bigquery.Client, args: argparse.Namespace) -> None: - bz_bq = BugzillaToBigQuery( + bz_config = bugdantic.BugzillaConfig( + "https://bugzilla.mozilla.org", + args.bugzilla_api_key, + allow_writes=args.write, + ) + bz_client = bugdantic.Bugzilla(bz_config) + + run( client, args.bq_kb_dataset, - args.bugzilla_api_key, + bz_client, args.write, args.bugzilla_include_history, args.bugzilla_recreate_history, + args.bugzilla_write_bug_data, + args.bugzilla_load_bug_data, ) - - bz_bq.run() diff --git a/jobs/webcompat-kb/webcompat_kb/metric_changes.py b/jobs/webcompat-kb/webcompat_kb/metric_changes.py index 38a49360..376249f9 100644 --- a/jobs/webcompat-kb/webcompat_kb/metric_changes.py +++ b/jobs/webcompat-kb/webcompat_kb/metric_changes.py @@ -9,7 +9,8 @@ from google.cloud import bigquery from .base import EtlJob -from .bugzilla import parse_string_to_json +from .bqhelpers import ensure_table +from .bugzilla import parse_user_story FIXED_STATES = {"RESOLVED", "VERIFIED"} @@ -62,23 +63,6 @@ class ScoreChange: reasons: list[str] -def ensure_table(client: bigquery.Client, bq_dataset_id: str, recreate: bool) -> None: - table_id = f"{client.project}.{bq_dataset_id}.webcompat_topline_metric_changes" - table = bigquery.Table( - table_id, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("who", "STRING", mode="REQUIRED"), - bigquery.SchemaField("change_time", "TIMESTAMP", 
mode="REQUIRED"),
-            bigquery.SchemaField("score_delta", "FLOAT", mode="REQUIRED"),
-            bigquery.SchemaField("reasons", "STRING", mode="REPEATED"),
-        ],
-    )
-    if recreate:
-        client.delete_table(table, not_found_ok=True)
-    client.create_table(table, exists_ok=True)
-
-
 def get_last_recorded_date(client: bigquery.Client, bq_dataset_id: str) -> datetime:
     query = f"""
     SELECT change_time
@@ -258,7 +242,17 @@ def bugs_historic_states(
             if field_change.field_name == "keywords":
                 for keyword in field_change.added.split(", "):
                     if keyword:
-                        prev.keywords.remove(keyword)
+                        try:
+                            prev.keywords.remove(keyword)
+                        except ValueError:
+                            # Occasionally keywords change case
+                            for prev_keyword in prev.keywords:
+                                if prev_keyword.lower() == keyword.lower():
+                                    prev.keywords.remove(prev_keyword)
+                                    logging.warning(f"Didn't find keyword {keyword}, removed {prev_keyword} instead")
+                                    break
+                            else:
+                                raise
                 for keyword in field_change.removed.split(", "):
                     if keyword:
                         prev.keywords.append(keyword)
@@ -333,7 +327,7 @@ def compute_historic_scores(
                    "index": i,
                    "keywords": state.keywords,
                    "url": state.url,
-                    "user_story": parse_string_to_json(state.user_story),
+                    "user_story": parse_user_story(state.user_story),
                }
            )

@@ -361,10 +355,10 @@ def compute_historic_scores(
    client.delete_table(tmp_name)
    for bug_id, computed_scores in rv.items():
-        current_score = current_scores.get(bug_id, 0)
+        current_score = float(current_scores.get(bug_id, 0))
        if (
            computed_scores[0] != current_score
            and states[0].status not in FIXED_STATES
-        ) or bug_id == 1953996:
+        ):
            history_logging = "\n".join(
                f"    {score}: {state}"
                for score, state in zip(computed_scores, historic_states[bug_id])
@@ -511,12 +505,11 @@ def insert_score_changes(
    )

    if write:
-        job = client.load_table_from_json(
+        client.load_table_from_json(
            rows,
            changes_table,
            job_config=job_config,
-        )
-        job.result()
+        ).result()
        logging.info(f"Wrote {len(rows)} records into {changes_table}")
    else:
        logging.info("Skipping writes, would have written:")
@@ -527,8 +520,9 @@ def update_metric_changes(
 def update_metric_changes(
     client: bigquery.Client, bq_dataset_id: str, write: bool, recreate: bool
 ) -> None:
-    ensure_table(client, bq_dataset_id, recreate)
+    ensure_table(client, bq_dataset_id, "webcompat_topline_metric_changes", recreate)
     last_recorded_date = get_last_recorded_date(client, bq_dataset_id)
+    logging.info(f"Last recorded change time {last_recorded_date}")
     changes_by_bug = get_bug_changes(client, bq_dataset_id, last_recorded_date)
     current_bug_data = get_bugs(
         client, bq_dataset_id, last_recorded_date, iter(changes_by_bug.keys())