From 75c576b7af5f80081f5d6b8fe51801de941786f6 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Tue, 25 Feb 2025 18:14:35 +0800 Subject: [PATCH 01/18] feat(make_idempotent): support making write requests idempotent for primary replicas --- src/replica/mutation.cpp | 159 +++++++++++++++++++++++---------- src/replica/mutation.h | 21 ++++- src/replica/replica.cpp | 18 +++- src/replica/replica.h | 23 +++++ src/replica/replica_2pc.cpp | 127 ++++++++++++++++++++++++-- src/replica/replica_config.cpp | 4 +- src/replica/replica_context.h | 4 +- src/task/task_spec.cpp | 10 ++- 8 files changed, 300 insertions(+), 66 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index e1c73e8aee..d37e617556 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -341,31 +341,71 @@ void mutation::wait_log_task() const } } -mutation_queue::mutation_queue(gpid gpid, - int max_concurrent_op /*= 2*/, - bool batch_write_disabled /*= false*/) - : _max_concurrent_op(max_concurrent_op), _batch_write_disabled(batch_write_disabled) +mutation_queue::mutation_queue(replica *r, + gpid gpid, + int max_concurrent_op, + bool batch_write_disabled) + : _replica(r),_current_op_count(0), _max_concurrent_op(max_concurrent_op), _batch_write_disabled(batch_write_disabled) { - _current_op_count = 0; - _pending_mutation = nullptr; CHECK_NE_MSG(gpid.get_app_id(), 0, "invalid gpid"); _pcount = dsn_task_queue_virtual_length_ptr(RPC_PREPARE, gpid.thread_hash()); } -mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request, replica *r) +// Once the blocking mutation is set, any mutation would not be popped until all previous +// mutations have been committed and applied into the rocksdb. 
+mutation_ptr mutation_queue::try_unblock() { - task_spec *spec = task_spec::get(code); + CHECK_NOTNULL(_blocking_mutation, ""); - // if not allow write batch, switch work queue - if (_pending_mutation && !spec->rpc_request_is_write_allow_batch) { - _pending_mutation->add_ref(); // released when unlink + // All of the mutations before the blocking mutation must have been in prepare list. + const auto max_prepared_decree = _replica->max_prepared_decree(); + const auto last_applied_decree = _replica->last_applied_decree(); + if (max_prepared_decree > last_applied_decree) { + return {}; + } + + CHECK_EQ(max_prepared_decree, last_applied_decree); + + mutation_ptr mu = _blocking_mutation; + _blocking_mutation = nullptr; + + // + ++_current_op_count; + + return mu; +} + +// Once the popped mutation is found blocking, set it as the blocking mutation. +mutation_ptr mutation_queue::try_block(mutation_ptr &mu) +{ + CHECK_NOTNULL(mu, ""); + + if (!mu->is_blocking) { + ++_current_op_count; + return mu; + } + + CHECK_NULL(_blocking_mutation, ""); + + _blocking_mutation = mu; + return try_unblock(); +} + +mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) +{ + auto *spec = task_spec::get(code); + CHECK_NOTNULL(spec, ""); + + // If batch is not allowed for this write, switch work queue + if (_pending_mutation != nullptr && !spec->rpc_request_is_write_allow_batch) { + _pending_mutation->add_ref(); // Would be released during unlink. 
_hdr.add(_pending_mutation); - _pending_mutation = nullptr; + _pending_mutation.reset(); ++(*_pcount); } - // add to work queue - if (!_pending_mutation) { + // Add to work queue + if (_pending_mutation == nullptr) { _pending_mutation = r->new_mutation(invalid_decree); } @@ -377,10 +417,14 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request, // short-cut if (_current_op_count < _max_concurrent_op && _hdr.is_empty()) { - auto ret = _pending_mutation; - _pending_mutation = nullptr; - _current_op_count++; - return ret; + if (_blocking_mutation != nullptr) { + return try_unblock(); + } + + auto mu = _pending_mutation; + _pending_mutation.reset(); + + return try_block(mu); } // check if need to switch work queue @@ -388,74 +432,94 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request, _pending_mutation->is_full()) { _pending_mutation->add_ref(); // released when unlink _hdr.add(_pending_mutation); - _pending_mutation = nullptr; + _pending_mutation.reset(); ++(*_pcount); } // get next work item - if (_current_op_count >= _max_concurrent_op) - return nullptr; - else if (_hdr.is_empty()) { + if (_current_op_count >= _max_concurrent_op) { + return {}; + } + + if (_blocking_mutation != nullptr) { + return try_unblock(); + } + + // Try to fetch next work. 
+ mutation_ptr mu; + if (_hdr.is_empty()) { CHECK_NOTNULL(_pending_mutation, "pending mutation cannot be null"); - auto ret = _pending_mutation; - _pending_mutation = nullptr; - _current_op_count++; - return ret; + mu = _pending_mutation; + _pending_mutation.reset(); } else { - _current_op_count++; - return unlink_next_workload(); + mu = unlink_next_workload(); } + + return try_block(mu); } mutation_ptr mutation_queue::check_possible_work(int current_running_count) { _current_op_count = current_running_count; - if (_current_op_count >= _max_concurrent_op) - return nullptr; + if (_current_op_count >= _max_concurrent_op) { + return {}; + } + + if (_blocking_mutation != nullptr) { + return try_unblock(); + } - // no further workload + mutation_ptr mu; if (_hdr.is_empty()) { - if (_pending_mutation != nullptr) { - auto ret = _pending_mutation; - _pending_mutation = nullptr; - _current_op_count++; - return ret; - } else { - return nullptr; + // no further workload + if (_pending_mutation == nullptr) { + return {}; } - } - // run further workload - else { - _current_op_count++; - return unlink_next_workload(); + mu = _pending_mutation; + _pending_mutation.reset(); + } else { + // run further workload + mu = unlink_next_workload(); } + + return try_block(mu); } void mutation_queue::clear() { - if (_pending_mutation != nullptr) { - _pending_mutation = nullptr; + if (_blocking_mutation != nullptr) { + _blocking_mutation.reset(); } mutation_ptr r; while ((r = unlink_next_workload()) != nullptr) { } + + if (_pending_mutation != nullptr) { + _pending_mutation.reset(); + } } void mutation_queue::clear(std::vector &queued_mutations) { - mutation_ptr r; queued_mutations.clear(); + + if (_blocking_mutation != nullptr) { + queued_mutations.emplace_back(std::move(_blocking_mutation)); + _blocking_mutation.reset(); + } + + mutation_ptr r; while ((r = unlink_next_workload()) != nullptr) { queued_mutations.emplace_back(r); } if (_pending_mutation != nullptr) { 
queued_mutations.emplace_back(std::move(_pending_mutation)); - _pending_mutation = nullptr; + _pending_mutation.reset(); } // we don't reset the current_op_count, coz this is handled by @@ -463,5 +527,6 @@ void mutation_queue::clear(std::vector &queued_mutations) // is handled by prepare_list // _current_op_count = 0; } + } // namespace replication } // namespace dsn diff --git a/src/replica/mutation.h b/src/replica/mutation.h index b6b8ecd144..fd9d6b4e85 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -153,6 +153,11 @@ class mutation : public ref_counter // user requests std::vector client_requests; + // A mutation is blocking means this mutation would begin to be processed after all of + // the previous mutations in the queue have been committed and applied into the rocksdb + // of the primary replica. + bool is_blocking{false}; + // The original request received from the client. While making an atomic request (incr, // check_and_set and check_and_mutate) idempotent, an extra variable is needed to hold // its original request for the purpose of replying to the client. 
@@ -213,7 +218,7 @@ class replica; class mutation_queue { public: - mutation_queue(gpid gpid, int max_concurrent_op = 2, bool batch_write_disabled = false); + mutation_queue(replica *r, gpid gpid, int max_concurrent_op, bool batch_write_disabled); ~mutation_queue() { @@ -224,7 +229,7 @@ class mutation_queue _current_op_count); } - mutation_ptr add_work(task_code code, dsn::message_ex *request, replica *r); + mutation_ptr add_work(task_code code, dsn::message_ex *request); void clear(); // called when you want to clear the mutation_queue and want to get the remaining messages @@ -235,6 +240,9 @@ class mutation_queue mutation_ptr check_possible_work(int current_running_count); private: + mutation_ptr try_unblock(); + mutation_ptr try_block(mutation_ptr &mu); + mutation_ptr unlink_next_workload() { mutation_ptr r = _hdr.pop_one(); @@ -247,7 +255,8 @@ class mutation_queue void reset_max_concurrent_ops(int max_c) { _max_concurrent_op = max_c; } -private: + replica *_replica; + int _current_op_count; int _max_concurrent_op; bool _batch_write_disabled; @@ -255,6 +264,12 @@ class mutation_queue volatile int *_pcount; mutation_ptr _pending_mutation; slist _hdr; + + // Once a mutation that would get popped is blocking, it should firstly be put in + // `_blocking_mutation`; then, the queue would always return nullptr until previous + // mutations have been committed and applied into the rocksdb of primary replica. 
+ mutation_ptr _blocking_mutation; }; + } // namespace replication } // namespace dsn diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 4175bb8168..68dea6bf31 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -275,7 +275,7 @@ replica::replica(replica_stub *stub, : serverlet(replication_options::kReplicaAppType.c_str()), replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_host_port_cache), app.app_name), _app_info(app), - _primary_states(gpid, FLAGS_staleness_for_commit, FLAGS_batch_write_disabled), + _primary_states(this, gpid, FLAGS_staleness_for_commit, FLAGS_batch_write_disabled), _potential_secondary_states(this), _chkpt_total_size(0), _cur_download_size(0), @@ -561,13 +561,27 @@ void replica::execute_mutation(mutation_ptr &mu) ADD_CUSTOM_POINT(mu->_tracer, "completed"); auto next = _primary_states.write_queue.check_possible_work( - static_cast(_prepare_list->max_decree() - d)); + static_cast(max_prepared_decree() - d)); if (next != nullptr) { init_prepare(next, false); } } +mutation_ptr replica::new_mutation(decree decree, dsn::message_ex *original_request) +{ + auto mu = new_mutation(decree); + mu->original_request = original_request; + return mu; +} + +mutation_ptr replica::new_mutation(decree decree, bool is_blocking) +{ + auto mu = new_mutation(decree); + mu->is_blocking = is_blocking; + return mu; +} + mutation_ptr replica::new_mutation(decree decree) { mutation_ptr mu(new mutation()); diff --git a/src/replica/replica.h b/src/replica/replica.h index b312865c5e..3d537509b8 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -356,6 +356,16 @@ class replica : public serverlet, public ref_counter, public replica_ba void response_client_read(dsn::message_ex *request, error_code error); void response_client_write(dsn::message_ex *request, error_code error); void execute_mutation(mutation_ptr &mu); + + // Create a new mutation with the non-idempotent original request, which is used to reply + // to 
the client. + mutation_ptr new_mutation(decree decree, dsn::message_ex *original_request); + + // Create a new mutation marked as blocking, which means this mutation would begin to be + // processed after all of the previous mutations in the queue have been committed and applied + // into the rocksdb of primary replica. + mutation_ptr new_mutation(decree decree, bool is_blocking); + mutation_ptr new_mutation(decree decree); // initialization @@ -372,6 +382,19 @@ class replica : public serverlet, public ref_counter, public replica_ba ///////////////////////////////////////////////////////////////// // 2pc + + // - spec: should never be NULL (otherwise the behaviour is undefined). + bool need_reject_non_idempotent(dsn::message_ex *request, task_spec *spec); + + // Decide if it is needed to make the request idempotent. + // - spec: should never be NULL (otherwise the behaviour is undefined). + bool need_make_idempotent(task_spec *spec); + bool need_make_idempotent(dsn::message_ex *request, task_spec *spec); + bool need_make_idempotent(dsn::message_ex *request); + + // Make the request in the mutation idempotent, if needed. 
+ int make_idempotent(mutation_ptr &mu); + // `pop_all_committed_mutations = true` will be used for ingestion empty write // See more about it in `replica_bulk_loader.cpp` void diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index f0c108b7fe..6cbc3af9f3 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -84,6 +84,13 @@ DSN_DEFINE_bool(replication, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); +DSN_DEFINE_bool(replication, + make_write_idempotent, + true, + "Whether to make atomic writes idempotent, including incr, check_and_set " + "and check_and_mutate"); +DSN_TAG_VARIABLE(make_write_idempotent, FT_MUTABLE); + DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 3000, @@ -146,9 +153,18 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - task_spec *spec = task_spec::get(request->rpc_code()); - if (dsn_unlikely(nullptr == spec || request->rpc_code() == TASK_CODE_INVALID)) { - LOG_ERROR("recv message with unhandled rpc name {} from {}, trace_id = {}", + if (dsn_unlikely(request->rpc_code() == TASK_CODE_INVALID)) { + LOG_ERROR("recv message with invalid RPC code {} from {}, trace_id = {}", + request->rpc_code(), + request->header->from_address, + request->header->trace_id); + response_client_write(request, ERR_INVALID_PARAMETERS); + return; + } + + auto *spec = task_spec::get(request->rpc_code()); + if (dsn_unlikely(spec == nullptr)) { + LOG_ERROR("recv message with unhandled RPC code {} from {}, trace_id = {}", request->rpc_code(), request->header->from_address, request->header->trace_id); @@ -156,7 +172,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) { + if (need_reject_non_idempotent(spec)) { // Ignore non-idempotent write, because duplication provides 
no guarantee of atomicity to // make this write produce the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); @@ -220,16 +236,108 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address); - auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this); + auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request); if (mu != nullptr) { init_prepare(mu, false); } } +bool replica::need_reject_non_idempotent(task_spec *spec) +{ + if (!is_duplication_master()) { + return false; + } + + if (FLAGS_make_write_idempotent) { + return false; + } + + return !spec->rpc_request_is_write_idempotent; +} + +bool replica::need_make_idempotent(task_spec *spec) +{ + if (!FLAGS_make_write_idempotent) { + return false; + } + + return !spec->rpc_request_is_write_idempotent; +} + +bool replica::need_make_idempotent(message_ex *request, task_spec *spec) +{ + if (request == nullptr) { + return false; + } + + return need_make_idempotent(spec); +} + +bool replica::need_make_idempotent(dsn::message_ex *request) +{ + if (request == nullptr) { + return false; + } + + auto *spec = task_spec::get(request->rpc_code()); + CHECK_NOTNULL(spec, "RPC code {} not found", request->rpc_code()); + + return need_make_idempotent(spec); +} + +int replica::make_idempotent(mutation_ptr &mu) +{ + CHECK_TRUE(!mu->client_requests.empty()); + + message_ex *request = mu->client_requests.front(); + if (!need_make_idempotent(request)) { + return rocksdb::Status::kOk; + } + + // The original atomic write request must not be batched. 
+ CHECK_EQ(mu->client_requests.size(), 1); + + dsn::message_ex *new_request = nullptr; + const int err = _app->make_idempotent(request, &new_request); + if (dsn_unlikely(err != rocksdb::Status::kOk)) { + // Once error occurred, the response would be returned to the client during + // _app->make_idempotent(). + return err; + } + + CHECK_NOTNULL(new_request, + "new_request should not be null since its original write request must be atomic"); + + // During make_idempotent(), request has been deserialized (i.e. unmarshall() in + // rpc_holder::internal). Once deserialize it again, assertion would fail for + // set_read_msg() in the constructor of rpc_read_stream. + // + // To deserialize it again for writting rocksdb, restore read for it. + request->restore_read(); + + // The decree must have not been assigned. + CHECK_EQ(mu->get_decree(), invalid_decree); + + // Create a new mutation to hold the new idempotent request. The old mutation that hold the + // non-idempotent requests would be released automatically. + // + // No need to create the mutation with is_blocking set to true, since the old mutation has + // been previously popped out from the mutation queue. + mu = new_mutation(invalid_decree, request); + mu->add_client_request(new_request->rpc_code(), new_request); + return rocksdb::Status::kOk; +} + void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations) { CHECK_EQ(partition_status::PS_PRIMARY, status()); + if (make_idempotent(mu) != rocksdb::Status::kOk) { + // Once error occurred, the response must have been returned to the client during + // make_idempotent(). 
+ return; + } + mu->_tracer->set_description("primary"); ADD_POINT(mu->_tracer); @@ -243,8 +351,9 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c mu->set_id(get_ballot(), _prepare_list->max_decree() + 1); // print a debug log if necessary if (FLAGS_prepare_decree_gap_for_debug_logging > 0 && - mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0) + mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0) { level = LOG_LEVEL_INFO; + } mu->set_timestamp(_uniq_timestamp_us.next()); } else { mu->set_id(get_ballot(), mu->data.header.decree); @@ -351,6 +460,12 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c return; ErrOut: + if (mu->original_request != nullptr) { + // Respond to the original atomic request. And it would never be batched. + response_client_write(mu->original_request, err); + return; + } + for (auto &r : mu->client_requests) { response_client_write(r, err); } diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 8fc73eec71..3c1d2ccc53 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -1040,8 +1040,8 @@ bool replica::update_local_configuration(const replica_configuration &config, // start pending mutations if necessary if (status() == partition_status::PS_PRIMARY) { mutation_ptr next = _primary_states.write_queue.check_possible_work( - static_cast(_prepare_list->max_decree() - last_committed_decree())); - if (next) { + static_cast(max_prepared_decree() - last_committed_decree())); + if (next != nullptr) { init_prepare(next, false); } diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h index c1719c8de9..75d9c0dfcf 100644 --- a/src/replica/replica_context.h +++ b/src/replica/replica_context.h @@ -89,9 +89,9 @@ typedef std::unordered_map<::dsn::host_port, remote_learner_state> learner_map; class primary_context { public: - primary_context(gpid gpid, int max_concurrent_2pc_count = 
1, bool batch_write_disabled = false) + primary_context(replica *r,gpid gpid, int max_concurrent_2pc_count, bool batch_write_disabled) : next_learning_version(0), - write_queue(gpid, max_concurrent_2pc_count, batch_write_disabled), + write_queue(r, gpid, max_concurrent_2pc_count, batch_write_disabled), last_prepare_decree_on_new_primary(0), last_prepare_ts_ms(dsn_now_ms()) { diff --git a/src/task/task_spec.cpp b/src/task/task_spec.cpp index ff38b1fbdc..5d07cce692 100644 --- a/src/task/task_spec.cpp +++ b/src/task/task_spec.cpp @@ -59,7 +59,7 @@ void task_spec::register_task_code(task_code code, { CHECK_GE(code, 0); CHECK_LT(code, TASK_SPEC_STORE_CAPACITY); - if (!s_task_spec_store[code]) { + if (s_task_spec_store[code] == nullptr) { s_task_spec_store[code] = std::make_unique(code, code.to_string(), type, pri, pool); auto &spec = s_task_spec_store[code]; @@ -75,8 +75,11 @@ void task_spec::register_task_code(task_code code, spec->rpc_paired_code = ack_code; task_spec::get(ack_code.code())->rpc_paired_code = code; } - } else { - auto spec = task_spec::get(code); + + return; + } + + auto *spec = task_spec::get(code); CHECK_EQ_MSG( spec->type, type, @@ -102,7 +105,6 @@ void task_spec::register_task_code(task_code code, } spec->pool_code = pool; } - } } void task_spec::register_storage_task_code(task_code code, From 0416f4c528227a0de8db58831d7acf7b430bd029 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Tue, 25 Feb 2025 18:34:32 +0800 Subject: [PATCH 02/18] format and fix compilation --- src/replica/mutation.cpp | 48 +++++++++++++++++--------------- src/replica/mutation.h | 4 +-- src/replica/replica.h | 2 +- src/replica/replica_2pc.cpp | 2 +- src/replica/replica_context.h | 2 +- src/task/task_spec.cpp | 52 +++++++++++++++++------------------ 6 files changed, 57 insertions(+), 53 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index d37e617556..4ed4451646 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -345,7 +345,10 @@ 
mutation_queue::mutation_queue(replica *r, gpid gpid, int max_concurrent_op, bool batch_write_disabled) - : _replica(r),_current_op_count(0), _max_concurrent_op(max_concurrent_op), _batch_write_disabled(batch_write_disabled) + : _replica(r), + _current_op_count(0), + _max_concurrent_op(max_concurrent_op), + _batch_write_disabled(batch_write_disabled) { CHECK_NE_MSG(gpid.get_app_id(), 0, "invalid gpid"); _pcount = dsn_task_queue_virtual_length_ptr(RPC_PREPARE, gpid.thread_hash()); @@ -353,41 +356,41 @@ mutation_queue::mutation_queue(replica *r, // Once the blocking mutation is set, any mutation would not be popped until all previous // mutations have been committed and applied into the rocksdb. -mutation_ptr mutation_queue::try_unblock() +mutation_ptr mutation_queue::try_unblock() { - CHECK_NOTNULL(_blocking_mutation, ""); + CHECK_NOTNULL(_blocking_mutation, ""); // All of the mutations before the blocking mutation must have been in prepare list. - const auto max_prepared_decree = _replica->max_prepared_decree(); - const auto last_applied_decree = _replica->last_applied_decree(); - if (max_prepared_decree > last_applied_decree) { + const auto max_prepared_decree = _replica->max_prepared_decree(); + const auto last_applied_decree = _replica->last_applied_decree(); + if (max_prepared_decree > last_applied_decree) { return {}; - } + } - CHECK_EQ(max_prepared_decree, last_applied_decree); + CHECK_EQ(max_prepared_decree, last_applied_decree); - mutation_ptr mu = _blocking_mutation; - _blocking_mutation = nullptr; + mutation_ptr mu = _blocking_mutation; + _blocking_mutation = nullptr; - // - ++_current_op_count; + // + ++_current_op_count; - return mu; + return mu; } // Once the popped mutation is found blocking, set it as the blocking mutation. 
-mutation_ptr mutation_queue::try_block(mutation_ptr &mu) +mutation_ptr mutation_queue::try_block(mutation_ptr &mu) { - CHECK_NOTNULL(mu, ""); + CHECK_NOTNULL(mu, ""); - if (!mu->is_blocking) { - ++_current_op_count; - return mu; - } + if (!mu->is_blocking) { + ++_current_op_count; + return mu; + } - CHECK_NULL(_blocking_mutation, ""); + CHECK_NULL(_blocking_mutation, ""); - _blocking_mutation = mu; + _blocking_mutation = mu; return try_unblock(); } @@ -406,7 +409,8 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) // Add to work queue if (_pending_mutation == nullptr) { - _pending_mutation = r->new_mutation(invalid_decree); + _pending_mutation = + _replica->new_mutation(invalid_decree, _replica->need_make_idempotent(request)); } LOG_DEBUG("add request with trace_id = {:#018x} into mutation with mutation_tid = {}", diff --git a/src/replica/mutation.h b/src/replica/mutation.h index fd9d6b4e85..a021e80f6a 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -240,8 +240,8 @@ class mutation_queue mutation_ptr check_possible_work(int current_running_count); private: - mutation_ptr try_unblock(); - mutation_ptr try_block(mutation_ptr &mu); + mutation_ptr try_unblock(); + mutation_ptr try_block(mutation_ptr &mu); mutation_ptr unlink_next_workload() { diff --git a/src/replica/replica.h b/src/replica/replica.h index 3d537509b8..9b4607f003 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -384,7 +384,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // 2pc // - spec: should never be NULL (otherwise the behaviour is undefined). - bool need_reject_non_idempotent(dsn::message_ex *request, task_spec *spec); + bool need_reject_non_idempotent(task_spec *spec); // Decide if it is needed to make the request idempotent. // - spec: should never be NULL (otherwise the behaviour is undefined). 
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 6cbc3af9f3..3d4c187d49 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -306,7 +306,7 @@ int replica::make_idempotent(mutation_ptr &mu) } CHECK_NOTNULL(new_request, - "new_request should not be null since its original write request must be atomic"); + "new_request should not be null since its original write request must be atomic"); // During make_idempotent(), request has been deserialized (i.e. unmarshall() in // rpc_holder::internal). Once deserialize it again, assertion would fail for diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h index 75d9c0dfcf..69a419ca44 100644 --- a/src/replica/replica_context.h +++ b/src/replica/replica_context.h @@ -89,7 +89,7 @@ typedef std::unordered_map<::dsn::host_port, remote_learner_state> learner_map; class primary_context { public: - primary_context(replica *r,gpid gpid, int max_concurrent_2pc_count, bool batch_write_disabled) + primary_context(replica *r, gpid gpid, int max_concurrent_2pc_count, bool batch_write_disabled) : next_learning_version(0), write_queue(r, gpid, max_concurrent_2pc_count, batch_write_disabled), last_prepare_decree_on_new_primary(0), diff --git a/src/task/task_spec.cpp b/src/task/task_spec.cpp index 5d07cce692..9ed57941da 100644 --- a/src/task/task_spec.cpp +++ b/src/task/task_spec.cpp @@ -77,34 +77,34 @@ void task_spec::register_task_code(task_code code, } return; - } - - auto *spec = task_spec::get(code); - CHECK_EQ_MSG( - spec->type, - type, - "task code {} registerd for {}, which does not match with previously registered {}", - code, - enum_to_string(type), - enum_to_string(spec->type)); - - if (spec->priority != pri) { - LOG_WARNING("overwrite priority for task {} from {} to {}", - code, - enum_to_string(spec->priority), - enum_to_string(pri)); - spec->priority = pri; - } + } - if (spec->pool_code != pool) { - if (spec->pool_code != THREAD_POOL_INVALID) { - 
LOG_WARNING("overwrite default thread pool for task {} from {} to {}", - code, - spec->pool_code, - pool); - } - spec->pool_code = pool; + auto *spec = task_spec::get(code); + CHECK_EQ_MSG( + spec->type, + type, + "task code {} registerd for {}, which does not match with previously registered {}", + code, + enum_to_string(type), + enum_to_string(spec->type)); + + if (spec->priority != pri) { + LOG_WARNING("overwrite priority for task {} from {} to {}", + code, + enum_to_string(spec->priority), + enum_to_string(pri)); + spec->priority = pri; + } + + if (spec->pool_code != pool) { + if (spec->pool_code != THREAD_POOL_INVALID) { + LOG_WARNING("overwrite default thread pool for task {} from {} to {}", + code, + spec->pool_code, + pool); } + spec->pool_code = pool; + } } void task_spec::register_storage_task_code(task_code code, From 33cd80873d835322587bb3a12fcb174b0eeddc49 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 16:02:33 +0800 Subject: [PATCH 03/18] fix clang-tidy --- src/replica/mutation.cpp | 52 ++++++++++++++++++++----------------- src/replica/mutation.h | 34 ++++++++++++++---------- src/replica/replica.cpp | 1 + src/replica/replica.h | 14 ++++++---- src/replica/replica_2pc.cpp | 30 +++++++-------------- 5 files changed, 68 insertions(+), 63 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 4ed4451646..2053790169 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -52,23 +52,22 @@ DSN_DEFINE_uint64( "Latency trace will be logged when exceed the write latency threshold, in nanoseconds"); DSN_TAG_VARIABLE(abnormal_write_trace_latency_threshold, FT_MUTABLE); -namespace dsn { -namespace replication { +namespace dsn::replication { + std::atomic mutation::s_tid(0); mutation::mutation() + : _tracer(std::make_shared( + false, "mutation", FLAGS_abnormal_write_trace_latency_threshold)), + _private0(0), + _prepare_ts_ms(0), + _appro_data_bytes(sizeof(mutation_header)), + _create_ts_ns(dsn_now_ns()), + 
_tid(++s_tid), + _is_sync_to_child(false) { - next = nullptr; - _private0 = 0; _not_logged = 1; - _prepare_ts_ms = 0; strcpy(_name, "0.0.0.0"); - _appro_data_bytes = sizeof(mutation_header); - _create_ts_ns = dsn_now_ns(); - _tid = ++s_tid; - _is_sync_to_child = false; - _tracer = std::make_shared( - false, "mutation", FLAGS_abnormal_write_trace_latency_threshold); } mutation_ptr mutation::copy_no_reply(const mutation_ptr &old_mu) @@ -180,6 +179,12 @@ void mutation::add_client_request(task_code code, dsn::message_ex *request) CHECK_EQ(client_requests.size(), data.updates.size()); } +void mutation::add_client_request(dsn::message_ex *request) +{ + CHECK_NOTNULL(request, ""); + add_client_request(request->rpc_code(), request); +} + void mutation::write_to(const std::function &inserter) const { binary_writer writer(1024); @@ -394,15 +399,16 @@ mutation_ptr mutation_queue::try_block(mutation_ptr &mu) return try_unblock(); } -mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) +mutation_ptr mutation_queue::add_work(message_ex *request) { - auto *spec = task_spec::get(code); + CHECK_NOTNULL(request, ""); + + auto *spec = task_spec::get(request->rpc_code()); CHECK_NOTNULL(spec, ""); // If batch is not allowed for this write, switch work queue if (_pending_mutation != nullptr && !spec->rpc_request_is_write_allow_batch) { - _pending_mutation->add_ref(); // Would be released during unlink. 
- _hdr.add(_pending_mutation); + _queue.push(_pending_mutation); _pending_mutation.reset(); ++(*_pcount); } @@ -410,17 +416,17 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) // Add to work queue if (_pending_mutation == nullptr) { _pending_mutation = - _replica->new_mutation(invalid_decree, _replica->need_make_idempotent(request)); + _replica->new_mutation(invalid_decree, _replica->need_make_idempotent(spec)); } LOG_DEBUG("add request with trace_id = {:#018x} into mutation with mutation_tid = {}", request->header->trace_id, _pending_mutation->tid()); - _pending_mutation->add_client_request(code, request); + _pending_mutation->add_client_request(request); // short-cut - if (_current_op_count < _max_concurrent_op && _hdr.is_empty()) { + if (_current_op_count < _max_concurrent_op && _queue.empty()) { if (_blocking_mutation != nullptr) { return try_unblock(); } @@ -434,8 +440,7 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) // check if need to switch work queue if (_batch_write_disabled || !spec->rpc_request_is_write_allow_batch || _pending_mutation->is_full()) { - _pending_mutation->add_ref(); // released when unlink - _hdr.add(_pending_mutation); + _queue.push(_pending_mutation); _pending_mutation.reset(); ++(*_pcount); } @@ -451,7 +456,7 @@ mutation_ptr mutation_queue::add_work(task_code code, dsn::message_ex *request) // Try to fetch next work. 
mutation_ptr mu; - if (_hdr.is_empty()) { + if (_queue.empty()) { CHECK_NOTNULL(_pending_mutation, "pending mutation cannot be null"); mu = _pending_mutation; @@ -476,7 +481,7 @@ mutation_ptr mutation_queue::check_possible_work(int current_running_count) } mutation_ptr mu; - if (_hdr.is_empty()) { + if (_queue.empty()) { // no further workload if (_pending_mutation == nullptr) { return {}; @@ -532,5 +537,4 @@ void mutation_queue::clear(std::vector &queued_mutations) // _current_op_count = 0; } -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/replica/mutation.h b/src/replica/mutation.h index a021e80f6a..b919076d81 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include "common/replication_common.h" @@ -41,7 +42,6 @@ #include "task/task_code.h" #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" -#include "utils/link.h" namespace dsn { class binary_reader; @@ -106,6 +106,7 @@ class mutation : public ref_counter void set_id(ballot b, decree c); void set_timestamp(int64_t timestamp) { data.header.timestamp = timestamp; } void add_client_request(task_code code, dsn::message_ex *request); + void add_client_request(dsn::message_ex *request); void copy_from(mutation_ptr &old); void set_logged() { @@ -163,9 +164,6 @@ class mutation : public ref_counter // its original request for the purpose of replying to the client. dsn::message_ptr original_request; - // used by pending mutation queue only - mutation *next; - std::shared_ptr _tracer; void set_is_sync_to_child(bool sync_to_child) { _is_sync_to_child = sync_to_child; } @@ -207,9 +205,11 @@ class replica; // mutation queue are queues for mutations waiting to send. // more precisely: for client requests waiting to send. -// mutations are queued as "_hdr + _pending_mutation". that is to say, _hdr.first is the first +// mutations are queued as "_queue + _pending_mutation". 
that is to say, _queue.first is the first // element in the queue, and pending_mutations is the last. // +// However, once _blocking_mutation is non-null, it is the first element. +// // we keep 2 structure "hdr" and "pending_mutation" coz: // 1. as a container of client requests, capacity of a mutation is limited, so incoming client // requets should be packed into different mutations @@ -223,13 +223,18 @@ class mutation_queue ~mutation_queue() { clear(); - CHECK(_hdr.is_empty(), + CHECK(_queue.empty(), "work queue is deleted when there are still {} running ops or pending work items " "in queue", _current_op_count); } - mutation_ptr add_work(task_code code, dsn::message_ex *request); + mutation_queue(const mutation_queue &) = default; + mutation_queue &operator=(const mutation_queue &) = default; + mutation_queue(mutation_queue &&) = default; + mutation_queue &operator=(mutation_queue &&) = default; + + mutation_ptr add_work(dsn::message_ex *request); void clear(); // called when you want to clear the mutation_queue and want to get the remaining messages @@ -245,12 +250,15 @@ class mutation_queue mutation_ptr unlink_next_workload() { - mutation_ptr r = _hdr.pop_one(); - if (r.get() != nullptr) { - r->release_ref(); // added in add_work - --(*_pcount); + if (_queue.empty()) { + return {}; } - return r; + + const auto work = _queue.front(); + _queue.pop(); + --(*_pcount); + + return work; } void reset_max_concurrent_ops(int max_c) { _max_concurrent_op = max_c; } @@ -263,7 +271,7 @@ class mutation_queue volatile int *_pcount; mutation_ptr _pending_mutation; - slist _hdr; + std::queue _queue; // Once a mutation that would get popped is blocking, it should firstly be put in // `_blocking_mutation`; then, the queue would always return nullptr until previous diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 68dea6bf31..55277d0bbb 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -275,6 +275,7 @@ replica::replica(replica_stub *stub, 
: serverlet(replication_options::kReplicaAppType.c_str()), replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_host_port_cache), app.app_name), _app_info(app), + _make_write_idempotent(false), _primary_states(this, gpid, FLAGS_staleness_for_commit, FLAGS_batch_write_disabled), _potential_secondary_states(this), _chkpt_total_size(0), diff --git a/src/replica/replica.h b/src/replica/replica.h index 9b4607f003..f8dfbdb975 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -163,7 +163,7 @@ struct deny_client class replica : public serverlet, public ref_counter, public replica_base { public: - ~replica(void); + ~replica(); // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); @@ -384,13 +384,14 @@ class replica : public serverlet, public ref_counter, public replica_ba // 2pc // - spec: should never be NULL (otherwise the behaviour is undefined). - bool need_reject_non_idempotent(task_spec *spec); + bool need_reject_non_idempotent(task_spec *spec) const; // Decide if it is needed to make the request idempotent. // - spec: should never be NULL (otherwise the behaviour is undefined). - bool need_make_idempotent(task_spec *spec); - bool need_make_idempotent(dsn::message_ex *request, task_spec *spec); - bool need_make_idempotent(dsn::message_ex *request); + bool need_make_idempotent(task_spec *spec) const; + + // Decide if it is needed to make the request idempotent. + bool need_make_idempotent(message_ex *request) const; // Make the request in the mutation idempotent, if needed. int make_idempotent(mutation_ptr &mu); @@ -667,6 +668,9 @@ class replica : public serverlet, public ref_counter, public replica_ba app_info _app_info; std::map _extra_envs; + // TODO(wangdan): temporarily used to record, would support soon. + bool _make_write_idempotent; + // uniq timestamp generator for this replica. 
// // we use it to generate an increasing timestamp for current replica diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 3d4c187d49..cb5014be23 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -84,13 +84,6 @@ DSN_DEFINE_bool(replication, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); -DSN_DEFINE_bool(replication, - make_write_idempotent, - true, - "Whether to make atomic writes idempotent, including incr, check_and_set " - "and check_and_mutate"); -DSN_TAG_VARIABLE(make_write_idempotent, FT_MUTABLE); - DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 3000, @@ -236,53 +229,48 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address); - auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request); + auto mu = _primary_states.write_queue.add_work(request); if (mu != nullptr) { init_prepare(mu, false); } } -bool replica::need_reject_non_idempotent(task_spec *spec) +bool replica::need_reject_non_idempotent(task_spec *spec) const { if (!is_duplication_master()) { return false; } - if (FLAGS_make_write_idempotent) { + if (_make_write_idempotent) { return false; } return !spec->rpc_request_is_write_idempotent; } -bool replica::need_make_idempotent(task_spec *spec) +bool replica::need_make_idempotent(task_spec *spec) const { - if (!FLAGS_make_write_idempotent) { + if (!_make_write_idempotent) { return false; } return !spec->rpc_request_is_write_idempotent; } -bool replica::need_make_idempotent(message_ex *request, task_spec *spec) +bool replica::need_make_idempotent(message_ex *request) const { if (request == nullptr) { return false; } - return need_make_idempotent(spec); -} - -bool replica::need_make_idempotent(dsn::message_ex *request) -{ - if (request == nullptr) { + if 
(!_make_write_idempotent) { return false; } auto *spec = task_spec::get(request->rpc_code()); CHECK_NOTNULL(spec, "RPC code {} not found", request->rpc_code()); - return need_make_idempotent(spec); + return !spec->rpc_request_is_write_idempotent; } int replica::make_idempotent(mutation_ptr &mu) @@ -324,7 +312,7 @@ int replica::make_idempotent(mutation_ptr &mu) // No need to create the mutation with is_blocking set to true, since the old mutation has // been previously popped out from the mutation queue. mu = new_mutation(invalid_decree, request); - mu->add_client_request(new_request->rpc_code(), new_request); + mu->add_client_request(new_request); return rocksdb::Status::kOk; } From bbaf6988af53dd1152862488d7c6a21928fab8b8 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 18:23:18 +0800 Subject: [PATCH 04/18] fix clang-tidy --- src/replica/mutation.cpp | 8 ++++++-- src/replica/replica.cpp | 2 +- src/replica/replica.h | 5 +++-- src/replica/replica_2pc.cpp | 5 +++-- src/replica/replica_base.h | 2 ++ src/replica/replica_context.h | 1 - src/runtime/serverlet.h | 7 +------ src/task/task_spec.cpp | 6 +++--- 8 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 2053790169..22b743e61b 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -59,14 +59,18 @@ std::atomic mutation::s_tid(0); mutation::mutation() : _tracer(std::make_shared( false, "mutation", FLAGS_abnormal_write_trace_latency_threshold)), - _private0(0), + _not_logged(1), + _left_secondary_ack_count(0), + _left_potential_secondary_ack_count(0), + _wait_child(0), + _is_error_acked(0), _prepare_ts_ms(0), + _name{0}, _appro_data_bytes(sizeof(mutation_header)), _create_ts_ns(dsn_now_ns()), _tid(++s_tid), _is_sync_to_child(false) { - _not_logged = 1; strcpy(_name, "0.0.0.0"); } diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 55277d0bbb..58ac141d4a 100644 --- a/src/replica/replica.cpp +++ 
b/src/replica/replica.cpp @@ -378,7 +378,7 @@ void replica::init_state() get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind); } -replica::~replica(void) +replica::~replica() { close(); _prepare_list = nullptr; diff --git a/src/replica/replica.h b/src/replica/replica.h index f8dfbdb975..92db326407 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -71,6 +71,7 @@ class rocksdb_wrapper_test; namespace dsn { class gpid; class host_port; +class task_spec; namespace dist { @@ -86,7 +87,6 @@ namespace replication { class backup_request; class backup_response; - class configuration_restore_request; class detect_hotkey_request; class detect_hotkey_response; @@ -107,6 +107,7 @@ class replica_stub; class replication_app_base; class replication_options; struct dir_node; + typedef dsn::ref_ptr cold_backup_context_ptr; namespace test { @@ -163,7 +164,7 @@ struct deny_client class replica : public serverlet, public ref_counter, public replica_base { public: - ~replica(); + ~replica() override; // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index cb5014be23..e0e323a2b3 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -26,9 +26,10 @@ #include #include -#include +#include #include #include +#include #include #include #include @@ -342,7 +343,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c mu->get_decree() % FLAGS_prepare_decree_gap_for_debug_logging == 0) { level = LOG_LEVEL_INFO; } - mu->set_timestamp(_uniq_timestamp_us.next()); + mu->set_timestamp(static_cast(_uniq_timestamp_us.next())); } else { mu->set_id(get_ballot(), mu->data.header.decree); } diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index 79583c13f1..b0150a6757 100644 --- a/src/replica/replica_base.h +++ b/src/replica/replica_base.h 
@@ -46,6 +46,8 @@ struct replica_base { } + virtual ~replica_base() = default; + gpid get_gpid() const { return _gpid; } const char *replica_name() const { return _name.c_str(); } diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h index 69a419ca44..e40cf83dd0 100644 --- a/src/replica/replica_context.h +++ b/src/replica/replica_context.h @@ -118,7 +118,6 @@ class primary_context bool secondary_disk_abnormal() const; -public: // membership mgr, including learners partition_configuration pc; node_statuses statuses; diff --git a/src/runtime/serverlet.h b/src/runtime/serverlet.h index e5a7df0b38..03541ae075 100644 --- a/src/runtime/serverlet.h +++ b/src/runtime/serverlet.h @@ -108,7 +108,7 @@ class serverlet { public: explicit serverlet(const char *nm); - virtual ~serverlet(); + virtual ~serverlet() = default; protected: template @@ -153,11 +153,6 @@ inline serverlet::serverlet(const char *nm) : _name(nm) { } -template -inline serverlet::~serverlet() -{ -} - template template inline bool serverlet::register_rpc_handler(task_code rpc_code, diff --git a/src/task/task_spec.cpp b/src/task/task_spec.cpp index 9ed57941da..1503e52e2a 100644 --- a/src/task/task_spec.cpp +++ b/src/task/task_spec.cpp @@ -59,10 +59,10 @@ void task_spec::register_task_code(task_code code, { CHECK_GE(code, 0); CHECK_LT(code, TASK_SPEC_STORE_CAPACITY); - if (s_task_spec_store[code] == nullptr) { - s_task_spec_store[code] = + if (s_task_spec_store.at(code) == nullptr) { + s_task_spec_store.at(code) = std::make_unique(code, code.to_string(), type, pri, pool); - auto &spec = s_task_spec_store[code]; + auto &spec = s_task_spec_store.at(code); if (type == TASK_TYPE_RPC_REQUEST) { std::string ack_name = std::string(code.to_string()) + std::string("_ACK"); From 48c6caa9aafb76e0fac65eb4fd68991c8c3793bf Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 18:59:24 +0800 Subject: [PATCH 05/18] fix clang-tidy --- src/replica/mutation.h | 10 ++++----- src/replica/replica.h | 21 
++++++++++++------- src/replica/replica_2pc.cpp | 2 +- src/replica/replica_base.h | 4 ++-- src/replica/split/test/replica_split_test.cpp | 6 +----- 5 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/replica/mutation.h b/src/replica/mutation.h index b919076d81..de0baaad79 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -57,7 +57,7 @@ namespace replication { class mutation; -typedef dsn::ref_ptr mutation_ptr; +using mutation_ptr = dsn::ref_ptr; // mutation is the 2pc unit of PacificA, which wraps one or more client requests and add // header informations related to PacificA algorithm for them. @@ -229,10 +229,10 @@ class mutation_queue _current_op_count); } - mutation_queue(const mutation_queue &) = default; - mutation_queue &operator=(const mutation_queue &) = default; - mutation_queue(mutation_queue &&) = default; - mutation_queue &operator=(mutation_queue &&) = default; + mutation_queue(const mutation_queue &) = delete; + mutation_queue &operator=(const mutation_queue &) = delete; + mutation_queue(mutation_queue &&) = delete; + mutation_queue &operator=(mutation_queue &&) = delete; mutation_ptr add_work(dsn::message_ex *request); diff --git a/src/replica/replica.h b/src/replica/replica.h index 92db326407..28fd03ed5d 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -73,16 +73,14 @@ class gpid; class host_port; class task_spec; -namespace dist { - -namespace block_service { +namespace dist::block_service { class block_filesystem; -} // namespace block_service -} // namespace dist +} // namespace dist::block_service namespace security { class access_controller; } // namespace security + namespace replication { class backup_request; @@ -108,11 +106,11 @@ class replication_app_base; class replication_options; struct dir_node; -typedef dsn::ref_ptr cold_backup_context_ptr; +using cold_backup_context_ptr = dsn::ref_ptr; namespace test { class test_checker; -} +} // namespace test #define 
CHECK_REQUEST_IF_SPLITTING(op_type) \ do { \ @@ -166,6 +164,11 @@ class replica : public serverlet, public ref_counter, public replica_ba public: ~replica() override; + replica(const replica &) = delete; + replica &operator=(const replica &) = delete; + replica(replica &&) = delete; + replica &operator=(replica &&) = delete; + // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); void reset_prepare_list_after_replay(); @@ -796,6 +799,8 @@ class replica : public serverlet, public ref_counter, public replica_ba // Indicate where the storage engine data is corrupted and unrecoverable. bool _data_corrupted{false}; }; -typedef dsn::ref_ptr replica_ptr; + +using replica_ptr = dsn::ref_ptr; + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index e0e323a2b3..5d5bc7d8d5 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -25,10 +25,10 @@ */ #include -#include #include #include #include +#include #include #include #include diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index b0150a6757..e28461004a 100644 --- a/src/replica/replica_base.h +++ b/src/replica/replica_base.h @@ -48,9 +48,9 @@ struct replica_base virtual ~replica_base() = default; - gpid get_gpid() const { return _gpid; } + [[nodiscard]] gpid get_gpid() const { return _gpid; } - const char *replica_name() const { return _name.c_str(); } + [[nodiscard]] const char *replica_name() const { return _name.c_str(); } const char *app_name() const { return _app_name.c_str(); } diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index 2f18fe751e..8f3e64088f 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -495,10 +495,6 @@ class replica_split_test : public replica_test_base _parent_split_mgr->_split_status = status; } - 
primary_context get_replica_primary_context(mock_replica_ptr rep) - { - return rep->_primary_states; - } bool parent_sync_send_write_request() { return _parent_replica->_primary_states.sync_send_write_request; @@ -515,7 +511,7 @@ class replica_split_test : public replica_test_base } bool primary_parent_not_in_split() { - auto context = _parent_replica->_primary_states; + const auto &context = _parent_replica->_primary_states; return context.caught_up_children.size() == 0 && context.register_child_task == nullptr && context.sync_send_write_request == false && context.query_child_task == nullptr && context.split_stopped_secondary.size() == 0 && is_parent_not_in_split(); From f75c2a34e918c4ebad4768db7ea1c390597e2466 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 19:38:27 +0800 Subject: [PATCH 06/18] fix clang-tidy --- src/replica/mutation.cpp | 11 +++++----- src/replica/replica_base.h | 4 ++-- src/replica/split/test/replica_split_test.cpp | 22 +++++++++---------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 22b743e61b..f3737a4bd4 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -59,11 +59,7 @@ std::atomic mutation::s_tid(0); mutation::mutation() : _tracer(std::make_shared( false, "mutation", FLAGS_abnormal_write_trace_latency_threshold)), - _not_logged(1), - _left_secondary_ack_count(0), - _left_potential_secondary_ack_count(0), - _wait_child(0), - _is_error_acked(0), + _private0(0), _prepare_ts_ms(0), _name{0}, _appro_data_bytes(sizeof(mutation_header)), @@ -71,6 +67,11 @@ mutation::mutation() _tid(++s_tid), _is_sync_to_child(false) { + _not_logged = 1; + _left_secondary_ack_count = 0; + _left_potential_secondary_ack_count = 0; + _wait_child = false; + _is_error_acked = false; strcpy(_name, "0.0.0.0"); } diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index e28461004a..8590f871ec 100644 --- a/src/replica/replica_base.h +++ 
b/src/replica/replica_base.h @@ -52,9 +52,9 @@ struct replica_base [[nodiscard]] const char *replica_name() const { return _name.c_str(); } - const char *app_name() const { return _app_name.c_str(); } + [[nodiscard]] const char *app_name() const { return _app_name.c_str(); } - const char *log_prefix() const { return _name.c_str(); } + [[nodiscard]] const char *log_prefix() const { return _name.c_str(); } const metric_entity_ptr &replica_metric_entity() const { diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index 8f3e64088f..9b608ae7b1 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -482,39 +482,39 @@ class replica_split_test : public replica_test_base _child_replica->tracker()->wait_outstanding_tasks(); } - int32_t child_get_prepare_list_count() { return _child_replica->get_plist()->count(); } - bool child_is_prepare_list_copied() + int32_t child_get_prepare_list_count() const { return _child_replica->get_plist()->count(); } + bool child_is_prepare_list_copied() const { return _child_replica->_split_states.is_prepare_list_copied; } - bool child_is_caught_up() { return _child_replica->_split_states.is_caught_up; } + bool child_is_caught_up() const { return _child_replica->_split_states.is_caught_up; } - split_status::type parent_get_split_status() { return _parent_split_mgr->_split_status; } + split_status::type parent_get_split_status() const { return _parent_split_mgr->_split_status; } void parent_set_split_status(split_status::type status) { _parent_split_mgr->_split_status = status; } - bool parent_sync_send_write_request() + bool parent_sync_send_write_request() const { return _parent_replica->_primary_states.sync_send_write_request; } - int32_t parent_stopped_split_size() + int32_t parent_stopped_split_size() const { return _parent_replica->_primary_states.split_stopped_secondary.size(); } - bool is_parent_not_in_split() + bool 
is_parent_not_in_split() const { return _parent_split_mgr->_child_gpid.get_app_id() == 0 && _parent_split_mgr->_child_init_ballot == 0 && _parent_split_mgr->_split_status == split_status::NOT_SPLIT; } - bool primary_parent_not_in_split() + bool primary_parent_not_in_split() const { const auto &context = _parent_replica->_primary_states; - return context.caught_up_children.size() == 0 && context.register_child_task == nullptr && - context.sync_send_write_request == false && context.query_child_task == nullptr && - context.split_stopped_secondary.size() == 0 && is_parent_not_in_split(); + return context.caught_up_children.empty() && context.register_child_task == nullptr && + !context.sync_send_write_request && context.query_child_task == nullptr && + context.split_stopped_secondary.empty() && is_parent_not_in_split(); } public: From 7311bd6e3edc1bfefef4010ae6bc5e1ef8404e41 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 20:37:00 +0800 Subject: [PATCH 07/18] fix clang-tidy and IWYU --- src/replica/replica_base.h | 2 +- src/replica/split/test/replica_split_test.cpp | 28 ++++++++++++------- src/replica/test/cold_backup_context_test.cpp | 1 - 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index 8590f871ec..3282ed2207 100644 --- a/src/replica/replica_base.h +++ b/src/replica/replica_base.h @@ -56,7 +56,7 @@ struct replica_base [[nodiscard]] const char *log_prefix() const { return _name.c_str(); } - const metric_entity_ptr &replica_metric_entity() const + [[nodiscard]] const metric_entity_ptr &replica_metric_entity() const { CHECK_NOTNULL(_replica_metric_entity, "replica metric entity (table_id={}, partition_id={}) should has been " diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index 9b608ae7b1..dc1831e309 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -482,34 
+482,43 @@ class replica_split_test : public replica_test_base _child_replica->tracker()->wait_outstanding_tasks(); } - int32_t child_get_prepare_list_count() const { return _child_replica->get_plist()->count(); } - bool child_is_prepare_list_copied() const + [[nodiscard]] int32_t child_get_prepare_list_count() const + { + return _child_replica->get_plist()->count(); + } + [[nodiscard]] bool child_is_prepare_list_copied() const { return _child_replica->_split_states.is_prepare_list_copied; } - bool child_is_caught_up() const { return _child_replica->_split_states.is_caught_up; } + [[nodiscard]] bool child_is_caught_up() const + { + return _child_replica->_split_states.is_caught_up; + } - split_status::type parent_get_split_status() const { return _parent_split_mgr->_split_status; } - void parent_set_split_status(split_status::type status) + [[nodiscard]] split_status::type parent_get_split_status() const + { + return _parent_split_mgr->_split_status; + } + void parent_set_split_status(split_status::type status) const { _parent_split_mgr->_split_status = status; } - bool parent_sync_send_write_request() const + [[nodiscard]] bool parent_sync_send_write_request() const { return _parent_replica->_primary_states.sync_send_write_request; } - int32_t parent_stopped_split_size() const + [[nodiscard]] int32_t parent_stopped_split_size() const { return _parent_replica->_primary_states.split_stopped_secondary.size(); } - bool is_parent_not_in_split() const + [[nodiscard]] bool is_parent_not_in_split() const { return _parent_split_mgr->_child_gpid.get_app_id() == 0 && _parent_split_mgr->_child_init_ballot == 0 && _parent_split_mgr->_split_status == split_status::NOT_SPLIT; } - bool primary_parent_not_in_split() const + [[nodiscard]] bool primary_parent_not_in_split() const { const auto &context = _parent_replica->_primary_states; return context.caught_up_children.empty() && context.register_child_task == nullptr && @@ -517,7 +526,6 @@ class replica_split_test : public 
replica_test_base context.split_stopped_secondary.empty() && is_parent_not_in_split(); } -public: const std::string APP_NAME = "split_table"; const int32_t APP_ID = 2; const int32_t OLD_PARTITION_COUNT = 8; diff --git a/src/replica/test/cold_backup_context_test.cpp b/src/replica/test/cold_backup_context_test.cpp index 529ac0a09c..8d7478bfc0 100644 --- a/src/replica/test/cold_backup_context_test.cpp +++ b/src/replica/test/cold_backup_context_test.cpp @@ -35,7 +35,6 @@ #include "gtest/gtest.h" #include "metadata_types.h" #include "replica/backup/cold_backup_context.h" -#include "replica/replica.h" #include "replica/test/replication_service_test_app.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" From 88dc7f85b2c46780a095d41c0ac4bab8e0ca20ae Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 22:47:04 +0800 Subject: [PATCH 08/18] fix clang-tidy --- src/replica/replica.h | 16 +++++++++--- src/replica/replica_2pc.cpp | 25 +++++++++++-------- src/replica/split/test/replica_split_test.cpp | 4 +-- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index 28fd03ed5d..cff25b7ea7 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -402,14 +402,22 @@ class replica : public serverlet, public ref_counter, public replica_ba // `pop_all_committed_mutations = true` will be used for ingestion empty write // See more about it in `replica_bulk_loader.cpp` - void - init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations = false); + void init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations); + + void init_prepare(mutation_ptr &mu, bool reconciliation) + { + init_prepare(mu, reconciliation, false); + } + + // + void reply_with_error(const mutation_ptr &mu, const error_code &err); + void send_prepare_message(const ::dsn::host_port &addr, partition_status::type status, const mutation_ptr &mu, int timeout_milliseconds, - bool 
pop_all_committed_mutations = false, - int64_t learn_signature = invalid_signature); + bool pop_all_committed_mutations, + int64_t learn_signature); void on_append_log_completed(mutation_ptr &mu, error_code err, size_t size); void on_prepare_reply(std::pair pr, error_code err, diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 5d5bc7d8d5..3389f6a700 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -357,7 +357,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // check bounded staleness if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) { err = ERR_CAPACITY_EXCEEDED; - goto ErrOut; + reply_with_error(mu, err); + return; } // stop prepare bulk load ingestion if there are secondaries unalive @@ -374,7 +375,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c } } if (err != ERR_OK) { - goto ErrOut; + reply_with_error(mu, err); + return; } // stop prepare if there are too few replicas unless it's a reconciliation @@ -384,7 +386,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) && !reconciliation) { err = ERR_NOT_ENOUGH_MEMBER; - goto ErrOut; + reply_with_error(mu, err); + return; } CHECK_GT(mu->data.header.decree, last_committed_decree()); @@ -392,7 +395,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // local prepare err = _prepare_list->prepare(mu, partition_status::PS_PRIMARY, pop_all_committed_mutations); if (err != ERR_OK) { - goto ErrOut; + reply_with_error(mu, err); + return; } // remote prepare @@ -404,7 +408,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c partition_status::PS_SECONDARY, mu, FLAGS_prepare_timeout_ms_for_secondaries, - pop_all_committed_mutations); + pop_all_committed_mutations, + invalid_signature); } count = 
0; @@ -446,19 +451,19 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c } _primary_states.last_prepare_ts_ms = mu->prepare_ts_ms(); - return; +} -ErrOut: +void replica::reply_with_error(const mutation_ptr &mu, const error_code &err) +{ if (mu->original_request != nullptr) { // Respond to the original atomic request. And it would never be batched. response_client_write(mu->original_request, err); return; } - for (auto &r : mu->client_requests) { - response_client_write(r, err); + for (auto *req : mu->client_requests) { + response_client_write(req, err); } - return; } void replica::send_prepare_message(const ::dsn::host_port &hp, diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index dc1831e309..7ff3a3fc3d 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -508,7 +508,7 @@ class replica_split_test : public replica_test_base { return _parent_replica->_primary_states.sync_send_write_request; } - [[nodiscard]] int32_t parent_stopped_split_size() const + [[nodiscard]] size_t parent_stopped_split_size() const { return _parent_replica->_primary_states.split_stopped_secondary.size(); } @@ -982,7 +982,7 @@ TEST_P(replica_split_test, primary_parent_handle_stop_test) split_status::type meta_split_status; bool lack_of_secondary; bool will_all_stop; - int32_t expected_size; + size_t expected_size; bool expected_all_stopped; } tests[]{{split_status::NOT_SPLIT, false, false, 0, false}, {split_status::SPLITTING, false, false, 0, false}, From 9b3f2adf22c2ee9682236ea952c535e7557900fa Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 26 Feb 2025 22:58:36 +0800 Subject: [PATCH 09/18] fix clang-tidy --- src/replica/replica.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index cff25b7ea7..f79849524b 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ 
-412,7 +412,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // void reply_with_error(const mutation_ptr &mu, const error_code &err); - void send_prepare_message(const ::dsn::host_port &addr, + void send_prepare_message(const ::dsn::host_port &hp, partition_status::type status, const mutation_ptr &mu, int timeout_milliseconds, From 6ce2bb609cee025a495da9b20ea75188bdfa0fc8 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 27 Feb 2025 10:55:27 +0800 Subject: [PATCH 10/18] fix IWYU --- src/replica/split/test/replica_split_test.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index 7ff3a3fc3d..fc1292131b 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -#include #include +#include +#include #include #include #include From 6209bae74e49cb68e01dfb39284c617ceb6bbe95 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 27 Feb 2025 15:27:02 +0800 Subject: [PATCH 11/18] refactor --- src/replica/bulk_load/replica_bulk_loader.cpp | 4 ++-- src/replica/mutation.cpp | 10 ++-------- src/replica/mutation.h | 8 +++++++- src/replica/replica_check.cpp | 2 +- src/replica/replica_chkpt.cpp | 2 +- src/replica/replica_config.cpp | 2 +- src/replica/split/replica_split_manager.cpp | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 8b876b4200..d2847e1fe1 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -708,7 +708,7 @@ void replica_bulk_loader::check_ingestion_finish() // checkpoint, to gurantee the condition above, we should pop all committed mutations in // prepare list to gurantee learn type is LT_APP 
mutation_ptr mu = _replica->new_mutation(invalid_decree); - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); _replica->init_prepare(mu, false, true); _replica->_primary_states.ingestion_is_empty_prepare_sent = true; } @@ -727,7 +727,7 @@ void replica_bulk_loader::handle_bulk_load_succeed() // send an empty prepare again to gurantee that learner should learn from checkpoint if (status() == partition_status::PS_PRIMARY) { mutation_ptr mu = _replica->new_mutation(invalid_decree); - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); _replica->init_prepare(mu, false, true); } } diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index f3737a4bd4..fe42463e56 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -154,14 +154,14 @@ void mutation::copy_from(mutation_ptr &old) } } -void mutation::add_client_request(task_code code, dsn::message_ex *request) +void mutation::add_client_request(dsn::message_ex *request) { data.updates.push_back(mutation_update()); mutation_update &update = data.updates.back(); _appro_data_bytes += 32; // approximate code size if (request != nullptr) { - update.code = code; + update.code = request->rpc_code(); update.serialization_type = (dsn_msg_serialize_format)request->header->context.u.serialize_format; update.__set_start_time_ns(dsn_now_ns()); @@ -184,12 +184,6 @@ void mutation::add_client_request(task_code code, dsn::message_ex *request) CHECK_EQ(client_requests.size(), data.updates.size()); } -void mutation::add_client_request(dsn::message_ex *request) -{ - CHECK_NOTNULL(request, ""); - add_client_request(request->rpc_code(), request); -} - void mutation::write_to(const std::function &inserter) const { binary_writer writer(1024); diff --git a/src/replica/mutation.h b/src/replica/mutation.h index de0baaad79..33612d8fc1 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -105,8 +105,14 @@ class mutation 
: public ref_counter // state change void set_id(ballot b, decree c); void set_timestamp(int64_t timestamp) { data.header.timestamp = timestamp; } - void add_client_request(task_code code, dsn::message_ex *request); + + // Append a write request to this mutation, and also hold it if it is from a client + // to build the response to the client later. + // + // Parameters: + // - request: it is from a client if non-null, otherwise it is an empty write. void add_client_request(dsn::message_ex *request); + void copy_from(mutation_ptr &old); void set_logged() { diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp index c0e13aca55..c4f4c01e16 100644 --- a/src/replica/replica_check.cpp +++ b/src/replica/replica_check.cpp @@ -171,7 +171,7 @@ void replica::broadcast_group_check() if (!FLAGS_empty_write_disabled && dsn_now_ms() >= _primary_states.last_prepare_ts_ms + FLAGS_group_check_interval_ms) { mutation_ptr mu = new_mutation(invalid_decree); - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); init_prepare(mu, false); } } diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp index b975626b61..6ef6f451c7 100644 --- a/src/replica/replica_chkpt.cpp +++ b/src/replica/replica_chkpt.cpp @@ -227,7 +227,7 @@ void replica::async_trigger_manual_emergency_checkpoint(decree min_checkpoint_de // the decree to at least 1, to ensure that the checkpoint would inevitably // be created even if the replica is empty. 
mutation_ptr mu = new_mutation(invalid_decree); - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); init_prepare(mu, false); async_trigger_manual_emergency_checkpoint( diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 3c1d2ccc53..4b0671d78e 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -1175,7 +1175,7 @@ void replica::replay_prepare_list() "copy mutation from mutation_tid={} to mutation_tid={}", old->tid(), mu->tid()); mu->copy_from(old); } else { - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); LOG_INFO_PREFIX("emit empty mutation {} with mutation_tid={} when replay prepare list", mu->name(), diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 0e2c9422bb..56f4c00e21 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -719,7 +719,7 @@ void replica_split_manager::parent_handle_child_catch_up( if (!FLAGS_empty_write_disabled) { // empty wirte here to commit sync_point mutation_ptr mu = _replica->new_mutation(invalid_decree); - mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + mu->add_client_request(nullptr); _replica->init_prepare(mu, false); CHECK_EQ_PREFIX_MSG( sync_point, mu->data.header.decree, "sync_point should be equal to mutation's decree"); From d1f97616037f7a545ae835cd034765513e77c33a Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 27 Feb 2025 16:48:37 +0800 Subject: [PATCH 12/18] fix clang-tidy and IWYU --- src/replica/mutation.cpp | 17 +++++++++-------- src/replica/mutation.h | 8 ++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index fe42463e56..650d458b1d 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -36,6 +36,7 @@ #include "common/replication.codes.h" 
#include "replica.h" #include "runtime/api_task.h" +#include "task/task_code.h" #include "task/task_spec.h" #include "utils/binary_reader.h" #include "utils/binary_writer.h" @@ -156,27 +157,27 @@ void mutation::copy_from(mutation_ptr &old) void mutation::add_client_request(dsn::message_ex *request) { - data.updates.push_back(mutation_update()); + data.updates.emplace_back(); mutation_update &update = data.updates.back(); _appro_data_bytes += 32; // approximate code size if (request != nullptr) { update.code = request->rpc_code(); update.serialization_type = - (dsn_msg_serialize_format)request->header->context.u.serialize_format; - update.__set_start_time_ns(dsn_now_ns()); + static_cast(request->header->context.u.serialize_format); + update.__set_start_time_ns(static_cast(dsn_now_ns())); request->add_ref(); // released on dctor - void *ptr; - size_t size; + void *ptr = nullptr; + size_t size = 0; CHECK(request->read_next(&ptr, &size), "payload is not present"); request->read_commit(0); // so we can re-read the request buffer in replicated app - update.data.assign((char *)ptr, 0, (int)size); + update.data.assign(static_cast(ptr), 0, size); - _appro_data_bytes += sizeof(int) + (int)size; // data size + _appro_data_bytes += static_cast(sizeof(int) + size); // data size } else { update.code = RPC_REPLICATION_WRITE_EMPTY; - _appro_data_bytes += sizeof(int); // empty data size + _appro_data_bytes += static_cast(sizeof(int)); // empty data size } client_requests.push_back(request); diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 33612d8fc1..e78561c969 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -39,7 +39,6 @@ #include "rpc/rpc_message.h" #include "runtime/api_layer1.h" #include "task/task.h" -#include "task/task_code.h" #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" @@ -160,9 +159,10 @@ class mutation : public ref_counter // user requests std::vector client_requests; - // A mutation is blocking means this mutation 
would begin to be processed after all of - // the previous mutations in the queue have been committed and applied into the rocksdb - // of the primary replica. + // A mutation will be a blocking mutation if `is_blocking` is true. A blocking mutation + // will not begin to be poped from the queue and processed until all of mutations before + // it in the queue have been committed and applied into RocksDB. This field is only used + // by primary replicas. bool is_blocking{false}; // The original request received from the client. While making an atomic request (incr, From df317ea023411c4d8dc093bdca61c06d63200ac4 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 27 Feb 2025 20:02:01 +0800 Subject: [PATCH 13/18] refactor --- src/replica/mutation.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/replica/mutation.h b/src/replica/mutation.h index e78561c969..652033acb7 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -58,14 +58,19 @@ class mutation; using mutation_ptr = dsn::ref_ptr; -// mutation is the 2pc unit of PacificA, which wraps one or more client requests and add -// header informations related to PacificA algorithm for them. -// both header and client request content are put into "data" member. +// Mutation is 2PC unit of PacificA, which wraps one or more client requests and adds header +// informations related to PacificA algorithm for them. Both header and client request content +// are put into "data" member. 
class mutation : public ref_counter { public: mutation(); - virtual ~mutation(); + ~mutation() override; + + mutation(const mutation &) = delete; + mutation &operator=(const mutation &) = delete; + mutation(mutation &&) = delete; + mutation &operator=(mutation &&) = delete; // copy mutation from an existing mutation, typically used in partition split // mutation should not reply to client, because parent has already replied From ed57f3fc7a6749116481f09a624c61d0fd5efe47 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Tue, 4 Mar 2025 19:14:55 +0800 Subject: [PATCH 14/18] refactor mutation queue --- src/replica/mutation.cpp | 87 ++++++++++++++----------- src/replica/mutation.h | 112 ++++++++++++++++++++++++++------- src/replica/replica.cpp | 3 +- src/replica/replica_config.cpp | 2 +- 4 files changed, 142 insertions(+), 62 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 650d458b1d..bc35a619f7 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -359,8 +359,32 @@ mutation_queue::mutation_queue(replica *r, _pcount = dsn_task_queue_virtual_length_ptr(RPC_PREPARE, gpid.thread_hash()); } -// Once the blocking mutation is set, any mutation would not be popped until all previous -// mutations have been committed and applied into the rocksdb. +void mutation_queue::promote_pending() +{ + _queue.push(_pending_mutation); + _pending_mutation.reset(); + ++(*_pcount); +} + +void mutation_queue::try_promote_pending(task_spec *spec) +{ + // Promote `_pending_mutation` to `_queue` in following cases: + // - this client request (whose specification is `spec`) is not allowed to be batched, or + // - the size of `_pending_mutation` reaches the upper limit, or + // - batch write is disabled (initialized by FLAGS_batch_write_disabled). 
+ // + // To optimize short-circuit evaluation, `_batch_write_disabled` is used as the last + // condition to be checked, since it originates from FLAGS_batch_write_disabled which + // is mostly set as false by default, while other conditions vary with different incoming + // client requests. + if (spec->rpc_request_is_write_allow_batch && !_pending_mutation->is_full() && + !_batch_write_disabled) { + return; + } + + promote_pending(); +} + mutation_ptr mutation_queue::try_unblock() { CHECK_NOTNULL(_blocking_mutation, ""); @@ -372,18 +396,22 @@ mutation_ptr mutation_queue::try_unblock() return {}; } + // All of the mutations before the blocking mutation must have been applied. CHECK_EQ(max_prepared_decree, last_applied_decree); + // Pop the blocking mutation into the write pipeline to be processed. mutation_ptr mu = _blocking_mutation; + + // Disable the blocking mutation as it has been popped. _blocking_mutation = nullptr; - // + // Increase the number of the mutations being processed currently as the blocking + // mutation is popped. ++_current_op_count; return mu; } -// Once the popped mutation is found blocking, set it as the blocking mutation. mutation_ptr mutation_queue::try_block(mutation_ptr &mu) { CHECK_NOTNULL(mu, ""); @@ -395,7 +423,11 @@ mutation_ptr mutation_queue::try_block(mutation_ptr &mu) CHECK_NULL(_blocking_mutation, ""); + // Enable the blocking mutation once the immediately popped mutation `mu` is found blocking. _blocking_mutation = mu; + + // If all of mutations before the blocking mutation have been applied, we could unblock + // the queue immediately. 
return try_unblock(); } @@ -408,9 +440,7 @@ mutation_ptr mutation_queue::add_work(message_ex *request) // If batch is not allowed for this write, switch work queue if (_pending_mutation != nullptr && !spec->rpc_request_is_write_allow_batch) { - _queue.push(_pending_mutation); - _pending_mutation.reset(); - ++(*_pcount); + promote_pending(); } // Add to work queue @@ -425,50 +455,33 @@ mutation_ptr mutation_queue::add_work(message_ex *request) _pending_mutation->add_client_request(request); - // short-cut - if (_current_op_count < _max_concurrent_op && _queue.empty()) { - if (_blocking_mutation != nullptr) { - return try_unblock(); - } - - auto mu = _pending_mutation; - _pending_mutation.reset(); - - return try_block(mu); - } - - // check if need to switch work queue - if (_batch_write_disabled || !spec->rpc_request_is_write_allow_batch || - _pending_mutation->is_full()) { - _queue.push(_pending_mutation); - _pending_mutation.reset(); - ++(*_pcount); - } - - // get next work item if (_current_op_count >= _max_concurrent_op) { + try_promote_pending(spec); return {}; } if (_blocking_mutation != nullptr) { + try_promote_pending(spec); return try_unblock(); } - // Try to fetch next work. mutation_ptr mu; if (_queue.empty()) { - CHECK_NOTNULL(_pending_mutation, "pending mutation cannot be null"); - + // _pending_mutation is non-null mu = _pending_mutation; _pending_mutation.reset(); } else { - mu = unlink_next_workload(); + + try_promote_pending(spec); + + // Try to fetch next work. 
+ mu = pop_internal_queue(); } return try_block(mu); } -mutation_ptr mutation_queue::check_possible_work(int current_running_count) +mutation_ptr mutation_queue::next_work(int current_running_count) { _current_op_count = current_running_count; @@ -491,7 +504,7 @@ mutation_ptr mutation_queue::check_possible_work(int current_running_count) _pending_mutation.reset(); } else { // run further workload - mu = unlink_next_workload(); + mu = pop_internal_queue(); } return try_block(mu); @@ -504,7 +517,7 @@ void mutation_queue::clear() } mutation_ptr r; - while ((r = unlink_next_workload()) != nullptr) { + while ((r = pop_internal_queue()) != nullptr) { } if (_pending_mutation != nullptr) { @@ -522,7 +535,7 @@ void mutation_queue::clear(std::vector &queued_mutations) } mutation_ptr r; - while ((r = unlink_next_workload()) != nullptr) { + while ((r = pop_internal_queue()) != nullptr) { queued_mutations.emplace_back(r); } @@ -532,7 +545,7 @@ void mutation_queue::clear(std::vector &queued_mutations) } // we don't reset the current_op_count, coz this is handled by - // check_possible_work. In which, the variable current_running_count + // next_work. In which, the variable current_running_count // is handled by prepare_list // _current_op_count = 0; } diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 652033acb7..15af33d070 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -58,9 +58,10 @@ class mutation; using mutation_ptr = dsn::ref_ptr; -// Mutation is 2PC unit of PacificA, which wraps one or more client requests and adds header -// informations related to PacificA algorithm for them. Both header and client request content -// are put into "data" member. +// As 2PC unit of PacificA, a mutation contains one or more write requests with header +// information related to PacificA algorithm in `data` member. It is appended to plog +// and written into prepare request broadcast to secondary replicas. 
It also holds the +// original client requests used to build the response to the client. class mutation : public ref_counter { public: @@ -114,7 +115,7 @@ class mutation : public ref_counter // to build the response to the client later. // // Parameters: - // - request: it is from a client if non-null, otherwise it is an empty write. + // - request: is from a client if non-null, otherwise is an empty write. void add_client_request(dsn::message_ex *request); void copy_from(mutation_ptr &old); @@ -165,7 +166,7 @@ class mutation : public ref_counter std::vector client_requests; // A mutation will be a blocking mutation if `is_blocking` is true. A blocking mutation - // will not begin to be poped from the queue and processed until all of mutations before + // will not begin to be popped from the queue and processed until all of mutations before // it in the queue have been committed and applied into RocksDB. This field is only used // by primary replicas. bool is_blocking{false}; @@ -214,18 +215,26 @@ class mutation : public ref_counter class replica; -// mutation queue are queues for mutations waiting to send. -// more precisely: for client requests waiting to send. -// mutations are queued as "_queue + _pending_mutation". that is to say, _queue.first is the first -// element in the queue, and pending_mutations is the last. +// The mutation queue caches the mutations waiting to be processed in order by the write pipeline, +// including appended to plog and broadcast to secondary replicas. // -// However, once _blocking_mutation is non-null, it is the first element. 
+// The entire queue is arranged in the order of `_blocking_mutation + _queue + _pending_mutation`, +// meaning that `_blocking_mutation` is the head of the queue if it is non-null, for the reason +// that it is enabled only when the mutation ready to get popped from the queue is a blocking +// mutation: it will block the entire queue from which none could get popped until all of the +// mutations before it have been applied. // -// we keep 2 structure "hdr" and "pending_mutation" coz: -// 1. as a container of client requests, capacity of a mutation is limited, so incoming client -// requets should be packed into different mutations -// 2. number of preparing mutations is also limited, so we should queue new created mutations and -// try to send them as soon as the concurrent condition satisfies. +// Once `_blocking_mutation` is cleared and becomes null, the head of the queue will be the head +// of `_queue`. `_pending_mutation` is the tail of the queue, separated from `_queue` due to the +// following reasons: +// 1. As a carrier for storing client requests, each mutation needs to be size-limited. For each +// incoming request, we need to decide whether to continue storing it in the most recent mutation +// (i.e. `_pending_mutation`) or to create a new one. +// 2. The number of concurrent two-phase commits is limited. We should ensure the requests in +// each mutation could be processed as soon as possible if it does not reach the upper limit, +// even if the requests are in the latest mutation. +// 3. Some writes (such as non-single writes) do not allow batching. Once this kind of requests +// are received, a new mutation (`_pending_mutation`) should be created to hold them. class mutation_queue { public: @@ -245,21 +254,80 @@ class mutation_queue mutation_queue(mutation_queue &&) = delete; mutation_queue &operator=(mutation_queue &&) = delete; + // Append the input request from the client to the queue by filling the latest mutation + // with it. 
+ // + // Parameters: + // - request: must be non-null and from a client. + // + // Return the next mutation needing to be processed in order. Returning null means the + // queue is being blocked or does not have any mutation. mutation_ptr add_work(dsn::message_ex *request); + // Get the next mutation in order, typically called immediately after the current + // mutation was applied, or the membership was changed and we became the primary + // replica. + // + // Parameters: + // - current_running_count: used to reset the current number of the mutations being + // processed, typically the gap between the max committed decree and the max prepared + // decree. `_current_op_count` is never decreased directly: this parameter provides + // the only way to decrease it. + // + // Return the next mutation needing to be processed in order. Returning null means the + // queue is being blocked or does not have any mutation. + mutation_ptr next_work(int current_running_count); + + // Clear the entire queue. void clear(); - // called when you want to clear the mutation_queue and want to get the remaining messages - void clear(std::vector &queued_mutations); - // called when the curren operation is completed or replica configuration is change, - // which triggers further round of operations as returned - mutation_ptr check_possible_work(int current_running_count); + // Get the remaining unprocessed mutations and clear the entire queue. + // + // Parameters: + // - queued_mutations: the output parameter used to hold the remaining unprocessed + // mutations. + void clear(std::vector &queued_mutations); private: + // Promote `_pending_mutation` to `_queue`. Before the promotion, `_pending_mutation` + // should not be null (otherwise the behaviour is undefined). + void promote_pending(); + + // If some conditions are met, promote `_pending_mutation` to `_queue`. Before the + // promotion, `_pending_mutation` should not be null (otherwise the behaviour is + // undefined). 
+ // + // Parameters: + // - spec: the specification for the incoming client request, used to check if this client + // request is allowed to be batched. + void try_promote_pending(task_spec *spec); + + // Once the blocking mutation is enabled, the queue will be blocked and any mutation cannot + // get popped. However, once the mutations before the blocking mutation have been applied + // into RocksDB, the blocking mutation can be disabled and the queue will be "unblocked". + // `_blocking_mutation` should not be null before this function is called. + // + // Return non-null blocking mutation if succeeding in unblocking, otherwise return null + // which means the queue is still blocked. mutation_ptr try_unblock(); + + // If immediately popped `mu` is not a blocking mutation, this function will do nothing + // except that increase the count for the mutations being processed. Otherwise, it will + // set `mu` to `_blocking_mutation` to enable the blocking mutation. `_blocking_mutation` + // should be null before this function is called. + // + // Parameters: + // - mu: the mutation immediately popped from the header of `_queue + _pending_mutation`. + // Should not be null. + // + // Return the next mutation needing to be processed in order. Returning null means the + // queue is being blocked or does not have any mutation. mutation_ptr try_block(mutation_ptr &mu); - mutation_ptr unlink_next_workload() + // Pop the mutation from the header of `_queue`. + // + // Return non-null mutation if the queue is not empty, otherwise return null. 
+ mutation_ptr pop_internal_queue() { if (_queue.empty()) { return {}; @@ -272,7 +340,7 @@ class mutation_queue return work; } - void reset_max_concurrent_ops(int max_c) { _max_concurrent_op = max_c; } + void reset_max_concurrent_ops(int max) { _max_concurrent_op = max; } replica *_replica; diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 58ac141d4a..8b117f15e6 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -561,8 +561,7 @@ void replica::execute_mutation(mutation_ptr &mu) } ADD_CUSTOM_POINT(mu->_tracer, "completed"); - auto next = _primary_states.write_queue.check_possible_work( - static_cast(max_prepared_decree() - d)); + auto next = _primary_states.write_queue.next_work(static_cast(max_prepared_decree() - d)); if (next != nullptr) { init_prepare(next, false); diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 4b0671d78e..75e31caf79 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -1039,7 +1039,7 @@ bool replica::update_local_configuration(const replica_configuration &config, // start pending mutations if necessary if (status() == partition_status::PS_PRIMARY) { - mutation_ptr next = _primary_states.write_queue.check_possible_work( + auto next = _primary_states.write_queue.next_work( static_cast(max_prepared_decree() - last_committed_decree())); if (next != nullptr) { init_prepare(next, false); From 591ddff91eb07620633ddaaa015b77d3ea4f97d4 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 5 Mar 2025 12:55:09 +0800 Subject: [PATCH 15/18] add comments --- src/replica/mutation.cpp | 53 +++++++++++++++++++++++++++++----------- src/replica/mutation.h | 19 ++++++++------ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index bc35a619f7..7546f9f700 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -373,9 +373,9 @@ void mutation_queue::try_promote_pending(task_spec *spec) // 
- the size of `_pending_mutation` reaches the upper limit, or // - batch write is disabled (initialized by FLAGS_batch_write_disabled). // - // To optimize short-circuit evaluation, `_batch_write_disabled` is used as the last - // condition to be checked, since it originates from FLAGS_batch_write_disabled which - // is mostly set as false by default, while other conditions vary with different incoming + // Choose `_batch_write_disabled` as the last condition to be checked to optimize the + // performance by short-circuit evaluation since it is actually FLAGS_batch_write_disabled + // which is mostly set false by default while other conditions vary with different incoming // client requests. if (spec->rpc_request_is_write_allow_batch && !_pending_mutation->is_full() && !_batch_write_disabled) { @@ -438,12 +438,14 @@ mutation_ptr mutation_queue::add_work(message_ex *request) auto *spec = task_spec::get(request->rpc_code()); CHECK_NOTNULL(spec, ""); - // If batch is not allowed for this write, switch work queue + // If this request is not allowed to be batched, promote `_pending_mutation` if it is + // non-null. We don't check `_batch_write_disabled` since `_pending_mutation` must be + // null now if it is true. if (_pending_mutation != nullptr && !spec->rpc_request_is_write_allow_batch) { promote_pending(); } - // Add to work queue + // Once `_pending_mutation` is cleared, just assign a new mutation to it. if (_pending_mutation == nullptr) { _pending_mutation = _replica->new_mutation(invalid_decree, _replica->need_make_idempotent(spec)); @@ -453,31 +455,45 @@ mutation_ptr mutation_queue::add_work(message_ex *request) request->header->trace_id, _pending_mutation->tid()); + // Append the incoming client request to `_pending_mutation`. _pending_mutation->add_client_request(request); + // Throttling is triggered as there are too many mutations being processed as 2PC. Return + // null in case more mutations flow into the write pipeline. 
if (_current_op_count >= _max_concurrent_op) { + // Since the pending mutation was just filled with the client request, try to promote + // it. try_promote_pending(spec); return {}; } + // Once the blocking mutation is enabled, return null if still blocked, or non-null + // blocking mutation if succeeding in unblocking. if (_blocking_mutation != nullptr) { + // Since the pending mutation was just filled with the client request, try to promote + // it. try_promote_pending(spec); return try_unblock(); } mutation_ptr mu; if (_queue.empty()) { - // _pending_mutation is non-null + // `_pending_mutation` must be non-null now. There's no need to promote it as `_queue` + // is empty: just pop it as the next work candidate to be processed. mu = _pending_mutation; _pending_mutation.reset(); } else { - + // Since the pending mutation was just filled with the client request, try to promote + // it. try_promote_pending(spec); - // Try to fetch next work. + // Now the first element of `_queue` is the head of the entire queue. Pop and return it + // as the next work candidate to be processed. mu = pop_internal_queue(); } + // Currently the popped work is still a candidate: once it is a blocking mutation, the queue + // may become blocked and nothing will be returned. return try_block(mu); } @@ -485,28 +501,37 @@ mutation_ptr mutation_queue::next_work(int current_running_count) { _current_op_count = current_running_count; + // Throttling is triggered as there are too many mutations being processed as 2PC. Just + // return null in case more mutations flow into the write pipeline. if (_current_op_count >= _max_concurrent_op) { return {}; } + // Once the blocking mutation is enabled, return null if still blocked, or non-null + // blocking mutation if succeeding in unblocking. if (_blocking_mutation != nullptr) { return try_unblock(); } mutation_ptr mu; if (_queue.empty()) { - // no further workload + // There's not any further work to be processed if `_pending_mutation` is also null. 
if (_pending_mutation == nullptr) { return {}; } + // `_pending_mutation` is not null now. Just pop it as the next work candidate to be + // processed. mu = _pending_mutation; _pending_mutation.reset(); } else { - // run further workload + // Now the first element of `_queue` is the head of the entire queue. Pop and return it + // as the next work candidate to be processed. mu = pop_internal_queue(); } + // Currently the popped work is still a candidate: once it is a blocking mutation, the queue + // may become blocked and nothing will be returned. return try_block(mu); } @@ -516,6 +541,7 @@ void mutation_queue::clear() _blocking_mutation.reset(); } + // Use pop_internal_queue() to clear `_queue` since `_pcount` should also be updated. mutation_ptr r; while ((r = pop_internal_queue()) != nullptr) { } @@ -534,6 +560,7 @@ void mutation_queue::clear(std::vector &queued_mutations) _blocking_mutation.reset(); } + // Use pop_internal_queue() to clear `_queue` since `_pcount` should also be updated. mutation_ptr r; while ((r = pop_internal_queue()) != nullptr) { queued_mutations.emplace_back(r); @@ -544,10 +571,8 @@ void mutation_queue::clear(std::vector &queued_mutations) _pending_mutation.reset(); } - // we don't reset the current_op_count, coz this is handled by - // next_work. In which, the variable current_running_count - // is handled by prepare_list - // _current_op_count = 0; + // We don't reset the `_current_op_count` here, since it is done by next_work() where the + // parameter `current_running_count` is specified to reset `_current_op_count` as 0. } } // namespace dsn::replication diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 15af33d070..bc5544bf43 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -224,9 +224,9 @@ class replica; // mutation: it will block the entire queue from which none could get popped until all of the // mutations before it have been applied. 
// -// Once `_blocking_mutation` is cleared and becomes null, the head of the queue will be the head -// of `_queue`. `_pending_mutation` is the tail of the queue, separated from `_queue` due to the -// following reasons: +// Once `_blocking_mutation` is cleared and becomes null, the head of the queue will be the first +// element of `_queue`. `_pending_mutation` is the tail of the queue, separated from `_queue` due +// to the following reasons: // 1. As a carrier for storing client requests, each mutation needs to be size-limited. For each // incoming request, we need to decide whether to continue storing it in the most recent mutation // (i.e. `_pending_mutation`) or to create a new one. @@ -317,14 +317,14 @@ class mutation_queue // should be null before this function is called. // // Parameters: - // - mu: the mutation immediately popped from the header of `_queue + _pending_mutation`. + // - mu: the mutation immediately popped from the head of `_queue + _pending_mutation`. // Should not be null. // // Return the next mutation needing to be processed in order. Returning null means the // queue is being blocked or does not have any mutation. mutation_ptr try_block(mutation_ptr &mu); - // Pop the mutation from the header of `_queue`. + // Pop the mutation from the head of `_queue`. // // Return non-null mutation if the queue is not empty, otherwise return null. mutation_ptr pop_internal_queue() @@ -352,9 +352,12 @@ class mutation_queue mutation_ptr _pending_mutation; std::queue _queue; - // Once a mutation that would get popped is blocking, it should firstly be put in - // `_blocking_mutation`; then, the queue would always return nullptr until previous - // mutations have been committed and applied into the rocksdb of primary replica. 
+ // Once a mutation getting popped from `_queue + _pending_mutation` is found blocking, it + // will firstly be set to `_blocking_mutation` to enable the blocking mutation; then, the + // mutation popped from the queue will always be null. Only after all of the mutations + // before the blocking mutation have been applied into RocksDB can `_blocking_mutation` + // be popped and disabled; then, the mutations will continue to get popped from this queue + // in order until another blocking mutation appears. mutation_ptr _blocking_mutation; }; From dada8cef70a13dc0c6f5c188f71175b3b2375610 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 5 Mar 2025 15:19:33 +0800 Subject: [PATCH 16/18] refactor --- src/replica/mutation.cpp | 6 ++++-- src/replica/mutation.h | 18 ++++++++---------- src/replica/replica.cpp | 1 - src/replica/replica.h | 7 +++---- src/replica/replica_2pc.cpp | 1 - 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 7546f9f700..9b36255452 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -44,7 +44,6 @@ #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" -#include "utils/ports.h" DSN_DEFINE_uint64( replication, @@ -416,6 +415,7 @@ mutation_ptr mutation_queue::try_block(mutation_ptr &mu) { CHECK_NOTNULL(mu, ""); + // If the immediately popped mutation is non-blocking, just return it to be processed. if (!mu->is_blocking) { ++_current_op_count; return mu; @@ -445,7 +445,9 @@ mutation_ptr mutation_queue::add_work(message_ex *request) promote_pending(); } - // Once `_pending_mutation` is cleared, just assign a new mutation to it. + // Once `_pending_mutation` is cleared, just assign a new mutation to it. If the client + // request is an atomic write and should be translated into idempotent writes, this new + // mutation will be created as a blocking mutation.
if (_pending_mutation == nullptr) { _pending_mutation = _replica->new_mutation(invalid_decree, _replica->need_make_idempotent(spec)); diff --git a/src/replica/mutation.h b/src/replica/mutation.h index bc5544bf43..2632a091c2 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -41,12 +41,14 @@ #include "task/task.h" #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" +#include "utils/ports.h" namespace dsn { class binary_reader; class binary_writer; class blob; class gpid; +class task_spec; namespace utils { class latency_tracer; @@ -68,10 +70,8 @@ class mutation : public ref_counter mutation(); ~mutation() override; - mutation(const mutation &) = delete; - mutation &operator=(const mutation &) = delete; - mutation(mutation &&) = delete; - mutation &operator=(mutation &&) = delete; + DISALLOW_COPY_AND_ASSIGN(mutation); + DISALLOW_MOVE_AND_ASSIGN(mutation); // copy mutation from an existing mutation, typically used in partition split // mutation should not reply to client, because parent has already replied @@ -249,10 +249,8 @@ class mutation_queue _current_op_count); } - mutation_queue(const mutation_queue &) = delete; - mutation_queue &operator=(const mutation_queue &) = delete; - mutation_queue(mutation_queue &&) = delete; - mutation_queue &operator=(mutation_queue &&) = delete; + DISALLOW_COPY_AND_ASSIGN(mutation_queue); + DISALLOW_MOVE_AND_ASSIGN(mutation_queue); // Append the input request from the client to the queue by filling the latest mutation // with it. @@ -312,8 +310,8 @@ class mutation_queue mutation_ptr try_unblock(); // If immediately popped `mu` is not a blocking mutation, this function will do nothing - // except that increase the count for the mutations being processed. Otherwise, it will - // set `mu` to `_blocking_mutation` to enable the blocking mutation. `_blocking_mutation` + // but increase the count for the mutations being processed.
Otherwise, it will set + // `mu` to `_blocking_mutation` to enable the blocking mutation. `_blocking_mutation` // should be null before this function is called. // // Parameters: diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 8b117f15e6..68e2839d03 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -58,7 +58,6 @@ #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" -#include "utils/ports.h" #include "utils/rand.h" DSN_DEFINE_bool(replication, diff --git a/src/replica/replica.h b/src/replica/replica.h index f79849524b..8017188c06 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -57,6 +57,7 @@ #include "utils/autoref_ptr.h" #include "utils/error_code.h" #include "utils/metrics.h" +#include "utils/ports.h" #include "utils/thread_access_checker.h" #include "utils/throttling_controller.h" #include "utils/uniq_timestamp_us.h" @@ -164,10 +165,8 @@ class replica : public serverlet, public ref_counter, public replica_ba public: ~replica() override; - replica(const replica &) = delete; - replica &operator=(const replica &) = delete; - replica(replica &&) = delete; - replica &operator=(replica &&) = delete; + DISALLOW_COPY_AND_ASSIGN(replica); + DISALLOW_MOVE_AND_ASSIGN(replica); // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 3389f6a700..c7a415a685 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -75,7 +75,6 @@ #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" #include "utils/metrics.h" -#include "utils/ports.h" #include "utils/thread_access_checker.h" #include "utils/uniq_timestamp_us.h" From c7bda4a69a653bf62bfe458e8568042363a5f429 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 5 Mar 2025 18:15:34 +0800 Subject: [PATCH 17/18] add comments, fix clang-tidy and IWYU --- 
src/replica/mutation.cpp | 2 +- src/replica/mutation.h | 3 ++- src/replica/replica.h | 51 ++++++++++++++++++++++++++++++------- src/replica/replica_2pc.cpp | 3 ++- 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/replica/mutation.cpp b/src/replica/mutation.cpp index 9b36255452..b630abab9b 100644 --- a/src/replica/mutation.cpp +++ b/src/replica/mutation.cpp @@ -48,7 +48,7 @@ DSN_DEFINE_uint64( replication, abnormal_write_trace_latency_threshold, - 1000 * 1000 * 1000, // 1s + 1000UL * 1000UL * 1000UL, // 1s "Latency trace will be logged when exceed the write latency threshold, in nanoseconds"); DSN_TAG_VARIABLE(abnormal_write_trace_latency_threshold, FT_MUTABLE); diff --git a/src/replica/mutation.h b/src/replica/mutation.h index 2632a091c2..4b35978480 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -216,7 +216,8 @@ class mutation : public ref_counter class replica; // The mutation queue caches the mutations waiting to be processed in order by the write pipeline, -// including appended to plog and broadcast to secondary replicas. +// including appended to plog and broadcast to secondary replicas. This class is only used by +// primary replicas. // // The entire queue is arranged in the order of `_blocking_mutation + _queue + _pending_mutation`, // meaning that `_blocking_mutation` is the head of the queue if it is non-null, for the reason diff --git a/src/replica/replica.h b/src/replica/replica.h index 8017188c06..407d922c6b 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -360,15 +360,33 @@ class replica : public serverlet, public ref_counter, public replica_ba void response_client_write(dsn::message_ex *request, error_code error); void execute_mutation(mutation_ptr &mu); - // Create a new mutation with the non-idempotent original request, which is used to reply - // to the client. 
+ // Create a new mutation with specified decree and the original atomic write request, + // which is used to build the response to the client. + // + // Parameters: + // - decree: invalid_decree, or the real decree assigned to this mutation. + // - original_request: the original request of the atomic write. + // + // Return the newly created mutation. mutation_ptr new_mutation(decree decree, dsn::message_ex *original_request); - // Create a new mutation marked as blocking, which means this mutation would begin to be - // processed after all of the previous mutations in the queue have been committed and applied - // into the rocksdb of primary replica. + // Create a new mutation with specified decree and a flag marking whether this is a + // blocking mutation (for a detailed explanation of blocking mutations, refer to the + // comments for the field `is_blocking` of class `mutation`). + // + // Parameters: + // - decree: invalid_decree, or the real decree assigned to this mutation. + // - is_blocking: true means creating a blocking mutation. + // + // Return the newly created mutation. mutation_ptr new_mutation(decree decree, bool is_blocking); + // Create a new mutation with specified decree. + // + // Parameters: + // - decree: invalid_decree, or the real decree assigned to this mutation. + // + // Return the newly created mutation. mutation_ptr new_mutation(decree decree); // initialization @@ -399,19 +417,33 @@ class replica : public serverlet, public ref_counter, public replica_ba // Make the request in the mutation idempotent, if needed. int make_idempotent(mutation_ptr &mu); - // `pop_all_committed_mutations = true` will be used for ingestion empty write - // See more about it in `replica_bulk_loader.cpp` + // Launch 2PC for the specified mutation: it will be broadcast to secondary replicas, + // appended to plog, and finally applied into storage engine. + // + // Parameters: + // - mu: the mutation pushed into the write pipeline. 
+ // - reconciliation: true means the primary replica will be forced to launch 2PC for each + // uncommitted request in its prepared list to make them committed regardless of whether + // there is a quorum to receive the prepare requests. + // - pop_all_committed_mutations: true means popping all committed mutations while preparing + // locally, used for ingestion in bulk loader with empty write. See `replica_bulk_loader.cpp` + // for details. void init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_committed_mutations); + // The same as the above except that `pop_all_committed_mutations` is set false. void init_prepare(mutation_ptr &mu, bool reconciliation) { init_prepare(mu, reconciliation, false); } + // Reply to the client with the error if 2PC failed. // + // Parameters: + // - mu: the mutation for which 2PC failed. + // - err: the error that caused the 2PC failure. void reply_with_error(const mutation_ptr &mu, const error_code &err); - void send_prepare_message(const ::dsn::host_port &hp, + void send_prepare_message(const host_port &hp, partition_status::type status, const mutation_ptr &mu, int timeout_milliseconds, @@ -679,7 +711,8 @@ class replica : public serverlet, public ref_counter, public replica_ba app_info _app_info; std::map _extra_envs; - // TODO(wangdan): temporarily used to record, would support soon. + // TODO(wangdan): temporarily used to mark whether we make all atomic writes idempotent + // for this replica. Would make this configurable soon. bool _make_write_idempotent; // uniq timestamp generator for this replica.
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index c7a415a685..fe766866f5 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -75,6 +75,7 @@ #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" #include "utils/metrics.h" +#include "utils/ports.h" #include "utils/thread_access_checker.h" #include "utils/uniq_timestamp_us.h" @@ -465,7 +466,7 @@ void replica::reply_with_error(const mutation_ptr &mu, const error_code &err) } } -void replica::send_prepare_message(const ::dsn::host_port &hp, +void replica::send_prepare_message(const host_port &hp, partition_status::type status, const mutation_ptr &mu, int timeout_milliseconds, From 437897f27bb920c77b3ea016d06f652fddde3271 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 5 Mar 2025 20:27:01 +0800 Subject: [PATCH 18/18] add comments --- src/replica/replica.h | 38 +++++++++++++++++++++++++++++++------ src/replica/replica_2pc.cpp | 26 ++++++++++++------------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/src/replica/replica.h b/src/replica/replica.h index 407d922c6b..bc3b0b8fbf 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -402,19 +402,45 @@ class replica : public serverlet, public ref_counter, public replica_ba decree get_replay_start_decree(); ///////////////////////////////////////////////////////////////// - // 2pc + // 2PC - // - spec: should never be NULL (otherwise the behaviour is undefined). + // Given the specification for a client request, decide whether to reject it as it is a + // non-idempotent request. + // + // Parameters: + // - spec: the specification for a client request, should not be null (otherwise the + // behaviour is undefined). + // + // Return true if deciding to reject this client request. bool need_reject_non_idempotent(task_spec *spec) const; - // Decide if it is needed to make the request idempotent. - // - spec: should never be NULL (otherwise the behaviour is undefined). 
+ // Given the specification for a client request, decide whether to make it idempotent. + // + // Parameters: + // - spec: the specification for a client request, should not be null (otherwise the + // behaviour is undefined). + // + // Return true if deciding to make this client request idempotent. bool need_make_idempotent(task_spec *spec) const; - // Decide if it is needed to make the request idempotent. + // Given a client request, decide whether to make it idempotent. + // + // Parameters: + // - request: the client request, could be null. + // + // Return true if deciding to make this client request idempotent. bool need_make_idempotent(message_ex *request) const; - // Make the request in the mutation idempotent, if needed. + // Make the atomic write request (if any) in a mutation idempotent. + // + // Parameters: + // - mu: the mutation where the atomic write request will be translated into idempotent + // one. Should contain at least one client request. Once succeed in translating, `mu` + // will be reassigned with the new idempotent mutation as the output. Thus it is both an + // input and an output parameter. + // + // Return rocksdb::Status::kOk, or other code (rocksdb::Status::Code) if some error + // occurred while making write idempotent. int make_idempotent(mutation_ptr &mu); // Launch 2PC for the specified mutation: it will be broadcast to secondary replicas, diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index fe766866f5..2983408c8f 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -289,29 +289,26 @@ int replica::make_idempotent(mutation_ptr &mu) dsn::message_ex *new_request = nullptr; const int err = _app->make_idempotent(request, &new_request); if (dsn_unlikely(err != rocksdb::Status::kOk)) { - // Once error occurred, the response would be returned to the client during - // _app->make_idempotent(). 
+ // Once some error occurred, the response with error must have been returned to the + // client during _app->make_idempotent(). Thus do nothing here. return err; } CHECK_NOTNULL(new_request, "new_request should not be null since its original write request must be atomic"); - // During make_idempotent(), request has been deserialized (i.e. unmarshall() in - // rpc_holder::internal). Once deserialize it again, assertion would fail for - // set_read_msg() in the constructor of rpc_read_stream. + // During make_idempotent(), the request has been deserialized (i.e. unmarshall() in the + // constructor of `rpc_holder::internal`). Once deserialize it again, assertion would fail for + // set_read_msg() in the constructor of `rpc_read_stream`. // - // To deserialize it again for writting rocksdb, restore read for it. + // To make it deserializable again to be applied into RocksDB, restore read for it. request->restore_read(); // The decree must have not been assigned. CHECK_EQ(mu->get_decree(), invalid_decree); - // Create a new mutation to hold the new idempotent request. The old mutation that hold the - // non-idempotent requests would be released automatically. - // - // No need to create the mutation with is_blocking set to true, since the old mutation has - // been previously popped out from the mutation queue. + // Create a new mutation to hold the new idempotent request. The old mutation holding the + // original atomic write request will be released automatically. mu = new_mutation(invalid_decree, request); mu->add_client_request(new_request); return rocksdb::Status::kOk; @@ -322,8 +319,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c CHECK_EQ(partition_status::PS_PRIMARY, status()); if (make_idempotent(mu) != rocksdb::Status::kOk) { - // Once error occurred, the response must have been returned to the client during - // make_idempotent(). 
+ // If some error occurred, the response with error must have been returned to the + // client during make_idempotent(). Thus do nothing here. return; } @@ -455,12 +452,13 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c void replica::reply_with_error(const mutation_ptr &mu, const error_code &err) { + // Respond to the original atomic request if it is non-null. And it could never be batched. if (mu->original_request != nullptr) { - // Respond to the original atomic request. And it would never be batched. response_client_write(mu->original_request, err); return; } + // Just respond to each client request directly if there is no original request for them. for (auto *req : mu->client_requests) { response_client_write(req, err); }