35 | 35 | #include "common/common.h"
36 | 36 | #include "common/duplication_common.h"
37 | 37 | #include "duplication_internal_types.h"
| 38 | +#include "gutil/map_util.h"
38 | 39 | #include "pegasus/client.h"
39 | 40 | #include "pegasus_key_schema.h"
40 | 41 | #include "rpc/rpc_message.h"
@@ -238,28 +239,42 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
238 | 239 |     auto batch_request = std::make_unique<dsn::apps::duplicate_request>();
239 | 240 |     uint batch_count = 0;
240 | 241 |     uint batch_bytes = 0;
| 242 | +    // The following RPC codes should be ignored:
| 243 | +    // - RPC_RRDB_RRDB_DUPLICATE: duplicating DUPLICATE mutations to the remote
| 244 | +    //   cluster is not supported.
| 245 | +    // - RPC_RRDB_RRDB_BULK_LOAD: control-flow RPCs are not duplicated.
| 246 | +    const static std::set<int> ignored_rpc_code = {dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
| 247 | +                                                   dsn::apps::RPC_RRDB_RRDB_BULK_LOAD};
| 248 | +
241 | 249 |     for (auto mut : muts) {
242 | 250 |         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
243 | 251 |         batch_count++;
244 | 252 |         dsn::task_code rpc_code = std::get<1>(mut);
245 | 253 |         dsn::blob raw_message = std::get<2>(mut);
246 | 254 |         auto dreq = std::make_unique<dsn::apps::duplicate_request>();
247 | 255 |
248 | | -        if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
249 | | -            // ignore if it is a DUPLICATE
250 | | -            // Because DUPLICATE comes from other clusters should not be forwarded to any other
251 | | -            // destinations. A DUPLICATE is meant to be targeting only one cluster.
| 256 | +        if (gutil::ContainsKey(ignored_rpc_code, rpc_code)) {
| 257 | +            // It is not recommended to use bulk load and normal writes on the same
| 258 | +            // table, since that may cause inconsistency between the actual data and
| 259 | +            // the expected data, and duplication does not dup bulk-load data to the
| 260 | +            // backup clusters. If you still want to use bulk load, you have to accept
| 261 | +            // this risk on the clusters you maintain, e.g. by running the bulk load on
| 262 | +            // both the master cluster and the backup cluster (with duplication enabled)
| 263 | +            // at the same time, though this will inevitably cause data inconsistency.
| 264 | +            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
| 265 | +                LOG_DEBUG_PREFIX("Ignore sending bulkload rpc when doing duplication");
| 266 | +            }
252 | 267 |             continue;
253 | | -        } else {
254 | | -            dsn::apps::duplicate_entry entry;
255 | | -            entry.__set_raw_message(raw_message);
256 | | -            entry.__set_task_code(rpc_code);
257 | | -            entry.__set_timestamp(std::get<0>(mut));
258 | | -            entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
259 | | -            batch_request->entries.emplace_back(std::move(entry));
260 | | -            batch_bytes += raw_message.length();
261 | 268 |         }
262 | 269 |
| 270 | +        dsn::apps::duplicate_entry entry;
| 271 | +        entry.__set_raw_message(raw_message);
| 272 | +        entry.__set_task_code(rpc_code);
| 273 | +        entry.__set_timestamp(std::get<0>(mut));
| 274 | +        entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
| 275 | +        batch_request->entries.emplace_back(std::move(entry));
| 276 | +        batch_bytes += raw_message.length();
| 277 | +
263 | 278 |         if (batch_count == muts.size() || batch_bytes >= FLAGS_duplicate_log_batch_bytes ||
264 | 279 |             batch_bytes >= dsn::replication::FLAGS_dup_max_allowed_write_size) {
265 | 280 |             // since all the plog's mutations of replica belong to same gpid though the hash of
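
For reference, below is a minimal, self-contained sketch of the filtering pattern this diff introduces: mutations whose task code belongs to a static "ignored" set are skipped before being packed into the batched `duplicate_request`. The real code uses `gutil::ContainsKey()` from `gutil/map_util.h`; the sketch uses `std::set::count()` so it compiles on its own. The plain-`int` task codes, the `mutation_tuple` alias, and `filter_for_duplication()` are placeholders for illustration, not Pegasus APIs.

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <tuple>
#include <vector>

namespace {

// Hypothetical stand-ins for dsn::apps::RPC_RRDB_RRDB_DUPLICATE / RPC_RRDB_RRDB_BULK_LOAD.
constexpr int kRpcDuplicate = 1;
constexpr int kRpcBulkLoad = 2;
constexpr int kRpcPut = 3;

// timestamp, rpc_code, raw_message -- mirrors the mutation tuple used in the diff above.
using mutation_tuple = std::tuple<uint64_t, int, std::string>;

// Returns only the mutations that should be forwarded to the remote cluster.
std::vector<mutation_tuple> filter_for_duplication(const std::vector<mutation_tuple> &muts)
{
    // DUPLICATE and BULK_LOAD requests are never forwarded.
    static const std::set<int> ignored_rpc_code = {kRpcDuplicate, kRpcBulkLoad};

    std::vector<mutation_tuple> batch;
    for (const auto &mut : muts) {
        if (ignored_rpc_code.count(std::get<1>(mut)) > 0) {
            continue; // skip: this request type is not duplicated
        }
        batch.push_back(mut);
    }
    return batch;
}

} // namespace
```

For example, `filter_for_duplication({{1, kRpcPut, "w"}, {2, kRpcBulkLoad, "b"}})` keeps only the PUT mutation; the actual duplicator additionally accumulates the surviving entries into `batch_request` and flushes once the last mutation is reached or the byte thresholds above are exceeded.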