feat: Added sample app for event-based workflows #15
**`README.md`** (new file)

# 📨 Events App

A simple application demonstrating how to build apps with the Atlan Application SDK using events.



## Features
- Simulates event-driven workflows
- Real-time workflow status tracking
- Integration with Temporal for workflow management
- Demonstrates async and sync activities in a workflow
- Example of basic workflow implementation

## Usage

> [!NOTE]
> To run, first see the [main project README](../README.md) for prerequisites.

### Run the Events Application

```bash
uv run main.py
```

### Access the Application

Once the application is running:

- **Temporal UI**: Access the Temporal Web UI at `http://localhost:8233` (or your Temporal UI address) to monitor workflow executions.

## Development

### Project Structure
```
.
├── activities.py             # Workflow activities
├── main.py                   # Application entry point
├── workflows.py              # Workflow definitions
├── subscriber_manifest.json  # Subscriber manifest
└── README.md                 # This file
```

## Learning Resources
- [Temporal Documentation](https://docs.temporal.io/)
- [Atlan Application SDK Documentation](https://github.com/atlanhq/application-sdk/tree/main/docs)
- [Python FastAPI Documentation](https://fastapi.tiangolo.com/)

## Contributing
We welcome contributions! Please feel free to submit a Pull Request.
**`activities.py`** (new file)

```python
import asyncio

from application_sdk.activities import ActivitiesInterface
from application_sdk.observability.logger_adaptor import get_logger
from temporalio import activity

logger = get_logger(__name__)
activity.logger = logger


class SampleActivities(ActivitiesInterface):
    @activity.defn
    async def activity_1(self):
        logger.info("Activity 1")

        await asyncio.sleep(5)

        return

    @activity.defn
    async def activity_2(self):
        logger.info("Activity 2")

        await asyncio.sleep(5)

        return
```
**`main.py`** (new file)

```python
import asyncio
import json
import os
from datetime import datetime

from dapr import clients

from application_sdk.application import BaseApplication
from application_sdk.clients.utils import get_workflow_client
from application_sdk.constants import APPLICATION_NAME, PUBSUB_NAME
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.outputs.eventstore import (
    ApplicationEventNames,
    Event,
    EventMetadata,
    EventTypes,
    WorkflowStates,
)
from application_sdk.worker import Worker
from events.workflows import SampleWorkflow
from events.activities import SampleActivities

logger = get_logger(__name__)


async def start_worker():
    workflow_client = get_workflow_client(
        application_name=APPLICATION_NAME,
    )
    await workflow_client.load()

    activities = SampleActivities()

    worker = Worker(
        workflow_client=workflow_client,
        workflow_activities=SampleWorkflow.get_activities(activities),
        workflow_classes=[SampleWorkflow],
        passthrough_modules=["application_sdk", "os", "pandas"],
    )

    # Start the worker in a separate thread
    await worker.start(daemon=True)


async def application_subscriber():
    # Open the application manifest in the current directory
    application_manifest = json.load(
        open(os.path.join(os.path.dirname(__file__), "subscriber_manifest.json"))
    )

    # Two steps to set up event registration:
    # 1. Define the event in the manifest, with event name, type and filters => this creates an event trigger
    # 2. Register the event subscription to a workflow => this binds the workflow to the event trigger

    # Initialize the application
    application = BaseApplication(
        name=APPLICATION_NAME,
        application_manifest=application_manifest,  # Optional; if the manifest has event registration, it will be bootstrapped
    )

    # Register the event subscription to a workflow
    application.register_event_subscription("AssetExtractionCompleted", SampleWorkflow)

    # Events can also be registered to multiple workflows
    # application.register_event_subscription("ErrorEvent", SampleWorkflow)

    # Setting up the workflow is needed to start the worker
    await application.setup_workflow(
        workflow_classes=[SampleWorkflow], activities_class=SampleActivities
    )
    await application.start_worker()

    await application.setup_server(
        workflow_class=SampleWorkflow,
        ui_enabled=False,
    )

    await asyncio.gather(application.start_server(), simulate_workflow_end_event())


async def simulate_workflow_end_event():
    await asyncio.sleep(15)

    # Simulates that a dependent workflow has ended
    event = Event(
        metadata=EventMetadata(
            workflow_type="AssetExtractionWorkflow",
            workflow_state=WorkflowStates.COMPLETED.value,
            workflow_id="123",
            workflow_run_id="456",
            application_name="AssetExtractionApplication",
            event_published_client_timestamp=int(datetime.now().timestamp()),
        ),
        event_type=EventTypes.APPLICATION_EVENT.value,
        event_name=ApplicationEventNames.WORKFLOW_END.value,
        data={},
    )
    with clients.DaprClient() as client:
        client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=event.get_topic_name(),
            data=json.dumps(event.model_dump(mode="json")),
            data_content_type="application/json",
        )


if __name__ == "__main__":
    asyncio.run(application_subscriber())
```

Review comment on the `application_manifest` argument:

> **Collaborator:** Why is this optional?
>
> **Author:** Made the application manifest optional for now since we are only using it for event subscriptions, and event subscriptions are optional.
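Building on that exchange, below is a minimal sketch of what a manifest-free initialization might look like. It leans entirely on the author's statement that `application_manifest` is optional; the `subscriber_without_manifest` helper is hypothetical and this is an assumption about SDK behavior, not a verified contract.

```python
# Hedged sketch: initializing the application without a manifest.
# Per the author's reply above, application_manifest is only used for event
# subscriptions; whether the event trigger still needs a manifest entry
# elsewhere is not covered in this PR.
from application_sdk.application import BaseApplication
from application_sdk.constants import APPLICATION_NAME
from events.workflows import SampleWorkflow


def subscriber_without_manifest() -> BaseApplication:
    application = BaseApplication(name=APPLICATION_NAME)

    # The subscription itself is still registered programmatically, exactly as in main.py.
    application.register_event_subscription("AssetExtractionCompleted", SampleWorkflow)
    return application
```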
**`subscriber_manifest.json`** (new file)

```json
{
  "applicationId": "publish-app",
  "eventRegistration": {
    "consumes": [
      {
        "eventId": "AssetExtractionCompleted",
        "eventType": "application_event",
        "eventName": "workflow_end",
        "version": "v1",
        "filters": [
          {
            "path": "event.data.metadata.workflow_type",
            "operator": "==",
            "value": "AssetExtractionWorkflow"
          },
          {
            "path": "event.data.metadata.workflow_state",
            "operator": "==",
            "value": "completed"
          }
        ]
      }
    ],
    "produces": [
      {
        "eventId": "PublishAssetCompleted",
        "eventType": "application_event",
        "eventName": "workflow_end"
      }
    ]
  }
}
```
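The two `filters` above gate the trigger on the emitting workflow's type and state. As a rough illustration of that matching semantics (not the SDK's actual implementation), the sketch below evaluates dotted `path`/`==` filters against a nested payload; the helper names and the payload shape are assumptions for illustration only.

```python
# Hypothetical filter matcher; the real evaluation happens inside the
# Atlan Application SDK / event infrastructure.
from typing import Any, Dict, List


def resolve_path(payload: Dict[str, Any], path: str) -> Any:
    """Walk a dotted path like 'event.data.metadata.workflow_type' through nested dicts."""
    node: Any = {"event": payload}
    for key in path.split("."):
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def matches_filters(payload: Dict[str, Any], filters: List[Dict[str, str]]) -> bool:
    """Return True only if every '==' filter matches the payload."""
    return all(
        f["operator"] == "==" and resolve_path(payload, f["path"]) == f["value"]
        for f in filters
    )


# Example: the simulated AssetExtractionWorkflow end event should match both filters.
payload = {"data": {"metadata": {"workflow_type": "AssetExtractionWorkflow",
                                 "workflow_state": "completed"}}}
filters = [
    {"path": "event.data.metadata.workflow_type", "operator": "==", "value": "AssetExtractionWorkflow"},
    {"path": "event.data.metadata.workflow_state", "operator": "==", "value": "completed"},
]
print(matches_filters(payload, filters))  # True
```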
**`workflows.py`** (new file)

```python
from datetime import timedelta
from typing import Any, Callable, Dict, List, Type, cast

from temporalio import workflow

from application_sdk.activities import ActivitiesInterface
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.outputs.eventstore import Event
from application_sdk.workflows import WorkflowInterface
from events.activities import SampleActivities

logger = get_logger(__name__)


# Workflow that will be triggered by an event
@workflow.defn
class SampleWorkflow(WorkflowInterface):
    activities_cls: Type[ActivitiesInterface] = SampleActivities

    @workflow.run
    async def run(self, workflow_config: dict[str, Any]):
        # Get the workflow configuration from the state store
        workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,  # Pass the whole config containing workflow_id
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )

        workflow_run_id = workflow.info().run_id
        workflow_args["workflow_run_id"] = workflow_run_id

        # When a workflow is triggered by an event, the event is passed in as a dictionary
        event = Event(**workflow_args["event"])

        # We can also check the event data to get the workflow name and id
        workflow_type = event.metadata.workflow_type
        workflow_id = event.metadata.workflow_id

        print("workflow_type", workflow_type)
        print("workflow_id", workflow_id)

        await workflow.execute_activity_method(
            self.activities_cls.activity_1,
            start_to_close_timeout=timedelta(seconds=10),
            heartbeat_timeout=timedelta(seconds=10),
        )
        await workflow.execute_activity_method(
            self.activities_cls.activity_2,
            start_to_close_timeout=timedelta(seconds=10),
            heartbeat_timeout=timedelta(seconds=10),
        )

    @staticmethod
    def get_activities(activities: ActivitiesInterface) -> List[Callable[..., Any]]:
        sample_activities = cast(SampleActivities, activities)

        return [
            sample_activities.activity_1,
            sample_activities.activity_2,
            sample_activities.get_workflow_args,
        ]
```
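Since `SampleWorkflow.run` rebuilds the event from `workflow_args["event"]`, it may help to see the rough shape of that dictionary. The sketch below is inferred from the `Event` and `EventMetadata` fields used in `main.py` and the string values in the manifest; the exact serialized field names are an assumption, not documented SDK output.

```python
# Approximate shape of workflow_args["event"], inferred from the Event and
# EventMetadata constructors used in main.py; field names and nesting are
# assumptions for illustration.
event_dict = {
    "event_type": "application_event",
    "event_name": "workflow_end",
    "metadata": {
        "workflow_type": "AssetExtractionWorkflow",
        "workflow_state": "completed",
        "workflow_id": "123",
        "workflow_run_id": "456",
        "application_name": "AssetExtractionApplication",
        "event_published_client_timestamp": 1700000000,
    },
    "data": {},
}

# The workflow then rehydrates it as in SampleWorkflow.run:
# event = Event(**event_dict)
```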
**`pyproject.toml`** (modified)

```diff
@@ -35,7 +35,7 @@ test = [
 download-components = "python scripts/download_components.py --ref v0.1.1rc10"

 # Dapr and Temporal service tasks
-start-dapr = "dapr run --enable-api-logging --log-level debug --app-id app --app-port 3000 --dapr-http-port 3500 --dapr-grpc-port 50001 --dapr-http-max-request-size 1024 --resources-path components"
+start-dapr = "dapr run --enable-api-logging --log-level debug --app-id app --app-port 8000 --dapr-http-port 3500 --dapr-grpc-port 50001 --dapr-http-max-request-size 1024 --resources-path components"
 start-temporal = "temporal server start-dev --db-filename /tmp/temporal.db"
 start-deps.shell = "poe start-dapr & poe start-temporal &"
 stop-deps.shell = "lsof -ti:3000,3500,7233,50001 | xargs kill -9 2>/dev/null || true"
@@ -59,4 +59,4 @@ build-backend = "hatchling.build"

 [tool.uv.sources]
 # atlan-application-sdk = { path = "../application-sdk", editable = true }
-# atlan-application-sdk = { git = "https://github.com/atlanhq/application-sdk", rev = "28e44136a73e923423fb19a7f3d2a802b75e19c6" }
+atlan-application-sdk = { git = "https://github.com/atlanhq/application-sdk", rev = "079acca9b08bbf8a19f8e3e157a4b92bc79f70c2" }
```
Review discussion:

> As discussed, can we have two separate apps: an event generator app and an event consumer app, please?

> I don't think that'll be ideal. We can demonstrate the concept under the events directory itself, as a single app with two workflows where one workflow triggers another. Add more in the README around the use cases this can power via mermaid diagrams (e.g. app to app, app to workflow, workflow to workflow, and more). @SanilK2108

> Hey folks, updated the flow here: added a UI, so a user can go to the UI to start a workflow, `WorkflowTriggeredByUI`. When this workflow ends, another workflow, `WorkflowTriggeredByEvent`, starts automatically.
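As a loose illustration of that updated flow, the sketch below wires the two workflows named in the comment using the `register_event_subscription` API from this PR. The class bodies and the `UIWorkflowCompleted` event id are placeholders, not code from the PR.

```python
# Hedged sketch of the updated flow: WorkflowTriggeredByUI is started from the UI,
# and WorkflowTriggeredByEvent is bound to the event emitted when it ends.
# Class bodies and the "UIWorkflowCompleted" event id are illustrative placeholders.
from typing import Any

from temporalio import workflow

from application_sdk.application import BaseApplication
from application_sdk.constants import APPLICATION_NAME
from application_sdk.workflows import WorkflowInterface


@workflow.defn
class WorkflowTriggeredByUI(WorkflowInterface):
    @workflow.run
    async def run(self, workflow_config: dict[str, Any]) -> None:
        # Placeholder body; the real workflow would execute its activities here.
        return


@workflow.defn
class WorkflowTriggeredByEvent(WorkflowInterface):
    @workflow.run
    async def run(self, workflow_config: dict[str, Any]) -> None:
        # Placeholder body; starts automatically once the first workflow's end event arrives.
        return


def wire_event_chain() -> BaseApplication:
    application = BaseApplication(name=APPLICATION_NAME)

    # Mirrors register_event_subscription("AssetExtractionCompleted", SampleWorkflow)
    # in main.py, but binds the second workflow to the end event of the first.
    application.register_event_subscription("UIWorkflowCompleted", WorkflowTriggeredByEvent)
    return application
```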