-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
274 lines (228 loc) · 11.1 KB
/
bot.py
File metadata and controls
274 lines (228 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import os
import json
import asyncio
from datetime import datetime, timezone
from pathlib import Path
import discord
from discord import app_commands
from dotenv import load_dotenv
from fetch_node import BASE_URL, fetch_node, fetch_node_block_number, parse_rsc_response
load_dotenv()
# How often to run the heartbeat check (seconds)
_check_s = (os.getenv("HEARTBEAT_CHECK_INTERVAL_SECONDS") or "300").strip()
HEARTBEAT_CHECK_INTERVAL_SECONDS = int(_check_s) if _check_s.isdigit() else 300
# How long without a heartbeat before sending a DM (seconds)
_stale_s = (os.getenv("HEARTBEAT_STALE_THRESHOLD_SECONDS") or "1800").strip()
HEARTBEAT_STALE_THRESHOLD_SECONDS = int(_stale_s) if _stale_s.isdigit() else 1800
REGISTRATIONS_FILE = Path("data") / "registrations.json"
def ensure_registrations_file() -> None:
"""Create data dir and empty registrations file if missing (so volume mount persists on host)."""
REGISTRATIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
if not REGISTRATIONS_FILE.exists():
REGISTRATIONS_FILE.write_text("{}")
_channel_id = (os.getenv("DISCORD_CHANNEL_ID") or "0").strip()
ALLOWED_CHANNEL_ID = int(_channel_id) if _channel_id.isdigit() else 0
_guild_id = (os.getenv("DISCORD_GUILD_ID") or "").strip()
GUILD_ID = int(_guild_id) if _guild_id.isdigit() else None
def load_registrations() -> dict[str, list[str]]:
if REGISTRATIONS_FILE.exists():
return json.loads(REGISTRATIONS_FILE.read_text())
return {}
def save_registrations(data: dict[str, list[str]]) -> None:
REGISTRATIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
REGISTRATIONS_FILE.write_text(json.dumps(data, indent=2))
def load_all_registered_nodes() -> list[str]:
"""Return unique node (wallet) addresses from all registrations."""
if not REGISTRATIONS_FILE.exists():
return []
data = json.loads(REGISTRATIONS_FILE.read_text())
nodes = set()
for wallet_list in data.values():
nodes.update(wallet_list)
return sorted(nodes)
def get_user_ids_for_node(node_address: str) -> list[int]:
"""Return Discord user IDs of everyone who registered this node."""
if not REGISTRATIONS_FILE.exists():
return []
data = json.loads(REGISTRATIONS_FILE.read_text())
return [int(uid) for uid, wallets in data.items() if node_address in wallets]
def parse_block_timestamp(ts: str) -> datetime | None:
"""Parse Blacklight block_timestamp e.g. '2026-03-05 7:21:37.0 +00:00:00' to UTC datetime."""
if not ts:
return None
ts = ts.strip()
# Strip timezone suffix (API uses " +00:00:00" or "+00:00:00"); treat as UTC
for suffix in (" +00:00:00", "+00:00:00", " +00:00", "+00:00"):
if ts.endswith(suffix):
ts = ts[: -len(suffix)].strip()
break
try:
# Parse "2026-03-05 7:43:05.0" (optional fractional seconds)
if "." in ts:
return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)
return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
return None
async def get_latest_heartbeat(node_address: str) -> dict | None:
"""Fetch node data (limit 1) and return the first/latest heartbeat record, or None."""
url = f"{BASE_URL}/nodes/{node_address}"
try:
block = await asyncio.to_thread(fetch_node_block_number, node_address)
if block is None:
print(f"[Heartbeat] No block number from [address] payload; skipping heartbeat fetch for {node_address}")
return None
print(f"[Heartbeat] Block number for node: {block}")
text = await asyncio.to_thread(fetch_node, node_address, block, 1)
data = parse_rsc_response(text)
if isinstance(data, list) and len(data) > 0:
return data[0]
# No exception but no data: API returned something that didn't contain a non-empty data array
print(f"[Heartbeat] URL: {url}")
print(f"[Heartbeat] Response length: {len(text)} chars")
if isinstance(data, list):
print(f"[Heartbeat] Parse ok but data list is empty (node may have no heartbeats for this round)")
else:
print(f"[Heartbeat] Could not find 'data' array in RSC response (first 300 chars): {text[:300]!r}")
return None
except Exception as e:
print(f"[Heartbeat] URL: {url}")
print(f"[Heartbeat] Error fetching node {node_address}: {e}")
return None
async def require_channel(interaction: discord.Interaction) -> bool:
"""Return True if interaction is in the allowed channel, else respond and return False."""
if interaction.channel_id != ALLOWED_CHANNEL_ID:
await interaction.response.send_message(
"This command can only be used in the designated bot channel.",
ephemeral=True,
)
return False
return True
class BlacklightBot(discord.Client):
def __init__(self):
intents = discord.Intents.default()
super().__init__(intents=intents)
self.tree = app_commands.CommandTree(self)
self.allowed_channel_id = ALLOWED_CHANNEL_ID
async def setup_hook(self):
if GUILD_ID:
self.tree.copy_global_to(guild=discord.Object(id=GUILD_ID))
await self.tree.sync(guild=discord.Object(id=GUILD_ID))
else:
self.loop.create_task(self._sync_commands_to_guilds())
self.loop.create_task(self.heartbeat_check_cron())
async def _sync_commands_to_guilds(self):
"""Sync slash commands to every guild the bot is in so they appear immediately."""
await self.wait_until_ready()
for guild in self.guilds:
try:
self.tree.copy_global_to(guild=guild)
await self.tree.sync(guild=guild)
except discord.HTTPException:
pass
async def heartbeat_check_cron(self):
"""Every 30s: for each registered node, check latest heartbeat; DM users if > 1 min old."""
await self.wait_until_ready()
def dm_message(node: str) -> str:
mins = HEARTBEAT_STALE_THRESHOLD_SECONDS // 60
return (
f"Hey. The node you registered ({node}) has not responded to heartbeat transaction "
f"for {mins} minute{'s' if mins != 1 else ''} - you may want to check up on it."
)
def channel_message(node: str) -> str:
mins = HEARTBEAT_STALE_THRESHOLD_SECONDS // 60
return (
f"Node {node} has not responded to a HTX for {mins} minute{'s' if mins != 1 else ''}, "
"it may have gone offline."
)
while not self.is_closed():
nodes = load_all_registered_nodes()
if not nodes:
print("[Heartbeat] No registered nodes; skipping check.")
for node_address in nodes:
url = f"{BASE_URL}/nodes/{node_address}"
print(f"[Heartbeat] Calling endpoint for node {node_address}")
print(f"[Heartbeat] URL: {url}")
latest = await get_latest_heartbeat(node_address)
if not latest:
print(f"[Heartbeat] No heartbeat data returned for {node_address}")
continue
ts_str = latest.get("block_timestamp")
if not ts_str:
print(f"[Heartbeat] No block_timestamp in response for {node_address}")
continue
dt = parse_block_timestamp(ts_str)
if not dt:
print(f"[Heartbeat] Could not parse block_timestamp {ts_str!r} for {node_address}")
continue
now = datetime.now(timezone.utc)
age_seconds = (now - dt).total_seconds()
print(f"[Heartbeat] Last transaction: {ts_str} (age {age_seconds:.0f}s)")
if age_seconds > HEARTBEAT_STALE_THRESHOLD_SECONDS:
# Notify the channel
if self.allowed_channel_id:
try:
channel = self.get_channel(self.allowed_channel_id)
if channel:
await channel.send(channel_message(node_address))
print(f"[Heartbeat] Channel notification sent for {node_address}")
except discord.HTTPException as e:
print(f"[Heartbeat] Failed to send channel message: {e}")
# DM each user who registered this node
user_ids = get_user_ids_for_node(node_address)
for uid in user_ids:
try:
user = self.get_user(uid) or await self.fetch_user(uid)
if user:
await user.send(dm_message(node_address))
print(f"[Heartbeat] DM sent to user {uid} for {node_address}")
except discord.HTTPException as e:
print(f"[Heartbeat] Failed to DM user {uid}: {e}")
else:
print(f"[Heartbeat] No message (heartbeat under {HEARTBEAT_STALE_THRESHOLD_SECONDS}s)")
await asyncio.sleep(HEARTBEAT_CHECK_INTERVAL_SECONDS)
bot = BlacklightBot()
@bot.tree.command(name="register", description="Register a wallet address")
@app_commands.describe(wallet_address="The wallet address to register")
async def register(interaction: discord.Interaction, wallet_address: str):
if not await require_channel(interaction):
return
user_id = str(interaction.user.id)
registrations = load_registrations()
user_wallets = registrations.get(user_id, [])
if wallet_address in user_wallets:
await interaction.response.send_message(
f"You have already registered `{wallet_address}`.", ephemeral=True
)
return
registrations.setdefault(user_id, []).append(wallet_address)
save_registrations(registrations)
await interaction.response.send_message(
f"Wallet `{wallet_address}` registered successfully!", ephemeral=True
)
@bot.tree.command(name="unregister", description="Unregister a wallet address")
@app_commands.describe(wallet_address="The wallet address to unregister")
async def unregister(interaction: discord.Interaction, wallet_address: str):
if not await require_channel(interaction):
return
user_id = str(interaction.user.id)
registrations = load_registrations()
user_wallets = registrations.get(user_id, [])
if wallet_address not in user_wallets:
await interaction.response.send_message(
f"You have not registered `{wallet_address}`.", ephemeral=True
)
return
user_wallets.remove(wallet_address)
if not user_wallets:
del registrations[user_id]
save_registrations(registrations)
await interaction.response.send_message(
f"Wallet `{wallet_address}` has been unregistered.", ephemeral=True
)
@bot.event
async def on_ready():
print(f"Logged in as {bot.user} (ID: {bot.user.id})")
print("------")
if __name__ == "__main__":
ensure_registrations_file()
bot.run(os.getenv("DISCORD_BOT_TOKEN"))