Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -43,6 +43,11 @@ type BlockchainCallbacks interface {
SetHandler(namespace string, handler blockchain.Callbacks)
SetOperationalHandler(namespace string, handler core.OperationCallbacks)

// BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error
// An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector.
// It is the responsibility of the connector code to allocate that routine, and ensure that there is only one.
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error

OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
// Common logic for parsing a BatchPinOrNetworkAction event, and if not discarded to add it to the by-namespace map
PrepareBatchPinOrNetworkAction(ctx context.Context, events EventsToDispatch, subInfo *SubscriptionInfo, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef, params *BatchPinParams)
Expand All @@ -64,6 +69,17 @@ type callbacks struct {
opHandlers map[string]core.OperationCallbacks
}

// BulkOperationUpdates implements BlockchainCallbacks.
func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error {
if handler, ok := cb.opHandlers[namespace]; ok {
return handler.BulkOperationUpdates(ctx, updates)
}
// We don't want to error as it just means this update was not for this namespace
// This is unlikely to happen in practice, as the namespace is always passed in the operation handler
log.L(ctx).Errorf("No operation handler found for namespace '%s'", namespace)
return nil
}

type subscriptions struct {
subs map[string]*SubscriptionInfo
}
Expand Down
51 changes: 51 additions & 0 deletions internal/blockchain/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,3 +455,54 @@ func TestErrorWrappingNonConflict(t *testing.T) {
_, conforms := err.(operations.ConflictError)
assert.False(t, conforms)
}

func TestCallbackBulkOperationUpdate(t *testing.T) {
nsOpID := "ns1:" + fftypes.NewUUID().String()
nsOpID2 := "ns1:" + fftypes.NewUUID().String()

mbi := &blockchainmocks.Plugin{}
mcb := &coremocks.OperationCallbacks{}
cb := NewBlockchainCallbacks()
cb.SetOperationalHandler("ns1", mcb)

mbi.On("Name").Return("utblockchain")
mcb.On("BulkOperationUpdates", mock.Anything, mock.MatchedBy(func(updates []*core.OperationUpdate) bool {
assert.True(t, updates[0].NamespacedOpID == nsOpID &&
updates[0].Status == core.OpStatusSucceeded &&
updates[0].BlockchainTXID == "tx1" &&
updates[0].ErrorMessage == "err" &&
updates[0].Plugin == "utblockchain")

assert.True(t, updates[1].NamespacedOpID == nsOpID2 &&
updates[1].Status == core.OpStatusSucceeded &&
updates[1].BlockchainTXID == "tx2" &&
updates[1].ErrorMessage == "err" &&
updates[1].Plugin == "utblockchain")

return true
})).Return(nil).Once()

cb.BulkOperationUpdates(context.Background(), "ns1", []*core.OperationUpdate{
{
NamespacedOpID: nsOpID,
Status: core.OpStatusSucceeded,
BlockchainTXID: "tx1",
ErrorMessage: "err",
Output: fftypes.JSONObject{},
Plugin: "utblockchain",
},
{
NamespacedOpID: nsOpID2,
Status: core.OpStatusSucceeded,
BlockchainTXID: "tx2",
ErrorMessage: "err",
Output: fftypes.JSONObject{},
Plugin: "utblockchain",
},
})

// No Handler
cb.BulkOperationUpdates(context.Background(), "ns2", []*core.OperationUpdate{})

mcb.AssertExpectations(t)
}
15 changes: 14 additions & 1 deletion internal/operations/manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -47,6 +47,7 @@ type Manager interface {
ResubmitOperations(ctx context.Context, txID *fftypes.UUID) (total int, resubmit []*core.Operation, err error)
AddOrReuseOperation(ctx context.Context, op *core.Operation, hooks ...database.PostCompletionHook) error
BulkInsertOperations(ctx context.Context, ops ...*core.Operation) error
SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error
SubmitOperationUpdate(update *core.OperationUpdate)
GetOperationByIDCached(ctx context.Context, opID *fftypes.UUID) (*core.Operation, error)
ResolveOperationByID(ctx context.Context, opID *fftypes.UUID, op *core.OperationUpdateDTO) error
Expand Down Expand Up @@ -77,6 +78,18 @@ type operationsManager struct {
cache cache.CInterface
}

// SubmitBulkOperationUpdate implements Manager.
func (om *operationsManager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error {
for _, update := range updates {
errString := ""
if update.ErrorMessage != "" {
errString = fmt.Sprintf(" error=%s", update.ErrorMessage)
}
log.L(om.ctx).Debugf("%s updating operation %s status=%s%s", update.Plugin, update.NamespacedOpID, update.Status, errString)
}
return om.updater.SubmitBulkOperationUpdates(ctx, updates)
}

func NewOperationsManager(ctx context.Context, ns string, di database.Plugin, txHelper txcommon.Helper, cacheManager cache.Manager) (Manager, error) {
if di == nil || txHelper == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "OperationsManager")
Expand Down
132 changes: 132 additions & 0 deletions internal/operations/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,135 @@ func TestErrTernaryHelper(t *testing.T) {
assert.Equal(t, core.OpPhasePending, ErrTernary(nil, core.OpPhaseInitializing, core.OpPhasePending))
assert.Equal(t, core.OpPhaseInitializing, ErrTernary(fmt.Errorf("pop"), core.OpPhaseInitializing, core.OpPhasePending))
}

func TestSubmitBulkOperationUpdates(t *testing.T) {
om, cancel := newTestOperations(t)
defer cancel()

om.updater.workQueues = []chan *core.OperationUpdate{
make(chan *core.OperationUpdate, 1),
}

ctx := context.Background()

operations := make([]*core.Operation, 0)
opID := fftypes.NewUUID()
op := &core.Operation{
ID: opID,
Plugin: "blockchain",
Type: core.OpTypeBlockchainPinBatch,
Status: core.OpStatusInitialized,
}
op2ID := fftypes.NewUUID()
op2 := &core.Operation{
ID: op2ID,
Plugin: "blockchain",
Type: core.OpTypeBlockchainContractDeploy,
Status: core.OpStatusInitialized,
}
operations = append(operations, op, op2)

submittedUpdate := &core.OperationUpdate{
NamespacedOpID: "ns1:" + opID.String(),
Status: core.OpStatusSucceeded,
ErrorMessage: "my-error-message",
}

submittedUpdate2 := &core.OperationUpdate{
NamespacedOpID: "ns1:" + op2ID.String(),
Status: core.OpStatusSucceeded,
ErrorMessage: "my-error-message",
}

mdi := om.database.(*databasemocks.Plugin)
mdi.On("GetOperations", ctx, "ns1", mock.Anything).Return(operations, nil, nil)

err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate, submittedUpdate2})
assert.NoError(t, err)
}

func TestSubmitBulkOperationUpdatesIgnoredUpdate(t *testing.T) {
om, cancel := newTestOperations(t)
defer cancel()

om.updater.workQueues = []chan *core.OperationUpdate{
make(chan *core.OperationUpdate, 1),
}

ctx := context.Background()

operations := make([]*core.Operation, 0)
opID := fftypes.NewUUID()
op := &core.Operation{
ID: opID,
Plugin: "blockchain",
Type: core.OpTypeBlockchainPinBatch,
Status: core.OpStatusInitialized,
}
operations = append(operations, op)

submittedUpdate := &core.OperationUpdate{
NamespacedOpID: "different-namespace:" + opID.String(),
Status: core.OpStatusSucceeded,
ErrorMessage: "my-error-message",
Plugin: "blockchain",
}

err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate})
assert.NoError(t, err)
}

func TestSubmitBulkOperationUpdatesIgnoredBadID(t *testing.T) {
om, cancel := newTestOperations(t)
defer cancel()

om.updater.workQueues = []chan *core.OperationUpdate{
make(chan *core.OperationUpdate, 1),
}

ctx := context.Background()

submittedUpdate := &core.OperationUpdate{
NamespacedOpID: "ns1:BAD-UUID",
Status: core.OpStatusSucceeded,
ErrorMessage: "my-error-message",
Plugin: "blockchain",
}

err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate})
assert.NoError(t, err)
}

func TestSubmitBulkOperationUpdatesError(t *testing.T) {
om, cancel := newTestOperations(t)
defer cancel()

om.updater.workQueues = []chan *core.OperationUpdate{
make(chan *core.OperationUpdate, 1),
}

ctx := context.Background()

operations := make([]*core.Operation, 0)
opID := fftypes.NewUUID()
op := &core.Operation{
ID: opID,
Plugin: "blockchain",
Type: core.OpTypeBlockchainPinBatch,
Status: core.OpStatusInitialized,
}
operations = append(operations, op)

submittedUpdate := &core.OperationUpdate{
NamespacedOpID: "ns1:" + opID.String(),
Status: core.OpStatusSucceeded,
ErrorMessage: "my-error-message",
Plugin: "blockchain",
}

mdi := om.database.(*databasemocks.Plugin)
mdi.On("GetOperations", ctx, "ns1", mock.Anything).Return(operations, nil, errors.New("Failed to get operations"))

err := om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate})
assert.Error(t, err)
}
58 changes: 48 additions & 10 deletions internal/operations/operation_updater.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -93,6 +93,37 @@ func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, up
return ou.workQueues[worker]
}

// SubmitBulkOperationUpdates will wait for the commit to DB before calling the onCommit
func (ou *operationUpdater) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error {
validUpdates := []*core.OperationUpdate{}
for _, update := range updates {
ns, _, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
if err != nil {
log.L(ctx).Warnf("Unable to update operation '%s' due to invalid ID: %s", update.NamespacedOpID, err)
continue
}

if ns != ou.manager.namespace {
log.L(ou.ctx).Debugf("Ignoring operation update from different namespace '%s'", ns)
continue
}

validUpdates = append(validUpdates, update)
}

// Notice how this is not using the workers and is synchronous
// The reason is because we want for all updates to be stored at once in this order
// If offloaded into workers the updates would be processed in parallel, in different DB TX and in a different order
// Up to the caller to retry if this fails
err := ou.doBatchUpdateAsGroup(ctx, validUpdates)
if err != nil {
log.L(ctx).Warnf("Exiting while updating operations: %s", err)
return err
}

return nil
}

func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *core.OperationUpdate) {
ns, id, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
if err != nil {
Expand Down Expand Up @@ -181,25 +212,32 @@ func (ou *operationUpdater) updaterLoop(index int) {
}
}

func (ou *operationUpdater) doBatchUpdateAsGroup(ctx context.Context, updates []*core.OperationUpdate) error {
err := ou.database.RunAsGroup(ctx, func(ctx context.Context) error {
return ou.doBatchUpdate(ctx, updates)
})
if err != nil {
return err
}
for _, update := range updates {
if update.OnComplete != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this in practice now?

Copy link
Contributor Author

@EnriqueL8 EnriqueL8 Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I struggled with this one, it's still available in the struct core.OperationUpdate but it doesn't serve any purpose as we are returning synchronously so the top connector go routine can do what it needs when the updates have been committed.

So we can remove but we need to leave it for the existing flow and just comment on this flow that it will not be used

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why @EnriqueL8 ?
Because you think there's other stuff on core.OperationUpdate that needs to be on this batch API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest of the fields in core.OperationUpdate are needed in this batch API so yes we could create a new type that has that subset of fields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @peterbroadhurst for the nudge and suggestion - move to OperationUpdateAsync and OperationUpdate

update.OnComplete()
}
}
return nil
}

func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates []*core.OperationUpdate) error {
return ou.retry.Do(ctx, "operation update", func(attempt int) (retry bool, err error) {
err = ou.database.RunAsGroup(ctx, func(ctx context.Context) error {
return ou.doBatchUpdate(ctx, updates)
})
err = ou.doBatchUpdateAsGroup(ctx, updates)
if err != nil {
return true, err
}
for _, update := range updates {
if update.OnComplete != nil {
update.OnComplete()
}
}
return false, nil
})
}

func (ou *operationUpdater) doBatchUpdate(ctx context.Context, updates []*core.OperationUpdate) error {

// Get all the operations that match
opIDs := make([]*fftypes.UUID, 0, len(updates))
for _, update := range updates {
Expand Down
6 changes: 5 additions & 1 deletion internal/orchestrator/bound_callbacks.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -41,6 +41,10 @@ func (bc *boundCallbacks) checkStopped() error {
return nil
}

func (bc *boundCallbacks) BulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate) error {
return bc.o.operations.SubmitBulkOperationUpdates(ctx, updates)
}

func (bc *boundCallbacks) OperationUpdate(update *core.OperationUpdate) {
bc.o.operations.SubmitOperationUpdate(update)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/orchestrator/bound_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func TestBoundCallbacks(t *testing.T) {
mom.On("SubmitOperationUpdate", update).Return().Once()
bc.OperationUpdate(update)

ctx := context.Background()
updates := []*core.OperationUpdate{update}
mom.On("SubmitBulkOperationUpdates", ctx, updates).Return(nil).Once()
bc.BulkOperationUpdates(ctx, updates)

mei.On("SharedStorageBatchDownloaded", mss, "payload1", []byte(`{}`)).Return(nil, fmt.Errorf("pop"))
_, err := bc.SharedStorageBatchDownloaded("payload1", []byte(`{}`))
assert.EqualError(t, err, "pop")
Expand Down
2 changes: 1 addition & 1 deletion mocks/apiservermocks/ffi_swagger_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/apiservermocks/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/assetmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/batchmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading