Skip to content

Commit 61b446f

Browse files
author
ninsmiracle
committed
add two metrics and log the key if a dup non-idempotent write is retried
1 parent e60dc0a commit 61b446f

12 files changed

+114
-17
lines changed

src/common/duplication_common.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ DSN_DEFINE_uint32(replication,
4040
"send mutation log batch bytes size per rpc");
4141
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
4242

43+
// Runtime-mutable switch for forwarding non-idempotent writes (incr, check_and_set,
// check_and_mutate) to the backup cluster during duplication. Disabled by default.
// NOTE(review): the section is written as the bare token `replication`, matching the
// DSN_DEFINE_uint32(replication, ...) definition earlier in this file; presumably the macro
// stringifies its section argument, so a quoted section would not resolve to the same
// config section -- confirm against utils/flags.h.
DSN_DEFINE_bool(replication,
                force_send_no_idempotent_when_duplication,
                false,
                "receive client non-idempotent write requests and send them to backup cluster "
                "when doing duplication");
DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE);
49+
4350
const std::string duplication_constants::kDuplicationCheckpointRootDir /*NOLINT*/ = "duplication";
4451
const std::string duplication_constants::kClustersSectionName /*NOLINT*/ = "pegasus.clusters";
4552
const std::string duplication_constants::kDuplicationEnvMasterClusterKey /*NOLINT*/ =

src/common/duplication_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace dsn {
3434
namespace replication {
3535

3636
DSN_DECLARE_uint32(duplicate_log_batch_bytes);
37+
DSN_DECLARE_bool(force_send_no_idempotent_when_duplication);
3738

3839
typedef rpc_holder<duplication_modify_request, duplication_modify_response> duplication_modify_rpc;
3940
typedef rpc_holder<duplication_add_request, duplication_add_response> duplication_add_rpc;

src/replica/mutation.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
#include "common/replication_common.h"
3636
#include "common/replication_other_types.h"
37+
#include "common/duplication_common.h"
3738
#include "consensus_types.h"
3839
#include "runtime/api_layer1.h"
3940
#include "runtime/rpc/rpc_message.h"
@@ -42,7 +43,6 @@
4243
#include "utils/autoref_ptr.h"
4344
#include "utils/fmt_logging.h"
4445
#include "utils/link.h"
45-
#include "utils/flags.h"
4646

4747
namespace dsn {
4848
class binary_reader;
@@ -55,8 +55,6 @@ class latency_tracer;
5555

5656
namespace replication {
5757

58-
DSN_DECLARE_bool(force_send_no_idempotent_when_duplication);
59-
6058
class mutation;
6159

6260
typedef dsn::ref_ptr<mutation> mutation_ptr;

src/replica/replica_2pc.cpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,6 @@ DSN_DEFINE_bool(replication,
8585
true,
8686
"reject client write requests if disk status is space insufficient");
8787
DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE);
88-
89-
DSN_DEFINE_bool("replication",
90-
force_send_no_idempotent_when_duplication,
91-
false,
92-
"receive client idempotent write requests and send them to backup cluster when "
93-
"doing duplication");
94-
DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE);
95-
96-
9788
DSN_DEFINE_int32(replication,
9889
prepare_timeout_ms_for_secondaries,
9990
1000,

src/server/info_collector.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
207207
INIT_COUNTER(incr_qps);
208208
INIT_COUNTER(check_and_set_qps);
209209
INIT_COUNTER(check_and_mutate_qps);
210+
INIT_COUNTER(force_receive_no_idempotent_duplicate_qps);
210211
INIT_COUNTER(scan_qps);
211212
INIT_COUNTER(duplicate_qps);
212213
INIT_COUNTER(dup_shipped_ops);
214+
INIT_COUNTER(dup_retry_no_idempotent_duplicate_qps);
213215
INIT_COUNTER(dup_failed_shipping_ops);
214216
INIT_COUNTER(dup_recent_mutation_loss_count);
215217
INIT_COUNTER(recent_read_cu);

src/server/info_collector.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,14 @@ class info_collector
6666
incr_qps->set(row_stats.incr_qps);
6767
check_and_set_qps->set(row_stats.check_and_set_qps);
6868
check_and_mutate_qps->set(row_stats.check_and_mutate_qps);
69+
force_receive_no_idempotent_duplicate_qps->set(
70+
row_stats.force_receive_no_idempotent_duplicate_qps);
6971
scan_qps->set(row_stats.scan_qps);
7072
duplicate_qps->set(row_stats.duplicate_qps);
7173
dup_shipped_ops->set(row_stats.dup_shipped_ops);
7274
dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops);
75+
dup_retry_no_idempotent_duplicate_qps->set(
76+
row_stats.dup_retry_no_idempotent_duplicate_qps);
7377
dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count);
7478
recent_read_cu->set(row_stats.recent_read_cu);
7579
recent_write_cu->set(row_stats.recent_write_cu);
@@ -144,10 +148,12 @@ class info_collector
144148
::dsn::perf_counter_wrapper incr_qps;
145149
::dsn::perf_counter_wrapper check_and_set_qps;
146150
::dsn::perf_counter_wrapper check_and_mutate_qps;
151+
::dsn::perf_counter_wrapper force_receive_no_idempotent_duplicate_qps;
147152
::dsn::perf_counter_wrapper scan_qps;
148153
::dsn::perf_counter_wrapper duplicate_qps;
149154
::dsn::perf_counter_wrapper dup_shipped_ops;
150155
::dsn::perf_counter_wrapper dup_failed_shipping_ops;
156+
::dsn::perf_counter_wrapper dup_retry_no_idempotent_duplicate_qps;
151157
::dsn::perf_counter_wrapper dup_recent_mutation_loss_count;
152158
::dsn::perf_counter_wrapper recent_read_cu;
153159
::dsn::perf_counter_wrapper recent_write_cu;

src/server/pegasus_mutation_duplicator.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "utils/errors.h"
4949
#include "utils/fmt_logging.h"
5050
#include "utils/rand.h"
51+
#include "pegasus_rpc_types.h"
5152

5253
METRIC_DEFINE_counter(replica,
5354
dup_shipped_successful_requests,
@@ -59,6 +60,11 @@ METRIC_DEFINE_counter(replica,
5960
dsn::metric_unit::kRequests,
6061
"The number of failed DUPLICATE requests sent from client");
6162

63+
METRIC_DEFINE_counter(replica,
64+
dup_retry_no_idempotent_duplicate_qps,
65+
dsn::metric_unit::kRequests,
66+
"The qps of Non-idempotent write when doing DUPLICATE which is Retried");
67+
6268
namespace dsn {
6369
namespace replication {
6470
struct replica_base;
@@ -205,6 +211,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
205211
// retry this rpc
206212
_inflights[hash].push_front(rpc);
207213
_env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);
214+
215+
type_force_send_no_idempotent_if_need(rpc);
216+
208217
return;
209218
}
210219
if (_inflights[hash].empty()) {
@@ -221,6 +230,74 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
221230
}
222231
}
223232

233+
// Reports the non-idempotent writes carried by a duplicate RPC that is being retried.
//
// Does nothing unless FLAGS_force_send_no_idempotent_when_duplication is set. Otherwise, for
// every entry of `rpc` whose task code is RPC_RRDB_RRDB_INCR, RPC_RRDB_RRDB_CHECK_AND_SET or
// RPC_RRDB_RRDB_CHECK_AND_MUTATE, the dup_retry_no_idempotent_duplicate_qps counter is
// incremented and the keys of the decoded request are written to the debug log.
//
// Parameters:
//  - rpc: the duplicate RPC whose request entries are inspected; it is only read here.
void pegasus_mutation_duplicator::type_force_send_no_idempotent_if_need(duplicate_rpc &rpc)
{
    if (!dsn::replication::FLAGS_force_send_no_idempotent_when_duplication) {
        return;
    }

    // Wraps a buffer exposing data()/length() in a non-owning view for logging.
    const auto as_view = [](const auto &buf) {
        return absl::string_view(buf.data(), buf.length());
    };

    // There may be more than one mutation in one dup rpc. Iterate by const reference so that
    // the entries (and their raw message payloads) are not copied.
    for (const auto &entry : rpc.request().entries) {
        const bool is_incr = entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR;
        const bool is_check_and_set = entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET;
        const bool is_check_and_mutate =
            entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE;
        if (!is_incr && !is_check_and_set && !is_check_and_mutate) {
            // Other writes are not counted or logged by this function.
            continue;
        }

        METRIC_VAR_INCREMENT(dup_retry_no_idempotent_duplicate_qps);

        // Rebuild the original client write from the raw payload so its keys can be logged.
        // Exactly one of the branches below wraps `write` in an rpc holder.
        dsn::message_ex *write =
            dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);

        if (is_incr) {
            incr_rpc raw_rpc(write);
            const auto &req = raw_rpc.request();
            LOG_DEBUG("Non-idempotent write RPC_RRDB_RRDB_INCR has been retried when doing "
                      "duplication, key is [{}]",
                      as_view(req.key));
        } else if (is_check_and_set) {
            check_and_set_rpc raw_rpc(write);
            const auto &req = raw_rpc.request();
            LOG_DEBUG("Non-idempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried "
                      "when doing duplication, hash key [{}], check sort key [{}], "
                      "set sort key [{}]",
                      as_view(req.hash_key),
                      as_view(req.check_sort_key),
                      as_view(req.set_sort_key));
        } else {
            check_and_mutate_rpc raw_rpc(write);
            const auto &req = raw_rpc.request();
            LOG_DEBUG("Non-idempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been "
                      "retried when doing duplication, hash key is [{}], "
                      "check sort key is [{}].",
                      as_view(req.hash_key),
                      as_view(req.check_sort_key));
        }
    }
}
300+
224301
void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb)
225302
{
226303
_total_shipped_size = 0;

src/server/pegasus_mutation_duplicator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator
7373

7474
void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err);
7575

76+
void type_force_send_no_idempotent_if_need(duplicate_rpc &rpc);
77+
7678
private:
7779
friend class pegasus_mutation_duplicator_test;
7880

@@ -91,6 +93,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator
9193

9294
METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests);
9395
METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests);
96+
METRIC_VAR_DECLARE_counter(dup_retry_no_idempotent_duplicate_qps);
9497
};
9598

9699
// Decodes the binary `request_data` into write request in thrift struct, and

src/server/pegasus_write_service.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ METRIC_DEFINE_counter(replica,
127127
"The number of DUPLICATE requests");
128128

129129
METRIC_DEFINE_counter(replica,
130-
no_idempotent_duplicate,
130+
force_receive_no_idempotent_duplicate_qps,
131131
dsn::metric_unit::kRequests,
132-
"The number of forced idempotent requests when doing duplication");
132+
"statistic the those no idempotent qps of DUPLICATE requests Force received");
133133

134134
METRIC_DEFINE_percentile_int64(replica,
135135
dup_time_lag_ms,
@@ -174,7 +174,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
174174
METRIC_VAR_INIT_replica(check_and_set_latency_ns),
175175
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
176176
METRIC_VAR_INIT_replica(dup_requests),
177-
METRIC_VAR_INIT_replica(no_idempotent_duplicate),
177+
METRIC_VAR_INIT_replica(force_receive_no_idempotent_duplicate_qps),
178178
METRIC_VAR_INIT_replica(dup_time_lag_ms),
179179
METRIC_VAR_INIT_replica(dup_lagging_writes),
180180
_put_batch_size(0),
@@ -427,7 +427,7 @@ int pegasus_write_service::duplicate(int64_t decree,
427427
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
428428
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
429429
// receive no idempotent request from master cluster via duplication
430-
METRIC_VAR_INCREMENT(no_idempotent_duplicate);
430+
METRIC_VAR_INCREMENT(force_receive_no_idempotent_duplicate_qps);
431431

432432
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
433433
incr_rpc rpc(write);

src/server/pegasus_write_service.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base
232232
METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);
233233

234234
METRIC_VAR_DECLARE_counter(dup_requests);
235-
METRIC_VAR_DECLARE_counter(no_idempotent_duplicate);
235+
METRIC_VAR_DECLARE_counter(force_receive_no_idempotent_duplicate_qps);
236236
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
237237
METRIC_VAR_DECLARE_counter(dup_lagging_writes);
238238

src/shell/command_helper.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,9 +813,11 @@ struct row_data
813813
incr_qps += row.incr_qps;
814814
check_and_set_qps += row.check_and_set_qps;
815815
check_and_mutate_qps += row.check_and_mutate_qps;
816+
force_receive_no_idempotent_duplicate_qps += row.force_receive_no_idempotent_duplicate_qps;
816817
scan_qps += row.scan_qps;
817818
duplicate_qps += row.duplicate_qps;
818819
dup_shipped_ops += row.dup_shipped_ops;
820+
dup_retry_no_idempotent_duplicate_qps += row.dup_retry_no_idempotent_duplicate_qps;
819821
dup_failed_shipping_ops += row.dup_failed_shipping_ops;
820822
dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count;
821823
recent_read_cu += row.recent_read_cu;
@@ -880,9 +882,11 @@ struct row_data
880882
double incr_qps = 0;
881883
double check_and_set_qps = 0;
882884
double check_and_mutate_qps = 0;
885+
double force_receive_no_idempotent_duplicate_qps = 0;
883886
double scan_qps = 0;
884887
double duplicate_qps = 0;
885888
double dup_shipped_ops = 0;
889+
double dup_retry_no_idempotent_duplicate_qps = 0;
886890
double dup_failed_shipping_ops = 0;
887891
double dup_recent_mutation_loss_count = 0;
888892
double recent_read_cu = 0;
@@ -954,12 +958,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
954958
row.check_and_set_qps += value;
955959
else if (counter_name == "check_and_mutate_qps")
956960
row.check_and_mutate_qps += value;
961+
else if (counter_name == "force_receive_no_idempotent_duplicate_qps")
962+
row.force_receive_no_idempotent_duplicate_qps += value;
957963
else if (counter_name == "scan_qps")
958964
row.scan_qps += value;
959965
else if (counter_name == "duplicate_qps")
960966
row.duplicate_qps += value;
961967
else if (counter_name == "dup_shipped_ops")
962968
row.dup_shipped_ops += value;
969+
else if (counter_name == "dup_retry_no_idempotent_duplicate_qps")
970+
row.dup_retry_no_idempotent_duplicate_qps += value;
963971
else if (counter_name == "dup_failed_shipping_ops")
964972
row.dup_failed_shipping_ops += value;
965973
else if (counter_name == "dup_recent_mutation_loss_count")

src/shell/commands/table_management.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
555555
sum.incr_qps += row.incr_qps;
556556
sum.check_and_set_qps += row.check_and_set_qps;
557557
sum.check_and_mutate_qps += row.check_and_mutate_qps;
558+
sum.force_receive_no_idempotent_duplicate_qps +=
559+
row.force_receive_no_idempotent_duplicate_qps;
560+
558561
sum.scan_qps += row.scan_qps;
559562
sum.recent_read_cu += row.recent_read_cu;
560563
sum.recent_write_cu += row.recent_write_cu;
@@ -651,6 +654,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
651654
tp.append_data(row.incr_qps);
652655
tp.append_data(row.check_and_set_qps);
653656
tp.append_data(row.check_and_mutate_qps);
657+
tp.append_data(row.force_receive_no_idempotent_duplicate_qps);
654658
tp.append_data(row.scan_qps);
655659
tp.append_data(row.recent_read_cu);
656660
tp.append_data(row.recent_write_cu);

0 commit comments

Comments
 (0)