Skip to content

Commit 55fbaa3

Browse files
committed
fix: fast updates sampling, crashing, registration spam
1 parent 8143f33 commit 55fbaa3

File tree

4 files changed

+96
-160
lines changed

4 files changed

+96
-160
lines changed

observer/fast_updates_manager.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from collections import deque
21
from collections.abc import Sequence
32

43
from attrs import define, field, frozen
@@ -20,7 +19,8 @@ class FastUpdate:
2019

2120
@define
2221
class FastUpdatesManager:
23-
fast_updates: deque[FastUpdate] = field(factory=deque)
22+
last_update_block: int
23+
last_update: FastUpdate
2424
address_list: set[ChecksumAddress] = field(factory=set)
2525

2626
async def check_addresses(
@@ -36,30 +36,24 @@ def check_update_length(
3636
mb = Message.builder()
3737
messages = []
3838
level = MessageLevel.WARNING
39-
fus = list(
40-
filter(
41-
lambda x: x.reward_epoch_id >= fast_update_re
42-
and x.address in self.address_list,
43-
self.fast_updates,
44-
)
45-
)
46-
if len(fus) > 0:
47-
fu = fus[-1]
48-
else:
39+
40+
if self.last_update.reward_epoch_id != fast_update_re or not nr_of_feeds:
4941
return messages
50-
rounded_nr_feeds = nr_of_feeds
51-
# update arrays are whole bytes, so we need to round the number of feeds
52-
# up to the nearest multiple of 8 if it is not one already
53-
if nr_of_feeds % 8 != 0:
54-
rounded_nr_feeds = nr_of_feeds + (8 - nr_of_feeds % 8)
55-
if rounded_nr_feeds > 0 and len(fu.update_array) != rounded_nr_feeds:
42+
43+
# round up to the next multiple of 8 as update array is encoded in integer
44+
# number of bytes
45+
expected = nr_of_feeds + 7 - (nr_of_feeds - 1) % 8
46+
submitted = len(self.last_update.update_array)
47+
48+
if expected != submitted:
5649
messages.append(
5750
mb.build(
5851
level,
5952
(
60-
"Incorrect length of last update array, should be"
61-
f" {rounded_nr_feeds} but got {len(fu.update_array)}"
53+
f"incorrect number of sent feeds, should be {expected} but got "
54+
f"{submitted}"
6255
),
6356
)
6457
)
58+
6559
return messages

observer/observer.py

Lines changed: 27 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,10 @@ async def observer_loop(config: Configuration) -> None:
396396
contracts = cm.get_contracts_list()
397397
event_signatures = cm.get_events()
398398

399-
fum = FastUpdatesManager()
399+
entity = signing_policy.entity_mapper.by_identity_address[tia]
400+
fum = FastUpdatesManager(
401+
block_number, FastUpdate(reward_epoch.id, entity.signing_policy_address, [])
402+
)
400403
spm = SigningPolicyManager(signing_policy, signing_policy)
401404
rm = RewardManager()
402405

@@ -418,6 +421,7 @@ async def observer_loop(config: Configuration) -> None:
418421
)
419422
last_minimal_conditions_check = int(time.time())
420423
last_ping = time.time()
424+
last_registration_check = time.time()
421425

422426
node_connections = defaultdict(deque)
423427
uptime_validations = 0
@@ -431,10 +435,6 @@ async def observer_loop(config: Configuration) -> None:
431435
voter_registration_started_ts: int = 0
432436
registered: bool = False
433437

434-
preregistration_started: bool = False
435-
preregistration_started_ts: int = 0
436-
preregistered: bool = False
437-
438438
nr_of_feeds: int = 0
439439
fast_update_re: int = 0
440440

@@ -528,6 +528,8 @@ async def observer_loop(config: Configuration) -> None:
528528
case "VoterRegistered":
529529
e = VoterRegistered.from_dict(data["args"])
530530
spb.add(e)
531+
if registered:
532+
continue
531533
entity = signing_policy.entity_mapper.by_identity_address[
532534
tia
533535
]
@@ -543,73 +545,51 @@ async def observer_loop(config: Configuration) -> None:
543545
e = VoterRegistrationInfo.from_dict(data["args"])
544546
spb.add(e)
545547
case "VotePowerBlockSelected":
546-
preregistration_started = False
547-
preregistered = False
548548
e = VotePowerBlockSelected.from_dict(data["args"])
549549
spb.add(e)
550+
if registered:
551+
continue
550552
voter_registration_started = True
551553
voter_registration_started_ts = int(time.time())
552554
case "RandomAcquisitionStarted":
553555
e = RandomAcquisitionStarted.from_dict(data["args"])
554556
spb.add(e)
555-
preregistration_started = True
556-
preregistration_started_ts = int(time.time())
557557
case "FastUpdateFeedsSubmitted":
558-
e = FastUpdateFeedsSubmitted.from_dict(
559-
data["args"],
560-
data["address"],
561-
data["transactionHash"],
562-
)
558+
e = FastUpdateFeedsSubmitted.from_dict(data)
563559
tx = await w.eth.get_transaction(e.transaction_hash)
564560
spa, address, update_array = calculate_update_from_tx(
565561
config, w, tx
566562
)
567563
entity = signing_policy.entity_mapper.by_identity_address[
568564
tia
569565
]
570-
fum.fast_updates.append(
571-
FastUpdate(
566+
if un_prefix_0x(entity.signing_policy_address) == spa:
567+
fum.last_update = FastUpdate(
572568
signing_policy.reward_epoch.id,
573569
address,
574570
update_array,
575571
)
576-
)
577-
# with expected sample size 1 and average block time of
578-
# 1s, this should be an ok approximation
579-
if (
580-
len(fum.fast_updates)
581-
> minimal_conditions.time_period.value
582-
):
583-
fum.fast_updates.popleft()
584-
if un_prefix_0x(entity.signing_policy_address) == spa:
572+
fum.last_update_block = int(data["blockNumber"])
585573
# We check update array when we receive a new one
586574
event_messages.extend(
587575
fum.check_update_length(nr_of_feeds, fast_update_re)
588576
)
589577
fum.address_list.add(address)
590578
case "FastUpdateFeeds":
591-
e = FastUpdateFeeds.from_dict(
592-
data["args"],
593-
data["address"],
594-
data["transactionHash"],
595-
)
579+
e = FastUpdateFeeds.from_dict(data)
596580
nr_of_feeds, fast_update_re = (
597581
len(e.feeds),
598582
ref.from_voting_epoch(
599583
vef.make_epoch(e.voting_round_id)
600584
).id,
601585
)
602586
case "VoterPreRegistered":
603-
e = VoterPreRegistered.from_dict(
604-
data["args"],
605-
data["address"],
606-
data["transactionHash"],
607-
)
587+
e = VoterPreRegistered.from_dict(data)
608588
entity = signing_policy.entity_mapper.by_identity_address[
609589
tia
610590
]
611591
if tia == e.voter:
612-
preregistered = True
592+
registered = True
613593

614594
for tx in block_data["transactions"]:
615595
assert not isinstance(tx, bytes)
@@ -692,7 +672,7 @@ async def observer_loop(config: Configuration) -> None:
692672

693673
min_cond_messages.extend(
694674
minimal_conditions.calculate_ftso_block_latency_feeds(
695-
entity, spm, fum
675+
entity, signing_policy, fum.last_update_block, block
696676
)
697677
)
698678
min_cond_messages.extend(
@@ -784,23 +764,16 @@ async def observer_loop(config: Configuration) -> None:
784764
while len(signatures) > minimal_conditions.time_period.value // 90:
785765
signatures.popleft()
786766

787-
# reporting on registration and preregistration
788-
if preregistration_started and not preregistered:
789-
mb = Message.builder()
790-
if int(time.time() - preregistration_started_ts) > 60:
791-
level = MessageLevel.CRITICAL
792-
message = mb.build(
793-
level,
794-
(
795-
"Voter not preregistered after "
796-
f"{int(time.time() - preregistration_started_ts) // 60}"
797-
" minutes"
798-
),
799-
)
800-
messages.append(message)
801-
802-
if voter_registration_started and not registered:
803-
mb = Message.builder()
767+
# reporting on registration and preregistration once per minute
768+
interval = 60
769+
if int(time.time() - voter_registration_started_ts) > 15 * 60:
770+
interval = 10
771+
if (
772+
int(time.time() - last_registration_check) > interval
773+
and voter_registration_started
774+
and not registered
775+
):
776+
mb = Message.builder().add(network=config.chain_id)
804777
if int(time.time() - voter_registration_started_ts) > 60:
805778
level = MessageLevel.CRITICAL
806779
message = mb.build(

observer/types.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,11 @@ class FastUpdateFeedsSubmitted:
191191
signing_policy_address: ChecksumAddress
192192

193193
@classmethod
194-
def from_dict(cls, d: dict[str, Any], address: ChecksumAddress, tx_hash: HexBytes):
194+
def from_dict(cls, data: EventData):
195+
d = data["args"]
196+
address = data["address"]
197+
tx_hash = data["transactionHash"]
198+
195199
return cls(
196200
voting_round_id=int(d["votingRoundId"]),
197201
emitter_address=address,
@@ -209,7 +213,11 @@ class FastUpdateFeeds:
209213
decimals: list[int]
210214

211215
@classmethod
212-
def from_dict(cls, d: dict[str, Any], address: ChecksumAddress, tx_hash: HexBytes):
216+
def from_dict(cls, data: EventData):
217+
d = data["args"]
218+
address = data["address"]
219+
tx_hash = data["transactionHash"]
220+
213221
return cls(
214222
voting_round_id=int(d["votingEpochId"]),
215223
emitter_address=address,
@@ -228,9 +236,14 @@ class VoterPreRegistered:
228236
reward_epoch_id: int
229237

230238
@classmethod
231-
def from_dict(cls, d: dict[str, Any], address: ChecksumAddress, tx_hash: HexBytes):
239+
def from_dict(cls, data: EventData):
240+
d = data["args"]
241+
block = data["blockNumber"]
242+
address = data["address"]
243+
tx_hash = data["transactionHash"]
244+
232245
return cls(
233-
block=int(d["block"]),
246+
block=int(block),
234247
emitter_address=address,
235248
transaction_hash=tx_hash,
236249
voter=d["voter"],

0 commit comments

Comments
 (0)