Conversation

Collaborator

@eliastam eliastam commented Dec 5, 2025

@allenss-amazon This is a draft PR; I will raise a revision with unit tests and integration tests as well.

Changes:

  1. Created ft_internal_update with the admin flag. This command is used to replicate create/drop together with the global version, the index metadata version, and the fingerprint. Customers are not allowed to call it (a rough registration sketch follows this list).
  2. The gRPC broadcast only goes to primaries.
  3. Replicas receive index metadata only via the replication stream (ft_internal_update), which removes the dual source of truth.
  4. For CMD (cluster mode disabled) nothing changes; we still replicate create/drop as-is.
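
A minimal sketch of how such an internal, admin-flagged command could be registered through the standard Valkey module API; the handler name, arity, and flag string are illustrative assumptions rather than the exact code in this PR:

// Hypothetical registration sketch; identifiers are illustrative.
#include "valkeymodule.h"  // actual include path may differ

static int FTInternalUpdateCmd(ValkeyModuleCtx *ctx,
                               ValkeyModuleString **argv, int argc) {
  // Assumed layout: command name, index name, metadata blob, version header.
  // A replica applies the metadata when it replays this command from the
  // replication stream.
  if (argc != 4) {
    return ValkeyModule_WrongArity(ctx);
  }
  // ... parse index name, metadata payload, and version header here ...
  return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}

int RegisterInternalUpdate(ValkeyModuleCtx *ctx) {
  // "admin" keeps the command off the default user ACL so customers cannot
  // call it; "write" lets it propagate through AOF and replication.
  return ValkeyModule_CreateCommand(ctx, "FT.INTERNAL_UPDATE",
                                    FTInternalUpdateCmd, "write admin", 0, 0,
                                    0);
}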

Failure mode behaviour

  1. One primary is down: no change in customer experience (we get "Unable to contact all cluster members").
    Once the node rejoins the cluster (or a failover happens), it gets the index via gRPC and then sends it to its replicas via the replication stream.
  2. One replica is down: no change in customer experience; create and drop return OK.
    The replica gets the index from its primary when it rejoins the cluster.
  3. A corrupted binary payload in ft.internal_update crashes the instance and increments a metric we can use to track those events.
    Added a config that lets the operator skip the command, as an escape hatch for the poison-pill situation (see the sketch after this list).
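
A rough sketch of that escape hatch, assuming an absl-based error path and a module config along the lines of search.skip-corrupted-internal-update (the config name, helper names, and metric name are illustrative):

// Hypothetical handler for a corrupted FT.INTERNAL_UPDATE payload.
absl::Status HandleCorruptPayload(ValkeyModuleCtx *ctx, absl::string_view what) {
  Metrics::GetStats().internal_update_parse_failures_cnt++;  // assumed counter
  if (IsSkipCorruptedInternalUpdateEnabled()) {              // assumed config
    // Operator opted in: acknowledge and drop the command so replication can
    // continue past the poison-pill entry.
    VMSDK_LOG(WARNING, ctx)
        << "Skipping corrupted " << what << " in FT.INTERNAL_UPDATE";
    ValkeyModule_ReplyWithSimpleString(ctx, "OK");
    return absl::OkStatus();
  }
  // Default behaviour: fail loudly so the corruption is not silently ignored.
  return absl::InternalError(
      absl::StrCat("corrupted ", what, " in FT.INTERNAL_UPDATE"));
}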

Testing

  1. integration test passes
  2. manual testing
    Note: if you want to test the code, you also need the fix for the replica crash during drop index:
    Fix replica crash by using legacy ACL path during replication #510

CME test

  1. Show cluster structure
    echo "=== CME (Cluster Mode) Test ===" && echo "" && echo "Cluster Structure:" && /tmp/valkey/src/valkey-cli -c -p 7000 CLUSTER NODES
=== CME (Cluster Mode) Test ===

Cluster Structure:
f78399f13c864d6c3d3ca4f6f3c300857102d419 127.0.0.1:7000@17000 myself,master - 0 0 1 connected 0-8191
10a60c6587d8f35b2e2be47241335e40b61294b9 127.0.0.1:7002@17002 slave 4519ea74c5aab5a91d58033be7dee8eaa714669d 0 1764965420000 2 connected
4519ea74c5aab5a91d58033be7dee8eaa714669d 127.0.0.1:7001@17001 master - 0 1764965420214 2 connected 8192-16383
8b37362676cd02c47eca8b5eb6ae46467e0d5614 127.0.0.1:7003@17003 slave f78399f13c864d6c3d3ca4f6f3c300857102d419 0 1764965421220 1 connected
  2. Show initial index status

echo "Initial Index Status:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST

Initial Index Status:
Node 7000 (primary):

Node 7001 (primary):

Node 7002 (replica):

Node 7003 (replica):

  3. Create an index on primary 1 (7000)
    /tmp/valkey/src/valkey-cli -c -p 7000 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2

OK

  4. FT._LIST after creating:
After creating index on node 7000:
Node 7000 (primary):
cluster_demo
Node 7001 (primary):
cluster_demo
Node 7002 (replica):
cluster_demo
Node 7003 (replica):
cluster_demo
  5. Drop from the other primary (7001) and list

/tmp/valkey/src/valkey-cli -c -p 7001 FT.DROPINDEX cluster_demo && echo "After dropping index:" && echo "Node 7000 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7000 FT._LIST && echo "Node 7001 (primary):" && /tmp/valkey/src/valkey-cli -c -p 7001 FT._LIST && echo "Node 7002 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7002 FT._LIST && echo "Node 7003 (replica):" && /tmp/valkey/src/valkey-cli -c -p 7003 FT._LIST

OK
After dropping index:
Node 7000 (primary):

Node 7001 (primary):

Node 7002 (replica):

Node 7003 (replica):

  6. AOF files for reference:

Node 7000 AOF:

*2
$6
SELECT
$1
0
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$137
(binary IndexSchema payload; readable fragments include the type URL type.googleapis.com/valkey_search.data_model.IndexSchema, the index name cluster_demo, and the field name)
$13
(binary version-header payload)
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$2
$13
(binary version-header payload)

Node 7001 AOF:

*2
$6
SELECT
$1
0
*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$137
(binary IndexSchema payload; readable fragments include the type URL type.googleapis.com/valkey_search.data_model.IndexSchema, the index name cluster_demo, and the field name)
$0

*4
$18
FT.INTERNAL_UPDATE
$12
cluster_demo
$2
$12
(binary version-header payload)

CMD test

  1. Create and list
    echo "=== CMD (Primary-Replica) Test ===" && echo "Creating cluster_demo on primary:" && /tmp/valkey/src/valkey-cli -p 8888 FT.CREATE cluster_demo SCHEMA field VECTOR HNSW 6 TYPE FLOAT32 DIM 128 DISTANCE_METRIC L2 && echo "Listing indexes:" && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST

=== CMD (Primary-Replica) Test ===
Creating cluster_demo on primary:
OK
Listing indexes:
Primary (8888):
cluster_demo
Replica (8889):
cluster_demo
  2. Drop

/tmp/valkey/src/valkey-cli -p 8888 FT.DROPINDEX cluster_demo && echo "Primary (8888):" && /tmp/valkey/src/valkey-cli -p 8888 FT._LIST && echo "Replica (8889):" && /tmp/valkey/src/valkey-cli -p 8889 FT._LIST

OK
Primary (8888):

Replica (8889):

AOF file (same behaviour):

*2
$6
SELECT
$1
0
*13
$9
FT.CREATE
$10
test_clean
$6
SCHEMA
$5
field
$6
VECTOR
$4
HNSW
$1
6
$4
TYPE
$7
FLOAT32
$3
DIM
$3
128
$15
DISTANCE_METRIC
$2
L2
*2
$12
FT.DROPINDEX
$10
test_clean

Member

@allenss-amazon allenss-amazon left a comment

Overall, this looks OK to me.

We will want to get this reviewed by the original author of the metadata handling system (Jacob @ GCP) when it's all done.

@eliastam eliastam force-pushed the index-replication-feature branch 9 times, most recently from 7d64a1b to 2e0c761 Compare December 16, 2025 01:14
Comment on lines 47 to 50
if (argc != kFTInternalUpdateArgCount) {
  return absl::InvalidArgumentError(
      "ERR wrong number of arguments for FT_INTERNAL_UPDATE");
}
Member

In theory, if you've registered the command with 4 args, this can't happen. Maybe a CHECK is the right thing here?
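
For concreteness, a sketch of that suggestion, assuming the absl CHECK macros are available in this codebase:

// If the command table already enforces a fixed arity, an unexpected argc is
// a programming error rather than a user error, so assert it instead:
CHECK_EQ(argc, kFTInternalUpdateArgCount)
    << "FT.INTERNAL_UPDATE registered with unexpected arity";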

const char *header_data = ValkeyModule_StringPtrLen(argv[3], &header_len);
coordinator::GlobalMetadataVersionHeader version_header;
if (!version_header.ParseFromArray(header_data, header_len)) {
  return HandleParseFailure(ctx, "GlobalMetadataVersionHeader", header_len,
Member

same here.

@eliastam eliastam force-pushed the index-replication-feature branch 6 times, most recently from 8343573 to 0cf9994 Compare January 7, 2026 18:47
metadata.mutable_version_header()->set_top_level_min_version(
    top_level_min_version);

metadata_ = metadata;
Member

isn't this redundant with line 213?

}

std::vector<vmsdk::cluster_map::NodeInfo> MetadataManager::GetPrimaryNodes(
    ValkeyModuleCtx *ctx) {
Member

ctx isn't used, right? I think it should be removed from the signature. Also, I'm still unclear on why you need to be doing a refresh of the cluster map here. If this is to work around an initialization problem, let's solve that in initialization.

Collaborator

@murphyjacob4 murphyjacob4 left a comment

Overall makes a lot of sense. Converging these all into an internal command and then serializing that to the replication stream solves most of the problems. Mostly I want to double-check that we are properly guarding against BroadcastMetadata on replicas.

We may want to start out with broadcasting to all nodes, and only accept the broadcast if we are a primary. This way we can guard against propagation delays.

It is okay to accept a metadata broadcast iff you are a primary locally. The remote view of whether that node is a primary won't matter (it could also be out of sync). If we reconcile metadata locally while being demoted, this is okay, but it may need a full sync to our new primary (our offset will be ahead of the new primary, which we can't recover from).
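
A sketch of that local-primaryship guard on the receive path, assuming the check goes through the module context flags (the exact flag constant may differ):

// Apply a broadcast metadata message only if this node is currently a
// primary; replicas instead wait for FT.INTERNAL_UPDATE to arrive from their
// primary over the replication stream.
bool ShouldApplyMetadataBroadcast(ValkeyModuleCtx *ctx) {
  int flags = ValkeyModule_GetContextFlags(ctx);
  return (flags & VALKEYMODULE_CTX_FLAGS_MASTER) != 0;
}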


// Call FT.INTERNAL_UPDATE for coordinator to ensure unified AOF replication
CallFTInternalUpdate(new_entry, metadata.version_header(), encoded_id,
                     "CreateEntry");
Collaborator

nit, maybe we can also rename the function? I know right now it is just create, but if we later support alter, it would make sense to just support upsert.

Suggested change
"CreateEntry");
"UpsertEntry");

kMetadataBroadcastClusterMessageReceiverId,
payload.c_str(), payload.size());

auto primary_nodes = GetPrimaryNodes(ctx);
Collaborator

If the local node's view is stale, I guess we could send it to a primary that is now demoted. I think we need to double check our primaryship when we receive a metadata message as well.

Collaborator Author

done

(*mutable_entries)[id].set_fingerprint(fingerprint);
(*mutable_entries)[id].set_encoding_version(
    rt_it->second.encoding_version);
}
Collaborator

Should we check we are a primary when we get broadcasted metadata? As a replica, we should never even request the metadata on a conflict since we are going to wait for our primary

Comment on lines 981 to 986
if (!vmsdk::IsMainThread()) {
  thread_safe_ctx =
      vmsdk::MakeUniqueValkeyDetachedThreadSafeContext(detached_ctx_.get());
  ValkeyModule_ThreadSafeContextLock(thread_safe_ctx.get());
  safe_context = thread_safe_ctx.get();
}
Collaborator

IIRC we made it a constraint that metadata would only get processed by the main thread for simplicity. My mental model may be slightly out of date though. Is that no longer true?

Collaborator Author

Right, resolved it.

@murphyjacob4
Collaborator

Also, have we thought through upgrade/downgrade? We will enqueue these metadata changes into the replication buffer, which may cause problems on down-level versions.

@eliastam eliastam force-pushed the index-replication-feature branch 3 times, most recently from 634a341 to e0e3ad5 Compare January 15, 2026 02:47
@eliastam eliastam force-pushed the index-replication-feature branch 4 times, most recently from 86e108e to 65ce359 Compare January 15, 2026 05:16
Created ft_internal_update with the admin flag. This command is used to replicate create/drop together with the global version, the index metadata version, and the fingerprint. Customers are not allowed to call it.

The gRPC broadcast only goes to primaries.
Replicas only receive index metadata via the replication stream (ft_internal_update).
For CMD (cluster mode disabled) nothing changes; we still replicate create/drop as-is.

Signed-off-by: Elias Tamraz <[email protected]>

// Helper function to handle parse failures with poison pill recovery
absl::Status HandleInternalUpdateFailure(
    ValkeyModuleCtx *ctx, bool success, const std::string &operation_type,
Collaborator

Are success and status redundant? It seems like we set success == status.ok(). Maybe we should just use status?

VMSDK_LOG(WARNING, ctx)
    << "Failed during CreateEntryOnReplica callback for type " << type_name
    << ", id " << id << " from " << "CreateEntryOnReplica";
Metrics::GetStats().process_internal_update_callback_failures_cnt++;
Collaborator

Does incrementing the stat/logging really matter if we just crash next line?

"during loading";

auto obj_name = ObjName::Decode(id);
auto callback_result = TriggerCallbacks(type_name, obj_name, *metadata_entry);
Collaborator

Should we just return this status if it isn't OK? The only other exit path is returning status OK. We can handle the error in the calling function
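
A sketch of the simplification being suggested, reusing the names from the snippet above and assuming TriggerCallbacks returns an absl::Status:

auto obj_name = ObjName::Decode(id);
auto callback_result = TriggerCallbacks(type_name, obj_name, *metadata_entry);
if (!callback_result.ok()) {
  return callback_result;  // surface the callback failure to the caller
}
return absl::OkStatus();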

entry.SerializeToString(&metadata_binary);
header.SerializeToString(&header_binary);

ValkeyModuleCallReply *reply = ValkeyModule_Call(
Collaborator

So this is only executed on primaries, right? Via FT.CREATE and FT.DROP. And FT._INTERNAL_UPDATE just replicates on primaries with no effect, right?

Should we replace this call with a simple call to VM_Replicate?
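
A sketch of what that could look like, reusing the buffers serialized just above; the "c"/"b" format specifiers follow the standard module replication API (C string, and buffer with explicit length):

// Propagate the internal update to AOF and replicas without re-executing it
// locally on the primary.
ValkeyModule_Replicate(ctx, "FT.INTERNAL_UPDATE", "cbb",
                       encoded_id.c_str(),
                       metadata_binary.data(), metadata_binary.size(),
                       header_binary.data(), header_binary.size());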

return absl::OkStatus();
}

absl::Status MetadataManager::CallFTInternalUpdateForReconciliation(
Collaborator

I feel like this code is very similar to the code below. Can we unify to one function? It seems like reconciliation is basically identical to CallFTInternalUpdate(proposed_entry, metadata_.Get().version_header()). Am I missing something?

@eliastam eliastam force-pushed the index-replication-feature branch 7 times, most recently from cb114c3 to b726d63 Compare January 21, 2026 07:24
- Remove redundant success parameter from HandleInternalUpdateFailure
- Remove unnecessary logging before crashes
- Return error status instead of crashing
- Replace ValkeyModule_Call with ValkeyModule_Replicate
- Rename CallFTInternalUpdate to ReplicateFTInternalUpdate

Signed-off-by: Elias Tamraz <[email protected]>
@eliastam eliastam force-pushed the index-replication-feature branch from b726d63 to 343960e Compare January 21, 2026 07:27