28 changes: 28 additions & 0 deletions knowledge_base/workflow_with_ai_parse_document/.gitignore
@@ -0,0 +1,28 @@
# Databricks
.databricks/

# Python
build/
dist/
__pycache__/
*.egg-info
.venv/
*.py[cod]

# Local configuration (keep your settings private)
databricks.local.yml

# IDE
.idea/
.vscode/
.DS_Store

# Scratch/temporary files
scratch/**
!scratch/README.md

# Test documents (don't commit large PDFs)
*.pdf
*.png
*.jpg
*.jpeg
152 changes: 152 additions & 0 deletions knowledge_base/workflow_with_ai_parse_document/README.md
@@ -0,0 +1,152 @@
# AI Document Processing Workflow with Structured Streaming

A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Databricks Workflows with Structured Streaming.

## Overview

This example shows how to build an incremental workflow that:
1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
2. **Extracts** clean text with incremental processing
3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs

All stages run as Python notebook tasks in a Databricks Workflow using Structured Streaming with serverless compute.

## Architecture

```
Source Documents (UC Volume)
        ↓
Task 1: ai_parse_document → parsed_documents_raw (variant)
        ↓
Task 2: text extraction   → parsed_documents_text (string)
        ↓
Task 3: ai_query          → parsed_documents_structured (json)
```
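The incremental pattern behind this pipeline can be illustrated in plain Python: each stage records which inputs it has already handled (Structured Streaming does this with checkpoint files) and processes only the delta on the next run. All names below are illustrative, not the actual pipeline code.

```python
# Minimal sketch of checkpointed incremental processing. In the real workflow,
# Structured Streaming persists progress to a checkpoint directory; here a
# simple set plays that role.

def process_new_files(all_files, checkpoint, stage):
    """Run `stage` only on files not yet recorded in `checkpoint`."""
    new_files = [f for f in all_files if f not in checkpoint]
    results = [stage(f) for f in new_files]
    checkpoint.update(new_files)  # persist progress so reruns skip these files
    return results

checkpoint = set()
first = process_new_files(["a.pdf", "b.pdf"], checkpoint, str.upper)
rerun = process_new_files(["a.pdf", "b.pdf", "c.pdf"], checkpoint, str.upper)
print(first)  # ['A.PDF', 'B.PDF']
print(rerun)  # ['C.PDF'] -- only the new file is processed on the second run
```

This is why re-running the workflow after uploading new documents only parses the new files, as long as the checkpoint paths are left intact.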

### Key Features

- **Incremental processing**: Only new files are processed using Structured Streaming checkpoints
- **Serverless compute**: All tasks run on serverless compute for cost efficiency, with no clusters to manage
- **Task dependencies**: Sequential execution with automatic dependency management
- **Parameterized**: Catalog, schema, volumes, and table names configurable via variables
- **Error handling**: Gracefully handles parsing failures
- **Visual debugging**: Interactive notebook for inspecting results

## Prerequisites

- Databricks workspace with Unity Catalog
- Databricks CLI v0.218.0+
- Unity Catalog volumes for:
- Source documents (PDFs/images)
- Parsed output images
- Streaming checkpoints
- Access to the `ai_parse_document` and `ai_query` AI functions in your workspace

## Quick Start

1. **Install and authenticate**
   ```bash
   databricks auth login --host https://your-workspace.cloud.databricks.com
   ```

2. **Configure** `databricks.yml` with your workspace settings

3. **Validate** the bundle configuration
   ```bash
   databricks bundle validate
   ```

4. **Deploy**
   ```bash
   databricks bundle deploy
   ```

5. **Upload documents** to your source volume

6. **Run workflow** from the Databricks UI (Workflows)

## Configuration

Edit `databricks.yml`:

```yaml
variables:
  catalog: main                                              # Your catalog
  schema: default                                            # Your schema
  source_volume_path: /Volumes/main/default/source_documents # Source PDFs
  output_volume_path: /Volumes/main/default/parsed_output    # Parsed images
  checkpoint_base_path: /tmp/checkpoints/ai_parse_workflow   # Checkpoints
  raw_table_name: parsed_documents_raw                       # Table names
  text_table_name: parsed_documents_text
  structured_table_name: parsed_documents_structured
```
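Inside each notebook task, these values arrive as base parameters. A hedged sketch of how a task might read them (`dbutils.widgets.get` is the standard way to read notebook parameters on Databricks; the fallback defaults here are an assumption that lets the same snippet run locally):

```python
# Sketch of reading the bundle parameters above inside a notebook task.
def get_param(name, default):
    try:
        return dbutils.widgets.get(name)  # available inside Databricks notebooks
    except NameError:                     # dbutils is undefined outside Databricks
        return default

catalog = get_param("catalog", "main")
schema = get_param("schema", "default")
raw_table = f"{catalog}.{schema}.{get_param('table_name', 'parsed_documents_raw')}"
print(raw_table)  # main.default.parsed_documents_raw (when run outside Databricks)
```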

## Workflow Tasks

### Task 1: Document Parsing
**File**: `src/transformations/01_parse_documents.py`

Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images:
- Reads files from volume using Structured Streaming
- Stores variant output with bounding boxes
- Incremental: checkpointed streaming prevents reprocessing
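To make the stored variant concrete, here is an illustrative shape of one parsed row, assuming a v2.0-style `ai_parse_document` result. The field names (`document`, `elements`, `bbox`, `error_status`) are approximations based on the documentation, not a schema contract — check your actual parser output.

```python
# Illustrative (assumed) shape of one ai_parse_document result.
sample = {
    "document": {
        "elements": [
            {"type": "section_header", "content": "Q3 Report", "bbox": [0, 0, 612, 40]},
            {"type": "text", "content": "Revenue grew 12%...", "bbox": [0, 60, 612, 300]},
        ],
    },
    "error_status": None,  # assumed: non-null when parsing failed
}

def parse_succeeded(row):
    """True when parsing produced at least one element and no error."""
    return row.get("error_status") is None and bool(row["document"]["elements"])

print(parse_succeeded(sample))  # True
```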

### Task 2: Text Extraction
**File**: `src/transformations/02_extract_text.py`

Extracts clean concatenated text using `transform()`:
- Reads from previous task's table via streaming
- Handles both parser v1.0 and v2.0 formats
- Uses `transform()` for efficient text extraction
- Includes error handling for failed parses
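The core of the extraction logic can be sketched in plain Python: concatenate element contents while handling both layouts. The exact field names for the v1.0 vs v2.0 formats are assumptions here; adjust them to match your parser output (the real task does this with Spark's `transform()`).

```python
# Hedged sketch of the text-extraction step across two assumed layouts.
def extract_text(doc):
    if "elements" in doc:                       # v2.0-style: flat element list
        parts = [e.get("content", "") for e in doc["elements"]]
    else:                                       # v1.0-style: elements nested per page
        parts = [
            e.get("content", "")
            for page in doc.get("pages", [])
            for e in page.get("elements", [])
        ]
    return "\n".join(p for p in parts if p)

v2 = {"elements": [{"content": "Title"}, {"content": "Body text"}]}
v1 = {"pages": [{"elements": [{"content": "Title"}]},
                {"elements": [{"content": "Body text"}]}]}
assert extract_text(v1) == extract_text(v2)  # both layouts yield the same text
```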

### Task 3: AI Query Extraction
**File**: `src/transformations/03_extract_structured_data.py`

Applies LLM to extract structured insights:
- Reads from text table via streaming
- Uses `ai_query` with Claude Sonnet 4
- Customizable prompt for domain-specific extraction
- Outputs structured JSON
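The surrounding logic can be sketched as: build a prompt that asks the model for JSON, then parse the response defensively, since LLM output is not guaranteed to be valid JSON. The prompt text and field names below are placeholders; in the workflow the actual call happens in SQL via `ai_query(...)`.

```python
import json

def build_prompt(document_text):
    # Placeholder prompt; customize the requested keys for your domain.
    return (
        'Extract the following fields from the document as JSON with keys '
        '"title", "summary", "entities":\n\n' + document_text
    )

def parse_response(raw):
    """Parse the model's reply, falling back gracefully on malformed JSON."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"error": "model returned malformed JSON", "raw": raw}

mock_response = '{"title": "Q3 Report", "summary": "Revenue grew 12%.", "entities": []}'
result = parse_response(mock_response)
print(result["title"])  # Q3 Report
```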

## Visual Debugger

The included notebook visualizes parsing results with interactive bounding boxes.

**Open**: `src/explorations/ai_parse_document -- debug output.py`

**Configure widgets**:
- `input_file`: `/Volumes/main/default/source_docs/sample.pdf`
- `image_output_path`: `/Volumes/main/default/parsed_out/`
- `page_selection`: `all` (or `1-3`, `1,5,10`)

**Features**:
- Color-coded bounding boxes by element type
- Hover tooltips showing extracted content
- Automatic image scaling
- Page selection support

## Project Structure

```
.
├── databricks.yml                 # Bundle configuration
├── resources/
│   └── ai_parse_document_workflow.job.yml
├── src/
│   ├── transformations/
│   │   ├── 01_parse_documents.py
│   │   ├── 02_extract_text.py
│   │   └── 03_extract_structured_data.py
│   └── explorations/
│       └── ai_parse_document -- debug output.py
└── README.md
```

## Resources

- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/)
- [Databricks Workflows](https://docs.databricks.com/workflows/)
- [Structured Streaming](https://docs.databricks.com/structured-streaming/)
- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
- [`ai_query` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query)
52 changes: 52 additions & 0 deletions knowledge_base/workflow_with_ai_parse_document/databricks.yml
@@ -0,0 +1,52 @@
# This is a Databricks asset bundle definition for ai_parse_document_workflow.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: ai_parse_document_workflow

variables:
  catalog:
    description: The catalog name for the workflow
    default: main
  schema:
    description: The schema name for the workflow
    default: default
  source_volume_path:
    description: Source volume path for PDF files
    default: /Volumes/main/default/source_documents
  output_volume_path:
    description: Output volume path for processed images
    default: /Volumes/main/default/parsed_output
  checkpoint_base_path:
    description: Base path for Structured Streaming checkpoints
    default: /tmp/checkpoints/ai_parse_workflow
  raw_table_name:
    description: Table name for raw parsed documents
    default: parsed_documents_raw
  text_table_name:
    description: Table name for extracted text
    default: parsed_documents_text
  structured_table_name:
    description: Table name for structured data
    default: parsed_documents_structured

include:
  - resources/*.yml

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://your-workspace.cloud.databricks.com

  prod:
    mode: production
    workspace:
      host: https://your-workspace.cloud.databricks.com
    permissions:
      - group_name: users
        level: CAN_VIEW
@@ -0,0 +1,55 @@
resources:
  jobs:
    ai_parse_document_workflow:
      name: ai_parse_document_workflow

      environments:
        - environment_key: serverless_env
          spec:
            client: "3"

      tasks:
        - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/01_parse_documents.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              source_volume_path: ${var.source_volume_path}
              output_volume_path: ${var.output_volume_path}
              checkpoint_location: ${var.checkpoint_base_path}/01_parse_documents
              table_name: ${var.raw_table_name}

        - task_key: extract_text
          depends_on:
            - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/02_extract_text.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              checkpoint_location: ${var.checkpoint_base_path}/02_extract_text
              source_table_name: ${var.raw_table_name}
              table_name: ${var.text_table_name}

        - task_key: extract_structured_data
          depends_on:
            - task_key: extract_text
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/03_extract_structured_data.py
            base_parameters:
              catalog: ${var.catalog}
              schema: ${var.schema}
              checkpoint_location: ${var.checkpoint_base_path}/03_extract_structured_data
              source_table_name: ${var.text_table_name}
              table_name: ${var.structured_table_name}

      max_concurrent_runs: 1

      # Optional: Add a schedule
      # schedule:
      #   quartz_cron_expression: "0 0 * * * ?"
      #   timezone_id: "UTC"