
Commit 1e6f65d

Merge pull request #32 from aaditshah18/feature/intermediate_labs2
Added Intermediate lab for cloud composer
2 parents a94bf7e + 7c7d171 commit 1e6f65d

File tree

7 files changed: +479 -58 lines changed
Lines changed: 60 additions & 0 deletions
# Watch the tutorial video for this lab at [Cloud Composer Lab1](https://youtu.be/JB_I416LQ7A)

# Google Cloud Composer Airflow Tutorial

Welcome to the Google Cloud Composer Airflow tutorial! This guide will help you understand the basics of using Apache Airflow with Google Cloud Composer, set up a simple Directed Acyclic Graph (DAG), and deploy it on Google Cloud Composer.

## What is Google Cloud Composer?

Google Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It allows you to author, schedule, and monitor workflows as directed acyclic graphs (DAGs) of tasks. Cloud Composer helps you automate and orchestrate tasks across cloud and on-premises environments.

## What is Apache Airflow?

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. It allows you to define workflows as code, ensuring they are versionable, testable, and maintainable. Airflow uses DAGs to represent workflows, where each node is a task and edges define the execution order.

## Tutorial Overview

In this tutorial, we will create a simple DAG consisting of two tasks:

1. A `BashOperator` task that prints "Hello, Cloud Composer!".
2. A `PythonOperator` task that runs a simple Python function.

Refer to [this page](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html) for more operators.

### Prerequisites

- A Google Cloud Platform account.
- A Google Cloud Composer environment set up.
- Basic knowledge of Python and Airflow concepts.

## Explanation of the DAG

- `default_args`: Contains default arguments for the DAG. Here, it specifies the owner and start date.
- `dag`: Defines the DAG with an ID, default arguments, description, and schedule interval.
- `hello_world_task`: A `BashOperator` that prints "Hello, Cloud Composer!".
- `say_hello_task`: A `PythonOperator` that runs a simple Python function (a minimal sketch of the full DAG follows this list).
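The DAG file itself is not included in this commit's diff, so the snippet below is only a minimal sketch matching the description above; the DAG ID, owner, start date, and the `say_hello` function are illustrative assumptions.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG (owner and start date).
default_args = {
    'owner': 'composer-lab',          # illustrative value
    'start_date': datetime(2023, 9, 17),
}

# The DAG object: ID, defaults, description, and schedule.
dag = DAG(
    'hello_cloud_composer',           # illustrative DAG ID
    default_args=default_args,
    description='Simple two-task DAG for the Cloud Composer lab',
    schedule_interval=None,
    catchup=False,
)

# BashOperator task that prints "Hello, Cloud Composer!".
hello_world_task = BashOperator(
    task_id='hello_world_task',
    bash_command='echo "Hello, Cloud Composer!"',
    dag=dag,
)

def say_hello():
    # Simple Python function run by the PythonOperator.
    print("Hello from the PythonOperator!")

# PythonOperator task that runs the simple Python function above.
say_hello_task = PythonOperator(
    task_id='say_hello_task',
    python_callable=say_hello,
    dag=dag,
)

# Task order is set with the >> operator: hello_world_task runs first.
hello_world_task >> say_hello_task
```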
## Setting Up the Environment

1. **Create a Google Cloud Composer environment:**
   - Navigate to the Cloud Console.
   - Go to the Cloud Composer section.
   - Create a new environment.

2. **Upload the DAG to Cloud Composer:**
   - Once the environment is ready, upload the DAG file to the `dags` folder in the Cloud Storage bucket associated with your Composer environment.

3. **Trigger the DAG:**
   - Go to the Airflow web interface through the Cloud Composer environment.
   - Manually trigger the DAG and observe the task execution.
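If you prefer the command line to the Cloud Console, the upload and trigger steps can also be performed with `gcloud`; the environment name, region, DAG file name, and DAG ID below are placeholders (the DAG ID matches the illustrative sketch above) that you would replace with your own values.

```bash
# Upload the DAG file into the dags/ folder of the environment's bucket.
gcloud composer environments storage dags import \
    --environment YOUR_ENVIRONMENT_NAME \
    --location YOUR_REGION \
    --source hello_cloud_composer.py

# Trigger the DAG through the environment's Airflow CLI wrapper.
gcloud composer environments run YOUR_ENVIRONMENT_NAME \
    --location YOUR_REGION \
    dags trigger -- hello_cloud_composer
```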
## Conclusion

This tutorial provided a basic introduction to creating and deploying a DAG in Google Cloud Composer using Apache Airflow. With these foundations, you can start building more complex workflows to automate various tasks in your cloud and on-premises environments.

## Important Links

- Quickstart Guide to Google Cloud Composer - [Getting Started With Composer](https://cloud.google.com/composer/docs/run-apache-airflow-dag)
- Installing the Google Cloud CLI - [Installing gcloud](https://cloud.google.com/sdk/docs/install-sdk)
- Identity Management for Cloud Composer - [Control Access](https://cloud.google.com/composer/docs/how-to/access-control#composer-sa)
- Airflow Operators - [Operators](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html)
Lines changed: 72 additions & 0 deletions
# Cloud Composer Intermediate Tutorial

This lab contains a Cloud Composer intermediate tutorial that demonstrates various features of Apache Airflow, including task parameterization, file operations, HTTP requests, task dependencies, and the use of sensors in GCP's Cloud Composer. The tutorial is structured into three Directed Acyclic Graphs (DAGs), each showcasing different functionality.

## Prerequisites

- Apache Airflow installed
- Google Cloud Storage (GCS) access with appropriate permissions
- Python 3.x
- Required Python packages: pandas, requests, apache-airflow, apache-airflow-providers-google (see the install sketch below)
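For local development, one way to install the listed packages (assuming `pip` inside a Python 3 virtual environment; on Cloud Composer itself, extra packages are added through the environment's PyPI package settings):

```bash
pip install pandas requests apache-airflow apache-airflow-providers-google
```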
## DAGs Overview

### DAG 1: Parameterize File Path and Use FileSensor

This DAG reads and processes a file, then uses a GCS file sensor to detect the existence of the processed file.

Tasks

- `read_and_serialize_task`: Reads a CSV file from GCS, serializes it to JSON, and stores the result in XCom.
- `process_task`: Processes the serialized data from the previous task, fills missing values, and writes the output back to GCS (the XCom hand-off is sketched after this list).
- `file_sensor_task`: Waits for the processed file to appear in GCS, then logs the task completion status.
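The callables referenced here live in `dag_functions.py`, which is not part of this diff. As a rough sketch of how `process_task` might pull the serialized data from XCom (the pandas handling and the fill value are assumptions), it could look like this:

```python
import json

import pandas as pd
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def process_file(output_path, **context):
    """Pull the serialized CSV from XCom, fill missing values, write it back to GCS."""
    # 'read_and_serialize' is the task_id of the upstream PythonOperator in dag_1.
    serialized = context["ti"].xcom_pull(task_ids="read_and_serialize")

    # Rebuild a DataFrame from the JSON payload and fill missing values (0 is an assumed fill value).
    df = pd.DataFrame(json.loads(serialized)).fillna(0)

    # Split "bucket/path/to/object" into bucket and object name, then upload the CSV.
    bucket, _, object_name = output_path.partition("/")
    GCSHook().upload(bucket_name=bucket, object_name=object_name, data=df.to_csv(index=False))
```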
### DAG 2: File Operations and HTTP Request

This DAG performs file operations on GCS and makes an HTTP request.

Tasks

- `file_op_task`: Reads a file from GCS and logs its content.
- `http_request_task`: Makes an HTTP GET request to a given URL and logs the response.
### DAG 3: Task Dependencies

This DAG demonstrates task dependencies using dummy tasks, a bash command, and Python callable tasks. The dependency wiring is shown after the task list below.

Tasks

- `start_task`: Dummy task used to start the DAG.
- `bash_task`: Runs a simple bash command.
- `middle_task`: Logs a message indicating the task execution.
- `branch_task`: Dummy task used for branching logic.
- `end_task`: Logs a message indicating the task completion.
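The dependency wiring for this DAG, taken from the DAG file added in this commit, looks like this:

```python
start_task >> [bash_task, branch_task]   # start fans out to two parallel branches
bash_task >> middle_task >> end_task     # the bash branch flows through middle_task
branch_task >> end_task                  # the dummy branch joins at end_task
```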
## Functions

`read_and_serialize`
Reads a CSV file from GCS, serializes it to JSON, and logs the content.

`read_and_serialize_return`
Wrapper function that returns the result of `read_and_serialize`.

`process_file`
Processes the serialized data, fills missing values, and writes the output back to GCS.

`file_operation`
Reads a file from GCS and logs its content.

`make_http_request`
Makes an HTTP GET request to a specified URL and logs the response.

`log_file_sensor_output`
Logs the output of the file sensor task.
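These helpers are defined in `dag_functions.py`, which is not included in this diff. The two simplest ones might look roughly like the sketch below (the logger name matches the DAG file; the exact log messages are assumptions):

```python
import logging

import requests

logger = logging.getLogger("airflow.task")


def make_http_request(url):
    """Make an HTTP GET request to the given URL and log the response."""
    response = requests.get(url, timeout=30)
    logger.info("GET %s returned %s: %s", url, response.status_code, response.text)
    return response.status_code


def log_file_sensor_output(context):
    """Success/failure callback for the GCS sensor task: log its final state."""
    task_instance = context["task_instance"]
    logger.info("Sensor task %s finished with state %s", task_instance.task_id, task_instance.state)
```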
## How to Run

- Ensure Airflow is properly installed and configured.
- Place the Python script and required files in the appropriate directories.
- Trigger the DAGs using the Airflow UI or CLI.

```bash
airflow dags trigger dag_1_parameterize
airflow dags trigger dag_file_and_http
airflow dags trigger dag_3_dependencies
```

## Conclusion

This tutorial covers the basics of using Airflow with Google Cloud Composer for tasks such as file operations, HTTP requests, and task dependencies. By following it, you should gain a good understanding of how to create and manage DAGs in Airflow.

Labs/GCP_Labs/Cloud_Composer_Labs/Lab2/__init__.py

Whitespace-only changes.
Lines changed: 175 additions & 0 deletions
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from datetime import datetime, timedelta
from dag_functions import (
    file_operation,
    make_http_request,
    process_file,
    read_and_serialize_return,
    log_file_sensor_output,
    final_task,
)
import logging

AIRFLOW_TASK = "airflow.task"
OUTPUT_PATH = "us-east1-composer-airflow-1c67778d-bucket/data/dag_processed_file.csv"
logger = logging.getLogger(AIRFLOW_TASK)

default_args = {
    'owner': 'Aadit',
    'start_date': datetime(2023, 9, 17),
    'retries': 0,  # Number of retries in case of task failure
    'retry_delay': timedelta(minutes=5),  # Delay before retries
}

dag_1 = DAG(
    'dag_1_parameterize',
    default_args=default_args,
    description='DAG to parameterize file path, process file, and use FileSensor',
    schedule_interval=None,
    catchup=False,
)

read_serialize_task = PythonOperator(
    task_id='read_and_serialize',
    python_callable=read_and_serialize_return,
    op_kwargs={
        'file_path': 'us-east1-composer-airflow-1c67778d-bucket/data/dag_processing_file.csv'
    },
    dag=dag_1,
)

process_task = PythonOperator(
    task_id='process_file',
    python_callable=process_file,
    op_kwargs={
        'output_path': OUTPUT_PATH,
    },
    provide_context=True,
    dag=dag_1,
)

# File sensor task to check for the processed file's existence
file_sensor_task = GCSObjectExistenceSensor(
    task_id='file_sensor_task',
    bucket='us-east1-composer-airflow-1c67778d-bucket',
    object='data/dag_processed_file.csv',
    poke_interval=10,
    timeout=300,
    dag=dag_1,
    on_success_callback=log_file_sensor_output,
    on_failure_callback=log_file_sensor_output,
)

# Final task to execute after the file sensor task
final_processing_task = PythonOperator(
    task_id='final_processing_task',
    python_callable=final_task,
    op_kwargs={
        'output_path': OUTPUT_PATH,
    },
    dag=dag_1,
)

read_serialize_task >> process_task >> file_sensor_task >> final_processing_task

dag_2 = DAG(
    'dag_file_and_http',
    default_args=default_args,
    description='DAG for file operations and HTTP request',
    schedule_interval=None,
    catchup=False,
)

file_op_task = PythonOperator(
    task_id='file_operation',
    python_callable=file_operation,
    op_kwargs={'file_path': OUTPUT_PATH},
    dag=dag_2,
)

http_request_task = PythonOperator(
    task_id='http_request',
    python_callable=make_http_request,
    op_kwargs={'url': 'https://jsonplaceholder.typicode.com/todos/1'},
    dag=dag_2,
)

file_op_task >> http_request_task

### DAG 3: Task Dependencies

dag_3 = DAG(
    'dag_3_dependencies',
    default_args=default_args,
    description='DAG to demonstrate task dependencies',
    schedule_interval=None,
    catchup=False,
)

# DummyOperator: Used for grouping and branching logic
start_task = DummyOperator(
    task_id='start_task',
    dag=dag_3,
)

# BashOperator: Runs a simple bash command
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "This is a bash command"',
    dag=dag_3,
)

# PythonOperator: Runs a Python callable
middle_task = PythonOperator(
    task_id='middle_task',
    python_callable=lambda: logger.info("Middle Task"),
    dag=dag_3,
    trigger_rule='all_done',  # Execute regardless of the upstream task's status
)

# DummyOperator: Used for grouping and branching logic
branch_task = DummyOperator(
    task_id='branch_task',
    dag=dag_3,
)

# PythonOperator: Runs a Python callable
end_task = PythonOperator(
    task_id='end_task',
    python_callable=lambda: logger.info("End Task"),
    dag=dag_3,
)

# Set task dependencies

"""
The tasks execute with the following dependencies:

- `start_task`: Initiates the workflow and triggers two parallel tasks.
- `bash_task`: Executes a bash command (or a set of operations) and passes the result to `middle_task`.
- `branch_task`: Executes an independent task (or a set of operations).
- `middle_task`: Processes the output of `bash_task`.
- `end_task`: Finalizes the workflow by consuming the results of `middle_task` and `branch_task`.

The dependency structure ensures that:
- `bash_task` and `branch_task` run concurrently.
- `middle_task` depends on the completion of `bash_task`.
- `end_task` waits for both `middle_task` and `branch_task` to finish.

This parallel execution can optimize the overall runtime, especially when tasks are I/O bound or computationally independent.
"""

start_task >> [bash_task, branch_task]
bash_task >> middle_task >> end_task
branch_task >> end_task

# If this script is run directly, allow command-line interaction with the DAGs
if __name__ == "__main__":
    dag_1.cli()
    dag_2.cli()
    dag_3.cli()
