Fujio-Turner/PouchPipes

PouchPipes 🥔📦 v2.2.2

Portable Over Unreliable Changes Handler that pipes CouchDB / Sync Gateway / Capella _changes feeds into clean, reliable downstream pipelines.

A production-ready, async Python 3 processor for the Couchbase _changes feed. It connects to Sync Gateway, Capella App Services, Couchbase Edge Server, or Apache CouchDB, consumes document changes via longpoll or continuous streaming, and forwards them to a downstream consumer: stdout, an HTTP endpoint, an RDBMS (PostgreSQL, MySQL, MS SQL, Oracle), or cloud blob storage (AWS S3, MinIO, S3-compatible).

Built for real-world workloads: multi-job pipelines, checkpoint management so restarts never re-process changes, schema mapping with 58 transform functions, throttled feed consumption for large datasets, configurable retry with exponential backoff, and full async concurrency control.

Changes Worker Architecture

When attachment processing is enabled, the pipeline includes an attachment stage:

Attachments Architecture


What's New in v2.0

v2.0 is a major architecture redesign that replaces the monolithic config.json with a job-centric, composable document model stored in Couchbase Lite collections.

  • Multi-job pipelines: Run multiple independent _changes feed pipelines concurrently, each with its own source, output, schema mapping, and checkpoint.
  • Reusable inputs & outputs: Define a source once, wire it to multiple outputs via jobs. No more duplicating config.
  • Job lifecycle control: Start, stop, restart, and monitor individual jobs via REST API or the dashboard.
  • PipelineManager: Thread-per-job orchestrator with crash detection, exponential-backoff restart, and graceful shutdown.
  • v1.x auto-migration: Existing config.json is automatically migrated to the new model on first startup.

📄 Full architecture details: docs/DESIGN_2_0.md
📄 Release notes: RELEASE_NOTES.md


How It Works

┌──────────────────────┐         ┌──────────────────┐         ┌─────────────────────┐
│  Sync Gateway /      │         │                  │         │  HTTP Endpoint      │
│  App Services /      │ ──GET── │  changes_worker  │ ──PUT── │  (any REST API)     │
│  Edge Server /       │ _changes│                  │  POST   ├─────────────────────┤
│  CouchDB             │ ◄─JSON─ │  • Schema Mapping│  DELETE │  RDBMS              │
│                      │         │  • Serialize     │ ──────► │  (Postgres/MySQL/   │
│  /{db}/_changes      │         │  • Checkpoint    │         │   MSSQL/Oracle)     │
│                      │         │  • Dead Letter Q │         ├─────────────────────┤
│                      │         │  • Attachments   │         │  Cloud Storage      │
│                      │         │                  │         │  (AWS S3/MinIO)     │
│                      │         │                  │         ├─────────────────────┤
│                      │         │                  │         │  stdout             │
└──────────────────────┘         └──────────────────┘         └─────────────────────┘
  1. Consume: Longpoll, continuous, or WebSocket _changes feed with auto-reconnect
  2. Filter: Skip deletes, removes, or limit to specific channels
  3. Fetch: Bulk or individual doc fetching when include_docs=false
  4. Map: Schema mappings transform JSON documents into SQL rows, remapped JSON, etc.
  5. Attachments (optional): Detect, fetch, and upload document attachments to cloud storage, HTTP, or filesystem
  6. Forward: Serialize (JSON, XML, msgpack, etc.) and send to stdout, HTTP, RDBMS, or S3
  7. Checkpoint: Save last_seq as a _local/ doc so restarts resume exactly where they left off
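
The Filter and Checkpoint steps above can be sketched offline against an already-parsed _changes response. This is a minimal illustration, not the project's code: `filter_changes` and its parameters are hypothetical names, and the `results`, `last_seq`, `deleted`, and `removed` fields are assumed to follow the usual CouchDB / Sync Gateway response shape.

```python
from typing import Any


def filter_changes(
    response: dict[str, Any],
    skip_deleted: bool = True,
    skip_removed: bool = True,
) -> tuple[list[dict[str, Any]], Any]:
    """Return (changes to forward, sequence value to checkpoint)."""
    kept = []
    for change in response.get("results", []):
        if skip_deleted and change.get("deleted"):
            continue  # tombstone: the document was deleted
        if skip_removed and change.get("removed"):
            continue  # document was removed from the listed channels
        kept.append(change)
    # Checkpoint the feed position even if every change was filtered out,
    # so a restart does not replay the same batch.
    return kept, response.get("last_seq")


# Hand-written sample batch (document ids and revs are made up).
batch = {
    "results": [
        {"seq": 1, "id": "price::1", "changes": [{"rev": "1-abc"}]},
        {"seq": 2, "id": "price::2", "deleted": True, "changes": [{"rev": "2-def"}]},
        {"seq": 3, "id": "price::3", "removed": ["public"], "changes": [{"rev": "3-ghi"}]},
    ],
    "last_seq": 3,
}
to_forward, checkpoint = filter_changes(batch)
print([c["id"] for c in to_forward], checkpoint)
```

Only the first change survives the filter, while the checkpoint still advances to the batch's `last_seq`.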

Quick Start

Prerequisites

  • Python 3.11+
  • A running Sync Gateway, Capella App Services, Edge Server, or CouchDB instance

Install & Run

pip install -r requirements.txt

# Test connectivity first
python main.py --config config.json --test

# Run the worker
python main.py --config config.json

Run with Docker

docker build -t changes-worker .

docker run --rm \
  -v "$(pwd)/config.json:/app/config.json" \
  changes-worker

Run with Docker Compose

# Headless: worker + Prometheus metrics only (port 9090)
docker compose up --build

# With Admin UI: worker + metrics + web dashboard (ports 9090 + 8080)
docker compose --profile ui up --build

Set "admin_ui": { "enabled": false } in config.json for headless deployments where you only need /_metrics on port 9090.

CLI

| Flag | Description |
| --- | --- |
| --config <path> | Path to config.json (default: config.json) |
| --test | Test connectivity to source + output, then exit (exit code 0/1) |
| --version | Print version and exit |

Key Features

| Feature | Description |
| --- | --- |
| Multi-job pipelines | Run multiple independent _changes pipelines concurrently, each with its own source, output, mapping, and checkpoint |
| Job lifecycle control | Start, stop, restart, and monitor jobs via REST API or dashboard |
| Multi-source | Sync Gateway, App Services, Edge Server, CouchDB, with automatic compatibility handling |
| Multiple outputs | stdout, HTTP endpoint, RDBMS (Postgres/MySQL/MSSQL/Oracle), AWS S3 (MinIO/S3-compatible) |
| Feed modes | Longpoll, continuous, WebSocket, SSE/EventSource |
| Schema mapping | Transform JSON docs into SQL table rows with 58 built-in transform functions |
| Checkpoint | CBL-style _local/ doc checkpoints, so restarts never re-process |
| Dead letter queue | Failed docs saved for later retry (CBL or JSONL file) |
| Attachment processing | Detect, fetch, and upload document attachments to S3, HTTP, or filesystem with optional post-processing |
| Retry + backoff | Configurable exponential backoff on both source and output sides |
| Prometheus metrics | Built-in /_metrics endpoint with pipeline, system, and runtime metrics |
| Admin UI | Web dashboard with real-time monitoring, job management, schema editor, and setup wizard |
| Startup validation | Every config setting validated before launch, with clear error messages |
| Dry run | Process the feed and log what would be sent without sending |
| Embedded storage | Couchbase Lite CE for config, checkpoints, mappings, and DLQ in Docker |
| Structured logging | SG-inspired log keys, per-key levels, file rotation, and sensitive data redaction |
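
To make the "Retry + backoff" row concrete, here is a small sketch of a capped exponential backoff schedule. The function name, the 1-second base, the 60-second cap, and the jitter parameter are all illustrative assumptions; the project's actual settings are described in docs/CONFIGURATION.md.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, jitter: float = 0.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    # Double the delay on each attempt, but never exceed the cap.
    delay = min(cap, base * (2 ** attempt))
    # Optional jitter (a fraction of the delay) spreads out retries
    # when many workers fail at the same moment.
    return delay + random.uniform(0, jitter * delay)


delays = [backoff_delay(n) for n in range(8)]
print(delays)
```

With jitter left at zero the schedule is deterministic: it doubles from 1 second up to 32 seconds and then stays at the 60-second cap.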

📄 Full feature details: docs/FEATURES.md


Source Compatibility

| Capability | Sync Gateway | App Services | Edge Server | CouchDB |
| --- | --- | --- | --- | --- |
| Feed types | longpoll, continuous, websocket | longpoll, continuous, websocket | longpoll, continuous, sse | longpoll, continuous, eventsource |
| _bulk_get | ✅ | ✅ | ❌ (individual GET) | ✅ |
| Bearer auth | ✅ | ✅ | ❌ | ✅ |
| Session cookie auth | ✅ | ✅ | ❌ | ❌ |
| Channels filter | ✅ | ✅ | ✅ | ❌ |
| active_only | ✅ | ✅ | ✅ | ❌ |
| Scoped keyspace | ✅ | ✅ | ✅ | ❌ |
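
One way to read the matrix is as a lookup that decides how documents get fetched for a given source. The sketch below encodes three of the rows; the dictionary keys (`sync_gateway`, `edge_server`, and so on), the `SourceCaps` class, and `plan_fetch` are hypothetical names chosen for illustration, not the worker's real identifiers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceCaps:
    feed_types: tuple[str, ...]
    bulk_get: bool
    channels_filter: bool


# Values transcribed from the compatibility table above.
CAPS = {
    "sync_gateway": SourceCaps(("longpoll", "continuous", "websocket"), True, True),
    "app_services": SourceCaps(("longpoll", "continuous", "websocket"), True, True),
    "edge_server": SourceCaps(("longpoll", "continuous", "sse"), False, True),
    "couchdb": SourceCaps(("longpoll", "continuous", "eventsource"), True, False),
}


def plan_fetch(source_type: str, feed: str) -> str:
    """Pick a document fetch strategy, rejecting unsupported feed types."""
    caps = CAPS[source_type]
    if feed not in caps.feed_types:
        raise ValueError(f"{source_type} does not support feed type {feed!r}")
    # Sources without _bulk_get fall back to one GET per document.
    return "bulk_get" if caps.bulk_get else "individual_get"


print(plan_fetch("edge_server", "sse"))
print(plan_fetch("couchdb", "longpoll"))
```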

📄 Full compatibility matrix & auto-behaviors: docs/SOURCE_TYPES.md


v2.0 Job Architecture

In v2.0, the monolithic config.json is replaced by a composable document model:

┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  Input A      │     │   Job 1       │     │  Output X     │
│  (SG prices)  │────►│  A → X        │────►│  (PostgreSQL) │
└───────────────┘     │  + mapping    │     └───────────────┘
                      └───────────────┘
┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  Input A      │     │   Job 2       │     │  Output Y     │
│  (SG prices)  │────►│  A → Y        │────►│  (HTTP API)   │
└───────────────┘     │  + mapping    │     └───────────────┘
                      └───────────────┘
┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  Input B      │     │   Job 3       │     │  Output X     │
│  (SG orders)  │────►│  B → X        │────►│  (PostgreSQL) │
└───────────────┘     │  + mapping    │     └───────────────┘
                      └───────────────┘
  • Inputs: Reusable _changes feed source definitions (host, auth, feed settings)
  • Outputs: Reusable output configs for each type (RDBMS, HTTP, cloud/S3, stdout)
  • Jobs: Connect one input → one output with a schema mapping, checkpoint, and lifecycle

Each job runs in its own thread with an isolated asyncio event loop, HTTP session, and checkpoint.
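
The thread-per-job model can be illustrated with the standard library alone. This is a sketch of the general pattern, not PipelineManager itself: `run_job` and `job_thread` are hypothetical stand-ins, and the job names are borrowed from the examples in this README.

```python
import asyncio
import threading


async def run_job(name: str, results: dict[str, str]) -> None:
    """Stand-in for a job's pipeline; records which thread it ran on."""
    await asyncio.sleep(0)  # yield once, as a real coroutine pipeline would
    results[name] = threading.current_thread().name


def job_thread(name: str, results: dict[str, str]) -> None:
    # asyncio.run() creates a fresh event loop for this thread and closes
    # it on exit, so jobs never share a loop.
    asyncio.run(run_job(name, results))


job_names = ("prices->PG", "orders->PG", "prices->HTTP")
results: dict[str, str] = {}
threads = [
    threading.Thread(target=job_thread, args=(name, results), name=name)
    for name in job_names
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```

Because each thread owns its loop, a crash or a slow output in one job does not stall the coroutines of another.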

📄 Document model & collections: docs/DESIGN_2_0.md
📄 Job lifecycle & document schema: docs/JOBS.md


REST API

Job Control (v2.0)

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/jobs | List all jobs with state |
| GET | /api/jobs/{id} | Get a single job |
| POST | /api/jobs | Create a new job |
| PUT | /api/jobs/{id} | Update a job |
| DELETE | /api/jobs/{id} | Delete a job |
| POST | /api/jobs/{id}/start | Start a job |
| POST | /api/jobs/{id}/stop | Graceful stop |
| POST | /api/jobs/{id}/restart | Stop + start |
| GET | /api/jobs/{id}/state | Job status, uptime, error count |
| POST | /api/_restart | Restart all jobs |
| POST | /api/_offline | Stop all jobs (keep config) |
| POST | /api/_online | Resume all jobs after offline |
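
As a hedged example of calling the lifecycle endpoints, the sketch below builds (but does not send) a request with the standard library. It assumes the API is served on the same port as the Admin UI (http://localhost:8080), which this README does not state explicitly; the job id `prices-to-pg` and the helper name `job_action` are made up.

```python
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: API shares the Admin UI port


def job_action(job_id: str, action: str) -> urllib.request.Request:
    """Build a POST request for /api/jobs/{id}/start, /stop, or /restart."""
    if action not in {"start", "stop", "restart"}:
        raise ValueError(f"unsupported action: {action!r}")
    # Percent-encode the id so unusual characters cannot alter the path.
    path = f"/api/jobs/{urllib.parse.quote(job_id, safe='')}/{action}"
    return urllib.request.Request(BASE_URL + path, method="POST")


req = job_action("prices-to-pg", "restart")
print(req.get_method(), req.full_url)
# To actually send it against a running worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```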

Inputs & Outputs CRUD (v2.0)

Method Path Description
| Method | Path | Description |
| --- | --- | --- |
| GET | /api/inputs_changes | Get all input definitions |
| POST | /api/inputs_changes | Save inputs document |
| PUT | /api/inputs_changes/{id} | Update one input entry |
| DELETE | /api/inputs_changes/{id} | Delete one input entry |
| GET | /api/outputs_{type} | Get outputs (type = rdbms, http, cloud, stdout) |
| POST | /api/outputs_{type} | Save outputs document |
| PUT | /api/outputs_{type}/{id} | Update one output entry |
| DELETE | /api/outputs_{type}/{id} | Delete one output entry |

Infrastructure

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/config | Get infrastructure config |
| POST | /api/config | Save infrastructure config |
| GET | /_metrics | Prometheus metrics endpoint |
| GET | /_status | Health check |

Admin UI

A web-based admin dashboard at http://localhost:8080:

| Page | Path | Description |
| --- | --- | --- |
| Dashboard | / | Multi-job status table with per-job start/stop/restart controls, live charts, architecture diagram |
| Settings | /settings | Infrastructure config (logging, metrics, admin UI, CBL, shutdown) |
| Schema Mappings | /schema | Visual drag-and-drop field mapping with transforms, AI assist, and coverage stats |
| Setup Wizard | /wizard | Guided setup: connect source → configure output → map fields → create job |
| Logs | /logs | Real-time log viewer with job filter, log key filter, and level filter |
| Dead Letter Queue | /dlq | Browse, retry, and purge failed documents with job and reason filtering |
| Glossary | /glossary | Reference for all 58 built-in transform functions |
| Help | /help | Documentation and getting started guide |

📄 Full documentation: docs/ADMIN_UI.md


One Process Per Collection

Each job monitors exactly one scope/collection. To process multiple collections, create multiple jobs:

                ┌──────────────────────────────────────┐
                │          changes_worker (v2.0)       │
                │                                      │
config.json ──► │  PipelineManager                     │
                │    ├── Thread 1: Job "prices→PG"     │
                │    │     └── us.prices _changes      │
                │    ├── Thread 2: Job "orders→PG"     │
                │    │     └── us.orders _changes      │
                │    └── Thread 3: Job "prices→HTTP"   │
                │          └── us.prices _changes      │
                │                                      │
                │  Shared: metrics :9090, UI :8080,    │
                │          CBL store, maintenance      │
                └──────────────────────────────────────┘

Project Structure

change_stream_db/
├── main.py                   # Main worker entry point + poll_changes logic
├── pipeline.py               # Per-job thread wrapper (v2.0)
├── pipeline_manager.py       # Multi-job thread orchestrator (v2.0)
├── pipeline_logging.py       # Structured logging (log keys, redaction, rotation)
├── cbl_store.py              # Couchbase Lite CE storage layer (v2.0 collections)
├── config.json               # Configuration (v1.x format, auto-migrated to v2.0)
├── requirements.txt          # Python dependencies
├── Dockerfile                # Container image (includes CBL-C 3.2.1)
├── docker-compose.yml        # Docker Compose setup
├── rest/
│   ├── api_v2.py             # v2.0 REST API: inputs, outputs, jobs CRUD
│   ├── api_v2_jobs_control.py# Job lifecycle endpoints (start/stop/restart)
│   ├── changes_http.py       # _changes feed HTTP client logic
│   ├── output_http.py        # HTTP output, dead letter queue, serialization
│   ├── attachments.py        # Attachment processor orchestrator
│   ├── attachment_config.py  # Attachment configuration parser
│   ├── attachment_stream.py  # Streaming attachment download
│   ├── attachment_upload.py  # Upload to S3/HTTP/filesystem
│   ├── attachment_multipart.py # Multipart attachment handling
│   └── attachment_postprocess.py # Post-upload actions (update doc, delete, purge)
├── cloud/
│   ├── cloud_base.py         # Abstract base forwarder + CloudMetrics
│   └── cloud_s3.py           # AWS S3 / MinIO / S3-compatible output
├── db/
│   ├── db_base.py            # Base DB forwarder + schema mapping
│   ├── db_postgres.py        # PostgreSQL output
│   ├── db_mysql.py           # MySQL output
│   ├── db_mssql.py           # MS SQL Server output
│   └── db_oracle.py          # Oracle output
├── schema/
│   ├── mapper.py             # Schema mapper (JSON → SQL operations)
│   └── validator.py          # Mapping file validator
├── web/
│   ├── server.py             # Web server module
│   ├── templates/            # Admin UI HTML pages (8 pages)
│   └── static/               # CSS, JS, icons, favicon
├── mappings/                 # Schema mapping files (JSON)
├── tests/                    # Unit tests (24 test files, 775+ tests)
├── docs/                     # Documentation (22 docs)
├── img/                      # Architecture diagrams
├── guide/                    # Developer guides (release checklist, style guide)
├── logs/                     # Log output (gitignored)
└── release_works/            # Release planning documents

Documentation

| Document | Description |
| --- | --- |
| docs/DESIGN_2_0.md | v2.0 architecture: job model, CBL collections, PipelineManager, phases |
| docs/JOBS.md | Job document schema, lifecycle, and checkpoint isolation |
| docs/UI_JOBS_MANAGEMENT.md | Multi-job UI design: dashboard, logs, DLQ filtering |
| docs/CONFIGURATION.md | Full config.json reference with all settings |
| docs/FEATURES.md | Detailed feature documentation (feeds, output, metrics, etc.) |
| docs/SOURCE_TYPES.md | Source compatibility matrix and auto-behaviors |
| docs/DESIGN.md | Pipeline architecture, failure modes, checkpoint strategies |
| docs/SCHEMA_MAPPING.md | Schema mapping format, transforms, and examples |
| docs/ADMIN_UI.md | Admin dashboard documentation |
| docs/WIZARD.md | Setup wizard guide |
| docs/ATTACHMENTS.md | Attachment processing: modes, detection, fetch, upload, post-process |
| docs/CLOUD_BLOB_PLAN.md | Cloud blob storage design document |
| docs/LOGGING.md | Structured logging: log keys, redaction, rotation, TRACE level |
| docs/DLQ.md | Dead letter queue: lifecycle, replay, retention |
| docs/CBL_DATABASE.md | Couchbase Lite database schema |
| docs/CBL_STORE.md | CBL store API reference |
| docs/CHANGES_PROCESSING.md | Changes feed processing internals |
| docs/DEBUGGING.md | Debugging and troubleshooting guide |
| docs/RDBMS_IMPLEMENTATION.md | RDBMS output implementation details |
| docs/HA.md | High Availability via CBL replication (v3.0 roadmap) |
| docs/MULTI_PIPELINE_PLAN.md | Multi-pipeline threading design reference |
| guide/RELEASE.md | Release checklist and best practices |

Upgrading from v1.x

v2.0 is backward compatible at startup. On first run, the worker automatically migrates your existing config.json into the new document model:

  1. Creates an input entry from gateway + auth + changes_feed
  2. Creates an output entry from output
  3. Creates a default job connecting them
  4. Preserves your checkpoint

Your original config.json is not modified, so you can roll back to v1.7.0 at any time. See RELEASE_NOTES.md for rollback details.
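
The four migration steps can be pictured roughly as follows. This is only a sketch: the top-level keys `gateway`, `auth`, `changes_feed`, and `output` come from the list above, but every other field name, the generated ids, and the function itself are assumptions rather than the real document schema (see docs/JOBS.md for that).

```python
import copy
from typing import Any


def migrate_v1_config(v1: dict[str, Any], checkpoint: Any = None) -> dict[str, list[dict[str, Any]]]:
    """Split a v1.x-style config into input, output, and job documents."""
    cfg = copy.deepcopy(v1)  # work on a copy so the original stays untouched
    input_doc = {
        "id": "input-1",  # hypothetical id
        "gateway": cfg.get("gateway", {}),
        "auth": cfg.get("auth", {}),
        "changes_feed": cfg.get("changes_feed", {}),
    }
    output_doc = {"id": "output-1", "config": cfg.get("output", {})}
    job_doc = {
        "id": "job-1",
        "input_id": input_doc["id"],
        "output_id": output_doc["id"],
        "checkpoint": checkpoint,  # carried over so the feed resumes in place
    }
    return {"inputs": [input_doc], "outputs": [output_doc], "jobs": [job_doc]}


# Made-up v1.x config; the nested field names are illustrative only.
v1_config = {
    "gateway": {"url": "http://localhost:4984"},
    "auth": {"method": "basic"},
    "changes_feed": {"feed_type": "longpoll"},
    "output": {"mode": "stdout"},
}
migrated = migrate_v1_config(v1_config, checkpoint="42")
print(migrated["jobs"][0])
```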


License

See LICENSE.
