Skip to content

Commit cb98e5d

Browse files
committed
Implement compact
1 parent 449d89d commit cb98e5d

File tree

2 files changed

+48
-42
lines changed

README.md

Lines changed: 5 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,14 @@ async fn example() -> anyhow::Result<()> {
9898
}
9999
assert_eq!(calls, 2);
100100

101-
// Delete and re-group to improve compression ratio
101+
// Improves the compression ratio by deleting and re-inserting the data into fewer rows.
102102
//
103-
// Note: you'll want to choose the time range passed to `delete` so it only groups, for example, stats
104-
// from the past day into a fewer number of rows. There's a balance to be reached between compression
105-
// ratio and not slowing down read queries with unwanted data from outside the requested time range.
103+
// There's a balance to be reached between improving the compression ratio and forcing future read
104+
// queries to load a lot of unwanted data. The exact threshold will depend on the volume of data,
105+
// but 50 thousand rows per group may be a good goal.
106106
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
107-
transaction!(db, {
108-
let mut stats = Vec::new();
109-
for group in QueryStats::delete(db, &[database_id], start, end).await? {
110-
for stat in group.decompress()? {
111-
stats.push(stat);
112-
}
113-
}
114-
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
115-
QueryStats::store(db, stats).await?;
116-
});
107+
QueryStats::compact(db, &[database_id], start, end).await?
117108
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
118-
let group = QueryStats::load(db, &[database_id], start, end).await?.remove(0);
119-
assert_eq!(group.start_at, end - Duration::from_secs(120));
120-
assert_eq!(group.end_at, end - Duration::from_secs(60));
121-
let stats = group.decompress()?;
122-
assert_eq!(stats[0].collected_at, end - Duration::from_secs(120));
123-
assert_eq!(stats[1].collected_at, end - Duration::from_secs(60));
124109

125110
Ok(())
126111
}
@@ -134,28 +119,6 @@ pub static DB_POOL: std::sync::LazyLock<std::sync::Arc<deadpool_postgres::Pool>>
134119
let mgr = deadpool_postgres::Manager::from_config(pg_config, tokio_postgres::NoTls, mgr_config);
135120
deadpool_postgres::Pool::builder(mgr).build().unwrap().into()
136121
});
137-
138-
#[macro_export]
139-
macro_rules! transaction {
140-
($db: ident, $block: expr) => {
141-
$db.execute("BEGIN", &[]).await?;
142-
let result: anyhow::Result<()> = (|| async {
143-
$block
144-
Ok(())
145-
})().await;
146-
match result {
147-
Ok(result) => {
148-
$db.execute("COMMIT", &[]).await?;
149-
result
150-
}
151-
Err(err) => {
152-
$db.execute("ROLLBACK", &[]).await?;
153-
anyhow::bail!(err);
154-
}
155-
}
156-
}
157-
}
158-
pub use transaction;
159122
```
160123

161124
Additional examples can be found in [tests/tests.rs](tests/tests.rs).

src/lib.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,28 @@ use quote::quote;
44
use syn::parse::{Parse, ParseStream};
55
use syn::{Ident, ItemStruct, Lit, Result, Token, Type, bracketed, parse_macro_input};
66

7+
#[macro_export]
8+
macro_rules! transaction {
9+
($db: ident, $block: expr) => {
10+
$db.execute("BEGIN", &[]).await?;
11+
let result: anyhow::Result<()> = (|| async {
12+
$block
13+
Ok(())
14+
})().await;
15+
match result {
16+
Ok(result) => {
17+
$db.execute("COMMIT", &[]).await?;
18+
result
19+
}
20+
Err(err) => {
21+
$db.execute("ROLLBACK", &[]).await?;
22+
anyhow::bail!(err);
23+
}
24+
}
25+
}
26+
}
27+
pub use transaction;
28+
729
struct Arguments {
830
timestamp: Option<Ident>,
931
group_by: Vec<Ident>,
@@ -73,6 +95,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
7395
// load and delete
7496
let mut fields = Vec::new();
7597
let mut load_filters = Vec::new();
98+
let mut load_filter_names = Vec::new();
7699
let mut load_checks = Vec::new();
77100
let mut load_where = Vec::new();
78101
let mut load_params = Vec::new();
@@ -85,6 +108,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
85108
if group_by.iter().any(|i| *i == ident) {
86109
fields.push(quote! { pub #ident: #ty, });
87110
load_filters.push(quote! { #ident: &[#ty], });
111+
load_filter_names.push(quote! { #ident, });
88112
let name = format!("{ident}");
89113
load_checks.push(quote! {
90114
if #ident.is_empty() {
@@ -126,11 +150,13 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
126150
filter_start: SystemTime,
127151
filter_end: SystemTime,
128152
});
153+
load_filter_names.push(quote! { filter_start, filter_end, });
129154
load_fields.push(quote! { filter: true, filter_start, filter_end, });
130155
delete_fields.push(quote! { filter: false, filter_start, filter_end, });
131156
}
132157
let fields = tokens(fields);
133158
let load_filters = tokens(load_filters);
159+
let load_filter_names = tokens(load_filter_names);
134160
let load_checks = tokens(load_checks);
135161
let load_where = if load_where.is_empty() { "true".to_string() } else { load_where.join(" AND ") };
136162
let load_params = tokens(load_params);
@@ -296,6 +322,23 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
296322
Ok(results)
297323
}
298324

325+
/// Improves the compression ratio by deleting multiple rows and re-inserting the data into fewer rows.
326+
///
327+
/// There's a balance to be reached between improving the compression ratio and forcing future read
328+
/// queries to load a lot of unwanted data. The exact threshold will depend on the volume of data,
329+
/// but 50 thousand rows per group may be a good goal.
330+
pub async fn compact(db: &deadpool_postgres::Object, #load_filters) -> anyhow::Result<()> {
331+
pco_store::transaction!(db, {
332+
let mut rows = Vec::new();
333+
for group in Self::delete(db, #load_filter_names).await? {
334+
for row in group.decompress()? {
335+
rows.push(row);
336+
}
337+
}
338+
Self::store(db, rows).await?;
339+
});
340+
}
341+
299342
/// Decompresses a group of data points.
300343
pub fn decompress(self) -> anyhow::Result<Vec<#name>> {
301344
let mut results = Vec::new();

0 commit comments

Comments (0)