Skip to content

Commit 1b9b20a

Browse files
slvrtrnloyd
andauthored
feat: Query::fetch_bytes (#182)
Co-authored-by: Paul Loyd <pavelko95@gmail.com>
1 parent 60cfcab commit 1b9b20a

File tree

23 files changed

+892
-303
lines changed

23 files changed

+892
-303
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ jobs:
3232
- run: rustup toolchain install ${{ env.MSRV }} --profile minimal
3333
- run: rustup override set ${{ env.MSRV }}
3434
- run: rustup show active-toolchain -v
35+
- run: cargo update -p native-tls --precise 0.2.13 # 0.2.14 requires rustc 1.80
36+
- run: cargo update -p litemap --precise 0.7.4 # 0.7.5 requires rustc 1.81
37+
- run: cargo update -p zerofrom --precise 0.1.5 # 0.1.6 requires rustc 1.81
3538
- run: cargo build
3639
- run: cargo build --no-default-features
3740
- run: cargo build --features uuid,time,chrono

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ license = "MIT OR Apache-2.0"
1010
readme = "README.md"
1111
edition = "2021"
1212
# update `derive/Cargo.toml` and CI if changed
13+
# TODO: after bumping to v1.80, remove `--precise` in the "msrv" CI job
1314
rust-version = "1.73.0"
1415

1516
[lints.rust]
@@ -70,6 +71,8 @@ uuid = ["dep:uuid"]
7071
time = ["dep:time"]
7172
lz4 = ["dep:lz4_flex", "dep:cityhash-rs"]
7273
chrono = ["dep:chrono"]
74+
futures03 = []
75+
7376
## TLS
7477
native-tls = ["dep:hyper-tls"]
7578
# ext: native-tls-alpn

benches/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefo
2323

2424
These benchmarks are run against a real ClickHouse server, so it must be started:
2525
```bash
26-
docker run -d -p 8123:8123 -p 9000:9000 --name ch clickhouse/clickhouse-server
27-
26+
docker compose up -d
2827
cargo bench --bench <case>
2928
```
3029

benches/select.rs

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use hyper::{
1414
};
1515
use serde::Deserialize;
1616

17-
use clickhouse::{error::Result, Client, Compression, Row};
17+
use clickhouse::{
18+
error::{Error, Result},
19+
Client, Compression, Row,
20+
};
1821

1922
mod common;
2023

@@ -52,36 +55,80 @@ fn select(c: &mut Criterion) {
5255
let _server = common::start_server(addr, move |req| serve(req, chunk.clone()));
5356
let runner = common::start_runner();
5457

55-
#[allow(dead_code)]
56-
#[derive(Debug, Row, Deserialize)]
58+
#[derive(Default, Debug, Row, Deserialize)]
5759
struct SomeRow {
5860
a: u64,
5961
b: i64,
6062
c: i32,
6163
d: u32,
6264
}
6365

64-
async fn run(client: Client, iters: u64) -> Result<Duration> {
66+
async fn select_rows(client: Client, iters: u64) -> Result<Duration> {
67+
let mut sum = SomeRow::default();
6568
let start = Instant::now();
6669
let mut cursor = client
6770
.query("SELECT ?fields FROM some")
6871
.fetch::<SomeRow>()?;
6972

7073
for _ in 0..iters {
71-
black_box(cursor.next().await?);
74+
let Some(row) = cursor.next().await? else {
75+
return Err(Error::NotEnoughData);
76+
};
77+
sum.a = sum.a.wrapping_add(row.a);
78+
sum.b = sum.b.wrapping_add(row.b);
79+
sum.c = sum.c.wrapping_add(row.c);
80+
sum.d = sum.d.wrapping_add(row.d);
7281
}
7382

83+
black_box(sum);
7484
Ok(start.elapsed())
7585
}
7686

77-
let mut group = c.benchmark_group("select");
87+
async fn select_bytes(client: Client, min_size: u64) -> Result<Duration> {
88+
let start = Instant::now();
89+
let mut cursor = client
90+
.query("SELECT value FROM some")
91+
.fetch_bytes("RowBinary")?;
92+
93+
let mut size = 0;
94+
while size < min_size {
95+
let buf = black_box(cursor.next().await?);
96+
size += buf.unwrap().len() as u64;
97+
}
98+
99+
Ok(start.elapsed())
100+
}
101+
102+
let mut group = c.benchmark_group("rows");
78103
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
79-
group.bench_function("no compression", |b| {
104+
group.bench_function("uncompressed", |b| {
105+
b.iter_custom(|iters| {
106+
let client = Client::default()
107+
.with_url(format!("http://{addr}"))
108+
.with_compression(Compression::None);
109+
runner.run(select_rows(client, iters))
110+
})
111+
});
112+
#[cfg(feature = "lz4")]
113+
group.bench_function("lz4", |b| {
114+
b.iter_custom(|iters| {
115+
let client = Client::default()
116+
.with_url(format!("http://{addr}"))
117+
.with_compression(Compression::Lz4);
118+
runner.run(select_rows(client, iters))
119+
})
120+
});
121+
group.finish();
122+
123+
const MIB: u64 = 1024 * 1024;
124+
let mut group = c.benchmark_group("mbytes");
125+
group.throughput(Throughput::Bytes(MIB));
126+
group.bench_function("uncompressed", |b| {
80127
b.iter_custom(|iters| {
81128
let client = Client::default()
82129
.with_url(format!("http://{addr}"))
83130
.with_compression(Compression::None);
84-
runner.run(run(client, iters))
131+
runner.run(select_bytes(client, iters * MIB))
85132
})
86133
});
87134
#[cfg(feature = "lz4")]
@@ -90,7 +137,7 @@ fn select(c: &mut Criterion) {
90137
let client = Client::default()
91138
.with_url(format!("http://{addr}"))
92139
.with_compression(Compression::Lz4);
93-
runner.run(run(client, iters))
140+
runner.run(select_bytes(client, iters * MIB))
94141
})
95142
});
96143
group.finish();

examples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ If something is missing, or you found a mistake in one of these examples, please
2828
- [custom_http_headers.rs](custom_http_headers.rs) - setting additional HTTP headers to the client, or overriding the generated ones
2929
- [query_id.rs](query_id.rs) - setting a specific `query_id` on the query level
3030
- [session_id.rs](session_id.rs) - using the client in the session context with temporary tables
31+
- [stream_into_file.rs](stream_into_file.rs) - streaming the query result as raw bytes into a file in an arbitrary format. Required cargo features: `futures03`.
32+
- [stream_arbitrary_format_rows.rs](stream_arbitrary_format_rows.rs) - streaming the query result in an arbitrary format, row by row. Required cargo features: `futures03`.
3133

3234
## How to run
3335

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use tokio::io::AsyncBufReadExt;
2+
3+
use clickhouse::Client;
4+
5+
/// An example of streaming raw data in an arbitrary format leveraging the
6+
/// [`AsyncBufReadExt`] helpers. In this case, the format is `JSONEachRow`.
7+
/// Incoming data is then split into lines, and each line is deserialized into
8+
/// `serde_json::Value`, a dynamic representation of JSON values.
9+
///
10+
/// Similarly, it can be used with other formats such as CSV, TSV, and others
11+
/// that produce each row on a new line; the only difference will be in how the
12+
/// data is parsed. See also: https://clickhouse.com/docs/en/interfaces/formats
13+
///
14+
/// Note: `lines()` produces a new `String` for each line, so it's not the
15+
/// most performant way to interate over lines.
16+
#[tokio::main]
17+
async fn main() {
18+
let client = Client::default().with_url("http://localhost:8123");
19+
let mut lines = client
20+
.query(
21+
"SELECT number, hex(randomPrintableASCII(20)) AS hex_str
22+
FROM system.numbers
23+
LIMIT 100",
24+
)
25+
.fetch_bytes("JSONEachRow")
26+
.unwrap()
27+
.lines();
28+
29+
while let Some(line) = lines.next_line().await.unwrap() {
30+
let value: serde_json::Value = serde_json::de::from_str(&line).unwrap();
31+
println!("JSONEachRow value: {}", value);
32+
}
33+
}

examples/stream_into_file.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use clickhouse::{query::BytesCursor, Client};
2+
use std::time::Instant;
3+
use tokio::{fs::File, io::AsyncWriteExt};
4+
5+
// Examples of streaming the result of a query in an arbitrary format into a
6+
// file. In this case, `CSVWithNamesAndTypes` format is used.
7+
// Check also other formats in https://clickhouse.com/docs/en/interfaces/formats.
8+
//
9+
// Note: there is no need to wrap `File` into `BufWriter` because `BytesCursor`
10+
// is buffered internally already and produces chunks of data.
11+
12+
const NUMBERS: u32 = 100_000;
13+
14+
fn query(numbers: u32) -> BytesCursor {
15+
let client = Client::default().with_url("http://localhost:8123");
16+
17+
client
18+
.query(
19+
"SELECT number, hex(randomPrintableASCII(20)) AS hex_str
20+
FROM system.numbers
21+
LIMIT {limit: Int32}",
22+
)
23+
.param("limit", numbers)
24+
.fetch_bytes("CSVWithNamesAndTypes")
25+
.unwrap()
26+
}
27+
28+
// Pattern 1: use the `tokio::io::copy_buf` helper.
29+
//
30+
// It shows integration with `tokio::io::AsyncBufWriteExt` trait.
31+
async fn tokio_copy_buf(filename: &str) {
32+
let mut cursor = query(NUMBERS);
33+
let mut file = File::create(filename).await.unwrap();
34+
tokio::io::copy_buf(&mut cursor, &mut file).await.unwrap();
35+
}
36+
37+
// Pattern 2: use `BytesCursor::next()`.
38+
async fn cursor_next(filename: &str) {
39+
let mut cursor = query(NUMBERS);
40+
let mut file = File::create(filename).await.unwrap();
41+
42+
while let Some(bytes) = cursor.next().await.unwrap() {
43+
file.write_all(&bytes).await.unwrap();
44+
println!("chunk of {}B written to {filename}", bytes.len());
45+
}
46+
}
47+
48+
// Pattern 3: use the `futures::(Try)StreamExt` traits.
49+
#[cfg(feature = "futures03")]
50+
async fn futures03_stream(filename: &str) {
51+
use futures::TryStreamExt;
52+
53+
let mut cursor = query(NUMBERS);
54+
let mut file = File::create(filename).await.unwrap();
55+
56+
while let Some(bytes) = cursor.try_next().await.unwrap() {
57+
file.write_all(&bytes).await.unwrap();
58+
println!("chunk of {}B written to {filename}", bytes.len());
59+
}
60+
}
61+
62+
#[tokio::main]
63+
async fn main() {
64+
let start = Instant::now();
65+
tokio_copy_buf("output-1.csv").await;
66+
println!("written to output-1.csv in {:?}", start.elapsed());
67+
68+
let start = Instant::now();
69+
cursor_next("output-2.csv").await;
70+
println!("written to output-2.csv in {:?}", start.elapsed());
71+
72+
#[cfg(feature = "futures03")]
73+
{
74+
let start = Instant::now();
75+
futures03_stream("output-3.csv").await;
76+
println!("written to output-3.csv in {:?}", start.elapsed());
77+
}
78+
}

src/bytes_ext.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ impl BytesExt {
1717
self.bytes.len() - self.cursor
1818
}
1919

20+
#[inline(always)]
21+
pub(crate) fn is_empty(&self) -> bool {
22+
debug_assert!(self.cursor <= self.bytes.len());
23+
self.cursor >= self.bytes.len()
24+
}
25+
2026
#[inline(always)]
2127
pub(crate) fn set_remaining(&mut self, n: usize) {
2228
// We can use `bytes.advance()` here, but it's slower.
@@ -26,13 +32,15 @@ impl BytesExt {
2632
#[cfg(any(test, feature = "lz4", feature = "watch"))]
2733
#[inline(always)]
2834
pub(crate) fn advance(&mut self, n: usize) {
35+
debug_assert!(n <= self.remaining());
36+
2937
// We can use `bytes.advance()` here, but it's slower.
3038
self.cursor += n;
3139
}
3240

3341
#[inline(always)]
3442
pub(crate) fn extend(&mut self, chunk: Bytes) {
35-
if self.cursor == self.bytes.len() {
43+
if self.is_empty() {
3644
// Most of the time, we read the next chunk after consuming the previous one.
3745
self.bytes = chunk;
3846
self.cursor = 0;

0 commit comments

Comments
 (0)