# datapipe-quickstart

datapipe-quickstart is a sample Dagster project demonstrating a data pipeline for ingesting, processing, and analyzing sales data. It uses DuckDB as its local database.

This project showcases how to build a data pipeline with Dagster, including:
- Defining assets for data ingestion, transformation, and analysis.
- Using partitions for incremental data processing.
- Setting up schedules for regular pipeline runs.
- Implementing sensors to trigger jobs based on external events (e.g., new request files).
- Performing data quality checks.
## Features

- Data Ingestion: Loads data from CSV files for products, sales representatives, and sales transactions.
- Data Transformation: Joins the ingested data to create a unified view.
- Data Analysis:
- Calculates monthly sales performance.
- Calculates product performance by category.
- Ad-hoc Reporting: Allows users to submit ad-hoc data requests via JSON files.
- Scheduled Updates: A weekly schedule keeps the core data up-to-date.
- Data Quality: Includes an asset check to identify missing dimensions in the joined data.
- Local Data Warehouse: Uses DuckDB for storing and querying data.
## Project Structure

```
datapipe-quickstart/
├── data/                     # Input CSV files, DuckDB database, and ad-hoc request files
│   ├── products.csv
│   ├── sales_data.csv
│   ├── sales_reps.csv
│   ├── mydb.duckdb           # DuckDB database file (created on first run)
│   └── requests/             # Directory for ad-hoc request JSON files
│       └── sample_request/
│           └── request.json  # Example ad-hoc request
├── datapipe_quickstart/      # Python package for the Dagster pipeline
│   ├── __init__.py
│   ├── assets.py             # Defines data assets and transformations
│   ├── definitions.py        # Main Dagster definitions (assets, schedules, sensors, resources)
│   ├── partitions.py         # Defines partitions used by assets
│   ├── schedules.py          # Defines pipeline schedules
│   └── sensors.py            # Defines sensors for triggering jobs
├── .gitignore
├── LICENSE                   # Apache License 2.0
├── py.typed                  # Marker for PEP 561 type information
├── pyproject.toml            # Build system and Dagster project configuration
├── requirements.txt          # Python dependencies
├── setup.cfg                 # Package metadata
└── setup.py                  # Setup script for the Python package
```
## Prerequisites

- Python 3.12
- pip
## Installation

### Option 1: Local Setup

- Clone the repository or ensure you are in the project's root directory.
- Create and activate a virtual environment (recommended):

  ```shell
  python -m venv venv
  source venv/bin/activate  # On Windows use `venv\Scripts\activate`
  ```

- Install dependencies:

  ```shell
  pip install -r requirements.txt
  pip install -e .  # Installs the project in editable mode
  ```

  The `requirements.txt` includes:

  - `dagster`: The core Dagster library.
  - `dagster-duckdb`: Integration for DuckDB.
  - `dagster-webserver`: For running Dagit, the Dagster UI.

  The `setup.py` also lists `dagster-cloud` and `pytest` (for development).
### Option 2: Docker Setup

This method uses Docker Compose to set up and run the Dagster instance along with its dependencies (like PostgreSQL) in containers.
- Ensure Docker and Docker Compose are installed. Follow the official installation guides for Docker and Docker Compose.
- Clone the repository or ensure you are in the project's root directory.
- Copy the Docker Compose template file:

  In the `datapipe-quickstart` directory, copy the template to create your local Docker Compose configuration:

  ```shell
  cp docker-compose.yml.tmpl docker-compose.yml
  ```

  You can review `docker-compose.yml` and make any modifications if needed.

- Build and run the services:

  ```shell
  docker compose up --build -d
  ```

  This command will build the Docker images (if they don't exist) and start all the services defined in the `docker-compose.yml` file in detached mode (`-d`).

- Access the Dagit UI:

  Once the services are up and running, the Dagit UI will be accessible at http://localhost:3000.
## Running the Pipeline

- Start Dagit (Dagster UI):

  From the `datapipe-quickstart` directory, run:

  ```shell
  dagster dev
  ```

  This will start the Dagit webserver, typically available at http://localhost:3000.

- Materialize assets:

  In Dagit, you can:

  - View the asset graph.
  - Manually materialize assets.
  - Observe scheduled runs and sensor activity.

  The first time you materialize assets that interact with DuckDB (e.g., `products`, `sales_data`), the `data/mydb.duckdb` file will be created.
## Data

The pipeline ingests data from the following CSV files located in the `data/` directory:

- `products.csv`: Contains product information (`product_id`, `product_name`, `category`, `price`).
- `sales_reps.csv`: Contains sales representative information (`rep_id`, `rep_name`, `department`, `hire_date`).
- `sales_data.csv`: Contains sales transaction records (`order_id`, `date`, `product_id`, `rep_id`, `customer_name`, `quantity`, `dollar_amount`).
The project uses DuckDB as its local data warehouse. The database file is stored at `data/mydb.duckdb`. Tables are created and populated by the assets.
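Assets talk to this file through DuckDB's Python client, which follows the familiar DB-API connect/execute pattern. As a rough illustration of that pattern, the sketch below uses the stdlib's `sqlite3` in place of DuckDB (the in-memory database and sample row are invented; the real pipeline persists to `data/mydb.duckdb`):

```python
import sqlite3  # stdlib stand-in; the project itself connects via duckdb

# In-memory database for illustration only
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (product_id INTEGER, product_name TEXT, category TEXT, price REAL)"
)
conn.execute("INSERT INTO products VALUES (1, 'Laptop', 'Electronics', 999.0)")

rows = conn.execute("SELECT product_name, price FROM products").fetchall()
print(rows)  # [('Laptop', 999.0)]
```

The same create/insert/query flow is what the ingestion assets perform against DuckDB.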
## Assets

Defined in `datapipe_quickstart/assets.py`:

- `products`: Loads product data from `products.csv` into DuckDB.
- `sales_reps`: Loads sales representative data from `sales_reps.csv` into DuckDB.
- `sales_data`: Loads sales transaction data from `sales_data.csv` into DuckDB.
- `joined_data`: Creates a view in DuckDB by joining `sales_data`, `sales_reps`, and `products`.
- `missing_dimension_check`: An asset check on `joined_data` to ensure `rep_name` and `product_name` are not null.
- `monthly_sales_performance`: Calculates aggregated sales performance by month and representative. Partitioned by month.
- `product_performance`: Calculates aggregated sales performance by product category. Partitioned by product category.
- `adhoc_request`: Processes ad-hoc data requests based on configurations provided via JSON files.
## Schedules

Defined in `datapipe_quickstart/schedules.py`:

- `weekly_update_schedule` (`analysis_update_job`): Runs every Monday at midnight to materialize assets upstream of `joined_data`, effectively refreshing the core data tables.
## Sensors

Defined in `datapipe_quickstart/sensors.py`:

- `adhoc_request_sensor`: Monitors the `data/requests/` directory for new or modified `.json` files. When a change is detected, it triggers the `adhoc_request_job` with the configuration from the JSON file.
## Partitions

Defined in `datapipe_quickstart/partitions.py`:

- `monthly_partition`: A monthly partition definition starting from "2024-01-01", used by the `monthly_sales_performance` asset.
- `product_category_partition`: A static partition definition for product categories (`Electronics`, `Books`, `Home and Garden`, `Clothing`), used by the `product_performance` asset.
## Making an Ad-hoc Request

To make an ad-hoc data request:

- Create a JSON file in the `datapipe-quickstart/data/requests/` directory.
- The JSON file should contain the configuration for the `adhoc_request` asset. See `datapipe_quickstart/assets.py` for the `AdhocRequestConfig` structure:

  ```json
  {
    "department": "Electronics",
    "product": "Laptop",
    "start_date": "2024-03-01",
    "end_date": "2024-04-01"
  }
  ```

  An example is provided in `data/requests/sample_request/request.json`.

- The `adhoc_request_sensor` will detect the new/modified file and trigger a run of the `adhoc_request` asset with the provided configuration. The results (a preview of the queried data) will be available in the Dagit UI for that run.
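Under the hood, such a request amounts to filtering the joined data by department, product, and date range. A plain-Python sketch of that filtering (field names follow the sample `request.json`; treating `end_date` as exclusive, and the sample rows themselves, are assumptions):

```python
from dataclasses import dataclass

@dataclass
class AdhocRequestConfig:
    # Field names mirror the sample request.json; the real definition lives in assets.py
    department: str
    product: str
    start_date: str  # "YYYY-MM-DD", inclusive
    end_date: str    # "YYYY-MM-DD", treated as exclusive here

def run_adhoc_request(cfg: AdhocRequestConfig, joined_rows: list[dict]) -> list[dict]:
    """Filter joined rows the way the adhoc_request asset queries them."""
    return [
        r for r in joined_rows
        if r["department"] == cfg.department
        and r["product_name"] == cfg.product
        and cfg.start_date <= r["date"] < cfg.end_date  # ISO date strings sort correctly
    ]

rows = [
    {"date": "2024-03-15", "department": "Electronics",
     "product_name": "Laptop", "dollar_amount": 1998.0},
    {"date": "2024-05-02", "department": "Electronics",
     "product_name": "Laptop", "dollar_amount": 999.0},
]
cfg = AdhocRequestConfig("Electronics", "Laptop", "2024-03-01", "2024-04-01")
result = run_adhoc_request(cfg, rows)
print(result)  # only the March row matches
```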
## License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.