forked from scylladb/scylla-rust-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshards.rs
89 lines (77 loc) · 3.11 KB
/
shards.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::collections::HashSet;
use std::sync::Arc;
use crate::utils::{
scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name,
PerformDDL,
};
use scylla::client::session_builder::SessionBuilder;
use tokio::sync::mpsc;
use scylla_proxy::TargetShard;
use scylla_proxy::{
Condition, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness,
};
use scylla_proxy::{ProxyError, RequestFrame, WorkerError};
#[tokio::test]
#[ntest::timeout(30000)]
#[cfg_attr(scylla_cloud_tests, ignore)]
async fn test_consistent_shard_awareness() {
setup_tracing();
let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..3).map(|_| {
mpsc::unbounded_channel::<(RequestFrame, Option<TargetShard>)>()
}).unzip();
for (i, tx) in feedback_txs.iter().cloned().enumerate() {
running_proxy.running_nodes[i].change_request_rules(Some(vec![
RequestRule(Condition::RequestOpcode(RequestOpcode::Execute).and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), RequestReaction::noop().with_feedback_when_performed(tx))
]));
}
let session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();
let ks = unique_keyspace_name();
/* Prepare schema */
let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks);
if scylla_supports_tablets(&session).await {
create_ks += " and TABLETS = { 'enabled': false}";
}
session.ddl(create_ks).await.unwrap();
session
.ddl(
format!(
"CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))",
ks
),
)
.await
.unwrap();
let prepared = session.prepare(format!("INSERT INTO {}.t (a, b, c) VALUES (?, ?, 'abc')", ks)).await.unwrap();
let value_lists = [
(4, 2),
(2, 1),
(3, 7),
];
fn assert_one_shard_queried(rx: &mut mpsc::UnboundedReceiver<(RequestFrame, Option<TargetShard>)>) {
let shards = std::iter::from_fn(|| rx.try_recv().ok().map(|(_frame, shard)| shard)).collect::<HashSet<_>>();
if !shards.is_empty() {
assert_eq!(shards.len(), 1);
}
}
for values in value_lists {
for _ in 0..10 {
session.execute_unpaged(&prepared, values).await.unwrap();
}
for rx in feedback_rxs.iter_mut() {
assert_one_shard_queried(rx);
}
}
running_proxy
}).await;
match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}