Skip to content

Commit 2f279ce

Browse files
committed
feat: restructure validation logic
1 parent 8ebc3fc commit 2f279ce

File tree

7 files changed

+549
-279
lines changed

7 files changed

+549
-279
lines changed

observer/observer.py

Lines changed: 11 additions & 279 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,11 @@
66
from eth_keys.datatypes import Signature as EthSignature
77
from py_flare_common.fsp.epoch.epoch import RewardEpoch
88
from py_flare_common.fsp.messaging import (
9-
parse_generic_tx,
109
parse_submit1_tx,
1110
parse_submit2_tx,
1211
parse_submit_signature_tx,
1312
)
14-
from py_flare_common.fsp.messaging.byte_parser import ByteParser
15-
from py_flare_common.fsp.messaging.types import ParsedPayload
1613
from py_flare_common.fsp.messaging.types import Signature as SSignature
17-
from py_flare_common.ftso.commit import commit_hash
1814
from web3 import AsyncWeb3
1915
from web3._utils.events import get_event_data
2016
from web3.middleware import ExtraDataToPOAMiddleware
@@ -23,7 +19,6 @@
2319
Configuration,
2420
)
2521
from observer.reward_epoch_manager import (
26-
Entity,
2722
SigningPolicy,
2823
)
2924
from observer.types import (
@@ -36,11 +31,11 @@
3631
VoterRegistrationInfo,
3732
VoterRemoved,
3833
)
34+
from observer.validation.validation import validate_round
3935

4036
from .message import Message, MessageLevel
4137
from .notification import notify_discord, notify_generic, notify_slack, notify_telegram
4238
from .voting_round import (
43-
VotingRound,
4439
VotingRoundManager,
4540
WTxData,
4641
)
@@ -182,280 +177,22 @@ async def get_signing_policy_events(
182177
return builder.build()
183178

184179

185-
def log_issue(config: Configuration, issue: Message):
186-
LOGGER.log(issue.level.value, issue.message)
180+
def log_message(config: Configuration, message: Message):
181+
LOGGER.log(message.level.value, message.message)
187182

188183
n = config.notification
189184

190185
if n.discord is not None:
191-
notify_discord(n.discord, issue.level.name + " " + issue.message)
186+
notify_discord(n.discord, message.level.name + " " + message.message)
192187

193188
if n.slack is not None:
194-
notify_slack(n.slack, issue.level.name + " " + issue.message)
189+
notify_slack(n.slack, message.level.name + " " + message.message)
195190

196191
if n.telegram is not None:
197-
notify_telegram(n.telegram, issue.level.name + " " + issue.message)
192+
notify_telegram(n.telegram, message.level.name + " " + message.message)
198193

199194
if n.generic is not None:
200-
notify_generic(n.generic, issue)
201-
202-
203-
def extract[T](
204-
payloads: list[tuple[ParsedPayload[T], WTxData]],
205-
round: int,
206-
time_range: range,
207-
) -> tuple[ParsedPayload[T], WTxData] | None:
208-
if not payloads:
209-
return
210-
211-
latest: tuple[ParsedPayload[T], WTxData] | None = None
212-
213-
for pl, wtx in payloads:
214-
if pl.voting_round_id != round:
215-
continue
216-
if not (time_range.start <= wtx.timestamp < time_range.stop):
217-
continue
218-
219-
if latest is None or wtx.timestamp > latest[1].timestamp:
220-
latest = (pl, wtx)
221-
222-
return latest
223-
224-
225-
def validate_ftso(round: VotingRound, entity: Entity, config: Configuration):
226-
mb = Message.builder().add(
227-
network=config.chain_id,
228-
round=round.voting_epoch,
229-
protocol=100,
230-
)
231-
232-
epoch = round.voting_epoch
233-
ftso = round.ftso
234-
finalization = ftso.finalization
235-
236-
_submit1 = ftso.submit_1.by_identity[entity.identity_address]
237-
submit_1 = _submit1.extract_latest(range(epoch.start_s, epoch.end_s))
238-
239-
_submit2 = ftso.submit_2.by_identity[entity.identity_address]
240-
submit_2 = _submit2.extract_latest(
241-
range(epoch.next.start_s, epoch.next.reveal_deadline())
242-
)
243-
244-
sig_grace = max(
245-
epoch.next.start_s + 55 + 1, (finalization and finalization.timestamp + 1) or 0
246-
)
247-
_submit_sig = ftso.submit_signatures.by_identity[entity.identity_address]
248-
submit_sig = _submit_sig.extract_latest(
249-
range(epoch.next.reveal_deadline(), sig_grace)
250-
)
251-
252-
# TODO:(matej) check for transactions that happened too late (or too early)
253-
254-
issues = []
255-
256-
s1 = submit_1 is not None
257-
s2 = submit_2 is not None
258-
ss = submit_sig is not None
259-
260-
if not s1:
261-
issues.append(mb.build(MessageLevel.INFO, "no submit1 transaction"))
262-
263-
if s1 and not s2:
264-
issues.append(
265-
mb.build(
266-
MessageLevel.CRITICAL, "no submit2 transaction, causing reveal offence"
267-
)
268-
)
269-
270-
if s2:
271-
indices = [
272-
str(i)
273-
for i, v in enumerate(submit_2.parsed_payload.payload.values)
274-
if v is None
275-
]
276-
277-
if indices:
278-
issues.append(
279-
mb.build(
280-
MessageLevel.WARNING,
281-
f"submit 2 had 'None' on indices {', '.join(indices)}",
282-
)
283-
)
284-
285-
if s1 and s2:
286-
# TODO:(matej) should just build back from parsed message
287-
bp = ByteParser(parse_generic_tx(submit_2.wtx_data.input).ftso.payload)
288-
rnd = bp.uint256()
289-
feed_v = bp.drain()
290-
291-
hashed = commit_hash(entity.submit_address, epoch.id, rnd, feed_v)
292-
293-
if submit_1.parsed_payload.payload.commit_hash.hex() != hashed:
294-
issues.append(
295-
mb.build(
296-
MessageLevel.CRITICAL,
297-
"commit hash and reveal didn't match, causing reveal offence",
298-
),
299-
)
300-
301-
if not ss:
302-
issues.append(
303-
mb.build(MessageLevel.ERROR, "no submit signatures transaction"),
304-
)
305-
306-
if finalization and ss:
307-
s = Signature.from_vrs(submit_sig.parsed_payload.payload.signature)
308-
addr = s.recover_public_key_from_msg_hash(
309-
finalization.to_message()
310-
).to_checksum_address()
311-
312-
if addr != entity.signing_policy_address:
313-
issues.append(
314-
mb.build(
315-
MessageLevel.ERROR,
316-
"submit signatures signature doesn't match finalization",
317-
),
318-
)
319-
320-
return issues
321-
322-
323-
def validate_fdc(round: VotingRound, entity: Entity, config: Configuration):
324-
mb = Message.builder().add(
325-
network=config.chain_id,
326-
round=round.voting_epoch,
327-
protocol=200,
328-
)
329-
330-
epoch = round.voting_epoch
331-
fdc = round.fdc
332-
finalization = fdc.finalization
333-
334-
_submit1 = fdc.submit_1.by_identity[entity.identity_address]
335-
submit_1 = _submit1.extract_latest(range(epoch.start_s, epoch.end_s))
336-
337-
_submit2 = fdc.submit_2.by_identity[entity.identity_address]
338-
submit_2 = _submit2.extract_latest(
339-
range(epoch.next.start_s, epoch.next.reveal_deadline())
340-
)
341-
342-
sig_grace = max(
343-
epoch.next.start_s + 55 + 1, (finalization and finalization.timestamp + 1) or 0
344-
)
345-
_submit_sig = fdc.submit_signatures.by_identity[entity.identity_address]
346-
submit_sig = _submit_sig.extract_latest(
347-
range(epoch.next.reveal_deadline(), sig_grace)
348-
)
349-
submit_sig_deadline = _submit_sig.extract_latest(
350-
range(epoch.next.reveal_deadline(), epoch.next.end_s)
351-
)
352-
353-
# TODO:(matej) move this to py-flare-common
354-
bp = ByteParser(
355-
sorted(fdc.consensus_bitvote.items(), key=lambda x: x[1], reverse=True)[0][0]
356-
)
357-
n_requests = bp.uint16()
358-
votes = bp.drain()
359-
consensus_bitvote = [False for _ in range(n_requests)]
360-
for j, byte in enumerate(reversed(votes)):
361-
for shift in range(8):
362-
i = n_requests - 1 - j * 8 - shift
363-
if i < 0 and (byte >> shift) & 1 == 1:
364-
raise ValueError("Invalid payload length.")
365-
elif i >= 0:
366-
consensus_bitvote[i] = (byte >> shift) & 1 == 1
367-
368-
# TODO:(matej) check for transactions that happened too late (or too early)
369-
370-
issues = []
371-
372-
s1 = submit_1 is not None
373-
s2 = submit_2 is not None
374-
ss = submit_sig is not None
375-
ssd = submit_sig_deadline is not None
376-
377-
sorted_requests = fdc.requests.sorted()
378-
assert len(sorted_requests) == n_requests
379-
380-
if not s1:
381-
# NOTE:(matej) this is expected behaviour in fdc
382-
pass
383-
384-
if not s2:
385-
issues.append(mb.build(MessageLevel.ERROR, "no submit2 transaction"))
386-
387-
expected_signatures = True
388-
# TODO:(matej) unnest some
389-
if s2:
390-
if submit_2.parsed_payload.payload.number_of_requests != len(sorted_requests):
391-
issues.append(
392-
mb.build(
393-
MessageLevel.ERROR,
394-
"submit 2 length didn't match number of requests in round",
395-
)
396-
)
397-
expected_signatures = False
398-
else:
399-
for i, (r, bit, cbit) in enumerate(
400-
zip(
401-
sorted_requests,
402-
submit_2.parsed_payload.payload.bit_vector,
403-
consensus_bitvote,
404-
)
405-
):
406-
idx = n_requests - 1 - i
407-
at = r.attestation_type
408-
si = r.source_id
409-
410-
if cbit and not bit:
411-
issues.append(
412-
mb.build(
413-
MessageLevel.ERROR,
414-
"submit2 didn't confirm request that was part of consensus "
415-
f"{at.representation}/{si.representation} at index {idx}",
416-
)
417-
)
418-
expected_signatures = False
419-
420-
if s2 and expected_signatures and not ssd:
421-
issues.append(
422-
mb.build(
423-
MessageLevel.CRITICAL,
424-
"no submit signatures transaction, causing reveal offence",
425-
)
426-
)
427-
428-
if s2 and ssd and not ss:
429-
issues.append(
430-
mb.build(
431-
MessageLevel.ERROR,
432-
(
433-
"no submit signatures transaction during grace period, "
434-
"causing loss of rewards"
435-
),
436-
)
437-
)
438-
439-
if not s2 and not ss:
440-
issues.append(
441-
mb.build(MessageLevel.ERROR, "no submit signatures transaction"),
442-
)
443-
444-
if finalization and ss:
445-
s = Signature.from_vrs(submit_sig.parsed_payload.payload.signature)
446-
addr = s.recover_public_key_from_msg_hash(
447-
finalization.to_message()
448-
).to_checksum_address()
449-
450-
if addr != entity.signing_policy_address:
451-
issues.append(
452-
mb.build(
453-
MessageLevel.ERROR,
454-
"submit signatures signature doesn't match finalization",
455-
)
456-
)
457-
458-
return issues
195+
notify_generic(n.generic, message)
459196

460197

461198
async def observer_loop(config: Configuration) -> None:
@@ -516,7 +253,7 @@ async def observer_loop(config: Configuration) -> None:
516253
# set up target address from config
517254
tia = w.to_checksum_address(config.identity_address)
518255
# TODO:(matej) log version and initial voting round, maybe signing policy info
519-
log_issue(
256+
log_message(
520257
config,
521258
Message.builder()
522259
.add(network=config.chain_id)
@@ -723,13 +460,8 @@ async def observer_loop(config: Configuration) -> None:
723460

724461
rounds = vrm.finalize(block_data)
725462
for r in rounds:
726-
for i in validate_ftso(
727-
r, signing_policy.entity_mapper.by_identity_address[tia], config
728-
):
729-
log_issue(config, i)
730-
for i in validate_fdc(
731-
r, signing_policy.entity_mapper.by_identity_address[tia], config
732-
):
733-
log_issue(config, i)
463+
entity = signing_policy.entity_mapper.by_identity_address[tia]
464+
for message in validate_round(r, entity, config):
465+
log_message(config, message)
734466

735467
block_number = latest_block

observer/validation/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)