Commits (27)
8750f63
Notification Improvements Stage 1, 70%
TejasRGitHub Dec 26, 2024
1cd2224
Changes in the principal name
TejasRGitHub Dec 27, 2024
1728d52
Weekly email task
TejasRGitHub Dec 30, 2024
daa709e
Weekly notifications code and minor changes
TejasRGitHub Dec 31, 2024
84bf9fe
Glue Table change notifications
TejasRGitHub Jan 2, 2025
8c861ff
Additional comments and logs plus weekly reminder task
TejasRGitHub Jan 3, 2025
99ed2ec
Syncing changes from local deploy
TejasRGitHub Jan 10, 2025
c119c55
Adding new changes
TejasRGitHub Jan 14, 2025
f1e4079
Minor correction in the create bucket policy code when access to buck…
TejasRGitHub Jan 14, 2025
870a835
New changes
TejasRGitHub Jan 15, 2025
5b8ec01
Adding new changes for admin notifications
TejasRGitHub Jan 15, 2025
17158e2
Linting update
TejasRGitHub Jan 15, 2025
fd32987
Admin notification improvements
TejasRGitHub Jan 15, 2025
457ce82
Adding more refactoring changes
TejasRGitHub Jan 15, 2025
878082a
Refactoring and corrections
TejasRGitHub Jan 21, 2025
401b0de
Minor changes after testing
TejasRGitHub Jan 22, 2025
63dd9f1
Adding new changes and corrections
TejasRGitHub Jan 22, 2025
828edf8
Minor change and linting
TejasRGitHub Jan 22, 2025
8be4298
Formatting changes
TejasRGitHub Jan 22, 2025
a408553
Adding improvements after review comments
TejasRGitHub Jan 23, 2025
8b9b10c
Small slight changes
TejasRGitHub Jan 24, 2025
d83d85c
Weekly notification enums
TejasRGitHub Jan 24, 2025
215ebe4
Few corrections
TejasRGitHub Jan 24, 2025
7a92d9a
Minor corrections
TejasRGitHub Jan 28, 2025
60e0988
Merge branch 'main' into GH-1420-notification-improvements-1
Jan 28, 2025
f60e4ce
Resolving tests
TejasRGitHub Jan 28, 2025
83156c7
Files reformatted by ruff
TejasRGitHub Jan 29, 2025
8 changes: 8 additions & 0 deletions backend/dataall/modules/catalog/tasks/catalog_indexer_task.py
@@ -8,6 +8,7 @@
from dataall.base.db import get_engine
from dataall.base.loader import load_modules, ImportMode
from dataall.base.utils.alarm_service import AlarmService
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService
Author comment: This file is a part of PR


log = logging.getLogger(__name__)

@@ -31,7 +32,14 @@ def index_objects(cls, engine, with_deletes='False'):
                CatalogIndexerTask._delete_old_objects(indexed_object_uris)
            return len(indexed_object_uris)
        except Exception as e:
            error_log = f'Error occurred while indexing objects during the cataloging task. Exception: {e}'
            log.error(error_log)
            AlarmService().trigger_catalog_indexing_failure_alarm(error=str(e))
            AdminNotificationService().notify_admins_with_error_log(
                process_error='Exception occurred during cataloging task',
                error_logs=[error_log],
                process_name='Catalog Task'
            )
            raise e

    @classmethod
backend/dataall/modules/notifications/services/admin_notifications.py
@@ -0,0 +1,32 @@
from typing import List

from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService


class AdminNotificationService:
    admin_group = 'DAAdministrators'

    """
    Send email notifications to the Admin Group, i.e. DAAdministrators, in data.all
    Args:
        1. process_error - string briefly describing the error / exception details
        2. error_logs - list of all the exception error logs
        3. process_name - where the exception occurred, e.g. inside an ECS task (cataloging task, etc.) or inside a GraphQL service
    """

    @classmethod
    def notify_admins_with_error_log(cls, process_error: str, error_logs: List[str], process_name: str = ''):
        subject = f'Data.all alert | Attention Required | Failure in : {process_name}'
        email_message = f"""
        Following error occurred - <br><br> {process_error} <br><br>
        """
        for error_log in error_logs:
            email_message += error_log + '<br><br>'

        email_message += 'Please check the logs in CloudWatch for more details'

        SESEmailNotificationService.create_and_send_email_notifications(
            subject=subject, msg=email_message, recipient_groups_list=[cls.admin_group]
        )
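
For context, here is a minimal usage sketch (not part of the diff; the task body, error strings and process name are illustrative) showing how a background task could collect error logs and hand them to AdminNotificationService:

# --- Illustrative sketch, not part of the PR ---
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService


def run_example_task():
    # Placeholder for a real task body; raises to exercise the error path
    raise RuntimeError('simulated failure')


task_errors = []
try:
    run_example_task()
except Exception as e:
    task_errors.append(f'Example task failed with: {e}')

if task_errors:
    AdminNotificationService().notify_admins_with_error_log(
        process_error='Exception occurred during the example task',
        error_logs=task_errors,
        process_name='Example Task',
    )
# --- end sketch ---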
backend/dataall/modules/notifications/services/ses_email_notification_service.py
@@ -3,6 +3,7 @@

from dataall.base.aws.cognito import Cognito
from dataall.base.aws.ses import Ses
from dataall.base.config import config
from dataall.base.services.service_provider_factory import ServiceProviderFactory
from dataall.modules.notifications.services.base_email_notification_service import BaseEmailNotificationService

@@ -60,3 +61,31 @@ def send_email_to_users(email_list, email_provider, message, subject):
        # https://aws.amazon.com/blogs/messaging-and-targeting/how-to-send-messages-to-multiple-recipients-with-amazon-simple-email-service-ses/
        for emailId in email_list:
            email_provider.send_email([emailId], message, subject)

    @staticmethod
    def create_and_send_email_notifications(subject, msg, recipient_groups_list=None, recipient_email_ids=None):
        """
        Sends the email notification directly instead of creating an SQS task.
        This approach is used when sending email notifications from an ECS task (e.g. the persistent email reminder task, the share expiration task, etc.).
        Emails are sent to the groups in recipient_groups_list and / or to the addresses in recipient_email_ids.
        """
        if recipient_groups_list is None:
            recipient_groups_list = []
        if recipient_email_ids is None:
            recipient_email_ids = []

        share_notification_config = config.get_property(
            'modules.datasets_base.features.share_notifications', default=None
        )
        if share_notification_config:
            for share_notification_config_type in share_notification_config.keys():
                n_config = share_notification_config[share_notification_config_type]
                if n_config.get('active', False):
                    if share_notification_config_type == 'email':
                        SESEmailNotificationService.send_email_task(
                            subject, msg, recipient_groups_list, recipient_email_ids
                        )
                else:
                    log.info(f'Notification type : {share_notification_config_type} is not active')
        else:
            log.info('Notifications are not active')
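
As a rough illustration of the gating above (the config shape is an assumption inferred from the lookup of 'modules.datasets_base.features.share_notifications' and the 'active' flag; the subject, message and recipients are made up), a direct send would look like:

# --- Illustrative sketch, not part of the PR ---
# Assumed config shape that the method checks before sending:
#   'share_notifications': {'email': {'active': True}}
SESEmailNotificationService.create_and_send_email_notifications(
    subject='Data.all alert | Attention Required | Failure in : Example Task',
    msg='Following error occurred - <br><br> example error <br><br>',
    recipient_groups_list=['DAAdministrators'],  # group names, resolved to member email addresses by the service
    recipient_email_ids=['user@example.com'],  # optional direct addresses
)
# --- end sketch ---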
Empty file.
261 changes: 261 additions & 0 deletions backend/dataall/modules/notifications/tasks/weekly_digest_reminder.py
@@ -0,0 +1,261 @@
import logging
import os
from typing import List, Dict, Any

Author comment: This file is a part of PR

from dataall.base.db import get_engine
from dataall.base.loader import load_modules, ImportMode
from dataall.core.environment.db.environment_models import Environment
from dataall.core.stacks.api.enums import StackStatus
from dataall.core.stacks.db.stack_repositories import StackRepository
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.notifications.services.admin_notifications import AdminNotificationService
from dataall.modules.notifications.services.ses_email_notification_service import SESEmailNotificationService
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus

log = logging.getLogger(__name__)

"""
A container for holding the resource ( e.g. dataset, share object, environment, etc ), receivers and health status ( resource_status ) for sending notifications
"""


class NotificationResource:
    def __init__(self, resource, resource_type: str, resource_status: str, receivers: List[str] = None):
        self.resource = resource
        self.resource_type = resource_type
        self.resource_status = resource_status
        self.receivers_list = set(receivers) if receivers else set()


"""
Notification Bundle - Contains list of notification events for different types of resources ( dataset, shares, environment )
"""


class NotificationResourceBundle:
    def __init__(self):
        self.share_object_notifications: List[NotificationResource] = []
        self.dataset_object_notifications: List[NotificationResource] = []
        self.environment_object_notifications: List[NotificationResource] = []
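
To make the two helper classes above concrete, a small sketch (illustrative values only) of building one notification and filing it into a bundle:

# --- Illustrative sketch, not part of the PR ---
notification = NotificationResource(
    resource=None,  # normally a ShareObject / DatasetBase / Environment record
    resource_type='Share Object',
    resource_status='Submitted - Pending Approval',
    receivers=['example-approver-team'],
)
bundle = NotificationResourceBundle()
bundle.share_object_notifications.append(notification)
# --- end sketch ---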


def _get_pending_share_notifications(session):
    pending_shares = ShareObjectRepository.get_shares_with_statuses(session=session, status_list=['Submitted'])
    log.info(f'Found {len(pending_shares)} pending shares with share object status - Submitted')
    share_dataset_map: Dict[ShareObject, DatasetBase] = {
        share: DatasetBaseRepository.get_dataset_by_uri(session=session, dataset_uri=share.datasetUri)
        for share in pending_shares
    }
    return [
        NotificationResource(
            resource=share,
            resource_type='Share Object',
            resource_status=f'{share.status} - Pending Approval',
            receivers=[share_dataset_map[share].SamlAdminGroupName, share_dataset_map[share].stewards],
        )
        for share in share_dataset_map
    ]


def _get_unhealthy_share_notification(session):
    unhealthy_share_objects: List[ShareObject] = ShareObjectRepository.get_share_object_with_health_status(
        session=session, health_status_list=[ShareItemHealthStatus.Unhealthy.value]
    )
    log.info(f'Found {len(unhealthy_share_objects)} unhealthy share objects')
    return [
        NotificationResource(
            resource=share, resource_type='Share_object', resource_status='Unhealthy', receivers=[share.groupUri]
        )
        for share in unhealthy_share_objects
    ]


def _get_unhealthy_stack_by_type(session, target_uri: str, target_type: Any):
    unhealthy_stack_status: List[StackStatus] = [
        StackStatus.CREATE_FAILED.value,
        StackStatus.DELETE_FAILED.value,
        StackStatus.UPDATE_FAILED.value,
        StackStatus.UPDATE_ROLLBACK_FAILED.value,
        StackStatus.ROLLBACK_FAILED.value
    ]
    resource_objects = session.query(target_type).all()
    unhealthy_stack_notification_resources: List[NotificationResource] = []

    # Check if a stack associated with these datasets / environments exists
    # If yes, create a notification resource
    for resource in resource_objects:
        stack = StackRepository.find_stack_by_target_uri(
            session=session, target_uri=resource.__getattribute__(target_uri), statuses=unhealthy_stack_status
        )
        if stack is not None:
            notification_resource = NotificationResource(
                resource=resource,
                resource_type=target_type.__name__,
                resource_status=stack.status,
                receivers=_get_receivers_for_stack(resource=resource, target_type=target_type),
            )
            unhealthy_stack_notification_resources.append(notification_resource)

    # Log after the loop so the count reflects what was actually found
    log.info(f'Found {len(unhealthy_stack_notification_resources)} unhealthy {target_type}')
    return unhealthy_stack_notification_resources


def _get_receivers_for_stack(resource, target_type):
    if target_type.__name__ == 'DatasetBase':
        return [resource.SamlAdminGroupName, resource.stewards]
    if target_type.__name__ == 'Environment':
        return [resource.SamlGroupName]
    return []

"""
Function to create a map of group name : resource bundle, where each resource bundle contains dataset, share and environment notification lists.
Iterated over all the notification ( NotificationResources ) and then segregate based on the dataset, shares & environment notifications and map the bundle to a team.
"""
def _map_groups_to_resource_bundles(list_of_notifications: List[NotificationResource], resource_bundle_type: str):
for notification in list_of_notifications:
# Get all the receivers groups
notification_receiver_groups = notification.receivers_list
for receiver_group_name in notification_receiver_groups:
if receiver_group_name in group_name_to_resource_bundle_map:
resource_bundle = group_name_to_resource_bundle_map.get(receiver_group_name)
resource_bundle.__getattribute__(resource_bundle_type).append(notification)
else:
resource_bundle = NotificationResourceBundle()
resource_bundle.__getattribute__(resource_bundle_type).append(notification)
group_name_to_resource_bundle_map[receiver_group_name] = resource_bundle
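
A short sketch (team names are made up) of what _map_groups_to_resource_bundles produces: every receiver team ends up with one NotificationResourceBundle, and the same notification can land in several teams' bundles:

# --- Illustrative sketch, not part of the PR ---
group_name_to_resource_bundle_map = {}
pending = NotificationResource(
    resource=None,
    resource_type='Share Object',
    resource_status='Submitted - Pending Approval',
    receivers=['team-a', 'team-b'],
)
_map_groups_to_resource_bundles(list_of_notifications=[pending], resource_bundle_type='share_object_notifications')
# group_name_to_resource_bundle_map now maps 'team-a' and 'team-b' each to a bundle whose
# share_object_notifications list contains the notification above.
# --- end sketch ---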


def send_reminder_email(engine):
    task_exceptions = []
    resources_type_list = []
    try:
        with engine.scoped_session() as session:
            # Get all shares in submitted state
            pending_share_notification_resources = _get_pending_share_notifications(session=session)
            resources_type_list.append((pending_share_notification_resources, 'share_object_notifications'))
            # Todo : Check if distinct needed for the share object repository
            # Get all shares in unhealthy state
            unhealthy_share_objects_notification_resources = _get_unhealthy_share_notification(session=session)
            resources_type_list.append((unhealthy_share_objects_notification_resources, 'share_object_notifications'))
            # Get all the datasets which are in an unhealthy state
            unhealthy_datasets_notification_resources = _get_unhealthy_stack_by_type(
                session=session, target_uri='datasetUri', target_type=DatasetBase
            )
            resources_type_list.append((unhealthy_datasets_notification_resources, 'dataset_object_notifications'))
            # Get all the environments which are in an unhealthy state
            unhealthy_environment_notification_resources = _get_unhealthy_stack_by_type(
                session=session, target_uri='environmentUri', target_type=Environment
            )
            resources_type_list.append(
                (unhealthy_environment_notification_resources, 'environment_object_notifications')
            )

            # For each notification resource ( i.e. share notification, dataset notification, etc ),
            # _map_groups_to_resource_bundles maps each team name to a resource bundle.
            # Equivalent to calling
            # _map_groups_to_resource_bundles(list_of_notifications=pending_share_notification_resources,
            #                                 resource_bundle_type='share_object_notifications')
            # _map_groups_to_resource_bundles(list_of_notifications=unhealthy_share_objects_notification_resources,
            #                                 resource_bundle_type='share_object_notifications') ....

            for notification_resources, resource_bundle_type in resources_type_list:
                _map_groups_to_resource_bundles(
                    list_of_notifications=notification_resources, resource_bundle_type=resource_bundle_type
                )

            for group, resource_bundle in group_name_to_resource_bundle_map.items():
                email_body = _construct_email_body(resource_bundle)
                log.debug(email_body)
                subject = 'Attention Required | Data.all weekly digest'
                try:
                    SESEmailNotificationService.create_and_send_email_notifications(
                        subject=subject, msg=email_body, recipient_groups_list=[group]
                    )
                except Exception as e:
                    log.error(f'Error occurred in sending email while weekly reminder task due to: {e}')
                    task_exceptions.append(f'Error occurred in sending email while weekly reminder task due to: {e}')
    except Exception as e:
        log.error(f'Error occurred while running the weekly reminder task: {e}')
        task_exceptions.append(f'Error occurred while running the weekly reminder task: {e}')
    finally:
        if len(task_exceptions) > 0:
            log.info('Sending email notifications to the admin team')
            AdminNotificationService().notify_admins_with_error_log(
                process_error='Error occurred while running the weekly reminder task',
                error_logs=task_exceptions,
                process_name='Weekly reminder task',
            )


def _construct_email_body(resource_bundle: NotificationResourceBundle):
    msg_heading = """
    Dear Team, <br><br>

    This email contains data.all resources on which you need to take action. For resources which are in an unhealthy state, we request you to take action as soon as possible to minimize any disruptions.<br><br>

    <b>Helpful Tips:</b><br><br>
    For shares which are in an unhealthy state, you can re-apply the share by clicking on the "Reapply share" button <br>
    For environments and datasets which are in an unhealthy state, you can go to the AWS account, check the stack associated with that environment/dataset and identify the root cause of the failure. Once you have addressed the root cause, click on "Update Stack" on the Stack page. <br><br><br>
    """
    msg_content = ''
    share_object_table_content = (
        _create_table_for_resource(resource_bundle.share_object_notifications, 'shareUri', '/console/shares/')
        if len(resource_bundle.share_object_notifications) > 0
        else ''
    )
    dataset_object_table_content = (
        _create_table_for_resource(resource_bundle.dataset_object_notifications, 'datasetUri', '/console/s3-datasets/')
        if len(resource_bundle.dataset_object_notifications) > 0
        else ''
    )
    environment_object_table_content = (
        _create_table_for_resource(
            resource_bundle.environment_object_notifications, 'environmentUri', '/console/environments/'
        )
        if len(resource_bundle.environment_object_notifications) > 0
        else ''
    )

    msg_content += share_object_table_content + dataset_object_table_content + environment_object_table_content + '<br><br>'

    msg_footer = """
    In case your stack(s) or share object(s) are still in an unhealthy state after applying remedial measures, please contact the data.all team. <br><br>
    Regards,<br>
    data.all Team
    """

    return msg_heading + msg_content + msg_footer


def _create_table_for_resource(list_of_resources, uri_attr, link_uri):
    table_heading = """
    <tr>
        <th align='center'>
            Type
        </th>
        <th align='center'>
            Link
        </th>
        <th align='center'>
            Status
        </th>
    </tr>
    """
    table_body = ''
    for resource in list_of_resources:
        table_body += f"""
        <tr>
            <td align='center'>
                {resource.resource_type}
            </td>
            <td align='center'>
                {os.environ.get('frontend_domain_url', '') + link_uri + resource.resource.__getattribute__(uri_attr)}
            </td>
            <td align='center'>
                {resource.resource_status}
            </td>
        </tr>
        """
    table = f"""
    <table border='1' style='border-collapse:collapse'>
        {table_heading}
        {table_body}
    </table>
    <br>
    <br>
    """

    return table


if __name__ == '__main__':
    log.info('Starting weekly reminders task')
    load_modules(modes={ImportMode.SHARES_TASK})
    ENVNAME = os.environ.get('envname', 'dkrcompose')
    ENGINE = get_engine(envname=ENVNAME)
    group_name_to_resource_bundle_map: Dict[str, NotificationResourceBundle] = {}
    send_reminder_email(engine=ENGINE)