Flow Automation is a core service responsible for orchestrating and executing automated data pipelines. It manages the complete lifecycle of data flow executions, integrates with various infrastructure components, and provides comprehensive APIs for data pipeline management.
This project follows a Hexagonal Architecture (Ports and Adapters), separating core business logic from external interfaces:
┌─────────────────────────────────────────────────────────┐
│ Driver Adapters │
│ (HTTP API, Message Queue Consumers) │
│ ┌───────────────────────────────────────────────────┐ │
│ │ Core Business Logic │ │
│ │ (logics/, module/, pkg/) │ │
│ └───────────────────────────────────────────────────┘ │
│ Driven Adapters │
│ (Database, External Services, Cache) │
└─────────────────────────────────────────────────────────┘
- logics/: Core business logic layer, containing data pipeline management, executors, triggers, etc.
- driveradapters/: Driver adapters (inbound), handling HTTP requests, message queue consumption, etc.
- drivenadapters/: Driven adapters (outbound), interacting with databases, external services, cache, etc.
- module/: Domain modules, including initialization and configuration
- pkg/: Shared utility packages and entity definitions
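The layering above can be illustrated with a minimal Go sketch. All names here (Pipeline, PipelineStore, memoryStore, PipelineService) are hypothetical and are not taken from this repository; the point is only that the core logic depends on an interface (the port), while the storage adapter implements it.

```go
package main

import (
	"errors"
	"fmt"
)

// Pipeline is a hypothetical entity; the real definitions live under pkg/.
type Pipeline struct {
	ID   string
	Name string
}

// PipelineStore is a driven (outbound) port owned by the core logic.
type PipelineStore interface {
	Save(p Pipeline) error
	Get(id string) (Pipeline, error)
}

// memoryStore is a stand-in driven adapter. A real adapter would talk to
// MongoDB or MySQL instead of a map.
type memoryStore struct {
	items map[string]Pipeline
}

func newMemoryStore() *memoryStore {
	return &memoryStore{items: make(map[string]Pipeline)}
}

func (s *memoryStore) Save(p Pipeline) error {
	s.items[p.ID] = p
	return nil
}

func (s *memoryStore) Get(id string) (Pipeline, error) {
	p, ok := s.items[id]
	if !ok {
		return Pipeline{}, errors.New("pipeline not found: " + id)
	}
	return p, nil
}

// PipelineService is core business logic. It knows only the port,
// never a concrete database client.
type PipelineService struct {
	store PipelineStore
}

// Create validates the input and persists the pipeline through the port.
func (s *PipelineService) Create(id, name string) (Pipeline, error) {
	if name == "" {
		return Pipeline{}, errors.New("pipeline name must not be empty")
	}
	p := Pipeline{ID: id, Name: name}
	if err := s.store.Save(p); err != nil {
		return Pipeline{}, err
	}
	return p, nil
}

func main() {
	// A driver adapter (for example an HTTP handler) would call the service
	// like this after decoding the request.
	svc := &PipelineService{store: newMemoryStore()}
	p, err := svc.Create("dag-1", "daily-import")
	fmt.Println(p.Name, err)
}
```

Because the service only sees the interface, swapping the in-memory adapter for a database-backed one does not require touching the core logic.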
- Create & Edit: Support for creating, updating, and deleting data pipelines (DAGs)
- Version Control: Data pipeline version management
- Execution Management: Start, cancel, and pause pipeline instances
- Debug Mode: Support for single-step and full debugging
- Batch Operations: Batch query and operations on data pipelines
- Executor Management: Create, update, and delete custom executors
- Action Management: Add, modify, and delete actions for executors
- Access Control: User permission-based executor access control
- Agent Import: Support for importing agents from app store
- Scheduled Triggers: Cron-based scheduled task triggers
- Event Triggers: Support for external event-driven data pipeline execution
- Manual Triggers: Support for user-initiated data pipeline execution
- Callback Handling: Process asynchronous task callbacks
- Token Authentication: OAuth2 authentication via Hydra
- Permission Verification: Fine-grained resource access control
- Business Domain Isolation: Multi-tenant business domain isolation
- Minimal Dependency Mode: Supports disabling ISF dependencies through auth.enabled/SERVER_AUTH_ENABLED; in this mode the service can run without Hydra, Authentication, Authorization, and User Management, but permission checks and business-domain isolation are bypassed
- Access Control: API access security policies
- Data Isolation: Tenant data isolation
- Audit Logging: Operation audit trails
- Metrics Monitoring: Prometheus metrics exposure
- Distributed Tracing: OpenTelemetry distributed tracing
- Log Management: Structured logging and streaming logs
- Health Checks: Service health status monitoring
- Data Connections: Manage data source connection configurations
- Data Transformation: Support for data flow transformation and processing
- Operator Registration: Register and manage data processing operators
- Operator Execution: Execute various data processing operators
- Language: Go 1.24.0
- Web Framework: Gin
- Databases:
  - MongoDB (data pipeline definitions and instance storage)
  - MariaDB/MySQL (relational data)
  - Redis (caching and distributed locks)
- Message Queue: Kafka
- Configuration: Viper, godotenv
- Observability: OpenTelemetry, Prometheus
- Container Orchestration: Kubernetes (Helm Charts)
- Go 1.24+
- Docker & Docker Compose (for local infrastructure)
For detailed local development setup instructions, including running infrastructure and mock services, please refer to LOCAL_DEVELOPMENT.md.
- Start Dependencies:
  docker-compose up -d
- Run Application:
  go run main.go
The application will start two services:
- Public API Server: http://localhost:8082
- Private API Server: http://localhost:8083
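As a rough illustration of the two-listener layout, the sketch below builds one public and one private server whose ports come from API_SERVER_PORT and API_SERVER_PRIVATE_PORT, falling back to 8082 and 8083. It uses net/http rather than Gin to stay self-contained, and the envOr and newServers helpers and the /healthz route are assumptions made for this example, not the project's actual startup code.

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

// envOr returns the value of key, or fallback when the variable is unset or empty.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// newServers builds the public and private servers with separate routers,
// so internal routes are never reachable through the public port.
func newServers() (public, private *http.Server) {
	pubMux := http.NewServeMux()
	pubMux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // hypothetical route
		fmt.Fprintln(w, "public ok")
	})

	privMux := http.NewServeMux()
	privMux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { // hypothetical route
		fmt.Fprintln(w, "private ok")
	})

	public = &http.Server{Addr: ":" + envOr("API_SERVER_PORT", "8082"), Handler: pubMux}
	private = &http.Server{Addr: ":" + envOr("API_SERVER_PRIVATE_PORT", "8083"), Handler: privMux}
	return public, private
}

func main() {
	public, private := newServers()
	fmt.Println("public:", public.Addr, "private:", private.Addr)
	// To actually serve, run each server in its own goroutine, for example:
	//   go public.ListenAndServe()
	//   log.Fatal(private.ListenAndServe())
}
```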
The application uses environment variables for configuration:
- .env: Default configuration (committed to Git)
- .env.local: Local development overrides (gitignored)
Key configuration items:
- API_SERVER_PORT: Public API port (default 8082)
- API_SERVER_PRIVATE_PORT: Private API port (default 8083)
- DB_TYPE (dbType/db_type): Selects the workflow metadata storage backend (mongodb or mysql).
  - mongodb: Uses MongoDB to store DAG definitions, execution instances, and other document-style workflow data.
  - mysql: Uses the MySQL/MariaDB-based repository implementation for workflow metadata after relational storage deployment or migration. If the service previously ran with MongoDB storage, migrate the historical workflow data before switching to mysql, for example by running python3 migrations/mariadb/0.4.0/pre/02-mongodb_to_mysql_migration.py.
- SERVER_AUTH_ENABLED: Controls whether auth-related ISF dependencies are enabled for flow-automation.
  - true (default): Uses real Hydra, Authentication, Authorization, and User Management services.
  - false: Uses built-in mock adapters so the service can run in a minimal-dependency mode without ISF.
  - In Helm deployments, this is managed by auth.enabled.
  - This mode is intended for local development, demos, or minimal-dependency deployments, and it bypasses the original permission and business-domain isolation behavior.
- MONGODB_HOST: MongoDB address
- REDIS_HOST: Redis address
- KAFKA_BROKERS: Kafka cluster addresses
For detailed design and scope of this change, see #273 Dataflow decoupling ISF.
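One way to picture the SERVER_AUTH_ENABLED switch is as a choice between two implementations of the same outbound port. The sketch below is an assumption-laden illustration: TokenVerifier, hydraVerifier, mockVerifier, authEnabled, and newVerifier are hypothetical names, the Hydra adapter is a stub that always fails, and treating unparsable values as true is a choice made here, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// TokenVerifier is a hypothetical outbound port for token checks.
type TokenVerifier interface {
	Verify(token string) (userID string, err error)
}

// hydraVerifier stands in for the adapter that would call Hydra.
// In this sketch it is a stub that always returns an error.
type hydraVerifier struct{}

func (hydraVerifier) Verify(token string) (string, error) {
	return "", errors.New("hydra introspection is not wired up in this sketch")
}

// mockVerifier accepts every token, which is why permission checks are
// effectively bypassed in minimal-dependency mode.
type mockVerifier struct{}

func (mockVerifier) Verify(string) (string, error) {
	return "mock-user", nil
}

// authEnabled interprets the raw SERVER_AUTH_ENABLED value. Unset means true,
// matching the documented default. Unparsable values also yield true, which is
// an assumption of this sketch (keep auth on when in doubt).
func authEnabled(raw string) bool {
	if raw == "" {
		return true
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return true
	}
	return enabled
}

// newVerifier picks the adapter once at startup; callers see only the port.
func newVerifier(enabled bool) TokenVerifier {
	if enabled {
		return hydraVerifier{}
	}
	return mockVerifier{}
}

func main() {
	v := newVerifier(authEnabled(os.Getenv("SERVER_AUTH_ENABLED")))
	user, err := v.Verify("example-token")
	fmt.Println(user, err)
}
```

Selecting the adapter in one place keeps the rest of the code unaware of which mode is active, which is also why the mock mode must not be used where real isolation is required.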
.
├── common/ # Common utilities and constants
├── conf/ # Configuration files
├── docs/ # Documentation
├── drivenadapters/ # Outbound adapters (DB, external APIs)
├── driveradapters/ # Inbound adapters (HTTP, Event Consumers)
│ ├── admin/ # Admin interfaces
│ ├── alarm/ # Alarm interfaces
│ ├── auth/ # Authentication interfaces
│ ├── executor/ # Executor interfaces
│ ├── mgnt/ # Data pipeline management interfaces
│ ├── trigger/ # Trigger interfaces
│ └── ...
├── helm/ # Helm deployment charts
├── logics/ # Business logic
│ ├── mgnt/ # Data pipeline management logic
│ ├── executor/ # Executor logic
│ ├── cronjob/ # Scheduled task logic
│ └── ...
├── mock-server/ # Mock services for local development
├── module/ # Domain modules
├── pkg/ # Shared packages
│ ├── actions/ # Action definitions
│ ├── entity/ # Entity definitions
│ └── ...
├── resource/ # Static resources
├── schema/ # Database schemas
├── scripts/ # Helper scripts
├── store/ # Data storage interfaces
└── utils/ # General utilities
Main API endpoints:
- POST /api/automation/v1/dags - Create data pipeline
- PUT /api/automation/v1/dags/:id - Update data pipeline
- GET /api/automation/v1/dags/:id - Get data pipeline details
- DELETE /api/automation/v1/dags/:id - Delete data pipeline
- GET /api/automation/v1/dags - List data pipelines

- POST /api/automation/v1/dags/:id/run - Execute data pipeline
- POST /api/automation/v1/dags/:id/cancel - Cancel execution
- GET /api/automation/v1/dags/:id/instances - Query execution instances

- POST /api/automation/v1/executors - Create executor
- PUT /api/automation/v1/executors/:id - Update executor
- GET /api/automation/v1/executors - List executors
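As a hedged example of calling one of these endpoints, the sketch below builds the request for POST /api/automation/v1/dags/:id/run against the local public server. The newRunRequest helper is hypothetical, the request body is left empty because the payload schema is not described here, and sending the token as a Bearer header is an assumption based on the OAuth2 setup.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// newRunRequest builds the "execute data pipeline" request for the given DAG ID.
// The ID is path-escaped so unusual characters cannot change the route.
func newRunRequest(baseURL, dagID, token string) (*http.Request, error) {
	endpoint := strings.TrimRight(baseURL, "/") +
		"/api/automation/v1/dags/" + url.PathEscape(dagID) + "/run"

	req, err := http.NewRequest(http.MethodPost, endpoint, nil)
	if err != nil {
		return nil, err
	}
	if token != "" {
		// Assumption: the API expects an OAuth2 access token as a Bearer header.
		req.Header.Set("Authorization", "Bearer "+token)
	}
	return req, nil
}

func main() {
	req, err := newRunRequest("http://localhost:8082", "dag-1", "example-token")
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
	// Sending is left out so the sketch runs without a live server:
	//   resp, err := http.DefaultClient.Do(req)
}
```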
The project uses Helm for Kubernetes deployment:
cd helm/flow-automation
helm install flow-automation . -f values.yaml
- Code Style: Follow standard Go code conventions and use golangci-lint for code checking
- Testing: Use go test to run unit tests
- Mock Generation: Use go.uber.org/mock to generate test mocks