Skip to content

Commit d3dbe48

Browse files
authored
refactor!: change PgWireServerHandlers to use impl trait return value (#269)
* refactor: use impl trait for PgWireServerHandlers Signed-off-by: Ning Sun <sunning@greptime.com> * refactor: use impl trait in examples * feat: send notice for noop query implementation * test: update integration test server * fix: duckdb and sqlite examples --------- Signed-off-by: Ning Sun <sunning@greptime.com>
1 parent 111abaa commit d3dbe48

16 files changed

Lines changed: 219 additions & 532 deletions

File tree

examples/bench.rs

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,16 @@ use std::sync::Arc;
33
use async_trait::async_trait;
44
use futures::stream;
55
use futures::StreamExt;
6-
use pgwire::api::cancel::NoopCancelHandler;
7-
use pgwire::api::NoopErrorHandler;
8-
use pgwire::api::PgWireServerHandlers;
96
use tokio::net::TcpListener;
107

11-
use pgwire::api::auth::noop::NoopStartupHandler;
12-
use pgwire::api::copy::NoopCopyHandler;
13-
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
8+
use pgwire::api::query::SimpleQueryHandler;
149
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response};
15-
use pgwire::api::{ClientInfo, Type};
10+
use pgwire::api::{ClientInfo, PgWireServerHandlers, Type};
1611
use pgwire::error::PgWireResult;
1712
use pgwire::tokio::process_socket;
1813

1914
pub struct DummyProcessor;
2015

21-
impl NoopStartupHandler for DummyProcessor {}
22-
2316
#[async_trait]
2417
impl SimpleQueryHandler for DummyProcessor {
2518
async fn do_query<'a, C>(
@@ -76,36 +69,9 @@ struct DummyProcessorFactory {
7669
}
7770

7871
impl PgWireServerHandlers for DummyProcessorFactory {
79-
type StartupHandler = DummyProcessor;
80-
type SimpleQueryHandler = DummyProcessor;
81-
type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
82-
type CopyHandler = NoopCopyHandler;
83-
type CancelHandler = NoopCancelHandler;
84-
type ErrorHandler = NoopErrorHandler;
85-
86-
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
87-
self.handler.clone()
88-
}
89-
90-
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
91-
Arc::new(PlaceholderExtendedQueryHandler)
92-
}
93-
94-
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
72+
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
9573
self.handler.clone()
9674
}
97-
98-
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
99-
Arc::new(NoopCopyHandler)
100-
}
101-
102-
fn cancel_handler(&self) -> Arc<Self::CancelHandler> {
103-
Arc::new(NoopCancelHandler)
104-
}
105-
106-
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
107-
Arc::new(NoopErrorHandler)
108-
}
10975
}
11076

11177
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]

examples/common/mod.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::sync::Arc;
2+
3+
use async_trait::async_trait;
4+
use futures::{stream, StreamExt};
5+
6+
use pgwire::api::auth::noop::NoopStartupHandler;
7+
use pgwire::api::query::SimpleQueryHandler;
8+
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
9+
use pgwire::api::{ClientInfo, Type};
10+
use pgwire::error::PgWireResult;
11+
12+
pub struct DummyProcessor;
13+
14+
impl NoopStartupHandler for DummyProcessor {}
15+
16+
#[async_trait]
17+
impl SimpleQueryHandler for DummyProcessor {
18+
async fn do_query<'a, C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
19+
where
20+
C: ClientInfo + Unpin + Send + Sync,
21+
{
22+
println!("{:?}", query);
23+
if query.starts_with("SELECT") {
24+
let f1 = FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text);
25+
let f2 = FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text);
26+
let schema = Arc::new(vec![f1, f2]);
27+
28+
let data = vec![
29+
(Some(0), Some("Tom")),
30+
(Some(1), Some("Jerry")),
31+
(Some(2), None),
32+
];
33+
let schema_ref = schema.clone();
34+
let data_row_stream = stream::iter(data.into_iter()).map(move |r| {
35+
let mut encoder = DataRowEncoder::new(schema_ref.clone());
36+
encoder.encode_field(&r.0)?;
37+
encoder.encode_field(&r.1)?;
38+
39+
encoder.finish()
40+
});
41+
42+
Ok(vec![Response::Query(QueryResponse::new(
43+
schema,
44+
data_row_stream,
45+
))])
46+
} else {
47+
Ok(vec![Response::Execution(Tag::new("OK").with_rows(1))])
48+
}
49+
}
50+
}
51+
52+
pub struct DummyProcessorFactory {
53+
pub handler: Arc<DummyProcessor>,
54+
}
55+
56+
impl DummyProcessorFactory {
57+
pub fn new() -> DummyProcessorFactory {
58+
Self {
59+
handler: Arc::new(DummyProcessor),
60+
}
61+
}
62+
}

examples/copy.rs

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@ use std::sync::Arc;
33

44
use async_trait::async_trait;
55
use futures::{Sink, SinkExt};
6-
use pgwire::api::cancel::NoopCancelHandler;
76
use tokio::net::TcpListener;
87

9-
use pgwire::api::auth::noop::NoopStartupHandler;
108
use pgwire::api::copy::CopyHandler;
11-
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
9+
use pgwire::api::query::SimpleQueryHandler;
1210
use pgwire::api::results::{CopyResponse, Response};
13-
use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireConnectionState, PgWireServerHandlers};
11+
use pgwire::api::{ClientInfo, PgWireConnectionState, PgWireServerHandlers};
1412
use pgwire::error::ErrorInfo;
1513
use pgwire::error::{PgWireError, PgWireResult};
1614
use pgwire::messages::copy::{CopyData, CopyDone, CopyFail};
@@ -20,8 +18,6 @@ use pgwire::tokio::process_socket;
2018

2119
pub struct DummyProcessor;
2220

23-
impl NoopStartupHandler for DummyProcessor {}
24-
2521
#[async_trait]
2622
impl SimpleQueryHandler for DummyProcessor {
2723
async fn do_query<'a, C>(&self, client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
@@ -104,36 +100,13 @@ struct DummyProcessorFactory {
104100
}
105101

106102
impl PgWireServerHandlers for DummyProcessorFactory {
107-
type StartupHandler = DummyProcessor;
108-
type SimpleQueryHandler = DummyProcessor;
109-
type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
110-
type CopyHandler = DummyProcessor;
111-
type ErrorHandler = NoopErrorHandler;
112-
type CancelHandler = NoopCancelHandler;
113-
114-
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
115-
self.handler.clone()
116-
}
117-
118-
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
119-
Arc::new(PlaceholderExtendedQueryHandler)
120-
}
121-
122-
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
103+
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
123104
self.handler.clone()
124105
}
125106

126-
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
107+
fn copy_handler(&self) -> Arc<impl CopyHandler> {
127108
self.handler.clone()
128109
}
129-
130-
fn cancel_handler(&self) -> Arc<Self::CancelHandler> {
131-
Arc::new(NoopCancelHandler)
132-
}
133-
134-
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
135-
Arc::new(NoopErrorHandler)
136-
}
137110
}
138111

139112
#[tokio::main]

examples/duckdb.rs

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ use async_trait::async_trait;
77
use duckdb::{params, Connection, Statement, ToSql};
88
use futures::stream;
99
use pgwire::api::auth::md5pass::{hash_md5_password, Md5PasswordAuthStartupHandler};
10-
use pgwire::api::auth::{AuthSource, DefaultServerParameterProvider, LoginInfo, Password};
11-
use pgwire::api::cancel::NoopCancelHandler;
12-
use pgwire::api::copy::NoopCopyHandler;
10+
use pgwire::api::auth::{
11+
AuthSource, DefaultServerParameterProvider, LoginInfo, Password, StartupHandler,
12+
};
1313
use pgwire::api::portal::{Format, Portal};
1414
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
1515
use pgwire::api::results::{
1616
DescribePortalResponse, DescribeStatementResponse, FieldInfo, QueryResponse, Response, Tag,
1717
};
1818
use pgwire::api::stmt::{NoopQueryParser, StoredStatement};
19-
use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
19+
use pgwire::api::{ClientInfo, PgWireServerHandlers, Type};
2020
use pgwire::error::{PgWireError, PgWireResult};
2121
use pgwire::tokio::process_socket;
2222
use tokio::net::TcpListener;
@@ -245,40 +245,20 @@ struct DuckDBBackendFactory {
245245
}
246246

247247
impl PgWireServerHandlers for DuckDBBackendFactory {
248-
type StartupHandler =
249-
Md5PasswordAuthStartupHandler<DummyAuthSource, DefaultServerParameterProvider>;
250-
type SimpleQueryHandler = DuckDBBackend;
251-
type ExtendedQueryHandler = DuckDBBackend;
252-
type CopyHandler = NoopCopyHandler;
253-
type CancelHandler = NoopCancelHandler;
254-
type ErrorHandler = NoopErrorHandler;
255-
256-
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
248+
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
257249
self.handler.clone()
258250
}
259251

260-
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
252+
fn extended_query_handler(&self) -> Arc<impl ExtendedQueryHandler> {
261253
self.handler.clone()
262254
}
263255

264-
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
256+
fn startup_handler(&self) -> Arc<impl StartupHandler> {
265257
Arc::new(Md5PasswordAuthStartupHandler::new(
266258
Arc::new(DummyAuthSource),
267259
Arc::new(DefaultServerParameterProvider::default()),
268260
))
269261
}
270-
271-
fn cancel_handler(&self) -> Arc<Self::CancelHandler> {
272-
Arc::new(NoopCancelHandler)
273-
}
274-
275-
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
276-
Arc::new(NoopCopyHandler)
277-
}
278-
279-
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
280-
Arc::new(NoopErrorHandler)
281-
}
282262
}
283263

284264
#[tokio::main]

examples/gluesql.rs

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,19 @@ use std::sync::{Arc, Mutex};
22

33
use async_trait::async_trait;
44
use futures::stream;
5-
use pgwire::api::cancel::NoopCancelHandler;
5+
use gluesql::prelude::*;
66
use tokio::net::TcpListener;
77

8-
use gluesql::prelude::*;
9-
use pgwire::api::auth::noop::NoopStartupHandler;
10-
use pgwire::api::copy::NoopCopyHandler;
11-
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
8+
use pgwire::api::query::SimpleQueryHandler;
129
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
13-
use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
10+
use pgwire::api::{ClientInfo, PgWireServerHandlers, Type};
1411
use pgwire::error::{PgWireError, PgWireResult};
1512
use pgwire::tokio::process_socket;
1613

1714
pub struct GluesqlProcessor {
1815
glue: Arc<Mutex<Glue<MemoryStorage>>>,
1916
}
2017

21-
impl NoopStartupHandler for GluesqlProcessor {}
22-
2318
#[async_trait]
2419
impl SimpleQueryHandler for GluesqlProcessor {
2520
async fn do_query<'a, C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response<'a>>>
@@ -163,36 +158,9 @@ struct GluesqlHandlerFactory {
163158
}
164159

165160
impl PgWireServerHandlers for GluesqlHandlerFactory {
166-
type StartupHandler = GluesqlProcessor;
167-
type SimpleQueryHandler = GluesqlProcessor;
168-
type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
169-
type CopyHandler = NoopCopyHandler;
170-
type CancelHandler = NoopCancelHandler;
171-
type ErrorHandler = NoopErrorHandler;
172-
173-
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
161+
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
174162
self.processor.clone()
175163
}
176-
177-
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
178-
Arc::new(PlaceholderExtendedQueryHandler)
179-
}
180-
181-
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
182-
self.processor.clone()
183-
}
184-
185-
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
186-
Arc::new(NoopCopyHandler)
187-
}
188-
189-
fn cancel_handler(&self) -> Arc<Self::CancelHandler> {
190-
Arc::new(NoopCancelHandler)
191-
}
192-
193-
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
194-
Arc::new(NoopErrorHandler)
195-
}
196164
}
197165

198166
#[tokio::main]

0 commit comments

Comments
 (0)