Skip to content

Commit c17ef14

Browse files
Fix out of order stream (#240)
* fix: out of order stream Co-authored-by: Luca Iachini <luca.iachini89@gmail.com> * bump disintegrate-postgres version Co-authored-by: Luca Iachini <luca.iachini89@gmail.com> * clippy fix --------- Co-authored-by: Valerio Iachini <valerio.iachini@gmail.com>
1 parent 8c2389a commit c17ef14

11 files changed

Lines changed: 385 additions & 38 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,15 @@ To add Disintegrate to your project, follow these steps:
7777
```toml
7878
[dependencies]
7979
disintegrate = "4.0.0"
80-
disintegrate-postgres = "4.0.0"
80+
disintegrate-postgres = "4.0.1"
8181
```
8282
8383
* Disintegrate provides several features that you can enable based on your project requirements. You can include them in your `Cargo.toml` file as follows:
8484
8585
```toml
8686
[dependencies]
8787
disintegrate = { version = "4.0.0", features = ["macros", "serde-prost"] }
88-
disintegrate-postgres = { version = "4.0.0", features = ["listener"] }
88+
disintegrate-postgres = { version = "4.0.1", features = ["listener"] }
8989
```
9090
9191
* The macros feature enables the use of derive macros to simplify events implementations.
@@ -97,7 +97,7 @@ To add Disintegrate to your project, follow these steps:
9797
* To enable Prost serialization, use the `serde-prost` feature: `features = ["serde-prost"]`.
9898
* To enable Protocol Buffers serialization, use the `serde-protobuf` feature: `features = ["serde-protobuf"]`.
9999
100-
* If you're using the PostgreSQL event store backend and want to use the listener mechanism, you can enable the `listener` feature: `disintegrate-postgres = {version = "4.0.0", features = ["listener"]}`.
100+
* If you're using the PostgreSQL event store backend and want to use the listener mechanism, you can enable the `listener` feature: `disintegrate-postgres = {version = "4.0.1", features = ["listener"]}`.
101101
102102
2. Define the list of events in your application. You can use the Event Storming technique to identify the events that occur in your system. Here's an example of defining events using Disintegrate:
103103

disintegrate-postgres/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "disintegrate-postgres"
33
description = "Disintegrate PostgresDB implementation. Not for direct use. Refer to the `disintegrate` crate for details."
4-
version = "4.0.0"
4+
version = "4.0.1"
55
license.workspace = true
66
edition.workspace = true
77
authors.workspace = true

disintegrate-postgres/src/event_store.rs

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,78 @@ where
8484
S: Serde<E> + Send + Sync,
8585
E: Event + Send + Sync,
8686
{
87+
/// Appends events within an existing transaction without committing.
88+
///
89+
/// This allows callers to control when the transaction is committed.
90+
/// Sets SERIALIZABLE isolation, calls `event_store_begin_epoch()` to
91+
/// signal an in-flight write, performs the concurrency check, and inserts events.
92+
pub(crate) async fn append_in_tx<QE>(
93+
&self,
94+
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
95+
events: Vec<E>,
96+
query: StreamQuery<PgEventId, QE>,
97+
version: PgEventId,
98+
) -> Result<Vec<PersistedEvent<PgEventId, E>>, Error>
99+
where
100+
E: Clone,
101+
QE: Event + Clone + Send + Sync,
102+
{
103+
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
104+
.execute(&mut **tx)
105+
.await?;
106+
107+
sqlx::query("SELECT event_store_begin_epoch()")
108+
.execute(&mut **tx)
109+
.await?;
110+
111+
if sqlx::query_scalar(&format!(
112+
"SELECT EXISTS (SELECT 1 FROM event WHERE {})",
113+
CriteriaBuilder::new(&query.change_origin(version)).build()
114+
))
115+
.fetch_one(&mut **tx)
116+
.await?
117+
{
118+
return Err(Error::Concurrency);
119+
}
120+
121+
let mut insert = InsertEventsBuilder::new(&events, &self.serde);
122+
let event_ids: Vec<PgEventId> = insert
123+
.build()
124+
.fetch_all(&mut **tx)
125+
.await?
126+
.into_iter()
127+
.map(|r| r.get(0))
128+
.collect();
129+
130+
let persisted_events = event_ids
131+
.iter()
132+
.zip(events)
133+
.map(|(event_id, event)| PersistedEvent::new(*event_id, event))
134+
.collect::<Vec<_>>();
135+
136+
Ok(persisted_events)
137+
}
138+
87139
/// Streams events based on the provided query and executor.
88140
///
141+
/// Events are returned up to the current epoch, which is the minimum between
142+
/// the lowest in-flight writer's sequence position and the highest committed
143+
/// event ID. This guarantees that readers never observe uncommitted events
144+
/// and never skip events when concurrent writers commit out of sequence order.
145+
///
146+
/// The stream yields `StreamItem::Event` for each matching event and ends
147+
/// with `StreamItem::End(epoch_id)` indicating the epoch up to which events
148+
/// were read.
149+
///
89150
/// # Arguments
90151
///
91152
/// * `executor` - The sqlx executor to use for database operations.
92153
/// * `query` - The stream query specifying the criteria for filtering events.
93154
///
94155
/// # Returns
95156
///
96-
/// A `Result` containing a boxed stream of `PersistedEvent` that matches the query criteria,
97-
/// or an error of type `Error`.
157+
/// A boxed stream of `StreamItem` that contains matching events up to the
158+
/// current epoch, or an error of type `Error`.
98159
pub(crate) fn stream_with<'a, QE, EX>(
99160
&'a self,
100161
executor: EX,
@@ -107,7 +168,7 @@ where
107168
{
108169
let sql = format!(
109170
r#"SELECT event.event_id, event.payload, epoch.__epoch_id
110-
FROM (SELECT COALESCE(MAX(event_id),0) AS __epoch_id FROM event) AS epoch
171+
FROM (values (event_store_current_epoch())) AS epoch(__epoch_id)
111172
LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
112173
ORDER BY event_id ASC"#,
113174
criteria = CriteriaBuilder::new(query).build()
@@ -203,34 +264,8 @@ where
203264
QE: Event + Clone + Send + Sync,
204265
{
205266
let mut tx = self.pool.begin().await?;
206-
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
207-
.execute(&mut *tx)
208-
.await?;
209267

210-
if sqlx::query_scalar(&format!(
211-
"SELECT EXISTS (SELECT 1 FROM event WHERE {})",
212-
CriteriaBuilder::new(&query.change_origin(version)).build()
213-
))
214-
.fetch_one(&mut *tx)
215-
.await?
216-
{
217-
return Err(Error::Concurrency);
218-
}
219-
220-
let mut insert = InsertEventsBuilder::new(&events, &self.serde);
221-
let event_ids: Vec<PgEventId> = insert
222-
.build()
223-
.fetch_all(&mut *tx)
224-
.await?
225-
.into_iter()
226-
.map(|r| r.get(0))
227-
.collect();
228-
229-
let persisted_events = event_ids
230-
.iter()
231-
.zip(events)
232-
.map(|(event_id, event)| PersistedEvent::new(*event_id, event))
233-
.collect::<Vec<_>>();
268+
let persisted_events = self.append_in_tx(&mut tx, events, query, version).await?;
234269

235270
tx.commit().await.map_err(map_concurrency_err)?;
236271

@@ -257,6 +292,10 @@ where
257292
{
258293
let mut tx = self.pool.begin().await?;
259294

295+
sqlx::query("SELECT event_store_begin_epoch()")
296+
.execute(&mut *tx)
297+
.await?;
298+
260299
let mut insert = InsertEventsBuilder::new(&events, &self.serde);
261300
let event_ids: Vec<PgEventId> = insert
262301
.build()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
CREATE OR REPLACE FUNCTION event_store_begin_epoch()
2+
RETURNS void AS $$
3+
DECLARE
4+
id BIGINT;
5+
db_id INT;
6+
BEGIN
7+
SELECT last_value INTO id FROM seq_event_event_id;
8+
SELECT oid INTO db_id FROM pg_database WHERE datname = current_database();
9+
10+
PERFORM pg_try_advisory_xact_lock_shared(db_id, 0);
11+
PERFORM pg_try_advisory_xact_lock_shared(1, (id & 0xFFFFFFFF)::bit(32)::integer);
12+
PERFORM pg_try_advisory_xact_lock_shared(2, (id >> 32)::bit(32)::integer);
13+
END;
14+
$$ LANGUAGE plpgsql;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
CREATE OR REPLACE FUNCTION event_store_current_epoch()
2+
RETURNS BIGINT AS $$
3+
DECLARE
4+
persisted_event_id BIGINT;
5+
pending_event_id BIGINT;
6+
db_id INT;
7+
BEGIN
8+
SELECT COALESCE(MAX(event_id), 0) INTO persisted_event_id FROM event;
9+
SELECT oid INTO db_id FROM pg_database WHERE datname = current_database();
10+
11+
SELECT MIN((l3.objid::bigint << 32) + l2.objid::bigint)
12+
INTO pending_event_id
13+
FROM pg_locks l1
14+
INNER JOIN pg_locks l2 ON l1.pid = l2.pid
15+
INNER JOIN pg_locks l3 ON l1.pid = l3.pid
16+
WHERE
17+
l1.classid = db_id
18+
AND l2.classid = 1
19+
AND l3.classid = 2
20+
AND l1.locktype = 'advisory';
21+
22+
RETURN LEAST(COALESCE(pending_event_id, persisted_event_id), persisted_event_id);
23+
END;
24+
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)