diff --git a/internal/blockchain/common/common.go b/internal/blockchain/common/common.go index 564b9da0f8..0393b45b74 100644 --- a/internal/blockchain/common/common.go +++ b/internal/blockchain/common/common.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -43,6 +43,12 @@ type BlockchainCallbacks interface { SetHandler(namespace string, handler blockchain.Callbacks) SetOperationalHandler(namespace string, handler core.OperationCallbacks) + // BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error + // An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector. + // It is the responsibility of the connector code to allocate that routine, and ensure that there is only one. + // Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage if needed. + BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error + OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) // Common logic for parsing a BatchPinOrNetworkAction event, and if not discarded to add it to the by-namespace map PrepareBatchPinOrNetworkAction(ctx context.Context, events EventsToDispatch, subInfo *SubscriptionInfo, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef, params *BatchPinParams) @@ -64,6 +70,17 @@ type callbacks struct { opHandlers map[string]core.OperationCallbacks } +// BulkOperationUpdates implements BlockchainCallbacks. +func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error { + if handler, ok := cb.opHandlers[namespace]; ok { + return handler.BulkOperationUpdates(ctx, updates) + } + // We don't want to error as it just means this update was not for this namespace + // This is unlikely to happen in practice, as the namespace is always passed in the operation handler + log.L(ctx).Errorf("No operation handler found for namespace '%s'", namespace) + return nil +} + type subscriptions struct { subs map[string]*SubscriptionInfo } @@ -153,13 +170,15 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) if handler, ok := cb.opHandlers[namespace]; ok { - handler.OperationUpdate(&core.OperationUpdate{ - Plugin: plugin.Name(), - NamespacedOpID: nsOpID, - Status: status, - BlockchainTXID: blockchainTXID, - ErrorMessage: errorMessage, - Output: opOutput, + handler.OperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: plugin.Name(), + NamespacedOpID: nsOpID, + Status: status, + BlockchainTXID: blockchainTXID, + ErrorMessage: errorMessage, + Output: opOutput, + }, }) return } diff --git a/internal/blockchain/common/common_test.go b/internal/blockchain/common/common_test.go index f20c5f1a4c..b59b27a4c4 100644 --- a/internal/blockchain/common/common_test.go +++ b/internal/blockchain/common/common_test.go @@ -44,7 +44,7 @@ func TestCallbackOperationUpdate(t *testing.T) { cb.SetOperationalHandler("ns1", mcb) mbi.On("Name").Return("utblockchain") - mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == nsOpID && update.Status == core.OpStatusSucceeded && update.BlockchainTXID == "tx1" && @@ -455,3 +455,54 @@ func TestErrorWrappingNonConflict(t *testing.T) { _, conforms := err.(operations.ConflictError) assert.False(t, conforms) } + +func TestCallbackBulkOperationUpdate(t *testing.T) { + nsOpID := "ns1:" + fftypes.NewUUID().String() + nsOpID2 := "ns1:" + fftypes.NewUUID().String() + + mbi := &blockchainmocks.Plugin{} + mcb := &coremocks.OperationCallbacks{} + cb := NewBlockchainCallbacks() + cb.SetOperationalHandler("ns1", mcb) + + mbi.On("Name").Return("utblockchain") + mcb.On("BulkOperationUpdates", mock.Anything, mock.MatchedBy(func(updates []*core.OperationUpdate) bool { + assert.True(t, updates[0].NamespacedOpID == nsOpID && + updates[0].Status == core.OpStatusSucceeded && + updates[0].BlockchainTXID == "tx1" && + updates[0].ErrorMessage == "err" && + updates[0].Plugin == "utblockchain") + + assert.True(t, updates[1].NamespacedOpID == nsOpID2 && + updates[1].Status == core.OpStatusSucceeded && + updates[1].BlockchainTXID == "tx2" && + updates[1].ErrorMessage == "err" && + updates[1].Plugin == "utblockchain") + + return true + })).Return(nil).Once() + + cb.BulkOperationUpdates(context.Background(), "ns1", []*core.OperationUpdate{ + { + NamespacedOpID: nsOpID, + Status: core.OpStatusSucceeded, + BlockchainTXID: "tx1", + ErrorMessage: "err", + Output: fftypes.JSONObject{}, + Plugin: "utblockchain", + }, + { + NamespacedOpID: nsOpID2, + Status: core.OpStatusSucceeded, + BlockchainTXID: "tx2", + ErrorMessage: "err", + Output: fftypes.JSONObject{}, + Plugin: "utblockchain", + }, + }) + + // No Handler + cb.BulkOperationUpdates(context.Background(), "ns2", []*core.OperationUpdate{}) + + mcb.AssertExpectations(t) +} diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 739a3e2989..f1e39cfe81 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -1825,7 +1825,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { "transactionIndex": "0" }`) - em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusSucceeded && update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8" && @@ -1911,7 +1911,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) { "updated": "2022-08-03T18:55:43.781941Z" }`) - em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusPending && update.BlockchainTXID == "0x929c898a46762d91e9f4b0b8e2800863dcf4a40f694109dc4cd19dbd334fa4cc" && @@ -1955,7 +1955,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { em := &coremocks.OperationCallbacks{} e.SetOperationHandler("ns1", em) - txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusFailed && update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument" && diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 23e14b2109..0480abc565 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -1778,7 +1778,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { "receivedAt": 1630033474675 }`) - em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusSucceeded && update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" && @@ -1839,7 +1839,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { "transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" }`) - em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusFailed && update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" && diff --git a/internal/blockchain/tezos/tezos_test.go b/internal/blockchain/tezos/tezos_test.go index 6ed39ae656..6df9b89ffd 100644 --- a/internal/blockchain/tezos/tezos_test.go +++ b/internal/blockchain/tezos/tezos_test.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -685,7 +685,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { } }`) - tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusSucceeded && update.BlockchainTXID == "ooGcrcazgcGBrY1iym329ovV13MnWrTmV1fttCwWKH5DiYUQsiq" && @@ -769,7 +769,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) { "updated": "2023-09-10T14:49:36.030604Z" }`) - tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+operationID.String() && update.Status == core.OpStatusPending && update.BlockchainTXID == "onhZJDmz5JihnW1RaZ96f17FgUBv3GoERkRECK3XVFt1kL5E6Yy" && diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 8cccd5c344..e91a17d7f7 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -316,4 +316,6 @@ var ( MsgFiltersEmpty = ffe("FF10475", "No filters specified in contract listener: %s.", 500) MsgContractListenerBlockchainFilterLimit = ffe("FF10476", "Blockchain plugin only supports one filter for contract listener: %s.", 500) MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400) + MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'") + MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'") ) diff --git a/internal/dataexchange/ffdx/dxevent.go b/internal/dataexchange/ffdx/dxevent.go index 3043d05950..372e673183 100644 --- a/internal/dataexchange/ffdx/dxevent.go +++ b/internal/dataexchange/ffdx/dxevent.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -90,13 +90,15 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { switch msg.Type { case messageFailed: - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: core.OpStatusFailed, - ErrorMessage: msg.Error, - Output: msg.Info, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: core.OpStatusFailed, + ErrorMessage: msg.Error, + Output: msg.Info, + }, + OnComplete: e.Ack, }) return case messageDelivered: @@ -104,33 +106,39 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { if h.capabilities.Manifest { status = core.OpStatusPending } - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: status, - Output: msg.Info, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: status, + Output: msg.Info, + }, + OnComplete: e.Ack, }) return case messageAcknowledged: - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: core.OpStatusSucceeded, - VerifyManifest: h.capabilities.Manifest, - DXManifest: msg.Manifest, - Output: msg.Info, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: core.OpStatusSucceeded, + VerifyManifest: h.capabilities.Manifest, + DXManifest: msg.Manifest, + Output: msg.Info, + }, + OnComplete: e.Ack, }) return case blobFailed: - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: core.OpStatusFailed, - ErrorMessage: msg.Error, - Output: msg.Info, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: core.OpStatusFailed, + ErrorMessage: msg.Error, + Output: msg.Info, + }, + OnComplete: e.Ack, }) return case blobDelivered: @@ -138,23 +146,27 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { if h.capabilities.Manifest { status = core.OpStatusPending } - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: status, - Output: msg.Info, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: status, + Output: msg.Info, + }, + OnComplete: e.Ack, }) return case blobAcknowledged: - h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{ - Plugin: h.Name(), - NamespacedOpID: msg.RequestID, - Status: core.OpStatusSucceeded, - Output: msg.Info, - VerifyManifest: h.capabilities.Manifest, - DXHash: msg.Hash, - OnComplete: e.Ack, + h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: h.Name(), + NamespacedOpID: msg.RequestID, + Status: core.OpStatusSucceeded, + Output: msg.Info, + VerifyManifest: h.capabilities.Manifest, + DXHash: msg.Hash, + }, + OnComplete: e.Ack, }) return diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index 00c63dd53a..8b83aefef1 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -68,7 +68,7 @@ type callbacks struct { opHandlers map[string]core.OperationCallbacks } -func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdate) { +func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) { namespace, _, _ := core.ParseNamespacedOpID(ctx, update.NamespacedOpID) if handler, ok := cb.opHandlers[namespace]; ok { handler.OperationUpdate(update) diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 63efbcd8d0..2f9ba85836 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -142,7 +142,7 @@ func TestInitMissingURL(t *testing.T) { func opAcker() func(args mock.Arguments) { return func(args mock.Arguments) { - args[0].(*core.OperationUpdate).OnComplete() + args[0].(*core.OperationUpdateAsync).OnComplete() } } @@ -492,7 +492,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) { assert.NoError(t, err) namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID1 && ev.Status == core.OpStatusFailed && ev.ErrorMessage == "pop" && @@ -503,7 +503,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID2 && ev.Status == core.OpStatusSucceeded && ev.Plugin == "ffdx" @@ -513,7 +513,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg)) namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID3 && ev.Status == core.OpStatusSucceeded && ev.DXManifest == `{"manifest":true}` && @@ -552,7 +552,7 @@ func TestMessageEvents(t *testing.T) { assert.NoError(t, err) namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID1 && ev.Status == core.OpStatusFailed && ev.ErrorMessage == "pop" && @@ -563,7 +563,7 @@ func TestMessageEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID2 && ev.Status == core.OpStatusSucceeded && ev.Plugin == "ffdx" @@ -573,7 +573,7 @@ func TestMessageEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg)) namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID3 && ev.Status == core.OpStatusSucceeded && ev.DXManifest == `{"manifest":true}` && @@ -617,7 +617,7 @@ func TestBlobEvents(t *testing.T) { assert.NoError(t, err) namespacedID5 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID5 && ev.Status == core.OpStatusFailed && ev.ErrorMessage == "pop" && @@ -628,7 +628,7 @@ func TestBlobEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"5"}`, string(msg)) namespacedID6 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID6 && ev.Status == core.OpStatusSucceeded && ev.Output.String() == `{"some":"details"}` && @@ -650,7 +650,7 @@ func TestBlobEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"9"}`, string(msg)) namespacedID10 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID10 && ev.Status == core.OpStatusSucceeded && ev.Output.String() == `{"signatures":"and stuff"}` && @@ -683,7 +683,7 @@ func TestEventsWithManifest(t *testing.T) { h.SetOperationHandler("ns1", ocb) namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID1 && ev.Status == core.OpStatusPending && ev.Plugin == "ffdx" @@ -693,7 +693,7 @@ func TestEventsWithManifest(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg)) namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID()) - ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool { + ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool { return ev.NamespacedOpID == namespacedID2 && ev.Status == core.OpStatusPending && ev.Plugin == "ffdx" diff --git a/internal/operations/manager.go b/internal/operations/manager.go index 27e67c5dc1..1c9c1ae27d 100644 --- a/internal/operations/manager.go +++ b/internal/operations/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -47,7 +47,8 @@ type Manager interface { ResubmitOperations(ctx context.Context, txID *fftypes.UUID) (total int, resubmit []*core.Operation, err error) AddOrReuseOperation(ctx context.Context, op *core.Operation, hooks ...database.PostCompletionHook) error BulkInsertOperations(ctx context.Context, ops ...*core.Operation) error - SubmitOperationUpdate(update *core.OperationUpdate) + SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error + SubmitOperationUpdate(update *core.OperationUpdateAsync) GetOperationByIDCached(ctx context.Context, opID *fftypes.UUID) (*core.Operation, error) ResolveOperationByID(ctx context.Context, opID *fftypes.UUID, op *core.OperationUpdateDTO) error Start() error @@ -77,6 +78,18 @@ type operationsManager struct { cache cache.CInterface } +// SubmitBulkOperationUpdate implements Manager. +func (om *operationsManager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error { + for _, update := range updates { + errString := "" + if update.ErrorMessage != "" { + errString = fmt.Sprintf(" error=%s", update.ErrorMessage) + } + log.L(om.ctx).Debugf("%s updating operation %s status=%s%s", update.Plugin, update.NamespacedOpID, update.Status, errString) + } + return om.updater.SubmitBulkOperationUpdates(ctx, updates) +} + func NewOperationsManager(ctx context.Context, ns string, di database.Plugin, txHelper txcommon.Helper, cacheManager cache.Manager) (Manager, error) { if di == nil || txHelper == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "OperationsManager") @@ -193,12 +206,14 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared // Ok, we're failed failState = core.OpStatusFailed } - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: op.NamespacedIDString(), - Plugin: op.Plugin, - Status: failState, - ErrorMessage: err.Error(), - Output: outputs, + om.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: op.NamespacedIDString(), + Plugin: op.Plugin, + Status: failState, + ErrorMessage: err.Error(), + Output: outputs, + }, }) } else { // No error so move us from "Initialized" to "Pending" @@ -209,11 +224,13 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared newState = core.OpStatusSucceeded } - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: op.NamespacedIDString(), - Plugin: op.Plugin, - Status: newState, - Output: outputs, + om.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: op.NamespacedIDString(), + Plugin: op.Plugin, + Status: newState, + Output: outputs, + }, }) } return outputs, err @@ -284,7 +301,7 @@ func (om *operationsManager) ResolveOperationByID(ctx context.Context, opID *fft return om.updater.resolveOperation(ctx, om.namespace, opID, op.Status, op.Error, op.Output) } -func (om *operationsManager) SubmitOperationUpdate(update *core.OperationUpdate) { +func (om *operationsManager) SubmitOperationUpdate(update *core.OperationUpdateAsync) { errString := "" if update.ErrorMessage != "" { errString = fmt.Sprintf(" error=%s", update.ErrorMessage) diff --git a/internal/operations/manager_test.go b/internal/operations/manager_test.go index 626dc7292f..2be14fdc1b 100644 --- a/internal/operations/manager_test.go +++ b/internal/operations/manager_test.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -186,8 +186,8 @@ func TestRunOperationSyncSuccess(t *testing.T) { om, cancel := newTestOperations(t) defer cancel() - om.updater.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate), + om.updater.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync), } om.updater.cancelFunc() @@ -208,8 +208,8 @@ func TestRunOperationFailIdempotentInit(t *testing.T) { om, cancel := newTestOperations(t) defer cancel() - om.updater.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate, 1), + om.updater.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync, 1), } ctx := context.Background() @@ -236,8 +236,8 @@ func TestRunOperationFailNonIdempotentInit(t *testing.T) { om, cancel := newTestOperations(t) defer cancel() - om.updater.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate, 1), + om.updater.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync, 1), } ctx := context.Background() @@ -264,8 +264,8 @@ func TestRunOperationFailConflict(t *testing.T) { om, cancel := newTestOperations(t) defer cancel() - om.updater.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate, 1), + om.updater.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync, 1), } ctx := context.Background() @@ -289,8 +289,8 @@ func TestRunOperationFailRemainPending(t *testing.T) { om, cancel := newTestOperations(t) defer cancel() - om.updater.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate), + om.updater.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync), } om.updater.cancelFunc() @@ -742,3 +742,159 @@ func TestErrTernaryHelper(t *testing.T) { assert.Equal(t, core.OpPhasePending, ErrTernary(nil, core.OpPhaseInitializing, core.OpPhasePending)) assert.Equal(t, core.OpPhaseInitializing, ErrTernary(fmt.Errorf("pop"), core.OpPhaseInitializing, core.OpPhasePending)) } + +func TestSubmitBulkOperationUpdates(t *testing.T) { + om, cancel := newTestOperations(t) + defer cancel() + + ctx := context.Background() + + operations := make([]*core.Operation, 0) + opID := fftypes.NewUUID() + op := &core.Operation{ + Namespace: "ns1", + ID: opID, + Plugin: "blockchain", + Type: core.OpTypeBlockchainPinBatch, + Status: core.OpStatusInitialized, + } + op2ID := fftypes.NewUUID() + op2 := &core.Operation{ + Namespace: "ns2", + ID: op2ID, + Plugin: "blockchain", + Type: core.OpTypeBlockchainContractDeploy, + Status: core.OpStatusInitialized, + } + operations = append(operations, op, op2) + + submittedUpdate := &core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID.String(), + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + Plugin: "blockchain", + } + + submittedUpdate2 := &core.OperationUpdate{ + NamespacedOpID: "ns1:" + op2ID.String(), + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + Plugin: "blockchain", + } + + mdi := om.database.(*databasemocks.Plugin) + mdi.On("GetOperations", ctx, "ns1", mock.Anything).Return(operations, nil, nil) + + mdi.On("UpdateOperation", ctx, "ns1", opID, mock.Anything, mock.Anything).Return(true, nil) + mdi.On("UpdateOperation", ctx, "ns2", op2ID, mock.Anything, mock.Anything).Return(true, nil) + + err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate, submittedUpdate2}) + assert.NoError(t, err) +} + +func TestSubmitBulkOperationUpdatesErrorNoPlugin(t *testing.T) { + om, cancel := newTestOperations(t) + defer cancel() + + ctx := context.Background() + + operations := make([]*core.Operation, 0) + opID := fftypes.NewUUID() + op := &core.Operation{ + ID: opID, + Plugin: "blockchain", + Type: core.OpTypeBlockchainPinBatch, + Status: core.OpStatusInitialized, + } + + operations = append(operations, op) + + submittedUpdate := &core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID.String(), + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + } + + mdi := om.database.(*databasemocks.Plugin) + mdi.On("GetOperations", ctx, "ns1", mock.Anything).Return(operations, nil, nil) + + err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate}) + assert.Error(t, err) + assert.Regexp(t, "FF10479", err.Error()) +} + +func TestSubmitBulkOperationUpdatesErrorWrongNamespace(t *testing.T) { + om, cancel := newTestOperations(t) + defer cancel() + + ctx := context.Background() + + operations := make([]*core.Operation, 0) + opID := fftypes.NewUUID() + op := &core.Operation{ + ID: opID, + Plugin: "blockchain", + Type: core.OpTypeBlockchainPinBatch, + Status: core.OpStatusInitialized, + } + operations = append(operations, op) + + submittedUpdate := &core.OperationUpdate{ + NamespacedOpID: "different-namespace:" + opID.String(), + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + Plugin: "blockchain", + } + + err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate}) + assert.Error(t, err) + assert.Regexp(t, "FF10478", err.Error()) +} + +func TestSubmitBulkOperationUpdatesIgnoredBadID(t *testing.T) { + om, cancel := newTestOperations(t) + defer cancel() + + ctx := context.Background() + + submittedUpdate := &core.OperationUpdate{ + NamespacedOpID: "ns1:BAD-UUID", + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + Plugin: "blockchain", + } + + err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate}) + assert.Error(t, err) + assert.Regexp(t, "FF00138", err.Error()) +} + +func TestSubmitBulkOperationUpdatesError(t *testing.T) { + om, cancel := newTestOperations(t) + defer cancel() + + ctx := context.Background() + + operations := make([]*core.Operation, 0) + opID := fftypes.NewUUID() + op := &core.Operation{ + ID: opID, + Plugin: "blockchain", + Type: core.OpTypeBlockchainPinBatch, + Status: core.OpStatusInitialized, + } + operations = append(operations, op) + + submittedUpdate := &core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID.String(), + Status: core.OpStatusSucceeded, + ErrorMessage: "my-error-message", + Plugin: "blockchain", + } + + mdi := om.database.(*databasemocks.Plugin) + mdi.On("GetOperations", ctx, "ns1", mock.Anything).Return(operations, nil, errors.New("Failed to get operations")) + + err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate}) + assert.Error(t, err) +} diff --git a/internal/operations/operation_updater.go b/internal/operations/operation_updater.go index 9f3b6d3a6f..0def75ea2d 100644 --- a/internal/operations/operation_updater.go +++ b/internal/operations/operation_updater.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -35,7 +35,7 @@ import ( ) type operationUpdaterBatch struct { - updates []*core.OperationUpdate + updates []*core.OperationUpdateAsync timeoutContext context.Context timeoutCancel func() } @@ -47,7 +47,7 @@ type operationUpdater struct { manager *operationsManager database database.Plugin txHelper txcommon.Helper - workQueues []chan *core.OperationUpdate + workQueues []chan *core.OperationUpdateAsync workersDone []chan struct{} conf operationUpdaterConf closed bool @@ -87,13 +87,49 @@ func newOperationUpdater(ctx context.Context, om *operationsManager, di database } // pickWorker ensures multiple updates for the same ID go to the same worker -func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, update *core.OperationUpdate) chan *core.OperationUpdate { +func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, update *core.OperationUpdateAsync) chan *core.OperationUpdateAsync { worker := id.HashBucket(ou.conf.workerCount) log.L(ctx).Debugf("Submitting operation update id=%s status=%s blockchainTX=%s worker=opu_%.3d", id, update.Status, update.BlockchainTXID, worker) return ou.workQueues[worker] } -func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *core.OperationUpdate) { +// SubmitBulkOperationUpdates is a synchronous write of batch of operation updates +func (ou *operationUpdater) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error { + validUpdates := []*core.OperationUpdate{} + for _, update := range updates { + ns, _, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID) + if err != nil { + log.L(ctx).Warnf("Unable to update operation '%s' due to invalid ID: %s", update.NamespacedOpID, err) + return err + } + + if ns != ou.manager.namespace { + log.L(ou.ctx).Errorf("Received operation update from different namespace '%s'", ns) + return i18n.NewError(ctx, coremsgs.MsgInvalidNamespaceForOperationUpdate, ns, ou.manager.namespace) + } + + if update.Plugin == "" { + log.L(ou.ctx).Errorf("Cannot supply empty plugin on operation update '%s'", update.NamespacedOpID) + return i18n.NewError(ctx, coremsgs.MsgEmptyPluginForOperationUpdate, update.NamespacedOpID) + } + + validUpdates = append(validUpdates, update) + } + + // Notice how this is not using the workers and is synchronous + // The reason is because we want for all updates to be stored at once in this order + // If offloaded into workers the updates would be processed in parallel, in different DB TX and in a different order + // Up to the caller to retry if this fails + err := ou.doBatchUpdateAsGroup(ctx, validUpdates) + if err != nil { + log.L(ctx).Warnf("Exiting while updating operations: %s", err) + return err + } + + return nil +} + +func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) { ns, id, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID) if err != nil { log.L(ctx).Warnf("Unable to update operation '%s' due to invalid ID: %s", update.NamespacedOpID, err) @@ -119,17 +155,17 @@ func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *c return } // Otherwise do it in-line on this context - err = ou.doBatchUpdateWithRetry(ctx, []*core.OperationUpdate{update}) + err = ou.doBatchUpdateWithRetry(ctx, []*core.OperationUpdateAsync{update}) if err != nil { log.L(ctx).Warnf("Exiting while updating operation: %s", err) } } func (ou *operationUpdater) initQueues() { - ou.workQueues = make([]chan *core.OperationUpdate, ou.conf.workerCount) + ou.workQueues = make([]chan *core.OperationUpdateAsync, ou.conf.workerCount) ou.workersDone = make([]chan struct{}, ou.conf.workerCount) for i := 0; i < ou.conf.workerCount; i++ { - ou.workQueues[i] = make(chan *core.OperationUpdate, ou.conf.queueLength) + ou.workQueues[i] = make(chan *core.OperationUpdateAsync, ou.conf.queueLength) ou.workersDone[i] = make(chan struct{}) } } @@ -181,14 +217,23 @@ func (ou *operationUpdater) updaterLoop(index int) { } } -func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates []*core.OperationUpdate) error { +func (ou *operationUpdater) doBatchUpdateAsGroup(ctx context.Context, updates []*core.OperationUpdate) error { + return ou.database.RunAsGroup(ctx, func(ctx context.Context) error { + return ou.doBatchUpdate(ctx, updates) + }) +} + +func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates []*core.OperationUpdateAsync) error { return ou.retry.Do(ctx, "operation update", func(attempt int) (retry bool, err error) { - err = ou.database.RunAsGroup(ctx, func(ctx context.Context) error { - return ou.doBatchUpdate(ctx, updates) - }) + syncUpdates := []*core.OperationUpdate{} + for _, update := range updates { + syncUpdates = append(syncUpdates, &update.OperationUpdate) + } + err = ou.doBatchUpdateAsGroup(ctx, syncUpdates) if err != nil { return true, err } + for _, update := range updates { if update.OnComplete != nil { update.OnComplete() @@ -199,7 +244,6 @@ func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates } func (ou *operationUpdater) doBatchUpdate(ctx context.Context, updates []*core.OperationUpdate) error { - // Get all the operations that match opIDs := make([]*fftypes.UUID, 0, len(updates)) for _, update := range updates { diff --git a/internal/operations/operation_updater_test.go b/internal/operations/operation_updater_test.go index 6d31db81a7..6a4fd97c57 100644 --- a/internal/operations/operation_updater_test.go +++ b/internal/operations/operation_updater_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -105,24 +105,28 @@ func TestNewOperationUpdaterNoConcurrency(t *testing.T) { func TestSubmitUpdateBadIDIgnored(t *testing.T) { ou := newTestOperationUpdater(t) ou.close() - ou.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate), + ou.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync), } ou.cancelFunc() - ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdate{ - NamespacedOpID: "!!!" + fftypes.NewUUID().String(), + ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "!!!" + fftypes.NewUUID().String(), + }, }) } func TestSubmitUpdateClosed(t *testing.T) { ou := newTestOperationUpdater(t) ou.close() - ou.workQueues = []chan *core.OperationUpdate{ - make(chan *core.OperationUpdate), + ou.workQueues = []chan *core.OperationUpdateAsync{ + make(chan *core.OperationUpdateAsync), } ou.cancelFunc() - ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdate{ - NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + }, }) } @@ -139,9 +143,11 @@ func TestSubmitUpdateSyncFallbackOpNotFound(t *testing.T) { mdi.On("GetOperations", customCtx, mock.Anything, mock.Anything).Return(nil, nil, nil) complete := false - ou.SubmitOperationUpdate(customCtx, &core.OperationUpdate{ - NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), - OnComplete: func() { complete = true }, + ou.SubmitOperationUpdate(customCtx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + }, + OnComplete: func() { complete = true }, }) assert.True(t, complete) @@ -158,8 +164,10 @@ func TestSubmitUpdateDatabaseError(t *testing.T) { mdi := ou.database.(*databasemocks.Plugin) mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - ou.SubmitOperationUpdate(ctx, &core.OperationUpdate{ - NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + ou.SubmitOperationUpdate(ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + }, }) mdi.AssertExpectations(t) @@ -170,8 +178,10 @@ func TestSubmitUpdateWrongNS(t *testing.T) { defer ou.close() customCtx := context.WithValue(context.Background(), "dbtx", "on this context") - ou.SubmitOperationUpdate(customCtx, &core.OperationUpdate{ - NamespacedOpID: "ns2:" + fftypes.NewUUID().String(), + ou.SubmitOperationUpdate(customCtx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns2:" + fftypes.NewUUID().String(), + }, }) } @@ -218,21 +228,27 @@ func TestSubmitUpdateWorkerE2ESuccess(t *testing.T) { }).Once() om.Start() - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: "ns1:" + opID1.String(), - Status: core.OpStatusSucceeded, - BlockchainTXID: "tx12345", + om.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID1.String(), + Status: core.OpStatusSucceeded, + BlockchainTXID: "tx12345", + }, }) - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: "ns1:" + opID2.String(), - Status: core.OpStatusFailed, - ErrorMessage: "err1", - Output: fftypes.JSONObject{"test": true}, + om.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID2.String(), + Status: core.OpStatusFailed, + ErrorMessage: "err1", + Output: fftypes.JSONObject{"test": true}, + }, }) - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: "ns1:" + opID3.String(), - Status: core.OpStatusFailed, - ErrorMessage: "err2", + om.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + opID3.String(), + Status: core.OpStatusFailed, + ErrorMessage: "err2", + }, }) <-done @@ -250,8 +266,10 @@ func TestUpdateLoopExitRetryCancelledContext(t *testing.T) { ou.cancelFunc() }) - ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdate{ - NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + ou.SubmitOperationUpdate(ou.ctx, &core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: "ns1:" + fftypes.NewUUID().String(), + }, }) ou.updaterLoop(0) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 5fc3c3df79..65039b6010 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -41,7 +41,11 @@ func (bc *boundCallbacks) checkStopped() error { return nil } -func (bc *boundCallbacks) OperationUpdate(update *core.OperationUpdate) { +func (bc *boundCallbacks) BulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error { + return bc.o.operations.SubmitBulkOperationUpdates(ctx, updates) +} + +func (bc *boundCallbacks) OperationUpdate(update *core.OperationUpdateAsync) { bc.o.operations.SubmitOperationUpdate(update) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 06bd633831..5dc1b83808 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -68,15 +68,23 @@ func TestBoundCallbacks(t *testing.T) { nsOpID := "ns1:" + opID.String() dataID := fftypes.NewUUID() - update := &core.OperationUpdate{ + operationUpdate := &core.OperationUpdate{ NamespacedOpID: nsOpID, Status: core.OpStatusFailed, BlockchainTXID: "0xffffeeee", ErrorMessage: "error info", Output: info, } - mom.On("SubmitOperationUpdate", update).Return().Once() - bc.OperationUpdate(update) + operationUpdateAsync := &core.OperationUpdateAsync{ + OperationUpdate: *operationUpdate, + } + mom.On("SubmitOperationUpdate", operationUpdateAsync).Return().Once() + bc.OperationUpdate(operationUpdateAsync) + + ctx := context.Background() + updates := []*core.OperationUpdate{operationUpdate} + mom.On("SubmitBulkOperationUpdates", ctx, updates).Return(nil).Once() + bc.BulkOperationUpdates(ctx, updates) mei.On("SharedStorageBatchDownloaded", mss, "payload1", []byte(`{}`)).Return(nil, fmt.Errorf("pop")) _, err := bc.SharedStorageBatchDownloaded("payload1", []byte(`{}`)) diff --git a/internal/shareddownload/download_worker.go b/internal/shareddownload/download_worker.go index cae26e501c..68409aa9fe 100644 --- a/internal/shareddownload/download_worker.go +++ b/internal/shareddownload/download_worker.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -63,11 +63,13 @@ func (dw *downloadWorker) attemptWork(work *downloadWork) { if err != nil { log.L(dw.ctx).Errorf("Download operation %s/%s attempt=%d/%d failed: %s", work.preparedOp.Type, work.preparedOp.ID, work.attempts, dw.dm.retryMaxAttempts, err) if isLastAttempt { - dw.dm.operations.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: work.preparedOp.NamespacedIDString(), - Plugin: work.preparedOp.Plugin, - Status: core.OpStatusFailed, - ErrorMessage: err.Error(), + dw.dm.operations.SubmitOperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + NamespacedOpID: work.preparedOp.NamespacedIDString(), + Plugin: work.preparedOp.Plugin, + Status: core.OpStatusFailed, + ErrorMessage: err.Error(), + }, }) } else { go dw.dm.waitAndRetryDownload(work) diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 2f07077b2a..30b5a4b884 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -74,13 +74,15 @@ type callbacks struct { func (cb *callbacks) OperationUpdate(ctx context.Context, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) if handler, ok := cb.opHandlers[namespace]; ok { - handler.OperationUpdate(&core.OperationUpdate{ - Plugin: cb.plugin.Name(), - NamespacedOpID: nsOpID, - Status: status, - BlockchainTXID: blockchainTXID, - ErrorMessage: errorMessage, - Output: opOutput, + handler.OperationUpdate(&core.OperationUpdateAsync{ + OperationUpdate: core.OperationUpdate{ + Plugin: cb.plugin.Name(), + NamespacedOpID: nsOpID, + Status: status, + BlockchainTXID: blockchainTXID, + ErrorMessage: errorMessage, + Output: opOutput, + }, }) } else { log.L(ctx).Errorf("No handler found for token operation '%s'", nsOpID) diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 886e1d80cc..56d693998f 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -1032,7 +1032,7 @@ func TestReceiptEvents(t *testing.T) { mockCalled := make(chan bool) // receipt: bad ID - passed through - mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:wrong" && update.Status == core.OpStatusPending && update.Plugin == "fftokens" @@ -1050,7 +1050,7 @@ func TestReceiptEvents(t *testing.T) { <-mockCalled // receipt: success - mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+opID.String() && update.Status == core.OpStatusSucceeded && update.BlockchainTXID == "0xffffeeee" && @@ -1070,7 +1070,7 @@ func TestReceiptEvents(t *testing.T) { <-mockCalled // receipt: update - mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+opID.String() && update.Status == core.OpStatusPending && update.BlockchainTXID == "0xffffeeee" @@ -1089,7 +1089,7 @@ func TestReceiptEvents(t *testing.T) { <-mockCalled // receipt: failure - mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool { + mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool { return update.NamespacedOpID == "ns1:"+opID.String() && update.Status == core.OpStatusFailed && update.BlockchainTXID == "0xffffeeee" && diff --git a/mocks/apiservermocks/ffi_swagger_gen.go b/mocks/apiservermocks/ffi_swagger_gen.go index 08a9311f59..8a6beb8f47 100644 --- a/mocks/apiservermocks/ffi_swagger_gen.go +++ b/mocks/apiservermocks/ffi_swagger_gen.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package apiservermocks diff --git a/mocks/apiservermocks/server.go b/mocks/apiservermocks/server.go index 71247dfb95..209e30aabc 100644 --- a/mocks/apiservermocks/server.go +++ b/mocks/apiservermocks/server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package apiservermocks diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 3f2788b224..95390c40f2 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package assetmocks diff --git a/mocks/batchmocks/manager.go b/mocks/batchmocks/manager.go index a8ceb379e8..f0b548261f 100644 --- a/mocks/batchmocks/manager.go +++ b/mocks/batchmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package batchmocks diff --git a/mocks/blockchaincommonmocks/firefly_subscriptions.go b/mocks/blockchaincommonmocks/firefly_subscriptions.go index 835d36edfc..534b555daf 100644 --- a/mocks/blockchaincommonmocks/firefly_subscriptions.go +++ b/mocks/blockchaincommonmocks/firefly_subscriptions.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package blockchaincommonmocks diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index e4cd093a95..581314a026 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package blockchainmocks diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index 1a8cc47286..ed0b41b049 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package blockchainmocks diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 957f64e75d..43f49a9db5 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package broadcastmocks diff --git a/mocks/cachemocks/manager.go b/mocks/cachemocks/manager.go index ed923f1558..892a94f2e4 100644 --- a/mocks/cachemocks/manager.go +++ b/mocks/cachemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package cachemocks diff --git a/mocks/contractmocks/manager.go b/mocks/contractmocks/manager.go index 06c26e9343..32654a420c 100644 --- a/mocks/contractmocks/manager.go +++ b/mocks/contractmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package contractmocks diff --git a/mocks/coremocks/operation_callbacks.go b/mocks/coremocks/operation_callbacks.go index 0e6acabc5d..7d00fed033 100644 --- a/mocks/coremocks/operation_callbacks.go +++ b/mocks/coremocks/operation_callbacks.go @@ -1,8 +1,10 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package coremocks import ( + context "context" + core "github.com/hyperledger/firefly/pkg/core" mock "github.com/stretchr/testify/mock" ) @@ -12,8 +14,26 @@ type OperationCallbacks struct { mock.Mock } +// BulkOperationUpdates provides a mock function with given fields: ctx, updates +func (_m *OperationCallbacks) BulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error { + ret := _m.Called(ctx, updates) + + if len(ret) == 0 { + panic("no return value specified for BulkOperationUpdates") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*core.OperationUpdate) error); ok { + r0 = rf(ctx, updates) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // OperationUpdate provides a mock function with given fields: update -func (_m *OperationCallbacks) OperationUpdate(update *core.OperationUpdate) { +func (_m *OperationCallbacks) OperationUpdate(update *core.OperationUpdateAsync) { _m.Called(update) } diff --git a/mocks/databasemocks/callbacks.go b/mocks/databasemocks/callbacks.go index 4bb148b577..f225311cfd 100644 --- a/mocks/databasemocks/callbacks.go +++ b/mocks/databasemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package databasemocks diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 75f9abae40..31da62ec00 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package databasemocks diff --git a/mocks/dataexchangemocks/callbacks.go b/mocks/dataexchangemocks/callbacks.go index e39b409a0d..b9bc754fb7 100644 --- a/mocks/dataexchangemocks/callbacks.go +++ b/mocks/dataexchangemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/dx_event.go b/mocks/dataexchangemocks/dx_event.go index 2ee5e469f6..a5d70b9a34 100644 --- a/mocks/dataexchangemocks/dx_event.go +++ b/mocks/dataexchangemocks/dx_event.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index ecd545651e..c3a116fe65 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index ef595ebd69..12725eeef1 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package datamocks diff --git a/mocks/definitionsmocks/handler.go b/mocks/definitionsmocks/handler.go index c4d2ba66f4..c7c5b38576 100644 --- a/mocks/definitionsmocks/handler.go +++ b/mocks/definitionsmocks/handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package definitionsmocks diff --git a/mocks/definitionsmocks/sender.go b/mocks/definitionsmocks/sender.go index 23e487e476..d4720e15c7 100644 --- a/mocks/definitionsmocks/sender.go +++ b/mocks/definitionsmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package definitionsmocks diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index a5fad4a769..6558286a48 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package eventmocks diff --git a/mocks/eventsmocks/callbacks.go b/mocks/eventsmocks/callbacks.go index f4ec135b9c..8b607a1ea8 100644 --- a/mocks/eventsmocks/callbacks.go +++ b/mocks/eventsmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package eventsmocks diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index 2efc0a2425..48e0b2363e 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package eventsmocks diff --git a/mocks/identitymanagermocks/manager.go b/mocks/identitymanagermocks/manager.go index 51b648ea92..c814b9d858 100644 --- a/mocks/identitymanagermocks/manager.go +++ b/mocks/identitymanagermocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package identitymanagermocks diff --git a/mocks/identitymocks/callbacks.go b/mocks/identitymocks/callbacks.go index f3f9946f8f..49e2334c46 100644 --- a/mocks/identitymocks/callbacks.go +++ b/mocks/identitymocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package identitymocks diff --git a/mocks/identitymocks/plugin.go b/mocks/identitymocks/plugin.go index bdb0e52b8d..62c596a587 100644 --- a/mocks/identitymocks/plugin.go +++ b/mocks/identitymocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package identitymocks diff --git a/mocks/metricsmocks/manager.go b/mocks/metricsmocks/manager.go index 33a5e7bd78..e2b261d888 100644 --- a/mocks/metricsmocks/manager.go +++ b/mocks/metricsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package metricsmocks diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 65cf1ea118..c4d78d8d64 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package multipartymocks diff --git a/mocks/namespacemocks/manager.go b/mocks/namespacemocks/manager.go index fa6e9c5b39..1c29f3c8fd 100644 --- a/mocks/namespacemocks/manager.go +++ b/mocks/namespacemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package namespacemocks diff --git a/mocks/networkmapmocks/manager.go b/mocks/networkmapmocks/manager.go index 2751287852..1231c3d08b 100644 --- a/mocks/networkmapmocks/manager.go +++ b/mocks/networkmapmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package networkmapmocks diff --git a/mocks/operationmocks/manager.go b/mocks/operationmocks/manager.go index 98a2a22fed..9d3fcbe15c 100644 --- a/mocks/operationmocks/manager.go +++ b/mocks/operationmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package operationmocks @@ -268,8 +268,26 @@ func (_m *Manager) Start() error { return r0 } +// SubmitBulkOperationUpdates provides a mock function with given fields: ctx, updates +func (_m *Manager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error { + ret := _m.Called(ctx, updates) + + if len(ret) == 0 { + panic("no return value specified for SubmitBulkOperationUpdates") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*core.OperationUpdate) error); ok { + r0 = rf(ctx, updates) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SubmitOperationUpdate provides a mock function with given fields: update -func (_m *Manager) SubmitOperationUpdate(update *core.OperationUpdate) { +func (_m *Manager) SubmitOperationUpdate(update *core.OperationUpdateAsync) { _m.Called(update) } diff --git a/mocks/orchestratormocks/orchestrator.go b/mocks/orchestratormocks/orchestrator.go index 6f94f78900..2027fa90db 100644 --- a/mocks/orchestratormocks/orchestrator.go +++ b/mocks/orchestratormocks/orchestrator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package orchestratormocks diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index 8988ca5859..5ba8bedce8 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package privatemessagingmocks diff --git a/mocks/shareddownloadmocks/callbacks.go b/mocks/shareddownloadmocks/callbacks.go index 26bcd708f3..f5ec29ae16 100644 --- a/mocks/shareddownloadmocks/callbacks.go +++ b/mocks/shareddownloadmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package shareddownloadmocks diff --git a/mocks/shareddownloadmocks/manager.go b/mocks/shareddownloadmocks/manager.go index 33226238f6..53c98a53ee 100644 --- a/mocks/shareddownloadmocks/manager.go +++ b/mocks/shareddownloadmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package shareddownloadmocks diff --git a/mocks/sharedstoragemocks/callbacks.go b/mocks/sharedstoragemocks/callbacks.go index 6296a2b7c8..ee6a6afad3 100644 --- a/mocks/sharedstoragemocks/callbacks.go +++ b/mocks/sharedstoragemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package sharedstoragemocks diff --git a/mocks/sharedstoragemocks/plugin.go b/mocks/sharedstoragemocks/plugin.go index a48e274235..4b7e61bde0 100644 --- a/mocks/sharedstoragemocks/plugin.go +++ b/mocks/sharedstoragemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package sharedstoragemocks diff --git a/mocks/spieventsmocks/manager.go b/mocks/spieventsmocks/manager.go index a61707c9aa..111ca05dff 100644 --- a/mocks/spieventsmocks/manager.go +++ b/mocks/spieventsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package spieventsmocks diff --git a/mocks/syncasyncmocks/bridge.go b/mocks/syncasyncmocks/bridge.go index bce0a36631..a5fe867ceb 100644 --- a/mocks/syncasyncmocks/bridge.go +++ b/mocks/syncasyncmocks/bridge.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/syncasyncmocks/sender.go b/mocks/syncasyncmocks/sender.go index fd4bafa34f..41d2da4e0f 100644 --- a/mocks/syncasyncmocks/sender.go +++ b/mocks/syncasyncmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/systemeventmocks/event_interface.go b/mocks/systemeventmocks/event_interface.go index dd0d4398ef..24650a858e 100644 --- a/mocks/systemeventmocks/event_interface.go +++ b/mocks/systemeventmocks/event_interface.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package systemeventmocks diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 36a362a344..da55fa787d 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package tokenmocks diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index b3292e799c..ea61d3d2a4 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package tokenmocks diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index 4fbbc91931..57ab913d65 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package txcommonmocks diff --git a/mocks/txwritermocks/writer.go b/mocks/txwritermocks/writer.go index 5358086e05..811a440907 100644 --- a/mocks/txwritermocks/writer.go +++ b/mocks/txwritermocks/writer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package txwritermocks diff --git a/mocks/websocketsmocks/web_sockets_namespaced.go b/mocks/websocketsmocks/web_sockets_namespaced.go index 2a036a3b1a..6c32559991 100644 --- a/mocks/websocketsmocks/web_sockets_namespaced.go +++ b/mocks/websocketsmocks/web_sockets_namespaced.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package websocketsmocks diff --git a/mocks/wsmocks/ws_client.go b/mocks/wsmocks/ws_client.go index fccb7288b7..1a75210045 100644 --- a/mocks/wsmocks/ws_client.go +++ b/mocks/wsmocks/ws_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package wsmocks diff --git a/pkg/core/operation.go b/pkg/core/operation.go index ff7a39607f..6cbd884ba3 100644 --- a/pkg/core/operation.go +++ b/pkg/core/operation.go @@ -1,4 +1,4 @@ -// Copyright © 2024 Kaleido, Inc. +// Copyright © 2025 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -244,7 +244,9 @@ func ParseNamespacedOpID(ctx context.Context, nsIDStr string) (string, *fftypes. } type OperationCallbacks interface { - OperationUpdate(update *OperationUpdate) + // This is an asynchronous update and you can use onComplete to signal the update has been committed to the DB through a function call + OperationUpdate(update *OperationUpdateAsync) + BulkOperationUpdates(ctx context.Context, updates []*OperationUpdate) error } // OperationUpdate notifies FireFly of an update to an operation. @@ -262,7 +264,11 @@ type OperationUpdate struct { VerifyManifest bool DXManifest string DXHash string - OnComplete func() +} + +type OperationUpdateAsync struct { + OperationUpdate + OnComplete func() } type OperationDetailError struct {