Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
firebase.google.com/go/v4 v4.13.0
github.com/BurntSushi/toml v1.6.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4
github.com/alibabacloud-go/darabonba-openapi v0.2.1
github.com/alibabacloud-go/sms-intl-20180501 v1.0.1
github.com/alibabacloud-go/tea v1.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID
github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55 h1:2reLrd1tHsMwGRaGVXbHVpIzqnlVajkOPwn8hclDraI=
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260626063251-cff4d7a48f55/go.mod h1:uOCTTI3QdG0ETKZ6uluRWUZgKexee/pY1pQYWyIEvZ0=
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4 h1:We9RyaYNEerDC/V5Zq0wFl+uvUFp9S0eHB5Jz6sz4fk=
github.com/Mininglamp-OSS/octo-lib v0.0.0-20260628015025-0c34e6f108c4/go.mod h1:uOCTTI3QdG0ETKZ6uluRWUZgKexee/pY1pQYWyIEvZ0=
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae h1:DcFpTQBYQ9Ct2d6sC7ol0/ynxc2pO1cpGUM+f4t5adg=
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae/go.mod h1:rJJ84PyA/Wlmw1hO+xTzV2wsSUon6J5ktg0g8BF2PuU=
github.com/RichardKnop/machinery/v2 v2.0.11 h1:BTfLGOmOju3W/OtlZmLX26OjYNZsU4PJo04pQReycdc=
Expand Down
17 changes: 8 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func runAPI(ctx *config.Context) {
// 生命周期:跟随进程存续,不显式 Close——与 lib 自身的 redis.Conn 处理方式一致。
// PoolSize 显式设 10:令牌桶 Lua 脚本是短事务,Redis 端 <1ms,不需要大池;
// go-redis v6 默认 10*NumCPU 在大核机上会失控(多副本 × 多 client 连接数叠加)。
rlRedis := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
rlRedis := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
route.Use(route.RateLimitMiddleware(context.Background(), rlRedis, rps, burst, globalRateLimitExcludePaths()...))
// 上游依赖可观测性(issue #440 P0-a):依赖调用延迟 + 连接池指标,均注册到
// DefaultRegisterer,经 /metrics 暴露(与 httpMetrics 同一端点)。连接池用
Expand Down Expand Up @@ -238,13 +238,12 @@ func runAPI(ctx *config.Context) {
// observer 在调用时按 atomic 解析,construct 时即已挂 hook,故放在此处(指标族
// 注册之后、serve 之前)即可,不会漏掉后续流量。
//
// 覆盖范围(重要):dependency="redis" 只覆盖经 pkg/redis 构造的客户端 ——
// 主要是共享缓存 ctx.GetRedisConn()(数据面大头)。全仓还有约 15 处裸
// rd.NewClient(octoredis.MustBuildOptions(...))(限流 rlRedis、OIDC state/bind/
// logout/sync_lock、bot registry、user/group/space auth、health、各类 Lua 锁等),
// 它们需要 Eval/Script/SetNX 等 pkg/redis 包装未暴露的原语,因此都绕过插桩,
// 其单命令延迟不进本指标(连接池指标仍覆盖)。在 dependency="redis" 上做告警时
// 要知道 auth/OIDC/限流/锁/health 这些路径不可见。通用治理见 octo-lib#96。
// 覆盖范围:dependency="redis" 覆盖共享缓存 ctx.GetRedisConn()(数据面大头),
// 以及所有需要 Eval/Script/SetNX 而走裸 *rd.Client 的控制面客户端(限流 rlRedis、
// OIDC state/bind/logout/sync_lock、bot registry、user/group/space auth、health、
// 各类 Lua 锁等)—— 后者统一经 octoredis.NewInstrumentedClient/InstrumentedClientFromOptions
// 构造,在构造时即插桩(octo-lib#104)。仍在覆盖外的只有绕过 octoredis 直接
// rd.NewClient 的极少数场景(若有),用连接池指标兜底。
libdb.SetDBObserver(metrics.ObserveDB)
libredis.SetRedisObserver(metrics.ObserveRedisCmd)
metrics.RegisterPoolCollectors(prometheus.DefaultRegisterer, ctx.DB().DB, map[string]*rd.Client{
Expand Down
4 changes: 2 additions & 2 deletions modules/bot_api/registry_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ type RedisAppBotRegistry struct {
// process config). ttl supplies the safety-net key expiry; a non-positive value
// is coerced to a sane floor in set().
func NewRedisAppBotRegistry(ctx *config.Context, ttl func() time.Duration) *RedisAppBotRegistry {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 2
o.DialTimeout = 3 * time.Second
o.ReadTimeout = 2 * time.Second
o.WriteTimeout = 2 * time.Second
}))
})
return &RedisAppBotRegistry{
Log: log.NewTLog("RedisAppBotRegistry"),
client: client,
Expand Down
4 changes: 2 additions & 2 deletions modules/bot_provision/bot_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ func (a *BotProvision) Route(r *wkhttp.WKHttp) {
// allowlist / X-Internal-Key) is the documented primary control;
// the limiter is defense-in-depth.
rlCtx := context.Background()
rlRedis := rd.NewClient(octoredis.MustBuildOptions(a.ctx.GetConfig(), func(o *rd.Options) {
rlRedis := octoredis.NewInstrumentedClient(a.ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
verifyLimit := r.StrictIPRateLimitMiddleware(rlCtx, rlRedis, "verify", 1000.0/60, 100)
r.GET("/v1/bot/:uid/token", verifyLimit, a.botToken)
}
2 changes: 1 addition & 1 deletion modules/common/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type dependencyReadinessChecker struct {
func newDependencyReadinessChecker(ctx *config.Context, db *db) readinessChecker {
return &dependencyReadinessChecker{
db: db,
redisClient: rd.NewClient(readinessRedisOptions(ctx.GetConfig())),
redisClient: octoredis.InstrumentedClientFromOptions(readinessRedisOptions(ctx.GetConfig())),
}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/group/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func (g *Group) Route(r *wkhttp.WKHttp) {
// - DM_API_GROUP_INVITE_RPS 每秒填充速率(float,缺省 60.0)
// - DM_API_GROUP_INVITE_BURST 桶容量(int,缺省 200)
// 生产环境如需收紧,在部署时设置 env(如 RPS=0.1667 / BURST=5 恢复到 10 req/min)。
rlRedis := redis.NewClient(octoredis.MustBuildOptions(g.ctx.GetConfig(), func(o *redis.Options) {
rlRedis := octoredis.NewInstrumentedClient(g.ctx.GetConfig(), func(o *redis.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
inviteRPS := wkhttp.ParseRPSFromEnv("DM_API_GROUP_INVITE_RPS", 60.0) // 默认 60 rps ≈ 3600 req/min
inviteBurst := wkhttp.ParseBurstFromEnv("DM_API_GROUP_INVITE_BURST", 200)
groupInviteLimit := r.StrictIPRateLimitMiddleware(context.Background(), rlRedis, "group_invite", inviteRPS, inviteBurst)
Expand Down
4 changes: 2 additions & 2 deletions modules/incomingwebhook/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func sharedRateRedis(cfg *config.Config) *redis.Client {
// PoolSize 显式设 10:令牌桶 Lua 脚本是短事务,与 main.go / user / group /
// space / integration 等其它限流 client 的全局约定保持一致。Redis 故障/连接池
// 打满导致 fail-open 的兜底由进程内 localFloor 负责,不在此处放大连接池。
rateRedisClient = redis.NewClient(octoredis.MustBuildOptions(cfg, func(o *redis.Options) {
rateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *redis.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
})
return rateRedisClient
}
Expand Down
4 changes: 2 additions & 2 deletions modules/integration/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func New(ctx *config.Context) *Integration {

func sharedIntegrationRateRedis(cfg *config.Config) *rd.Client {
integrationRateRedisOnce.Do(func() {
integrationRateRedisClient = rd.NewClient(octoredis.MustBuildOptions(cfg, func(o *rd.Options) {
integrationRateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = integrationRateLimitPoolSize
}))
})
})
return integrationRateRedisClient
}
Expand Down
4 changes: 2 additions & 2 deletions modules/oidc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,12 +1397,12 @@ type redisCompareDeleter struct {
}

func newRedisCompareDeleter(ctx *config.Context) *redisCompareDeleter {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &redisCompareDeleter{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/oidc/bind_store_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ type redisBindStore struct {
}

func newRedisBindStore(ctx *config.Context) *redisBindStore {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &redisBindStore{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/oidc/logout_idtoken.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type redisIDTokenStore struct {
}

func newRedisIDTokenStore(ctx *config.Context, enc *Encryptor) *redisIDTokenStore {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &redisIDTokenStore{client: client, enc: enc}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/oidc/state_store_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type redisStateStore struct {
}

func newRedisStateStore(ctx *config.Context) *redisStateStore {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &redisStateStore{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/oidc/sync_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type RedisTickLock struct {
}

func newRedisTickLock(ctx *config.Context) *RedisTickLock {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &RedisTickLock{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/opanalytics/etl_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type etlLock struct {
}

func newETLLock(ctx *config.Context) *etlLock {
client := rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client := octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 3
o.ReadTimeout = 3 * time.Second
o.WriteTimeout = 3 * time.Second
o.DialTimeout = 3 * time.Second
}))
})
return &etlLock{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions modules/space/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (s *Space) Route(r *wkhttp.WKHttp) {
// 两个端点共享同一 limiter,使同一 IP 跨端点总配额受控。
// 阈值与 user 模块 login 同档(10 req/min, burst 5),详见 PR #1090。
// PoolSize=10:Lua 脚本短事务,与 user 模块 / main.go 保持一致。
rlRedis := rd.NewClient(octoredis.MustBuildOptions(s.ctx.GetConfig(), func(o *rd.Options) {
rlRedis := octoredis.NewInstrumentedClient(s.ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
invitePreviewLimit := r.StrictIPRateLimitMiddleware(context.Background(), rlRedis, "space_invite", 10.0/60, 5)

open := r.Group("/v1/space")
Expand Down
8 changes: 4 additions & 4 deletions modules/user/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func New(ctx *config.Context) *User {
spaceSettingDB: NewSpaceSettingDB(ctx.DB()),
verificationDB: newVerificationDB(ctx),
existingTokenSetter: redisExistingTokenSetter{
client: rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig(), func(o *rd.Options) {
client: octoredis.NewInstrumentedClient(ctx.GetConfig(), func(o *rd.Options) {
o.PoolSize = 10
})),
}),
},
}
// LanguageService 与 main.go 注入到 CacheTokenParser 的实例独立构造,但共享
Expand Down Expand Up @@ -193,10 +193,10 @@ func (u *User) Route(r *wkhttp.WKHttp) {
rlCtx := context.Background()
// 限流状态存 Redis,多副本共享配额;生命周期跟随进程,与 main.go 的做法一致
// PoolSize 显式设 10:理由同 main.go——限流 Lua 脚本短事务,不需要大池。
rlRedis := rd.NewClient(octoredis.MustBuildOptions(u.ctx.GetConfig(), func(o *rd.Options) {
rlRedis := octoredis.NewInstrumentedClient(u.ctx.GetConfig(), func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
// burst 取小值:人类正常重试容忍 + 不给攻击者初始白嫖窗口
// tag 用稳定字符串分离 keyspace;注意 register 和 sms 参数相同但语义不同,必须分开
loginLimit := r.StrictIPRateLimitMiddleware(rlCtx, rlRedis, "login", 10.0/60, 5) // 10 req/min, burst 5
Expand Down
3 changes: 1 addition & 2 deletions modules/user/api_authcode_token_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Mininglamp-OSS/octo-lib/pkg/util"
"github.com/Mininglamp-OSS/octo-lib/testutil"
octoredis "github.com/Mininglamp-OSS/octo-server/pkg/redis"
rd "github.com/go-redis/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -26,7 +25,7 @@ import (
func TestRedisExistingTokenSetter_SetXX_RealRedis(t *testing.T) {
_, ctx := testutil.NewTestServer()
setter := redisExistingTokenSetter{
client: rd.NewClient(octoredis.MustBuildOptions(ctx.GetConfig())),
client: octoredis.NewInstrumentedClient(ctx.GetConfig()),
}
prefix := ctx.GetConfig().Cache.TokenCachePrefix

Expand Down
4 changes: 2 additions & 2 deletions modules/usersecret/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func sharedRateRedis(cfg *config.Config) *rd.Client {
// 经 octoredis.MustBuildOptions 构造,确保 RedisTLS 启用(托管 TLS Redis)时
// TLSConfig 不被遗漏,否则限流 client 连不上、fail-open 静默关掉防护。
// PoolSize 显式设 10:令牌桶 Lua 是短事务,与其它限流 client 全局约定一致。
rateRedisClient = rd.NewClient(octoredis.MustBuildOptions(cfg, func(o *rd.Options) {
rateRedisClient = octoredis.NewInstrumentedClient(cfg, func(o *rd.Options) {
o.MaxRetries = 1
o.PoolSize = 10
}))
})
})
return rateRedisClient
}
Expand Down
80 changes: 80 additions & 0 deletions pkg/redis/chokepoint_guard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package redis

import (
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
)

// TestNoRawRedisClientOutsideChokepoint 把本 PR 的核心不变量钉成源码守卫:生产代码
// 只能经 octoredis 的 NewInstrumentedClient / InstrumentedClientFromOptions 构造 redis
// 客户端,不得直接 rd.NewClient / redis.NewClient —— 否则新站点会静默重开
// dependency="redis" 的盲区(COMPREHENSION §2 命名的回归)。仿照 i18n 的
// Test*NoLegacyResponseError 源码守卫。
//
// 扫描范围:仓库根下所有非 _test.go 的 .go 文件,排除 chokepoint 自身所在的 pkg/redis。
func TestNoRawRedisClientOutsideChokepoint(t *testing.T) {
root := repoRoot(t)
chokepoint := filepath.Join(root, "pkg", "redis")
// \b 防止把 octoredis.NewClient(若有)误判成 redis.NewClient。
re := regexp.MustCompile(`\b(rd|redis)\.NewClient\(`)

var violations []string
err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
switch d.Name() {
case ".git", "vendor", "node_modules":
return filepath.SkipDir
}
if path == chokepoint {
return filepath.SkipDir // chokepoint 自身合法持有 rd.NewClient
}
return nil
}
if !strings.HasSuffix(path, ".go") || strings.HasSuffix(path, "_test.go") {
return nil
}
b, err := os.ReadFile(path)
if err != nil {
return err
}
if re.Match(b) {
rel, _ := filepath.Rel(root, path)
violations = append(violations, rel)
}
return nil
})
if err != nil {
t.Fatalf("walk repo: %v", err)
}
if len(violations) > 0 {
t.Fatalf("raw redis client construction outside the octoredis chokepoint — route through "+
"octoredis.NewInstrumentedClient / InstrumentedClientFromOptions so commands feed "+
"dependency=\"redis\":\n %s", strings.Join(violations, "\n "))
}
}

// repoRoot 从测试运行目录向上找到含 go.mod 的仓库根。
func repoRoot(t *testing.T) string {
t.Helper()
dir, err := os.Getwd()
if err != nil {
t.Fatalf("getwd: %v", err)
}
for {
if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil {
return dir
}
parent := filepath.Dir(dir)
if parent == dir {
t.Fatal("go.mod not found walking up from test dir")
}
dir = parent
}
}
Loading
Loading