Skip to content

Commit 9b91b8a

Browse files
committed
refactor(FQDN): feather refator on idl/dsn.layer2.thrift
1 parent 2bd86c1 commit 9b91b8a

39 files changed

+615
-315
lines changed

.clang-format

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ JavaScriptQuotes: Leave
112112
JavaScriptWrapImports: true
113113
KeepEmptyLinesAtTheStartOfBlocks: true
114114
LambdaBodyIndentation: Signature
115-
Language: Cpp
116115
MacroBlockBegin: ''
117116
MacroBlockEnd: ''
118117
MaxEmptyLinesToKeep: 1

src/client/partition_resolver_simple.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,14 +414,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
414414
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
415415
{
416416
if (_app_is_stateful) {
417-
return pc.hp_primary;
417+
host_port primary;
418+
GET_HOST_PORT(pc, primary, primary);
419+
return primary;
418420
}
419421

420-
if (pc.hp_last_drops.empty()) {
422+
std::vector<host_port> last_drops;
423+
GET_HOST_PORTS(pc, last_drops, last_drops);
424+
if (last_drops.empty()) {
421425
return host_port();
422426
}
423427

424-
return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
428+
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
425429
}
426430

427431
error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)

src/client/replication_ddl_client.cpp

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
167167
int ready_count = 0;
168168
for (int i = 0; i < partition_count; i++) {
169169
const auto &pc = query_resp.partitions[i];
170-
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
171-
ready_count++;
170+
host_port primary;
171+
GET_HOST_PORT(pc, primary, primary);
172+
if (!primary) {
173+
continue;
174+
}
175+
176+
std::vector<host_port> secondaries;
177+
GET_HOST_PORTS(pc, secondaries, secondaries);
178+
if (secondaries.size() + 1 < max_replica_count) {
179+
continue;
172180
}
181+
182+
ready_count++;
173183
}
174184
if (ready_count == partition_count) {
175185
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
@@ -435,11 +445,16 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
435445
int read_unhealthy = 0;
436446
for (const auto &pc : pcs) {
437447
int replica_count = 0;
438-
if (pc.hp_primary) {
448+
host_port primary;
449+
GET_HOST_PORT(pc, primary, primary);
450+
if (primary) {
439451
replica_count++;
440452
}
441-
replica_count += pc.hp_secondaries.size();
442-
if (pc.hp_primary) {
453+
454+
std::vector<host_port> secondaries;
455+
GET_HOST_PORTS(pc, secondaries, secondaries);
456+
replica_count += secondaries.size();
457+
if (primary) {
443458
if (replica_count >= pc.max_replica_count) {
444459
fully_healthy++;
445460
} else if (replica_count < 2) {
@@ -573,13 +588,18 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
573588
}
574589

575590
for (const auto &pc : pcs) {
576-
if (pc.hp_primary) {
577-
auto find = tmp_map.find(pc.hp_primary);
591+
host_port primary;
592+
GET_HOST_PORT(pc, primary, primary);
593+
if (primary) {
594+
auto find = tmp_map.find(primary);
578595
if (find != tmp_map.end()) {
579596
find->second.primary_count++;
580597
}
581598
}
582-
for (const auto &secondary : pc.hp_secondaries) {
599+
600+
std::vector<host_port> secondaries;
601+
GET_HOST_PORTS(pc, secondaries, secondaries);
602+
for (const auto &secondary : secondaries) {
583603
auto find = tmp_map.find(secondary);
584604
if (find != tmp_map.end()) {
585605
find->second.secondary_count++;
@@ -766,14 +786,18 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
766786
int read_unhealthy = 0;
767787
for (const auto &pc : pcs) {
768788
int replica_count = 0;
769-
if (pc.hp_primary) {
789+
host_port primary;
790+
GET_HOST_PORT(pc, primary, primary);
791+
if (primary) {
770792
replica_count++;
771-
node_stat[pc.hp_primary].first++;
793+
node_stat[primary].first++;
772794
total_prim_count++;
773795
}
774-
replica_count += pc.hp_secondaries.size();
775-
total_sec_count += pc.hp_secondaries.size();
776-
if (pc.hp_primary) {
796+
std::vector<host_port> secondaries;
797+
GET_HOST_PORTS(pc, secondaries, secondaries);
798+
replica_count += secondaries.size();
799+
total_sec_count += secondaries.size();
800+
if (primary) {
777801
if (replica_count >= pc.max_replica_count) {
778802
fully_healthy++;
779803
} else if (replica_count < 2) {
@@ -783,14 +807,14 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
783807
write_unhealthy++;
784808
read_unhealthy++;
785809
}
786-
for (const auto &secondary : pc.hp_secondaries) {
810+
for (const auto &secondary : secondaries) {
787811
node_stat[secondary].second++;
788812
}
789813
tp_details.add_row(pc.pid.get_partition_index());
790814
tp_details.append_data(pc.ballot);
791815
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
792-
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
793-
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
816+
tp_details.append_data(primary ? primary.to_string() : "-");
817+
tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
794818
}
795819
mtp.add(std::move(tp_details));
796820

src/common/replication_common.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,16 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
175175
rc.learner_signature = invalid_signature;
176176
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);
177177

178-
if (node == pc.hp_primary) {
178+
host_port primary;
179+
GET_HOST_PORT(pc, primary, primary);
180+
if (node == primary) {
179181
rc.status = partition_status::PS_PRIMARY;
180182
return true;
181183
}
182184

183-
if (utils::contains(pc.hp_secondaries, node)) {
185+
std::vector<host_port> secondaries;
186+
GET_HOST_PORTS(pc, secondaries, secondaries);
187+
if (utils::contains(secondaries, node)) {
184188
rc.status = partition_status::PS_SECONDARY;
185189
return true;
186190
}

src/common/replication_other_types.h

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "consensus_types.h"
3737
#include "replica_admin_types.h"
3838
#include "common/replication_enums.h"
39+
#include "rpc/dns_resolver.h"
3940
#include "rpc/rpc_address.h"
4041
#include "rpc/rpc_host_port.h"
4142

@@ -78,18 +79,33 @@ inline bool is_member(const partition_configuration &pc, const rpc_address &node
7879
inline bool is_partition_config_equal(const partition_configuration &pc1,
7980
const partition_configuration &pc2)
8081
{
81-
// secondaries no need to be same order
82-
for (const auto &pc1_secondary : pc1.hp_secondaries) {
82+
if (pc1.ballot != pc2.ballot || pc1.pid != pc2.pid ||
83+
pc1.max_replica_count != pc2.max_replica_count ||
84+
pc1.last_committed_decree != pc2.last_committed_decree) {
85+
return false;
86+
}
87+
88+
host_port pc1_primary;
89+
GET_HOST_PORT(pc1, primary, pc1_primary);
90+
host_port pc2_primary;
91+
GET_HOST_PORT(pc2, primary, pc2_primary);
92+
if (pc1_primary != pc2_primary) {
93+
return false;
94+
}
95+
96+
// secondaries no need to be in the same order.
97+
std::vector<host_port> pc1_secondaries;
98+
GET_HOST_PORTS(pc1, secondaries, pc1_secondaries);
99+
for (const auto &pc1_secondary : pc1_secondaries) {
83100
if (!is_secondary(pc2, pc1_secondary)) {
84101
return false;
85102
}
86103
}
104+
105+
std::vector<host_port> pc2_secondaries;
106+
GET_HOST_PORTS(pc2, secondaries, pc2_secondaries);
87107
// last_drops is not considered into equality check
88-
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
89-
pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary &&
90-
pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() &&
91-
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
92-
pc1.last_committed_decree == pc2.last_committed_decree;
108+
return pc1_secondaries.size() == pc2_secondaries.size();
93109
}
94110

95111
class replica_helper

src/meta/cluster_balance_policy.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
227227
info.partitions.reserve(app->pcs.size());
228228
for (const auto &pc : app->pcs) {
229229
std::map<host_port, partition_status::type> pstatus_map;
230-
pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY;
231-
if (pc.hp_secondaries.size() != pc.max_replica_count - 1) {
230+
host_port primary;
231+
GET_HOST_PORT(pc, primary, primary);
232+
pstatus_map[primary] = partition_status::PS_PRIMARY;
233+
234+
std::vector<host_port> secondaries;
235+
GET_HOST_PORTS(pc, secondaries, secondaries);
236+
if (secondaries.size() != pc.max_replica_count - 1) {
232237
// partition is unhealthy
233238
return false;
234239
}
235-
for (const auto &secondary : pc.hp_secondaries) {
240+
for (const auto &secondary : secondaries) {
236241
pstatus_map[secondary] = partition_status::PS_SECONDARY;
237242
}
238243
info.partitions.push_back(std::move(pstatus_map));

src/meta/load_balance_policy.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "meta_admin_types.h"
3636
#include "rpc/dns_resolver.h" // IWYU pragma: keep
3737
#include "rpc/rpc_address.h"
38+
#include "rpc/rpc_host_port.h"
3839
#include "utils/command_manager.h"
3940
#include "utils/fail_point.h"
4041
#include "utils/flags.h"
@@ -172,14 +173,16 @@ generate_balancer_request(const app_mapper &apps,
172173
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
173174
result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE));
174175
break;
175-
case balance_type::COPY_SECONDARY:
176+
case balance_type::COPY_SECONDARY: {
176177
ans = "copy_secondary";
177178
result.balance_type = balancer_request_type::copy_secondary;
179+
host_port primary;
180+
GET_HOST_PORT(pc, primary, primary);
178181
result.action_list.emplace_back(
179-
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
180-
result.action_list.emplace_back(
181-
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
182+
new_proposal_action(primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
183+
result.action_list.emplace_back(new_proposal_action(primary, from, config_type::CT_REMOVE));
182184
break;
185+
}
183186
default:
184187
CHECK(false, "");
185188
}
@@ -566,7 +569,9 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns)
566569
{
567570
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
568571
const auto &pc = _app->pcs[pid.get_partition_index()];
569-
for (const auto &secondary : pc.hp_secondaries) {
572+
std::vector<host_port> secondaries;
573+
GET_HOST_PORTS(pc, secondaries, secondaries);
574+
for (const auto &secondary : secondaries) {
570575
auto i = _host_port_id.find(secondary);
571576
CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary);
572577
_network[node_id][i->second]++;

src/meta/meta_bulk_load_ingestion_context.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio
4949
{
5050
pid = pc.pid;
5151
std::unordered_set<host_port> current_nodes;
52-
current_nodes.insert(pc.hp_primary);
53-
for (const auto &secondary : pc.hp_secondaries) {
52+
host_port primary;
53+
GET_HOST_PORT(pc, primary, primary);
54+
current_nodes.insert(primary);
55+
std::vector<host_port> secondaries;
56+
GET_HOST_PORTS(pc, secondaries, secondaries);
57+
for (const auto &secondary : secondaries) {
5458
current_nodes.insert(secondary);
5559
}
5660
for (const auto &node : current_nodes) {

src/meta/meta_bulk_load_service.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,9 @@ bool bulk_load_service::check_partition_status(
371371
}
372372

373373
pc = app->pcs[pid.get_partition_index()];
374-
if (!pc.hp_primary) {
374+
host_port primary;
375+
GET_HOST_PORT(pc, primary, primary);
376+
if (!primary) {
375377
LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid);
376378
tasking::enqueue(
377379
LPC_META_STATE_NORMAL,
@@ -382,7 +384,9 @@ bool bulk_load_service::check_partition_status(
382384
return false;
383385
}
384386

385-
if (pc.hp_secondaries.size() < pc.max_replica_count - 1) {
387+
std::vector<host_port> secondaries;
388+
GET_HOST_PORTS(pc, secondaries, secondaries);
389+
if (secondaries.size() < pc.max_replica_count - 1) {
386390
bulk_load_status::type p_status;
387391
{
388392
zauto_read_lock l(_lock);
@@ -434,7 +438,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
434438
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
435439
req->pid = pid;
436440
req->app_name = app_name;
437-
SET_IP_AND_HOST_PORT(*req, primary, pc.primary, pc.hp_primary);
441+
SET_OBJ_IP_AND_HOST_PORT(*req, primary, pc, primary);
438442
req->remote_provider_name = ainfo.file_provider_type;
439443
req->cluster_name = ainfo.cluster_name;
440444
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
@@ -1201,8 +1205,12 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati
12011205
}
12021206

12031207
std::vector<host_port> current_nodes;
1204-
current_nodes.emplace_back(pc.hp_primary);
1205-
for (const auto &secondary : pc.hp_secondaries) {
1208+
dsn::host_port primary;
1209+
GET_HOST_PORT(pc, primary, primary);
1210+
current_nodes.emplace_back(primary);
1211+
std::vector<host_port> secondaries;
1212+
GET_HOST_PORTS(pc, secondaries, secondaries);
1213+
for (const auto &secondary : secondaries) {
12061214
current_nodes.emplace_back(secondary);
12071215
}
12081216

@@ -1270,13 +1278,13 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
12701278
return;
12711279
}
12721280

1273-
const auto &primary = pc.hp_primary;
1274-
ballot meta_ballot = pc.ballot;
1281+
host_port primary;
1282+
GET_HOST_PORT(pc, primary, primary);
12751283
tasking::enqueue(
12761284
LPC_BULK_LOAD_INGESTION,
12771285
_meta_svc->tracker(),
12781286
std::bind(
1279-
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
1287+
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot),
12801288
0,
12811289
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
12821290
}

src/meta/meta_data.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,11 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
132132
// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
133133
// only primary dead
134134
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
135-
CHECK(pc.hp_last_drops.empty(), "last_drops of partition({}) must be empty", pid);
135+
std::vector<host_port> last_drops;
136+
GET_HOST_PORTS(pc, last_drops, last_drops);
137+
CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid);
136138
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
139+
// hp_last_drops is added in the steps bellow.
137140
if (pc.hp_last_drops.size() + 1 >= max_replica_count) {
138141
break;
139142
}
@@ -540,7 +543,6 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta
540543
for (int i = 0; i != app_info::partition_count; ++i) {
541544
pcs[i].pid.set_partition_index(i);
542545
}
543-
544546
helpers->on_init_partitions();
545547
}
546548

0 commit comments

Comments
 (0)