Skip to content

Commit 8fde431

Browse files
authored
feat(telemetry): W3C trace context extraction and SSE/transport observability (#416)
* feat(telemetry): extract W3C trace context from upstream and add SSE/transport observability - Call propagation.extract() in middleware so server span becomes a child of the upstream gateway (Envoy) span, connecting distributed traces - Rename server span to 'http.server.request' (OTel/Datadog convention) - Add http.transport (sse|jsonrpc) attribute: GET /mcp = SSE long-lived stream, everything else = one-shot JSON-RPC - Add http.response.close_reason (completed|client_disconnect): tracks which event fired first — finish (server-side complete) vs close (client disconnect), making SSE max-stream timeouts distinguishable from genuine app slowness that triggers envoy upstream_rq_timeout alerts - Add mcp.sse.connection.duration histogram (seconds) for SSE connection lifetime, separate from http.server.request.duration to avoid SSE long-tail skewing the one-shot request distribution - Expand propagator to CompositePropagator([W3CTraceContext, W3CBaggage]) - Add deployment.environment resource attribute (OTEL_DEPLOYMENT_ENVIRONMENT) * refactor(telemetry): collapse closeReason intermediate to a single mutable - Default closeReason to 'completed' instead of undefined; the close listener flips it to 'client_disconnect' only when relevant - Drop the finalCloseReason intermediate const and the ?? 'completed' fallback — they were tracking the same logical state - finish listener becomes a bare flush reference; close listener stays a small arrow that mutates before flushing flushOnce() in the flush function continues to guard against double emission, so a close-after-finish overwrite never reaches a downstream consumer. * refactor(telemetry): extract createDefaultPropagator helper Centralize the W3C TraceContext + Baggage composite propagator chain so test setup and initTelemetry register the exact same propagator — preventing drift where one side adds a propagator the other lacks.
1 parent 5f28108 commit 8fde431

7 files changed

Lines changed: 374 additions & 25 deletions

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
"freee-mcp": patch
3+
---
4+
5+
Remote モードで W3C `traceparent` を抽出して上流ゲートウェイの trace に server span を接続するようにし、SSE と JSON-RPC を区別できる観測ラベルを追加。
6+
7+
- middleware で `propagation.extract` を呼び、サーバー span を上流(Envoy/Istio)の child として紐付け
8+
- span 名を `http.server.request` に変更(Datadog operation 命名に整合)
9+
- `http.transport` (`sse` | `jsonrpc`)、`http.response.close_reason` (`completed` | `client_disconnect`) を span attribute と canonical log に追加
10+
- 新ヒストグラム `mcp.sse.connection.duration` を追加 — SSE 接続寿命を専用バケットで観測
11+
- propagator を `CompositePropagator([W3CTraceContext, W3CBaggage])` に拡張、resource に `deployment.environment` を付与

src/server/request-context.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,31 @@ export interface RequestRecorderContext {
8686
path: string;
8787
}
8888

89+
/**
90+
* Inbound transport classification for a single MCP request.
91+
*
92+
* - `sse`: a long-lived SSE GET (Streamable-HTTP transport). Duration tracks
93+
* the connection lifetime, not the time to produce a JSON-RPC response.
94+
* - `jsonrpc`: a short POST that yields a single JSON-RPC response.
95+
*
96+
* This distinction matters operationally because a 9-minute SSE connection
97+
* looks identical to a 9-minute slow JSON-RPC call when filtering on
98+
* duration alone.
99+
*/
100+
export type CanonicalRequestTransport = 'sse' | 'jsonrpc';
101+
102+
/**
103+
* How the response stream actually ended.
104+
*
105+
* - `completed`: server emitted the full response and `res.on('finish')` fired.
106+
* - `client_disconnect`: client closed the socket before completion;
107+
* `res.on('close')` fired without prior `finish`.
108+
*
109+
* Distinguishes legitimate SSE max-stream timeouts (`completed` at the route's
110+
* stream limit) from client-side aborts.
111+
*/
112+
export type CanonicalCloseReason = 'completed' | 'client_disconnect';
113+
89114
/**
90115
* Canonical log line: the complete payload emitted as one JSON log entry
91116
* per HTTP request at `res.on('finish')`. Consumers (pino, Datadog) see
@@ -114,6 +139,8 @@ export interface CanonicalLogPayload {
114139
path: string;
115140
status: number;
116141
duration_ms: number;
142+
transport: CanonicalRequestTransport;
143+
close_reason: CanonicalCloseReason;
117144
};
118145
mcp: {
119146
tool_calls: ToolCallInfo[];
@@ -185,7 +212,12 @@ export class RequestRecorder {
185212
}
186213

187214
/** Builds the canonical log payload; caller passes it to pino. */
188-
buildPayload(http: { status: number; duration_ms: number }): CanonicalLogPayload {
215+
buildPayload(http: {
216+
status: number;
217+
duration_ms: number;
218+
transport: CanonicalRequestTransport;
219+
close_reason: CanonicalCloseReason;
220+
}): CanonicalLogPayload {
189221
return {
190222
request_id: this.context.request_id,
191223
source_ip: this.context.source_ip,
@@ -197,6 +229,8 @@ export class RequestRecorder {
197229
path: this.context.path,
198230
status: http.status,
199231
duration_ms: http.duration_ms,
232+
transport: http.transport,
233+
close_reason: http.close_reason,
200234
},
201235
mcp: {
202236
tool_calls: this.toolCalls,

src/telemetry/init.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { SpanKind, SpanStatusCode, context, metrics, propagation, trace } from '@opentelemetry/api';
22
import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks';
3-
import { W3CTraceContextPropagator } from '@opentelemetry/core';
3+
import {
4+
CompositePropagator,
5+
W3CBaggagePropagator,
6+
W3CTraceContextPropagator,
7+
} from '@opentelemetry/core';
48
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
59
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
610
import { resourceFromAttributes } from '@opentelemetry/resources';
@@ -21,6 +25,17 @@ export function isOtelEnabled(): boolean {
2125
return _enabled;
2226
}
2327

28+
/**
29+
* Build the propagator stack the server uses for both incoming extract and
30+
* outgoing inject. Exported so test setup can register the exact same
31+
* propagator chain — keeping prod and test invariants in sync.
32+
*/
33+
export function createDefaultPropagator(): CompositePropagator {
34+
return new CompositePropagator({
35+
propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()],
36+
});
37+
}
38+
2439
const SENSITIVE_PARAMS = new Set([
2540
'code',
2641
'code_verifier',
@@ -114,9 +129,13 @@ export function initTelemetry(serviceVersion: string): OtelHandle | null {
114129
const serviceName = process.env.OTEL_SERVICE_NAME || 'freee-mcp';
115130
const endpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4318';
116131

132+
const deploymentEnv =
133+
process.env.OTEL_DEPLOYMENT_ENVIRONMENT ?? process.env.NODE_ENV ?? 'unknown';
134+
117135
const resource = resourceFromAttributes({
118136
'service.name': serviceName,
119137
'service.version': serviceVersion,
138+
'deployment.environment': deploymentEnv,
120139
});
121140

122141
const exporter = new OTLPTraceExporter({
@@ -139,10 +158,13 @@ export function initTelemetry(serviceVersion: string): OtelHandle | null {
139158
spanProcessors: [new BatchSpanProcessor(exporter)],
140159
});
141160

142-
// Register global context manager, propagator, and tracer provider
161+
// Register global context manager, propagator, and tracer provider.
162+
// CompositePropagator wraps W3C Trace Context (parent linkage) + W3C Baggage
163+
// (cross-cutting attributes) so future propagators can be added without
164+
// changing call sites.
143165
const contextManager = new AsyncLocalStorageContextManager();
144166
context.setGlobalContextManager(contextManager);
145-
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
167+
propagation.setGlobalPropagator(createDefaultPropagator());
146168
trace.setGlobalTracerProvider(provider);
147169

148170
// Initialize MeterProvider for metrics export

src/telemetry/metrics.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { afterEach, describe, expect, it } from 'vitest';
99
import {
1010
getHttpRequestDuration,
1111
getHttpRequestErrorCount,
12+
getMcpSseConnectionDuration,
1213
getToolErrorCount,
1314
getToolInvocationDuration,
1415
} from './metrics.js';
@@ -57,6 +58,29 @@ describe('HTTP metrics', () => {
5758

5859
await provider.shutdown();
5960
});
61+
62+
it('records mcp.sse.connection.duration with seconds unit', async () => {
63+
// The new SSE-only histogram is the signal that distinguishes a normal
64+
// long-lived stream (ends at the platform stream timeout) from an
65+
// anomalous one (ends much earlier or later). Locking the unit to "s"
66+
// keeps the Datadog/Grafana axis legible.
67+
const { reader, provider } = setupTestMetrics();
68+
69+
getMcpSseConnectionDuration().record(599.9, {
70+
path: '/mcp',
71+
status: '200',
72+
close_reason: 'completed',
73+
});
74+
75+
const result = await reader.collect();
76+
const metric = result.resourceMetrics.scopeMetrics[0]?.metrics.find(
77+
(m) => m.descriptor.name === 'mcp.sse.connection.duration',
78+
);
79+
expect(metric).toBeDefined();
80+
expect(metric?.descriptor.unit).toBe('s');
81+
82+
await provider.shutdown();
83+
});
6084
});
6185

6286
describe('MCP tool metrics', () => {

src/telemetry/metrics.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ const METER_NAME = 'freee-mcp';
66
// HTTP server metrics
77
let _httpRequestDuration: Histogram | null = null;
88
let _httpRequestErrorCount: Counter | null = null;
9+
let _mcpSseConnectionDuration: Histogram | null = null;
910

1011
// MCP tool metrics
1112
let _toolInvocationDuration: Histogram | null = null;
1213
let _toolErrorCount: Counter | null = null;
1314

1415
/**
1516
* HTTP request duration histogram (seconds).
16-
* Labels: method, path, status
17+
* Labels: method, path, status, transport, close_reason
1718
*/
1819
export function getHttpRequestDuration(): Histogram {
1920
if (!_httpRequestDuration) {
@@ -25,6 +26,28 @@ export function getHttpRequestDuration(): Histogram {
2526
return _httpRequestDuration;
2627
}
2728

29+
/**
30+
* SSE (Streamable-HTTP) connection lifetime histogram (seconds).
31+
*
32+
* Recorded only for `transport=sse` requests at connection close. The expected
33+
* distribution clusters near the route's `streamIdleTimeout` (e.g. ~600s on
34+
* Istio's default HTTPRoute), so the p99 reveals whether SSE clients are
35+
* disconnecting early or running until the platform max — useful when
36+
* triaging `envoy.cluster.upstream_rq_timeout` alerts which alone cannot
37+
* distinguish "long but normal SSE" from "slow JSON-RPC".
38+
*
39+
* Labels: path, status, close_reason
40+
*/
41+
export function getMcpSseConnectionDuration(): Histogram {
42+
if (!_mcpSseConnectionDuration) {
43+
_mcpSseConnectionDuration = metrics.getMeter(METER_NAME).createHistogram('mcp.sse.connection.duration', {
44+
description: 'SSE (Streamable-HTTP) connection lifetime',
45+
unit: 's',
46+
});
47+
}
48+
return _mcpSseConnectionDuration;
49+
}
50+
2851
/**
2952
* HTTP server error counter (5xx responses).
3053
* Labels: method, path, status

0 commit comments

Comments
 (0)