Skip to content

Commit ce267cb

Browse files
authored
refactor(FQDN): further more refator on idl/dsn.layer2.thrift v2 (#2217)
This is a further refactor on idl/dsn.layer2.thrift based on #2049 the main effects are on partition_configuration structure.
1 parent ae6ace2 commit ce267cb

9 files changed

+57
-20
lines changed

src/meta/backup_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
182182
_is_backup_failed = true;
183183
return;
184184
}
185-
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
185+
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
186186
}
187187

188188
if (!partition_primary) {

src/meta/duplication/meta_duplication_service.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -764,21 +764,24 @@ void meta_duplication_service::check_follower_app_if_create_completed(
764764
query_err = ERR_INCONSISTENT_STATE;
765765
} else {
766766
for (const auto &pc : resp.partitions) {
767-
if (!pc.hp_primary) {
767+
host_port primary;
768+
GET_HOST_PORT(pc, primary, primary);
769+
if (!primary) {
768770
// Fail once the primary replica is unavailable.
769771
query_err = ERR_INACTIVE_STATE;
770772
break;
771773
}
772774

773775
// Once replica count is more than 1, at least one secondary replica
774776
// is required.
775-
if (1 + pc.hp_secondaries.size() < pc.max_replica_count &&
776-
pc.hp_secondaries.empty()) {
777+
std::vector<host_port> secondaries;
778+
GET_HOST_PORTS(pc, secondaries, secondaries);
779+
if (1 + secondaries.size() < pc.max_replica_count && secondaries.empty()) {
777780
query_err = ERR_NOT_ENOUGH_MEMBER;
778781
break;
779782
}
780783

781-
for (const auto &secondary : pc.hp_secondaries) {
784+
for (const auto &secondary : secondaries) {
782785
if (!secondary) {
783786
// Fail once any secondary replica is unavailable.
784787
query_err = ERR_INACTIVE_STATE;

src/meta/greedy_load_balancer.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,13 @@ bool greedy_load_balancer::all_replica_infos_collected(const node_state &ns)
147147
{
148148
const auto &n = ns.host_port();
149149
return ns.for_each_partition([this, n](const dsn::gpid &pid) {
150-
config_context &cc = *get_config_context(*(t_global_view->apps), pid);
151-
if (cc.find_from_serving(n) == cc.serving.end()) {
150+
config_context *ctx = get_config_context(*(t_global_view->apps), pid);
151+
if (ctx == nullptr) {
152+
LOG_INFO("get_config_context return nullptr for gpid({})", pid);
153+
return false;
154+
}
155+
156+
if (ctx->find_from_serving(n) == ctx->serving.end()) {
152157
LOG_INFO("meta server hasn't collected gpid({})'s info of {}", pid, n);
153158
return false;
154159
}

src/meta/meta_backup_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
532532
pid, cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
533533
return;
534534
}
535-
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
535+
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
536536
}
537537
if (!partition_primary) {
538538
LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later",

src/meta/meta_bulk_load_service.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
475475
const std::string &app_name = request.app_name;
476476
const gpid &pid = request.pid;
477477
const auto &primary_addr = request.primary;
478-
const auto &primary_hp = request.hp_primary;
478+
host_port primary_hp;
479+
GET_HOST_PORT(request, primary, primary_hp);
479480

480481
if (err != ERR_OK) {
481482
LOG_ERROR("app({}), partition({}) failed to receive bulk load response from node({}), "

src/meta/server_state_restore.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,18 @@ void server_state::on_query_restore_status(configuration_query_restore_rpc rpc)
251251
for (int32_t i = 0; i < app->partition_count; i++) {
252252
const auto &r_state = app->helpers->restore_states[i];
253253
const auto &pc = app->pcs[i];
254-
if (pc.hp_primary || !pc.hp_secondaries.empty()) {
254+
host_port primary;
255+
GET_HOST_PORT(pc, primary, primary);
256+
if (primary) {
255257
// already have primary, restore succeed
256258
continue;
257259
}
260+
261+
std::vector<host_port> secondaries;
262+
GET_HOST_PORTS(pc, secondaries, secondaries);
263+
if (!secondaries.empty()) {
264+
continue;
265+
}
258266
if (r_state.progress < response.restore_progress[i]) {
259267
response.restore_progress[i] = r_state.progress;
260268
}

src/replica/bulk_load/replica_bulk_loader.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -936,10 +936,12 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
936936
primary_state.__set_download_progress(_download_progress.load());
937937
primary_state.__set_download_status(_download_status.load());
938938
}
939+
host_port primary;
940+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
939941
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
940942
group_bulk_load_state,
941943
_replica->_primary_states.pc.primary,
942-
_replica->_primary_states.pc.hp_primary,
944+
primary,
943945
primary_state);
944946
LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
945947
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -978,10 +980,12 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
978980

979981
partition_bulk_load_state primary_state;
980982
primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
983+
host_port primary;
984+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
981985
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
982986
group_bulk_load_state,
983987
_replica->_primary_states.pc.primary,
984-
_replica->_primary_states.pc.hp_primary,
988+
primary,
985989
primary_state);
986990
LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
987991
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -1025,10 +1029,12 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
10251029

10261030
partition_bulk_load_state primary_state;
10271031
primary_state.__set_is_cleaned_up(is_cleaned_up());
1032+
host_port primary;
1033+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
10281034
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
10291035
group_bulk_load_state,
10301036
_replica->_primary_states.pc.primary,
1031-
_replica->_primary_states.pc.hp_primary,
1037+
primary,
10321038
primary_state);
10331039
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
10341040
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -1064,10 +1070,12 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
10641070

10651071
partition_bulk_load_state primary_state;
10661072
primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
1073+
host_port primary;
1074+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
10671075
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
10681076
group_bulk_load_state,
10691077
_replica->_primary_states.pc.primary,
1070-
_replica->_primary_states.pc.hp_primary,
1078+
primary,
10711079
primary_state);
10721080
LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
10731081
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),

src/replica/replica_context.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,18 @@ void primary_context::get_replica_config(partition_status::type st,
134134
bool primary_context::check_exist(const ::dsn::host_port &node, partition_status::type st)
135135
{
136136
switch (st) {
137-
case partition_status::PS_PRIMARY:
138-
return pc.hp_primary == node;
139-
case partition_status::PS_SECONDARY:
140-
return utils::contains(pc.hp_secondaries, node);
137+
case partition_status::PS_PRIMARY: {
138+
DCHECK(pc.__isset.hp_primary, "");
139+
host_port primary;
140+
GET_HOST_PORT(pc, primary, primary);
141+
return primary == node;
142+
}
143+
case partition_status::PS_SECONDARY: {
144+
DCHECK(pc.__isset.hp_secondaries, "");
145+
std::vector<host_port> secondaries;
146+
GET_HOST_PORTS(pc, secondaries, secondaries);
147+
return utils::contains(secondaries, node);
148+
}
141149
case partition_status::PS_POTENTIAL_SECONDARY:
142150
return learners.find(node) != learners.end();
143151
default:

src/replica/replica_stub.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,7 +1700,9 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
17001700
req.__isset.meta_split_status ? req.meta_split_status
17011701
: split_status::NOT_SPLIT);
17021702
} else {
1703-
if (req.config.hp_primary == _primary_host_port) {
1703+
host_port primary;
1704+
GET_HOST_PORT(req.config, primary, primary);
1705+
if (primary == _primary_host_port) {
17041706
LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it "
17051707
"from meta server",
17061708
req.config.pid,
@@ -1751,7 +1753,9 @@ void replica_stub::remove_replica_on_meta_server(const app_info &info,
17511753
SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port);
17521754
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
17531755

1754-
if (_primary_host_port == pc.hp_primary) {
1756+
host_port primary;
1757+
GET_HOST_PORT(pc, primary, primary);
1758+
if (_primary_host_port == primary) {
17551759
RESET_IP_AND_HOST_PORT(request->config, primary);
17561760
} else if (REMOVE_IP_AND_HOST_PORT(
17571761
primary_address(), _primary_host_port, request->config, secondaries)) {

0 commit comments

Comments
 (0)