Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(FQDN): further more refator on idl/dsn.layer2.thrift #2049

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
LambdaBodyIndentation: Signature
Language: Cpp
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
Expand Down
10 changes: 7 additions & 3 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return pc.hp_primary;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
return primary;
}

if (pc.hp_last_drops.empty()) {
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops, last_drops);
if (last_drops.empty()) {
return host_port();
}

return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
Expand Down
56 changes: 40 additions & 16 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const auto &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (!primary) {
continue;
}

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() + 1 < max_replica_count) {
continue;
}

ready_count++;
}
if (ready_count == partition_count) {
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
Expand Down Expand Up @@ -435,11 +445,16 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
replica_count++;
}
replica_count += pc.hp_secondaries.size();
if (pc.hp_primary) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
replica_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand Down Expand Up @@ -573,13 +588,18 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
}

for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (const auto &secondary : pc.hp_secondaries) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
Expand Down Expand Up @@ -766,14 +786,18 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
replica_count++;
node_stat[pc.hp_primary].first++;
node_stat[primary].first++;
total_prim_count++;
}
replica_count += pc.hp_secondaries.size();
total_sec_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
replica_count += secondaries.size();
total_sec_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand All @@ -783,14 +807,14 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
write_unhealthy++;
read_unhealthy++;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
node_stat[secondary].second++;
}
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
tp_details.append_data(primary ? primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
}
mtp.add(std::move(tp_details));

Expand Down
8 changes: 6 additions & 2 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,16 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
rc.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);

if (node == pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (node == primary) {
rc.status = partition_status::PS_PRIMARY;
return true;
}

if (utils::contains(pc.hp_secondaries, node)) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (utils::contains(secondaries, node)) {
rc.status = partition_status::PS_SECONDARY;
return true;
}
Expand Down
30 changes: 23 additions & 7 deletions src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "consensus_types.h"
#include "replica_admin_types.h"
#include "common/replication_enums.h"
#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"

Expand Down Expand Up @@ -78,18 +79,33 @@ inline bool is_member(const partition_configuration &pc, const rpc_address &node
inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
// secondaries no need to be same order
for (const auto &pc1_secondary : pc1.hp_secondaries) {
if (pc1.ballot != pc2.ballot || pc1.pid != pc2.pid ||
pc1.max_replica_count != pc2.max_replica_count ||
pc1.last_committed_decree != pc2.last_committed_decree) {
return false;
}

host_port pc1_primary;
GET_HOST_PORT(pc1, primary, pc1_primary);
host_port pc2_primary;
GET_HOST_PORT(pc2, primary, pc2_primary);
if (pc1_primary != pc2_primary) {
return false;
}

// secondaries no need to be in the same order.
std::vector<host_port> pc1_secondaries;
GET_HOST_PORTS(pc1, secondaries, pc1_secondaries);
for (const auto &pc1_secondary : pc1_secondaries) {
if (!is_secondary(pc2, pc1_secondary)) {
return false;
}
}

std::vector<host_port> pc2_secondaries;
GET_HOST_PORTS(pc2, secondaries, pc2_secondaries);
// last_drops is not considered into equality check
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary &&
pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() &&
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
pc1.last_committed_decree == pc2.last_committed_decree;
return pc1_secondaries.size() == pc2_secondaries.size();
}

class replica_helper
Expand Down
11 changes: 8 additions & 3 deletions src/meta/cluster_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
info.partitions.reserve(app->pcs.size());
for (const auto &pc : app->pcs) {
std::map<host_port, partition_status::type> pstatus_map;
pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY;
if (pc.hp_secondaries.size() != pc.max_replica_count - 1) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
pstatus_map[primary] = partition_status::PS_PRIMARY;

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() != pc.max_replica_count - 1) {
// partition is unhealthy
return false;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
pstatus_map[secondary] = partition_status::PS_SECONDARY;
}
info.partitions.push_back(std::move(pstatus_map));
Expand Down
15 changes: 10 additions & 5 deletions src/meta/load_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "meta_admin_types.h"
#include "rpc/dns_resolver.h" // IWYU pragma: keep
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "utils/command_manager.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
Expand Down Expand Up @@ -172,14 +173,16 @@ generate_balancer_request(const app_mapper &apps,
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE));
break;
case balance_type::COPY_SECONDARY:
case balance_type::COPY_SECONDARY: {
ans = "copy_secondary";
result.balance_type = balancer_request_type::copy_secondary;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
new_proposal_action(primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(new_proposal_action(primary, from, config_type::CT_REMOVE));
break;
}
default:
CHECK(false, "");
}
Expand Down Expand Up @@ -566,7 +569,9 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns)
{
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
const auto &pc = _app->pcs[pid.get_partition_index()];
for (const auto &secondary : pc.hp_secondaries) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
auto i = _host_port_id.find(secondary);
CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary);
_network[node_id][i->second]++;
Expand Down
8 changes: 6 additions & 2 deletions src/meta/meta_bulk_load_ingestion_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio
{
pid = pc.pid;
std::unordered_set<host_port> current_nodes;
current_nodes.insert(pc.hp_primary);
for (const auto &secondary : pc.hp_secondaries) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
current_nodes.insert(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.insert(secondary);
}
for (const auto &node : current_nodes) {
Expand Down
24 changes: 16 additions & 8 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ bool bulk_load_service::check_partition_status(
}

pc = app->pcs[pid.get_partition_index()];
if (!pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (!primary) {
LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
Expand All @@ -382,7 +384,9 @@ bool bulk_load_service::check_partition_status(
return false;
}

if (pc.hp_secondaries.size() < pc.max_replica_count - 1) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() < pc.max_replica_count - 1) {
bulk_load_status::type p_status;
{
zauto_read_lock l(_lock);
Expand Down Expand Up @@ -434,7 +438,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
req->pid = pid;
req->app_name = app_name;
SET_IP_AND_HOST_PORT(*req, primary, pc.primary, pc.hp_primary);
SET_OBJ_IP_AND_HOST_PORT(*req, primary, pc, primary);
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
Expand Down Expand Up @@ -1201,8 +1205,12 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati
}

std::vector<host_port> current_nodes;
current_nodes.emplace_back(pc.hp_primary);
for (const auto &secondary : pc.hp_secondaries) {
dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);
current_nodes.emplace_back(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.emplace_back(secondary);
}

Expand Down Expand Up @@ -1270,13 +1278,13 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

const auto &primary = pc.hp_primary;
ballot meta_ballot = pc.ballot;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}
Expand Down
6 changes: 4 additions & 2 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
CHECK(pc.hp_last_drops.empty(), "last_drops of partition({}) must be empty", pid);
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops, last_drops);
CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid);
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
// hp_last_drops is added in the steps bellow.
if (pc.hp_last_drops.size() + 1 >= max_replica_count) {
break;
}
Expand Down Expand Up @@ -540,7 +543,6 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta
for (int i = 0; i != app_info::partition_count; ++i) {
pcs[i].pid.set_partition_index(i);
}

helpers->on_init_partitions();
}

Expand Down
Loading
Loading