Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies = [
"scipy",
"tables>=3.8",
]
requires-python = ">=3.10"
requires-python = ">=3.12"

[project.optional-dependencies]
pipeline = [
Expand Down
151 changes: 77 additions & 74 deletions u19_pipeline/alert_system/rig_maintenance/check_rig_maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import logging
import os
import sys
from datetime import datetime
from datetime import date, datetime, timedelta
from pathlib import Path

import pandas as pd

try:
# When installed/used as a package, import directly
from u19_pipeline import lab, rig_maintenance
from u19_pipeline import lab, rig_maintenance, scheduler
from u19_pipeline.utils import slack_utils as su
except Exception as e: # pragma: no cover - only happens when package not available
print(f"Error importing modules: {e}")
Expand Down Expand Up @@ -120,83 +122,39 @@ def create_slack_message(overdue_items, current_date):
}
return message

# Group items by status
no_record_items = [item for item in overdue_items if item["status"] == "NO_RECORD"]
overdue_items_list = [item for item in overdue_items if item["status"] == "OVERDUE"]
# Split into overdue vs upcoming items based on status
overdue_list = [item for item in overdue_items if item.get("status") == "OVERDUE"]
upcoming_list = [item for item in overdue_items if item.get("status") == "UPCOMING"]

rigs_overdue = {item["location"] for item in overdue_list}
rigs_upcoming = {item["location"] for item in upcoming_list}
rigs_with_issues = sorted(rigs_overdue | rigs_upcoming)

overdue_lines = "\n".join(f"• {loc}" for loc in sorted(rigs_overdue)) or "• None"
upcoming_lines = "\n".join(f"• {loc}" for loc in sorted(rigs_upcoming)) or "• None"

text = (
f"⚠️ *Rig Maintenance Alert* - {current_date}\n\n"
f"*Overdue maintenance* rigs ({len(rigs_overdue)}):\n{overdue_lines}\n\n"
f"*Upcoming maintenance within warning window* rigs ({len(rigs_upcoming)}):\n{upcoming_lines}\n\n"
"For full details of overdue and upcoming items per rig, please visit "
"<https://braincogs-webgui.pni.princeton.edu/rig_maintenance|the rig maintenance web page>."
)

# Build message blocks. Slack limits: max 50 blocks and ~3000 chars per text field.
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"⚠️ *Rig Maintenance Alert* - {current_date}\n\n{len(overdue_items)} items require attention",
"text": text,
},
},
{"type": "divider"},
}
]

if no_record_items:
no_record_text = "\n".join([f"• {item['location']} - {item['maintenance_type']}" for item in no_record_items])
# Truncate if too long for Slack
if len(no_record_text) > 2500:
truncated = no_record_text[:2400] + "\n• ... (truncated)"
no_record_text = truncated
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"📋 *No Maintenance Records* ({len(no_record_items)} items):\n{no_record_text}",
},
}
)

if overdue_items_list:
if no_record_items:
blocks.append({"type": "divider"})

# Sort by most overdue first
overdue_items_list.sort(key=lambda x: x["days_overdue"], reverse=True)
overdue_text = "\n".join(
[
f"• {item['location']} - {item['maintenance_type']}: "
f"{item['days_overdue']} days overdue (last: {item['last_maintenance']})"
for item in overdue_items_list
]
)

# If the overdue_text is too large or blocks would exceed limits, fall back to a short list
if len(overdue_text) > 2500 or len(blocks) + 1 > 45:
# Provide a short top-N list and include full details in the log only
top_n = overdue_items_list[:25]
short_text = "\n".join(
[f"• {it['location']} - {it['maintenance_type']} ({it['days_overdue']} days)" for it in top_n]
)
if len(overdue_items_list) > len(top_n):
short_text += f"\n• ... and {len(overdue_items_list) - len(top_n)} more items (see logs)"

blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": (f"🚨 *Overdue Maintenance* ({len(overdue_items_list)} items):\n{short_text}"),
},
}
)
else:
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"🚨 *Overdue Maintenance* ({len(overdue_items_list)} items):\n{overdue_text}",
},
}
)

message = {"text": f"Rig Maintenance Alert - {len(overdue_items)} items require attention", "blocks": blocks}
message = {
"text": f"Rig Maintenance Alert - {len(rigs_with_issues)} rigs have overdue or upcoming maintenance",
"blocks": blocks,
}

return message

Expand Down Expand Up @@ -231,8 +189,16 @@ def check_overdue_maintenance():
list: List of dictionaries containing overdue maintenance information
"""
overdue_items = []
upcoming_items = []
current_date = datetime.now().date()

# Determine which locations are active based on recent schedules (last 30 days)

five_weeks_ago = date.today() - timedelta(weeks=5)
query = scheduler.Schedule & f'date >= "{five_weeks_ago}"' & 'subject_fullname not like "test%"'
locations = query.fetch("location")
unique_locations = sorted(set(locations))

# Get all maintenance types and their intervals
maintenance_fetch = getattr(rig_maintenance.MaintenanceType, "fetch")
maintenance_types = maintenance_fetch(as_dict=True)
Expand All @@ -242,11 +208,13 @@ def check_overdue_maintenance():
'system_type = "rig"',
'acquisition_type in ("behavior", "electrophysiology", "2photon")',
"(location_description is not NULL and length(trim(location_description)) > 0)",
f"location in ({', '.join([f'"{loc}"' for loc in unique_locations])})",
]

merged_queries = " and ".join(queries)

loc_query = lab.Location & merged_queries

loc_fetch = getattr(loc_query, "fetch")
locations = loc_fetch(as_dict=True)

Expand All @@ -255,12 +223,21 @@ def check_overdue_maintenance():

for location in locations:
location_name = location["location"]
rig_number_of_lines = location["number_of_lines"]
logger.info(f"🔧 Checking rig: {location_name}")
logger.debug("-" * 61)

for mtype in maintenance_types:
maintenance_type = mtype["maintenance_type"]
main_num_of_lines = mtype.get("number_of_lines", 0)
if not (main_num_of_lines == 0 or main_num_of_lines == rig_number_of_lines):
# logger.info(
# f" {maintenance_type:.<30} SKIPPED (rig has {rig_number_of_lines} lines, "
# f"maintenance type for {main_num_of_lines} lines)"
# )
continue
interval_days = mtype["interval_days"]
notification_window = mtype.get("notification_window", 0)

# Find the most recent maintenance record for this rig and type
recent_q = rig_maintenance.RigMaintenance & {
Expand Down Expand Up @@ -305,11 +282,30 @@ def check_overdue_maintenance():
)
logger.info(f" {maintenance_type:.<30} OVERDUE by {days_overdue} days ❌")
else:
# Maintenance is up to date
# Not yet overdue: check if within notification window
days_until_due = interval_days - days_since_last
logger.info(f" {maintenance_type:.<30} OK ({days_until_due} days until due) ✅")

return overdue_items
if 0 < days_until_due <= notification_window:
upcoming_items.append(
{
"location": location_name,
"maintenance_type": maintenance_type,
"last_maintenance": last_maintenance_date,
"days_since_last": days_since_last,
"interval_days": interval_days,
"days_until_due": days_until_due,
"notification_window": notification_window,
"status": "UPCOMING",
"message": f"{maintenance_type} due in {days_until_due} days",
}
)
logger.info(
f" {maintenance_type:.<30} DUE SOON in {days_until_due} days (window {notification_window}) ⚠️"
)
else:
logger.info(f" {maintenance_type:.<30} OK ({days_until_due} days until due) ✅")

# Return combined list; status distinguishes OVERDUE vs UPCOMING
return overdue_items + upcoming_items


def log_summary(overdue_items):
Expand Down Expand Up @@ -392,6 +388,13 @@ def main():
# Log summary with rich formatting
log_summary(overdue_items)

records = pd.DataFrame(overdue_items)
records.columns = [" ".join(w.capitalize() for w in col.split("_")) for col in records.columns]
print(records)

records = records[["Location", "Maintenance Type", "Status"]]


# Send Slack notification with summary only
current_date = datetime.now().date()
send_slack_notification(overdue_items, current_date)
Expand Down
64 changes: 48 additions & 16 deletions u19_pipeline/alert_system/water_weigh_alert/water_weigh_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import time

import datajoint as dj
import u19_pipeline.lab as lab
import pandas as pd

import u19_pipeline.lab as lab
import u19_pipeline.utils.slack_utils as su
from u19_pipeline.utils.subject_metadata import fetch_slack_handles_for_lab_managers_by_subject

slack_configuration_dictionary = {
'slack_notification_channel': ['subject_health'],
"slack_notification_channel": ["subject_health"],
}

# Query from file
Expand Down Expand Up @@ -114,25 +115,52 @@ def find_unreturned_subjects() -> pd.DataFrame:

# Obtain the last transport date
local_transport_data: pd.DataFrame
local_transport_data = dj.U("subject_fullname").aggr(
action.Transport() & f'DATE(transport_out_datetime) = "{today}"',
# transport_out_datetime="MAX(transport_out_datetime)",
last_transport="MAX(transport_out_datetime)",
transport_in_datetime="MAX(transport_in_datetime)",
).fetch(format='frame')

unreturned_subjects: pd.DataFrame = local_transport_data[
local_transport_data["transport_in_datetime"].isnull()
]
local_transport_data = (
dj.U("subject_fullname")
.aggr(
action.Transport() & f'DATE(transport_out_datetime) = "{today}"',
# transport_out_datetime="MAX(transport_out_datetime)",
last_transport="MAX(transport_out_datetime)",
transport_in_datetime="MAX(transport_in_datetime)",
)
.fetch(format="frame")
)

unreturned_subjects: pd.DataFrame = local_transport_data[local_transport_data["transport_in_datetime"].isnull()]

return unreturned_subjects


def slack_alert_message_format_weight_water(subjects_not_watered, subjects_not_weighted, subjects_not_trained, missing_transport):
def slack_alert_message_format_weight_water(
subjects_not_watered: pd.DataFrame,
subjects_not_weighted: pd.DataFrame,
subjects_not_trained: pd.DataFrame,
missing_transport: pd.DataFrame,
):
now = datetime.datetime.now()
datestr = now.strftime("%d-%b-%Y %H:%M:%S")

print(missing_transport)
temp_missing_transport = missing_transport.copy().reset_index()
notifiable_subjects = set(
temp_missing_transport["subject_fullname"].tolist()
+ subjects_not_watered["subject_fullname"].tolist()
+ subjects_not_weighted["subject_fullname"].tolist()
)

slack_handles: list[str] = fetch_slack_handles_for_lab_managers_by_subject(notifiable_subjects)
lab_manager_text = "\n\n"
if len(slack_handles) >= 1:
lab_manager_text += "Lab Manager"
if len(slack_handles) > 1:
lab_manager_text += "s"

slack_handles_formatted = ", ".join("<@" + handle + ">" for handle in slack_handles)
if slack_handles_formatted:
lab_manager_text += (
" " + slack_handles_formatted + ", please be advised that your labs' subjects are listed below."
)

msep = dict()
msep["type"] = "divider"

Expand All @@ -141,7 +169,8 @@ def slack_alert_message_format_weight_water(subjects_not_watered, subjects_not_w
m1["type"] = "section"
m1_1 = dict()
m1_1["type"] = "mrkdwn"
m1_1["text"] = ":rotating_light: *Subjects Status Alert *"

m1_1["text"] = ":rotating_light: *Subjects Status Alert *" + lab_manager_text
m1["text"] = m1_1

# Info for subjects missing water
Expand Down Expand Up @@ -243,8 +272,7 @@ def main_water_weigh_alert():
subject_not_returned = find_unreturned_subjects()

slack_json_message = slack_alert_message_format_weight_water(
subjects_not_watered, subjects_not_weighted, subjects_not_trained,
missing_transport=subject_not_returned
subjects_not_watered, subjects_not_weighted, subjects_not_trained, missing_transport=subject_not_returned
)

webhooks_list = su.get_webhook_list(slack_configuration_dictionary, lab)
Expand All @@ -253,3 +281,7 @@ def main_water_weigh_alert():
for this_webhook in webhooks_list:
su.send_slack_notification(this_webhook, slack_json_message)
time.sleep(1)


if __name__ == "__main__":
main_water_weigh_alert()
Loading