Project context for AI coding assistants. See agents.md for the open standard.
Event-driven notification system for DAO governance, built as a pnpm monorepo with 4 microservices connected via RabbitMQ:
- Logic System (apps/logic-system/) - Polls AntiCapture REST API every Xs, detects governance events, publishes trigger events
- Dispatcher (apps/dispatcher/) - Consumes trigger events, fetches subscribers with temporal filtering, routes notifications
- Subscription Server (apps/subscription-server/) - Fastify REST API for user preferences, PostgreSQL persistence, Slack OAuth
- Consumer (apps/consumers/) - Delivers notifications via Telegram (telegraf) and Slack (@slack/bolt) bots
Supporting packages: anticapture-client (REST SDK wrapper), messages (templates), rabbitmq-client (AMQP wrapper).
Dashboard (apps/dashboard/) provides read-only metrics via Next.js.
pnpm install # Install all dependencies
pnpm dev # Start all services with Docker Compose
pnpm build # Build all services (via Turbo)
pnpm test # Run all tests (via Turbo)
# Service-specific (filter shortcuts)
pnpm logic-system <cmd> # e.g., pnpm logic-system test
pnpm dispatcher <cmd>
pnpm subscription-server <cmd>
pnpm consumer <cmd>
# Testing specific services/patterns
pnpm --filter @notification-system/logic-system test
pnpm --filter @notification-system/integrated-tests test -- --testNamePattern="voting"
NODE_ENV=test pnpm --filter @notification-system/integrated-tests test
# Type checking and linting
pnpm consumer check-types
pnpm logic-system lint
Logic System (polls API every 30s)
-> publishes TriggerEvent to dispatcher-queue
-> Dispatcher consumes, fetches subscribers with temporal filtering
-> publishes NotificationPayload to notifications.exchange (topic)
-> Consumer binds notifications.<channel>.* per platform
-> delivers via Telegram/Slack -> marks as sent in Subscription Server
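The topic binding in the flow above can be illustrated with a small, dependency-free sketch of AMQP topic matching. Only the pattern notifications.<channel>.* comes from this document; the helper names and the assumption that the third routing-key segment is the trigger id are hypothetical.

```typescript
// Hypothetical helpers for illustration; not the repo's rabbitmq-client API.
type Channel = "telegram" | "slack";

// Builds a routing key such as "notifications.telegram.new-proposal".
// Assumption: the last segment is the trigger id.
function buildRoutingKey(channel: Channel, triggerId: string): string {
  return `notifications.${channel}.${triggerId}`;
}

// AMQP topic semantics: "*" matches exactly one dot-separated word,
// "#" matches zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" may absorb any number of remaining words, including none.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  };

  return match(0, 0);
}

const routingKey = buildRoutingKey("telegram", "new-proposal");
const telegramReceives = matchesTopic("notifications.telegram.*", routingKey);
const slackReceives = matchesTopic("notifications.slack.*", routingKey);
```

Under these assumptions the Telegram consumer's binding matches the key and the Slack consumer's does not, which is what lets one exchange serve both platforms.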
Step-by-step guide for agents and developers adding a new trigger. Use the same trigger id (e.g. my-trigger) in Logic System and Dispatcher so routing works.
- Create trigger in apps/logic-system/src/triggers/: extend Trigger<T> (see base-trigger.ts), implement fetchData() and process(data[], lastTimestamp?).
- Optional: If the trigger needs a dedicated data layer, add a repository in apps/logic-system/src/repositories/ and use it from the trigger.
- Register the trigger in App.setupTriggers() in apps/logic-system/src/app.ts.
- Create handler in apps/dispatcher/src/services/triggers/: extend BaseTriggerHandler, use the same trigger id as in Logic System.
- Register the handler in TriggerProcessorService via addHandler() in apps/dispatcher/src/app.ts.
- Add message templates in packages/messages/src/triggers/ (e.g. my-trigger.ts) with {{placeholder}} syntax; export from packages/messages/src/index.ts.
- Add buttons config in packages/messages/src/triggers/buttons.ts if the notification needs CTAs (e.g. explorer links).
- Logic System: Add tests in apps/logic-system/tests/ (e.g. next to or mirroring the trigger). Use stubs or fakes for the RabbitMQ dispatcher and repositories (preferred over mocks); assert fetchData()/process() behavior.
- Dispatcher: Add or extend co-located tests (e.g. my-trigger.service.test.ts in apps/dispatcher/src/services/triggers/). Use stubs or fakes for ISubscriptionClient, INotificationClient, and dependencies (preferred over mocks); assert handler logic and payload shape.
- Messages: Add tests in packages/messages for new templates (placeholder replacement, edge cases) if non-trivial.
- Add or extend tests in apps/integrated-tests/: cover the new trigger flow (Logic System → Dispatcher → Consumer) using real RabbitMQ (testcontainers), in-memory DB, and stubs/fakes (or mocks during transition) for Telegram/Slack. Run with NODE_ENV=test and optionally filter: pnpm --filter @notification-system/integrated-tests test -- --testNamePattern="my-trigger".
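The steps above depend on the Logic System trigger and the Dispatcher handler agreeing on one trigger id. The sketch below shows that pairing with stand-in classes. TriggerSketch, HandlerSketch, ProcessorSketch, and the event shape are hypothetical and only loosely mirror Trigger<T>, BaseTriggerHandler, and TriggerProcessorService; check base-trigger.ts and the existing handlers for the real signatures.

```typescript
// Stand-ins for illustration only; the repo's classes may differ.
interface TriggerEventSketch<T> {
  triggerId: string;
  events: T[];
}

abstract class TriggerSketch<T> {
  readonly id: string;
  constructor(id: string) {
    this.id = id;
  }
  abstract fetchData(): Promise<T[]>;
  abstract process(data: T[], lastTimestamp?: number): TriggerEventSketch<T> | null;
}

abstract class HandlerSketch<T> {
  abstract handle(event: TriggerEventSketch<T>): string[];
}

// One shared constant keeps both sides on the same id.
const MY_TRIGGER_ID = "my-trigger";

interface MyItem {
  id: string;
  timestamp: number;
}

class MyTrigger extends TriggerSketch<MyItem> {
  private readonly source: () => Promise<MyItem[]>;
  constructor(source: () => Promise<MyItem[]>) {
    super(MY_TRIGGER_ID);
    this.source = source;
  }
  fetchData(): Promise<MyItem[]> {
    return this.source();
  }
  // Emits only items newer than the last processed timestamp.
  process(data: MyItem[], lastTimestamp: number = 0): TriggerEventSketch<MyItem> | null {
    const fresh = data.filter((item) => item.timestamp > lastTimestamp);
    return fresh.length > 0 ? { triggerId: this.id, events: fresh } : null;
  }
}

class MyTriggerHandler extends HandlerSketch<MyItem> {
  handle(event: TriggerEventSketch<MyItem>): string[] {
    return event.events.map((e) => `my-trigger fired for ${e.id}`);
  }
}

class ProcessorSketch {
  private readonly handlers = new Map<string, HandlerSketch<any>>();
  addHandler(triggerId: string, handler: HandlerSketch<any>): void {
    this.handlers.set(triggerId, handler);
  }
  // Routing is a lookup by trigger id, so a mismatched id fails here.
  dispatch(event: TriggerEventSketch<unknown>): string[] {
    const handler = this.handlers.get(event.triggerId);
    if (!handler) throw new Error(`No handler registered for ${event.triggerId}`);
    return handler.handle(event);
  }
}

const processor = new ProcessorSketch();
processor.addHandler(MY_TRIGGER_ID, new MyTriggerHandler());
const trigger = new MyTrigger(async () => []);
```

Because the sketch routes by id lookup, a handler registered under a different id would throw at dispatch time, which is the failure the shared-id rule is meant to prevent.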
| Step | Location | Action |
|---|---|---|
| 1a | Logic System src/triggers/ | New class extending Trigger<T>, implement fetchData() and process() |
| 1b | Logic System src/repositories/ | (Optional) New repository if trigger needs dedicated data access |
| 1c | Logic System src/app.ts | Register trigger in App.setupTriggers() |
| 2a | Dispatcher src/services/triggers/ | New handler extending BaseTriggerHandler |
| 2b | Dispatcher src/app.ts | Register handler with TriggerProcessorService.addHandler() |
| 3a | Messages src/triggers/ | New template file; export from index.ts |
| 3b | Messages src/triggers/buttons.ts | Add button config if trigger has CTAs |
| 4 | Logic System, Dispatcher, Messages | Add/update unit tests |
| 5 | apps/integrated-tests/ | Add or extend integration test for the new trigger flow |
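For step 3a, the {{placeholder}} syntax can be sketched as a small substitution function. This is a minimal illustration, not the messages package's implementation; renderTemplate, the template text, and the choice to leave unknown placeholders untouched are all assumptions.

```typescript
// Hypothetical renderer: replaces {{name}} tokens with values from a map.
// Placeholders without a matching value are left as-is so they stay visible
// in tests instead of silently turning into empty strings.
function renderTemplate(template: string, values: Record<string, string>): string {
  return template.replace(/\{\{(\w+)\}\}/g, (token: string, name: string) => {
    const value = Object.prototype.hasOwnProperty.call(values, name)
      ? values[name]
      : undefined;
    return value ?? token;
  });
}

// Example template text is invented for this sketch.
const myTriggerTemplate = "New activity in {{daoId}}: {{title}}";
const rendered = renderTemplate(myTriggerTemplate, {
  daoId: "ENS",
  title: "Example proposal",
});
```

A unit test for a new template would typically cover the same two cases: every placeholder supplied, and a placeholder missing.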
Key tables:
- users - User profiles with channel, channel_user_id
- user_preferences - DAO subscriptions with is_active, created_at (temporal filtering)
- user_notifications - Delivery tracking for deduplication
- user_addresses - Wallet addresses for personalized notifications
- channel_workspaces - Slack workspace metadata
- slack_workspaces - Encrypted OAuth tokens (AES-256-CBC)
Migrations in apps/subscription-server/db/migrations/ (Knex.js).
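As a rough illustration of how is_active, created_at, and user_notifications could combine when selecting recipients, here is an in-memory sketch. The row shapes are simplified assumptions, and so is the reading of temporal filtering as "only notify users whose subscription existed when the event occurred"; the Subscription Server's actual queries may differ.

```typescript
// Simplified, hypothetical row shapes loosely based on the tables above.
interface PreferenceRow {
  userId: string;
  daoId: string;
  isActive: boolean;
  createdAt: number; // epoch seconds
}

interface SentRow {
  userId: string;
  eventId: string;
}

interface GovernanceEvent {
  id: string;
  daoId: string;
  timestamp: number; // epoch seconds
}

function selectRecipients(
  prefs: PreferenceRow[],
  sent: SentRow[],
  event: GovernanceEvent
): string[] {
  // Deduplication: users already notified about this event are skipped.
  const alreadyNotified = new Set(
    sent.filter((s) => s.eventId === event.id).map((s) => s.userId)
  );
  return prefs
    .filter((p) => p.isActive && p.daoId === event.daoId)
    // Temporal filtering (assumed semantics): subscription predates the event.
    .filter((p) => p.createdAt <= event.timestamp)
    .filter((p) => !alreadyNotified.has(p.userId))
    .map((p) => p.userId);
}

const recipients = selectRecipients(
  [
    { userId: "u1", daoId: "ENS", isActive: true, createdAt: 100 },
    { userId: "u2", daoId: "ENS", isActive: true, createdAt: 300 }, // subscribed after the event
    { userId: "u3", daoId: "ENS", isActive: false, createdAt: 50 }, // inactive
    { userId: "u4", daoId: "ENS", isActive: true, createdAt: 80 }, // already notified
  ],
  [{ userId: "u4", eventId: "evt-1" }],
  { id: "evt-1", daoId: "ENS", timestamp: 200 }
);
```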
Unit tests: Prefer stubs and fakes over mocks. The codebase still has many mocks; new tests and refactors should use stubs/fakes where possible (e.g. in-memory or fake implementations of interfaces) to improve maintainability and avoid over-coupling to implementation details.
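To make the stub/fake preference concrete, the sketch below shows an in-memory fake standing in for a subscription client. SubscriptionClientSketch and its two methods are hypothetical; the real ISubscriptionClient interface may expose different methods, so treat this as the pattern rather than the contract.

```typescript
// Hypothetical interface shape for illustration.
interface SubscriptionClientSketch {
  getSubscribers(daoId: string): Promise<string[]>;
  markAsSent(userId: string, eventId: string): Promise<void>;
}

// A fake is a working in-memory implementation. Tests assert on its state
// instead of on which calls were made, so they survive internal refactors.
class FakeSubscriptionClient implements SubscriptionClientSketch {
  private readonly subscribers = new Map<string, string[]>();
  readonly sent: Array<{ userId: string; eventId: string }> = [];

  seed(daoId: string, userIds: string[]): void {
    this.subscribers.set(daoId, [...userIds]);
  }

  async getSubscribers(daoId: string): Promise<string[]> {
    return this.subscribers.get(daoId) ?? [];
  }

  async markAsSent(userId: string, eventId: string): Promise<void> {
    this.sent.push({ userId, eventId });
  }
}

// Code under test depends only on the interface, never on the fake.
async function notifySubscribers(
  client: SubscriptionClientSketch,
  daoId: string,
  eventId: string
): Promise<number> {
  const users = await client.getSubscribers(daoId);
  for (const userId of users) {
    await client.markAsSent(userId, eventId);
  }
  return users.length;
}
```

In a Vitest test, the fake would be seeded in the arrange step and its sent array inspected in the assert step.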
Integration tests (apps/integrated-tests/): Uses @testcontainers/rabbitmq for real RabbitMQ, SQLite in-memory DB, and stubs/fakes (or temporary mocks) for Telegram/Slack. Run with NODE_ENV=test. Prefer stubs and fakes for external service boundaries as we migrate away from mocks.
Dashboard tests: Node.js built-in test module (tsx --test).
Required .env variables:
DATABASE_URL=postgresql://user:pass@localhost/dbname
RABBITMQ_URL=amqp://localhost
ANTICAPTURE_API_URL=https://...
TELEGRAM_BOT_TOKEN=...
SLACK_SIGNING_SECRET=...
TOKEN_ENCRYPTION_KEY=... # 64-char hex for AES-256-CBC
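A fail-fast check for these variables might look like the sketch below. The services validate environment variables with Zod; this version is deliberately dependency-free, and validateEnv is a hypothetical name. The 64-hex-character rule follows from AES-256 needing a 32-byte key.

```typescript
type EnvSketch = Record<string, string | undefined>;

const REQUIRED_VARS = [
  "DATABASE_URL",
  "RABBITMQ_URL",
  "ANTICAPTURE_API_URL",
  "TELEGRAM_BOT_TOKEN",
  "SLACK_SIGNING_SECRET",
  "TOKEN_ENCRYPTION_KEY",
] as const;

// Returns human-readable problems; an empty array means the env looks usable.
function validateEnv(env: EnvSketch): string[] {
  const problems: string[] = [];
  for (const name of REQUIRED_VARS) {
    if (!env[name]) problems.push(`${name} is missing`);
  }
  const key = env["TOKEN_ENCRYPTION_KEY"];
  // 64 hex characters decode to the 32 bytes AES-256 requires.
  if (key && !/^[0-9a-fA-F]{64}$/.test(key)) {
    problems.push("TOKEN_ENCRYPTION_KEY must be 64 hex characters (32 bytes)");
  }
  return problems;
}

// Placeholder values only; none of these are real credentials.
const exampleProblems = validateEnv({
  DATABASE_URL: "postgresql://user:pass@localhost/dbname",
  RABBITMQ_URL: "amqp://localhost",
  ANTICAPTURE_API_URL: "https://example.invalid",
  TELEGRAM_BOT_TOKEN: "placeholder",
  SLACK_SIGNING_SECRET: "placeholder",
  TOKEN_ENCRYPTION_KEY: "abc123", // too short on purpose
});
```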
- Language: TypeScript (strict mode) across all services
- Validation: Zod schemas for environment variables and API inputs
- Monorepo: pnpm workspaces + Turbo for builds
- Testing: Vitest (all apps + packages), MSW for HTTP/JSON stubs in integrated-tests, Node.js test runner (dashboard)
- Package manager: pnpm 10.x, Node.js >= 18
- GitHub Actions deploys to Railway on push to dev or main
- Path-based triggers for selective service deployment
- Docker Compose in docker-compose.yml for local development
- Each service has its own Dockerfile
To test without real blockchain events, insert mock data into the AntiCapture API database. The Logic System polls this data and triggers notifications.
- Identify the correct schema (check information_schema.schemata)
- Find an active user in subscription-server with is_active = true
- Get the user's wallet address from the user_addresses table
- Run the indexer locally (pnpm serve) or just use the one deployed on dev.
- Always disable triggers before INSERT: Tables have live_query triggers that fail on manual inserts
- Use real proposal_id for votes: Fake IDs cause API 500 errors
- Prefix mock tx_hash with 0xmock: Makes cleanup easy
- Use current timestamp: extract(epoch from now())::bigint in SQL
SET search_path TO "<schema_uuid>";
ALTER TABLE votes_onchain DISABLE TRIGGER ALL;
INSERT INTO votes_onchain (tx_hash, dao_id, voter_account_id, proposal_id, support, voting_power, reason, timestamp)
VALUES (
'0xmock_vote_' || extract(epoch from now())::bigint,
'ENS',
'<user_wallet_address>',
'<real_proposal_id_from_proposals_onchain>',
'1', -- 0=Against, 1=For, 2=Abstain
1000000000000000000,
'Mock vote for testing',
extract(epoch from now())::bigint
);
ALTER TABLE votes_onchain ENABLE TRIGGER ALL;
Important: The underlying tables are in a UUID schema (e.g. ecbe454c-b8fe-4659-8864-4cc68148cfde), not in anticapture (which has views). Find the correct schema via: SELECT pg_get_viewdef('anticapture.voting_power_history', true);
Critical: Use a fixed transaction hash (not extract(epoch from now())) so both tables share the exact same value. The API joins voting_power_history with delegations on transaction_hash, and requires delegation.log_index < voting_power_history.log_index. If the join fails, changeType becomes 'other', which bypasses threshold filtering entirely.
SET search_path TO "<underlying_uuid_schema>";
ALTER TABLE voting_power_history DISABLE TRIGGER ALL;
ALTER TABLE delegations DISABLE TRIGGER ALL;
-- Insert delegations FIRST with log_index = 0
INSERT INTO delegations (transaction_hash, dao_id, delegate_account_id, delegator_account_id, delegated_value, previous_delegate, timestamp, log_index)
VALUES (
'0xmock_vp_test',
'ENS',
'<user_wallet_address>',
'0x1111111111111111111111111111111111111111',
1000000000000000000,
'0x0000000000000000000000000000000000000000',
extract(epoch from now())::bigint,
0 -- must be < voting_power_history.log_index
);
-- Then insert voting_power_history with log_index = 1
INSERT INTO voting_power_history (transaction_hash, dao_id, account_id, voting_power, delta, delta_mod, timestamp, log_index)
VALUES (
'0xmock_vp_test',
'ENS',
'<user_wallet_address>',
5000000000000000000,
1000000000000000000,
1000000000000000000,
extract(epoch from now())::bigint,
1 -- must be > delegation.log_index
);
ALTER TABLE voting_power_history ENABLE TRIGGER ALL;
ALTER TABLE delegations ENABLE TRIGGER ALL;
Tests the offchain-proposal-finished trigger. The proposal must have state = 'closed' and end within the last 24 hours (the trigger cursor starts at now() - 24h).
-- No triggers on snapshot.proposals, safe to insert directly
INSERT INTO snapshot.proposals (id, space_id, author, title, body, discussion, type, start, "end", state, created, updated, link, flagged)
VALUES (
'0xmock_offchain_finished_' || extract(epoch from now())::bigint,
'ens.eth',
'0x1111111111111111111111111111111111111111',
'[MOCK] Test Finished Offchain Proposal',
'Mock proposal for testing offchain-proposal-finished notification.',
'',
'single-choice',
(extract(epoch from now()) - 86400)::integer, -- started 24h ago
(extract(epoch from now()) - 60)::integer, -- ended 1 minute ago (within cursor window)
'closed',
(extract(epoch from now()) - 86400)::integer,
extract(epoch from now())::integer,
'https://snapshot.org/#/ens.eth',
false
);
Key differences from new-proposal insert:
- state = 'closed' (not 'active')
- end = recent past (within last 24h so trigger cursor picks it up)
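The two conditions for the offchain-proposal-finished case (closed state, end within the 24-hour cursor window) can be checked before inserting mock data with a sketch like this one. The function name, the field subset, and the boundary handling (exclusive at the start of the window, inclusive at now) are assumptions; the trigger's own comparison may differ at the edges.

```typescript
// The 24h window comes from the trigger cursor described above.
const CURSOR_WINDOW_SECONDS = 24 * 60 * 60;

// Hypothetical subset of a snapshot.proposals row.
interface OffchainProposalSketch {
  id: string;
  state: string;
  end: number; // epoch seconds
}

// True when a proposal would fall inside the assumed cursor window.
function wouldBePickedUpAsFinished(
  proposal: OffchainProposalSketch,
  nowSeconds: number
): boolean {
  const windowStart = nowSeconds - CURSOR_WINDOW_SECONDS;
  return (
    proposal.state === "closed" &&
    proposal.end > windowStart &&
    proposal.end <= nowSeconds
  );
}

const nowForExample = 1_700_000_000;
const endedOneMinuteAgo = wouldBePickedUpAsFinished(
  { id: "0xmock_offchain_finished_example", state: "closed", end: nowForExample - 60 },
  nowForExample
);
const stillActive = wouldBePickedUpAsFinished(
  { id: "0xmock_offchain_example", state: "active", end: nowForExample + 604800 },
  nowForExample
);
```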
-- No triggers on snapshot.proposals, safe to insert directly
INSERT INTO snapshot.proposals (id, space_id, author, title, body, discussion, type, start, "end", state, created, updated, link, flagged)
VALUES (
'0xmock_offchain_' || extract(epoch from now())::bigint,
'ens.eth',
'0x1111111111111111111111111111111111111111',
'[MOCK] Test Offchain Proposal for Notification System',
'This is a mock offchain proposal inserted for testing the notification pipeline.',
'<discussion_url_from_existing_proposal>', -- e.g. https://discuss.ens.domains/t/...
'single-choice',
extract(epoch from now())::integer,
(extract(epoch from now()) + 604800)::integer, -- ends in 7 days
'active',
extract(epoch from now())::integer,
extract(epoch from now())::integer,
'<link_from_existing_proposal>', -- copy from: SELECT link FROM snapshot.proposals ORDER BY created DESC LIMIT 5
false
);
SET search_path TO "<schema_uuid>";
ALTER TABLE votes_onchain DISABLE TRIGGER ALL;
ALTER TABLE voting_power_history DISABLE TRIGGER ALL;
ALTER TABLE delegations DISABLE TRIGGER ALL;
DELETE FROM votes_onchain WHERE tx_hash LIKE '0xmock%';
DELETE FROM voting_power_history WHERE transaction_hash LIKE '0xmock%';
DELETE FROM delegations WHERE transaction_hash LIKE '0xmock%';
DELETE FROM snapshot.proposals WHERE id LIKE '0xmock%';
ALTER TABLE votes_onchain ENABLE TRIGGER ALL;
ALTER TABLE voting_power_history ENABLE TRIGGER ALL;
ALTER TABLE delegations ENABLE TRIGGER ALL;
-- Get real proposal IDs
SELECT id, LEFT(description, 50), status FROM proposals_onchain ORDER BY timestamp DESC LIMIT 5;
-- Check for orphan votes (will cause API 500)
SELECT v.* FROM votes_onchain v LEFT JOIN proposals_onchain p ON v.proposal_id = p.id WHERE p.id IS NULL;
-- List mock records
SELECT 'VOTE' as type, tx_hash, timestamp FROM votes_onchain WHERE tx_hash LIKE '0xmock%'
UNION ALL SELECT 'VP', transaction_hash, timestamp FROM voting_power_history WHERE transaction_hash LIKE '0xmock%'
UNION ALL SELECT 'DELEG', transaction_hash, timestamp FROM delegations WHERE transaction_hash LIKE '0xmock%'
UNION ALL SELECT 'OFFCHAIN', id, created FROM snapshot.proposals WHERE id LIKE '0xmock%';