Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,9 @@ func TestPersistBatchGoodDataMessageFail(t *testing.T) {

em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop")).Once()

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/events/dx_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func TestMessageReceiveMessagePersistMessageFail(t *testing.T) {
em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization fail"))
em.mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("pop"))

// no ack as we are simulating termination mid retry
mde := newMessageReceivedNoAck("peer1", tw)
Expand Down
60 changes: 48 additions & 12 deletions internal/events/persist_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package events

import (
"context"
"database/sql/driver"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/core"
Expand Down Expand Up @@ -298,22 +300,56 @@ func (em *eventManager) persistBatchContent(ctx context.Context, batch *core.Bat
})
if err != nil {
log.L(ctx).Debugf("Batch message insert optimization failed for batch '%s': %s", batch.ID, err)
// Fall back to individual upserts
for i, msg := range batch.Payload.Messages {
postHookUpdateMessageCache := func() {
mm := matchedMsgs[i]
em.data.UpdateMessageCache(mm.message, mm.data)
}
if err = em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting, postHookUpdateMessageCache); err != nil {
if err == database.HashMismatch {
log.L(ctx).Errorf("Invalid message entry %d in batch'%s'. Hash mismatch with existing record with same UUID '%s' Hash=%s", i, batch.ID, msg.Header.ID, msg.Hash)
return false, nil // This is not retryable. skip this data entry

// Messages are immutable in their contents, and it's entirely possible we're being sent
// messages we've already been sent in a previous batch, and subsequently modified th
// state of (they've been confirmed etc.).
// So we find a list of those that aren't in the DB and so and insert just those.
var foundIDs []*core.IDAndSequence
foundIDs, err = em.database.GetMessageIDs(ctx, batch.Namespace, messageIDFilter(ctx, batch.Payload.Messages))
if err == nil {
remainingInserts := make([]*core.Message, 0, len(batch.Payload.Messages))
for _, m := range batch.Payload.Messages {
isFound := false
for _, foundID := range foundIDs {
if foundID.ID.Equals(m.Header.ID) {
isFound = true
log.L(ctx).Warnf("Message %s in batch '%s' is a duplicate", m.Header.ID, batch.ID)
break
}
}
if !isFound {
remainingInserts = append(remainingInserts, m)
}
log.L(ctx).Errorf("Failed to insert message entry %d in batch '%s': %s", i, batch.ID, err)
return false, err // a persistence failure here is considered retryable (so returned)
}
if len(remainingInserts) > 0 {
// Only the remaining ones get updates
err = em.database.InsertMessages(ctx, batch.Payload.Messages, func() {
for _, mm := range matchedMsgs {
for _, m := range remainingInserts {
if mm.message.Header.ID.Equals(m.Header.ID) {
em.data.UpdateMessageCache(mm.message, mm.data)
}
}
}
})
}
}
// If we have an error at this point, we cannot insert (must not be a duplicate)
if err != nil {
log.L(ctx).Errorf("Failed to insert messages: %s", err)
return false, err // a persistence failure here is considered retryable (so returned)
}
}

return true, nil
}

func messageIDFilter(ctx context.Context, msgs []*core.Message) ffapi.Filter {
fb := database.MessageQueryFactory.NewFilter(ctx)
ids := make([]driver.Value, len(msgs))
for i, msg := range msgs {
ids[i] = msg.Header.ID
}
return fb.In("id", ids)
}
18 changes: 11 additions & 7 deletions internal/events/persist_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestPersistBatchContentSentByUsNotFoundFallback(t *testing.T) {

}

func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
func TestPersistBatchContentSentByUsFoundDup(t *testing.T) {

em := newTestEventManager(t)
defer em.cleanup(t)
Expand All @@ -234,21 +234,24 @@ func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})
batch.Node = testNodeID

msgID := batch.Payload.Messages[0].Header.ID
em.mdm.On("GetMessageWithDataCached", em.ctx, batch.Payload.Messages[0].Header.ID).Return(&core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
ID: msgID,
},
}, nil, true, nil)

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(database.HashMismatch)
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{
{ID: *msgID},
}, nil)

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

ok, err := em.persistBatchContent(em.ctx, batch, []*messageAndData{})
assert.NoError(t, err)
assert.False(t, ok)
assert.True(t, ok)

}

Expand All @@ -261,9 +264,10 @@ func TestPersistBatchContentInsertMessagesFail(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Run(func(args mock.Arguments) {
args[3].(database.PostCompletionHook)()
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Once().Run(func(args mock.Arguments) {
args[2].(database.PostCompletionHook)()
})

msgData := &messageAndData{
Expand Down