Skip to content

Re #3159 - better send test handling #3162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions changedetectionio/blueprint/ui/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def ajax_callback_send_notification_test(watch_uuid=None):

# Watch_uuid could be unset in the case it`s used in tag editor, global settings
import apprise
from changedetectionio.notification.handler import process_notification
import queue
from changedetectionio.update_worker import update_worker
from changedetectionio.notification.apprise_plugin.assets import apprise_asset

from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler

apobj = apprise.Apprise(asset=apprise_asset)
Expand Down Expand Up @@ -92,7 +92,19 @@ def ajax_callback_send_notification_test(watch_uuid=None):

n_object['as_async'] = False
n_object.update(watch.extra_notification_token_values())
sent_obj = process_notification(n_object, datastore)

# Create a temporary notification queue for this test
notification_q = queue.Queue()

# Create a temporary update_worker instance just for using queue_notification_for_watch
worker = update_worker(queue.Queue(), notification_q, None, datastore)

# Use queue_notification_for_watch to process the notification with all tokens
worker.queue_notification_for_watch(notification_q, n_object, watch)

# Get the notification from the queue and process it
from changedetectionio.notification.handler import process_notification
sent_obj = process_notification(notification_q.get(), datastore)

except Exception as e:
e_str = str(e)
Expand Down