diff --git a/.gitignore b/.gitignore
index 570c8473..3d2de799 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,3 +51,9 @@ repomix-output.md
 src/flowerpower/_settings.py
 .sesskey
 test.db*
+docs/08_jobqueuemanager___pipelinejobqueue_.md
+REFACTORING_PLAN.md
+GEMINI.md
+repomix-output.xml
+repomix.config.json
+.roo/mcp.json
diff --git a/.repomixignore b/.repomixignore
deleted file mode 100644
index c63e5bc6..00000000
--- a/.repomixignore
+++ /dev/null
@@ -1,4 +0,0 @@
-# Add patterns to ignore here, one per line
-# Example:
-# *.log
-# tmp/
diff --git a/.roo/mcp.json b/.roo/mcp.json
deleted file mode 100644
index 64aec0e3..00000000
--- a/.roo/mcp.json
+++ /dev/null
@@ -1,18 +0,0 @@
-// {
-//   "mcpServers": {
-//     "conport": {
-//       "command": "uvx",
-//       "args": [
-//         "--from",
-//         "context-portal-mcp",
-//         "conport-mcp",
-//         "--mode",
-//         "stdio",
-//         "--workspace_id",
-//         "${workspaceFolder}"
-//       ],
-//       "disabled": true,
-//       "alwaysAllow": []
-//     }
-//   }
-// }
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index caa0bd6c..eecade30 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,24 @@
 # Changelog
 
+## [0.11.6.20] - 2025-08-14
+
+### Changes
+- Refactor code for improved readability and consistency
+- Refactor pipeline management by removing the runner module and updating the manager. Introduce unit tests for the pipeline and flower power project, ensuring proper functionality and validation of methods.
+- feat(pipeline): Implement active Pipeline class with execution logic
+- Refactor tests and update dependencies
+
+## [0.11.6.20] - 2025-08-14
+
+### Changes
+- Refactor pipeline management by removing the runner module and updating the manager. Introduce unit tests for the pipeline and flower power project, ensuring proper functionality and validation of methods.
+- feat(pipeline): Implement active Pipeline class with execution logic
+- Refactor tests and update dependencies
+
 ## [0.11.6.20] - 2025-08-12
 
 ### Changes
diff --git a/README.md b/README.md
index 994658fa..2176c698 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
FlowerPower offers multiple ways to configure your project, ensuring flexibility for different environments and workflows. The configuration is loaded in the following order of precedence:

1. `.env` file
2. `settings.py`: a dedicated settings module
3. Any `*.yaml` configuration file for your project

You can override configuration settings directly in your Python code. This is useful for dynamic adjustments or for settings that are determined at runtime.
```python
from flowerpower.core.config import settings

# Override the default Redis host
settings.set('redis.host', 'localhost')

# You can also update nested settings
settings.set('pipelines.my_pipeline.retries', 3)
```

For fine-grained control, you can work directly with `PipelineManager` and `JobQueueManager`.

### PipelineManager

The `PipelineManager` is responsible for loading, validating, and executing data pipelines.
```python
from flowerpower.core.pipeline import PipelineManager

# Initialize the manager
pipeline_manager = PipelineManager()

# Load a specific pipeline
pipeline = pipeline_manager.get_pipeline("sales_etl")

# Execute the pipeline
result = pipeline.run(input_data="path/to/data.csv")
print(result)
```

### JobQueueManager

The `JobQueueManager` handles job queuing, scheduling, and worker management.
```python
from flowerpower.core.job_queue import JobQueueManager

# Initialize the manager
job_queue_manager = JobQueueManager()

# Enqueue a job
job = job_queue_manager.enqueue("my_task", arg1="value1", arg2="value2")
print(f"Job {job.id} enqueued.")

# Schedule a job to run at a specific time
job_queue_manager.schedule("my_task", cron="0 0 * * *")  # Daily at midnight
```

Integrate with popular MLOps and observability tools using adapters.
FlowerPower uses the `fsspec-utils` library to provide a unified interface for interacting with different filesystems, including local storage, S3, and GCS. This allows you to switch between storage backends without changing your code.
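For example, loading a project from S3 instead of the local filesystem is just a matter of passing the remote path and `storage_options` — a minimal sketch using the `FlowerPowerProject.load()` API documented below; the bucket name and credential values are placeholders:

```python
from flowerpower import FlowerPowerProject

# Load a project stored on S3 instead of the local filesystem.
# Bucket name and credentials below are placeholders.
project = FlowerPowerProject.load(
    "s3://my-bucket/my-project",
    storage_options={"key": "YOUR_ACCESS_KEY", "secret": "YOUR_SECRET_KEY"},
)
```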
You can manage workers to process your queued jobs:

- Start a single worker in the foreground
- Start a pool of workers in the background
- Stop background workers

The corresponding commands are sketched below.
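A minimal sketch, assuming the `start-worker` subcommand of the `flowerpower job-queue` CLI documented later in this reference (its flags are listed in that section):

```bash
# Start a single worker in the foreground (blocks until stopped)
$ flowerpower job-queue start-worker

# Start a pool of 4 workers in the background
$ flowerpower job-queue start-worker --background --num-workers 4
```

Background workers can be stopped from Python via `project.stop_worker()` or `project.stop_worker_pool()`, described in the API reference below.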
FlowerPower supports several scheduling strategies for your jobs:

- **Cron-based**: recurring execution defined by a cron expression (e.g., `0 2 * * *` for 2 AM daily)
- **Interval-based**: recurring execution at a fixed time interval
- **Date-based**: one-time execution at a specific future datetime
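For example, using the `FlowerPowerProject.schedule()` API described later in this reference (the pipeline names are placeholders):

```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Cron-based: run every day at 2 AM
project.schedule("nightly_etl", cron="0 2 * * *")

# Interval-based: run every 15 minutes
project.schedule("health_check", interval={"minutes": 15})
```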
+0 2 * * *).The FlowerPower plugin flowerpower-io enhances FlowerPower's I/O capabilities, allowing you to connect to various data sources and sinks using a simple plugin architecture.
A variety of data source and sink types are supported. To use a plugin, simply specify its type in your pipeline configuration.
Here are some common issues and how to resolve them:

- **Connection errors**: check the `redis.host` and `redis.port` settings in your configuration.
- **Unexpected configuration values**: use the `flowerpower config show` command to inspect the loaded configuration and identify any misconfigurations.
- **Import errors**: verify the `PYTHONPATH` environment variable.

> **Note**
> For more detailed information, refer to the API documentation.
This section provides a comprehensive reference for the FlowerPower Command Line Interface (CLI).

Initialize a new FlowerPower project.

This command creates a new FlowerPower project with the necessary directory structure and configuration files. If no project name is provided, the current directory name will be used as the project name.
| Name | Type | Description | Default |
|---|---|---|---|
| project_name | str | Name of the FlowerPower project to create. If not provided, the current directory name is used. | Required |
| base_dir | str | Base directory where the project will be created. If not provided, the current working directory is used. | Required |
| storage_options | str | Storage options for filesystem access, as a JSON or dict string | Required |
| job_queue_type | str | Type of job queue backend to use (rq) | Required |
```bash
$ flowerpower init --name my-project --base-dir /path/to/projects

# Create a project with RQ as the job queue backend (default)
$ flowerpower init --name my-project --job-queue-type rq
```
Start the Hamilton UI web application.

This command launches the Hamilton UI, which provides a web interface for visualizing and interacting with your FlowerPower pipelines. The UI allows you to explore pipeline execution graphs, view results, and manage jobs.
| Name | Type | Description | Default |
|---|---|---|---|
| port | str | Port to run the UI server on | Required |
| base_dir | str | Base directory where the UI will store its data | Required |
| no_migration | str | Skip running database migrations on startup | Required |
| no_open | str | Prevent automatically opening the browser | Required |
| settings_file | str | Settings profile to use (mini, dev, prod) | Required |
| config_file | str | Optional custom configuration file path | Required |
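A sketch of a typical invocation — the subcommand name `ui` and the port value are assumptions derived from the parameters above:

```bash
# Launch the Hamilton UI on a custom port without opening a browser
$ flowerpower ui --port 8241 --no-open
```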
This section details the commands available under flowerpower job-queue.
Start a worker or worker pool to process jobs.

This command starts a worker process (or a pool of worker processes) that will execute jobs from the queue. The worker will continue running until stopped, or it can be run in the background.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| background | str | Run the worker in the background | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
| num_workers | str | Number of worker processes to start (pool mode) | Required |
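For example — a sketch assuming the subcommand is named `start-worker`, with flags taken from the parameters above:

```bash
# Start a single worker in the foreground
$ flowerpower job-queue start-worker

# Start a background pool of 4 workers with debug logging
$ flowerpower job-queue start-worker --background --num-workers 4 --log-level debug
```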
Cancel a job or multiple jobs in the queue.

This command stops a job from executing (if it hasn't started yet) or signals it to stop (if already running). Canceling is different from deleting, as it maintains the job history but prevents execution.
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | ID of the job to cancel (ignored if --all is used) | Required |
| all | str | Cancel all jobs instead of a specific one | Required |
| queue_name | str | For RQ only, specifies the queue to cancel jobs from | Required |
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
```bash
$ flowerpower job-queue cancel-job --all dummy-id

# Cancel all jobs in a specific queue (RQ only)
$ flowerpower job-queue cancel-job --all dummy-id --queue-name high-priority

# Specify the backend type explicitly
$ flowerpower job-queue cancel-job dummy-id --type rq
```
Cancel a specific schedule.

Note: This is different from deleting a schedule, as it only stops the schedule from running but keeps its configuration.
| Name | Type | Description | Default |
|---|---|---|---|
| schedule_id | str | ID of the schedule to cancel | Required |
| all | str | If True, cancel all schedules | Required |
| type | str | Type of the job queue (rq) | Required |
| name | str | Name of the scheduler | Required |
| base_dir | str | Base directory for the scheduler | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level | Required |
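For example — a sketch assuming the subcommand is named `cancel-schedule`, mirroring `cancel-job`:

```bash
# Cancel a specific schedule by ID
$ flowerpower job-queue cancel-schedule my-schedule-id

# Cancel all schedules
$ flowerpower job-queue cancel-schedule --all dummy-id
```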
Delete a specific job.
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | ID of the job to delete | Required |
| all | str | If True, delete all jobs | Required |
| queue_name | str | Name of the queue (RQ only). If provided and all is True, delete all jobs in the queue | Required |
| type | str | Type of the job queue (rq) | Required |
| name | str | Name of the scheduler | Required |
| base_dir | str | Base directory for the scheduler | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level | Required |
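For example — a sketch assuming the subcommand is named `delete-job`:

```bash
# Delete a specific job by ID
$ flowerpower job-queue delete-job my-job-id

# Delete all jobs in a specific queue (RQ only)
$ flowerpower job-queue delete-job --all dummy-id --queue-name default
```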
Delete a specific schedule.
| Name | Type | Description | Default |
|---|---|---|---|
| schedule_id | str | ID of the schedule to delete | Required |
| all | str | If True, delete all schedules | Required |
| type | str | Type of the job queue (rq) | Required |
| name | str | Name of the scheduler | Required |
| base_dir | str | Base directory for the scheduler | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level | Required |
Show all job IDs in the job queue.

This command displays all job IDs currently in the system, helping you identify jobs for other operations like getting results, canceling, or deleting jobs.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
```bash
$ flowerpower job-queue show-job-ids --type rq

# Show job IDs with a custom scheduler configuration
$ flowerpower job-queue show-job-ids --name my-scheduler
```
Show all schedule IDs in the job queue.

This command displays all schedule IDs currently in the system, helping you identify schedules for other operations like pausing, resuming, or deleting schedules.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
```bash
$ flowerpower job-queue show-schedule-ids --type rq

# Show schedule IDs with a custom scheduler configuration
$ flowerpower job-queue show-schedule-ids --name my-scheduler

# Show schedule IDs with debug logging
$ flowerpower job-queue show-schedule-ids --log-level debug
```
Pause a schedule or multiple schedules.

This command temporarily stops a scheduled job from running while maintaining its configuration. Paused schedules can be resumed later.
| Name | Type | Description | Default |
|---|---|---|---|
| schedule_id | str | ID of the schedule to pause (ignored if --all is used) | Required |
| all | str | Pause all schedules instead of a specific one | Required |
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
```bash
$ flowerpower job-queue pause-schedule --all dummy-id

# Note: Schedule pausing is not supported for RQ workers
```
Resume a paused schedule or multiple schedules.

This command restarts previously paused schedules, allowing them to run again according to their original configuration.
| Name | Type | Description | Default |
|---|---|---|---|
| schedule_id | str | ID of the schedule to resume (ignored if --all is used) | Required |
| all | str | Resume all schedules instead of a specific one | Required |
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
```bash
$ flowerpower job-queue resume-schedule --all dummy-id

# Note: Schedule resuming is not supported for RQ workers

# Set a specific logging level
$ flowerpower job-queue resume-schedule --all dummy-id --log-level debug
```
Display detailed information about all jobs in the queue.

This command shows comprehensive information about jobs including their status, creation time, execution time, and other details in a user-friendly format.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| queue_name | str | Name of the queue to show jobs from (RQ only) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
| format | str | Output format for the job information | Required |
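For example — a sketch assuming the subcommand is named `show-jobs`:

```bash
# Show all jobs in the default format
$ flowerpower job-queue show-jobs

# Show jobs from a specific queue as JSON (RQ only)
$ flowerpower job-queue show-jobs --queue-name default --format json
```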
Display detailed information about all schedules.

This command shows comprehensive information about scheduled jobs including their timing configuration, status, and other details in a user-friendly format.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
| format | str | Output format for the schedule information | Required |
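For example — a sketch assuming the subcommand is named `show-schedules`:

```bash
$ flowerpower job-queue show-schedules --format json
```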
Enqueue a pipeline for execution via the job queue.

This command queues a pipeline for asynchronous execution using the configured job queue backend (RQ). The job can be executed immediately, after a delay, or at a specific time.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to enqueue | Required |
| base_dir | str | Base directory containing pipelines and configurations | Required |
| inputs | str | Input parameters for the pipeline | Required |
| final_vars | str | Final variables to request from the pipeline | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| run_in | str | Delay before execution (duration format like '5m', '1h', '30s') | Required |
| run_at | str | Specific datetime for execution (ISO format) | Required |
```bash
$ flowerpower job-queue enqueue-pipeline my_pipeline --inputs '{"data_path": "data/file.csv"}'

# Enqueue with delay
$ flowerpower job-queue enqueue-pipeline my_pipeline --run-in "30m"

# Enqueue for specific time
$ flowerpower job-queue enqueue-pipeline my_pipeline --run-at "2025-01-01T09:00:00"
```
Schedule a pipeline for recurring or future execution.

This command sets up recurring or future execution of a pipeline using cron expressions or interval-based scheduling via the configured job queue backend.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to schedule | Required |
| base_dir | str | Base directory containing pipelines and configurations | Required |
| cron | str | Cron expression for scheduling (e.g., '0 9 * * *' for 9 AM daily) | Required |
| interval | str | Interval for recurring execution (duration format) | Required |
| inputs | str | Input parameters for the pipeline | Required |
| final_vars | str | Final variables to request from the pipeline | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| schedule_id | str | Custom identifier for the schedule | Required |
```bash
$ flowerpower job-queue schedule-pipeline my_pipeline --cron "0 9 * * *"

# Schedule every 30 minutes
$ flowerpower job-queue schedule-pipeline my_pipeline --interval "30m"

# Schedule with custom inputs and ID
$ flowerpower job-queue schedule-pipeline my_pipeline --cron "0 0 * * *" \
    --inputs '{"env": "prod"}' --schedule-id "nightly-prod"
```
Execute a specific job by its ID.

This command runs a job that has been previously enqueued in the job queue. The job will be executed immediately, regardless of its original schedule.
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | ID of the job to run | Required |
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
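For example — a sketch assuming the subcommand is named `run-job`:

```bash
# Execute a previously enqueued job immediately
$ flowerpower job-queue run-job my-job-id
```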
List all schedules with detailed status information.

This command provides an enhanced schedule listing showing trigger configuration, status, next run time, and execution history. It is an enhanced version of `show-schedules` with more detailed information.
| Name | Type | Description | Default |
|---|---|---|---|
| type | str | Type of job queue backend (rq) | Required |
| name | str | Name of the scheduler configuration to use | Required |
| base_dir | str | Base directory for the scheduler configuration | Required |
| storage_options | str | Storage options as JSON or key=value pairs | Required |
| log_level | str | Logging level (debug, info, warning, error, critical) | Required |
| format | str | Output format for the schedule information | Required |
| show_status | str | Include schedule status information | Required |
| show_next_run | str | Include next execution time information | Required |
```bash
$ flowerpower job-queue list-schedules --format json

# List schedules without status information
$ flowerpower job-queue list-schedules --no-show-status

# List schedules for a specific backend
$ flowerpower job-queue list-schedules --type rq
```
+This section details the commands available under flowerpower mqtt.
Start an MQTT client to listen to messages on a topic.

The connection to the MQTT broker is established using the provided configuration of an MQTT event broker defined in the project configuration file `conf/project.yml`. If no configuration is found, you have to provide the connection parameters, such as host, port, username, and password.

The `on_message` module should contain a function `on_message` that will be called with the message payload as its argument.
| Name | Type | Description | Default |
|---|---|---|---|
| on_message | str | Name of the module containing the on_message function | Required |
| topic | str | MQTT topic to listen to | Required |
| base_dir | str | Base directory for the module | Required |
| host | str | MQTT broker host | Required |
| port | str | MQTT broker port | Required |
| username | str | MQTT broker username | Required |
| password | str | MQTT broker password | Required |
```bash
$ flowerpower mqtt start_listener --on-message my_module --topic my_topic --base-dir /path/to/module
```
Run a pipeline on a message.

This command sets up an MQTT listener that executes a pipeline whenever a message is received on the specified topic. The pipeline can be configured to retry on failure using exponential backoff with jitter for better resilience.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline | Required |
| topic | str | MQTT topic to listen to | Required |
| executor | str | Name of the executor | Required |
| base_dir | str | Base directory for the pipeline | Required |
| inputs | str | Inputs as JSON or key=value pairs or dict string | Required |
| final_vars | str | Final variables as JSON or list | Required |
| config | str | Config for the Hamilton pipeline executor | Required |
| with_tracker | str | Enable tracking with Hamilton UI | Required |
| with_opentelemetry | str | Enable OpenTelemetry tracing | Required |
| with_progressbar | str | Enable progress bar | Required |
| storage_options | str | Storage options as JSON, dict string or key=value pairs | Required |
| as_job | str | Run as a job in the scheduler | Required |
| host | str | MQTT broker host | Required |
| port | str | MQTT broker port | Required |
| username | str | MQTT broker username | Required |
| password | str | MQTT broker password | Required |
| clean_session | str | Whether to start a clean session with the broker | Required |
| qos | str | MQTT Quality of Service level (0, 1, or 2) | Required |
| client_id | str | Custom MQTT client identifier | Required |
| client_id_suffix | str | Optional suffix to append to client_id | Required |
| config_hook | str | Function to process incoming messages into pipeline config | Required |
| max_retries | str | Maximum number of retry attempts if pipeline execution fails | Required |
| retry_delay | str | Base delay between retries in seconds | Required |
| jitter_factor | str | Random factor (0-1) applied to delay for jitter | Required |
```bash
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic sensors/data

# Configure retries for resilience
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic sensors/data --max-retries 5 --retry-delay 2.0

# Run as a job with custom MQTT settings
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic events/process --as-job --qos 2 --host mqtt.example.com

# Use a config hook to process messages
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic data/incoming --config-hook process_message
```
+This section details the commands available under flowerpower pipeline.
Run a pipeline immediately.

This command executes a pipeline with the specified configuration and inputs. The pipeline runs synchronously, and the command waits for completion.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to run | Required |
| executor | str | Type of executor to use | Required |
| base_dir | str | Base directory containing pipelines and configurations | Required |
| inputs | str | Input parameters for the pipeline | Required |
| final_vars | str | Final variables to request from the pipeline | Required |
| config | str | Configuration for the Hamilton executor | Required |
| cache | str | Cache configuration for improved performance | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| with_adapter | str | Configuration for adapters like trackers or monitors | Required |
| max_retries | str | Maximum number of retry attempts on failure | Required |
| retry_delay | str | Base delay between retries in seconds | Required |
| jitter_factor | str | Random factor applied to delay for jitter (0-1) | Required |
```bash
$ pipeline run my_pipeline --inputs '{"data_path": "data/myfile.csv", "limit": 100}'

# Specify which final variables to calculate
$ pipeline run my_pipeline --final-vars '["output_table", "summary_metrics"]'

# Configure caching
$ pipeline run my_pipeline --cache '{"type": "memory", "ttl": 3600}'

# Use a different executor
$ pipeline run my_pipeline --executor threadpool

# Enable adapters
$ pipeline run my_pipeline --with-adapter '{"tracker": true, "opentelemetry": true}'

# Set a specific logging level
$ pipeline run my_pipeline --log-level debug
```
Create a new pipeline structure.

This command creates a new pipeline with the necessary directory structure, configuration file, and skeleton module file. It prepares all the required components for you to start implementing your pipeline logic.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name for the new pipeline | Required |
| base_dir | str | Base directory to create the pipeline in | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| overwrite | str | Whether to overwrite an existing pipeline with the same name | Required |
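For example — a sketch assuming the subcommand is named `new`, consistent with the other `pipeline` commands shown in this reference:

```bash
# Create a new pipeline skeleton
$ pipeline new my_new_pipeline

# Overwrite an existing pipeline with the same name
$ pipeline new my_new_pipeline --overwrite
```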
Delete a pipeline's configuration and/or module files.

This command removes a pipeline's configuration file and/or module file from the project. If neither `--cfg` nor `--module` is specified, both will be deleted.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to delete | Required |
| base_dir | str | Base directory containing the pipeline | Required |
| cfg | str | Delete only the configuration file | Required |
| module | str | Delete only the pipeline module | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
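For example — a sketch assuming the subcommand is named `delete`:

```bash
# Delete both the configuration and the module
$ pipeline delete my_pipeline

# Delete only the configuration file
$ pipeline delete my_pipeline --cfg
```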
Show the DAG (Directed Acyclic Graph) of a pipeline.

This command generates and displays a visual representation of the pipeline's execution graph, showing how nodes are connected and the dependencies between them.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to visualize | Required |
| base_dir | str | Base directory containing the pipeline | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| format | str | Output format for the visualization | Required |
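For example — a sketch assuming the subcommand is named `show-dag`:

```bash
# Display the DAG of a pipeline
$ pipeline show-dag my_pipeline

# Choose a specific output format
$ pipeline show-dag my_pipeline --format png
```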
Save the DAG (Directed Acyclic Graph) of a pipeline to a file.

This command generates a visual representation of the pipeline's execution graph and saves it to a file in the specified format.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to visualize | Required |
| base_dir | str | Base directory containing the pipeline | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| format | str | Output format for the visualization | Required |
| output_path | str | Custom file path to save the output (defaults to pipeline name) | Required |
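For example — a sketch assuming the subcommand is named `save-dag`:

```bash
# Save the DAG using the pipeline name as the file name
$ pipeline save-dag my_pipeline --format png

# Save the DAG to a custom path
$ pipeline save-dag my_pipeline --format svg --output-path docs/my_pipeline_dag.svg
```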
List all available pipelines in the project.

This command displays a list of all pipelines defined in the project, providing an overview of which pipelines are available to run or schedule.
| Name | Type | Description | Default |
|---|---|---|---|
| base_dir | str | Base directory containing pipelines | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| format | str | Output format for the list (table, json, yaml) | Required |
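For example — a sketch assuming the subcommand is named `show-pipelines`:

```bash
# List pipelines as JSON instead of the default table
$ pipeline show-pipelines --format json
```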
Show summary information for one or all pipelines.

This command displays detailed information about pipelines including their configuration, code structure, and project context. You can view information for a specific pipeline or get an overview of all pipelines.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of specific pipeline to summarize (all if not specified) | Required |
| cfg | str | Include configuration details | Required |
| code | str | Include code/module details | Required |
| project | str | Include project context information | Required |
| base_dir | str | Base directory containing pipelines | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
| to_html | str | Generate HTML output instead of text | Required |
| to_svg | str | Generate SVG output (where applicable) | Required |
| output_file | str | File path to save the output instead of printing to console | Required |
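For example — a sketch assuming the subcommand is named `show-summary`:

```bash
# Summarize all pipelines
$ pipeline show-summary

# Summarize one pipeline and write HTML output to a file
$ pipeline show-summary --name my_pipeline --to-html --output-file summary.html
```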
Add a hook to a pipeline configuration.

This command adds a hook function to a pipeline's configuration. Hooks are functions that are called at specific points during pipeline execution to perform additional tasks like logging, monitoring, or data validation.
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to add the hook to | Required |
| function_name | str | Name of the hook function (must be defined in the pipeline module) | Required |
| type | str | Type of hook (determines when the hook is called during execution) | Required |
| to | str | Target node or tag (required for node-specific hooks) | Required |
| base_dir | str | Base directory containing the pipeline | Required |
| storage_options | str | Options for storage backends | Required |
| log_level | str | Set the logging level | Required |
```bash
$ pipeline add-hook my_pipeline --function validate_inputs --type PRE_RUN

# Add a node-specific hook (executed before a specific node runs)
$ pipeline add-hook my_pipeline --function validate_data --type NODE_PRE_EXECUTE --to data_processor

# Add a hook for all nodes with a specific tag
$ pipeline add-hook my_pipeline --function check_tagged_nodes --type NODE_PRE_EXECUTE --to my_tag
```
FlowerPower uses a hierarchical configuration system to manage project and pipeline settings. The main configuration classes are described in the sections below.

These classes are designed to be flexible and extensible, allowing you to manage your project's configuration in a clean and organized way.
Module: `flowerpower.cfg.Config`

The `Config` class is the main configuration class that combines project and pipeline settings. It serves as the central configuration manager.

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `pipeline` | `PipelineConfig` | A `PipelineConfig` object containing pipeline-specific settings. |
| `project` | `ProjectConfig` | A `ProjectConfig` object containing project-level settings. |
```python
from flowerpower.cfg import Config

# Load default configuration
config = Config()

# Access project and pipeline settings
print(config.project.name)
print(config.pipeline.name)
```
Module: `flowerpower.cfg.ProjectConfig`

The `ProjectConfig` class manages project-level settings, including job queue and adapter configurations.

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `name` | `str` | The name of the project. |
| `job_queue` | `JobQueueConfig` | A `JobQueueConfig` object for the job queue settings. |
| `adapter` | `AdapterConfig` | An `AdapterConfig` object for the project-level adapter settings. |
```python
from flowerpower.cfg import ProjectConfig

# Load project configuration
project_config = ProjectConfig()

# Access project settings
print(project_config.name)
print(project_config.job_queue.type)
```
Module: `flowerpower.cfg.PipelineConfig`

The `PipelineConfig` class manages pipeline-specific settings, including run settings, scheduling, parameters, and adapter configurations.

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `name` | `str` | The name of the pipeline. |
| `run` | `RunConfig` | A `RunConfig` object for pipeline execution settings. |
| `schedule` | `ScheduleConfig` | A `ScheduleConfig` object for pipeline scheduling. |
| `params` | `dict` | A dictionary of pipeline parameters. |
| `adapter` | `AdapterConfig` | An `AdapterConfig` object for pipeline-specific adapter settings. |
```python
from flowerpower.cfg import PipelineConfig

# Load pipeline configuration
pipeline_config = PipelineConfig()

# Access pipeline settings
print(pipeline_config.name)
print(pipeline_config.run.executor)
```
Module: `flowerpower.cfg.ExecutorConfig`

Defines the configuration for the pipeline executor (e.g., "local", "threadpool").

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `type` | `str` | The type of executor (e.g., "local", "threadpool"). |
| `config` | `dict` | A dictionary of executor-specific configurations. |
```python
from flowerpower.cfg import ExecutorConfig

# Create an ExecutorConfig
executor_config = ExecutorConfig(type="threadpool", config={"max_workers": 4})
print(executor_config.type)
```
Module: `flowerpower.cfg.WithAdapterConfig`

Defines settings for using adapters during pipeline execution.

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `adapter_name` | `str` | The name of the adapter. |
| `enabled` | `bool` | Whether the adapter is enabled. |
| `config` | `dict` | Adapter-specific configurations. |
```python
from flowerpower.cfg import WithAdapterConfig

# Create a WithAdapterConfig
adapter_config = WithAdapterConfig(adapter_name="opentelemetry", enabled=True)
print(adapter_config.enabled)
```
Module: `flowerpower.cfg.AdapterConfig`

A base class for adapter configurations, used for both project and pipeline-level settings.

Attributes:
| Attribute | Type | Description |
|---|---|---|
| `type` | `str` | The type of adapter. |
| `config` | `dict` | A dictionary of adapter-specific configurations. |
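A usage sketch, assuming the constructor mirrors the attributes above (as with `ExecutorConfig`); the values are placeholders:

```python
from flowerpower.cfg import AdapterConfig

# Create a base adapter configuration (values are placeholders)
adapter_config = AdapterConfig(type="tracker", config={"project_id": "123"})
print(adapter_config.type)
```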
Module: `flowerpower.flowerpower`

The `FlowerPower` class is the main entry point for initializing and interacting with FlowerPower projects. It acts as a factory for `FlowerPowerProject` instances, allowing users to load existing projects or create new ones.
```python
__new__(cls, name: str | None = None, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = {}, fs: AbstractFileSystem | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, hooks_dir: str = settings.HOOKS_DIR) -> FlowerPowerProject
```

This method is called when you instantiate `FlowerPower()`. It checks if a project already exists at the specified `base_dir` and either loads it or initializes a new one.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | The name of the project. If `None`, it defaults to the current directory name. | `None` |
| `base_dir` | `str \| None` | The base directory where the project will be created or loaded. If `None`, it defaults to the current working directory. | `None` |
| `storage_options` | `dict \| BaseStorageOptions \| None` | Storage options for the filesystem. | `{}` |
| `fs` | `AbstractFileSystem \| None` | An fsspec-compatible filesystem instance to use for file operations. | `None` |
| `job_queue_type` | `str` | The type of job queue to use for the project (e.g., "rq"). | `settings.JOB_QUEUE_TYPE` |
| `hooks_dir` | `str` | The directory where the project hooks will be stored. | `settings.HOOKS_DIR` |
Returns: FlowerPowerProject - An instance of FlowerPowerProject initialized with the new or loaded project.
```python
from flowerpower import FlowerPower

# Initialize or load a project in the current directory
project = FlowerPower()

# Initialize or load a project with a specific name and job queue type
project = FlowerPower(name="my-data-project", job_queue_type="rq")
```
Module: `flowerpower.flowerpower`

The `FlowerPowerProject` class represents an initialized FlowerPower project, providing an interface to manage pipelines, job queues, and project-level settings.
```python
__init__(self, pipeline_manager: PipelineManager, job_queue_manager: JobQueueManager | None = None)
```

Initializes a `FlowerPowerProject` instance. This constructor is typically called internally by `FlowerPowerProject.load()` or `FlowerPowerProject.init()`.
| Parameter | Type | Description |
|---|---|---|
| `pipeline_manager` | `PipelineManager` | An instance of `PipelineManager` to manage pipelines within this project. |
| `job_queue_manager` | `JobQueueManager \| None` | An optional instance of `JobQueueManager` to handle job queue operations. |

| Attribute | Type | Description |
|---|---|---|
| `pipeline_manager` | `PipelineManager` | Manages pipelines within the project. |
| `job_queue_manager` | `JobQueueManager \| None` | Manages job queue operations, if configured. |
| `name` | `str` | The name of the current project. |
| `_base_dir` | `str` | The base directory of the project. |
| `_fs` | `AbstractFileSystem` | The fsspec-compatible filesystem instance used by the project. |
| `_storage_options` | `dict \| Munch \| BaseStorageOptions` | Storage options for the filesystem. |
| `job_queue_type` | `str \| None` | The type of job queue configured for the project (e.g., "rq"). |
| `job_queue_backend` | `Any \| None` | The backend instance for the job queue, if configured. |
```python
run(self, name: str, inputs: dict | None = None, final_vars: list[str] | None = None, config: dict | None = None, cache: dict | None = None, executor_cfg: str | dict | ExecutorConfig | None = None, with_adapter_cfg: dict | WithAdapterConfig | None = None, pipeline_adapter_cfg: dict | PipelineAdapterConfig | None = None, project_adapter_cfg: dict | ProjectAdapterConfig | None = None, adapter: dict[str, Any] | None = None, reload: bool = False, log_level: str | None = None, max_retries: int | None = None, retry_delay: float | None = None, jitter_factor: float | None = None, retry_exceptions: tuple | list | None = None, on_success: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_failure: Callable | tuple[Callable, tuple | None, dict | None] | None = None) -> dict[str, Any]
```

Execute a pipeline synchronously and return its results.

This is a convenience method that delegates to the pipeline manager. It provides the same functionality as `self.pipeline_manager.run()`.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to run. Must be a valid identifier. |  |
| `inputs` | `dict \| None` | Override pipeline input values. Example: `{"data_date": "2025-04-28"}` | `None` |
| `final_vars` | `list[str] \| None` | Specify which output variables to return. Example: `["model", "metrics"]` | `None` |
| `config` | `dict \| None` | Configuration for the Hamilton pipeline executor. Example: `{"model": "LogisticRegression"}` | `None` |
| `cache` | `dict \| None` | Cache configuration for results. Example: `{"recompute": ["node1", "final_node"]}` | `None` |
| `executor_cfg` | `str \| dict \| ExecutorConfig \| None` | Execution configuration. Can be a `str` executor name (e.g., "threadpool", "local"), a `dict` raw config (e.g., `{"type": "threadpool", "max_workers": 4}`), or an `ExecutorConfig` structured config object. | `None` |
| `with_adapter_cfg` | `dict \| WithAdapterConfig \| None` | Adapter settings for pipeline execution. Example: `{"opentelemetry": True, "tracker": False}` | `None` |
| `pipeline_adapter_cfg` | `dict \| PipelineAdapterConfig \| None` | Pipeline-specific adapter settings. Example: `{"tracker": {"project_id": "123", "tags": {"env": "prod"}}}` | `None` |
| `project_adapter_cfg` | `dict \| ProjectAdapterConfig \| None` | Project-level adapter settings. Example: `{"opentelemetry": {"host": "http://localhost:4317"}}` | `None` |
| `adapter` | `dict[str, Any] \| None` | Custom adapter instance for the pipeline. Example: `{"ray_graph_adapter": RayGraphAdapter()}` | `None` |
| `reload` | `bool` | Force reload of pipeline configuration. | `False` |
| `log_level` | `str \| None` | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" | `None` |
| `max_retries` | `int \| None` | Maximum number of retries for execution. | `None` |
| `retry_delay` | `float \| None` | Delay between retries in seconds. | `None` |
| `jitter_factor` | `float \| None` | Random jitter factor to add to the retry delay. | `None` |
| `retry_exceptions` | `tuple \| list \| None` | Exceptions that trigger a retry. | `None` |
| `on_success` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback to run on successful pipeline execution. | `None` |
| `on_failure` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback to run on pipeline execution failure. | `None` |
Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.
Raises:

- `ValueError`: If the pipeline name doesn't exist or the configuration is invalid.
- `ImportError`: If the pipeline module cannot be imported.
- `RuntimeError`: If execution fails due to pipeline or adapter errors.

```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Simple execution
result = project.run("my_pipeline")

# With custom inputs
result = project.run(
    "ml_pipeline",
    inputs={"data_date": "2025-01-01"},
    final_vars=["model", "metrics"]
)
```
Enqueue a pipeline for execution via the job queue.

This is a convenience method that delegates to the job queue manager's `enqueue_pipeline` method. It provides asynchronous pipeline execution.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Name of the pipeline to enqueue. |
| `*args` | `Any` | Additional positional arguments for job execution. |
| `**kwargs` | `Any` | Keyword arguments for pipeline execution and job queue options. Supports all parameters from `pipeline_manager.run()` plus job queue specific options: `run_in` (schedule the job to run after a delay), `run_at` (schedule the job to run at a specific datetime), `queue_name` (queue to use, for RQ), `timeout` (job execution timeout), `retry` (number of retries), `result_ttl` (result time to live), `ttl` (job time to live). |
Returns: Job - Job ID or result depending on implementation, or None if job queue not configured.
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject
from datetime import datetime

project = FlowerPowerProject.load(".")

# Immediate execution via job queue
job_id = project.enqueue("my_pipeline", inputs={"date": "today"})

# Delayed execution
job_id = project.enqueue("my_pipeline", inputs={"date": "today"}, run_in=300)

# Scheduled execution
job_id = project.enqueue(
    "my_pipeline",
    inputs={"date": "today"},
    run_at=datetime(2025, 1, 1, 9, 0)
)
```
Schedule a pipeline for recurring or future execution.

This is a convenience method that delegates to the job queue manager's `schedule_pipeline` method. It provides scheduled pipeline execution.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Name of the pipeline to schedule. |
| `*args` | `Any` | Additional positional arguments for scheduling. |
| `**kwargs` | `Any` | Keyword arguments for pipeline execution and scheduling options. Supports all parameters from `pipeline_manager.run()` plus scheduling options: `cron` (cron expression for recurring execution, e.g., "0 9 * * *"), `interval` (time interval for recurring execution, int seconds or dict), `date` (future date for one-time execution, datetime or ISO string), `schedule_id` (unique identifier for the schedule), `overwrite` (whether to overwrite an existing schedule with the same ID). |
Returns: ScheduledJob - Schedule ID or job ID depending on implementation, or None if job queue not configured.
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject
from datetime import datetime, timedelta

project = FlowerPowerProject.load(".")

# Daily schedule with cron
schedule_id = project.schedule(
    "daily_metrics",
    cron="0 9 * * *",  # 9 AM daily
    inputs={"date": "{{ execution_date }}"}
)

# Interval-based schedule
schedule_id = project.schedule(
    "monitoring",
    interval={"minutes": 15},
    inputs={"check_type": "health"}
)

# Future one-time execution
future_date = datetime.now() + timedelta(days=1)
schedule_id = project.schedule(
    "batch_process",
    date=future_date,
    inputs={"process_date": "tomorrow"}
)
```
```python
start_worker(self, background: bool = False, queue_names: list[str] | None = None, with_scheduler: bool = True, **kwargs: Any) -> None
```

Start a worker process for processing jobs from the queues.

This is a convenience method that delegates to the job queue manager's `start_worker` method.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `background` | `bool` | If `True`, runs the worker in a non-blocking background mode. If `False`, runs in the current process and blocks until stopped. | `False` |
| `queue_names` | `list[str] \| None` | List of queue names to process. If `None`, processes all queues defined in the backend configuration. | `None` |
| `with_scheduler` | `bool` | Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend). | `True` |
| `**kwargs` | `Any` | Additional worker configuration options specific to the job queue backend. |  |
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Start worker in foreground (blocks)
project.start_worker()

# Start worker in background
project.start_worker(background=True)

# Start worker for specific queues
project.start_worker(queue_names=["high_priority", "default"])
```
Stop the worker process.

This is a convenience method that delegates to the job queue manager's `stop_worker` method.
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")
project.stop_worker()
```
```python
start_worker_pool(self, num_workers: int | None = None, background: bool = False, queue_names: list[str] | None = None, with_scheduler: bool = True, **kwargs: Any) -> None
```

Start a pool of worker processes to handle jobs in parallel.

This is a convenience method that delegates to the job queue manager's `start_worker_pool` method.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `num_workers` | `int \| None` | Number of worker processes to start. If `None`, uses the CPU count or a backend-specific default. | `None` |
| `background` | `bool` | If `True`, runs the worker pool in a non-blocking background mode. If `False`, runs in the current process and blocks until stopped. | `False` |
| `queue_names` | `list[str] \| None` | List of queue names to process. If `None`, processes all queues defined in the backend configuration. | `None` |
| `with_scheduler` | `bool` | Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend). | `True` |
| `**kwargs` | `Any` | Additional worker pool configuration options specific to the job queue backend. |  |
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Start worker pool with default number of workers
project.start_worker_pool()

# Start 4 workers in background
project.start_worker_pool(num_workers=4, background=True)

# Start worker pool for specific queues
project.start_worker_pool(
    num_workers=2,
    queue_names=["high_priority", "default"]
)
```
Stop all worker processes in the worker pool.

This is a convenience method that delegates to the job queue manager's `stop_worker_pool` method.
Raises: RuntimeError: If job queue manager is not configured.
```python
from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")
project.stop_worker_pool()
```
```python
load(cls, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = {}, fs: AbstractFileSystem | None = None, log_level: str | None = None) -> "FlowerPowerProject"
```

Load an existing FlowerPower project.

If the project does not exist, an error is raised.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `base_dir` | `str \| None` | The base directory of the project. If `None`, it defaults to the current working directory. | `None` |
| `storage_options` | `dict \| BaseStorageOptions \| None` | Storage options for the filesystem. | `{}` |
| `fs` | `AbstractFileSystem \| None` | An instance of `AbstractFileSystem` to use for file operations. | `None` |
| `log_level` | `str \| None` | The logging level to set for the project. If `None`, it uses the default log level. | `None` |
Returns: `FlowerPowerProject` - An instance of `FlowerPowerProject` if the project exists.
Raises: FileNotFoundError: If the project does not exist at the specified base directory.
```python
from flowerpower import FlowerPowerProject

# Load a project from the current directory
project = FlowerPowerProject.load(".")

# Load a project from a specific path
project = FlowerPowerProject.load("/path/to/my/project")
```
```python
init(cls, name: str | None = None, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = {}, fs: AbstractFileSystem | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, hooks_dir: str = settings.HOOKS_DIR, log_level: str | None = None) -> "FlowerPowerProject"
```

Initialize a new FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | The name of the project. If `None`, it defaults to the current directory name. | `None` |
| `base_dir` | `str \| None` | The base directory where the project will be created. If `None`, it defaults to the current working directory. | `None` |
| `storage_options` | `dict \| BaseStorageOptions \| None` | Storage options for the filesystem. | `{}` |
| `fs` | `AbstractFileSystem \| None` | An instance of `AbstractFileSystem` to use for file operations. | `None` |
| `job_queue_type` | `str` | The type of job queue to use for the project. | `settings.JOB_QUEUE_TYPE` |
| `hooks_dir` | `str` | The directory where the project hooks will be stored. | `settings.HOOKS_DIR` |
| `log_level` | `str \| None` | The logging level to set for the project. If `None`, it uses the default log level. | `None` |
Returns: FlowerPowerProject - An instance of FlowerPowerProject initialized with the new project.
Raises: FileExistsError: If the project already exists at the specified base directory.
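For example:

```python
from flowerpower import FlowerPowerProject

# Create a new project with an RQ job queue
project = FlowerPowerProject.init(name="my-data-project", job_queue_type="rq")
```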
Module: `flowerpower.init`

The `init` function is a top-level function that initializes a new FlowerPower project. It is a convenient alias for `FlowerPowerProject.init()`.
```python
init(name: str | None = None, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = None, fs: AbstractFileSystem | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, hooks_dir: str = settings.HOOKS_DIR)
```

Initializes a new FlowerPower project.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str \| None` | The name of the project. Defaults to the current directory name. |
| `base_dir` | `str \| None` | The base directory for the project. Defaults to the current working directory. |
| `storage_options` | `dict \| BaseStorageOptions \| None` | Storage options for the filesystem. |
| `fs` | `AbstractFileSystem \| None` | An fsspec-compatible filesystem instance. |
| `job_queue_type` | `str` | The type of job queue to use (e.g., "rq"). |
| `hooks_dir` | `str` | The directory for project hooks. |
Returns: A FlowerPowerProject instance.
Raises: FileExistsError if the project already exists.
```python
from flowerpower import init

project = init(name="my-new-project", job_queue_type="rq")
```
Module: `flowerpower.job_queue.JobQueueManager`

The `JobQueueManager` is an abstract base class that defines the interface for job queue operations in FlowerPower. It is responsible for enqueuing, scheduling, and managing jobs.

```python
__init__(self, type: str | None = None, name: str | None = None, base_dir: str | None = None, backend: BaseBackend | None = None, storage_options: dict | None = None, fs: AbstractFileSystem | None = None, **kwargs)
```

Initializes the `JobQueueManager`.
| Parameter | Type | Description | Default |
|---|---|---|---|
| `type` | `str \| None` | The type of job queue backend (e.g., "rq"). | `None` |
| `name` | `str \| None` | The name of the scheduler. | `None` |
| `base_dir` | `str \| None` | The base directory of the project. | `None` |
| `backend` | `BaseBackend \| None` | A backend instance. | `None` |
| `storage_options` | `dict \| None` | Storage options for the filesystem. | `None` |
| `fs` | `AbstractFileSystem \| None` | An fsspec-compatible filesystem instance. | `None` |
| Attribute | Type | Description |
|---|---|---|
| `is_worker_running` | `bool` | Indicates if a worker is currently running. |
| `is_scheduler_running` | `bool` | Indicates if the scheduler is currently running. |
Enqueues a pipeline for immediate execution.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | The name of the pipeline. |
| `*args` | `Any` | Positional arguments for the job. |
| `**kwargs` | `Any` | Keyword arguments for the job. |
Returns: Job - The enqueued job object.
Raises: ValueError: If the pipeline name is invalid.
```python
from flowerpower.job_queue import JobQueueManager

# Assuming manager is an instance of a concrete JobQueueManager subclass
job = manager.enqueue_pipeline("my_data_pipeline", data_path="/data/new.csv")
print(f"Enqueued job: {job.id}")
```
Schedules a pipeline for future or recurring execution.
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | The name of the pipeline. |
| `*args` | `Any` | Positional arguments for the job. |
| `**kwargs` | `Any` | Keyword arguments for the job (e.g., `cron_string`, `interval`). |
Returns: ScheduledJob - The scheduled job object.
Raises: ValueError: If the pipeline name is invalid or scheduling parameters are insufficient.
```python
from flowerpower.job_queue import JobQueueManager

# Schedule a pipeline to run every day at midnight
scheduled_job = manager.schedule_pipeline(
    "daily_report_pipeline",
    cron_string="0 0 * * *"
)
print(f"Scheduled job: {scheduled_job.id}")
```
Starts a worker process to process jobs from the queue.
| Parameter | Type | Description |
|---|---|---|
| `queue_name` | `str \| list[str] \| None` | The name(s) of the queue(s) to listen to. Defaults to all queues. |
| `**kwargs` | `Any` | Additional keyword arguments for the worker. |
Returns: None
Raises: RuntimeError: If the worker fails to start.
```python
from flowerpower.job_queue import JobQueueManager

# Start a worker for a specific queue
manager.start_worker("high_priority_queue")

# Start a worker for multiple queues
manager.start_worker(["default", "low_priority"])
```
Stops the currently running worker process.

Returns: None
Raises: RuntimeError: If stopping the worker fails.
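For example:

```python
# Stop the currently running worker process
manager.stop_worker()
```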
```python
start_worker_pool(self, num_workers: int = 1, queue_name: str | list[str] | None = None, **kwargs)
```

Starts a pool of worker processes.
| Parameter | Type | Description |
|---|---|---|
| `num_workers` | `int` | The number of worker processes to start. |
| `queue_name` | `str \| list[str] \| None` | The name(s) of the queue(s) for the workers to listen to. Defaults to all queues. |
| `**kwargs` | `Any` | Additional keyword arguments for the worker processes. |
Returns: None
Raises: RuntimeError: If the worker pool fails to start.
```python
from flowerpower.job_queue import JobQueueManager

# Start a pool of 4 workers
manager.start_worker_pool(num_workers=4)
```
Stops all worker processes in the pool.

Returns: None
Raises: RuntimeError: If stopping the worker pool fails.
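For example:

```python
# Stop all worker processes in the pool
manager.stop_worker_pool()
```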
Enqueues a job for immediate, delayed, or scheduled execution.
| Parameter | Type | Description |
|---|---|---|
| `func` | `Callable` | The function to execute. |
| `*args` | `Any` | Positional arguments for the function. |
| `**kwargs` | `Any` | Keyword arguments for the function and job (e.g., `job_id`, `timeout`). |
Returns: Job - The enqueued job object.
Raises: ValueError: If func is not callable.
Example:

```python
from flowerpower.job_queue import JobQueueManager

def my_task(x, y):
    return x + y

job = manager.enqueue(my_task, 1, 2, job_id="my_sum_job")
print(f"Enqueued job: {job.id}")
```
#### `enqueue_in`

Enqueues a job to run after a specified delay.

| Parameter | Type | Description |
|---|---|---|
| `delay` | `timedelta \| int \| str` | The delay before execution. Can be a `timedelta` object, an integer (seconds), or a string (e.g., `"1m"` for 1 minute). |
| `func` | `Callable` | The function to execute. |
| `*args` | `Any` | Positional arguments for the function. |
| `**kwargs` | `Any` | Keyword arguments for the function and job. |
Returns: Job - The enqueued job object.
Raises: ValueError: If delay is invalid or func is not callable.
Example:

```python
from flowerpower.job_queue import JobQueueManager
from datetime import timedelta

def send_notification(message):
    print(f"Notification: {message}")

# Enqueue a job to run in 5 minutes
job = manager.enqueue_in(timedelta(minutes=5), send_notification, "Your report is ready!")

# Enqueue a job to run in 30 seconds (integer delay)
job = manager.enqueue_in(30, send_notification, "Quick update!")

# Enqueue a job to run in 1 hour (string delay)
job = manager.enqueue_in("1h", send_notification, "Hourly reminder!")
```
#### `enqueue_at`

Enqueues a job to run at a specific datetime.

| Parameter | Type | Description |
|---|---|---|
| `datetime_obj` | `datetime` | The datetime at which to execute the job. |
| `func` | `Callable` | The function to execute. |
| `*args` | `Any` | Positional arguments for the function. |
| `**kwargs` | `Any` | Keyword arguments for the function and job. |
Returns: Job - The enqueued job object.
Raises: ValueError: If datetime_obj is in the past or func is not callable.
Example:

```python
from flowerpower.job_queue import JobQueueManager
from datetime import datetime

def generate_monthly_report(month, year):
    print(f"Generating report for {month}/{year}")

# Enqueue a job to run at a specific future date and time
# (the datetime must lie in the future, or ValueError is raised)
target_time = datetime(2026, 1, 1, 9, 0, 0)
job = manager.enqueue_at(target_time, generate_monthly_report, 1, 2026)
```
#### `add_schedule`

```python
add_schedule(self, id: str, func: Callable, cron_string: str | None = None, interval: int | None = None, repeat: int | None = None, enabled: bool = True, **kwargs)
```

Schedules a job for repeated or one-time execution.

| Parameter | Type | Description |
|---|---|---|
| `id` | `str` | A unique identifier for the scheduled job. |
| `func` | `Callable` | The function to execute. |
| `cron_string` | `str \| None` | A cron string for recurring schedules (e.g., `"0 0 * * *"` for daily at midnight). |
| `interval` | `int \| None` | Interval in seconds for recurring schedules. |
| `repeat` | `int \| None` | Number of times to repeat the job. `None` for infinite. |
| `enabled` | `bool` | Whether the schedule is active. |
| `**kwargs` | `Any` | Additional keyword arguments for the function and job. |
Returns: ScheduledJob - The scheduled job object.
Raises: ValueError: If scheduling parameters are invalid or insufficient.
Example:

```python
from flowerpower.job_queue import JobQueueManager

def clean_temp_files():
    print("Cleaning temporary files...")

def weekly_summary():
    print("Generating weekly summary...")

# Schedule a job to clean temp files every hour
scheduled_job = manager.add_schedule(
    id="hourly_cleanup",
    func=clean_temp_files,
    interval=3600  # every hour
)

# Schedule a job using a cron string (every Monday at 9 AM).
# Note: use a named, importable function; lambdas generally
# cannot be serialized for execution by a worker process.
scheduled_job = manager.add_schedule(
    id="weekly_summary",
    func=weekly_summary,
    cron_string="0 9 * * MON"
)
```
#### `get_job_result`

Gets the result of a completed job.

| Parameter | Type | Description |
|---|---|---|
| `job` | `str \| Job` | The job ID or `Job` object. |
| `delete_result` | `bool` | If `True`, deletes the result after retrieval. |
Returns: Any - The result of the job execution.
Raises:

- `JobNotFinishedError`: If the job has not completed yet.
- `JobDoesNotExistError`: If the job ID is not found.

Example:

```python
from flowerpower.job_queue import JobQueueManager

# Assuming 'my_job_id' is the ID of a completed job
result = manager.get_job_result("my_job_id")
print(f"Job result: {result}")
```
#### `get_jobs`

Gets all jobs from specified queues.

| Parameter | Type | Description |
|---|---|---|
| `queue_name` | `str \| list[str] \| None` | The name(s) of the queue(s). Defaults to all queues. |
Returns: list[Job] - A list of job objects.
Example:

```python
from flowerpower.job_queue import JobQueueManager

# Get all jobs from the default queue
all_jobs = manager.get_jobs("default")

# Get jobs from multiple queues
priority_jobs = manager.get_jobs(["high_priority", "medium_priority"])
```
#### `get_schedules`

Gets all schedules from the scheduler.

| Parameter | Type | Description |
|---|---|---|
| `id` | `str \| list[str] \| None` | The ID(s) of the schedule(s). Defaults to all schedules. |
Returns: list[ScheduledJob] - A list of scheduled job objects.
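A short usage sketch (the schedule IDs are illustrative):

```python
# List every schedule known to the scheduler
for schedule in manager.get_schedules():
    print(schedule)

# Fetch specific schedules by ID
schedules = manager.get_schedules(id=["hourly_cleanup", "weekly_summary"])
```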
## PipelineManager

Module: `flowerpower.pipeline.PipelineManager`

The `PipelineManager` is the central class for managing pipeline operations in FlowerPower. It provides a unified interface for creating, running, and managing pipelines.
#### `__init__`

```python
__init__(self, base_dir: str | None = None, storage_options: dict | Munch | BaseStorageOptions | None = None, fs: AbstractFileSystem | None = None, cfg_dir: str | None = None, pipelines_dir: str | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, log_level: str | None = None)
```

Initializes the PipelineManager.

| Parameter | Type | Description |
|---|---|---|
| `base_dir` | `str \| None` | The base directory of the project. Defaults to the current working directory. |
| `storage_options` | `dict \| Munch \| BaseStorageOptions \| None` | Storage options for the filesystem. |
| `fs` | `AbstractFileSystem \| None` | An fsspec-compatible filesystem instance. |
| `cfg_dir` | `str \| None` | The directory for configuration files. |
| `pipelines_dir` | `str \| None` | The directory for pipeline modules. |
| `job_queue_type` | `str` | The type of job queue to use for the project. |
| `log_level` | `str \| None` | The logging level for the manager. |
Example:

```python
from flowerpower.pipeline import PipelineManager

# Initialize a manager for the project in the current directory
manager = PipelineManager()
```
| Attribute | Type | Description |
|---|---|---|
| `registry` | `PipelineRegistry` | Handles pipeline registration and discovery. |
| `scheduler` | `PipelineScheduler` | Manages job scheduling and execution. |
| `visualizer` | `PipelineVisualizer` | Handles pipeline visualization. |
| `io` | `PipelineIOManager` | Manages pipeline import/export operations. |
| `project_cfg` | `ProjectConfig` | Current project configuration. |
| `pipeline_cfg` | `PipelineConfig` | Current pipeline configuration. |
| `pipelines` | `list[str]` | List of available pipeline names. |
| `current_pipeline_name` | `str` | Name of the currently loaded pipeline. |
| `summary` | `dict[str, dict \| str]` | Summary of all pipelines. |
| `_base_dir` | `str` | The base directory of the project. |
| `_fs` | `AbstractFileSystem` | The filesystem instance used by the manager. |
| `_storage_options` | `dict \| Munch \| BaseStorageOptions` | Storage options for the filesystem. |
| `_cfg_dir` | `str` | The directory for configuration files. |
| `_pipelines_dir` | `str` | The directory for pipeline modules. |
| `_project_context` | `FlowerPowerProject \| None` | Reference to the FlowerPowerProject instance. |
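A few of these attributes in use (a minimal sketch):

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()
print(manager.pipelines)   # names of all available pipelines
print(manager.summary)     # per-pipeline summary information
```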
#### `run`

```python
run(self, name: str, inputs: dict | None = None, final_vars: list[str] | None = None, config: dict | None = None, cache: dict | None = None, executor_cfg: str | dict | ExecutorConfig | None = None, with_adapter_cfg: dict | WithAdapterConfig | None = None, pipeline_adapter_cfg: dict | PipelineAdapterConfig | None = None, project_adapter_cfg: dict | ProjectAdapterConfig | None = None, adapter: dict[str, Any] | None = None, reload: bool = False, log_level: str | None = None, max_retries: int | None = None, retry_delay: float | None = None, jitter_factor: float | None = None, retry_exceptions: tuple | list | None = None, on_success: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_failure: Callable | tuple[Callable, tuple | None, dict | None] | None = None)
```

Execute a pipeline synchronously and return its results.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to run. Must be a valid identifier. | |
| `inputs` | `dict \| None` | Override pipeline input values. Example: `{"data_date": "2025-04-28"}` | `None` |
| `final_vars` | `list[str] \| None` | Specify which output variables to return. Example: `["model", "metrics"]` | `None` |
| `config` | `dict \| None` | Configuration for the Hamilton pipeline executor. Example: `{"model": "LogisticRegression"}` | `None` |
| `cache` | `dict \| None` | Cache configuration for results. Example: `{"recompute": ["node1", "final_node"]}` | `None` |
| `executor_cfg` | `str \| dict \| ExecutorConfig \| None` | Execution configuration. Can be a `str` executor name (e.g. `"threadpool"`, `"local"`), a raw `dict` (e.g. `{"type": "threadpool", "max_workers": 4}`), or a structured `ExecutorConfig` object. | `None` |
| `with_adapter_cfg` | `dict \| WithAdapterConfig \| None` | Adapter settings for pipeline execution. Example: `{"opentelemetry": True, "tracker": False}` | `None` |
| `pipeline_adapter_cfg` | `dict \| PipelineAdapterConfig \| None` | Pipeline-specific adapter settings. Example: `{"tracker": {"project_id": "123", "tags": {"env": "prod"}}}` | `None` |
| `project_adapter_cfg` | `dict \| ProjectAdapterConfig \| None` | Project-level adapter settings. Example: `{"opentelemetry": {"host": "http://localhost:4317"}}` | `None` |
| `adapter` | `dict[str, Any] \| None` | Custom adapter instances for the pipeline. Example: `{"ray_graph_adapter": RayGraphAdapter()}` | `None` |
| `reload` | `bool` | Force reload of the pipeline configuration. | `False` |
| `log_level` | `str \| None` | Logging level for the execution. Valid values: `"DEBUG"`, `"INFO"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`. | `None` |
| `max_retries` | `int \| None` | Maximum number of retries for execution. | `None` |
| `retry_delay` | `float \| None` | Delay between retries in seconds. | `None` |
| `jitter_factor` | `float \| None` | Random jitter factor to add to the retry delay. | `None` |
| `retry_exceptions` | `tuple \| list \| None` | Exceptions that trigger a retry. | `None` |
| `on_success` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback to run on successful pipeline execution. | `None` |
| `on_failure` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback to run on pipeline execution failure. | `None` |
Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.
Raises:

- `ValueError`: If the pipeline name doesn't exist or the configuration is invalid.
- `ImportError`: If the pipeline module cannot be imported.
- `RuntimeError`: If execution fails due to pipeline or adapter errors.

Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Simple execution
result = manager.run("my_pipeline")

# With custom inputs
result = manager.run(
    "ml_pipeline",
    inputs={"data_date": "2025-01-01"},
    final_vars=["model", "metrics"]
)
```
#### `new`

Create a new pipeline with the given name.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for the new pipeline. Must be a valid Python identifier. | |
| `overwrite` | `bool` | Whether to overwrite an existing pipeline with the same name. | `False` |
Returns: None
Raises:

- `ValueError`: If the name is invalid, or the pipeline exists and `overwrite=False`.
- `RuntimeError`: If file creation fails.
- `PermissionError`: If lacking write permissions.

Example:

```python
from flowerpower.pipeline import PipelineManager

# Create a new pipeline
manager = PipelineManager()
manager.new("data_transformation")

# Overwrite an existing pipeline
manager.new("data_transformation", overwrite=True)
```
#### `delete`

Delete an existing pipeline.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to delete. | |
Returns: None
Raises:

- `FileNotFoundError`: If the pipeline does not exist.
- `RuntimeError`: If deletion fails.

Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()
manager.delete("old_pipeline")
```
#### `show_pipelines`

Display a summary of all available pipelines.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `format` | `str` | Output format for the list (`"table"`, `"json"`, `"yaml"`). | `"table"` |
Returns: None
Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Show pipelines in table format (default)
manager.show_pipelines()

# Show pipelines in JSON format
manager.show_pipelines(format="json")
```
#### `add_hook`

Add a hook to a specific pipeline.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to add the hook to. | |
| `type` | `HookType` | Type of the hook (e.g., `HookType.MQTT_BUILD_CONFIG`). | |
| `to` | `str` | Destination of the hook (e.g., `"mqtt"`). | |
| `function_name` | `str` | Name of the function to be called as the hook. | |
Returns: None
Raises:

- `ValueError`: If the pipeline does not exist or the hook type is invalid.
- `FileExistsError`: If a hook with the same name and type already exists.

Example:

```python
from flowerpower.pipeline import PipelineManager, HookType

manager = PipelineManager()
manager.add_hook(
    name="my_pipeline",
    type=HookType.MQTT_BUILD_CONFIG,
    to="mqtt",
    function_name="build_mqtt_config"
)
```
#### `remove_hook`

Remove a hook from a specific pipeline.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to remove the hook from. | |
| `type` | `HookType` | Type of the hook to remove. | |
| `function_name` | `str` | Name of the function that was used as the hook. | |
Returns: None
Raises: FileNotFoundError: If the pipeline or hook does not exist.
Example:

```python
from flowerpower.pipeline import PipelineManager, HookType

manager = PipelineManager()
manager.remove_hook(
    name="my_pipeline",
    type=HookType.MQTT_BUILD_CONFIG,
    function_name="build_mqtt_config"
)
```
#### `import_pipeline`

```python
import_pipeline(self, name: str, src_base_dir: str, src_fs: AbstractFileSystem | None = None, src_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)
```

Import a pipeline from another FlowerPower project.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name for the new pipeline in the current project. | |
| `src_base_dir` | `str` | Source FlowerPower project directory or URI. Examples: local `"/path/to/other/project"`, S3 `"s3://bucket/project"`, GitHub `"github://org/repo/project"`. | |
| `src_fs` | `AbstractFileSystem \| None` | Pre-configured source filesystem. Example: `S3FileSystem(anon=False)` | `None` |
| `src_storage_options` | `dict \| BaseStorageOptions \| None` | Options for source filesystem access. Example: `{"key": "ACCESS_KEY", "secret": "SECRET_KEY"}` | `None` |
| `overwrite` | `bool` | Whether to replace an existing pipeline if the name exists. | `False` |
Returns: None
Raises:

- `ValueError`: If the pipeline name exists and `overwrite=False`.
- `FileNotFoundError`: If the source pipeline is not found.
- `RuntimeError`: If the import fails.

Example:

```python
from flowerpower.pipeline import PipelineManager
from s3fs import S3FileSystem

manager = PipelineManager()

# Import from the local filesystem
manager.import_pipeline(
    "new_pipeline",
    "/path/to/other/project"
)

# Import from S3 with a custom filesystem
s3 = S3FileSystem(anon=False)
manager.import_pipeline(
    "s3_pipeline",
    "s3://bucket/project",
    src_fs=s3
)
```
#### `import_many`

```python
import_many(self, names: list[str], src_base_dir: str, src_fs: AbstractFileSystem | None = None, src_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)
```

Import multiple pipelines from another FlowerPower project.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `names` | `list[str]` | List of pipeline names to import. | |
| `src_base_dir` | `str` | Source FlowerPower project directory or URI. Examples: local `"/path/to/other/project"`, S3 `"s3://bucket/project"`, GitHub `"github://org/repo/project"`. | |
| `src_fs` | `AbstractFileSystem \| None` | Pre-configured source filesystem. Example: `S3FileSystem(anon=False)` | `None` |
| `src_storage_options` | `dict \| BaseStorageOptions \| None` | Options for source filesystem access. Example: `{"key": "ACCESS_KEY", "secret": "SECRET_KEY"}` | `None` |
| `overwrite` | `bool` | Whether to replace existing pipelines if the names exist. | `False` |
Returns: None
Raises:

- `ValueError`: If any pipeline name exists and `overwrite=False`.
- `FileNotFoundError`: If any source pipeline is not found.
- `RuntimeError`: If the import fails.

Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Import multiple pipelines
manager.import_many(
    names=["pipeline1", "pipeline2"],
    src_base_dir="/path/to/other/project"
)

# Import multiple pipelines from S3
manager.import_many(
    names=["s3_pipeline_a", "s3_pipeline_b"],
    src_base_dir="s3://bucket/source",
    src_storage_options={
        "key": "ACCESS_KEY",
        "secret": "SECRET_KEY"
    }
)
```
#### `export_pipeline`

```python
export_pipeline(self, name: str, dest_base_dir: str, dest_fs: AbstractFileSystem | None = None, dest_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)
```

Export a pipeline to another FlowerPower project.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to export. | |
| `dest_base_dir` | `str` | Destination FlowerPower project directory or URI. Examples: local `"/path/to/backup"`, S3 `"s3://bucket/backups"`, GCS `"gs://bucket/backups"`. | |
| `dest_fs` | `AbstractFileSystem \| None` | Pre-configured destination filesystem. Example: `GCSFileSystem(project='my-project')` | `None` |
| `dest_storage_options` | `dict \| BaseStorageOptions \| None` | Options for destination filesystem access. Example: `{"token": "my_token"}` | `None` |
| `overwrite` | `bool` | Whether to replace an existing pipeline in the destination if the name exists. | `False` |
Returns: None
Raises:

- `FileNotFoundError`: If the pipeline does not exist in the current project.
- `FileExistsError`: If the destination pipeline exists and `overwrite=False`.
- `RuntimeError`: If the export fails.

Example:

```python
from flowerpower.pipeline import PipelineManager
from gcsfs import GCSFileSystem

manager = PipelineManager()

# Export to a local backup
manager.export_pipeline(
    "my_pipeline",
    "/path/to/backup"
)

# Export to Google Cloud Storage
gcs = GCSFileSystem(project='my-project')
manager.export_pipeline(
    "prod_pipeline",
    "gs://my-bucket/backups",
    dest_fs=gcs
)
```
#### `export_many`

```python
export_many(self, names: list[str], dest_base_dir: str, dest_fs: AbstractFileSystem | None = None, dest_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)
```

Export multiple pipelines to another FlowerPower project.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `names` | `list[str]` | List of pipeline names to export. | |
| `dest_base_dir` | `str` | Destination FlowerPower project directory or URI. Examples: local `"/path/to/backup"`, S3 `"s3://bucket/backups"`, GCS `"gs://bucket/backups"`. | |
| `dest_fs` | `AbstractFileSystem \| None` | Pre-configured destination filesystem. Example: `GCSFileSystem(project='my-project')` | `None` |
| `dest_storage_options` | `dict \| BaseStorageOptions \| None` | Options for destination filesystem access. Example: `{"token": "my_token"}` | `None` |
| `overwrite` | `bool` | Whether to replace existing pipelines in the destination if the names exist. | `False` |
Returns: None
Raises:

- `FileNotFoundError`: If any pipeline does not exist in the current project.
- `FileExistsError`: If any destination pipeline exists and `overwrite=False`.
- `RuntimeError`: If the export fails.

Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Export multiple pipelines
manager.export_many(
    names=["pipeline1", "pipeline2"],
    dest_base_dir="/path/to/backup"
)

# Export multiple pipelines to S3
manager.export_many(
    names=["s3_pipeline_a", "s3_pipeline_b"],
    dest_base_dir="s3://bucket/backups",
    dest_storage_options={
        "key": "ACCESS_KEY",
        "secret": "SECRET_KEY"
    }
)
```
#### `show_dag`

```python
show_dag(self, name: str, format: str = "png", show_outputs: bool = False, display_html: bool = False)
```

Generate and display the Directed Acyclic Graph (DAG) of a pipeline.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to visualize. | |
| `format` | `str` | Output format for the DAG (`"png"`, `"svg"`, `"html"`, `"dot"`). | `"png"` |
| `show_outputs` | `bool` | Whether to include output nodes in the DAG. | `False` |
| `display_html` | `bool` | Whether to display the HTML directly in the notebook (only for `"html"` format). | `False` |
Returns: None (displays the DAG directly or saves it to a file).
Raises:

- `FileNotFoundError`: If the pipeline does not exist.
- `ValueError`: If the format is invalid or visualization fails.

Example:

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Show the DAG as a PNG
manager.show_dag("my_pipeline")

# Show the DAG as an SVG, including output nodes
manager.show_dag("ml_pipeline", format="svg", show_outputs=True)
```
#### `show_execution_graph`

```python
show_execution_graph(self, name: str, format: str = "png", show_outputs: bool = False, display_html: bool = False, inputs: dict | None = None, config: dict | None = None)
```

Generate and display the execution graph of a pipeline, taking inputs and configuration into account.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the pipeline to visualize. | |
| `format` | `str` | Output format for the graph (`"png"`, `"svg"`, `"html"`, `"dot"`). | `"png"` |
| `show_outputs` | `bool` | Whether to include output nodes in the graph. | `False` |
| `display_html` | `bool` | Whether to display the HTML directly in the notebook (only for `"html"` format). | `False` |
| `inputs` | `dict \| None` | Input values to consider for graph generation. | `None` |
| `config` | `dict \| None` | Configuration for the Hamilton pipeline executor. | `None` |
Returns: None (displays the graph directly or saves it to a file).
Raises:

- `FileNotFoundError`: If the pipeline does not exist.
- `ValueError`: If the format is invalid or visualization fails.
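No example ships with this method, so here is an illustrative sketch (the pipeline name and inputs are assumptions):

```python
from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Render the execution graph for a hypothetical pipeline,
# resolving the DAG against concrete inputs
manager.show_execution_graph(
    "ml_pipeline",
    format="svg",
    show_outputs=True,
    inputs={"data_date": "2025-01-01"},
)
```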
## RQManager

Module: `flowerpower.job_queue.rq.RQManager`

The `RQManager` is the implementation of `JobQueueManager` for Redis Queue (RQ). It handles the specifics of interacting with an RQ backend.
#### `__init__`

```python
__init__(self, name: str, base_dir: str | None = None, backend: RQBackend | None = None, storage_options: dict | None = None, fs: AbstractFileSystem | None = None, log_level: str | None = None)
```

Initializes the RQManager.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | The name of the scheduler instance. | |
| `base_dir` | `str \| None` | The base directory of the project. | `None` |
| `backend` | `RQBackend \| None` | An `RQBackend` instance for Redis connection configuration. | `None` |
| `storage_options` | `dict \| None` | Storage options for the filesystem. | `None` |
| `fs` | `AbstractFileSystem \| None` | An fsspec-compatible filesystem instance. | `None` |
| `log_level` | `str \| None` | The logging level. | `None` |
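No constructor example is shown here, so a minimal sketch (with the default Redis connection settings; pass an `RQBackend` to customize the connection):

```python
from flowerpower.job_queue.rq import RQManager

# A manager using the default Redis connection settings
manager = RQManager(name="my_rq_manager")
```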
#### `add_job`

```python
add_job(self, func: Callable, func_args: list | None = None, func_kwargs: dict | None = None, job_id: str | None = None, result_ttl: int | None = None, ttl: int | None = None, timeout: int | None = None, queue_name: str | None = None, run_at: datetime | None = None, run_in: timedelta | int | str | None = None, retry: Retry | None = None, repeat: int | None = None, meta: dict | None = None, failure_ttl: int | None = None, group_id: str | None = None, on_success: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_failure: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_stopped: Callable | tuple[Callable, tuple | None, dict | None] | None = None, **job_kwargs)
```

Adds a job to the queue for immediate or scheduled execution.

!!! warning

    This method is deprecated. Use `enqueue`, `enqueue_in`, or `enqueue_at` instead.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | The function to execute. | |
| `func_args` | `list \| None` | Positional arguments for the function. | `None` |
| `func_kwargs` | `dict \| None` | Keyword arguments for the function. | `None` |
| `job_id` | `str \| None` | Unique identifier for the job. | `None` |
| `result_ttl` | `int \| None` | Time to live for the job result (seconds). | `None` |
| `ttl` | `int \| None` | Total time to live for the job (seconds). | `None` |
| `timeout` | `int \| None` | Job execution timeout (seconds). | `None` |
| `queue_name` | `str \| None` | The name of the RQ queue to use. | `None` |
| `run_at` | `datetime \| None` | Specific datetime at which to run the job. | `None` |
| `run_in` | `timedelta \| int \| str \| None` | Delay before running the job. | `None` |
| `retry` | `Retry \| None` | Retry policy for the job. | `None` |
| `repeat` | `int \| None` | Number of times to repeat the job. | `None` |
| `meta` | `dict \| None` | Arbitrary metadata for the job. | `None` |
| `failure_ttl` | `int \| None` | Time to live for a failed job's result (seconds). | `None` |
| `group_id` | `str \| None` | Group ID for the job. | `None` |
| `on_success` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback on job success. | `None` |
| `on_failure` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback on job failure. | `None` |
| `on_stopped` | `Callable \| tuple[Callable, tuple \| None, dict \| None] \| None` | Callback on job stop. | `None` |
| `**job_kwargs` | `Any` | Additional keyword arguments for RQ's `Job` class. | |
Returns: Job - The enqueued job object.
Raises: ValueError: If required parameters are missing or invalid.
Example:

```python
from flowerpower.job_queue.rq import RQManager
from datetime import datetime, timedelta

manager = RQManager(name="my_rq_manager")

# Enqueue a simple job
def my_task(x, y):
    return x + y

job = manager.add_job(my_task, func_args=[1, 2], queue_name="default")
print(f"Enqueued job {job.id}")

# Schedule a job to run in 5 minutes
job = manager.add_job(my_task, func_args=[3, 4], run_in=timedelta(minutes=5), queue_name="default")

# Schedule a job to run at a specific (future) time
target_time = datetime(2026, 1, 1, 10, 0, 0)
job = manager.add_job(my_task, func_args=[5, 6], run_at=target_time, queue_name="default")
```
#### `start_worker`

```python
start_worker(self, background: bool = False, queue_names: list[str] | None = None, with_scheduler: bool = False, **kwargs)
```

Starts a worker process for the job queue.

| Parameter | Type | Description | Default |
|---|---|---|---|
| `background` | `bool` | If `True`, runs the worker in the background. | `False` |
| `queue_names` | `list[str] \| None` | A list of RQ queues to listen to. Defaults to all queues. | `None` |
| `with_scheduler` | `bool` | If `True`, the worker also processes scheduled jobs. | `False` |
| `**kwargs` | `Any` | Additional arguments for RQ's `Worker` class. | |
Returns: None
Raises: RuntimeError: If the worker fails to start.
Example:

```python
from flowerpower.job_queue.rq import RQManager

manager = RQManager(name="my_rq_manager")

# Start a worker in the foreground, listening to the 'default' queue
manager.start_worker(queue_names=["default"])

# Start a worker in the background with the scheduler enabled
manager.start_worker(background=True, with_scheduler=True)
```
# Architecture Overview

Welcome to the architectural overview of FlowerPower. This document provides a high-level look at the library's design, its core components, and the principles that guide its development. Our goal is to create a powerful, flexible, and easy-to-use platform for building data pipelines and managing asynchronous jobs.
FlowerPower is built on a foundation of modularity and clear separation of concerns. A key design principle is the unified interface: a single entry point (`FlowerPowerProject`) simplifies interaction with the library's powerful features.
+FlowerPowerProject) simplifies interaction with the library's powerful features.The library's architecture is centered around a few key components that work together to provide a seamless experience.
+graph TD
+ A[FlowerPowerProject] -->|Manages| B(PipelineManager)
+ A -->|Manages| C(JobQueueManager)
+ B -->|Uses| D[Hamilton]
+ C -->|Uses| E[RQManager]
+ E -->|Uses| F[Redis]
+
+ subgraph "Core Components"
+ B
+ C
+ E
+ end
+
+ subgraph "External Dependencies"
+ D
+ F
+ end
### FlowerPowerProject

The `FlowerPowerProject` class is the main entry point and public-facing API of the library. It acts as a facade, providing a unified interface to the underlying `PipelineManager` and `JobQueueManager`. This simplifies the user experience by abstracting away the complexities of the individual components.
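To illustrate the facade pattern, here is a toy sketch (not FlowerPower's actual implementation): a project object simply composes the managers and delegates to them.

```python
from flowerpower.pipeline import PipelineManager

class ProjectFacade:
    """Toy stand-in for FlowerPowerProject, showing delegation only."""

    def __init__(self, base_dir: str | None = None):
        # Compose the managers; a real project would also wire up a
        # concrete JobQueueManager based on the project configuration.
        self.pipeline_manager = PipelineManager(base_dir=base_dir)

    def run(self, name: str, **kwargs):
        # Pipeline execution is delegated to the PipelineManager.
        return self.pipeline_manager.run(name, **kwargs)
```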
### PipelineManager

The `PipelineManager` is responsible for everything related to data pipelines: creating and deleting them, running them synchronously, visualizing their DAGs, and importing and exporting them between projects.
FlowerPower leverages Hamilton to define the logic of its data pipelines. Hamilton's declarative, function-based approach allows you to define complex dataflows in a clear and maintainable way. Each function in a Hamilton module represents a node in the DAG, and Hamilton automatically resolves the dependencies and executes the functions in the correct order.
!!! note

    To learn more about Hamilton, visit the official documentation.
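To make the Hamilton model concrete, here is a minimal, illustrative dataflow module (the function names are hypothetical): each function is a node, and parameter names bind to the outputs of same-named functions.

```python
import pandas as pd

def raw_data(data_path: str) -> pd.DataFrame:
    """Load the raw input (data_path is supplied as a pipeline input)."""
    return pd.read_csv(data_path)

def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Depends on raw_data by parameter name; drops incomplete rows."""
    return raw_data.dropna()

def row_count(cleaned_data: pd.DataFrame) -> int:
    """Depends on cleaned_data; a typical final output variable."""
    return len(cleaned_data)
```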
### JobQueueManager and RQManager

The `JobQueueManager` is a factory responsible for creating and managing job queue backends. Currently, the primary implementation is the `RQManager`, which uses the powerful Redis Queue (RQ) library.
The `RQManager` handles job enqueueing (immediate, delayed, and scheduled), recurring schedules, worker and worker-pool management, and retrieval of job results.
RQ uses Redis as its message broker and storage backend. This provides a robust and performant foundation for the job queueing system.
!!! tip

    You can monitor and manage your RQ queues using tools like `rq-dashboard`.
### Filesystem Abstraction

FlowerPower includes a filesystem abstraction layer that allows you to work with local and remote filesystems (e.g., S3, GCS) using a consistent API. This makes it easy to build pipelines that can read from and write to various storage backends without changing your core logic.
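As a sketch of what this enables (the bucket name and credentials are placeholders), a pre-configured fsspec filesystem can be handed straight to `PipelineManager`:

```python
import fsspec
from flowerpower.pipeline import PipelineManager

# Build an fsspec filesystem for S3 (placeholder credentials)
s3 = fsspec.filesystem("s3", key="ACCESS_KEY", secret="SECRET_KEY")

# The same manager API works whether the project lives locally or in S3
manager = PipelineManager(base_dir="s3://my-bucket/my-project", fs=s3)
```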
FlowerPower's architecture is designed to be both powerful and flexible. By combining the strengths of Hamilton for dataflow definition and RQ for asynchronous processing, it provides a comprehensive solution for a wide range of data-intensive applications. The modular design and unified interface make it easy to get started, while the extensible nature of the library allows it to grow with your needs.