Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion templates/generic/app/activities.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict

from app.handler import HandlerClass
from application_sdk.activities import ActivitiesInterface
from application_sdk.observability.logger_adaptor import get_logger
Expand All @@ -11,4 +13,16 @@ class ActivitiesClass(ActivitiesInterface):
def __init__(self, handler: HandlerClass | None = None):
self.handler = handler or HandlerClass()

# Define activities here
@activity.defn
async def get_workflow_args(
self, workflow_config: Dict[str, Any]
) -> Dict[str, Any]:
"""TODO: Process and merge workflow configuration."""
return workflow_config

@activity.defn
async def extract_and_transform_metadata(
self, args: Dict[str, Any]
) -> Dict[str, Any]:
"""TODO: Implement your extraction and transformation logic."""
raise NotImplementedError("Implement this activity")
3 changes: 2 additions & 1 deletion templates/generic/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ async def run(self, workflow_config: Dict[str, Any]) -> None:
)

# Merge any provided args (from frontend POST body or server config)
_workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
activities_instance.get_workflow_args,
workflow_config,
retry_policy=retry_policy,
start_to_close_timeout=timedelta(seconds=10),
)
_ = workflow_args

# Call other activities here
pass
Expand Down
Loading