3 changes: 3 additions & 0 deletions .devcontainer/devcontainer.json
@@ -11,6 +11,9 @@
"version": "3.12"
}
},
"containerEnv": {
"TZ": "Europe/Paris"
},
"customizations": {
"vscode": {
"extensions": [
5 changes: 4 additions & 1 deletion .gitignore
@@ -176,4 +176,7 @@ notebooks/*_processed.ipynb
common/collections/** */.env

# ignore exports tables.csv files
exports_wdd/*
exports_wdd/*

prefect_flow/data/
prefect_flow/log/
7 changes: 4 additions & 3 deletions bin/odis.py
@@ -22,6 +22,8 @@
from common.utils.http.async_client import AsyncHttpClient
from common.utils.logging_odis import logger

from pipeline.extract_service import run_extraction
from pipeline.load_service import run_load
# this module is the entry point for the CLI
# it will parse the arguments and execute the requested operation

@@ -379,7 +381,7 @@ def extract(
)

asyncio.run(
extract_data_sources(config_model, data_sources, max_concurrent_requests=max_concurrent_requests) # type: ignore[call-arg] # noqa: E501
run_extraction(config_model, data_sources, max_concurrent_requests)
)


@@ -467,8 +469,7 @@ def load(

print(f"\n[blue]Loading data into {ds.name}[/blue]")

loader = create_loader(config_model, ds, handler=FileHandler())
loader.execute()
run_load(config_model, data_sources)

print(f"[blue]Data loaded into {ds.name}[/blue]")

31 changes: 27 additions & 4 deletions common/utils/http/async_client.py
@@ -7,6 +7,7 @@
retry_if_exception_type,
stop_after_attempt,
stop_after_delay,
wait_exponential,
)

from common.utils.interfaces.http import HttpClient, HttpException
@@ -30,19 +31,21 @@ def __init__(self, max_connections: int = 100, timeout: int = 1200):
Defaults to 1200 (20 minutes).
"""

conn = aiohttp.TCPConnector(limit=max_connections)
self._max_connections = max_connections
self._timeout = aiohttp.ClientTimeout(total=timeout)
self._session = aiohttp.ClientSession(connector=conn, timeout=self._timeout)
self.base_url = None
self._session = None

logger.debug(
f"AsyncHttpClient initialized with max_connections={max_connections}, timeout={timeout}s"
)

@retry(
retry=retry_if_exception_type(aiohttp.ClientError),
stop=(stop_after_delay(2400) | stop_after_attempt(3)),
stop=(stop_after_delay(2400) | stop_after_attempt(10)),
wait=wait_exponential(multiplier=1, min=10, max=120),
before=before_log(logger, logging.DEBUG),
reraise=True,
reraise=True,
)
async def get(
self,
@@ -79,3 +82,23 @@ async def get(
except aiohttp.ContentTypeError as e:
logger.error(f"Failed to parse response: {e}")
raise HttpException(f"Failed to parse response from {url}: {e}") from e

async def __aenter__(self):
conn = aiohttp.TCPConnector(limit=self._max_connections)

kwargs = {"connector": conn, "timeout": self._timeout}

if hasattr(self, "base_url") and self.base_url is not None:
if not isinstance(self.base_url, str):
raise ValueError("base_url must be a string")
kwargs["base_url"] = self.base_url

if hasattr(self, "headers") and self.headers:
kwargs["headers"] = self.headers

self._session = aiohttp.ClientSession(**kwargs)
return self

async def __aexit__(self, exc_type, exc, tb):
if self._session and not self._session.closed:
await self._session.close()
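
Note: with the session construction moved from __init__ into __aenter__, AsyncHttpClient is now meant to be used as an async context manager, so the aiohttp session is created and closed inside a running event loop. A minimal usage sketch follows; it assumes get() takes the target URL as its first argument (the full signature is truncated in this diff) and uses a placeholder URL:

# Minimal usage sketch for the reworked AsyncHttpClient (placeholder URL, not from this PR).
import asyncio

from common.utils.http.async_client import AsyncHttpClient


async def fetch_example():
    # The aiohttp session is only created when the context manager is entered.
    async with AsyncHttpClient(max_connections=10, timeout=600) as client:
        # get() retries on aiohttp.ClientError with exponential backoff
        # (waits of 10-120 s, up to 10 attempts or 2400 s total).
        return await client.get("https://example.org/api/data")  # assumed get(url) signature


if __name__ == "__main__":
    print(asyncio.run(fetch_example()))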
1 change: 1 addition & 0 deletions common/utils/logging_config.yml
@@ -1,4 +1,5 @@
version: 1
disable_existing_loggers: False

loggers:

8 changes: 6 additions & 2 deletions common/utils/logging_odis.py
@@ -1,14 +1,18 @@
import logging
import logging.config
import os

from pathlib import Path
from common.config import load_config

# Using classic logger for now. TODO : use python-json-logger
# LOG_CONFIG_PATH = "common/utils/logging_config.yml"
LOG_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'logging_config.yml')

# Create log directory if it doesn't exist
log_dir = Path("log")
log_dir.mkdir(exist_ok=True)

LOG_CONFIG = load_config(LOG_CONFIG_PATH)

logging.config.dictConfig(LOG_CONFIG)
logger = logging.getLogger("main")
logger = logging.getLogger("main")
9 changes: 9 additions & 0 deletions docker-compose.yaml
@@ -10,6 +10,15 @@ services:
ports:
- "5432:5432"

postgres:
image: postgres:15
environment:
POSTGRES_USER: prefect
POSTGRES_PASSWORD: prefect
POSTGRES_DB: prefect
ports:
- "5433:5432"

pgadmin:
image: dpage/pgadmin4
container_name: pgadmin
4 changes: 4 additions & 0 deletions main.py
@@ -0,0 +1,4 @@
from prefect_flow.flow import full_pipeline

if __name__ == "__main__":
full_pipeline("config.yaml")
File renamed without changes.
50 changes: 50 additions & 0 deletions pipeline/extract_service.py
@@ -0,0 +1,50 @@
# pipeline/extract_service.py

import asyncio
import logging
import time
from typing import List, Optional

from common.data_source_model import DataSourceModel, DomainModel
from common.utils.factory.extractor_factory import create_extractor
from common.utils.file_handler import FileHandler
from common.utils.http.async_client import AsyncHttpClient
from common.utils.logging_odis import logger

async def run_extraction(
config_model: DataSourceModel,
data_sources: List[DomainModel],
max_concurrent_requests: int,
logger_: Optional[logging.Logger] = None,
):
"""
Pure business logic used by both CLI and Prefect.
Extracts all given data sources asynchronously.
"""
logger_ = logger_ or logger  # fall back to the standard module-level logger
start_time = time.time()
async with AsyncHttpClient(max_connections=max_concurrent_requests) as http_client:

tasks = []
for ds in data_sources:
logger_.info(f"[extract] preparing extractor for {ds.name}")

extractor = create_extractor(
config_model, ds, http_client=http_client, handler=FileHandler()
)
tasks.append(extractor.execute())

results = await asyncio.gather(*tasks, return_exceptions=True)

# Detect errors
errors = [res for res in results if isinstance(res, Exception)]
if errors:
for err in errors:
logger_.error(f"[extract] error: {err}")

raise RuntimeError(f"{len(errors)} extraction errors occurred")

elapsed = time.time() - start_time
logger_.info(f"[extract] completed in {elapsed:.2f}s for {len(data_sources)} sources")

return results
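
Note: run_extraction is a coroutine, so synchronous callers (such as the updated extract command in bin/odis.py above) wrap it in asyncio.run. A minimal caller sketch; how config_model and data_sources are built from configuration is project-specific and not shown in this PR:

# Minimal caller sketch mirroring the CLI wrapper in bin/odis.py.
import asyncio
from typing import List

from common.data_source_model import DataSourceModel, DomainModel
from pipeline.extract_service import run_extraction


def extract_all(config_model: DataSourceModel, data_sources: List[DomainModel]) -> list:
    # Raises RuntimeError if any individual extraction failed; otherwise returns
    # the list of per-source results collected by asyncio.gather.
    return asyncio.run(
        run_extraction(config_model, data_sources, max_concurrent_requests=10)
    )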
20 changes: 20 additions & 0 deletions pipeline/load_service.py
@@ -0,0 +1,20 @@
# pipeline/load_service.py

from typing import List
from common.data_source_model import DataSourceModel, DomainModel
from common.utils.factory.loader_factory import create_loader
from common.utils.file_handler import FileHandler
from common.utils.logging_odis import logger


def run_load(config_model: DataSourceModel, data_sources: List[DomainModel]):
"""
Pure business logic for loading JSON files into PostgreSQL.
Used by both CLI and Prefect.
"""

for ds in data_sources:
logger.info(f"[load] loading {ds.name}")
loader = create_loader(config_model, ds, handler=FileHandler())
loader.execute()
logger.info(f"[load] loaded {ds.name}")