-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathxrp.py
More file actions
67 lines (56 loc) · 1.97 KB
/
xrp.py
File metadata and controls
67 lines (56 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
import logging
import random
import requests
from utils import format_as_padded_hex
def pick_random_payment_tx(rpc_url, txs) -> str:
    """Return a randomly chosen entry of *txs* whose type is ``Payment``.

    Each candidate is looked up through the ``tx`` JSON-RPC method
    (``api_version`` 2) at *rpc_url*, and the ``TransactionType`` found under
    ``result.tx_json`` in the reply is compared against ``"Payment"``.

    Candidates are visited in random order and each one is queried at most
    once, so the search always terminates, even when *txs* contains no
    Payment at all.

    Args:
        rpc_url: URL of the JSON-RPC endpoint the ``tx`` request is posted to.
        txs: Iterable of transaction identifiers (sent as the ``transaction``
            field of the request).

    Returns:
        The first visited entry of *txs* reported as a ``Payment``.

    Raises:
        ValueError: If *txs* is empty or none of its entries is a ``Payment``.
    """
    candidates = list(txs)
    # Shuffle a private copy: random order without repeats, caller's list untouched.
    random.shuffle(candidates)

    for tx_hash in candidates:
        logging.info(f"Checking transaction {tx_hash}")
        payload = {
            "method": "tx",
            "params": [
                {
                    "transaction": tx_hash,
                    "binary": False,
                    "api_version": 2,
                }
            ],
        }
        reply = requests.post(rpc_url, json=payload).json()
        # A reply without "result" or "tx_json" (e.g. an error reply) is
        # treated as "not a Payment" instead of crashing on None.
        tx_json = (reply.get("result") or {}).get("tx_json") or {}
        if tx_json.get("TransactionType") == "Payment":
            return tx_hash

    raise ValueError("No Payment transaction found among the given transactions")
def verifier_request(verifier_url, api_key, tx_id, attestation_type="Payment"):
    """POST a ``prepareRequest`` call for *tx_id* to the XRP verifier.

    The request goes to
    ``{verifier_url}/verifier/xrp/{attestation_type}/prepareRequest`` and is
    authenticated with *api_key* in the ``x-api-key`` header. The attestation
    type and the source id ``"XRP"`` are passed through
    ``format_as_padded_hex`` before being placed in the JSON body.

    Args:
        verifier_url: Base URL of the verifier service.
        api_key: Value sent in the ``x-api-key`` header.
        tx_id: Transaction identifier sent as ``requestBody.transactionId``.
        attestation_type: Attestation type name; used both in the URL path
            and (encoded) in the body. Defaults to ``"Payment"``.

    Returns:
        The response body parsed as JSON.
    """
    endpoint = f"{verifier_url}/verifier/xrp/{attestation_type}/prepareRequest"
    auth_headers = {"x-api-key": api_key, "Content-Type": "application/json"}

    encoded_type = format_as_padded_hex(attestation_type)
    encoded_source = format_as_padded_hex("XRP")
    body = {
        "attestationType": encoded_type,
        "sourceId": encoded_source,
        "requestBody": {"transactionId": tx_id, "inUtxo": "0", "utxo": "0"},
    }

    reply = requests.post(endpoint, headers=auth_headers, json=body)
    return json.loads(reply.text)
def get_abi_bytes(rpc, verifier, *, max_attempts=None):
    """Return the verifier's ABI-encoded request for a random Payment.

    Fetches the transaction list of the ``validated`` ledger from the RPC
    endpoint, then repeatedly picks a random Payment transaction from it and
    submits it to the verifier until the verifier answers with status
    ``"VALID"``.

    Args:
        rpc: Mapping with a ``"url"`` entry for the JSON-RPC endpoint.
        verifier: Mapping with ``"url"`` and ``"api_key"`` entries for the
            verifier service.
        max_attempts: Optional upper bound on the number of verifier
            requests. ``None`` (the default) retries without limit, so the
            call does not return while the verifier keeps rejecting.

    Returns:
        The ``"abiEncodedRequest"`` value of the first ``"VALID"`` verifier
        response.

    Raises:
        RuntimeError: If *max_attempts* is given and that many attempts were
            made without a ``"VALID"`` response.
    """
    ledger_request = {
        "method": "ledger",
        "params": [{"ledger_index": "validated", "transactions": True}],
    }
    txs = (
        requests.post(rpc["url"], json=ledger_request)
        .json()
        .get("result", {})
        .get("ledger", {})
        .get("transactions", [])
    )

    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        tx_hash = pick_random_payment_tx(rpc["url"], txs)
        verifier_res = verifier_request(verifier["url"], verifier["api_key"], tx_hash)
        if verifier_res["status"] == "VALID":
            return verifier_res["abiEncodedRequest"]
        # Rejected: log it and try another randomly picked transaction.
        logging.error("Verifier request is not valid")

    raise RuntimeError(
        f"Verifier returned no VALID response within {max_attempts} attempts"
    )