-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmatrix_bot.py
More file actions
422 lines (353 loc) · 18.7 KB
/
Copy pathmatrix_bot.py
File metadata and controls
422 lines (353 loc) · 18.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
import asyncio
import os
import json
import functools
import pathlib
import re
import logging # Added for improved logging
import time # Added for timestamping
from urllib.parse import urlparse, urlencode, parse_qs
from dotenv import load_dotenv
from nio import AsyncClient, LoginError, RoomMessageText, MatrixRoom, RoomMemberEvent
# Configure logging
# Log level will be set based on environment variable LOG_LEVEL
log_level_str = os.environ.get("LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, log_level_str, logging.INFO) # Fallback to INFO if invalid
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
logger.info(f"Logging level set to: {logging.getLevelName(logger.getEffectiveLevel())}")
# Global ALTS and SERVICES dictionaries
ALTS = {}
SERVICES = {}
# Constants
TEMPLATE = """
{new}
<a href="{old}">source</a>
""" # This is HTML, Matrix uses Markdown or HTML subset. Will need adjustment.
# Utility functions from linkchanbot (oneline is still used)
@functools.cache
def oneline(s: str) -> str:
"""
Converts newlines and tabs to ASCII representations.
"""
s = s.replace("\\", "\\\\")
return s.replace("\n", "\\n").replace("\t", "\\t")
@functools.cache
def mk_newlinks(link):
"""
The core logic of link substitution.
Given a link, returns either:
[str...] A list of new links.
[False] A list with a single False element.
"""
logger.debug(f"mk_newlinks: Processing link: {link}")
if not ALTS or not SERVICES:
logger.warning("mk_newlinks: ALTS or SERVICES not loaded. Link substitution will not work.")
return [False]
original_link = link # Keep original for logging
# Prepare and parse link string
if not link.startswith("https://") and not link.startswith("http://"):
link = "https://" + link
try:
url = urlparse(link)
except ValueError:
logger.warning(f"mk_newlinks: Invalid URL '{original_link}' resulted in ValueError during parsing.")
return [False] # Invalid URL
# Enforce HTTPS
url = url._replace(scheme="https")
parsed_netloc_lower = url.netloc.lower() # Normalize netloc to lowercase for matching
# Recognise service
service_key_matched = None # This will store the canonical key from SERVICES dict
if parsed_netloc_lower in SERVICES:
service_key_matched = parsed_netloc_lower # Direct match with a primary service domain (already lowercase)
else:
# Iterate through services and their alt_domains (also lowercased for comparison)
for main_domain, service_data in SERVICES.items():
# Ensure alt_domains are treated as lowercase for matching
alt_domains_lower = [ad.lower() for ad in service_data.get("alt_domains", [])]
if parsed_netloc_lower in alt_domains_lower:
service_key_matched = main_domain # Matched an alt_domain, use its main_domain key
break
if not service_key_matched:
logger.debug(f"mk_newlinks: Service not recognized for domain '{url.netloc}' (normalized: '{parsed_netloc_lower}') from link '{original_link}'.")
return [False]
logger.debug(f"mk_newlinks: Recognized link '{original_link}' as service '{service_key_matched}' (matched on '{parsed_netloc_lower}').")
# Keep only allowed URL queries (path, query, fragment should retain original casing from url object)
allowed_queries = SERVICES[service_key_matched].get("query_whitelist", [])
old_queries = parse_qs(url.query, keep_blank_values=True)
new_queries = {
query: v for (query, v) in old_queries.items() if query in allowed_queries
}
# Create a new URL object for modification that only has scheme, queries, path, etc. from original,
# but netloc will be replaced by alt domains.
# The original url object still has original casing for path/query.
url_to_substitute = url._replace(query=urlencode(new_queries, doseq=True))
# Find alts for replacing `service_key_matched`
applicable_alts = {
altsite: alt_data for (altsite, alt_data) in ALTS.items() if alt_data.get("service", "").lower() == service_key_matched
}
logger.debug(f"mk_newlinks: Found applicable_alts: {list(applicable_alts.keys())} for service '{service_key_matched}'.")
if not applicable_alts:
logger.debug(f"mk_newlinks: No applicable alts found for service '{service_key_matched}' from link '{original_link}'.")
return [False]
# Make new substitutes
# When substituting, we use the altsite (which should be a domain) as the new netloc.
# The path, query (now sanitized), and fragment are taken from url_to_substitute.
newlinks = list(
map(
lambda new_alt_domain: url_to_substitute._replace(netloc=new_alt_domain).geturl(),
applicable_alts.keys(),
)
)
return newlinks
def load_config_data():
"""Loads alts.json and services.json."""
global ALTS, SERVICES
script_dir = pathlib.Path(__file__).parent
default_alts_path = script_dir / "sample.config" / "alts.json"
default_services_path = script_dir / "sample.config" / "services.json"
alts_json_path_str = os.environ.get("MATRIX_BOT_ALTS_JSON_PATH")
services_json_path_str = os.environ.get("MATRIX_BOT_SERVICES_JSON_PATH")
alts_path = pathlib.Path(alts_json_path_str) if alts_json_path_str else default_alts_path
services_path = pathlib.Path(services_json_path_str) if services_json_path_str else default_services_path
try:
with open(alts_path, "r") as f:
ALTS = json.load(f)
logger.info(f"Successfully loaded ALTS from {alts_path}")
except FileNotFoundError:
logger.error(f"ALTS file not found at {alts_path}. Link substitution for some services might not work.")
ALTS = {} # Ensure ALTS is an empty dict if file not found
except json.decoder.JSONDecodeError as e:
logger.error(f"JSON syntax error in {alts_path}: {e}")
ALTS = {}
try:
with open(services_path, "r") as f:
SERVICES = json.load(f)
logger.info(f"Successfully loaded SERVICES from {services_path}")
except FileNotFoundError:
logger.error(f"SERVICES file not found at {services_path}. Link substitution will likely not work.")
SERVICES = {} # Ensure SERVICES is an empty dict if file not found
except json.decoder.JSONDecodeError as e:
logger.error(f"JSON syntax error in {services_path}: {e}")
SERVICES = {}
# Validate ALTS (optional, but good practice)
if ALTS:
for altsite, alt in ALTS.items():
if "service" not in alt:
logger.warning(f"alts.json: '{altsite}' has no 'service' value, it might be ignored or cause issues.")
# Dynamically add x.com as an alias for twitter.com if twitter.com service exists
if "twitter.com" in SERVICES:
if "alt_domains" not in SERVICES["twitter.com"]:
SERVICES["twitter.com"]["alt_domains"] = []
if "x.com" not in SERVICES["twitter.com"]["alt_domains"]:
SERVICES["twitter.com"]["alt_domains"].append("x.com")
logger.info("Dynamically added 'x.com' as an alt_domain for 'twitter.com' service.")
def find_links_in_text(text):
"""Finds URLs in a given text string.
Attempts to find URLs with or without http(s):// prefix.
"""
# This regex looks for:
# 1. Optional http:// or https://
# 2. Optional www.
# 3. A domain name part (sequence of subdomain.domain.tld)
# 4. A path part (only valid URL characters: ASCII letters, digits, and URL-safe special chars)
# The key fix: Use a character class that only includes valid URL path characters,
# which excludes non-ASCII text like Arabic, Chinese, etc.
url_pattern = re.compile(
r'(?:(?:http[s]?://|ftp://|www\.)|(?:(?!(?:http[s]?|ftp)://|www\.))(?=[a-zA-Z0-9]))' # Scheme or www, or start of domain
r'(?:[a-zA-Z0-9\-]+\.)+(?:[a-zA-Z]{2,})' # domain.tld
r'(?::[0-9]+)?' # Optional port
r'(?:/[a-zA-Z0-9\-._~:/?#\[\]@!$&\'()*+,;=%]*)?', # Optional path with valid URL characters only
re.IGNORECASE | re.ASCII # Make matching case-insensitive and use ASCII character classes
)
# Previous simpler regex for http(s) only:
# url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
return url_pattern.findall(text)
def extract_reply_content_from_formatted_body(formatted_body):
"""Extract actual reply content from Matrix formatted_body, excluding quoted content.
Matrix replies include quoted content wrapped in <mx-reply> tags.
This function removes the <mx-reply> section and returns just the actual reply.
"""
if not formatted_body:
return None
# Remove <mx-reply>...</mx-reply> section (including nested content)
# Use DOTALL flag to match across newlines
reply_pattern = r'<mx-reply>.*?</mx-reply>'
clean_content = re.sub(reply_pattern, '', formatted_body, flags=re.DOTALL | re.IGNORECASE)
# Clean up HTML tags but preserve the text content
# This will extract the text content including any URLs that are in the link text
clean_content = re.sub(r'<[^>]+>', '', clean_content)
# Note: We don't add URLs from href attributes back because they're already
# in the link text, and adding them again causes duplicates
return clean_content.strip()
def has_reply_relationship(event_source):
"""Check if the event is a reply by looking for m.relates_to."""
if not isinstance(event_source, dict):
return False
content = event_source.get('content', {})
relates_to = content.get('m.relates_to', {})
return 'm.in_reply_to' in relates_to
def get_content_for_link_processing(event):
"""Get the appropriate content for link processing based on message type.
For replies: Extract content excluding quoted portions using structured data.
For regular messages: Use the message body directly.
"""
# Check if this is a reply using structured data
if has_reply_relationship(event.source):
# For replies, try to extract clean content from formatted_body
if hasattr(event, 'formatted_body') and event.formatted_body:
clean_content = extract_reply_content_from_formatted_body(event.formatted_body)
if clean_content:
return clean_content
# Fallback to quote-based filtering if no formatted_body
return find_links_excluding_quotes_fallback(event.body)
# For non-replies, process the entire message body
return event.body
def find_links_excluding_quotes_fallback(text):
"""Fallback method: Finds URLs in text but excludes URLs from quoted sections.
This is used when formatted_body is not available.
Matrix replies often include quoted content in the message body using fallback format:
> <@user:example.com> Original message with URLs
This function excludes URLs that appear in such quoted sections (lines starting with '>').
"""
lines = text.split('\n')
non_quoted_lines = []
for line in lines:
# Skip lines that start with '>' (Matrix quote format)
# Also handle lines that start with whitespace + '>'
stripped = line.lstrip()
if not stripped.startswith('>'):
non_quoted_lines.append(line)
# Join the non-quoted lines and return the text for processing
return '\n'.join(non_quoted_lines)
async def main():
load_dotenv()
load_config_data()
homeserver = os.environ["HOMESERVER"]
user_id = os.environ["USER_ID"]
# Password is now optional if a store is used and valid
password = os.environ.get("PASSWORD")
device_id = os.environ.get("DEVICE_ID", "linkmatrixbot") # Used for initial login if no store
store_path_str = os.environ.get("MATRIX_BOT_STORE_PATH")
if store_path_str:
store_path = pathlib.Path(store_path_str)
store_path.mkdir(parents=True, exist_ok=True) # Ensure store directory exists
logger.info(f"Using Matrix client store path: {store_path.resolve()}")
else:
store_path = None
logger.info("No MATRIX_BOT_STORE_PATH defined, session will not be persisted.")
client = AsyncClient(homeserver, user_id, store_path=store_path, device_id=device_id if not store_path or not (store_path / f"matrix-nio-sync-{user_id.replace(':','_')}.sqlite").exists() else None)
# Try to load session from store first
if client.user_id and client.access_token and client.device_id:
logger.info(f"Successfully loaded session for {client.user_id} (device: {client.device_id}) from store.")
# Verify token works with a light request, e.g., whoami, or just proceed
try:
# A light sync can verify the token and get initial state
logger.info("Verifying loaded session with an initial sync...")
await client.sync_once(timeout=10000, full_state=True) # Short timeout, full state for first sync
logger.info("Session verified and initial sync complete.")
except Exception as e:
logger.warning(f"Loaded session for {client.user_id} might be invalid or sync failed: {e}. Attempting password login if available.", exc_info=True)
# Clear potentially bad stored credentials before password login
client.access_token = None
client.device_id = None # Let login create a new one if needed
# Consider clearing the store file if sync fails catastrophically with stored creds
if password:
logger.info(f"Attempting to login with password for {user_id}...")
try:
login_response = await client.login(password, device_name=device_id)
if isinstance(login_response, LoginError):
logger.error(f"Password login failed: {login_response.message}")
return
logger.info("Password login successful. New session will be saved to store.")
except Exception as e_login:
logger.error(f"Password login exception: {e_login}", exc_info=True)
return
else:
logger.error("No password provided to re-authenticate after stored session failed.")
return
# If sync_once succeeded with stored creds, we are good.
elif password:
logger.info(f"No valid session in store or first run. Attempting to login with password for {user_id}...")
try:
login_response = await client.login(password, device_name=device_id)
if isinstance(login_response, LoginError):
logger.error(f"Password login failed: {login_response.message}")
return
logger.info("Password login successful. Session will be saved to store if store_path is set.")
except Exception as e:
logger.error(f"Password login exception: {e}", exc_info=True)
return
else:
logger.error("No stored session found and no password provided. Cannot login.")
return
# Initialize bot_startup_time here. It will be set properly before sync_forever.
bot_startup_time = 0
# Define callbacks as regular async functions
# Invitation handling logic is removed.
# Bot will only operate in rooms it's already a member of.
async def message_handler_callback(room: MatrixRoom, event: RoomMessageText):
# This callback reads bot_startup_time from the enclosing main() scope
logger.debug(
f"Message received in room {room.room_id} ({room.display_name}) | Sender: {event.sender} | Body: {event.body}"
)
if event.server_timestamp <= bot_startup_time: # Uses bot_startup_time from main's scope
logger.debug(f"Ignoring old message from {event.sender} with timestamp {event.server_timestamp} (startup: {bot_startup_time})")
return
if event.sender == client.user_id: # Don't reply to our own messages
logger.debug("Message is from self, ignoring.")
return
# Get the appropriate content for link processing
content_to_process = get_content_for_link_processing(event)
found_links = find_links_in_text(content_to_process)
if not found_links:
logger.debug("No links found in message.")
return
logger.debug(f"Found links: {found_links}")
replies = []
for link in found_links:
logger.debug(f"Processing link for substitution: {link}")
new_links = mk_newlinks(link) # This function now has more logging
if new_links and new_links[0] is not False: # Check explicitly for not False
substituted_link = new_links[0]
logger.debug(f"Substituted link {link} -> {substituted_link}")
reply_text = substituted_link # Changed to only send the new link
replies.append(reply_text)
else:
logger.debug(f"No substitution found or mk_newlinks failed for {link}")
if replies:
full_reply = "\n".join(replies)
logger.info(f"Sending reply to room {room.room_id}: {full_reply}")
try:
await client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": full_reply},
)
except Exception as e:
logger.error(f"Failed to send message to room {room.room_id}: {e}", exc_info=True)
else:
logger.debug(f"No replies generated for message from {event.sender} in room {room.room_id}.")
# Register callbacks explicitly after client login and before starting sync loop
logger.info("Registering event callbacks...")
# client.add_event_callback(on_room_invite_callback, RoomMemberEvent) # Removed invite callback
client.add_event_callback(message_handler_callback, RoomMessageText)
logger.info("Event callbacks registered.")
# Set the actual startup timestamp *before* starting the sync loop that uses it.
bot_startup_time = int(time.time() * 1000)
logger.info(f"Bot startup timestamp set to: {bot_startup_time}")
logger.info("Starting sync_forever with server (full_state=True for initial sync)...")
try:
# full_state=True on the first run of sync_forever will get initial room states.
# nio handles the transition from initial sync to subsequent incremental syncs.
await client.sync_forever(timeout=30000, full_state=True)
except Exception as e:
logger.error(f"Error during sync_forever: {e}", exc_info=True)
finally:
logger.info("Closing client...")
await client.close()
if __name__ == "__main__":
asyncio.run(main())