Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/privatemessaging/operations.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -130,6 +130,7 @@ func (pm *privateMessaging) PrepareOperation(ctx context.Context, op *core.Opera
return nil, err
}
transport := &core.TransportWrapper{Group: group, Batch: batch}
pm.prepareBatchForNetworkTransport(ctx, transport)
return opSendBatch(op, node, transport), nil

default:
Expand Down
3 changes: 2 additions & 1 deletion internal/privatemessaging/operations_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -149,6 +149,7 @@ func TestPrepareAndRunBatchSend(t *testing.T) {
assert.Equal(t, node, po.Data.(batchSendData).Node)
assert.Equal(t, group, po.Data.(batchSendData).Transport.Group)
assert.Equal(t, batch, po.Data.(batchSendData).Transport.Batch)
assert.Equal(t, "ns1-remote", po.Data.(batchSendData).Transport.Batch.Namespace) // ensure its set to the network name not the local namespace name

_, phase, err := pm.RunOperation(context.Background(), po)

Expand Down
12 changes: 10 additions & 2 deletions internal/privatemessaging/privatemessaging.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -179,12 +179,20 @@ func (pm *privateMessaging) dispatchUnpinnedBatch(ctx context.Context, payload *
return pm.dispatchBatchCommon(ctx, payload)
}

// prepareBatchForNetworkTransport is mainly for documentation purposes, we need to "normalize" a batch before sending
// it over the network to other parties, so that all nodes see the same batch, regardless of what they call their local
// namespace. This is used when we dispatch a batch per the regular messaging flow, and when we retry a private messaging
// send operation.
func (pm *privateMessaging) prepareBatchForNetworkTransport(ctx context.Context, tw *core.TransportWrapper) {
tw.Batch.Namespace = pm.namespace.NetworkName
}

func (pm *privateMessaging) dispatchBatchCommon(ctx context.Context, payload *batch.DispatchPayload) error {
batch := payload.Batch.GenInflight(payload.Messages, payload.Data)
batch.Namespace = pm.namespace.NetworkName
tw := &core.TransportWrapper{
Batch: batch,
}
pm.prepareBatchForNetworkTransport(ctx, tw)

// Retrieve the group
group, nodes, err := pm.groupManager.getGroupNodes(ctx, batch.Group, false /* fail if not found */)
Expand Down
4 changes: 2 additions & 2 deletions internal/privatemessaging/privatemessaging_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -84,7 +84,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM
mmi.On("IsMetricsEnabled").Return(metricsEnabled)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1-remote"}
pm, err := NewPrivateMessaging(ctx, ns, mdi, mdx, mbi, mim, mba, mdm, msa, mmp, mmi, mom, cmi)
assert.NoError(t, err)
cmi.AssertCalled(t, "GetCache", cache.NewCacheConfig(
Expand Down
4 changes: 2 additions & 2 deletions internal/privatemessaging/recipients_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestResolveMemberListNewGroupE2E(t *testing.T) {
um.RunFn = func(a mock.Arguments) {
msg := a[1].(*core.Message)
assert.Equal(t, core.MessageTypeGroupInit, msg.Header.Type)
assert.Equal(t, "ns1", msg.Header.Namespace)
assert.Equal(t, "ns1-remote", msg.Header.Namespace) // note this matches the remote network name, not the local namespace
assert.Len(t, msg.Data, 1)
assert.Equal(t, *dataID, *msg.Data[0].ID)
}
Expand Down