Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/v/cloud_topics/frontend/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ frontend::refine_timequery_result(

namespace {

raft::replicate_options update_replicate_options(raft::replicate_options opts) {
raft::replicate_options update_replicate_options(
raft::replicate_options opts, model::term_id expected_term) {
// We overwrite the consistency level in cloud topics. Since you're already
// willing to wait for object storage uploads, it's much safer to make sure
// metadata is written to a majority before acking the write as well. This
Expand All @@ -473,6 +474,7 @@ raft::replicate_options update_replicate_options(raft::replicate_options opts) {
// consumers - and to prevent situations where we have to suffix truncate
// the log, we just force a majority to persist the write before responding.
opts.consistency = raft::consistency_level::quorum_ack;
opts.expected_term = expected_term;
return opts;
}

Expand All @@ -496,7 +498,7 @@ struct upload_and_replicate_stages {
, ctp_stm_api(make_ctp_stm_api(this->partition))
, batches(std::move(batches))
, batch_id(batch_id)
, opts(update_replicate_options(opts))
, opts(opts)
, timeout(timeout) {}

ss::promise<> request_enqueued;
Expand Down Expand Up @@ -575,6 +577,7 @@ ss::future<> bg_upload_and_replicate(
placeholders.batches.size());

// Replicate
op->opts = update_replicate_options(op->opts, fence.term);
auto replicate_stages = partition->replicate_in_stages(
op->batch_id, std::move(placeholders.batches.front()), op->opts);

Expand Down Expand Up @@ -639,7 +642,6 @@ ss::future<> bg_upload_and_replicate(

ss::future<std::expected<kafka::offset, std::error_code>> frontend::replicate(
chunked_vector<model::record_batch> batches, raft::replicate_options opts) {
opts = update_replicate_options(opts);
chunked_vector<model::record_batch_header> headers;
headers.reserve(batches.size());
for (const auto& batch : batches) {
Expand Down Expand Up @@ -695,6 +697,7 @@ ss::future<std::expected<kafka::offset, std::error_code>> frontend::replicate(
placeholder_batches.push_back(std::move(batch));
}

opts = update_replicate_options(opts, fence.term);
auto result = co_await _partition->replicate(
std::move(placeholder_batches), opts);

Expand Down
9 changes: 7 additions & 2 deletions src/v/cloud_topics/level_one/metastore/simple_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <seastar/coroutine/as_future.hh>

#include <optional>

namespace cloud_topics::l1 {

namespace {
Expand Down Expand Up @@ -83,9 +85,12 @@ ss::future<std::expected<void, simple_stm::errc>>
simple_stm::replicate_and_wait(
model::term_id term, model::record_batch batch, ss::abort_source& as) {
auto opts = raft::replicate_options(
raft::consistency_level::quorum_ack, std::ref(as));
raft::consistency_level::quorum_ack,
/*expected_term=*/term,
/*timeout=*/std::nullopt,
std::ref(as));
opts.set_force_flush();
auto res = co_await _raft->replicate(term, std::move(batch), opts);
auto res = co_await _raft->replicate(std::move(batch), opts);
if (res.has_error()) {
co_return std::unexpected(errc::raft_error);
}
Expand Down
7 changes: 5 additions & 2 deletions src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ ctp_stm_api::replicated_apply(
vlog(cd_log.debug, "Replicating batch {} in term {}", batch.header(), term);

auto opts = raft::replicate_options(
raft::consistency_level::quorum_ack, as);
raft::consistency_level::quorum_ack,
/*expected_term=*/term,
/*timeout=*/std::nullopt,
as);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(batch), opts);
auto res = co_await _stm->_raft->replicate(std::move(batch), opts);

if (res.has_error()) {
vlog(cd_log.debug, "Failed to replicate batch: {}", res.error());
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -898,11 +898,11 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
.handle_exception_type(broken_promise_to_shutdown);
auto op_state_reset = ss::defer([&] { _active_operation_res.reset(); });

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
auto opts = raft::replicate_options(
raft::consistency_level::quorum_ack, current_term);
opts.set_force_flush();

auto result = co_await _raft->replicate(
current_term, std::move(batch), opts);
auto result = co_await _raft->replicate(std::move(batch), opts);
if (!result) {
vlog(
_logger.warn,
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/distributed_kv_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ class distributed_kv_stm final : public raft::persisted_stm<> {

ss::future<errc> replicate_and_wait(simple_batch_builder builder) {
auto r = co_await _raft->replicate(
_insync_term,
std::move(builder).build(),
raft::replicate_options(raft::consistency_level::quorum_ack));
raft::replicate_options(
raft::consistency_level::quorum_ack, _insync_term));

if (!r) {
co_return errc::replication_error;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ ss::future<bool> id_allocator_stm::set_state(
auto batch = serialize_cmd(
state_cmd{.next_state = value}, model::record_batch_type::id_allocator);
auto r = co_await _raft->replicate(
_insync_term,
std::move(batch),
raft::replicate_options(raft::consistency_level::quorum_ack));
raft::replicate_options(
raft::consistency_level::quorum_ack, _insync_term));
if (!r) {
co_return false;
}
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,10 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::replicate_command(
model::record_batch batch,
ss::lowres_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as) {
auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
auto opts = raft::replicate_options(
raft::consistency_level::quorum_ack, _raft->term());
opts.set_force_flush();
auto fut = _raft->replicate(_raft->term(), std::move(batch), opts);
auto fut = _raft->replicate(std::move(batch), opts);

/// Execute the replicate command bound by timeout and cancellable via
/// abort_source mechanism
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/partition_properties_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,10 @@ partition_properties_stm::replicate_properties_update(
_log.debug, "replicating update partition properties command: {}", cmd);
raft::replicate_options r_opts(
raft::consistency_level::quorum_ack,
_insync_term,
std::chrono::milliseconds(timeout / 1ms));
r_opts.set_force_flush();
auto r = co_await _raft->replicate(_insync_term, std::move(b), r_opts);
auto r = co_await _raft->replicate(std::move(b), r_opts);

if (r.has_error()) {
vlog(
Expand Down
Loading