Commit 69e6bc4

Addl dag, revised storyline, new screenshots.
Signed-off-by: merobi-hub <[email protected]>
1 parent c18a0ea commit 69e6bc4

File tree: 4 files changed (+85, −36 lines)


docs/v2/docs/airflow_tutorial/index.mdx

Lines changed: 85 additions & 36 deletions
@@ -61,7 +61,7 @@ $ git clone https://github.com/MarquezProject/marquez && cd marquez

</TabItem>
</Tabs>

Both Airflow and Marquez require port 5432 for their metastores, but the Marquez services are easier to configure. You can also assign the database service to a new port on the fly. To start Marquez using port 2345 for the database, run:

<Tabs groupId="start">
<TabItem value="macos" label="MacOS/Linux">
@@ -82,42 +82,47 @@ $ sh ./docker/up.sh --db-port 2345

</TabItem>
</Tabs>

To view the Marquez UI and verify it's running, open [http://localhost:3000](http://localhost:3000). The UI allows you to:

- discover cross-platform dependencies, meaning you can see the jobs across the tools in your ecosystem that produce or consume a critical table.
- view run-level metadata of current and previous job runs, enabling you to see the latest status of a job and the update history of a dataset.
- get a high-level view of resource usage, allowing you to see trends in your operations.

## Configure Airflow to send events to Marquez {#configure-airflow}

1. To configure Airflow to emit OpenLineage events to Marquez, you need to define an OpenLineage transport. One way you can do this is by using an environment variable. To use `http` and send events to the Marquez API running locally on port `5000`, run:

   ```bash
   $ export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
   ```

2. You also need to define a namespace for Airflow jobs. It can be any string. Run:

   ```bash
   $ export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
   ```
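One way to avoid a silent misconfiguration is to confirm the transport value is valid JSON before exporting it. A quick sanity-check sketch (not part of the tutorial):

```python
import json

# The exact transport string from the export command above.
transport = '{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'

# json.loads raises an error on malformed JSON, catching typos early.
config = json.loads(transport)
```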
3. To add the required Airflow OpenLineage Provider package to your Airflow environment, run:

   ```bash
   $ pip install apache-airflow-providers-openlineage
   ```

4. To enable adding a Postgres connection for this tutorial, run:

   ```bash
   $ pip install apache-airflow-providers-postgres
   ```

5. Now add both packages to `requirements.txt`:

   ```txt
   apache-airflow-providers-openlineage
   apache-airflow-providers-postgres
   ```
6. Create a database in your local Postgres instance and create an Airflow Postgres connection. For help, see: #add URL
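If you prefer to script this step, one approach is to assemble the connection URI Airflow expects and verify its parts before registering it. This is a sketch: the `marquez_tutorial` database name and the `airflow:airflow` credentials are placeholders, not from the tutorial.

```python
from urllib.parse import urlsplit

# Placeholder database name and credentials; substitute your own.
conn_uri = "postgresql://airflow:airflow@localhost:5432/marquez_tutorial"

# Split the URI to confirm each component before handing it to Airflow.
parts = urlsplit(conn_uri)
```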

7. Add a flaky DAG to Airflow that will _often_ create a table in the Postgres database:
   ```py
   from __future__ import annotations
   import time
   import random

   import pendulum
   from airflow.decorators import dag, task
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
   from airflow.datasets import Dataset

   SQL = """CREATE TABLE IF NOT EXISTS airflowsample (
       col1 VARCHAR(255),
       col2 VARCHAR(255)
   )"""

   @dag(
       schedule='@hourly',
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example"],
       dag_display_name="Flaky DAG",
   )
   def example_display_name_brkn():

       sample_task_1 = EmptyOperator(
           task_id="sample_task_1",
           task_display_name="Sample Task 1",
       )

       sample_task_2 = SQLExecuteQueryOperator(
           task_id="sample_task_2",
           sql=SQL,
           conn_id="postgres_default",
       )

       @task(
           task_display_name="Sample Task 3",
           outlets=[Dataset("sample_pg_table")]
       )
       def sample_task_3():
           # 'fail' makes time.sleep() raise TypeError, so the task fails at random
           pers = [0, 60, 120, 'fail']
           per = random.choice(pers)
           time.sleep(per)

       sample_task_1 >> sample_task_2 >> sample_task_3()

   example_display_name_brkn()
   ```
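The flakiness comes from the task body: `random.choice` sometimes picks the string `'fail'`, and `time.sleep('fail')` raises a `TypeError`, which fails the task run. A standalone sketch of that behavior:

```python
import time

def attempt(per):
    """Mimic the task body: sleep on a number, fail on anything else."""
    try:
        time.sleep(per)
        return "success"
    except TypeError:
        return "failed"

# 0 keeps the sketch instant; the DAG also draws 60- and 120-second sleeps.
ok = attempt(0)
bad = attempt('fail')
```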
8. Add another DAG that updates (and then drops) the table the above DAG creates:
   ```py
   from __future__ import annotations
   import time
   import random

   import pendulum
   from airflow.decorators import dag, task
   from airflow.operators.empty import EmptyOperator
   from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
   from airflow.datasets import Dataset

   SQL_1 = "INSERT INTO airflowsample (col1) VALUES ('row')"

   SQL_2 = "DROP TABLE airflowsample"

   @dag(
       schedule=[Dataset("sample_pg_table")],
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example"],
   )
   def example_insert_brkn():

       sample_task_1 = EmptyOperator(
           task_id="sample_insert_task_1",
           task_display_name="Sample Task 1",
       )

       sample_task_2 = SQLExecuteQueryOperator(
           task_id="sample_insert_task_2",
           sql=SQL_1,
           conn_id="postgres_default",
       )

       sample_task_3 = SQLExecuteQueryOperator(
           task_id="sample_insert_task_3",
           sql=SQL_2,
           conn_id="postgres_default",
       )

       sample_task_1 >> sample_task_2 >> sample_task_3

   example_insert_brkn()
   ```
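The two DAGs are linked only by the Dataset URI string: the first declares `Dataset("sample_pg_table")` as an outlet, and this one uses the same URI in `schedule`. A simplified model of the matching rule (a sketch; Airflow's real Dataset class does more, but producers and consumers pair by URI equality):

```python
# Simplified stand-in for airflow.datasets.Dataset: identified purely by URI.
class Dataset:
    def __init__(self, uri: str):
        self.uri = uri

producer_outlets = [Dataset("sample_pg_table")]   # from the flaky DAG's task
consumer_schedule = [Dataset("sample_pg_table")]  # from this DAG's @dag(schedule=...)

# The consumer runs once every dataset it is scheduled on has been updated.
triggered = all(
    any(d.uri == out.uri for out in producer_outlets) for d in consumer_schedule
)
```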
This DAG is scheduled on the first one using an Airflow Dataset, so it will run automatically when `Flaky DAG` completes a run.

9. Run your DAGs by triggering the `Flaky DAG`. To verify that the OpenLineage Provider is configured correctly, check the task logs for an `INFO`-level log reporting the transport type you defined. In this case, the log will say: `OpenLineageClient will use http transport`.
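If you want to exercise the Marquez endpoint before running the DAGs, you can construct a minimal OpenLineage event yourself. This is a sketch: the job name `manual_smoke_test` and the producer URI are placeholders, and the field names follow the OpenLineage run-event shape (`eventType`, `eventTime`, `run`, `job`, `producer`).

```python
import json
import uuid
from datetime import datetime, timezone

# Minimal OpenLineage run event; namespace matches AIRFLOW__OPENLINEAGE__NAMESPACE above.
event = {
    "eventType": "START",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "my-team-airflow-instance", "name": "manual_smoke_test"},
    "producer": "https://example.com/smoke-test",  # placeholder producer URI
}

body = json.dumps(event)
# POST the body to http://localhost:5000/api/v1/lineage (requires Marquez running), e.g.:
#   curl -X POST http://localhost:5000/api/v1/lineage \
#     -H 'Content-Type: application/json' -d "$BODY"
```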
## View Airflow operational analytics and data lineage in Marquez {#view-airflow}

The DataOps view offers a high-level view of historical and in-process operations, including task-level run status and runtime information:

![](marquez_dataops.png)
### Datasets lineage graph

In the Datasets view, click on the dataset to get a cross-platform-capable lineage graph. In this case, you will be able to see the upstream tasks across the two DAGs in your environment that feed the `airflowsample` table in Airflow:

![](brkn_graph.png)
:::info

Dependencies in other platforms that modify or consume the same dataset will also appear in this graph.

:::

### Leveraging the Marquez graph

If the `airflowsample` table were to get stale (imagine that), you would need to know about all the upstream dependencies in order to diagnose and resolve the issue efficiently.

In the graph, you can click on an upstream job node to see information including:

- the latest run status.
- the last runtime.
- the time last started.
- the time last finished.

![](brkn_detail_lrg.png)

You can also access a versioned table schema history from the Marquez graph, so you can see at a glance if data quality in a critical table has become compromised and when a loss occurred:

![](marquez_dataset_versions.png)
## Next Steps {#next-steps}