Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions services/usage-stats/src/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

load_dotenv()

raw_url = os.getenv("DATABASE_URL")
async def insert(payload=None, table_name=None, db_url=None):
load_dotenv()

# Ensure it uses asyncpg
if raw_url.startswith("postgresql://"):
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
)
raw_url = db_url

DATABASE_URL = raw_url
# Ensure it uses asyncpg
if raw_url.startswith("postgresql://"):
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
)

engine = create_async_engine(DATABASE_URL, echo=True)
DATABASE_URL = raw_url

engine = create_async_engine(DATABASE_URL, echo=True)

async def insert(payload=None, table_name=None):
async with engine.connect() as conn:
if payload is None:
print("No payload provided")
Expand Down
24 changes: 12 additions & 12 deletions services/usage-stats/src/last_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

load_dotenv()

raw_url = os.getenv("DATABASE_URL")
async def get_last_id(table_name=None, db_url=None):
load_dotenv()

# Ensure it uses asyncpg
if raw_url.startswith("postgresql://"):
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
)
raw_url = db_url

DATABASE_URL = raw_url
# Ensure it uses asyncpg
if raw_url.startswith("postgresql://"):
raw_url = raw_url.replace("postgresql://", "postgresql+asyncpg://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with postgresql:// or postgresql+asyncpg://"
)

engine = create_async_engine(DATABASE_URL, echo=True)
DATABASE_URL = raw_url

engine = create_async_engine(DATABASE_URL, echo=True)

async def get_last_id(table_name=None):
async with engine.connect() as conn:
if table_name is None:
print("No table name provided")
Expand Down
42 changes: 21 additions & 21 deletions services/usage-stats/src/macrostrat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,30 @@
from src.insert import insert
from src.last_id import get_last_id

BATCH_SIZE = 1000 # Adjust as needed

raw_url = os.getenv("MARIADB_URL")
async def get_data(last_id, mariadb_url, db_url):
BATCH_SIZE = 1000 # Adjust as needed

# Ensure the URL uses asyncmy driver
if raw_url.startswith("mysql://"):
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
)
raw_url = mariadb_url

DATABASE_URL = raw_url
# Ensure the URL uses asyncmy driver
if raw_url.startswith("mysql://"):
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
)

# Create async engine
engine = create_async_engine(DATABASE_URL, echo=True)
DATABASE_URL = raw_url

# Async session factory
AsyncSessionLocal = sessionmaker(
bind=engine, expire_on_commit=False, class_=AsyncSession
)
# Create async engine
engine = create_async_engine(DATABASE_URL, echo=True)

# Async session factory
AsyncSessionLocal = sessionmaker(
bind=engine, expire_on_commit=False, class_=AsyncSession
)

async def get_data(last_id):
async with AsyncSessionLocal() as session:
query = text(
"""
Expand Down Expand Up @@ -70,10 +70,10 @@ async def get_data(last_id):
for row in rows
]

await insert(payload, "macrostrat")
await insert(payload, "macrostrat", db_url)


async def get_macrostrat_data():
last_id = await get_last_id("macrostrat")
await get_data(last_id)
async def get_macrostrat_data(mariadb_url, db_url):
    """Look up the last id recorded for "macrostrat" and run ``get_data`` from it.

    Args:
        mariadb_url: Connection URL forwarded to ``get_data``.
        db_url: Connection URL forwarded to ``get_last_id`` and ``get_data``.
    """
    resume_from = await get_last_id("macrostrat", db_url)
    await get_data(resume_from, mariadb_url, db_url)
    print("Data fetching completed.")
54 changes: 27 additions & 27 deletions services/usage-stats/src/rockd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,32 @@
from src.insert import insert
from src.last_id import get_last_id

BATCH_SIZE = 1000 # Adjust batch size as needed

raw_url = os.getenv("MARIADB_URL")
async def get_data(last_id, mariadb_url, db_url):
BATCH_SIZE = 1000 # Adjust batch size as needed

# Ensure the URL uses asyncmy driver
if raw_url.startswith("mysql://"):
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
)
raw_url = mariadb_url

DATABASE_URL = raw_url
# Ensure the URL uses asyncmy driver
if raw_url.startswith("mysql://"):
raw_url = raw_url.replace("mysql://", "mysql+asyncmy://", 1)
else:
raise ValueError(
"Invalid DATABASE_URL: must start with mysql:// or mysql+asyncmy://"
)

# Create async SQLAlchemy engine
engine = create_async_engine(DATABASE_URL, echo=True)
DATABASE_URL = raw_url

# Create async session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False,
class_=AsyncSession,
)
# Create async SQLAlchemy engine
engine = create_async_engine(DATABASE_URL, echo=True)

# Create async session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False,
class_=AsyncSession,
)

async def get_data(last_id):
async with AsyncSessionLocal() as session:
query = text(
"""
Expand Down Expand Up @@ -91,18 +91,18 @@ async def get_data(last_id):
}
)

await insert(payload, "rockd")
await insert(payload, "rockd", db_url)


async def fetch_last_id():
return await get_last_id("rockd")
async def fetch_last_id(db_url):
    """Return whatever ``get_last_id`` reports for the "rockd" table.

    Args:
        db_url: Connection URL forwarded to ``get_last_id``.
    """
    newest_id = await get_last_id("rockd", db_url)
    return newest_id


async def fetch_matomo_data(last_id):
await get_data(last_id)
async def fetch_matomo_data(last_id, mariadb_url, db_url):
    """Delegate to ``get_data`` with the given starting id and connection URLs.

    Args:
        last_id: Id value passed through to ``get_data`` unchanged.
        mariadb_url: Connection URL forwarded to ``get_data``.
        db_url: Connection URL forwarded to ``get_data``.
    """
    fetch_args = (last_id, mariadb_url, db_url)
    await get_data(*fetch_args)


async def get_rockd_data():
last_id = await fetch_last_id()
await fetch_matomo_data(last_id)
async def get_rockd_data(mariadb_url, db_url):
    """Run one rockd pass: fetch the last id, then fetch data starting from it.

    Args:
        mariadb_url: Connection URL forwarded to ``fetch_matomo_data``.
        db_url: Connection URL forwarded to ``fetch_last_id`` and
            ``fetch_matomo_data``.
    """
    resume_from = await fetch_last_id(db_url)
    await fetch_matomo_data(resume_from, mariadb_url, db_url)
    print("Data fetching completed.")
45 changes: 36 additions & 9 deletions services/usage-stats/worker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,43 @@
import asyncio
import os
import signal
import time

from dotenv import load_dotenv
from src.macrostrat import get_macrostrat_data
from src.rockd import get_rockd_data

load_dotenv()

async def periodic_task(stop_event: asyncio.Event):

def get_env_var(name: str, timeout: int = 30) -> str:
    """Return the value of environment variable ``name``, polling until it is set.

    The variable is checked once per second for up to ``timeout`` attempts.
    Only the variable's name is logged, never its value: connection URLs such
    as ``DATABASE_URL`` embed credentials that must not end up in log output.

    Args:
        name: Name of the environment variable to read.
        timeout: Maximum number of one-second polling attempts.

    Returns:
        The non-empty value of the variable.

    Raises:
        RuntimeError: If the variable is still unset or empty after
            ``timeout`` attempts.
    """
    for _ in range(timeout):
        value = os.getenv(name)
        if value:
            # Confirm presence only; the value may contain a password.
            print(f"[DEBUG] {name} found")
            return value
        print(f"[DEBUG] Waiting for environment variable {name}...")
        time.sleep(1)
    raise RuntimeError(
        f"Environment variable {name} is not set after waiting {timeout} seconds"
    )


async def periodic_task(stop_event: asyncio.Event, db_url: str, mariadb_url: str):
while not stop_event.is_set():
try:
await get_rockd_data()
await get_macrostrat_data()
except Exception:
pass
# Pass MARIADB_URL if get_rockd_data needs it
await get_rockd_data(mariadb_url, db_url)

# Pass DATABASE_URL if get_macrostrat_data needs it
await get_macrostrat_data(mariadb_url, db_url)
except Exception as e:
print(f"Error in periodic_task: {e}")

try:
await asyncio.wait_for(stop_event.wait(), timeout=10)
Expand All @@ -20,16 +47,16 @@ async def periodic_task(stop_event: asyncio.Event):

async def main():
    """Read the database URLs, install signal handlers, and run the periodic task.

    ``DATABASE_URL`` and ``MARIADB_URL`` are read once at startup via
    ``get_env_var`` and handed to ``periodic_task``, which runs until the
    stop event is set.
    """
    stop_event = asyncio.Event()
    # Fetch environment variables once at startup
    db_url = get_env_var("DATABASE_URL")
    mariadb_url = get_env_var("MARIADB_URL")

    # SIGINT/SIGTERM set the event so periodic_task can leave its loop
    loop = asyncio.get_running_loop()
    for sig in ("SIGINT", "SIGTERM"):
        loop.add_signal_handler(getattr(signal, sig), stop_event.set)

    # periodic_task requires both URLs; call it exactly once with all arguments
    await periodic_task(stop_event, db_url, mariadb_url)


if __name__ == "__main__":
import signal

asyncio.run(main())