Commit fd26e4f

feat: add @grpc/grpc-js driver instrumentation
Patches all four call types on Client.prototype:
- makeUnaryRequest / makeClientStreamRequest: wrap callback, capture duration
- makeServerStreamRequest / makeBidiStreamRequest: listen for stream status/error events; a published flag prevents double-publishing when both events fire for the same call

RPC method path (e.g. /pkg.Service/Method) is used as the query key, making gRPC calls visible in slow-query logs, cache monitor, and OTLP exports. Silently skipped when @grpc/grpc-js is not installed (consistent with other drivers).

17 drivers total (up from 16). 17 new tests: 12 behavior + 5 coverage. README, CHANGELOG, and Project Structure updated.
1 parent f8786e0 commit fd26e4f
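
As a rough illustration of how the published gRPC timings could be consumed, here is a minimal subscriber sketch. The channel name `example:auto-patch` is a placeholder (the real value of `AUTO_PATCH_CHANNEL` is not shown in this commit), and the `QueryMessage` interface is a hypothetical stand-in that only mirrors the fields `grpc.ts` publishes (`query`, `durationMs`, `driver`, `error`).

```typescript
import { channel, subscribe, unsubscribe } from "node:diagnostics_channel";

// Placeholder channel name: the agent's real AUTO_PATCH_CHANNEL value is an unknown here.
const CHANNEL_NAME = "example:auto-patch";

// Hypothetical message shape, mirroring the fields the gRPC driver publishes.
interface QueryMessage {
  query: string;
  durationMs: number;
  driver: string;
  error?: unknown;
}

const received: QueryMessage[] = [];

// diagnostics_channel delivers messages synchronously to every subscriber.
const onMessage = (message: unknown): void => {
  received.push(message as QueryMessage);
};

subscribe(CHANNEL_NAME, onMessage);

// Simulate what the driver patch would publish when a unary call completes.
const simulated: QueryMessage = {
  query: "/package.Service/Method",
  durationMs: 12.5,
  driver: "@grpc/grpc-js",
};
channel(CHANNEL_NAME).publish(simulated);

unsubscribe(CHANNEL_NAME, onMessage);
```

Because the RPC method path is carried in `query`, a subscriber like this can treat gRPC calls the same way it treats database queries.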

6 files changed

Lines changed: 670 additions & 6 deletions

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ---
 
+## [Unreleased]
+
+### Added
+- **gRPC instrumentation** `@grpc/grpc-js` driver patch (`drivers/grpc.ts`).
+  Auto-patches all four call types on `Client.prototype`:
+  - **Unary** (`makeUnaryRequest`) and **client-streaming** (`makeClientStreamRequest`) — wraps the callback to capture wall-clock duration and forward errors to `diagnostics_channel`.
+  - **Server-streaming** (`makeServerStreamRequest`) and **bidi-streaming** (`makeBidiStreamRequest`) — listens for the stream's `status` (completion) and `error` events; a `published` flag prevents double-publishing when both events fire for the same call.
+  The RPC method path (e.g. `/package.Service/Method`) is used as the query key, making gRPC calls visible in slow-query logs, the cache monitor, and OTLP exports.
+  17 drivers total (up from 16). Wired into `applyDriverPatches()` under a new `// RPC` section; silently skipped when `@grpc/grpc-js` is not installed.
+
+---
+
 ## [0.3.2] — 2026-04-29
 
 ### Changed
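
The publish-once behaviour described for the streaming call types can be sketched in isolation. This is a simplified illustration, not the driver itself: `observeStream` and `StreamMessage` are hypothetical names, a plain `EventEmitter` stands in for a gRPC stream, and the simulated failure assumes an `error` event followed by a `status` event.

```typescript
import { EventEmitter } from "node:events";
import { performance } from "node:perf_hooks";

// Hypothetical, trimmed-down message shape for this sketch.
interface StreamMessage {
  query: string;
  durationMs: number;
  error?: unknown;
}

// Attaches one-shot listeners and reports the outcome at most once per stream.
function observeStream(
  stream: EventEmitter,
  query: string,
  report: (message: StreamMessage) => void,
): void {
  const start = performance.now();
  let published = false;
  const publish = (error?: unknown): void => {
    if (published) return; // a second event for the same call is ignored
    published = true;
    report({ query, durationMs: performance.now() - start, error });
  };
  stream.once("status", () => publish());
  stream.once("error", (err: unknown) => publish(err));
}

const reports: StreamMessage[] = [];
const failing = new EventEmitter(); // stand-in for a server-streaming call
observeStream(failing, "/package.Service/Watch", (m) => {
  reports.push(m);
});

// Simulated failure: both events fire, but only the first one is reported.
failing.emit("error", new Error("UNAVAILABLE"));
failing.emit("status", { code: 14 });
```

Without the flag, the same call would be counted twice, once as a failure and once as a success.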

README.md

Lines changed: 7 additions & 6 deletions
@@ -79,7 +79,7 @@ The agent has two distinct usage modes with different Node.js requirements:
 
 | Feature | Minimum Node | Behaviour on older versions |
 |---|---|---|
-| DB query tracing (all 16 drivers) | 14.18.0 | Full support — we control both publisher and subscriber |
+| DB query tracing (all 17 drivers) | 14.18.0 | Full support — we control both publisher and subscriber |
 | HTTP outbound tracing | 18.0.0 | Automatic via `diagnostics_channel`; on Node 14–17 the agent falls back to monkey-patching `http.request` / `https.request` automatically |
 | Module load timing (`slow-require`) | 20.0.0 | Silent no-op on Node < 20 (channels absent) |
 | Stream leak auto-detection | 22.0.0 | Falls back to manual `track()` calls on Node < 22 |
@@ -126,7 +126,7 @@ import { ArgusAgent } from 'argus-apm';
 git clone https://github.com/sharon77242/Argus.git
 pnpm install
 
-# Run all 373 tests (uses --experimental-strip-types, requires Node 22.6+)
+# Run all 618 tests (uses --experimental-strip-types, requires Node 22.6+)
 pnpm test
 
 # Build both ESM and CJS outputs
@@ -296,7 +296,7 @@ import fs from 'node:fs';
 const agent = await ArgusAgent.create()
   .withSourceMaps('./dist') // Source-map resolution for stack traces
   .withRuntimeMonitor({ eventLoopThresholdMs: 50 }) // Event loop lag + memory leak detection
-  .withInstrumentation({ autoPatching: true }) // 16 DB drivers via diagnostics_channel
+  .withInstrumentation({ autoPatching: true }) // 17 DB/RPC drivers via diagnostics_channel
   .withHttpTracing() // Slow request & insecure HTTP detection
   .withLogTracing({ scrubContext: true }) // Strip secrets from console overrides
   .withFsTracing() // ⚠ DEV ONLY — sync FS blocker detection
@@ -582,7 +582,7 @@ All thresholds can be overridden without code changes, making the agent CI/CD an
 | `.withCrashGuard()` | ✅ Yes | Very Low | Intercepts `uncaughtException`; emits event for `unhandledRejection` |
 | `.withResourceLeakMonitor(opts?)` | ✅ Yes | Low | Tracks OS handles; rate-limited by `alertCooldownMs` |
 | `.withGracefulShutdown(opts?)` | ✅ Yes | Very Low | Registers SIGTERM/SIGINT; awaits `agent.stop()` before `process.exit` |
-| `.withInstrumentation(opts?)` | ✅ Yes | Low | DB/IO tracing via `diagnostics_channel` (16 drivers) |
+| `.withInstrumentation(opts?)` | ✅ Yes | Low | DB/IO tracing via `diagnostics_channel` (17 drivers) |
 | `.withHttpTracing()` | ✅ Yes | Low | HTTP request inspection & slow-request detection |
 | `.withLogTracing(opts?)` | ✅ Yes | Low | `console.*` override with entropy-scrubbed payloads |
 | `.withFsTracing()` | **No** | High | Patches `fs`. Detects `*Sync` blockers; escalates to `sync-in-hot-path` (critical) when called inside a live request. **DEV ONLY.** |
@@ -640,7 +640,7 @@ Telemetry is exported over **mTLS** (Mutual TLS) — both client and server cert
 │ Profiling │ Instrumentation │ Analysis │
 │ ──────────────── │ ───────────────────── │ ───────────────── │
 │ RuntimeMonitor │ InstrumentationEngine │ QueryAnalyzer │
-│ CrashGuard │ 16 DB Drivers │ SlowQueryMonitor │
+│ CrashGuard │ 17 DB/RPC Drivers │ SlowQueryMonitor │
 │ ResourceLeakMon │ HttpInstrumentation │ TransactionMonitor │
 │ GcMonitor │ FsInstrumentation │ CacheMonitor │
 │ PoolMonitor │ LoggerInstrumentation │ CircuitBreaker │
@@ -711,6 +711,7 @@ packages/agent/
 bigquery.ts → @google-cloud/bigquery
 neo4j.ts → neo4j-driver
 clickhouse.ts → @clickhouse/client
+grpc.ts → @grpc/grpc-js (unary + streaming RPCs)
 
 licensing/
 public-key.ts → Bundled ECDSA public keys (keyed by kid)
@@ -746,7 +747,7 @@ packages/agent/
 exporter.ts → OTLP JSON formatter + mTLS transport
 otlp-compatible-exporter.ts → Simplified OTLP exporter (API key, no mTLS)
 
-tests/ → Mirrors src/ structure (508 tests, 86 suites)
+tests/ → Mirrors src/ structure (618 tests, 121 suites)
 ```
 
 ---

packages/agent/src/instrumentation/drivers/grpc.ts

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+import { performance } from "node:perf_hooks";
+import { nodeRequire } from "./_require.ts";
+import {
+  isAlreadyPatched,
+  PATCHED_SYMBOL,
+  activePatches,
+  AUTO_PATCH_CHANNEL,
+} from "./patch-utils.ts";
+import type { PatchedQueryMessage } from "./patch-utils.ts";
+import { safeChannel } from "../safe-channel.ts";
+
+const DRIVER = "@grpc/grpc-js";
+
+type AnyProto = Record<string, any>;
+
+// Wraps callback-bearing gRPC calls: makeUnaryRequest and makeClientStreamRequest.
+// args[0] is always the full RPC method path, e.g. /package.Service/Method.
+// The callback (err, response) is always the last function argument.
+function wrapCallbackMethod(proto: AnyProto, methodName: string): void {
+  if (isAlreadyPatched(proto, methodName)) return;
+  const original = proto[methodName] as (...args: unknown[]) => unknown;
+  const channel = safeChannel(AUTO_PATCH_CHANNEL);
+
+  const wrapped = function (this: unknown, ...args: unknown[]) {
+    const start = performance.now();
+    const query = typeof args[0] === "string" ? args[0] : String(args[0]);
+
+    const lastIdx = args.length - 1;
+    if (lastIdx >= 0 && typeof args[lastIdx] === "function") {
+      const originalCb = args[lastIdx] as (...a: unknown[]) => unknown;
+      args[lastIdx] = function (this: unknown, err: unknown, ...rest: unknown[]) {
+        channel.publish({
+          query,
+          durationMs: performance.now() - start,
+          driver: DRIVER,
+          error: err ?? undefined,
+        } satisfies PatchedQueryMessage);
+        return originalCb.call(this, err, ...rest);
+      };
+    }
+
+    return original.apply(this, args);
+  };
+
+  (wrapped as unknown as Record<symbol, unknown>)[PATCHED_SYMBOL] = true;
+  proto[methodName] = wrapped;
+  activePatches.push({ target: proto, methodName, original });
+}
+
+// Wraps stream-returning gRPC calls: makeServerStreamRequest and makeBidiStreamRequest.
+// Timing covers from call initiation to the stream's 'status' (success) or 'error' event.
+// The published flag prevents double-publishing when both events fire for a single call.
+function wrapStreamMethod(proto: AnyProto, methodName: string): void {
+  if (isAlreadyPatched(proto, methodName)) return;
+  const original = proto[methodName] as (...args: unknown[]) => unknown;
+  const channel = safeChannel(AUTO_PATCH_CHANNEL);
+
+  const wrapped = function (this: unknown, ...args: unknown[]) {
+    const start = performance.now();
+    const query = typeof args[0] === "string" ? args[0] : String(args[0]);
+
+    const stream = original.apply(this, args);
+
+    if (stream != null && typeof stream === "object") {
+      const s = stream as Record<string, unknown>;
+      if (typeof s.once === "function") {
+        let published = false;
+        const publish = (err?: unknown) => {
+          if (published) return;
+          published = true;
+          channel.publish({
+            query,
+            durationMs: performance.now() - start,
+            driver: DRIVER,
+            error: err,
+          } satisfies PatchedQueryMessage);
+        };
+        (s.once as (e: string, fn: (...a: unknown[]) => void) => void)("status", () => publish());
+        (s.once as (e: string, fn: (err: unknown) => void) => void)("error", (err) => publish(err));
+      }
+    }
+
+    return stream;
+  };
+
+  (wrapped as unknown as Record<symbol, unknown>)[PATCHED_SYMBOL] = true;
+  proto[methodName] = wrapped;
+  activePatches.push({ target: proto, methodName, original });
+}
+
+export function patchGrpc(): boolean {
+  try {
+    const grpc = nodeRequire("@grpc/grpc-js");
+    const clientProto: AnyProto | undefined = grpc.Client?.prototype;
+    if (!clientProto) return false;
+
+    let patched = false;
+
+    if (clientProto.makeUnaryRequest && !isAlreadyPatched(clientProto, "makeUnaryRequest")) {
+      wrapCallbackMethod(clientProto, "makeUnaryRequest");
+      patched = true;
+    }
+
+    if (
+      clientProto.makeClientStreamRequest &&
+      !isAlreadyPatched(clientProto, "makeClientStreamRequest")
+    ) {
+      wrapCallbackMethod(clientProto, "makeClientStreamRequest");
+      patched = true;
+    }
+
+    if (
+      clientProto.makeServerStreamRequest &&
+      !isAlreadyPatched(clientProto, "makeServerStreamRequest")
+    ) {
+      wrapStreamMethod(clientProto, "makeServerStreamRequest");
+      patched = true;
+    }
+
+    if (
+      clientProto.makeBidiStreamRequest &&
+      !isAlreadyPatched(clientProto, "makeBidiStreamRequest")
+    ) {
+      wrapStreamMethod(clientProto, "makeBidiStreamRequest");
+      patched = true;
+    }
+
+    return patched;
+  } catch {
+    /* not installed */
+  }
+  return false;
+}
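
Both wrappers push a `{ target, methodName, original }` record onto `activePatches`, and the tests call `cleanupPatches()`, whose implementation is not part of this commit. The sketch below shows one way such records could be used to undo a patch. `PatchRecord`, `patch`, and `restoreAll` are hypothetical names for illustration only and may differ from what `patch-utils.ts` actually does.

```typescript
// Hypothetical record shape, modelled on the object pushed to `activePatches`.
interface PatchRecord {
  target: Record<string, unknown>;
  methodName: string;
  original: unknown;
}

const records: PatchRecord[] = [];

// Replaces target[methodName] and remembers the original so it can be restored later.
function patch(target: Record<string, unknown>, methodName: string, replacement: unknown): void {
  records.push({ target, methodName, original: target[methodName] });
  target[methodName] = replacement;
}

// Hypothetical cleanup helper: undoes patches in reverse order of application.
function restoreAll(): void {
  let record = records.pop();
  while (record !== undefined) {
    record.target[record.methodName] = record.original;
    record = records.pop();
  }
}

// A plain object stands in for Client.prototype.
const proto: Record<string, unknown> = { makeUnaryRequest: () => "original" };

patch(proto, "makeUnaryRequest", () => "wrapped");
const whilePatched = (proto["makeUnaryRequest"] as () => string)();

restoreAll();
const afterRestore = (proto["makeUnaryRequest"] as () => string)();
```

Restoring in reverse order matters if the same method is ever wrapped more than once, since each record holds whatever was installed at the time it was created.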

packages/agent/src/instrumentation/drivers/index.ts

Lines changed: 5 additions & 0 deletions
@@ -13,6 +13,7 @@ import { patchFirestore } from "./firestore.ts";
 import { patchCassandra } from "./cassandra.ts";
 import { patchNeo4j } from "./neo4j.ts";
 import { patchClickhouse } from "./clickhouse.ts";
+import { patchGrpc } from "./grpc.ts";
 
 // Re-export shared utilities for external consumers
 export { AUTO_PATCH_CHANNEL, patchMethod } from "./patch-utils.ts";
@@ -30,6 +31,7 @@ export type { PatchedQueryMessage } from "./patch-utils.ts";
  * - Cloud: @google-cloud/bigquery
  * - Graph: neo4j-driver
  * - Analytics: @clickhouse/client
+ * - RPC: @grpc/grpc-js
  */
 export function applyDriverPatches(): string[] {
   const patched: string[] = [];
@@ -57,6 +59,9 @@ export function applyDriverPatches(): string[] {
   if (patchNeo4j()) patched.push("neo4j-driver");
   if (patchClickhouse()) patched.push("@clickhouse/client");
 
+  // RPC
+  if (patchGrpc()) patched.push("@grpc/grpc-js");
+
   return patched;
 }
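
The "silently skipped when not installed" behaviour can be illustrated with an injectable loader, similar in spirit to how the tests mock `nodeRequire`. This is a sketch under assumptions: `Loader`, `OptionalPatch`, and `applyOptional` are hypothetical names, and the `try`/`catch` is centralised here whereas in the commit each `patchX()` function handles its own failure.

```typescript
// Hypothetical loader type: throws when the module cannot be resolved.
type Loader = (name: string) => unknown;

interface OptionalPatch {
  name: string;
  apply: (load: Loader) => boolean;
}

// Returns the names of the patches that applied; a throwing loader counts as "not installed".
function applyOptional(patches: OptionalPatch[], load: Loader): string[] {
  const applied: string[] = [];
  for (const p of patches) {
    try {
      if (p.apply(load)) applied.push(p.name);
    } catch {
      // Module missing or unusable: skip without surfacing an error.
    }
  }
  return applied;
}

// Fake loader: only one made-up module name resolves.
const fakeLoad: Loader = (name) => {
  if (name === "installed-driver") return { Client: { prototype: {} } };
  throw new Error(`Cannot find module '${name}'`);
};

// Simplified stand-in for the prototype check each driver patch performs.
const hasClient = (mod: unknown): boolean =>
  typeof mod === "object" && mod !== null && "Client" in mod;

const result = applyOptional(
  [
    { name: "installed-driver", apply: (load) => hasClient(load("installed-driver")) },
    { name: "missing-driver", apply: (load) => hasClient(load("missing-driver")) },
  ],
  fakeLoad,
);
```

Returning the list of applied names, as `applyDriverPatches()` does, lets the caller log which integrations are active without treating a missing optional dependency as a failure.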

packages/agent/tests/instrumentation/drivers-all-coverage.test.ts

Lines changed: 70 additions & 0 deletions
@@ -674,4 +674,74 @@ describe("Driver Patch Modules (coverage)", () => {
       }
     });
   });
+
+  // ── grpc.ts ────────────────────────────────────────────────────
+  describe("patchGrpc", () => {
+    it("should patch @grpc/grpc-js Client prototype methods", async () => {
+      const { patchGrpc } = await import("../../src/instrumentation/drivers/grpc.ts");
+      const mockProto: Record<string, any> = {
+        makeUnaryRequest: () => {},
+        makeClientStreamRequest: () => ({}),
+        makeServerStreamRequest: () => ({}),
+        makeBidiStreamRequest: () => ({}),
+      };
+      const restore = mockNodeRequire(() => ({ Client: { prototype: mockProto } }));
+      try {
+        assert.strictEqual(patchGrpc(), true);
+      } finally {
+        restore();
+        cleanupPatches();
+      }
+    });
+
+    it("should return false when Client.prototype is missing", async () => {
+      const { patchGrpc } = await import("../../src/instrumentation/drivers/grpc.ts");
+      const restore = mockNodeRequire(() => ({ Client: {} }));
+      try {
+        assert.strictEqual(patchGrpc(), false);
+      } finally {
+        restore();
+      }
+    });
+
+    it("should return false when Client is absent", async () => {
+      const { patchGrpc } = await import("../../src/instrumentation/drivers/grpc.ts");
+      const restore = mockNodeRequire(() => ({}));
+      try {
+        assert.strictEqual(patchGrpc(), false);
+      } finally {
+        restore();
+      }
+    });
+
+    it("should return false when not installed", async () => {
+      const { patchGrpc } = await import("../../src/instrumentation/drivers/grpc.ts");
+      const restore = mockNodeRequire(() => {
+        throw new Error("not found");
+      });
+      try {
+        assert.strictEqual(patchGrpc(), false);
+      } finally {
+        restore();
+      }
+    });
+
+    it("should return false when all methods already patched", async () => {
+      const { patchGrpc } = await import("../../src/instrumentation/drivers/grpc.ts");
+      const mockProto: Record<string, any> = {
+        makeUnaryRequest: () => {},
+        makeClientStreamRequest: () => ({}),
+        makeServerStreamRequest: () => ({}),
+        makeBidiStreamRequest: () => ({}),
+      };
+      const restore = mockNodeRequire(() => ({ Client: { prototype: mockProto } }));
+      try {
+        assert.strictEqual(patchGrpc(), true);
+        assert.strictEqual(patchGrpc(), false, "second call should return false (already patched)");
+      } finally {
+        restore();
+        cleanupPatches();
+      }
+    });
+  });
 });
