Skip to content

Conversation

@Harshal6927
Copy link
Member

  • Verify service docs

@Harshal6927 Harshal6927 requested review from a team as code owners December 5, 2025 04:30
Comment on lines 49 to 51
title: str | None = None
content: str | None = None
published: bool | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need to use Optional since we still need to show 3.9 support here.

Comment on lines 129 to 131
data = data_dict
post = await self.to_model(data, "create")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data = data_dict
post = await self.to_model(data, "create")
post = await self.to_model(data_dict, "create")

# Override update behavior to handle tags
async def update(
self,
data: ModelDictT[Post],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably make the override mirror the service spec properly so there's no linting issues on the copy/paste

Comment on lines 144 to 168
async def update(
self,
data: ModelDictT[Post],
item_id: Any | None = None,
**kwargs,
**kwargs: Any,
) -> Post:
"""Update a post with tags, if provided."""
tags_updated: list[str] = []
if isinstance(data, dict):
tags_updated.extend(data.pop("tags", None) or [])
data["id"] = item_id
data = await self.to_model(data, "update")
existing_tags = [tag.name for tag in data.tags]
tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
data_dict = schema_dump(data)
tags_updated = data_dict.pop("tags", [])
# Determine the effective item_id - either from parameter or from the data itself
effective_item_id = item_id if item_id is not None else data_dict.get("id")
# Get existing post to access current tags
if effective_item_id is not None:
existing_post = await self.get(effective_item_id)
existing_tags = [tag.name for tag in existing_post.tags]
else:
existing_tags = []
post = await self.to_model(data_dict, "update")
if tags_updated:
tags_updated = [schema_dump(tag) for tag in tags_updated]
# Determine tags to remove and add
tags_to_remove = [tag for tag in post.tags if tag.name not in [t["name"] for t in tags_updated]]
tags_to_add = [tag for tag in tags_updated if tag["name"] not in existing_tags]
for tag_rm in tags_to_remove:
data.tags.remove(tag_rm)
data.tags.extend(
post.tags.remove(tag_rm)
post.tags.extend(
[
await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
for tag_text in tags_to_add
await Tag.as_unique_async(self.repository.session, name=tag["name"], slug=slugify(tag["name"]))
for tag in tags_to_add
],
)
return await super().update(
data=data,
item_id=item_id,
**kwargs,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be remove or modified to mirror exactly how the TeamService works in fullstack. Here's the service for reference:

class TeamService(service.SQLAlchemyAsyncRepositoryService[m.Team]):
    """Team Service."""

    class Repo(repository.SQLAlchemyAsyncSlugRepository[m.Team]):
        """Team Repository."""

        model_type = m.Team

    repository_type = Repo
    match_fields = ["name"]

    async def to_model_on_create(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        data = service.schema_dump(data)
        data = await self._populate_slug(data)
        return await self._populate_with_owner_and_tags(data, "create")

    async def to_model_on_update(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        data = service.schema_dump(data)
        data = await self._populate_slug(data)
        return await self._populate_with_owner_and_tags(data, "update")

    async def to_model_on_upsert(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        data = service.schema_dump(data)
        data = await self._populate_slug(data)
        return await self._populate_with_owner_and_tags(data, "upsert")

    @staticmethod
    def can_view_all(user: m.User) -> bool:
        return bool(
            user.is_superuser
            or any(
                assigned_role.role.name
                for assigned_role in user.roles
                if assigned_role.role.name == constants.SUPERUSER_ACCESS_ROLE
            ),
        )

    async def _populate_slug(self, data: ModelDictT[m.Team]) -> ModelDictT[m.Team]:
        if service.is_dict_without_field(data, "slug") and service.is_dict_with_field(data, "name"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data

    async def _populate_with_owner_and_tags(
        self,
        data: ModelDictT[m.Team],
        operation: str | None,
    ) -> ModelDictT[m.Team]:
        if service.is_dict(data):
            owner_id: UUID | None = data.pop("owner_id", None)
            owner: m.User | None = data.pop("owner", None)
            input_tags: list[str] = data.pop("tags", [])
            data["id"] = data.get("id", uuid4())
            data = await super().to_model(data)
            if input_tags:
                existing_tags = [tag.name for tag in data.tags]
                tags_to_remove = [tag for tag in data.tags if tag.name not in input_tags]
                tags_to_add = [tag for tag in input_tags if tag not in existing_tags]
                for tag_rm in tags_to_remove:
                    data.tags.remove(tag_rm)
                data.tags.extend(
                    [
                        await m.Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                        for tag_text in tags_to_add
                    ],
                )
            if owner or owner_id:
                for member in data.members:
                    if member.user_id == owner.id if owner is not None else owner_id and not member.is_owner:
                        member.is_owner = True
                        member.role = m.TeamRoles.ADMIN
                        break
                else:
                    owner_data: dict[str, Any] = {"user": owner} if owner else {"user_id": owner_id}
                    data.members.append(m.TeamMember(**owner_data, role=m.TeamRoles.ADMIN, is_owner=True))
        return data

Comment on lines 185 to 196
async def publish_post(
self,
post_id: int,
publish: bool = True,
) -> PostResponse:
"""Publish or unpublish a post with timestamp."""
data = PostUpdate(
published=publish,
published_at=datetime.datetime.utcnow() if publish else None,
)
post = await self.repository.update(
"""Publish or unpublish a post."""
post = await self.update(
item_id=post_id,
data=data,
data={"published": publish},
auto_commit=True,
)
return self.to_schema(post, schema_type=PostResponse)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we want to demonstrate here exactly? this looks like all the default behavior you can do out of the box.

We should consider removing it to reduce confusion if there's nothing new.

Comment on lines 204 to 212
async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
"""Convert a dictionary, msgspec Struct, or Pydantic model to a Post model. """
if (is_msgspec_struct(data) or is_pydantic_model(data)) and operation in {"create", "update"} and data.slug is None:
data.slug = await self.repository.get_available_slug(data.name)
if is_dict(data) and "slug" not in data and operation == "create":
data["slug"] = await self.repository.get_available_slug(data["name"])
if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
data["slug"] = await self.repository.get_available_slug(data["name"])
return await super().to_model(data, operation)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we remove the slug example? this seems beneficial. Here's the TagsService for reference:

class TagService(service.SQLAlchemyAsyncRepositoryService[m.Tag]):
    """Handles basic lookup operations for an Tag."""

    class Repo(repository.SQLAlchemyAsyncSlugRepository[m.Tag]):
        """Tag Repository."""

        model_type = m.Tag

    repository_type = Repo
    match_fields = ["name"]

    async def to_model_on_create(self, data: service.ModelDictT[m.Tag]) -> service.ModelDictT[m.Tag]:
        data = service.schema_dump(data)
        if service.is_dict_without_field(data, "slug"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data

    async def to_model_on_update(self, data: service.ModelDictT[m.Tag]) -> service.ModelDictT[m.Tag]:
        data = service.schema_dump(data)
        if service.is_dict_without_field(data, "slug") and service.is_dict_with_field(data, "name"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data

    async def to_model_on_upsert(self, data: service.ModelDictT[m.Tag]) -> service.ModelDictT[m.Tag]:
        data = service.schema_dump(data)
        if service.is_dict_without_field(data, "slug") and (tag_name := data.get("name")) is not None:
            data["slug"] = await self.repository.get_available_slug(tag_name)
        return data

Comment on lines 294 to 295
post_data = post_service.to_schema(rows[0][0], schema_type=PostSchema)
posts_list = post_service.to_schema([row[0] for row in rows], schema_type=PostSchema)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure 2 level of arrays is the best thing to demo here.

@sonarqubecloud
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants