Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 358 additions & 0 deletions notebooks/data-pipelines-with-dagster-and-adbc.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "514b19ee",
"metadata": {},
"source": [
"[Dagster](https://dagster.io/) is a modern data orchestrator for building, scheduling, and monitoring data pipelines as software-defined assets, with a rich ecosystem of integrations for databases, cloud services, and data tools. The [dagster-adbc](https://pypi.org/project/dagster-adbc/) integration provides an [ADBC (Arrow Database Connectivity)](https://arrow.apache.org/adbc/) resource, enabling connectivity with any database or query engine with dedicated or compatible ADBC driver. This notebook uses PostgreSQL, but ADBC supports a wide range of SQL systems including but not limited to Snowflake, Databricks, BigQuery, DuckDB, Trino, Dremio, StarRocks, InfluxDB, PostgreSQL, and MySQL.\n",
"\n",
"In this notebook, you will:\n",
"1. Create an ADBC resource to connect to PostgreSQL\n",
"2. Define assets that ingest CSV data directly into database tables using Arrow\n",
"3. Create a downstream aggregation asset and asset check\n",
"4. Materialize the assets\n",
"\n",
"Requirements:\n",
"1. Python 3\n",
"2. PostgreSQL or Docker"
]
},
{
"cell_type": "markdown",
"id": "eef17cce",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"id": "10d91c7d",
"metadata": {},
"source": [
"Install `dagster`, `dagster-adbc`, and `dbc`:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "7d4becfb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"%pip install -q dagster dagster-adbc dbc"
]
},
{
"cell_type": "markdown",
"id": "890c3ca2",
"metadata": {},
"source": [
"Install the PostgreSQL ADBC driver:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae3467bd",
"metadata": {},
"outputs": [],
"source": [
"!dbc install -q postgresql"
]
},
{
"cell_type": "markdown",
"id": "fb3b7a9d",
"metadata": {},
"source": [
"If you don't already have a PostgreSQL instance running, start an instance with Docker:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "e3163f61",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"cd2d927d00d4467e72b1fa9715dcd6fad3a2651a858b94c4f4ed42d2b660c397\n"
]
}
],
"source": [
"!docker run -d --rm --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -p 5432:5432 postgres"
]
},
{
"cell_type": "markdown",
"id": "baf2c2f2",
"metadata": {},
"source": [
"Import `urllib`, `dagster`, `pyarrow`, and `dagster_adbc`:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b1b2e47c",
"metadata": {},
"outputs": [],
"source": [
"from urllib.request import urlopen\n",
"\n",
"import dagster as dg\n",
"import pyarrow.csv as csv\n",
"from dagster_adbc import ADBCResource"
]
},
{
"cell_type": "markdown",
"id": "5471f894",
"metadata": {},
"source": [
"## Dagster Definitions"
]
},
{
"cell_type": "markdown",
"id": "def7aea5",
"metadata": {},
"source": [
"Define an ADBC resource with your database connection info:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "b206aaf4",
"metadata": {},
"outputs": [],
"source": [
"postgres = ADBCResource(\n",
" driver=\"postgresql\",\n",
" uri=\"postgresql://postgres:mysecretpassword@localhost:5432/postgres\",\n",
" autocommit=True,\n",
")"
]
},
{
"cell_type": "markdown",
"id": "f0f62c34",
"metadata": {},
"source": [
"Define three assets that each read a CSV as a PyArrow Table and load the data into Postgres:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "1e78db03",
"metadata": {},
"outputs": [],
"source": [
"def ingest_csv(adbc_resource: ADBCResource, table_name: str, csv_url: str) -> None:\n",
" with urlopen(csv_url) as f:\n",
" table = csv.read_csv(f)\n",
" with adbc_resource.get_connection() as connection, connection.cursor() as cursor:\n",
" cursor.adbc_ingest(table_name=table_name, data=table, mode=\"replace\")\n",
"\n",
"\n",
"@dg.asset\n",
"def customers(postgres: ADBCResource) -> None:\n",
" url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv\"\n",
" table_name = \"customers\"\n",
" ingest_csv(postgres, table_name, url)\n",
"\n",
"\n",
"@dg.asset\n",
"def orders(postgres: ADBCResource) -> None:\n",
" url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv\"\n",
" table_name = \"orders\"\n",
" ingest_csv(postgres, table_name, url)\n",
"\n",
"\n",
"@dg.asset\n",
"def payments(postgres: ADBCResource) -> None:\n",
" url = \"https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv\"\n",
" table_name = \"payments\"\n",
" ingest_csv(postgres, table_name, url)"
]
},
{
"cell_type": "markdown",
"id": "0a97ea7b",
"metadata": {},
"source": [
"Define a downstream asset that relies on the data from the previous assets:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "a0f6edc2",
"metadata": {},
"outputs": [],
"source": [
"@dg.asset(deps=[\"customers\", \"orders\", \"payments\"])\n",
"def orders_aggregation(postgres: ADBCResource) -> None:\n",
" with postgres.get_connection() as connection, connection.cursor() as cursor:\n",
" cursor.execute(\"DROP TABLE IF EXISTS orders_aggregation;\")\n",
" cursor.execute(\"\"\"\n",
" CREATE TABLE orders_aggregation AS (\n",
" SELECT\n",
" c.id AS customer_id,\n",
" c.first_name,\n",
" c.last_name,\n",
" count(DISTINCT o.id) AS total_orders,\n",
" count(DISTINCT p.id) AS total_payments,\n",
" coalesce(sum(p.amount), 0) AS total_amount_spent\n",
" FROM customers AS c\n",
" LEFT JOIN orders AS o\n",
" ON c.id = o.user_id\n",
" LEFT JOIN payments AS p\n",
" ON o.id = p.order_id\n",
" GROUP BY 1, 2, 3\n",
" );\n",
" \"\"\")"
]
},
{
"cell_type": "markdown",
"id": "f749183d",
"metadata": {},
"source": [
"Define an asset check to verify there are rows in the created table:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b37c8fa1",
"metadata": {},
"outputs": [],
"source": [
"@dg.asset_check(asset=\"orders_aggregation\")\n",
"def orders_aggregation_check(postgres: ADBCResource) -> dg.AssetCheckResult:\n",
" with postgres.get_connection() as connection, connection.cursor() as cursor:\n",
" cursor.execute(\"SELECT count(*) FROM orders_aggregation\")\n",
" row_count = cursor.fetchone()[0]\n",
"\n",
" if row_count == 0:\n",
" return dg.AssetCheckResult(\n",
" passed=False, metadata={\"message\": \"Order aggregation check failed\"}\n",
" )\n",
"\n",
" return dg.AssetCheckResult(passed=True, metadata={\"message\": \"Order aggregation check passed\"})"
]
},
{
"cell_type": "markdown",
"id": "a4adb695",
"metadata": {},
"source": [
"Create a definitions object with the assets, asset check, and resource:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "c7675605",
"metadata": {},
"outputs": [],
"source": [
"defs = dg.Definitions(\n",
" assets=[customers, orders, payments, orders_aggregation],\n",
" asset_checks=[orders_aggregation_check],\n",
" resources={\"postgres\": postgres},\n",
")"
]
},
{
"cell_type": "markdown",
"id": "2eaab7ad",
"metadata": {},
"source": [
"Materialize the assets:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9708acf0",
"metadata": {},
"outputs": [],
"source": [
"job = defs.resolve_job_def(\"__ASSET_JOB\")\n",
"job.execute_in_process()"
]
},
{
"cell_type": "markdown",
"id": "d35351c0",
"metadata": {},
"source": [
"## Cleanup"
]
},
{
"cell_type": "markdown",
"id": "aa9dc7a7",
"metadata": {},
"source": [
"If you ran PostgreSQL with Docker earlier, stop and remove the container:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "123564b1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"some-postgres\n"
]
}
],
"source": [
"!docker stop some-postgres"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
8 changes: 8 additions & 0 deletions registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,13 @@
"authors": ["ian-cook"],
"description": "Define reusable, named connection configurations in TOML files and use them to connect to databases with ADBC, just like ODBC DSNs.",
"categories": ["Database Connections"]
},
{
"title": "Data Pipelines with Dagster and ADBC",
"path": "notebooks/data-pipelines-with-dagster-and-adbc.ipynb",
"date": "2026-04-22",
"authors": ["emil-sadek"],
"description": "Learn how to build data pipelines using Dagster's ADBC integration for Arrow-native database connectivity.",
"categories": ["Database Connections"]
}
]