-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
5189 lines (4722 loc) · 233 KB
/
main.py
File metadata and controls
5189 lines (4722 loc) · 233 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
astrbot-plugin-tmp-bot
欧卡2TMP查询插件 (版本 1.8.1)
"""
import re
import asyncio
import aiohttp
import json
import os
import re as _re_local
import base64
import socket
import hashlib
import random
import time
from typing import Optional, List, Dict, Tuple, Any
from datetime import datetime, timedelta
# 引入 AstrBot 核心 API
try:
from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
from astrbot.api.star import Context, Star, register, StarTools
from astrbot.api import logger
from astrbot.api.message_components import Image, Plain
# 强制 INFO 级别,确保能看到 bans 日志
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
except ImportError:
# 最小化兼容回退
class _DummyFilter:
class EventMessageType:
ALL = "ALL"
def command(self, pattern, **kwargs):
def decorator(func):
return func
return decorator
def event_message_type(self, _type, **kwargs):
def decorator(func):
return func
return decorator
filter = _DummyFilter()
class AstrMessageEvent:
def __init__(self, message_str: str = "", sender_id: str = "0", match=None, group_id: str = None):
self.message_str = message_str
self._sender_id = sender_id
self._match = match
self._group_id = group_id
def get_sender_id(self) -> str:
return self._sender_id
def get_group_id(self) -> str:
return self._group_id or ""
def is_group_message(self) -> bool:
return bool(self._group_id)
def is_private_message(self) -> bool:
return not self._group_id
async def plain_result(self, msg):
return msg
async def chain_result(self, components):
return components
MessageEventResult = Any
class Context: pass
class Star:
def __init__(self, context: Context = None): pass
def register(*args, **kwargs):
def deco(cls):
return cls
return deco
class StarTools:
@staticmethod
def get_data_dir(name: str):
return os.path.join(os.getcwd(), name)
class _Logger:
@staticmethod
def info(msg):
print("[INFO]", msg)
@staticmethod
def error(msg, exc_info=False):
print("[ERROR]", msg)
if exc_info:
import traceback
traceback.print_exc()
logger = _Logger()
# 兼容运行环境缺失时的占位 Image 类
class Image:
@staticmethod
def fromBytes(b: bytes):
return b
@staticmethod
def fromURL(url: str):
return url
class Plain:
def __init__(self, text: str):
self.text = text
USER_GROUP_MAP = {
'Player': '玩家',
'Retired Legend': '退役',
'Game Developer': '游戏开发者',
'Retired Team Member': '退休团队成员',
'Add-On Team': '附加组件团队',
'Game Moderator': '游戏管理员'
}
PROMODS_SERVER_IDS = {50, 51}
def _translate_user_groups(groups: List[Any]) -> List[str]:
translated: List[str] = []
for g in groups:
if g is None:
continue
key = str(g)
translated.append(USER_GROUP_MAP.get(key, key))
return translated
# --- 辅助函数:格式化时间戳 ---
def _format_timestamp_to_readable(timestamp_str: Optional[str]) -> str:
"""将 TruckersMP API 返回的 UTC 时间戳转换为可读格式 (ISO 8601)。"""
if not timestamp_str:
return "未知"
try:
# TruckersMP V2 返回 ISO 8601 (e.g., "2024-05-28T14:30:00.000Z")
clean_str = timestamp_str.replace('T', ' ').split('.')[0].replace('Z', '')
dt_utc = datetime.strptime(clean_str, '%Y-%m-%d %H:%M:%S')
# 直接显示 UTC 时间,并标注时区
return dt_utc.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
# 兼容性回退
return timestamp_str.split('T')[0] if 'T' in timestamp_str else timestamp_str
# -----------------------------
def _format_timestamp_to_beijing(timestamp_str: Optional[str]) -> str:
"""将 UTC 时间戳转换为北京时间 (UTC+8)。兼容 ISO 8601 和简单格式。"""
if not timestamp_str:
return "未知"
s = str(timestamp_str).strip()
if s.lower().startswith('never'):
return "永久封禁"
try:
clean_str = s.replace('T', ' ').split('.')[0].replace('Z', '')
dt_utc = datetime.strptime(clean_str, '%Y-%m-%d %H:%M:%S')
dt_bj = dt_utc + timedelta(hours=8)
return dt_bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
try:
# ISO 8601 with timezone offset, e.g. 2025-12-01T07:55:00+00:00
iso = s.replace('Z', '+00:00')
dt = datetime.fromisoformat(iso)
dt_bj = dt + timedelta(hours=8)
return dt_bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return s
def _cleanup_cn_location_text(text: str) -> str:
s = str(text or "").strip()
if not s:
return s
try:
s = _re_local.sub(r"\s+", " ", s).strip()
s = _re_local.sub(r"^(?:[\[[][^\]]]+[\]]]\s*)+", "", s)
s = _re_local.sub(r"^<[^>]+>\s*", "", s)
s = _re_local.sub(r"^(?:&\s*)?(?:n|v|adj|adv|vt|vi|prep|pron|conj|abbr)[\..]\s*", "", s, flags=_re_local.IGNORECASE)
s = _re_local.sub(r"^(?:\s*(?:&\s*)?(?:n|v|adj|adv|vt|vi|prep|pron|conj|abbr)[\..]\s*)+", "", s, flags=_re_local.IGNORECASE)
s = _re_local.sub(r"^(?:\s*(?:名|动|形|副|介|代|连|数|量|叹|助|冠)(?:词)?[\..::]\s*)+", "", s)
s = _re_local.sub(r"([^)]*)", "", s)
s = _re_local.sub(r"\([^)]*\)", "", s)
for sep in [";", ";", ","]:
if sep in s:
s = s.split(sep, 1)[0]
s = s.strip(" 、,。.;;")
if _re_local.search(r"\s", s):
head = _re_local.split(r"\s+", s, 1)[0]
if _re_local.search(r"[\u4e00-\u9fff]", head) and not _re_local.fullmatch(r"(?:名|动|形|副|介|代|连|数|量|叹|助|冠)(?:词)?[\..::]?", head):
s = head
return s or text
except Exception:
return text
# -----------------------------
# 自定义异常类
class TmpApiException(Exception):
"""TMP 相关异常的基类"""
pass
class PlayerNotFoundException(TmpApiException):
"""玩家不存在异常"""
pass
class SteamIdNotFoundException(TmpApiException):
"""Steam ID 未绑定 TMP 账号异常"""
pass
class NetworkException(Exception):
"""网络请求异常"""
pass
class ApiResponseException(TmpApiException):
"""API响应异常"""
pass
@register("tmp-bot", "BGYdook", "欧卡2TMP查询插件", "1.8.1", "https://github.com/BGYdook/astrbot-plugin-tmp-bot")
class TmpBotPlugin(Star):
def __init__(self, context, config=None): # 接收 context 和 config
super().__init__(context) # 将 context 传给父类
self.widget_list = []
# 会在真实环境中由框架注入 session/context 等
self.session = None
self._ready = False
self.config = config or {}
self._translate_cache: Dict[str, str] = {}
self._location_maps_loaded: bool = False
self._fullmap_cache: Optional[Dict[str, Any]] = None
self._fullmap_cache_ts: float = 0.0
self._fullmap_last_fetch_ts: float = 0.0
self._fullmap_next_fetch_ts: float = 0.0
self._fullmap_task: Optional[asyncio.Task] = None
self._fullmap_lock = asyncio.Lock()
self._fullmap_fetch_lock = asyncio.Lock()
self._load_location_maps()
try:
bind_path = self.config.get('bind_file')
if not bind_path:
root = os.getcwd()
bind_path = os.path.join(root, 'data', 'tmp_bindings.json')
d = os.path.dirname(bind_path)
if d:
os.makedirs(d, exist_ok=True)
self.bind_file = bind_path
except Exception:
self.bind_file = os.path.join(os.getcwd(), 'tmp_bindings.json')
try:
logger.info("TMP Bot 插件初始化开始")
# 仅做轻量初始化,避免在导入阶段执行网络/阻塞操作
# 真实运行时框架会在 on_load/on_start 注入 session 等资源
self._ready = True
logger.info("TMP Bot 插件初始化完成(就绪)")
except Exception as e:
self._ready = False
logger.exception("TMP Bot 插件初始化发生异常,标记为未就绪:%s", e)
# --- 配置读取辅助 ---
def _cfg_bool(self, key: str, default: bool) -> bool:
v = self.config.get(key, default)
return bool(v) if isinstance(v, (bool, int, str)) else default
def _cfg_int(self, key: str, default: int) -> int:
try:
v = self.config.get(key, default)
return int(v)
except Exception:
return default
def _cfg_str(self, key: str, default: str) -> str:
v = self.config.get(key, default)
if v is None:
return default
return str(v)
async def initialize(self):
# 统一 User-Agent,并更新版本号
timeout_sec = self._cfg_int('api_timeout_seconds', 10)
# 使用 IPv4 优先的连接器,并允许读取环境代理设置(与浏览器/系统行为更一致)
connector = aiohttp.TCPConnector(family=socket.AF_INET)
self.session = aiohttp.ClientSession(
headers={'User-Agent': 'astrBot-TMP-Plugin/1.3.59'},
timeout=aiohttp.ClientTimeout(total=timeout_sec),
connector=connector,
trust_env=True
)
logger.info(f"TMP Bot 插件HTTP会话已创建,超时 {timeout_sec}s")
self._fullmap_task = None
def _get_fullmap_interval(self) -> int:
v = self._cfg_int('ets2map_fullmap_interval_seconds', 60)
return 60 if v < 60 else v
def _start_fullmap_task(self) -> None:
if self._fullmap_task and not self._fullmap_task.done():
return
self._fullmap_task = asyncio.create_task(self._fullmap_loop())
async def _fullmap_loop(self) -> None:
await asyncio.sleep(self._get_fullmap_interval())
while True:
await self._fetch_fullmap()
await asyncio.sleep(self._get_fullmap_interval())
def _start_file_list_task(self) -> None:
"""启动文件清单定时任务"""
if self._file_list_task and not self._file_list_task.done():
return
self._file_list_task = asyncio.create_task(self._file_list_loop())
async def _file_list_loop(self) -> None:
"""文件清单定时循环(每10分钟)"""
# 首次运行等待30秒,让插件完全初始化
await asyncio.sleep(30)
while True:
try:
await self._check_file_list_update()
except Exception as e:
logger.error(f"文件清单检查出错: {e}")
# 每10分钟检查一次
await asyncio.sleep(600)
async def _check_file_list_update(self) -> None:
"""检查文件清单是否有更新"""
if not self.session:
return
# 检查车队平台功能是否启用
vtcm_feature_enable = self._cfg_bool('vtcm_feature_enable', True)
if not vtcm_feature_enable:
return
async with self._file_list_lock:
try:
file_list_data = await self._get_tmp_file_list()
if file_list_data.get("error"):
logger.error(f"获取文件清单失败: {file_list_data.get('msg', '未知错误')}")
return
files = file_list_data.get('data', [])
if not files:
return
# 检测更新的文件
updated_files = []
new_files = []
for file_item in files:
file_path = file_item.get('filePath', '')
file_md5 = file_item.get('md5', '')
if not file_path or not file_md5:
continue
if file_path in self._file_list_cache:
# 检查MD5是否变化
if self._file_list_cache[file_path] != file_md5:
updated_files.append(file_item)
self._file_list_cache[file_path] = file_md5
else:
# 新文件
new_files.append(file_item)
self._file_list_cache[file_path] = file_md5
# 如果有更新,发送通知
if updated_files or new_files:
await self._send_file_update_notification(updated_files, new_files)
except Exception as e:
logger.error(f"检查文件清单更新出错: {e}")
async def _send_file_update_notification(self, updated_files: list, new_files: list) -> None:
"""发送文件更新通知到配置的群聊(发送当天日期前后两天更新的文件)"""
# 获取当前日期及前后两天
now = datetime.now()
today = now.strftime('%Y-%m-%d')
yesterday = (now - timedelta(days=1)).strftime('%Y-%m-%d')
tomorrow = (now + timedelta(days=1)).strftime('%Y-%m-%d')
# 定义允许的日期列表
allowed_dates = [yesterday, today, tomorrow]
# 过滤出前后两天内更新的文件
recent_new_files = []
recent_updated_files = []
for file_item in new_files:
update_time = file_item.get('updateTime', '')
if update_time:
# 提取日期部分(格式:YYYY-MM-DD HH:MM:SS)
file_date = update_time[:10] if len(update_time) >= 10 else update_time
if file_date in allowed_dates:
recent_new_files.append(file_item)
for file_item in updated_files:
update_time = file_item.get('updateTime', '')
if update_time:
# 提取日期部分
file_date = update_time[:10] if len(update_time) >= 10 else update_time
if file_date in allowed_dates:
recent_updated_files.append(file_item)
# 如果没有前后两天内更新的文件,不发送通知
if not recent_new_files and not recent_updated_files:
logger.info(f"前后两天内({yesterday} ~ {tomorrow})没有文件更新,跳过发送通知")
return
# 构建消息
message = f"🔄 TMP文件更新通知 ({yesterday} ~ {tomorrow})\n\n"
if recent_new_files:
message += f"📦 新增文件 ({len(recent_new_files)}个):\n"
for i, file_item in enumerate(recent_new_files[:5], 1):
file_type = file_item.get('type', 0)
type_text = 'UI' if file_type == 1 else '核心' if file_type == 3 else '其他'
message += f" {i}. [{type_text}] {file_item.get('filePath', '未知')}\n"
if len(recent_new_files) > 5:
message += f" ... 还有 {len(recent_new_files) - 5} 个新文件\n"
message += "\n"
if recent_updated_files:
message += f"📝 更新文件 ({len(recent_updated_files)}个):\n"
for i, file_item in enumerate(recent_updated_files[:5], 1):
file_type = file_item.get('type', 0)
type_text = 'UI' if file_type == 1 else '核心' if file_type == 3 else '其他'
message += f" {i}. [{type_text}] {file_item.get('filePath', '未知')}\n"
message += f" 更新时间: {file_item.get('updateTime', '未知')}\n"
if len(recent_updated_files) > 5:
message += f" ... 还有 {len(recent_updated_files) - 5} 个更新文件\n"
# 直接执行全局发送
logger.info("执行文件清单全局发送")
await self._send_file_list_to_all_groups(message)
async def _send_file_list_to_all_groups(self, message: str) -> None:
"""发送文件清单消息到所有群聊(全局发送)"""
try:
# 尝试通过context获取群聊列表
group_list = []
# 方法1: 通过context获取
if hasattr(self.context, 'get_group_list'):
try:
result = await self.context.get_group_list()
if result:
group_list.extend(result)
logger.info(f"通过context获取到 {len(result)} 个群聊")
except Exception as e:
logger.debug(f"通过context获取群聊列表失败: {e}")
# 方法2: 通过adapters获取
if not group_list and hasattr(self.context, 'adapters'):
try:
for adapter in self.context.adapters:
if hasattr(adapter, 'get_group_list'):
result = await adapter.get_group_list()
if result:
group_list.extend(result)
logger.info(f"通过adapter获取到 {len(result)} 个群聊")
except Exception as e:
logger.debug(f"通过adapters获取群聊列表失败: {e}")
# 方法3: 通过platform获取
if not group_list and hasattr(self.context, 'platform'):
try:
platform = self.context.platform
if hasattr(platform, 'get_group_list'):
result = await platform.get_group_list()
if result:
group_list.extend(result)
logger.info(f"通过platform获取到 {len(result)} 个群聊")
except Exception as e:
logger.debug(f"通过platform获取群聊列表失败: {e}")
# 方法4: 尝试通过context的provider获取
if not group_list and hasattr(self.context, 'provider'):
try:
provider = self.context.provider
if hasattr(provider, 'get_group_list'):
result = await provider.get_group_list()
if result:
group_list.extend(result)
logger.info(f"通过provider获取到 {len(result)} 个群聊")
except Exception as e:
logger.debug(f"通过provider获取群聊列表失败: {e}")
# 方法5: 使用收集到的群聊ID
if not group_list and self._collected_groups:
group_list = list(self._collected_groups)
logger.info(f"使用收集到的群聊ID,共 {len(group_list)} 个群聊")
if group_list:
# 去重
group_list = list(set(group_list))
logger.info(f"总共获取到 {len(group_list)} 个群聊,开始发送消息")
# 发送到所有群聊
for group_id in group_list:
try:
await self.context.send_message(group_id, message)
logger.info(f"已发送文件更新通知到群 {group_id}")
except Exception as e:
logger.error(f"发送文件更新通知到群 {group_id} 失败: {e}")
else:
logger.warning("无法获取群聊列表,无法执行全局发送")
except Exception as e:
logger.error(f"全局发送文件清单消息失败: {e}")
async def _fetch_fullmap(self) -> None:
if not self.session:
return
interval = self._get_fullmap_interval()
async with self._fullmap_fetch_lock:
now_wall = time.time()
if now_wall - self._fullmap_last_fetch_ts < interval:
if not self._fullmap_cache:
logger.info(f"fullmap 拉取跳过(限频): interval={interval}s")
return
now_mono = time.monotonic()
if now_mono < self._fullmap_next_fetch_ts:
if not self._fullmap_cache:
logger.info(f"fullmap 拉取跳过(限频): interval={interval}s")
return
self._fullmap_next_fetch_ts = now_mono + interval
self._fullmap_last_fetch_ts = time.time()
url = "https://tracker.ets2map.com/v3/fullmap"
try:
async with self.session.get(url, timeout=self._cfg_int('api_timeout_seconds', 10)) as resp:
if resp.status == 200:
data = await resp.json()
if isinstance(data, dict):
async with self._fullmap_lock:
self._fullmap_cache = data
self._fullmap_cache_ts = time.time()
logger.info("fullmap 拉取成功")
return
logger.info(f"fullmap 拉取失败 status={resp.status}")
except Exception as e:
logger.error(f"fullmap 拉取异常: {e}")
def _get_fullmap_tile_url(self, map_type: str) -> Optional[str]:
data = self._fullmap_cache or {}
candidates: List[str] = []
def walk(v: Any) -> None:
if isinstance(v, dict):
for val in v.values():
walk(val)
return
if isinstance(v, list):
for val in v:
walk(val)
return
if isinstance(v, str):
s = v.strip()
if s.startswith("http") and "{z}" in s and "{x}" in s and "{y}" in s:
candidates.append(s)
if isinstance(data, dict):
if data.get('Data'):
walk(data.get('Data'))
if data.get('data'):
walk(data.get('data'))
walk(data)
else:
walk(data)
if not candidates:
return None
seen = set()
uniq = []
for c in candidates:
if c in seen:
continue
seen.add(c)
uniq.append(c)
candidates = uniq
if map_type == "promods":
for c in candidates:
if "promods" in c.lower():
return c
for c in candidates:
lc = c.lower()
if "ets" in lc and "promods" not in lc:
return c
return candidates[0]
# --- 工具:头像处理 ---
def _normalize_avatar_url(self, url: Optional[str]) -> Optional[str]:
if not url:
return None
# 去除日志可能引入的反引号、括号、引号,以及误传入的 CQ 片段前缀
u = str(url).strip()
# 清理包装字符
for ch in ('`', '"', "'", '(', ')'):
u = u.strip(ch)
# 如果误传了完整片段,剥离前缀
if u.startswith('[CQ:image,file='):
u = u[len('[CQ:image,file='):]
# 去掉结尾的右括号
if u.endswith(']'):
u = u[:-1]
u = u.strip()
return u or None
async def _get_avatar_base64(self, url: str) -> Optional[str]:
if not self.session:
return None
try:
timeout_sec = self._cfg_int('api_timeout_seconds', 10)
async with self.session.get(url, timeout=timeout_sec) as resp:
if resp.status == 200:
content = await resp.read()
if content:
return base64.b64encode(content).decode('ascii')
return None
except Exception:
return None
async def _get_avatar_bytes(self, url: str) -> Optional[bytes]:
if not self.session:
return None
try:
timeout_sec = self._cfg_int('api_timeout_seconds', 10)
async with self.session.get(url, timeout=timeout_sec, allow_redirects=True) as resp:
if resp.status == 200:
content = await resp.read()
if content:
return content
else:
logger.info(f"头像下载失败: 空内容 status=200 url={url}")
return None
else:
logger.info(f"头像下载失败: status={resp.status} url={url}")
return None
except Exception as e:
logger.error(f"头像下载异常: url={url} err={e}", exc_info=False)
return None
async def _translate_text(self, content: str, cache: bool = True) -> str:
s = (content or "").strip()
if not s:
return content
if not self._cfg_bool('baidu_translate_enable', True):
return content
use_cache = self._cfg_bool('baidu_translate_cache_enable', False)
cache_key = hashlib.md5(s.encode('utf-8')).hexdigest()
if cache and use_cache:
cached = self._translate_cache.get(cache_key)
if cached:
return cached
app_id = self._cfg_str('baidu_translate_app_id', '').strip()
app_key = self._cfg_str('baidu_translate_key', '').strip()
if not app_id or not app_key or not self.session:
return content
try:
salt = str(random.randint(1000, 9999))
sign = hashlib.md5((app_id + s + salt + app_key).encode('utf-8')).hexdigest()
url = "https://fanyi-api.baidu.com/api/trans/vip/translate"
params = {
'q': s,
'from': 'auto',
'to': 'zh',
'appid': app_id,
'salt': salt,
'sign': sign
}
async with self.session.get(url, params=params, timeout=self._cfg_int('api_timeout_seconds', 10)) as resp:
if resp.status == 200:
data = await resp.json()
if isinstance(data, dict) and data.get('trans_result'):
dst = data['trans_result'][0].get('dst')
if isinstance(dst, str) and dst.strip():
translated = dst.strip()
if cache and use_cache:
self._translate_cache[cache_key] = translated
return translated
except Exception:
return content
return content
async def _get_avatar_bytes_with_fallback(self, url: str, tmp_id: Optional[str]) -> Optional[bytes]:
"""尝试多种 TruckersMP 头像URL变体,尽可能获取头像字节。"""
base = self._normalize_avatar_url(url)
candidates: List[str] = []
if base:
candidates.append(base)
# 切换 jpg/png
if base.lower().endswith('.jpg'):
candidates.append(base[:-4] + '.png')
elif base.lower().endswith('.png'):
candidates.append(base[:-4] + '.jpg')
# 解析 avatarsN/{id}.{stamp}.{ext} -> 生成多种组合
import re as _re
m = _re.search(r"https?://static\.truckersmp\.com/(avatarsN|avatars)/(\d+)(?:\.\d+)?\.(jpg|png)", base, _re.IGNORECASE)
if m:
folder = m.group(1)
pid = m.group(2)
ext = m.group(3).lower()
alt_ext = 'png' if ext == 'jpg' else 'jpg'
# 去掉时间戳
candidates.append(f"https://static.truckersmp.com/{folder}/{pid}.{ext}")
candidates.append(f"https://static.truckersmp.com/{folder}/{pid}.{alt_ext}")
# 切到另一个目录
other_folder = 'avatars' if folder.lower() == 'avatarsn' else 'avatarsN'
candidates.append(f"https://static.truckersmp.com/{other_folder}/{pid}.{ext}")
candidates.append(f"https://static.truckersmp.com/{other_folder}/{pid}.{alt_ext}")
# 根据 tmp_id 追加常见直连地址
if tmp_id:
for ext in ('jpg', 'png'):
candidates.append(f"https://static.truckersmp.com/avatars/{tmp_id}.{ext}")
candidates.append(f"https://static.truckersmp.com/avatarsN/{tmp_id}.{ext}")
# 去重保持顺序
seen = set()
uniq: List[str] = []
for c in candidates:
if not c:
continue
if c in seen:
continue
seen.add(c)
uniq.append(c)
for c in uniq:
b = await self._get_avatar_bytes(c)
logger.info(f"头像下载尝试: url={c} -> {'成功' if b else '失败'}")
if b:
return b
return None
# --- 内部工具方法 (保持不变) ---
def _load_bindings(self) -> Dict[str, Any]:
try:
if os.path.exists(self.bind_file):
with open(self.bind_file, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
except Exception as e:
logger.error(f"加载绑定数据失败: {e}")
return {}
def _save_bindings(self, bindings: dict) -> bool:
try:
with open(self.bind_file, 'w', encoding='utf-8') as f:
json.dump(bindings, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"保存绑定数据失败: {e}")
return False
def _get_bound_tmp_id(self, user_id: str) -> Optional[str]:
bindings = self._load_bindings()
user_binding = bindings.get(user_id)
if isinstance(user_binding, dict):
return user_binding.get('tmp_id')
return user_binding
def _bind_tmp_id(self, user_id: str, tmp_id: str, player_name: str) -> bool:
bindings = self._load_bindings()
bindings[user_id] = {
'tmp_id': tmp_id,
'player_name': player_name,
'bind_time': asyncio.get_event_loop().time()
}
return self._save_bindings(bindings)
def _unbind_tmp_id(self, user_id: str) -> bool:
bindings = self._load_bindings()
if user_id in bindings:
del bindings[user_id]
return self._save_bindings(bindings)
return False
COUNTRY_MAP_EN_TO_CN = {
"germany": "德国",
"de": "德国",
"france": "法国",
"fr": "法国",
"united kingdom": "英国",
"uk": "英国",
"gb": "英国",
"netherlands": "荷兰",
"nl": "荷兰",
"belgium": "比利时",
"be": "比利时",
"poland": "波兰",
"pl": "波兰",
"czech republic": "捷克",
"czechia": "捷克",
"cz": "捷克",
"slovakia": "斯洛伐克",
"sk": "斯洛伐克",
"italy": "意大利",
"it": "意大利",
"spain": "西班牙",
"es": "西班牙",
"portugal": "葡萄牙",
"pt": "葡萄牙",
"switzerland": "瑞士",
"ch": "瑞士",
"austria": "奥地利",
"at": "奥地利",
"hungary": "匈牙利",
"hu": "匈牙利",
"denmark": "丹麦",
"dk": "丹麦",
"sweden": "瑞典",
"se": "瑞典",
"norway": "挪威",
"no": "挪威",
"finland": "芬兰",
"fi": "芬兰",
"estonia": "爱沙尼亚",
"ee": "爱沙尼亚",
"latvia": "拉脱维亚",
"lv": "拉脱维亚",
"lithuania": "立陶宛",
"lt": "立陶宛",
"russia": "俄罗斯",
"ru": "俄罗斯",
"turkey": "土耳其",
"tr": "土耳其",
"romania": "罗马尼亚",
"ro": "罗马尼亚",
"bulgaria": "保加利亚",
"bg": "保加利亚",
"greece": "希腊",
"gr": "希腊",
"united states": "美国",
"usa": "美国",
"us": "美国",
"iceland": "冰岛",
"is": "冰岛",
"svalbard": "斯瓦尔巴群岛",
}
CITY_MAP_EN_TO_CN = {
"calais": "加来",
"duisburg": "杜伊斯堡",
"berlin": "柏林",
"paris": "巴黎",
"london": "伦敦",
"cambridge": "剑桥",
"milano": "米兰",
"milan": "米兰",
"rome": "罗马",
"madrid": "马德里",
"barcelona": "巴塞罗那",
"lisbon": "里斯本",
"rotterdam": "鹿特丹",
"amsterdam": "阿姆斯特丹",
"brussels": "布鲁塞尔",
"prague": "布拉格",
"vienna": "维也纳",
"budapest": "布达佩斯",
"warsaw": "华沙",
"krakow": "克拉科夫",
"akureyri": "阿克雷里",
"burgos": "布尔戈斯",
"praha": "布拉格",
"steinkjer": "斯泰恩谢尔",
"valmiera": "瓦尔米耶拉",
"umeå": "于默奥",
"umea": "于默奥",
"longyearbyen": "朗伊尔城",
"napoli": "那不勒斯",
"sundsvall": "松兹瓦尔",
}
LOCATION_FIX_MAP = {
"kirkenes": "希尔克内斯",
"kirkenes quarry": "希尔克内斯 采石场",
"c-d road": "加莱-杜伊斯堡",
"cd road": "加莱-杜伊斯堡",
"calais-duisburg road": "加莱-杜伊斯堡",
"calais - duisburg": "加莱-杜伊斯堡",
"calais–duisburg": "加莱-杜伊斯堡",
"calais-duisburg": "加莱-杜伊斯堡",
"calais intersection": "加来 交叉口",
"dortmund": "多特蒙德",
"hannover": "汉诺威",
"hamburg": "汉堡",
"strasbourg": "斯特拉斯堡",
"dijon": "第戎",
"reims": "兰斯",
"brussel": "布鲁塞尔",
"aalborg": "奥尔堡",
"kiruna": "基律纳",
"skellefteå": "谢莱夫特奥",
"skelleftea": "谢莱夫特奥",
"ljubjana": "卢布尔雅那",
"ljubljana": "卢布尔雅那",
"nikel": "尼克尔",
"travemünde": "特拉弗明德",
"travemunde": "特拉弗明德",
"zürich": "苏黎世",
"zurich": "苏黎世",
}
def _load_location_maps(self) -> None:
if getattr(self, "_location_maps_loaded", False):
return
def _strip_cn_city_suffix(cn: str) -> str:
t = (cn or "").strip()
if t.endswith("(城市)"):
t = t[:-4]
return t.strip()
def _parse_table(file_path: str) -> List[Tuple[str, str]]:
try:
if not os.path.exists(file_path):
return []
rows: List[Tuple[str, str]] = []
with open(file_path, "r", encoding="utf-8") as f:
for raw in f:
line = raw.strip()
if not line.startswith("|"):
continue
if line.startswith("| English |"):
continue
if line.startswith("|---"):
continue
parts = [p.strip() for p in line.strip("|").split("|")]
if len(parts) < 2:
continue
en = parts[0].strip()
cn = parts[1].strip()
if not en or not cn:
continue
rows.append((en, cn))
return rows
except Exception:
return []
def _add_mapping(en: str, cn: str) -> None:
en_raw = (en or "").strip()
cn_raw = (cn or "").strip()
if not en_raw or not cn_raw:
return
if cn_raw == en_raw:
return
en_key = en_raw.lower()
cn_clean = _cleanup_cn_location_text(cn_raw)
if not cn_clean:
return
status_m = _re_local.search(r"\s*-\s*(?P<status>[A-Za-z]+)\s*\((?P<num>\d+)\)\s*$", en_raw)
en_base = en_raw
if status_m:
en_base = en_raw[: status_m.start()].strip()
if cn_clean.lower() == en_base.lower():
return
city_m = _re_local.search(r"\s*\(City\)\s*$", en_base, flags=_re_local.IGNORECASE)
if city_m:
city_en_base = en_base[: city_m.start()].strip()
city_cn_base = _strip_cn_city_suffix(cn_clean)
if city_en_base and (city_cn_base or cn_clean).lower() == city_en_base.lower():
return
if city_en_base:
self.CITY_MAP_EN_TO_CN[city_en_base.lower()] = city_cn_base or cn_clean
self.LOCATION_FIX_MAP[city_en_base.lower()] = city_cn_base or cn_clean
self.LOCATION_FIX_MAP[en_base.lower()] = city_cn_base or cn_clean
self.LOCATION_FIX_MAP[en_key] = city_cn_base or cn_clean
return
self.COUNTRY_MAP_EN_TO_CN[en_base.lower()] = cn_clean
self.LOCATION_FIX_MAP[en_base.lower()] = cn_clean
self.LOCATION_FIX_MAP[en_key] = cn_clean
try:
root = os.path.dirname(__file__)
except Exception:
root = os.getcwd()
data_dir = os.path.join(root, "TruckersMP-citties-name")
for name in ("s1-cities.md", "promods-cities.md"):
path = os.path.join(data_dir, name)
for en, cn in _parse_table(path):
_add_mapping(en, cn)
self._location_maps_loaded = True
async def _translate_country_city(self, country: Optional[str], city: Optional[str]) -> Tuple[str, str]:
country_en = (country or "").strip()
city_en = (city or "").strip()
def _has_cjk(t: str) -> bool:
return bool(_re_local.search(r"[\u4e00-\u9fff]", t or ""))
def _clean_raw_text(raw: str) -> str:
t = (raw or "").strip()
if not t or _has_cjk(t):
return t
t = _re_local.sub(r"\s*\([^)]*\)\s*", " ", t)
t = _re_local.sub(r"\s*([^)]*)\s*", " ", t)
t = _re_local.sub(r"\s*\[[^\]]*\]\s*", " ", t)
t = _re_local.sub(r"[^A-Za-z\s\-]", " ", t)
t = _re_local.sub(r"\s+", " ", t).strip()
return t
def _ensure_cn_text(text: Optional[str], en_fallback: str, is_city: bool) -> str: