-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
101 lines (84 loc) · 3.47 KB
/
main.py
File metadata and controls
101 lines (84 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import asyncio
import logging
import sys
from prometheus_client import start_http_server
from shared import constants
from listeners.file_notifications import FileNotificationListener
from listeners.end_readout import EndReadoutListener
from shared.task_processor import TaskProcessor
# file notifications include the expected sensor names
# expected sensors lives in s3 bucket
# topics are file notifications and end readout
# counter for .fits file
# counter for .json file
# counter for end run kafka message
# if files are missing then generate a log message
# counter for missing files over time
#
# add histogram summary over sliding time window
# add summary of files processed during the window: prometheus Summary
# all metrics are valid over the observation window, files come in every 30 seconds
#
# log error if a file comes in late or is missing
# TODO image source is MC; if it's not MC then log it and pass on the end readout event
#
#
#
# TODO not sure we need persistence, we could just query aws for the actual images.
# this would work even for failover
# images get generated every 7 seconds
# end readout is in tai time
# timestamp end of readout compare to when files arrive to make sure they arrive within the window (7 seconds)
# file notifications should be sent out within 7 seconds of the timestampEndOfReadout time (tai time)
# file notification times are UTC
# unexplained file omission (UFO)
# Logging config: DEBUG_LOGS is a string flag (typically from the environment).
# Compare case-insensitively so "true"/"True"/"TRUE" all enable debug output;
# the original only matched the exact lowercase string "true".
if str(constants.DEBUG_LOGS).lower() == "true":
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
else:
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Module-level logger, named after this module per stdlib convention.
log = logging.getLogger(__name__)
async def main():
    """Entry point: start the Prometheus metrics endpoint, the TaskProcessor
    singleton, and the Kafka listeners, then run them all concurrently.

    Runs until one of the gathered coroutines returns or raises; the first
    exception propagates out of ``asyncio.gather``.
    """
    tasks = []

    # Expose Prometheus metrics over HTTP on port 8000.
    start_http_server(8000)

    # Initialize and start the TaskProcessor (singleton).
    log.info("starting task processor...")
    task_processor = TaskProcessor.get_instance()
    tasks.append(task_processor.start())

    # Kafka listener for file-notification events.
    log.info("starting file notification listener...")
    file_notification_params = {
        "topic": constants.FILE_NOTIFICATION_TOPIC_NAME,
        "bootstrap_servers": constants.FILE_NOTIFICATION_KAFKA_BOOTSTRAP_SERVERS,
        "group_id": constants.FILE_NOTIFICATION_KAFKA_GROUP_ID,
        "enable_batch_processing": constants.KAFKA_BATCH_PROCESS_FILE_NOTIFICATIONS,
    }
    tasks.append(FileNotificationListener(**file_notification_params).start())

    # Optional Kafka listener for end-readout events.
    if constants.SHOULD_RUN_END_READOUT_LISTENER:
        log.info("starting end readout listener")
        end_readout_listener_params = {
            "topic": constants.END_READOUT_TOPIC_NAME,
            "bootstrap_servers": constants.END_READOUT_KAFKA_BOOTSTRAP_SERVERS,
            "group_id": constants.END_READOUT_KAFKA_GROUP_ID,
        }
        # In prod, add the schema registry and SASL auth settings. IS_PROD is a
        # string flag; compare case-insensitively for robustness — the original
        # matched only the exact string "True", which silently skipped auth for
        # "true"/"TRUE".
        if str(constants.IS_PROD).lower() == "true":
            end_readout_listener_params["schema_registry"] = constants.END_READOUT_SCHEMA_REGISTRY
            end_readout_listener_params["auth"] = {
                "security_protocol": constants.END_READOUT_SECURITY_PROTOCOL,
                "sasl_mechanism": constants.END_READOUT_SASL_MECHANISM,
                "sasl_plain_username": constants.END_READOUT_SASL_USERNAME,
                "sasl_plain_password": constants.END_READOUT_SASL_PASSWORD,
            }
        tasks.append(EndReadoutListener(**end_readout_listener_params).start())

    # Run processor and listeners concurrently until completion or failure.
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())