Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -43,6 +43,12 @@ type BlockchainCallbacks interface {
SetHandler(namespace string, handler blockchain.Callbacks)
SetOperationalHandler(namespace string, handler core.OperationCallbacks)

// BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error
// An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector.
// It is the responsibility of the connector code to allocate that routine, and ensure that there is only one.
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage if needed.
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error

OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
// Common logic for parsing a BatchPinOrNetworkAction event, and if not discarded to add it to the by-namespace map
PrepareBatchPinOrNetworkAction(ctx context.Context, events EventsToDispatch, subInfo *SubscriptionInfo, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef, params *BatchPinParams)
Expand All @@ -64,6 +70,17 @@ type callbacks struct {
opHandlers map[string]core.OperationCallbacks
}

// BulkOperationUpdates implements BlockchainCallbacks.
func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error {
if handler, ok := cb.opHandlers[namespace]; ok {
return handler.BulkOperationUpdates(ctx, updates)
}
// We don't want to error as it just means this update was not for this namespace
// This is unlikely to happen in practice, as the namespace is always passed in the operation handler
log.L(ctx).Errorf("No operation handler found for namespace '%s'", namespace)
return nil
}

type subscriptions struct {
subs map[string]*SubscriptionInfo
}
Expand Down Expand Up @@ -153,13 +170,15 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat
func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
if handler, ok := cb.opHandlers[namespace]; ok {
handler.OperationUpdate(&core.OperationUpdate{
Plugin: plugin.Name(),
NamespacedOpID: nsOpID,
Status: status,
BlockchainTXID: blockchainTXID,
ErrorMessage: errorMessage,
Output: opOutput,
handler.OperationUpdate(&core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: plugin.Name(),
NamespacedOpID: nsOpID,
Status: status,
BlockchainTXID: blockchainTXID,
ErrorMessage: errorMessage,
Output: opOutput,
},
})
return
}
Expand Down
53 changes: 52 additions & 1 deletion internal/blockchain/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestCallbackOperationUpdate(t *testing.T) {
cb.SetOperationalHandler("ns1", mcb)

mbi.On("Name").Return("utblockchain")
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == nsOpID &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "tx1" &&
Expand Down Expand Up @@ -455,3 +455,54 @@ func TestErrorWrappingNonConflict(t *testing.T) {
_, conforms := err.(operations.ConflictError)
assert.False(t, conforms)
}

func TestCallbackBulkOperationUpdate(t *testing.T) {
nsOpID := "ns1:" + fftypes.NewUUID().String()
nsOpID2 := "ns1:" + fftypes.NewUUID().String()

mbi := &blockchainmocks.Plugin{}
mcb := &coremocks.OperationCallbacks{}
cb := NewBlockchainCallbacks()
cb.SetOperationalHandler("ns1", mcb)

mbi.On("Name").Return("utblockchain")
mcb.On("BulkOperationUpdates", mock.Anything, mock.MatchedBy(func(updates []*core.OperationUpdate) bool {
assert.True(t, updates[0].NamespacedOpID == nsOpID &&
updates[0].Status == core.OpStatusSucceeded &&
updates[0].BlockchainTXID == "tx1" &&
updates[0].ErrorMessage == "err" &&
updates[0].Plugin == "utblockchain")

assert.True(t, updates[1].NamespacedOpID == nsOpID2 &&
updates[1].Status == core.OpStatusSucceeded &&
updates[1].BlockchainTXID == "tx2" &&
updates[1].ErrorMessage == "err" &&
updates[1].Plugin == "utblockchain")

return true
})).Return(nil).Once()

cb.BulkOperationUpdates(context.Background(), "ns1", []*core.OperationUpdate{
{
NamespacedOpID: nsOpID,
Status: core.OpStatusSucceeded,
BlockchainTXID: "tx1",
ErrorMessage: "err",
Output: fftypes.JSONObject{},
Plugin: "utblockchain",
},
{
NamespacedOpID: nsOpID2,
Status: core.OpStatusSucceeded,
BlockchainTXID: "tx2",
ErrorMessage: "err",
Output: fftypes.JSONObject{},
Plugin: "utblockchain",
},
})

// No Handler
cb.BulkOperationUpdates(context.Background(), "ns2", []*core.OperationUpdate{})

mcb.AssertExpectations(t)
}
8 changes: 4 additions & 4 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -1825,7 +1825,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"transactionIndex": "0"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8" &&
Expand Down Expand Up @@ -1911,7 +1911,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
"updated": "2022-08-03T18:55:43.781941Z"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusPending &&
update.BlockchainTXID == "0x929c898a46762d91e9f4b0b8e2800863dcf4a40f694109dc4cd19dbd334fa4cc" &&
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {

em := &coremocks.OperationCallbacks{}
e.SetOperationHandler("ns1", em)
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument" &&
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -1778,7 +1778,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"receivedAt": 1630033474675
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
Expand Down Expand Up @@ -1839,7 +1839,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/tezos/tezos_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
}
}`)

tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "ooGcrcazgcGBrY1iym329ovV13MnWrTmV1fttCwWKH5DiYUQsiq" &&
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
"updated": "2023-09-10T14:49:36.030604Z"
}`)

tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusPending &&
update.BlockchainTXID == "onhZJDmz5JihnW1RaZ96f17FgUBv3GoERkRECK3XVFt1kL5E6Yy" &&
Expand Down
4 changes: 3 additions & 1 deletion internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -316,4 +316,6 @@ var (
MsgFiltersEmpty = ffe("FF10475", "No filters specified in contract listener: %s.", 500)
MsgContractListenerBlockchainFilterLimit = ffe("FF10476", "Blockchain plugin only supports one filter for contract listener: %s.", 500)
MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400)
MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'")
MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'")
)
98 changes: 55 additions & 43 deletions internal/dataexchange/ffdx/dxevent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -90,71 +90,83 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {

switch msg.Type {
case messageFailed:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case messageDelivered:
status := core.OpStatusSucceeded
if h.capabilities.Manifest {
status = core.OpStatusPending
}
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case messageAcknowledged:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
VerifyManifest: h.capabilities.Manifest,
DXManifest: msg.Manifest,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
VerifyManifest: h.capabilities.Manifest,
DXManifest: msg.Manifest,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobFailed:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobDelivered:
status := core.OpStatusSucceeded
if h.capabilities.Manifest {
status = core.OpStatusPending
}
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobAcknowledged:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
Output: msg.Info,
VerifyManifest: h.capabilities.Manifest,
DXHash: msg.Hash,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
Output: msg.Info,
VerifyManifest: h.capabilities.Manifest,
DXHash: msg.Hash,
},
OnComplete: e.Ack,
})
return

Expand Down
4 changes: 2 additions & 2 deletions internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -68,7 +68,7 @@ type callbacks struct {
opHandlers map[string]core.OperationCallbacks
}

func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdate) {
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
if handler, ok := cb.opHandlers[namespace]; ok {
handler.OperationUpdate(update)
Expand Down
Loading