diff --git a/.gitignore b/.gitignore index 88360e2..8ce433e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,17 @@ doc/_build/ # configuration .env +useful_code/.ipynb_checkpoints/ +useful_code/Tests.ipynb +useful_code/hop_comms.ipynb +useful_code/local-testing.ipynb +useful_code/logging.log +useful_code/slack_test.py +useful_code/SNEWS_MSGs/21_08_30/subscribed_messages.json +hop_comms/SNEWS_MSGs/21_08_30/subscribed_messages.json +hop_comms/slack_test.py +hop_comms/logging.log +hop_comms/local-testing.ipynb +hop_comms/.ipynb_checkpoints/local-testing-checkpoint.ipynb +hop_comms/.ipynb_checkpoints/dev-testing-checkpoint.ipynb +hop_comms/.ipynb_checkpoints/dev-testing2-subscribe-checkpoint.ipynb diff --git a/hop_comms/default_env.env b/hop_comms/default_env.env new file mode 100644 index 0000000..f355a64 --- /dev/null +++ b/hop_comms/default_env.env @@ -0,0 +1,10 @@ +TIME_STRING_FORMAT="%y/%m/%d %H:%M:%S" +DATABASE_SERVER="mongodb://localhost:27017/" + +NEW_DATABASE=0 +COINCIDENCE_THRESHOLD=10 +MSG_EXPIRATION=120 + +HOP_BROKER="kafka.scimma.org" +OBSERVATION_TOPIC="kafka://${HOP_BROKER}/snews.experiments-test" +ALERT_TOPIC="kafka://${HOP_BROKER}/snews.alert-test" \ No newline at end of file diff --git a/hop_comms/detector_properties.json b/hop_comms/detector_properties.json new file mode 100644 index 0000000..f5c1170 --- /dev/null +++ b/hop_comms/detector_properties.json @@ -0,0 +1 @@ +{"TEST": ["TEST", 0, "TESTloc"], "Super-K": ["Super-K", 1, "loc Super-K"], "Hyper-K": ["Hyper-K", 2, "loc Hyper-K"], "SNO+": ["SNO+", 3, "loc SNO+"], "KamLAND": ["KamLAND", 4, "loc KamLAND"], "LVD": ["LVD", 5, "loc LVD"], "ICE": ["ICE", 6, "loc ICE"], "Borexino": ["Borexino", 7, "loc Borexino"], "HALO-1kT": ["HALO-1kT", 8, "loc HALO-1kT"], "HALO": ["HALO", 9, "loc HALO"], "NOvA": ["NOvA", 10, "loc NOvA"], "KM3NeT": ["KM3NeT", 11, "loc KM3NeT"], "Baksan": ["Baksan", 12, "loc Baksan"], "JUNO": ["JUNO", 13, "loc JUNO"], "LZ": ["LZ", 14, "loc LZ"], "DUNE": ["DUNE", 15, "loc DUNE"], "MicroBooNe": ["MicroBooNe", 16, "loc MicroBooNe"], "SBND": ["SBND", 17, "loc SBND"], "DS-20K": ["DS-20K", 18, "loc DS-20K"], "XENONnT": ["XENONnT", 19, "loc XENONnT"], "PandaX-4T": ["PandaX-4T", 20, "loc PandaX-4T"]} \ No newline at end of file diff --git a/hop_comms/dev-testing.ipynb b/hop_comms/dev-testing.ipynb new file mode 100644 index 0000000..49ec00d --- /dev/null +++ b/hop_comms/dev-testing.ipynb @@ -0,0 +1,477 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6f2d8643", + "metadata": {}, + "source": [ + "# Hop-pub functions" + ] + }, + { + "cell_type": "markdown", + "id": "65610884", + "metadata": {}, + "source": [ + "Sebastian Torres-Lara
\n", + "Melih Kara
\n", + "30-08-2021" + ] + }, + { + "cell_type": "markdown", + "id": "dbdff061", + "metadata": {}, + "source": [ + "Todo: \n", + "- Check how the heartbeat runs in the background. \n", + "- Check if `SNEWSObservation` and `SNEWSHeartbeat` are good enough or if they can be improved
\n", + "- ~For me (Melih) publising often fails (simple hop publish) for some reason. Need to investigate.~" + ] + }, + { + "cell_type": "markdown", + "id": "e1afae6e", + "metadata": {}, + "source": [ + "~Should we implement the following format?~ I do not like it. \n", + "```python\n", + "return SNEWSObservation(message_id=str(uuid.uuid4()),\n", + " detector_id=self.detector.id,\n", + " sent_time=self.time_str(), \n", + " neutrino_time=self.time_str(), \n", + " machine_time=self.time_str(), \n", + " location=self.detector.location,\n", + " p_value=0.5,\n", + " status=\"none\",\n", + " content=\"none\").asdict()\n", + "```\n", + "SNEWSObservation requires all fields to be filled for the different piers the idea is the opposite.
\n", + "we should be able to not-return some of the fields.
\n", + "\n", + "**-** Maybe fill the empty sections with `'none'` ?" + ] + }, + { + "cell_type": "markdown", + "id": "2a391167", + "metadata": {}, + "source": [ + "## Application" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "5418d888", + "metadata": {}, + "outputs": [], + "source": [ + "from hop_pub import Publish_Observation" + ] + }, + { + "cell_type": "markdown", + "id": "cae59fb0", + "metadata": {}, + "source": [ + "First create an object with the information.
\n", + "Below, the `Publish_Observation` can have \n", + "- `message` argument as a dictionary, `SNEWSObservation` or `SNEWSHeartbeat` objects, a file path containing a json or a dictionary.
\n", + "- `detector` argument can also be specified in which case the object knows its ID and location. If not given, it takes a detector called 'TEST'.
\n", + "- `env_path` as a string containing the broker, time formatting and other information. If not specified, uses the deafult environment 'test_env.env'\n", + "- If `welcome` argument is set, it displays the versions and the connected topics." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "da1d8fcc", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "##################################################\n", + "# Publish SNEWS OBSERVATION Messages #\n", + "#____________________XENONnT_____________________#\n", + "#_________________19-loc XENONnT_________________#\n", + "##################################################\n", + "Your Python version:\n", + " 3.7.11 (default, Jul 27 2021, 14:32:16) \n", + "[GCC 7.5.0]\n", + "Current hop-client version:0.4.0\n", + " snews version:0.0.1\n", + "\n", + "Publishing to kafka.scimma.org\n", + "Observation Topic:\n", + "==> kafka://kafka.scimma.org/snews.experiments-test\n", + "Heartbeat Topic:\n", + "==> kafka://kafka.scimma.org/snews.experiments-test\n", + "\n", + "\n" + ] + } + ], + "source": [ + "publisher = Publish_Observation(detector='XENONnT', welcome=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "868dcfd7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['detector_id',\n", + " 'machine_time',\n", + " 'neutrino_time',\n", + " 'sent_time',\n", + " 'status',\n", + " 'p_value']" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "publisher.tier_keys_['Significance_Tier']" + ] + }, + { + "cell_type": "markdown", + "id": "72f91848", + "metadata": {}, + "source": [ + "### See & Modify the message before publishing" + ] + }, + { + "cell_type": "markdown", + "id": "84091130", + "metadata": {}, + "source": [ + "If no message fed in the beginning, it produces a default one like the following.
\n", + "One can add, or modify the fields." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "f36ab444", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Following OBS message to be published:\n", + "Current time:21/08/30 19:33:28\n", + "\n", + "message_id :19_O_21/08/30_19:33:26\n", + "detector_id :19\n", + "sent_time :21/08/30 19:33:26\n", + "neutrino_time :21/08/30 19:33:26\n", + "machine_time :21/08/30 19:33:26\n", + "location :loc XENONnT\n", + "p_value :0\n", + "status :none\n", + "content :none\n", + "\n", + "> modify self.message_dict or \n", + "> use .publish_to_tiers() method to publish (see .publish_to)\n" + ] + } + ], + "source": [ + "publisher.display_message()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8d288242", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Following OBS message to be published:\n", + "Current time:21/08/30 19:33:29\n", + "\n", + "message_id :19_O_21/08/30_19:33:26\n", + "detector_id :19\n", + "sent_time :21/08/30 19:33:26\n", + "neutrino_time :21/08/30 19:33:26\n", + "machine_time :21/08/30 19:33:26\n", + "location :loc XENONnT\n", + "p_value :0\n", + "status :none\n", + "content :This is a modified content\n", + "New Field :This is a new field\n", + "\n", + "> modify self.message_dict or \n", + "> use .publish_to_tiers() method to publish (see .publish_to)\n" + ] + } + ], + "source": [ + "# modify the content and add a non-existing field\n", + "publisher.message_dict['content'] = 'This is a modified content'\n", + "publisher.message_dict['New Field'] = 'This is a new field'\n", + "publisher.display_message()" + ] + }, + { + "cell_type": "markdown", + "id": "2170e39d", + "metadata": {}, + "source": [ + "### Submission to Different Tiers" + ] + }, + { + "cell_type": "markdown", + "id": "48700be6", + "metadata": {}, + "source": [ + "Here, it is not clear to me conceptually. There is only one topic that the experiments can publish their messages.
\n", + "The different Tiers should be based on the content of the messages. Currently, the script does this by taking only the fields relevant for that tier. See ``" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "fb2addc2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'Significance_Tier': True, 'Coincidence_Tier': True, 'Timing_Tier': True}\n", + "{'Significance_Tier': True, 'Coincidence_Tier': True, 'Timing_Tier': False}\n" + ] + } + ], + "source": [ + "# See which tiers are connected\n", + "print(publisher.publish_to)\n", + "publisher.publish_to['Timing_Tier'] = False\n", + "print(publisher.publish_to)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "8efd733f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Publishing OBS message to Significance_Tier:\n", + "detector_id :19\n", + "machine_time :21/08/30 19:40:05\n", + "neutrino_time :21/08/30 19:40:05\n", + "sent_time :21/08/30 19:40:12\n", + "status :none\n", + "p_value :0\n", + "\n", + "Publishing OBS message to Coincidence_Tier:\n", + "detector_id :19\n", + "machine_time :21/08/30 19:40:05\n", + "neutrino_time :21/08/30 19:40:05\n", + "sent_time :21/08/30 19:40:16\n", + "status :none\n", + "\n", + "Publishing OBS message to Timing_Tier:\n", + "detector_id :19\n", + "machine_time :21/08/30 19:40:05\n", + "neutrino_time :21/08/30 19:40:05\n", + "sent_time :21/08/30 19:40:19\n", + "status :none\n", + "content :none\n" + ] + } + ], + "source": [ + "publisher.publish_to_tiers()" + ] + }, + { + "cell_type": "markdown", + "id": "8bab95e3", + "metadata": {}, + "source": [ + "---\n", + "**Publish the message**" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "99ba4479", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Published the Heartbeat message to kafka://kafka.scimma.org/snews.experiments-test:\n", + "detector_id :19\n", + "message_id :19_H_21/08/30_19:39:32\n", + "status :ON\n", + "sent_time :21/08/30 19:39:32\n" + ] + } + ], + "source": [ + "publisher.publish()" + ] + }, + { + "cell_type": "markdown", + "id": "dd35e6eb", + "metadata": {}, + "source": [ + "---" + ] + }, + { + "cell_type": "markdown", + "id": "1f241df1", + "metadata": {}, + "source": [ + "# Heartbeat Messages" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "10745f7e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Press Ctrl+C to exit\n", + "\n", + "Published the Heartbeat message to kafka://kafka.scimma.org/snews.experiments-test:\n", + "detector_id :19\n", + "message_id :19_H_21/08/30_19:40:53\n", + "status :OFF\n", + "sent_time :21/08/30 19:40:53\n", + "\n", + "Published the Heartbeat message to kafka://kafka.scimma.org/snews.experiments-test:\n", + "detector_id :19\n", + "message_id :19_H_21/08/30_19:41:03\n", + "status :OFF\n", + "sent_time :21/08/30 19:41:03\n", + "\n", + "Published the Heartbeat message to kafka://kafka.scimma.org/snews.experiments-test:\n", + "detector_id :19\n", + "message_id :19_H_21/08/30_19:41:13\n", + "status :ON\n", + "sent_time :21/08/30 19:41:13\n" + ] + } + ], + "source": [ + "from hop_pub import Publish_Heartbeat\n", + "publisher = Publish_Heartbeat(detector='XENONnT', rate=10)" + ] + }, + { + "cell_type": "markdown", + "id": "576ca975", + "metadata": {}, + "source": [ + "---" + ] + }, + { + "cell_type": "markdown", + "id": "7e79c7f9", + "metadata": {}, + "source": [ + "# Alert Messages" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "5dff618e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Published ALERT message to kafka://kafka.scimma.org/snews.alert-test !!!\n" + ] + } + ], + "source": [ + "from hop_pub import Publish_Alert\n", + "alert_publisher = Publish_Alert()\n", + "alert_publisher.publish()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ed9d4402", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Snews-Venv", + "language": "python", + "name": "snews-venv" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/hop_comms/dev-testing2-subscribe.ipynb b/hop_comms/dev-testing2-subscribe.ipynb new file mode 100644 index 0000000..24d08c8 --- /dev/null +++ b/hop_comms/dev-testing2-subscribe.ipynb @@ -0,0 +1,273 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0e8a1428", + "metadata": {}, + "source": [ + "# SNEWS hop_comms tests" + ] + }, + { + "cell_type": "markdown", + "id": "76e0b064", + "metadata": {}, + "source": [ + "## hop_sub\n", + "\n", + "First, subscribe to some channels using this notebook. Then, publish some messages using [this notebook](dev-testing.ipynb)" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "77ccb8d8", + "metadata": {}, + "outputs": [], + "source": [ + "from hop_sub import HopSubscribe" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "05057a33", + "metadata": {}, + "outputs": [], + "source": [ + "from hop_sub import HopSubscribe\n", + "subscriber = HopSubscribe()" + ] + }, + { + "cell_type": "markdown", + "id": "c2548dc6", + "metadata": {}, + "source": [ + "### Observations" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "bd2bc5e7", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Subscribing to OBSERVATION Topic\n", + "Broker:kafka://kafka.scimma.org/snews.experiments-test\n", + "OBSERVATION from 19 at 21/08/30 19:40:12\n", + "##################################################\n", + "# detector_id :19 #\n", + "# machine_time :21/08/30 19:40:05 #\n", + "# neutrino_time :21/08/30 19:40:05 #\n", + "# sent_time :21/08/30 19:40:12 #\n", + "# status :none #\n", + "# p_value :0 #\n", + "##################################################\n", + "OBSERVATION from 19 at 21/08/30 19:40:16\n", + "##################################################\n", + "# detector_id :19 #\n", + "# machine_time :21/08/30 19:40:05 #\n", + "# neutrino_time :21/08/30 19:40:05 #\n", + "# sent_time :21/08/30 19:40:16 #\n", + "# status :none #\n", + "##################################################\n", + "OBSERVATION from 19 at 21/08/30 19:40:19\n", + "##################################################\n", + "# detector_id :19 #\n", + "# machine_time :21/08/30 19:40:05 #\n", + "# neutrino_time :21/08/30 19:40:05 #\n", + "# sent_time :21/08/30 19:40:19 #\n", + "# status :none #\n", + "# content :none #\n", + "##################################################\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/tmp/ipykernel_1731/4199098751.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0;31m# env='some-env-file.env' can be given\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0;31m# publish using Publish_Observation\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 4\u001b[0;31m \u001b[0msubscriber\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msubscribe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'O'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mverbose\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;32m/mnt/c/Users/bj7780/Documents/GitHub/hop-SNalert-app/useful_code/hop_sub.py\u001b[0m in \u001b[0;36msubscribe\u001b[0;34m(self, which_topic, verbose)\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[0mstream\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mStream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpersist\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 84\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mstream\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mopen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbroker\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"r\"\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 85\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 86\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mwhich_topic\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mupper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m==\u001b[0m\u001b[0;34m'A'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0msnews_utils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdisplay_gif\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36m__iter__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 340\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 341\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__iter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 342\u001b[0;31m \u001b[0;32myield\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 343\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 344\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__enter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36mread\u001b[0;34m(self, metadata, autocommit, **kwargs)\u001b[0m\n\u001b[1;32m 306\u001b[0m \u001b[0mobject\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 307\u001b[0m \"\"\"\n\u001b[0;32m--> 308\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 309\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_unpack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmessage\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmetadata\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmetadata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 310\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/adc/consumer.py\u001b[0m in \u001b[0;36m_stream_forever\u001b[0;34m(self, autocommit, batch_size, batch_timeout)\u001b[0m\n\u001b[1;32m 108\u001b[0m \u001b[0mlast_message\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mconfluent_kafka\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mMessage\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 109\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 110\u001b[0;31m \u001b[0mmessages\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mconsume\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbatch_size\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbatch_timeout\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtotal_seconds\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 111\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mm\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mmessages\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 112\u001b[0m \u001b[0merr\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0merror\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "# subscribe to \"O\"bservation channel\n", + "# env='some-env-file.env' can be given\n", + "# publish using Publish_Observation\n", + "subscriber.subscribe('O', verbose=True)" + ] + }, + { + "cell_type": "markdown", + "id": "090a9f12", + "metadata": {}, + "source": [ + "---\n", + "### Heartbeats" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "aa3fdb4b", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Subscribing to HEARTBEAT Topic\n", + "Broker:kafka://kafka.scimma.org/snews.experiments-test\n", + "HEARTBEAT from 19 at 21/08/30 19:40:53\n", + "##################################################\n", + "# detector_id :19 #\n", + "# message_id :19_H_21/08/30_19:40:53 #\n", + "# status :OFF #\n", + "# sent_time :21/08/30 19:40:53 #\n", + "##################################################\n", + "HEARTBEAT from 19 at 21/08/30 19:41:03\n", + "##################################################\n", + "# detector_id :19 #\n", + "# message_id :19_H_21/08/30_19:41:03 #\n", + "# status :OFF #\n", + "# sent_time :21/08/30 19:41:03 #\n", + "##################################################\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/tmp/ipykernel_1731/3121971097.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0msubscriber\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msubscribe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'H'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mverbose\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;32m/mnt/c/Users/bj7780/Documents/GitHub/hop-SNalert-app/useful_code/hop_sub.py\u001b[0m in \u001b[0;36msubscribe\u001b[0;34m(self, which_topic, verbose)\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[0mstream\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mStream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpersist\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 84\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mstream\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mopen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbroker\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"r\"\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 85\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 86\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mwhich_topic\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mupper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m==\u001b[0m\u001b[0;34m'A'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0msnews_utils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdisplay_gif\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36m__iter__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 340\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 341\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__iter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 342\u001b[0;31m \u001b[0;32myield\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 343\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 344\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__enter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36mread\u001b[0;34m(self, metadata, autocommit, **kwargs)\u001b[0m\n\u001b[1;32m 306\u001b[0m \u001b[0mobject\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 307\u001b[0m \"\"\"\n\u001b[0;32m--> 308\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 309\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_unpack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmessage\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmetadata\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmetadata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 310\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/adc/consumer.py\u001b[0m in \u001b[0;36m_stream_forever\u001b[0;34m(self, autocommit, batch_size, batch_timeout)\u001b[0m\n\u001b[1;32m 108\u001b[0m \u001b[0mlast_message\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mconfluent_kafka\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mMessage\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 109\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 110\u001b[0;31m \u001b[0mmessages\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mconsume\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbatch_size\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbatch_timeout\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtotal_seconds\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 111\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mm\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mmessages\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 112\u001b[0m \u001b[0merr\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0merror\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "# Subscribe to \"H\"eartbeat topic\n", + "# publish using Publish_Heartbeat\n", + "subscriber.subscribe('H', verbose=True)" + ] + }, + { + "cell_type": "markdown", + "id": "7c62055d", + "metadata": {}, + "source": [ + "---\n", + "### Alert Messages" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "48f4437b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Subscribing to ALERT Topic\n", + "Broker:kafka://kafka.scimma.org/snews.alert-test\n" + ] + }, + { + "data": { + "text/html": [ + "" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "##################################################\n", + "# time :21/08/30 19:42:50 #\n", + "# content :This is a SNEWS Alarm! #\n", + "##################################################\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/tmp/ipykernel_1731/2386257847.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;31m# Subscribe to \"A\"lert messages\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0;31m# Publish using Publish_Alert\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0msubscriber\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msubscribe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'A'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mverbose\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;32m/mnt/c/Users/bj7780/Documents/GitHub/hop-SNalert-app/useful_code/hop_sub.py\u001b[0m in \u001b[0;36msubscribe\u001b[0;34m(self, which_topic, verbose)\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[0mstream\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mStream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpersist\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 84\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mstream\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mopen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbroker\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"r\"\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 85\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 86\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mwhich_topic\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mupper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m==\u001b[0m\u001b[0;34m'A'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0msnews_utils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdisplay_gif\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36m__iter__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 340\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 341\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__iter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 342\u001b[0;31m \u001b[0;32myield\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 343\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 344\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__enter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/hop/io.py\u001b[0m in \u001b[0;36mread\u001b[0;34m(self, metadata, autocommit, **kwargs)\u001b[0m\n\u001b[1;32m 306\u001b[0m \u001b[0mobject\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 307\u001b[0m \"\"\"\n\u001b[0;32m--> 308\u001b[0;31m \u001b[0;32mfor\u001b[0m \u001b[0mmessage\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstream\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mautocommit\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 309\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_unpack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmessage\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmetadata\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmetadata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 310\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/anaconda3/envs/snews-venv/lib/python3.7/site-packages/adc/consumer.py\u001b[0m in \u001b[0;36m_stream_forever\u001b[0;34m(self, autocommit, batch_size, batch_timeout)\u001b[0m\n\u001b[1;32m 108\u001b[0m \u001b[0mlast_message\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mconfluent_kafka\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mMessage\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 109\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 110\u001b[0;31m \u001b[0mmessages\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_consumer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mconsume\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbatch_size\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mbatch_timeout\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtotal_seconds\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 111\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mm\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mmessages\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 112\u001b[0m \u001b[0merr\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0merror\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "# Subscribe to \"A\"lert messages\n", + "# Publish using Publish_Alert\n", + "subscriber.subscribe('A', verbose=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "966feede", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Snews-Venv", + "language": "python", + "name": "snews-venv" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/hop_comms/hop_pub.py b/hop_comms/hop_pub.py new file mode 100644 index 0000000..6b4f953 --- /dev/null +++ b/hop_comms/hop_pub.py @@ -0,0 +1,242 @@ +""" +An interface for SNEWS member experiment +to publish their observation and heartbeat messages + +Created: +August 2021 +Authors: +Melih Kara +Sebastian Torres-Lara +""" +import hop, snews, sys, time, os, json +from hop import Stream +from datetime import datetime +from collections import namedtuple +from dotenv import load_dotenv +import snews_utils + +Detector = namedtuple("Detector", ["name", "id", "location"]) + +class Publish: + """ Class to format and publish messages + """ + def __init__(self, message, detector, env_path): + #### + self.publish_to = {'Significance_Tier':True, 'Coincidence_Tier':True, 'Timing_Tier':True} + self.common_keys_ = ['detector_id','machine_time','neutrino_time', + 'sent_time','status'] + self.tier_keys_ = {'Significance_Tier':self.common_keys_ + ['p_value'], + 'Coincidence_Tier':self.common_keys_, + 'Timing_Tier':self.common_keys_ + ['content']} + #### + self.detector = snews_utils.get_detector(detector) + snews_utils.set_env(env_path) + self.broker = os.getenv("HOP_BROKER") + self.observation_topic = os.getenv("OBSERVATION_TOPIC") + self.alert_topic = os.getenv("ALERT_TOPIC") + self.heartbeat_topic = self.observation_topic + self.times = snews_utils.TimeStuff(env_path) + self.time_str = lambda : self.times.get_snews_time() + + self.message_dict = message + self.format_message(message) + self.__version__ = "0.0.6" + + + def id_format(self, topic_type='O'): + """ Returns formatted message ID + time format should always be same for all detectors + """ + date_time = self.times.get_snews_time(fmt="%y/%m/%d_%H:%M:%S") + return f'{self.detector.id}_{topic_type}_{date_time}' + + + def default_dict(self): + """ Returns the default dictionary + with all entries being 'none' + """ + return {"message_id": self.id_format(), + "detector_id": self.detector.id, + "sent_time": self.time_str(), + "neutrino_time": self.time_str(), + "machine_time": self.time_str(), + "location": self.detector.location, + "p_value": 0, + "status": "none", + "content": "none"} + + + def format_message(self, message): + """ Format the message + Takes the deafult dict and modifies the fields + according to input message + """ + # if no message is provided, make default + if type(message)==type(None): + self.message_dict = self.default_dict() + if isinstance(message,dict): + # overwrite default ones, add new ones, keep missing ones + self.message_dict = {**self.message_dict, **message} + if isinstance(message,list): + pass + if isinstance(message,hop.plugins.snews.SNEWSObservation): + pass + # read from a file + if isinstance(message,str): + try: + with open(message) as json_file: + self.message_dict = json.load(json_file) + except: + print(f'{message} is not a json file!' + 'Using a default example dict') + self.message_dict = self.default_dict() + finally: + self.format_message(self.message_dict) + + + # How to handle different tiers? + # Currently it publishes one message for each tier + # masks the unrelevant fields for each + def publish_to_tiers(self): # TODO: check tiers, combine keys + """ Publish messages to the indicated streams # Submit one message containing all + """ + for tier, flag in self.publish_to.items(): + if flag: + # if publish_to:tier is True + # select the relevant keys + tier_data = {x:self.message_dict[x] for x in self.tier_keys_[tier]} + # update sent time (overwrite) + tier_data['sent_time'] = self.time_str() + stream = Stream(persist=False) + with stream.open(self.observation_topic, "w") as s: + try: + s.write(tier_data) + except: None + print(f"\nPublishing OBS message to {tier}:") + for k,v in tier_data.items(): + print(f'{k:<20s}:{v}') + + + def display_message(self): + """ Display the mesagge without publishing + """ + print(f"Following OBS message to be published:\nCurrent time:{self.time_str()}\n") + for k,v in self.message_dict.items(): + print(f'{k:<20s}:{v}') + print(f"\n> modify self.message_dict or \n" + "> use .publish_to_tiers() method to publish (see .publish_to)") + + +class Publish_Observation(Publish): + """ Class to publish observation messages + """ + def __init__(self, msg=None, detector='TEST', welcome=False, env_path=None): + super().__init__(msg, detector, env_path) + self.summarize = lambda env_path : snews_utils.summarize(self.detector, "OBSERVATION", env_path) + if welcome: self.summarize(env_path) + + + def publish(self): + """ Publish the current message + """ + obs_message = self.message_dict + obs_message['message_id'] = self.id_format(topic_type='O') + obs_message['status'] = 'ON' + obs_message['sent_time'] = self.time_str() + stream = Stream(persist=False) + with stream.open(self.observation_topic, "w") as s: + s.write(obs_message) + print(f"\nPublished OBS message to {self.observation_topic}:") + for k,v in obs_message.items(): + print(f'{k:<20s}:{v}') + + +class Publish_Heartbeat(Publish): + """ Class to publish hearbeat messages continuously + """ + def __init__(self, msg=None, detector='TEST', rate=30, env_path=None): + super().__init__(msg, detector, env_path) + self.rate = rate # seconds + self.summarize = lambda env_path : snews_utils.summarize(self.detector, "HEARTBEAT", env_path) + self.run_continouosly = self.background_schedule(self.rate) + + + def retrieve_status(self): + """ Script to retrieve detector status + """ + import numpy as np + return np.random.choice(['ON','OFF']) + + + def publish(self): + """ Publish heartbeat message + Publish default dict + """ + # hb_keys = ['detector_id','sent_time','status'] + # heartbeat_message = {k:v for k,v in self.message_dict if k in hb_keys} + heartbeat_message = {} + heartbeat_message['detector_id'] = self.detector.id + heartbeat_message['message_id'] = self.id_format(topic_type='H') + heartbeat_message['status'] = self.retrieve_status() + heartbeat_message['sent_time'] = self.time_str() + stream = Stream(persist=True) + try: + with stream.open(self.heartbeat_topic, "w") as s: + s.write(heartbeat_message) + print(f"\nPublished the Heartbeat message to {self.heartbeat_topic}:") + for k,v in heartbeat_message.items(): + print(f'{k:<20s}:{v}') + except: + print(f'publish() failed at {self.time_str()}') + + + # NEEDS WORK + def background_schedule(self, schedule=10): + """ Publish Heartbeat messages in background + On a given schedule + + Notes + ----- + Needs more work. Killing the process is not easy. + """ + from apscheduler.schedulers.background import BackgroundScheduler + import os + + scheduler = BackgroundScheduler() + scheduler.add_job(self.publish, 'interval', seconds=schedule) + scheduler.start() + print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) + try: + # This is here to simulate application activity (which keeps the main thread alive). + while True: + time.sleep(1) + except (KeyboardInterrupt, SystemExit): + # Not strictly necessary if daemonic mode is enabled but should be done if possible + scheduler.shutdown() + + +# Publish Alerts based on coincidince. +# Only relevant for the server +class Publish_Alert: + """ Class to publish SNEWS SuperNova Alerts + """ + def __init__(self, env_path=None): + snews_utils.set_env(env_path) + self.broker = os.getenv("HOP_BROKER") + self.alert_topic = os.getenv("ALERT_TOPIC") + self.times = snews_utils.TimeStuff(env_path) + self.time_str = lambda : self.times.get_snews_time() + + # decider should call this + def publish(self): + # from IPython.display import HTML, display + # giphy_snews = "https://raw.githubusercontent.com/SNEWS2/hop-SNalert-app/KaraMelih-dev/useful_code/snalert.gif" + # if snews_utils.isnotebook(): + # display(HTML(f'')) + snews_utils.display_gif() + alert_message = {'time':self.time_str()} + alert_message['content'] = 'This is a SNEWS Alarm!' + stream = Stream(persist=False) + with stream.open(self.alert_topic, "w") as s: + s.write(alert_message) + print(f"\nPublished ALERT message to {self.alert_topic} !!!") diff --git a/hop_comms/hop_sub.py b/hop_comms/hop_sub.py new file mode 100644 index 0000000..993dd0e --- /dev/null +++ b/hop_comms/hop_sub.py @@ -0,0 +1,95 @@ +""" +hop subscribe class +for the SNEWS member experiments (also others?) +to subscribe and listen to the alert topics + +# Author: +Sebastian Torres-Lara, Univ of Houston +Melih Kara kara@kit.edu + +Notes +https://docs.python.org/3/howto/logging.html + +""" + +# Imports +import snews_utils +from hop import Stream +import os, json +from collections import namedtuple + + +class HopSubscribe: + def __init__(self, env_path=None): + snews_utils.set_env(env_path) + self.broker = os.getenv("HOP_BROKER") + self.observation_topic = os.getenv("OBSERVATION_TOPIC") # only snews can subscribe + self.alert_topic = os.getenv("ALERT_TOPIC") + # for testing + self.heartbeat_topic = self.observation_topic + self.logger = snews_utils.get_logger('snews_sub','logging.log') + + # time object/strings + self.times = snews_utils.TimeStuff(env_path) + self.hr = self.times.get_hour() + self.date = self.times.get_date() + self.snews_time = lambda : self.times.get_snews_time() + + + def save_message(self, message): + """ Save messages to a json file + """ + path = f'SNEWS_MSGs/{self.times.get_date()}/' + snews_utils.make_dir(path) + file = path+'subscribed_messages.json' + # read the existing file + try: + data = json.load(open(file)) + if not isinstance(data, dict): + print('Incompatible file format!') + return None + # TODO: deal with `list` type objects + except: + data = {} + # add new message with a current time stamp + current_time = self.snews_time() + data[current_time] = message + with open(file, 'w') as outfile: + json.dump(data, outfile, indent=4, sort_keys=True) + # self.logger.info(str(message)) + + def subscribe(self, which_topic='A', verbose=False): + ''' Subscribe and listen to a given topic + Arguments + --------- + which_topic : str + Topic to listen. If 'A' or 'O' listens the + alert or observation topics set by the env file + long string indicating the full topic link can also + be given + + ''' + if len(which_topic)==1: + # set topic enum, get name and broker + topic = snews_utils.set_topic_state(which_topic) + name = topic.topic_name + broker = topic.topic_broker + print(f'Subscribing to {name} Topic\nBroker:{broker}') + else: + name = which_topic.split('/')[-1] + broker = which_topic + # self.logger.info(f'{self.snews_time()}: Listening the {broker}\n') + # Initiate hop_stream + stream = Stream(persist=True) + with stream.open(broker, "r") as s: + for message in s: + if which_topic.upper()=='A': snews_utils.display_gif() + else: + print(f"{name} from {message['detector_id']}" + f" at {message['sent_time']}") + if verbose: + print('#'.center(50, '#')) + for k,v in message.items(): + print(f'# {k:<20s}:{v:<25} #') + print('#'.center(50, '#')) + self.save_message(message) \ No newline at end of file diff --git a/hop_comms/make_detector_file.py b/hop_comms/make_detector_file.py new file mode 100644 index 0000000..e5063a5 --- /dev/null +++ b/hop_comms/make_detector_file.py @@ -0,0 +1,39 @@ +""" +Melih Kara 26/06/2021 + +Script to create detector file. +New detectors can be added here. +""" + +import json +from collections import namedtuple + +Detector = namedtuple("Detector", ["name", "id", "location"]) +detectors = { + "TEST" : Detector("TEST", 0,"TESTloc"), + "Super-K" : Detector("Super-K", 1, "loc Super-K"), + "Hyper-K" : Detector("Hyper-K", 2, "loc Hyper-K"), + "SNO+" : Detector("SNO+", 3, "loc SNO+"), + "KamLAND" : Detector("KamLAND", 4, "loc KamLAND"), + "LVD" : Detector("LVD", 5, "loc LVD"), + "ICE" : Detector("ICE", 6, "loc ICE"), + "Borexino" : Detector("Borexino", 7, "loc Borexino"), + "HALO-1kT" : Detector("HALO-1kT", 8, "loc HALO-1kT"), + "HALO" : Detector("HALO", 9, "loc HALO"), + "NOvA" : Detector("NOvA", 10, "loc NOvA"), + "KM3NeT" : Detector("KM3NeT", 11, "loc KM3NeT"), + "Baksan" : Detector("Baksan", 12, "loc Baksan"), + "JUNO" : Detector("JUNO", 13, "loc JUNO"), + "LZ" : Detector("LZ", 14, "loc LZ"), + "DUNE" : Detector("DUNE", 15, "loc DUNE"), + "MicroBooNe" :Detector("MicroBooNe", 16, "loc MicroBooNe"), + "SBND" : Detector("SBND", 17, "loc SBND"), + "DS-20K" : Detector("DS-20K", 18, "loc DS-20K"), + "XENONnT" : Detector("XENONnT", 19, "loc XENONnT"), + "PandaX-4T" : Detector("PandaX-4T", 20, "loc PandaX-4T"), + } + +with open('detector_properties.json', 'w') as outfile: + json.dump(detectors, outfile) + +print('detector_properties.json file created!') \ No newline at end of file diff --git a/hop_comms/observation-example.json b/hop_comms/observation-example.json new file mode 100644 index 0000000..2e69ed5 --- /dev/null +++ b/hop_comms/observation-example.json @@ -0,0 +1 @@ +{"message_id": 0, "detector_id": 0, "sent_time": "21/06/06 14:21:37", "neutrino_time": "87/02/24 14:23:41", "machine_time": "21/06/06 14:21:37", "location": "test", "p_value": 0, "status": "test", "content": "test"} \ No newline at end of file diff --git a/useful_code/slack_alert.py b/hop_comms/slack_alert.py similarity index 100% rename from useful_code/slack_alert.py rename to hop_comms/slack_alert.py diff --git a/hop_comms/snalert.gif b/hop_comms/snalert.gif new file mode 100644 index 0000000..44284b6 Binary files /dev/null and b/hop_comms/snalert.gif differ diff --git a/hop_comms/snews_utils.py b/hop_comms/snews_utils.py new file mode 100644 index 0000000..8fe0f9a --- /dev/null +++ b/hop_comms/snews_utils.py @@ -0,0 +1,175 @@ +from dotenv import load_dotenv +from datetime import datetime +from collections import namedtuple +import os, json +from pathlib import Path + + +def check_mongo_connection(): + pass + + +def check_hop_connection(): + pass + + +def set_env(self, env_path=None): + """ Set environment + Arguments + --------- + env_path : str (optional) + path for the environment file. + Use default settings if not given + """ + env = env_path or './default_env.env' + load_dotenv(env) + + +def make_dir(path): + if Path(path).is_dir(): + pass + else: + os.makedirs(path) + + +class TimeStuff: + ''' SNEWS format datetime objects + ''' + def __init__(self, env=None): + set_env(env) + self.snews_t_format = os.getenv("TIME_STRING_FORMAT") + self.hour_fmt = "%H:%M:%S" + self.date_fmt = "%y_%m_%d" + + self.get_datetime = datetime.utcnow() + self.get_snews_time = lambda fmt=self.snews_t_format : datetime.utcnow().strftime(fmt) + self.get_hour = lambda fmt=self.hour_fmt : datetime.utcnow().strftime(fmt) + self.get_date = lambda fmt=self.date_fmt : datetime.utcnow().strftime(fmt) + + +def set_topic_state(which_topic): + Topics = namedtuple('Topics', ['topic_name', 'topic_broker']) + topics = { + 'A': Topics('ALERT', 'kafka://kafka.scimma.org/snews.alert-test'), + 'O': Topics('OBSERVATION', 'kafka://kafka.scimma.org/snews.experiments-test'), + 'H': Topics('HEARTBEAT', 'kafka://kafka.scimma.org/snews.experiments-test') + } + return topics[which_topic.upper()] + + +# retrieve the detector properties +def retrieve_detectors(detectors_path="./detector_properties.json"): + ''' Retrieve the name-ID-location of the + participating detectors. + + ''' + # search for the pre-saved detectors file, create if not exist + if not os.path.isfile(detectors_path): + os.system(f'python make_detector_file.py') + + with open(detectors_path) as json_file: + detectors = json.load(json_file) + + # make a namedtuple + Detector = namedtuple("Detector", ["name", "id", "location"]) + for k,v in detectors.items(): + detectors[k] = Detector(v[0], v[1], v[2]) + return detectors + + +def get_detector(detector, detectors_path="./detector_properties.json"): + """ Return the selected detector properties + + """ + Detector = namedtuple("Detector", ["name", "id", "location"]) + if isinstance(detector, Detector): return detector # not needed? + # search for the detector name in `detectors` + detectors = retrieve_detectors(detectors_path) + if isinstance(detector, str): + try: + return detectors[detector] + except KeyError: + print(f'{detector} is not a valid detector!') + return detectors['TEST'] + + +def summarize(detector, topic_type_, env_path=None): + """ Summarize the current configuration + """ + import hop, snews, sys + set_env(env_path) + broker = os.getenv("HOP_BROKER") + observation_topic = os.getenv("OBSERVATION_TOPIC") + heartbeat_topic = os.getenv("OBSERVATION_TOPIC") + alert_topic = os.getenv("ALERT_TOPIC") + topic_type = f"Publish SNEWS {topic_type_} Messages" + print( + '#'.center(50, '#')+ + f'\n# {topic_type:^46} #\n' + f'#{detector.name:_^48}#\n' + f'#{str(detector.id)+"-"+detector.location:_^48}#\n'+ + '#'.center(50, '#')+ + f'\nYour Python version:\n {sys.version}\n' + f'Current hop-client version:{hop.__version__}\n' + f' snews version:{snews.__version__}\n\n' + f'Publishing to {broker}\n' + f'Observation Topic:\n==> {observation_topic}\n' + f'Heartbeat Topic:\n==> {heartbeat_topic}\n\n') + + +def isnotebook(): + """ Tell if the script is running on a notebook + """ + try: + shell = get_ipython().__class__.__name__ + if shell == 'ZMQInteractiveShell': + return True # Jupyter notebook or qtconsole + elif shell == 'TerminalInteractiveShell': + return False # Terminal running IPython + else: + return False # Other type (?) + except NameError: + return False # Probably standard Python interpreter + + +def get_logger(scriptname, logfile_name): + import logging + # Gets or creates a logger + logger = logging.getLogger(scriptname) + + # set log level + logger.setLevel(logging.INFO) + # define file handler and set formatter + file_handler = logging.FileHandler(logfile_name) + formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s') + file_handler.setFormatter(formatter) + # add file handler to logger + logger.addHandler(file_handler) + return logger + + +def display_gif(): + if isnotebook(): + from IPython.display import HTML, display + giphy_snews = "https://raw.githubusercontent.com/SNEWS2/hop-SNalert-app/KaraMelih-dev/hop_comms/snalert.gif" + display(HTML(f'')) + +## Not working properly +# def run_parallel(nparallel=2): +# """ Run publish & subscribe methods in parallel +# Only works for notebooks. Requires ipyparallel +# Arguments +# --------- +# nparallel : int +# number of cells to run in parallel +# """ +# if not isnotebook(): +# import sys +# sys.exit('Cannot run processes in parallel') +# # enable the extension in the current environment +# os.system('ipcluster nbextension enable --user') +# os.system(f'ipcluster start -n {nparallel}') +# from ipyparallel import Client +# rc = Client() +# print("Run `%%px -a -t 0` magic command on the notebook!") +# return None \ No newline at end of file diff --git a/useful_code/hop_comms.ipynb b/useful_code/hop_comms.ipynb deleted file mode 100644 index 7e16ce3..0000000 --- a/useful_code/hop_comms.ipynb +++ /dev/null @@ -1,129 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "tribal-taylor", - "metadata": {}, - "outputs": [], - "source": [ - "from ipyparallel import Client\n", - "rc = Client()" - ] - }, - { - "cell_type": "markdown", - "id": "japanese-bryan", - "metadata": {}, - "source": [ - "Tutorial for ipyparallel : https://ipython-books.github.io/59-distributing-python-code-across-multiple-cores-with-ipython/" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "thirty-central", - "metadata": {}, - "outputs": [], - "source": [ - "%%px -a -t 3\n", - "\n", - "import hop_sub\n", - "hop_sub.sub_alrt()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "valuable-plain", - "metadata": {}, - "outputs": [], - "source": [ - "# %%px -a -t 0\n", - "# # SNEWS only\n", - "# import hop_sub\n", - "\n", - "# hop_sub.sub_obs()" - ] - }, - { - "cell_type": "markdown", - "id": "severe-medicaid", - "metadata": {}, - "source": [ - "-------------------" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "generic-river", - "metadata": {}, - "outputs": [], - "source": [ - "import hop_pub" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "suffering-declaration", - "metadata": {}, - "outputs": [], - "source": [ - "hop_pub.pub_test('A')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "alive-verse", - "metadata": {}, - "outputs": [], - "source": [ - "hop_pub.pub_ccd_tier('A')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "paperback-mortgage", - "metadata": {}, - "outputs": [], - "source": [ - "hop_pub.pub_sig_tier('A')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "precious-teach", - "metadata": {}, - "outputs": [], - "source": [ - "hop_pub.pub_t_tier('A')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.8" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/useful_code/hop_pub.py b/useful_code/hop_pub.py deleted file mode 100644 index 71ce81e..0000000 --- a/useful_code/hop_pub.py +++ /dev/null @@ -1,141 +0,0 @@ -# Author: Sebastian Torres-Lara, Univ of Houston - -# Imports -from hop import Stream -from datetime import datetime -import time - -# get current time year_month_day_hr_min_sec (UTC) -def time_str(): - return datetime.now().utcnow().strftime("%y/%m/%d %H:%M:%S") - - -# sets up stream source depending on topic -def set_topic(topic_type): - hop_broker = "kafka.scimma.org" - observation_topic = f"kafka://{hop_broker}/snews.experiments-test" - alert_topic = f"kafka://{hop_broker}/snews.alert-test" - heartbeat_topic = f"kafka://{hop_broker}/snews.alert-test" - if topic_type == "A": - return alert_topic - elif topic_type == "O": - return observation_topic - elif topic_type == "H": - return heartbeat_topic - else: - print( - "INVALID ENTRY:\nUse 'A' for ALERT_TOPIC \nOR\n 'O' for OBSERVATION_TOPIC \nOR\n 'H' for HEARTBEAT_TOPIC" - ) - - -# formats topic message, returns it as dict -def format_msg(SN_Trigger_Signal, topic_state): - # needs to take in trigger data - # get params from detector data, ideally pass the data as a dict - detc_name = 18 - sent_time_hr = datetime.now().utcnow().strftime("%H:%M:%S") - nu_t = datetime.now().utcnow().strftime("%H:%M:%S") - machine_t = datetime.now().utcnow().strftime("%H:%M:%S") - loc = "Somewhere" - p_val = "15 Mev : 0.69...." # method call - status = "ON" - t_series = '"4:12:56":{10.0:2}' # nested dict: time bin with a dict, with E (MeV) bin containing num of events - - if topic_state == "A": - return { - "message_id": 'Alert', - "detector_id": detc_name, - "sent_time": sent_time_hr, - "machine_time": machine_t, - } - elif topic_state == "H": - return { - "message_id": 'Heartbeat', - "detector_id": detc_name, - "machine_time": machine_t, - "status": status, - } - elif topic_state == "TT": - return { - "message_id": 'Timing Tier', - "detector_id": detc_name, - "neutrino_time": nu_t, - "machine_time": machine_t, - "status": status, - "timing-series": t_series, # publish as dict, t_bin:event_num - } - elif topic_state == "ST": - return { - "message_id": 'Significance Tier', - "detector_id": detc_name, - "machine_time": machine_t, - "neutrino_time": nu_t, - "status": status, - "p_value": p_val, # publish as dict, E_val:p_val - } - elif topic_state == "CT": - return { - "message_id": 'Coincidence Tier', - "detector_id": detc_name, - "machine_time": machine_t, - "neutrino_time": nu_t, - "status": status, - } - - -# with hop stream theses method publishes a single message everytime it is called -# must pass it the detector signal - -def pub_t_tier(detector_data): - detector_data = None - stream = Stream(persist=False) - msg = format_msg(detector_data, "TT") - with stream.open(set_topic("O"), "w") as s: - s.write(msg) - print(f"Publishing OBS message:\n{msg}") - - -def pub_sig_tier(detector_data): - detector_data = None - stream = Stream(persist=False) - msg = format_msg(detector_data, "ST") - with stream.open(set_topic("O"), "w") as s: - s.write(msg) - print(f"Publishing OBS message:\n{msg}") - - -def pub_ccd_tier(detector_data): - detector_data = None - stream = Stream(persist=False) - msg = format_msg(detector_data, "CT") - with stream.open(set_topic("O"), "w") as s: - s.write(msg) - print(f"Publishing OBS message:\n{msg}") - - -def pub_alert(detector_data): - detector_data = None - stream = Stream(persist=False) - msg = format_msg(detector_data, "A") - with stream.open(set_topic("A"), "w") as s: - s.write(msg) - print(f"Publishing ALERT message:\n{msg}") - - -def pub_heartbeat(detector_data): - stream = Stream(persist=True) - # while True: - with stream.open(set_topic("H"), "w") as s: - # detector_data = None needs to be called every time it's iterated ! - obs_msg = format_msg(detector_data, "H") - s.write(obs_msg) - time.sleep(60) - - -def pub_test(detector_data): - detector_data = None - stream = Stream(persist=False) - msg = {"message_id": 'DS_20k_test_obs', "detector_id": 'DS_20k', "sent_time": "21/06/06 14:21:37", "neutrino_time": datetime.now().strftime("%H:%M:%S"), "machine_time": datetime.now().strftime("%H:%M:%S"), "location": "test", "p_value": 0, "status": "test", "content": "test"} - with stream.open(set_topic("O"), "w") as s: - s.write(msg) - print(f"Publishing OBS message:\n{msg}") \ No newline at end of file diff --git a/useful_code/hop_sub.py b/useful_code/hop_sub.py deleted file mode 100644 index 9d59bcb..0000000 --- a/useful_code/hop_sub.py +++ /dev/null @@ -1,91 +0,0 @@ -# Author: Sebastian Torres-Lara, Univ of Houston - -# Imports -from datetime import datetime -from hop import Stream -from pathlib import Path -import os -import slack_alert - - -# get current time -def hr_str(): - return datetime.now().utcnow().strftime("%H:%M:%S") - - -# get current date -def date_str(): - return datetime.now().utcnow().strftime("%y_%m_%d") - - -# make dir with current data (yr_m_d) -def make_dir(path): - if Path(path).is_dir(): - pass - else: - os.makedirs(path) - - -# this method sets up the source for the Stream -def set_topic(topic_type): - hop_broker = "kafka.scimma.org" - observation_topic = f"kafka://{hop_broker}/snews.experiments-test" - alert_topic = f"kafka://{hop_broker}/snews.alert-test" - - if topic_type == "A": - return alert_topic - elif topic_type == "O": - return observation_topic - else: - print( - "INVALID ENTRY:\nUse 'A' for ALERT_TOPIC \nOR\n 'O' for OBSERVATION_TOPIC" - ) - - -# Sets up a persistent stream to OBSERVATION_TOPIC, -# sends OBS message to save_message_obs() -def sub_obs(): - stream = Stream(persist=True) - - with stream.open(set_topic("O"), "r") as s: - for message in s: - # print(f"saw an OBS at: {time_str()} from {message['detector_id']}") - slack_alert.send_slack_msg('O', message) - save_message_obs(message, message['detector_id']) - - -# Sets up a persistent stream to ALERT_TOPIC, -# sends OBS message to save_message_alert() -def sub_alrt(): - stream = Stream(persist=True) - - with stream.open(set_topic("A"), "r") as s: - for message in s: - # print(f"saw an ALERT at: {time_str()} from {message['detector_id']}") - slack_alert.send_slack_msg('A', message) - save_message_alert(message) - - -# This method converts hop OBS messages to a string and saves it to a txt file -def save_message_obs(message, detector_name): - path = f'SNEWS_MSGs/OBS/{date_str()}/' - make_dir(path) - text_file = open(f"{path}SNEWS_OBS_{detector_name}_{hr_str()}.txt", "w+") - text_file.write(str(message)) - text_file.close() - - -# This method converts hop ALERT messages to a string and saves it to a txt file -def save_message_alert(message): - path = f'SNEWS_MSGs/ALERT/{date_str()}/' - make_dir(path) - text_file = open(f"{path}SNEWS_ALERT_{hr_str()}.txt", "w+") - text_file.write(str(message)) - text_file.close() - -# Whenever an alert is sent this method will search throug -def alert_send_rev_search(): - pass -# with stream.open(set_topic("A"), "r") as s: -# for message in s: -# # send to reverse search module