Skip to content

Commit 6b980f2

Browse files
committed
Support filtering by any fields
1 parent cf33268 commit 6b980f2

File tree

10 files changed

+5792
-721
lines changed

10 files changed

+5792
-721
lines changed

Cargo.lock

Lines changed: 241 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ futures = "0.3"
2020
pco = "0.4"
2121
proc-macro2 = "1.0"
2222
quote = "1.0"
23+
serde = { version = "1.0", features = ["derive"] }
24+
serde_json = { version = "1.0" }
25+
serde_with = "3.16"
2326
syn = "2.0"
2427
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
2528

src/lib.rs

Lines changed: 203 additions & 53 deletions
Large diffs are not rendered by default.

tests/chrono.rs

Lines changed: 475 additions & 475 deletions
Large diffs are not rendered by default.

tests/expand/boolean.expanded.rs

Lines changed: 546 additions & 18 deletions
Large diffs are not rendered by default.

tests/expand/float_round.expanded.rs

Lines changed: 546 additions & 18 deletions
Large diffs are not rendered by default.

tests/expand/no_group_by.expanded.rs

Lines changed: 531 additions & 6 deletions
Large diffs are not rendered by default.

tests/expand/query_stats.expanded.rs

Lines changed: 1546 additions & 41 deletions
Large diffs are not rendered by default.

tests/expand/query_stats_chrono.expanded.rs

Lines changed: 1670 additions & 73 deletions
Large diffs are not rendered by default.

tests/tests.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ async fn timestamp() {
6565

6666
// Read
6767
let mut calls = 0;
68-
for group in CompressedQueryStats::load(db, &[database_id], start, end).await.unwrap() {
68+
let filter = Filter { database_id: vec![database_id], collected_at: Some(start..=end), ..Filter::default() };
69+
for group in CompressedQueryStats::load(db, filter).await.unwrap() {
6970
for stat in group.decompress().unwrap() {
7071
calls += stat.calls;
7172
}
@@ -75,7 +76,8 @@ async fn timestamp() {
7576
// Delete and re-group to improve compression
7677
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
7778
let mut stats = Vec::new();
78-
for group in CompressedQueryStats::delete(db, &[database_id], start, end).await.unwrap() {
79+
let filter = Filter { database_id: vec![database_id], collected_at: Some(start..=end), ..Filter::default() };
80+
for group in CompressedQueryStats::delete(db, filter.clone()).await.unwrap() {
7981
stats.extend(group.decompress().unwrap());
8082
}
8183
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
@@ -86,9 +88,7 @@ async fn timestamp() {
8688
.await
8789
.unwrap();
8890
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
89-
let group = CompressedQueryStats::load(db, &[database_id], start, end).await.unwrap().remove(0);
90-
assert_eq!(group.start_at, end - Duration::from_secs(120));
91-
assert_eq!(group.end_at, end - Duration::from_secs(60));
91+
let group = CompressedQueryStats::load(db, filter.clone()).await.unwrap().remove(0);
9292
let stats = group.decompress().unwrap();
9393
assert_eq!(stats[0].collected_at, end - Duration::from_secs(120));
9494
assert_eq!(stats[1].collected_at, end - Duration::from_secs(60));
@@ -106,7 +106,8 @@ async fn timestamp() {
106106
}
107107
let start = start + Duration::from_secs(3 * 60); // minute 3, skipping the first 2 minutes in the group
108108
let end = start + Duration::from_secs(23 * 60); // minute 26, skipping the last 4 minutes in the group
109-
let groups = CompressedQueryStats::load(db, &[database_id], start, end).await.unwrap();
109+
let filter = Filter { collected_at: Some(start..=end), ..filter };
110+
let groups = CompressedQueryStats::load(db, filter).await.unwrap();
110111
assert_eq!(3, groups.len());
111112
let (mut calls, mut min, mut max) = (0, SystemTime::now(), SystemTime::UNIX_EPOCH);
112113
for group in groups {
@@ -136,7 +137,8 @@ async fn timestamp() {
136137
let end = end + Duration::from_secs(5 * 60); // minute 31
137138
let stat = QueryStat { database_id, collected_at: end, fingerprint: 1, calls: 1, total_time: 1.0, new_col: 1 };
138139
CompressedQueryStats::store(db, vec![stat]).await.unwrap();
139-
let groups = CompressedQueryStats::load(db, &[database_id], start, end).await.unwrap();
140+
let filter = Filter { database_id: vec![database_id], collected_at: Some(start..=end), ..Filter::default() };
141+
let groups = CompressedQueryStats::load(db, filter).await.unwrap();
140142
assert_eq!(4, groups.len());
141143
let (mut calls, mut new_col, mut min, mut max) = (0, 0, SystemTime::now(), SystemTime::UNIX_EPOCH);
142144
for group in groups {
@@ -199,7 +201,8 @@ async fn aggregate() {
199201

200202
// Read
201203
let mut calls = 0;
202-
for group in CompressedQueryStats::load(db, &[database_id], &[granularity], start, end).await.unwrap() {
204+
let filter = Filter { database_id: vec![database_id], granularity: vec![granularity], collected_at: Some(start..=end), ..Filter::default() };
205+
for group in CompressedQueryStats::load(db, filter.clone()).await.unwrap() {
203206
for stat in group.decompress().unwrap() {
204207
calls += stat.calls;
205208
}
@@ -211,7 +214,8 @@ async fn aggregate() {
211214
let mut stats: AHashMap<_, QueryStat> = AHashMap::new();
212215
let start: SystemTime = DateTime::<Utc>::from(end - Duration::from_secs(3600)).duration_trunc(chrono::Duration::hours(1)).unwrap().into();
213216
let end = start + Duration::from_secs(3600);
214-
for group in CompressedQueryStats::load(db, &[database_id], &[60], start, end).await.unwrap() {
217+
let filter = Filter { collected_at: Some(start..=end), ..filter };
218+
for group in CompressedQueryStats::load(db, filter.clone()).await.unwrap() {
215219
for stat in group.decompress().unwrap() {
216220
match stats.entry((stat.database_id, stat.fingerprint)) {
217221
Entry::Occupied(mut entry) => {
@@ -231,9 +235,8 @@ async fn aggregate() {
231235
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
232236
CompressedQueryStats::store(db, stats).await.unwrap();
233237
assert_eq!(3, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
234-
let group = CompressedQueryStats::load(db, &[database_id], &[3600], start, end).await.unwrap().remove(0);
235-
assert_eq!(group.start_at, start);
236-
assert_eq!(group.end_at, start);
238+
let filter = Filter { granularity: vec![3600], ..filter };
239+
let group = CompressedQueryStats::load(db, filter).await.unwrap().remove(0);
237240
let stats = group.decompress().unwrap();
238241
assert_eq!(stats[0].collected_at, start);
239242
assert_eq!(stats[0].calls, 2);
@@ -269,7 +272,8 @@ async fn no_timestamp() {
269272

270273
// Read
271274
let mut calls = 0;
272-
for group in CompressedQueryStats::load(db, &[database_id]).await.unwrap() {
275+
let filter = Filter { database_id: vec![database_id], ..Filter::default() };
276+
for group in CompressedQueryStats::load(db, filter.clone()).await.unwrap() {
273277
for stat in group.decompress().unwrap() {
274278
calls += stat.calls;
275279
}
@@ -279,15 +283,15 @@ async fn no_timestamp() {
279283
// Delete and re-group
280284
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
281285
let mut stats = Vec::new();
282-
for group in CompressedQueryStats::delete(db, &[database_id]).await.unwrap() {
286+
for group in CompressedQueryStats::delete(db, filter.clone()).await.unwrap() {
283287
for stat in group.decompress().unwrap() {
284288
stats.push(stat);
285289
}
286290
}
287291
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
288292
CompressedQueryStats::store(db, stats).await.unwrap();
289293
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
290-
let group = CompressedQueryStats::load(db, &[database_id]).await.unwrap().remove(0);
294+
let group = CompressedQueryStats::load(db, filter).await.unwrap().remove(0);
291295
let stats = group.decompress().unwrap();
292296
assert_eq!(stats[0].calls, 1);
293297
assert_eq!(stats[1].calls, 2);
@@ -322,7 +326,8 @@ async fn no_group_by() {
322326

323327
// Read
324328
let mut calls = 0;
325-
for group in CompressedQueryStats::load(db).await.unwrap() {
329+
let filter = Filter::default();
330+
for group in CompressedQueryStats::load(db, filter.clone()).await.unwrap() {
326331
for stat in group.decompress().unwrap() {
327332
calls += stat.calls;
328333
}
@@ -332,15 +337,15 @@ async fn no_group_by() {
332337
// Delete and re-group
333338
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
334339
let mut stats = Vec::new();
335-
for group in CompressedQueryStats::delete(db).await.unwrap() {
340+
for group in CompressedQueryStats::delete(db, filter.clone()).await.unwrap() {
336341
for stat in group.decompress().unwrap() {
337342
stats.push(stat);
338343
}
339344
}
340345
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
341346
CompressedQueryStats::store(db, stats).await.unwrap();
342347
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await.unwrap().get::<_, i64>(0));
343-
let group = CompressedQueryStats::load(db).await.unwrap().remove(0);
348+
let group = CompressedQueryStats::load(db, filter).await.unwrap().remove(0);
344349
let stats = group.decompress().unwrap();
345350
assert_eq!(stats[0].calls, 1);
346351
assert_eq!(stats[1].calls, 2);
@@ -375,7 +380,8 @@ async fn table_name() {
375380

376381
// Read
377382
let mut calls = 0;
378-
for group in CompressedQueryStats::load(db).await.unwrap() {
383+
let filter = Filter::default();
384+
for group in CompressedQueryStats::load(db, filter).await.unwrap() {
379385
for stat in group.decompress().unwrap() {
380386
calls += stat.calls;
381387
}
@@ -412,7 +418,8 @@ async fn float_round() {
412418

413419
// Read
414420
let mut total_time = 0.0;
415-
for group in CompressedQueryStats::load(db, &[database_id]).await.unwrap() {
421+
let filter = Filter { database_id: vec![database_id], ..Filter::default() };
422+
for group in CompressedQueryStats::load(db, filter).await.unwrap() {
416423
for stat in group.decompress().unwrap() {
417424
total_time += stat.total_time;
418425
}
@@ -447,7 +454,8 @@ async fn float_round() {
447454

448455
// Read
449456
let mut total_time = 0.0;
450-
for group in CompressedQueryStats::load(db, &[database_id]).await.unwrap() {
457+
let filter = Filter { database_id: vec![database_id], ..Filter::default() };
458+
for group in CompressedQueryStats::load(db, filter).await.unwrap() {
451459
for stat in group.decompress().unwrap() {
452460
total_time += stat.total_time;
453461
}
@@ -484,6 +492,7 @@ async fn boolean() {
484492
CompressedQueryStats::store(db, stats.clone()).await.unwrap();
485493

486494
// Read
487-
let group = CompressedQueryStats::load(db, &[database_id]).await.unwrap().remove(0);
495+
let filter = Filter { database_id: vec![database_id], ..Filter::default() };
496+
let group = CompressedQueryStats::load(db, filter).await.unwrap().remove(0);
488497
assert_eq!(stats, group.decompress().unwrap());
489498
}

0 commit comments

Comments
 (0)