-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhome.py
More file actions
294 lines (266 loc) · 9.72 KB
/
Copy pathhome.py
File metadata and controls
294 lines (266 loc) · 9.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import logging
import os
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
import psutil
import pytz
import sentry_sdk
import streamlit as st
from applogging import init_logging
from sidebar import make_sidebar
from utils import (
SENTRY_DSN,
VERSION,
WAIT_TIME_SECONDS,
get_prices_user,
init_authenticator,
wait_time,
)
# Application logger, shared by every function in this module.
logger = logging.getLogger("gas_station_app")

# Page configuration is done at module level so it runs before main()
# issues any other Streamlit call.
# The icon is the fuel-pump emoji; the previous literal was its
# mis-decoded byte sequence.
st.set_page_config(
    page_title="Carburoam",
    page_icon="⛽",
)
def trigger_etl():
    """
    Trigger the ETL process in a subprocess.

    The child runs ``utils.py --action etl`` with the current interpreter.
    Its stdout and stderr are redirected to two files under ``outputs``,
    named after a fresh uuid. The ``outputs`` directory is created when it
    does not exist yet. The function does not wait for the child.

    Args:
        None
    Returns:
        None
    """
    # one uuid per launch so that two launches never share log files
    run_id = str(uuid.uuid4())
    # opening the log files would fail if the directory were missing
    os.makedirs("outputs", exist_ok=True)
    stdout_path = f"outputs/stdout_{run_id}.txt"
    stderr_path = f"outputs/stderr_{run_id}.txt"
    with open(stdout_path, "wb") as out, open(stderr_path, "wb") as err:
        # the child gets its own copies of the descriptors, so closing
        # ours when the with-block exits does not cut its output
        subprocess.Popen(
            [sys.executable, "utils.py", "--action", "etl"],
            stdout=out,
            stderr=err,
        )
def check_last_job(multiplier=1.0):
    """
    Check if the last job is recent, if not, start a new job.

    Reads the timestamp stored in lastjob.txt, stores it in
    st.session_state["lastjob"], and triggers the ETL when that timestamp
    is older than WAIT_TIME_SECONDS * multiplier. Does nothing when
    lastjob.txt does not exist.

    Args:
        multiplier (float): a multiplier to the WAIT_TIME_SECONDS to increase
            the time to wait before starting a new job. Default is 1.0.
    Returns:
        None
    Raises:
        ValueError: if lastjob.txt does not hold an ISO-formatted timestamp.
    """
    try:
        with open("lastjob.txt", "r") as file:
            raw_date = file.read()
    except FileNotFoundError:
        # no job recorded yet: nothing to compare against
        return

    # the date is dumped as str(datetime.now()); fromisoformat also accepts
    # the variant without a fractional part, which str() emits when the
    # microsecond field is exactly 0
    last_job = datetime.fromisoformat(raw_date.strip())
    st.session_state["lastjob"] = last_job

    elapsed_seconds = (datetime.now() - last_job).total_seconds()
    if elapsed_seconds > WAIT_TIME_SECONDS * multiplier:
        logger.info("Last job was not recent, starting new job")
        trigger_etl()
    else:
        logger.info("Last job was recent, skipping")
def cleanup_output_files():
    """
    Remove output files older than 1 day.

    Only ``*.txt`` files directly under ``outputs`` are considered. Age is
    measured from the file's last modification time.

    Args:
        None
    Returns:
        None
    """
    max_age_seconds = 24 * 60 * 60
    for file in Path("outputs").glob("*.txt"):
        try:
            last_modified = datetime.fromtimestamp(file.stat().st_mtime)
            age_seconds = (datetime.now() - last_modified).total_seconds()
            # compare the full age: timedelta.days is truncated, so a
            # ``.days > 1`` test would keep files until they are 2 days old
            if age_seconds > max_age_seconds:
                logger.info(f"Removing {file}")
                file.unlink()
        except FileNotFoundError:
            # it means another process has deleted the file
            pass
def main():
"""
Run the home page: keep the background ETL job alive, then render the UI.
The first part decides whether an ETL subprocess must be started, using
pid.txt (an integer pid, presumably written by the ETL process) and
lastjob.txt (timestamp of the last job). The second part logs the user in
through the sidebar and renders one of three views depending on
st.session_state["authentication_status"]: truthy (logged in), False
(failed login) or None (no attempt yet).
NOTE(review): every line of this function sits at column 0 in this copy,
so the nesting of the branches below cannot be read from the text and has
to be restored before the function can run.
NOTE(review): several non-ASCII string literals (page titles, icons) look
like mis-decoded emoji - compare with the original file.
Args:
None
Returns:
None
"""
# check if the pid file exists
if not os.path.exists("pid.txt"):
logger.info("No pid file found, creating a job")
# if it doesn't exist, trigger the subprocess job
# delete and remove output files under outputs
cleanup_output_files()
# launch routine in case no pid
if os.path.exists("lastjob.txt"):
check_last_job()
else:
trigger_etl()
else:
# check if pid is still running
with open("pid.txt", "r") as file:
# int() raises ValueError if pid.txt is empty or not a number
pid = int(file.read())
# check if the process is still running
try:
process_etl = psutil.Process(pid)
except psutil.NoSuchProcess:
process_etl = None
# drop the pid file; missing_ok tolerates it being gone already
Path("pid.txt").unlink(missing_ok=True)
if process_etl is None or not process_etl.is_running():
logger.info("PID not found, creating a new job")
# delete and remove output files under outputs
cleanup_output_files()
# launch routine in case no pid
if os.path.exists("lastjob.txt"):
# 1.2 widens the wait window by 20% compared with the no-pid-file case
check_last_job(multiplier=1.2)
else:
trigger_etl()
elif os.path.exists("lastjob.txt"):
with open("lastjob.txt", "r") as file:
date = file.read()
# parse the date (dumped as datetime.now())
date = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f")
st.session_state["lastjob"] = date
# if the delta from now is greater than WAIT_TIME_SECONDS
# (same 1.2 margin as the check_last_job call above)
if (
datetime.now() - date
).total_seconds() > WAIT_TIME_SECONDS * 1.2:
# NOTE(review): the message says no new job is created, but
# trigger_etl() is called a few lines below - confirm the wording
logger.info("Process is stall, new job is not created")
# kill the process
process_etl.kill()
# check if pid file still exists
if os.path.exists("pid.txt"):
os.remove("pid.txt")
cleanup_output_files()
trigger_etl()
else:
logger.info("Process is creating a job, waiting")
# display info about last job if any
if os.path.exists("lastjob.txt"):
with open("lastjob.txt", "r") as file:
date = file.read()
date = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f")
st.session_state["lastjob"] = date
# the second value returned by init_authenticator() is not used here
authenticator, _ = init_authenticator()
with st.sidebar:
authenticator.login(location="sidebar")
# view 1: authentication_status is truthy (user is logged in)
if st.session_state["authentication_status"]:
logger.info("User logged in")
authenticator.logout("Logout", "sidebar")
st.write(f"Welcome on Carburoam, *{st.session_state['name']}*")
st.title("Stations ππΈπ’οΈ")
# create a dataframe from the custom stations and the prices
if st.session_state["username"] is not None:
get_prices_user(st.session_state["username"])
st.divider()
if st.session_state.get("lastjob"):
# convert the date to a string
last_job_datetime = st.session_state["lastjob"]
# convert to CET timezone
# NOTE(review): replace() only labels the naive value as UTC, it does
# not convert it; this assumes lastjob.txt was written on a host whose
# local time is UTC - confirm
last_job_datetime = last_job_datetime.replace(tzinfo=pytz.utc)
last_job_datetime_paris = last_job_datetime.astimezone(
pytz.timezone("Europe/Paris")
)
last_job_str = datetime.strftime(
last_job_datetime_paris, "%A, %d %B %Y - %H:%M"
)
# NOTE(review): the first positional argument of st.metric is the
# label and the second the value; here the date is passed as the
# label - confirm this is intended
st.metric(
last_job_str,
"last extract of prices",
)
st.caption("Customize your experience β¬οΈ")
# extra link shown only to the "admin" username
if st.session_state["username"] == "admin":
st.title("Admin dashboard π οΈ")
st.page_link(
"pages/admin.py", label="Click here to go to admin dashboard"
)
st.title("Profile dashboard π")
st.page_link(
"pages/profile.py",
label="Click here to go to your profile dashboard",
)
st.title("Stations dashboard β½")
st.page_link(
"pages/stations.py",
label="Click here to go to your stations dashboard",
)
# view 2: authentication_status is False (login attempt failed)
elif st.session_state["authentication_status"] is False:
with st.sidebar:
st.page_link(
"pages/demo.py", label="Demo without registration", icon="π"
)
# number of failed attempts recorded for this username, 0 if none;
# presumably maintained by the authenticator - confirm
if "failed_login_attempts" in st.session_state:
try_dict = st.session_state["failed_login_attempts"]
try_number = try_dict.get(st.session_state["username"], 0)
logger.warning(
f"User {st.session_state['username']} failed to login once"
)
else:
try_number = 0
if try_number > 1:
logger.warning(
(
f"User {st.session_state['username']} failed "
"to login several times"
)
)
st.page_link(
"pages/forgot.py",
label="Forgot credentials? Click here for help",
)
st.error("Username/password is incorrect.")
# wait_time comes from utils; presumably it delays the next attempt
# according to try_number - confirm
wait_time(try_number)
# view 3: authentication_status is None (no login attempt yet)
elif st.session_state["authentication_status"] is None:
with st.sidebar:
st.page_link(
"pages/demo.py", label="Demo without registration", icon="π"
)
st.title("Welcome on Carburoam ππΈπ’οΈ newcomer !")
st.warning("π Please enter your username and password")
# first row: register link next to a hint about the demo
c0, c1 = st.columns(2)
with c0:
st.page_link(
"pages/register.py",
label="Not registered ? Click here to register",
)
with c1:
st.caption(
"*Want to see a demo of the website before register ?"
" Sure ! Click on demo on the left π*"
)
# NOTE(review): st.write() is called without arguments - likely meant
# as a spacer between the two rows; confirm
st.write()
# second row: forgot-credentials link next to a contact hint
c0, c1 = st.columns(2)
with c0:
st.page_link(
"pages/forgot.py",
label="Forgot credentials? Click here for help",
)
with c1:
st.caption(
"If you didn't entered a real email, don't worry, just DM me ! π"
)
# make_sidebar comes from the sidebar module and receives the app VERSION
make_sidebar(VERSION)
# Error reporting is optional: Sentry is only started when a DSN is set.
if SENTRY_DSN:
    # send_default_pii adds data like request headers and IP for users, see
    # https://docs.sentry.io/platforms/python/data-management/data-collected/
    sentry_sdk.init(dsn=SENTRY_DSN, send_default_pii=True)
# There is no official sentry/streamlit integration
# But people found some workarounds in the discussion:
# https://github.com/streamlit/streamlit/issues/3426
#
# This code might stop working in a future version
@st.cache_data()
def sentry_patch_streamlit():
    """
    Forward uncaught Streamlit script exceptions to Sentry.

    Streamlit catches all exceptions, so this monkey patch wraps the
    ``handle_uncaught_app_exception`` function of Streamlit's script
    runner: each exception is first captured by Sentry and then handed to
    the original handler.

    When the Streamlit internals this relies on are not found, the patch
    is skipped with a warning instead of raising.

    Args:
        None
    Returns:
        None
    """
    module_name = "streamlit.runtime.scriptrunner.exec_code"
    # .get() instead of indexing: this is a private Streamlit module and
    # may be absent in other Streamlit versions
    script_runner = sys.modules.get(module_name)
    original_func = getattr(
        script_runner, "handle_uncaught_app_exception", None
    )
    if original_func is None:
        logger.warning(
            "Cannot patch Streamlit for Sentry: "
            "%s.handle_uncaught_app_exception not found",
            module_name,
        )
        return

    def sentry_patched_func(ex):
        # report first, then keep Streamlit's own error display
        sentry_sdk.capture_exception(ex)
        original_func(ex)

    script_runner.handle_uncaught_app_exception = sentry_patched_func
if __name__ == "__main__":
    # init_logging is imported from applogging; call it before main() so
    # logging is set up before the page logs anything
    init_logging()
    main()