Skip to content

Commit 6f46fc2

Browse files
authored
Merge pull request #787 from Migorithm/refactor/remove-batchid
refactor: batch
2 parents f40ce73 + c5c7b44 commit 6f46fc2

8 files changed

Lines changed: 69 additions & 89 deletions

File tree

duva/src/domains/cluster_actors/actor.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use crate::domains::QueryIO;
1515
use crate::domains::TAsyncReadWrite;
1616
use crate::domains::caches::cache_manager::CacheManager;
1717
use crate::domains::cluster_actors::consensus::election::ElectionVoting;
18-
use crate::domains::cluster_actors::hash_ring::BatchId;
1918
use crate::domains::cluster_actors::hash_ring::MigrationBatch;
2019
use crate::domains::cluster_actors::hash_ring::PendingMigration;
2120
use crate::domains::cluster_actors::hash_ring::PendingMigrationBatch;
21+
2222
use crate::domains::cluster_actors::topology::{NodeReplInfo, Topology};
2323
use crate::domains::operation_logs::WriteRequest;
2424
use crate::domains::operation_logs::interfaces::TWriteAheadLog;
@@ -346,22 +346,14 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
346346
return;
347347
};
348348

349-
match self.hash_ring.list_replids_for_keys(&req.request.all_keys()) {
350-
| Ok(replids) if replids.keys().all(|replid| *replid == self.replication.replid) => {
349+
match self.hash_ring.key_ownership(&req.request.all_keys()) {
350+
| Ok(replids) if replids.all_belongs_to(&self.replication.replid) => {
351351
self.req_consensus(req).await;
352352
},
353353
| Ok(replids) => {
354354
// To notify clients of which keys have been moved.
355355
// ! Still, the client won't know where the keys have been moved. The assumption here is that the client SHOULD have correct hash ring information.
356-
let moved_keys = replids
357-
.iter()
358-
.filter_map(|(replid, keys)| {
359-
if *replid != self.replication.replid { Some(keys.iter()) } else { None }
360-
})
361-
.flatten()
362-
.copied()
363-
.collect::<Vec<_>>()
364-
.join(" ");
356+
let moved_keys = replids.except(&self.replication.replid).join(" ");
365357
req.callback.send(format!("MOVED {moved_keys}").into());
366358
},
367359
| Err(err) => {
@@ -1278,11 +1270,11 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
12781270
return;
12791271
};
12801272

1281-
self.pending_migrations
1282-
.as_mut()
1283-
.map(|p| p.add_batch(target.id.clone(), PendingMigrationBatch::new(callback, keys)));
1273+
self.pending_migrations.as_mut().map(|p| {
1274+
p.add_batch(target.batch_id.clone(), PendingMigrationBatch::new(callback, keys))
1275+
});
12841276

1285-
let _ = target_peer.send(MigrateBatch { batch_id: target.id, cache_entries }).await;
1277+
let _ = target_peer.send(MigrateBatch { batch_id: target.batch_id, cache_entries }).await;
12861278
}
12871279

12881280
pub(crate) async fn receive_batch(
@@ -1327,7 +1319,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
13271319
}
13281320
error!(
13291321
"Failed to write some keys during migration for batch {}",
1330-
migrate_batch.batch_id.0
1322+
migrate_batch.batch_id
13311323
);
13321324
}
13331325
});
@@ -1410,7 +1402,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
14101402
}
14111403
}
14121404

1413-
pub(crate) async fn send_batch_ack(&mut self, batch_id: BatchId, to: PeerIdentifier) {
1405+
pub(crate) async fn send_batch_ack(&mut self, batch_id: String, to: PeerIdentifier) {
14141406
let Some(peer) = self.members.get_mut(&to) else {
14151407
return;
14161408
};

duva/src/domains/cluster_actors/actor/tests/partitionings.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::domains::QueryIO;
22
use crate::domains::caches::cache_objects::{CacheValue, TypedValue};
3-
use crate::domains::cluster_actors::hash_ring::BatchId;
3+
44
use crate::domains::cluster_actors::hash_ring::{HashRing, tests::migration_task_create_helper};
55

66
use std::time::Duration;
@@ -376,7 +376,7 @@ async fn test_migrate_batch_send_migrate_batch_peer_message() {
376376
// THEN
377377
assert_expected_queryio(
378378
&buf,
379-
QueryIO::MigrateBatch(MigrateBatch { batch_id: batch.id, cache_entries: vec![] }),
379+
QueryIO::MigrateBatch(MigrateBatch { batch_id: batch.batch_id, cache_entries: vec![] }),
380380
)
381381
.await;
382382
}
@@ -390,7 +390,7 @@ async fn test_receive_batch_when_empty_cache_entries() {
390390
let (buf, _id) = cluster_actor.test_add_peer(6909, Some(replid.clone()), true);
391391

392392
// WHEN
393-
let batch = MigrateBatch { batch_id: BatchId("empty_test".into()), cache_entries: vec![] };
393+
let batch = MigrateBatch { batch_id: "empty_test".into(), cache_entries: vec![] };
394394
cluster_actor.receive_batch(batch.clone(), &cache_manager, _id).await;
395395

396396
// THEN - verify that no log index is incremented
@@ -416,10 +416,8 @@ async fn test_receive_batch_when_consensus_is_required() {
416416

417417
let cache_entries = vec![CacheEntry::new("success_key3", "value2")];
418418

419-
let batch = MigrateBatch {
420-
batch_id: BatchId("success_test".into()),
421-
cache_entries: cache_entries.clone(),
422-
};
419+
let batch =
420+
MigrateBatch { batch_id: "success_test".into(), cache_entries: cache_entries.clone() };
423421

424422
// WHEN
425423
cluster_actor.receive_batch(batch, &cache_manager, ack_to.clone()).await;
@@ -456,10 +454,8 @@ async fn test_receive_batch_when_noreplica_found() {
456454
let cache_entries =
457455
vec![CacheEntry::new("success_key3", "value2"), CacheEntry::new("success_key4", "value4")];
458456

459-
let batch = MigrateBatch {
460-
batch_id: BatchId("success_test".into()),
461-
cache_entries: cache_entries.clone(),
462-
};
457+
let batch =
458+
MigrateBatch { batch_id: "success_test".into(), cache_entries: cache_entries.clone() };
463459

464460
// WHEN
465461
let task = tokio::spawn(recv.wait_message(SchedulerMessage::SendBatchAck {
@@ -495,7 +491,7 @@ async fn test_unblock_write_reqs_if_done_when_migrations_still_pending() {
495491

496492
// Add pending migration (simulating migration still in progress)
497493
let (callback, _migration_rx) = Callback::create();
498-
let batch_id = BatchId("test_batch".into());
494+
let batch_id = "test_batch".into();
499495
cluster_actor
500496
.pending_migrations
501497
.as_mut()
@@ -566,7 +562,7 @@ async fn test_handle_migration_ack_failure() {
566562
let mut cluster_actor = setup_blocked_cluster_actor_with_requests(1).await;
567563
let (_hwm, _cache_manager) = Helper::cache_manager();
568564
let (callback, callback_rx) = Callback::create();
569-
let batch_id = BatchId("failure_batch".into());
565+
let batch_id = "failure_batch".to_string();
570566

571567
// Ensure pending_migrations is set up
572568
assert!(cluster_actor.pending_migrations.is_some());
@@ -606,9 +602,9 @@ async fn test_handle_migration_ack_batch_id_not_found() {
606602
.pending_migrations
607603
.as_mut()
608604
.unwrap()
609-
.add_batch(BatchId("existing_batch".into()), PendingMigrationBatch::new(callback, vec![]));
605+
.add_batch("existing_batch".into(), PendingMigrationBatch::new(callback, vec![]));
610606

611-
let non_existent_batch_id = BatchId("non_existent_batch".into());
607+
let non_existent_batch_id = "non_existent_batch".into();
612608
let ack = MigrationBatchAck { batch_id: non_existent_batch_id, success: true };
613609

614610
// WHEN
@@ -648,7 +644,7 @@ async fn test_handle_migration_ack_success_case_with_pending_reqs_and_migration(
648644

649645
// Add the last pending migration with the test keys
650646
let (callback, callback_rx) = Callback::create();
651-
let batch_id = BatchId("last_batch".into());
647+
let batch_id = "last_batch".to_string();
652648
cluster_actor
653649
.pending_migrations
654650
.as_mut()

duva/src/domains/cluster_actors/command.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::ReplicationState;
2-
use crate::domains::cluster_actors::hash_ring::{BatchId, MigrationBatch};
2+
use crate::domains::cluster_actors::hash_ring::MigrationBatch;
33
use crate::domains::cluster_actors::replication::{ReplicationId, ReplicationRole};
44
use crate::domains::cluster_actors::topology::Topology;
55
use crate::domains::operation_logs::WriteRequest;
@@ -29,7 +29,7 @@ pub enum SchedulerMessage {
2929
RebalanceRequest { request_to: PeerIdentifier, lazy_option: LazyOption },
3030
ScheduleMigrationBatch(MigrationBatch, Callback<anyhow::Result<()>>),
3131
TryUnblockWriteReqs,
32-
SendBatchAck { batch_id: BatchId, to: PeerIdentifier },
32+
SendBatchAck { batch_id: String, to: PeerIdentifier },
3333
}
3434

3535
impl From<SchedulerMessage> for ClusterCommand {

duva/src/domains/cluster_actors/hash_ring.rs

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,6 @@ impl HashRing {
9191
.map(|(_, node_id)| node_id.as_ref())
9292
}
9393

94-
fn find_node(&self, hash: u64) -> Option<&PeerIdentifier> {
95-
// Find the first vnode with hash >= target hash
96-
self.pnodes.get(self.find_replid(hash)?)
97-
}
98-
99-
/// Verifies that all given keys belong to the specified node according to the hash ring
100-
pub(crate) fn verify_key_belongs_to_node(
101-
&self,
102-
keys: &[&str],
103-
expected_node: &PeerIdentifier,
104-
) -> bool {
105-
keys.iter().all(|key| self.find_node(fnv_1a_hash(key)) == Some(expected_node))
106-
}
107-
10894
pub(crate) fn create_migration_tasks(
10995
&self,
11096
new_ring: &HashRing,
@@ -120,23 +106,23 @@ impl HashRing {
120106

121107
// Check each partition for ownership changes
122108
for (i, &token) in tokens.iter().enumerate() {
123-
let prev_token = if i == 0 { tokens[tokens.len() - 1] } else { tokens[i - 1] };
124-
let (start, end) = (prev_token.wrapping_add(1), token);
125-
126-
if let (Some(old_owner), Some(new_owner)) =
127-
(self.find_replid(token), new_ring.find_replid(token))
128-
&& (old_owner != new_owner)
109+
if let Some(old_owner) = self.find_replid(token)
110+
&& let Some(new_owner) = new_ring.find_replid(token)
129111
{
130-
// If both old and new owners exist, we need to check if ownership changed
112+
if old_owner == new_owner {
113+
continue;
114+
}
131115

132116
// Node ownership changed for this partition
133117
// Need to migrate data from old node to new node
134-
let affected_keys = filter_keys_in_partition(&keys, start, end);
135-
if !affected_keys.is_empty() {
136-
migration_tasks.entry(new_owner.clone()).or_default().push(MigrationTask {
137-
task_id: (start, end),
138-
keys_to_migrate: affected_keys,
139-
});
118+
let prev_token = if i == 0 { tokens[tokens.len() - 1] } else { tokens[i - 1] };
119+
let (start, end) = (prev_token.wrapping_add(1), token);
120+
let keys_to_migrate = filter_keys_in_partition(&keys, start, end);
121+
if !keys_to_migrate.is_empty() {
122+
migration_tasks
123+
.entry(new_owner.clone())
124+
.or_default()
125+
.push(MigrationTask { range: (start, end), keys_to_migrate });
140126
}
141127
}
142128
}
@@ -158,10 +144,7 @@ impl HashRing {
158144
self.vnodes.len()
159145
}
160146

161-
pub(crate) fn list_replids_for_keys<'a>(
162-
&self,
163-
keys: &[&'a str],
164-
) -> anyhow::Result<HashMap<ReplicationId, Vec<&'a str>>> {
147+
pub(crate) fn key_ownership<'a>(&self, keys: &[&'a str]) -> anyhow::Result<KeyOwnership<'a>> {
165148
let mut map: HashMap<ReplicationId, Vec<&str>> = HashMap::new();
166149

167150
for key in keys {
@@ -174,7 +157,7 @@ impl HashRing {
174157
v.push(key);
175158
}
176159

177-
Ok(map)
160+
Ok(KeyOwnership(map))
178161
}
179162

180163
#[cfg(test)]
@@ -197,6 +180,21 @@ impl HashRing {
197180
}
198181
}
199182

183+
pub(crate) struct KeyOwnership<'a>(HashMap<ReplicationId, Vec<&'a str>>);
184+
impl<'a> KeyOwnership<'a> {
185+
pub(crate) fn all_belongs_to(&self, target: &ReplicationId) -> bool {
186+
self.0.keys().all(|replid| replid == target)
187+
}
188+
pub(crate) fn except(self, target: &ReplicationId) -> Vec<&'a str> {
189+
self.0
190+
.iter()
191+
.filter_map(|(replid, keys)| if replid != target { Some(keys.iter()) } else { None })
192+
.flatten()
193+
.copied()
194+
.collect::<Vec<_>>()
195+
}
196+
}
197+
200198
fn filter_keys_in_partition(
201199
keys: &[String],
202200
partition_start: u64,

duva/src/domains/cluster_actors/hash_ring/migration_task.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{ReplicationId, domains::cluster_actors::ConsensusRequest, types::Cal
44

55
#[derive(Debug, Clone, PartialEq, Eq)]
66
pub(crate) struct MigrationTask {
7-
pub(crate) task_id: (u64, u64), // (start_hash, end_hash)
7+
pub(crate) range: (u64, u64), // (start_hash, end_hash)
88
pub(crate) keys_to_migrate: Vec<String>, // actual keys in this range
99
}
1010

@@ -14,19 +14,16 @@ impl MigrationTask {
1414
}
1515
}
1616

17-
#[derive(Debug, Clone, PartialEq, Eq, Hash, bincode::Encode, bincode::Decode)]
18-
pub(crate) struct BatchId(pub(crate) String);
19-
2017
#[derive(Debug, Clone, PartialEq, Eq)]
2118
pub(crate) struct MigrationBatch {
22-
pub(crate) id: BatchId,
19+
pub(crate) batch_id: String,
2320
pub(crate) target_repl: ReplicationId,
2421
pub(crate) tasks: Vec<MigrationTask>,
2522
}
2623

2724
impl MigrationBatch {
2825
pub(crate) fn new(target_repl: ReplicationId, tasks: Vec<MigrationTask>) -> Self {
29-
Self { id: BatchId(uuid::Uuid::now_v7().to_string()), target_repl, tasks }
26+
Self { batch_id: uuid::Uuid::now_v7().to_string(), target_repl, tasks }
3027
}
3128
}
3229

@@ -48,16 +45,16 @@ impl PendingMigrationBatch {
4845
#[derive(Debug, Default)]
4946
pub(crate) struct PendingMigration {
5047
requests: VecDeque<ConsensusRequest>,
51-
batches: HashMap<BatchId, PendingMigrationBatch>,
48+
batches: HashMap<String, PendingMigrationBatch>,
5249
}
5350
impl PendingMigration {
5451
pub(crate) fn add_req(&mut self, req: ConsensusRequest) {
5552
self.requests.push_back(req);
5653
}
57-
pub(crate) fn add_batch(&mut self, id: BatchId, batch: PendingMigrationBatch) {
54+
pub(crate) fn add_batch(&mut self, id: String, batch: PendingMigrationBatch) {
5855
self.batches.insert(id, batch);
5956
}
60-
pub(crate) fn pop_batch(&mut self, id: &BatchId) -> Option<PendingMigrationBatch> {
57+
pub(crate) fn pop_batch(&mut self, id: &String) -> Option<PendingMigrationBatch> {
6158
self.batches.remove(id)
6259
}
6360
pub(crate) fn pending_requests(self) -> VecDeque<ConsensusRequest> {

duva/src/domains/cluster_actors/hash_ring/tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod migration;
99

1010
pub(crate) fn migration_task_create_helper(start_hash: u64, end_hash: u64) -> MigrationTask {
1111
MigrationTask {
12-
task_id: (start_hash, end_hash),
12+
range: (start_hash, end_hash),
1313
keys_to_migrate: (start_hash..end_hash).map(|i| format!("key_{i}")).collect(),
1414
}
1515
}

duva/src/domains/peers/command.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ mod peer_messages {
5353
use super::*;
5454
use crate::domains::{
5555
caches::cache_objects::CacheEntry,
56-
cluster_actors::{
57-
hash_ring::{BatchId, HashRing},
58-
replication::ReplicationId,
59-
},
56+
cluster_actors::{hash_ring::HashRing, replication::ReplicationId},
6057
operation_logs::{WriteOperation, logger::ReplicatedLogs},
6158
peers::peer::PeerState,
6259
};
@@ -171,22 +168,22 @@ mod peer_messages {
171168

172169
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
173170
pub struct MigrateBatch {
174-
pub(crate) batch_id: BatchId,
171+
pub(crate) batch_id: String,
175172
pub(crate) cache_entries: Vec<CacheEntry>,
176173
}
177174

178175
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
179176
pub struct MigrationBatchAck {
180-
pub(crate) batch_id: BatchId,
177+
pub(crate) batch_id: String,
181178
pub(crate) success: bool,
182179
}
183180

184181
impl MigrationBatchAck {
185-
pub(crate) fn with_reject(batch_id: BatchId) -> Self {
182+
pub(crate) fn with_reject(batch_id: String) -> Self {
186183
Self { batch_id, success: false }
187184
}
188185

189-
pub(crate) fn with_success(batch_id: BatchId) -> Self {
186+
pub(crate) fn with_success(batch_id: String) -> Self {
190187
Self { success: true, batch_id }
191188
}
192189
}

0 commit comments

Comments
 (0)