0.1.3 Orchestration: cron and airflow #50


Draft · wants to merge 1 commit into base: 3-012-viewing-data-through-duck-db-from-iceberg
75 changes: 75 additions & 0 deletions local-data-platform/local_data_platform/cron_flow.py
Collaborator commented:

This is a great start. I suggest you move this to a package, e.g. .../local_data_platform/cron/{your_python_script}.
Then an end user who wants any cron functionality could simply do:

    from local_data_platform.cron import cron_flow

@redpheonixx (Collaborator) commented on Oct 23, 2024:

Looks good. Can we add a logger module to record logs, e.g. in the exception paths? @brmhastra @mrutunjay-kinagi @tusharchou This would make it easier to debug if the pipeline breaks.
This point applies to all Python scripts written so far, not only to this PR.
We can also add the logger module at a later stage.
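A minimal sketch of the pattern the reviewer is asking for. The `safe_run` helper and the logger name are illustrative, not part of the PR:

```python
import logging

# Module-level logger, as the reviewer suggests adding to all scripts.
logger = logging.getLogger("local_data_platform")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)


def safe_run(step_name, step, *args, **kwargs):
    """Run one pipeline step, logging the traceback before re-raising."""
    try:
        return step(*args, **kwargs)
    except Exception:
        # logger.exception records the full traceback, which is what
        # makes a broken pipeline debuggable after the fact.
        logger.exception("step %r failed", step_name)
        raise
```

Wrapping each extract/transform/load call in such a helper keeps the failure context in the logs without swallowing the exception.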

@@ -0,0 +1,75 @@
def cron_flow(cron_expression: str, etl_function: callable, *args, **kwargs) -> None:
"""
Schedule an ETL (Extract, Transform, Load) process to run at specific intervals using a cron job.

Args:
cron_expression (str): The cron expression that specifies the schedule for the ETL job. It follows the standard
cron format: "minute hour day_of_month month day_of_week".
Example: "0 2 * * *" for running the ETL job daily at 2 AM.
etl_function (callable): The function that executes the ETL process, which can be passed as a reference.
This function should handle the extract, transform, and load stages of the process.
*args: Additional positional arguments to pass to the ETL function.
**kwargs: Additional keyword arguments to pass to the ETL function.

Returns:
None

Raises:
ValueError: If the cron expression is invalid or if the ETL function is not callable.
Exception: If an error occurs during the scheduling or execution of the ETL job.

Example:
cron_flow("0 0 * * 0", run_etl_process, source="data_source", target="data_target")

Notes:
- This function requires the cron job to be set up and managed using a task scheduler (e.g., cron in Unix-like
systems or any equivalent scheduling tool).
        - Ensure that the ETL function handles any errors internally, as the scheduling system may not handle exceptions
          raised during execution.
"""
print("I Orchestrate!")
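The body above is still a placeholder. One way the `ValueError` checks promised in the docstring could be sketched; this is an illustration, not the PR's code, and the regex only checks the five-field cron shape, not numeric ranges:

```python
import re

# Five whitespace-separated cron fields; each may be *, N, N-M, */K,
# or a comma-separated list of those. Shape check only.
_CRON_FIELD = r"(\*|\d+(-\d+)?)(/\d+)?(,(\*|\d+(-\d+)?)(/\d+)?)*"
_CRON_RE = re.compile(r"^\s*" + r"\s+".join([_CRON_FIELD] * 5) + r"\s*$")


def validate_cron_inputs(cron_expression, etl_function):
    """Raise ValueError for a malformed expression or non-callable job,
    matching the Raises section of cron_flow's docstring."""
    if not isinstance(cron_expression, str) or not _CRON_RE.match(cron_expression):
        raise ValueError(f"invalid cron expression: {cron_expression!r}")
    if not callable(etl_function):
        raise ValueError("etl_function must be callable")
```

For example, `validate_cron_inputs("0 2 * * *", run_etl_process)` passes, while `"not a cron"` raises `ValueError`.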




def run_etl_process(source: str, target: str, transform_function: callable = None, *args, **kwargs) -> None:
"""
Execute the ETL (Extract, Transform, Load) process.

This function orchestrates the three stages of an ETL process:
1. Extract data from a source.
2. Optionally transform the data using a transformation function.
3. Load the transformed (or raw) data into the target destination.

Args:
source (str): The data source from which to extract the data. It can be a file path, database connection string,
or API endpoint.
target (str): The destination where the data will be loaded. It can be a database, file path, or another storage
mechanism.
transform_function (callable, optional): A function that performs the transformation of the extracted data.
This function should accept the extracted data as input and return the transformed data. If no transformation
is needed, this argument can be left as `None` (default is `None`).
*args: Additional positional arguments to pass to the transformation function or loading process.
**kwargs: Additional keyword arguments to pass to the transformation function or loading process.

Returns:
None

Raises:
ValueError: If the source or target is invalid or if the transformation function does not return the expected output.
IOError: If an error occurs during data extraction or loading.
Exception: For any other issues that occur during the ETL process.

Example:
run_etl_process(source="database_connection_string",
target="data_warehouse",
transform_function=transform_data,
batch_size=1000)

Notes:
- The `transform_function` is optional. If provided, it should handle the logic for transforming the extracted data.
- This function assumes that the source and target formats are compatible with the extraction and loading logic
implemented in the ETL process.
- The actual implementation of extraction, transformation, and loading will depend on the specific needs of the
application (e.g., connecting to databases, reading/writing files, etc.).
"""
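The diff stops at the docstring, so the function body is effectively empty. A minimal file-to-file sketch, assuming `source` and `target` are local paths (only one of the options the docstring allows) and that `transform_function` receives the raw text:

```python
def run_etl_process(source, target, transform_function=None, *args, **kwargs):
    # Extract: here, simply read the whole source file as text.
    with open(source, "r", encoding="utf-8") as fh:
        data = fh.read()
    # Transform: apply the optional user-supplied function.
    if transform_function is not None:
        data = transform_function(data, *args, **kwargs)
    # Load: write the (possibly transformed) data to the target path.
    with open(target, "w", encoding="utf-8") as fh:
        fh.write(data)
```

Real sources and targets (databases, APIs) would swap in their own extract/load logic, but the three-stage shape stays the same.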