forked from scylladb/scylla-rust-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathself_identity.rs
109 lines (95 loc) · 3.92 KB
/
self_identity.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use crate::utils::{setup_tracing, test_with_3_node_cluster};
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::client::SelfIdentity;
use scylla_cql::frame::request::options;
use scylla_cql::frame::types;
use scylla_proxy::{
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness,
WorkerError,
};
use std::sync::Arc;
use tokio::sync::mpsc;
/// Runs the Startup-message check for two identities: one created with
/// `SelfIdentity::new()` alone, and one on which every `with_*` setter used
/// below has been called with a test value.
#[tokio::test]
#[ntest::timeout(20000)]
#[cfg_attr(scylla_cloud_tests, ignore)]
async fn self_identity_is_set_properly_in_startup_message() {
    setup_tracing();

    // Identity with nothing customised.
    let untouched_identity = SelfIdentity::new();

    // Identity with all five fields overridden.
    let customised_identity = SelfIdentity::new()
        .with_application_name("test_self_identity")
        .with_application_version("42.2137.0")
        .with_client_id("blue18")
        .with_custom_driver_name("ScyllaDB Rust Driver - test run")
        .with_custom_driver_version("2137.42.0");

    // The scenarios run one after another, the untouched identity first.
    for identity in [untouched_identity, customised_identity] {
        test_given_self_identity(identity).await;
    }
}
/// Opens a session through the proxy using `self_identity` and asserts that the
/// options map of the Startup frame seen by the first proxy node matches it.
///
/// * Application name, application version and client id must equal what the
///   identity's getters return (the option is expected to be absent when the
///   getter yields `None`).
/// * Driver name and driver version must always be present: the custom value
///   when the identity provides one, otherwise the `options::DEFAULT_*` constant.
///
/// Panics on any failed assertion, and on any proxy error other than
/// `WorkerError::DriverDisconnected`.
async fn test_given_self_identity(self_identity: SelfIdentity<'static>) {
    let outcome = test_with_3_node_cluster(
        ShardAwareness::QueryNode,
        |proxy_uris, translation_map, mut running_proxy| async move {
            // Make the first proxy node forward every Startup request untouched,
            // while also sending a copy of it to us over `feedback_rx`.
            let (feedback_tx, mut feedback_rx) = mpsc::unbounded_channel();
            let report_startup = RequestRule(
                Condition::RequestOpcode(RequestOpcode::Startup),
                RequestReaction::noop().with_feedback_when_performed(feedback_tx),
            );
            running_proxy.running_nodes[0].change_request_rules(Some(vec![report_startup]));

            // Build a session via the first proxy node with the identity under test.
            // The session itself is never used; only the Startup frame matters here.
            let _session: Session = SessionBuilder::new()
                .known_node(proxy_uris[0].as_str())
                .address_translator(Arc::new(translation_map))
                .custom_identity(self_identity.clone())
                .build()
                .await
                .unwrap();

            // Take the first reported Startup frame and decode its body as a string map.
            let (captured_frame, _shard) = feedback_rx.recv().await.unwrap();
            let sent_options = types::read_string_map(&mut &*captured_frame.body).unwrap();

            // Options compared directly against the identity's getters.
            let optional_expectations = [
                (
                    options::APPLICATION_NAME,
                    self_identity.get_application_name(),
                ),
                (
                    options::APPLICATION_VERSION,
                    self_identity.get_application_version(),
                ),
                (options::CLIENT_ID, self_identity.get_client_id()),
            ];
            for (key, expected) in optional_expectations {
                let sent = sent_options.get(key).map(String::as_str);
                assert_eq!(sent, expected);
            }

            // Options that must always be sent: custom value if set, default otherwise.
            let mandatory_expectations = [
                (
                    options::DRIVER_NAME,
                    options::DEFAULT_DRIVER_NAME,
                    self_identity.get_custom_driver_name(),
                ),
                (
                    options::DRIVER_VERSION,
                    options::DEFAULT_DRIVER_VERSION,
                    self_identity.get_custom_driver_version(),
                ),
            ];
            for (key, fallback, custom) in mandatory_expectations {
                let expected = custom.unwrap_or(fallback);
                assert_eq!(sent_options.get(key).map(String::as_str), Some(expected));
            }

            running_proxy
        },
    )
    .await;

    // A driver disconnect reported by a proxy worker is tolerated; any other
    // proxy error fails the test.
    match outcome {
        Ok(()) | Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => {}
        Err(other) => panic!("{}", other),
    }
}