Skip to content

Commit 58daa9d

Browse files
Merge TX updates in same batch
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 0903a52 commit 58daa9d

File tree

6 files changed

+164
-7
lines changed

6 files changed

+164
-7
lines changed

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
9090
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
9191
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
9292
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
93-
github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE=
94-
github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
9593
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 h1:bcIg8zUalHyjxPmhIggwg/VK/IVDvpH2XJwCkfAYrSU=
9694
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
9795
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=

internal/persistence/postgres/eventstreams_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) {
108108
var eventStreams []*apitypes.EventStream
109109
for i := 0; i < 20; i++ {
110110
es := &apitypes.EventStream{
111-
ID: fftypes.NewUUID(),
112-
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
111+
ID: fftypes.NewUUID(),
112+
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
113+
BatchTimeout: ffDurationPtr(22222 * time.Second),
114+
RetryTimeout: ffDurationPtr(33333 * time.Second),
115+
BlockedRetryDelay: ffDurationPtr(44444 * time.Second),
113116
}
114117
err := p.WriteStream(ctx, es)
115118
assert.NoError(t, err)

internal/persistence/postgres/transaction_writer.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
421421
}
422422
}
423423
// Do all the transaction updates
424+
mergedUpdates := make(map[string]*apitypes.TXUpdates)
424425
for _, op := range b.txUpdates {
425-
if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil {
426-
log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err)
426+
update, merge := mergedUpdates[op.txID]
427+
if merge {
428+
update.Merge(op.txUpdate)
429+
} else {
430+
mergedUpdates[op.txID] = op.txUpdate
431+
}
432+
log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge)
433+
}
434+
for txID, update := range mergedUpdates {
435+
if err := tw.p.updateTransaction(ctx, txID, update); err != nil {
436+
log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err)
427437
return err
428438
}
429439
}

internal/persistence/postgres/transaction_writer_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/hyperledger/firefly-common/pkg/fftypes"
2828
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
2929
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
30+
"github.com/sirupsen/logrus"
3031
"github.com/stretchr/testify/assert"
3132
)
3233

@@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) {
275276
assert.NoError(t, mdb.ExpectationsWereMet())
276277
}
277278

279+
func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) {
280+
logrus.SetLevel(logrus.TraceLevel)
281+
282+
ctx, p, mdb, done := newMockSQLPersistence(t)
283+
defer done()
284+
285+
mdb.ExpectBegin()
286+
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
287+
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
288+
mdb.ExpectCommit()
289+
290+
err := p.db.RunAsGroup(ctx, func(ctx context.Context) error {
291+
return p.writer.executeBatchOps(ctx, &transactionWriterBatch{
292+
txUpdates: []*transactionOperation{
293+
{
294+
txID: "11111",
295+
txUpdate: &apitypes.TXUpdates{
296+
Status: ptrTo(apitypes.TxStatusPending),
297+
From: strPtr("0xaaaaa"),
298+
},
299+
},
300+
{
301+
txID: "22222",
302+
txUpdate: &apitypes.TXUpdates{
303+
Status: ptrTo(apitypes.TxStatusPending),
304+
},
305+
},
306+
{
307+
txID: "11111",
308+
txUpdate: &apitypes.TXUpdates{
309+
Status: ptrTo(apitypes.TxStatusSucceeded),
310+
TransactionHash: strPtr("0xaabbcc"),
311+
},
312+
},
313+
},
314+
})
315+
})
316+
assert.NoError(t, err)
317+
318+
assert.NoError(t, mdb.ExpectationsWereMet())
319+
}
320+
278321
func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) {
279322
ctx, p, mdb, done := newMockSQLPersistence(t)
280323
defer done()
@@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) {
455498
p.writer.queue(closedCtx, newTransactionOperation("tx1"))
456499

457500
}
501+
502+
func TestStopDoneWorker(t *testing.T) {
503+
tw := &transactionWriter{
504+
workersDone: []chan struct{}{
505+
make(chan struct{}),
506+
},
507+
}
508+
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
509+
close(tw.workersDone[0])
510+
tw.stop()
511+
}
512+
513+
func TestStopDoneCtx(t *testing.T) {
514+
tw := &transactionWriter{
515+
workersDone: []chan struct{}{
516+
make(chan struct{}, 1),
517+
},
518+
}
519+
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
520+
tw.cancelCtx()
521+
go func() {
522+
time.Sleep(10 * time.Millisecond)
523+
tw.workersDone[0] <- struct{}{}
524+
}()
525+
tw.stop()
526+
}
527+
528+
func ptrTo[T any](v T) *T {
529+
return &v
530+
}

pkg/apitypes/managed_tx.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -208,6 +208,51 @@ type TXUpdates struct {
208208
ErrorMessage *string `json:"errorMessage,omitempty"`
209209
}
210210

211+
func (txu *TXUpdates) Merge(txu2 *TXUpdates) {
212+
if txu2.Status != nil {
213+
txu.Status = txu2.Status
214+
}
215+
if txu2.DeleteRequested != nil {
216+
txu.DeleteRequested = txu2.DeleteRequested
217+
}
218+
if txu2.From != nil {
219+
txu.From = txu2.From
220+
}
221+
if txu2.To != nil {
222+
txu.To = txu2.To
223+
}
224+
if txu2.Nonce != nil {
225+
txu.Nonce = txu2.Nonce
226+
}
227+
if txu2.Gas != nil {
228+
txu.Gas = txu2.Gas
229+
}
230+
if txu2.Value != nil {
231+
txu.Value = txu2.Value
232+
}
233+
if txu2.GasPrice != nil {
234+
txu.GasPrice = txu2.GasPrice
235+
}
236+
if txu2.TransactionData != nil {
237+
txu.TransactionData = txu2.TransactionData
238+
}
239+
if txu2.TransactionHash != nil {
240+
txu.TransactionHash = txu2.TransactionHash
241+
}
242+
if txu2.PolicyInfo != nil {
243+
txu.PolicyInfo = txu2.PolicyInfo
244+
}
245+
if txu2.FirstSubmit != nil {
246+
txu.FirstSubmit = txu2.FirstSubmit
247+
}
248+
if txu2.LastSubmit != nil {
249+
txu.LastSubmit = txu2.LastSubmit
250+
}
251+
if txu2.ErrorMessage != nil {
252+
txu.ErrorMessage = txu2.ErrorMessage
253+
}
254+
}
255+
211256
// TXWithStatus is a convenience object that fetches all data about a transaction into one
212257
// large JSON payload (with limits on certain parts, such as the history entries).
213258
// Note that in LevelDB persistence this is the stored form of the single document object.

pkg/apitypes/managed_tx_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) {
8585
r.SetUpdated(t2)
8686
assert.Equal(t, t2, r.Updated)
8787
}
88+
89+
func TestTXUpdatesMerge(t *testing.T) {
90+
txu := &TXUpdates{}
91+
txu2 := &TXUpdates{
92+
Status: ptrTo(TxStatusPending),
93+
DeleteRequested: fftypes.Now(),
94+
From: ptrTo("1111"),
95+
To: ptrTo("2222"),
96+
Nonce: fftypes.NewFFBigInt(3333),
97+
Gas: fftypes.NewFFBigInt(4444),
98+
Value: fftypes.NewFFBigInt(5555),
99+
GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`),
100+
TransactionData: ptrTo("xxxx"),
101+
TransactionHash: ptrTo("yyyy"),
102+
PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`),
103+
FirstSubmit: fftypes.Now(),
104+
LastSubmit: fftypes.Now(),
105+
ErrorMessage: ptrTo("pop"),
106+
}
107+
txu.Merge(txu2)
108+
assert.Equal(t, *txu2, *txu)
109+
txu.Merge(&TXUpdates{})
110+
assert.Equal(t, *txu2, *txu)
111+
}
112+
113+
func ptrTo[T any](v T) *T {
114+
return &v
115+
}

0 commit comments

Comments
 (0)