forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopology.hh
353 lines (278 loc) · 11.2 KB
/
topology.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/*
*
* Modified by ScyllaDB
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <algorithm>
#include <compare>
#include <iostream>
#include <ranges>
#include <stdexcept>
#include <unordered_map>
#include <unordered_set>

#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/bool_class.hh>

#include "locator/types.hh"
#include "inet_address_vectors.hh"
using namespace seastar;
// Forward declaration so that the stream operator below can name locator::topology
// before the class is defined.
namespace locator {
class topology;
}
// Stream output for a topology. It must be declared before class topology because the
// class befriends it with a qualified name (`friend ... std::operator<<(...)`), and a
// qualified friend declaration has to refer to an already-declared function.
// NOTE(review): adding declarations to namespace std is formally undefined behavior
// ([namespace.std]); consider moving this into namespace locator (that also requires
// updating the friend declaration and the out-of-line definition).
namespace std {
std::ostream& operator<<(std::ostream& out, const locator::topology&);
}
namespace locator {
// Forward declaration: node_holder below needs the name before the class is defined.
class node;
// Owning handle for a node. Nodes are always heap-allocated behind a unique_ptr, so their
// addresses stay stable while the owning container (topology::_nodes) grows; the
// topology's lookup maps can therefore keep plain `const node*` pointers.
using node_holder = std::unique_ptr<node>;
// One member of the cluster topology: its host_id, endpoint address, DC/rack location
// and lifecycle state, plus a non-owning back-pointer to the topology that holds it.
// Nodes are neither copyable nor movable (see the deleted constructors below). All
// mutators are private and reachable only from the befriended class topology.
class node {
public:
// Strongly-typed boolean (yes/no) flagging the node that represents the local instance.
using this_node = bool_class<struct this_node_tag>;
// Type of the node's index; negative values mean "unassigned" (see idx()).
using idx_type = int;
// Lifecycle state of a node.
// NOTE(review): `none` is presumably the "unknown/not set" state — confirm in topology.cc.
enum class state {
none = 0,
joining, // while bootstrapping, replacing
normal,
leaving, // while decommissioned, removed, replaced
left // after decommissioned, removed, replaced
};
private:
// Non-owning back-pointer to the owning topology; re-pointed via set_topology().
const locator::topology* _topology;
locator::host_id _host_id;
inet_address _endpoint;
endpoint_dc_rack _dc_rack;
state _state;
// Is this node the `localhost` instance
this_node _is_this_node;
// Position of the node inside its topology; -1 until one is assigned via set_idx().
idx_type _idx = -1;
public:
// Note: the type names are qualified with `locator::` throughout this class because the
// member functions topology() and host_id() below hide the unqualified type names.
node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node = this_node::no, idx_type idx = -1);
node(const node&) = delete;
node(node&&) = delete;
// The topology this node belongs to (non-owning).
const locator::topology* topology() const noexcept {
return _topology;
}
const locator::host_id& host_id() const noexcept {
return _host_id;
}
const inet_address& endpoint() const noexcept {
return _endpoint;
}
const endpoint_dc_rack& dc_rack() const noexcept {
return _dc_rack;
}
// Is this "localhost"?
this_node is_this_node() const noexcept { return _is_this_node; }
// idx < 0 means "unassigned"
idx_type idx() const noexcept { return _idx; }
state get_state() const noexcept { return _state; }
// Human-readable name of a state (used by the fmt formatter for node::state).
static std::string to_string(state);
private:
// Factory returning a heap-allocated node; private so only topology (a friend) creates nodes.
static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node = this_node::no, idx_type idx = -1);
// Returns a new owning copy of this node.
// NOTE(review): presumably used by topology::clone_gently() — confirm in topology.cc.
node_holder clone() const;
void set_topology(const locator::topology* topology) noexcept { _topology = topology; }
void set_idx(idx_type idx) noexcept { _idx = idx; }
void set_state(state state) noexcept { _state = state; }
friend class topology;
};
// Cluster topology: the nodes known to this instance, indexed by host_id, endpoint
// address and DC/rack, together with the configured identity and location of the
// local node (see config).
class topology {
public:
    // Identity and location of the local node, plus behavior switches.
    struct config {
        host_id this_host_id;
        inet_address this_endpoint;
        endpoint_dc_rack local_dc_rack;
        // NOTE(review): presumably disables sort_by_proximity() (cf. the private
        // _sort_by_proximity flag) — confirm in topology.cc.
        bool disable_proximity_sorting = false;

        bool operator==(const config&) const = default;
    };

    topology(config cfg);
    topology(topology&&) noexcept;

    topology& operator=(topology&&) noexcept;

    // Returns a copy of this topology.
    // NOTE(review): the "_gently" suffix suggests the work yields while copying — confirm.
    future<topology> clone_gently() const;
    // Empties this topology. NOTE(review): presumably yields like clone_gently().
    future<> clear_gently() noexcept;

public:
    const config& get_config() const noexcept { return _cfg; }

    // The node object representing the local node, or nullptr when there is none.
    const node* this_node() const noexcept {
        return _this_node;
    }

    // Adds a node with given host_id, endpoint, and DC/rack.
    const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state);

    // Optionally updates node's current host_id, endpoint, DC/rack, or state.
    // Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id,
    // or a peer node host_id may be updated when the node is replaced with another node using the same ip address.
    const node* update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);

    // Removes a node using its host_id.
    // Returns true iff the node was found and removed.
    bool remove_node(host_id id);

    // Looks up a node by its host_id.
    // Returns a pointer to the node if found, or nullptr otherwise.
    const node* find_node(host_id id) const noexcept;

    // Looks up a node by its inet_address.
    // Returns a pointer to the node if found, or nullptr otherwise.
    const node* find_node(const inet_address& ep) const noexcept;

    // Finds a node by its index.
    // Returns a pointer to the node if found, or nullptr otherwise.
    const node* find_node(node::idx_type idx) const noexcept;

    // Returns true iff a node with the given host_id is known.
    bool has_node(host_id id) const noexcept;
    // Returns true iff a node with the given endpoint address is known.
    bool has_node(inet_address id) const noexcept;

    /**
     * Stores the current host_id, DC/rack assignment and/or state for `ep`.
     *
     * Adds a node with the given endpoint, or updates the existing one.
     */
    const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);

    // Legacy entry point from token_metadata::update_topology
    const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
        return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st));
    }

    // Associates `ep` with host_id `id`, leaving DC/rack and state untouched.
    const node* add_or_update_endpoint(inet_address ep, host_id id) {
        return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt);
    }

    /**
     * Removes the current DC/rack assignment for `ep`.
     * Returns true if the node was found and removed.
     */
    bool remove_endpoint(inet_address ep);

    /**
     * Returns true iff the topology contains the given endpoint.
     */
    bool has_endpoint(inet_address) const;

    // Multi-map: DC name -> endpoints in that DC.
    const std::unordered_map<sstring, std::unordered_set<inet_address>>& get_datacenter_endpoints() const {
        return _dc_endpoints;
    }

    // Map: DC name -> (multi-map: rack name -> endpoints in that rack).
    const std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<inet_address>>>& get_datacenter_racks() const {
        return _dc_racks;
    }

    // Names of all known datacenters.
    const std::unordered_set<sstring>& get_datacenters() const noexcept {
        return _datacenters;
    }

    // Get dc/rack location of this node.
    // Falls back to the configured local_dc_rack while there is no this_node().
    const endpoint_dc_rack& get_location() const noexcept {
        return _this_node ? _this_node->dc_rack() : _cfg.local_dc_rack;
    }

    // Get dc/rack location of a node identified by host_id.
    // The specified node is expected to exist; if it does not, std::out_of_range is
    // thrown instead of dereferencing a null pointer.
    const endpoint_dc_rack& get_location(host_id id) const {
        if (const node* n = find_node(id)) {
            return n->dc_rack();
        }
        throw std::out_of_range("topology::get_location: host_id not found in topology");
    }

    // Get dc/rack location of a node identified by endpoint.
    // The specified node must exist.
    const endpoint_dc_rack& get_location(const inet_address& ep) const;

    // Get datacenter of this node.
    const sstring& get_datacenter() const noexcept {
        return get_location().dc;
    }

    // Get datacenter of a node identified by host_id.
    // The specified node must exist (throws std::out_of_range otherwise).
    const sstring& get_datacenter(host_id id) const {
        return get_location(id).dc;
    }

    // Get datacenter of a node identified by endpoint.
    // The specified node must exist.
    const sstring& get_datacenter(inet_address ep) const {
        return get_location(ep).dc;
    }

    // Get rack of this node.
    const sstring& get_rack() const noexcept {
        return get_location().rack;
    }

    // Get rack of a node identified by host_id.
    // The specified node must exist (throws std::out_of_range otherwise).
    const sstring& get_rack(host_id id) const {
        return get_location(id).rack;
    }

    // Get rack of a node identified by endpoint.
    // The specified node must exist.
    const sstring& get_rack(inet_address ep) const {
        return get_location(ep).rack;
    }

    // Returns a predicate telling whether an endpoint lies in this node's DC.
    // The local DC name is copied into the closure, but `this` is captured by pointer,
    // so the predicate must not outlive this topology.
    // Not noexcept: copying the DC name may allocate and therefore throw.
    auto get_local_dc_filter() const {
        return [ this, local_dc = get_datacenter() ] (inet_address ep) {
            return get_datacenter(ep) == local_dc;
        };
    }

    // Counts how many of `endpoints` are located in this node's DC.
    template <std::ranges::range Range>
    size_t count_local_endpoints(const Range& endpoints) const {
        // std::ranges::count_if works with any range (including ones whose begin/end
        // are not member functions, e.g. built-in arrays), matching the constraint.
        return static_cast<size_t>(std::ranges::count_if(endpoints, get_local_dc_filter()));
    }

    /**
     * Sorts `addresses` in place by proximity to `address`, using the ordering
     * defined by compare_endpoints().
     */
    void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;

    // Invokes `func` for each node in the topology.
    void for_each_node(std::function<void(const node*)> func) const;

private:
    // NOTE(review): presumably checks `node` against the identity in _cfg — confirm.
    bool is_configured_this_node(const node&) const;
    const node* add_node(node_holder node);
    void remove_node(const node* node);

    static std::string debug_format(const node*);

    // Add/remove `node` to/from the lookup indexes.
    // NOTE(review): presumably the _nodes_by_* and _dc_* members below — confirm.
    void index_node(const node* node);
    void unindex_node(const node* node);
    // Detaches `node` from this topology and returns ownership of it.
    node_holder pop_node(const node* node);

    // Drops constness from a node held by this topology so that private mutators can
    // modify nodes the public API exposes as const. node_holder owns a non-const node,
    // so the cast never touches an object that was defined const.
    static node* make_mutable(const node* nptr) {
        return const_cast<node*>(nptr);
    }

    /**
     * Compares two endpoints by their proximity to the target endpoint `address`,
     * returning a weak ordering.
     *
     * The closest nodes to a given node are:
     * 1. The node itself
     * 2. Nodes in the same RACK as the reference node
     * 3. Nodes in the same DC as the reference node
     */
    std::weak_ordering compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;

    unsigned _shard;
    config _cfg;
    const node* _this_node = nullptr;
    // Owning storage for the nodes; the indexes below hold non-owning `const node*`
    // pointers (presumably into this vector).
    std::vector<node_holder> _nodes;
    std::unordered_map<host_id, const node*> _nodes_by_host_id;
    std::unordered_map<inet_address, const node*> _nodes_by_endpoint;
    std::unordered_map<sstring, std::unordered_set<const node*>> _dc_nodes;
    std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<const node*>>> _dc_rack_nodes;

    /** multi-map: DC -> endpoints in that DC */
    std::unordered_map<sstring,
                       std::unordered_set<inet_address>>
                       _dc_endpoints;

    /** map: DC -> (multi-map: rack -> endpoints in that rack) */
    std::unordered_map<sstring,
                       std::unordered_map<sstring,
                                          std::unordered_set<inet_address>>>
                       _dc_racks;

    bool _sort_by_proximity = true;

    // pre-calculated (presumably by calculate_datacenters())
    std::unordered_set<sstring> _datacenters;

    void calculate_datacenters();

    const std::unordered_map<inet_address, const node*>& get_nodes_by_endpoint() const noexcept {
        return _nodes_by_endpoint;
    }

    friend class token_metadata_impl;
public:
    void test_compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;

    friend std::ostream& std::operator<<(std::ostream& out, const topology&);
};
} // namespace locator
// Stream output for node and node::state; definitions live out of line.
// NOTE(review): like the topology overload, these are declared in namespace std, which
// is formally undefined behavior ([namespace.std]); consider namespace locator instead.
namespace std {
std::ostream& operator<<(std::ostream& out, const locator::node& node);
std::ostream& operator<<(std::ostream& out, const locator::node::state& state);
} // namespace std
// fmt support for locator::node: renders the node as "<host_id>/<endpoint>".
// NOTE(review): format specs accepted by the inherited string_view parser (e.g. width)
// are not applied to this output.
template <>
struct fmt::formatter<locator::node> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const locator::node& node, FormatContext& ctx) const {
        const auto& id = node.host_id();
        const auto& addr = node.endpoint();
        auto out = ctx.out();
        return fmt::format_to(out, "{}/{}", id, addr);
    }
};
// fmt support for locator::node::state: renders the name returned by
// locator::node::to_string(). The inherited string_view formatter parses the format
// spec (width, fill, alignment, ...); delegating to its format() applies that spec
// instead of silently ignoring it. With a plain "{}" the output is unchanged.
template <>
struct fmt::formatter<locator::node::state> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const locator::node::state& state, FormatContext& ctx) const {
        // The std::string returned by to_string() lives until the end of this
        // full-expression, which outlasts the string_view passed to the base formatter.
        return fmt::formatter<std::string_view>::format(locator::node::to_string(state), ctx);
    }
};