Skip to content

Commit 8603a99

Browse files
committed
Support storing a String for every row
1 parent 8c7daac commit 8603a99

11 files changed

+394
-328
lines changed

Cargo.lock

Lines changed: 57 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
3030
chrono = { version = "0.4", features = ["serde"] }
3131
dotenvy = "0.15"
3232
macrotest = "1.1"
33+
serde-brief = { version = "0.2", features = ["alloc"] }
3334
serial_test = "3.2"
3435
tokio = { version = "1.43", features = ["full"] }
36+
zeekstd = "0.6"
3537

3638
[[bench]]
3739
name = "bucket_size"

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@ pco_store extends that with support for:
1717
- `chrono::DateTime`, `std::time::SystemTime`
1818
- `bool`
1919
- `uuid::Uuid` (only as a `group_by` field)
20-
- `String` (only as a `group_by` field)
20+
- `String`
21+
22+
For non-numeric types like `String`, which are serialized with `serde-brief` and compressed with `zeekstd` instead of `pco`, additional dependencies are needed:
23+
24+
```toml
25+
serde-brief = { version = "0.2", features = ["alloc"] }
26+
zeekstd = "0.6"
27+
```
2128

2229
## Performance
2330

src/lib.rs

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,9 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
128128
for field in model.fields.iter() {
129129
let ident = field.ident.clone().unwrap();
130130
let ty_original = field.ty.clone();
131+
let ty_original_s = quote! { #ty_original }.to_string();
131132
let mut ty = field.ty.clone();
132-
let round_float_field = float_round.is_some() && quote! { #ty }.to_string().starts_with("f");
133+
let round_float_field = float_round.is_some() && ty_original_s.starts_with("f");
133134
if group_by.iter().any(|i| *i == ident) {
134135
decompressed_fields.push(quote! { #ident: self.#ident.clone(), });
135136
} else {
@@ -139,14 +140,29 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
139140
if round_float_field {
140141
ty = Type::Verbatim(quote! { i64 });
141142
}
142-
if quote! { #ty_original }.to_string() == "bool" {
143+
if ty_original_s == "bool" {
143144
ty = Type::Verbatim(quote! { u16 });
144145
}
146+
let decompress_field = if ty_original_s == "String" {
147+
quote! {
148+
{
149+
// let input = zeekstd::BytesWrapper::new(&self.#ident);
150+
// let mut decoder = zeekstd::Decoder::new(input).context("zeekstd::Decoder")?;
151+
let cursor = std::io::Cursor::new(&self.#ident);
152+
let mut decoder = zeekstd::Decoder::new(cursor).context("zeekstd::Decoder")?;
153+
let mut bytes = Vec::new();
154+
std::io::copy(&mut decoder, &mut bytes).context("io::copy")?;
155+
serde_brief::from_slice(&bytes).context("serde_brief::from_slice")?
156+
}
157+
}
158+
} else {
159+
quote! { ::pco::standalone::simple_decompress(&self.#ident)? }
160+
};
145161
decompress_fields.push(quote! {
146162
let #ident: Vec<#ty> = if self.#ident.is_empty() {
147163
Vec::new()
148164
} else {
149-
::pco::standalone::simple_decompress(&self.#ident)?
165+
#decompress_field
150166
};
151167
});
152168
compressed_field_sizes.push(quote! { #ident.len(), });
@@ -167,7 +183,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
167183
decompressed_fields.push(quote! {
168184
#ident: #ident.get(index).cloned().unwrap_or_default() as #ty_original / #float_round as #ty_original,
169185
});
170-
} else if quote! { #ty_original }.to_string() == "bool" {
186+
} else if ty_original_s == "bool" {
171187
decompressed_fields.push(quote! {
172188
#ident: #ident.get(index).cloned().unwrap_or_default() == 1,
173189
});
@@ -190,12 +206,13 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
190206
for field in model.fields.iter() {
191207
let ident = field.ident.clone().unwrap();
192208
let ty_original = field.ty.clone();
209+
let ty_original_s = quote! { #ty_original }.to_string();
193210
let mut ty = field.ty.clone();
194-
let round_float_field = float_round.is_some() && quote! { #ty }.to_string().starts_with("f");
211+
let round_float_field = float_round.is_some() && ty_original_s.starts_with("f");
195212
if round_float_field {
196213
ty = Type::Verbatim(quote! { i64 });
197214
}
198-
if quote! { #ty_original }.to_string() == "bool" {
215+
if ty_original_s == "bool" {
199216
ty = Type::Verbatim(quote! { u16 });
200217
}
201218
if group_by.iter().any(|i| *i == ident) {
@@ -219,16 +236,35 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
219236
store_types.push(Ident::new("BYTEA", Span::call_site()));
220237
let expr = if round_float_field {
221238
quote! { (r.#ident * #float_round as #ty_original).round() as i64 }
222-
} else if quote! { #ty_original }.to_string() == "bool" {
239+
} else if ty_original_s == "bool" {
223240
quote! { r.#ident as u16 }
241+
} else if ty_original_s == "String" {
242+
// TODO: needless clone here since we're consuming the rows
243+
// That could be avoided by adding a `for row in rows` loop that inserts into each Vec
244+
quote! { r.#ident.clone() }
224245
} else {
225246
quote! { r.#ident }
226247
};
227-
store_values.push(quote! {
228-
&::pco::standalone::simpler_compress(
229-
&rows.iter().map(|r| #expr).collect::<Vec<_>>(), ::pco::DEFAULT_COMPRESSION_LEVEL
230-
).unwrap(),
231-
});
248+
let data = quote! { rows.iter().map(|r| #expr).collect::<Vec<_>>() };
249+
if ty_original_s == "String" {
250+
store_values.push(quote! {
251+
&{
252+
use std::io::Write;
253+
let bytes = serde_brief::to_vec(&#data).context("serde_brief::to_slice")?;
254+
let mut compressed_bytes = Vec::new();
255+
let mut encoder = zeekstd::Encoder::new(&mut compressed_bytes).context("zeekstd::Encoder")?;
256+
encoder.write_all(&bytes).context("encoder.write_all")?;
257+
encoder.finish().context("encoder.finish")?;
258+
compressed_bytes
259+
},
260+
});
261+
} else {
262+
store_values.push(quote! {
263+
&::pco::standalone::simpler_compress(
264+
&#data, ::pco::DEFAULT_COMPRESSION_LEVEL
265+
)?,
266+
});
267+
}
232268
}
233269
}
234270
let store_fields = store_fields.join(", ");
@@ -306,6 +342,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
306342

307343
/// Decompresses a group of data points.
308344
pub fn decompress(self) -> anyhow::Result<Vec<#name>> {
345+
use anyhow::Context;
309346
let mut results = Vec::new();
310347
#decompress_fields
311348
let len = [#compressed_field_sizes].into_iter().max().unwrap_or(0);
@@ -320,6 +357,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
320357

321358
/// Writes the data to disk.
322359
pub async fn store(db: &impl ::std::ops::Deref<Target = deadpool_postgres::ClientWrapper>, rows: Vec<#name>) -> anyhow::Result<()> {
360+
use anyhow::Context;
323361
if rows.is_empty() {
324362
return Ok(());
325363
}
@@ -353,6 +391,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
353391
F: Fn(&#name) -> R,
354392
R: Eq + std::hash::Hash,
355393
{
394+
use anyhow::Context;
356395
if rows.is_empty() {
357396
return Ok(());
358397
}

tests/expand/boolean.expanded.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ impl CompressedQueryStats {
6464
}
6565
/// Decompresses a group of data points.
6666
pub fn decompress(self) -> anyhow::Result<Vec<QueryStat>> {
67+
use anyhow::Context;
6768
let mut results = Vec::new();
6869
let toplevel: Vec<u16> = if self.toplevel.is_empty() {
6970
Vec::new()
@@ -93,6 +94,7 @@ impl CompressedQueryStats {
9394
db: &impl ::std::ops::Deref<Target = deadpool_postgres::ClientWrapper>,
9495
rows: Vec<QueryStat>,
9596
) -> anyhow::Result<()> {
97+
use anyhow::Context;
9698
if rows.is_empty() {
9799
return Ok(());
98100
}
@@ -120,15 +122,13 @@ impl CompressedQueryStats {
120122
&[
121123
&rows[0].database_id,
122124
&::pco::standalone::simpler_compress(
123-
&rows.iter().map(|r| r.toplevel as u16).collect::<Vec<_>>(),
124-
::pco::DEFAULT_COMPRESSION_LEVEL,
125-
)
126-
.unwrap(),
125+
&rows.iter().map(|r| r.toplevel as u16).collect::<Vec<_>>(),
126+
::pco::DEFAULT_COMPRESSION_LEVEL,
127+
)?,
127128
&::pco::standalone::simpler_compress(
128-
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
129-
::pco::DEFAULT_COMPRESSION_LEVEL,
130-
)
131-
.unwrap(),
129+
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
130+
::pco::DEFAULT_COMPRESSION_LEVEL,
131+
)?,
132132
],
133133
)
134134
.await?;
@@ -149,6 +149,7 @@ impl CompressedQueryStats {
149149
F: Fn(&QueryStat) -> R,
150150
R: Eq + std::hash::Hash,
151151
{
152+
use anyhow::Context;
152153
if rows.is_empty() {
153154
return Ok(());
154155
}
@@ -179,15 +180,13 @@ impl CompressedQueryStats {
179180
&[
180181
&rows[0].database_id,
181182
&::pco::standalone::simpler_compress(
182-
&rows.iter().map(|r| r.toplevel as u16).collect::<Vec<_>>(),
183-
::pco::DEFAULT_COMPRESSION_LEVEL,
184-
)
185-
.unwrap(),
183+
&rows.iter().map(|r| r.toplevel as u16).collect::<Vec<_>>(),
184+
::pco::DEFAULT_COMPRESSION_LEVEL,
185+
)?,
186186
&::pco::standalone::simpler_compress(
187-
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
188-
::pco::DEFAULT_COMPRESSION_LEVEL,
189-
)
190-
.unwrap(),
187+
&rows.iter().map(|r| r.calls).collect::<Vec<_>>(),
188+
::pco::DEFAULT_COMPRESSION_LEVEL,
189+
)?,
191190
],
192191
)
193192
.await?;

0 commit comments

Comments
 (0)