From cd2809a5fd99d413b8d2df24fe7ee55d02b86830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Zaffarano?= Date: Wed, 5 Feb 2025 15:10:39 +0100 Subject: [PATCH 1/4] Group by owner --- elementary/cli/cli.py | 8 +- elementary/config/config.py | 6 +- .../monitor/alerts/alerts_groups/__init__.py | 2 + .../alerts/alerts_groups/grouped_by_owner.py | 50 +++++++++ elementary/monitor/alerts/grouping_type.py | 1 + elementary/monitor/cli.py | 5 +- .../alerts/data_monitoring_alerts.py | 18 ++- .../alerts/integrations/README.md | 11 +- .../alerts/integrations/base_integration.py | 11 ++ .../integrations/slack/message_builder.py | 3 +- .../alerts/integrations/slack/slack.py | 106 +++++++++++++++++- .../alerts/integrations/teams/teams.py | 98 ++++++++++++++++ .../alerts/integrations/utils/report_link.py | 17 +++ 13 files changed, 320 insertions(+), 16 deletions(-) create mode 100644 elementary/monitor/alerts/alerts_groups/grouped_by_owner.py diff --git a/elementary/cli/cli.py b/elementary/cli/cli.py index 3bb39662f..b0377bd1b 100644 --- a/elementary/cli/cli.py +++ b/elementary/cli/cli.py @@ -38,12 +38,12 @@ class ElementaryCLI(click.MultiCommand): "run-operation": run_operation, } - def list_commands(self, ctx): - return self._CMD_MAP.keys() + def list_commands(self, ctx) -> list[str]: + return list(self._CMD_MAP.keys()) - def get_command(self, ctx, name): + def get_command(self, ctx, cmd_name): ctx.auto_envvar_prefix = "EDR" - return self._CMD_MAP.get(name) + return self._CMD_MAP.get(cmd_name) def format_help(self, ctx, formatter): try: diff --git a/elementary/config/config.py b/elementary/config/config.py index 3eaefef46..167a8dca6 100644 --- a/elementary/config/config.py +++ b/elementary/config/config.py @@ -223,11 +223,13 @@ def has_send_report_platform(self): @property def has_slack(self) -> bool: - return self.slack_webhook or (self.slack_token and self.slack_channel_name) + return self.slack_webhook is not None or ( + self.slack_token is not None and self.slack_channel_name is not None + ) @property def has_teams(self) -> bool: - return self.teams_webhook + return self.teams_webhook is not None @property def has_s3(self): diff --git a/elementary/monitor/alerts/alerts_groups/__init__.py b/elementary/monitor/alerts/alerts_groups/__init__.py index 00cd4b513..bc1468d53 100644 --- a/elementary/monitor/alerts/alerts_groups/__init__.py +++ b/elementary/monitor/alerts/alerts_groups/__init__.py @@ -1,9 +1,11 @@ from .alerts_group import AlertsGroup from .base_alerts_group import BaseAlertsGroup +from .grouped_by_owner import GroupedByOwnerAlerts from .grouped_by_table import GroupedByTableAlerts __all__ = [ "AlertsGroup", "BaseAlertsGroup", "GroupedByTableAlerts", + "GroupedByOwnerAlerts", ] diff --git a/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py b/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py new file mode 100644 index 000000000..ff5c4e1e0 --- /dev/null +++ b/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py @@ -0,0 +1,50 @@ +from typing import Dict, List, Optional, Union + +from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup +from elementary.monitor.alerts.model_alert import ModelAlertModel +from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel +from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( + ReportLinkData, + get_owner_test_runs_link, +) + + +class GroupedByOwnerAlerts(AlertsGroup): + owner: Optional[str] + + def __init__( + self, + owner: Optional[str], + alerts: List[Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]], + ) -> None: + super().__init__(alerts) + self.owner = owner + + @property + def report_url(self) -> Optional[str]: + return self.alerts[0].report_url + + @property + def summary(self) -> str: + return f"{self.owner}: {len(self.alerts)} issues detected" + + def get_report_link(self) -> Optional[ReportLinkData]: + if not self.model_errors: + return get_owner_test_runs_link(self.report_url, self.owner) + + return None + + @property + def unified_meta(self) -> Dict: + model_unified_meta = {} + test_unified_meta = {} + for alert in self.alerts: + alert_unified_meta = alert.unified_meta + if alert_unified_meta: + if isinstance(alert, ModelAlertModel): + model_unified_meta = alert_unified_meta + break + + test_unified_meta = alert_unified_meta + return model_unified_meta or test_unified_meta diff --git a/elementary/monitor/alerts/grouping_type.py b/elementary/monitor/alerts/grouping_type.py index 11537b806..bb36a8672 100644 --- a/elementary/monitor/alerts/grouping_type.py +++ b/elementary/monitor/alerts/grouping_type.py @@ -4,3 +4,4 @@ class GroupingType(str, Enum): BY_ALERT = "alert" BY_TABLE = "table" + BY_OWNER = "owner" diff --git a/elementary/monitor/cli.py b/elementary/monitor/cli.py index 019293f1f..7619d6bd5 100644 --- a/elementary/monitor/cli.py +++ b/elementary/monitor/cli.py @@ -3,6 +3,7 @@ import click from elementary.config.config import Config +from elementary.monitor.alerts.grouping_type import GroupingType from elementary.monitor.data_monitoring.alerts.data_monitoring_alerts import ( DataMonitoringAlerts, ) @@ -241,9 +242,9 @@ def get_cli_properties() -> dict: ) @click.option( "--group-by", - type=click.Choice(["alert", "table"]), + type=click.Choice(list(map(lambda e: e.value, GroupingType))), default=None, - help="Whether to group alerts by 'alert' or by 'table'", + help=f"Whether to group alerts by: {', '.join(map(lambda e: e.value, GroupingType))}", ) @click.option( "--override-dbt-project-config", diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index 496dba5a8..29792e8e4 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -1,6 +1,6 @@ import json from collections import defaultdict -from datetime import datetime +from datetime import datetime, timezone from typing import DefaultDict, Dict, List, Optional, Union from alive_progress import alive_bar @@ -17,7 +17,10 @@ MessagingIntegrationError, ) from elementary.monitor.alerts.alert_messages.builder import AlertMessageBuilder -from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts +from elementary.monitor.alerts.alerts_groups import ( + GroupedByOwnerAlerts, + GroupedByTableAlerts, +) from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup from elementary.monitor.alerts.grouping_type import GroupingType from elementary.monitor.alerts.model_alert import ModelAlertModel @@ -159,7 +162,7 @@ def _get_suppressed_alerts( alerts_last_sent_times: Dict[str, datetime], ) -> List[str]: suppressed_alerts = [] - current_time_utc = convert_time_to_timezone(datetime.utcnow()) + current_time_utc = convert_time_to_timezone(datetime.now(timezone.utc)) for alert in alerts: alert_class_id = alert.alert_class_id suppression_interval = alert.data.get_suppression_interval( @@ -221,6 +224,7 @@ def _format_alerts( formatted_alerts = [] grouped_by_table_alerts = [] model_ids_to_alerts_map = defaultdict(lambda: []) + owner_to_alerts_map = defaultdict(lambda: []) default_alerts_group_by_strategy = GroupingType( self.config.slack_group_alerts_by @@ -248,6 +252,9 @@ def _format_alerts( model_ids_to_alerts_map[formatted_alert.model_unique_id].append( formatted_alert ) + elif grouping_type == GroupingType.BY_OWNER: + for owner in formatted_alert.owners: + owner_to_alerts_map[owner].append(formatted_alert) else: formatted_alerts.append(formatted_alert) except ValueError: @@ -266,7 +273,10 @@ def _format_alerts( alerts=alerts_by_model, env=self.config.specified_env ) ) - + for owner, alerts_by_owner in owner_to_alerts_map.items(): + grouped_by_table_alerts.append( + GroupedByOwnerAlerts(owner=owner, alerts=alerts_by_owner) + ) self.execution_properties["had_group_by_table"] = ( len(grouped_by_table_alerts) > 0 ) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/README.md b/elementary/monitor/data_monitoring/alerts/integrations/README.md index c8aaa8ce7..64d477a56 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/README.md +++ b/elementary/monitor/data_monitoring/alerts/integrations/README.md @@ -122,7 +122,16 @@ The different alert types are: - Owners - Subscribers -#### 6. Fallback template (_method name:_ `_get_fallback_template`) +##### 6. Alerts grouped by owner (_method name:_ `_get_group_by_owner_template`) + +- link to report (filter by owner) +- Owner +- List of failures, using `alert.summary` to include model and test name. +- List of warnings, using `alert.summary` to include model and test name. +- Tags +- Subscribers + +#### 7. Fallback template (_method name:_ `_get_fallback_template`) We try to send the formatted message and in case it fails (due to a bug or API change) we send the fallback alert, which is usually a raw JSON of the alert object. You can find an example of this in the Slack integration (`elementary/monitor/data_monitoring/alerts/integrations/slack.py`). diff --git a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py index e4781b5ab..83a859b93 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/base_integration.py @@ -3,6 +3,9 @@ from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup +from elementary.monitor.alerts.alerts_groups.grouped_by_owner import ( + GroupedByOwnerAlerts, +) from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel @@ -45,6 +48,8 @@ def _get_alert_template( return self._get_source_freshness_template(alert) elif isinstance(alert, GroupedByTableAlerts): return self._get_group_by_table_template(alert) + elif isinstance(alert, GroupedByOwnerAlerts): + return self._get_group_by_owner_template(alert) elif isinstance(alert, BaseAlertsGroup): return self._get_alerts_group_template(alert) @@ -76,6 +81,12 @@ def _get_group_by_table_template( ): raise NotImplementedError + @abstractmethod + def _get_group_by_owner_template( + self, alert: GroupedByOwnerAlerts, *args, **kwargs + ): + raise NotImplementedError + @abstractmethod def _get_alerts_group_template(self, alert: BaseAlertsGroup, *args, **kwargs): raise NotImplementedError diff --git a/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py b/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py index 435d18684..b50560d32 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py @@ -49,7 +49,8 @@ def add_preview_to_slack_alert( ): if preview_blocks: validated_preview_blocks = self._validate_preview_blocks(preview_blocks) - self._add_blocks_as_attachments(validated_preview_blocks) + if validated_preview_blocks: + self._add_blocks_as_attachments(validated_preview_blocks) def add_details_to_slack_alert( self, diff --git a/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py b/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py index a0e5ce30d..3f9f26765 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py @@ -11,6 +11,9 @@ from elementary.config.config import Config from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup +from elementary.monitor.alerts.alerts_groups.grouped_by_owner import ( + GroupedByOwnerAlerts, +) from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel @@ -23,6 +26,7 @@ ) from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( get_model_test_runs_link, + get_owner_test_runs_link, ) from elementary.tracking.tracking_interface import Tracking from elementary.utils.json_utils import ( @@ -687,8 +691,7 @@ def _get_source_freshness_template( [ self.message_builder.create_context_block(["*Result message*"]), self.message_builder.create_text_section_block( - f"Failed to calculate the source freshness\n" - f"```{alert.error}```" + f"Failed to calculate the source freshness\n```{alert.error}```" ), ] ) @@ -872,6 +875,104 @@ def _get_group_by_table_template( title=title_blocks, preview=preview_blocks, details=details_blocks ) + def _get_group_by_owner_template( + self, alert: GroupedByOwnerAlerts, *args, **kwargs + ) -> SlackAlertMessageSchema: + alerts = alert.alerts + + self.message_builder.add_message_color(self._get_color(alert.status)) + + title_blocks = [ + self.message_builder.create_header_block( + f"{self._get_display_name(alert.status)}: {alert.summary}" + ), + self._get_alert_type_counters_block(alert), + ] + + report_link = get_owner_test_runs_link(alert.report_url, alert.owner) + + if report_link: + report_link_block = self.message_builder.create_context_block( + [ + f"<{report_link.url}|{report_link.text}>", + ], + ) + title_blocks.append(report_link_block) + + # attention required : tags, owners, subscribers + preview_blocks = [] + + tags = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [alert.tags or [] for alert in alerts], + ) + owners = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [[alert.owner] if alert.owner else []] + ) + subscribers = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [alert.subscribers or [] for alert in alerts] + ) + preview_blocks.append( + self.message_builder.create_text_section_block( + f"*Tags*: {tags if tags else '_No tags_'}" + ) + ) + preview_blocks.append( + self.message_builder.create_text_section_block( + f"*Owner*: {owners if owners else '_No owners_'}" + ) + ) + preview_blocks.append( + self.message_builder.create_text_section_block( + f"*Subscribers*: {subscribers if subscribers else '_No subscribers_'}" + ) + ) + + details_blocks = [] + # Model errors + if alert.model_errors: + details_blocks.append( + self.message_builder.create_text_section_block("*Model errors*") + ) + details_blocks.append(self.message_builder.create_divider_block()) + block_header = self.message_builder.create_context_block( + self._get_model_error_block_header(alert.model_errors) + ) + block_body = self.message_builder.create_text_section_block( + self._get_model_error_block_body(alert.model_errors) + ) + details_blocks.extend([block_header, block_body]) + + # Test failures + if alert.test_failures: + details_blocks.append( + self.message_builder.create_text_section_block("*Test failures*") + ) + rows = [alert.summary for alert in alert.test_failures] + text = "\n".join([f":small_red_triangle: {row}" for row in rows]) + details_blocks.append(self.message_builder.create_text_section_block(text)) + + # Test warnings + if alert.test_warnings: + details_blocks.append( + self.message_builder.create_text_section_block("*Test warnings*") + ) + rows = [alert.summary for alert in alert.test_warnings] + text = "\n".join([f":warning: {row}" for row in rows]) + details_blocks.append(self.message_builder.create_text_section_block(text)) + + # Test errors + if alert.test_errors: + details_blocks.append( + self.message_builder.create_text_section_block("*Test errors*") + ) + rows = [alert.summary for alert in alert.test_errors] + text = "\n".join([f":exclamation: {row}" for row in rows]) + details_blocks.append(self.message_builder.create_text_section_block(text)) + + return SlackAlertMessageSchema( + title=title_blocks, preview=preview_blocks, details=details_blocks + ) + def _add_compact_sub_group_details_block( self, details_blocks: list, @@ -1112,6 +1213,7 @@ def send_alert( integration_params = self._get_integration_params(alert=alert) channel_name = integration_params.get("channel") logger.debug(f"Sending alert to Slack channel: {channel_name}") + template: SlackMessageSchema | None = None try: self._fix_owners_and_subscribers(alert) template = self._get_alert_template(alert) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py b/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py index fb0c329eb..4295aeecd 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/teams/teams.py @@ -8,6 +8,9 @@ from elementary.clients.teams.client import TeamsClient from elementary.config.config import Config from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts +from elementary.monitor.alerts.alerts_groups.grouped_by_owner import ( + GroupedByOwnerAlerts, +) from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel @@ -129,6 +132,7 @@ def _add_report_link_if_applicable( ModelAlertModel, SourceFreshnessAlertModel, GroupedByTableAlerts, + GroupedByOwnerAlerts, ], ): report_link = alert.get_report_link() @@ -520,6 +524,100 @@ def _get_group_by_table_template( self._get_section("*Test errors*", f"{text}") ) + def _get_group_by_owner_template( + self, alert: GroupedByOwnerAlerts, *args, **kwargs + ): + alerts = alert.alerts + title = f"{self._get_display_name(alert.status)}: {alert.summary}" + subtitle = "" + + if alert.model_errors: + subtitle = ( + subtitle + + (" | " + f"😵 Model errors: {len(alert.model_errors)}") + if subtitle + else f"😵 Model errors: {len(alert.model_errors)}" + ) + if alert.test_failures: + subtitle = ( + subtitle + + (" | " + f"🔺 Test failures: {len(alert.test_failures)}") + if subtitle + else f"🔺 Test failures: {len(alert.test_failures)}" + ) + if alert.test_warnings: + subtitle = ( + subtitle + + (" | " + f"⚠ Test warnings: {len(alert.test_warnings)}") + if subtitle + else f"⚠ Test warnings: {len(alert.test_warnings)}" + ) + if alert.test_errors: + subtitle = ( + subtitle + (" | " + f"❗ Test errors: {len(alert.test_errors)}") + if subtitle + else f"❗ Test errors: {len(alert.test_errors)}" + ) + + self._add_report_link_if_applicable(alert) + + self.message_builder.title(title) + self.message_builder.text(subtitle) + + tags = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [alert.tags or [] for alert in alerts] + ) + owners = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [[alert.owner] if alert.owner else []] + ) + subscribers = list_of_lists_of_strings_to_comma_delimited_unique_strings( + [alert.subscribers or [] for alert in alerts] + ) + + self.message_builder.addSection( + self._get_section("*Tags*", f"_{tags if tags else "No tags"}_") + ) + self.message_builder.addSection( + self._get_section("*Owners*", f"_{owners if owners else "No owners"}_") + ) + self.message_builder.addSection( + self._get_section( + "*Subscribers*", f"_{subscribers if subscribers else 'No subscribers'}_" + ) + ) + + if alert.model_errors: + section = cardsection() + section.activityTitle("*Model errors*") + section.activitySubtitle( + f"{self._get_model_error_block_header(alert.model_errors)}" + ) + section.activityText( + f"{self._get_model_error_block_body(alert.model_errors)}" + ) + self.message_builder.addSection(section) + + if alert.test_failures: + rows = [alert.concise_name for alert in alert.test_failures] + text = "
".join([f"🔺 {row}" for row in rows]) + self.message_builder.addSection( + self._get_section("*Test failures*", f"{text}") + ) + + if alert.test_warnings: + rows = [alert.concise_name for alert in alert.test_warnings] + text = "
".join([f"⚠ {row}" for row in rows]) + self.message_builder.addSection( + self._get_section("*Test warnings*", f"{text}") + ) + + if alert.test_errors: + rows = [alert.concise_name for alert in alert.test_errors] + text = "
".join([f"❗ {row}" for row in rows]) + self.message_builder.addSection( + self._get_section("*Test errors*", f"{text}") + ) + def _get_sub_group_detailed_section( self, alerts: Sequence[ diff --git a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py index 651a364af..a09062342 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py @@ -66,3 +66,20 @@ def get_model_test_runs_link( report_link = ReportLinkData(url=url, text=TEST_RUNS_LINK_TEXT) return report_link + + +def get_owner_test_runs_link( + report_url: Optional[str], owner: Optional[str] +) -> Optional[ReportLinkData]: + report_link = None + + if owner and report_url: + formatted_report_url = _get_formatted_report_url(report_url) + url = ( + f"{formatted_report_url}/report/{ReportPath.TEST_RUNS.value}/?treeType=owners" + f'&treeFilters=["failures","warnings"]' + f'&treeNode={{"id":"folderNode_{owner}"}}' + ) + report_link = ReportLinkData(url=url, text=TEST_RUNS_LINK_TEXT) + + return report_link From d31d873adb772c345c2737ed56c43791355845c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Zaffarano?= Date: Wed, 5 Feb 2025 16:53:57 +0100 Subject: [PATCH 2/4] Update url --- .../alerts/integrations/utils/report_link.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py index a09062342..69ad90fd0 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py @@ -14,6 +14,7 @@ class ReportLinkData(BaseModel): class ReportPath(Enum): TEST_RUNS = "test-runs" + TEST_RESULTS = "test-results" MODEL_RUNS = "model-runs" @@ -76,9 +77,9 @@ def get_owner_test_runs_link( if owner and report_url: formatted_report_url = _get_formatted_report_url(report_url) url = ( - f"{formatted_report_url}/report/{ReportPath.TEST_RUNS.value}/?treeType=owners" - f'&treeFilters=["failures","warnings"]' - f'&treeNode={{"id":"folderNode_{owner}"}}' + f"{formatted_report_url}/report/{ReportPath.TEST_RESULTS.value}/?tree_view_by=owners" + f'&tree_filters=["failures","warnings"]' + f'&tree_node={{"id":"folderNode_{owner}"}}' ) report_link = ReportLinkData(url=url, text=TEST_RUNS_LINK_TEXT) From f2150e1dbdcbadaae4b4b0a028ab01a7b3199bc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Zaffarano?= Date: Thu, 6 Feb 2025 11:00:55 +0100 Subject: [PATCH 3/4] Add a second grouping level by model --- .../alerts/alerts_groups/grouped_by_owner.py | 4 +- .../alerts/integrations/slack/slack.py | 62 +++++++++++++------ .../alerts/integrations/utils/report_link.py | 2 +- 3 files changed, 45 insertions(+), 23 deletions(-) diff --git a/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py b/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py index ff5c4e1e0..f7aba5c00 100644 --- a/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py +++ b/elementary/monitor/alerts/alerts_groups/grouped_by_owner.py @@ -6,7 +6,7 @@ from elementary.monitor.alerts.test_alert import TestAlertModel from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( ReportLinkData, - get_owner_test_runs_link, + get_test_runs_by_owner_link, ) @@ -31,7 +31,7 @@ def summary(self) -> str: def get_report_link(self) -> Optional[ReportLinkData]: if not self.model_errors: - return get_owner_test_runs_link(self.report_url, self.owner) + return get_test_runs_by_owner_link(self.report_url, self.owner) return None diff --git a/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py b/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py index 3f9f26765..a730d66f9 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/slack/slack.py @@ -1,7 +1,8 @@ import json import re from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Sequence, Union +from itertools import groupby +from typing import Any, Dict, List, Optional, Sequence, Union from slack_sdk.models.blocks import SectionBlock @@ -26,7 +27,7 @@ ) from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( get_model_test_runs_link, - get_owner_test_runs_link, + get_test_runs_by_owner_link, ) from elementary.tracking.tracking_interface import Tracking from elementary.utils.json_utils import ( @@ -889,8 +890,7 @@ def _get_group_by_owner_template( self._get_alert_type_counters_block(alert), ] - report_link = get_owner_test_runs_link(alert.report_url, alert.owner) - + report_link = get_test_runs_by_owner_link(alert.report_url, alert.owner) if report_link: report_link_block = self.message_builder.create_context_block( [ @@ -899,23 +899,14 @@ def _get_group_by_owner_template( ) title_blocks.append(report_link_block) - # attention required : tags, owners, subscribers + # attention required : owner, subscribers preview_blocks = [] - - tags = list_of_lists_of_strings_to_comma_delimited_unique_strings( - [alert.tags or [] for alert in alerts], - ) owners = list_of_lists_of_strings_to_comma_delimited_unique_strings( [[alert.owner] if alert.owner else []] ) subscribers = list_of_lists_of_strings_to_comma_delimited_unique_strings( [alert.subscribers or [] for alert in alerts] ) - preview_blocks.append( - self.message_builder.create_text_section_block( - f"*Tags*: {tags if tags else '_No tags_'}" - ) - ) preview_blocks.append( self.message_builder.create_text_section_block( f"*Owner*: {owners if owners else '_No owners_'}" @@ -947,8 +938,15 @@ def _get_group_by_owner_template( details_blocks.append( self.message_builder.create_text_section_block("*Test failures*") ) - rows = [alert.summary for alert in alert.test_failures] - text = "\n".join([f":small_red_triangle: {row}" for row in rows]) + + text = "" + for model, errors in self._group_alerts_by_model( + alert.test_failures + ).items(): + rows = [alert.concise_name for alert in errors] + text += f":small_blue_diamond: *{model} ({len(errors)})*\n" + text += "\n".join([f"\t:small_red_triangle: {row}" for row in rows]) + text += "\n" details_blocks.append(self.message_builder.create_text_section_block(text)) # Test warnings @@ -956,8 +954,14 @@ def _get_group_by_owner_template( details_blocks.append( self.message_builder.create_text_section_block("*Test warnings*") ) - rows = [alert.summary for alert in alert.test_warnings] - text = "\n".join([f":warning: {row}" for row in rows]) + text = "" + for model, errors in self._group_alerts_by_model( + alert.test_warnings + ).items(): + rows = [alert.concise_name for alert in errors] + text += f":small_blue_diamond: *{model} ({len(errors)})*\n" + text += "\n".join([f"\t:warning: {row}" for row in rows]) + text += "\n" details_blocks.append(self.message_builder.create_text_section_block(text)) # Test errors @@ -965,14 +969,32 @@ def _get_group_by_owner_template( details_blocks.append( self.message_builder.create_text_section_block("*Test errors*") ) - rows = [alert.summary for alert in alert.test_errors] - text = "\n".join([f":exclamation: {row}" for row in rows]) + text = "" + for model, errors in self._group_alerts_by_model( + alert.test_errors + ).items(): + rows = [alert.concise_name for alert in errors] + text += f":small_blue_diamond: *{model} ({len(errors)})*\n" + text += "\n".join([f"\t:exclamation: {row}" for row in rows]) + text += "\n" details_blocks.append(self.message_builder.create_text_section_block(text)) return SlackAlertMessageSchema( title=title_blocks, preview=preview_blocks, details=details_blocks ) + @staticmethod + def _group_alerts_by_model( + alerts: List[TestAlertModel | SourceFreshnessAlertModel], + ) -> dict[str, list[TestAlertModel | SourceFreshnessAlertModel]]: + def key(e: TestAlertModel | SourceFreshnessAlertModel): + if isinstance(e, TestAlertModel): + return f"{e.schema_name}.{e.table_name}" + else: + return f"{e.schema_name}.{e.source_name}" + + return {key: list(group) for key, group in groupby(alerts, key=key)} + def _add_compact_sub_group_details_block( self, details_blocks: list, diff --git a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py index 69ad90fd0..ea96d37d3 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/utils/report_link.py @@ -69,7 +69,7 @@ def get_model_test_runs_link( return report_link -def get_owner_test_runs_link( +def get_test_runs_by_owner_link( report_url: Optional[str], owner: Optional[str] ) -> Optional[ReportLinkData]: report_link = None From 267dac56306c9922441c847867ed233209b1527f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Zaffarano?= Date: Thu, 6 Feb 2025 11:17:06 +0100 Subject: [PATCH 4/4] Fix pyright validations --- elementary/config/config.py | 10 +++++----- .../alerts/integrations/slack/message_builder.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/elementary/config/config.py b/elementary/config/config.py index 167a8dca6..1d31d1770 100644 --- a/elementary/config/config.py +++ b/elementary/config/config.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from typing import Optional +from typing import Optional, cast import google.auth # type: ignore[import] from dateutil import tz @@ -88,11 +88,11 @@ def __init__( config = self._load_configuration() - self.target_dir = self._first_not_none( + self.target_dir = str(self._first_not_none( target_path, config.get("target-path"), os.getcwd(), - ) + )) os.makedirs(os.path.abspath(self.target_dir), exist_ok=True) os.environ["DBT_LOG_PATH"] = os.path.abspath(target_path) @@ -129,11 +129,11 @@ def __init__( slack_config.get("group_alerts_by"), GroupingType.BY_ALERT.value, ) - self.group_alerts_threshold = self._first_not_none( + self.group_alerts_threshold = cast(int, self._first_not_none( group_alerts_threshold, slack_config.get("group_alerts_threshold"), self.DEFAULT_GROUP_ALERTS_THRESHOLD, - ) + )) teams_config = config.get(self._TEAMS, {}) self.teams_webhook = self._first_not_none( diff --git a/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py b/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py index b50560d32..6230c842c 100644 --- a/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py +++ b/elementary/monitor/data_monitoring/alerts/integrations/slack/message_builder.py @@ -87,6 +87,6 @@ def _validate_preview_blocks(cls, preview_blocks: Optional[SlackBlocksType] = No padding_length = ( SlackMessageBuilder._MAX_ALERT_PREVIEW_BLOCKS - preview_blocks_count ) - padding = [cls.create_empty_section_block() for i in range(padding_length)] + padding = [cls.create_empty_section_block() for _ in range(padding_length)] padded_preview_blocks.extend(padding) return padded_preview_blocks