From 53303ee99d6ba10fd1cee447c2da17965b27b628 Mon Sep 17 00:00:00 2001 From: Emil Sadek Date: Wed, 22 Apr 2026 18:44:41 -0700 Subject: [PATCH] feat: add dagster-adbc notebook --- ...data-pipelines-with-dagster-and-adbc.ipynb | 358 ++++++++++++++++++ registry.json | 8 + 2 files changed, 366 insertions(+) create mode 100644 notebooks/data-pipelines-with-dagster-and-adbc.ipynb diff --git a/notebooks/data-pipelines-with-dagster-and-adbc.ipynb b/notebooks/data-pipelines-with-dagster-and-adbc.ipynb new file mode 100644 index 0000000..fa60717 --- /dev/null +++ b/notebooks/data-pipelines-with-dagster-and-adbc.ipynb @@ -0,0 +1,358 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "514b19ee", + "metadata": {}, + "source": [ + "[Dagster](https://dagster.io/) is a modern data orchestrator for building, scheduling, and monitoring data pipelines as software-defined assets, with a rich ecosystem of integrations for databases, cloud services, and data tools. The [dagster-adbc](https://pypi.org/project/dagster-adbc/) integration provides an [ADBC (Arrow Database Connectivity)](https://arrow.apache.org/adbc/) resource, enabling connectivity with any database or query engine with a dedicated or compatible ADBC driver. This notebook uses PostgreSQL, but ADBC supports a wide range of SQL systems including but not limited to Snowflake, Databricks, BigQuery, DuckDB, Trino, Dremio, StarRocks, InfluxDB, PostgreSQL, and MySQL.\n", + "\n", + "In this notebook, you will:\n", + "1. Create an ADBC resource to connect to PostgreSQL\n", + "2. Define assets that ingest CSV data directly into database tables using Arrow\n", + "3. Create a downstream aggregation asset and asset check\n", + "4. Materialize the assets\n", + "\n", + "Requirements:\n", + "1. Python 3\n", + "2. 
PostgreSQL or Docker" + ] + }, + { + "cell_type": "markdown", + "id": "eef17cce", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "markdown", + "id": "10d91c7d", + "metadata": {}, + "source": [ + "Install `dagster`, `dagster-adbc`, and `dbc`:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "7d4becfb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Note: you may need to restart the kernel to use updated packages.\n" + ] + } + ], + "source": [ + "%pip install -q dagster dagster-adbc dbc" + ] + }, + { + "cell_type": "markdown", + "id": "890c3ca2", + "metadata": {}, + "source": [ + "Install the PostgreSQL ADBC driver:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae3467bd", + "metadata": {}, + "outputs": [], + "source": [ + "!dbc install -q postgresql" + ] + }, + { + "cell_type": "markdown", + "id": "fb3b7a9d", + "metadata": {}, + "source": [ + "If you don't already have a PostgreSQL instance running, start an instance with Docker:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "e3163f61", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cd2d927d00d4467e72b1fa9715dcd6fad3a2651a858b94c4f4ed42d2b660c397\n" + ] + } + ], + "source": [ + "!docker run -d --rm --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -p 5432:5432 postgres" + ] + }, + { + "cell_type": "markdown", + "id": "baf2c2f2", + "metadata": {}, + "source": [ + "Import `urllib`, `dagster`, `pyarrow`, and `dagster_adbc`:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "b1b2e47c", + "metadata": {}, + "outputs": [], + "source": [ + "from urllib.request import urlopen\n", + "\n", + "import dagster as dg\n", + "import pyarrow.csv as csv\n", + "from dagster_adbc import ADBCResource" + ] + }, + { + "cell_type": "markdown", + "id": "5471f894", + "metadata": {}, + "source": [ + "## Dagster 
Definitions" + ] + }, + { + "cell_type": "markdown", + "id": "def7aea5", + "metadata": {}, + "source": [ + "Define an ADBC resource with your database connection info:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b206aaf4", + "metadata": {}, + "outputs": [], + "source": [ + "postgres = ADBCResource(\n", + " driver=\"postgresql\",\n", + " uri=\"postgresql://postgres:mysecretpassword@localhost:5432/postgres\",\n", + " autocommit=True,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "f0f62c34", + "metadata": {}, + "source": [ + "Define three assets that each read a CSV as a PyArrow Table and load the data into Postgres:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "1e78db03", + "metadata": {}, + "outputs": [], + "source": [ + "def ingest_csv(adbc_resource: ADBCResource, table_name: str, csv_url: str) -> None:\n", + " with urlopen(csv_url) as f:\n", + " table = csv.read_csv(f)\n", + " with adbc_resource.get_connection() as connection, connection.cursor() as cursor:\n", + " cursor.adbc_ingest(table_name=table_name, data=table, mode=\"replace\")\n", + "\n", + "\n", + "@dg.asset\n", + "def customers(postgres: ADBCResource) -> None:\n", + " url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv\"\n", + " table_name = \"customers\"\n", + " ingest_csv(postgres, table_name, url)\n", + "\n", + "\n", + "@dg.asset\n", + "def orders(postgres: ADBCResource) -> None:\n", + " url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv\"\n", + " table_name = \"orders\"\n", + " ingest_csv(postgres, table_name, url)\n", + "\n", + "\n", + "@dg.asset\n", + "def payments(postgres: ADBCResource) -> None:\n", + " url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv\"\n", + " table_name = \"payments\"\n", + " ingest_csv(postgres, table_name, url)" + ] + }, + { + "cell_type": 
"markdown", + "id": "0a97ea7b", + "metadata": {}, + "source": [ + "Define a downstream asset that relies on the data from the previous assets:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "a0f6edc2", + "metadata": {}, + "outputs": [], + "source": [ + "@dg.asset(deps=[\"customers\", \"orders\", \"payments\"])\n", + "def orders_aggregation(postgres: ADBCResource) -> None:\n", + " with postgres.get_connection() as connection, connection.cursor() as cursor:\n", + " cursor.execute(\"DROP TABLE IF EXISTS orders_aggregation;\")\n", + " cursor.execute(\"\"\"\n", + " CREATE TABLE orders_aggregation AS (\n", + " SELECT\n", + " c.id AS customer_id,\n", + " c.first_name,\n", + " c.last_name,\n", + " count(DISTINCT o.id) AS total_orders,\n", + " count(DISTINCT p.id) AS total_payments,\n", + " coalesce(sum(p.amount), 0) AS total_amount_spent\n", + " FROM customers AS c\n", + " LEFT JOIN orders AS o\n", + " ON c.id = o.user_id\n", + " LEFT JOIN payments AS p\n", + " ON o.id = p.order_id\n", + " GROUP BY 1, 2, 3\n", + " );\n", + " \"\"\")" + ] + }, + { + "cell_type": "markdown", + "id": "f749183d", + "metadata": {}, + "source": [ + "Define an asset check to verify there are rows in the created table:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "b37c8fa1", + "metadata": {}, + "outputs": [], + "source": [ + "@dg.asset_check(asset=\"orders_aggregation\")\n", + "def orders_aggregation_check(postgres: ADBCResource) -> dg.AssetCheckResult:\n", + " with postgres.get_connection() as connection, connection.cursor() as cursor:\n", + " cursor.execute(\"SELECT count(*) FROM orders_aggregation\")\n", + " row_count = cursor.fetchone()[0]\n", + "\n", + " if row_count == 0:\n", + " return dg.AssetCheckResult(\n", + " passed=False, metadata={\"message\": \"Order aggregation check failed\"}\n", + " )\n", + "\n", + " return dg.AssetCheckResult(passed=True, metadata={\"message\": \"Order aggregation check passed\"})" + ] + }, + { + "cell_type": 
"markdown", + "id": "a4adb695", + "metadata": {}, + "source": [ + "Create a definitions object with the assets, asset check, and resource:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "c7675605", + "metadata": {}, + "outputs": [], + "source": [ + "defs = dg.Definitions(\n", + " assets=[customers, orders, payments, orders_aggregation],\n", + " asset_checks=[orders_aggregation_check],\n", + " resources={\"postgres\": postgres},\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "2eaab7ad", + "metadata": {}, + "source": [ + "Materialize the assets:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9708acf0", + "metadata": {}, + "outputs": [], + "source": [ + "job = defs.resolve_job_def(\"__ASSET_JOB\")\n", + "job.execute_in_process()" + ] + }, + { + "cell_type": "markdown", + "id": "d35351c0", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "markdown", + "id": "aa9dc7a7", + "metadata": {}, + "source": [ + "If you ran PostgreSQL with Docker earlier, stop the container (it was started with `--rm`, so Docker removes it automatically once it stops):" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "123564b1", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "some-postgres\n" + ] + } + ], + "source": [ + "!docker stop some-postgres" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/registry.json b/registry.json index 3c9ad69..bfbe666 100644 --- a/registry.json +++ b/registry.json @@ -134,5 +134,13 @@ "authors": ["ian-cook"], "description": "Define reusable, named connection configurations in TOML files and
use them to connect to databases with ADBC, just like ODBC DSNs.", "categories": ["Database Connections"] + }, + { + "title": "Data Pipelines with Dagster and ADBC", + "path": "notebooks/data-pipelines-with-dagster-and-adbc.ipynb", + "date": "2026-04-22", + "authors": ["emil-sadek"], + "description": "Learn how to build data pipelines using Dagster's ADBC integration for Arrow-native database connectivity.", + "categories": ["Database Connections"] } ]