---
title: Apache Airflow® Tutorial
template: basepage
sidebar_position: 2
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

## Table of Contents

1. [Prerequisites](#prerequisites)
2. [Get and start Marquez](#get-marquez)
3. [Configure Apache Airflow to send events to Marquez](#configure-airflow)
4. [View Airflow operational analytics and data lineage in Marquez](#view-airflow)
5. [Next steps](#next-steps)
6. [Feedback?](#feedback)

## Prerequisites {#prerequisites}

Before you begin, make sure you have installed:

<Tabs groupId="os">
<TabItem value="macos" label="MacOS/Linux">

* [Docker 17.05+](https://docs.docker.com/install)
* [Docker Compose](https://docs.docker.com/compose/install)
* [Airflow 2.9+](https://airflow.apache.org/docs/apache-airflow/stable/start.html)
* [PostgreSQL 14+](https://www.postgresql.org)

</TabItem>
<TabItem value="windows" label="Windows">

* [Git Bash](https://gitforwindows.org/)
* [PostgreSQL 14+](https://www.postgresql.org/)
* [Docker 17.05+](https://docs.docker.com/install)
* [Docker Compose](https://docs.docker.com/compose/install)
* [Airflow 2.9+](https://airflow.apache.org/docs/apache-airflow/stable/start.html)

</TabItem>
</Tabs>
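
To confirm everything is in place, you can check the installed versions (the exact output will vary with your setup):

```bash
$ docker --version
$ docker compose version   # or: docker-compose --version, for the standalone binary
$ airflow version
$ psql --version
```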

## Get and start Marquez {#get-marquez}

1. To check out the Marquez source code, run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ git clone https://github.com/MarquezProject/marquez && cd marquez
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    ```bash
    $ git config --global core.autocrlf false
    $ git clone https://github.com/MarquezProject/marquez && cd marquez
    ```

    </TabItem>
    </Tabs>

2. Both Airflow and Marquez use port 5432 for their metastores by default, but the Marquez services are the easier of the two to reconfigure: you can assign the database service a new port on the fly. To start Marquez with the database on port 2345, run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ ./docker/up.sh --db-port 2345
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    Verify that Postgres and Bash are in your `PATH`, then run:

    ```bash
    $ sh ./docker/up.sh --db-port 2345
    ```

    </TabItem>
    </Tabs>
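
    Once the containers are up, you can optionally confirm that the Marquez API is responding before moving on. One simple check is to list namespaces (the seeded `default` namespace should appear in the response):

    ```bash
    $ curl http://localhost:5000/api/v1/namespaces
    ```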

3. To view the Marquez UI and verify it's running, open [http://localhost:3000](http://localhost:3000). The UI allows you to:

    - view cross-platform dependencies, meaning you can see the jobs across the tools in your ecosystem that produce or consume a critical table.
    - view run-level metadata of current and previous job runs, enabling you to see the latest status of a job and the update history of a dataset.
    - get a high-level view of resource usage, allowing you to see trends in your operations.

## Configure Airflow to send events to Marquez {#configure-airflow}

1. To configure Airflow to emit OpenLineage events to Marquez, you need to define an OpenLineage transport. One way you can do this is by using an environment variable. To use `http` and send events to the Marquez API running locally on port `5000`, run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    In Git Bash, run:

    ```bash
    $ export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    ```

    </TabItem>
    </Tabs>
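
    The transport value must be valid JSON, and shell quoting mistakes are easy to make here. One quick sanity check is to parse the variable back with Python:

    ```bash
    $ python -c 'import json, os; print(json.loads(os.environ["AIRFLOW__OPENLINEAGE__TRANSPORT"]))'
    ```

    If this raises a `JSONDecodeError`, fix the quoting before starting Airflow.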

2. You also need to define a namespace for Airflow jobs. It can be any string. Run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    In Git Bash, run:

    ```bash
    $ export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    ```

    </TabItem>
    </Tabs>
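
    This environment variable maps to the `namespace` option in the `[openlineage]` section of the Airflow config. A trivial check that the export took effect:

    ```bash
    $ echo "$AIRFLOW__OPENLINEAGE__NAMESPACE"
    ```

    Once the OpenLineage provider is installed (next step), `airflow config get-value openlineage namespace` should report the same value.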

3. To add the required Airflow OpenLineage Provider package to your Airflow environment, run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ pip install apache-airflow-providers-openlineage
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    ```bash
    $ pip install apache-airflow-providers-openlineage
    ```

    </TabItem>
    </Tabs>
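
    To confirm the provider is registered, list the installed providers and look for the OpenLineage entry:

    ```bash
    $ airflow providers list | grep openlineage
    ```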

4. To install the Postgres provider package, which you'll need in order to create the Postgres connection used in this tutorial, run:

    <Tabs groupId="os">
    <TabItem value="macos" label="MacOS/Linux">

    ```bash
    $ pip install apache-airflow-providers-postgres
    ```

    </TabItem>
    <TabItem value="windows" label="Windows">

    ```bash
    $ pip install apache-airflow-providers-postgres
    ```

    </TabItem>
    </Tabs>

5. Create a database in your local Postgres instance and create an Airflow Postgres connection using the default ID (`postgres_default`). For help with the former, see the [Postgres documentation](https://www.postgresql.org/docs/). For help with the latter, see [Managing Connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#managing-connections). A sketch of one possible sequence follows.
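
    For example, assuming a local Postgres superuser role named `postgres` and an illustrative database name of `airflow_tutorial` (both are placeholders; substitute your own):

    ```bash
    # Create the database (the name is only an example)
    $ createdb -U postgres airflow_tutorial

    # Register it in Airflow under the default connection ID;
    # adjust the user, password, and database to match your setup
    $ airflow connections add postgres_default \
        --conn-uri 'postgres://postgres:postgres@localhost:5432/airflow_tutorial'
    ```

    If a `postgres_default` connection already exists, delete it first with `airflow connections delete postgres_default`.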

6. Add a flaky DAG to Airflow that will _often_ create a table in the Postgres database:

    ```py
    from __future__ import annotations
    import time
    import random

    import pendulum
    from airflow.decorators import dag, task
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.datasets import Dataset

    SQL = """CREATE TABLE IF NOT EXISTS airflowsample (
        col1 VARCHAR(255),
        col2 VARCHAR(255)
    )"""

    @dag(
        schedule='@hourly',
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example"],
        dag_display_name="Flaky DAG",
    )
    def example_display_name_brkn():

        sample_task_1 = EmptyOperator(
            task_id="sample_task_1",
            task_display_name="Sample Task 1",
        )

        # Create the table the downstream DAG depends on
        sample_task_2 = SQLExecuteQueryOperator(
            task_id="sample_task_2",
            task_display_name="Sample Task 2",
            sql=SQL,
            conn_id="postgres_default",
        )

        @task(
            task_display_name="Sample Task 3",
            outlets=[Dataset("sample_pg_table")],  # declares the dataset this task updates
        )
        def sample_task_3():
            # Sleep for a randomly chosen period; when 'fail' is drawn,
            # time.sleep() raises a TypeError and the task fails (hence "flaky")
            pers = [0, 60, 120, 'fail']
            per = random.choice(pers)
            time.sleep(per)

        sample_task_1 >> sample_task_2 >> sample_task_3()

    example_display_name_brkn()
    ```

7. Add another DAG that updates (and then drops) the Postgres table:

    ```py
    from __future__ import annotations

    import pendulum
    from airflow.decorators import dag
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.datasets import Dataset

    SQL_1 = "INSERT INTO airflowsample (col1) VALUES ('row')"

    SQL_2 = "DROP TABLE airflowsample"

    @dag(
        schedule=[Dataset("sample_pg_table")],  # run whenever this dataset is updated
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example"],
    )
    def example_insert_brkn():

        sample_task_1 = EmptyOperator(
            task_id="sample_insert_task_1",
            task_display_name="Sample Task 1",
        )

        sample_task_2 = SQLExecuteQueryOperator(
            task_id="sample_insert_task_2",
            sql=SQL_1,
            conn_id="postgres_default",
        )

        sample_task_3 = SQLExecuteQueryOperator(
            task_id="sample_insert_task_3",
            sql=SQL_2,
            conn_id="postgres_default",
        )

        sample_task_1 >> sample_task_2 >> sample_task_3

    example_insert_brkn()
    ```

    This DAG is scheduled on the dataset declared as an outlet of `sample_task_3` in the first DAG, so it runs automatically whenever that DAG completes a run.
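
    After saving both DAG files in your `dags/` folder, you can check that they parse cleanly before triggering anything:

    ```bash
    $ airflow dags list-import-errors
    ```

    If nothing is reported, both DAGs loaded without errors.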

8. Run your DAGs by triggering the `Flaky DAG`. To verify that the OpenLineage Provider is configured correctly, check the task logs for an `INFO`-level log reporting the transport type you defined. In this case, the log will say: `OpenLineageClient will use http transport`.
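
    You can trigger the DAG from the UI or, if you prefer the CLI, by its DAG ID (the decorated function's name, since `dag_display_name` only changes the UI label):

    ```bash
    $ airflow dags trigger example_display_name_brkn
    ```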

## View Airflow operational analytics and data lineage in Marquez {#view-airflow}

The DataOps view offers a high-level view of historical and in-process operations, including task-level run status and runtime information.

### Datasets lineage graph

In the Datasets view, click on the dataset to get a cross-platform lineage graph. In this case, you will be able to see the upstream tasks across the two DAGs in your environment that feed the `airflowsample` table.

:::info

Dependencies in other platforms that modify or consume the same dataset will also appear in this graph.

:::

### Leveraging the Marquez graph

When data produced by multiple tools in your data ecosystem arrives late or becomes stale, root-cause analysis is much easier when you know:
- what jobs and datasets are upstream.
- what the run status of each upstream job is.
- how each upstream job has performed recently.
- whether quality issues have affected upstream datasets.

In the Marquez lineage graph, you can click on an upstream job node to see information including:
- the latest run status.
- the last runtime.
- the time last started.
- the time last finished.

You can also access a versioned table schema history from the Marquez graph, so you can see at a glance if data quality in a table has become compromised and when a loss occurred.

## Next steps {#next-steps}

Continue your journey with Marquez by consulting the following resources:
- [Backfilling Airflow DAGs Using Marquez](https://openlineage.io/docs/guides/airflow-backfill-dags)
- [Using Marquez with dbt](https://openlineage.io/docs/guides/dbt)
- [Using OpenLineage with Spark](https://openlineage.io/docs/guides/spark)

## Feedback? {#feedback}

What did you think of this guide? You can reach out to us on [Slack](https://join.slack.com/t/marquezproject/shared_invite/zt-2iylxasbq-GG_zXNcJdNrhC9uUMr3B7A) and leave us feedback, or [open a pull request](https://github.com/MarquezProject/marquez/blob/main/CONTRIBUTING.md#submitting-a-pull-request) with your suggestions!