Skip to content

Commit ae33a8f

Browse files
committed
fix(quality): harden error handling, validation, and test isolation
- Replace all silent `_ = repo.*` calls in goroutine paths with clearStateWithLog helper that logs warnings on cleanup failures
- Add debug.Stack() capture to recoverPipeline for post-mortem debugging
- Guard all sync.Map type assertions with ok checks in pollDebounceExpiry, cancelPair, and launchAIStage
- Extract cleanupCtx() (5s timeout) for all cleanup repo calls that previously used unbounded context.Background()
- Use dur.String() instead of .Milliseconds() for duration log fields
- Add MessageEvent.Validate() with bounds checks on all BotConfig fields; called in handleEvent before processing
- Log client IP on auth failures in SecretMiddleware
- Cap ScanStates with maxPairs parameter (10_000 at call site) and warn on truncation
- Centralise test Redis config in internal/testhelpers replacing hardcoded DB numbers (1, 2, 3) across three test packages
- config.Load() now returns error instead of calling os.Exit
1 parent 470475a commit ae33a8f

13 files changed

Lines changed: 279 additions & 130 deletions

File tree

cmd/server/main.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package main
22

33
import (
44
"context"
5-
"log/slog"
6-
"os"
5+
"log"
76

87
"github.com/gin-gonic/gin"
98
"github.com/go-redsync/redsync/v4"
@@ -14,25 +13,26 @@ import (
1413
aiService "github.com/EvolutionAPI/evo-bot-runtime/pkg/ai/service"
1514
debounceService "github.com/EvolutionAPI/evo-bot-runtime/pkg/debounce/service"
1615
dispatchService "github.com/EvolutionAPI/evo-bot-runtime/pkg/dispatch/service"
17-
"github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/handler"
16+
pipelineHandler "github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/handler"
1817
"github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/repository"
1918
pipelineService "github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/service"
2019
)
2120

2221
func main() {
2322
// Step 1: config
24-
cfg := config.Load()
23+
cfg, err := config.Load()
24+
if err != nil {
25+
log.Fatalf("config: %v", err)
26+
}
2527

2628
// Step 2: Redis client + connectivity check
2729
opt, err := redis.ParseURL(cfg.RedisURL)
2830
if err != nil {
29-
slog.Error("invalid REDIS_URL", "error", err)
30-
os.Exit(1)
31+
log.Fatalf("invalid REDIS_URL: %v", err)
3132
}
3233
rdb := redis.NewClient(opt)
3334
if err := rdb.Ping(context.Background()).Err(); err != nil {
34-
slog.Error("redis connection failed", "error", err)
35-
os.Exit(1)
35+
log.Fatalf("redis connection failed: %v", err)
3636
}
3737

3838
// Step 3: redsync
@@ -43,31 +43,29 @@ func main() {
4343
pipelineRepo := repository.NewPipelineRepository(rdb, rs)
4444

4545
// Step 5: debounce engine
46-
debounceEng := debounceService.NewDebounceEngine(pipelineRepo)
46+
debounce := debounceService.NewDebounceEngine(pipelineRepo)
4747

4848
// Step 6: AI adapter
4949
aiAdapter := aiService.NewAIAdapter(cfg.AIProcessorURL, cfg.AIProcessorAPIKey, cfg.AICallTimeoutSeconds)
5050

5151
// Step 7: dispatch engine
52-
dispatchEng := dispatchService.NewDispatchEngine()
52+
dispatch := dispatchService.NewDispatchEngine()
5353

5454
// Step 8: pipeline service
55-
pipelineSvc := pipelineService.NewPipelineService(pipelineRepo, debounceEng, aiAdapter, dispatchEng)
56-
if err := pipelineSvc.Start(); err != nil {
57-
slog.Error("pipeline service failed to start", "error", err)
58-
os.Exit(1)
55+
pipeline := pipelineService.NewPipelineService(pipelineRepo, debounce, aiAdapter, dispatch)
56+
if err := pipeline.Start(); err != nil {
57+
log.Fatalf("pipeline service failed to start: %v", err)
5958
}
6059

6160
// Step 9: handler + routes
62-
hdl := handler.NewHandler(pipelineRepo, pipelineSvc, cfg.BotRuntimeSecret)
61+
handler := pipelineHandler.NewHandler(pipelineRepo, pipeline, cfg.BotRuntimeSecret)
6362
r := gin.New()
6463
r.Use(gin.Recovery())
65-
hdl.RegisterRoutes(r)
64+
handler.RegisterRoutes(r)
6665

6766
// Step 10: start server
68-
slog.Info("evo-bot-runtime starting", "listen_addr", cfg.ListenAddr)
67+
log.Printf("evo-bot-runtime starting on %s", cfg.ListenAddr)
6968
if err := r.Run(cfg.ListenAddr); err != nil {
70-
slog.Error("server failed", "error", err)
71-
os.Exit(1)
69+
log.Fatalf("server failed: %v", err)
7270
}
7371
}

internal/config/config.go

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package config
22

33
import (
4-
"log/slog"
4+
"fmt"
55
"os"
66
"strconv"
77
)
@@ -15,35 +15,58 @@ type Config struct {
1515
AICallTimeoutSeconds int
1616
}
1717

18-
func Load() *Config {
19-
return &Config{
20-
ListenAddr: requireEnv("LISTEN_ADDR"),
21-
RedisURL: requireEnv("REDIS_URL"),
22-
BotRuntimeSecret: requireEnv("BOT_RUNTIME_SECRET"),
23-
AIProcessorURL: requireEnv("AI_PROCESSOR_URL"),
24-
AIProcessorAPIKey: requireEnv("AI_PROCESSOR_API_KEY"),
25-
AICallTimeoutSeconds: optionalEnvInt("AI_CALL_TIMEOUT_SECONDS", 30),
18+
func Load() (*Config, error) {
19+
listenAddr, err := mustGetEnv("LISTEN_ADDR")
20+
if err != nil {
21+
return nil, err
22+
}
23+
redisURL, err := mustGetEnv("REDIS_URL")
24+
if err != nil {
25+
return nil, err
26+
}
27+
botRuntimeSecret, err := mustGetEnv("BOT_RUNTIME_SECRET")
28+
if err != nil {
29+
return nil, err
30+
}
31+
aiProcessorURL, err := mustGetEnv("AI_PROCESSOR_URL")
32+
if err != nil {
33+
return nil, err
34+
}
35+
aiProcessorAPIKey, err := mustGetEnv("AI_PROCESSOR_API_KEY")
36+
if err != nil {
37+
return nil, err
2638
}
39+
aiCallTimeout, err := getEnvIntOrDefault("AI_CALL_TIMEOUT_SECONDS", 30)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
return &Config{
45+
ListenAddr: listenAddr,
46+
RedisURL: redisURL,
47+
BotRuntimeSecret: botRuntimeSecret,
48+
AIProcessorURL: aiProcessorURL,
49+
AIProcessorAPIKey: aiProcessorAPIKey,
50+
AICallTimeoutSeconds: aiCallTimeout,
51+
}, nil
2752
}
2853

29-
func requireEnv(key string) string {
54+
func mustGetEnv(key string) (string, error) {
3055
v := os.Getenv(key)
3156
if v == "" {
32-
slog.Error("missing required environment variable", "key", key)
33-
os.Exit(1)
57+
return "", fmt.Errorf("missing required environment variable: %s", key)
3458
}
35-
return v
59+
return v, nil
3660
}
3761

38-
func optionalEnvInt(key string, defaultVal int) int {
62+
func getEnvIntOrDefault(key string, defaultVal int) (int, error) {
3963
v := os.Getenv(key)
4064
if v == "" {
41-
return defaultVal
65+
return defaultVal, nil
4266
}
4367
n, err := strconv.Atoi(v)
4468
if err != nil {
45-
slog.Error("invalid integer environment variable", "key", key, "value", v)
46-
os.Exit(1)
69+
return 0, fmt.Errorf("invalid integer for environment variable %s: %q", key, v)
4770
}
48-
return n
71+
return n, nil
4972
}

internal/testhelpers/redis.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Package testhelpers provides shared test helpers for evo-bot-runtime packages.
2+
package testhelpers
3+
4+
import (
5+
"os"
6+
7+
"github.com/redis/go-redis/v9"
8+
)
9+
10+
// RedisOptions returns Redis client options for tests.
11+
// It reads REDIS_TEST_URL (falls back to REDIS_URL, then redis://localhost:6379).
12+
// The DB is always forced to 15 to avoid collisions with application data;
13+
// REDIS_TEST_DB is not read here, and any DB selected by the URL is overwritten.
14+
//
15+
// NOTE(review): every caller therefore shares DB 15. Packages that run tests in
16+
// parallel against the same Redis instance can interfere with one another until
17+
// a per-package override (e.g. honoring REDIS_TEST_DB) is actually implemented.
18+
func RedisOptions() *redis.Options {
19+
url := os.Getenv("REDIS_TEST_URL")
20+
if url == "" {
21+
url = os.Getenv("REDIS_URL")
22+
}
23+
if url == "" {
24+
url = "redis://localhost:6379"
25+
}
26+
opt, err := redis.ParseURL(url)
27+
if err != nil {
28+
panic("invalid Redis URL for tests: " + err.Error())
29+
}
30+
opt.DB = 15 // unconditional: replaces whatever DB the parsed URL selected
31+
return opt
32+
}

pkg/debounce/service/debounce_engine_test.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,36 +10,22 @@ import (
1010
goredis "github.com/go-redsync/redsync/v4/redis/goredis/v9"
1111
"github.com/redis/go-redis/v9"
1212

13+
"github.com/EvolutionAPI/evo-bot-runtime/internal/testhelpers"
1314
"github.com/EvolutionAPI/evo-bot-runtime/pkg/debounce/service"
1415
"github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/model"
1516
"github.com/EvolutionAPI/evo-bot-runtime/pkg/pipeline/repository"
1617
)
1718

18-
const testDB = 1 // dedicated Redis DB for debounce/service tests
19-
20-
func testRedisOptions() *redis.Options {
21-
url := os.Getenv("REDIS_URL")
22-
if url == "" {
23-
url = "redis://localhost:6379"
24-
}
25-
opt, err := redis.ParseURL(url)
26-
if err != nil {
27-
panic("invalid REDIS_URL: " + err.Error())
28-
}
29-
opt.DB = testDB
30-
return opt
31-
}
32-
3319
func TestMain(m *testing.M) {
34-
rdb := redis.NewClient(testRedisOptions())
20+
rdb := redis.NewClient(testhelpers.RedisOptions())
3521
rdb.FlushDB(context.Background())
3622
rdb.Close()
3723
os.Exit(m.Run())
3824
}
3925

4026
func newTestRedisClient(t *testing.T) *redis.Client {
4127
t.Helper()
42-
return redis.NewClient(testRedisOptions())
28+
return redis.NewClient(testhelpers.RedisOptions())
4329
}
4430

4531
func setupEngine(t *testing.T) (service.DebounceEngine, *redis.Client) {

pkg/pipeline/handler/handler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ func (h *Handler) handleEvent(c *gin.Context) {
4848
})
4949
return
5050
}
51+
if err := event.Validate(); err != nil {
52+
c.JSON(http.StatusBadRequest, gin.H{
53+
"error": err.Error(),
54+
"code": "ERR_INVALID_EVENT",
55+
})
56+
return
57+
}
5158

5259
// Persist StageIncoming BEFORE returning 202 (NFR-01).
5360
// Guarantees event durability if the process goroutine never runs.

pkg/pipeline/handler/handler_test.go

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ func (m *mockRepo) TimerExists(_ context.Context, _, _ int64) (bool, error)
4545
func (m *mockRepo) AcquireLock(_ context.Context, _, _ int64) (repository.Mutex, error) {
4646
return nil, nil
4747
}
48-
func (m *mockRepo) ScanStates(_ context.Context) ([]model.PairID, error) { return nil, nil }
48+
func (m *mockRepo) ScanStates(_ context.Context, _ int) ([]model.PairID, error) {
49+
return nil, nil
50+
}
4951
func (m *mockRepo) Ping(_ context.Context) error { return m.pingErr }
5052

5153
// mockSvc satisfies pipelineService.PipelineService for handler tests.
@@ -212,15 +214,60 @@ func TestHandleEvent_400_OnMalformedJSON(t *testing.T) {
212214
}
213215

214216
func TestHandleEvent_400_OnMissingRequiredFields(t *testing.T) {
215-
r := setupRouter(&mockRepo{}, &mockSvc{})
216-
w := httptest.NewRecorder()
217-
req := httptest.NewRequest(http.MethodPost, "/events", bytes.NewReader([]byte(`{}`)))
218-
req.Header.Set("Content-Type", "application/json")
219-
req.Header.Set("X-Bot-Runtime-Secret", testSecret)
220-
r.ServeHTTP(w, req)
221-
222-
if w.Code != http.StatusAccepted {
223-
t.Errorf("status: got %d, want %d (zero-value event is accepted by current schema)", w.Code, http.StatusAccepted)
217+
cases := []struct {
218+
name string
219+
payload string
220+
wantErr string
221+
}{
222+
{
223+
name: "empty body",
224+
payload: `{}`,
225+
wantErr: "contact_id must be > 0",
226+
},
227+
{
228+
name: "missing conversation_id",
229+
payload: `{"contact_id":1,"postback_url":"http://x"}`,
230+
wantErr: "conversation_id must be > 0",
231+
},
232+
{
233+
name: "missing postback_url",
234+
payload: `{"contact_id":1,"conversation_id":2}`,
235+
wantErr: "postback_url is required",
236+
},
237+
{
238+
name: "negative debounce_time",
239+
payload: `{"contact_id":1,"conversation_id":2,"postback_url":"http://x","bot_config":{"debounce_time":-1}}`,
240+
wantErr: "debounce_time must be >= 0",
241+
},
242+
{
243+
name: "segmentation enabled but limit zero",
244+
payload: `{"contact_id":1,"conversation_id":2,"postback_url":"http://x","bot_config":{"text_segmentation_enabled":true}}`,
245+
wantErr: "text_segmentation_limit must be > 0 when segmentation is enabled",
246+
},
247+
}
248+
for _, tc := range cases {
249+
t.Run(tc.name, func(t *testing.T) {
250+
r := setupRouter(&mockRepo{}, &mockSvc{})
251+
w := httptest.NewRecorder()
252+
req := httptest.NewRequest(http.MethodPost, "/events", bytes.NewReader([]byte(tc.payload)))
253+
req.Header.Set("Content-Type", "application/json")
254+
req.Header.Set("X-Bot-Runtime-Secret", testSecret)
255+
r.ServeHTTP(w, req)
256+
257+
if w.Code != http.StatusBadRequest {
258+
t.Errorf("status: got %d, want %d", w.Code, http.StatusBadRequest)
259+
}
260+
var body map[string]any
261+
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
262+
t.Fatalf("invalid JSON response: %v", err)
263+
}
264+
if body["error"] != tc.wantErr {
265+
t.Errorf("body.error: got %q, want %q", body["error"], tc.wantErr)
266+
}
267+
if body["code"] != "ERR_INVALID_EVENT" {
268+
t.Errorf("body.code: got %q, want %q", body["code"], "ERR_INVALID_EVENT")
269+
}
270+
})
224271
}
225272
}
226273

pkg/pipeline/handler/middleware.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handler
22

33
import (
4+
"log/slog"
45
"net/http"
56

67
"github.com/gin-gonic/gin"
@@ -9,6 +10,9 @@ import (
910
func SecretMiddleware(secret string) gin.HandlerFunc {
1011
return func(c *gin.Context) {
1112
if c.GetHeader("X-Bot-Runtime-Secret") != secret {
13+
slog.Warn("pipeline.auth.unauthorized",
14+
"remote_addr", c.ClientIP(),
15+
)
1216
c.JSON(http.StatusUnauthorized, gin.H{
1317
"error": "unauthorized",
1418
"code": "ERR_UNAUTHORIZED",

pkg/pipeline/model/pipeline.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package model
22

3-
import "time"
3+
import (
4+
"errors"
5+
"time"
6+
)
47

58
// Stage — string type; never use iota or inline strings
69
type Stage string
@@ -48,3 +51,26 @@ type BotConfig struct {
4851
TextSegmentationMinSize int `json:"text_segmentation_min_size"`
4952
DelayPerCharacter float64 `json:"delay_per_character"` // ms per char between parts
5053
}
54+
55+
// Validate checks semantic constraints on a MessageEvent after JSON binding.
56+
func (e *MessageEvent) Validate() error {
57+
if e.ContactID <= 0 {
58+
return errors.New("contact_id must be > 0")
59+
}
60+
if e.ConversationID <= 0 {
61+
return errors.New("conversation_id must be > 0")
62+
}
63+
if e.PostbackURL == "" {
64+
return errors.New("postback_url is required")
65+
}
66+
if e.BotConfig.DebounceTime < 0 {
67+
return errors.New("debounce_time must be >= 0")
68+
}
69+
if e.BotConfig.TextSegmentationEnabled && e.BotConfig.TextSegmentationLimit <= 0 {
70+
return errors.New("text_segmentation_limit must be > 0 when segmentation is enabled")
71+
}
72+
if e.BotConfig.DelayPerCharacter < 0 {
73+
return errors.New("delay_per_character must be >= 0")
74+
}
75+
return nil
76+
}

0 commit comments

Comments
 (0)