From 9bfae1b8e30ba0b7968086876166fe94d5f100cd Mon Sep 17 00:00:00 2001
From: James Graham
Date: Thu, 17 Apr 2025 09:17:41 +0100
Subject: [PATCH 1/2] Handle capitalization changes in keywords

It seems keywords can sometimes change their capitalization, so if we
don't find the keyword in the historic set, check for entries that
differ only in case.
---
 jobs/webcompat-kb/webcompat_kb/metric_changes.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/jobs/webcompat-kb/webcompat_kb/metric_changes.py b/jobs/webcompat-kb/webcompat_kb/metric_changes.py
index 38a49360..78f6bf7e 100644
--- a/jobs/webcompat-kb/webcompat_kb/metric_changes.py
+++ b/jobs/webcompat-kb/webcompat_kb/metric_changes.py
@@ -258,7 +258,17 @@ def bugs_historic_states(
         if field_change.field_name == "keywords":
             for keyword in field_change.added.split(", "):
                 if keyword:
-                    prev.keywords.remove(keyword)
+                    try:
+                        prev.keywords.remove(keyword)
+                    except ValueError:
+                        # Occasionally keywords change case
+                        for prev_keyword in prev.keywords:
+                            if prev_keyword.lower() == keyword.lower():
+                                prev.keywords.remove(prev_keyword)
+                                logging.warning(f"Didn't find keyword {keyword}, removing {prev_keyword} instead")
+                                break
+                        else:
+                            raise
             for keyword in field_change.removed.split(", "):
                 if keyword:
                     prev.keywords.append(keyword)

From 0e6fe4ae0758880192b586ae01b4612924b0ce47 Mon Sep 17 00:00:00 2001
From: James Graham
Date: Thu, 17 Apr 2025 17:14:32 +0100
Subject: [PATCH 2/2] WIP refactor webcompat ETL

---
 jobs/webcompat-kb/tests/test_bugzilla.py    |    5 +-
 jobs/webcompat-kb/webcompat_kb/bqhelpers.py |   19 +
 jobs/webcompat-kb/webcompat_kb/bugzilla.py  | 1947 ++++++++---------
 .../webcompat_kb/metric_changes.py          |   34 +-
 4 files changed, 979 insertions(+), 1026 deletions(-)
 create mode 100644 jobs/webcompat-kb/webcompat_kb/bqhelpers.py

diff --git a/jobs/webcompat-kb/tests/test_bugzilla.py b/jobs/webcompat-kb/tests/test_bugzilla.py
index 62935e34..50c12837 100644
--- a/jobs/webcompat-kb/tests/test_bugzilla.py
+++ b/jobs/webcompat-kb/tests/test_bugzilla.py
@@ -10,7 +10,6 @@
 from webcompat_kb.bugzilla import BugzillaToBigQuery
 from webcompat_kb.bugzilla import extract_int_from_field
 from webcompat_kb.bugzilla import parse_string_to_json
-from webcompat_kb.bugzilla import RELATION_CONFIG, LINK_FIELDS, ETP_RELATION_CONFIG
 
 
 def to_history(data: list[dict[str, Any]]) -> Mapping[int, list[History]]:
@@ -999,13 +998,13 @@ def test_extract_int_from_field():
     field = extract_int_from_field("P3")
     assert field == 3
 
-    field = extract_int_from_field("critical")
+    field = extract_int_from_field("critical", value_map={"critical": 1})
     assert field == 1
 
     field = extract_int_from_field("--")
     assert field is None
 
-    field = extract_int_from_field("N/A")
+    field = extract_int_from_field("N/A", value_map={"n/a": None})
     assert field is None
 
     field = extract_int_from_field("")

diff --git a/jobs/webcompat-kb/webcompat_kb/bqhelpers.py b/jobs/webcompat-kb/webcompat_kb/bqhelpers.py
new file mode 100644
index 00000000..98510f56
--- /dev/null
+++ b/jobs/webcompat-kb/webcompat_kb/bqhelpers.py
@@ -0,0 +1,19 @@
+from google.cloud import bigquery
+
+
+def ensure_table(
+    client: bigquery.Client, bq_dataset_id: str, table_id: str, recreate: bool
+) -> None:
+    table = bigquery.Table(
+        f"{client.project}.{bq_dataset_id}.{table_id}",
+        schema=[
+            bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
+            bigquery.SchemaField("who", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"),
+            bigquery.SchemaField("score_delta", "FLOAT", 
mode="REQUIRED"), + bigquery.SchemaField("reasons", "STRING", mode="REPEATED"), + ], + ) + if recreate: + client.delete_table(table, not_found_ok=True) + client.create_table(table, exists_ok=True) diff --git a/jobs/webcompat-kb/webcompat_kb/bugzilla.py b/jobs/webcompat-kb/webcompat_kb/bugzilla.py index 0d19472d..5a93741e 100644 --- a/jobs/webcompat-kb/webcompat_kb/bugzilla.py +++ b/jobs/webcompat-kb/webcompat_kb/bugzilla.py @@ -1,4 +1,6 @@ import argparse +import enum +import json import logging import os import re @@ -7,14 +9,13 @@ Any, Iterable, Iterator, - Mapping, MutableMapping, - NamedTuple, Optional, - Sequence, - Union, + Self, cast, ) +from collections import defaultdict +from collections.abc import Sequence, Mapping from dataclasses import dataclass from datetime import datetime, timedelta @@ -22,22 +23,168 @@ from google.cloud import bigquery from .base import EtlJob +from .bqhelpers import ensure_table -Bug = Mapping[str, Any] -BugsById = Mapping[int, Bug] -MutBugsById = MutableMapping[int, Bug] -Relations = Mapping[str, list[Mapping[str, Any]]] -RelationConfig = Mapping[str, Mapping[str, Any]] +class BugFetchError(Exception): + pass -@dataclass + +@dataclass(frozen=True) +class Bug: + id: int + summary: str + status: str + resolution: str + product: str + component: str + creator: str + see_also: list[str] + depends_on: list[int] + blocks: list[int] + priority: Optional[int] + severity: Optional[int] + creation_time: datetime + assigned_to: Optional[str] + keywords: list[str] + url: str + user_story: str + last_resolved: Optional[datetime] + last_change_time: datetime + whiteboard: str + webcompat_priority: Optional[str] + webcompat_score: Optional[int] + + @property + def parsed_user_story(self) -> Mapping[str, Any]: + return parse_user_story(self.user_story) + + @property + def resolved(self) -> Optional[datetime]: + if self.status in {"RESOLVED", "VERIFIED"} and self.last_resolved: + return self.last_resolved + return None + + @classmethod + def from_bugzilla(cls, bug: bugdantic.bugzilla.Bug) -> Self: + assert bug.id is not None + assert bug.summary is not None + assert bug.status is not None + assert bug.resolution is not None + assert bug.product is not None + assert bug.component is not None + assert bug.creator is not None + assert bug.see_also is not None + assert bug.depends_on is not None + assert bug.blocks is not None + assert bug.priority is not None + assert bug.severity is not None + assert bug.creation_time is not None + assert bug.assigned_to is not None + assert bug.keywords is not None + assert bug.url is not None + assert bug.last_change_time is not None + assert bug.whiteboard is not None + assert bug.cf_user_story is not None + + return cls( + id=bug.id, + summary=bug.summary, + status=bug.status, + resolution=bug.resolution, + product=bug.product, + component=bug.component, + see_also=bug.see_also, + depends_on=bug.depends_on, + blocks=bug.blocks, + priority=extract_int_from_field( + bug.priority, + value_map={ + "--": None, + }, + ), + severity=extract_int_from_field( + bug.severity, + value_map={ + "n/a": None, + "--": None, + "blocker": 1, + "critical": 1, + "major": 2, + "normal": 3, + "minor": 4, + "trivial": 4, + "enhancement": 4, + }, + ), + creation_time=bug.creation_time, + assigned_to=bug.assigned_to + if bug.assigned_to != "nobody@mozilla.org" + else None, + keywords=bug.keywords, + url=bug.url, + user_story=bug.cf_user_story, + last_resolved=bug.cf_last_resolved, + last_change_time=bug.last_change_time, + whiteboard=bug.whiteboard, + 
creator=bug.creator, + webcompat_priority=( + bug.cf_webcompat_priority + if bug.cf_webcompat_priority != "---" + else None + ), + webcompat_score=extract_int_from_field( + bug.cf_webcompat_score, + value_map={ + "---": None, + "?": None, + }, + ), + ) + + def to_json(self) -> Mapping[str, Any]: + fields = {**vars(self)} + for key in fields: + if isinstance(fields[key], datetime): + fields[key] = fields[key].isoformat() + return fields + + @classmethod + def from_json(cls, bug_data: Mapping[str, Any]) -> Self: + return cls( + id=bug_data["id"], + summary=bug_data["summary"], + status=bug_data["status"], + resolution=bug_data["resolution"], + product=bug_data["product"], + component=bug_data["component"], + see_also=bug_data["see_also"], + depends_on=bug_data["depends_on"], + blocks=bug_data["blocks"], + priority=bug_data["priority"], + severity=bug_data["severity"], + creation_time=datetime.fromisoformat(bug_data["creation_time"]), + assigned_to=bug_data["assigned_to"], + keywords=bug_data["keywords"], + url=bug_data["url"], + user_story=bug_data["user_story"], + last_resolved=datetime.fromisoformat(bug_data["last_resolved"]) if bug_data["last_resolved"] is not None else None , + last_change_time=datetime.fromisoformat(bug_data["last_change_time"]), + whiteboard=bug_data["whiteboard"], + creator=bug_data["creator"], + webcompat_priority=bug_data["webcompat_priority"], + webcompat_score=bug_data["webcompat_score"], + ) + + +@dataclass(frozen=True) class BugHistoryChange: field_name: str added: str removed: str -@dataclass +@dataclass(frozen=True) class BugHistoryEntry: number: int who: str @@ -45,7 +192,8 @@ class BugHistoryEntry: changes: list[BugHistoryChange] -class HistoryRow(NamedTuple): +@dataclass(frozen=True) +class HistoryChange: number: int who: str change_time: datetime @@ -54,29 +202,43 @@ class HistoryRow(NamedTuple): removed: str -class BugFetchError(Exception): - pass +class PropertyChange(enum.StrEnum): + added = "added" + removed = "removed" -BUGZILLA_URL = "https://bugzilla.mozilla.org/" - -OTHER_BROWSER = ["bugs.chromium.org", "bugs.webkit.org", "crbug.com"] -STANDARDS_ISSUES = ["github.com/w3c", "github.com/whatwg", "github.com/wicg"] -STANDARDS_POSITIONS = ["standards-positions"] -INTERVENTIONS = ["github.com/mozilla-extensions/webcompat-addon"] -FIELD_MAP = { - "blocker": 1, - "critical": 1, - "major": 2, - "normal": 3, - "minor": 4, - "trivial": 4, - "enhancement": 4, - "n/a": None, - "--": None, -} +@dataclass(frozen=True) +class PropertyHistoryItem: + change_time: datetime + change: PropertyChange + + +class PropertyHistory: + """Representation of the history of a specific boolean property + (i.e. 
one that can be present or not)"""
+
+    def __init__(self) -> None:
+        self.data: list[PropertyHistoryItem] = []
+
+    def __len__(self) -> int:
+        return len(self.data)
+
+    def add(self, change_time: datetime, change: PropertyChange) -> None:
+        self.data.append(PropertyHistoryItem(change_time=change_time, change=change))
+
+    def missing_initial_add(self) -> bool:
+        """Check whether the history is missing the initial property addition,
+        i.e. the earliest recorded change is a removal (implying the property
+        was already set before the recorded history begins)."""
+        self.data.sort(key=lambda x: x.change_time)
+        return len(self.data) > 0 and self.data[0].change == PropertyChange.removed
+
+
+BugId = int
+BugsById = Mapping[BugId, Bug]
+MutBugsById = MutableMapping[BugId, Bug]
+
+HistoryByBug = Mapping[BugId, Sequence[BugHistoryEntry]]
 
-FILTER_CONFIG = {
+BUG_QUERIES: Mapping[str, dict[str, str]] = {
     "site_reports_wc": {
         "product": "Web Compatibility",
         "component": "Site Reports",
@@ -147,91 +309,44 @@ class BugFetchError(Exception):
     },
 }
 
-RELATION_CONFIG = {
-    "core_bugs": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "core_bug", "type": "INTEGER", "mode": "REQUIRED"},
-        ],
-        "source": "depends_on",
-        "store_id": "core",
-    },
-    "breakage_reports": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "breakage_bug", "type": "INTEGER", "mode": "REQUIRED"},
-        ],
-        "source": "blocks",
-        "store_id": "breakage",
-    },
-    "interventions": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "code_url", "type": "STRING", "mode": "REQUIRED"},
-        ],
-        "source": "see_also",
-        "condition": INTERVENTIONS,
-    },
-    "other_browser_issues": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "issue_url", "type": "STRING", "mode": "REQUIRED"},
-        ],
-        "source": "see_also",
-        "condition": OTHER_BROWSER,
-    },
-    "standards_issues": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "issue_url", "type": "STRING", "mode": "REQUIRED"},
-        ],
-        "source": "see_also",
-        "condition": STANDARDS_ISSUES,
-    },
-    "standards_positions": {
-        "fields": [
-            {"name": "knowledge_base_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "discussion_url", "type": "STRING", "mode": "REQUIRED"},
-        ],
-        "source": "see_also",
-        "condition": STANDARDS_POSITIONS,
-    },
-}
 
-ETP_RELATION_CONFIG = {
-    "etp_breakage_reports": {
-        "fields": [
-            {"name": "breakage_bug", "type": "INTEGER", "mode": "REQUIRED"},
-            {"name": "etp_meta_bug", "type": "INTEGER", "mode": "REQUIRED"},
-        ],
-        "source": "depends_on",
-        "store_id": "breakage",
-    },
-}
+@dataclass
+class BugLinkConfig:
+    table_name: str
+    from_field_name: str
+    to_field_name: str
+
 
-LINK_FIELDS = ["other_browser_issues", "standards_issues", "standards_positions"]
+@dataclass
+class ExternalLinkConfig:
+    table_name: str
+    field_name: str
+    match_substrs: list[str]
 
-PLATFORM_RELATION_CONFIG = {key: RELATION_CONFIG[key] for key in LINK_FIELDS}
 
-def extract_int_from_field(field: Optional[str]) -> Optional[int]:
-    if field:
-        if field.lower() in FIELD_MAP:
-            return FIELD_MAP[field.lower()]
+def extract_int_from_field(
+    field_value: Optional[str], value_map: Optional[Mapping[str, Optional[int]]] = None
+) -> Optional[int]:
+    if field_value:
+        if value_map and field_value.lower() in value_map:
+            return value_map[field_value.lower()]
 
-        match = re.search(r"\d+", field)
+        match = re.search(r"\d+", field_value)
         if match:
            return int(match.group())
-
+    logging.warning(
+        f"Unexpected field value '{field_value}', could not 
convert to integer" + ) return None -def parse_string_to_json(input_string: str) -> Union[str, Mapping[str, Any]]: +def parse_user_story(input_string: str) -> Mapping[str, str | list[str]]: if not input_string: - return "" + return {} lines = input_string.splitlines() - result_dict: dict[str, Any] = {} + result_dict: dict[str, str | list[str]] = {} for line in lines: if line: @@ -239,39 +354,34 @@ def parse_string_to_json(input_string: str) -> Union[str, Mapping[str, Any]]: if len(key_value) == 2: key, value = key_value if key in result_dict: - if isinstance(result_dict[key], list): - result_dict[key].append(value) + current_value = result_dict[key] + if isinstance(current_value, list): + current_value.append(value) else: - result_dict[key] = [result_dict[key], value] + result_dict[key] = [current_value, value] else: result_dict[key] = value if not result_dict: - return "" + return {} return result_dict -class BugzillaToBigQuery: - def __init__( - self, - client: bigquery.Client, - bq_dataset_id: str, - bugzilla_api_key: Optional[str], - write: bool, - include_history: bool, - recreate_history: bool, - ): - bz_config = bugdantic.BugzillaConfig( - BUGZILLA_URL, bugzilla_api_key, allow_writes=write - ) - self.bz_client = bugdantic.Bugzilla(bz_config) - self.client = client - self.bq_dataset_id = bq_dataset_id - self.write = write - self.include_history = include_history - self.recreate_history = recreate_history +class BugCache(Mapping): + def __init__(self, bz_client: bugdantic.Bugzilla): + self.bz_client = bz_client + self.bugs: MutBugsById = {} + + def __getitem__(self, key: BugId) -> Bug: + return self.bugs[key] - def fetch_bugs(self, params: dict[str, str]) -> tuple[bool, MutBugsById]: + def __len__(self) -> int: + return len(self.bugs) + + def __iter__(self) -> Iterator[BugId]: + yield from self.bugs + + def bz_fetch_bugs(self, params: dict[str, str]) -> None: fields = [ "id", "summary", @@ -293,504 +403,293 @@ def fetch_bugs(self, params: dict[str, str]) -> tuple[bool, MutBugsById]: "cf_last_resolved", "last_change_time", "whiteboard", - "creator", "cf_webcompat_priority", "cf_webcompat_score", ] try: - bugs = self.bz_client.search(query=params, include_fields=fields) - data: MutBugsById = {} + bugs = self.bz_client.search( + query=params, include_fields=fields, page_size=200 + ) for bug in bugs: assert bug.id is not None - data[bug.id] = bug.to_dict() - fetch_completed = True + self.bugs[bug.id] = Bug.from_bugzilla(bug) except Exception as e: logging.error(f"Error: {e}") - fetch_completed = False - data = {} - - return fetch_completed, data - - def kb_bugs_from_platform_bugs( - self, - platform_bugs: BugsById, - kb_ids: set[int], - site_report_ids: set[int], - ) -> BugsById: - """Get a list of platform bugs that should also be considered knowledge base bugs - - These are platform bugs that aren't blocking an existing kb entry - """ - - filtered = {} - - for bug_id, source_bug in platform_bugs.items(): - # Check if the platform bug already has a kb entry and skip if so - if any(blocked_id in kb_ids for blocked_id in source_bug["blocks"]): - continue - - bug = {**source_bug} - - # Only store a breakage bug as it's the relation we care about - bug["blocks"] = [ - blocked_id - for blocked_id in bug["blocks"] - if blocked_id in site_report_ids - ] - - # Don't store bugs that platform bug depends on - bug["depends_on"] = [] - - filtered[bug_id] = bug - - return filtered - - def unify_etp_dependencies( - self, - etp_reports: BugsById, - etp_dependencies: BugsById, - ) -> BugsById: - 
"""Unify blocked and depends_on for each ETP bug as their dependencies are inconsistent, - keep only ETP meta bugs and store them only in depends_on field.""" - - filtered = {} - - for bug_id, source_bug in etp_reports.items(): - bug = {**source_bug} - - blocks = [ - blocked_id - for blocked_id in bug["blocks"] - if blocked_id in etp_dependencies - and "meta" in etp_dependencies[blocked_id]["keywords"] - ] - - depends = [ - depends_id - for depends_id in bug["depends_on"] - if depends_id in etp_dependencies - and "meta" in etp_dependencies[depends_id]["keywords"] - ] - - bug["depends_on"] = blocks + depends - bug["blocks"] = [] - - filtered[bug_id] = bug - - return filtered - - def chunked_list(self, data: list[int], size: int) -> Iterator[list[int]]: - for i in range(0, len(data), size): - yield data[i : i + size] - - def fetch_by_id(self, bug_ids: set[int]) -> tuple[bool, MutBugsById]: - chunk_size = 400 - all_bugs: dict[int, Bug] = {} - all_completed = True + raise - for chunk in self.chunked_list(list(bug_ids), chunk_size): - logging.info(f"Fetching {len(chunk)} bugs") - - completed, bugs = self.fetch_bugs({"id": ",".join(map(str, chunk))}) - if completed: - all_bugs.update(bugs) - else: - all_completed = False - break - - return all_completed, all_bugs - - def fetch_related_bugs( - self, bugs: BugsById, relations: list[str], all_bugs: MutBugsById - ) -> tuple[bool, MutBugsById]: - related_ids: set[int] = set() - related_bugs: dict[int, Bug] = {} + def fetch_missing_relations(self, bugs: BugsById, relation: str) -> int: + related_ids: set[str] = set() for bug in bugs.values(): - for relation_property in relations: - related_ids |= set( - bug_id - for bug_id in bug[relation_property] - if bug_id not in all_bugs - ) - related_bugs.update( - { - bug_id: all_bugs[bug_id] - for bug_id in bug[relation_property] - if bug_id in all_bugs - } - ) - - completed, fetched_bugs = self.fetch_by_id(related_ids) - all_bugs.update(fetched_bugs) - related_bugs.update(fetched_bugs) - return completed, related_bugs + related_ids |= { + str(bug_id) for bug_id in getattr(bug, relation) if bug_id not in self + } - def filter_core_bugs(self, bugs: BugsById) -> BugsById: - return { - bug_id: bug - for bug_id, bug in bugs.items() - if bug["product"] != "Web Compatibility" - } + if related_ids: + self.bz_fetch_bugs({"id": ",".join(related_ids)}) + return len(related_ids) - def fetch_all_bugs( - self, - ) -> Optional[ - tuple[ - MutBugsById, MutBugsById, MutBugsById, MutBugsById, MutBugsById, MutBugsById - ] - ]: - """Get all the bugs that should be imported into BigQuery. - - :returns: A tuple of (all bugs, site report bugs, knowledge base bugs, - core bugs, ETP report bugs, ETP dependencies).""" - fetched_bugs = {} - all_bugs: dict[int, Bug] = {} - - for category, filter_config in FILTER_CONFIG.items(): - logging.info(f"Fetching {category} bugs") - completed, fetched_bugs[category] = self.fetch_bugs(filter_config) - all_bugs.update(fetched_bugs[category]) - if not completed: - return None - - site_reports = fetched_bugs["site_reports_wc"] - site_reports.update(fetched_bugs["site_reports_other"]) - - logging.info("Fetching site-report blocking bugs") - completed_site_report_deps, site_reports_deps = self.fetch_related_bugs( - site_reports, ["depends_on"], all_bugs - ) + def into_mapping(self) -> BugsById: + """Convert the data into a plain dict. 
- if not completed_site_report_deps: - logging.error("Failed to fetch site report blocking bugs") - return None + Also reset this object, so we aren't sharing the state between multiple places + """ + bugs = self.bugs + self.bugs = {} + return bugs + + +def is_site_report(bug: Bug) -> bool: + return (bug.product == "Web Compatibility" and bug.component == "Site Reports") or ( + bug.product != "Web Compatibility" and "webcompat:site-report" in bug.keywords + ) + + +def is_etp_report(bug: Bug) -> bool: + return ( + bug.product == "Web Compatibility" and bug.component == "Privacy: Site Reports" + ) + + +def is_kb_entry(bug: Bug) -> bool: + """Get things that are directly in the knowledge base. + + This doesn't include core bugs that should be considered part of the knowledge base + because they directly block a platform bug.""" + return bug.product == "Web Compatibility" and bug.component == "Knowledge Base" + + +def is_webcompat_platform_bug(bug: Bug) -> bool: + """Check if a bug is a platform bug . + + These are only actually in the kb if they also block a site report""" + return ( + bug.product != "Web Compatibility" and "webcompat:platform-bug" in bug.keywords + ) + + +def is_parity_bug(bug: Bug) -> bool: + return any(item.startswith("parity-") for item in bug.keywords) + + +def get_links( + all_bugs: BugsById, source_bugs: set[BugId], config: ExternalLinkConfig +) -> Mapping[BugId, list[str]]: + rv: defaultdict[int, list[str]] = defaultdict(list) + for bug_id in source_bugs: + bug = all_bugs[bug_id] + for entry in bug.see_also: + if any(substr in entry for substr in config.match_substrs): + rv[bug_id].append(entry) + return rv + + +def get_kb_bug_core_bugs( + all_bugs: BugsById, kb_bugs: set[BugId], platform_bugs: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = defaultdict(set) + for kb_id in kb_bugs: + if kb_id not in platform_bugs: + for bug_id in all_bugs[kb_id].depends_on: + if bug_id in platform_bugs: + rv[kb_id].add(bug_id) + return rv + + +def get_kb_bug_site_report( + all_bugs: BugsById, kb_bugs: set[BugId], site_report_bugs: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = defaultdict(set) + for kb_id in kb_bugs: + if kb_id in site_report_bugs: + rv[kb_id].add(kb_id) + for bug_id in all_bugs[kb_id].blocks: + if bug_id in site_report_bugs: + rv[kb_id].add(bug_id) + return rv + + +def get_etp_breakage_reports( + all_bugs: BugsById, etp_reports: set[BugId] +) -> Mapping[BugId, set[BugId]]: + rv = {} + for bug_id in etp_reports: + report_bug = all_bugs[bug_id] + meta_bugs = { + meta_id + for meta_id in report_bug.depends_on + report_bug.blocks + if "meta" in all_bugs[meta_id].keywords + } + if meta_bugs: + rv[bug_id] = meta_bugs + return rv - kb_bugs = fetched_bugs["knowledge_base"] - logging.info("Fetching blocking bugs for KB bugs") - completed_platform_bugs, kb_deps = self.fetch_related_bugs( - kb_bugs, ["depends_on"], all_bugs - ) +def fetch_all_bugs( + bz_client: bugdantic.Bugzilla, +) -> BugsById: + """Get all the bugs that should be imported into BigQuery. 
-        if not completed:
-            logging.error("Failed to fetch blocking bugs")
-            return None
+    :returns: A mapping from bug id to Bug for every fetched bug."""
 
-        platform_bugs = fetched_bugs["platform_bugs"]
-        platform_bugs.update(self.filter_core_bugs(site_reports_deps))
-        platform_bugs.update(self.filter_core_bugs(kb_deps))
+    bug_cache = BugCache(bz_client)
 
-        etp_reports = fetched_bugs["site_reports_etp"]
+    for category, filter_config in BUG_QUERIES.items():
+        logging.info(f"Fetching {category} bugs")
+        bug_cache.bz_fetch_bugs(filter_config)
 
-        etp_completed, etp_dependencies = self.fetch_related_bugs(
-            etp_reports, ["depends_on", "blocks"], all_bugs
+    fetch_count = -1
+    while fetch_count != 0:
+        # Get all blocking bugs for site reports or kb entries or etp site reports.
+        # This can take more than one iteration if dependencies themselves turn out
+        # to be site reports that were excluded by the date cutoff
+        fetch_count = bug_cache.fetch_missing_relations(
+            {
+                bug_id: bug
+                for bug_id, bug in bug_cache.items()
+                if is_site_report(bug) or is_kb_entry(bug) or is_etp_report(bug)
+            },
+            "depends_on",
         )
-
-        if not etp_completed:
-            logging.error("Failed to fetch etp blocking bugs")
-            return None
-
-        return (
-            all_bugs,
-            site_reports,
-            kb_bugs,
-            platform_bugs,
-            etp_reports,
-            etp_dependencies,
+        fetch_count += bug_cache.fetch_missing_relations(
+            {bug_id: bug for bug_id, bug in bug_cache.items() if is_etp_report(bug)},
+            "blocks",
         )
+        if fetch_count:
+            logging.info(f"Fetched {fetch_count} related bugs")
 
-    def process_relations(
-        self, bugs: BugsById, relation_config: RelationConfig
-    ) -> tuple[Mapping[int, Mapping[str, list[int | str]]], Mapping[str, set[int]]]:
-        """Build relationship tables based on information in the bugs.
+    return bug_cache.into_mapping()
 
-        :returns: A mapping {bug_id: {relationship name: [related items]}} and
-        a mapping {store id: {bug ids}}
-        """
-        # The types here are wrong; the return values are lists of ints or lists of strings but not both. 
- # However enforcing that property is hard without building specific types for the two cases - relations: dict[int, dict[str, list[int | str]]] = {} - related_bug_ids: dict[str, set[int]] = {} - for config in relation_config.values(): - if "store_id" in config: - related_bug_ids[config["store_id"]] = set() - - for bug_id, bug in bugs.items(): - relations[bug_id] = {rel: [] for rel in relation_config.keys()} +class BugHistoryUpdater: + def __init__( + self, + bq_client: bigquery.Client, + bq_dataset_id: str, + bz_client: bugdantic.Bugzilla, + recreate: bool = False, + ): + self.bq_client = bq_client + self.bq_dataset_id = bq_dataset_id + self.bz_client = bz_client + self.recreate = recreate - for rel, config in relation_config.items(): - related_items = bug[config["source"]] + def run(self, all_bugs: BugsById) -> HistoryByBug: + existing_records = self.bigquery_fetch_history(all_bugs.keys()) - for item in related_items: - if "condition" in config and not any( - c in item for c in config["condition"] - ): - continue + new_bugs, existing_bugs = self.group_bugs(all_bugs) - relations[bug_id][rel].append(item) + new_bugs_history = self.missing_records( + existing_records, self.new_bugs_history(new_bugs) + ) + existing_bugs_history = self.missing_records( + existing_records, self.existing_bugs_history(existing_bugs) + ) - if config.get("store_id"): - assert isinstance(item, int) - related_bug_ids[config["store_id"]].add(item) + if not (new_bugs_history or existing_bugs_history): + logging.info("No relevant history updates") + return {} - return relations, related_bug_ids + return self.merge_history(existing_bugs_history, new_bugs_history) - def add_kb_entry_breakage( - self, - kb_data: Mapping[int, Mapping[str, list[int | str]]], - kb_dep_ids: Mapping[str, set[int]], - site_reports: BugsById, - ) -> None: - """Add breakage relations for bugs that are both kb entries and also site reports - - If a core bug has the webcompat:platform-bug keyword it's a kb entry. - If it also has the webcompat:site-report keyword it's a site report. - In this case we want the bug to reference itself in the breakage_reports table.""" - for bug_id in set(kb_data.keys()) & set(site_reports.keys()): - if bug_id not in kb_dep_ids["breakage"]: - kb_data[bug_id]["breakage_reports"].append(bug_id) - kb_dep_ids["breakage"].add(bug_id) - - def fetch_missing_deps( - self, all_bugs: BugsById, kb_dep_ids: Mapping[str, set[int]] - ) -> Optional[tuple[BugsById, BugsById]]: - dep_ids = {item for sublist in kb_dep_ids.values() for item in sublist} - - # Check for missing bugs - missing_ids = dep_ids - set(all_bugs.keys()) - - if missing_ids: - logging.info( - "Fetching missing core bugs and breakage reports from Bugzilla" - ) - completed, missing_bugs = self.fetch_bugs( - {"id": ",".join(map(str, missing_ids))} - ) - if not completed: - return None - - # Separate core bugs for updating relations. 
- core_dependenies = set(kb_dep_ids.get("core", set())) - core_missing = { - bug_id: bug - for bug_id, bug in missing_bugs.items() - if bug_id in core_dependenies - } + def group_bugs(self, all_bugs: BugsById) -> tuple[BugsById, BugsById]: + all_ids = set(all_bugs.keys()) + if not self.recreate: + existing_ids = self.bigquery_fetch_imported_ids() + new_ids = all_ids - existing_ids else: - missing_bugs, core_missing = {}, {} - - return missing_bugs, core_missing - - def add_links( - self, - kb_processed: Mapping[int, Mapping[str, list[int | str]]], - dep_processed: Mapping[int, Mapping[str, list[int | str]]], - ) -> Mapping[int, Mapping[str, list[int | str]]]: - """Create links between kb entries and external data such - as standards issues.""" - result = {**kb_processed} - - for kb_bug_id in result: - for core_bug_id in result[kb_bug_id]["core_bugs"]: - assert isinstance(core_bug_id, int) - for sub_key in LINK_FIELDS: - if sub_key in result[kb_bug_id] and sub_key in dep_processed.get( - core_bug_id, {} - ): - for link_item in dep_processed[core_bug_id][sub_key]: - if link_item not in result[kb_bug_id][sub_key]: - result[kb_bug_id][sub_key].append(link_item) - - return result + new_ids = all_ids - def build_relations( - self, bugs: BugsById, relation_config: RelationConfig - ) -> Relations: - relations: dict[str, list[Mapping[str, Any]]] = { - key: [] for key in relation_config.keys() + new_bugs = { + bug_id: bug for bug_id, bug in all_bugs.items() if bug_id in new_ids } - - for bug_id, bug in bugs.items(): - for field_key, items in bug.items(): - fields = relation_config[field_key]["fields"] - - for row in items: - relation_row = {fields[0]["name"]: bug_id, fields[1]["name"]: row} - relations[field_key].append(relation_row) - - return relations - - def convert_bug_data(self, bug: Bug) -> dict[str, Any]: - resolved = None - if bug["status"] in ["RESOLVED", "VERIFIED"] and bug["cf_last_resolved"]: - resolved = bug["cf_last_resolved"] - - user_story = parse_string_to_json(bug["cf_user_story"]) - - assigned_to = ( - bug["assigned_to"] if bug["assigned_to"] != "nobody@mozilla.org" else None - ) - webcompat_priority = ( - bug.get("cf_webcompat_priority") - if bug.get("cf_webcompat_priority") != "---" - else None - ) - - return { - "number": bug["id"], - "title": bug["summary"], - "status": bug["status"], - "resolution": bug["resolution"], - "product": bug["product"], - "component": bug["component"], - "creator": bug["creator"], - "severity": extract_int_from_field(bug["severity"]), - "priority": extract_int_from_field(bug["priority"]), - "creation_time": bug["creation_time"].isoformat(), - "assigned_to": assigned_to, - "keywords": bug["keywords"], - "url": bug["url"], - "user_story": user_story, - "user_story_raw": bug["cf_user_story"], - "resolved_time": resolved.isoformat() if resolved is not None else None, - "whiteboard": bug["whiteboard"], - "webcompat_priority": webcompat_priority, - "webcompat_score": extract_int_from_field(bug.get("cf_webcompat_score")), + existing_bugs = { + bug_id: bug for bug_id, bug in all_bugs.items() if bug_id not in new_ids } + return new_bugs, existing_bugs - def update_bugs(self, bugs: BugsById) -> None: - res = [self.convert_bug_data(bug) for bug in bugs.values()] - - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("title", "STRING", mode="REQUIRED"), - bigquery.SchemaField("status", "STRING", mode="REQUIRED"), - 
bigquery.SchemaField("resolution", "STRING", mode="REQUIRED"), - bigquery.SchemaField("product", "STRING", mode="REQUIRED"), - bigquery.SchemaField("component", "STRING", mode="REQUIRED"), - bigquery.SchemaField("creator", "STRING", mode="REQUIRED"), - bigquery.SchemaField("severity", "INTEGER"), - bigquery.SchemaField("priority", "INTEGER"), - bigquery.SchemaField("creation_time", "TIMESTAMP", mode="REQUIRED"), - bigquery.SchemaField("assigned_to", "STRING"), - bigquery.SchemaField("keywords", "STRING", mode="REPEATED"), - bigquery.SchemaField("url", "STRING"), - bigquery.SchemaField("user_story", "JSON"), - bigquery.SchemaField("user_story_raw", "STRING"), - bigquery.SchemaField("resolved_time", "TIMESTAMP"), - bigquery.SchemaField("whiteboard", "STRING"), - bigquery.SchemaField("webcompat_priority", "STRING"), - bigquery.SchemaField("webcompat_score", "INTEGER"), - ], - write_disposition="WRITE_TRUNCATE", - ) - - bugs_table = f"{self.bq_dataset_id}.bugzilla_bugs" + def merge_history(self, *sources: HistoryByBug) -> HistoryByBug: + history: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) + for source in sources: + for bug_id, changes in source.items(): + history[bug_id].extend(changes) + return history - job = self.client.load_table_from_json( - res, - bugs_table, - job_config=job_config, - ) + def new_bugs_history(self, new_bugs: BugsById) -> HistoryByBug: + history = self.bugzilla_fetch_history(new_bugs.keys()) + synthetic_history = self.create_synthetic_history(new_bugs, history) + return self.merge_history(history, synthetic_history) - logging.info("Writing to `bugzilla_bugs` table") + def existing_bugs_history(self, existing_bugs: BugsById) -> HistoryByBug: + last_import_time = self.bigquery_last_import() - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: - for error in job.errors: - logging.error(error) + if last_import_time is None: + logging.info("No previous history update found") + return {} - table = self.client.get_table(bugs_table) - logging.info(f"Loaded {table.num_rows} rows into {table}") + updated_bugs = { + bug_id + for bug_id, bug in existing_bugs.items() + if bug.last_change_time > last_import_time + } - def update_kb_ids(self, ids: Iterable[int]) -> None: - res = [{"number": kb_id} for kb_id in ids] + if not updated_bugs: + logging.info(f"No updated bygs since {last_import_time.isoformat()}") + return {} - job_config = bigquery.LoadJobConfig( - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - ], - write_disposition="WRITE_TRUNCATE", + logging.info( + f"Fetching bugs {updated_bugs} updated since {last_import_time.isoformat()}" ) - kb_bugs_table = f"{self.bq_dataset_id}.kb_bugs" + bugs_full_history = self.bugzilla_fetch_history(updated_bugs) + # Filter down to only recent updates, since we always get the full history + bugs_history = {} + for bug_id, bug_full_history in bugs_full_history.items(): + bug_history = [ + item for item in bug_full_history if item.change_time > last_import_time + ] + if bug_history: + bugs_history[bug_id] = bug_history - job = self.client.load_table_from_json( - res, - kb_bugs_table, - job_config=job_config, - ) + return bugs_history - logging.info("Writing to `kb_bugs` table") + def missing_records( + self, existing_records: HistoryByBug, updates: HistoryByBug + ) -> HistoryByBug: + if not existing_records: + return updates - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: 
-                for error in job.errors:
-                    logging.error(error)
+        if last_import_time is None:
+            logging.info("No previous history update found")
+            return {}
 
-        table = self.client.get_table(kb_bugs_table)
-        logging.info(f"Loaded {table.num_rows} rows into {table}")
+        updated_bugs = {
+            bug_id
+            for bug_id, bug in existing_bugs.items()
+            if bug.last_change_time > last_import_time
+        }
 
-    def update_relations(
-        self, relations: Relations, relation_config: RelationConfig
-    ) -> None:
-        for key, value in relations.items():
-            if value:
-                job_config = bigquery.LoadJobConfig(
-                    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
-                    schema=[
-                        bigquery.SchemaField(
-                            item["name"], item["type"], mode=item["mode"]
-                        )
-                        for item in relation_config[key]["fields"]
-                    ],
-                    write_disposition="WRITE_TRUNCATE",
-                )
+        if not updated_bugs:
+            logging.info(f"No updated bugs since {last_import_time.isoformat()}")
+            return {}
 
-                relation_table = f"{self.bq_dataset_id}.{key}"
+        logging.info(
+            f"Fetching bugs {updated_bugs} updated since {last_import_time.isoformat()}"
        )
 
-                job = self.client.load_table_from_json(
-                    cast(Iterable[dict[str, Any]], value),
-                    relation_table,
-                    job_config=job_config,
-                )
-
-                logging.info(f"Writing to `{relation_table}` table")
+        bugs_full_history = self.bugzilla_fetch_history(updated_bugs)
+        # Filter down to only recent updates, since we always get the full history
+        bugs_history = {}
+        for bug_id, bug_full_history in bugs_full_history.items():
+            bug_history = [
+                item for item in bug_full_history if item.change_time > last_import_time
+            ]
+            if bug_history:
+                bugs_history[bug_id] = bug_history
 
-                try:
-                    job.result()
-                except Exception as e:
-                    print(f"ERROR: {e}")
-                    if job.errors:
-                        for error in job.errors:
-                            logging.error(error)
+        return bugs_history
 
-                table = self.client.get_table(relation_table)
-                logging.info(f"Loaded {table.num_rows} rows into {table}")
+    def missing_records(
+        self, existing_records: HistoryByBug, updates: HistoryByBug
+    ) -> HistoryByBug:
+        if not existing_records:
+            return updates
 
-    def get_last_import_datetime(self) -> Optional[datetime]:
+        existing_history = set(self.flatten_history(existing_records))
+        new_history = set(self.flatten_history(updates))
+
+        diff = new_history - existing_history
+
+        return self.unflatten_history(item for item in new_history if item in diff)
+
+    def bigquery_fetch_imported_ids(self) -> set[int]:
+        query = f"""
+        SELECT number
+        FROM `{self.bq_dataset_id}.bugzilla_bugs`
+        """
+        res = self.bq_client.query(query).result()
+        rows = list(res)
+
+        imported_ids = {bug["number"] for bug in rows}
+
+        return imported_ids
+
+    def bigquery_last_import(self) -> Optional[datetime]:
         query = f"""
         SELECT MAX(run_at) AS last_run_at
         FROM `{self.bq_dataset_id}.import_runs`
         WHERE is_history_fetch_completed = TRUE
         """
-        res = self.client.query(query).result()
+        res = self.bq_client.query(query).result()
         row = list(res)[0]
 
         return row["last_run_at"]
 
-    def fetch_bugs_history(
-        self, ids: Iterable[int]
-    ) -> tuple[Mapping[int, list[bugdantic.bugzilla.History]], bool]:
+    def bugzilla_fetch_history(self, ids: Iterable[int]) -> HistoryByBug:
         history: dict[int, list[bugdantic.bugzilla.History]] = {}
         chunk_size = 100
         ids_list = list(ids)
@@ -836,161 +735,19 @@
             else:
                 break
 
-        completed = len(ids_list) == 0
-        return history, completed
-
-    def serialize_history_entry(self, entry: BugHistoryEntry) -> dict[str, Any]:
-        return {
-            "number": entry.number,
-            "who": entry.who,
-            "change_time": entry.change_time.isoformat(),
-            "changes": [
-                {
-                    "field_name": change.field_name,
-                    "added": change.added,
-                    "removed": change.removed,
-                }
-                for change in entry.changes
-            ],
-        }
-
-    def update_history(
-        self, records: list[BugHistoryEntry], recreate: bool = False
-    ) -> None:
-        if not records and not recreate:
-            logging.info("No history records to update")
-            return
-
-        write_disposition = "WRITE_APPEND" if not recreate else "WRITE_TRUNCATE"
-        job_config = bigquery.LoadJobConfig(
-            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
-            schema=[
-                bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
-                bigquery.SchemaField("who", "STRING", mode="REQUIRED"),
-                bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"),
-                bigquery.SchemaField(
-                    "changes",
-                    "RECORD",
-                    mode="REPEATED",
-                    fields=[
-                        bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"),
-                        bigquery.SchemaField("added", "STRING", mode="REQUIRED"),
-                        bigquery.SchemaField("removed", "STRING", mode="REQUIRED"),
-                    ],
-                ),
-            ],
-            
write_disposition=write_disposition, - ) - - history_table = f"{self.bq_dataset_id}.bugs_history" - - job = self.client.load_table_from_json( - (self.serialize_history_entry(item) for item in records), - history_table, - job_config=job_config, - ) - - logging.info("Writing to `bugs_history` table") - - try: - job.result() - except Exception as e: - print(f"ERROR: {e}") - if job.errors: - for error in job.errors: - logging.error(error) - - table = self.client.get_table(history_table) - logging.info(f"Loaded {len(records)} rows into {table}") - - def get_existing_history_records_by_ids( - self, bug_ids: Iterable[int] - ) -> list[BugHistoryEntry]: - formatted_numbers = ", ".join(str(bug_id) for bug_id in bug_ids) - - query = f""" - SELECT * - FROM `{self.bq_dataset_id}.bugs_history` - WHERE number IN ({formatted_numbers}) - """ - result = self.client.query(query).result() - return [ - BugHistoryEntry( - row["number"], - row["who"], - row["change_time"], - changes=[ - BugHistoryChange( - change["field_name"], change["added"], change["removed"] - ) - for change in row["changes"] - ], - ) - for row in result - ] - - def flatten_history(self, records: Iterable[BugHistoryEntry]) -> list[HistoryRow]: - history = [] - for record in records: - for change in record.changes: - history_row = HistoryRow( - record.number, - record.who, - record.change_time, - change.field_name, - change.added, - change.removed, - ) - history.append(history_row) - - return history - - def unflatten_history(self, diff: Sequence[HistoryRow]) -> list[BugHistoryEntry]: - changes: dict[tuple[int, str, datetime], BugHistoryEntry] = {} - for item in diff: - key = (item.number, item.who, item.change_time) - - if key not in changes: - changes[key] = BugHistoryEntry( - number=item.number, - who=item.who, - change_time=item.change_time, - changes=[], - ) - changes[key].changes.append( - BugHistoryChange( - field_name=item.field_name, - added=item.added, - removed=item.removed, - ) + if len(ids_list) != 0: + raise BugFetchError( + f"Failed to fetch bug history for {','.join(str(item) for item in ids_list)}" ) - return list(changes.values()) + return self.bugzilla_to_history_entry(history) - def filter_only_unsaved_changes( - self, history_updates: list[BugHistoryEntry], bug_ids: set[int] - ) -> list[BugHistoryEntry]: - existing_records = self.get_existing_history_records_by_ids(bug_ids) - - if not existing_records: - return history_updates - - existing_history = self.flatten_history(existing_records) - new_history = self.flatten_history(history_updates) - - diff = set(new_history) - set(existing_history) - - return self.unflatten_history([item for item in new_history if item in diff]) - - def extract_history_fields( + def bugzilla_to_history_entry( self, updated_history: Mapping[int, list[bugdantic.bugzilla.History]] - ) -> tuple[list[BugHistoryEntry], set[int]]: - result = [] - bug_ids = set() + ) -> HistoryByBug: + rv: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) for bug_id, history in updated_history.items(): - filtered_changes = [] - for record in history: relevant_changes = [ BugHistoryChange( @@ -1010,257 +767,340 @@ def extract_history_fields( change_time=record.when, changes=relevant_changes, ) - filtered_changes.append(filtered_record) - bug_ids.add(bug_id) - - if filtered_changes: - result.extend(filtered_changes) - - return result, bug_ids - - def filter_relevant_history( - self, updated_history: Mapping[int, list[bugdantic.bugzilla.History]] - ) -> list[BugHistoryEntry]: - only_unsaved_changes = [] - result, 
bug_ids = self.extract_history_fields(updated_history) - - if result: - only_unsaved_changes = self.filter_only_unsaved_changes(result, bug_ids) - - return only_unsaved_changes + rv[bug_id].append(filtered_record) - def get_bugs_updated_since_last_import( - self, all_bugs: BugsById, last_import_time: datetime - ) -> set[int]: - return { - bug["id"] - for bug in all_bugs.values() - if bug["last_change_time"] > last_import_time - } + return rv - def get_imported_ids(self) -> set[int]: + def bigquery_fetch_history(self, bug_ids: Iterable[int]) -> HistoryByBug: + rv: defaultdict[int, list[BugHistoryEntry]] = defaultdict(list) + formatted_numbers = ", ".join(str(bug_id) for bug_id in bug_ids) query = f""" - SELECT number - FROM `{self.bq_dataset_id}.bugzilla_bugs` - """ - res = self.client.query(query).result() - rows = list(res) - - imported_ids = {bug["number"] for bug in rows} - - return imported_ids - - def create_keyword_map( - self, history: list[BugHistoryEntry] - ) -> Mapping[int, Mapping[str, Mapping[str, list[datetime]]]]: - keyword_history: dict[int, dict[str, dict[str, list[datetime]]]] = {} - - for record in history: - bug_id = record.number - timestamp = record.change_time - - for change in record.changes: - if change.field_name == "keywords": - if bug_id not in keyword_history: - keyword_history[bug_id] = {"added": {}, "removed": {}} - - keyword_records = keyword_history[bug_id] - - for action in ["added", "removed"]: - keywords = getattr(change, action) - if keywords: - for keyword in keywords.split(", "): - if keyword not in keyword_records[action]: - keyword_records[action][keyword] = [] + SELECT * + FROM `{self.bq_dataset_id}.bugs_history` + WHERE number IN ({formatted_numbers}) + """ + result = self.bq_client.query(query).result() + for row in result: + rv[row["number"]].append( + BugHistoryEntry( + row["number"], + row["who"], + row["change_time"], + changes=[ + BugHistoryChange( + change["field_name"], change["added"], change["removed"] + ) + for change in row["changes"] + ], + ) + ) + return rv + + def keyword_history( + self, history: HistoryByBug + ) -> Mapping[BugId, Mapping[str, PropertyHistory]]: + """Get the time each keyword has been added and removed from each bug""" + keyword_history: defaultdict[int, dict[str, PropertyHistory]] = defaultdict( + dict + ) - keyword_records[action][keyword].append(timestamp) + for bug_id, records in history.items(): + for record in records: + for change in record.changes: + if change.field_name == "keywords": + for src, change_type in [ + (change.added, PropertyChange.added), + (change.removed, PropertyChange.removed), + ]: + for keyword in src.split(", "): + if keyword not in keyword_history[bug_id]: + keyword_history[bug_id][keyword] = PropertyHistory() + keyword_history[bug_id][keyword].add( + change_time=record.change_time, change=change_type + ) return keyword_history - def is_removed_earliest( - self, added_times: list[datetime], removed_times: list[datetime] - ) -> bool: - events = [(at, "added") for at in added_times] + [ - (rt, "removed") for rt in removed_times - ] - events.sort() - - if not events: - return False - - return events[0][1] == "removed" - def get_missing_keywords( self, bug_id: int, current_keywords: list[str], - keyword_history: Mapping[int, Mapping[str, Mapping[str, list[datetime]]]], - ) -> list[str]: - missing_keywords = [] + keyword_history: Mapping[BugId, Mapping[str, PropertyHistory]], + ) -> set[str]: + missing_keywords = set() # Check if keyword exists, but is not in "added" history for keyword in 
current_keywords: - if bug_id not in keyword_history or keyword not in keyword_history[ - bug_id - ].get("added", {}): - if keyword not in missing_keywords: - missing_keywords.append(keyword) + if bug_id not in keyword_history or keyword not in keyword_history[bug_id]: + missing_keywords.add(keyword) # Check for keywords that have "removed" record as the earliest # event in the sorted timeline if bug_id in keyword_history: - for keyword, removed_times in ( - keyword_history[bug_id].get("removed", {}).items() - ): - added_times = keyword_history[bug_id].get("added", {}).get(keyword, []) - - removed_earliest = self.is_removed_earliest(added_times, removed_times) - - if removed_earliest and keyword not in missing_keywords: - missing_keywords.append(keyword) + for keyword, history in keyword_history[bug_id].items(): + if history.missing_initial_add(): + missing_keywords.add(keyword) return missing_keywords - def build_missing_history( - self, bugs_without_history: Iterable[tuple[Bug, list[str]]] - ) -> list[BugHistoryEntry]: - result: list[BugHistoryEntry] = [] - for bug, missing_keywords in bugs_without_history: - record = BugHistoryEntry( - number=bug["id"], - who=bug["creator"], - change_time=bug["creation_time"], - changes=[ - BugHistoryChange( - added=", ".join(missing_keywords), - field_name="keywords", - removed="", - ) - ], - ) - result.append(record) - return result - def create_synthetic_history( - self, bugs: BugsById, history: list[BugHistoryEntry] - ) -> list[BugHistoryEntry]: - keyword_history = self.create_keyword_map(history) + self, all_bugs: BugsById, history: HistoryByBug + ) -> HistoryByBug: + """Backfill history entries for bug creation. - bugs_without_history = [] - - for bug_id, bug in bugs.items(): - current_keywords = bug["keywords"] + If a bug has keywords set, but there isn't a history entry corresponding + to the keyword being added, we assume they were set on bug creation, and + create a history entry to represent that.""" + result: dict[int, list[BugHistoryEntry]] = {} + keyword_history = self.keyword_history(history) + for bug_id, bug in all_bugs.items(): missing_keywords = self.get_missing_keywords( - bug_id, current_keywords, keyword_history + bug_id, bug.keywords, keyword_history ) if missing_keywords: - bugs_without_history.append((bug, missing_keywords)) + record = BugHistoryEntry( + number=bug.id, + who=bug.creator, + change_time=bug.creation_time, + changes=[ + BugHistoryChange( + added=", ".join(missing_keywords), + field_name="keywords", + removed="", + ) + ], + ) + result[bug.id] = [record] + return result - return self.build_missing_history(bugs_without_history) + def flatten_history(self, history: HistoryByBug) -> Iterable[HistoryChange]: + for records in history.values(): + for record in records: + for change in record.changes: + yield HistoryChange( + record.number, + record.who, + record.change_time, + change.field_name, + change.added, + change.removed, + ) - def fetch_history_for_new_bugs( - self, all_bugs: BugsById, recreate: bool = False - ) -> tuple[list[BugHistoryEntry], set[int], bool]: - only_unsaved_changes: list[BugHistoryEntry] = [] + def unflatten_history(self, diff: Iterable[HistoryChange]) -> HistoryByBug: + changes: dict[tuple[int, str, datetime], BugHistoryEntry] = {} + for item in diff: + key = (item.number, item.who, item.change_time) - all_ids = set(all_bugs.keys()) - if not recreate: - existing_ids = self.get_imported_ids() - new_ids = all_ids - existing_ids - else: - new_ids = all_ids + if key not in changes: + changes[key] = 
BugHistoryEntry( + number=item.number, + who=item.who, + change_time=item.change_time, + changes=[], + ) + changes[key].changes.append( + BugHistoryChange( + field_name=item.field_name, + added=item.added, + removed=item.removed, + ) + ) - logging.info(f"Fetching new bugs history: {list(new_ids)}") + rv: dict[int, list[BugHistoryEntry]] = {} + for change in changes.values(): + if change.number not in rv: + rv[change.number] = [] + rv[change.number].append(change) + return rv - new_bugs = { - bug_id: bug for bug_id, bug in all_bugs.items() if bug_id in new_ids - } - bugs_history, completed = self.fetch_bugs_history(new_bugs.keys()) - if not completed: - return only_unsaved_changes, new_ids, False +class BigQueryImporter: + """Class to handle all writes to BigQuery""" - history, _ = self.extract_history_fields(bugs_history) + def __init__(self, client: bigquery.Client, bq_dataset_id: str, write: bool): + self.client = client + self.bq_dataset_id = bq_dataset_id + self.write = write - synthetic_history = self.create_synthetic_history(new_bugs, history) + def write_table( + self, + table: str, + schema: list[bigquery.SchemaField], + rows: Sequence[Mapping[str, Any]], + overwrite: bool, + ) -> None: + ensure_table(self.client, self.bq_dataset_id, table, False) + table = f"{self.bq_dataset_id}.{table}" - new_bugs_history = history + synthetic_history + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + schema=schema, + write_disposition="WRITE_APPEND" if not overwrite else "WRITE_TRUNCATE", + ) - if new_bugs_history: - only_unsaved_changes = self.filter_only_unsaved_changes( - new_bugs_history, new_ids + if self.write: + job = self.client.load_table_from_json( + cast(Iterable[dict[str, Any]], rows), + table, + job_config=job_config, ) + job.result() + logging.info(f"Wrote {len(rows)} records into {table}") + else: + logging.info(f"Skipping writes, would have written {len(rows)} to {table}") + for row in rows: + logging.debug(f" {row}") - return only_unsaved_changes, new_ids, True - - def fetch_history_updates( - self, all_existing_bugs: BugsById - ) -> tuple[Mapping[int, list[bugdantic.bugzilla.History]], bool]: - last_import_time = self.get_last_import_datetime() + def convert_bug(self, bug: Bug) -> Mapping[str, Any]: + return { + "number": bug.id, + "title": bug.summary, + "status": bug.status, + "resolution": bug.resolution, + "product": bug.product, + "component": bug.component, + "creator": bug.creator, + "severity": bug.severity, + "priority": bug.priority, + "creation_time": bug.creation_time.isoformat(), + "assigned_to": bug.assigned_to, + "keywords": bug.keywords, + "url": bug.url, + "user_story": bug.parsed_user_story, + "user_story_raw": bug.user_story, + "resolved_time": bug.resolved.isoformat() + if bug.resolved is not None + else None, + "whiteboard": bug.whiteboard, + "webcompat_priority": bug.webcompat_priority, + "webcompat_score": bug.webcompat_score, + "depends_on": bug.depends_on, + "blocks": bug.blocks + } - if last_import_time is not None: - updated_bug_ids = self.get_bugs_updated_since_last_import( - all_existing_bugs, last_import_time - ) + def convert_history_entry(self, entry: BugHistoryEntry) -> Mapping[str, Any]: + return { + "number": entry.number, + "who": entry.who, + "change_time": entry.change_time.isoformat(), + "changes": [ + { + "field_name": change.field_name, + "added": change.added, + "removed": change.removed, + } + for change in entry.changes + ], + } - logging.info( - f"Fetching bugs updated after last 
import: {updated_bug_ids} at {last_import_time.strftime('%Y-%m-%dT%H:%M:%SZ')}" # noqa - ) + def insert_bugs(self, all_bugs: BugsById) -> None: + table = "bugzilla_bugs" + schema = [ + bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("title", "STRING", mode="REQUIRED"), + bigquery.SchemaField("status", "STRING", mode="REQUIRED"), + bigquery.SchemaField("resolution", "STRING", mode="REQUIRED"), + bigquery.SchemaField("product", "STRING", mode="REQUIRED"), + bigquery.SchemaField("component", "STRING", mode="REQUIRED"), + bigquery.SchemaField("creator", "STRING", mode="REQUIRED"), + bigquery.SchemaField("severity", "INTEGER"), + bigquery.SchemaField("priority", "INTEGER"), + bigquery.SchemaField("creation_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("assigned_to", "STRING"), + bigquery.SchemaField("keywords", "STRING", mode="REPEATED"), + bigquery.SchemaField("url", "STRING"), + bigquery.SchemaField("user_story", "JSON"), + bigquery.SchemaField("user_story_raw", "STRING"), + bigquery.SchemaField("resolved_time", "TIMESTAMP"), + bigquery.SchemaField("whiteboard", "STRING"), + bigquery.SchemaField("webcompat_priority", "STRING"), + bigquery.SchemaField("webcompat_score", "INTEGER"), + bigquery.SchemaField("depends_on", "INTEGER", mode="REPEATED"), + bigquery.SchemaField("blocks", "INTEGER", mode="REPEATED"), + ] + rows = [self.convert_bug(bug) for bug in all_bugs.values()] + self.write_table(table, schema, rows, overwrite=True) - if updated_bug_ids: - bugs_full_history, completed = self.fetch_bugs_history(updated_bug_ids) - # Filter down to only recent updates, since we always get the full history - bugs_history = {} - for bug_id, bug_full_history in bugs_full_history.items(): - bug_history = [ - item - for item in bug_full_history - if item.when > last_import_time - ] - if bug_history: - bugs_history[bug_id] = bug_history - - return bugs_history, completed - - logging.warning("No previous history update found") - - return {}, True - - def fetch_bug_history( - self, all_bugs: BugsById, recreate: bool = False - ) -> tuple[list[BugHistoryEntry], bool]: - filtered_new_history, new_ids, completed = self.fetch_history_for_new_bugs( - all_bugs, recreate - ) - if not completed: - return [], False + def insert_history_changes( + self, history_entries: HistoryByBug, recreate: bool + ) -> None: + schema = [ + bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("who", "STRING", mode="REQUIRED"), + bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField( + "changes", + "RECORD", + mode="REPEATED", + fields=[ + bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("added", "STRING", mode="REQUIRED"), + bigquery.SchemaField("removed", "STRING", mode="REQUIRED"), + ], + ), + ] - existing_bugs = { - bug_id: bug for bug_id, bug in all_bugs.items() if bug_id not in new_ids - } + rows = [ + self.convert_history_entry(entry) + for entries in history_entries.values() + for entry in entries + ] + self.write_table("bugs_history", schema, rows, overwrite=recreate) - existing_bugs_history, completed = self.fetch_history_updates(existing_bugs) - if not completed: - return [], False + def insert_bug_list( + self, table_name: str, field_name: str, bugs: Iterable[BugId] + ) -> None: + schema = [bigquery.SchemaField(field_name, "INTEGER", mode="REQUIRED")] + rows = [{field_name: bug_id} for bug_id in bugs] + self.write_table(table_name, schema, rows, overwrite=True) - if 
filtered_new_history or existing_bugs_history: - filtered_existing = self.filter_relevant_history(existing_bugs_history) - filtered_records = filtered_existing + filtered_new_history - return filtered_records, True + def insert_bug_links( + self, link_config: BugLinkConfig, links_by_bug: Mapping[BugId, Iterable[BugId]] + ) -> None: + schema = [ + bigquery.SchemaField( + link_config.from_field_name, "INTEGER", mode="REQUIRED" + ), + bigquery.SchemaField(link_config.to_field_name, "INTEGER", mode="REQUIRED"), + ] + rows = [ + { + link_config.from_field_name: from_bug_id, + link_config.to_field_name: to_bug_id, + } + for from_bug_id, to_bug_ids in links_by_bug.items() + for to_bug_id in to_bug_ids + ] + self.write_table(link_config.table_name, schema, rows, overwrite=True) - logging.info("No relevant history updates") - return [], True + def insert_external_links( + self, + link_config: ExternalLinkConfig, + links_by_bug: Mapping[BugId, Iterable[str]], + ) -> None: + schema = [ + bigquery.SchemaField("knowledge_base_bug", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField(link_config.field_name, "STRING", mode="REQUIRED"), + ] + rows = [ + {"knowledge_base_bug": bug_id, link_config.field_name: link_text} + for bug_id, links in links_by_bug.items() + for link_text in links + ] + self.write_table(link_config.table_name, schema, rows, overwrite=True) def record_import_run( self, start_time: float, - history_fetch_completed: bool, count: int, - history_count: int, + history_count: Optional[int], last_change_time: datetime, ) -> None: + if not self.write: + return + elapsed_time = time.monotonic() - start_time elapsed_time_delta = timedelta(seconds=elapsed_time) run_at = last_change_time - elapsed_time_delta @@ -1270,8 +1110,10 @@ def record_import_run( { "run_at": formatted_time, "bugs_imported": count, - "bugs_history_updated": history_count, - "is_history_fetch_completed": history_fetch_completed, + "bugs_history_updated": history_count + if history_count is not None + else 0, + "is_history_fetch_completed": history_count is not None, }, ] bugbug_runs_table = f"{self.bq_dataset_id}.import_runs" @@ -1281,103 +1123,195 @@ def record_import_run( else: logging.info("Last import run recorded") - def run(self) -> None: - start_time = time.monotonic() - fetch_all_result = self.fetch_all_bugs() +def get_kb_entries(all_bugs: BugsById, site_report_blockers: set[BugId]) -> set[BugId]: + direct_kb_entries = {bug_id for bug_id, bug in all_bugs.items() if is_kb_entry(bug)} + kb_blockers = { + dependency + for bug_id in direct_kb_entries + for dependency in all_bugs[bug_id].depends_on + } + # We include any bug that's blocking a site report but isn't in Web Compatibility + platform_site_report_blockers = { + bug_id + for bug_id in site_report_blockers + if bug_id not in kb_blockers and all_bugs[bug_id].product != "Web Compatibility" + } + # We also include all other bugs that are platform bugs but don't depend on a kb entry + # TODO: This is probably too many bugs; platform bugs that don't block any site reports + # should likely be excluded + platform_kb_entries = { + bug_id + for bug_id in all_bugs + if bug_id not in kb_blockers and is_webcompat_platform_bug(all_bugs[bug_id]) + } + return direct_kb_entries | platform_site_report_blockers | platform_kb_entries + + +def group_bugs( + all_bugs: BugsById, +) -> tuple[set[BugId], set[BugId], set[BugId], set[BugId]]: + """Extract groups of bugs according to their types""" + site_reports = {bug_id for bug_id, bug in all_bugs.items() if is_site_report(bug)} + 
+    etp_reports = {bug_id for bug_id, bug in all_bugs.items() if is_etp_report(bug)}
+    site_report_blockers = {
+        dependency
+        for bug_id in site_reports
+        for dependency in all_bugs[bug_id].depends_on
+    }
+    assert site_report_blockers.issubset(all_bugs.keys())
+    kb_bugs = get_kb_entries(all_bugs, site_report_blockers)
+    platform_bugs = {
+        bug_id
+        for bug_id in all_bugs
+        if all_bugs[bug_id].product != "Web Compatibility"
+    }
+    return site_reports, etp_reports, kb_bugs, platform_bugs
+
+
+def write_data(
+    path: str,
+    all_bugs: BugsById,
+    site_reports: set[BugId],
+    etp_reports: set[BugId],
+    kb_bugs: set[BugId],
+    platform_bugs: set[BugId],
+    bug_links: Iterable[tuple[BugLinkConfig, Mapping[BugId, set[BugId]]]],
+    external_links: Iterable[tuple[ExternalLinkConfig, Mapping[BugId, list[str]]]],
+) -> None:
+    data: dict[str, Any] = {}
+    data["all_bugs"] = {bug_id: bug.to_json() for bug_id, bug in all_bugs.items()}
+    data["site_reports"] = list(site_reports)
+    data["etp_reports"] = list(etp_reports)
+    data["kb_bugs"] = list(kb_bugs)
+    data["platform_bugs"] = list(platform_bugs)
+    for bug_link_config, link_data in bug_links:
+        data[bug_link_config.table_name] = {
+            bug_id: list(values) for bug_id, values in link_data.items()
+        }
+    for external_link_config, external_link_data in external_links:
+        data[external_link_config.table_name] = {
+            bug_id: list(values) for bug_id, values in external_link_data.items()
+        }
+
+    with open(path, "w") as f:
+        json.dump(data, f)
+
+
+def run(
+    client: bigquery.Client,
+    bq_dataset_id: str,
+    bz_client: bugdantic.Bugzilla,
+    write: bool,
+    include_history: bool,
+    recreate_history: bool,
+    write_bug_data_path: Optional[str],
+    load_bug_data_path: Optional[str],
+) -> None:
+    start_time = time.monotonic()
+
+    try:
+        if load_bug_data_path:
+            with open(load_bug_data_path) as f:
+                data = json.load(f)
+            all_bugs: Mapping[int, Bug] = {
+                int(bug_id): Bug.from_json(bug_data)
+                for bug_id, bug_data in data["all_bugs"].items()
+            }
+        else:
+            all_bugs = fetch_all_bugs(bz_client)
+    except Exception as e:
+        raise BugFetchError(
+            "Fetching bugs from Bugzilla was not completed due to an error, aborting."
+        ) from e
+
+    history_changes = None
+    if include_history:
+        history_updater = BugHistoryUpdater(
+            client, bq_dataset_id, bz_client, recreate_history
+        )
+        try:
+            history_changes = history_updater.run(all_bugs)
+        except Exception as e:
+            logging.error(f"Exception updating history: {e}")
+            raise
+    else:
+        logging.info("Not updating bug history")
 
-        if fetch_all_result is None:
-            raise BugFetchError(
-                "Fetching bugs from Bugzilla was not completed due to an error, aborting."
- ) + site_reports, etp_reports, kb_bugs, platform_bugs = group_bugs(all_bugs) + # Links between different kinds of bugs + bug_links = [ + ( + BugLinkConfig("breakage_reports", "knowledge_base_bug", "breakage_bug"), + get_kb_bug_site_report(all_bugs, kb_bugs, site_reports), + ), + ( + BugLinkConfig("core_bugs", "knowledge_base_bug", "core_bug"), + get_kb_bug_core_bugs(all_bugs, kb_bugs, platform_bugs), + ), ( + BugLinkConfig("etp_breakage_reports", "breakage_bug", "etp_meta_bug"), + get_etp_breakage_reports(all_bugs, etp_reports), + ), + ] + + # Links between bugs and external data sources + external_links = [ + (config, get_links(all_bugs, kb_bugs, config)) + for config in [ + ExternalLinkConfig( + "interventions", + "code_url", + ["github.com/mozilla-extensions/webcompat-addon"], + ), + ExternalLinkConfig( + "other_browser_issues", + "issue_url", + ["bugs.chromium.org", "bugs.webkit.org", "crbug.com"], + ), + ExternalLinkConfig( + "standards_issues", + "issue_url", + ["github.com/w3c", "github.com/whatwg", "github.com/wicg"], + ), + ExternalLinkConfig( + "standards_positions", "discussion_url", ["standards-positions"] + ), + ] + ] + + if write_bug_data_path: + write_data( + write_bug_data_path, all_bugs, site_reports, + etp_reports, kb_bugs, platform_bugs, - etp_reports, - etp_dependencies, - ) = fetch_all_result - - # Add platform bugs that should be imported as knowledge base bugs (with some - # modifications to their dependencies) - kb_bugs.update( - self.kb_bugs_from_platform_bugs( - platform_bugs, set(kb_bugs.keys()), set(site_reports.keys()) - ) + bug_links, + external_links, ) - # Process KB bugs fields and get their dependant core/breakage bugs ids. - kb_data, kb_dep_ids = self.process_relations(kb_bugs, RELATION_CONFIG) - self.add_kb_entry_breakage(kb_data, kb_dep_ids, site_reports) + last_change_time_max = max(bug.last_change_time for bug in all_bugs.values()) - fetch_missing_result = self.fetch_missing_deps(all_bugs, kb_dep_ids) - if fetch_missing_result is None: - raise BugFetchError( - "Fetching missing dependencies from Bugzilla was not completed due to an error, aborting." - ) + # Finally do the actual import + importer = BigQueryImporter(client, bq_dataset_id, write) + importer.insert_bugs(all_bugs) + if history_changes is not None: + importer.insert_history_changes(history_changes, recreate=recreate_history) - missing_bugs, core_missing = fetch_missing_result - - platform_bugs.update(core_missing) - all_bugs.update(missing_bugs) - - # Process core bugs and update KB data with missing links from core bugs. - if platform_bugs: - core_data, _ = self.process_relations( - platform_bugs, PLATFORM_RELATION_CONFIG - ) - kb_data = self.add_links(kb_data, core_data) + importer.insert_bug_list("kb_bugs", "number", kb_bugs) - # Build relations for BQ tables. 
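
# NOTE: get_links() is referenced in the external_links block above but is
# not shown in this patch. As a rough sketch of the intended behaviour,
# matching each knowledge base bug's see_also URLs against the substrings
# configured on an ExternalLinkConfig, it might look like the following.
# The attribute holding the substrings (url_fragments here) and the exact
# matching rules are assumptions, not the actual implementation; the types
# (BugsById, BugId, ExternalLinkConfig) are the ones used elsewhere in this
# module.

def get_links(
    all_bugs: BugsById,
    kb_bugs: set[BugId],
    config: ExternalLinkConfig,
) -> Mapping[BugId, list[str]]:
    links: dict[BugId, list[str]] = {}
    for bug_id in kb_bugs:
        # Keep only the see_also links that mention one of the configured
        # URL fragments (e.g. "bugs.webkit.org" for other_browser_issues).
        matches = [
            url
            for url in all_bugs[bug_id].see_also
            if any(fragment in url for fragment in config.url_fragments)
        ]
        if matches:
            links[bug_id] = matches
    return links
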
- rels = self.build_relations(kb_data, RELATION_CONFIG) - - kb_ids = list(kb_data.keys()) - - if self.include_history: - history_changes, history_fetch_completed = self.fetch_bug_history( - all_bugs, self.recreate_history - ) - else: - logging.info("Not updating bug history") - history_changes = [] - history_fetch_completed = False - - etp_rels: Mapping[str, list[Mapping[str, Any]]] = {} - if etp_reports: - etp_reports_unified = self.unify_etp_dependencies( - etp_reports, etp_dependencies - ) - etp_data, _ = self.process_relations( - etp_reports_unified, ETP_RELATION_CONFIG - ) + for bug_link_config, data in bug_links: + importer.insert_bug_links(bug_link_config, data) - etp_rels = self.build_relations(etp_data, ETP_RELATION_CONFIG) + for external_link_config, links_by_bug in external_links: + importer.insert_external_links(external_link_config, links_by_bug) - if self.write: - if history_fetch_completed: - self.update_history(history_changes, self.recreate_history) - elif self.include_history: - logging.warning("Failed to fetch bug history, not updating") - self.update_bugs(all_bugs) - self.update_kb_ids(kb_ids) - self.update_relations(rels, RELATION_CONFIG) - self.update_relations(etp_rels, ETP_RELATION_CONFIG) - - last_change_time_max = max( - all_bugs.values(), key=lambda x: x["last_change_time"] - )["last_change_time"] - - self.record_import_run( - start_time, - history_fetch_completed, - len(all_bugs), - len(history_changes), - last_change_time_max, - ) - else: - logging.info("Skipping writes") + importer.record_import_run( + start_time, + len(all_bugs), + len(history_changes) if history_changes is not None else None, + last_change_time_max, + ) class BugzillaJob(EtlJob): @@ -1405,15 +1339,32 @@ def add_arguments(cls, parser: argparse.ArgumentParser) -> None: action="store_true", help="Re-read bug history from scratch", ) + group.add_argument( + "--bugzilla-write-bug-data", + action="store", + help="Path to write bug data as a JSON file", + ) + group.add_argument( + "--bugzilla-load-bug-data", + action="store", + help="Path to JSON file to load bug data from", + ) def main(self, client: bigquery.Client, args: argparse.Namespace) -> None: - bz_bq = BugzillaToBigQuery( + bz_config = bugdantic.BugzillaConfig( + "https://bugzilla.mozilla.org", + args.bugzilla_api_key, + allow_writes=args.write, + ) + bz_client = bugdantic.Bugzilla(bz_config) + + run( client, args.bq_kb_dataset, - args.bugzilla_api_key, + bz_client, args.write, args.bugzilla_include_history, args.bugzilla_recreate_history, + args.bugzilla_write_bug_data, + args.bugzilla_load_bug_data, ) - - bz_bq.run() diff --git a/jobs/webcompat-kb/webcompat_kb/metric_changes.py b/jobs/webcompat-kb/webcompat_kb/metric_changes.py index 78f6bf7e..376249f9 100644 --- a/jobs/webcompat-kb/webcompat_kb/metric_changes.py +++ b/jobs/webcompat-kb/webcompat_kb/metric_changes.py @@ -9,7 +9,8 @@ from google.cloud import bigquery from .base import EtlJob -from .bugzilla import parse_string_to_json +from .bqhelpers import ensure_table +from .bugzilla import parse_user_story FIXED_STATES = {"RESOLVED", "VERIFIED"} @@ -62,23 +63,6 @@ class ScoreChange: reasons: list[str] -def ensure_table(client: bigquery.Client, bq_dataset_id: str, recreate: bool) -> None: - table_id = f"{client.project}.{bq_dataset_id}.webcompat_topline_metric_changes" - table = bigquery.Table( - table_id, - schema=[ - bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("who", "STRING", mode="REQUIRED"), - bigquery.SchemaField("change_time", "TIMESTAMP", 
mode="REQUIRED"), - bigquery.SchemaField("score_delta", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("reasons", "STRING", mode="REPEATED"), - ], - ) - if recreate: - client.delete_table(table, not_found_ok=True) - client.create_table(table, exists_ok=True) - - def get_last_recorded_date(client: bigquery.Client, bq_dataset_id: str) -> datetime: query = f""" SELECT change_time @@ -343,7 +327,7 @@ def compute_historic_scores( "index": i, "keywords": state.keywords, "url": state.url, - "user_story": parse_string_to_json(state.user_story), + "user_story": parse_user_story(state.user_story), } ) @@ -371,10 +355,10 @@ def compute_historic_scores( client.delete_table(tmp_name) for bug_id, computed_scores in rv.items(): - current_score = current_scores.get(bug_id, 0) + current_score = float(current_scores.get(bug_id, 0)) if ( computed_scores[0] != current_score and states[0].status not in FIXED_STATES - ) or bug_id == 1953996: + ): history_logging = "\n".join( f" {score}: {state}" for score, state in zip(computed_scores, historic_states[bug_id]) @@ -521,12 +505,11 @@ def insert_score_changes( ) if write: - job = client.load_table_from_json( + client.load_table_from_json( rows, changes_table, job_config=job_config, - ) - job.result() + ).result() logging.info(f"Wrote {len(rows)} records into {changes_table}") else: logging.info("Skipping writes, would have written:") @@ -537,8 +520,9 @@ def insert_score_changes( def update_metric_changes( client: bigquery.Client, bq_dataset_id: str, write: bool, recreate: bool ) -> None: - ensure_table(client, bq_dataset_id, recreate) + ensure_table(client, bq_dataset_id, "webcompat_topline_metric_changes", recreate) last_recorded_date = get_last_recorded_date(client, bq_dataset_id) + logging.info(f"Last change time {last_recorded_date}") changes_by_bug = get_bug_changes(client, bq_dataset_id, last_recorded_date) current_bug_data = get_bugs( client, bq_dataset_id, last_recorded_date, iter(changes_by_bug.keys())
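
# NOTE: parse_user_story(), imported above in place of parse_string_to_json,
# is defined in bugzilla.py but not shown in this patch. A minimal sketch,
# assuming the conventional cf_user_story format of one "key:value" entry
# per line; the real parser may accept more than this, and the behaviour of
# collapsing single-element lists is also an assumption.

from collections import defaultdict
from typing import Any


def parse_user_story(user_story: str) -> dict[str, Any]:
    fields: dict[str, list[str]] = defaultdict(list)
    for line in user_story.splitlines():
        key, sep, value = line.partition(":")
        if not sep:
            # Skip lines that don't look like "key:value" entries.
            continue
        fields[key.strip()].append(value.strip())
    # Collapse single-element lists so e.g. "platform:windows" parses to a
    # plain string rather than a one-item list.
    return {
        key: values[0] if len(values) == 1 else values
        for key, values in fields.items()
    }
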