-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
66 lines (48 loc) · 1.72 KB
/
database.py
File metadata and controls
66 lines (48 loc) · 1.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import os
from pathlib import Path
from dotenv import load_dotenv
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
load_dotenv()
def _build_database_url() -> str:
configured_url = os.getenv("DATABASE_URL")
if configured_url:
return configured_url
db_user = os.getenv("DB_USER", "postgres")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
if db_password and db_host and db_name:
return f"postgresql+asyncpg://{db_user}:{db_password}@{db_host}/{db_name}"
sqlite_path = Path(__file__).resolve().parent / "resume_agent.db"
return f"sqlite+aiosqlite:///{sqlite_path.as_posix()}"
DATABASE_URL = _build_database_url()
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
def _build_redis_client():
redis_url = os.getenv("REDIS_URL")
if redis_url:
return redis.from_url(redis_url, decode_responses=True)
redis_host = os.getenv("REDIS_HOST")
if not redis_host:
return None
return redis.Redis(
host=redis_host,
port=int(os.getenv("REDIS_PORT", "6379")),
db=int(os.getenv("REDIS_DB", "0")),
decode_responses=True,
)
redis_client = _build_redis_client()