Skip to content

Commit 6ad9fdb

Browse files
Merge branch 'main' into alex/etl_mmap_34
2 parents ebce8c3 + 1fc4ace commit 6ad9fdb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1470
-219
lines changed

cl/phase1/network/gossip/gossip_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,22 @@ func (g *GossipManager) Close() error {
101101
}
102102

103103
func (g *GossipManager) newPubsubValidator(service serviceintf.Service[any], conditions ...ConditionFunc) pubsub.ValidatorEx {
104+
var selfID peer.ID
105+
if h := g.p2p.Host(); h != nil {
106+
selfID = h.ID()
107+
}
104108
return func(ctx context.Context, pid peer.ID, msg *pubsub.Message) (result pubsub.ValidationResult) {
105109
defer func() {
106110
if r := recover(); r != nil {
107111
log.Error("[GossipManager] panic in validator, rejecting message", "err", r, "topic", msg.GetTopic())
108112
result = pubsub.ValidationReject
109113
}
110114
}()
115+
// Skip validation for self-published messages: they were already validated
116+
// by ProcessMessage before Publish was called.
117+
if selfID != "" && pid == selfID {
118+
return pubsub.ValidationAccept
119+
}
111120
curVersion := g.beaconConfig.GetCurrentStateVersion(g.ethClock.GetCurrentEpoch())
112121
// parse the topic and subnet
113122
topic := msg.GetTopic()

cl/phase1/network/services/aggregate_and_proof_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
247247
index: aggregateAndProof.SignedAggregateAndProof.Message.AggregatorIndex,
248248
}
249249
if a.seenAggreatorIndexes.Contains(seenIndex) {
250-
return nil
250+
return fmt.Errorf("%w: aggregator already seen", ErrIgnore)
251251
}
252252

253253
committee, err := headState.GetBeaconCommitee(slot, committeeIndex)

cl/phase1/network/services/attestation_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
263263
// mark the validator as seen
264264
epochLastTime, ok := s.validatorAttestationSeen.Get(vIndex)
265265
if ok && epochLastTime == targetEpoch {
266-
return nil
266+
return fmt.Errorf("validator already seen in target epoch %w", ErrIgnore)
267267
}
268268
s.validatorAttestationSeen.Add(vIndex, targetEpoch)
269269

cmd/rlpgen/handlers.go

Lines changed: 134 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,62 @@ func addToImports(named *types.Named) (typ string) {
8989
return
9090
}
9191

92+
func boolHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName string) {
93+
// size - bool encoded as 0 or 1 (1 byte)
94+
fmt.Fprintf(b1, " size += 1\n")
95+
96+
// encode - bool encoded as 0 or 1
97+
fmt.Fprintf(b2, " var bval uint64\n")
98+
fmt.Fprintf(b2, " if obj.%s {\n", fieldName)
99+
fmt.Fprintf(b2, " bval = 1\n")
100+
fmt.Fprintf(b2, " }\n")
101+
fmt.Fprintf(b2, " if err := rlp.EncodeInt(bval, w, b[:]); err != nil {\n")
102+
fmt.Fprintf(b2, " return err\n")
103+
fmt.Fprintf(b2, " }\n")
104+
105+
// decode
106+
fmt.Fprintf(b3, " if n, err := s.Uint(); err != nil {\n")
107+
fmt.Fprintf(b3, " %s\n", decodeErrorMsg(fieldName))
108+
fmt.Fprintf(b3, " } else {\n")
109+
fmt.Fprintf(b3, " obj.%s = n != 0\n", fieldName)
110+
fmt.Fprintf(b3, " }\n")
111+
}
112+
113+
func boolPtrHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName string) {
114+
// size - bool encoded as 0 or 1 (1 byte) or empty if nil
115+
fmt.Fprintf(b1, " if obj.%s != nil {\n", fieldName)
116+
fmt.Fprintf(b1, " size += 1\n")
117+
fmt.Fprintf(b1, " }\n")
118+
119+
// encode - bool encoded as 0 or 1, or empty if nil
120+
fmt.Fprintf(b2, " var bval uint64\n")
121+
fmt.Fprintf(b2, " if obj.%s != nil {\n", fieldName)
122+
fmt.Fprintf(b2, " if *obj.%s {\n", fieldName)
123+
fmt.Fprintf(b2, " bval = 1\n")
124+
fmt.Fprintf(b2, " }\n")
125+
fmt.Fprintf(b2, " if err := rlp.EncodeInt(bval, w, b[:]); err != nil {\n")
126+
fmt.Fprintf(b2, " return err\n")
127+
fmt.Fprintf(b2, " }\n")
128+
fmt.Fprintf(b2, " } else {\n")
129+
fmt.Fprintf(b2, " if err := rlp.EncodeInt(0, w, b[:]); err != nil {\n")
130+
fmt.Fprintf(b2, " return err\n")
131+
fmt.Fprintf(b2, " }\n")
132+
fmt.Fprintf(b2, " }\n")
133+
134+
// decode
135+
fmt.Fprintf(b3, " if n, err := s.Uint(); err != nil {\n")
136+
fmt.Fprintf(b3, " %s\n", decodeErrorMsg(fieldName))
137+
fmt.Fprintf(b3, " } else {\n")
138+
fmt.Fprintf(b3, " bval := n != 0\n")
139+
fmt.Fprintf(b3, " obj.%s = &bval\n", fieldName)
140+
fmt.Fprintf(b3, " }\n")
141+
}
142+
92143
func uint64CastTo(kind types.BasicKind) string {
93144
var cast string
94145
switch kind {
146+
case types.Int8:
147+
cast = "int8"
95148
case types.Int16:
96149
cast = "int16"
97150
case types.Int32:
@@ -100,6 +153,8 @@ func uint64CastTo(kind types.BasicKind) string {
100153
cast = "int"
101154
case types.Int64:
102155
cast = "int64"
156+
case types.Uint8:
157+
cast = "uint8"
103158
case types.Uint16:
104159
cast = "uint16"
105160
case types.Uint32:
@@ -294,15 +349,10 @@ func _shortArrayHandle(b1, b2, b3 *bytes.Buffer, fieldName string, size int) { /
294349
fmt.Fprintf(b2, " return err\n")
295350
fmt.Fprintf(b2, " }\n")
296351

297-
// decode
298-
addDecodeBuf(b3)
299-
fmt.Fprintf(b3, " if b, err = s.Bytes(); err != nil {\n")
300-
fmt.Fprintf(b3, " %s\n", decodeErrorMsg(fieldName))
352+
// decode - optimized: use s.ReadBytes() directly into fixed-size array
353+
fmt.Fprintf(b3, " if err = s.ReadBytes(obj.%s[:]); err != nil {\n", fieldName)
354+
fmt.Fprintf(b3, " return fmt.Errorf(\"error decoding field %s, err: %%w\", err)\n", fieldName)
301355
fmt.Fprintf(b3, " }\n")
302-
fmt.Fprintf(b3, " if len(b) > 0 && len(b) != %d {\n", size)
303-
fmt.Fprintf(b3, " %s\n", decodeLenMismatch(size))
304-
fmt.Fprintf(b3, " }\n")
305-
fmt.Fprintf(b3, " copy(obj.%s[:], b)\n", fieldName)
306356
}
307357

308358
func _shortArrayPtrHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName string, size int) error {
@@ -339,16 +389,11 @@ func _shortArrayPtrHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldN
339389
fmt.Fprintf(b2, " }\n")
340390
fmt.Fprintf(b2, " }\n")
341391

342-
// decode
343-
addDecodeBuf(b3)
344-
fmt.Fprintf(b3, " if b, err = s.Bytes(); err != nil {\n")
345-
fmt.Fprintf(b3, " %s\n", decodeErrorMsg(fieldName))
346-
fmt.Fprintf(b3, " }\n")
347-
fmt.Fprintf(b3, " if len(b) > 0 && len(b) != %d {\n", size)
348-
fmt.Fprintf(b3, " %s\n", decodeLenMismatch(size))
349-
fmt.Fprintf(b3, " }\n")
392+
// decode - optimized: use s.ReadBytes() directly
350393
fmt.Fprintf(b3, " obj.%s = &%s{}\n", fieldName, typ)
351-
fmt.Fprintf(b3, " copy((*obj.%s)[:], b)\n", fieldName)
394+
fmt.Fprintf(b3, " if err = s.ReadBytes((*obj.%s)[:]); err != nil {\n", fieldName)
395+
fmt.Fprintf(b3, " return fmt.Errorf(\"error decoding field %s, err: %%w\", err)\n", fieldName)
396+
fmt.Fprintf(b3, " }\n")
352397

353398
return nil
354399
}
@@ -463,8 +508,7 @@ func byteSliceHandle(b1, b2, b3 *bytes.Buffer, _ types.Type, fieldName string) {
463508
fmt.Fprintf(b2, " return err\n")
464509
fmt.Fprintf(b2, " }\n")
465510

466-
// decode
467-
addDecodeBuf(b3)
511+
// decode - no buffer needed, s.Bytes() returns directly
468512
fmt.Fprintf(b3, " if obj.%s, err = s.Bytes(); err != nil {\n", fieldName)
469513
fmt.Fprintf(b3, " %s\n", decodeErrorMsg(fieldName))
470514
fmt.Fprintf(b3, " }\n")
@@ -661,6 +705,77 @@ func hashSliceHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName s
661705
_shortArraySliceHandle(b1, b2, b3, fieldType, fieldName, 32)
662706
}
663707

708+
func hashSliceHandleOptimized(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName string) {
709+
// Optimized handler for []common.Hash with pre-allocation limit to prevent DoS attacks
710+
// Similar to decodeTopics2 in log.go - limits pre-allocation to 128 elements
711+
712+
var typ string
713+
if slc, ok := fieldType.(*types.Slice); !ok {
714+
_exit("hashSliceHandleOptimized: expected fieldType to be Slice")
715+
} else {
716+
if named, ok := slc.Elem().(*types.Named); !ok {
717+
_exit("hashSliceHandleOptimized: expected fieldType to be Slice Named")
718+
} else {
719+
typ = addToImports(named)
720+
}
721+
}
722+
723+
// size
724+
addIntSize(b1)
725+
fmt.Fprintf(b1, " gidx = (32 + 1) * len(obj.%s)\n", fieldName)
726+
fmt.Fprintf(b1, " size += rlp.ListPrefixLen(gidx) + gidx\n")
727+
728+
// encode
729+
addIntEncode(b2)
730+
fmt.Fprintf(b2, " gidx = (32 + 1) * len(obj.%s)\n", fieldName)
731+
fmt.Fprintf(b2, " if err := rlp.EncodeStructSizePrefix(gidx, w, b[:]); err != nil {\n")
732+
fmt.Fprintf(b2, " return err\n")
733+
fmt.Fprintf(b2, " }\n")
734+
fmt.Fprintf(b2, " for i := 0; i < len(obj.%s); i++ {\n", fieldName)
735+
fmt.Fprintf(b2, " if err := rlp.EncodeString(obj.%s[i][:], w, b[:]); err != nil {\n", fieldName)
736+
fmt.Fprintf(b2, " return err\n")
737+
fmt.Fprintf(b2, " }\n")
738+
fmt.Fprintf(b2, " }\n")
739+
740+
// decode - with pre-allocation optimization and fast-path for common cases
741+
// Calculate expected list length and apply hard limit of 128 to prevent DoS
742+
// Only call s.List() ONCE to get size, calculate listLen once
743+
// No buffer needed since both paths use direct ReadBytes
744+
fmt.Fprintf(b3, " l, err := s.List()\n")
745+
fmt.Fprintf(b3, " if err != nil {\n")
746+
fmt.Fprintf(b3, " return fmt.Errorf(\"error decoding field %s - expected list start, err: %%w\", err)\n", fieldName)
747+
fmt.Fprintf(b3, " }\n")
748+
fmt.Fprintf(b3, " var listLen int\n")
749+
fmt.Fprintf(b3, " if l > 0 {\n")
750+
fmt.Fprintf(b3, " listLen = int(l / (1 + 32)) // Each hash: 1-byte RLP prefix + 32-byte hash\n")
751+
fmt.Fprintf(b3, " preAlloc := min(128, listLen) // Hard limit against DoS\n")
752+
fmt.Fprintf(b3, " obj.%s = make([]%s, 0, preAlloc)\n", fieldName, typ)
753+
fmt.Fprintf(b3, " } else {\n")
754+
fmt.Fprintf(b3, " obj.%s = []%s{}\n", fieldName, typ)
755+
fmt.Fprintf(b3, " }\n")
756+
// Fast-path: Read directly into pre-allocated slice (zero-alloc, zero-copy)
757+
// Slow-path: Still use direct ReadBytes but allocate full size needed
758+
fmt.Fprintf(b3, " if listLen <= 128 {\n")
759+
fmt.Fprintf(b3, " // Fast-path: within pre-alloc limit, use pre-allocated buffer\n")
760+
fmt.Fprintf(b3, " obj.%s = obj.%s[:listLen]\n", fieldName, fieldName)
761+
fmt.Fprintf(b3, " for i := 0; i < listLen; i++ {\n")
762+
fmt.Fprintf(b3, " if err = s.ReadBytes(obj.%s[i][:]); err != nil {\n", fieldName)
763+
fmt.Fprintf(b3, " return err\n")
764+
fmt.Fprintf(b3, " }\n")
765+
fmt.Fprintf(b3, " }\n")
766+
fmt.Fprintf(b3, " } else if listLen > 128 {\n")
767+
fmt.Fprintf(b3, " // Slow-path: exceeded pre-alloc limit, allocate exact size and use direct ReadBytes\n")
768+
fmt.Fprintf(b3, " obj.%s = make([]%s, listLen)\n", fieldName, typ)
769+
fmt.Fprintf(b3, " for i := 0; i < listLen; i++ {\n")
770+
fmt.Fprintf(b3, " if err = s.ReadBytes(obj.%s[i][:]); err != nil {\n", fieldName)
771+
fmt.Fprintf(b3, " return err\n")
772+
fmt.Fprintf(b3, " }\n")
773+
fmt.Fprintf(b3, " }\n")
774+
fmt.Fprintf(b3, " }\n")
775+
776+
endListDecode(b3, fieldName)
777+
}
778+
664779
func hashPtrSliceHandle(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName string) {
665780
_shortArrayPtrSliceHandle(b1, b2, b3, fieldType, fieldName, 32)
666781
}

cmd/rlpgen/matcher.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type handle func(b1, b2, b3 *bytes.Buffer, fieldType types.Type, fieldName strin
2929
// all possible types that this generator can handle for the time being.
3030
// to add a new type add a string representation of type here and write the handle function for it in the `handlers.go`
3131
var handlers = map[string]handle{
32+
"bool": boolHandle,
33+
"*bool": boolPtrHandle,
3234
"uint64": uintHandle,
3335
"*uint64": uintPtrHandle,
3436
"big.Int": bigIntHandle,
@@ -50,7 +52,7 @@ var handlers = map[string]handle{
5052
"[]*types.BlockNonce": blockNoncePtrSliceHandle,
5153
"[]common.Address": addressSliceHandle,
5254
"[]*common.Address": addressPtrSliceHandle,
53-
"[]common.Hash": hashSliceHandle,
55+
"[]common.Hash": hashSliceHandleOptimized,
5456
"[]*common.Hash": hashPtrSliceHandle,
5557
"[n]byte": byteArrayHandle,
5658
"*[n]byte": byteArrayPtrHandle,
@@ -76,9 +78,13 @@ func matchTypeToString(fieldType types.Type, in string) string {
7678
// matches string representation of a type to a corresponding function
7779
func matchStrTypeToFunc(strType string) handle {
7880
switch strType {
79-
case "int16", "int32", "int", "int64", "uint16", "uint32", "uint", "uint64":
81+
case "bool":
82+
return handlers["bool"]
83+
case "*bool":
84+
return handlers["*bool"]
85+
case "int8", "int16", "int32", "int", "int64", "uint8", "uint16", "uint32", "uint", "uint64":
8086
return handlers["uint64"]
81-
case "*int16", "*int32", "*int", "*int64", "*uint16", "*uint32", "*uint", "*uint64":
87+
case "*int8", "*int16", "*int32", "*int", "*int64", "*uint8", "*uint16", "*uint32", "*uint", "*uint64":
8288
return handlers["*uint64"]
8389
default:
8490
if fn, ok := handlers[strType]; ok {

cmd/rpcdaemon/rpcservices/eth_backend.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,22 @@ func (back *RemoteBackend) RemovePeer(ctx context.Context, request *remoteproto.
415415
return result, nil
416416
}
417417

418+
func (back *RemoteBackend) AddTrustedPeer(ctx context.Context, request *remoteproto.AddPeerRequest) (*remoteproto.AddPeerReply, error) {
419+
result, err := back.remoteEthBackend.AddTrustedPeer(ctx, request)
420+
if err != nil {
421+
return nil, fmt.Errorf("ETHBACKENDClient.AddTrustedPeer() error: %w", err)
422+
}
423+
return result, nil
424+
}
425+
426+
func (back *RemoteBackend) RemoveTrustedPeer(ctx context.Context, request *remoteproto.RemovePeerRequest) (*remoteproto.RemovePeerReply, error) {
427+
result, err := back.remoteEthBackend.RemoveTrustedPeer(ctx, request)
428+
if err != nil {
429+
return nil, fmt.Errorf("ETHBACKENDClient.RemoveTrustedPeer() error: %w", err)
430+
}
431+
return result, nil
432+
}
433+
418434
func (back *RemoteBackend) Peers(ctx context.Context) ([]*p2p.PeerInfo, error) {
419435
rpcPeers, err := back.remoteEthBackend.Peers(ctx, &emptypb.Empty{})
420436
if err != nil {

db/state/aggregator_debug.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,14 @@ func (ac *aggDirtyFilesRoTx) FilesWithMissedAccessors() (mf *MissedAccessorAggFi
9494
domain: make(map[kv.Domain]*MissedAccessorDomainFiles),
9595
ii: make(map[kv.InvertedIdx]*MissedAccessorIIFiles),
9696
}
97-
97+
domainDL := readDirNames(ac.agg.dirs.SnapDomain)
98+
accessorDL := readDirNames(ac.agg.dirs.SnapAccessors)
9899
for _, d := range ac.domain {
99-
mf.domain[d.d.Name] = d.FilesWithMissedAccessors()
100+
mf.domain[d.d.Name] = d.filesWithMissedAccessors(domainDL, accessorDL)
100101
}
101-
102102
for _, ii := range ac.ii {
103-
mf.ii[ii.ii.Name] = ii.FilesWithMissedAccessors()
103+
mf.ii[ii.ii.Name] = ii.filesWithMissedAccessors(accessorDL)
104104
}
105-
106105
return
107106
}
108107

@@ -140,13 +139,13 @@ func (d *Domain) DebugBeginDirtyFilesRo() *domainDirtyFilesRoTx {
140139
}
141140
}
142141

143-
func (d *domainDirtyFilesRoTx) FilesWithMissedAccessors() (mf *MissedAccessorDomainFiles) {
142+
func (d *domainDirtyFilesRoTx) filesWithMissedAccessors(domainDL, accessorDL dirListing) *MissedAccessorDomainFiles {
144143
return &MissedAccessorDomainFiles{
145144
files: map[statecfg.Accessors][]*FilesItem{
146-
statecfg.AccessorBTree: d.d.missedBtreeAccessors(d.files),
147-
statecfg.AccessorHashMap: d.d.missedMapAccessors(d.files),
145+
statecfg.AccessorBTree: d.d.missedBtreeAccessors(d.files, domainDL),
146+
statecfg.AccessorHashMap: d.d.missedMapAccessors(d.files, domainDL),
148147
},
149-
history: d.history.FilesWithMissedAccessors(),
148+
history: d.history.filesWithMissedAccessors(accessorDL),
150149
}
151150
}
152151

@@ -178,11 +177,11 @@ func (h *History) DebugBeginDirtyFilesRo() *historyDirtyFilesRoTx {
178177
}
179178
}
180179

181-
func (f *historyDirtyFilesRoTx) FilesWithMissedAccessors() (mf *MissedAccessorHistoryFiles) {
180+
func (f *historyDirtyFilesRoTx) filesWithMissedAccessors(dl dirListing) *MissedAccessorHistoryFiles {
182181
return &MissedAccessorHistoryFiles{
183-
ii: f.ii.FilesWithMissedAccessors(),
182+
ii: f.ii.filesWithMissedAccessors(dl),
184183
files: map[statecfg.Accessors][]*FilesItem{
185-
statecfg.AccessorHashMap: f.h.missedMapAccessors(f.files),
184+
statecfg.AccessorHashMap: f.h.missedMapAccessors(f.files, dl),
186185
},
187186
}
188187
}
@@ -214,10 +213,10 @@ func (ii *InvertedIndex) DebugBeginDirtyFilesRo() *iiDirtyFilesRoTx {
214213
}
215214
}
216215

217-
func (f *iiDirtyFilesRoTx) FilesWithMissedAccessors() (mf *MissedAccessorIIFiles) {
216+
func (f *iiDirtyFilesRoTx) filesWithMissedAccessors(dl dirListing) (mf *MissedAccessorIIFiles) {
218217
return &MissedAccessorIIFiles{
219218
files: map[statecfg.Accessors][]*FilesItem{
220-
statecfg.AccessorHashMap: f.ii.missedMapAccessors(f.files),
219+
statecfg.AccessorHashMap: f.ii.missedMapAccessors(f.files, dl),
221220
},
222221
}
223222
}

db/state/dirty_files.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package state
1919
import (
2020
"errors"
2121
"fmt"
22+
"os"
2223
"path/filepath"
2324
"regexp"
2425
"strconv"
@@ -773,6 +774,25 @@ func (files visibleFiles) VisibleFiles() []VisibleFile {
773774
return res
774775
}
775776

777+
// dirListing holds a directory path and its pre-read entry names.
778+
type dirListing struct {
779+
dir string
780+
names []string
781+
}
782+
783+
// readDirNames reads a directory once and returns a dirListing for reuse.
784+
func readDirNames(dir string) dirListing {
785+
entries, err := os.ReadDir(dir)
786+
if err != nil {
787+
panic(err)
788+
}
789+
names := make([]string, len(entries))
790+
for i, e := range entries {
791+
names[i] = e.Name()
792+
}
793+
return dirListing{dir: dir, names: names}
794+
}
795+
776796
// fileItemsWithMissedAccessors returns list of files with missed accessors
777797
// here "accessors" are generated dynamically by `accessorsFor`
778798
func fileItemsWithMissedAccessors(dirtyFiles []*FilesItem, aggregationStep uint64, accessorsFor func(fromStep, toStep kv.Step) []string) (l []*FilesItem) {

0 commit comments

Comments
 (0)