@@ -10,7 +10,6 @@ use std::io;
10
10
use std:: time:: Duration ;
11
11
12
12
use ignore_result:: Ignore ;
13
- use rustls:: ClientConfig ;
14
13
use tokio:: select;
15
14
use tokio:: sync:: mpsc;
16
15
use tokio:: time:: { self , Instant } ;
@@ -39,6 +38,7 @@ use crate::proto::{AuthPacket, ConnectRequest, ConnectResponse, ErrorCode, OpCod
39
38
use crate :: record;
40
39
#[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
41
40
use crate :: sasl:: { SaslInitiator , SaslOptions , SaslSession } ;
41
+ use crate :: tls:: TlsOptions ;
42
42
43
43
pub const PASSWORD_LEN : usize = 16 ;
44
44
pub const DEFAULT_SESSION_TIMEOUT : Duration = Duration :: from_secs ( 6 ) ;
@@ -59,59 +59,81 @@ impl RequestOperation for (WatcherId, StateResponser) {
59
59
}
60
60
}
61
61
62
- pub struct Session {
62
+ #[ derive( Default ) ]
63
+ pub struct Builder {
64
+ tls : Option < TlsOptions > ,
65
+ #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
66
+ sasl : Option < SaslOptions > ,
67
+ authes : Vec < MarshalledRequest > ,
63
68
readonly : bool ,
64
69
detached : bool ,
70
+ session : Option < SessionInfo > ,
71
+ session_timeout : Duration ,
72
+ connection_timeout : Duration ,
73
+ }
65
74
66
- connector : Connector ,
75
+ impl Builder {
76
+ pub fn with_tls ( self , tls : Option < TlsOptions > ) -> Self {
77
+ Self { tls, ..self }
78
+ }
67
79
68
- configured_connection_timeout : Duration ,
80
+ #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
81
+ pub fn with_sasl ( self , sasl : Option < SaslOptions > ) -> Self {
82
+ Self { sasl, ..self }
83
+ }
69
84
70
- last_zxid : i64 ,
71
- last_recv : Instant ,
72
- last_send : Instant ,
73
- last_ping : Option < Instant > ,
74
- tick_timeout : Duration ,
75
- ping_timeout : Duration ,
76
- session_expired_timeout : Duration ,
85
+ pub fn with_authes ( self , authes : & [ AuthPacket ] ) -> Self {
86
+ Self { authes : authes. iter ( ) . map ( |auth| MarshalledRequest :: new ( OpCode :: Auth , auth) ) . collect ( ) , ..self }
87
+ }
77
88
78
- pub session : SessionInfo ,
79
- session_state : SessionState ,
80
- pub session_timeout : Duration ,
89
+ pub fn with_readonly ( self , readonly : bool ) -> Self {
90
+ Self { readonly , .. self }
91
+ }
81
92
82
- #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
83
- sasl_options : Option < SaslOptions > ,
84
- #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
85
- sasl_session : Option < SaslSession > ,
93
+ pub fn with_detached ( self , detached : bool ) -> Self {
94
+ Self { detached, ..self }
95
+ }
86
96
87
- pub authes : Vec < MarshalledRequest > ,
88
- state_sender : tokio:: sync:: watch:: Sender < SessionState > ,
97
+ pub fn with_session ( self , session : Option < SessionInfo > ) -> Self {
98
+ Self { session, ..self }
99
+ }
89
100
90
- watch_manager : WatchManager ,
91
- unwatch_receiver : Option < mpsc :: UnboundedReceiver < ( WatcherId , StateResponser ) > > ,
92
- }
101
+ pub fn with_session_timeout ( self , session_timeout : Duration ) -> Self {
102
+ Self { session_timeout , .. self }
103
+ }
93
104
94
- impl Session {
95
- #[ allow( clippy:: too_many_arguments) ]
96
- pub fn new (
97
- session : Option < SessionInfo > ,
98
- authes : & [ AuthPacket ] ,
99
- readonly : bool ,
100
- detached : bool ,
101
- tls_config : ClientConfig ,
102
- #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ] sasl_options : Option < SaslOptions > ,
103
- session_timeout : Duration ,
104
- connection_timeout : Duration ,
105
- ) -> ( Session , tokio:: sync:: watch:: Receiver < SessionState > ) {
106
- let session = session. unwrap_or_else ( || SessionInfo :: new ( SessionId ( 0 ) , Vec :: with_capacity ( PASSWORD_LEN ) ) ) ;
105
+ pub fn with_connection_timeout ( self , connection_timeout : Duration ) -> Self {
106
+ Self { connection_timeout, ..self }
107
+ }
108
+
109
+ pub fn build ( self ) -> Result < ( Session , tokio:: sync:: watch:: Receiver < SessionState > ) , Error > {
110
+ let session = match self . session {
111
+ Some ( session) => {
112
+ if session. is_readonly ( ) {
113
+ return Err ( Error :: new_other (
114
+ format ! ( "can't reestablish readonly and hence local session {}" , session. id( ) ) ,
115
+ None ,
116
+ ) ) ;
117
+ }
118
+ Span :: current ( ) . record ( "session" , display ( session. id ( ) ) ) ;
119
+ session
120
+ } ,
121
+ None => SessionInfo :: new ( SessionId ( 0 ) , Vec :: with_capacity ( PASSWORD_LEN ) ) ,
122
+ } ;
123
+ if self . session_timeout < Duration :: ZERO {
124
+ return Err ( Error :: BadArguments ( & "session timeout must not be negative" ) ) ;
125
+ } else if self . connection_timeout < Duration :: ZERO {
126
+ return Err ( Error :: BadArguments ( & "connection timeout must not be negative" ) ) ;
127
+ }
128
+ let tls_config = self . tls . unwrap_or_default ( ) . into_config ( ) ?;
107
129
let ( state_sender, state_receiver) = tokio:: sync:: watch:: channel ( SessionState :: Disconnected ) ;
108
130
let now = Instant :: now ( ) ;
109
131
let ( watch_manager, unwatch_receiver) = WatchManager :: new ( ) ;
110
132
let mut session = Session {
111
- readonly,
112
- detached,
133
+ readonly : self . readonly ,
134
+ detached : self . detached ,
113
135
114
- configured_connection_timeout : connection_timeout,
136
+ configured_connection_timeout : self . connection_timeout ,
115
137
116
138
last_zxid : session. last_zxid ,
117
139
last_recv : now,
@@ -122,22 +144,60 @@ impl Session {
122
144
session_expired_timeout : Duration :: ZERO ,
123
145
connector : Connector :: new ( tls_config) ,
124
146
#[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
125
- sasl_options,
147
+ sasl_options : self . sasl ,
126
148
#[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
127
149
sasl_session : None ,
128
150
129
151
session,
130
- session_timeout,
152
+ session_timeout : self . session_timeout ,
131
153
session_state : SessionState :: Disconnected ,
132
154
133
- authes : authes . iter ( ) . map ( |auth| MarshalledRequest :: new ( OpCode :: Auth , auth ) ) . collect ( ) ,
155
+ authes : self . authes ,
134
156
state_sender,
135
157
watch_manager,
136
158
unwatch_receiver : Some ( unwatch_receiver) ,
137
159
} ;
138
- let timeout = if session_timeout. is_zero ( ) { DEFAULT_SESSION_TIMEOUT } else { session_timeout } ;
160
+ let timeout = if self . session_timeout . is_zero ( ) { DEFAULT_SESSION_TIMEOUT } else { self . session_timeout } ;
139
161
session. reset_timeout ( timeout) ;
140
- ( session, state_receiver)
162
+ Ok ( ( session, state_receiver) )
163
+ }
164
+ }
165
+
166
+ pub struct Session {
167
+ readonly : bool ,
168
+ detached : bool ,
169
+
170
+ connector : Connector ,
171
+
172
+ configured_connection_timeout : Duration ,
173
+
174
+ last_zxid : i64 ,
175
+ last_recv : Instant ,
176
+ last_send : Instant ,
177
+ last_ping : Option < Instant > ,
178
+ tick_timeout : Duration ,
179
+ ping_timeout : Duration ,
180
+ session_expired_timeout : Duration ,
181
+
182
+ pub session : SessionInfo ,
183
+ session_state : SessionState ,
184
+ pub session_timeout : Duration ,
185
+
186
+ #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
187
+ sasl_options : Option < SaslOptions > ,
188
+ #[ cfg( any( feature = "sasl-digest-md5" , feature = "sasl-gssapi" ) ) ]
189
+ sasl_session : Option < SaslSession > ,
190
+
191
+ pub authes : Vec < MarshalledRequest > ,
192
+ state_sender : tokio:: sync:: watch:: Sender < SessionState > ,
193
+
194
+ watch_manager : WatchManager ,
195
+ unwatch_receiver : Option < mpsc:: UnboundedReceiver < ( WatcherId , StateResponser ) > > ,
196
+ }
197
+
198
+ impl Session {
199
+ pub fn builder ( ) -> Builder {
200
+ Builder :: default ( )
141
201
}
142
202
143
203
fn is_readonly_allowed ( & self ) -> bool {
0 commit comments