|
2 | 2 | import logging |
3 | 3 | import sys |
4 | 4 | from pathlib import Path |
5 | | -from typing import Any |
| 5 | +from typing import Any, Final |
6 | 6 |
|
7 | 7 | import requests |
8 | | -from pycoingecko import CoinGeckoAPI |
| 8 | +from pycoingecko import CoinGeckoAPI # pyright: ignore[reportMissingTypeStubs] |
| 9 | +from tenacity import retry, stop_after_attempt, wait_exponential_jitter |
9 | 10 | from web3 import Web3 |
10 | 11 | from web3.contract import Contract |
11 | 12 |
|
12 | 13 | # Configuration |
13 | | -RPC_URL = "https://flare-api.flare.network/ext/C/rpc" |
14 | | -REGISTRY_ADDRESS = "0xaD67FE66660Fb8dFE9d6b1b4240d8650e30F6019" |
15 | | -EXPLORER_API_URL = "https://flare-explorer.flare.network/api" |
16 | | -ISSUES_FILE = Path("issues.md") |
17 | | -MAX_MARKET_CAP_RANK = 100 |
18 | | -HEADER_TEMPLATE = """--- |
| 14 | +RPC_URL: Final[str] = "https://flare-api.flare.network/ext/C/rpc" |
| 15 | +REGISTRY_ADDRESS: Final[str] = "0xaD67FE66660Fb8dFE9d6b1b4240d8650e30F6019" |
| 16 | +EXPLORER_API_URL: Final[str] = "https://flare-explorer.flare.network/api" |
| 17 | +ISSUES_FILE: Final[Path] = Path("issues.md") |
| 18 | +MAX_MARKET_CAP_RANK: Final[int] = 100 |
| 19 | +HEADER_TEMPLATE: Final[str] = """--- |
19 | 20 | title: "[auto_req]: Potential New Feeds" |
20 | 21 | assignees: dineshpinto |
21 | 22 | labels: "enhancement" |
22 | 23 | --- |
23 | 24 | """ |
| 25 | + |
| 26 | +# Logging |
24 | 27 | logging.basicConfig( |
25 | 28 | encoding="utf-8", |
26 | 29 | level=logging.INFO, |
|
29 | 32 | logger = logging.getLogger(__name__) |
30 | 33 |
|
31 | 34 |
|
32 | | -def get_contract_abi(contract_address: str) -> dict[str, Any]: |
33 | | - """Fetch the ABI for a contract from the Chain Explorer API.""" |
34 | | - params = {"module": "contract", "action": "getabi", "address": contract_address} |
35 | | - headers = {"accept": "application/json"} |
| 35 | +class ExplorerError(RuntimeError): |
| 36 | + """Raised when the chain explorer cannot provide a contract ABI.""" |
| 37 | + |
36 | 38 |
|
| 39 | +@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=8)) |
| 40 | +def _get( |
| 41 | + session: requests.Session, **kwargs: dict[str, str] | str |
| 42 | +) -> requests.Response: |
| 43 | + """`requests.get` with automatic retry and logging.""" |
| 44 | + logger.debug("GET %s", kwargs.get("url") or kwargs.get("url")) |
| 45 | + resp = session.get(**kwargs, timeout=10) # pyright: ignore[reportArgumentType] |
| 46 | + resp.raise_for_status() |
| 47 | + return resp |
| 48 | + |
| 49 | + |
| 50 | +def get_contract_abi( |
| 51 | + contract_address: str, session: requests.Session |
| 52 | +) -> list[dict[str, Any]]: |
| 53 | + """Return ABI for `contract_address` from the explorer API.""" |
| 54 | + params = {"module": "contract", "action": "getabi", "address": contract_address} |
37 | 55 | try: |
38 | | - response = requests.get( |
39 | | - EXPLORER_API_URL, params=params, headers=headers, timeout=10 |
| 56 | + r = _get( |
| 57 | + session, |
| 58 | + url=EXPLORER_API_URL, |
| 59 | + params=params, |
| 60 | + headers={"accept": "application/json"}, |
40 | 61 | ) |
41 | | - response.raise_for_status() |
42 | | - result = response.json().get("result") |
43 | | - return json.loads(result) |
44 | | - except (requests.RequestException, ValueError, json.JSONDecodeError): |
45 | | - logger.exception("Error fetching ABI for contract") |
46 | | - sys.exit(1) |
| 62 | + return json.loads(r.json()["result"]) |
| 63 | + except (requests.RequestException, json.JSONDecodeError, KeyError) as exc: |
| 64 | + msg = f"Could not fetch ABI for {contract_address}" |
| 65 | + raise ExplorerError(msg) from exc |
47 | 66 |
|
48 | 67 |
|
49 | | -def format_coin_info(coin_data: dict[str, Any]) -> str: |
50 | | - """Format coin data dictionary to a readable string.""" |
51 | | - coin_info = { |
52 | | - "name": coin_data.get("name", "N/A"), |
53 | | - "symbol": coin_data.get("symbol", "N/A"), |
54 | | - "coingecko_id": coin_data.get("id", "N/A"), |
55 | | - "price_change_percentage_24h": coin_data.get("data", {}) |
56 | | - .get("price_change_percentage_24h", {}) |
57 | | - .get("usd", "N/A"), |
58 | | - "total_volume": coin_data.get("data", {}).get("total_volume", "N/A"), |
59 | | - "coingecko_link": f"https://www.coingecko.com/en/coins/{coin_data.get('id', '')}", |
60 | | - "description": coin_data.get("data", {}).get("content", {}).get("description") |
61 | | - if coin_data.get("data", {}).get("content", {}) |
62 | | - else "", |
63 | | - } |
| 68 | +def decode_feed_name(feed: bytes) -> str: |
| 69 | + """Convert feed bytes (returned by contract) to a plain symbol.""" |
| 70 | + return feed[1:].decode().rstrip("\x00").split("/")[0] |
64 | 71 |
|
65 | | - return "\n".join(f"{key}: {value}" for key, value in coin_info.items()) |
66 | 72 |
|
| 73 | +def get_current_feeds(contract: Contract) -> set[str]: |
| 74 | + """Return a **set** of existing feed symbols.""" |
| 75 | + feeds: list[bytes] = contract.functions.fetchAllCurrentFeeds().call()[0] |
| 76 | + return {decode_feed_name(f) for f in feeds} |
67 | 77 |
|
68 | | -def get_current_feeds(contract: Contract) -> list[str]: |
69 | | - """Fetch the current block latency feeds from the contract.""" |
70 | | - try: |
71 | | - feeds = contract.functions.fetchAllCurrentFeeds().call() |
72 | | - return [ |
73 | | - feed[1:].decode("utf-8").rstrip("\x00").split("/")[0] for feed in feeds[0] |
74 | | - ] |
75 | | - except Exception: |
76 | | - logger.exception("Error fetching current feeds") |
77 | | - sys.exit(1) |
78 | 78 |
|
| 79 | +def prettify_coin(coin: dict[str, Any]) -> str: |
| 80 | + """Convert `coin` dict into a readable block of Markdown.""" |
| 81 | + d = coin.get("data", {}) |
| 82 | + change_24h = d.get("price_change_percentage_24h", {}).get("usd", "N/A") |
| 83 | + total_vol = d.get("total_volume", "N/A") |
79 | 84 |
|
80 | | -def write_issues_file(path: Path, header: str, coins: list[dict[str, Any]]) -> None: |
81 | | - """Write coin data to the issues.md file.""" |
82 | | - with path.open("w", encoding="utf-8") as file: |
83 | | - file.write(header) |
84 | | - file.write("Coins matching criteria:\n\n") |
85 | | - for coin in coins: |
86 | | - file.write(f"## {coin['name']}\n") |
87 | | - file.write(format_coin_info(coin)) |
88 | | - file.write("\n\n") |
89 | | - logger.info("New feeds written to %s", path) |
| 85 | + return ( |
| 86 | + f"name: {coin.get('name', 'N/A')}\n" |
| 87 | + f"symbol: {coin.get('symbol', 'N/A')}\n" |
| 88 | + f"coingecko_id: {coin.get('id', 'N/A')}\n" |
| 89 | + f"price_change_percentage_24h: {change_24h}\n" |
| 90 | + f"total_volume: {total_vol}\n" |
| 91 | + f"coingecko_link: https://www.coingecko.com/en/coins/{coin.get('id', '')}\n" |
| 92 | + ) |
90 | 93 |
|
91 | 94 |
|
92 | | -if __name__ == "__main__": |
| 95 | +def write_issue(coins: list[dict[str, Any]]) -> None: |
| 96 | + """Write the Markdown issue file.""" |
| 97 | + body = ["Coins matching criteria:\n"] |
| 98 | + body += [f"## {c['name']}\n{prettify_coin(c)}\n" for c in coins] |
| 99 | + ISSUES_FILE.write_text(HEADER_TEMPLATE + "\n".join(body), encoding="utf-8") |
| 100 | + logger.info("Wrote %d coin(s) to %s", len(coins), ISSUES_FILE) |
| 101 | + |
| 102 | + |
| 103 | +def main() -> None: |
| 104 | + cg = CoinGeckoAPI() |
| 105 | + session = requests.Session() |
| 106 | + |
93 | 107 | w3 = Web3(Web3.HTTPProvider(RPC_URL)) |
94 | | - logger.info("Connected to RPC `%s`", RPC_URL) |
| 108 | + if not w3.is_connected(): |
| 109 | + msg = f"Cannot reach RPC at {RPC_URL}" |
| 110 | + raise ConnectionError(msg) |
| 111 | + logger.info("Connected to RPC %s", RPC_URL) |
95 | 112 |
|
96 | | - # Get contract registry |
97 | 113 | registry = w3.eth.contract( |
98 | 114 | address=Web3.to_checksum_address(REGISTRY_ADDRESS), |
99 | | - abi=get_contract_abi(REGISTRY_ADDRESS), |
| 115 | + abi=get_contract_abi(REGISTRY_ADDRESS, session), |
100 | 116 | ) |
101 | | - |
102 | | - # Set up contract |
103 | | - fast_updater_address = registry.functions.getContractAddressByName( |
104 | | - "FastUpdater" |
105 | | - ).call() |
| 117 | + updater_addr = registry.functions.getContractAddressByName("FastUpdater").call() |
106 | 118 | fast_updater = w3.eth.contract( |
107 | | - address=Web3.to_checksum_address(fast_updater_address), |
108 | | - abi=get_contract_abi(fast_updater_address), |
| 119 | + address=Web3.to_checksum_address(updater_addr), |
| 120 | + abi=get_contract_abi(updater_addr, session), |
109 | 121 | ) |
110 | | - logger.info("Connected to FastUpdater contract `%s`", fast_updater_address) |
| 122 | + logger.info("FastUpdater contract %s", fast_updater.address) |
111 | 123 |
|
112 | | - # Query block latency feeds |
113 | 124 | current_feeds = get_current_feeds(fast_updater) |
114 | 125 |
|
115 | | - # Fetch trending coins |
116 | | - cg = CoinGeckoAPI() |
117 | | - trending = cg.get_search_trending() |
118 | | - |
119 | | - # Filter coins based on criteria |
120 | | - selected_coins = [ |
121 | | - coin["item"] |
122 | | - for coin in trending["coins"] |
123 | | - if coin["item"].get("market_cap_rank", float("inf")) < MAX_MARKET_CAP_RANK |
124 | | - and coin["item"].get("symbol") not in current_feeds |
| 126 | + trending = cg.get_search_trending()["coins"] # pyright: ignore[reportUnknownMemberType] |
| 127 | + logger.info("Found potential coins %s", [f"{c['item']['name']}" for c in trending]) |
| 128 | + candidates = [ |
| 129 | + c["item"] |
| 130 | + for c in trending |
| 131 | + if c["item"].get("market_cap_rank", float("inf")) < MAX_MARKET_CAP_RANK |
| 132 | + and c["item"]["symbol"] not in current_feeds |
125 | 133 | ] |
| 134 | + if not candidates: |
| 135 | + sys.exit(78) # Skips GitHub workflow 'if' steps |
| 136 | + write_issue(candidates) |
126 | 137 |
|
127 | | - # Write results to issues file |
128 | | - if selected_coins: |
129 | | - write_issues_file(ISSUES_FILE, HEADER_TEMPLATE, selected_coins) |
| 138 | + |
| 139 | +if __name__ == "__main__": |
| 140 | + try: |
| 141 | + main() |
| 142 | + except Exception: |
| 143 | + logger.exception("Fatal error") |
| 144 | + sys.exit(1) |
0 commit comments