@@ -56,13 +56,27 @@ impl GrpcContext {
5656 comms_timeouts : comms_timeouts. clone ( ) ,
5757 } )
5858 }
59- pub ( crate ) async fn lock ( & self ) -> tokio:: sync:: OwnedMutexGuard < ( ) > {
59+ /// Override the timeout config in the context for the given request
60+ fn override_timeout < R : MessageIdTimeout > ( & mut self , request : Option < R > ) {
61+ let timeout = request
62+ . map ( |r| r. timeout ( self . comms_timeouts . request ( ) , & bus ( ) ) )
63+ . unwrap_or_else ( || self . comms_timeouts . request ( ) ) ;
64+
65+ self . endpoint = self
66+ . endpoint
67+ . clone ( )
68+ . connect_timeout ( self . comms_timeouts . connect ( ) + Duration :: from_millis ( 500 ) )
69+ . timeout ( timeout) ;
70+ }
71+ pub ( crate ) async fn lock ( & self ) -> GrpcLockGuard {
6072 self . lock . clone ( ) . lock_owned ( ) . await
6173 }
6274 pub ( crate ) async fn connect ( & self ) -> Result < GrpcClient , SvcError > {
6375 GrpcClient :: new ( self ) . await
6476 }
65- pub ( crate ) async fn connect_locked ( & self ) -> Result < GrpcClientLocked , SvcError > {
77+ pub ( crate ) async fn connect_locked (
78+ & self ,
79+ ) -> Result < GrpcClientLocked , ( GrpcLockGuard , SvcError ) > {
6680 GrpcClientLocked :: new ( self ) . await
6781 }
6882}
@@ -72,7 +86,7 @@ impl GrpcContext {
7286pub ( crate ) struct GrpcClient {
7387 context : GrpcContext ,
7488 /// gRPC Mayastor Client
75- pub ( crate ) client : MayaClient ,
89+ pub ( crate ) mayastor : MayaClient ,
7690}
7791pub ( crate ) type MayaClient = MayastorClient < Channel > ;
7892impl GrpcClient {
@@ -96,23 +110,48 @@ impl GrpcClient {
96110
97111 Ok ( Self {
98112 context : context. clone ( ) ,
99- client,
113+ mayastor : client,
100114 } )
101115 }
102116}
103117
104- /// Wrapper over all gRPC Clients types with implicit locking for serialization
118+ /// Async Lock guard for gRPC operations.
119+ /// It's used by the GrpcClientLocked to ensure there's only one operation in progress
120+ /// at a time while still allowing for multiple gRPC clients.
121+ type GrpcLockGuard = tokio:: sync:: OwnedMutexGuard < ( ) > ;
122+
123+ /// Wrapper over all gRPC Clients types with implicit locking for serialization.
105124pub ( crate ) struct GrpcClientLocked {
106125 /// gRPC auto CRUD guard lock
107- _lock : tokio :: sync :: OwnedMutexGuard < ( ) > ,
126+ _lock : GrpcLockGuard ,
108127 client : GrpcClient ,
109128}
110129impl GrpcClientLocked {
111- pub ( crate ) async fn new ( context : & GrpcContext ) -> Result < Self , SvcError > {
112- let client = GrpcClient :: new ( context) . await ?;
130+ /// Create new locked client from the given context
131+ /// A connection is established with the timeouts specified from the context.
132+ /// Only one `Self` is allowed at a time by making use of a lock guard.
133+ pub ( crate ) async fn new ( context : & GrpcContext ) -> Result < Self , ( GrpcLockGuard , SvcError ) > {
134+ let _lock = context. lock ( ) . await ;
135+
136+ let client = match GrpcClient :: new ( context) . await {
137+ Ok ( client) => client,
138+ Err ( error) => return Err ( ( _lock, error) ) ,
139+ } ;
140+
141+ Ok ( Self { _lock, client } )
142+ }
143+ /// Reconnect the client to use for the given request
144+ /// This is useful when we want to issue the next gRPC using a different timeout
145+ /// todo: tower should allow us to handle this better by keeping the same "backend" client
146+ /// but modifying the timeout layer?
147+ pub ( crate ) async fn reconnect < R : MessageIdTimeout > ( self , request : R ) -> Result < Self , SvcError > {
148+ let mut context = self . context . clone ( ) ;
149+ context. override_timeout ( Some ( request) ) ;
150+
151+ let client = GrpcClient :: new ( & context) . await ?;
113152
114153 Ok ( Self {
115- _lock : context . lock ( ) . await ,
154+ _lock : self . _lock ,
116155 client,
117156 } )
118157 }
0 commit comments