Skip to content

Commit 8ac281f

Browse files
authored
Support filtering by any fields (#28)
1 parent 7c502e4 commit 8ac281f

File tree

13 files changed

+5471
-223
lines changed

13 files changed

+5471
-223
lines changed

Cargo.lock

Lines changed: 241 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ futures = "0.3"
2020
pco = "0.4"
2121
proc-macro2 = "1.0"
2222
quote = "1.0"
23+
serde = { version = "1.0", features = ["derive"] }
24+
serde_json = { version = "1.0" }
25+
serde_with = "3.16"
2326
syn = "2.0"
2427
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
2528

2629
[dev-dependencies]
27-
chrono = "0.4"
30+
chrono = { version = "0.4", features = ["serde"] }
2831
dotenvy = "0.15"
2932
macrotest = "1.1"
3033
serial_test = "3.2"

README.md

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ To see the generated code, look in [tests/expand](tests/expand) or run `cargo ex
1212
## Supported data types
1313

1414
- pco supports `u16`, `u32`, `u64`, `i16`, `i32`, `i64`, `f16`, `f32`, `f64`
15-
- pco_store adds support for `SystemTime`, `bool`
15+
- pco_store adds support for `chrono::DateTime`, `std::time::SystemTime`, `bool`
1616

1717
## Performance
1818

@@ -79,20 +79,22 @@ The stats can be:
7979
```rs
8080
async fn example() -> anyhow::Result<()> {
8181
let database_id = 1;
82+
let granularity = 60;
8283
let start = SystemTime::UNIX_EPOCH;
8384
let end = SystemTime::now();
8485
let db = &DB_POOL.get().await?;
8586

8687
// Write
87-
let default = QueryStat { database_id, granularity: 60, collected_at: end, fingerprint: 1, calls: 1 };
88+
let default = QueryStat { database_id, granularity, collected_at: end, fingerprint: 1, calls: 1 };
8889
let stats = vec![QueryStat { collected_at: end - Duration::from_secs(120), ..default }];
8990
CompressedQueryStats::store(db, stats).await?;
9091
let stats = vec![QueryStat { collected_at: end - Duration::from_secs(60), ..default }];
9192
CompressedQueryStats::store(db, stats).await?;
9293

9394
// Read
9495
let mut calls = 0;
95-
for group in CompressedQueryStats::load(db, &[database_id], start, end).await? {
96+
let filter = Filter::new(&[database_id], &[granularity], start..=end);
97+
for group in CompressedQueryStats::load(db, filter.clone()).await? {
9698
for stat in group.decompress()? {
9799
calls += stat.calls;
98100
}
@@ -104,7 +106,7 @@ async fn example() -> anyhow::Result<()> {
104106
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
105107
transaction!(db, {
106108
let mut stats = Vec::new();
107-
for group in CompressedQueryStats::delete(db, &[database_id], start, end).await? {
109+
for group in CompressedQueryStats::delete(db, filter.clone()).await? {
108110
stats.extend(group.decompress()?);
109111
}
110112
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
@@ -115,7 +117,7 @@ async fn example() -> anyhow::Result<()> {
115117
.await?;
116118
});
117119
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
118-
let group = CompressedQueryStats::load(db, &[database_id], start, end).await?.remove(0);
120+
let group = CompressedQueryStats::load(db, filter).await?.remove(0);
119121
assert_eq!(group.start_at, end - Duration::from_secs(120));
120122
assert_eq!(group.end_at, end - Duration::from_secs(60));
121123
let stats = group.decompress()?;
@@ -160,6 +162,33 @@ pub use transaction;
160162

161163
Additional examples can be found in [tests/tests.rs](tests/tests.rs).
162164

165+
## Filtering
166+
167+
pco_store generates a `Filter` struct to specify read-time filters. Required fields from `group_by` and `timestamp` are filtered in SQL before the data is decompressed; other fields are filtered after decompression but before the data is returned to the caller, as an optimization to avoid pointless allocations.
168+
169+
Timestamps are accepted as an inclusive range (with precision automatically truncated to microseconds), and every other field is accepted as an array of allowed values — a row matches if its value is contained in that array.
170+
171+
### Creating a filter
172+
173+
- `Filter::new()` is a shorthand to set the required fields from `group_by` and `timestamp`
174+
- Optional filters can be set as fields on the struct: `filter.fingerprint = vec![1]`
175+
- Struct literal syntax can also be used: `Filter { fingerprint: vec![1], ..Filter::default() }`
176+
177+
### Filter deserialization using serde
178+
179+
Non-timestamp fields can be passed either as an array, or as a single value which is automatically wrapped in an array.
180+
181+
Timestamps support multiple formats:
182+
- `["ts1", "ts2"]`: an array with two timestamps becomes an inclusive range `ts1..=ts2`
183+
- `["ts1"]`: an array with a single timestamp becomes an inclusive range `ts1..=ts1`
184+
- `"ts1"`: a single timestamp becomes an inclusive range `ts1..=ts1`
185+
186+
### Filter convenience functions
187+
188+
- `range_bounds` returns the time range lower and upper bounds
189+
- `range_duration` returns the duration of the filter's time range
190+
- `range_shift` mutably shifts the time range's start and end by a given amount, e.g. shifting a filter for "today" back by 7 days to query the same window one week earlier
191+
163192
## Contributions are welcome to
164193

165194
- support decompression of only the fields requested at runtime

benches/comparison/pco_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ pub async fn load() -> Result<()> {
9090
let db = &DB_POOL.get().await.unwrap();
9191
let database_ids: Vec<i64> = db.query_one("SELECT array_agg(DISTINCT database_id) FROM comparison_pco_stores", &[]).await?.get(0);
9292
let mut stats = Vec::new();
93-
for group in CompressedQueryStats::load(db, &database_ids, SystemTime::UNIX_EPOCH, SystemTime::now()).await? {
93+
let filter = Filter::new(&database_ids, SystemTime::UNIX_EPOCH..=SystemTime::now());
94+
for group in CompressedQueryStats::load(db, filter).await? {
9495
for stat in group.decompress()? {
9596
stats.push(stat);
9697
}

0 commit comments

Comments
 (0)