
PgFlow

Workflows, background jobs and cron in Elixir and Postgres powered by PGMQ

A native Elixir implementation of pgflow — a PostgreSQL-based workflow engine built on pgmq. Define multi-step DAG workflows ("flows"), one-off background jobs ("jobs"), and scheduled cron jobs and flows — all backed by the same PostgreSQL queuing infrastructure with retries, visibility timeouts, and delivery guarantees. Built on OTP with supervised workers, adaptive backoff polling, and an optional LISTEN/NOTIFY strategy for low-latency task dispatch. Compatible with the TypeScript/Deno pgflow project, sharing the same database schema and SQL functions.

Why PgFlow?

  • No extra infrastructure — Runs entirely in PostgreSQL using pgmq. No Redis, no external queue service.
  • Queryable state — All workflow state lives in SQL tables. Debug with SELECT * FROM pgflow.runs.
  • Automatic retries — Failed steps retry with exponential backoff. Only failed steps retry, not the whole workflow.
  • Parallel processing — Steps run concurrently when dependencies allow. Fan-out with map for array processing.
  • Cross-language — Same flows can be processed by Elixir or Deno (Supabase) workers side-by-side.
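Queryable state in practice means debugging is plain SQL. A sketch, assuming the pgflow.runs table exposes status and started_at columns (check your installed schema for the exact column names):

```sql
-- Recent failed runs, newest first (column names are assumptions)
SELECT run_id, flow_slug, status, started_at
FROM pgflow.runs
WHERE status = 'failed'
ORDER BY started_at DESC
LIMIT 20;
```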

Prerequisites

  • Elixir 1.17+
  • PostgreSQL 17+ with:
    • pgmq — pgflow's queue backbone. The mix pgflow.gen.pgmq_migration task installs it via SQL (works on Neon, self-hosted, and any plain Postgres). Skip this step if your environment already ships pgmq (e.g. Supabase projects or managed services where pgmq is pre-enabled).

    • pg_cron (only for cron-scheduled flows/jobs) — requires two server-level settings before CREATE EXTENSION pg_cron will succeed:

      • shared_preload_libraries = 'pg_cron' (requires a Postgres restart)
      • cron.database_name = '<your_app_db>' — pg_cron's metadata lives in exactly one DB; defaults to postgres

      Supported on Neon, AWS RDS (PG 12.5+), Aurora (PG 12.6+), and Supabase — each with its own setup path (parameter groups, API calls, etc. — check your host's docs). If your host doesn't support pg_cron, generate extensions with mix pgflow.gen.postgres_extensions_migration --no-cron — cron-scheduled flows/jobs become unavailable but the rest of pgflow works.

    • Standard extensions: citext, pg_trgm, pgcrypto.

  • An Ecto repository

The provided Docker setup (Postgres 17) includes all extensions pre-configured.
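For self-hosted Postgres, the two pg_cron server settings described above go in postgresql.conf (managed hosts use parameter groups or API calls instead). A minimal sketch, assuming your application database is named my_app_db:

```
# postgresql.conf — changing shared_preload_libraries requires a server restart
shared_preload_libraries = 'pg_cron'
cron.database_name = 'my_app_db'   # pg_cron metadata lives in exactly this one DB
```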

Installation

Add pgflow to your dependencies in mix.exs:

def deps do
  [
    {:pgflow, "~> 0.1.0"}
  ]
end

Then fetch dependencies:

mix deps.get

Quick Start

1. Database Setup

For development, use the provided Docker Compose which builds a pre-configured Postgres image:

docker compose up -d

This builds a Postgres 17 image with pgmq, pg_cron, and the pgflow schema pre-loaded. The database is available at localhost:54322 (user: postgres, password: postgres, database: pgflow_test).

Resetting the database: The pgflow schema is loaded by the Docker init script on first container creation only. If you drop the database (e.g. mix ecto.reset), you must re-apply it:

# Destroy the Docker volume and start fresh
docker compose down -v && docker compose up -d

For your application, generate consumer migrations using the setup tasks. Each writes one wrapper migration into your app's priv/repo/migrations/:

# 1. Install required Postgres extensions (citext, pg_trgm, pgcrypto, pg_cron).
#    Pass `--no-cron` if pg_cron isn't available on your host.
mix pgflow.gen.postgres_extensions_migration

# 2. Install pgmq (unless your Postgres already provides it as an extension —
#    e.g. Supabase. On most hosts pgmq isn't native; use this task).
mix pgflow.gen.pgmq_migration

# 3. Install the pgflow schema + Elixir helper functions. Add `--dashboard`
#    to also install the LiveView dashboard schema. `--no-helpers` skips
#    the Elixir-binding SQL helpers (only useful if you're using a
#    different client).
mix pgflow.setup

# 4. Apply everything.
mix ecto.migrate

The generated setup_pgflow.exs migration just calls PgFlow.Migration.up/0 and PgFlow.HelpersMigration.up/0 — new pgflow releases bump the vendored SQL, not your migration list.
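The generated wrapper is therefore tiny. A sketch of what setup_pgflow.exs looks like (the module name and rollback handling are assumptions — inspect the generated file):

```elixir
defmodule MyApp.Repo.Migrations.SetupPgflow do
  use Ecto.Migration

  def up do
    # Both calls run vendored SQL shipped with the pgflow package,
    # so upgrading pgflow upgrades the schema without new migrations.
    PgFlow.Migration.up()
    PgFlow.HelpersMigration.up()
  end

  # The generated file also includes a rollback path; its exact shape
  # is an assumption here — check the file mix pgflow.setup writes.
end
```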

2. Define a Flow

defmodule MyApp.Flows.ProcessOrder do
  use PgFlow.Flow

  @flow queue: :process_order, max_attempts: 3, base_delay: 5, timeout: 60

  step :validate do
    fn input, _ctx ->
      %{order_id: input["order_id"], valid: true}
    end
  end

  step :charge_payment, depends_on: [:validate] do
    fn deps, _ctx ->
      %{charged: true, amount: deps["validate"]["amount"]}
    end
  end

  step :send_confirmation, depends_on: [:charge_payment] do
    fn deps, _ctx ->
      %{sent: true}
    end
  end
end

See PgFlow.Flow moduledocs for the full DSL reference (step options, map macro, handler input, error handling).

3. Compile the Flow to Database

Before workers can process a flow, it must be "compiled" into the database. This creates the flow record, PGMQ queue, and step definitions:

mix pgflow.gen.flow_migration MyApp.Flows.ProcessOrder
mix ecto.migrate

Note: If you start a worker for a flow that hasn't been compiled, you'll get a helpful error message with the exact command to run.

4. Configure and Start

# config/config.exs
config :my_app, MyApp.PgFlow,
  repo: MyApp.Repo,
  flows: [MyApp.Flows.ProcessOrder],
  signal_strategy: :notify              # use LISTEN/NOTIFY for low-latency (requires pgmq 1.8+)

All options have sensible defaults — only repo is required. See PgFlow.Config for the full list (concurrency, batch size, poll intervals, recovery, etc.).

# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.Repo,
    {PgFlow, Application.fetch_env!(:my_app, MyApp.PgFlow)}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

5. Trigger a Flow

# Async — returns immediately with run_id
{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

# Sync — waits for completion (with optional timeout)
{:ok, run} = PgFlow.start_flow_sync(:process_order, %{"order_id" => 123}, timeout: 30_000)

# Check run status
{:ok, run} = PgFlow.get_run(run_id)
{:ok, run} = PgFlow.get_run_with_states(run_id)
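The async and status calls compose naturally, e.g. in a status endpoint. A sketch; the field names on the returned run struct are assumptions — see the %Run{} struct reference:

```elixir
{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

# ...later, when a client asks about the run:
{:ok, run} = PgFlow.get_run(run_id)
# run.status  # field name is an assumption; check the Run struct docs
```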

Background Jobs

PgFlow supports simple background jobs — one-off tasks like sending emails or processing webhooks. Jobs are single-step flows under the hood, reusing the same queuing infrastructure, retries, and dashboard visibility.

defmodule MyApp.Jobs.SendEmail do
  use PgFlow.Job

  @job queue: :send_email, max_attempts: 5, base_delay: 10, timeout: 120

  perform :deliver do
    fn input, _ctx ->
      Mailer.send(input["to"], input["subject"], input["body"])
      %{sent: true}
    end
  end
end

The step name in perform :deliver do is optional — when omitted, it defaults to the @job queue/slug value.

See PgFlow.Job moduledocs for the full options reference.

# Compile to database
mix pgflow.gen.job_migration MyApp.Jobs.SendEmail
mix ecto.migrate
# Enqueue a job
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com", "subject" => "Hello"})

Cron Scheduling

Both flows and jobs support cron scheduling via pg_cron. Add a cron option to run on a schedule:

@flow queue: :daily_report, cron: [schedule: "0 9 * * *", input: %{"type" => "daily"}]
@job queue: :cleanup, cron: [schedule: "@hourly"]

The cron schedule SQL is generated automatically when you run mix pgflow.gen.flow_migration or mix pgflow.gen.job_migration and migrate.
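After migrating, you can confirm the schedule landed by querying pg_cron's own catalog table:

```sql
-- pg_cron stores registered schedules in cron.job
SELECT jobname, schedule, command FROM cron.job;
```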

Mix Tasks

One-time setup

Run these once when adding pgflow to a project. Migrations are applied via mix ecto.migrate.

| Task | Description |
| --- | --- |
| mix pgflow.gen.postgres_extensions_migration | Migration: citext, pg_trgm, pgcrypto, pg_cron |
| mix pgflow.gen.pgmq_migration | Migration: pgmq via SQL-only install |
| mix pgflow.setup | Wrapper migration: core schema + helpers |
| mix pgflow.gen.helpers_migration | Migration: Elixir helpers standalone (setup bundles these) |
| mix pgflow.stamp | Adopt an existing pgflow schema into EctoEvolver tracking |

Per-flow / per-job

Run once per flow or job module. Each generates a migration that compiles the flow/job definition into the database.

| Task | Description |
| --- | --- |
| mix pgflow.gen.flow_migration MyApp.Flow | Generate migration to compile flow to database |
| mix pgflow.gen.job_migration MyApp.Job | Generate migration to compile job to database |

Verification & test DB

| Task | Description |
| --- | --- |
| mix pgflow.check_schema | Verify pgflow database schema compatibility |
| mix pgflow.test.setup | Set up test database |
| mix pgflow.test.reset | Reset test database (teardown + setup) |
| mix pgflow.test.teardown | Tear down test database |

Dashboard

PgFlow includes an optional Phoenix LiveView dashboard for monitoring workflows, jobs, workers, and cron schedules in real time.

See DASHBOARD.md for installation instructions.

LiveView Integration

PgFlow.LiveClient provides a LiveView-native client for tracking flow and job runs in real time. It manages PubSub subscriptions and applies incremental updates to %Run{} structs in your socket assigns:

defmodule MyAppWeb.OrderLive do
  use MyAppWeb, :live_view
  alias PgFlow.LiveClient

  def mount(_params, _session, socket) do
    {:ok, LiveClient.init(socket, pubsub: MyApp.PubSub)}
  end

  def handle_event("process", params, socket) do
    {:ok, socket} = LiveClient.start_flow(socket, :process_order, params)
    {:noreply, socket}
  end

  def handle_info({:pgflow, _, _} = msg, socket) do
    {:noreply, LiveClient.handle_info(msg, socket)}
  end
end

Requires the :pubsub option in your PgFlow config. See LIVE_CLIENT.md for the full API, multiple run tracking, and struct reference.

Demo App

See demo/README.md for a Phoenix LiveView application demonstrating PgFlow with real-time flow visualization.

Telemetry

PgFlow emits :telemetry events across worker, poll, task, and run lifecycles for monitoring and metrics collection. See PgFlow.Telemetry moduledocs for event names, measurements, and metadata.
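Wiring these events into logging or metrics is a standard :telemetry.attach_many/4 call. A sketch; the event names below are illustrative assumptions — take the real ones from the PgFlow.Telemetry moduledocs:

```elixir
:telemetry.attach_many(
  "pgflow-metrics",
  [
    # Event names are assumptions; see PgFlow.Telemetry for the real list.
    [:pgflow, :task, :stop],
    [:pgflow, :run, :stop]
  ],
  fn event, measurements, metadata, _config ->
    # Forward to your metrics backend; IO.inspect keeps the sketch self-contained.
    IO.inspect({event, measurements, metadata}, label: "pgflow telemetry")
  end,
  nil
)
```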

Testing

# Start the database (same as Quick Start step 1)
docker compose up -d

# Run tests
mix test

Compatibility with PgFlow TypeScript/Deno

This Elixir implementation is compatible with the TypeScript/Deno version — same PostgreSQL schema, same SQL functions, same PGMQ message format. Workers can run side-by-side. See ELIXIR_VS_SUPABASE.md for a detailed comparison and schema divergences.

License

MIT