Emmett is a library that helps developing an event sourced system.
The following are the tables used for building Event Sourcing module.
| Column | Type | Nullable | Default | Notes |
|---|---|---|---|---|
| stream_id | TEXT | NO | ||
| stream_position | BIGINT | NO | ||
| partition | TEXT | NO | '${customer_id}' | Partition key value |
| stream_type | TEXT | NO | ||
| stream_metadata | JSONB | NO | ||
| is_archived | BOOLEAN | NO | FALSE |
- Primary key:
(stream_id, partition, is_archived) - Partitioning:
PARTITION BY LIST (partition) - Indexes: Unique index on
(stream_id, partition, is_archived)includingstream_position
| Column | Type | Nullable | Default | Notes |
|---|---|---|---|---|
| stream_id | TEXT | NO | ||
| stream_position | BIGINT | NO | ||
| partition | TEXT | NO | '${customer_id}' | Partition key value |
| message_kind | CHAR(1) | NO | 'E' | |
| message_data | JSONB | NO | ||
| message_metadata | JSONB | NO | ||
| message_schema_version | TEXT | NO | ||
| message_type | TEXT | NO | ||
| message_id | TEXT | NO | ||
| is_archived | BOOLEAN | NO | FALSE | |
| global_position | BIGINT | YES | nextval('emt_global_message_position') | Monotonic sequence |
| transaction_id | XID8 | NO | PostgreSQL 64-bit XID | |
| created | TIMESTAMPTZ | NO | now() |
- Primary key:
(stream_id, stream_position, partition, is_archived) - Partitioning:
PARTITION BY LIST (partition) - Sequences:
emt_global_message_position(used byglobal_position)
| Column | Type | Nullable | Default | Notes |
|---|---|---|---|---|
| subscription_id | TEXT | NO | ||
| version | INT | NO | 1 | Part of PK |
| partition | TEXT | NO | '${customer_id}' | Partition key value |
| last_processed_position | BIGINT | NO | ||
| last_processed_transaction_id | XID8 | NO | PostgreSQL 64-bit XID |
- Primary key:
(subscription_id, partition, version) - Partitioning:
PARTITION BY LIST (partition)
The tables above implement Emmett's event store for PostgreSQL. See the Emmett documentation for an overview: Emmett overview.
- A command handler decides on changes and records them as immutable events.
- Events are appended to a stream identified by
stream_id. - Each append increments
stream_positionwithin that stream. This enables optimistic concurrency checks (compare expected position before writing). - A row in
messagesis written per event. global_positionis assigned from a sequence to provide a monotonic, cross-stream ordering for consumers.- The
streamstable tracks the stream's type, latest position and metadata for quick access and listing.
partitionis a logical partition key. It is used for multi-tenancy.- Both
streamsandmessagesincludeis_archived. Archiving allows hiding historical streams/messages without deleting rows and is part of the primary key, preserving history.
sequenceDiagram
participant C as "Command Handler"
participant ES as "Event Store"
participant DB as PostgreSQL
Note over C: Decide events, set expectedPosition = n
C->>ES: Append(stream_id, events, expectedPosition)
ES->>DB: SELECT max(stream_position) FROM messages WHERE stream_id=...
alt Position matches
ES->>DB: INSERT INTO messages
ES->>DB: UPDATE streams
ES-->>C: Append OK (new stream_position, global_position)
else Conflict
ES-->>C: VersionConflictError
end
- Projections/read models consume events in
messages, typically by increasingglobal_position. - Each projection keeps its cursor in
subscriptionsaslast_processed_position(andlast_processed_transaction_id) so it can resume after restarts. - Processing flow:
- Load next batch of events where
global_position>last_processed_position. - Apply events to your read model store (e.g., PostgreSQL tables, Elasticsearch, caches).
- Update the corresponding row in
subscriptionsto the last successfully processed position.
- Load next batch of events where
- Using the position bookmark ensures at-least-once processing with idempotent projections. The transaction id helps detect boundary cases around transaction visibility.
flowchart TD
Start([Start]) --> Init[Read subscriptions row]
Init --> Read[Fetch events]
Read --> Apply[Apply to read model]
Apply --> Bookmark[Update subscriptions]
Bookmark --> Wait[Wait for new events]
Wait --> Read
This example demonstrates an event-sourced shopping cart implementation using the Emmett framework. The cart follows CQRS and Event Sourcing patterns with clear separation between commands, events, and domain states.
Commands represent the intent to perform an action on the cart:
| Command | Description | Parameters |
|---|---|---|
| CreateCart | Initializes a new shopping cart | tenantId, cartId, currency |
| AddItemToCart | Adds an item to the cart | tenantId, cartId, item (sku, quantity, price) |
| RemoveItemFromCart | Removes a specific quantity of an item | tenantId, cartId, sku, quantity |
| CartEmptied | Empties all items from the cart | tenantId, cartId |
| CartCheckedOut | Completes the cart checkout process | tenantId, cartId, orderId, total |
| CartCancelled | Cancels the cart with a reason | tenantId, cartId, reason |
Events represent what has happened in the system and are immutable:
| Event | Description | Data |
|---|---|---|
| CartCreated | Cart has been initialized | currency |
| ItemAddedToCart | An item was added to the cart | item (sku, quantity, price) |
| ItemRemovedFromCart | An item was removed from the cart | sku, quantity |
| CartEmptied | All items were removed from the cart | null |
| CartCheckedOut | Cart checkout was completed | orderId, total |
| CartCancelled | Cart was cancelled | reason |
The cart can exist in different states throughout its lifecycle:
| State | Description | Properties |
|---|---|---|
| InitCart | Initial state before cart creation | status: "init", items: [] |
| ActiveCart | Cart is active and can be modified | status: "active", tenantId, cartId, currency, items |
| CheckedOutCart | Cart has been checked out | status: "checkedOut", tenantId, cartId, currency, items |
| CancelledCart | Cart has been cancelled | status: "cancelled", tenantId, cartId, currency, items |
InitCart → ActiveCart (via CartCreated)
ActiveCart → ActiveCart (via ItemAddedToCart, ItemRemovedFromCart, CartEmptied)
ActiveCart → CheckedOutCart (via CartCheckedOut)
ActiveCart → CancelledCart (via CartCancelled)
- Only positive quantities are allowed for items
- Cannot remove more items than exist in the cart
- Cannot modify a cart that has been checked out or cancelled
- Total amount cannot be negative during checkout
- All operations require valid tenantId and cartId
When the following events occur:
- Cart created
- An item added (SKU-123 x2 @ $25)
- Another item added (SKU-456 x1 @ $15)
- The first item removed (SKU-123 x1)
- Checkout the cart
"Writes" inserts/updates to the following tables.
| message_data | message_type |
|---|---|
| CartCreated | {"currency": "USD"} |
| ItemAddedToCart | {"item": {"sku": "SKU-123", "name": "Item 123", "quantity": 2, "unitPrice": 25}} |
| ItemAddedToCart | {"item": {"sku": "SKU-456", "name": "Item 456", "quantity": 1, "unitPrice": 15}} |
| ItemRemovedFromCart | {"sku": "SKU-123", "quantity": 1} |
| CartCheckedOut | {"total": 40, "orderId": "38fc43c4-893d-4398-9290-2441490d0545"} |
| stream_id | stream_position | stream_type |
|---|---|---|
| de9f2d24-2475-49dc-88d7-652028650204 | 5 | cart |
Then, "Reads" inserts into the following table. subscription_id contains what executed the projection.
| subscription_id | last_processed_position |
|---|---|
| carts-read-model:de9f2d24-2475-49dc-88d7-652028650204 | 5 |
| cart_id | currency | items_json |
|---|---|---|
| de9f2d24-2475-49dc-88d7-652028650204 | USD | {"items": [{"sku": "SKU-123", "name": "Item 123", "quantity": 1, "unitPrice": 25}, {"sku": "SKU-456", "name": "Item 456", "quantity": 1, "unitPrice": 15}], "total": 40, "orderId": "38fc43c4-893d-4398-9290-2441490d0545"} |
As long as you only touch the domain business logic, you would not need to worry about the streams table and the subscriptions table. Those are the system tables to store the internal states.