@@ -131,23 +131,36 @@ impl Clone for RedisNode {
131131/// identify the true block producer of any given block. This is done by iterating over each epochs
132132/// and finding the highest epoch with a quorum of blocks produced.
133133pub struct RedisLeaderLeaseAdapter {
134- /// list of all redis nodes to connect to
134+ /// Redis nodes that participate in leader election and block reconciliation
135135 redis_nodes : Vec < RedisNode > ,
136- /// number of nodes for which a lock needs to be held to consider the producer legitimate
136+ /// Minimum number of Redis nodes that must agree for ownership or writes to be accepted
137137 quorum : usize ,
138+ /// Redis key that stores the current lease owner id with a TTL
138139 lease_key : String ,
140+ /// Redis key that stores the monotonically increasing fencing token
139141 epoch_key : String ,
142+ /// Redis stream key that stores published blocks for reconciliation
140143 block_stream_key : String ,
141- lease_owner_token : String ,
142- drop_release_guard : std:: sync:: Arc < ( ) > ,
144+ /// Unique id for this adapter instance, written into the lease key when it acquires leadership
145+ lease_owner_id : String ,
146+ /// Locally cached fencing token currently used for block writes.
143147 current_epoch_token : std:: sync:: Arc < std:: sync:: Mutex < Option < u64 > > > ,
148+ /// Lease TTL applied to the Redis lease key and renewed on successful writes.
144149 lease_ttl_millis : u64 ,
150+ /// Safety margin subtracted from the nominal TTL when evaluating remaining lease validity.
145151 lease_drift_millis : u64 ,
152+ /// Timeout used for individual Redis node operations.
146153 node_timeout : Duration ,
154+ /// Approximate upper bound for the Redis block stream and per-round reconciliation reads.
155+ stream_max_len : u32 ,
156+ /// Base delay before retrying lease acquisition after a failed attempt.
147157 retry_delay_millis : u64 ,
158+ /// Random jitter added to the retry delay to reduce election collisions.
148159 max_retry_delay_offset_millis : u64 ,
160+ /// Maximum number of lease acquisition attempts per election round.
149161 max_attempts : usize ,
150- stream_max_len : u32 ,
162+ /// Ensures only the last live clone attempts lease release during drop.
163+ drop_release_guard : std:: sync:: Arc < ( ) > ,
151164}
152165
153166impl Clone for RedisLeaderLeaseAdapter {
@@ -158,16 +171,16 @@ impl Clone for RedisLeaderLeaseAdapter {
158171 lease_key : self . lease_key . clone ( ) ,
159172 epoch_key : self . epoch_key . clone ( ) ,
160173 block_stream_key : self . block_stream_key . clone ( ) ,
161- lease_owner_token : self . lease_owner_token . clone ( ) ,
162- drop_release_guard : self . drop_release_guard . clone ( ) ,
174+ lease_owner_id : self . lease_owner_id . clone ( ) ,
163175 current_epoch_token : self . current_epoch_token . clone ( ) ,
164176 lease_ttl_millis : self . lease_ttl_millis ,
165177 lease_drift_millis : self . lease_drift_millis ,
166178 node_timeout : self . node_timeout ,
179+ stream_max_len : self . stream_max_len ,
167180 retry_delay_millis : self . retry_delay_millis ,
168181 max_retry_delay_offset_millis : self . max_retry_delay_offset_millis ,
169182 max_attempts : self . max_attempts ,
170- stream_max_len : self . stream_max_len ,
183+ drop_release_guard : self . drop_release_guard . clone ( ) ,
171184 }
172185 }
173186}
@@ -228,7 +241,7 @@ impl RedisLeaderLeaseAdapter {
228241 let max_retry_delay_offset_millis =
229242 u64:: try_from ( max_retry_delay_offset. as_millis ( ) ) ?;
230243 let max_attempts = usize:: try_from ( max_attempts) ?. max ( 1 ) ;
231- let lease_owner_token = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
244+ let lease_owner_id = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
232245 let epoch_key = format ! ( "{lease_key}:epoch:token" ) ;
233246 let block_stream_key = format ! ( "{lease_key}:block:stream" ) ;
234247 let lease_drift_millis = lease_ttl_millis
@@ -241,16 +254,16 @@ impl RedisLeaderLeaseAdapter {
241254 lease_key,
242255 epoch_key,
243256 block_stream_key,
244- lease_owner_token,
245- drop_release_guard : std:: sync:: Arc :: new ( ( ) ) ,
257+ lease_owner_id,
246258 current_epoch_token : std:: sync:: Arc :: new ( std:: sync:: Mutex :: new ( None ) ) ,
247259 lease_ttl_millis,
248260 lease_drift_millis,
249261 node_timeout,
262+ stream_max_len,
250263 retry_delay_millis,
251264 max_retry_delay_offset_millis,
252265 max_attempts,
253- stream_max_len ,
266+ drop_release_guard : std :: sync :: Arc :: new ( ( ) ) ,
254267 } )
255268 }
256269
@@ -293,7 +306,7 @@ impl RedisLeaderLeaseAdapter {
293306 self . node_timeout ,
294307 redis:: Script :: new ( CHECK_LEASE_OWNER_SCRIPT )
295308 . key ( & self . lease_key )
296- . arg ( & self . lease_owner_token )
309+ . arg ( & self . lease_owner_id )
297310 . invoke_async :: < i32 > ( & mut connection) ,
298311 )
299312 . await ;
@@ -323,7 +336,7 @@ impl RedisLeaderLeaseAdapter {
323336 redis:: Script :: new ( PROMOTE_LEADER_SCRIPT )
324337 . key ( & self . lease_key )
325338 . key ( & self . epoch_key )
326- . arg ( & self . lease_owner_token )
339+ . arg ( & self . lease_owner_id )
327340 . arg ( self . lease_ttl_millis )
328341 . invoke_async :: < u64 > ( & mut connection) ,
329342 )
@@ -353,7 +366,7 @@ impl RedisLeaderLeaseAdapter {
353366 self . node_timeout ,
354367 redis:: Script :: new ( RELEASE_LOCK_SCRIPT )
355368 . key ( & self . lease_key )
356- . arg ( & self . lease_owner_token )
369+ . arg ( & self . lease_owner_id )
357370 . invoke_async :: < i32 > ( & mut connection) ,
358371 )
359372 . await ;
@@ -521,7 +534,7 @@ impl RedisLeaderLeaseAdapter {
521534 async fn release_lease_on_client (
522535 redis_client : redis:: Client ,
523536 lease_key : String ,
524- lease_owner_token : String ,
537+ lease_owner_id : String ,
525538 node_timeout : Duration ,
526539 ) {
527540 let connection = timeout (
@@ -538,7 +551,7 @@ impl RedisLeaderLeaseAdapter {
538551 node_timeout,
539552 redis:: Script :: new ( RELEASE_LOCK_SCRIPT )
540553 . key ( lease_key)
541- . arg ( lease_owner_token )
554+ . arg ( lease_owner_id )
542555 . invoke_async :: < i32 > ( & mut connection) ,
543556 )
544557 . await ;
@@ -547,15 +560,15 @@ impl RedisLeaderLeaseAdapter {
547560 async fn release_lease_on_clients (
548561 redis_clients : Vec < redis:: Client > ,
549562 lease_key : String ,
550- lease_owner_token : String ,
563+ lease_owner_id : String ,
551564 node_timeout : Duration ,
552565 ) {
553566 let _ =
554567 futures:: future:: join_all ( redis_clients. into_iter ( ) . map ( |redis_client| {
555568 Self :: release_lease_on_client (
556569 redis_client,
557570 lease_key. clone ( ) ,
558- lease_owner_token . clone ( ) ,
571+ lease_owner_id . clone ( ) ,
559572 node_timeout,
560573 )
561574 } ) )
@@ -565,15 +578,15 @@ impl RedisLeaderLeaseAdapter {
565578 fn release_lease_on_clients_sync (
566579 redis_clients : Vec < redis:: Client > ,
567580 lease_key : String ,
568- lease_owner_token : String ,
581+ lease_owner_id : String ,
569582 ) {
570583 redis_clients. into_iter ( ) . for_each ( |redis_client| {
571584 let Ok ( mut connection) = redis_client. get_connection ( ) else {
572585 return ;
573586 } ;
574587 let _ = redis:: Script :: new ( RELEASE_LOCK_SCRIPT )
575588 . key ( & lease_key)
576- . arg ( & lease_owner_token )
589+ . arg ( & lease_owner_id )
577590 . invoke :: < i32 > ( & mut connection) ;
578591 } ) ;
579592 }
@@ -942,7 +955,7 @@ impl RedisLeaderLeaseAdapter {
942955 . key ( & self . epoch_key )
943956 . key ( & self . lease_key )
944957 . arg ( epoch)
945- . arg ( & self . lease_owner_token )
958+ . arg ( & self . lease_owner_id )
946959 . arg ( block_height)
947960 . arg ( block_data)
948961 . arg ( self . lease_ttl_millis )
@@ -1179,7 +1192,7 @@ impl Drop for RedisLeaderLeaseAdapter {
11791192 Self :: release_lease_on_clients (
11801193 redis_clients,
11811194 self . lease_key . clone ( ) ,
1182- self . lease_owner_token . clone ( ) ,
1195+ self . lease_owner_id . clone ( ) ,
11831196 self . node_timeout ,
11841197 ) ,
11851198 ) ;
@@ -1194,7 +1207,7 @@ impl Drop for RedisLeaderLeaseAdapter {
11941207 Self :: release_lease_on_clients_sync (
11951208 redis_clients,
11961209 self . lease_key . clone ( ) ,
1197- self . lease_owner_token . clone ( ) ,
1210+ self . lease_owner_id . clone ( ) ,
11981211 ) ;
11991212 }
12001213}
@@ -1841,7 +1854,7 @@ mod tests {
18411854 "Adapter should acquire lease"
18421855 ) ;
18431856 let adapter_clone = adapter. clone ( ) ;
1844- let owner_token = adapter. lease_owner_token . clone ( ) ;
1857+ let owner_id = adapter. lease_owner_id . clone ( ) ;
18451858
18461859 // when
18471860 drop ( adapter_clone) ;
@@ -1852,7 +1865,7 @@ mod tests {
18521865 . iter ( )
18531866 . filter ( |redis_url| {
18541867 read_lease_owner ( redis_url, & lease_key) . as_deref ( )
1855- == Some ( owner_token . as_str ( ) )
1868+ == Some ( owner_id . as_str ( ) )
18561869 } )
18571870 . count ( ) ;
18581871 assert ! (
@@ -1895,7 +1908,7 @@ mod tests {
18951908 . iter ( )
18961909 . filter ( |redis_url| {
18971910 read_lease_owner ( redis_url, & lease_key) . as_deref ( )
1898- == Some ( adapter. lease_owner_token . as_str ( ) )
1911+ == Some ( adapter. lease_owner_id . as_str ( ) )
18991912 } )
19001913 . count ( ) ;
19011914
@@ -1962,7 +1975,7 @@ mod tests {
19621975 . iter ( )
19631976 . filter ( |redis_url| {
19641977 read_lease_owner ( redis_url, & lease_key) . as_deref ( )
1965- == Some ( second_adapter. lease_owner_token . as_str ( ) )
1978+ == Some ( second_adapter. lease_owner_id . as_str ( ) )
19661979 } )
19671980 . count ( ) ;
19681981
@@ -2123,7 +2136,7 @@ mod tests {
21232136 . iter ( )
21242137 . filter ( |redis_url| {
21252138 read_lease_owner ( redis_url, & lease_key) . as_deref ( )
2126- == Some ( adapter. lease_owner_token . as_str ( ) )
2139+ == Some ( adapter. lease_owner_id . as_str ( ) )
21272140 } )
21282141 . count ( ) ;
21292142
@@ -2855,7 +2868,7 @@ mod tests {
28552868 let mut conn = client. get_connection ( ) . expect ( "conn" ) ;
28562869 let _: ( ) = redis:: cmd ( "SET" )
28572870 . arg ( & lease_key)
2858- . arg ( & candidate_b. lease_owner_token )
2871+ . arg ( & candidate_b. lease_owner_id )
28592872 . arg ( "PX" )
28602873 . arg ( 5000u64 )
28612874 . query ( & mut conn)
@@ -2874,11 +2887,11 @@ mod tests {
28742887
28752888 // Verify A owns nodes A,B but NOT node C
28762889 let owns_a = read_lease_owner ( & redis_a. redis_url ( ) , & lease_key)
2877- == Some ( adapter_a. lease_owner_token . clone ( ) ) ;
2890+ == Some ( adapter_a. lease_owner_id . clone ( ) ) ;
28782891 let owns_b = read_lease_owner ( & redis_b. redis_url ( ) , & lease_key)
2879- == Some ( adapter_a. lease_owner_token . clone ( ) ) ;
2892+ == Some ( adapter_a. lease_owner_id . clone ( ) ) ;
28802893 let owns_c = read_lease_owner ( & redis_c. redis_url ( ) , & lease_key)
2881- == Some ( adapter_a. lease_owner_token . clone ( ) ) ;
2894+ == Some ( adapter_a. lease_owner_id . clone ( ) ) ;
28822895 assert ! ( owns_a && owns_b, "A should own nodes A and B" ) ;
28832896 assert ! ( !owns_c, "A should NOT own node C (held by B)" ) ;
28842897
@@ -2898,7 +2911,7 @@ mod tests {
28982911
28992912 // then — A should now own node C too
29002913 let owns_c_after = read_lease_owner ( & redis_c. redis_url ( ) , & lease_key)
2901- == Some ( adapter_a. lease_owner_token . clone ( ) ) ;
2914+ == Some ( adapter_a. lease_owner_id . clone ( ) ) ;
29022915 assert ! ( owns_c_after, "Lock expansion should have acquired node C" ) ;
29032916
29042917 // Verify writes now go to all 3 nodes
@@ -2943,7 +2956,7 @@ mod tests {
29432956 // Simulate B's promote_leader.lua on node C: SET NX + INCR
29442957 let _: ( ) = redis:: cmd ( "SET" )
29452958 . arg ( & lease_key)
2946- . arg ( & candidate_b. lease_owner_token )
2959+ . arg ( & candidate_b. lease_owner_id )
29472960 . arg ( "PX" )
29482961 . arg ( 5000u64 )
29492962 . query ( & mut conn)
0 commit comments