Skip to content

Commit 0f1de29

Browse files
authored
node: introduce settings to configure the number of events per miniblock (#4553)
## Changes

### Configuration (`crypto/config.go`)

- Added two new on-chain configuration keys:
  - `stream.maxEventsPerMiniblock` - Maximum number of events per miniblock
  - `stream.maxTotalEventsSizePerMiniblock` - Maximum combined size (in bytes) of all protobuf-encoded events
- Defined protocol-level defaults that also serve as hard limits:
  - `MaxEventsPerMiniblockDefault = 15,000` events
  - `StreamMaxTotalEventsSizePerMiniblockDefault = 10 MB`
- Added corresponding fields to the `OnChainSettings` struct

### Event Selection (`minipool.go`)

- Modified `eventHashes()` and `eventHashesAsBytes()` to enforce configured limits
- Implemented dual-limit logic:
  - Stops at the maximum event count, OR
  - Stops when the cumulative event size exceeds the maximum size
- Events are selected in order until either limit is reached

### Proposal & Validation (`stream_view.go`, `stream.go`)

- Added a `MiniblockEventLimits()` helper function that:
  - Reads limits from on-chain settings
  - Falls back to defaults if not configured
  - Enforces protocol maximum limits even if on-chain values are higher
- Enhanced `ProposeNextMiniblock()` to reject proposals exceeding limits
- Added validation in `tryApplyCandidate()` to reject oversized candidates before applying

### Testing (`replication_test.go`)

- Added `TestReplicatedMbProductionWithTooManyEvents` - validates max event count enforcement
- Added `TestReplicatedMbProductionWithTooBigEvents` - validates max total size enforcement
- Both tests verify that miniblocks are properly split when limits are exceeded

## Behavior

**Before**: Miniblocks could contain unlimited events from the minipool, potentially creating very large blocks that impact network performance and blockchain transaction costs.
**After**: Miniblocks are capped at:

- Default: 15,000 events OR 10 MB total size (whichever is reached first)
- Configurable via on-chain settings within protocol max limits
- Events exceeding limits remain in the minipool for subsequent miniblocks

## Motivation

- **Network efficiency**: Prevents oversized miniblocks that strain replication and storage
- **Cost control**: Limits on-chain registration transaction sizes
- **Configurability**: Operators can tune limits via on-chain settings without code changes
- **Safety**: Hard-coded maximum limits prevent misconfiguration

## Testing

- ✅ New tests verify both limit types are enforced correctly
- ✅ Existing miniblock production tests still pass
- ✅ Tests confirm events properly overflow into subsequent miniblocks
1 parent 47d620e commit 0f1de29

File tree

8 files changed

+383
-53
lines changed

8 files changed

+383
-53
lines changed

core/node/crypto/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,25 @@ const (
7070
StreamUserInboxStreamHistoryMiniblocksConfigKey = "stream.historyminiblocks.a1"
7171
StreamUserSettingsStreamHistoryMiniblocksConfigKey = "stream.historyminiblocks.a5"
7272

73+
// StreamMaxEventsPerMiniblockKey is the maximum number of events in a miniblock.
74+
StreamMaxEventsPerMiniblockKey = "stream.maxeventsperminiblock"
75+
76+
// StreamMaxTotalEventsSizePerMiniblockKey is the maximum size (in bytes) of all protobuf encoded events
77+
// combined in a miniblock.
78+
StreamMaxTotalEventsSizePerMiniblockKey = "stream.maxtotaleventssizeperminiblock"
79+
7380
// StreamDistributionExtraCandidatesCountCountKey is the key for many extra nodes on top of
7481
// replication factor must be picked as candidates to place a stream on. From these candidates
7582
// the best replication factor nodes are picked.
7683
StreamDistributionExtraCandidatesCountCountKey = "stream.distribution.extracandidatescount"
84+
85+
// MaxEventsPerMiniblockDefault defines the default (also the maximum) number of events in a miniblock
86+
// if not overwritten by StreamMaxEventsPerMiniblockKey.
87+
MaxEventsPerMiniblockDefault = 15_000
88+
89+
// StreamMaxTotalEventsSizePerMiniblockDefault defines the default (also the maximum) size of all protobuf encoded events
90+
// combined in a miniblock if not overwritten by StreamMaxTotalEventsSizePerMiniblockKey.
91+
StreamMaxTotalEventsSizePerMiniblockDefault = 10 * 1024 * 1024
7792
)
7893

7994
var (
@@ -170,6 +185,12 @@ type OnChainSettings struct {
170185
// Number of miniblocks to keep for each type of stream.
171186
// 0 means keep all miniblocks.
172187
StreamHistoryMiniblocks StreamHistoryMiniblocks `mapstructure:",squash"`
188+
189+
// StreamMaxEventsPerMiniblock is the maximum number of events that can be included in a miniblock.
190+
StreamMaxEventsPerMiniblock uint64 `mapstructure:"stream.maxeventsperminiblock"`
191+
// StreamMaxTotalEventsSizePerMiniblock is the maximum size (in bytes) of all protobuf encoded events
192+
// combined in a miniblock.
193+
StreamMaxTotalEventsSizePerMiniblock uint64 `mapstructure:"stream.maxtotaleventssizeperminiblock"`
173194
}
174195

175196
type StreamHistoryMiniblocks struct {

core/node/events/miniblock_job.go

Lines changed: 65 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99

1010
"github.com/ethereum/go-ethereum/common"
1111

12+
"github.com/towns-protocol/towns/core/node/crypto"
13+
1214
"github.com/towns-protocol/towns/core/blockchain"
1315
. "github.com/towns-protocol/towns/core/node/base"
1416
"github.com/towns-protocol/towns/core/node/logging"
@@ -125,8 +127,7 @@ func (j *mbJob) produceCandidate(ctx context.Context, blockNum blockchain.BlockN
125127
return nil
126128
}
127129

128-
err = j.saveCandidate(ctx)
129-
if err != nil {
130+
if err := j.saveCandidate(ctx); err != nil {
130131
return err
131132
}
132133

@@ -158,16 +159,18 @@ func (j *mbJob) makeCandidate(ctx context.Context) error {
158159
}
159160

160161
func (j *mbJob) makeReplicatedProposal(ctx context.Context) (*mbProposal, *StreamView, error) {
162+
// retrieve remote proposals
161163
proposals, view, err := j.processRemoteProposals(ctx)
162164
if err != nil {
163165
return nil, nil, err
164166
}
165167

166-
localProposal := view.proposeNextMiniblock(ctx, j.cache.Params().ChainConfig.Get(), j.forceSnapshot)
168+
// create local proposal
169+
localProposal := view.proposeNextMiniblock(ctx, j.cache.Params().ChainConfig.Get(), j.forceSnapshot, j.replicated)
167170

171+
// Combine proposals into a single proposal that is used to create a miniblock candidate.
168172
proposals = append(proposals, localProposal)
169-
170-
combined, err := j.combineProposals(proposals)
173+
combined, err := j.combineProposals(proposals, j.cache.Params().ChainConfig.Get())
171174
if err != nil {
172175
return nil, nil, err
173176
}
@@ -181,10 +184,10 @@ func (j *mbJob) makeLocalProposal(ctx context.Context) (*mbProposal, *StreamView
181184
return nil, nil, err
182185
}
183186

184-
prop := view.proposeNextMiniblock(ctx, j.cache.Params().ChainConfig.Get(), j.forceSnapshot)
187+
prop := view.proposeNextMiniblock(ctx, j.cache.Params().ChainConfig.Get(), j.forceSnapshot, j.replicated)
185188

186189
// Is there anything to do?
187-
if len(prop.eventHashes) == 0 && !prop.shouldSnapshot {
190+
if len(prop.events) == 0 && !prop.shouldSnapshot {
188191
return nil, view, nil
189192
}
190193

@@ -212,6 +215,7 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str
212215
if err != nil {
213216
return nil, nil, err
214217
}
218+
215219
if view.minipool.generation != request.NewMiniblockNum {
216220
// TODO: REPLICATION: FIX: if any MissingEvents are received, should they still be attempted to be added? I.e. loop below should be still executed?
217221
return nil, nil, RiverError(
@@ -220,25 +224,26 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str
220224
)
221225
}
222226

223-
// Apply received MissingEvents, even if there are not enough quorum of proposals.
227+
// Apply received MissingEvents, even if there are not enough proposals to reach quorum at the moment.
228+
// This will ensure that events propagate over all replicas.
224229
added := make(map[common.Hash]bool)
225230
converted := make([]*mbProposal, len(proposals))
226231
for i, p := range proposals {
227-
converted[i] = mbProposalFromProto(p.response.Proposal)
228-
229-
if converted[i].newMiniblockNum != view.minipool.generation {
232+
// if the proposal was created on a different minipool generation, ignore it.
233+
if p.response.GetProposal().GetNewMiniblockNum() != view.minipool.generation {
230234
continue
231235
}
232236

237+
// add missing events that replicas provided to the local minipool and keep track of the new stream view
233238
for _, e := range p.response.MissingEvents {
234239
parsed, err := ParseEvent(e)
235240
if err != nil {
236241
logging.FromCtx(ctx).Errorw("mbJob.processRemoteProposals: error parsing event", "error", err)
237242
continue
238243
}
244+
239245
if _, ok := added[parsed.Hash]; !ok {
240246
added[parsed.Hash] = true
241-
242247
if !view.minipool.events.Has(parsed.Hash) {
243248
newView, err := j.stream.addEvent(ctx, parsed, true)
244249
if err == nil {
@@ -255,6 +260,8 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str
255260
}
256261
}
257262
}
263+
264+
converted[i] = mbProposalFromProto(p.response.Proposal, view)
258265
}
259266

260267
// View might have been updated by adding events, check if stream advanced in the meantime.
@@ -281,7 +288,13 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str
281288
return nil, nil, RiverError(Err_INTERNAL, "mbJob.processRemoteProposals: no proposals and no errors")
282289
}
283290

284-
func (j *mbJob) combineProposals(proposals []*mbProposal) (*mbProposal, error) {
291+
// combineProposals combines the given proposals into a single proposal that can be
292+
// used to create a miniblock candidate.
293+
//
294+
// The combined proposal will contain all events from the given proposals that appear at least
295+
// quorumNum times. It will also contain the ShouldSnapshot flag if at least quorumNum proposals
296+
// have their ShouldSnapshot flag set.
297+
func (j *mbJob) combineProposals(proposals []*mbProposal, cfg *crypto.OnChainSettings) (*mbProposal, error) {
285298
// Sanity check: all proposals must have the same miniblock number and prev hash.
286299
for _, p := range proposals {
287300
if p.newMiniblockNum != proposals[0].newMiniblockNum || p.prevMiniblockHash != proposals[0].prevMiniblockHash {
@@ -295,49 +308,71 @@ func (j *mbJob) combineProposals(proposals []*mbProposal) (*mbProposal, error) {
295308
return nil, RiverError(Err_INTERNAL, "mbJob.combineProposals: not enough proposals")
296309
}
297310

298-
// Count ShouldSnapshot.
299-
shouldSnapshotNum := 0
311+
// count how many replicas vote to create a snapshot in the miniblock candidate.
312+
proposalSnapshotVotesCount := 0
300313
for _, p := range proposals {
301314
if p.shouldSnapshot {
302-
shouldSnapshotNum++
315+
proposalSnapshotVotesCount++
303316
}
304317
}
305-
shouldSnapshot := shouldSnapshotNum >= quorumNum
306318

307-
// Count event hashes.
319+
// Count how often each event appears in the proposals.
320+
// Only events that appear at least quorumNum times are included in the combined proposal.
308321
eventCounts := make(map[common.Hash]int)
309322
for _, p := range proposals {
310-
for _, h := range p.eventHashes {
311-
eventCounts[h]++
323+
for _, ev := range p.events {
324+
eventCounts[ev.Hash]++
312325
}
313326
}
314327

315-
events := make([]common.Hash, 0, len(eventCounts))
316-
317-
// walk over all event hashes again, adding them to the events list if they have quorum.
328+
// walk over all event hashes and add the events to the combined proposal if reached quorum.
318329
// do it this way to preserve order of events as they were received in a single proposal.
319-
// we do not attempt to order events across proposals.
330+
// we do not attempt to order events across proposals. Events are added up to the configured limits in the
331+
// combined proposal.
332+
var (
333+
maxEventsPerMiniblock, maxEventCombinedSize = MiniblockEventLimits(cfg)
334+
combinedEvents = make([]mbProposalEvent, 0, len(eventCounts))
335+
combinedTotalEventSize = 0
336+
)
337+
338+
var reachedLimit bool
320339
for _, p := range proposals {
321-
for _, h := range p.eventHashes {
322-
if c, ok := eventCounts[h]; ok {
340+
for _, event := range p.events {
341+
if c, ok := eventCounts[event.Hash]; ok {
342+
// only include events that appear at least quorumNum times
323343
if c >= quorumNum {
324-
events = append(events, h)
344+
// if adding the next event to the combined proposal exceeds limits, stop processing all proposals
345+
if len(combinedEvents) >= maxEventsPerMiniblock ||
346+
combinedTotalEventSize+event.Size > maxEventCombinedSize {
347+
reachedLimit = true
348+
break
349+
}
350+
combinedEvents = append(combinedEvents, event)
351+
// keep track of the total size of all events in the combined proposal
352+
combinedTotalEventSize += event.Size
325353
}
326-
delete(eventCounts, h)
354+
// if the next proposal contains the same event, it won't be processed again.
355+
delete(eventCounts, event.Hash)
327356
}
328357
}
358+
if reachedLimit {
359+
break
360+
}
329361
}
330362

363+
// The ShouldSnapshot flag is set if at least quorumNum proposals contain a ShouldSnapshot vote.
364+
shouldSnapshot := proposalSnapshotVotesCount >= quorumNum
365+
331366
// Is there anything to do?
332-
if len(events) == 0 && !shouldSnapshot {
367+
if len(combinedEvents) == 0 && !shouldSnapshot {
333368
return nil, nil
334369
}
335370

336371
return &mbProposal{
337372
newMiniblockNum: proposals[0].newMiniblockNum,
338373
prevMiniblockHash: proposals[0].prevMiniblockHash,
339374
shouldSnapshot: shouldSnapshot,
340-
eventHashes: events,
375+
events: combinedEvents,
341376
}, nil
342377
}
343378

core/node/events/minipool.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package events
33
import (
44
"github.com/ethereum/go-ethereum/common"
55

6+
"github.com/towns-protocol/towns/core/node/crypto"
7+
68
. "github.com/towns-protocol/towns/core/node/utils"
79
)
810

@@ -76,14 +78,34 @@ func (m *minipoolInstance) getEnvelopeBytes() ([][]byte, error) {
7678
return bytes, nil
7779
}
7880

79-
func (m *minipoolInstance) eventHashes() []common.Hash {
80-
hashes := make([]common.Hash, m.events.Len())
81-
for i, e := range m.events.Values {
82-
hashes[i] = e.Hash
81+
// proposalEvents returns all events from m that can be included in a proposal.
82+
func (m *minipoolInstance) proposalEvents() []mbProposalEvent {
83+
events := make([]mbProposalEvent, 0, m.events.Len())
84+
for _, e := range m.events.Values {
85+
events = append(events, mbProposalEvent{Hash: e.Hash, Size: len(e.Envelope.Event)})
8386
}
84-
return hashes
87+
return events
88+
}
89+
90+
// proposalEventsWithMiniblockLimits returns events from m that can be included in a proposal
91+
// until the limits as specified in the given cfg are reached.
92+
func (m *minipoolInstance) proposalEventsWithMiniblockLimits(cfg *crypto.OnChainSettings) []mbProposalEvent {
93+
maxEventsPerMiniblock, maxEventCombinedSize := MiniblockEventLimits(cfg)
94+
events := make([]mbProposalEvent, 0, m.events.Len())
95+
totalEventsSize := 0
96+
for _, e := range m.events.Values {
97+
eventSize := len(e.Envelope.Event)
98+
// Check if adding the next event would exceed limits before appending
99+
if len(events) >= maxEventsPerMiniblock || totalEventsSize+eventSize > maxEventCombinedSize {
100+
return events
101+
}
102+
events = append(events, mbProposalEvent{Hash: e.Hash, Size: eventSize})
103+
totalEventsSize += eventSize
104+
}
105+
return events
85106
}
86107

108+
// eventHashesAsBytes returns all event hashes from m.
87109
func (m *minipoolInstance) eventHashesAsBytes() [][]byte {
88110
hashes := make([][]byte, m.events.Len())
89111
for i, e := range m.events.Values {

0 commit comments

Comments
 (0)