From 8c9f99e96fc1a5184cde5494e490dfb90d3a2443 Mon Sep 17 00:00:00 2001 From: IDoneShaveIt Date: Sun, 19 Nov 2023 17:14:52 +0200 Subject: [PATCH] ELE-2007 - validate ci monitor alerts --- .github/workflows/test-warehouse.yml | 7 +- elementary/cli/cli.py | 2 + elementary/cli/e2e/__init__.py | 0 elementary/cli/e2e/cli.py | 172 ++++++++++++++++++ elementary/cli/e2e/mocks/__init__.py | 0 .../e2e/mocks/e2e_data_monitoring_alerts.py | 69 +++++++ elementary/cli/e2e/mocks/e2e_slack_client.py | 137 ++++++++++++++ .../cli/e2e/mocks/e2e_slack_integration.py | 28 +++ .../alerts/data_monitoring_alerts.py | 3 + tests/e2e/__init__.py | 0 10 files changed, 415 insertions(+), 3 deletions(-) create mode 100644 elementary/cli/e2e/__init__.py create mode 100644 elementary/cli/e2e/cli.py create mode 100644 elementary/cli/e2e/mocks/__init__.py create mode 100644 elementary/cli/e2e/mocks/e2e_data_monitoring_alerts.py create mode 100644 elementary/cli/e2e/mocks/e2e_slack_client.py create mode 100644 elementary/cli/e2e/mocks/e2e_slack_integration.py create mode 100644 tests/e2e/__init__.py diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index 9420639e7..ffaa8464b 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -159,14 +159,15 @@ jobs: - name: Run monitor env: - SLACK_WEBHOOK: ${{ secrets.CI_SLACK_WEBHOOK }} + SLACK_TOKEN: ${{ secrets.CI_SLACK_TOKEN }} run: > - edr monitor + edr e2e-monitor -t "${{ inputs.warehouse-type }}" --group-by table --project-dir "${{ env.DBT_PKG_INTEG_TESTS_DIR }}" --project-profile-target "${{ inputs.warehouse-type }}" - --slack-webhook "$SLACK_WEBHOOK" + --slack-token "$SLACK_TOKEN" + --slack-channel-name data-ops - name: Validate alerts statuses were updated working-directory: ${{ env.ELMENTARY_INTERNAL_DBT_PKG_DIR }} diff --git a/elementary/cli/cli.py b/elementary/cli/cli.py index 3bb39662f..3f98417ab 100644 --- a/elementary/cli/cli.py +++ b/elementary/cli/cli.py @@ -5,6 +5,7 @@ from pyfiglet import Figlet import elementary.cli.upgrade +from elementary.cli.e2e.cli import e2e_monitor from elementary.config.config import Config from elementary.monitor.cli import monitor, report, send_report from elementary.operations.cli import run_operation @@ -36,6 +37,7 @@ class ElementaryCLI(click.MultiCommand): "report": report, "send-report": send_report, "run-operation": run_operation, + "e2e-monitor": e2e_monitor, } def list_commands(self, ctx): diff --git a/elementary/cli/e2e/__init__.py b/elementary/cli/e2e/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elementary/cli/e2e/cli.py b/elementary/cli/e2e/cli.py new file mode 100644 index 000000000..795e69cc4 --- /dev/null +++ b/elementary/cli/e2e/cli.py @@ -0,0 +1,172 @@ +import sys + +import click + +from elementary.cli.e2e.mocks.e2e_data_monitoring_alerts import E2EDataMonitoringAlerts +from elementary.config.config import Config +from elementary.monitor.cli import Command, common_options, get_cli_properties +from elementary.tracking.anonymous_tracking import AnonymousCommandLineTracking +from elementary.utils.ordered_yaml import OrderedYaml + +yaml = OrderedYaml() + + +@click.group(invoke_without_command=True) +@common_options(Command.MONITOR) +@click.option( + "--slack-webhook", + "-sw", + type=str, + default=None, + help="A slack webhook URL for sending alerts to a specific channel.", +) +@click.option( + "--deprecated-slack-webhook", + "-s", # Deprecated - will be used for --select in the future + type=str, + default=None, + help="DEPRECATED! - A slack webhook URL for sending alerts to a specific channel.", +) +@click.option( + "--timezone", + "-tz", + type=str, + default=None, + help="The timezone of which all timestamps will be converted to. (default is user local timezone)", +) +@click.option( + "--full-refresh-dbt-package", + "-f", + type=bool, + default=False, + help="Force running a full refresh of all incremental models in the edr dbt package (usually this is not needed, " + "see documentation to learn more).", +) +@click.option( + "--dbt-vars", + type=str, + default=None, + help="Specify raw YAML string of your dbt variables.", +) +@click.option( + "--test", + type=bool, + default=False, + help="Whether to send a test message in case there are no alerts.", +) +@click.option( + "--suppression-interval", + type=int, + default=0, + help="The number of hours to suppress alerts after an alert was sent (this is a global default setting).", +) +@click.option( + "--group-by", + type=click.Choice(["alert", "table"]), + default=None, + help="Whether to group alerts by 'alert' or by 'table'", +) +@click.option( + "--override-dbt-project-config", + "-oc", + is_flag=True, + help="Whether to override the settings (slack channel, suppression interval) " + "in the model or test meta in the dbt project with the parameters provided by the CLI.", +) +@click.option( + "--report-url", + type=str, + default=None, + help="The report URL for the alert attached links.", +) +@click.pass_context +def e2e_monitor( + ctx, + days_back, + slack_webhook, + deprecated_slack_webhook, + slack_token, + slack_channel_name, + timezone, + config_dir, + profiles_dir, + project_dir, + update_dbt_package, + full_refresh_dbt_package, + dbt_quoting, + profile_target, + project_profile_target, + dbt_vars, + test, + disable_samples, + env, + select, + group_by, + target_path, + suppression_interval, + override_dbt_project_config, + report_url, +): + """ + Run e2e test for edr monitor command. + """ + if ctx.invoked_subcommand is not None: + return + if deprecated_slack_webhook is not None: + click.secho( + '\n"-s" is deprecated and won\'t be supported in the near future.\n' + 'Please use "-sw" or "--slack-webhook" for passing Slack webhook.\n', + fg="bright_red", + ) + slack_webhook = deprecated_slack_webhook + vars = yaml.loads(dbt_vars) if dbt_vars else None + config = Config( + config_dir=config_dir, + profiles_dir=profiles_dir, + project_dir=project_dir, + profile_target=profile_target, + project_profile_target=project_profile_target, + target_path=target_path, + dbt_quoting=dbt_quoting, + slack_webhook=slack_webhook, + slack_token=slack_token, + slack_channel_name=slack_channel_name, + timezone=timezone, + env=env, + slack_group_alerts_by=group_by, + report_url=report_url, + ) + anonymous_tracking = AnonymousCommandLineTracking(config) + anonymous_tracking.set_env("use_select", bool(select)) + try: + config.validate_monitor() + data_monitoring = E2EDataMonitoringAlerts( + config=config, + tracking=anonymous_tracking, + force_update_dbt_package=update_dbt_package, + send_test_message_on_success=test, + disable_samples=disable_samples, + filter=select, + global_suppression_interval=suppression_interval, + override_config=override_dbt_project_config, + ) + # The call to track_cli_start must be after the constructor of DataMonitoringAlerts as it enriches the tracking + # properties. This is a tech-debt that should be fixed in the future. + anonymous_tracking.track_cli_start( + Command.MONITOR, get_cli_properties(), ctx.command.name + ) + success = data_monitoring.run_alerts( + days_back, full_refresh_dbt_package, dbt_vars=vars + ) + anonymous_tracking.track_cli_end( + Command.MONITOR, data_monitoring.properties(), ctx.command.name + ) + if not success: + sys.exit(1) + except Exception as exc: + anonymous_tracking.track_cli_exception(Command.MONITOR, exc, ctx.command.name) + raise + + validation_passed = data_monitoring.validate_send_alerts() + if not validation_passed: + raise Exception("Validation failed") diff --git a/elementary/cli/e2e/mocks/__init__.py b/elementary/cli/e2e/mocks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elementary/cli/e2e/mocks/e2e_data_monitoring_alerts.py b/elementary/cli/e2e/mocks/e2e_data_monitoring_alerts.py new file mode 100644 index 000000000..9b9f7f59d --- /dev/null +++ b/elementary/cli/e2e/mocks/e2e_data_monitoring_alerts.py @@ -0,0 +1,69 @@ +from typing import Optional + +from elementary.cli.e2e.mocks.e2e_slack_integration import E2ESlackIntegration +from elementary.config.config import Config +from elementary.monitor.data_monitoring.alerts.data_monitoring_alerts import ( + DataMonitoringAlerts, +) +from elementary.tracking.tracking_interface import Tracking +from elementary.utils.log import get_logger + +logger = get_logger(__name__) + + +class E2EDataMonitoringAlerts(DataMonitoringAlerts): + def __init__( + self, + config: Config, + tracking: Optional[Tracking] = None, + filter: Optional[str] = None, + force_update_dbt_package: bool = False, + disable_samples: bool = False, + send_test_message_on_success: bool = False, + global_suppression_interval: int = 0, + override_config: bool = False, + ): + tracking = None + super().__init__( + config, + tracking, + filter, + force_update_dbt_package, + disable_samples, + send_test_message_on_success, + global_suppression_interval, + override_config, + ) + + def _get_integration_client(self): + return E2ESlackIntegration( + config=self.config, + tracking=self.tracking, + override_config_defaults=self.override_config_defaults, + ) + + # Validate that we actually posted the alerts at Slack + # Currently only checking that we sent the right amount of alerts + def validate_send_alerts(self): + logger.info("Validating alerts sent successfully") + validated_alerts = 0 + + integration_instance_unique_id = self.alerts_integraion.client.unique_id + channel_messages = ( + self.alerts_integraion.client.get_channel_messages_with_replies( + channel_name=self.config.slack_channel_name, after_hours=0.5 + ) + ) + for messages in channel_messages: + if len(messages) == 2: + if messages[1].get("text") == integration_instance_unique_id: + validated_alerts += 1 + + validation_passed = validated_alerts == self.alerts_to_send_count + if validation_passed: + logger.info("Validation passed - all of the alerts were sent successfully") + else: + logger.error( + f"Validation fails - expected {self.alerts_to_send_count} to be sent, but found only {validated_alerts}." + ) + return validation_passed diff --git a/elementary/cli/e2e/mocks/e2e_slack_client.py b/elementary/cli/e2e/mocks/e2e_slack_client.py new file mode 100644 index 000000000..e23cbbd68 --- /dev/null +++ b/elementary/cli/e2e/mocks/e2e_slack_client.py @@ -0,0 +1,137 @@ +import json +import uuid +from datetime import datetime, timedelta +from typing import Optional + +from ratelimit import limits, sleep_and_retry # type: ignore[import] +from slack_sdk.errors import SlackApiError + +from elementary.clients.slack.client import ( + ONE_MINUTE, + ONE_SECOND, + SlackClient, + SlackWebClient, +) +from elementary.clients.slack.schema import SlackMessageSchema +from elementary.config.config import Config +from elementary.tracking.tracking_interface import Tracking + + +class E2ESlackWebClient(SlackWebClient): + def __init__( + self, + token: str, + tracking: Optional[Tracking] = None, + ): + super().__init__(token, tracking) + self.unique_id = str(uuid.uuid4()) + + @sleep_and_retry + @limits(calls=50, period=ONE_MINUTE) + def get_channel_messages(self, channel_name: str, after_hours: float = 24): + channel_id = self._get_channel_id(channel_name=channel_name) + min_timestamp = ( + (datetime.utcnow() - timedelta(hours=after_hours)).timestamp() + if after_hours + else 0 + ) + cursor = None + messsges = [] + while True: + response = self.client.conversations_history( + channel=channel_id, oldest=min_timestamp, cursor=cursor + ) + messsges.extend(response["messages"]) + cursor = response.get("response_metadata", {}).get("next_cursor") + if cursor is None: + break + return messsges + + @sleep_and_retry + @limits(calls=50, period=ONE_MINUTE) + def get_channel_messages_with_replies( + self, channel_name: str, after_hours: float = 24 + ): + channel_id = self._get_channel_id(channel_name=channel_name) or "" + messages_with_replies = [] + messages = self.get_channel_messages( + channel_name=channel_name, after_hours=after_hours + ) + for message in messages: + messages_with_replies.append( + self._get_channel_message_replies( + channel_id=channel_id, message_ts=message.get("ts") + ) + ) + return messages_with_replies + + def _get_channel_message_replies(self, channel_id: str, message_ts: str): + response = self.client.conversations_replies(channel=channel_id, ts=message_ts) + return response["messages"] + + @sleep_and_retry + @limits(calls=1, period=ONE_SECOND) + def reply_to_message( + self, channel_name: str, message_ts: str, reply_text: str + ) -> bool: + try: + self.client.chat_postMessage( + channel=channel_name, text=reply_text, thread_ts=str(message_ts) + ) + return True + except SlackApiError as err: + if self._handle_send_err(err, channel_name): + return self.reply_to_message( + channel_name=channel_name, + message_ts=message_ts, + reply_text=reply_text, + ) + if self.tracking: + self.tracking.record_internal_exception(err) + return False + + @sleep_and_retry + @limits(calls=1, period=ONE_SECOND) + def send_message( + self, channel_name: str, message: SlackMessageSchema, **kwargs + ) -> bool: + try: + response = self.client.chat_postMessage( + channel=channel_name, + text=message.text, + blocks=json.dumps(message.blocks) if message.blocks else None, + attachments=json.dumps(message.attachments) + if message.attachments + else None, + ) + message_ts = response.get("ts", "0") + self.reply_to_message( + channel_name=channel_name, + message_ts=message_ts, + reply_text=self.unique_id, + ) + return True + except SlackApiError as err: + if self._handle_send_err(err, channel_name): + return self.send_message(channel_name, message) + if self.tracking: + self.tracking.record_internal_exception(err) + return False + + +class E2ESlackClient(SlackClient): + def __init__(self, tracking: Optional[Tracking] = None): + super().__init__(tracking) + + @staticmethod + def create_client( + config: Config, tracking: Optional[Tracking] = None + ) -> Optional[E2ESlackWebClient]: + if not config.has_slack: + return None + if config.slack_token: + return E2ESlackWebClient(token=config.slack_token, tracking=tracking) + elif config.slack_webhook: + # We can't read Slack messages using webhook + return None + return None diff --git a/elementary/cli/e2e/mocks/e2e_slack_integration.py b/elementary/cli/e2e/mocks/e2e_slack_integration.py new file mode 100644 index 000000000..475ec0004 --- /dev/null +++ b/elementary/cli/e2e/mocks/e2e_slack_integration.py @@ -0,0 +1,28 @@ +from typing import Optional + +from elementary.cli.e2e.mocks.e2e_slack_client import E2ESlackClient +from elementary.config.config import Config +from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import ( + SlackIntegration, +) +from elementary.tracking.tracking_interface import Tracking + + +class E2ESlackIntegration(SlackIntegration): + def __init__( + self, + config: Config, + tracking: Optional[Tracking] = None, + override_config_defaults=False, + *args, + **kwargs + ) -> None: + super().__init__(config, tracking, override_config_defaults, *args, **kwargs) + + def _initial_client(self, *args, **kwargs): + slack_client = E2ESlackClient.create_client( + config=self.config, tracking=self.tracking + ) + if not slack_client: + raise Exception("Could not initial Slack client") + return slack_client diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index 309b708ca..39b81e608 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -48,6 +48,7 @@ def __init__( self.global_suppression_interval, self.override_config, ) + self.alerts_to_send_count = 0 self.sent_alert_count = 0 self.send_test_message_on_success = send_test_message_on_success self.override_config_defaults = override_config @@ -142,6 +143,8 @@ def _send_alerts( self.execution_properties["sent_alert_count"] = self.sent_alert_count return + self.alerts_to_send_count = len(alerts) + sent_alert_ids_by_type: Dict[ResourceType, List[str]] = { ResourceType.TEST: [], ResourceType.MODEL: [], diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 000000000..e69de29bb