Skip to content

Commit dba0f6a

Browse files
committed
lib: add diagnostic channels to http2
1 parent 30f54e6 commit dba0f6a

File tree

2 files changed

+210
-1
lines changed

2 files changed

+210
-1
lines changed

lib/internal/http2/core.js

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,18 @@ const { UV_EOF } = internalBinding('uv');
187187

188188
const { StreamPipe } = internalBinding('stream_pipe');
189189
const { _connectionListener: httpConnectionListener } = http;
190+
191+
const dc = require('diagnostics_channel');
192+
const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created');
193+
const onClientStreamStartChannel = dc.channel('http2.client.stream.start');
194+
const onClientStreamErrorChannel = dc.channel('http2.client.stream.error');
195+
const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish');
196+
const onClientStreamCloseChannel = dc.channel('http2.client.stream.close');
197+
const onServerStreamStartChannel = dc.channel('http2.server.stream.start');
198+
const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
199+
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
200+
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');
201+
190202
let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
191203
debug = fn;
192204
});
@@ -375,9 +387,23 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
375387
stream.end();
376388
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
377389
}
390+
391+
if (onServerStreamStartChannel.hasSubscribers) {
392+
onServerStreamStartChannel.publish({
393+
stream,
394+
headers: obj,
395+
});
396+
}
378397
} else {
379398
// eslint-disable-next-line no-use-before-define
380399
stream = new ClientHttp2Stream(session, handle, id, {});
400+
if (onClientStreamCreatedChannel.hasSubscribers) {
401+
onClientStreamCreatedChannel.publish({
402+
stream,
403+
headers: obj,
404+
});
405+
}
406+
381407
if (endOfStream) {
382408
stream.push(null);
383409
}
@@ -416,6 +442,16 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
416442
reqAsync.runInAsyncScope(process.nextTick, null, emit, stream, event, obj, flags, headers);
417443
else
418444
process.nextTick(emit, stream, event, obj, flags, headers);
445+
446+
if (event === 'response') {
447+
if (onClientStreamFinishChannel.hasSubscribers) {
448+
onClientStreamFinishChannel.publish({
449+
stream,
450+
headers: obj,
451+
flags,
452+
});
453+
}
454+
}
419455
}
420456
if (endOfStream) {
421457
stream.push(null);
@@ -766,7 +802,14 @@ function requestOnConnect(headers, options) {
766802
}
767803
return;
768804
}
805+
769806
this[kInit](ret.id(), ret);
807+
if (onClientStreamStartChannel.hasSubscribers) {
808+
onClientStreamStartChannel.publish({
809+
stream: this,
810+
headers: this[kSentHeaders],
811+
});
812+
}
770813
}
771814

772815
// Validates that priority options are correct, specifically:
@@ -1851,6 +1894,14 @@ class ClientHttp2Session extends Http2Session {
18511894
} else {
18521895
onConnect();
18531896
}
1897+
1898+
if (onClientStreamCreatedChannel.hasSubscribers) {
1899+
onClientStreamCreatedChannel.publish({
1900+
stream,
1901+
headers,
1902+
});
1903+
}
1904+
18541905
return stream;
18551906
}
18561907
}
@@ -1925,6 +1976,7 @@ const kSubmitRstStream = 1;
19251976
const kForceRstStream = 2;
19261977

19271978
function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
1979+
const type = stream.session[kType];
19281980
const state = stream[kState];
19291981
state.flags |= STREAM_FLAGS_CLOSED;
19301982
state.rstCode = code;
@@ -1955,6 +2007,20 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
19552007
else
19562008
stream.once('finish', finishFn);
19572009
}
2010+
2011+
if (type === NGHTTP2_SESSION_CLIENT) {
2012+
if (onClientStreamCloseChannel.hasSubscribers) {
2013+
onClientStreamCloseChannel.publish({
2014+
stream,
2015+
code,
2016+
});
2017+
}
2018+
} else if (onServerStreamCloseChannel.hasSubscribers) {
2019+
onServerStreamCloseChannel.publish({
2020+
stream,
2021+
code,
2022+
});
2023+
}
19582024
}
19592025

19602026
function finishCloseStream(code) {
@@ -2381,6 +2447,21 @@ class Http2Stream extends Duplex {
23812447
setImmediate(() => {
23822448
session[kMaybeDestroy]();
23832449
});
2450+
if (err) {
2451+
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2452+
if (onClientStreamErrorChannel.hasSubscribers) {
2453+
onClientStreamErrorChannel.publish({
2454+
stream: this,
2455+
error: err,
2456+
});
2457+
}
2458+
} else if (onServerStreamErrorChannel.hasSubscribers) {
2459+
onServerStreamErrorChannel.publish({
2460+
stream: this,
2461+
error: err,
2462+
});
2463+
}
2464+
}
23842465
callback(err);
23852466
}
23862467
// The Http2Stream can be destroyed if it has closed and if the readable
@@ -2766,6 +2847,13 @@ class ServerHttp2Stream extends Http2Stream {
27662847
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
27672848

27682849
process.nextTick(callback, null, stream, headers, 0);
2850+
2851+
if (onServerStreamStartChannel.hasSubscribers) {
2852+
onServerStreamStartChannel.publish({
2853+
stream,
2854+
headers,
2855+
});
2856+
}
27692857
}
27702858

27712859
// Initiate a response on this Http2Stream
@@ -2813,8 +2901,14 @@ class ServerHttp2Stream extends Http2Stream {
28132901
}
28142902

28152903
const ret = this[kHandle].respond(headersList, streamOptions);
2816-
if (ret < 0)
2904+
if (ret < 0) {
28172905
this.destroy(new NghttpError(ret));
2906+
} else if (onServerStreamFinishChannel.hasSubscribers) {
2907+
onServerStreamFinishChannel.publish({
2908+
stream: this,
2909+
headers,
2910+
});
2911+
}
28182912
}
28192913

28202914
// Initiate a response using an open FD. Note that there are fewer
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
const { Http2Session, Http2Stream } = require('internal/http2/core');
9+
const dc = require('diagnostics_channel');
10+
11+
const isClientHttp2Stream = (stream) => {
12+
return stream instanceof Http2Stream &&
13+
stream.session instanceof Http2Session &&
14+
stream.session.type === http2.constants.NGHTTP2_SESSION_CLIENT;
15+
};
16+
17+
const isServerHttp2Stream = (stream) => {
18+
return stream instanceof Http2Stream &&
19+
stream.session instanceof Http2Session &&
20+
stream.session.type === http2.constants.NGHTTP2_SESSION_SERVER;
21+
};
22+
23+
const isError = (error) => error instanceof Error;
24+
25+
const isValidHeaders = (headers) => {
26+
return headers && !Array.isArray(headers) && typeof headers === 'object';
27+
};
28+
29+
dc.subscribe('http2.client.stream.created', common.mustCall(({ stream, headers }) => {
30+
assert.strictEqual(isClientHttp2Stream(stream), true);
31+
assert.strictEqual(isValidHeaders(headers), true);
32+
}, 2));
33+
34+
dc.subscribe('http2.client.stream.start', common.mustCall(({ stream, headers }) => {
35+
assert.strictEqual(isClientHttp2Stream(stream), true);
36+
assert.strictEqual(isValidHeaders(headers), true);
37+
}, 2));
38+
39+
dc.subscribe('http2.client.stream.error', common.mustCall(({ stream, error }) => {
40+
assert.strictEqual(isClientHttp2Stream(stream), true);
41+
assert.strictEqual(isError(error), true);
42+
}));
43+
44+
dc.subscribe('http2.client.stream.finish', common.mustCall(({ stream, headers }) => {
45+
assert.strictEqual(isClientHttp2Stream(stream), true);
46+
assert.strictEqual(isValidHeaders(headers), true);
47+
}));
48+
49+
dc.subscribe('http2.client.stream.close', common.mustCall(({ stream, code }) => {
50+
assert.strictEqual(isClientHttp2Stream(stream), true);
51+
assert.strictEqual(Number.isInteger(code), true);
52+
}, 2));
53+
54+
dc.subscribe('http2.server.stream.start', common.mustCall(({ stream, headers }) => {
55+
assert.strictEqual(isServerHttp2Stream(stream), true);
56+
assert.strictEqual(isValidHeaders(headers), true);
57+
}, 2));
58+
59+
dc.subscribe('http2.server.stream.error', common.mustCall(({ stream, error }) => {
60+
assert.strictEqual(isServerHttp2Stream(stream), true);
61+
assert.strictEqual(isError(error), true);
62+
}));
63+
64+
dc.subscribe('http2.server.stream.finish', common.mustCall(({ stream, headers }) => {
65+
assert.strictEqual(isServerHttp2Stream(stream), true);
66+
assert.strictEqual(isValidHeaders(headers), true);
67+
}));
68+
69+
dc.subscribe('http2.server.stream.close', common.mustCall(({ stream, code }) => {
70+
assert.strictEqual(isServerHttp2Stream(stream), true);
71+
assert.strictEqual(Number.isInteger(code), true);
72+
}, 2));
73+
74+
let destroy = false;
75+
const server = http2.createServer();
76+
server.on('stream', common.mustCall((stream, headers, flags) => {
77+
if (destroy) {
78+
stream.on('error', common.mustCall());
79+
stream.session.destroy(new Error('destroyed'));
80+
} else {
81+
stream.respond({ 'content-type': 'text/html' });
82+
stream.end('test');
83+
}
84+
}, 2));
85+
86+
87+
server.listen(0, common.mustCall(() => {
88+
const port = server.address().port;
89+
const client = http2.connect(`http://localhost:${port}`);
90+
91+
const req = client.request();
92+
93+
req.on('response', common.mustCall((headers) => {
94+
assert.strictEqual(headers[':status'], 200);
95+
assert.strictEqual(headers['content-type'], 'text/html');
96+
}));
97+
98+
let data = '';
99+
100+
req.setEncoding('utf8');
101+
req.on('data', common.mustCallAtLeast((d) => data += d));
102+
req.on('end', common.mustCall(() => {
103+
destroy = true;
104+
client.on('close', common.mustCall(() => {
105+
server.close();
106+
}));
107+
client.on('error', common.mustCall((err) => {
108+
assert.strictEqual(err.message, 'Session closed with error code 2');
109+
}));
110+
111+
client.request().on('error', common.mustCall((err) => {
112+
assert.strictEqual(err.message, 'Session closed with error code 2');
113+
}));
114+
}));
115+
}));

0 commit comments

Comments
 (0)