Skip to content

Commit cdb44c4

Browse files
committed
Update example to use wrapping transaction
1 parent 3a064f0 commit cdb44c4

File tree

1 file changed

+44
-17
lines changed

1 file changed

+44
-17
lines changed

README.md

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,22 @@ end_at >= now() - interval '7 days' AND start_at <= now()
7777
The stats can be written with `store`, read with `load` + `decompress`, and deleted with `delete`:
7878

7979
```rs
80-
async fn example() {
80+
async fn example() -> anyhow::Result<()> {
8181
let database_id = 1;
8282
let start = SystemTime::UNIX_EPOCH;
8383
let end = SystemTime::now();
84-
let db = &DB_POOL.get().await.unwrap();
84+
let db = &DB_POOL.get().await?;
8585

8686
// Write
8787
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(120), fingerprint: 1, calls: 1, total_time: 1.0 }];
88-
QueryStats::store(db, stats).await.unwrap();
88+
QueryStats::store(db, stats).await?;
8989
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(60), fingerprint: 1, calls: 1, total_time: 1.0 }];
90-
QueryStats::store(db, stats).await.unwrap();
90+
QueryStats::store(db, stats).await?;
9191

9292
// Read
9393
let mut calls = 0;
94-
for group in QueryStats::load(db, &[database_id], start, end).await.unwrap() {
95-
for stat in group.decompress().unwrap() {
94+
for group in QueryStats::load(db, &[database_id], start, end).await? {
95+
for stat in group.decompress()? {
9696
calls += stat.calls;
9797
}
9898
}
@@ -103,22 +103,26 @@ async fn example() {
103103
// Note: you'll want to choose the time range passed to `delete` so it only groups, for example, stats
104104
// from the past day into a fewer number of rows. There's a balance to be reached between compression
105105
// ratio and not slowing down read queries with unwanted data from outside the requested time range.
106-
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
107-
let mut stats = Vec::new();
108-
for group in QueryStats::delete(db, &[database_id], start, end).await.unwrap() {
109-
for stat in group.decompress().unwrap() {
110-
stats.push(stat);
106+
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
107+
transaction!(db, {
108+
let mut stats = Vec::new();
109+
for group in QueryStats::delete(db, &[database_id], start, end).await? {
110+
for stat in group.decompress()? {
111+
stats.push(stat);
112+
}
111113
}
112-
}
113-
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
114-
QueryStats::store(db, stats).await.unwrap();
115-
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
116-
let group = QueryStats::load(db, &[database_id], start, end).await.unwrap().remove(0);
114+
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
115+
QueryStats::store(db, stats).await?;
116+
});
117+
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
118+
let group = QueryStats::load(db, &[database_id], start, end).await?.remove(0);
117119
assert_eq!(group.start_at, end - Duration::from_secs(120));
118120
assert_eq!(group.end_at, end - Duration::from_secs(60));
119-
let stats = group.decompress().unwrap();
121+
let stats = group.decompress()?;
120122
assert_eq!(stats[0].collected_at, end - Duration::from_secs(120));
121123
assert_eq!(stats[1].collected_at, end - Duration::from_secs(60));
124+
125+
Ok(())
122126
}
123127

124128
use std::str::FromStr;
@@ -130,6 +134,28 @@ pub static DB_POOL: std::sync::LazyLock<std::sync::Arc<deadpool_postgres::Pool>>
130134
let mgr = deadpool_postgres::Manager::from_config(pg_config, tokio_postgres::NoTls, mgr_config);
131135
deadpool_postgres::Pool::builder(mgr).build().unwrap().into()
132136
});
137+
138+
#[macro_export]
139+
macro_rules! transaction {
140+
($db: ident, $block: expr) => {
141+
$db.execute("BEGIN", &[]).await?;
142+
let result: anyhow::Result<()> = (|| async {
143+
$block
144+
Ok(())
145+
})().await;
146+
match result {
147+
Ok(result) => {
148+
$db.execute("COMMIT", &[]).await?;
149+
result
150+
}
151+
Err(err) => {
152+
$db.execute("ROLLBACK", &[]).await?;
153+
anyhow::bail!(err);
154+
}
155+
}
156+
}
157+
}
158+
pub use transaction;
133159
```
134160

135161
Additional examples can be found in [tests/tests.rs](tests/tests.rs).
@@ -140,3 +166,4 @@ Additional examples can be found in [tests/tests.rs](tests/tests.rs).
140166
- support other storage models (filesystem, S3, etc)
141167
- support compression for other data types (text, enums, etc)
142168
- add a stream/generator API to avoid allocating Vecs when loading data
169+
- [add `copy_in` support to deadpool_postgres and tokio_postgres `GenericClient`](https://github.com/deadpool-rs/deadpool/issues/397)

0 commit comments

Comments
 (0)