@@ -4,10 +4,13 @@ use std::sync::atomic::{AtomicU32, Ordering};
44use std:: sync:: { Arc , RwLock } ;
55
66/// A client wrapper that allows replacing the underlying client at a later point in time.
7- /// Clones of [SharedReplaceableClient] have a shared reference to the underlying client, and also
8- /// a local cached clone of the underlying client. Before every service call, the cached clone is
9- /// updated if the shared client was replaced.
10- #[ derive( Debug , Clone ) ]
7+ /// Clones of this struct have a shared reference to the underlying client, and each clone also
8+ /// has its own cached clone of the underlying client. Before every service call, a check is made
9+ /// whether the shared client was replaced, and the cached clone is updated accordingly.
10+ ///
11+ /// This struct is fully thread-safe, and it works in a lock-free manner except when the client is
12+ /// being replaced. A read-write lock is used then, with minimal locking time.
13+ #[ derive( Debug ) ]
1114pub struct SharedReplaceableClient < C >
1215where
1316 C : Clone + Send + Sync ,
@@ -30,14 +33,17 @@ impl<C> SharedClientData<C>
3033where
3134 C : Clone + Send + Sync ,
3235{
36+ fn fetch ( & self ) -> ( C , u32 ) {
37+ let lock = self . client . read ( ) . unwrap ( ) ;
38+ let client = lock. clone ( ) ;
39+ // Loading generation under lock to ensure the client won't be updated in the meantime.
40+ let generation = self . generation . load ( Ordering :: Acquire ) ;
41+ ( client, generation)
42+ }
43+
3344 fn fetch_newer_than ( & self , current_generation : u32 ) -> Option < ( C , u32 ) > {
34- ( current_generation != self . generation . load ( Ordering :: Acquire ) ) . then ( || {
35- let lock = self . client . read ( ) . unwrap ( ) ;
36- let client = lock. clone ( ) ;
37- // Loading generation under lock to ensure the client won't be updated in the meantime.
38- let generation = self . generation . load ( Ordering :: Acquire ) ;
39- ( client, generation)
40- } )
45+ // fetch() will do a second atomic load, but it's necessary to avoid a race condition.
46+ ( current_generation != self . generation . load ( Ordering :: Acquire ) ) . then ( || self . fetch ( ) )
4147 }
4248
4349 fn replace_client ( & self , client : C ) {
5460 C : Clone + Send + Sync ,
5561{
5662 /// Creates the initial instance of replaceable client with the provided underlying client.
57- /// Use `clone` method to create more instances that share the same underlying client.
63+ /// Use [ `clone()`](Self::clone) method to create more instances that share the same underlying client.
5864 pub fn new ( client : C ) -> Self {
5965 let cloned_client = client. clone ( ) ;
6066 Self {
7783 self . inner_cow ( ) . into_owned ( )
7884 }
7985
80- /// Returns a reference to the underlying client if possible, or its clone otherwise.
86+ /// Returns a reference to this instance's cached clone of the underlying client if it's up to
87+ /// date, or a fresh clone of the shared client otherwise. Because it's an immutable method,
88+ /// it will not update this instance's cached clone. For this reason, prefer to use
89+ /// [`refresh_inner()`](Self::refresh_inner) when possible.
8190 pub fn inner_cow ( & self ) -> Cow < ' _ , C > {
8291 self . shared_data
8392 . fetch_newer_than ( self . cloned_generation )
8695 }
8796
8897 /// Refreshes this instance's cached clone of the underlying client. Returns a mutable reference
89- /// to it. Called automatically by other mutable methods, in particular by all client calls.
98+ /// to it. Called automatically by other mutable methods, in particular by all RPC calls.
99+ ///
100+ /// While this method allows mutable access to the underlying client, any configuration changes
101+ /// will not be shared with other instances, and will be lost if the client gets replaced from
102+ /// anywhere. To make configuration changes, use [`replace_client()`](Self::refresh_client) instead.
90103 pub fn refresh_inner ( & mut self ) -> & mut C {
91104 if let Some ( ( client, generation) ) =
92105 self . shared_data . fetch_newer_than ( self . cloned_generation )
@@ -98,6 +111,25 @@ where
98111 }
99112}
100113
114+ impl < C > Clone for SharedReplaceableClient < C >
115+ where
116+ C : Clone + Send + Sync ,
117+ {
118+ /// Creates a new instance of replaceable client that shares the underlying client with this
119+ /// instance. Replacing a client in either instance will replace it for both instances, and all
120+ /// other clones too.
121+ fn clone ( & self ) -> Self {
122+ // self's cloned_client could've been modified through a mutable reference,
123+ // so for consistent behavior, we need to fetch it from shared_data.
124+ let ( client, generation) = self . shared_data . fetch ( ) ;
125+ Self {
126+ shared_data : self . shared_data . clone ( ) ,
127+ cloned_client : client,
128+ cloned_generation : generation,
129+ }
130+ }
131+ }
132+
101133impl < C > NamespacedClient for SharedReplaceableClient < C >
102134where
103135 C : NamespacedClient + Clone + Send + Sync ,
@@ -110,3 +142,76 @@ where
110142 self . inner_cow ( ) . identity ( )
111143 }
112144}
145+
146+ #[ cfg( test) ]
147+ mod tests {
148+ use crate :: { NamespacedClient , SharedReplaceableClient } ;
149+ use std:: borrow:: Cow ;
150+
151+ #[ derive( Debug , Clone ) ]
152+ struct StubClient {
153+ identity : String ,
154+ }
155+
156+ impl StubClient {
157+ fn new ( identity : & str ) -> Self {
158+ Self {
159+ identity : identity. to_owned ( ) ,
160+ }
161+ }
162+ }
163+
164+ impl NamespacedClient for StubClient {
165+ fn namespace ( & self ) -> String {
166+ "default" . into ( )
167+ }
168+
169+ fn identity ( & self ) -> String {
170+ self . identity . clone ( )
171+ }
172+ }
173+
174+ #[ test]
175+ fn cow_returns_reference_before_and_clone_after_refresh ( ) {
176+ let mut client = SharedReplaceableClient :: new ( StubClient :: new ( "1" ) ) ;
177+ let Cow :: Borrowed ( inner) = client. inner_cow ( ) else {
178+ panic ! ( "expected borrowed inner" ) ;
179+ } ;
180+ assert_eq ! ( inner. identity, "1" ) ;
181+
182+ client. replace_client ( StubClient :: new ( "2" ) ) ;
183+ let Cow :: Owned ( inner) = client. inner_cow ( ) else {
184+ panic ! ( "expected owned inner" ) ;
185+ } ;
186+ assert_eq ! ( inner. identity, "2" ) ;
187+
188+ assert_eq ! ( client. refresh_inner( ) . identity, "2" ) ;
189+ let Cow :: Borrowed ( inner) = client. inner_cow ( ) else {
190+ panic ! ( "expected borrowed inner" ) ;
191+ } ;
192+ assert_eq ! ( inner. identity, "2" ) ;
193+ }
194+
195+ #[ test]
196+ fn client_replaced_in_clones ( ) {
197+ let original1 = SharedReplaceableClient :: new ( StubClient :: new ( "1" ) ) ;
198+ let clone1 = original1. clone ( ) ;
199+ assert_eq ! ( original1. identity( ) , "1" ) ;
200+ assert_eq ! ( clone1. identity( ) , "1" ) ;
201+
202+ original1. replace_client ( StubClient :: new ( "2" ) ) ;
203+ assert_eq ! ( original1. identity( ) , "2" ) ;
204+ assert_eq ! ( clone1. identity( ) , "2" ) ;
205+
206+ let original2 = SharedReplaceableClient :: new ( StubClient :: new ( "3" ) ) ;
207+ let clone2 = original2. clone ( ) ;
208+ assert_eq ! ( original2. identity( ) , "3" ) ;
209+ assert_eq ! ( clone2. identity( ) , "3" ) ;
210+
211+ clone2. replace_client ( StubClient :: new ( "4" ) ) ;
212+ assert_eq ! ( original2. identity( ) , "4" ) ;
213+ assert_eq ! ( clone2. identity( ) , "4" ) ;
214+ assert_eq ! ( original1. identity( ) , "2" ) ;
215+ assert_eq ! ( clone1. identity( ) , "2" ) ;
216+ }
217+ }
0 commit comments