1
- // Copyright © 2023 Kaleido, Inc.
1
+ // Copyright © 2024 Kaleido, Inc.
2
2
//
3
3
// SPDX-License-Identifier: Apache-2.0
4
4
//
@@ -40,6 +40,7 @@ type transactionOperation struct {
40
40
sentConflict bool
41
41
done chan error
42
42
43
+ opID string
43
44
isShutdown bool
44
45
txInsert * apitypes.ManagedTX
45
46
noncePreAssigned bool
@@ -77,6 +78,7 @@ type transactionWriter struct {
77
78
78
79
type transactionWriterBatch struct {
79
80
id string
81
+ opened time.Time
80
82
ops []* transactionOperation
81
83
timeoutContext context.Context
82
84
timeoutCancel func ()
@@ -122,6 +124,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.
122
124
123
125
func newTransactionOperation (txID string ) * transactionOperation {
124
126
return & transactionOperation {
127
+ opID : fftypes .ShortID (),
125
128
txID : txID ,
126
129
done : make (chan error , 1 ), // 1 slot to ensure we don't block the writer
127
130
}
@@ -130,6 +133,7 @@ func newTransactionOperation(txID string) *transactionOperation {
130
133
func (op * transactionOperation ) flush (ctx context.Context ) error {
131
134
select {
132
135
case err := <- op .done :
136
+ log .L (ctx ).Debugf ("Flushed write operation %s (err=%v)" , op .opID , err )
133
137
return err
134
138
case <- ctx .Done ():
135
139
return i18n .NewError (ctx , i18n .MsgContextCanceled )
@@ -165,6 +169,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation
165
169
h := fnv .New32a () // simple non-cryptographic hash algo
166
170
_ , _ = h .Write ([]byte (hashKey ))
167
171
routine := h .Sum32 () % tw .workerCount
172
+ log .L (ctx ).Debugf ("Queuing write operation %s to worker tx_writer_%.4d" , op .opID , routine )
168
173
select {
169
174
case tw .workQueues [routine ] <- op : // it's queued
170
175
case <- ctx .Done (): // timeout of caller context
@@ -180,6 +185,7 @@ func (tw *transactionWriter) worker(i int) {
180
185
defer close (tw .workersDone [i ])
181
186
workerID := fmt .Sprintf ("tx_writer_%.4d" , i )
182
187
ctx := log .WithLogField (tw .bgCtx , "job" , workerID )
188
+ l := log .L (ctx )
183
189
var batch * transactionWriterBatch
184
190
batchCount := 0
185
191
workQueue := tw .workQueues [i ]
@@ -202,24 +208,27 @@ func (tw *transactionWriter) worker(i int) {
202
208
}
203
209
if batch == nil {
204
210
batch = & transactionWriterBatch {
205
- id : fmt .Sprintf ("%.4d_%.9d" , i , batchCount ),
211
+ id : fmt .Sprintf ("%.4d_%.9d" , i , batchCount ),
212
+ opened : time .Now (),
206
213
}
207
214
batch .timeoutContext , batch .timeoutCancel = context .WithTimeout (ctx , tw .batchTimeout )
208
215
batchCount ++
209
216
}
210
217
batch .ops = append (batch .ops , op )
218
+ l .Debugf ("Added write operation %s to batch %s (len=%d)" , op .opID , batch .id , len (batch .ops ))
211
219
case <- timeoutContext .Done ():
212
220
timedOut = true
213
221
select {
214
222
case <- ctx .Done ():
215
- log . L ( ctx ) .Debugf ("Transaction writer ending" )
223
+ l .Debugf ("Transaction writer ending" )
216
224
return
217
225
default :
218
226
}
219
227
}
220
228
221
229
if batch != nil && (timedOut || (len (batch .ops ) >= tw .batchMaxSize )) {
222
230
batch .timeoutCancel ()
231
+ l .Debugf ("Running batch %s (len=%d,timeout=%t,age=%dms)" , batch .id , len (batch .ops ), timedOut , time .Since (batch .opened ).Milliseconds ())
223
232
tw .runBatch (ctx , batch )
224
233
batch = nil
225
234
}
@@ -383,6 +392,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t
383
392
txOp .sentConflict = true
384
393
txOp .done <- i18n .NewError (ctx , tmmsgs .MsgDuplicateID , txOp .txID )
385
394
} else {
395
+ log .L (ctx ).Debugf ("Adding TX %s from write operation %s to insert idx=%d" , txOp .txID , txOp .opID , len (validInserts ))
386
396
validInserts = append (validInserts , txOp .txInsert )
387
397
}
388
398
}
@@ -413,9 +423,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
413
423
}
414
424
}
415
425
// Do all the transaction updates
426
+ mergedUpdates := make (map [string ]* apitypes.TXUpdates )
416
427
for _ , op := range b .txUpdates {
417
- if err := tw .p .updateTransaction (ctx , op .txID , op .txUpdate ); err != nil {
418
- log .L (ctx ).Errorf ("Update transaction %s failed: %s" , op .txID , err )
428
+ update , merge := mergedUpdates [op .txID ]
429
+ if merge {
430
+ update .Merge (op .txUpdate )
431
+ } else {
432
+ mergedUpdates [op .txID ] = op .txUpdate
433
+ }
434
+ log .L (ctx ).Debugf ("Updating transaction %s in write operation %s (merged=%t)" , op .txID , op .opID , merge )
435
+ }
436
+ for txID , update := range mergedUpdates {
437
+ if err := tw .p .updateTransaction (ctx , txID , update ); err != nil {
438
+ log .L (ctx ).Errorf ("Update transaction %s failed: %s" , txID , err )
419
439
return err
420
440
}
421
441
}
0 commit comments