diff --git a/go.mod b/go.mod index a0c72c25..c1a86dc5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( firebase.google.com/go/v4 v4.13.0 github.com/BurntSushi/toml v1.6.0 github.com/DATA-DOG/go-sqlmock v1.5.0 - github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55 + github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4 github.com/alibabacloud-go/darabonba-openapi v0.2.1 github.com/alibabacloud-go/sms-intl-20180501 v1.0.1 github.com/alibabacloud-go/tea v1.2.1 diff --git a/go.sum b/go.sum index a76502d3..e6091d8b 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55 h1:2reLrd1tHsMwGRaGVXbHVpIzqnlVajkOPwn8hclDraI= github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55/go.mod h1:uOCTTI3QdG0ETKZ6uluRWUZgKexee/pY1pQYWyIEvZ0= +github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4 h1:We9RyaYNEerDC/V5Zq0wFl+uvUFp9S0eHB5Jz6sz4fk= +github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4/go.mod h1:uOCTTI3QdG0ETKZ6uluRWUZgKexee/pY1pQYWyIEvZ0= github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae h1:DcFpTQBYQ9Ct2d6sC7ol0/ynxc2pO1cpGUM+f4t5adg= github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae/go.mod h1:rJJ84PyA/Wlmw1hO+xTzV2wsSUon6J5ktg0g8BF2PuU= github.com/RichardKnop/machinery/v2 v2.0.11 h1:BTfLGOmOju3W/OtlZmLX26OjYNZsU4PJo04pQReycdc= diff --git a/main.go b/main.go index f776b1d6..c2042418 100644 --- a/main.go +++ b/main.go @@ -199,10 +199,10 @@ func runAPI(ctx *config.Context) { // 生命周期:跟随进程存续,不显式 Close——与 lib 自身的 redis.Conn 处理方式一致。 // PoolSize 显式设 10:令牌桶 Lua 脚本是短事务,Redis 端 <1ms,不需要大池; // go-redis v6 默认 10*NumCPU 在大核机上会失控(多副本 × 多 client 连接数叠加)。 - rlRedis := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + rlRedis := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) route.Use(route.RateLimitMiddleware(context.Background(), rlRedis, rps, burst, globalRateLimitExcludePaths()...)) // 上游依赖可观测性(issue #440 P0-a):依赖调用延迟 + 连接池指标,均注册到 // DefaultRegisterer,经 /metrics 暴露(与 httpMetrics 同一端点)。连接池用 @@ -238,13 +238,12 @@ func runAPI(ctx *config.Context) { // observer 在调用时按 atomic 解析,construct 时即已挂 hook,故放在此处(指标族 // 注册之后、serve 之前)即可,不会漏掉后续流量。 // - // 覆盖范围(重要):dependency="redis" 只覆盖经 pkg/redis 构造的客户端 —— - // 主要是共享缓存 ctx.GetRedisConn()(数据面大头)。全仓还有约 15 处裸 - // rd.NewClient(octoredis.MustBuildOptions(...))(限流 rlRedis、OIDC state/bind/ - // logout/sync_lock、bot registry、user/group/space auth、health、各类 Lua 锁等), - // 它们需要 Eval/Script/SetNX 等 pkg/redis 包装未暴露的原语,因此都绕过插桩, - // 其单命令延迟不进本指标(连接池指标仍覆盖)。在 dependency="redis" 上做告警时 - // 要知道 auth/OIDC/限流/锁/health 这些路径不可见。通用治理见 octo-lib#96。 + // 覆盖范围:dependency="redis" 覆盖共享缓存 ctx.GetRedisConn()(数据面大头), + // 以及所有需要 Eval/Script/SetNX 而走裸 *rd.Client 的控制面客户端(限流 rlRedis、 + // OIDC state/bind/logout/sync_lock、bot registry、user/group/space auth、health、 + // 各类 Lua 锁等)—— 后者统一经 octoredis.NewInstrumentedClient/InstrumentedClientFromOptions + // 构造,在构造时即插桩(octo-lib#104)。仍在覆盖外的只有绕过 octoredis 直接 + // rd.NewClient 的极少数场景(若有),用连接池指标兜底。 libdb.SetDBObserver(metrics.ObserveDB) libredis.SetRedisObserver(metrics.ObserveRedisCmd) metrics.RegisterPoolCollectors(prometheus.DefaultRegisterer, ctx.DB().DB, map[string]*rd.Client{ diff --git a/modules/bot_api/registry_redis.go b/modules/bot_api/registry_redis.go index 84ca0e49..ba331747 100644 --- a/modules/bot_api/registry_redis.go +++ b/modules/bot_api/registry_redis.go @@ -96,12 +96,12 @@ type RedisAppBotRegistry struct { // process config). ttl supplies the safety-net key expiry; a non-positive value // is coerced to a sane floor in set(). func NewRedisAppBotRegistry(ctx *config.Context, ttl func() time.Duration) *RedisAppBotRegistry { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 2 o.DialTimeout = 3 * time.Second o.ReadTimeout = 2 * time.Second o.WriteTimeout = 2 * time.Second - })) + }) return &RedisAppBotRegistry{ Log: log.NewTLog("RedisAppBotRegistry"), client: client, diff --git a/modules/bot_provision/bot_api.go b/modules/bot_provision/bot_api.go index a1a3fb6d..27bca0a6 100644 --- a/modules/bot_provision/bot_api.go +++ b/modules/bot_provision/bot_api.go @@ -209,10 +209,10 @@ func (a *BotProvision) Route(r *wkhttp.WKHttp) { // allowlist / X-Internal-Key) is the documented primary control; // the limiter is defense-in-depth. rlCtx := context.Background() - rlRedis := rd.NewClient(octoredis.MustBuildOptions(a.ctx.GetConfig(), func(o *rd.Options) { + rlRedis := octoredis.NewInstrumentedClient(a.ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) verifyLimit := r.StrictIPRateLimitMiddleware(rlCtx, rlRedis, "verify", 1000.0/60, 100) r.GET("/v1/bot/:uid/token", verifyLimit, a.botToken) } diff --git a/modules/common/health.go b/modules/common/health.go index 4b9acf57..dc27fea8 100644 --- a/modules/common/health.go +++ b/modules/common/health.go @@ -40,7 +40,7 @@ type dependencyReadinessChecker struct { func newDependencyReadinessChecker(ctx *config.Context, db *db) readinessChecker { return &dependencyReadinessChecker{ db: db, - redisClient: rd.NewClient(readinessRedisOptions(ctx.GetConfig())), + redisClient: octoredis.InstrumentedClientFromOptions(readinessRedisOptions(ctx.GetConfig())), } } diff --git a/modules/group/api.go b/modules/group/api.go index 28a62a6a..9a05418d 100644 --- a/modules/group/api.go +++ b/modules/group/api.go @@ -150,10 +150,10 @@ func (g *Group) Route(r *wkhttp.WKHttp) { // - DM_API_GROUP_INVITE_RPS 每秒填充速率(float,缺省 60.0) // - DM_API_GROUP_INVITE_BURST 桶容量(int,缺省 200) // 生产环境如需收紧,在部署时设置 env(如 RPS=0.1667 / BURST=5 恢复到 10 req/min)。 - rlRedis := redis.NewClient(octoredis.MustBuildOptions(g.ctx.GetConfig(), func(o *redis.Options) { + rlRedis := octoredis.NewInstrumentedClient(g.ctx.GetConfig(), func(o *redis.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) inviteRPS := wkhttp.ParseRPSFromEnv("DM_API_GROUP_INVITE_RPS", 60.0) // 默认 60 rps ≈ 3600 req/min inviteBurst := wkhttp.ParseBurstFromEnv("DM_API_GROUP_INVITE_BURST", 200) groupInviteLimit := r.StrictIPRateLimitMiddleware(context.Background(), rlRedis, "group_invite", inviteRPS, inviteBurst) diff --git a/modules/incomingwebhook/api.go b/modules/incomingwebhook/api.go index ac9945d5..ba0a7a5c 100644 --- a/modules/incomingwebhook/api.go +++ b/modules/incomingwebhook/api.go @@ -146,10 +146,10 @@ func sharedRateRedis(cfg *config.Config) *redis.Client { // PoolSize 显式设 10:令牌桶 Lua 脚本是短事务,与 main.go / user / group / // space / integration 等其它限流 client 的全局约定保持一致。Redis 故障/连接池 // 打满导致 fail-open 的兜底由进程内 localFloor 负责,不在此处放大连接池。 - rateRedisClient = redis.NewClient(octoredis.MustBuildOptions(cfg, func(o *redis.Options) { + rateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *redis.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) }) return rateRedisClient } diff --git a/modules/integration/api.go b/modules/integration/api.go index b39def0d..8e6a99fc 100644 --- a/modules/integration/api.go +++ b/modules/integration/api.go @@ -82,10 +82,10 @@ func New(ctx *config.Context) *Integration { func sharedIntegrationRateRedis(cfg *config.Config) *rd.Client { integrationRateRedisOnce.Do(func() { - integrationRateRedisClient = rd.NewClient(octoredis.MustBuildOptions(cfg, func(o *rd.Options) { + integrationRateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = integrationRateLimitPoolSize - })) + }) }) return integrationRateRedisClient } diff --git a/modules/oidc/api.go b/modules/oidc/api.go index af63072e..fabf28fa 100644 --- a/modules/oidc/api.go +++ b/modules/oidc/api.go @@ -1397,12 +1397,12 @@ type redisCompareDeleter struct { } func newRedisCompareDeleter(ctx *config.Context) *redisCompareDeleter { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &redisCompareDeleter{client: client} } diff --git a/modules/oidc/bind_store_redis.go b/modules/oidc/bind_store_redis.go index f424bbd2..01398114 100644 --- a/modules/oidc/bind_store_redis.go +++ b/modules/oidc/bind_store_redis.go @@ -69,12 +69,12 @@ type redisBindStore struct { } func newRedisBindStore(ctx *config.Context) *redisBindStore { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &redisBindStore{client: client} } diff --git a/modules/oidc/logout_idtoken.go b/modules/oidc/logout_idtoken.go index 23e09b49..d0264fda 100644 --- a/modules/oidc/logout_idtoken.go +++ b/modules/oidc/logout_idtoken.go @@ -40,12 +40,12 @@ type redisIDTokenStore struct { } func newRedisIDTokenStore(ctx *config.Context, enc *Encryptor) *redisIDTokenStore { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &redisIDTokenStore{client: client, enc: enc} } diff --git a/modules/oidc/state_store_redis.go b/modules/oidc/state_store_redis.go index 81cb043e..8fea5875 100644 --- a/modules/oidc/state_store_redis.go +++ b/modules/oidc/state_store_redis.go @@ -36,12 +36,12 @@ type redisStateStore struct { } func newRedisStateStore(ctx *config.Context) *redisStateStore { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &redisStateStore{client: client} } diff --git a/modules/oidc/sync_lock.go b/modules/oidc/sync_lock.go index b48ea766..226fb1e4 100644 --- a/modules/oidc/sync_lock.go +++ b/modules/oidc/sync_lock.go @@ -52,12 +52,12 @@ type RedisTickLock struct { } func newRedisTickLock(ctx *config.Context) *RedisTickLock { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &RedisTickLock{client: client} } diff --git a/modules/opanalytics/etl_lock.go b/modules/opanalytics/etl_lock.go index 98fe7ee3..e7dbfe17 100644 --- a/modules/opanalytics/etl_lock.go +++ b/modules/opanalytics/etl_lock.go @@ -45,12 +45,12 @@ type etlLock struct { } func newETLLock(ctx *config.Context) *etlLock { - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 3 o.ReadTimeout = 3 * time.Second o.WriteTimeout = 3 * time.Second o.DialTimeout = 3 * time.Second - })) + }) return &etlLock{client: client} } diff --git a/modules/space/api.go b/modules/space/api.go index b5fe8ae9..6f9bdc9e 100644 --- a/modules/space/api.go +++ b/modules/space/api.go @@ -106,10 +106,10 @@ func (s *Space) Route(r *wkhttp.WKHttp) { // 两个端点共享同一 limiter,使同一 IP 跨端点总配额受控。 // 阈值与 user 模块 login 同档(10 req/min, burst 5),详见 PR #1090。 // PoolSize=10:Lua 脚本短事务,与 user 模块 / main.go 保持一致。 - rlRedis := rd.NewClient(octoredis.MustBuildOptions(s.ctx.GetConfig(), func(o *rd.Options) { + rlRedis := octoredis.NewInstrumentedClient(s.ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) invitePreviewLimit := r.StrictIPRateLimitMiddleware(context.Background(), rlRedis, "space_invite", 10.0/60, 5) open := r.Group("/v1/space") diff --git a/modules/user/api.go b/modules/user/api.go index 9046c056..bc675dd1 100644 --- a/modules/user/api.go +++ b/modules/user/api.go @@ -161,9 +161,9 @@ func New(ctx *config.Context) *User { spaceSettingDB: NewSpaceSettingDB(ctx.DB()), verificationDB: newVerificationDB(ctx), existingTokenSetter: redisExistingTokenSetter{ - client: rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client: octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.PoolSize = 10 - })), + }), }, } // LanguageService 与 main.go 注入到 CacheTokenParser 的实例独立构造,但共享 @@ -193,10 +193,10 @@ func (u *User) Route(r *wkhttp.WKHttp) { rlCtx := context.Background() // 限流状态存 Redis,多副本共享配额;生命周期跟随进程,与 main.go 的做法一致 // PoolSize 显式设 10:理由同 main.go——限流 Lua 脚本短事务,不需要大池。 - rlRedis := rd.NewClient(octoredis.MustBuildOptions(u.ctx.GetConfig(), func(o *rd.Options) { + rlRedis := octoredis.NewInstrumentedClient(u.ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) // burst 取小值:人类正常重试容忍 + 不给攻击者初始白嫖窗口 // tag 用稳定字符串分离 keyspace;注意 register 和 sms 参数相同但语义不同,必须分开 loginLimit := r.StrictIPRateLimitMiddleware(rlCtx, rlRedis, "login", 10.0/60, 5) // 10 req/min, burst 5 diff --git a/modules/user/api_authcode_token_redis_test.go b/modules/user/api_authcode_token_redis_test.go index 1ca3e523..488a6e46 100644 --- a/modules/user/api_authcode_token_redis_test.go +++ b/modules/user/api_authcode_token_redis_test.go @@ -7,7 +7,6 @@ import ( "github.com/Mininglamp-OSS/octo-lib/pkg/util" "github.com/Mininglamp-OSS/octo-lib/testutil" octoredis "github.com/Mininglamp-OSS/octo-server/pkg/redis" - rd "github.com/go-redis/redis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -26,7 +25,7 @@ import ( func TestRedisExistingTokenSetter_SetXX_RealRedis(t *testing.T) { _, ctx := testutil.NewTestServer() setter := redisExistingTokenSetter{ - client: rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig())), + client: octoredis.NewInstrumentedClient(ctx.GetConfig()), } prefix := ctx.GetConfig().Cache.TokenCachePrefix diff --git a/modules/usersecret/api.go b/modules/usersecret/api.go index 157261dc..343b4ab6 100644 --- a/modules/usersecret/api.go +++ b/modules/usersecret/api.go @@ -48,10 +48,10 @@ func sharedRateRedis(cfg *config.Config) *rd.Client { // 经 octoredis.MustBuildOptions 构造,确保 RedisTLS 启用(托管 TLS Redis)时 // TLSConfig 不被遗漏,否则限流 client 连不上、fail-open 静默关掉防护。 // PoolSize 显式设 10:令牌桶 Lua 是短事务,与其它限流 client 全局约定一致。 - rateRedisClient = rd.NewClient(octoredis.MustBuildOptions(cfg, func(o *rd.Options) { + rateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = 10 - })) + }) }) return rateRedisClient } diff --git a/pkg/redis/chokepoint_guard_test.go b/pkg/redis/chokepoint_guard_test.go new file mode 100644 index 00000000..03c3bb7d --- /dev/null +++ b/pkg/redis/chokepoint_guard_test.go @@ -0,0 +1,80 @@ +package redis + +import ( + "io/fs" + "os" + "path/filepath" + "regexp" + "strings" + "testing" +) + +// TestNoRawRedisClientOutsideChokepoint 把本 PR 的核心不变量钉成源码守卫:生产代码 +// 只能经 octoredis 的 NewInstrumentedClient / InstrumentedClientFromOptions 构造 redis +// 客户端,不得直接 rd.NewClient / redis.NewClient —— 否则新站点会静默重开 +// dependency="redis" 的盲区(COMPREHENSION §2 命名的回归)。仿照 i18n 的 +// Test*NoLegacyResponseError 源码守卫。 +// +// 扫描范围:仓库根下所有非 _test.go 的 .go 文件,排除 chokepoint 自身所在的 pkg/redis。 +func TestNoRawRedisClientOutsideChokepoint(t *testing.T) { + root := repoRoot(t) + chokepoint := filepath.Join(root, "pkg", "redis") + // \b 防止把 octoredis.NewClient(若有)误判成 redis.NewClient。 + re := regexp.MustCompile(`\b(rd|redis)\.NewClient\(`) + + var violations []string + err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + switch d.Name() { + case ".git", "vendor", "node_modules": + return filepath.SkipDir + } + if path == chokepoint { + return filepath.SkipDir // chokepoint 自身合法持有 rd.NewClient + } + return nil + } + if !strings.HasSuffix(path, ".go") || strings.HasSuffix(path, "_test.go") { + return nil + } + b, err := os.ReadFile(path) + if err != nil { + return err + } + if re.Match(b) { + rel, _ := filepath.Rel(root, path) + violations = append(violations, rel) + } + return nil + }) + if err != nil { + t.Fatalf("walk repo: %v", err) + } + if len(violations) > 0 { + t.Fatalf("raw redis client construction outside the octoredis chokepoint — route through "+ + "octoredis.NewInstrumentedClient / InstrumentedClientFromOptions so commands feed "+ + "dependency=\"redis\":\n %s", strings.Join(violations, "\n ")) + } +} + +// repoRoot 从测试运行目录向上找到含 go.mod 的仓库根。 +func repoRoot(t *testing.T) string { + t.Helper() + dir, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + parent := filepath.Dir(dir) + if parent == dir { + t.Fatal("go.mod not found walking up from test dir") + } + dir = parent + } +} diff --git a/pkg/redis/instrument_test.go b/pkg/redis/instrument_test.go new file mode 100644 index 00000000..5d875477 --- /dev/null +++ b/pkg/redis/instrument_test.go @@ -0,0 +1,81 @@ +package redis + +import ( + "sync" + "testing" + "time" + + liboredis "github.com/Mininglamp-OSS/octo-lib/pkg/redis" + rd "github.com/go-redis/redis" +) + +// NewInstrumentedClient 构造的裸 client 必须已挂上 octo-lib 的命令计时 hook。 +// 命令打向无人监听的地址会立即失败,但 WrapProcess 仍会触发上报 —— 借此在无真实 +// Redis 的情况下证明插桩已生效。共享进程级 observer 单例,勿加 t.Parallel()。 +func TestNewInstrumentedClientInstruments(t *testing.T) { + var ( + mu sync.Mutex + sawGet bool + ) + liboredis.SetRedisObserver(func(cmd string, _ time.Duration, _ error) { + mu.Lock() + if cmd == "get" { + sawGet = true + } + mu.Unlock() + }) + t.Cleanup(func() { liboredis.SetRedisObserver(nil) }) + + cfg := newConfig() + cfg.DB.RedisAddr = "127.0.0.1:1" // 无人监听:命令立即失败,但 hook 仍触发 + c := NewInstrumentedClient(cfg, func(o *rd.Options) { + o.MaxRetries = 0 + o.DialTimeout = 200 * time.Millisecond + }) + if c == nil { + t.Fatal("NewInstrumentedClient returned nil") + } + defer func() { _ = c.Close() }() + + _ = c.Get("k").Err() + + mu.Lock() + defer mu.Unlock() + if !sawGet { + t.Fatal("expected the instrumented client's GET to reach the observer") + } +} + +// InstrumentedClientFromOptions 同样应挂上 hook。 +func TestInstrumentedClientFromOptionsInstruments(t *testing.T) { + var ( + mu sync.Mutex + sawGet bool + ) + liboredis.SetRedisObserver(func(cmd string, _ time.Duration, _ error) { + mu.Lock() + if cmd == "get" { + sawGet = true + } + mu.Unlock() + }) + t.Cleanup(func() { liboredis.SetRedisObserver(nil) }) + + c := InstrumentedClientFromOptions(&rd.Options{ + Addr: "127.0.0.1:1", + MaxRetries: 0, + DialTimeout: 200 * time.Millisecond, + }) + if c == nil { + t.Fatal("InstrumentedClientFromOptions returned nil") + } + defer func() { _ = c.Close() }() + + _ = c.Get("k").Err() + + mu.Lock() + defer mu.Unlock() + if !sawGet { + t.Fatal("expected the instrumented client's GET to reach the observer") + } +} diff --git a/pkg/redis/options.go b/pkg/redis/options.go index d53d999d..82556d12 100644 --- a/pkg/redis/options.go +++ b/pkg/redis/options.go @@ -54,3 +54,29 @@ func MustBuildOptions(cfg *config.Config, overrides ...OptionsOverride) *rd.Opti } return opts } + +// NewInstrumentedClient 用 cfg(+overrides) 构造一个裸 *rd.Client,并在返回前挂上 +// octo-lib 的每条命令计时 hook(liboredis.Instrument),使其命令进入 +// dependency="redis" 指标。 +// +// octo-server 内所有需要裸 *rd.Client 的场景(限流令牌桶、OIDC 锁、health 探针等 +// 需要 Eval/Script/SetNX、无法用 lib 的 Conn 包装的地方)都应通过本函数构造 —— 既 +// 统一了 TLS(经 BuildOptions),又确保插桩不被漏掉。插桩在构造时、client 被共享 +// 前完成,满足 octo-lib Instrument 的「共享前插桩」契约。 +// +// 与 MustBuildOptions 一样,TLS 配置错误属启动期配置错误,直接 panic。 +func NewInstrumentedClient(cfg *config.Config, overrides ...OptionsOverride) *rd.Client { + return InstrumentedClientFromOptions(MustBuildOptions(cfg, overrides...)) +} + +// InstrumentedClientFromOptions 用调用方预构造的 *rd.Options 建裸 *rd.Client 并插桩。 +// 供少数已自行拼好 Options 的场景(如 health 探针)使用;一般情况优先用 +// NewInstrumentedClient。 +func InstrumentedClientFromOptions(opts *rd.Options) *rd.Client { + // 防御性复制:go-redis 会就地写入若干默认值,不复制可能在调用方复用同一 + // *rd.Options 时串改。与 octo-lib redis.NewWithOptions 的处理保持一致。 + local := *opts + c := rd.NewClient(&local) + liboredis.Instrument(c) + return c +} diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index 161805d0..939191f8 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -31,7 +31,9 @@ func NewWithOptions(opts *rd.Options) *Conn { if opts.MaxRetries == 0 { opts.MaxRetries = 3 } - return &Conn{client: rd.NewClient(opts)} + // 经 InstrumentedClientFromOptions 构造,使这个 Conn 的命令也进入 dependency="redis" + // 指标(与裸 client 一致)。 + return &Conn{client: InstrumentedClientFromOptions(opts)} } func (rc *Conn) Ping() (string, error) { diff --git a/pkg/wkhttp/ratelimit_helper.go b/pkg/wkhttp/ratelimit_helper.go index 1e0af8ef..a42c4cf5 100644 --- a/pkg/wkhttp/ratelimit_helper.go +++ b/pkg/wkhttp/ratelimit_helper.go @@ -64,10 +64,10 @@ func SharedUIDRateLimiter(r *libwkhttp.WKHttp, ctx *config.Context) libwkhttp.Ha // Eval/Script 接口,令牌桶 Lua 脚本必须走原生 go-redis。生命周期跟随进程。 // ctx 传 context.Background():go-redis v6 的 Script.Run 不接受 context; // 即便未来升级到 v8+ 也不应传请求 ctx(token 消耗不可因客户端断连回退)。 - client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) { + client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) { o.MaxRetries = 1 o.PoolSize = uidRateLimitPoolSize - })) + }) if r == nil { r = libwkhttp.New() }