Skip to content

Commit 5dca080

Browse files
authored
llo: scaling improvements and safeguard channel definition version management (#17435)
* llo/observation: add short-lived cache to de-amplify EA requests
* llo/transmitter: batch transmit requests to reduce commit overhead
* llo/mercurytransmitter: use a cancellable context for consistency
* llo/channeldefinitions: safeguard against invalid block numbers
* bump wsrpc
* llo/observation: remove unnecessary logging
1 parent 23e41f5 commit 5dca080

File tree

22 files changed

+338
-80
lines changed

22 files changed

+338
-80
lines changed

core/scripts/go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -373,7 +373,7 @@ require (
373373
github.com/smartcontractkit/mcms v0.18.0 // indirect
374374
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-33d0c0bf38de // indirect
375375
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de // indirect
376-
github.com/smartcontractkit/wsrpc v0.8.5-0.20250318131857-4568a0f8d12d // indirect
376+
github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945 // indirect
377377
github.com/sourcegraph/conc v0.3.0 // indirect
378378
github.com/spf13/afero v1.12.0 // indirect
379379
github.com/spf13/cast v1.7.1 // indirect

core/scripts/go.sum

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1243,8 +1243,8 @@ github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-
12431243
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20241009055228-33d0c0bf38de/go.mod h1:Sl2MF/Fp3fgJIVzhdGhmZZX2BlnM0oUUyBP4s4xYb6o=
12441244
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de h1:66VQxXx3lvTaAZrMBkIcdH9VEjujUEvmBQdnyOJnkOc=
12451245
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20241009055228-33d0c0bf38de/go.mod h1:NSc7hgOQbXG3DAwkOdWnZzLTZENXSwDJ7Va1nBp0YU0=
1246-
github.com/smartcontractkit/wsrpc v0.8.5-0.20250318131857-4568a0f8d12d h1:B/LvtTIFwbkzoFE7723Q0IQBv/lcaTm0LgWBCZPUtvs=
1247-
github.com/smartcontractkit/wsrpc v0.8.5-0.20250318131857-4568a0f8d12d/go.mod h1:m3pdp17i4bD50XgktkzWetcV5yaLsi7Gunbv4ZgN6qg=
1246+
github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945 h1:zxcODLrFytOKmAd8ty8S/XK6WcIEJEgRBaL7sY/7l4Y=
1247+
github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945/go.mod h1:m3pdp17i4bD50XgktkzWetcV5yaLsi7Gunbv4ZgN6qg=
12481248
github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec=
12491249
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
12501250
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=

core/services/llo/channeldefinitions/onchain_channel_definition_cache.go

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -171,8 +171,10 @@ func (c *channelDefinitionCache) Start(ctx context.Context) error {
171171
return err
172172
} else if pd != nil {
173173
c.definitions = pd.Definitions
174-
c.initialBlockNum = pd.BlockNum + 1
175174
c.definitionsVersion = uint32(pd.Version)
175+
if pd.BlockNum+1 > c.initialBlockNum {
176+
c.initialBlockNum = pd.BlockNum + 1
177+
}
176178
} else {
177179
// ensure non-nil map ready for assignment later
178180
c.definitions = make(llotypes.ChannelDefinitions)
@@ -264,6 +266,9 @@ func (c *channelDefinitionCache) readLogs(ctx context.Context) (err error) {
264266
}
265267
unpacked.DonId = new(big.Int).SetBytes(log.Topics[1])
266268

269+
//nolint:gosec // disable G115
270+
unpacked.Raw.BlockNumber = uint64(log.BlockNumber)
271+
267272
if unpacked.DonId.Cmp(big.NewInt(int64(c.donID))) != 0 {
268273
// skip logs for other donIDs, shouldn't happen given the
269274
// FilterLogs call, but belts and braces
@@ -286,7 +291,8 @@ func (c *channelDefinitionCache) scanFromBlockNum() int64 {
286291
c.newLogMu.RLock()
287292
defer c.newLogMu.RUnlock()
288293
if c.newLog != nil {
289-
return int64(c.newLog.Raw.BlockNumber) + 1
294+
//nolint:gosec // disable G115
295+
return int64(c.newLog.Raw.BlockNumber)
290296
}
291297
return c.initialBlockNum
292298
}

core/services/llo/mercurytransmitter/orm.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -208,10 +208,10 @@ func (o *orm) Prune(ctx context.Context, serverURL string, maxSize, batchSize in
208208
res, err = o.ds.ExecContext(ctx, `
209209
DELETE FROM llo_mercury_transmit_queue AS q
210210
USING (
211-
SELECT transmission_hash
211+
SELECT transmission_hash
212212
FROM llo_mercury_transmit_queue
213-
WHERE don_id = $1
214-
AND server_url = $2
213+
WHERE don_id = $1
214+
AND server_url = $2
215215
AND seq_nr < $3
216216
ORDER BY seq_nr ASC
217217
LIMIT $4

core/services/llo/mercurytransmitter/queue.go

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -140,9 +140,16 @@ func (tq *transmitQueue) BlockingPop() (t *Transmission) {
140140
}
141141

142142
func (tq *transmitQueue) IsEmpty() bool {
143-
tq.mu.RLock()
144-
defer tq.mu.RUnlock()
145-
return tq.pq.Len() == 0
143+
return tq.Len() == 0
144+
}
145+
146+
func (tq *transmitQueue) Len() int {
147+
tq.cond.L.Lock()
148+
defer tq.cond.L.Unlock()
149+
150+
sz := tq.pq.Len()
151+
tq.cond.Signal()
152+
return sz
146153
}
147154

148155
func (tq *transmitQueue) Start(context.Context) error {

core/services/llo/mercurytransmitter/queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@ func Test_Queue(t *testing.T) {
133133
}
134134

135135
tq.Push(testTransmissions[maxSize+3]) // push one more to trigger eviction
136-
require.Equal(t, maxSize, tq.(*transmitQueue).pq.Len())
136+
require.Equal(t, maxSize, tq.(*transmitQueue).Len())
137137
require.Len(t, deleter.hashes, 4) // evicted overfill entries (3 oversize plus 1 more to make room)
138138

139139
// oldest entries removed

core/services/llo/mercurytransmitter/transmitter.go

Lines changed: 82 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"sync"
11+
"time"
1112

1213
"github.com/prometheus/client_golang/prometheus"
1314
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -29,7 +30,9 @@ import (
2930

3031
const (
3132
// Mercury server error codes
32-
DuplicateReport = 2
33+
DuplicateReport = 2
34+
commitInterval = time.Millisecond * 25
35+
commitBufferSize = 1000
3336
)
3437

3538
var (
@@ -123,6 +126,8 @@ type transmitter struct {
123126

124127
stopCh services.StopChan
125128
wg *sync.WaitGroup
129+
130+
commitCh chan *Transmission
126131
}
127132

128133
type Opts struct {
@@ -158,6 +163,7 @@ func newTransmitter(opts Opts) *transmitter {
158163
opts.FromAccount,
159164
make(services.StopChan),
160165
&sync.WaitGroup{},
166+
make(chan *Transmission, 1000*len(servers)),
161167
}
162168
}
163169

@@ -200,6 +206,7 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
200206
})
201207
}
202208

209+
mt.spawnCommitLoops()
203210
return g.Wait()
204211
})
205212
}
@@ -248,37 +255,36 @@ func (mt *transmitter) Transmit(
248255
sigs []types.AttributedOnchainSignature,
249256
) (err error) {
250257
ok := mt.IfStarted(func() {
251-
err = mt.transmit(ctx, digest, seqNr, report, sigs)
258+
for serverURL := range mt.servers {
259+
t := &Transmission{
260+
ServerURL: serverURL,
261+
ConfigDigest: digest,
262+
SeqNr: seqNr,
263+
Report: report,
264+
Sigs: sigs,
265+
}
266+
select {
267+
case mt.commitCh <- t:
268+
case <-ctx.Done():
269+
err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err())
270+
}
271+
}
252272
})
273+
253274
if !ok {
254275
return errors.New("transmitter is not started")
255276
}
256-
return
277+
278+
return err
257279
}
258280

259-
func (mt *transmitter) transmit(
260-
ctx context.Context,
261-
digest types.ConfigDigest,
262-
seqNr uint64,
263-
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
264-
sigs []types.AttributedOnchainSignature,
265-
) error {
281+
func (mt *transmitter) transmit(ctx context.Context, transmissions []*Transmission) error {
266282
// On shutdown it appears that libocr can pass us a pre-canceled context;
267283
// don't even bother trying to insert/transmit in this case
268284
if ctx.Err() != nil {
269285
return fmt.Errorf("cannot transmit; context already canceled: %w", ctx.Err())
270286
}
271287

272-
transmissions := make([]*Transmission, 0, len(mt.servers))
273-
for serverURL := range mt.servers {
274-
transmissions = append(transmissions, &Transmission{
275-
ServerURL: serverURL,
276-
ConfigDigest: digest,
277-
SeqNr: seqNr,
278-
Report: report,
279-
Sigs: sigs,
280-
})
281-
}
282288
// NOTE: This insert on its own can leave orphaned records in the case of
283289
// shutdown, because:
284290
// 1. Transmitter is shut down after oracle
@@ -312,11 +318,15 @@ func (mt *transmitter) transmit(
312318
for i := range transmissions {
313319
t := transmissions[i]
314320
if mt.verboseLogging {
315-
mt.lggr.Debugw("Transmit report", "digest", digest.Hex(), "seqNr", seqNr, "reportFormat", report.Info.ReportFormat, "reportLifeCycleStage", report.Info.LifeCycleStage, "transmissionHash", fmt.Sprintf("%x", t.Hash()))
321+
mt.lggr.Debugw("Transmit report",
322+
"digest", t.ConfigDigest.Hex(), "seqNr", t.SeqNr, "reportFormat", t.Report.Info.ReportFormat,
323+
"reportLifeCycleStage", t.Report.Info.LifeCycleStage,
324+
"transmissionHash", fmt.Sprintf("%x", t.Hash()))
316325
}
317-
s := mt.servers[t.ServerURL]
326+
318327
// OK to do this synchronously since pushing to queue is just a mutex
319328
// lock and array append and ought to be extremely fast
329+
s := mt.servers[t.ServerURL]
320330
if ok := s.q.Push(t); !ok {
321331
s.transmitQueuePushErrorCount.Inc()
322332
// This shouldn't be possible since transmitter is always shut down
@@ -332,3 +342,53 @@ func (mt *transmitter) transmit(
332342
func (mt *transmitter) FromAccount(ctx context.Context) (ocrtypes.Account, error) {
333343
return ocrtypes.Account(mt.fromAccount), nil
334344
}
345+
346+
func (mt *transmitter) spawnCommitLoops() {
347+
for x := 0; x < len(mt.servers); x++ {
348+
mt.wg.Add(1)
349+
350+
go func() {
351+
defer mt.wg.Done()
352+
353+
var err error
354+
ctx, cancel := mt.stopCh.NewCtx()
355+
defer cancel()
356+
357+
buff := cap(mt.commitCh) / 10
358+
transmissions := make([]*Transmission, 0, buff)
359+
ticker := time.NewTicker(commitInterval)
360+
defer ticker.Stop()
361+
362+
for {
363+
select {
364+
case <-ctx.Done():
365+
if len(transmissions) >= buff {
366+
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second)
367+
defer closeCancel()
368+
if err = mt.transmit(closeCtx, transmissions); err != nil {
369+
mt.lggr.Error("Error transmitting records when stopping", "error", err)
370+
}
371+
}
372+
return
373+
374+
case <-ticker.C:
375+
if len(transmissions) > 0 {
376+
err = mt.transmit(ctx, transmissions)
377+
transmissions = make([]*Transmission, 0, buff)
378+
}
379+
380+
case t := <-mt.commitCh:
381+
transmissions = append(transmissions, t)
382+
if len(transmissions) >= buff {
383+
err = mt.transmit(ctx, transmissions)
384+
transmissions = make([]*Transmission, 0, buff)
385+
}
386+
}
387+
388+
if err != nil {
389+
mt.lggr.Error("Error transmitting records", "error", err)
390+
}
391+
}
392+
}()
393+
}
394+
}

core/services/llo/mercurytransmitter/transmitter_test.go

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ func Test_Transmitter_Transmit(t *testing.T) {
110110
require.NoError(t, mt.servers[sURL].q.Init([]*Transmission{}))
111111
require.NoError(t, mt.servers[sURL2].q.Init([]*Transmission{}))
112112
require.NoError(t, mt.servers[sURL3].q.Init([]*Transmission{}))
113+
mt.spawnCommitLoops()
113114

114115
return nil
115116
})
@@ -125,24 +126,27 @@ func Test_Transmitter_Transmit(t *testing.T) {
125126
err = mt.Transmit(testutils.Context(t), digest, seqNr, report, sigs)
126127
require.NoError(t, err)
127128

129+
// wait for the commit loop to run
130+
time.Sleep(2 * commitInterval)
131+
128132
// ensure it was added to the queue
129-
require.Equal(t, 1, mt.servers[sURL].q.(*transmitQueue).pq.Len())
133+
require.Equal(t, 1, mt.servers[sURL].q.(*transmitQueue).Len())
130134
assert.Equal(t, &Transmission{
131135
ServerURL: sURL,
132136
ConfigDigest: digest,
133137
SeqNr: seqNr,
134138
Report: report,
135139
Sigs: sigs,
136140
}, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission))
137-
require.Equal(t, 1, mt.servers[sURL2].q.(*transmitQueue).pq.Len())
141+
require.Equal(t, 1, mt.servers[sURL2].q.(*transmitQueue).Len())
138142
assert.Equal(t, &Transmission{
139143
ServerURL: sURL2,
140144
ConfigDigest: digest,
141145
SeqNr: seqNr,
142146
Report: report,
143147
Sigs: sigs,
144148
}, mt.servers[sURL2].q.(*transmitQueue).pq.Pop().(*Transmission))
145-
require.Equal(t, 1, mt.servers[sURL3].q.(*transmitQueue).pq.Len())
149+
require.Equal(t, 1, mt.servers[sURL3].q.(*transmitQueue).Len())
146150
assert.Equal(t, &Transmission{
147151
ServerURL: sURL3,
148152
ConfigDigest: digest,

0 commit comments

Comments
 (0)