forked from Project-N-E-K-O/N.E.K.O
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtoken_tracker.py
More file actions
2223 lines (1899 loc) · 96.4 KB
/
token_tracker.py
File metadata and controls
2223 lines (1899 loc) · 96.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
全局 LLM Token 用量追踪模块
通过 monkey-patch OpenAI SDK 的 chat.completions.create(同步 + 异步),
自动拦截所有 LLM 调用(包括 LangChain 底层调用)的 usage 数据。
用 ContextVar 标记调用类型,确保 Nuitka/PyInstaller 兼容。
Usage:
from utils.token_tracker import TokenTracker, install_hooks, llm_call_context
# 启动时安装 hooks
install_hooks()
TokenTracker.get_instance().start_periodic_save()
# 在调用模块标记 call_type
with llm_call_context("conversation"):
async for chunk in llm.astream(messages):
...
"""
import atexit
import asyncio
import copy
import functools
import gzip
import hashlib
import hmac
import json
import logging
import os
import secrets
import threading
import time
import urllib.request
import urllib.error
from collections import deque
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional
from utils.config_manager import get_config_manager
from utils.file_utils import atomic_write_json
from utils.logger_config import get_module_logger
logger = get_module_logger(__name__)
# ---------------------------------------------------------------------------
# ContextVar: 调用类型标记(替代 stack inspection,Nuitka/PyInstaller 兼容)
# ---------------------------------------------------------------------------
_current_call_type: ContextVar[str] = ContextVar('_llm_call_type', default='unknown')
@contextmanager
def llm_call_context(call_type: str):
"""Context manager,在代码块内标记当前 LLM 调用类型。"""
token = _current_call_type.set(call_type)
try:
yield
finally:
_current_call_type.reset(token)
def set_call_type(call_type: str):
"""简单设置当前调用类型(适用于不方便 wrap 的场景)。"""
_current_call_type.set(call_type)
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _deep_copy_day(day: dict) -> dict:
"""深拷贝一天的统计数据。"""
return copy.deepcopy(day)
def _merge_day_stats(target: dict, source: dict):
"""将 source 的统计数据累加到 target 中(原地修改 target)。"""
for k in ("total_prompt_tokens", "total_completion_tokens", "total_tokens",
"cached_tokens", "total_prompt_chars", "call_count", "error_count"):
target[k] = target.get(k, 0) + source.get(k, 0)
# by_model
t_bm = target.setdefault("by_model", {})
for model, bucket in source.get("by_model", {}).items():
if model not in t_bm:
t_bm[model] = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
"cached_tokens": 0, "prompt_chars": 0, "call_count": 0}
for k in ("prompt_tokens", "completion_tokens", "total_tokens",
"cached_tokens", "prompt_chars", "call_count"):
t_bm[model][k] = t_bm[model].get(k, 0) + bucket.get(k, 0)
# by_call_type
t_bt = target.setdefault("by_call_type", {})
for ct, bucket in source.get("by_call_type", {}).items():
if ct not in t_bt:
t_bt[ct] = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
"cached_tokens": 0, "prompt_chars": 0, "call_count": 0}
for k in ("prompt_tokens", "completion_tokens", "total_tokens",
"cached_tokens", "prompt_chars", "call_count"):
t_bt[ct][k] = t_bt[ct].get(k, 0) + bucket.get(k, 0)
# ---------------------------------------------------------------------------
# 跨进程文件锁(O_CREAT | O_EXCL 方式,跨平台)
# ---------------------------------------------------------------------------
@contextmanager
def _file_lock(lock_path: Path, timeout: float = 10.0):
"""基于文件系统的跨进程互斥锁。
使用 O_CREAT | O_EXCL 原子创建锁文件,确保同一时刻只有一个进程持有锁。
锁文件中写入 PID + 时间戳,用于超时后检测过期锁。
"""
fd = -1
deadline = time.monotonic() + timeout
while True:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
# 写入 PID 便于调试
os.write(fd, f"{os.getpid()},{time.time()}".encode())
break
except (FileExistsError, PermissionError, OSError):
# 检测过期锁(持有超过 30 秒视为进程崩溃后的残留)
try:
lock_age = time.time() - os.path.getmtime(str(lock_path))
if lock_age > 30:
try:
os.unlink(str(lock_path))
except OSError:
pass
continue
except OSError:
pass
if time.monotonic() >= deadline:
logger.warning("Token tracker: file lock timeout, force removing stale lock")
try:
os.unlink(str(lock_path))
except OSError:
time.sleep(0.1)
raise TimeoutError(f"file lock timeout after {timeout}s: {lock_path}")
time.sleep(0.05)
try:
yield
finally:
try:
os.close(fd)
except OSError:
pass
for _retry in range(3):
try:
os.unlink(str(lock_path))
break
except OSError:
if _retry < 2:
time.sleep(0.05)
# ---------------------------------------------------------------------------
# 远程遥测上报配置(参考 vLLM DO_NOT_TRACK 机制)
#
# 设计与 vLLM 一致:秘钥和地址硬编码在源码中,无需用户配置环境变量。
# HMAC 不是为了防止逆向(代码本身可读),而是防止随机噪声和简单伪造。
# ---------------------------------------------------------------------------
# ★ 发版前修改:遥测服务器地址。为空则不上报。
_TELEMETRY_SERVER_URL = "http://118.31.122.91:8099"
if _TELEMETRY_SERVER_URL and not _TELEMETRY_SERVER_URL.startswith(("http://", "https://")):
logger.warning("Token tracker: invalid telemetry URL scheme, disabling remote reporting")
_TELEMETRY_SERVER_URL = ""
# ★ 发版前修改:HMAC 签名密钥(与 server.py 中的 HMAC_SECRET 保持一致)
_TELEMETRY_HMAC_SECRET = "neko-v1-a3f8b2c1d4e5f6789012345678abcdef" # noqa: S105
# Opt-out 开关(标准 DO_NOT_TRACK 约定,用户可自行设置)
_DO_NOT_TRACK = any(
os.getenv(v, "").strip() in ("1", "true", "yes")
for v in ("NEKO_DO_NOT_TRACK", "DO_NOT_TRACK")
)
# 上报间隔(60 秒)
# 节流设计:
# record() → 即时写入内存(零 I/O)
# save() → 每 60s 本地落盘,然后调用 _report_to_server()
# _report_to_server() → 仅当距上次上报 ≥ 60s 时才真正发 HTTP
# 所以每个进程最多每 1 分钟发一次请求。3 个 server 进程 = 180 req/h/device。
_TELEMETRY_REPORT_INTERVAL = 60
# 上报超时
_TELEMETRY_TIMEOUT = 10 # 秒
# Gzip 上报阈值:< 1KB 的 payload 不压缩。gzip 头 + CRC 有 ~20B 固定开销,
# 小 payload 压缩比往往 < 2x,不值得。典型 daily_stats payload 5-50KB raw,
# gzip 后通常压到 1/5-1/10。服务端 v2 起支持 Content-Encoding: gzip;老服
# 务端不解析就直接 415,故首次发布要 server 先升级再开客户端 gzip。
_TELEMETRY_GZIP_THRESHOLD = 1024
def _get_app_version_from_changelog() -> str:
"""从 config/changelog/ 目录中读取最高版本号作为当前 app 版本。"""
changelog_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "config", "changelog"
)
if not os.path.isdir(changelog_dir):
return "unknown"
best_ver: tuple[int, ...] = (0,)
best_stem = "unknown"
try:
for fname in os.listdir(changelog_dir):
if not fname.endswith(".md"):
continue
stem = fname[:-3]
try:
ver = tuple(int(x) for x in stem.split("."))
except (ValueError, AttributeError):
continue
if ver > best_ver:
best_ver = ver
best_stem = stem
return best_stem
except OSError as e:
logger.debug(f"Token tracker: failed to read changelog dir: {e}")
return "unknown"
_MACHINE_ID_PLACEHOLDERS = {
# systemd 在 first-boot 前的占位
"uninitialized",
# 全零/全 F:VM 镜像克隆未重置、sysprep 异常、虚拟主板默认值的常见非真实 ID
"00000000000000000000000000000000",
"ffffffffffffffffffffffffffffffff",
"00000000-0000-0000-0000-000000000000",
"ffffffff-ffff-ffff-ffff-ffffffffffff",
}
def _is_valid_machine_id(value: Optional[str]) -> bool:
"""合理性校验 OS 机器 ID,防止占位值或镜像克隆未重置的非真实 ID 把多台
机器折叠到同一个 device_id。
要求去掉 GUID 分隔符后正好 32 位十六进制,且不在已知占位符黑名单里。
校验失败时调用方应 fallback 到 legacy 算法,而不是把无效值当指纹用。
"""
if not value:
return False
normalized = value.strip().lower()
if normalized in _MACHINE_ID_PLACEHOLDERS:
return False
hex_only = normalized.replace("-", "")
if len(hex_only) != 32:
return False
return all(c in "0123456789abcdef" for c in hex_only)
def _read_os_machine_id() -> Optional[str]:
"""读取操作系统级稳定机器标识。
- Windows: HKLM\\SOFTWARE\\Microsoft\\Cryptography\\MachineGuid
- macOS: IOPlatformUUID(ioreg -rd1 -c IOPlatformExpertDevice)
- Linux: /etc/machine-id 或 /var/lib/dbus/machine-id
这些 ID 由系统安装时生成,绑定到主板/系统而非网络配置,不会因为
网卡变化(VPN / Docker / 外接 NIC)或安装路径变化(Steam 库迁移、
源码版 / 打包版切换)漂移。
每个来源的返回值都会过 _is_valid_machine_id 合理性校验,避免占位值
(systemd `uninitialized`、全零/全 F GUID)被当成有效指纹。读取失败
或校验不通过返回 None,调用方需 fallback 到 legacy 算法。
"""
import sys
try:
if sys.platform == "win32":
import winreg
try:
key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"SOFTWARE\Microsoft\Cryptography",
0,
winreg.KEY_READ | winreg.KEY_WOW64_64KEY,
)
try:
value, _ = winreg.QueryValueEx(key, "MachineGuid")
finally:
winreg.CloseKey(key)
candidate = value.strip() if isinstance(value, str) else None
if _is_valid_machine_id(candidate):
return candidate
except OSError:
return None
elif sys.platform == "darwin":
import re
import subprocess
try:
out = subprocess.run(
["ioreg", "-rd1", "-c", "IOPlatformExpertDevice"],
capture_output=True,
text=True,
timeout=2,
)
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
if out.returncode == 0:
m = re.search(r'"IOPlatformUUID"\s*=\s*"([^"]+)"', out.stdout)
if m:
candidate = m.group(1).strip()
if _is_valid_machine_id(candidate):
return candidate
else:
for path in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
try:
with open(path, "r", encoding="utf-8") as f:
value = f.read().strip()
except (FileNotFoundError, PermissionError, OSError):
continue
if _is_valid_machine_id(value):
return value
except Exception:
return None
return None
def _get_legacy_device_id() -> str:
"""旧版 device_id 算法(保留用于迁移期 fold)。
SHA256(uuid.getnode() | install_dir | "neko-telemetry")。getnode 在多网卡
机器上不稳定(VPN / Docker / 外接网卡 enumeration order 变化),install_dir
随安装位置变化,所以这个 ID 容易"漂",长期 retention 数据会被打散。新版本
保留它仅用于 server 端 fold 历史数据:客户端在 payload 中同时上报新旧两个
ID,server 后续可通过 events 表里的 device_id_legacy 字段建立 mapping。
"""
import uuid as _uuid
import platform
try:
machine_id = str(_uuid.getnode())
except Exception:
machine_id = platform.node()
install_salt = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
raw = f"{machine_id}|{install_salt}|neko-telemetry"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _get_anonymous_device_id() -> str:
"""生成稳定的匿名设备指纹。
优先使用 OS 级稳定标识(_read_os_machine_id),失败时回退到 legacy 算法
保证不会写入空值。结果为 64 字符十六进制 SHA256,不可逆,不含 PII。
与 legacy 算法的命名空间用 "neko-telemetry-v2" 区分,确保新旧 ID 不会
在哈希空间相撞。
参考 vLLM: 只用硬件/系统信息生成匿名 ID,不含用户 PII。
"""
os_id = _read_os_machine_id()
if os_id:
return hashlib.sha256(f"{os_id}|neko-telemetry-v2".encode("utf-8")).hexdigest()
return _get_legacy_device_id()
# ---------------------------------------------------------------------------
# A/B test 分支 / 用户 locale / 时区
#
# 三者都是描述「这台机器/这个用户当前是谁」的副字段:
# - branch:首次启动时随机抽签后落盘,后续启动只读不改,保证同一设备稳定。
# 扩展 _TELEMETRY_BRANCHES 元组即可触发 split,新用户随机进新池。注意:
# 从池里移除某分支会让落盘旧值被严格校验判非法、按当前池重抽迁组(见
# privacy_default_off_v1 退役说明),这是退役实验的有意行为,不是
# append-only 扩展场景。
# - locale / timezone:每次上报时取当下值;同一设备换语言/换时区都视为同
# 一个 device_id,server 端按 "latest seen" 覆写即可。
# ---------------------------------------------------------------------------
_TELEMETRY_BRANCH_FILE = ".telemetry_branch"
# A/B 池(只决定「首启默认值」实验分组;首启后用户行为已落盘、不再响应覆写,其分组
# 归因对默认值实验无意义,分析端按真·首启样本过滤即可):
# - "main":控制组,沿用历史默认——主动搭话里的「屏幕分享来源」
# (proactiveVisionChatEnabled)首启默认开;隐私模式仍按地区分流。
# - "vision_chat_default_off":实验组,把「屏幕分享来源」首启默认翻成关,并在前端
# 检测到用户进游戏/娱乐(弹「要不要开屏幕分享搭话」)或进专注工作(弹「要不要关
# 屏幕分享避嫌」)时一次性弹窗。注意这一组只改屏幕分享来源默认值,**不动**隐私
# 模式默认值。
# 地区交互:屏幕分享来源只有在隐私模式关(vision 开)时才有意义;隐私默认仍按
# 地区分流(仅中国地区默认隐私关),海外默认隐私开 → 对本实验天然 no-op。抽签
# 全地区随机,海外也会落实验组但首启覆写 / 弹窗都不生效;分析时按 locale 过滤,
# A/B 差异主要体现在国内。
# - "proactive_interval_20s":海外专属实验组,把「主动搭话间隔」
# (proactiveChatInterval)首启默认从控制组 15s 拉长到 20s,看更慢的搭话节奏对
# 海外用户的影响。**不动**隐私模式 / 屏幕分享来源默认值,也没有弹窗。
# 地区交互:与 vision_chat_default_off 方向相反——只在海外(前端
# _isUserRegionChina() 为 false)才覆写间隔默认值;国内落到本组天然 no-op。抽签
# 全地区随机、三组互斥(同设备只落一个 branch),但 vision 实验差异在国内、本组
# 只影响海外,目标地区不重叠,可同时在线观测。注意 _bucket_proactive_interval
# 把 15s / 20s 都归进「10-30s」桶,所以 cohort 命中靠 branch 维度区分,不靠间隔桶。
#
# 已退役实验(老落盘值被 _read 严格校验判非法 → 下次启动按当前池随机重抽,落 main、
# vision_chat_default_off 或 proactive_interval_20s。都是已过首启的用户,重抽只改
# telemetry 标签、不动已落盘的用户偏好,对「默认值」实验无影响,故不为其单独做确定性
# 迁移):
# - "privacy_default_off_v1"(试国外隐私默认关):前期数据效果差,已下线。
# - "privacy_default_off_v2"(试国内隐私默认开):改方向去测屏幕分享来源,已下线。
# - "proactive_interval_25s"(试海外搭话间隔 20s→25s):数据点没能通过 A/A 测试,
# 下线回退到 proactive_interval_20s(15s→20s)重测;A/A 管线修好前不重新上线。
_TELEMETRY_BRANCHES: tuple = ("main", "vision_chat_default_off", "proactive_interval_20s")
# 进程级缓存:keyed by str(config_dir)。写盘失败的环境下(只读 FS / 权限拒绝),
# 不缓存就每次 secrets.choice 重抽,导致同一 install 的 TokenTracker 上报和
# 前端 `/conversation-settings` 拿到不同分支,A/B 归因被打散。dict.setdefault
# 在 CPython GIL 下是原子的,足以扛住模块内的并发首抽。
_telemetry_branch_cache: dict = {}
def _get_telemetry_branch(config_dir: Path) -> str:
"""读取或抽签生成 A/B test 分支标识,持久化在 config_dir 下。
多进程冷启动安全:用 ``O_CREAT | O_EXCL`` 原子创建保证只有一个进程能写入;
其它并发进程拿到 FileExistsError 后回读同一文件,确保 device-stable
cohorting(同 device 不同 worker 不会落到不同 branch)。同款模式见
_file_lock 的实现。
进程级缓存:首次 resolve 后落 `_telemetry_branch_cache`,后续调用直接命中。
主要为只读 FS / 权限错误等持久化失败的环境兜底——多 cohort 下没有这层缓存,
每次 `secrets.choice` 都会重抽,同一进程内不同调用方会观察到不同分支。
"""
cache_key = str(config_dir)
cached_proc = _telemetry_branch_cache.get(cache_key)
if cached_proc is not None:
return cached_proc
p = config_dir / _TELEMETRY_BRANCH_FILE
def _read() -> Optional[str]:
# 返 None 只表示「文件不存在 / 内容非法」两种确定状态;transient I/O 错误
# 故意向上冒泡。否则老设备一次读盘失败会被吞成 None,slow path 把
# FileExistsError 当成「文件存在但内容坏」走自愈覆盖,静默把设备改组。
# 让 OSError 透出,让 `/conversation-settings` 的 except 把 telemetryBranch
# 返 None,前端保留 pending marker,下次启动 fast path 读到合法值收敛。
#
# 严格校验:活跃分支都在 _TELEMETRY_BRANCHES 里,所以正常情况下不会误杀。
# 唯一例外是退役实验(如 privacy_default_off_v1)——它被有意移出池,落盘旧值
# 在这里判非法、触发按当前池重抽(见上方退役说明),正是「让老实验群退出原
# 分支」的预期路径。
if not p.exists():
return None
value = p.read_text(encoding="utf-8").strip()
if value in _TELEMETRY_BRANCHES:
return value
return None
# Fast path:文件已存在直接读
cached = _read()
if cached is not None:
return _telemetry_branch_cache.setdefault(cache_key, cached)
branch = secrets.choice(_TELEMETRY_BRANCHES) if _TELEMETRY_BRANCHES else "main"
try:
config_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.debug(f"Token tracker: failed to create config dir for branch file: {e}")
# Slow path:原子创建。两个进程同时走到这里只有一个成功,另一个回读拿到
# 同一 branch,保证 device-stable。
try:
fd = os.open(str(p), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
os.write(fd, branch.encode("utf-8"))
finally:
os.close(fd)
return _telemetry_branch_cache.setdefault(cache_key, branch)
except FileExistsError:
# 另一个进程抢先写了 —— 回读它写的值,确保两个进程返回同一 branch
peer = _read()
if peer is not None:
return _telemetry_branch_cache.setdefault(cache_key, peer)
# peer 是 None 说明文件存在但内容不在 _TELEMETRY_BRANCHES 里(截断/损坏/
# 跨版本残留)。这种情况下若只返回本进程抽到的值不修盘,下次进程重启会
# 再走一次「读到坏值 → fast path miss → slow path 拿到 FileExistsError →
# 重抽」,cohort 在多次启动间反复翻滚。覆盖修盘保证只有这一次重抽,
# 之后就稳定。
try:
with open(p, "w", encoding="utf-8") as f:
f.write(branch)
except Exception as e:
logger.debug(f"Token tracker: failed to heal corrupt branch file: {e}")
return _telemetry_branch_cache.setdefault(cache_key, branch)
except Exception as e:
# 写盘失败不致命:进程级缓存 setdefault 保证同一进程后续所有调用方拿到
# 相同分支,TokenTracker 上报和前端 API 不会互相打架。下次进程重启时若
# 写盘仍然失败,缓存重新随机抽——按设计这就是 server 端看到的分布噪声
# 来源,不构成「同一 install 多个分支」的错误数据。
logger.debug(f"Token tracker: failed to persist telemetry branch: {e}")
return _telemetry_branch_cache.setdefault(cache_key, branch)
def get_telemetry_branch() -> str:
"""对外暴露的 A/B test 分支读取入口。
`_get_telemetry_branch` 是内部实现(参数化 config_dir,方便测试);本函数从
全局 config_manager 取 config_dir 后转发。前端通过 API 拿到 branch 后可在
首次启动时按分支选择默认行为,与 token tracker 自身上报的 branch 保持一致。
"""
return _get_telemetry_branch(get_config_manager().config_dir)
def _get_telemetry_locale() -> str:
"""获取用户 UI locale (zh-CN / en-US / ja-JP …)。
优先用 language_utils.get_global_language_full —— 它先看 Steam 设置再 fallback
到系统语言,是 codebase 里 "用户真正在用的 UI 语言" 的真值。失败回退到
stdlib locale。
"""
try:
from utils.language_utils import get_global_language_full
loc = get_global_language_full()
if loc:
return str(loc)[:32]
except Exception:
pass
try:
import locale as _locale
sys_locale = _locale.getlocale()[0]
if sys_locale:
return str(sys_locale)[:32]
except Exception:
pass
return "unknown"
def _is_release_build() -> bool:
"""是否打包过 —— PyInstaller (``sys.frozen``) 或 Nuitka (``__compiled__`` /
``__nuitka_binary_dir``)。两种打包器都要识别:PyInstaller 走 spec 链路,
Nuitka 走 build_nuitka.bat 链路。"""
import sys
if getattr(sys, "frozen", False):
return True
# Nuitka 在每个编译模块的 globals 里注入 __compiled__;主模块还有
# __nuitka_binary_dir。先看当前模块 globals,再兜底主模块属性,确保 standalone
# 和 onefile 两种 Nuitka 模式都能识别。
if "__compiled__" in globals() or "__nuitka_binary_dir" in globals():
return True
main_mod = sys.modules.get("__main__")
if main_mod is not None and (
hasattr(main_mod, "__nuitka_binary_dir") or hasattr(main_mod, "__compiled__")
):
return True
return False
def _get_telemetry_metadata() -> tuple[str, str]:
"""一次性返回 ``(distribution, steam_user_id)``,两个字段同源同观测点。
合并自原 ``_get_telemetry_distribution()`` 与 ``_get_telemetry_steam_user_id()``:
Steamworks ``Users.GetSteamID()`` **只调一次**,distribution 与
steam_user_id 从同一次观测派生。原本两个函数各调一次 ``GetSteamID()``,
Steamworks SDK 异步 init 时两次调用可能跨越 ready 边界——第一次返 0
(distribution 走 ``release``)、第二次返 Steam64(steam_user_id 拿到),
产出 ``release + 非空 Steam64`` 的矛盾态。合并后该矛盾态在源头消除。
**不变量**:返回的 steam_user_id 非空 ⟹ distribution == ``steam``。
(反之不成立:steam + 空 ID 是合法尾部,见判定 3。)
判定顺序(沿用原逻辑):
1. 非 release build → ``("source", "")``。源码运行哪怕开着 Steam 客户端
也算 source —— 只有 release 才可能是 Steam 版。
2. release + ``GetSteamID()`` 拿到非零 Steam64 → ``("steam", str(sid))``。
锚定首个信号,distribution 与 ID 同次观测。
3. release + 工坊订阅 > 0 或 ``workshop_config.json`` 存在 → ``("steam", "")``。
证明这台机器跑过 Steam 版(cloudsave 会把 workshop_config.json 打包
带走),但本次没从 Steam 客户端拿到登录用户(没开 / 断网)。
4. release 但无任何 Steam 信号 → ``("release", "")``。
Steam64 用 string 而非 int 上报,避免 u64(常超 2^53)在 JS / 部分 JSON
消费方精度丢失。所有异常 swallow —— 埋点不能抛。
"""
if not _is_release_build():
return "source", ""
# 实时探测:GetSteamID() 只调一次,结果同时决定 distribution 和
# steam_user_id —— 这是修复 race 的核心,不再分两次调用跨越 ready 边界。
try:
from utils.steam_state import get_steamworks
sw = get_steamworks()
if sw is not None:
sid = 0
try:
sid = int(sw.Users.GetSteamID() or 0)
except Exception:
sid = 0
if sid > 0:
return "steam", str(sid)
# 没拿到登录用户,但订阅过工坊也算 Steam 版(steam + 空 ID)。
try:
if int(sw.Workshop.GetNumSubscribedItems() or 0) > 0:
return "steam", ""
except Exception:
pass
except Exception:
pass
# 磁盘兜底:之前任何一次会话写过 workshop_config.json 即证明跑过 Steam
# 版,即使本次 Steam 客户端没开(cloudsave 会把它带走)。
try:
from utils.config_manager import get_config_manager
cm = get_config_manager()
if (cm.config_dir / "workshop_config.json").exists():
return "steam", ""
except Exception:
pass
return "release", ""
def _get_telemetry_timezone() -> str:
"""获取本地时区。优先 IANA (Asia/Shanghai),回退到 UTC 偏移 (+08:00)。"""
try:
import tzlocal
tz = tzlocal.get_localzone()
if tz is not None:
name = str(tz)
if name:
return name[:64]
except Exception:
pass
try:
now_local = datetime.now().astimezone()
local_tz = now_local.tzinfo
if local_tz is not None:
name = str(local_tz)
# Windows 上 astimezone 可能给出 "China Standard Time" 这类非 IANA 字串,
# 没有 '/' 时退到 offset 表示,避免污染按 IANA 切片的分析。
if name and "/" in name:
return name[:64]
# 取实际 UTC 偏移(aware datetime 反映当前 DST 状态)。time.altzone /
# time.daylight 不行:time.daylight 只表示"locale 有没有 DST 制度",
# 不是"现在是不是 DST",在有 DST 的时区会全年报 DST 偏移。
offset = now_local.utcoffset()
if offset is not None:
total_sec = int(offset.total_seconds())
sign = "+" if total_sec >= 0 else "-"
abs_sec = abs(total_sec)
return f"{sign}{abs_sec // 3600:02d}:{(abs_sec % 3600) // 60:02d}"
except Exception:
pass
return "unknown"
_DEVICE_HW_CACHE: Optional[str] = None
def _get_device_hw() -> str:
"""设备硬件画像(低基数 enum 复合串),进程内只算一次。
形如 ``win|x86_64|16to32|9to16``(os|arch|ram_tier|cpu_tier)。作为 devices
表的**设备属性**(非计数)上报,用来 JOIN 留存做"低配设备首日流失率"——区分
"跑不动而走"与"不喜欢而走"。
所有维度都是分桶 enum,**绝不发原始值**(RAM 字节 / GPU 型号 / 机器名)——
守 dim 低基数 + 零 PII(同 #1426 T3)。
检测全部 inline(psutil / platform / os):不 import memory.embeddings —— 那会
触发 module-layering 的 utils(L1)→memory(L2) 反转 + 制造 memory↔utils 环
(check_module_layering 对函数内 lazy import 同样计)。RAM 检测本就是 psutil
一行、没复用价值;真正值得复用的 CPU AVX/VNNI cpuid 检测对"跑不动流失"是
二阶信号(多数用户走远程 LLM),暂不收,想要可把检测抽成 utils 层共享 util。
任一维度失败回退 'unknown',整体绝不抛(埋点不能挡上报)。
"""
global _DEVICE_HW_CACHE
if _DEVICE_HW_CACHE is not None:
return _DEVICE_HW_CACHE
import platform as _plat
sysname = (_plat.system() or "").lower()
os_tag = {"windows": "win", "darwin": "mac", "linux": "linux"}.get(sysname, "other")
mach = (_plat.machine() or "").lower()
if mach in ("x86_64", "amd64", "x64"):
arch = "x86_64"
elif mach in ("arm64", "aarch64"):
arch = "arm64"
else:
arch = "other"
try:
import psutil
gb = psutil.virtual_memory().total / (1024 ** 3)
ram_tag = ("lt8" if gb < 8 else "8to16" if gb < 16
else "16to32" if gb < 32 else "ge32")
except Exception:
ram_tag = "unknown" # psutil 缺失/异常:降级 unknown,埋点不能挡上报
try:
n = os.cpu_count() or 0
cpu_tag = ("unknown" if n <= 0 else "le4" if n <= 4 else "5to8" if n <= 8
else "9to16" if n <= 16 else "gt16")
except Exception:
cpu_tag = "unknown" # cpu_count 异常:降级 unknown,不抛
_DEVICE_HW_CACHE = f"{os_tag}|{arch}|{ram_tag}|{cpu_tag}"
return _DEVICE_HW_CACHE
def _compute_telemetry_signature(payload_json: str, timestamp: float) -> str:
"""计算遥测上报的 HMAC-SHA256 签名。"""
body_hash = hashlib.sha256(payload_json.encode("utf-8")).hexdigest()
message = f"{timestamp}|{body_hash}"
return hmac.new(
_TELEMETRY_HMAC_SECRET.encode("utf-8"),
message.encode("utf-8"),
hashlib.sha256,
).hexdigest()
# ---------------------------------------------------------------------------
# 主动搭话 / 隐私模式设置快照埋点
# ---------------------------------------------------------------------------
def _bucket_proactive_interval(seconds) -> str:
"""把 proactiveChatInterval(1-3600 秒)分桶成低基数 enum。
**不上报 raw 秒数** —— 那是连续值,进 dim 会让 metric_key 基数爆炸
(跟之前 lanlan_name 同类教训)。分 5 桶覆盖典型配置区间。
"""
try:
s = float(seconds)
except (TypeError, ValueError):
return "unknown"
if s < 10:
return "<10s"
if s < 30:
return "10-30s"
if s < 60:
return "30-60s"
if s < 300:
return "60-300s"
return ">=300s"
def record_settings_state() -> None:
"""读当前主动搭话 / 隐私模式设置,打一个 settings_state counter。
触发时机:**仅** app 启动(record_app_start,仅 main_server 进程)。
语义说明(CodeRabbit 反馈后定型):server 端 instrument_counters 按
(stat_date, device_id, metric_key) 累加 UPSERT。本函数只在启动打点,
所以一条记录 = "用户本次启动时的设置组合"。每天每设备启动几次就 +几,
是**观测次数**而非 gauge 式"当前最终状态"——但对"深度用户惯用什么档"
的分析够用:按 device 取计数最高的 combo 即其惯用档。
刻意**不**在 save_global_conversation_settings 里打点:那样用户一天内
每切一次设置就给一个新 combo +1,把分布污染成"切换轨迹"。要精确的
per-device-per-day 最终状态需要 server 端 gauge/overwrite 语义,当前
instrument 管道不支持,且对本分析非必要。
用途:server 端按 device 活跃天数 / event_count 切出深度用户,再看他们
settings_state 各 dim 组合的分布 —— 即"深度用户把主动搭话 / 隐私模式
定在什么档"。
dim 全是低基数 enum,interval 分桶不发 raw 秒数:
- proactive: on / off(proactiveChatEnabled)
- interval: <10s / 10-30s / ... / >=300s(off 时为 "off")
- vision_chat: on / off(proactiveVisionChatEnabled)
- privacy: on / off(隐私模式 = proactiveVisionEnabled 反面,默认关)
"""
if _DO_NOT_TRACK:
return
try:
from utils.preferences import load_global_conversation_settings
from utils.instrument import counter as _c
s = load_global_conversation_settings()
proactive_on = bool(s.get("proactiveChatEnabled", False))
_c(
"settings_state", 1,
proactive="on" if proactive_on else "off",
interval=(_bucket_proactive_interval(s.get("proactiveChatInterval", 0))
if proactive_on else "off"),
vision_chat="on" if s.get("proactiveVisionChatEnabled", False) else "off",
# 隐私模式 = proactiveVisionEnabled 的反面(default True → 默认隐私关)
privacy="on" if not s.get("proactiveVisionEnabled", True) else "off",
)
except Exception:
# 埋点失败不影响业务,静默
pass
# ---------------------------------------------------------------------------
# TokenTracker 单例
# ---------------------------------------------------------------------------
class TokenTracker:
"""线程安全 + 多进程安全的全局 LLM token 用量追踪器。
设计:
- 所有进程共享单个 token_usage.json 文件
- 内存中只追踪"尚未落盘的增量"(delta)
- save() 使用文件锁做 read-merge-write,保证多进程不丢数据
- get_stats() 读磁盘 + 合并内存 delta,不做任何文件删除
"""
_instance: Optional['TokenTracker'] = None
_init_lock = threading.Lock()
@classmethod
def get_instance(cls) -> 'TokenTracker':
if cls._instance is None:
with cls._init_lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self._lock = threading.Lock()
self._config_manager = get_config_manager()
# 尚未落盘的增量数据(save 成功后清空)
self._delta_daily: dict = {}
self._delta_records: deque = deque(maxlen=200)
# 持久化控制
self._save_interval = 60 # 秒
self._dirty = False
self._save_task: Optional[asyncio.Task] = None
# 远程遥测上报
self._device_id: str = "" # 延迟生成
self._branch: str = "" # 延迟生成(首次上报时读盘/抽签)
self._last_report_time: float = 0.0
self._report_interval = _TELEMETRY_REPORT_INTERVAL
self._unsent_daily: dict = {} # 尚未成功上报到服务器的增量
self._unsent_records: list = []
# batch_seq:当前正在上报或重传中的窗口标识。新窗口首次进入 _report_to_server
# 时分配一次(secrets.token_hex),失败重传时保留同一个值,让 server
# seen_batches 能 dedupe "网络 timeout 但 server 已经 commit" 的重传。
# 成功 200 后清空,下次窗口再分配新 seq。跟 _unsent_daily 一起持久化。
self._pending_batch_seq: Optional[str] = None
self._has_recorded_app_start: bool = False # 🔒 app_start 单次上报锁
self._session_start_ts: float = 0.0 # session_end 计算 duration 用
self._session_process: str = "unknown"
# 本 session 用户消息轮数。note_user_message 累加,record_app_start 重置,
# _atexit_save(session_end) emit 成 session_turn_count histogram —— 含 0
# 即"零消息会话"(开了 app 一句没聊就走),D1 流失最直接信号。
self._session_msg_count: int = 0
self._first_user_message_recorded: bool = False # 🔒 首条用户消息单次锁
self._core_loop_recorded: bool = False # 🔒 首次完成核心 loop 单次锁
# 首次启动:迁移旧版 per-instance 文件
self._migrate_legacy_files()
# 恢复上次未成功上报的远程数据
self._load_unsent_queue()
# atexit 兜底:不管进程如何退出(SIGTERM / 异常 / 正常结束),都尝试保存
# 注意:SIGKILL (kill -9) 无法被拦截,此时最多丢 60s 数据
atexit.register(self._atexit_save)
# ---- 存储路径 ----
@property
def _storage_path(self) -> Path:
return self._config_manager.config_dir / "token_usage.json"
@property
def _lock_file_path(self) -> Path:
return self._config_manager.config_dir / ".token_usage.lock"
@property
def _storage_dir(self) -> Path:
return self._config_manager.config_dir
@property
def _unsent_queue_path(self) -> Path:
"""远程上报未发送队列的持久化文件。
进程被 kill 时 _unsent_daily 会丢失(纯内存)。
通过将队列写到这个文件,重启后可以恢复并重发。
"""
return self._config_manager.config_dir / ".telemetry_unsent.json"
# ---- atexit / unsent 持久化 ----
def _atexit_save(self):
"""atexit 兜底:进程退出前尽最后努力保存。
覆盖场景:SIGTERM / 未捕获异常 / 正常退出 / sys.exit()
不覆盖:SIGKILL (kill -9) / 断电 — 此时最多丢 60s 数据
顺序要点:先 emit session_end 到 instrument buffer,再 save()。
save() → _report_to_server 会 snapshot 走 instrument,所以 emit
必须先发生;否则 session_end 的 counter/histogram 进了 buffer 但
没机会被 snapshot,远程 dashboard 看不到 session_end —— 配对的
session_start 看得见、session_end 永远缺失,dashboard 上"异常退出
率"会被误算成 100%。event 单独通过 event_logger.flush 走本地 jsonl。
"""
# global 声明提到函数开头:下面 3b 步骤会读 _TELEMETRY_SERVER_URL,
# Python 要求 global 声明先于任何使用(否则 SyntaxError)。
global _TELEMETRY_SERVER_URL
# ── 1) session_end 先落 instrument buffer,让随后的 save() 带上 ──
try:
from utils.instrument import (
event as _instr_event,
counter as _instr_counter,
histogram as _instr_histogram,
)
duration = (time.time() - self._session_start_ts) if self._session_start_ts > 0 else 0.0
_instr_event(
"session_end",
process=self._session_process,
duration_sec=round(duration, 1),
)
_instr_counter("session_end", process=self._session_process)
if duration > 0:
# 直接传秒;instrument bounds 是数字通用,没绑定单位
_instr_histogram("session_duration_sec", duration, process=self._session_process)
# 本 session 用户消息轮数(无条件 emit,含 0)——0 即零消息会话。
# 配合 session_duration_sec 看:短时长+0 轮 = 开了就走;长时长+0 轮 =
# 挂着没互动。是 D1 浅尝 vs 上瘾的核心区分。
_instr_histogram("session_turn_count", self._session_msg_count, process=self._session_process)
except Exception:
# instrument import / emit 失败不能让进程退出卡住 —— 实在丢一条
# 也比 atexit 抛出强(atexit 异常会让 SIGTERM 退出码变化)。
pass
# ── 2) Bypass 60s throttle —— atexit 是最后机会,错过没下次 ──
# _report_to_server 内部 ``now - self._last_report_time < interval``
# 在短 session(启动后不到 60s 就退出)下会阻止上报,让刚 emit 的
# session_end counter / histogram 永远留在 instrument buffer。这里
# 显式归零让那条 if 一定不命中。会带来一个理论副作用:如果 atexit
# 之前距上次成功上报 < 60s,这次再发一份;server seen_batches 靠
# batch_seq dedupe,所以不会双倍计数。
with self._lock:
self._last_report_time = 0.0
# ── 3) save() 把 daily_stats + 上面刚 emit 的 instrument snapshot 一起发 ──
try:
# save() first: persists delta to disk and attempts remote report
# (best-effort final push). Then disable remote URL so no further
# network calls happen during interpreter teardown.
self.save()
except Exception:
# save 失败不抛进 atexit(同上)。失败时 unsent 已经被持久化,
# 下次进程启动会重传。
pass
# ── 3b) 若第一次 save 是「重传」(进程带着早先失败遗留的 _pending_batch_seq),
# _report_to_server 会按 is_retry 跳过 instrument snapshot,刚 emit 的
# session_end / session_duration_sec 仍留在 buffer。常见"网络早先挂、
# 退出前恢复"场景下重传会成功并清掉 batch_seq,但没有第二次发送,
# session 指标就在关 URL 前静默丢了(Codex)。所以这里检查:instrument
# 还有数据 + batch_seq 已清(说明重传成功、下次是 fresh 窗口会 snapshot)
# → 再 bypass throttle 发一次。
try:
from utils.instrument import has_data as _instrument_has_data
if (_instrument_has_data() and self._pending_batch_seq is None
and _TELEMETRY_SERVER_URL and not _DO_NOT_TRACK):
with self._lock:
self._last_report_time = 0.0
self.save()
except Exception:
pass
# ── 4) flush event_logger —— event 不走远程 instrument 通道,本地 jsonl 兜底 ──
try:
from utils.event_logger import EventLogger
EventLogger.get_instance().flush()
except Exception:
# event_logger flush 失败丢的是本地 jsonl 的稀疏事件,下次启动
# 没有恢复路径 —— 但 counter/histogram 已经走 instrument 通道
# 发出去了,这里失败影响的只是诊断细节,不阻塞 atexit。
pass
finally:
_TELEMETRY_SERVER_URL = ""
def _load_unsent_queue(self):
"""启动时加载上次未成功上报的远程数据。"""
if _DO_NOT_TRACK or not _TELEMETRY_SERVER_URL:
return
try:
p = self._unsent_queue_path
if not p.exists():
return
with open(p, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
return
loaded_daily = data.get("daily", {})
loaded_records = data.get("records", [])
loaded_batch_seq = data.get("batch_seq")
if loaded_daily:
with self._lock:
for day_key, day_val in loaded_daily.items():