Skip to content

Commit 828d191

Browse files
committed
feat: update pgwire to 0.39
1 parent fe83376 commit 828d191

4 files changed

Lines changed: 49 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bytes = "1.11.1"
1919
chrono = { version = "0.4", features = ["std"] }
2020
datafusion = { version = "53" }
2121
futures = "0.3"
22-
pgwire = { version = "0.38", default-features = false }
22+
pgwire = { version = "0.39", default-features = false }
2323
postgres-types = "0.2"
2424
rust_decimal = { version = "1.41", features = ["db-postgres"] }
2525
tokio = { version = "1", default-features = false }

datafusion-postgres/src/handlers.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use datafusion::sql::sqlparser;
1111
use log::info;
1212
use pgwire::api::auth::noop::NoopStartupHandler;
1313
use pgwire::api::auth::StartupHandler;
14+
use pgwire::api::cancel::{CancelHandler, DefaultCancelHandler};
1415
use pgwire::api::portal::{Format, Portal};
1516
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
1617
use pgwire::api::results::{FieldInfo, Response, Tag};
1718
use pgwire::api::stmt::QueryParser;
18-
use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers, Type};
19+
use pgwire::api::{ClientInfo, ConnectionManager, ErrorHandler, PgWireServerHandlers, Type};
1920
use pgwire::error::{PgWireError, PgWireResult};
2021
use pgwire::messages::PgWireBackendMessage;
2122
use pgwire::types::format::FormatOptions;
@@ -29,19 +30,34 @@ use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};
2930
use datafusion_pg_catalog::sql::PostgresCompatibilityParser;
3031

3132
/// Simple startup handler that does no authentication
32-
pub struct SimpleStartupHandler;
33+
pub struct SimpleStartupHandler {
34+
connection_manager: Arc<ConnectionManager>,
35+
}
3336

3437
#[async_trait::async_trait]
35-
impl NoopStartupHandler for SimpleStartupHandler {}
38+
impl NoopStartupHandler for SimpleStartupHandler {
39+
fn connection_manager(&self) -> Option<Arc<ConnectionManager>> {
40+
Some(self.connection_manager.clone())
41+
}
42+
}
3643

3744
pub struct HandlerFactory {
3845
pub session_service: Arc<DfSessionService>,
46+
cancel_handler: Arc<DefaultCancelHandler>,
47+
startup_handler: Arc<SimpleStartupHandler>,
3948
}
4049

4150
impl HandlerFactory {
4251
pub fn new(session_context: Arc<SessionContext>) -> Self {
4352
let session_service = Arc::new(DfSessionService::new(session_context));
44-
HandlerFactory { session_service }
53+
let connection_manager = Arc::new(ConnectionManager::new());
54+
HandlerFactory {
55+
session_service,
56+
cancel_handler: Arc::new(DefaultCancelHandler::new(connection_manager.clone())),
57+
startup_handler: Arc::new(SimpleStartupHandler {
58+
connection_manager: connection_manager.clone(),
59+
}),
60+
}
4561
}
4662

4763
pub fn new_with_hooks(
@@ -52,7 +68,14 @@ impl HandlerFactory {
5268
session_context,
5369
query_hooks,
5470
));
55-
HandlerFactory { session_service }
71+
let connection_manager = Arc::new(ConnectionManager::new());
72+
HandlerFactory {
73+
session_service,
74+
cancel_handler: Arc::new(DefaultCancelHandler::new(connection_manager.clone())),
75+
startup_handler: Arc::new(SimpleStartupHandler {
76+
connection_manager: connection_manager.clone(),
77+
}),
78+
}
5679
}
5780
}
5881

@@ -66,12 +89,16 @@ impl PgWireServerHandlers for HandlerFactory {
6689
}
6790

6891
fn startup_handler(&self) -> Arc<impl StartupHandler> {
69-
Arc::new(SimpleStartupHandler)
92+
self.startup_handler.clone()
7093
}
7194

7295
fn error_handler(&self) -> Arc<impl ErrorHandler> {
7396
Arc::new(LoggingErrorHandler)
7497
}
98+
99+
fn cancel_handler(&self) -> Arc<impl CancelHandler> {
100+
self.cancel_handler.clone()
101+
}
75102
}
76103

77104
struct LoggingErrorHandler;

datafusion-postgres/src/testing.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use datafusion::prelude::{SessionConfig, SessionContext};
44
use datafusion_pg_catalog::pg_catalog::setup_pg_catalog;
55
use futures::Sink;
66
use pgwire::{
7-
api::{ClientInfo, ClientPortalStore, PgWireConnectionState, METADATA_USER},
7+
api::{
8+
store::MemPortalStore, ClientInfo, ClientPortalStore, PgWireConnectionState,
9+
SessionExtensions, METADATA_USER,
10+
},
811
messages::{
912
response::TransactionStatus, startup::SecretKey, PgWireBackendMessage, ProtocolVersion,
1013
},
@@ -29,8 +32,9 @@ pub fn setup_handlers() -> DfSessionService {
2932
#[derive(Debug, Default)]
3033
pub struct MockClient {
3134
metadata: HashMap<String, String>,
32-
portal_store: HashMap<String, String>,
35+
portal_store: MemPortalStore<String>,
3336
pub sent_messages: Vec<PgWireBackendMessage>,
37+
session_extensions: SessionExtensions,
3438
}
3539

3640
impl MockClient {
@@ -40,8 +44,9 @@ impl MockClient {
4044

4145
MockClient {
4246
metadata,
43-
portal_store: HashMap::default(),
47+
portal_store: MemPortalStore::new(),
4448
sent_messages: Vec::new(),
49+
session_extensions: SessionExtensions::new(),
4550
}
4651
}
4752

@@ -98,10 +103,14 @@ impl ClientInfo for MockClient {
98103
fn sni_server_name(&self) -> Option<&str> {
99104
None
100105
}
106+
107+
fn session_extensions(&self) -> &pgwire::api::SessionExtensions {
108+
&self.session_extensions
109+
}
101110
}
102111

103112
impl ClientPortalStore for MockClient {
104-
type PortalStore = HashMap<String, String>;
113+
type PortalStore = MemPortalStore<String>;
105114
fn portal_store(&self) -> &Self::PortalStore {
106115
&self.portal_store
107116
}

0 commit comments

Comments
 (0)