Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,9 @@ func TestPersistBatchGoodDataMessageFail(t *testing.T) {

em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop")).Once()

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/events/dx_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func TestMessageReceiveMessagePersistMessageFail(t *testing.T) {
em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization fail"))
em.mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("pop"))

// no ack as we are simulating termination mid retry
mde := newMessageReceivedNoAck("peer1", tw)
Expand Down
59 changes: 47 additions & 12 deletions internal/events/persist_batch.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.

Check failure on line 1 in internal/events/persist_batch.go

View workflow job for this annotation

GitHub Actions / build

Expected:2025, Actual: 2023 Kaleido, Inc. (goheader)

Check failure on line 1 in internal/events/persist_batch.go

View workflow job for this annotation

GitHub Actions / build

Expected:2025, Actual: 2023 Kaleido, Inc. (goheader)
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,7 +18,9 @@

import (
"context"
"database/sql/driver"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/core"
Expand Down Expand Up @@ -298,22 +300,55 @@
})
if err != nil {
log.L(ctx).Debugf("Batch message insert optimization failed for batch '%s': %s", batch.ID, err)
// Fall back to individual upserts
for i, msg := range batch.Payload.Messages {
postHookUpdateMessageCache := func() {
mm := matchedMsgs[i]
em.data.UpdateMessageCache(mm.message, mm.data)
}
if err = em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting, postHookUpdateMessageCache); err != nil {
if err == database.HashMismatch {
log.L(ctx).Errorf("Invalid message entry %d in batch'%s'. Hash mismatch with existing record with same UUID '%s' Hash=%s", i, batch.ID, msg.Header.ID, msg.Hash)
return false, nil // This is not retryable. skip this data entry

// Messages are immutable in their contents, and it's entirely possible we're being sent duplicate message data
// that is for messages we've subsequently modified the state of (they've been confirmed).
// So we find a list of those that aren't in the DB and so an insert of just those\
var foundIDs []*core.IDAndSequence
foundIDs, err = em.database.GetMessageIDs(ctx, batch.Namespace, messageIDFilter(ctx, batch.Payload.Messages))
if err == nil {
remainingInserts := make([]*core.Message, 0, len(batch.Payload.Messages))
for _, m := range batch.Payload.Messages {
isFound := false
for _, foundID := range foundIDs {
if foundID.ID.Equals(m.Header.ID) {
isFound = true
log.L(ctx).Errorf("Message %s in batch '%s' is a duplicated", m.Header.ID, batch.ID)
break
}
}
if !isFound {
remainingInserts = append(remainingInserts, m)
}
log.L(ctx).Errorf("Failed to insert message entry %d in batch '%s': %s", i, batch.ID, err)
return false, err // a persistence failure here is considered retryable (so returned)
}
if len(remainingInserts) > 0 {
// Only the remaining ones get updates
err = em.database.InsertMessages(ctx, batch.Payload.Messages, func() {
for _, mm := range matchedMsgs {
for _, m := range remainingInserts {
if mm.message.Header.ID.Equals(m.Header.ID) {
em.data.UpdateMessageCache(mm.message, mm.data)
}
}
}
})
}
}
// If we have an error at this point, we're cannot insert (must not be a duplicate)
if err != nil {
log.L(ctx).Errorf("Failed to insert messages: %s", err)
return false, err // a persistence failure here is considered retryable (so returned)
}
}

return true, nil
}

func messageIDFilter(ctx context.Context, msgs []*core.Message) ffapi.Filter {
fb := database.MessageQueryFactory.NewFilter(ctx)
ids := make([]driver.Value, len(msgs))
for i, msg := range msgs {
ids[i] = msg.Header.ID
}
return fb.In("id", ids)
}
18 changes: 11 additions & 7 deletions internal/events/persist_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestPersistBatchContentSentByUsNotFoundFallback(t *testing.T) {

}

func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
func TestPersistBatchContentSentByUsFoundDup(t *testing.T) {

em := newTestEventManager(t)
defer em.cleanup(t)
Expand All @@ -234,21 +234,24 @@ func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})
batch.Node = testNodeID

msgID := batch.Payload.Messages[0].Header.ID
em.mdm.On("GetMessageWithDataCached", em.ctx, batch.Payload.Messages[0].Header.ID).Return(&core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
ID: msgID,
},
}, nil, true, nil)

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(database.HashMismatch)
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{
{ID: *msgID},
}, nil)

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

ok, err := em.persistBatchContent(em.ctx, batch, []*messageAndData{})
assert.NoError(t, err)
assert.False(t, ok)
assert.True(t, ok)

}

Expand All @@ -261,9 +264,10 @@ func TestPersistBatchContentInsertMessagesFail(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Run(func(args mock.Arguments) {
args[3].(database.PostCompletionHook)()
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Once().Run(func(args mock.Arguments) {
args[2].(database.PostCompletionHook)()
})

msgData := &messageAndData{
Expand Down
Loading