From 5fd551315d42695fa9fca05fda0c38e59073f188 Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Wed, 17 Dec 2025 12:42:34 +0000 Subject: [PATCH 1/2] feat: simplify code to use pgsql-http requests directly, instead of pg_notify --- .github/workflows/http-postgres.yml | 4 +- .github/workflows/release.yml | 2 +- Dockerfile | 2 +- README.md | 169 ++--- compose.webhook.yml | 3 + compose.yml | 7 +- db/connection.go | 92 ++- db/listener.go | 132 ---- db/notifier.go | 282 -------- db/notifier_test.go | 83 --- db/trigger.go | 255 ++++++-- db/trigger_test.go | 707 ++++++++++---------- go.mod | 2 +- main.go | 234 +++---- main_test.go | 780 ++++++++++++++++++----- parser/audit.go | 134 ---- parser/audit_test.go | 105 --- http.dockerfile => pgsql-http.dockerfile | 5 + webhook/auth.go | 1 - webhook/request.go | 69 -- webhook/request_test.go | 106 --- 21 files changed, 1449 insertions(+), 1725 deletions(-) delete mode 100644 db/listener.go delete mode 100644 db/notifier.go delete mode 100644 db/notifier_test.go delete mode 100644 parser/audit.go delete mode 100644 parser/audit_test.go rename http.dockerfile => pgsql-http.dockerfile (89%) delete mode 100644 webhook/auth.go delete mode 100644 webhook/request.go delete mode 100644 webhook/request_test.go diff --git a/.github/workflows/http-postgres.yml b/.github/workflows/http-postgres.yml index 803e35c..0ed0396 100644 --- a/.github/workflows/http-postgres.yml +++ b/.github/workflows/http-postgres.yml @@ -4,7 +4,7 @@ on: push: branches: [main] paths: - - 'http.dockerfile' + - 'pgsql-http.dockerfile' - '.github/workflows/http-postgres.yml' workflow_dispatch: @@ -22,4 +22,4 @@ jobs: build_target: pg-${{ matrix.pg_version }} image_tags: | "ghcr.io/hotosm/postgres:${{ matrix.pg_version }}-http" - dockerfile: http.dockerfile + dockerfile: pgsql-http.dockerfile diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8423790..e6ee435 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ 
-19,7 +19,7 @@ jobs: runs-on: ubuntu-latest container: - image: docker.io/goreleaser/goreleaser-cross:v1.24 + image: docker.io/goreleaser/goreleaser-cross:v1.25 steps: - name: Checkout diff --git a/Dockerfile b/Dockerfile index e724739..674123a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.24 AS base +FROM golang:1.25 AS base # Build statically compiled binary diff --git a/README.md b/README.md index 8e3b4e2..73b0b94 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@

- A lightweight webhook for ODK Central submissions and entity property updates. + A lightweight CLI tool for installing webhook triggers in ODK Central database.

@@ -17,75 +17,63 @@ -Call a remote API on ODK Central database events: +Install PostgreSQL triggers that automatically call remote APIs on ODK Central database events: - New submission (XML). - Update entity (entity properties). - Submission review (approved, hasIssues, rejected). -The `centralwebhook` binary is small ~15MB and only consumes -~5MB of memory when running. - -> [!NOTE] -> There is a 8000 byte Postgres limit for the submission XML that can be sent -> until [this issue](https://github.com/hotosm/central-webhook/issues/9) -> is addressed. -> -> The submission XML will simply be truncated in this case. +The `centralwebhook` tool is a simple CLI that installs or uninstalls database triggers. Once installed, the triggers use the `pgsql-http` extension to send HTTP requests directly from the database. ## Prerequisites -- ODK Central running, connecting to an accessible Postgresql database. -- A POST webhook endpoint on your service API, to call when the selected - event occurs. +- ODK Central running, connecting to an accessible PostgreSQL database. +- The `pgsql-http` extension installed and enabled in your PostgreSQL database: + ```sql + CREATE EXTENSION http; + ``` + > [!NOTE] + > **Using our helper images**: We provide PostgreSQL images with the `pgsql-http` extension pre-installed: + > - `ghcr.io/hotosm/postgres:18-http` (based on vanilla PostgreSQL 18 images) + > + > These images are drop-in replacements for standard PostgreSQL images and simply add the extension. + > + > **Installing manually**: If you don't wish to use these images, you must install the `pgsql-http` extension yourself. The extension may require superuser privileges to install. If you cannot install it yourself, ask your database administrator. +- A POST webhook endpoint on your service API, to call when the selected event occurs. 
## Usage -The `centralwebhook` tool is a service that runs continually, monitoring the -ODK Central database for updates and triggering the webhook as appropriate. +The `centralwebhook` tool is a CLI that installs or uninstalls database triggers. After installation, the triggers run automatically whenever audit events occur in the database. -### Integrate Into [ODK Central](https://github.com/getodk/central) Stack +### Install Triggers -- It's possible to include this as part of the standard ODK Central docker - compose stack. -- First add the environment variables to your `.env` file: +Install webhook triggers in your database: - ```dotenv - CENTRAL_WEBHOOK_UPDATE_ENTITY_URL=https://your.domain.com/some/webhook - CENTRAL_WEBHOOK_REVIEW_SUBMISSION_URL=https://your.domain.com/some/webhook - CENTRAL_WEBHOOK_NEW_SUBMISSION_URL=https://your.domain.com/some/webhook - CENTRAL_WEBHOOK_API_KEY=your_api_key_key - ``` +```bash +./centralwebhook install \ + -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ + -updateEntityUrl 'https://your.domain.com/some/webhook' \ + -newSubmissionUrl 'https://your.domain.com/some/webhook' \ + -reviewSubmissionUrl 'https://your.domain.com/some/webhook' +``` > [!TIP] -> Omit a xxx_URL variable if you do not wish to use that particular webhook. +> Omit a webhook URL flag if you do not wish to use that particular webhook. > -> The CENTRAL_WEBHOOK_API_KEY variable is also optional, see the -> [APIs With Authentication](#apis-with-authentication) section. - -- Then extend the docker compose configuration at startup: - - ```bash - # Starting from the getodk/central code repo - docker compose -f docker-compose.yml -f /path/to/this/repo/compose.webhook.yml up -d - ``` - -### Other Ways To Run +> The `-apiKey` flag is optional, see the [APIs With Authentication](#apis-with-authentication) section. -

-Via Docker (Standalone) +### Uninstall Triggers -#### Via Docker (Standalone) +Remove webhook triggers from your database: ```bash -docker run -d ghcr.io/hotosm/central-webhook:latest \ - -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ - -updateEntityUrl 'https://your.domain.com/some/webhook' \ - -newSubmissionUrl 'https://your.domain.com/some/webhook' \ - -reviewSubmissionUrl 'https://your.domain.com/some/webhook' +./centralwebhook uninstall \ + -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' ``` -Environment variables are also supported: +### Environment Variables + +All flags can also be provided via environment variables: ```dotenv CENTRAL_WEBHOOK_DB_URI=postgresql://user:pass@localhost:5432/db_name?sslmode=disable @@ -96,74 +84,22 @@ CENTRAL_WEBHOOK_API_KEY=ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y CENTRAL_WEBHOOK_LOG_LEVEL=DEBUG ``` -
- -
-Via Binary (Standalone) +### Via Docker -#### Via Binary (Standalone) - -Download the binary for your platform from the -[releases](https://github.com/hotosm/central-webhook/releases) page. - -Then run with: +You can run the CLI tool via Docker: ```bash -./centralwebhook \ +docker run --rm ghcr.io/hotosm/central-webhook:latest install \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ -updateEntityUrl 'https://your.domain.com/some/webhook' \ -newSubmissionUrl 'https://your.domain.com/some/webhook' \ -reviewSubmissionUrl 'https://your.domain.com/some/webhook' ``` -> It's possible to specify a single webhook event, or multiple. - -
- -
-Via Code - -#### Via Code - -Usage via the code / API: - -```go -package main - -import ( - "fmt" - "context" - "log/slog" - - "github.com/hotosm/central-webhook/db" - "github.com/hotosm/central-webhook/webhook" -) - -ctx := context.Background() -log := slog.New() - -dbPool, err := db.InitPool(ctx, log, "postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable") -if err != nil { - fmt.Fprintf(os.Stderr, "could not connect to database: %v", err) -} - -err = SetupWebhook( - log, - ctx, - dbPool, - nil, - "https://your.domain.com/some/entity/webhook", - "https://your.domain.com/some/submission/webhook", - "https://your.domain.com/some/review/webhook", -) -if err != nil { - fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) -} -``` - -> To not provide a webhook for an event, pass `nil` as the url. +### Download Binary -
+Download the binary for your platform from the +[releases](https://github.com/hotosm/central-webhook/releases) page. ## Webhook Request Payload Examples @@ -208,7 +144,7 @@ Many APIs will not be public and require some sort of authentication. There is an optional `-apiKey` flag that can be used to pass an API key / token provided by the application. -This will be inserted in the `X-API-Key` request header. +This will be inserted in the `X-API-Key` request header when the trigger sends HTTP requests. No other authentication methods are supported for now, but feel free to open an issue (or PR!) for a proposal to support other @@ -217,7 +153,7 @@ auth methods. Example: ```bash -./centralwebhook \ +./centralwebhook install \ -db 'postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable' \ -updateEntityUrl 'https://your.domain.com/some/webhook' \ -apiKey 'ksdhfiushfiosehf98e3hrih39r8hy439rh389r3hy983y' @@ -284,18 +220,25 @@ async def update_entity_status_in_fmtm( raise HTTPException(status_code=400, detail=msg) ``` +## How It Works + +The tool installs PostgreSQL triggers on the `audits` table that: + +1. Detect when audit events occur (entity updates, submission creates/updates) +2. Format the event data into a JSON payload with `type`, `id`, and `data` fields +3. Use the `pgsql-http` extension to send an HTTP POST request directly from the database +4. Include the `X-API-Key` header if provided during installation + +The triggers run automatically after installation - no long-running service is needed. + ## Development -- This package mostly uses the standard library, plus a Postgres driver -and testing framework. +- This package uses the standard library and a Postgres driver. - Binary and container image distribution is automated on new **release**. ### Run The Tests -The test suite depends on a database, so the most convenient way is to run -via docker. 
- -There is a pre-configured `compose.yml` for testing: +The test suite depends on a database with the `pgsql-http` extension installed. The most convenient way is to run via docker: ```bash docker compose run --rm webhook diff --git a/compose.webhook.yml b/compose.webhook.yml index 9b6639a..9972089 100644 --- a/compose.webhook.yml +++ b/compose.webhook.yml @@ -16,3 +16,6 @@ services: postgres14: condition: service_started restart: always + + postgres14: + image: "ghcr.io/hotosm/postgres:14-http" diff --git a/compose.yml b/compose.yml index fc081c4..f94fab5 100644 --- a/compose.yml +++ b/compose.yml @@ -17,8 +17,6 @@ services: - ./main.go:/app/main.go:ro - ./main_test.go:/app/main_test.go:ro - ./db:/app/db:ro - - ./webhook:/app/webhook:ro - - ./parser:/app/parser:ro # environment: # # Override to use database on host # CENTRAL_WEBHOOK_DB_URI: postgresql://odk:odk@host.docker.internal:5434/odk?sslmode=disable @@ -36,7 +34,10 @@ services: entrypoint: go test -timeout=2m -v ./... db: - image: "postgis/postgis:17-3.5-alpine" + image: "ghcr.io/hotosm/postgres:18-http" + build: + dockerfile: pgsql-http.dockerfile + target: pg-18 container_name: centralwebhook-db environment: - POSTGRES_USER=odk diff --git a/db/connection.go b/db/connection.go index 382fdb4..a87287f 100644 --- a/db/connection.go +++ b/db/connection.go @@ -2,22 +2,96 @@ package db import ( "context" + "fmt" "log/slog" + "os" + "strconv" + "time" "github.com/jackc/pgx/v5/pgxpool" ) +const ( + // Default retry configuration + defaultMaxRetries = 30 + defaultInitialInterval = 1 * time.Second + defaultMaxInterval = 5 * time.Second +) + func InitPool(ctx context.Context, log *slog.Logger, dbUri string) (*pgxpool.Pool, error) { - // get a connection pool - dbPool, err := pgxpool.New(ctx, dbUri) - if err != nil { - log.Error("error connection to DB", "err", err) - return nil, err + // Get retry configuration from environment or use defaults + maxRetries := getEnvInt("CENTRAL_WEBHOOK_DB_MAX_RETRIES", 
defaultMaxRetries) + initialInterval := getEnvDuration("CENTRAL_WEBHOOK_DB_RETRY_INTERVAL", defaultInitialInterval) + maxInterval := getEnvDuration("CENTRAL_WEBHOOK_DB_MAX_RETRY_INTERVAL", defaultMaxInterval) + + var dbPool *pgxpool.Pool + var err error + interval := initialInterval + + log.Info("Connecting to database", "max_retries", maxRetries) + + for attempt := 0; attempt < maxRetries; attempt++ { + // Try to create connection pool + dbPool, err = pgxpool.New(ctx, dbUri) + if err != nil { + if attempt < maxRetries-1 { + log.Info("Database connection failed, retrying", "attempt", attempt+1, "max_retries", maxRetries, "retry_in", interval, "err", err) + time.Sleep(interval) + // Exponential backoff: double the interval, but cap at maxInterval + interval = min(interval*2, maxInterval) + continue + } + log.Error("error connecting to DB after retries", "attempts", attempt+1, "err", err) + return nil, fmt.Errorf("failed to connect to database after %d attempts: %w", attempt+1, err) + } + + // Try to ping the database + if err = dbPool.Ping(ctx); err != nil { + dbPool.Close() // Close the pool if ping fails + if attempt < maxRetries-1 { + log.Info("Database ping failed, retrying", "attempt", attempt+1, "max_retries", maxRetries, "retry_in", interval, "err", err) + time.Sleep(interval) + // Exponential backoff: double the interval, but cap at maxInterval + interval = min(interval*2, maxInterval) + continue + } + log.Error("error pinging DB after retries", "attempts", attempt+1, "err", err) + return nil, fmt.Errorf("failed to ping database after %d attempts: %w", attempt+1, err) + } + + // Success! 
+ log.Info("Database connection established", "attempts", attempt+1) + return dbPool, nil } - if err = dbPool.Ping(ctx); err != nil { - log.Error("error pinging DB", "err", err) - return nil, err + + // This should never be reached, but included for safety + return nil, fmt.Errorf("failed to connect to database after %d attempts", maxRetries) +} + +// getEnvInt gets an integer from environment variable or returns default +func getEnvInt(key string, defaultValue int) int { + if val := os.Getenv(key); val != "" { + if intVal, err := strconv.Atoi(val); err == nil { + return intVal + } } + return defaultValue +} - return dbPool, nil +// getEnvDuration gets a duration from environment variable or returns default +func getEnvDuration(key string, defaultValue time.Duration) time.Duration { + if val := os.Getenv(key); val != "" { + if duration, err := time.ParseDuration(val); err == nil { + return duration + } + } + return defaultValue +} + +// min returns the minimum of two durations +func min(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b } diff --git a/db/listener.go b/db/listener.go deleted file mode 100644 index c851e51..0000000 --- a/db/listener.go +++ /dev/null @@ -1,132 +0,0 @@ -// Code by @brojonat -// https://github.com/brojonat/notifier -// Idea by @brandur - -package db - -import ( - "context" - "errors" - "sync" - - "github.com/jackc/pgx/v5/pgxpool" -) - -// Listener interface connects to the database and allows callers to listen to a -// particular topic by issuing a LISTEN command. WaitForNotification blocks -// until receiving a notification or until the supplied context expires. The -// default implementation is tightly coupled to pgx (following River's -// implementation), but callers may implement their own listeners for any -// backend they'd like. 
-type Listener interface { - Close(ctx context.Context) error - Connect(ctx context.Context) error - Listen(ctx context.Context, topic string) error - Ping(ctx context.Context) error - Unlisten(ctx context.Context, topic string) error - WaitForNotification(ctx context.Context) (*Notification, error) -} - -// NewListener return a Listener that draws a connection from the supplied Pool. This -// is somewhat discouraged -func NewListener(dbPool *pgxpool.Pool) Listener { - return &listener{ - mu: sync.Mutex{}, - dbPool: dbPool, - } -} - -type listener struct { - conn *pgxpool.Conn - dbPool *pgxpool.Pool - mu sync.Mutex -} - -// Close the connection to the database. -func (listener *listener) Close(ctx context.Context) error { - listener.mu.Lock() - defer listener.mu.Unlock() - - if listener.conn == nil { - return nil - } - - // Release below would take care of cleanup and potentially put the - // connection back into rotation, but in case a Listen was invoked without a - // subsequent Unlisten on the same topic, close the connection explicitly to - // guarantee no other caller will receive a partially tainted connection. - err := listener.conn.Conn().Close(ctx) - - // Even in the event of an error, make sure conn is set back to nil so that - // the listener can be reused. - listener.conn.Release() - listener.conn = nil - - return err -} - -// Connect to the database. -func (listener *listener) Connect(ctx context.Context) error { - listener.mu.Lock() - defer listener.mu.Unlock() - - if listener.conn != nil { - return errors.New("connection already established") - } - - conn, err := listener.dbPool.Acquire(ctx) - if err != nil { - return err - } - - listener.conn = conn - return nil -} - -// Listen issues a LISTEN command for the supplied topic. 
-func (listener *listener) Listen(ctx context.Context, topic string) error { - listener.mu.Lock() - defer listener.mu.Unlock() - - _, err := listener.conn.Exec(ctx, "LISTEN \""+topic+"\"") - return err -} - -// Ping the database -func (listener *listener) Ping(ctx context.Context) error { - listener.mu.Lock() - defer listener.mu.Unlock() - - return listener.conn.Ping(ctx) -} - -// Unlisten issues an UNLISTEN from the supplied topic. -func (listener *listener) Unlisten(ctx context.Context, topic string) error { - listener.mu.Lock() - defer listener.mu.Unlock() - - _, err := listener.conn.Exec(ctx, "UNLISTEN \""+topic+"\"") - return err -} - -// WaitForNotification blocks until receiving a notification and returns it. The -// pgx driver should maintain a buffer of notifications, so as long as Listen -// has been called, repeatedly calling WaitForNotification should yield all -// notifications. -func (listener *listener) WaitForNotification(ctx context.Context) (*Notification, error) { - listener.mu.Lock() - defer listener.mu.Unlock() - - pgn, err := listener.conn.Conn().WaitForNotification(ctx) - - if err != nil { - return nil, err - } - - n := Notification{ - Channel: pgn.Channel, - Payload: []byte(pgn.Payload), - } - - return &n, nil -} diff --git a/db/notifier.go b/db/notifier.go deleted file mode 100644 index 4a260bf..0000000 --- a/db/notifier.go +++ /dev/null @@ -1,282 +0,0 @@ -// Code by @brojonat -// https://github.com/brojonat/notifier -// Idea by @brandur - -package db - -import ( - "context" - "errors" - "fmt" - "log/slog" - "slices" - "sync" - "time" -) - -// Notifier interface wraps a Listener. 
It holds a single Postgres connection -// per process, allows other components in the same program to use it to -// subscribe to any number of topics, waits for notifications, and distributes -// them to listening components as they’re received -type Notifier interface { - // Returns a Subscription to the supplied channel topic which can be used to by - // the caller to receive data published to that channel - Listen(channel string) Subscription - - // this runs the receiving loop forever - Run(ctx context.Context) error -} - -// Subscription provides a means to listen on a particular topic. Notifiers -// return Subscriptions that callers can use to receive updates. -type Subscription interface { - NotificationC() <-chan []byte - EstablishedC() <-chan struct{} - Unlisten(ctx context.Context) -} - -// Notification encapsulates a published message -type Notification struct { - Channel string `json:"channel"` - Payload []byte `json:"payload"` -} - -type subscription struct { - channel string - listenChan chan []byte - notifier *notifier - - establishedChan chan struct{} - establishedChanClose func() - unlistenOnce sync.Once -} - -// NotificationC returns the underlying notification channel. -func (s *subscription) NotificationC() <-chan []byte { return s.listenChan } - -// EstablishedC is a channel that's closed after the Notifier has successfully -// established a connection to the database and started listening for updates. -// -// There's no full guarantee that the notifier will successfully establish a -// listen, so callers will usually want to `select` on it combined with a -// context done, a stop channel, and/or a timeout. -func (s *subscription) EstablishedC() <-chan struct{} { return s.establishedChan } - -// Unlisten unregisters the subscriber from its notifier -func (s *subscription) Unlisten(ctx context.Context) { - s.unlistenOnce.Do(func() { - // Unlisten uses background context in case of cancellation. 
- if err := s.notifier.unlisten(context.Background(), s); err != nil { - s.notifier.log.Error("error unlistening on channel", "err", err, "channel", s.channel) - } - }) -} - -type notifier struct { - mu sync.RWMutex - log *slog.Logger - listener Listener - subscriptions map[string][]*subscription - channelChanges []channelChange - waitForNotificationCancel context.CancelFunc -} - -func NewNotifier(log *slog.Logger, listener Listener) Notifier { - return ¬ifier{ - mu: sync.RWMutex{}, - log: log, - listener: listener, - subscriptions: make(map[string][]*subscription), - channelChanges: []channelChange{}, - waitForNotificationCancel: context.CancelFunc(func() {}), - } -} - -type channelChange struct { - channel string - close func() - operation string -} - -// Listen returns a Subscription. -func (n *notifier) Listen(channel string) Subscription { - n.mu.Lock() - defer n.mu.Unlock() - - existingSubs := n.subscriptions[channel] - - sub := &subscription{ - channel: channel, - listenChan: make(chan []byte, 2), - notifier: n, - } - n.subscriptions[channel] = append(existingSubs, sub) - - if len(existingSubs) > 0 { - // If there's already another subscription for this channel, reuse its - // established channel. It may already be closed (to indicate that the - // connection is established), but that's okay. - sub.establishedChan = existingSubs[0].establishedChan - sub.establishedChanClose = func() {} // no op since not channel owner - - return sub - } - - // The notifier will close this channel after it's successfully established - // `LISTEN` for the given channel. Gives subscribers a way to confirm a - // listen before moving on, which is especially useful in tests. 
- sub.establishedChan = make(chan struct{}) - sub.establishedChanClose = sync.OnceFunc(func() { close(sub.establishedChan) }) - - n.channelChanges = append(n.channelChanges, - channelChange{channel, sub.establishedChanClose, "listen"}) - - // Cancel out of blocking on WaitForNotification so changes can be processed - // immediately. - n.waitForNotificationCancel() - - return sub -} - -const listenerTimeout = 10 * time.Second - -// Listens on a topic with an appropriate logging statement. Should be preferred -// to `listener.Listen` for improved logging/telemetry. -func (n *notifier) listenerListen(ctx context.Context, channel string) error { - ctx, cancel := context.WithTimeout(ctx, listenerTimeout) - defer cancel() - - n.log.Debug("listening on channel", "channel", channel) - if err := n.listener.Listen(ctx, channel); err != nil { - return fmt.Errorf("error listening on channel %q: %w", channel, err) - } - - return nil -} - -// Unlistens on a topic with an appropriate logging statement. Should be -// preferred to `listener.Unlisten` for improved logging/telemetry. -func (n *notifier) listenerUnlisten(ctx context.Context, channel string) error { - ctx, cancel := context.WithTimeout(ctx, listenerTimeout) - defer cancel() - - n.log.Debug("unlistening on channel", "channel", channel) - if err := n.listener.Unlisten(ctx, string(channel)); err != nil { - return fmt.Errorf("error unlistening on channel %q: %w", channel, err) - } - - return nil -} - -// this needs to pull channelChange instances from the channelChange channel -// in order to perform LISTEN/UNLISTEN operations on the notifier. 
-func (n *notifier) processChannelChanges(ctx context.Context) error { - n.log.Debug("processing channel changes...") - n.mu.Lock() - defer n.mu.Unlock() - for _, u := range n.channelChanges { - switch u.operation { - case "listen": - n.log.Debug("listening to new channel", "channel", u.channel) - n.listenerListen(ctx, u.channel) - u.close() - case "unlisten": - n.log.Debug("unlistening from channel", "channel", u.channel) - n.listenerUnlisten(ctx, u.channel) - default: - n.log.Error("got unexpected change operation", "operation", u.operation) - } - } - return nil -} - -// waitOnce blocks until either 1) a notification is received and -// distributed to all topic listeners, 2) the timeout is hit, or 3) an external -// caller calls l.waitForNotificationCancel. In all 3 cases, nil is returned to -// signal good/expected exit conditions, meaning a caller can simply call -// handleNextNotification again. -func (n *notifier) waitOnce(ctx context.Context) error { - if err := n.processChannelChanges(ctx); err != nil { - return err - } - - // WaitForNotification is a blocking function, but since we want to wake - // occasionally to process new `LISTEN`/`UNLISTEN`, we let the context - // timeout and also expose a way for external code to cancel this loop with - // waitForNotificationCancel. - notification, err := func() (*Notification, error) { - const listenTimeout = 30 * time.Second - - ctx, cancel := context.WithTimeout(ctx, listenTimeout) - defer cancel() - - // Provides a way for the blocking wait to be cancelled in case a new - // subscription change comes in. - n.mu.Lock() - n.waitForNotificationCancel = cancel - n.mu.Unlock() - - notification, err := n.listener.WaitForNotification(ctx) - if err != nil { - return nil, fmt.Errorf("error waiting for notification: %w", err) - } - - return notification, nil - }() - if err != nil { - // If the error was a cancellation or the deadline being exceeded but - // there's no error in the parent context, return no error. 
- if (errors.Is(err, context.Canceled) || - errors.Is(err, context.DeadlineExceeded)) && ctx.Err() == nil { - return nil - } - - return err - } - - n.mu.RLock() - defer n.mu.RUnlock() - - for _, sub := range n.subscriptions[notification.Channel] { - select { - case sub.listenChan <- []byte(notification.Payload): - default: - n.log.Error("dropped notification due to full buffer", "payload", notification.Payload) - } - } - - return nil -} - -func (n *notifier) unlisten(ctx context.Context, sub *subscription) error { - n.mu.Lock() - defer n.mu.Unlock() - - subs := n.subscriptions[sub.channel] - - // stop listening if last subscriber - if len(subs) <= 1 { - // UNLISTEN for this channel - n.listenerUnlisten(ctx, sub.channel) - } - - // remove subscription from the subscriptions map - n.subscriptions[sub.channel] = slices.DeleteFunc(n.subscriptions[sub.channel], func(s *subscription) bool { - return s == sub - }) - if len(n.subscriptions[sub.channel]) < 1 { - delete(n.subscriptions, sub.channel) - } - n.log.Debug("removed subscription", "new_num_subscriptions", len(n.subscriptions[sub.channel]), "channel", sub.channel) - - return nil -} - -func (n *notifier) Run(ctx context.Context) error { - for { - err := n.waitOnce(ctx) - if err != nil || ctx.Err() != nil { - return err - } - } -} diff --git a/db/notifier_test.go b/db/notifier_test.go deleted file mode 100644 index 6558807..0000000 --- a/db/notifier_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package db - -import ( - "context" - "log/slog" - "os" - "sync" - "testing" - - "github.com/matryer/is" -) - -// Note: these tests assume you have a postgres server listening on db:5432 -// with username odk and password odk. 
-// -// The easiest way to ensure this is to run the tests with docker compose: -// docker compose run --rm webhook - -func TestNotifier(t *testing.T) { - dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") - if len(dbUri) == 0 { - // Default - dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" - } - - is := is.New(t) - log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} - - pool, err := InitPool(ctx, log, dbUri) - is.NoErr(err) - - listener := NewListener(pool) - err = listener.Connect(ctx) - is.NoErr(err) - - n := NewNotifier(log, listener) - wg.Add(1) - go func() { - n.Run(ctx) - wg.Done() - }() - sub := n.Listen("foo") - - conn, err := pool.Acquire(ctx) - wg.Add(1) - go func() { - <-sub.EstablishedC() - conn.Exec(ctx, "select pg_notify('foo', '1')") - conn.Exec(ctx, "select pg_notify('foo', '2')") - conn.Exec(ctx, "select pg_notify('foo', '3')") - conn.Exec(ctx, "select pg_notify('foo', '4')") - conn.Exec(ctx, "select pg_notify('foo', '5')") - wg.Done() - }() - is.NoErr(err) - - wg.Add(1) - - out := make(chan string) - go func() { - <-sub.EstablishedC() - for i := 0; i < 5; i++ { - msg := <-sub.NotificationC() - out <- string(msg) - } - close(out) - wg.Done() - }() - - msgs := []string{} - for r := range out { - msgs = append(msgs, r) - } - is.Equal(msgs, []string{"1", "2", "3", "4", "5"}) - - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) - wg.Wait() -} diff --git a/db/trigger.go b/db/trigger.go index f59cce0..3ea814b 100644 --- a/db/trigger.go +++ b/db/trigger.go @@ -11,134 +11,176 @@ type TriggerOptions struct { UpdateEntityURL *string NewSubmissionURL *string ReviewSubmissionURL *string + APIKey *string } -// Example parsed JSON -// {"action":"entity.update.version","actorId":1,"details":{"entityDefId":1001,...},"dml_action":"INSERT"}} - +// CreateTrigger creates a PostgreSQL trigger that uses pg-http extension to send +// HTTP 
requests directly from the database when audit events occur. func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string, opts TriggerOptions) error { - // This trigger runs on the `audits` table by default, and creates a new event - // in the odk-events queue when a new event is created in the table + // Ensure pg-http extension is available + if err := ensureHTTPExtension(ctx, dbPool); err != nil { + return fmt.Errorf("failed to ensure pg-http extension: %w", err) + } if tableName == "" { - // default table (this is configurable for easier tests mainly) tableName = "audits" } - // Create SQL trigger function dynamically, based on params + // Build HTTP headers for pgsql-http + headersSQL := `'Content-Type', 'application/json'` + if opts.APIKey != nil { + headersSQL += fmt.Sprintf(`, 'X-API-Key', %s`, quoteSQLString(*opts.APIKey)) + } + caseStatements := "" + // --------------------------------------------------------------------- + // entity.update.version + // --------------------------------------------------------------------- if opts.UpdateEntityURL != nil { - caseStatements += ` + url := quoteSQLString(*opts.UpdateEntityURL) + caseStatements += fmt.Sprintf(` WHEN 'entity.update.version' THEN - SELECT entity_defs.data + SELECT entity_defs."data" INTO result_data FROM entity_defs WHERE entity_defs.id = (NEW.details->>'entityDefId')::int; - js := jsonb_set(js, '{data}', result_data, true); + webhook_payload := jsonb_build_object( + 'type', 'entity.update.version', + 'id', (NEW.details->'entity'->>'uuid'), + 'data', result_data + ); - IF length(js::text) > 8000 THEN - RAISE NOTICE 'Payload too large, truncating: %', left(js::text, 500) || '...'; - js := jsonb_set(js, '{truncated}', 'true'::jsonb, true); - js := jsonb_set(js, '{data}', '"Payload too large. 
Truncated."'::jsonb, true); - END IF; - - PERFORM pg_notify('odk-events', js::text); - ` + PERFORM http(( + 'POST', + %s, + http_headers(%s), + 'application/json', + webhook_payload::text + )::http_request); + `, url, headersSQL) } + // --------------------------------------------------------------------- + // submission.create + // --------------------------------------------------------------------- if opts.NewSubmissionURL != nil { - caseStatements += ` + url := quoteSQLString(*opts.NewSubmissionURL) + caseStatements += fmt.Sprintf(` WHEN 'submission.create' THEN SELECT jsonb_build_object('xml', submission_defs.xml) INTO result_data FROM submission_defs WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int; - js := jsonb_set(js, '{data}', result_data, true); - - IF length(js::text) > 8000 THEN - RAISE NOTICE 'Payload too large, truncating: %', left(js::text, 500) || '...'; - js := jsonb_set(js, '{truncated}', 'true'::jsonb, true); - js := jsonb_set(js, '{data}', '"Payload too large. 
Truncated."'::jsonb, true); - END IF; + webhook_payload := jsonb_build_object( + 'type', 'submission.create', + 'id', (NEW.details->>'instanceId'), + 'data', result_data + ); - PERFORM pg_notify('odk-events', js::text); - ` + PERFORM http(( + 'POST', + %s, + http_headers(%s), + 'application/json', + webhook_payload::text + )::http_request); + `, url, headersSQL) } + // --------------------------------------------------------------------- + // submission.update + // --------------------------------------------------------------------- if opts.ReviewSubmissionURL != nil { - caseStatements += ` + url := quoteSQLString(*opts.ReviewSubmissionURL) + caseStatements += fmt.Sprintf(` WHEN 'submission.update' THEN - SELECT jsonb_build_object('instanceId', submission_defs."instanceId") - INTO result_data - FROM submission_defs - WHERE submission_defs.id = (NEW.details->>'submissionDefId')::int; - - js := jsonb_set(js, '{data}', jsonb_build_object('reviewState', js->'details'->>'reviewState'), true); - js := jsonb_set(js, '{details}', (js->'details')::jsonb - 'reviewState', true); - js := jsonb_set(js, '{details}', (js->'details') || result_data, true); + webhook_payload := jsonb_build_object( + 'type', 'submission.update', + 'id', (NEW.details->>'instanceId'), + 'data', jsonb_build_object( + 'reviewState', NEW.details->>'reviewState' + ) + ); - IF length(js::text) > 8000 THEN - RAISE NOTICE 'Payload too large, truncating: %', left(js::text, 500) || '...'; - js := jsonb_set(js, '{truncated}', 'true'::jsonb, true); - js := jsonb_set(js, '{data}', '"Payload too large. 
Truncated."'::jsonb, true); - END IF; - - PERFORM pg_notify('odk-events', js::text); - ` + PERFORM http(( + 'POST', + %s, + http_headers(%s), + 'application/json', + webhook_payload::text + )::http_request); + `, url, headersSQL) } - // default ELSE case (always included) + // Default case caseStatements += ` ELSE RETURN NEW; ` - // full function SQL createFunctionSQL := fmt.Sprintf(` CREATE OR REPLACE FUNCTION new_audit_log() RETURNS trigger AS $$ DECLARE - js jsonb; action_type text; result_data jsonb; + webhook_payload jsonb; BEGIN - SELECT to_jsonb(NEW.*) INTO js; - js := jsonb_set(js, '{dml_action}', to_jsonb(TG_OP)); action_type := NEW.action; + -- Prevent duplicate webhooks + -- see https://github.com/hotosm/central-webhook/issues/7 + IF + (action_type = 'submission.create' AND TG_OP != 'INSERT') + OR + (action_type IN ('entity.update.version', 'submission.update') AND TG_OP NOT IN ('INSERT', 'UPDATE')) + THEN + RETURN NEW; + END IF; + CASE action_type %s END CASE; RETURN NEW; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'Error in webhook trigger for action %%: %%', action_type, SQLERRM; + RETURN NEW; END; - $$ LANGUAGE 'plpgsql'; + $$ LANGUAGE plpgsql; `, caseStatements) - // SQL for dropping the existing trigger dropTriggerSQL := fmt.Sprintf(` DROP TRIGGER IF EXISTS new_audit_log_trigger ON %s; `, tableName) - // SQL for creating the new trigger createTriggerSQL := fmt.Sprintf(` CREATE TRIGGER new_audit_log_trigger - BEFORE INSERT OR UPDATE ON %s + AFTER INSERT OR UPDATE ON %s FOR EACH ROW - EXECUTE FUNCTION new_audit_log(); + EXECUTE FUNCTION new_audit_log(); `, tableName) - // Acquire a connection from the pool, close after all statements executed conn, err := dbPool.Acquire(ctx) if err != nil { return err } defer conn.Release() + // Validate that the table exists before creating the trigger + tableExists, err := checkTableExists(ctx, conn, tableName) + if err != nil { + return fmt.Errorf("failed to check if table exists: %w", err) + } + if 
!tableExists { + return fmt.Errorf("table %q does not exist. This tool requires an ODK Central database with the %q table. Please verify you are connecting to the correct database and that ODK Central has been properly initialized", tableName, tableName) + } + if _, err := conn.Exec(ctx, createFunctionSQL); err != nil { return fmt.Errorf("failed to create function: %w", err) } @@ -149,5 +191,104 @@ func CreateTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string, return fmt.Errorf("failed to create trigger: %w", err) } - return err + return nil +} + +// RemoveTrigger removes the webhook trigger from the database +func RemoveTrigger(ctx context.Context, dbPool *pgxpool.Pool, tableName string) error { + if tableName == "" { + tableName = "audits" + } + + conn, err := dbPool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + // Check if table exists (optional check - we use IF EXISTS so it's not required) + // But it's helpful to provide a clear error if the table doesn't exist + tableExists, err := checkTableExists(ctx, conn, tableName) + if err != nil { + return fmt.Errorf("failed to check if table exists: %w", err) + } + if !tableExists { + return fmt.Errorf("table %q does not exist. 
Please verify you are connecting to the correct database", tableName) + } + + // First, drop the trigger (if it exists) + dropTriggerSQL := fmt.Sprintf(` + DROP TRIGGER IF EXISTS new_audit_log_trigger + ON %s CASCADE; + `, tableName) + + if _, err := conn.Exec(ctx, dropTriggerSQL); err != nil { + return fmt.Errorf("failed to drop trigger: %w", err) + } + + // Then drop the function with CASCADE to handle any remaining dependencies + dropFunctionSQL := ` + DROP FUNCTION IF EXISTS new_audit_log() CASCADE; + ` + + if _, err := conn.Exec(ctx, dropFunctionSQL); err != nil { + return fmt.Errorf("failed to drop function: %w", err) + } + + return nil +} + +// checkTableExists checks if a table exists in the database +func checkTableExists(ctx context.Context, conn *pgxpool.Conn, tableName string) (bool, error) { + var exists bool + query := ` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = $1 + ); + ` + err := conn.QueryRow(ctx, query, tableName).Scan(&exists) + return exists, err +} + +// ensureHTTPExtension ensures the pg-http extension is installed +func ensureHTTPExtension(ctx context.Context, dbPool *pgxpool.Pool) error { + conn, err := dbPool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + _, err = conn.Exec(ctx, `CREATE EXTENSION IF NOT EXISTS http;`) + if err != nil { + var exists bool + checkSQL := `SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'http');` + if err := conn.QueryRow(ctx, checkSQL).Scan(&exists); err != nil { + return fmt.Errorf("failed to check for http extension: %w", err) + } + if !exists { + return fmt.Errorf("pg-http extension is not installed and cannot be created automatically") + } + } + + return nil +} + +// quoteSQLString safely quotes a string for SQL +func quoteSQLString(s string) string { + return fmt.Sprintf("'%s'", escapeSQLString(s)) +} + +// escapeSQLString escapes single quotes +func escapeSQLString(s string) string { + result := "" + 
for _, r := range s { + if r == '\'' { + result += "''" + } else { + result += string(r) + } + } + return result } diff --git a/db/trigger_test.go b/db/trigger_test.go index f05d17f..2743723 100644 --- a/db/trigger_test.go +++ b/db/trigger_test.go @@ -3,10 +3,14 @@ package db import ( "context" "encoding/json" + "fmt" + "io" "log/slog" + "net" + "net/http" "os" - "sync" "strings" + "sync" "testing" "time" @@ -20,6 +24,49 @@ import ( // The easiest way to ensure this is to run the tests with docker compose: // docker compose run --rm webhook +// testServer wraps an HTTP server that can be accessed from other Docker containers +type testServer struct { + server *http.Server + listener net.Listener + URL string +} + +func createTestServer(handler http.Handler) (*testServer, error) { + listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + return nil, err + } + + server := &http.Server{Handler: handler} + + port := listener.Addr().(*net.TCPAddr).Port + + // Use Docker service name so Postgres container can resolve it + hostname := os.Getenv("TEST_SERVER_HOST") + if hostname == "" { + hostname = "webhook" // docker-compose service name + } + + url := fmt.Sprintf("http://%s:%d", hostname, port) + + ts := &testServer{ + server: server, + listener: listener, + URL: url, + } + + go func() { + _ = server.Serve(listener) + }() + + return ts, nil +} + +// Close stops the test server +func (ts *testServer) Close() error { + return ts.server.Close() +} + func createAuditTestsTable(ctx context.Context, conn *pgxpool.Conn, is *is.I) { _, err := conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test CASCADE;`) is.NoErr(err) @@ -62,10 +109,9 @@ func TestEntityTrigger(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) + defer pool.Close() // Get connection and defer close conn, err := 
pool.Acquire(ctx) @@ -92,96 +138,33 @@ func TestEntityTrigger(t *testing.T) { // Create audits_test table createAuditTestsTable(ctx, conn, is) - // Insert an entity record - entityInsertSql := ` - INSERT INTO public.entity_defs ( - id, "entityId","createdAt","current","data","creatorId","label" - ) VALUES ( - 1001, - 900, - '2025-01-10 16:23:40.073', - true, - '{"status": "0", "task_id": "26", "version": "1"}', - 5, - 'Task 26 Feature 904487737' - ); - ` - _, err = conn.Exec(ctx, entityInsertSql) - is.NoErr(err) - - // Create audit trigger - updateEntityUrl := "https://test.com" + // Create audit trigger pointing at a dummy URL – we just inspect the SQL + dummyURL := "http://example.com/webhook" err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - UpdateEntityURL: &updateEntityUrl, + UpdateEntityURL: &dummyURL, }) is.NoErr(err) - // Create listener - listener := NewListener(pool) - err = listener.Connect(ctx) - is.NoErr(err) - - // Create notifier - n := NewNotifier(log, listener) - wg.Add(1) - go func() { - n.Run(ctx) - wg.Done() - }() - sub := n.Listen("odk-events") - - // Insert an audit record - auditInsertSql := ` - INSERT INTO audits_test ("actorId", action, details) - VALUES (1, 'entity.update.version', '{"entityDefId": 1001}'); - ` - _, err = conn.Exec(ctx, auditInsertSql) + // Read back the generated function and assert the entity CASE branch exists + var functionSQL string + row := conn.QueryRow(ctx, ` + SELECT pg_get_functiondef(p.oid) + FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + `) + err = row.Scan(&functionSQL) is.NoErr(err) - // Validate the notification content - wg.Add(1) - out := make(chan string) - go func() { - <-sub.EstablishedC() - msg := <-sub.NotificationC() // Get the notification - - log.Info("notification received", "raw", msg) - - out <- string(msg) // Send it to the output channel - close(out) - wg.Done() - }() - - // Process the notification - 
var notification map[string]interface{} - for msg := range out { - err := json.Unmarshal([]byte(msg), ¬ification) - is.NoErr(err) // Ensure the JSON payload is valid - log.Info("parsed notification", "notification", notification) - } - - // Validate the JSON content - is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct - is.Equal(notification["action"], "entity.update.version") // Ensure action is correct - is.True(notification["details"] != nil) // Ensure details key exists - is.True(notification["data"] != nil) // Ensure data key exists - - // Check nested JSON value for entityDefId in details - details, ok := notification["details"].(map[string]interface{}) - is.True(ok) // Ensure details is a valid map - is.Equal(details["entityDefId"], float64(1001)) // Ensure entityDefId has the correct value - - // Check nested JSON value for status in data - data, ok := notification["data"].(map[string]interface{}) - is.True(ok) // Ensure data is a valid map - is.Equal(data["status"], "0") // Ensure `status` has the correct value + is.True(strings.Contains(functionSQL, "WHEN 'entity.update.version'")) + is.True(strings.Contains(functionSQL, "'type', 'entity.update.version'")) + is.True(strings.Contains(functionSQL, "'id', (NEW.details->'entity'->>'uuid')")) + is.True(strings.Contains(functionSQL, "'data', result_data")) // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) - wg.Wait() + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs, audits_test CASCADE;`) } // Test a new submission event type @@ -195,10 +178,9 @@ func TestNewSubmissionTrigger(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} pool, err := InitPool(ctx, log, dbUri) 
is.NoErr(err) + defer pool.Close() // Get connection and defer close conn, err := pool.Acquire(ctx) @@ -232,82 +214,70 @@ func TestNewSubmissionTrigger(t *testing.T) { _, err = conn.Exec(ctx, submissionInsertSql) is.NoErr(err) + // Create HTTP test server to receive webhook + var receivedPayload map[string]interface{} + var requestReceived sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + is.NoErr(err) + err = json.Unmarshal(body, &receivedPayload) + is.NoErr(err) + w.WriteHeader(http.StatusOK) + requestReceived.Done() + })) + is.NoErr(err) + defer server.Close() + // Create audit trigger - newSubmissionUrl := "https://test.com" + newSubmissionUrl := server.URL err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - NewSubmissionURL: &newSubmissionUrl, + NewSubmissionURL: &newSubmissionUrl, }) is.NoErr(err) - // Create listener - listener := NewListener(pool) - err = listener.Connect(ctx) - is.NoErr(err) - - // Create notifier - n := NewNotifier(log, listener) - wg.Add(1) - go func() { - n.Run(ctx) - wg.Done() - }() - sub := n.Listen("odk-events") - // Insert an audit record auditInsertSql := ` INSERT INTO audits_test ("actorId", action, details) - VALUES (5, 'submission.create', '{"submissionDefId": 1}'); + VALUES (5, 'submission.create', '{"submissionDefId": 1, "instanceId": "test-instance-123"}'); ` _, err = conn.Exec(ctx, auditInsertSql) is.NoErr(err) - // Validate the notification content - wg.Add(1) - out := make(chan string) + // Wait for HTTP request to be received + done := make(chan struct{}) go func() { - <-sub.EstablishedC() - msg := <-sub.NotificationC() // Get the notification - - log.Info("notification received", "raw", msg) - - out <- string(msg) // Send it to the output channel - close(out) - wg.Done() + requestReceived.Wait() + close(done) }() - // Process the notification - var notification 
map[string]interface{} - for msg := range out { - err := json.Unmarshal([]byte(msg), ¬ification) - is.NoErr(err) // Ensure the JSON payload is valid - log.Info("parsed notification", "notification", notification) + select { + case <-done: + // Request received successfully + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for webhook request") } - // Validate the JSON content - is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct - is.Equal(notification["action"], "submission.create") // Ensure action is correct - is.True(notification["details"] != nil) // Ensure details key exists - is.True(notification["data"] != nil) // Ensure data key exists + // Validate the webhook payload + is.True(receivedPayload != nil) + is.Equal(receivedPayload["type"], "submission.create") + is.Equal(receivedPayload["id"], "test-instance-123") + is.True(receivedPayload["data"] != nil) - // Check nested JSON value for submissionDefId in details - details, ok := notification["details"].(map[string]interface{}) - is.True(ok) // Ensure details is a valid map - is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value - - data, ok := notification["data"].(map[string]interface{}) + data, ok := receivedPayload["data"].(map[string]interface{}) is.True(ok) // Ensure data is a valid map is.Equal(data["xml"], ``) // Ensure `xml` has the correct value // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) - wg.Wait() + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) } -// Test submission truncation works correctly -func TestNewSubmissionTrigger_TruncatesLargePayload(t *testing.T) { +// Test a review submission event type +func TestReviewSubmissionTrigger(t *testing.T) { dbUri := 
os.Getenv("CENTRAL_WEBHOOK_DB_URI") if len(dbUri) == 0 { // Default @@ -317,11 +287,11 @@ func TestNewSubmissionTrigger_TruncatesLargePayload(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) + defer pool.Close() + // Get connection and defer close conn, err := pool.Acquire(ctx) is.NoErr(err) defer conn.Release() @@ -332,94 +302,85 @@ func TestNewSubmissionTrigger_TruncatesLargePayload(t *testing.T) { // Create audits_test table createAuditTestsTable(ctx, conn, is) - // Insert submission with large XML - largeXml := "" + strings.Repeat("x", 9000) + "" + // Insert an submission record submissionInsertSql := ` INSERT INTO submission_defs ( id, "submissionId", - xml, - "formDefId", - "submitterId", - "createdAt" + "instanceId" ) VALUES ( 1, 2, - $1, - 7, - 5, - '2025-01-10 16:23:40.073' + '33448049-0df1-4426-9392-d3a294d638ad' ); ` - _, err = conn.Exec(ctx, submissionInsertSql, largeXml) + _, err = conn.Exec(ctx, submissionInsertSql) is.NoErr(err) + // Create HTTP test server to receive webhook + var receivedPayload map[string]interface{} + var requestReceived sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + is.NoErr(err) + err = json.Unmarshal(body, &receivedPayload) + is.NoErr(err) + w.WriteHeader(http.StatusOK) + requestReceived.Done() + })) + is.NoErr(err) + defer server.Close() + // Create audit trigger - newSubmissionUrl := "https://test.com" + reviewSubmissionUrl := server.URL err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - NewSubmissionURL: &newSubmissionUrl, + ReviewSubmissionURL: &reviewSubmissionUrl, }) is.NoErr(err) - // Create listener - listener := NewListener(pool) - err = listener.Connect(ctx) - is.NoErr(err) - 
- // Create notifier - n := NewNotifier(log, listener) - wg.Add(1) - go func() { - n.Run(ctx) - wg.Done() - }() - sub := n.Listen("odk-events") - // Insert an audit record auditInsertSql := ` INSERT INTO audits_test ("actorId", action, details) - VALUES (5, 'submission.create', '{"submissionDefId": 1}'); + VALUES (5, 'submission.update', '{"submissionDefId": 1, "instanceId": "33448049-0df1-4426-9392-d3a294d638ad", "reviewState": "approved"}'); ` _, err = conn.Exec(ctx, auditInsertSql) is.NoErr(err) - // Validate the notification content - wg.Add(1) - out := make(chan string) + // Wait for HTTP request to be received + done := make(chan struct{}) go func() { - <-sub.EstablishedC() - msg := <-sub.NotificationC() // Get the notification - - log.Info("notification received", "raw", msg) - - out <- string(msg) // Send it to the output channel - close(out) - wg.Done() + requestReceived.Wait() + close(done) }() - // Process the notification - var notification map[string]interface{} - for msg := range out { - err := json.Unmarshal([]byte(msg), ¬ification) - is.NoErr(err) // Ensure the JSON payload is valid - log.Info("parsed notification", "notification", notification) + select { + case <-done: + // Request received successfully + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for webhook request") } - // Assert truncation - is.Equal(notification["truncated"], true) - is.Equal(notification["data"], "Payload too large. 
Truncated.") - is.Equal(notification["action"], "submission.create") + // Validate the webhook payload + is.True(receivedPayload != nil) + is.Equal(receivedPayload["type"], "submission.update") + is.Equal(receivedPayload["id"], "33448049-0df1-4426-9392-d3a294d638ad") + + // Check reviewState present in data key + data, ok := receivedPayload["data"].(map[string]interface{}) + is.True(ok) // Ensure data is a valid map + is.Equal(data["reviewState"], "approved") // Ensure reviewState has the correct value // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) - wg.Wait() + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) } -// Test a new submission event type -func TestReviewSubmissionTrigger(t *testing.T) { +// Test an unsupported event type and ensure nothing is triggered +func TestNoTrigger(t *testing.T) { dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") if len(dbUri) == 0 { // Default @@ -429,115 +390,123 @@ func TestReviewSubmissionTrigger(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) + defer pool.Close() // Get connection and defer close conn, err := pool.Acquire(ctx) is.NoErr(err) defer conn.Release() - // Create submission_defs table - createSubmissionDefsTable(ctx, conn, is) - // Create audits_test table createAuditTestsTable(ctx, conn, is) - // Insert an submission record - submissionInsertSql := ` - INSERT INTO submission_defs ( - id, - "submissionId", - "instanceId" - ) VALUES ( - 1, - 2, - '33448049-0df1-4426-9392-d3a294d638ad' - ); - ` - _, err = conn.Exec(ctx, submissionInsertSql) + // Create HTTP test server to receive webhook + var requestReceived 
sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestReceived.Done() + w.WriteHeader(http.StatusOK) + })) is.NoErr(err) + defer server.Close() // Create audit trigger - reviewSubmissionUrl := "https://test.com" + newSubmissionUrl := server.URL err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - ReviewSubmissionURL: &reviewSubmissionUrl, + NewSubmissionURL: &newSubmissionUrl, }) is.NoErr(err) - // Create listener - listener := NewListener(pool) - err = listener.Connect(ctx) - is.NoErr(err) - - // Create notifier - n := NewNotifier(log, listener) - wg.Add(1) - go func() { - n.Run(ctx) - wg.Done() - }() - sub := n.Listen("odk-events") - - // Insert an audit record + // Insert an audit record with unsupported event type auditInsertSql := ` INSERT INTO audits_test ("actorId", action, details) - VALUES (5, 'submission.update', '{"submissionDefId": 1, "reviewState": "approved"}'); + VALUES (1, 'invalid.event', '{"submissionDefId": 5}'); ` _, err = conn.Exec(ctx, auditInsertSql) is.NoErr(err) - // Validate the notification content - wg.Add(1) - out := make(chan string) - go func() { - <-sub.EstablishedC() - msg := <-sub.NotificationC() // Get the notification - - log.Info("notification received", "raw", msg) + // Validate that no event was triggered for invalid event type + // The HTTP server should not receive a request + select { + case <-time.After(2 * time.Second): + // No request received - this is expected + log.Info("no event triggered for invalid event type") + case <-func() chan struct{} { + done := make(chan struct{}) + go func() { + requestReceived.Wait() + close(done) + }() + return done + }(): + // If a request was received, we failed the test + t.Fatal("unexpected webhook request received for invalid event type") + } - out <- string(msg) // Send it to the output channel - close(out) - wg.Done() - }() + // Cleanup - only drop audits_test since 
submission_defs wasn't created in this test + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test CASCADE;`) +} - // Process the notification - var notification map[string]interface{} - for msg := range out { - err := json.Unmarshal([]byte(msg), ¬ification) - is.NoErr(err) // Ensure the JSON payload is valid - log.Info("parsed notification", "notification", notification) +// Test that only the related CASE statements are added to the SQL function +func TestModularSql(t *testing.T) { + dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + // Default + dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" } - // Validate the JSON content - is.Equal(notification["dml_action"], "INSERT") // Ensure action is correct - is.Equal(notification["action"], "submission.update") // Ensure action is correct - is.True(notification["details"] != nil) // Ensure details key exists - is.True(notification["data"] != nil) // Ensure data key exists + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + pool, err := InitPool(ctx, log, dbUri) + is.NoErr(err) + defer pool.Close() - // Check nested JSON value for submissionDefId - details, ok := notification["details"].(map[string]interface{}) - is.True(ok) // Ensure details is a valid map - is.Equal(details["submissionDefId"], float64(1)) // Ensure submissionDefId has the correct value - is.Equal(details["instanceId"], "33448049-0df1-4426-9392-d3a294d638ad") // Ensure instanceId has the correct value + // Get connection and defer close + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() - // Check reviewState present in data key - data, ok := notification["data"].(map[string]interface{}) - is.True(ok) // Ensure data is a valid map - is.Equal(data["reviewState"], "approved") // Ensure reviewState has the correct value + // Create audits_test table + createAuditTestsTable(ctx, conn, 
is) + + // Create audit trigger + updateEntityUrl := "https://test.com" + err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + }) + is.NoErr(err) + + // Verify the function only contains the entity.update.version CASE + var functionSQL string + row := conn.QueryRow(ctx, ` + SELECT pg_get_functiondef(p.oid) + FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + `) + err = row.Scan(&functionSQL) + is.NoErr(err) + + // Check that the expected CASE branch is present + is.True(strings.Contains(functionSQL, "WHEN 'entity.update.version'")) + // Ensure that other cases are not present + is.True(!strings.Contains(functionSQL, "WHEN 'submission.create'")) + is.True(!strings.Contains(functionSQL, "WHEN 'submission.update'")) // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) - wg.Wait() + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test CASCADE;`) } -// Test an unsupported event type and ensure nothing is triggered -func TestNoTrigger(t *testing.T) { +// Test RemoveTrigger function +func TestRemoveTrigger(t *testing.T) { dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") if len(dbUri) == 0 { // Default @@ -547,11 +516,10 @@ func TestNoTrigger(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) + defer pool.Close() - // Get connection and defer close conn, err := pool.Acquire(ctx) is.NoErr(err) defer conn.Release() @@ -559,64 +527,67 @@ func TestNoTrigger(t *testing.T) { // Create audits_test table createAuditTestsTable(ctx, conn, is) - // Create audit trigger - newSubmissionUrl := "https://test.com" + // Create trigger + 
updateEntityUrl := "https://test.com" err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - NewSubmissionURL: &newSubmissionUrl, + UpdateEntityURL: &updateEntityUrl, }) is.NoErr(err) - // Create listener - listener := NewListener(pool) - err = listener.Connect(ctx) + // Verify trigger exists + var triggerExists bool + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = 'new_audit_log_trigger' + ) + `).Scan(&triggerExists) is.NoErr(err) + is.True(triggerExists) - // Create notifier - n := NewNotifier(log, listener) - go func() { - n.Run(ctx) - }() - sub := n.Listen("odk-events") - - // Insert an audit record - auditInsertSql := ` - INSERT INTO audits_test ("actorId", action, details) - VALUES (1, 'invalid.event', '{"submissionDefId": 5}'); - ` - _, err = conn.Exec(ctx, auditInsertSql) + // Verify function exists + var functionExists bool + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + ) + `).Scan(&functionExists) is.NoErr(err) + is.True(functionExists) - // Ensure that no event is fired for incorrect event type - out := make(chan string) - go func() { - <-sub.EstablishedC() - msg := <-sub.NotificationC() // Get the notification - - log.Info("notification received", "raw", msg) + // Remove trigger + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) - out <- string(msg) // Send it to the output channel - close(out) - }() + // Verify trigger is removed + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = 'new_audit_log_trigger' + ) + `).Scan(&triggerExists) + is.NoErr(err) + is.True(!triggerExists) - // Validate that no event was triggered for invalid event type - select { - case msg := <-out: - // If a message was received, we failed the test since no event should be fired - t.Fatalf("pnexpected message received: %s", msg) - case 
<-time.After(1 * time.Second): - // No message should have been received within the timeout - log.Info("no event triggered for invalid event type") - } + // Verify function is removed + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + ) + `).Scan(&functionExists) + is.NoErr(err) + is.True(!functionExists) // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() - sub.Unlisten(ctx) // uses background ctx anyway - listener.Close(ctx) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS audits_test CASCADE;`) } -// Test that only the related CASE statements are added to the SQL function -func TestModularSql(t *testing.T) { +// Test API key is included in headers +func TestTriggerWithAPIKey(t *testing.T) { dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") if len(dbUri) == 0 { // Default @@ -626,43 +597,101 @@ func TestModularSql(t *testing.T) { is := is.New(t) log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) pool, err := InitPool(ctx, log, dbUri) is.NoErr(err) + defer pool.Close() - // Get connection and defer close conn, err := pool.Acquire(ctx) is.NoErr(err) defer conn.Release() + // Create entity_defs table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs CASCADE;`) + is.NoErr(err) + entityTableCreateSql := ` + CREATE TABLE entity_defs ( + id int4, + "entityId" int4, + "createdAt" timestamptz, + "current" bool, + "data" jsonb, + "creatorId" int4, + "label" text + ); + ` + _, err = conn.Exec(ctx, entityTableCreateSql) + is.NoErr(err) + // Create audits_test table createAuditTestsTable(ctx, conn, is) - // Create audit trigger - updateEntityUrl := "https://test.com" + // Insert an entity record + entityInsertSql := ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) 
VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0"}', + 5, + 'Test Entity' + ); + ` + _, err = conn.Exec(ctx, entityInsertSql) + is.NoErr(err) + + // Create HTTP test server to receive webhook + var receivedHeaders http.Header + var requestReceived sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header + w.WriteHeader(http.StatusOK) + requestReceived.Done() + })) + is.NoErr(err) + defer server.Close() + + // Create audit trigger with API key + updateEntityUrl := server.URL + apiKey := "test-api-key-12345" err = CreateTrigger(ctx, pool, "audits_test", TriggerOptions{ - UpdateEntityURL: &updateEntityUrl, + UpdateEntityURL: &updateEntityUrl, + APIKey: &apiKey, }) is.NoErr(err) - // Verify the function only contains the entity.update.version CASE - var functionSQL string - row := conn.QueryRow(ctx, ` - SELECT pg_get_functiondef(p.oid) - FROM pg_proc p - JOIN pg_namespace n ON p.pronamespace = n.oid - WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' - `) - err = row.Scan(&functionSQL) + // Insert an audit record to trigger the webhook + auditInsertSql := ` + INSERT INTO audits_test ("actorId", action, details) + VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entity": {"uuid": "test-uuid", "dataset": "test"}}'); + ` + _, err = conn.Exec(ctx, auditInsertSql) is.NoErr(err) - // Check that the expected CASE branch is present - is.True(strings.Contains(functionSQL, "WHEN 'entity.update.version'")) - // Ensure that other cases are not present - is.True(!strings.Contains(functionSQL, "WHEN 'submission.create'")) - is.True(!strings.Contains(functionSQL, "WHEN 'submission.update'")) + // Wait for HTTP request to be received + done := make(chan struct{}) + go func() { + requestReceived.Wait() + close(done) + }() + + select { + case <-done: + // Request received successfully + case <-time.After(5 * time.Second): + 
t.Fatal("timeout waiting for webhook request") + } + + // Validate API key header + is.Equal(receivedHeaders.Get("X-API-Key"), "test-api-key-12345") + is.Equal(receivedHeaders.Get("Content-Type"), "application/json") // Cleanup - conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs, audits_test CASCADE;`) - cancel() + err = RemoveTrigger(ctx, pool, "audits_test") + is.NoErr(err) + _, _ = conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs, audits_test CASCADE;`) } diff --git a/go.mod b/go.mod index bb2dc3f..a1a2244 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/hotosm/central-webhook -go 1.24.2 +go 1.25.5 require ( github.com/jackc/pgx/v5 v5.7.4 diff --git a/main.go b/main.go index b855119..d19371f 100644 --- a/main.go +++ b/main.go @@ -1,5 +1,3 @@ -// Wrapper for the main tool functionality - package main import ( @@ -8,16 +6,10 @@ import ( "fmt" "log/slog" "os" - "os/signal" "path/filepath" "strings" - "syscall" - - "github.com/jackc/pgx/v5/pgxpool" "github.com/hotosm/central-webhook/db" - "github.com/hotosm/central-webhook/parser" - "github.com/hotosm/central-webhook/webhook" ) func getDefaultLogger(lvl slog.Level) *slog.Logger { @@ -37,115 +29,6 @@ func getDefaultLogger(lvl slog.Level) *slog.Logger { })) } -func SetupWebhook( - log *slog.Logger, - ctx context.Context, - dbPool *pgxpool.Pool, - apiKey *string, // use a pointer so it's possible to pass 'nil; - updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl string, -) error { - // setup the listener - listener := db.NewListener(dbPool) - if err := listener.Connect(ctx); err != nil { - log.Error("error setting up listener: %v", "error", err) - return err - } - - // init the trigger function - db.CreateTrigger(ctx, dbPool, "audits", db.TriggerOptions{ - UpdateEntityURL: &updateEntityUrl, - NewSubmissionURL: &newSubmissionUrl, - ReviewSubmissionURL: &reviewSubmissionUrl, - }) - - // setup the notifier - notifier := db.NewNotifier(log, listener) - go notifier.Run(ctx) - - // subscribe to the 
'odk-events' channel - log.Info("listening to odk-events channel") - sub := notifier.Listen("odk-events") - - // indefinitely listen for updates - go func() { - <-sub.EstablishedC() - for { - select { - case <-ctx.Done(): - sub.Unlisten(ctx) - log.Info("done listening for notifications") - return - - case data := <-sub.NotificationC(): - eventData := string(data) - log.Debug("got notification", "data", eventData) - - parsedData, err := parser.ParseEventJson(log, ctx, []byte(eventData)) - if err != nil { - log.Error("failed to parse notification", "error", err) - continue // Skip processing this notification - } - - // Only send the request for correctly parsed (supported) events - if parsedData != nil { - if parsedData.Type == "entity.update.version" && updateEntityUrl != "" { - webhook.SendRequest(log, ctx, updateEntityUrl, *parsedData, apiKey) - } else if parsedData.Type == "submission.create" && newSubmissionUrl != "" { - webhook.SendRequest(log, ctx, newSubmissionUrl, *parsedData, apiKey) - } else if parsedData.Type == "submission.update" && reviewSubmissionUrl != "" { - webhook.SendRequest(log, ctx, reviewSubmissionUrl, *parsedData, apiKey) - } else { - log.Debug( - fmt.Sprintf( - "%s event type was triggered, but no webhook url was provided", - parsedData.Type, - ), - "eventType", - parsedData.Type, - ) - } - } - } - } - }() - - // unsubscribe after 60s - // go func() { - // time.Sleep(3 * time.Second) - // sub.Unlisten(ctx) - // }() - - stopCtx, cancel := context.WithCancel(ctx) - defer cancel() - - // Listen for termination signals (e.g., SIGINT/SIGTERM) - go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - <-c - log.Info("received shutdown signal") - cancel() - }() - - <-stopCtx.Done() - log.Info("application shutting down") - - return nil -} - -func printStartupMsg() { - banner := ` - _____ _ _ __ __ _ _ _ - / ____| | | | | \ \ / / | | | | | | - | | ___ _ __ | |_ _ __ __ _| | \ \ /\ / /__| |__ | |__ ___ ___ | | 
__ - | | / _ \ '_ \| __| '__/ _' | | \ \/ \/ / _ \ '_ \| '_ \ / _ \ / _ \| |/ / - | |___| __/ | | | |_| | | (_| | | \ /\ / __/ |_) | | | | (_) | (_) | < - \_____\___|_| |_|\__|_| \__,_|_| \/ \/ \___|_.__/|_| |_|\___/ \___/|_|\_\ - ` - fmt.Println(banner) - fmt.Println("") -} - func main() { ctx := context.Background() @@ -157,9 +40,11 @@ func main() { defaultApiKey := os.Getenv("CENTRAL_WEBHOOK_API_KEY") defaultLogLevel := os.Getenv("CENTRAL_WEBHOOK_LOG_LEVEL") + // Database connection var dbUri string - flag.StringVar(&dbUri, "db", defaultDbUri, "DB host (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)") + flag.StringVar(&dbUri, "db", defaultDbUri, "DB URI (postgresql://{user}:{password}@{hostname}/{db}?sslmode=disable)") + // Webhook URLs var updateEntityUrl string flag.StringVar(&updateEntityUrl, "updateEntityUrl", defaultUpdateEntityUrl, "Webhook URL for update entity events") @@ -169,13 +54,55 @@ func main() { var reviewSubmissionUrl string flag.StringVar(&reviewSubmissionUrl, "reviewSubmissionUrl", defaultReviewSubmissionUrl, "Webhook URL for review submission events") + // API Key var apiKey string - flag.StringVar(&apiKey, "apiKey", defaultApiKey, "X-API-Key header value, for autenticating with webhook API") + flag.StringVar(&apiKey, "apiKey", defaultApiKey, "X-API-Key header value for authenticating with webhook API") + // Logging var debug bool flag.BoolVar(&debug, "debug", false, "Enable debug logging") - flag.Parse() + // Check if first argument is a command (allows flags after command) + var command string + if len(os.Args) > 1 { + firstArg := os.Args[1] + if firstArg == "install" || firstArg == "uninstall" { + command = firstArg + // Temporarily remove command from os.Args so flag.Parse() can parse remaining flags + originalArgs := os.Args + os.Args = append([]string{os.Args[0]}, os.Args[2:]...) 
+ flag.Parse() + os.Args = originalArgs // Restore for potential future use + } else { + // Command not first, parse flags normally + flag.Parse() + args := flag.Args() + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "Error: command is required. Use 'install' or 'uninstall'\n") + fmt.Fprintf(os.Stderr, "Usage: %s <command> [flags] or %s [flags] <command>\n", os.Args[0], os.Args[0]) + flag.PrintDefaults() + os.Exit(1) + } + command = args[0] + } + } else { + // No arguments at all + flag.Parse() + args := flag.Args() + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "Error: command is required. Use 'install' or 'uninstall'\n") + fmt.Fprintf(os.Stderr, "Usage: %s <command> [flags] or %s [flags] <command>\n", os.Args[0], os.Args[0]) + flag.PrintDefaults() + os.Exit(1) + } + command = args[0] + } + + // Validate command + if command != "install" && command != "uninstall" { + fmt.Fprintf(os.Stderr, "Error: command must be either 'install' or 'uninstall', got: %s\n", command) + os.Exit(1) + } // Set logging level var logLevel slog.Level @@ -188,29 +115,74 @@ func main() { } log := getDefaultLogger(logLevel) + // Validate database URI if dbUri == "" { - fmt.Fprintf(os.Stderr, "DB URI is required\n") + fmt.Fprintf(os.Stderr, "Error: DB URI is required\n") flag.PrintDefaults() os.Exit(1) } - if updateEntityUrl == "" && newSubmissionUrl == "" && reviewSubmissionUrl == "" { - fmt.Fprintf(os.Stderr, "At least one of updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl is required\n") - flag.PrintDefaults() - os.Exit(1) + // For install command, validate at least one webhook URL is provided + if command == "install" { + if updateEntityUrl == "" && newSubmissionUrl == "" && reviewSubmissionUrl == "" { + fmt.Fprintf(os.Stderr, "Error: At least one webhook URL is required for install command\n") + fmt.Fprintf(os.Stderr, " Provide at least one of: -updateEntityUrl, -newSubmissionUrl, -reviewSubmissionUrl\n") + flag.PrintDefaults() + os.Exit(1) + } } // Get a connection pool dbPool, err := db.InitPool(ctx, log, dbUri) if 
err != nil { - fmt.Fprintf(os.Stderr, "could not connect to database: %v", err) + fmt.Fprintf(os.Stderr, "Error: could not connect to database: %v\n", err) os.Exit(1) } + defer dbPool.Close() + + // Execute command + switch command { + case "install": + var apiKeyPtr *string + if apiKey != "" { + apiKeyPtr = &apiKey + } - printStartupMsg() - err = SetupWebhook(log, ctx, dbPool, &apiKey, updateEntityUrl, newSubmissionUrl, reviewSubmissionUrl) - if err != nil { - fmt.Fprintf(os.Stderr, "error setting up webhook: %v", err) - os.Exit(1) + var updateEntityUrlPtr *string + if updateEntityUrl != "" { + updateEntityUrlPtr = &updateEntityUrl + } + + var newSubmissionUrlPtr *string + if newSubmissionUrl != "" { + newSubmissionUrlPtr = &newSubmissionUrl + } + + var reviewSubmissionUrlPtr *string + if reviewSubmissionUrl != "" { + reviewSubmissionUrlPtr = &reviewSubmissionUrl + } + + log.Info("Installing webhook triggers") + err = db.CreateTrigger(ctx, dbPool, "audits", db.TriggerOptions{ + UpdateEntityURL: updateEntityUrlPtr, + NewSubmissionURL: newSubmissionUrlPtr, + ReviewSubmissionURL: reviewSubmissionUrlPtr, + APIKey: apiKeyPtr, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: failed to install triggers: %v\n", err) + os.Exit(1) + } + log.Info("Webhook triggers installed successfully") + + case "uninstall": + log.Info("Uninstalling webhook triggers") + err = db.RemoveTrigger(ctx, dbPool, "audits") + if err != nil { + fmt.Fprintf(os.Stderr, "Error: failed to uninstall triggers: %v\n", err) + os.Exit(1) + } + log.Info("Webhook triggers uninstalled successfully") } } diff --git a/main_test.go b/main_test.go index 886f441..95259f9 100644 --- a/main_test.go +++ b/main_test.go @@ -1,158 +1,626 @@ package main -// import ( -// "context" -// "encoding/json" -// "log/slog" -// "net/http" -// "net/http/httptest" -// "os" -// "sync" -// "testing" -// "time" - -// "github.com/matryer/is" - -// "github.com/hotosm/central-webhook/db" -// 
"github.com/hotosm/central-webhook/parser" -// ) - -// func TestSetupWebhook(t *testing.T) { -// dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") -// if len(dbUri) == 0 { -// dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" -// } - -// is := is.New(t) -// wg := sync.WaitGroup{} -// log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ -// Level: slog.LevelDebug, -// })) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// dbPool, err := db.InitPool(ctx, log, dbUri) -// is.NoErr(err) -// defer dbPool.Close() - -// conn, err := dbPool.Acquire(ctx) -// is.NoErr(err) -// defer conn.Release() - -// // Create test tables -// conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) -// conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`) -// createTables := []string{ -// `CREATE TABLE IF NOT EXISTS entity_defs ( -// id SERIAL PRIMARY KEY, -// "entityId" INT, -// "createdAt" TIMESTAMPTZ, -// "current" BOOL, -// "data" JSONB, -// "creatorId" INT, -// "label" TEXT -// );`, -// `CREATE TABLE IF NOT EXISTS audits ( -// "actorId" INT, -// action VARCHAR, -// details JSONB -// );`, -// } -// for _, sql := range createTables { -// _, err := conn.Exec(ctx, sql) -// is.NoErr(err) -// } - -// // Insert an entity record -// log.Info("inserting entity details record") -// _, err = conn.Exec(ctx, ` -// INSERT INTO public.entity_defs ( -// id, "entityId","createdAt","current","data","creatorId","label" -// ) VALUES ( -// 1001, -// 900, -// '2025-01-10 16:23:40.073', -// true, -// '{"status": "0", "task_id": "26", "version": "1"}', -// 5, -// 'Task 26 Feature 904487737' -// ); -// `) -// is.NoErr(err) -// log.Info("entity record inserted") - -// // Mock webhook server -// webhookReceived := make(chan bool, 1) -// mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -// defer r.Body.Close() -// var payload parser.ProcessedEvent -// err := json.NewDecoder(r.Body).Decode(&payload) -// is.NoErr(err) - -// 
log.Info("payload received", "payload", payload) -// is.Equal(payload.ID, "xxx") // Validate Entity ID - -// // Convert the payload.Data to map[string]string for comparison -// actualData, ok := payload.Data.(map[string]interface{}) -// is.True(ok) // Ensure the type assertion succeeded - -// expectedData := map[string]interface{}{ -// "status": "0", -// "task_id": "26", -// "version": "1", -// } -// is.Equal(actualData, expectedData) // Validate Entity data - -// webhookReceived <- true -// w.Header().Set("Content-Type", "application/json") -// w.WriteHeader(http.StatusOK) -// })) -// defer mockServer.Close() - -// // Start webhook listener -// wg.Add(1) -// go func() { -// defer wg.Done() -// log.Info("starting webhook listener") -// err := SetupWebhook(log, ctx, dbPool, nil, mockServer.URL, mockServer.URL, mockServer.URL) -// if err != nil && ctx.Err() == nil { -// log.Error("webhook listener error", "error", err) -// } -// }() - -// // Wait for the listener to initialize -// log.Info("waiting for listener to initialize") -// time.Sleep(300 * time.Millisecond) // Wait for the listener to be fully set up - -// // Insert an audit log to trigger the webhook -// log.Info("inserting audit log") -// _, err = conn.Exec(ctx, ` -// INSERT INTO audits ("actorId", action, details) -// VALUES ( -// 1, -// 'entity.update.version', -// '{"entityDefId": 1001, "entityId": 1000, "entity": {"uuid": "xxx", "dataset": "test"}}' -// ); -// `) -// is.NoErr(err) - -// // Wait for webhook response or timeout -// select { -// case <-webhookReceived: -// log.Info("webhook received successfully") -// case <-time.After(3 * time.Second): -// t.Fatalf("test timed out waiting for webhook") -// } - -// // Allow some time for final webhook processing -// time.Sleep(100 * time.Millisecond) - -// // Cleanup -// log.Info("cleaning up...") -// cancel() -// wg.Wait() -// conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs;`) -// conn.Exec(ctx, `DROP TABLE IF EXISTS audits;`) -// conn.Release() -// 
dbPool.Close() -// } +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "os" + "sync" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/hotosm/central-webhook/db" + "github.com/matryer/is" +) + +// testServer wraps an HTTP server that can be accessed from other Docker containers +type testServer struct { + server *http.Server + listener net.Listener + URL string +} + +// createTestServer creates an HTTP server bound to 0.0.0.0 so it can be accessed +// from other Docker containers. Returns the server and a URL using the container hostname. +func createTestServer(handler http.Handler) (*testServer, error) { + // Listen on all interfaces (0.0.0.0) so other containers can connect + listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + return nil, err + } + + server := &http.Server{ + Handler: handler, + } + + // Get the actual port assigned + port := listener.Addr().(*net.TCPAddr).Port + + // Get container hostname, fallback to service name "webhook" if hostname unavailable + hostname, err := os.Hostname() + if err != nil { + hostname = "webhook" // Use service name from docker-compose + } + + // Construct URL using container hostname and port + url := fmt.Sprintf("http://%s:%d", hostname, port) + + ts := &testServer{ + server: server, + listener: listener, + URL: url, + } + + // Start the server in a goroutine + go func() { + _ = server.Serve(listener) + }() + + return ts, nil +} + +// Close stops the test server +func (ts *testServer) Close() error { + return ts.server.Close() +} + +func getTestDBURI() string { + dbUri := os.Getenv("CENTRAL_WEBHOOK_DB_URI") + if len(dbUri) == 0 { + dbUri = "postgresql://odk:odk@db:5432/odk?sslmode=disable" + } + return dbUri +} + +func setupTestTables(ctx context.Context, conn *pgxpool.Conn) error { + // Create entity_defs table + _, err := conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs CASCADE;`) + if err != nil { + return err + } + entityTableSQL := ` + 
CREATE TABLE entity_defs ( + id int4, + "entityId" int4, + "createdAt" timestamptz, + "current" bool, + "data" jsonb, + "creatorId" int4, + "label" text + ); + ` + _, err = conn.Exec(ctx, entityTableSQL) + if err != nil { + return err + } + + // Create audits table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS audits CASCADE;`) + if err != nil { + return err + } + auditTableSQL := ` + CREATE TABLE audits ( + "actorId" int, + action varchar, + details jsonb + ); + ` + _, err = conn.Exec(ctx, auditTableSQL) + return err +} + +func cleanupTestTables(ctx context.Context, conn *pgxpool.Conn) error { + _, err := conn.Exec(ctx, `DROP TABLE IF EXISTS entity_defs, audits CASCADE;`) + return err +} + +// TestInstallCommand tests that the install command works correctly +func TestInstallCommand(t *testing.T) { + dbUri := getTestDBURI() + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + + pool, err := db.InitPool(ctx, log, dbUri) + is.NoErr(err) + defer pool.Close() + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Clean up any existing triggers first + _ = db.RemoveTrigger(ctx, pool, "audits") + // Small delay to ensure cleanup completes + time.Sleep(100 * time.Millisecond) + + // Setup test tables + err = setupTestTables(ctx, conn) + is.NoErr(err) + defer func() { + _ = db.RemoveTrigger(ctx, pool, "audits") + cleanupTestTables(ctx, conn) + }() + + // Create mock webhook server + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + is.NoErr(err) + defer server.Close() + + // Test install with updateEntityUrl + updateEntityUrl := server.URL + err = db.CreateTrigger(ctx, pool, "audits", db.TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + }) + is.NoErr(err) + + // Verify trigger exists + var triggerExists bool + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = 
'new_audit_log_trigger' + ) + `).Scan(&triggerExists) + is.NoErr(err) + is.True(triggerExists) + + // Verify function exists + var functionExists bool + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + ) + `).Scan(&functionExists) + is.NoErr(err) + is.True(functionExists) + + // Cleanup already handled in defer +} + +// TestUninstallCommand tests that the uninstall command works correctly +func TestUninstallCommand(t *testing.T) { + dbUri := getTestDBURI() + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + + pool, err := db.InitPool(ctx, log, dbUri) + is.NoErr(err) + defer pool.Close() + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Setup test tables + err = setupTestTables(ctx, conn) + is.NoErr(err) + defer func() { + cleanupTestTables(ctx, conn) + }() + + // Create mock webhook server + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + is.NoErr(err) + defer server.Close() + + // Install trigger first + updateEntityUrl := server.URL + err = db.CreateTrigger(ctx, pool, "audits", db.TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + }) + is.NoErr(err) + + // Verify trigger exists + var triggerExists bool + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = 'new_audit_log_trigger' + ) + `).Scan(&triggerExists) + is.NoErr(err) + is.True(triggerExists) + + // Uninstall trigger + err = db.RemoveTrigger(ctx, pool, "audits") + is.NoErr(err) + + // Verify trigger is removed + err = conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_trigger + WHERE tgname = 'new_audit_log_trigger' + ) + `).Scan(&triggerExists) + is.NoErr(err) + is.True(!triggerExists) + + // Verify function is removed + var functionExists bool + err = 
conn.QueryRow(ctx, ` + SELECT EXISTS( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE p.proname = 'new_audit_log' AND n.nspname = 'public' + ) + `).Scan(&functionExists) + is.NoErr(err) + is.True(!functionExists) +} + +// TestE2EInstallAndTrigger tests end-to-end: install trigger, insert audit record, verify webhook is called +func TestE2EInstallAndTrigger(t *testing.T) { + dbUri := getTestDBURI() + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + + pool, err := db.InitPool(ctx, log, dbUri) + is.NoErr(err) + defer pool.Close() + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Clean up any existing triggers first + _ = db.RemoveTrigger(ctx, pool, "audits") + // Small delay to ensure cleanup completes + time.Sleep(100 * time.Millisecond) + + // Setup test tables + err = setupTestTables(ctx, conn) + is.NoErr(err) + defer func() { + _ = db.RemoveTrigger(ctx, pool, "audits") + cleanupTestTables(ctx, conn) + }() + + // Insert an entity record + entityInsertSQL := ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0", "task_id": "26", "version": "1"}', + 5, + 'Task 26 Feature 904487737' + ); + ` + _, err = conn.Exec(ctx, entityInsertSQL) + is.NoErr(err) + + // Create HTTP test server to receive webhook + var receivedPayload map[string]interface{} + var receivedHeaders http.Header + var requestReceived sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + receivedHeaders = r.Header + body, err := io.ReadAll(r.Body) + is.NoErr(err) + err = json.Unmarshal(body, &receivedPayload) + is.NoErr(err) + w.WriteHeader(http.StatusOK) + requestReceived.Done() + })) + is.NoErr(err) + defer server.Close() + + // Install trigger + 
updateEntityUrl := server.URL + err = db.CreateTrigger(ctx, pool, "audits", db.TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + }) + is.NoErr(err) + + // Insert an audit record to trigger the webhook + auditInsertSQL := ` + INSERT INTO audits ("actorId", action, details) + VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entity": {"uuid": "test-uuid-123", "dataset": "test"}}'); + ` + _, err = conn.Exec(ctx, auditInsertSQL) + is.NoErr(err) + + // Wait for HTTP request to be received (with timeout) + done := make(chan struct{}) + go func() { + requestReceived.Wait() + close(done) + }() + + select { + case <-done: + // Request received successfully + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for webhook request") + } + + // Validate the webhook payload + is.True(receivedPayload != nil) + is.Equal(receivedPayload["type"], "entity.update.version") + is.Equal(receivedPayload["id"], "test-uuid-123") + is.True(receivedPayload["data"] != nil) + + // Check nested JSON value for status in data + data, ok := receivedPayload["data"].(map[string]interface{}) + is.True(ok) + is.Equal(data["status"], "0") + + // Validate headers + is.Equal(receivedHeaders.Get("Content-Type"), "application/json") +} + +// TestE2EInstallWithAPIKey tests end-to-end with API key authentication +func TestE2EInstallWithAPIKey(t *testing.T) { + dbUri := getTestDBURI() + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + + pool, err := db.InitPool(ctx, log, dbUri) + is.NoErr(err) + defer pool.Close() + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Clean up any existing triggers first + _ = db.RemoveTrigger(ctx, pool, "audits") + // Small delay to ensure cleanup completes + time.Sleep(100 * time.Millisecond) + + // Setup test tables + err = setupTestTables(ctx, conn) + is.NoErr(err) + defer func() { + _ = db.RemoveTrigger(ctx, pool, "audits") + cleanupTestTables(ctx, conn) + }() + + // 
Insert an entity record + entityInsertSQL := ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0"}', + 5, + 'Test Entity' + ); + ` + _, err = conn.Exec(ctx, entityInsertSQL) + is.NoErr(err) + + // Create HTTP test server to receive webhook + var receivedHeaders http.Header + var requestReceived sync.WaitGroup + requestReceived.Add(1) + + server, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header + w.WriteHeader(http.StatusOK) + requestReceived.Done() + })) + is.NoErr(err) + defer server.Close() + + // Install trigger with API key + updateEntityUrl := server.URL + apiKey := "test-api-key-12345" + err = db.CreateTrigger(ctx, pool, "audits", db.TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + APIKey: &apiKey, + }) + is.NoErr(err) + + // Insert an audit record to trigger the webhook + auditInsertSQL := ` + INSERT INTO audits ("actorId", action, details) + VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entity": {"uuid": "test-uuid", "dataset": "test"}}'); + ` + _, err = conn.Exec(ctx, auditInsertSQL) + is.NoErr(err) + + // Wait for HTTP request to be received + done := make(chan struct{}) + go func() { + requestReceived.Wait() + close(done) + }() + + select { + case <-done: + // Request received successfully + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for webhook request") + } + + // Validate API key header + is.Equal(receivedHeaders.Get("X-API-Key"), "test-api-key-12345") + is.Equal(receivedHeaders.Get("Content-Type"), "application/json") +} + +// TestE2EInstallMultipleWebhooks tests installing multiple webhook types +func TestE2EInstallMultipleWebhooks(t *testing.T) { + dbUri := getTestDBURI() + is := is.New(t) + log := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + ctx := context.Background() + + pool, err := db.InitPool(ctx, log, 
dbUri) + is.NoErr(err) + defer pool.Close() + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + defer conn.Release() + + // Clean up any existing triggers first + _ = db.RemoveTrigger(ctx, pool, "audits") + // Small delay to ensure cleanup completes + time.Sleep(100 * time.Millisecond) + + // Setup test tables + err = setupTestTables(ctx, conn) + is.NoErr(err) + defer func() { + _ = db.RemoveTrigger(ctx, pool, "audits") + cleanupTestTables(ctx, conn) + conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs CASCADE;`) + }() + + // Create submission_defs table + _, err = conn.Exec(ctx, `DROP TABLE IF EXISTS submission_defs CASCADE;`) + is.NoErr(err) + submissionTableSQL := ` + CREATE TABLE submission_defs ( + id int4, + "submissionId" int4, + "instanceId" uuid, + xml text, + "formDefId" int4, + "submitterId" int4, + "createdAt" timestamptz + ); + ` + _, err = conn.Exec(ctx, submissionTableSQL) + is.NoErr(err) + + // Insert test data + entityInsertSQL := ` + INSERT INTO public.entity_defs ( + id, "entityId","createdAt","current","data","creatorId","label" + ) VALUES ( + 1001, + 900, + '2025-01-10 16:23:40.073', + true, + '{"status": "0"}', + 5, + 'Test Entity' + ); + ` + _, err = conn.Exec(ctx, entityInsertSQL) + is.NoErr(err) + + submissionInsertSQL := ` + INSERT INTO submission_defs ( + id, "submissionId", xml, "formDefId", "submitterId", "createdAt" + ) VALUES ( + 1, + 2, + '', + 7, + 5, + '2025-01-10 16:23:40.073' + ); + ` + _, err = conn.Exec(ctx, submissionInsertSQL) + is.NoErr(err) + + // Create HTTP test servers + var entityPayload map[string]interface{} + var submissionPayload map[string]interface{} + var entityReceived sync.WaitGroup + var submissionReceived sync.WaitGroup + entityReceived.Add(1) + submissionReceived.Add(1) + + entityServer, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + is.NoErr(err) + err = json.Unmarshal(body, &entityPayload) + is.NoErr(err) 
+ w.WriteHeader(http.StatusOK) + entityReceived.Done() + })) + is.NoErr(err) + defer entityServer.Close() + + submissionServer, err := createTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + is.NoErr(err) + err = json.Unmarshal(body, &submissionPayload) + is.NoErr(err) + w.WriteHeader(http.StatusOK) + submissionReceived.Done() + })) + is.NoErr(err) + defer submissionServer.Close() + + // Install triggers with multiple webhook URLs + updateEntityUrl := entityServer.URL + newSubmissionUrl := submissionServer.URL + err = db.CreateTrigger(ctx, pool, "audits", db.TriggerOptions{ + UpdateEntityURL: &updateEntityUrl, + NewSubmissionURL: &newSubmissionUrl, + }) + is.NoErr(err) + + // Trigger entity update + entityAuditSQL := ` + INSERT INTO audits ("actorId", action, details) + VALUES (1, 'entity.update.version', '{"entityDefId": 1001, "entity": {"uuid": "test-uuid-entity", "dataset": "test"}}'); + ` + _, err = conn.Exec(ctx, entityAuditSQL) + is.NoErr(err) + + // Trigger submission create + submissionAuditSQL := ` + INSERT INTO audits ("actorId", action, details) + VALUES (5, 'submission.create', '{"submissionDefId": 1, "instanceId": "test-instance-123"}'); + ` + _, err = conn.Exec(ctx, submissionAuditSQL) + is.NoErr(err) + + // Wait for both webhooks + done := make(chan struct{}) + go func() { + entityReceived.Wait() + submissionReceived.Wait() + close(done) + }() + + select { + case <-done: + // Both requests received successfully + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for webhook requests") + } + + // Validate entity webhook + is.True(entityPayload != nil) + is.Equal(entityPayload["type"], "entity.update.version") + is.Equal(entityPayload["id"], "test-uuid-entity") + + // Validate submission webhook + is.True(submissionPayload != nil) + is.Equal(submissionPayload["type"], "submission.create") + is.Equal(submissionPayload["id"], "test-instance-123") +} diff 
--git a/parser/audit.go b/parser/audit.go deleted file mode 100644 index 6e73075..0000000 --- a/parser/audit.go +++ /dev/null @@ -1,134 +0,0 @@ -package parser - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "log/slog" -) - -// ** Entities ** // -// Define the nested structs for the details field -type OdkEntityRef struct { - Uuid string `json:"uuid"` // Use string for UUID, as it may be 'uuid:xxx-xxx' - Dataset string `json:"dataset"` -} - -type OdkEntityDetails struct { - Entity OdkEntityRef `json:"entity"` - EntityId int `json:"entityId"` - EntityDefId int `json:"entityDefId"` -} - -// ** Submissions ** // - -type OdkSubmissionDetails struct { - InstanceId string `json:"instanceId"` // Use string for UUID, as it may be 'uuid:xxx-xxx' - // The submissionId field is present, but it's a database reference only, so we ignore it - // SubmissionId int `json:"submissionId"` - SubmissionDefId int `json:"submissionDefId"` -} - -// ** High level wrapper structs ** // - -// OdkAuditLog represents the main structure for the audit log (returned by pg_notify) -type OdkAuditLog struct { - Notes *string `json:"notes"` // Pointer to handle null values - Action string `json:"action"` - ActeeID string `json:"acteeId"` // Use string for UUID - ActorID int `json:"actorId"` - Details interface{} `json:"details"` // Use an interface to handle different detail types - Data interface{} `json:"data"` // Use an interface to handle different data types -} - -// ProcessedEvent represents the final parsed event structure (to send to the webhook API) -type ProcessedEvent struct { - Type string `json:"type"` // The event type, entity update or new submission - ID string `json:"id"` // Entity UUID or Submission InstanceID - Data interface{} `json:"data"` // The actual entity data or wrapped submission XML -} - -// ParseJsonString converts the pg_notify string to OdkAuditLog -func ParseJsonString(log *slog.Logger, data []byte) (*OdkAuditLog, error) { - if len(data) == 0 { - return 
nil, errors.New("empty input data") - } - - var parsedData OdkAuditLog - if err := json.Unmarshal(data, &parsedData); err != nil { - log.Error("failed to parse JSON data", "error", err, "data", string(data)) - return nil, err - } - log.Debug("parsed notification data", "data", parsedData) - return &parsedData, nil -} - -// ParseEventJson parses the JSON data and extracts the relevant ID and data fields -func ParseEventJson(log *slog.Logger, ctx context.Context, data []byte) (*ProcessedEvent, error) { - // Convert the raw pg_notify string to an OdkAuditLog - rawLog, err := ParseJsonString(log, data) - if err != nil { - return nil, err - } - - // Prepare the result structure - var processedEvent ProcessedEvent - - // Parse the details field based on the action - switch rawLog.Action { - case "entity.update.version": - var entityDetails OdkEntityDetails - if err := parseDetails(rawLog.Details, &entityDetails); err != nil { - log.Error("failed to parse entity.update.version details", "error", err) - return nil, err - } - processedEvent.Type = "entity.update.version" - processedEvent.ID = entityDetails.Entity.Uuid - processedEvent.Data = rawLog.Data - - case "submission.create": - var submissionDetails OdkSubmissionDetails - if err := parseDetails(rawLog.Details, &submissionDetails); err != nil { - log.Error("failed to parse submission.create details", "error", err) - return nil, err - } - processedEvent.Type = "submission.create" - processedEvent.ID = submissionDetails.InstanceId - - // Parse the raw XML data - rawData, ok := rawLog.Data.(map[string]interface{}) - if !ok { - log.Error("invalid data type for submission.create", "data", rawLog.Data) - return nil, errors.New("invalid data type for submission.create") - } - processedEvent.Data = rawData - - case "submission.update": - var submissionDetails OdkSubmissionDetails - if err := parseDetails(rawLog.Details, &submissionDetails); err != nil { - log.Error("failed to parse submission.update details", "error", err) - 
return nil, err - } - processedEvent.Type = "submission.update" - processedEvent.ID = submissionDetails.InstanceId - processedEvent.Data = rawLog.Data - - default: - // No nothing if the event type is not supported - log.Warn("unsupported action type", "action", rawLog.Action) - return nil, fmt.Errorf("unsupported action type") - } - - log.Debug("parsed event successfully", "processedEvent", processedEvent) - return &processedEvent, nil -} - -// parseDetails helps to unmarshal the details field into the appropriate structure -func parseDetails(details interface{}, target interface{}) error { - detailsBytes, err := json.Marshal(details) - if err != nil { - return err - } - return json.Unmarshal(detailsBytes, target) -} diff --git a/parser/audit_test.go b/parser/audit_test.go deleted file mode 100644 index 73223ae..0000000 --- a/parser/audit_test.go +++ /dev/null @@ -1,105 +0,0 @@ -package parser - -import ( - "context" - "io" - "log/slog" - "testing" - - "github.com/matryer/is" -) - -func TestParseJsonString(t *testing.T) { - is := is.New(t) - log := slog.New(slog.NewJSONHandler(io.Discard, nil)) - - t.Run("Valid JSON", func(t *testing.T) { - input := []byte(`{"id":"123","action":"entity.update.version","actorId":1,"details":{"entity":{"uuid":"abc","dataset":"test"}},"data":{}}`) - result, err := ParseJsonString(log, input) - is.NoErr(err) - is.Equal("entity.update.version", result.Action) - is.Equal(1, result.ActorID) - }) - - t.Run("Empty Input", func(t *testing.T) { - input := []byte("") - result, err := ParseJsonString(log, input) - is.Equal(result, nil) - is.True(err != nil) - is.Equal("empty input data", err.Error()) - }) - - t.Run("Invalid JSON", func(t *testing.T) { - input := []byte(`invalid`) - result, err := ParseJsonString(log, input) - is.Equal(result, nil) - is.True(err != nil) - }) -} - -func TestParseEventJson(t *testing.T) { - is := is.New(t) - log := slog.New(slog.NewJSONHandler(io.Discard, nil)) - ctx := context.Background() - - t.Run("Entity 
Update Version", func(t *testing.T) { - input := []byte(`{ - "id":"123", - "action":"entity.update.version", - "actorId":1, - "details":{"entity":{"uuid":"abc","dataset":"test"}}, - "data":{} - }`) - result, err := ParseEventJson(log, ctx, input) - is.NoErr(err) - is.Equal("abc", result.ID) - is.Equal(map[string]interface{}{}, result.Data) - }) - - t.Run("Submission Create", func(t *testing.T) { - input := []byte(`{ - "id":"456", - "action":"submission.create", - "actorId":2, - "details":{"instanceId":"sub-123","submissionId":789,"submissionDefId":101112}, - "data":{"xml":""} - }`) - result, err := ParseEventJson(log, ctx, input) - is.NoErr(err) - is.Equal("sub-123", result.ID) - - wrappedData, ok := result.Data.(map[string]interface{}) - is.True(ok) - is.Equal("", wrappedData["xml"]) - }) - - t.Run("Submission Review", func(t *testing.T) { - input := []byte(`{ - "id":"456", - "action":"submission.update", - "details":{"submissionId":789,"submissionDefId":101112}, - "data":{"reviewState":"approved"} - }`) - result, err := ParseEventJson(log, ctx, input) - is.NoErr(err) - - wrappedData, ok := result.Data.(map[string]interface{}) - is.True(ok) - // This will check if ODK changes its audit structure for submission.update over time - is.Equal("approved", wrappedData["reviewState"]) - }) - - t.Run("Unsupported Action", func(t *testing.T) { - input := []byte(`{ - "id":"789", - "action":"unknown.action", - "actorId":3, - "details":{}, - "data":{} - }`) - result, err := ParseEventJson(log, ctx, input) - is.Equal(result, nil) - is.True(err != nil) - is.Equal("unsupported action type", err.Error()) - }) -} diff --git a/http.dockerfile b/pgsql-http.dockerfile similarity index 89% rename from http.dockerfile rename to pgsql-http.dockerfile index f57e620..e82d0b4 100644 --- a/http.dockerfile +++ b/pgsql-http.dockerfile @@ -1,27 +1,32 @@ +# Postgres v14 FROM postgres:14 AS pg-14 RUN apt-get update \ && apt-get install -y postgresql-14-http \ && rm -rf /var/lib/apt/lists/* +# 
Postgres v15 FROM postgres:15 AS pg-15 RUN apt-get update \ && apt-get install -y postgresql-15-http \ && rm -rf /var/lib/apt/lists/* +# Postgres v16 FROM postgres:16 AS pg-16 RUN apt-get update \ && apt-get install -y postgresql-16-http \ && rm -rf /var/lib/apt/lists/* +# Postgres v17 FROM postgres:17 AS pg-17 RUN apt-get update \ && apt-get install -y postgresql-17-http \ && rm -rf /var/lib/apt/lists/* +# Postgres v18 FROM postgres:18 AS pg-18 RUN apt-get update \ diff --git a/webhook/auth.go b/webhook/auth.go deleted file mode 100644 index d770c2c..0000000 --- a/webhook/auth.go +++ /dev/null @@ -1 +0,0 @@ -package webhook diff --git a/webhook/request.go b/webhook/request.go deleted file mode 100644 index 9c22164..0000000 --- a/webhook/request.go +++ /dev/null @@ -1,69 +0,0 @@ -package webhook - -import ( - "bytes" - "context" - "encoding/json" - "io" - "log/slog" - "net/http" - "time" - - "github.com/hotosm/central-webhook/parser" -) - -// SendRequest parses the request content JSON from the PostgreSQL notification -// and sends the JSON payload to an external API endpoint. 
-func SendRequest( - log *slog.Logger, - ctx context.Context, - apiEndpoint string, - eventJson parser.ProcessedEvent, - apiKey *string, -) { - // Marshal the payload to JSON - marshaledPayload, err := json.Marshal(eventJson) - if err != nil { - log.Error("failed to marshal payload to JSON", "error", err) - return - } - - // Create the HTTP request - req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiEndpoint, bytes.NewBuffer(marshaledPayload)) - if err != nil { - log.Error("failed to create HTTP request", "error", err) - return - } - req.Header.Set("Content-Type", "application/json") - // Add X-API-Key header if apiKey is provided - if apiKey != nil { - req.Header.Set("X-API-Key", *apiKey) - } - - // Send the request - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - log.Error("failed to send HTTP request", "error", err) - return - } - respBodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - log.Error("failed to read response body", "error", err) - return - } - respBodyString := string(respBodyBytes) - defer resp.Body.Close() - - // Check the response status - if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { - log.Info("webhook called successfully", "status", resp.StatusCode, "endpoint", apiEndpoint) - } else { - log.Error( - "failed to call webhook", - "endpoint", apiEndpoint, - "requestPayload", eventJson, - "responseCode", resp.StatusCode, - "responseBody", respBodyString) - } -} diff --git a/webhook/request_test.go b/webhook/request_test.go deleted file mode 100644 index d939e21..0000000 --- a/webhook/request_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package webhook - -import ( - "context" - "encoding/json" - "io" - "log/slog" - "net/http" - "net/http/httptest" - "os" - "testing" - "time" - - "github.com/matryer/is" - - "github.com/hotosm/central-webhook/parser" -) - -func TestSendRequest(t *testing.T) { - is := is.New(t) - log := 
slog.New(slog.NewJSONHandler(os.Stdout, nil)) - - // Set up a mock server - var receivedPayload parser.ProcessedEvent - var receivedApiKey string - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Verify content type - is.Equal("application/json", r.Header.Get("Content-Type")) - - // Verify API key was received - receivedApiKey = r.Header.Get("X-API-Key") - - // Read and parse request body - body, err := io.ReadAll(r.Body) - is.NoErr(err) - defer r.Body.Close() - - err = json.Unmarshal(body, &receivedPayload) - is.NoErr(err) - - w.WriteHeader(http.StatusOK) - })) - defer server.Close() - - // Define test cases - testCases := []struct { - name string - event parser.ProcessedEvent - expectedId string - expectedType string - expectedData interface{} - }{ - { - name: "Submission Create Event", - event: parser.ProcessedEvent{ - ID: "23dc865a-4757-431e-b182-67e7d5581c81", - Type: "submission.create", - Data: "XML Data", - }, - expectedId: "23dc865a-4757-431e-b182-67e7d5581c81", - expectedType: "submission.create", - expectedData: "XML Data", - }, - { - name: "Entity Update Event", - event: parser.ProcessedEvent{ - ID: "45fgh789-e32c-56d2-a765-427654321abc", - Type: "entity.update.version", - Data: "{\"field\":\"value\"}", - }, - expectedId: "45fgh789-e32c-56d2-a765-427654321abc", - expectedType: "entity.update.version", - expectedData: "{\"field\":\"value\"}", - }, - { - name: "Submission Review Event", - event: parser.ProcessedEvent{ - ID: "45fgh789-e32c-56d2-a765-427654321abc", - Type: "submission.update", - Data: "approved", - }, - expectedId: "45fgh789-e32c-56d2-a765-427654321abc", - expectedType: "submission.update", - expectedData: "approved", - }, - } - - // Execute test cases - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - testApiKey := "test-api-key" - SendRequest(log, ctx, server.URL, tc.event, 
&testApiKey) - - // Validate the received payload - is.Equal(tc.expectedId, receivedPayload.ID) - is.Equal(tc.expectedType, receivedPayload.Type) - is.Equal(tc.expectedData, receivedPayload.Data) - - // Validate that the API key header was sent correctly - is.Equal("test-api-key", receivedApiKey) - }) - } -} From 5a7975f07f4f78c5e42c18f95d38cf05614c86b5 Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Wed, 17 Dec 2025 13:03:29 +0000 Subject: [PATCH 2/2] ci: use os.Hostname for webhook testing in ci --- db/trigger_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/db/trigger_test.go b/db/trigger_test.go index 2743723..f764cbb 100644 --- a/db/trigger_test.go +++ b/db/trigger_test.go @@ -37,18 +37,22 @@ func createTestServer(handler http.Handler) (*testServer, error) { return nil, err } - server := &http.Server{Handler: handler} + server := &http.Server{ + Handler: handler, + } port := listener.Addr().(*net.TCPAddr).Port - // Use Docker service name so Postgres container can resolve it - hostname := os.Getenv("TEST_SERVER_HOST") - if hostname == "" { + // Try container hostname first for CI, but fallback to compose service + // name for local runs + hostname, err := os.Hostname() + if err != nil || hostname == "" { hostname = "webhook" // docker-compose service name } url := fmt.Sprintf("http://%s:%d", hostname, port) + ts := &testServer{ server: server, listener: listener,