From e04715ecbba63a3d757d154809a7e1ad2e1b92c5 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Mon, 29 Apr 2024 20:27:09 +0800 Subject: [PATCH] refactor(FQDN): feather refator on idl/dsn.layer2.thrift --- src/client/partition_resolver_simple.cpp | 10 +- src/client/replication_ddl_client.cpp | 56 +++++--- src/common/replication_common.cpp | 8 +- src/common/replication_other_types.h | 30 +++- src/meta/cluster_balance_policy.cpp | 11 +- src/meta/load_balance_policy.cpp | 15 +- src/meta/meta_bulk_load_ingestion_context.cpp | 8 +- src/meta/meta_bulk_load_service.cpp | 24 ++-- src/meta/meta_data.cpp | 7 +- src/meta/meta_data.h | 48 +++++-- src/meta/meta_http_service.cpp | 40 ++++-- src/meta/meta_split_service.cpp | 4 +- src/meta/partition_guardian.cpp | 46 +++--- src/meta/server_load_balancer.cpp | 4 +- src/meta/server_state.cpp | 124 ++++++++-------- .../balancer_simulator/balancer_simulator.cpp | 7 +- src/meta/test/json_compacity.cpp | 13 +- src/replica/backup/replica_backup_manager.cpp | 10 +- src/replica/bulk_load/replica_bulk_loader.cpp | 36 +++-- src/replica/duplication/replica_follower.cpp | 15 +- src/replica/duplication/replica_follower.h | 5 +- src/replica/replica_2pc.cpp | 19 +-- src/replica/replica_backup.cpp | 11 +- src/replica/replica_config.cpp | 133 ++++++++++++++---- src/replica/replica_context.cpp | 16 ++- src/replica/split/replica_split_manager.cpp | 39 ++--- src/runtime/rpc/rpc_host_port.h | 2 +- src/server/hotspot_partition_calculator.cpp | 6 +- src/shell/command_helper.h | 27 ++-- src/shell/commands/data_operations.cpp | 13 +- src/shell/commands/node_management.cpp | 10 +- src/shell/commands/recovery.cpp | 24 ++-- src/shell/commands/table_management.cpp | 24 ++-- .../detect_hotspot/test_detect_hotspot.cpp | 25 ++-- src/test/function_test/utils/test_util.cpp | 8 +- src/test/kill_test/kill_testor.cpp | 12 +- src/test/kill_test/partition_kill_testor.cpp | 15 +- 37 files changed, 598 insertions(+), 307 deletions(-) diff --git a/src/client/partition_resolver_simple.cpp b/src/client/partition_resolver_simple.cpp index 08d98790fb..71a73509e9 100644 --- a/src/client/partition_resolver_simple.cpp +++ b/src/client/partition_resolver_simple.cpp @@ -414,14 +414,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque last_drops; + GET_HOST_PORTS(pc, last_drops, last_drops); + if (last_drops.empty()) { return host_port(); } - return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)]; + return last_drops[rand::next_u32(0, last_drops.size() - 1)]; } error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp) diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 1844b87d94..86c2c2d337 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -167,9 +167,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na int ready_count = 0; for (int i = 0; i < partition_count; i++) { const auto &pc = query_resp.partitions[i]; - if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) { - ready_count++; + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (!primary) { + continue; + } + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (secondaries.size() + 1 < max_replica_count) { + continue; } + + ready_count++; } if (ready_count == partition_count) { std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count @@ -435,11 +445,16 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st int read_unhealthy = 0; for (const auto &pc : pcs) { int replica_count = 0; - if (pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { replica_count++; } - replica_count += pc.hp_secondaries.size(); - if (pc.hp_primary) { + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + replica_count += secondaries.size(); + if (primary) { if (replica_count >= pc.max_replica_count) { fully_healthy++; } else if (replica_count < 2) { @@ -573,13 +588,18 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_ } for (const auto &pc : pcs) { - if (pc.hp_primary) { - auto find = tmp_map.find(pc.hp_primary); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { + auto find = tmp_map.find(primary); if (find != tmp_map.end()) { find->second.primary_count++; } } - for (const auto &secondary : pc.hp_secondaries) { + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { auto find = tmp_map.find(secondary); if (find != tmp_map.end()) { find->second.secondary_count++; @@ -766,14 +786,18 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name, int read_unhealthy = 0; for (const auto &pc : pcs) { int replica_count = 0; - if (pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { replica_count++; - node_stat[pc.hp_primary].first++; + node_stat[primary].first++; total_prim_count++; } - replica_count += pc.hp_secondaries.size(); - total_sec_count += pc.hp_secondaries.size(); - if (pc.hp_primary) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + replica_count += secondaries.size(); + total_sec_count += secondaries.size(); + if (primary) { if (replica_count >= pc.max_replica_count) { fully_healthy++; } else if (replica_count < 2) { @@ -783,14 +807,14 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name, write_unhealthy++; read_unhealthy++; } - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { node_stat[secondary].second++; } tp_details.add_row(pc.pid.get_partition_index()); tp_details.append_data(pc.ballot); tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count)); - tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-"); - tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ","))); + tp_details.append_data(primary ? primary.to_string() : "-"); + tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ","))); } mtp.add(std::move(tp_details)); diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index e34bcb5838..3dc8b0e29b 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -175,12 +175,16 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_ rc.learner_signature = invalid_signature; SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary); - if (node == pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (node == primary) { rc.status = partition_status::PS_PRIMARY; return true; } - if (utils::contains(pc.hp_secondaries, node)) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (utils::contains(secondaries, node)) { rc.status = partition_status::PS_SECONDARY; return true; } diff --git a/src/common/replication_other_types.h b/src/common/replication_other_types.h index ea194a82af..d42ddae080 100644 --- a/src/common/replication_other_types.h +++ b/src/common/replication_other_types.h @@ -36,6 +36,7 @@ #include "consensus_types.h" #include "replica_admin_types.h" #include "common/replication_enums.h" +#include "runtime/rpc/dns_resolver.h" #include "runtime/rpc/rpc_address.h" #include "runtime/rpc/rpc_host_port.h" @@ -78,18 +79,33 @@ inline bool is_member(const partition_configuration &pc, const rpc_address &node inline bool is_partition_config_equal(const partition_configuration &pc1, const partition_configuration &pc2) { - // secondaries no need to be same order - for (const auto &pc1_secondary : pc1.hp_secondaries) { + if (pc1.ballot != pc2.ballot || pc1.pid != pc2.pid || + pc1.max_replica_count != pc2.max_replica_count || + pc1.last_committed_decree != pc2.last_committed_decree) { + return false; + } + + host_port pc1_primary; + GET_HOST_PORT(pc1, primary, pc1_primary); + host_port pc2_primary; + GET_HOST_PORT(pc2, primary, pc2_primary); + if (pc1_primary != pc2_primary) { + return false; + } + + // secondaries no need to be in the same order. + std::vector pc1_secondaries; + GET_HOST_PORTS(pc1, secondaries, pc1_secondaries); + for (const auto &pc1_secondary : pc1_secondaries) { if (!is_secondary(pc2, pc1_secondary)) { return false; } } + + std::vector pc2_secondaries; + GET_HOST_PORTS(pc2, secondaries, pc2_secondaries); // last_drops is not considered into equality check - return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid && - pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary && - pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() && - pc1.hp_secondaries.size() == pc2.hp_secondaries.size() && - pc1.last_committed_decree == pc2.last_committed_decree; + return pc1_secondaries.size() == pc2_secondaries.size(); } class replica_helper diff --git a/src/meta/cluster_balance_policy.cpp b/src/meta/cluster_balance_policy.cpp index 2b10e39ece..6e492fca4f 100644 --- a/src/meta/cluster_balance_policy.cpp +++ b/src/meta/cluster_balance_policy.cpp @@ -226,12 +226,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr a info.partitions.reserve(app->pcs.size()); for (const auto &pc : app->pcs) { std::map pstatus_map; - pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY; - if (pc.hp_secondaries.size() != pc.max_replica_count - 1) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + pstatus_map[primary] = partition_status::PS_PRIMARY; + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (secondaries.size() != pc.max_replica_count - 1) { // partition is unhealthy return false; } - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { pstatus_map[secondary] = partition_status::PS_SECONDARY; } info.partitions.push_back(std::move(pstatus_map)); diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp index 9295796746..ff1529dd03 100644 --- a/src/meta/load_balance_policy.cpp +++ b/src/meta/load_balance_policy.cpp @@ -35,6 +35,7 @@ #include "meta_admin_types.h" #include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep #include "runtime/rpc/rpc_address.h" +#include "runtime/rpc/rpc_host_port.h" #include "utils/command_manager.h" #include "utils/fail_point.h" #include "utils/flags.h" @@ -173,14 +174,16 @@ generate_balancer_request(const app_mapper &apps, new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY)); result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE)); break; - case balance_type::COPY_SECONDARY: + case balance_type::COPY_SECONDARY: { ans = "copy_secondary"; result.balance_type = balancer_request_type::copy_secondary; + host_port primary; + GET_HOST_PORT(pc, primary, primary); result.action_list.emplace_back( - new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB)); - result.action_list.emplace_back( - new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE)); + new_proposal_action(primary, to, config_type::CT_ADD_SECONDARY_FOR_LB)); + result.action_list.emplace_back(new_proposal_action(primary, from, config_type::CT_REMOVE)); break; + } default: CHECK(false, ""); } @@ -567,7 +570,9 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns) { ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) { const auto &pc = _app->pcs[pid.get_partition_index()]; - for (const auto &secondary : pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { auto i = _host_port_id.find(secondary); CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary); _network[node_id][i->second]++; diff --git a/src/meta/meta_bulk_load_ingestion_context.cpp b/src/meta/meta_bulk_load_ingestion_context.cpp index 6921d4ea86..48dd59026c 100644 --- a/src/meta/meta_bulk_load_ingestion_context.cpp +++ b/src/meta/meta_bulk_load_ingestion_context.cpp @@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio { pid = pc.pid; std::unordered_set current_nodes; - current_nodes.insert(pc.hp_primary); - for (const auto &secondary : pc.hp_secondaries) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + current_nodes.insert(primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { current_nodes.insert(secondary); } for (const auto &node : current_nodes) { diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index aa2688ce55..acb66e2443 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -371,7 +371,9 @@ bool bulk_load_service::check_partition_status( } pc = app->pcs[pid.get_partition_index()]; - if (!pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (!primary) { LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid); tasking::enqueue( LPC_META_STATE_NORMAL, @@ -382,7 +384,9 @@ bool bulk_load_service::check_partition_status( return false; } - if (pc.hp_secondaries.size() < pc.max_replica_count - 1) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (secondaries.size() < pc.max_replica_count - 1) { bulk_load_status::type p_status; { zauto_read_lock l(_lock); @@ -434,7 +438,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()]; req->pid = pid; req->app_name = app_name; - SET_IP_AND_HOST_PORT(*req, primary, pc.primary, pc.hp_primary); + SET_OBJ_IP_AND_HOST_PORT(*req, primary, pc, primary); req->remote_provider_name = ainfo.file_provider_type; req->cluster_name = ainfo.cluster_name; req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid); @@ -1201,8 +1205,12 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati } std::vector current_nodes; - current_nodes.emplace_back(pc.hp_primary); - for (const auto &secondary : pc.hp_secondaries) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + current_nodes.emplace_back(primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { current_nodes.emplace_back(secondary); } @@ -1270,13 +1278,13 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g return; } - const auto &primary = pc.hp_primary; - ballot meta_ballot = pc.ballot; + host_port primary; + GET_HOST_PORT(pc, primary, primary); tasking::enqueue( LPC_BULK_LOAD_INGESTION, _meta_svc->tracker(), std::bind( - &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot), + &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot), 0, std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); } diff --git a/src/meta/meta_data.cpp b/src/meta/meta_data.cpp index 44da02f058..c7359aac15 100644 --- a/src/meta/meta_data.cpp +++ b/src/meta/meta_data.cpp @@ -132,9 +132,11 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) // we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the // only primary dead // when add node to pc.last_drops, we don't remove it from our cc.drop_list - CHECK(pc.hp_last_drops.empty(), "last_drops of partition({}) must be empty", pid); + std::vector last_drops; + GET_HOST_PORTS(pc, last_drops, last_drops); + CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid); for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) { - if (pc.hp_last_drops.size() + 1 >= max_replica_count) { + if (last_drops.size() + 1 >= max_replica_count) { break; } // similar to cc.drop_list, pc.last_drop is also a stack structure @@ -540,7 +542,6 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta for (int i = 0; i != app_info::partition_count; ++i) { pcs[i].pid.set_partition_index(i); } - helpers->on_init_partitions(); } diff --git a/src/meta/meta_data.h b/src/meta/meta_data.h index dd2533ecfc..c7c3f893df 100644 --- a/src/meta/meta_data.h +++ b/src/meta/meta_data.h @@ -266,10 +266,26 @@ struct partition_configuration_stateless { partition_configuration &pc; partition_configuration_stateless(partition_configuration &_pc) : pc(_pc) {} - std::vector &workers() { return pc.hp_last_drops; } - std::vector &hosts() { return pc.hp_secondaries; } - bool is_host(const host_port &node) const { return utils::contains(pc.hp_secondaries, node); } - bool is_worker(const host_port &node) const { return utils::contains(pc.hp_last_drops, node); } + std::vector &workers() + { + DCHECK(pc.__isset.hp_last_drops, ""); + return pc.hp_last_drops; + } + std::vector &hosts() + { + DCHECK(pc.__isset.hp_secondaries, ""); + return pc.hp_secondaries; + } + bool is_host(const host_port &node) const + { + DCHECK(pc.__isset.hp_secondaries, ""); + return utils::contains(pc.hp_secondaries, node); + } + bool is_worker(const host_port &node) const + { + DCHECK(pc.__isset.hp_last_drops, ""); + return utils::contains(pc.hp_last_drops, node); + } bool is_member(const host_port &node) const { return is_host(node) || is_worker(node); } }; @@ -483,10 +499,16 @@ inline config_context *get_config_context(app_mapper &apps, const dsn::gpid &gpi return &(iter->second->helpers->contexts[gpid.get_partition_index()]); } -inline int replica_count(const partition_configuration &pc) +inline size_t replica_count(const partition_configuration &pc) { - int ans = pc.hp_primary ? 1 : 0; - return ans + pc.hp_secondaries.size(); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + size_t rc = primary ? 1 : 0; + + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + rc += secondaries.size(); + return rc; } enum health_status @@ -503,18 +525,20 @@ enum health_status inline health_status partition_health_status(const partition_configuration &pc, int mutation_2pc_min_replica_count) { - if (!pc.hp_primary) { - if (pc.hp_secondaries.empty()) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (!primary) { + if (secondaries.empty()) { return HS_DEAD; } return HS_UNREADABLE; } - - const auto replica_count = pc.hp_secondaries.size() + 1; + const auto replica_count = secondaries.size() + 1; if (replica_count < mutation_2pc_min_replica_count) { return HS_UNWRITABLE; } - if (replica_count < pc.max_replica_count) { return HS_WRITABLE_ILL; } diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index b1e7af2d94..c0d8f960c8 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -144,14 +144,18 @@ void meta_http_service::get_app_handler(const http_request &req, http_response & int read_unhealthy = 0; for (const auto &pc : response.partitions) { int replica_count = 0; - if (pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (primary) { replica_count++; - node_stat[pc.hp_primary].first++; + node_stat[primary].first++; total_prim_count++; } - replica_count += pc.hp_secondaries.size(); - total_sec_count += pc.hp_secondaries.size(); - if (pc.hp_primary) { + replica_count += secondaries.size(); + total_sec_count += secondaries.size(); + if (primary) { if (replica_count >= pc.max_replica_count) { fully_healthy++; } else if (replica_count < 2) { @@ -164,9 +168,9 @@ void meta_http_service::get_app_handler(const http_request &req, http_response & tp_details.add_row(pc.pid.get_partition_index()); tp_details.append_data(pc.ballot); tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count)); - tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-"); - tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ","))); - for (const auto &secondary : pc.hp_secondaries) { + tp_details.append_data(primary ? primary.to_string() : "-"); + tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ","))); + for (const auto &secondary : secondaries) { node_stat[secondary].second++; } } @@ -318,11 +322,15 @@ void meta_http_service::list_app_handler(const http_request &req, http_response int read_unhealthy = 0; for (const auto &pc : response.partitions) { int replica_count = 0; - if (pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (primary) { replica_count++; } - replica_count += pc.hp_secondaries.size(); - if (pc.hp_primary) { + replica_count += secondaries.size(); + if (primary) { if (replica_count >= pc.max_replica_count) { fully_healthy++; } else if (replica_count < 2) { @@ -408,13 +416,17 @@ void meta_http_service::list_node_handler(const http_request &req, http_response CHECK_EQ(app.partition_count, response_app.partition_count); for (const auto &pc : response_app.partitions) { - if (pc.hp_primary) { - auto find = tmp_map.find(pc.hp_primary); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (primary) { + auto find = tmp_map.find(primary); if (find != tmp_map.end()) { find->second.primary_count++; } } - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { auto find = tmp_map.find(secondary); if (find != tmp_map.end()) { find->second.secondary_count++; diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp index df0c642a2d..2b61245653 100644 --- a/src/meta/meta_split_service.cpp +++ b/src/meta/meta_split_service.cpp @@ -310,7 +310,9 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec, // TODO(yingchun): should use conference? auto child_pc = app->pcs[child_gpid.get_partition_index()]; child_pc.secondaries = request.child_config.secondaries; - child_pc.__set_hp_secondaries(request.child_config.hp_secondaries); + std::vector secondaries; + GET_HOST_PORTS(request.child_config, secondaries, secondaries); + child_pc.__set_hp_secondaries(secondaries); _state->update_configuration_locally(*app, update_child_request); if (parent_context.msg) { diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp index 2cbf49122e..5ba4593ceb 100644 --- a/src/meta/partition_guardian.cpp +++ b/src/meta/partition_guardian.cpp @@ -86,11 +86,15 @@ pc_status partition_guardian::cure(meta_view view, CHECK(acts.empty(), ""); pc_status status; - if (!pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + if (!primary) { status = on_missing_primary(view, gpid); - } else if (static_cast(pc.hp_secondaries.size()) + 1 < pc.max_replica_count) { + } else if (static_cast(secondaries.size()) + 1 < pc.max_replica_count) { status = on_missing_secondary(view, gpid); - } else if (static_cast(pc.hp_secondaries.size()) >= pc.max_replica_count) { + } else if (static_cast(secondaries.size()) >= pc.max_replica_count) { status = on_redundant_secondary(view, gpid); } else { status = pc_status::healthy; @@ -125,8 +129,8 @@ void partition_guardian::reconfig(meta_view view, const configuration_update_req // handle the dropped out servers if (request.type == config_type::CT_DROP_PARTITION) { cc->serving.clear(); - - const auto &last_drops = request.config.hp_last_drops; + std::vector last_drops; + GET_HOST_PORTS(request.config, last_drops, last_drops); for (const auto &last_drop : last_drops) { cc->record_drop_history(last_drop); } @@ -163,6 +167,8 @@ bool partition_guardian::from_proposals(meta_view &view, host_port target; host_port node; GET_HOST_PORT(action, target, target); + host_port primary; + GET_HOST_PORT(pc, primary, primary); std::string reason; if (!target) { reason = "action target is invalid"; @@ -189,10 +195,10 @@ bool partition_guardian::from_proposals(meta_view &view, switch (action.type) { case config_type::CT_ASSIGN_PRIMARY: - is_action_valid = (node == target && !pc.primary && !is_secondary(pc, node)); + is_action_valid = (node == target && !primary && !is_secondary(pc, node)); break; case config_type::CT_UPGRADE_TO_PRIMARY: - is_action_valid = (node == target && !pc.primary && is_secondary(pc, node)); + is_action_valid = (node == target && !primary && is_secondary(pc, node)); break; case config_type::CT_ADD_SECONDARY: case config_type::CT_ADD_SECONDARY_FOR_LB: @@ -247,9 +253,13 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi action.type = config_type::CT_INVALID; // try to upgrade a secondary to primary if the primary is missing - if (!pc.hp_secondaries.empty()) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + std::vector last_drops; + GET_HOST_PORTS(pc, last_drops, last_drops); + if (!secondaries.empty()) { RESET_IP_AND_HOST_PORT(action, node); - for (const auto &secondary : pc.hp_secondaries) { + for (const auto &secondary : secondaries) { const auto ns = get_node_state(*(view.nodes), secondary, false); CHECK_NOTNULL(ns, "invalid secondary: {}", secondary); if (dsn_unlikely(!ns->alive())) { @@ -284,7 +294,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi } // if nothing in the last_drops, it means that this is a newly created partition, so let's // just find a node and assign primary for it. - else if (pc.hp_last_drops.empty()) { + else if (last_drops.empty()) { dsn::host_port min_primary_server; newly_partitions *min_primary_server_np = nullptr; @@ -338,10 +348,10 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi dr.last_prepared_decree); } - for (int i = 0; i < pc.hp_last_drops.size(); ++i) { + for (int i = 0; i < last_drops.size(); ++i) { int dropped_index = -1; for (int k = 0; k < cc.dropped.size(); k++) { - if (cc.dropped[k].node == pc.hp_last_drops[i]) { + if (cc.dropped[k].node == last_drops[i]) { dropped_index = k; break; } @@ -353,13 +363,13 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi dropped_index); } - if (pc.hp_last_drops.size() == 1) { + if (last_drops.size() == 1) { LOG_WARNING("{}: the only node({}) is dead, waiting it to come back", gpid_name, FMT_HOST_PORT_AND_IP(pc, last_drops.back())); SET_OBJ_IP_AND_HOST_PORT(action, node, pc, last_drops.back()); } else { - std::vector nodes(pc.hp_last_drops.end() - 2, pc.hp_last_drops.end()); + std::vector nodes(last_drops.end() - 2, last_drops.end()); std::vector collected_info(2); bool ready = true; @@ -668,9 +678,11 @@ pc_status partition_guardian::on_redundant_secondary(meta_view &view, const dsn: const node_mapper &nodes = *(view.nodes); const partition_configuration &pc = *get_config(*(view.apps), gpid); int target = 0; - int load = nodes.find(pc.hp_secondaries.front())->second.partition_count(); - for (int i = 0; i != pc.hp_secondaries.size(); ++i) { - int l = nodes.find(pc.hp_secondaries[i])->second.partition_count(); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + int load = nodes.find(secondaries.front())->second.partition_count(); + for (int i = 0; i != secondaries.size(); ++i) { + int l = nodes.find(secondaries[i])->second.partition_count(); if (l > load) { load = l; target = i; diff --git a/src/meta/server_load_balancer.cpp b/src/meta/server_load_balancer.cpp index 1a57858e38..a3016634a4 100644 --- a/src/meta/server_load_balancer.cpp +++ b/src/meta/server_load_balancer.cpp @@ -179,7 +179,9 @@ void server_load_balancer::register_proposals(meta_view view, continue; } - if (!pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (!primary) { resp.err = ERR_INVALID_PARAMETERS; return; } diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 2c5019cb9a..650eb6b8df 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -732,12 +732,16 @@ void server_state::initialize_node_state() for (auto &app_pair : _all_apps) { app_state &app = *(app_pair.second); for (const auto &pc : app.pcs) { - if (pc.hp_primary) { - node_state *ns = get_node_state(_nodes, pc.hp_primary, true); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { + node_state *ns = get_node_state(_nodes, primary, true); ns->put_partition(pc.pid, true); } - for (const auto &secondary : pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { CHECK(secondary, "invalid secondary: {}", secondary); node_state *ns = get_node_state(_nodes, secondary, true); ns->put_partition(pc.pid, false); @@ -1463,63 +1467,40 @@ void server_state::request_check(const partition_configuration &old_pc, const configuration_update_request &request) { const auto &new_pc = request.config; + host_port old_primary; + GET_HOST_PORT(old_pc, primary, old_primary); + host_port req_node; + GET_HOST_PORT(request, node, req_node); + std::vector old_secondaries; + GET_HOST_PORTS(old_pc, secondaries, old_secondaries); switch (request.type) { case config_type::CT_ASSIGN_PRIMARY: - if (request.__isset.hp_node) { - CHECK_NE(old_pc.hp_primary, request.hp_node); - CHECK(!utils::contains(old_pc.hp_secondaries, request.hp_node), ""); - } else { - CHECK_NE(old_pc.primary, request.node); - CHECK(!utils::contains(old_pc.secondaries, request.node), ""); - } + CHECK_NE(old_primary, req_node); + CHECK(!utils::contains(old_secondaries, req_node), ""); break; case config_type::CT_UPGRADE_TO_PRIMARY: - if (request.__isset.hp_node) { - CHECK_NE(old_pc.hp_primary, request.hp_node); - CHECK(utils::contains(old_pc.hp_secondaries, request.hp_node), ""); - } else { - CHECK_NE(old_pc.primary, request.node); - CHECK(utils::contains(old_pc.secondaries, request.node), ""); - } + CHECK_NE(old_primary, req_node); + CHECK(utils::contains(old_secondaries, req_node), ""); break; case config_type::CT_DOWNGRADE_TO_SECONDARY: - if (request.__isset.hp_node) { - CHECK_EQ(old_pc.hp_primary, request.hp_node); - CHECK(!utils::contains(old_pc.hp_secondaries, request.hp_node), ""); - } else { - CHECK_EQ(old_pc.primary, request.node); - CHECK(!utils::contains(old_pc.secondaries, request.node), ""); - } + CHECK_EQ(old_primary, req_node); + CHECK(!utils::contains(old_secondaries, req_node), ""); break; case config_type::CT_DOWNGRADE_TO_INACTIVE: case config_type::CT_REMOVE: - if (request.__isset.hp_node) { - CHECK(old_pc.hp_primary == request.hp_node || - utils::contains(old_pc.hp_secondaries, request.hp_node), - ""); - } else { - CHECK(old_pc.primary == request.node || - utils::contains(old_pc.secondaries, request.node), - ""); - } + CHECK(old_primary == req_node || utils::contains(old_secondaries, req_node), ""); break; case config_type::CT_UPGRADE_TO_SECONDARY: - if (request.__isset.hp_node) { - CHECK_NE(old_pc.hp_primary, request.hp_node); - CHECK(!utils::contains(old_pc.hp_secondaries, request.hp_node), ""); - } else { - CHECK_NE(old_pc.primary, request.node); - CHECK(!utils::contains(old_pc.secondaries, request.node), ""); - } + CHECK_NE(old_primary, req_node); + CHECK(!utils::contains(old_secondaries, req_node), ""); break; case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT: { - if (request.__isset.hp_node) { - CHECK_EQ(old_pc.hp_primary, new_pc.hp_primary); - CHECK(old_pc.hp_secondaries == new_pc.hp_secondaries, ""); - } else { - CHECK_EQ(old_pc.primary, new_pc.primary); - CHECK(old_pc.secondaries == new_pc.secondaries, ""); - } + host_port new_primary; + GET_HOST_PORT(new_pc, primary, new_primary); + CHECK_EQ(old_primary, new_primary); + std::vector new_secondaries; + GET_HOST_PORTS(new_pc, secondaries, new_secondaries); + CHECK(old_secondaries == new_secondaries, ""); break; } default: @@ -1579,7 +1560,9 @@ void server_state::update_configuration_locally( break; case config_type::CT_DROP_PARTITION: { - for (const auto &last_drop : new_pc.hp_last_drops) { + std::vector last_drops; + GET_HOST_PORTS(new_pc, last_drops, last_drops); + for (const auto &last_drop : last_drops) { ns = get_node_state(_nodes, last_drop, false); if (ns != nullptr) { ns->remove_partition(gpid, false); @@ -1907,7 +1890,9 @@ void server_state::downgrade_secondary_to_inactive(std::shared_ptr &a partition_configuration &pc = app->pcs[pidx]; config_context &cc = app->helpers->contexts[pidx]; - CHECK(pc.hp_primary, "this shouldn't be called if the primary is invalid"); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + CHECK(primary, "this shouldn't be called if the primary is invalid"); if (config_status::pending_remote_sync != cc.stage) { configuration_update_request request; request.info = *app; @@ -1940,8 +1925,10 @@ void server_state::downgrade_stateless_nodes(std::shared_ptr &app, partition_configuration &pc = req->config; unsigned i = 0; - for (; i < pc.hp_secondaries.size(); ++i) { - if (pc.hp_secondaries[i] == node) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (; i < secondaries.size(); ++i) { + if (secondaries[i] == node) { SET_OBJ_IP_AND_HOST_PORT(*req, node, pc, last_drops[i]); break; } @@ -2065,7 +2052,9 @@ void server_state::on_partition_node_dead(std::shared_ptr &app, return; } - if (pc.hp_primary) { + host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { downgrade_secondary_to_inactive(app, pidx, node); return; } @@ -2267,12 +2256,14 @@ error_code server_state::construct_partitions( app->app_id, pc.pid.get_partition_index(), boost::lexical_cast(pc)); - if (pc.hp_last_drops.size() + 1 < pc.max_replica_count) { + std::vector last_drops; + GET_HOST_PORTS(pc, last_drops, last_drops); + if (last_drops.size() + 1 < pc.max_replica_count) { hint_message += fmt::format("WARNING: partition({}.{}) only collects {}/{} " "of replicas, may lost data", app->app_id, pc.pid.get_partition_index(), - pc.hp_last_drops.size() + 1, + last_drops.size() + 1, pc.max_replica_count); } succeed_count++; @@ -2600,7 +2591,9 @@ bool server_state::check_all_partitions() for (int i = 0; i < add_secondary_actions.size(); ++i) { gpid &pid = add_secondary_gpids[i]; const auto *pc = get_config(_all_apps, pid); - if (!add_secondary_proposed[i] && pc->hp_secondaries.empty()) { + std::vector secondaries; + GET_HOST_PORTS(*pc, secondaries, secondaries); + if (!add_secondary_proposed[i] && secondaries.empty()) { const auto &action = add_secondary_actions[i]; CHECK(action.hp_node, ""); if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node] >= @@ -2721,22 +2714,27 @@ void server_state::check_consistency(const dsn::gpid &gpid) auto &app = *(iter->second); auto &pc = app.pcs[gpid.get_partition_index()]; - if (app.is_stateful) { - if (pc.hp_primary) { - const auto it = _nodes.find(pc.hp_primary); - CHECK(it != _nodes.end(), "invalid primary: {}", pc.hp_primary); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector last_drops; + GET_HOST_PORTS(pc, last_drops, last_drops); + if (primary) { + const auto it = _nodes.find(primary); + CHECK(it != _nodes.end(), "invalid primary: {}", primary); CHECK_EQ(it->second.served_as(gpid), partition_status::PS_PRIMARY); - CHECK(!utils::contains(pc.hp_last_drops, pc.hp_primary), + CHECK(!utils::contains(last_drops, primary), "primary({}) shouldn't appear in last_drops", - pc.hp_primary); + primary); } - for (const auto &secondary : pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { const auto it = _nodes.find(secondary); CHECK(it != _nodes.end(), "invalid secondary: {}", secondary); CHECK_EQ(it->second.served_as(gpid), partition_status::PS_SECONDARY); - CHECK(!utils::contains(pc.hp_last_drops, secondary), + CHECK(!utils::contains(last_drops, secondary), "secondary({}) shouldn't appear in last_drops", secondary); } diff --git a/src/meta/test/balancer_simulator/balancer_simulator.cpp b/src/meta/test/balancer_simulator/balancer_simulator.cpp index 4648577a03..d2d33cff33 100644 --- a/src/meta/test/balancer_simulator/balancer_simulator.cpp +++ b/src/meta/test/balancer_simulator/balancer_simulator.cpp @@ -113,10 +113,12 @@ void generate_balanced_apps(/*out*/ app_mapper &apps, for (auto &pc : app->pcs) { temp.clear(); - while (pc.hp_secondaries.size() + 1 < pc.max_replica_count) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + while (secondaries.size() + 1 < pc.max_replica_count) { const auto &n = pq2.pop(); if (!is_member(pc, n)) { - pc.hp_secondaries.push_back(n); + secondaries.push_back(n); nodes[n].put_partition(pc.pid, false); } temp.push_back(n); @@ -155,6 +157,7 @@ void random_move_primary(app_mapper &apps, node_mapper &nodes, int primary_move_ for (auto &pc : app.pcs) { int n = random32(1, space_size) / 100; if (n < primary_move_ratio) { + CHECK(pc.hp_primary, ""); int indice = random32(0, 1); nodes[pc.hp_primary].remove_partition(pc.pid, true); std::swap(pc.primary, pc.secondaries[indice]); diff --git a/src/meta/test/json_compacity.cpp b/src/meta/test/json_compacity.cpp index de01ec52a5..e5528fb8a1 100644 --- a/src/meta/test/json_compacity.cpp +++ b/src/meta/test/json_compacity.cpp @@ -86,15 +86,20 @@ void meta_service_test_app::json_compacity() // 4. old pc version const char *json3 = "{\"pid\":\"1.1\",\"ballot\":234,\"max_replica_count\":3," - "\"primary\":\"invalid address\",\"secondaries\":[\"127.0.0.1:6\"]," - "\"hp_primary\":\"invalid host_port\",\"hp_secondaries\":[\"localhost:6\"]," + "\"primary\":\"127.0.0.1:1\",\"secondaries\":[\"127.0.0.1:6\"]," + "\"hp_primary\":\"localhost:1\",\"hp_secondaries\":[\"localhost:6\"]," "\"last_drops\":[],\"last_committed_decree\":157}"; dsn::partition_configuration pc; dsn::json::json_forwarder::decode( dsn::blob(json3, 0, strlen(json3)), pc); ASSERT_EQ(234, pc.ballot); - ASSERT_TRUE(!pc.hp_primary); - ASSERT_TRUE(!pc.primary); + // As how we do in src/meta/server_state.cpp, we have to set the '__isset' fields manually. + ASSERT_FALSE(pc.__isset.hp_primary); + ASSERT_TRUE(pc.hp_primary); + ASSERT_TRUE(pc.primary); + ASSERT_STREQ("127.0.0.1:1", pc.primary.to_string()); + ASSERT_EQ("localhost:1", pc.hp_primary.to_string()); + ASSERT_FALSE(pc.__isset.hp_secondaries); ASSERT_EQ(1, pc.hp_secondaries.size()); ASSERT_EQ(1, pc.secondaries.size()); ASSERT_STREQ("127.0.0.1:6", pc.secondaries[0].to_string()); diff --git a/src/replica/backup/replica_backup_manager.cpp b/src/replica/backup/replica_backup_manager.cpp index 42882a2597..a6feafb6ce 100644 --- a/src/replica/backup/replica_backup_manager.cpp +++ b/src/replica/backup/replica_backup_manager.cpp @@ -235,9 +235,13 @@ void replica_backup_manager::send_clear_request_to_secondaries(const gpid &pid, request.__set_pid(pid); request.__set_policy_name(policy_name); - for (const auto &secondary : _replica->_primary_states.pc.secondaries) { - rpc::call_one_way_typed( - secondary, RPC_CLEAR_COLD_BACKUP, request, get_gpid().thread_hash()); + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { + rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(secondary), + RPC_CLEAR_COLD_BACKUP, + request, + get_gpid().thread_hash()); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index e8eb03987e..f6122a8502 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -188,7 +188,9 @@ void replica_bulk_loader::broadcast_group_bulk_load(const bulk_load_request &met LOG_INFO_PREFIX("start to broadcast group bulk load"); - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { if (secondary == _stub->primary_host_port()) { continue; } @@ -741,7 +743,9 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta } if (status() == partition_status::PS_PRIMARY) { - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { _replica->_primary_states.reset_node_bulk_load_states(secondary); } } @@ -939,7 +943,9 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo primary_state.download_status); int32_t total_progress = primary_state.download_progress; - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { const auto &secondary_state = _replica->_primary_states.secondary_bulk_load_states[secondary]; int32_t s_progress = @@ -979,11 +985,12 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), enum_to_string(primary_state.ingest_status)); + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); bool is_group_ingestion_finish = (primary_state.ingest_status == ingestion_status::IS_SUCCEED) && - (_replica->_primary_states.pc.hp_secondaries.size() + 1 == - _replica->_primary_states.pc.max_replica_count); - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + (secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count); + for (const auto &secondary : secondaries) { const auto &secondary_state = _replica->_primary_states.secondary_bulk_load_states[secondary]; ingestion_status::type ingest_status = secondary_state.__isset.ingest_status @@ -1026,10 +1033,11 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response) FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), primary_state.is_cleaned_up); - bool group_flag = - (primary_state.is_cleaned_up) && (_replica->_primary_states.pc.hp_secondaries.size() + 1 == - _replica->_primary_states.pc.max_replica_count); - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + bool group_flag = (primary_state.is_cleaned_up) && + (secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count); + for (const auto &secondary : secondaries) { const auto &secondary_state = _replica->_primary_states.secondary_bulk_load_states[secondary]; bool is_cleaned_up = secondary_state.__isset.is_cleaned_up ? secondary_state.is_cleaned_up @@ -1065,10 +1073,12 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response) FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary), primary_state.is_paused); + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); bool group_is_paused = - primary_state.is_paused && (_replica->_primary_states.pc.hp_secondaries.size() + 1 == - _replica->_primary_states.pc.max_replica_count); - for (const auto &secondary : _replica->_primary_states.pc.hp_secondaries) { + primary_state.is_paused && + (secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count); + for (const auto &secondary : secondaries) { partition_bulk_load_state secondary_state = _replica->_primary_states.secondary_bulk_load_states[secondary]; bool is_paused = secondary_state.__isset.is_paused ? secondary_state.is_paused : false; diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp index 4918f60e81..815286c757 100644 --- a/src/replica/duplication/replica_follower.cpp +++ b/src/replica/duplication/replica_follower.cpp @@ -178,7 +178,9 @@ error_code replica_follower::update_master_replica_config(error_code err, query_ return ERR_INCONSISTENT_STATE; } - if (dsn_unlikely(!resp.partitions[0].hp_primary)) { + dsn::host_port primary; + GET_HOST_PORT(resp.partitions[0], primary, primary); + if (dsn_unlikely(!primary)) { LOG_ERROR_PREFIX("master[{}] partition address is invalid", master_replica_name()); return ERR_INVALID_STATE; } @@ -203,9 +205,14 @@ void replica_follower::copy_master_replica_checkpoint() dsn::message_ex *msg = dsn::message_ex::create_request(RPC_QUERY_LAST_CHECKPOINT_INFO, 0, _pc.pid.thread_hash()); dsn::marshall(msg, request); - rpc::call(_pc.primary, msg, &_tracker, [&](error_code err, learn_response &&resp) mutable { - nfs_copy_checkpoint(err, std::move(resp)); - }); + dsn::host_port primary; + GET_HOST_PORT(_pc, primary, primary); + rpc::call(dsn::dns_resolver::instance().resolve_address(primary), + msg, + &_tracker, + [&](error_code err, learn_response &&resp) mutable { + nfs_copy_checkpoint(err, std::move(resp)); + }); } // ThreadPool: THREAD_POOL_DEFAULT diff --git a/src/replica/duplication/replica_follower.h b/src/replica/duplication/replica_follower.h index acf978c7ad..7df337e956 100644 --- a/src/replica/duplication/replica_follower.h +++ b/src/replica/duplication/replica_follower.h @@ -26,6 +26,7 @@ #include "common/gpid.h" #include "dsn.layer2_types.h" #include "replica/replica_base.h" +#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep #include "runtime/rpc/rpc_host_port.h" #include "runtime/task/task_tracker.h" #include "utils/error_code.h" @@ -78,7 +79,9 @@ class replica_follower : replica_base std::string master_replica_name() { std::string app_info = fmt::format("{}.{}", _master_cluster_name, _master_app_name); - if (_pc.hp_primary) { + dsn::host_port primary; + GET_HOST_PORT(_pc, primary, primary); + if (primary) { return fmt::format("{}({}|{})", app_info, FMT_HOST_PORT_AND_IP(_pc, primary), _pc.pid); } return app_info; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 8f48438d1a..77223065ef 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -188,6 +188,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries, secondaries); if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) { auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status(); if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED && @@ -200,8 +202,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) LOG_INFO_PREFIX("receive bulk load ingestion request"); // bulk load ingestion request requires that all secondaries should be alive - if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < - _primary_states.pc.max_replica_count) { + if (static_cast(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) { response_client_write(request, ERR_NOT_ENOUGH_MEMBER); return; } @@ -209,7 +210,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) _bulk_load_ingestion_start_time_ms = dsn_now_ms(); } - if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < + if (static_cast(secondaries.size()) + 1 < _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) { response_client_write(request, ERR_NOT_ENOUGH_MEMBER); return; @@ -257,6 +258,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c mu->set_is_sync_to_child(_primary_states.sync_send_write_request); // check bounded staleness + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries, secondaries); if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) { err = ERR_CAPACITY_EXCEEDED; goto ErrOut; @@ -269,8 +272,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c break; } LOG_INFO_PREFIX("try to prepare bulk load mutation({})", mu->name()); - if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < - _primary_states.pc.max_replica_count) { + if (static_cast(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) { err = ERR_NOT_ENOUGH_MEMBER; break; } @@ -282,7 +284,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // stop prepare if there are too few replicas unless it's a reconciliation // for reconciliation, we should ensure every prepared mutation to be committed // please refer to PacificA paper - if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < + if (static_cast(secondaries.size()) + 1 < _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) && !reconciliation) { err = ERR_NOT_ENOUGH_MEMBER; @@ -299,9 +301,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // remote prepare mu->set_prepare_ts(); - mu->set_left_secondary_ack_count( - static_cast(_primary_states.pc.hp_secondaries.size())); - for (const auto &secondary : _primary_states.pc.hp_secondaries) { + mu->set_left_secondary_ack_count(static_cast(secondaries.size())); + for (const auto &secondary : secondaries) { send_prepare_message(secondary, partition_status::PS_SECONDARY, mu, diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp index 79ec2cd0a0..671e14fa82 100644 --- a/src/replica/replica_backup.cpp +++ b/src/replica/replica_backup.cpp @@ -47,6 +47,8 @@ #include "replica/replication_app_base.h" #include "replica_stub.h" #include "runtime/api_layer1.h" +#include "runtime/rpc/dns_resolver.h" +#include "runtime/rpc/rpc_host_port.h" #include "runtime/task/async_calls.h" #include "utils/autoref_ptr.h" #include "utils/env.h" @@ -256,10 +258,15 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo void replica::send_backup_request_to_secondary(const backup_request &request) { - for (const auto &secondary : _primary_states.pc.secondaries) { + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { // primary will send backup_request to secondary periodically // so, we shouldn't handle the response - rpc::call_one_way_typed(secondary, RPC_COLD_BACKUP, request, get_gpid().thread_hash()); + rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(secondary), + RPC_COLD_BACKUP, + request, + get_gpid().thread_hash()); } } diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 23767986b6..ead9229141 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -174,8 +174,14 @@ void replica::assign_primary(configuration_update_request &proposal) SET_IP_AND_HOST_PORT( proposal.config, primary, _stub->primary_address(), _stub->primary_host_port()); replica_helper::remove_node(_stub->primary_address(), proposal.config.secondaries); - replica_helper::remove_node(_stub->primary_host_port(), proposal.config.hp_secondaries); - + if (proposal.config.__isset.hp_secondaries) { + replica_helper::remove_node(_stub->primary_host_port(), proposal.config.hp_secondaries); + } else { + // In a rolling upgrade scenario, the proposal request may not have the hp_secondaries + // field. + LOG_WARNING_PREFIX("proposal.config.hp_secondaries field is not set, it's happened in a " + "rolling upgrade scenario"); + } update_configuration_on_meta_server(proposal.type, node, proposal.config); } @@ -190,15 +196,24 @@ void replica::add_potential_secondary(const configuration_update_request &propos CHECK_EQ(proposal.config.ballot, get_ballot()); CHECK_EQ(proposal.config.pid, _primary_states.pc.pid); - CHECK_EQ(proposal.config.hp_primary, _primary_states.pc.hp_primary); - CHECK(proposal.config.hp_secondaries == _primary_states.pc.hp_secondaries, ""); - + CHECK_EQ(proposal.config.primary, _primary_states.pc.primary); + CHECK(proposal.config.secondaries == _primary_states.pc.secondaries, ""); + dsn::host_port primary; + GET_HOST_PORT(proposal.config, primary, primary); + // The local host_port type 'hp_primary' field must be set. + CHECK(_primary_states.pc.__isset.hp_primary, ""); + CHECK_EQ(primary, _primary_states.pc.hp_primary); + std::vector secondaries; + GET_HOST_PORTS(proposal.config, secondaries, secondaries); + // The local host_port type 'hp_secondaries' field must be set. + CHECK(_primary_states.pc.__isset.hp_secondaries, ""); + CHECK(secondaries == _primary_states.pc.hp_secondaries, ""); host_port node; GET_HOST_PORT(proposal, node, node); CHECK(!_primary_states.check_exist(node, partition_status::PS_PRIMARY), "node = {}", node); CHECK(!_primary_states.check_exist(node, partition_status::PS_SECONDARY), "node = {}", node); - int potential_secondaries_count = + const int potential_secondaries_count = _primary_states.pc.hp_secondaries.size() + _primary_states.learners.size(); if (potential_secondaries_count >= _primary_states.pc.max_replica_count - 1) { if (proposal.type == config_type::CT_ADD_SECONDARY) { @@ -270,9 +285,19 @@ void replica::downgrade_to_secondary_on_primary(configuration_update_request &pr } CHECK_EQ(proposal.config.pid, _primary_states.pc.pid); - CHECK_EQ(proposal.config.hp_primary, _primary_states.pc.hp_primary); - CHECK(proposal.config.hp_secondaries == _primary_states.pc.hp_secondaries, ""); - CHECK_EQ(proposal.hp_node, proposal.config.hp_primary); + dsn::host_port primary; + GET_HOST_PORT(proposal.config, primary, primary); + // The local host_port type 'hp_primary' field must be set. + CHECK(_primary_states.pc.__isset.hp_primary, ""); + CHECK_EQ(primary, _primary_states.pc.hp_primary); + std::vector secondaries; + GET_HOST_PORTS(proposal.config, secondaries, secondaries); + // The local host_port type 'hp_secondaries' field must be set. + CHECK(_primary_states.pc.__isset.hp_secondaries, ""); + CHECK(secondaries == _primary_states.pc.hp_secondaries, ""); + dsn::host_port node; + GET_HOST_PORT(proposal, node, node); + CHECK_EQ(node, primary); CHECK_EQ(proposal.node, proposal.config.primary); RESET_IP_AND_HOST_PORT(proposal.config, primary); @@ -287,12 +312,20 @@ void replica::downgrade_to_inactive_on_primary(configuration_update_request &pro return; CHECK_EQ(proposal.config.pid, _primary_states.pc.pid); - CHECK_EQ(proposal.config.hp_primary, _primary_states.pc.hp_primary); - CHECK(proposal.config.hp_secondaries == _primary_states.pc.hp_secondaries, ""); + dsn::host_port primary; + GET_HOST_PORT(proposal.config, primary, primary); + // The local host_port type 'hp_primary' field must be set. + CHECK(_primary_states.pc.__isset.hp_primary, ""); + CHECK_EQ(primary, _primary_states.pc.hp_primary); + std::vector secondaries; + GET_HOST_PORTS(proposal.config, secondaries, secondaries); + // The local host_port type 'hp_secondaries' field must be set. + CHECK(_primary_states.pc.__isset.hp_secondaries, ""); + CHECK(secondaries == _primary_states.pc.hp_secondaries, ""); host_port node; GET_HOST_PORT(proposal, node, node); - if (node == proposal.config.hp_primary) { + if (node == primary) { CHECK_EQ(proposal.node, proposal.config.primary); RESET_IP_AND_HOST_PORT(proposal.config, primary); } else { @@ -300,9 +333,17 @@ void replica::downgrade_to_inactive_on_primary(configuration_update_request &pro CHECK(replica_helper::remove_node(proposal.node, proposal.config.secondaries), "remove node failed, node = {}", proposal.node); - CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries), - "remove node failed, node = {}", - node); + if (proposal.config.__isset.hp_secondaries) { + CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries), + "remove node failed, node = {}", + node); + } else { + // In a rolling upgrade scenario, the proposal request may not have the hp_secondaries + // field. + LOG_WARNING_PREFIX( + "proposal.config.hp_secondaries field is not set, it's happened in a " + "rolling upgrade scenario"); + } } update_configuration_on_meta_server( @@ -315,8 +356,18 @@ void replica::remove(configuration_update_request &proposal) return; CHECK_EQ(proposal.config.pid, _primary_states.pc.pid); - CHECK_EQ(proposal.config.hp_primary, _primary_states.pc.hp_primary); - CHECK(proposal.config.hp_secondaries == _primary_states.pc.hp_secondaries, ""); + CHECK_EQ(proposal.config.primary, _primary_states.pc.primary); + dsn::host_port primary; + GET_HOST_PORT(proposal.config, primary, primary); + // The local host_port type 'hp_primary' field must be set. + CHECK(_primary_states.pc.__isset.hp_primary, ""); + CHECK_EQ(primary, _primary_states.pc.hp_primary); + CHECK(proposal.config.secondaries == _primary_states.pc.secondaries, ""); + std::vector secondaries; + GET_HOST_PORTS(proposal.config, secondaries, secondaries); + // The local host_port type 'hp_secondaries' field must be set. + CHECK(_primary_states.pc.__isset.hp_secondaries, ""); + CHECK(secondaries == _primary_states.pc.hp_secondaries, ""); host_port node; GET_HOST_PORT(proposal, node, node); @@ -324,7 +375,7 @@ void replica::remove(configuration_update_request &proposal) switch (st) { case partition_status::PS_PRIMARY: - CHECK_EQ(proposal.config.hp_primary, node); + CHECK_EQ(primary, node); CHECK_EQ(proposal.config.primary, proposal.node); RESET_IP_AND_HOST_PORT(proposal.config, primary); break; @@ -332,9 +383,17 @@ void replica::remove(configuration_update_request &proposal) CHECK(replica_helper::remove_node(proposal.node, proposal.config.secondaries), "remove node failed, node = {}", proposal.node); - CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries), - "remove_node failed, node = {}", - node); + if (proposal.config.__isset.hp_secondaries) { + CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries), + "remove_node failed, node = {}", + node); + } else { + // In a rolling upgrade scenario, the proposal request may not have the hp_secondaries + // field. + LOG_WARNING_PREFIX( + "proposal.config.hp_secondaries field is not set, it's happened in a " + "rolling upgrade scenario"); + } } break; case partition_status::PS_POTENTIAL_SECONDARY: break; @@ -388,7 +447,9 @@ void replica::update_configuration_on_meta_server(config_type::type type, CHECK(status() == partition_status::PS_INACTIVE && _inactive_is_transient && _is_initializing, ""); - CHECK_EQ(new_pc.hp_primary, node); + dsn::host_port primary; + GET_HOST_PORT(new_pc, primary, primary); + CHECK_EQ(primary, node); } else if (type != config_type::CT_ASSIGN_PRIMARY && type != config_type::CT_UPGRADE_TO_PRIMARY) { CHECK_EQ(status(), partition_status::PS_PRIMARY); @@ -502,8 +563,18 @@ void replica::on_update_configuration_on_meta_server_reply( // post-update work items? if (resp.err == ERR_OK) { CHECK_EQ(req->config.pid, resp.config.pid); - CHECK_EQ(req->config.hp_primary, resp.config.hp_primary); - CHECK(req->config.hp_secondaries == resp.config.hp_secondaries, ""); + CHECK_EQ(req->config.primary, resp.config.primary); + dsn::host_port req_primary; + GET_HOST_PORT(req->config, primary, req_primary); + dsn::host_port resp_primary; + GET_HOST_PORT(resp.config, primary, resp_primary); + CHECK_EQ(req_primary, resp_primary); + CHECK(req->config.secondaries == resp.config.secondaries, ""); + std::vector req_secondaries; + GET_HOST_PORTS(req->config, secondaries, req_secondaries); + std::vector resp_secondaries; + GET_HOST_PORTS(resp.config, secondaries, resp_secondaries); + CHECK(req_secondaries == resp_secondaries, ""); switch (req->type) { case config_type::CT_UPGRADE_TO_PRIMARY: @@ -654,7 +725,9 @@ bool replica::update_configuration(const partition_configuration &pc) if (rconfig.status == partition_status::PS_PRIMARY && (rconfig.ballot > get_ballot() || status() != partition_status::PS_PRIMARY)) { - _primary_states.reset_membership(pc, pc.hp_primary != _stub->primary_host_port()); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + _primary_states.reset_membership(pc, primary != _stub->primary_host_port()); } if (pc.ballot > get_ballot() || @@ -1084,6 +1157,8 @@ void replica::on_config_sync(const app_info &info, update_app_envs(info.envs); _is_duplication_master = info.duplicating; + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); if (status() == partition_status::PS_PRIMARY) { if (nullptr != _primary_states.reconfiguration_task) { // already under reconfiguration, skip configuration sync @@ -1093,10 +1168,10 @@ void replica::on_config_sync(const app_info &info, } else { if (_is_initializing) { // in initializing, when replica still primary, need to inc ballot - if (pc.hp_primary == _stub->primary_host_port() && + if (primary == _stub->primary_host_port() && status() == partition_status::PS_INACTIVE && _inactive_is_transient) { update_configuration_on_meta_server(config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT, - pc.hp_primary, + primary, const_cast(pc)); return; } @@ -1106,8 +1181,8 @@ void replica::on_config_sync(const app_info &info, update_configuration(pc); if (status() == partition_status::PS_INACTIVE && !_inactive_is_transient) { - if (pc.hp_primary == _stub->primary_host_port() // dead primary - || !pc.hp_primary // primary is dead (otherwise let primary remove this) + if (primary == _stub->primary_host_port() // dead primary + || !primary // primary is dead (otherwise let primary remove this) ) { LOG_INFO_PREFIX("downgrade myself as inactive is not transient, remote_config({})", boost::lexical_cast(pc)); diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp index 1d483d7015..7a597d8964 100644 --- a/src/replica/replica_context.cpp +++ b/src/replica/replica_context.cpp @@ -106,13 +106,17 @@ void primary_context::reset_membership(const partition_configuration &new_pc, bo pc = new_pc; - if (pc.hp_primary) { - statuses[pc.hp_primary] = partition_status::PS_PRIMARY; + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { + statuses[primary] = partition_status::PS_PRIMARY; } - for (auto it = new_pc.hp_secondaries.begin(); it != new_pc.hp_secondaries.end(); ++it) { - statuses[*it] = partition_status::PS_SECONDARY; - learners.erase(*it); + std::vector secondaries; + GET_HOST_PORTS(new_pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { + statuses[secondary] = partition_status::PS_SECONDARY; + learners.erase(secondary); } for (auto it = learners.begin(); it != learners.end(); ++it) { @@ -135,8 +139,10 @@ bool primary_context::check_exist(const ::dsn::host_port &node, partition_status { switch (st) { case partition_status::PS_PRIMARY: + DCHECK(pc.__isset.hp_primary, ""); return pc.hp_primary == node; case partition_status::PS_SECONDARY: + DCHECK(pc.__isset.hp_secondaries, ""); return utils::contains(pc.hp_secondaries, node); case partition_status::PS_POTENTIAL_SECONDARY: return learners.find(node) != learners.end(); diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index e59096e4d4..38fc295837 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -775,14 +775,18 @@ void replica_split_manager::update_child_group_partition_count( return; } - if (!_replica->_primary_states.learners.empty() || - _replica->_primary_states.pc.hp_secondaries.size() + 1 < - _replica->_primary_states.pc.max_replica_count) { - LOG_ERROR_PREFIX("there are {} learners or not have enough secondaries(count is {})", - _replica->_primary_states.learners.size(), - _replica->_primary_states.pc.hp_secondaries.size()); - parent_handle_split_error( - "update_child_group_partition_count failed, have learner or lack of secondary", true); + if (!_replica->_primary_states.learners.empty()) { + LOG_ERROR_PREFIX("there are {} learners", _replica->_primary_states.learners.size()); + parent_handle_split_error("update_child_group_partition_count failed, have learner", true); + return; + } + + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) { + LOG_ERROR_PREFIX("there are not enough secondaries(count is {})", secondaries.size()); + parent_handle_split_error("update_child_group_partition_count failed, lack of secondary", + true); return; } @@ -1221,14 +1225,17 @@ void replica_split_manager::trigger_primary_parent_split( _meta_split_status = meta_split_status; if (meta_split_status == split_status::SPLITTING) { - if (!_replica->_primary_states.learners.empty() || - _replica->_primary_states.pc.hp_secondaries.size() + 1 < - _replica->_primary_states.pc.max_replica_count) { - LOG_WARNING_PREFIX( - "there are {} learners or not have enough secondaries(count is {}), wait for " - "next round", - _replica->_primary_states.learners.size(), - _replica->_primary_states.pc.hp_secondaries.size()); + if (!_replica->_primary_states.learners.empty()) { + LOG_WARNING_PREFIX("there are {} learners, wait for next round", + _replica->_primary_states.learners.size()); + return; + } + + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries, secondaries); + if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) { + LOG_WARNING_PREFIX("there are not enough secondaries(count is {}), wait for next round", + secondaries.size()); return; } diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index 2a643bb4e0..7e34714010 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -73,7 +73,7 @@ class TProtocol; } else { \ _target.reserve(_obj.field.size()); \ for (const auto &addr : _obj.field) { \ - _target.emplace_back(host_port::from_address(addr)); \ + _target.emplace_back(dsn::host_port::from_address(addr)); \ } \ } \ } while (0) diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp index b2b71e6987..81819ebbda 100644 --- a/src/server/hotspot_partition_calculator.cpp +++ b/src/server/hotspot_partition_calculator.cpp @@ -224,9 +224,9 @@ void hotspot_partition_calculator::send_detect_hotkey_request( req.type = hotkey_type; req.action = action; req.pid = dsn::gpid(app_id, partition_index); - auto error = - _shell_context->ddl_client->detect_hotkey(pcs[partition_index].hp_primary, req, resp); - + dsn::host_port primary; + GET_HOST_PORT(pcs[partition_index], primary, primary); + auto error = _shell_context->ddl_client->detect_hotkey(primary, req, resp); LOG_INFO("{} {} hotkey detection in {}.{}, server: {}", (action == dsn::replication::detect_action::STOP) ? "Stop" : "Start", (hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read", diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 0a5440b400..64ce7859ed 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1590,7 +1590,9 @@ inline std::unique_ptr create_table_aggregate_stats_calcs row.app_id); for (const auto &pc : iter->second) { - if (pc.hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -1621,7 +1623,9 @@ create_partition_aggregate_stats_calcs(const int32_t table_id, partition_stat_map increases; partition_stat_map rates; for (size_t i = 0; i < rows.size(); ++i) { - if (pcs[i].hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pcs[i], primary, primary); + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -1876,12 +1880,15 @@ inline bool get_app_partition_stat(shell_context *sc, m.name, app_id_x, partition_index_x, counter_name)) { // only primary partition will be counted const auto find = pcs_by_appid.find(app_id_x); - if (find != pcs_by_appid.end() && - find->second[partition_index_x].hp_primary == nodes[i].hp) { - row_data &row = rows[app_id_name[app_id_x]][partition_index_x]; - row.row_name = std::to_string(partition_index_x); - row.app_id = app_id_x; - update_app_pegasus_perf_counter(row, counter_name, m.value); + if (find != pcs_by_appid.end()) { + dsn::host_port primary; + GET_HOST_PORT(find->second[partition_index_x], primary, primary); + if (primary == nodes[i].hp) { + row_data &row = rows[app_id_name[app_id_x]][partition_index_x]; + row.row_name = std::to_string(partition_index_x); + row.app_id = app_id_x; + update_app_pegasus_perf_counter(row, counter_name, m.value); + } } } else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) { // if the app_name from perf-counter isn't existed(maybe the app was dropped), it @@ -2130,7 +2137,9 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s if (find == pcs_by_appid.end()) // app id not found continue; auto &pc = find->second[partition_index_x]; - if (pc.hp_primary != nodes[i].hp) // not primary replica + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary != nodes[i].hp) // not primary replica continue; if (pc.partition_flags != 0) // already calculated continue; diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp index 150f33bede..0641b2f8eb 100644 --- a/src/shell/commands/data_operations.cpp +++ b/src/shell/commands/data_operations.cpp @@ -2241,7 +2241,9 @@ create_rdb_estimated_keys_stats_calcs(const int32_t table_id, partition_stat_map sums; for (size_t i = 0; i < rows.size(); ++i) { - if (pcs[i].hp_primary != node) { + dsn::host_port primary; + GET_HOST_PORT(pcs[i], primary, primary); + if (primary != node) { // Ignore once the replica of the metrics is not the primary of the partition. continue; } @@ -2886,9 +2888,12 @@ bool calculate_hash_value(command_executor *e, shell_context *sc, arguments args tp.add_row_name_and_data("partition_index", partition_index); if (pcs.size() > partition_index) { const auto &pc = pcs[partition_index]; - tp.add_row_name_and_data("primary", pc.hp_primary.to_string()); - tp.add_row_name_and_data("secondaries", - fmt::format("{}", fmt::join(pc.hp_secondaries, ","))); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + tp.add_row_name_and_data("primary", primary); + tp.add_row_name_and_data("secondaries", fmt::format("{}", fmt::join(secondaries, ","))); } } tp.output(std::cout); diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index 803d2ee2af..5a1de5e99e 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -348,13 +348,17 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) } for (const auto &pc : pcs) { - if (pc.hp_primary) { - auto find = tmp_map.find(pc.hp_primary); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { + auto find = tmp_map.find(primary); if (find != tmp_map.end()) { find->second.primary_count++; } } - for (const auto &secondary : pc.hp_secondaries) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + for (const auto &secondary : secondaries) { auto find = tmp_map.find(secondary); if (find != tmp_map.end()) { find->second.secondary_count++; diff --git a/src/shell/commands/recovery.cpp b/src/shell/commands/recovery.cpp index 7793142c68..9720e9f0d7 100644 --- a/src/shell/commands/recovery.cpp +++ b/src/shell/commands/recovery.cpp @@ -165,12 +165,13 @@ bool recover(command_executor *e, shell_context *sc, arguments args) dsn::host_port diagnose_recommend(const ddd_partition_info &pinfo) { - if (pinfo.config.hp_last_drops.size() < 2) { + std::vector last_drops; + GET_HOST_PORTS(pinfo.config, last_drops, last_drops); + if (last_drops.size() < 2) { return dsn::host_port(); } - std::vector last_two_nodes(pinfo.config.hp_last_drops.end() - 2, - pinfo.config.hp_last_drops.end()); + std::vector last_two_nodes(last_drops.end() - 2, last_drops.end()); std::vector last_dropped; for (auto &node : last_two_nodes) { auto it = std::find_if(pinfo.dropped.begin(), @@ -291,12 +292,13 @@ bool ddd_diagnose(command_executor *e, shell_context *sc, arguments args) << "last_committed(" << pinfo.config.last_committed_decree << ")" << std::endl; out << " ----" << std::endl; dsn::host_port latest_dropped, secondary_latest_dropped; - if (pinfo.config.hp_last_drops.size() > 0) { - latest_dropped = pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 1]; + std::vector last_drops; + GET_HOST_PORTS(pinfo.config, last_drops, last_drops); + if (last_drops.size() > 0) { + latest_dropped = last_drops[last_drops.size() - 1]; } - if (pinfo.config.hp_last_drops.size() > 1) { - secondary_latest_dropped = - pinfo.config.hp_last_drops[pinfo.config.hp_last_drops.size() - 2]; + if (last_drops.size() > 1) { + secondary_latest_dropped = last_drops[last_drops.size() - 2]; } int j = 0; for (const ddd_node_info &n : pinfo.dropped) { @@ -320,12 +322,12 @@ bool ddd_diagnose(command_executor *e, shell_context *sc, arguments args) } out << " ----" << std::endl; j = 0; - for (const auto &r : pinfo.config.hp_last_drops) { + for (const auto &r : last_drops) { out << " last_drops[" << j++ << "]: " << "node(" << r.to_string() << ")"; - if (j == (int)pinfo.config.hp_last_drops.size() - 1) + if (j == (int)last_drops.size() - 1) out << " <== the secondary latest"; - else if (j == (int)pinfo.config.hp_last_drops.size()) + else if (j == (int)last_drops.size()) out << " <== the latest"; out << std::endl; } diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index b75e27f56a..1f8aa68ee3 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -334,11 +334,15 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) double disk_used_for_all_replicas = 0; int all_replicas_count = 0; for (const auto &pc : pcs) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); std::string primary_str("-"); - if (pc.hp_primary) { + if (primary) { bool disk_found = false; double disk_value = 0; - auto f1 = disk_map.find(pc.hp_primary); + auto f1 = disk_map.find(primary); if (f1 != disk_map.end()) { auto &sub_map = f1->second; auto f2 = sub_map.find(pc.pid.get_partition_index()); @@ -353,7 +357,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } bool count_found = false; double count_value = 0; - auto f3 = count_map.find(pc.hp_primary); + auto f3 = count_map.find(primary); if (f3 != count_map.end()) { auto &sub_map = f3->second; auto f4 = sub_map.find(pc.pid.get_partition_index()); @@ -363,7 +367,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } } std::stringstream oss; - oss << replication_ddl_client::node_name(pc.hp_primary, resolve_ip) << "("; + oss << replication_ddl_client::node_name(primary, resolve_ip) << "("; if (disk_found) oss << disk_value; else @@ -380,12 +384,12 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) { std::stringstream oss; oss << "["; - for (int j = 0; j < pc.hp_secondaries.size(); j++) { + for (int j = 0; j < secondaries.size(); j++) { if (j != 0) oss << ","; bool found = false; double value = 0; - auto f1 = disk_map.find(pc.hp_secondaries[j]); + auto f1 = disk_map.find(secondaries[j]); if (f1 != disk_map.end()) { auto &sub_map = f1->second; auto f2 = sub_map.find(pc.pid.get_partition_index()); @@ -398,7 +402,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } bool count_found = false; double count_value = 0; - auto f3 = count_map.find(pc.hp_secondaries[j]); + auto f3 = count_map.find(secondaries[j]); if (f3 != count_map.end()) { auto &sub_map = f3->second; auto f3 = sub_map.find(pc.pid.get_partition_index()); @@ -408,7 +412,7 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) } } - oss << replication_ddl_client::node_name(pc.hp_secondaries[j], resolve_ip) << "("; + oss << replication_ddl_client::node_name(secondaries[j], resolve_ip) << "("; if (found) oss << value; else @@ -427,8 +431,8 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) if (detailed) { tp_details.add_row(std::to_string(pc.pid.get_partition_index())); tp_details.append_data(pc.ballot); - tp_details.append_data(fmt::format( - "{}/{}", pc.hp_secondaries.size() + (pc.hp_primary ? 1 : 0), pc.max_replica_count)); + tp_details.append_data( + fmt::format("{}/{}", secondaries.size() + (primary ? 1 : 0), pc.max_replica_count)); tp_details.append_data(primary_str); tp_details.append_data(secondary_str); } diff --git a/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp b/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp index 3c549d4cf6..85a35c4d86 100644 --- a/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp +++ b/src/test/function_test/detect_hotspot/test_detect_hotspot.cpp @@ -32,6 +32,7 @@ #include "include/pegasus/error.h" #include "replica_admin_types.h" #include "runtime/api_layer1.h" +#include "runtime/rpc/rpc_host_port.h" #include "test/function_test/utils/test_util.h" #include "test/function_test/utils/utils.h" #include "utils/error_code.h" @@ -99,7 +100,9 @@ class detect_hotspot_test : public test_util dsn::replication::detect_hotkey_response resp; for (const auto &pc : pcs_) { req.pid = pc.pid; - ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(pc.hp_primary, req, resp)); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); if (!resp.hotkey_result.empty()) { find_hotkey = true; break; @@ -118,14 +121,17 @@ class detect_hotspot_test : public test_util req.action = dsn::replication::detect_action::STOP; for (const auto &pc : pcs_) { - ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(pc.hp_primary, req, resp)); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); ASSERT_EQ(dsn::ERR_OK, resp.err); } req.action = dsn::replication::detect_action::QUERY; for (const auto &pc : pcs_) { - req.pid = pc.pid; - ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(pc.hp_primary, req, resp)); + host_port primary; + GET_HOST_PORT(pc, primary, primary); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); ASSERT_EQ("Can't get hotkey now, now state: hotkey_collector_state::STOPPED", resp.err_hint); } @@ -156,13 +162,13 @@ class detect_hotspot_test : public test_util req.pid = dsn::gpid(table_id_, target_partition); dsn::replication::detect_hotkey_response resp; - ASSERT_EQ(dsn::ERR_OK, - ddl_client_->detect_hotkey(pcs_[target_partition].hp_primary, req, resp)); + host_port primary; + GET_HOST_PORT(pcs_[target_partition], primary, primary); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); ASSERT_EQ(dsn::ERR_OK, resp.err); req.action = dsn::replication::detect_action::QUERY; - ASSERT_EQ(dsn::ERR_OK, - ddl_client_->detect_hotkey(pcs_[target_partition].hp_primary, req, resp)); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); ASSERT_EQ("Can't get hotkey now, now state: hotkey_collector_state::COARSE_DETECTING", resp.err_hint); @@ -172,8 +178,7 @@ class detect_hotspot_test : public test_util max_seconds_to_detect_hotkey, detection_type::write_data, key_type::random_dataset)); req.action = dsn::replication::detect_action::QUERY; - ASSERT_EQ(dsn::ERR_OK, - ddl_client_->detect_hotkey(pcs_[target_partition].hp_primary, req, resp)); + ASSERT_EQ(dsn::ERR_OK, ddl_client_->detect_hotkey(primary, req, resp)); ASSERT_EQ("Can't get hotkey now, now state: hotkey_collector_state::STOPPED", resp.err_hint); } diff --git a/src/test/function_test/utils/test_util.cpp b/src/test/function_test/utils/test_util.cpp index 7a41f18822..5e784783ac 100644 --- a/src/test/function_test/utils/test_util.cpp +++ b/src/test/function_test/utils/test_util.cpp @@ -174,8 +174,12 @@ void test_util::wait_table_healthy(const std::string &table_name) const std::vector pcs; ASSERT_EQ(dsn::ERR_OK, ddl_client_->list_app(table_name, table_id, pcount, pcs)); for (const auto &pc : pcs) { - ASSERT_TRUE(pc.primary); - ASSERT_EQ(1 + pc.secondaries.size(), pc.max_replica_count); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + ASSERT_TRUE(primary); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + ASSERT_EQ(1 + secondaries.size(), pc.max_replica_count); } }, 180); diff --git a/src/test/kill_test/kill_testor.cpp b/src/test/kill_test/kill_testor.cpp index 50570bf1b4..d413197ab9 100644 --- a/src/test/kill_test/kill_testor.cpp +++ b/src/test/kill_test/kill_testor.cpp @@ -111,18 +111,22 @@ dsn::error_code kill_testor::get_partition_info(bool debug_unhealthy, LOG_DEBUG("access meta and query partition status success"); for (const auto &pc : pcs) { int replica_count = 0; - if (pc.hp_primary) { + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + if (primary) { replica_count++; } - replica_count += pc.hp_secondaries.size(); + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries, secondaries); + replica_count += secondaries.size(); if (replica_count == pc.max_replica_count) { healthy_partition_cnt++; } else { const auto &info = fmt::format("gpid={}, primary={}, secondaries=[{}], last_committed_decree={}", pc.pid, - pc.hp_primary, - fmt::join(pc.hp_secondaries, ", "), + primary, + fmt::join(secondaries, ", "), pc.last_committed_decree); if (debug_unhealthy) { LOG_INFO("found unhealthy partition, {}", info); diff --git a/src/test/kill_test/partition_kill_testor.cpp b/src/test/kill_test/partition_kill_testor.cpp index d1dad5d3e2..5c9d3fafd5 100644 --- a/src/test/kill_test/partition_kill_testor.cpp +++ b/src/test/kill_test/partition_kill_testor.cpp @@ -29,6 +29,8 @@ #include "dsn.layer2_types.h" #include "partition_kill_testor.h" #include "remote_cmd/remote_command.h" +#include "runtime/rpc/dns_resolver.h" +#include "runtime/rpc/rpc_host_port.h" #include "runtime/task/task.h" #include "test/kill_test/kill_testor.h" #include "utils/autoref_ptr.h" @@ -88,11 +90,14 @@ void partition_kill_testor::run() results[i].second = err.to_string(); } }; - tasks[i] = dsn::dist::cmd::async_call_remote(pc.primary, - "replica.kill_partition", - arguments, - callback, - std::chrono::milliseconds(5000)); + dsn::host_port primary; + GET_HOST_PORT(pc, primary, primary); + tasks[i] = dsn::dist::cmd::async_call_remote( + dsn::dns_resolver::instance().resolve_address(primary), + "replica.kill_partition", + arguments, + callback, + std::chrono::milliseconds(5000)); } for (int i = 0; i < tasks.size(); ++i) {