-
Notifications
You must be signed in to change notification settings - Fork 954
Expand file tree
/
Copy pathproject_repo.py
More file actions
66 lines (55 loc) · 2.47 KB
/
Copy pathproject_repo.py
File metadata and controls
66 lines (55 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Copyright (C) 2025-2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Callable
from datetime import datetime
import sqlalchemy as sa
from loguru import logger
from sqlalchemy.ext.asyncio.session import AsyncSession
from db.schema import PipelineDB, ProjectDB
from pydantic_models import Project
from repositories.base import BaseRepository
from repositories.mappers import ProjectMapper
from utils.short_uuid import ShortUUID
class ProjectRepository(BaseRepository):
    """Repository for storing and querying projects through an async SQLAlchemy session."""

    def __init__(self, db: AsyncSession):
        """Initialise the base repository with ``db`` and the ``ProjectDB`` schema class."""
        super().__init__(db, ProjectDB)

    @property
    def to_schema(self) -> Callable[[Project], ProjectDB]:
        """Return the mapper callable that turns a ``Project`` into a ``ProjectDB``."""
        return ProjectMapper.to_schema

    @property
    def from_schema(self) -> Callable[[ProjectDB], Project]:
        """Return the mapper callable that turns a ``ProjectDB`` into a ``Project``."""
        return ProjectMapper.from_schema

    async def save(self, project: Project) -> Project:
        """Persist ``project`` together with a newly created pipeline and commit.

        The project is converted with ``to_schema`` and a ``PipelineDB`` whose
        ``inference_device`` is ``"CPU"`` is attached to it before it is added
        to the session.

        Args:
            project: The project to store.

        Returns:
            The same ``project`` object that was passed in.
        """
        project_schema: ProjectDB = self.to_schema(project)
        project_schema.pipeline = PipelineDB(
            project_id=project_schema.id,
            inference_device="CPU",
        )
        self.db.add(project_schema)
        await self.db.commit()
        return project

    async def update_dataset_timestamp(self, project_id: str | ShortUUID) -> None:
        """Update the dataset_updated_at timestamp for the given project.

        Both ``dataset_updated_at`` and ``updated_at`` are set to the database's
        current timestamp, and the change is committed.

        Args:
            project_id: Identifier of the project; compared as ``str(project_id)``.
        """
        await self.db.execute(
            sa.update(ProjectDB)
            .where(ProjectDB.id == str(project_id))
            .values(
                dataset_updated_at=sa.func.current_timestamp(),
                updated_at=sa.func.current_timestamp(),
            ),
        )
        await self.db.commit()
        # Log only after the commit has returned, so the message is never
        # emitted for a change that failed to be committed.
        logger.info(f"Updated dataset timestamp for project {project_id} to current time.")

    async def get_dataset_timestamp(self, project_id: str | ShortUUID) -> datetime:
        """Get the dataset_updated_at timestamp for the given project.

        Args:
            project_id: Identifier of the project; compared as ``str(project_id)``.

        Returns:
            The ``dataset_updated_at`` value of the matching row.

        Raises:
            sqlalchemy.exc.NoResultFound: If no row matches (raised by ``scalar_one``).
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches.
        """
        query = sa.select(self.schema.dataset_updated_at).where(ProjectDB.id == str(project_id))
        result = await self.db.execute(query)
        return result.scalar_one()

    async def get_first_project(self) -> Project | None:
        """Get the first project in a deterministic order.

        Projects are ordered by creation time, then by ID as a stable tie-breaker.

        Returns:
            The first project converted with ``from_schema``, or ``None`` when
            the query yields no row.
        """
        result = await self.db.execute(
            sa.select(ProjectDB).order_by(ProjectDB.created_at.asc(), ProjectDB.id.asc()).limit(1),
        )
        first_project = result.scalars().first()
        # ``first()`` yields None for an empty result; test for that explicitly
        # rather than relying on the truthiness of the ORM object.
        if first_project is None:
            return None
        return self.from_schema(first_project)