Skip to content

Commit 9e2446e

Browse files
committed
Should fix env issue
1 parent 03310cb commit 9e2446e

File tree

5 files changed

+108
-88
lines changed

5 files changed

+108
-88
lines changed

services/usage-stats/src/insert.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,23 @@
55
from sqlalchemy import text
66
from sqlalchemy.ext.asyncio import create_async_engine
77

8-
load_dotenv()
8+
async def insert(payload=None, table_name=None, db_url=None):
9+
load_dotenv()
910

10-
raw_url = os.getenv("DATABASE_URL")
11+
raw_url = db_url
1112

12-
# Ensure it uses asyncpg
13-
if raw_url.startswith("postgresql://"):
14-
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
15-
else:
16-
raise ValueError(
17-
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
18-
)
19-
20-
DATABASE_URL = raw_url
21-
22-
engine = create_async_engine(DATABASE_URL, echo=True)
13+
# Ensure it uses asyncpg
14+
if raw_url.startswith("postgresql://"):
15+
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
16+
else:
17+
raise ValueError(
18+
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
19+
)
20+
21+
DATABASE_URL = raw_url
2322

23+
engine = create_async_engine(DATABASE_URL, echo=True)
2424

25-
async def insert(payload=None, table_name=None):
2625
async with engine.connect() as conn:
2726
if payload is None:
2827
print("No payload provided")

services/usage-stats/src/last_id.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,23 @@
55
from sqlalchemy import text
66
from sqlalchemy.ext.asyncio import create_async_engine
77

8-
load_dotenv()
9-
10-
raw_url = os.getenv("DATABASE_URL")
11-
12-
# Ensure it uses asyncpg
13-
if raw_url.startswith("postgresql://"):
14-
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
15-
else:
16-
raise ValueError(
17-
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
18-
)
19-
20-
DATABASE_URL = raw_url
8+
async def get_last_id(table_name=None, db_url=None):
9+
load_dotenv()
10+
11+
raw_url = db_url
12+
13+
# Ensure it uses asyncpg
14+
if raw_url.startswith("postgresql://"):
15+
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
16+
else:
17+
raise ValueError(
18+
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
19+
)
2120

22-
engine = create_async_engine(DATABASE_URL, echo=True)
21+
DATABASE_URL = raw_url
2322

23+
engine = create_async_engine(DATABASE_URL, echo=True)
2424

25-
async def get_last_id(table_name=None):
2625
async with engine.connect() as conn:
2726
if table_name is None:
2827
print("No table name provided")

services/usage-stats/src/macrostrat.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,29 @@
77
from src.insert import insert
88
from src.last_id import get_last_id
99

10-
BATCH_SIZE = 1000 # Adjust as needed
10+
async def get_data(last_id, mariadb_url, db_url):
11+
BATCH_SIZE = 1000 # Adjust as needed
1112

12-
raw_url = os.getenv("MARIADB_URL")
13+
raw_url = mariadb_url
1314

14-
# Ensure the URL uses asyncmy driver
15-
if raw_url.startswith("mysql://"):
16-
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
17-
else:
18-
raise ValueError(
19-
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
20-
)
21-
22-
DATABASE_URL = raw_url
15+
# Ensure the URL uses asyncmy driver
16+
if raw_url.startswith("mysql://"):
17+
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
18+
else:
19+
raise ValueError(
20+
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
21+
)
2322

24-
# Create async engine
25-
engine = create_async_engine(DATABASE_URL, echo=True)
23+
DATABASE_URL = raw_url
2624

27-
# Async session factory
28-
AsyncSessionLocal = sessionmaker(
29-
bind=engine, expire_on_commit=False, class_=AsyncSession
30-
)
25+
# Create async engine
26+
engine = create_async_engine(DATABASE_URL, echo=True)
3127

28+
# Async session factory
29+
AsyncSessionLocal = sessionmaker(
30+
bind=engine, expire_on_commit=False, class_=AsyncSession
31+
)
3232

33-
async def get_data(last_id):
3433
async with AsyncSessionLocal() as session:
3534
query = text(
3635
"""
@@ -70,10 +69,10 @@ async def get_data(last_id):
7069
for row in rows
7170
]
7271

73-
await insert(payload, "macrostrat")
72+
await insert(payload, "macrostrat", db_url)
7473

7574

76-
async def get_macrostrat_data():
77-
last_id = await get_last_id("macrostrat")
78-
await get_data(last_id)
75+
async def get_macrostrat_data(mariadb_url, db_url):
76+
last_id = await get_last_id("macrostrat", db_url)
77+
await get_data(last_id, mariadb_url, db_url)
7978
print("Data fetching completed.")

services/usage-stats/src/rockd.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,32 @@
88
from src.insert import insert
99
from src.last_id import get_last_id
1010

11-
BATCH_SIZE = 1000 # Adjust batch size as needed
11+
async def get_data(last_id, mariadb_url, db_url):
12+
BATCH_SIZE = 1000 # Adjust batch size as needed
1213

13-
raw_url = os.getenv("MARIADB_URL")
14+
raw_url = mariadb_url
1415

15-
# Ensure the URL uses asyncmy driver
16-
if raw_url.startswith("mysql://"):
17-
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
18-
else:
19-
raise ValueError(
20-
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
21-
)
16+
# Ensure the URL uses asyncmy driver
17+
if raw_url.startswith("mysql://"):
18+
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
19+
else:
20+
raise ValueError(
21+
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
22+
)
2223

23-
DATABASE_URL = raw_url
24+
DATABASE_URL = raw_url
2425

25-
# Create async SQLAlchemy engine
26-
engine = create_async_engine(DATABASE_URL, echo=True)
26+
# Create async SQLAlchemy engine
27+
engine = create_async_engine(DATABASE_URL, echo=True)
2728

28-
# Create async session factory
29-
AsyncSessionLocal = sessionmaker(
30-
bind=engine,
31-
expire_on_commit=False,
32-
class_=AsyncSession,
33-
)
29+
# Create async session factory
30+
AsyncSessionLocal = sessionmaker(
31+
bind=engine,
32+
expire_on_commit=False,
33+
class_=AsyncSession,
34+
)
3435

3536

36-
async def get_data(last_id):
3737
async with AsyncSessionLocal() as session:
3838
query = text(
3939
"""
@@ -91,18 +91,18 @@ async def get_data(last_id):
9191
}
9292
)
9393

94-
await insert(payload, "rockd")
94+
await insert(payload, "rockd", db_url)
9595

9696

97-
async def fetch_last_id():
98-
return await get_last_id("rockd")
97+
async def fetch_last_id(db_url):
98+
return await get_last_id("rockd", db_url)
9999

100100

101-
async def fetch_matomo_data(last_id):
102-
await get_data(last_id)
101+
async def fetch_matomo_data(last_id, mariadb_url, db_url):
102+
await get_data(last_id, mariadb_url, db_url)
103103

104104

105-
async def get_rockd_data():
106-
last_id = await fetch_last_id()
107-
await fetch_matomo_data(last_id)
105+
async def get_rockd_data(mariadb_url, db_url):
106+
last_id = await fetch_last_id(db_url)
107+
await fetch_matomo_data(last_id, mariadb_url, db_url)
108108
print("Data fetching completed.")

services/usage-stats/worker.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,39 @@
1+
import os
12
import asyncio
3+
import time
4+
import signal
5+
from dotenv import load_dotenv
26

37
from src.macrostrat import get_macrostrat_data
48
from src.rockd import get_rockd_data
59

6-
7-
async def periodic_task(stop_event: asyncio.Event):
10+
load_dotenv()
11+
12+
def get_env_var(name: str, timeout: int = 30) -> str:
    """Return the value of environment variable ``name``, waiting for it if needed.

    The variable is read immediately and then re-read once per second until
    ``timeout`` seconds have elapsed, so it is always checked at least once
    (even with ``timeout=0``) and once more after the final wait.

    This function is synchronous and waits with ``time.sleep``; call it
    before starting any concurrent asyncio work, because it blocks the
    calling thread while waiting.

    Args:
        name: Name of the environment variable to read.
        timeout: Maximum number of seconds to wait for the variable.

    Returns:
        The non-empty value of the environment variable.

    Raises:
        RuntimeError: If the variable is unset or empty after ``timeout``
            seconds.
    """
    waits = max(timeout, 0)
    for attempt in range(waits + 1):
        value = os.getenv(name)
        if value:
            # Report presence only: values such as database URLs usually
            # embed credentials and must not end up in logs.
            print(f"[DEBUG] {name} found")
            return value
        if attempt == waits:
            # Out of time; do not sleep again without a following check.
            break
        print(f"[DEBUG] Waiting for environment variable {name}...")
        time.sleep(1)
    raise RuntimeError(f"Environment variable {name} is not set after waiting {timeout} seconds")
26+
27+
async def periodic_task(stop_event: asyncio.Event, db_url: str, mariadb_url: str):
828
while not stop_event.is_set():
929
try:
10-
await get_rockd_data()
11-
await get_macrostrat_data()
12-
except Exception:
13-
pass
30+
# Pass MARIADB_URL if get_rockd_data needs it
31+
await get_rockd_data(mariadb_url, db_url)
32+
33+
# Pass DATABASE_URL if get_macrostrat_data needs it
34+
await get_macrostrat_data(mariadb_url, db_url)
35+
except Exception as e:
36+
print(f"Error in periodic_task: {e}")
1437

1538
try:
1639
await asyncio.wait_for(stop_event.wait(), timeout=10)
@@ -20,16 +43,16 @@ async def periodic_task(stop_event: asyncio.Event):
2043

2144
async def main():
2245
stop_event = asyncio.Event()
46+
# Fetch environment variables once at startup
47+
db_url = get_env_var("DATABASE_URL")
48+
mariadb_url = get_env_var("MARIADB_URL")
2349

24-
# Optionally handle graceful shutdown on signals
2550
loop = asyncio.get_running_loop()
2651
for sig in ("SIGINT", "SIGTERM"):
2752
loop.add_signal_handler(getattr(signal, sig), stop_event.set)
2853

29-
await periodic_task(stop_event)
54+
await periodic_task(stop_event, db_url, mariadb_url)
3055

3156

3257
if __name__ == "__main__":
33-
import signal
34-
3558
asyncio.run(main())

0 commit comments

Comments
 (0)