-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalert_system.py
More file actions
163 lines (137 loc) · 5.06 KB
/
Copy pathalert_system.py
File metadata and controls
163 lines (137 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from confluent_kafka import Consumer, Producer
import mysql.connector
import json
import os
# Configurazione di Kafka per il consumatore e produttore
KAFKA_TOPIC_CONSUMER = 'to-alert-system'
KAFKA_TOPIC_PRODUCER = 'to-notifier'
consumer_config = {
'bootstrap.servers': "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092",
'group.id': 'alert-system-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000
}
producer_config = {
'bootstrap.servers': "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
}
# Configurazione del database MySQL
MYSQL_HOST = os.getenv('MYSQL_HOST', 'host.docker.internal')
MYSQL_USER = os.getenv('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'root')
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'db')
MYSQL_PORT = os.getenv('MYSQL_PORT', 3306)
# Crea il consumatore e il produttore Kafka
consumer = Consumer(consumer_config)
producer = Producer(producer_config)
def get_ticker_value_from_db(ticker):
"""
Recupera l'ultimo valore del ticker dal database.
"""
try:
conn = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
database=MYSQL_DATABASE,
port=MYSQL_PORT
)
cursor = conn.cursor()
# Ottieni l'ultimo valore per il ticker
cursor.execute("""
SELECT value
FROM financial_data
WHERE ticker = %s
ORDER BY timestamp DESC LIMIT 1
""", (ticker,))
result = cursor.fetchone()
conn.close()
if result:
return result[0] # valore più recente del ticker
else:
return None
except mysql.connector.Error as err:
print(f"Errore durante il recupero del valore del ticker dal database: {err}")
return None
def check_profile_and_send_alert(email, ticker,current_value,condition):
"""
Verifica se il valore del ticker supera le soglie e invia un alert.
"""
try:
conn = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
database=MYSQL_DATABASE,
port=MYSQL_PORT
)
cursor = conn.cursor()
# Esegui la query per ottenere il valore del ticker e le soglie
cursor.execute("""
SELECT ticker, high_value, low_value
FROM users
WHERE email = %s
""", (email,))
result = cursor.fetchone()
if result:
ticker_db, high_value, low_value = result
# Controlla se il valore supera la soglia
if ticker_db == ticker:
if high_value is not None and current_value >= high_value:
send_alert(email, ticker, "high-value exceeded")
elif low_value is not None and current_value <= low_value:
send_alert(email, ticker, "low-value exceeded")
conn.close()
except Exception as e:
print(f"Errore durante la verifica del profilo: {e}")
def send_alert(email, ticker, condition):
"""
Invia un messaggio al topic 'to-notifier' con i dettagli dell'alert.
"""
message = {
'email': email,
'ticker': ticker,
'condition': condition
}
producer.produce(KAFKA_TOPIC_PRODUCER, json.dumps(message))
producer.flush()
print(f"Alert inviato a {email} per {ticker}: {condition}")
def process_message(msg):
"""
Processa il messaggio dal topic 'to-alert-system'.
"""
data = json.loads(msg.value().decode('utf-8'))
if data.get('status') == 'completed':
print(f"Fase di aggiornamento completata per i tickers: {data.get('updated_tickers')}")
return
# Estrai le informazioni dal messaggio
email = data.get('email')
ticker = data.get('ticker')
# Recupera l'ultimo valore del ticker dal database
current_value = get_ticker_value_from_db(ticker)
if current_value is not None:
# Verifica se la condizione è soddisfatta per l'utente
check_profile_and_send_alert(email, ticker, current_value, "high")
check_profile_and_send_alert(email, ticker, current_value, "low")
else:
print(f"Nessun valore disponibile per il ticker {ticker}.")
def main():
# Sottoscrivi al topic Kafka
consumer.subscribe([KAFKA_TOPIC_CONSUMER])
try:
while True:
# Polling dei messaggi da Kafka
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
# Elabora il messaggio ricevuto
process_message(msg)
except KeyboardInterrupt:
print("Alert System interrotto dall'utente.")
finally:
consumer.close()
if __name__ == "__main__":
main()