File tree Expand file tree Collapse file tree
duva/src/domains/cluster_actors Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -284,14 +284,21 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
284284 return ;
285285 } ;
286286
287- // TODO get_node_for_keys need to be revisited as currently it takes only the first key
288- match self . hash_ring . get_node_for_keys ( & req. request . all_keys ( ) ) {
289- | Ok ( replid) if replid == self . replication . replid => {
287+ match self . hash_ring . list_replids_for_keys ( & req. request . all_keys ( ) ) {
288+ | Ok ( replids) if replids. keys ( ) . all ( |replid| * replid == self . replication . replid ) => {
290289 self . req_consensus ( req) . await ;
291290 } ,
292- | Ok ( replid) => {
293- err ! ( "Given keys {:?} moved to {}" , req. request. all_keys( ) , replid) ;
294- let _ = req. callback . send ( format ! ( "MOVED {replid}" ) . into ( ) ) ;
291+ | Ok ( replids) => {
292+ let moved_keys = replids
293+ . iter ( )
294+ . filter_map ( |( replid, keys) | {
295+ if * replid != self . replication . replid { Some ( keys. iter ( ) ) } else { None }
296+ } )
297+ . flatten ( )
298+ . copied ( )
299+ . collect :: < Vec < _ > > ( )
300+ . join ( " " ) ;
301+ let _ = req. callback . send ( format ! ( "MOVED {}" , moved_keys) . into ( ) ) ;
295302 } ,
296303 | Err ( err) => {
297304 err ! ( "{}" , err) ;
Original file line number Diff line number Diff line change @@ -126,7 +126,6 @@ impl FromStr for LazyOption {
126126#[ derive( Debug , Clone , PartialEq , Eq ) ]
127127pub struct SessionRequest {
128128 pub ( crate ) request_id : u64 ,
129-
130129 pub ( crate ) client_id : Uuid ,
131130}
132131impl SessionRequest {
Original file line number Diff line number Diff line change @@ -158,12 +158,23 @@ impl HashRing {
158158 self . vnodes . len ( )
159159 }
160160
161- pub ( crate ) fn get_node_for_keys ( & self , keys : & [ & str ] ) -> anyhow:: Result < ReplicationId > {
162- // Use the first key to determine the node
163- let hash = fnv_1a_hash ( keys[ 0 ] ) ;
164- self . find_replid ( hash)
165- . cloned ( )
166- . ok_or_else ( || anyhow:: anyhow!( "No node found for keys: {:?}" , keys) )
161+ pub ( crate ) fn list_replids_for_keys < ' a > (
162+ & self ,
163+ keys : & [ & ' a str ] ,
164+ ) -> anyhow:: Result < HashMap < ReplicationId , Vec < & ' a str > > > {
165+ let mut map: HashMap < ReplicationId , Vec < & str > > = HashMap :: new ( ) ;
166+
167+ for key in keys {
168+ let hash = fnv_1a_hash ( key) ;
169+ let replid = self
170+ . find_replid ( hash)
171+ . cloned ( )
172+ . ok_or_else ( || anyhow:: anyhow!( "No node found for keys: {:?}" , keys) ) ?;
173+ let v = map. entry ( replid) . or_default ( ) ;
174+ v. push ( key) ;
175+ }
176+
177+ Ok ( map)
167178 }
168179
169180 #[ cfg( test) ]
You can’t perform that action at this time.
0 commit comments