Skip to content

Commit ccbeb50

Browse files
authored
test(070): Phase-8 finalization — cross-surface consistency regression + O3 amendment (Related MCP-746) (#559)
* test(070): cross-surface registry-add consistency regression (T021, CN-004/SC-004) Guards that all three add surfaces (Web UI, CLI, MCP) route through the single AddServerFromRegistry keystone op with quarantine-by-default preserved. The production code landed via PR #555; this is the Phase-7 regression that was cut after #555's branch point. Related MCP-746 * docs(070): O3 spec amendment — record stale US1/US2 gap premise The US1/US2 gaps recorded 2026-05-31 were partly stale: Web UI already had an 'Add to MCP' path and the CLI already listed/searched registries. Real work was de-duplicating every surface onto the single AddServerFromRegistry keystone op (PR #555 / 354580f). Related MCP-746 * fix(upstream): resolve connectStdio↔monitorStderr data race on c.stderr TestCrossSurfaceConsistency_RegistryAdd is the first unit test to drive the real supervisor connect loop under -race and it surfaced a genuine production data race: connectStdio reassigns the shared c.stderr field on every (re)connect (connection_stdio.go:217) while the monitorStderr goroutine read that same field to build its scanner (monitoring.go:170). A reconnect's write raced a lingering monitor's read. Capture the stderr reader as a local under monitoringMu in StartStderrMonitoring and pass it to monitorStderr as an argument, completing the existing "abandoned goroutine touches only its locals" design — the monitor no longer reads the shared field, so the write can never race it. Add TestStderrMonitoring_ReassignDuringMonitorNoRace, which reproduces the exact write@217/read@170 pair under -race (red before this change, green after). Related #559 * fix(configsvc): guard Subscribe init-send against concurrent Close (send-on-closed) The registry-add unit tests added in this PR drive a full Runtime + configsvc under -race and surfaced a second production data race (distinct from the connectStdio↔monitorStderr fix): Subscribe spawns a goroutine that sends the initial snapshot on the subscriber channel (service.go:206) holding no lock, while Close (service.go:261) and Unsubscribe close that same channel under subMu. The send racing the close both data-races and panics with "send on closed channel". Deliver the initial snapshot under subMu(R) after confirming the channel is still a live subscriber (isSubscribedLocked), so the close paths — which run under the subMu write lock — are mutually exclusive with the send. The send is non-blocking (the cap-10 buffer is empty at subscribe time) so the lock is never held across a blocking send. Preserve the ctx-canceled early-out. Add TestService_SubscribeCloseRace, which reproduces the panic/race under -race (red before this change, green after). Related #559
1 parent c17f262 commit ccbeb50

6 files changed

Lines changed: 292 additions & 9 deletions

File tree

internal/runtime/configsvc/service.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,23 +200,53 @@ func (s *Service) Subscribe(ctx context.Context) <-chan Update {
200200
s.logger.Debug("New configuration subscriber",
201201
zap.Int("total_subscribers", len(s.subscribers)))
202202

203-
// Send initial snapshot to new subscriber
203+
// Send initial snapshot to new subscriber.
204204
go func() {
205+
// Best-effort early-out if the subscriber canceled before we ran.
206+
select {
207+
case <-ctx.Done():
208+
s.Unsubscribe(ch)
209+
return
210+
default:
211+
}
212+
213+
// Deliver the initial snapshot under subMu(R) so Close()/Unsubscribe()
214+
// (which close ch under the subMu write lock) cannot race or panic this
215+
// send — a send on a closed channel both data-races and panics. The
216+
// membership check covers both teardown paths: Close() nils the slice and
217+
// Unsubscribe() removes this ch. The buffer (cap 10) is empty at subscribe
218+
// time, so a non-blocking send always reaches a still-live subscriber;
219+
// staying non-blocking avoids holding the lock across a blocking send.
220+
s.subMu.RLock()
221+
defer s.subMu.RUnlock()
222+
if !s.isSubscribedLocked(ch) {
223+
return
224+
}
205225
select {
206226
case ch <- Update{
207227
Snapshot: s.Current(),
208228
Type: UpdateTypeInit,
209229
ChangedAt: time.Now(),
210230
Source: "subscription",
211231
}:
212-
case <-ctx.Done():
213-
s.Unsubscribe(ch)
232+
default:
214233
}
215234
}()
216235

217236
return ch
218237
}
219238

239+
// isSubscribedLocked reports whether ch is still a live subscriber. Callers must
240+
// hold subMu (read or write).
241+
func (s *Service) isSubscribedLocked(ch chan Update) bool {
242+
for _, sub := range s.subscribers {
243+
if sub == ch {
244+
return true
245+
}
246+
}
247+
return false
248+
}
249+
220250
// Unsubscribe removes a subscriber channel and closes it.
221251
func (s *Service) Unsubscribe(ch <-chan Update) {
222252
s.subMu.Lock()

internal/runtime/configsvc/service_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package configsvc
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -328,6 +329,34 @@ func TestSnapshot_ServerNames(t *testing.T) {
328329
}
329330
}
330331

332+
// TestService_SubscribeCloseRace reproduces the close-vs-send data race surfaced
333+
// by TestHandleUpstreamServers_AddFromRegistry_* under -race (MCP-816 / MCP-809
334+
// RV-3). Subscribe spawns a goroutine that sends the initial snapshot on the
335+
// subscriber channel (service.go:206) while holding no lock; Close (service.go:261)
336+
// closes that same channel under subMu. A send racing the close both data-races
337+
// and can panic ("send on closed channel"). Run under `go test -race`: trips
338+
// before the lock+membership-guarded send, green after. Run with high parallelism
339+
// so a Subscribe-init send overlaps the Close in most iterations.
340+
func TestService_SubscribeCloseRace(t *testing.T) {
341+
const iterations = 200
342+
343+
var wg sync.WaitGroup
344+
for i := 0; i < iterations; i++ {
345+
wg.Add(1)
346+
go func() {
347+
defer wg.Done()
348+
svc := NewService(&config.Config{Listen: "127.0.0.1:8080"}, "/tmp/config.json", zap.NewNop())
349+
ctx, cancel := context.WithCancel(context.Background())
350+
defer cancel()
351+
352+
// Subscribe schedules the init-snapshot send goroutine; Close races it.
353+
_ = svc.Subscribe(ctx)
354+
svc.Close()
355+
}()
356+
}
357+
wg.Wait()
358+
}
359+
331360
func TestService_Close(t *testing.T) {
332361
cfg := &config.Config{
333362
Listen: "127.0.0.1:8080",
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package server
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"net/http"
8+
"net/http/httptest"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
"go.uber.org/zap"
15+
16+
"github.com/smart-mcp-proxy/mcpproxy-go/internal/config"
17+
"github.com/smart-mcp-proxy/mcpproxy-go/internal/contracts"
18+
"github.com/smart-mcp-proxy/mcpproxy-go/internal/httpapi"
19+
)
20+
21+
// Spec 070 keystone regression (T021 / CN-004 / FR-010 / SC-004).
22+
//
23+
// Every add surface (REST, MCP, CLI) funnels through the single keystone
24+
// AddServerFromRegistryRef, so the registry-result -> config.ServerConfig
25+
// normalization lives in exactly one place. This test is the guard that keeps
26+
// it that way: it drives the SAME logical add — same (registry, serverId, name,
27+
// env) — through each surface against its own isolated server, then asserts the
28+
// PERSISTED config.ServerConfig is byte-identical across all three (modulo the
29+
// Created/Updated timestamps) and that every one is quarantined (SC-004).
30+
//
31+
// If a future change lets one surface bypass the keystone (e.g. the Web UI's
32+
// old client-side install_cmd parsing, or a surface that forgets the quarantine
33+
// default), the persisted configs diverge and this test fails.
34+
//
35+
// Surfaces exercised in-process:
36+
// - MCP: the real upstream_servers handler (operation=add_from_registry),
37+
// which extracts args from the MCP request (note: env arrives as env_json).
38+
// - REST: a real HTTP POST to the actual chi router handler
39+
// (POST /api/v1/registries/{id}/servers/{serverId}/add), exercising JSON
40+
// body decode + URL param extraction + auth.
41+
// - CLI add path: the CLI is a thin HTTP client of the REST route, so its
42+
// config-derivation contribution bottoms out at the same controller method
43+
// (AddServerFromRegistryRef). The full CLI binary->daemon path is separately
44+
// covered end-to-end by TestRegistryAddCLIE2E.
45+
func TestCrossSurfaceConsistency_RegistryAdd(t *testing.T) {
46+
// One stdio entry whose install command declares a required input
47+
// (${API_KEY}); supplying it via env exercises required-input satisfaction
48+
// AND env carry-through on the persisted config.
49+
servers := []map[string]interface{}{
50+
{"id": "everything", "name": "everything", "installCmd": "npx -y srv --key ${API_KEY}"},
51+
}
52+
startTestRegistry(t, servers) // registers id="testreg" against a local httptest server
53+
54+
const (
55+
regID = "testreg"
56+
serverID = "everything"
57+
addName = "consistency-srv"
58+
)
59+
env := map[string]string{"API_KEY": "secret-123"}
60+
61+
// --- Surface 1: MCP -------------------------------------------------------
62+
srvMCP := newConsistencyServer(t)
63+
proxy := createTestMCPProxyServer(t)
64+
proxy.mainServer = srvMCP
65+
66+
envJSON, err := json.Marshal(env)
67+
require.NoError(t, err)
68+
mcpResult := callAddFromRegistry(t, proxy, map[string]interface{}{
69+
"operation": "add_from_registry",
70+
"registry": regID,
71+
"id": serverID,
72+
"name": addName,
73+
"env_json": string(envJSON),
74+
})
75+
require.False(t, mcpResult.IsError, "MCP add must succeed: %v", mcpResult.Content)
76+
mcpCfg := persistedServer(t, srvMCP, addName)
77+
78+
// --- Surface 2: REST (real HTTP through the chi router) -------------------
79+
srvREST := newConsistencyServer(t)
80+
api := httpapi.NewServer(srvREST, zap.NewNop().Sugar(), nil)
81+
82+
body, err := json.Marshal(contracts.AddFromRegistryRequest{Name: addName, Env: env})
83+
require.NoError(t, err)
84+
req := httptest.NewRequest(http.MethodPost,
85+
"/api/v1/registries/"+regID+"/servers/"+serverID+"/add", bytes.NewReader(body))
86+
req.Header.Set("Content-Type", "application/json")
87+
req.Header.Set("X-API-Key", consistencyAPIKey)
88+
rec := httptest.NewRecorder()
89+
api.Router().ServeHTTP(rec, req)
90+
require.Equal(t, http.StatusOK, rec.Code, "REST add must succeed: %s", rec.Body.String())
91+
restCfg := persistedServer(t, srvREST, addName)
92+
93+
// --- Surface 3: CLI add path (shared controller bottom) -------------------
94+
srvCLI := newConsistencyServer(t)
95+
cliCfg, rerr, err := srvCLI.AddServerFromRegistryRef(context.Background(), regID, serverID, addName, env, nil)
96+
require.NoError(t, err)
97+
require.Nil(t, rerr)
98+
require.NotNil(t, cliCfg)
99+
cliPersisted := persistedServer(t, srvCLI, addName)
100+
101+
// --- Cross-surface byte-identity (CN-004) ---------------------------------
102+
mcpJSON := canonicalServerJSON(t, mcpCfg)
103+
restJSON := canonicalServerJSON(t, restCfg)
104+
cliJSON := canonicalServerJSON(t, cliPersisted)
105+
106+
assert.Equal(t, mcpJSON, restJSON, "REST add must persist a byte-identical config to MCP add")
107+
assert.Equal(t, mcpJSON, cliJSON, "CLI add path must persist a byte-identical config to MCP add")
108+
109+
// --- Quarantine invariant (SC-004 / CN-002) -------------------------------
110+
assert.True(t, mcpCfg.Quarantined, "MCP-added server must be quarantined")
111+
assert.True(t, restCfg.Quarantined, "REST-added server must be quarantined")
112+
assert.True(t, cliPersisted.Quarantined, "CLI-added server must be quarantined")
113+
114+
// --- Sanity on the shared derivation -------------------------------------
115+
assert.Equal(t, "stdio", mcpCfg.Protocol)
116+
assert.Equal(t, "npx", mcpCfg.Command)
117+
assert.Equal(t, []string{"-y", "srv", "--key", "${API_KEY}"}, mcpCfg.Args)
118+
assert.Equal(t, "secret-123", mcpCfg.Env["API_KEY"])
119+
assert.True(t, mcpCfg.Enabled)
120+
}
121+
122+
const consistencyAPIKey = "t021-consistency-key"
123+
124+
// newConsistencyServer builds an isolated *Server (own data dir + storage) with
125+
// a known API key so the REST surface can authenticate. The storage handle is
126+
// closed on cleanup so the temp-dir removal succeeds on Windows.
127+
func newConsistencyServer(t *testing.T) *Server {
128+
t.Helper()
129+
cfg := config.DefaultConfig()
130+
cfg.DataDir = t.TempDir()
131+
cfg.Listen = "127.0.0.1:0"
132+
cfg.APIKey = consistencyAPIKey
133+
srv, err := NewServer(cfg, zap.NewNop())
134+
require.NoError(t, err)
135+
t.Cleanup(func() { _ = srv.Shutdown() })
136+
return srv
137+
}
138+
139+
// persistedServer reads the actually-persisted ServerConfig back from the
140+
// server's live config snapshot (not the function return value), so the
141+
// comparison reflects what reached storage.
142+
func persistedServer(t *testing.T, srv *Server, name string) *config.ServerConfig {
143+
t.Helper()
144+
cfg := srv.runtime.Config()
145+
require.NotNil(t, cfg, "runtime config must be available")
146+
for _, sc := range cfg.Servers {
147+
if sc != nil && sc.Name == name {
148+
return sc
149+
}
150+
}
151+
t.Fatalf("server %q not found in persisted config", name)
152+
return nil
153+
}
154+
155+
// canonicalServerJSON serializes a ServerConfig with the per-add timestamps
156+
// zeroed, so byte-comparison reflects only the derived/persisted fields.
157+
func canonicalServerJSON(t *testing.T, sc *config.ServerConfig) string {
158+
t.Helper()
159+
clone := *sc
160+
clone.Created = time.Time{}
161+
clone.Updated = time.Time{}
162+
b, err := json.Marshal(&clone)
163+
require.NoError(t, err)
164+
return string(b)
165+
}

internal/upstream/core/monitoring.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"context"
66
"fmt"
7+
"io"
78
"os"
89
"strings"
910
"time"
@@ -31,17 +32,25 @@ func (c *Client) StartStderrMonitoring() {
3132
return
3233
}
3334

35+
// Capture the stderr reader as a local under monitoringMu. connectStdio
36+
// reassigns c.stderr on every (re)connect (connection_stdio.go:217); passing
37+
// the reader as a goroutine arg keeps monitorStderr from reading the shared
38+
// field, so a later reconnect's write never races a lingering monitor's read
39+
// (the connectStdio↔monitorStderr data race, MCP-816).
40+
stderr := c.stderr
41+
3442
// Create context for stderr monitoring. The monitor goroutine receives the
35-
// context and its done channel as locals so an abandoned (timed-out)
36-
// goroutine never reads the shared fields a later Start may overwrite.
43+
// context, stderr reader, and its done channel as locals so an abandoned
44+
// (timed-out) goroutine never reads the shared fields a later Start may
45+
// overwrite.
3746
ctx, cancel := context.WithCancel(context.Background())
3847
done := make(chan struct{})
3948
c.stderrMonitoringCtx, c.stderrMonitoringCancel = ctx, cancel
4049
c.stderrMonitoringDone = done
4150

4251
go func() {
4352
defer close(done)
44-
c.monitorStderr(ctx)
53+
c.monitorStderr(ctx, stderr)
4554
}()
4655

4756
c.logger.Debug("Started stderr monitoring",
@@ -165,9 +174,12 @@ func (c *Client) monitorProcess(ctx context.Context) {
165174
}
166175
}
167176

168-
// monitorStderr monitors stderr output and logs it to both main and server-specific logs
169-
func (c *Client) monitorStderr(ctx context.Context) {
170-
scanner := bufio.NewScanner(c.stderr)
177+
// monitorStderr monitors stderr output and logs it to both main and server-specific logs.
178+
// The stderr reader is passed as an argument (captured under monitoringMu by the
179+
// caller) rather than read from c.stderr, so a concurrent connectStdio reassigning
180+
// the shared field cannot race this goroutine's read (MCP-816).
181+
func (c *Client) monitorStderr(ctx context.Context, stderr io.Reader) {
182+
scanner := bufio.NewScanner(stderr)
171183
for scanner.Scan() {
172184
select {
173185
case <-ctx.Done():

internal/upstream/core/monitoring_race_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,49 @@ func TestStderrMonitoring_StartStopRace(t *testing.T) {
4848
c.StopStderrMonitoring()
4949
}
5050

51+
// TestStderrMonitoring_ReassignDuringMonitorNoRace reproduces the
52+
// connectStdio(connection_stdio.go:217)-write vs monitorStderr(monitoring.go:170)-read
53+
// data race on the shared c.stderr field, surfaced by TestCrossSurfaceConsistency_RegistryAdd
54+
// in CI (MCP-816 / MCP-809 RV-3). connectStdio reassigns c.stderr on every
55+
// (re)connect, then starts the monitor; the monitor goroutine built its scanner
56+
// from the shared c.stderr field instead of a captured local, so a reconnect's
57+
// field write raced the lingering goroutine's read. Run under `go test -race`:
58+
// trips before the capture-local fix, green after. Empty readers EOF immediately
59+
// so each monitor goroutine exits at once and the loop stays fast.
60+
func TestStderrMonitoring_ReassignDuringMonitorNoRace(t *testing.T) {
61+
c := &Client{
62+
transportType: transportStdio,
63+
stderr: strings.NewReader(""),
64+
logger: zap.NewNop(),
65+
config: &config.ServerConfig{Name: "race"},
66+
}
67+
68+
const iterations = 500
69+
var wg sync.WaitGroup
70+
wg.Add(2)
71+
72+
// Mimics connectStdio: reassign c.stderr on each reconnect, then (re)start
73+
// the monitor. The reassignment is the field write the detector flagged at
74+
// connection_stdio.go:217; the monitor goroutine spawned by the previous
75+
// iteration reads c.stderr at monitoring.go:170 concurrently.
76+
go func() {
77+
defer wg.Done()
78+
for i := 0; i < iterations; i++ {
79+
c.stderr = strings.NewReader("")
80+
c.StartStderrMonitoring()
81+
}
82+
}()
83+
go func() {
84+
defer wg.Done()
85+
for i := 0; i < iterations; i++ {
86+
c.StopStderrMonitoring()
87+
}
88+
}()
89+
90+
wg.Wait()
91+
c.StopStderrMonitoring()
92+
}
93+
5194
// TestStderrMonitoring_AbandonedMonitorNoRace models the round-5 escape: the
5295
// monitor goroutine is still alive when Stop is called (its stderr Read blocks),
5396
// so Stop hits the 500ms timeout and abandons it. With the old reused-WaitGroup

specs/070-registry-easy-upstream-add/spec.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
- Q: Add-from-registry today via MCP? → A: Works, but the agent must hand-construct the upstream config from a search result; a convenience "add from registry result" mode is desired.
1616
- Q: Does quarantine-by-default hold on add? → A: Yes, on every surface (they share the core add path); this MUST be preserved as an invariant.
1717

18+
### Session 2026-06-01 (O3 — post-implementation amendment)
19+
20+
- Amendment (O3): The US1/US2 "gaps" recorded on 2026-05-31 were partly **stale**. During implementation it was found that the Web UI already exposed an "Add to MCP" path and the CLI already *listed and searched* registries; the genuine gap was narrower — each surface re-implemented add logic (including a client-side parse on the Web UI, a CN-001 risk) instead of sharing one path, and the CLI could not add directly *from a search result*. The real work delivered was therefore **de-duplicating every surface (Web UI, CLI, MCP) onto the single `AddServerFromRegistry` keystone core op** (`internal/registries/`), so quarantine-by-default and validation live in exactly one place. This landed in PR #555 / commit `354580f4` and is guarded by a cross-surface consistency regression (`internal/server/consistency_crosssurface_test.go`, T021 / CN-004 / SC-004). The other decision artifacts are already reflected in shipped code — O1 (required-input heuristic), O2 (key-absent skip), O4 (P2 in scope); O3 (this note) was the only outstanding artifact.
21+
1822
## User Scenarios & Testing *(mandatory)*
1923

2024
### User Story 1 — Add a discovered server to upstream from the Web UI in one flow (Priority: P1)

0 commit comments

Comments
 (0)