Skip to content

Commit 02cbb34

Browse files
authored
Add store_grouped function (#14)
1 parent 04da2f6 commit 02cbb34

File tree

8 files changed

+513
-37
lines changed

8 files changed

+513
-37
lines changed

README.md

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ With a Rust struct that groups timeseries stats into a single row per `database_
4141
```rs
4242
use std::time::{Duration, SystemTime};
4343

44-
#[pco_store::store(timestamp = collected_at, group_by = [database_id], float_round = 2)]
44+
#[pco_store::store(timestamp = collected_at, group_by = [database_id, granularity], float_round = 2)]
4545
pub struct QueryStat {
4646
pub database_id: i64,
47+
/// Number of seconds captured in the query stat. 60 = 1 minute source data, 3600 = 1 hour aggregation
48+
pub granularity: i32,
4749
pub collected_at: SystemTime,
4850
pub fingerprint: i64,
4951
pub calls: i64,
50-
pub total_time: f64,
5152
}
5253
```
5354

@@ -56,25 +57,24 @@ And a matching Postgres table:
5657
```sql
5758
CREATE TABLE query_stats (
5859
database_id bigint NOT NULL,
60+
granularity int NOT NULL,
5961
start_at timestamptz NOT NULL,
6062
end_at timestamptz NOT NULL,
6163
collected_at bytea STORAGE EXTERNAL NOT NULL,
6264
fingerprint bytea STORAGE EXTERNAL NOT NULL,
63-
calls bytea STORAGE EXTERNAL NOT NULL,
64-
total_time bytea STORAGE EXTERNAL NOT NULL
65-
);
66-
CREATE INDEX ON query_stats USING btree (database_id);
67-
CREATE INDEX ON query_stats USING btree (end_at, start_at);
68-
```
65+
calls bytea STORAGE EXTERNAL NOT NULL
66+
) PARTITION BY LIST (granularity);
6967

70-
`STORAGE EXTERNAL` is set so that Postgres doesn't try to compress the already-compressed fields
68+
CREATE TABLE query_stats_1min PARTITION OF query_stats FOR VALUES IN (60);
69+
CREATE TABLE query_stats_1hour PARTITION OF query_stats FOR VALUES IN (3600);
7170

72-
This uses a `(end_at, start_at)` index because it's more selective than `(start_at, end_at)` for common use cases. For example when loading the last week of stats, the `end_at` filter is what's doing the work to filter out rows.
73-
```sql
74-
end_at >= now() - interval '7 days' AND start_at <= now()
71+
CREATE INDEX ON query_stats USING btree (database_id, end_at, start_at, granularity);
7572
```
7673

77-
The stats can be written with `store`, read with `load` + `decompress`, and deleted with `delete`:
74+
The stats can be:
75+
- written with `store`
76+
- read with `load` + `decompress`
77+
- rewritten for better compression with `delete` + `store_grouped`
7878

7979
```rs
8080
async fn example() -> anyhow::Result<()> {
@@ -84,9 +84,10 @@ async fn example() -> anyhow::Result<()> {
8484
let db = &DB_POOL.get().await?;
8585

8686
// Write
87-
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(120), fingerprint: 1, calls: 1, total_time: 1.0 }];
87+
let default = QueryStat { database_id, granularity: 60, collected_at: end, fingerprint: 1, calls: 1 };
88+
let stats = vec![QueryStat { collected_at: end - Duration::from_secs(120), ..default }];
8889
CompressedQueryStats::store(db, stats).await?;
89-
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(60), fingerprint: 1, calls: 1, total_time: 1.0 }];
90+
let stats = vec![QueryStat { collected_at: end - Duration::from_secs(60), ..default }];
9091
CompressedQueryStats::store(db, stats).await?;
9192

9293
// Read
@@ -98,21 +99,20 @@ async fn example() -> anyhow::Result<()> {
9899
}
99100
assert_eq!(calls, 2);
100101

101-
// Delete and re-group to improve compression ratio
102-
//
103-
// Note: you'll want to choose the time range passed to `delete` so it only groups, for example, stats
104-
// from the past day into a fewer number of rows. There's a balance to be reached between compression
105-
// ratio and not slowing down read queries with unwanted data from outside the requested time range.
102+
// Delete and re-group to improve compression ratio. This example compacts data into a single row per day.
103+
// The ideal group size will depend on the size and volume of your data.
106104
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
107105
transaction!(db, {
108106
let mut stats = Vec::new();
109107
for group in CompressedQueryStats::delete(db, &[database_id], start, end).await? {
110-
for stat in group.decompress()? {
111-
stats.push(stat);
112-
}
108+
stats.extend(group.decompress()?);
113109
}
114110
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
115-
CompressedQueryStats::store(db, stats).await?;
111+
CompressedQueryStats::store_grouped(db, stats, |stat| {
112+
let collected_at: chrono::DateTime<chrono::Utc> = stat.collected_at.into();
113+
collected_at.duration_trunc(chrono::Duration::days(1)).ok()
114+
})
115+
.await?;
116116
});
117117
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
118118
let group = CompressedQueryStats::load(db, &[database_id], start, end).await?.remove(0);

src/lib.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
313313
Ok(results)
314314
}
315315

316-
/// Writes the provided data to disk.
316+
/// Writes the data to disk.
317317
pub async fn store(db: &deadpool_postgres::Object, rows: Vec<#name>) -> anyhow::Result<()> {
318318
if rows.is_empty() {
319319
return Ok(());
@@ -334,6 +334,39 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
334334
writer.finish().await?;
335335
Ok(())
336336
}
337+
338+
/// Writes the data to disk, with the provided grouping closure applied.
339+
///
340+
/// This can be used to improve the compression ratio and reduce read IO, for example
341+
/// by compacting real-time data into a single row per hour / day / week.
342+
pub async fn store_grouped<F, R>(
343+
db: &deadpool_postgres::Object,
344+
rows: Vec<#name>,
345+
grouping: F,
346+
) -> anyhow::Result<()>
347+
where
348+
F: Fn(&#name) -> R,
349+
R: Eq + std::hash::Hash,
350+
{
351+
if rows.is_empty() {
352+
return Ok(());
353+
}
354+
let mut grouped_rows: ahash::AHashMap<_, Vec<#name>> = ahash::AHashMap::new();
355+
for row in rows {
356+
grouped_rows.entry((#store_group grouping(&row))).or_default().push(row);
357+
}
358+
let sql = #store_sql;
359+
let types = &[#store_types];
360+
let stmt = db.copy_in(&db.prepare_cached(&sql).await?).await?;
361+
let writer = tokio_postgres::binary_copy::BinaryCopyInWriter::new(stmt, types);
362+
futures::pin_mut!(writer);
363+
for rows in grouped_rows.into_values() {
364+
#timestamp_collect
365+
writer.as_mut().write(&[#store_values]).await?;
366+
}
367+
writer.finish().await?;
368+
Ok(())
369+
}
337370
}
338371
}
339372
.into()

tests/expand/boolean.expanded.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl CompressedQueryStats {
8686
}
8787
Ok(results)
8888
}
89-
/// Writes the provided data to disk.
89+
/// Writes the data to disk.
9090
pub async fn store(
9191
db: &deadpool_postgres::Object,
9292
rows: Vec<QueryStat>,
@@ -134,4 +134,60 @@ impl CompressedQueryStats {
134134
writer.finish().await?;
135135
Ok(())
136136
}
137+
/// Writes the data to disk, with the provided grouping closure applied.
138+
///
139+
/// This can be used to improve the compression ratio and reduce read IO, for example
140+
/// by compacting real-time data into a single row per hour / day / week.
141+
pub async fn store_grouped<F, R>(
142+
db: &deadpool_postgres::Object,
143+
rows: Vec<QueryStat>,
144+
grouping: F,
145+
) -> anyhow::Result<()>
146+
where
147+
F: Fn(&QueryStat) -> R,
148+
R: Eq + std::hash::Hash,
149+
{
150+
if rows.is_empty() {
151+
return Ok(());
152+
}
153+
let mut grouped_rows: ahash::AHashMap<_, Vec<QueryStat>> = ahash::AHashMap::new();
154+
for row in rows {
155+
grouped_rows.entry((row.database_id, grouping(&row))).or_default().push(row);
156+
}
157+
let sql = "COPY query_stats (database_id, toplevel, calls) FROM STDIN BINARY";
158+
let types = &[
159+
tokio_postgres::types::Type::INT8,
160+
tokio_postgres::types::Type::BYTEA,
161+
tokio_postgres::types::Type::BYTEA,
162+
];
163+
let stmt = db.copy_in(&db.prepare_cached(&sql).await?).await?;
164+
let writer = tokio_postgres::binary_copy::BinaryCopyInWriter::new(stmt, types);
165+
let mut writer = writer;
166+
#[allow(unused_mut)]
167+
let mut writer = unsafe {
168+
::pin_utils::core_reexport::pin::Pin::new_unchecked(&mut writer)
169+
};
170+
for rows in grouped_rows.into_values() {
171+
writer
172+
.as_mut()
173+
.write(
174+
&[
175+
&rows[0].database_id,
176+
&::pco::standalone::simpler_compress(
177+
&rows.iter().map(|r| r.toplevel as u16).collect::<Vec<_>>(),
178+
::pco::DEFAULT_COMPRESSION_LEVEL,
179+
)
180+
.unwrap(),
181+
&::pco::standalone::simpler_compress(
182+
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
183+
::pco::DEFAULT_COMPRESSION_LEVEL,
184+
)
185+
.unwrap(),
186+
],
187+
)
188+
.await?;
189+
}
190+
writer.finish().await?;
191+
Ok(())
192+
}
137193
}

tests/expand/float_round.expanded.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl CompressedQueryStats {
8787
}
8888
Ok(results)
8989
}
90-
/// Writes the provided data to disk.
90+
/// Writes the data to disk.
9191
pub async fn store(
9292
db: &deadpool_postgres::Object,
9393
rows: Vec<QueryStat>,
@@ -138,4 +138,63 @@ impl CompressedQueryStats {
138138
writer.finish().await?;
139139
Ok(())
140140
}
141+
/// Writes the data to disk, with the provided grouping closure applied.
142+
///
143+
/// This can be used to improve the compression ratio and reduce read IO, for example
144+
/// by compacting real-time data into a single row per hour / day / week.
145+
pub async fn store_grouped<F, R>(
146+
db: &deadpool_postgres::Object,
147+
rows: Vec<QueryStat>,
148+
grouping: F,
149+
) -> anyhow::Result<()>
150+
where
151+
F: Fn(&QueryStat) -> R,
152+
R: Eq + std::hash::Hash,
153+
{
154+
if rows.is_empty() {
155+
return Ok(());
156+
}
157+
let mut grouped_rows: ahash::AHashMap<_, Vec<QueryStat>> = ahash::AHashMap::new();
158+
for row in rows {
159+
grouped_rows.entry((row.database_id, grouping(&row))).or_default().push(row);
160+
}
161+
let sql = "COPY query_stats (database_id, calls, total_time) FROM STDIN BINARY";
162+
let types = &[
163+
tokio_postgres::types::Type::INT8,
164+
tokio_postgres::types::Type::BYTEA,
165+
tokio_postgres::types::Type::BYTEA,
166+
];
167+
let stmt = db.copy_in(&db.prepare_cached(&sql).await?).await?;
168+
let writer = tokio_postgres::binary_copy::BinaryCopyInWriter::new(stmt, types);
169+
let mut writer = writer;
170+
#[allow(unused_mut)]
171+
let mut writer = unsafe {
172+
::pin_utils::core_reexport::pin::Pin::new_unchecked(&mut writer)
173+
};
174+
for rows in grouped_rows.into_values() {
175+
writer
176+
.as_mut()
177+
.write(
178+
&[
179+
&rows[0].database_id,
180+
&::pco::standalone::simpler_compress(
181+
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
182+
::pco::DEFAULT_COMPRESSION_LEVEL,
183+
)
184+
.unwrap(),
185+
&::pco::standalone::simpler_compress(
186+
&rows
187+
.iter()
188+
.map(|r| (r.total_time * 100f32 as f64).round() as i64)
189+
.collect::<Vec<_>>(),
190+
::pco::DEFAULT_COMPRESSION_LEVEL,
191+
)
192+
.unwrap(),
193+
],
194+
)
195+
.await?;
196+
}
197+
writer.finish().await?;
198+
Ok(())
199+
}
141200
}

0 commit comments

Comments (0)