Skip to content

Commit 0afcc21

Browse files
committed
perf: share websocket streams, cache settings reads, and parallelize auto-heal inspections
1 parent 3100054 commit 0afcc21

File tree

8 files changed

+819
-213
lines changed

8 files changed

+819
-213
lines changed

backend/internal/api/ws_handler.go

Lines changed: 276 additions & 49 deletions
Large diffs are not rendered by default.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"strings"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
12+
systemtypes "github.com/getarcaneapp/arcane/types/system"
13+
"github.com/gin-gonic/gin"
14+
"github.com/gorilla/websocket"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func newTestWebSocketHandler() *WebSocketHandler {
19+
return &WebSocketHandler{
20+
wsMetrics: NewWebSocketMetrics(),
21+
logStreams: make(map[string]*wsLogStream),
22+
wsUpgrader: websocket.Upgrader{
23+
CheckOrigin: func(r *http.Request) bool { return true },
24+
},
25+
}
26+
}
27+
28+
func dialWebSocket(t *testing.T, serverURL, path string) *websocket.Conn {
29+
t.Helper()
30+
31+
wsURL := "ws" + strings.TrimPrefix(serverURL, "http") + path
32+
conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
33+
require.NoError(t, err)
34+
if resp != nil {
35+
_ = resp.Body.Close()
36+
}
37+
38+
return conn
39+
}
40+
41+
func TestWebSocketHandler_ProjectLogs_SharedStreamPerTarget(t *testing.T) {
42+
gin.SetMode(gin.TestMode)
43+
44+
handler := newTestWebSocketHandler()
45+
var starts atomic.Int32
46+
var cancels atomic.Int32
47+
48+
handler.projectLogStreamer = func(ctx context.Context, projectID string, logsChan chan<- string, follow bool, tail, since string, timestamps bool) error {
49+
starts.Add(1)
50+
ticker := time.NewTicker(20 * time.Millisecond)
51+
defer ticker.Stop()
52+
defer cancels.Add(1)
53+
54+
for {
55+
select {
56+
case <-ctx.Done():
57+
return ctx.Err()
58+
case <-ticker.C:
59+
select {
60+
case <-ctx.Done():
61+
return ctx.Err()
62+
case logsChan <- "api | shared project log":
63+
}
64+
}
65+
}
66+
}
67+
68+
router := gin.New()
69+
router.GET("/api/environments/:id/ws/projects/:projectId/logs", handler.ProjectLogs)
70+
server := httptest.NewServer(router)
71+
defer server.Close()
72+
73+
conn1 := dialWebSocket(t, server.URL, "/api/environments/0/ws/projects/project-1/logs")
74+
conn2 := dialWebSocket(t, server.URL, "/api/environments/0/ws/projects/project-1/logs")
75+
76+
_ = conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
77+
_, _, err := conn1.ReadMessage()
78+
require.NoError(t, err)
79+
80+
_ = conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
81+
_, _, err = conn2.ReadMessage()
82+
require.NoError(t, err)
83+
84+
require.Eventually(t, func() bool {
85+
return starts.Load() == 1
86+
}, 2*time.Second, 20*time.Millisecond)
87+
88+
require.Eventually(t, func() bool {
89+
handler.logStreamsMu.Lock()
90+
defer handler.logStreamsMu.Unlock()
91+
return len(handler.logStreams) == 1
92+
}, time.Second, 20*time.Millisecond)
93+
94+
require.NoError(t, conn1.Close())
95+
96+
time.Sleep(100 * time.Millisecond)
97+
98+
handler.logStreamsMu.Lock()
99+
activeAfterFirstClose := len(handler.logStreams)
100+
handler.logStreamsMu.Unlock()
101+
require.Equal(t, 1, activeAfterFirstClose)
102+
require.Equal(t, int32(0), cancels.Load())
103+
104+
require.NoError(t, conn2.Close())
105+
106+
require.Eventually(t, func() bool {
107+
handler.logStreamsMu.Lock()
108+
defer handler.logStreamsMu.Unlock()
109+
return len(handler.logStreams) == 0
110+
}, 2*time.Second, 20*time.Millisecond)
111+
require.Eventually(t, func() bool {
112+
return cancels.Load() == 1
113+
}, 2*time.Second, 20*time.Millisecond)
114+
}
115+
116+
func TestWebSocketHandler_SystemStats_UsesSharedSampler(t *testing.T) {
117+
gin.SetMode(gin.TestMode)
118+
119+
handler := newTestWebSocketHandler()
120+
var collects atomic.Int32
121+
122+
handler.systemStatsCollector = func(ctx context.Context) systemtypes.SystemStats {
123+
n := collects.Add(1)
124+
return systemtypes.SystemStats{
125+
CPUUsage: float64(n),
126+
}
127+
}
128+
129+
router := gin.New()
130+
router.GET("/api/environments/:id/ws/system/stats", handler.SystemStats)
131+
server := httptest.NewServer(router)
132+
defer server.Close()
133+
134+
conn1 := dialWebSocket(t, server.URL, "/api/environments/0/ws/system/stats?interval=1")
135+
conn2 := dialWebSocket(t, server.URL, "/api/environments/0/ws/system/stats?interval=1")
136+
137+
_ = conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
138+
_, _, err := conn1.ReadMessage()
139+
require.NoError(t, err)
140+
141+
_ = conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
142+
_, _, err = conn2.ReadMessage()
143+
require.NoError(t, err)
144+
145+
time.Sleep(1200 * time.Millisecond)
146+
147+
afterWarmup := collects.Load()
148+
require.GreaterOrEqual(t, afterWarmup, int32(1))
149+
require.LessOrEqual(t, afterWarmup, int32(3))
150+
151+
require.NoError(t, conn1.Close())
152+
require.NoError(t, conn2.Close())
153+
154+
require.Eventually(t, func() bool {
155+
handler.systemStatsSampler.lifecycleMu.Lock()
156+
defer handler.systemStatsSampler.lifecycleMu.Unlock()
157+
return !handler.systemStatsSampler.running && handler.systemStatsSampler.clients == 0
158+
}, 2*time.Second, 20*time.Millisecond)
159+
160+
stoppedAt := collects.Load()
161+
time.Sleep(1200 * time.Millisecond)
162+
require.Equal(t, stoppedAt, collects.Load())
163+
}

backend/internal/models/settings.go

Lines changed: 86 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"reflect"
8-
"slices"
98
"strconv"
109
"strings"
10+
"sync"
1111
"time"
1212
)
1313

@@ -21,6 +21,21 @@ type SettingVariable struct {
2121
Value string
2222
}
2323

24+
type settingFieldMeta struct {
25+
index int
26+
key string
27+
attrs string
28+
isPublic bool
29+
isSensitive bool
30+
isLocal bool
31+
}
32+
33+
var settingsFieldCache struct {
34+
once sync.Once
35+
ordered []settingFieldMeta
36+
byKey map[string]settingFieldMeta
37+
}
38+
2439
// IsTrue returns true if the value is a truthy string
2540
func (s SettingVariable) IsTrue() bool {
2641
ok, _ := strconv.ParseBool(s.Value)
@@ -150,29 +165,63 @@ func (SettingVariable) TableName() string {
150165
return "settings"
151166
}
152167

153-
func (s *Settings) ToSettingVariableSlice(showAll bool, redactSensitiveValues bool) []SettingVariable {
154-
cfgValue := reflect.ValueOf(s).Elem()
155-
cfgType := cfgValue.Type()
156-
157-
var res []SettingVariable
158-
159-
for i := 0; i < cfgType.NumField(); i++ {
160-
field := cfgType.Field(i)
168+
func buildSettingsFieldCache() {
169+
rt := reflect.TypeFor[Settings]()
170+
ordered := make([]settingFieldMeta, 0, rt.NumField())
171+
byKey := make(map[string]settingFieldMeta, rt.NumField())
161172

173+
for i := 0; i < rt.NumField(); i++ {
174+
field := rt.Field(i)
162175
key, attrs, _ := strings.Cut(field.Tag.Get("key"), ",")
163176
if key == "" {
164177
continue
165178
}
166179

167-
if !showAll && !strings.Contains(attrs, "public") {
180+
meta := settingFieldMeta{
181+
index: i,
182+
key: key,
183+
attrs: attrs,
184+
isPublic: strings.Contains(attrs, "public"),
185+
isSensitive: strings.Contains(attrs, "sensitive"),
186+
isLocal: strings.Contains(attrs, "local"),
187+
}
188+
ordered = append(ordered, meta)
189+
byKey[key] = meta
190+
}
191+
192+
settingsFieldCache.ordered = ordered
193+
settingsFieldCache.byKey = byKey
194+
}
195+
196+
func getSettingsFieldCache() ([]settingFieldMeta, map[string]settingFieldMeta) {
197+
settingsFieldCache.once.Do(buildSettingsFieldCache)
198+
return settingsFieldCache.ordered, settingsFieldCache.byKey
199+
}
200+
201+
func (s *Settings) Clone() *Settings {
202+
if s == nil {
203+
return &Settings{}
204+
}
205+
206+
clone := *s
207+
return &clone
208+
}
209+
210+
func (s *Settings) ToSettingVariableSlice(showAll bool, redactSensitiveValues bool) []SettingVariable {
211+
cfgValue := reflect.ValueOf(s).Elem()
212+
fields, _ := getSettingsFieldCache()
213+
214+
res := make([]SettingVariable, 0, len(fields))
215+
for _, field := range fields {
216+
if !showAll && !field.isPublic {
168217
continue
169218
}
170219

171-
value := cfgValue.Field(i).FieldByName("Value").String()
172-
value = redactSettingValue(key, value, attrs, redactSensitiveValues)
220+
value := cfgValue.Field(field.index).FieldByName("Value").String()
221+
value = redactSettingValue(field.key, value, field.attrs, redactSensitiveValues)
173222

174223
settingVariable := SettingVariable{
175-
Key: key,
224+
Key: field.key,
176225
Value: value,
177226
}
178227
res = append(res, settingVariable)
@@ -183,65 +232,47 @@ func (s *Settings) ToSettingVariableSlice(showAll bool, redactSensitiveValues bo
183232

184233
func (s *Settings) FieldByKey(key string) (defaultValue string, isPublic bool, isSensitive bool, err error) {
185234
rv := reflect.ValueOf(s).Elem()
186-
rt := rv.Type()
187-
188-
for i := 0; i < rt.NumField(); i++ {
189-
tagValue := strings.Split(rt.Field(i).Tag.Get("key"), ",")
190-
keyFromTag := tagValue[0]
191-
isPublic = slices.Contains(tagValue, "public")
192-
isSensitive = slices.Contains(tagValue, "sensitive")
235+
_, byKey := getSettingsFieldCache()
193236

194-
if keyFromTag != key {
195-
continue
196-
}
197-
198-
valueField := rv.Field(i).FieldByName("Value")
199-
return valueField.String(), isPublic, isSensitive, nil
237+
field, ok := byKey[key]
238+
if !ok {
239+
return "", false, false, SettingKeyNotFoundError{field: key}
200240
}
201241

202-
return "", false, false, SettingKeyNotFoundError{field: key}
242+
valueField := rv.Field(field.index).FieldByName("Value")
243+
return valueField.String(), field.isPublic, field.isSensitive, nil
203244
}
204245

205246
func (s *Settings) IsLocalSetting(key string) bool {
206-
rt := reflect.TypeFor[Settings]()
207-
208-
for field := range rt.Fields() {
209-
tagValue := strings.Split(field.Tag.Get("key"), ",")
210-
keyFromTag := tagValue[0]
211-
212-
if keyFromTag == key {
213-
return slices.Contains(tagValue, "local")
214-
}
247+
_, byKey := getSettingsFieldCache()
248+
field, ok := byKey[key]
249+
if !ok {
250+
return false
215251
}
216252

217-
return false
253+
return field.isLocal
218254
}
219255

220256
func (s *Settings) UpdateField(key string, value string, noSensitive bool) error {
221257
rv := reflect.ValueOf(s).Elem()
222-
rt := rv.Type()
223-
224-
for i := 0; i < rt.NumField(); i++ {
225-
tagValue, attrs, _ := strings.Cut(rt.Field(i).Tag.Get("key"), ",")
226-
if tagValue != key {
227-
continue
228-
}
258+
_, byKey := getSettingsFieldCache()
229259

230-
// If the field is sensitive and noSensitive is true, we skip that
231-
if noSensitive && strings.Contains(attrs, "sensitive") {
232-
return SettingSensitiveForbiddenError{field: key}
233-
}
260+
field, ok := byKey[key]
261+
if !ok {
262+
return SettingKeyNotFoundError{field: key}
263+
}
234264

235-
valueField := rv.Field(i).FieldByName("Value")
236-
if !valueField.CanSet() {
237-
return fmt.Errorf("field Value in SettingVariable is not settable for config key '%s'", key)
238-
}
265+
if noSensitive && field.isSensitive {
266+
return SettingSensitiveForbiddenError{field: key}
267+
}
239268

240-
valueField.SetString(value)
241-
return nil
269+
valueField := rv.Field(field.index).FieldByName("Value")
270+
if !valueField.CanSet() {
271+
return fmt.Errorf("field Value in SettingVariable is not settable for config key '%s'", key)
242272
}
243273

244-
return SettingKeyNotFoundError{field: key}
274+
valueField.SetString(value)
275+
return nil
245276
}
246277

247278
// helper keeps redaction logic in one place; behavior unchanged

0 commit comments

Comments
 (0)