Skip to content

Commit 4eb8704

Browse files
committed
lib: add tracing channels for http2 streams
PR-URL: #266 Reviewed-By: Juan José Arboleda <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]>
1 parent be38d84 commit 4eb8704

File tree

1 file changed

+140
-58
lines changed

1 file changed

+140
-58
lines changed

lib/internal/http2/core.js

Lines changed: 140 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
198198
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
199199
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');
200200

201+
const clientTracingChannels = dc.tracingChannel('http2.client');
202+
const serverTracingChannels = dc.tracingChannel('http2.server');
203+
201204
let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
202205
debug = fn;
203206
});
@@ -259,6 +262,7 @@ const kSentHeaders = Symbol('sent-headers');
259262
const kSentTrailers = Symbol('sent-trailers');
260263
const kServer = Symbol('server');
261264
const kState = Symbol('state');
265+
const kTracingContext = Symbol('tracing-context');
262266
const kType = Symbol('type');
263267
const kWriteGeneric = Symbol('write-generic');
264268

@@ -342,6 +346,31 @@ function emit(self, ...args) {
342346
ReflectApply(self.emit, self, args);
343347
}
344348

349+
function createServerStream(session, handle, id, options, headers, endOfStream) {
350+
// eslint-disable-next-line no-use-before-define
351+
const stream = new ServerHttp2Stream(session, handle, id, options, headers);
352+
if (endOfStream) {
353+
stream.push(null);
354+
stream[kState].endAfterHeaders = true;
355+
}
356+
357+
if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
358+
// For head requests, there must not be a body...
359+
// end the writable side immediately.
360+
stream.end();
361+
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
362+
}
363+
364+
if (onServerStreamStartChannel.hasSubscribers) {
365+
onServerStreamStartChannel.publish({
366+
stream,
367+
headers,
368+
});
369+
}
370+
371+
return stream;
372+
}
373+
345374
// Called when a new block of headers has been received for a given
346375
// stream. The stream may or may not be new. If the stream is new,
347376
// create the associated Http2Stream instance and emit the 'stream'
@@ -374,23 +403,24 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
374403
}
375404
// session[kType] can be only one of two possible values
376405
if (type === NGHTTP2_SESSION_SERVER) {
377-
// eslint-disable-next-line no-use-before-define
378-
stream = new ServerHttp2Stream(session, handle, id, {}, obj);
379-
if (endOfStream) {
380-
stream.push(null);
381-
}
382-
if (obj[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
383-
// For head requests, there must not be a body...
384-
// end the writable side immediately.
385-
stream.end();
386-
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
387-
}
388-
389-
if (onServerStreamStartChannel.hasSubscribers) {
390-
onServerStreamStartChannel.publish({
391-
stream,
392-
headers: obj,
406+
if (serverTracingChannels.start.hasSubscribers) {
407+
const context = { headers: obj, flags };
408+
serverTracingChannels.start.runStores(context, () => {
409+
try {
410+
stream = createServerStream(session, handle, id, {}, obj);
411+
stream[kTracingContext] = context;
412+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
413+
} catch (e) {
414+
context.error = e;
415+
serverTracingChannels.error.publish(context);
416+
throw e;
417+
} finally {
418+
serverTracingChannels.end.publish(context);
419+
}
393420
});
421+
} else {
422+
stream = createServerStream(session, handle, id, {}, obj, endOfStream);
423+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
394424
}
395425
} else {
396426
// eslint-disable-next-line no-use-before-define
@@ -406,10 +436,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
406436
stream.push(null);
407437
}
408438
stream.end();
439+
if (endOfStream)
440+
stream[kState].endAfterHeaders = true;
441+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
409442
}
410-
if (endOfStream)
411-
stream[kState].endAfterHeaders = true;
412-
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
413443
} else {
414444
let event;
415445
const status = obj[HTTP2_HEADER_STATUS];
@@ -445,9 +475,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
445475
});
446476
}
447477
}
448-
}
449-
if (endOfStream) {
450-
stream.push(null);
478+
479+
if (endOfStream) {
480+
stream.push(null);
481+
}
451482
}
452483
}
453484

@@ -1779,7 +1810,31 @@ class ClientHttp2Session extends Http2Session {
17791810
// associated Http2Stream instance.
17801811
request(headers, options) {
17811812
debugSessionObj(this, 'initiating request');
1813+
headers = ObjectAssign({ __proto__: null }, headers);
1814+
options = { ...options };
1815+
if (!clientTracingChannels.start.hasSubscribers) {
1816+
this._requestPrepare(headers, options);
1817+
return this._requestFinalize(headers, options);
1818+
}
1819+
1820+
this._requestPrepare(headers, options);
1821+
const context = { headers, options };
1822+
return clientTracingChannels.start.runStores(context, () => {
1823+
try {
1824+
const stream = this._requestFinalize(context.headers, context.options);
1825+
stream[kTracingContext] = context;
1826+
return stream;
1827+
} catch (e) {
1828+
context.error = e;
1829+
clientTracingChannels.error.publish(context);
1830+
throw e;
1831+
} finally {
1832+
clientTracingChannels.end.publish(context);
1833+
}
1834+
});
1835+
}
17821836

1837+
_requestPrepare(headers, options) {
17831838
if (this.destroyed)
17841839
throw new ERR_HTTP2_INVALID_SESSION();
17851840

@@ -1802,9 +1857,6 @@ class ClientHttp2Session extends Http2Session {
18021857
assertIsObject(headers, 'headers');
18031858
assertIsObject(options, 'options');
18041859

1805-
headers = ObjectAssign(ObjectCreate(null), headers);
1806-
options = { ...options };
1807-
18081860
if (headers[HTTP2_HEADER_METHOD] === undefined)
18091861
headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET;
18101862

@@ -1837,7 +1889,9 @@ class ClientHttp2Session extends Http2Session {
18371889
} else if (typeof options.endStream !== 'boolean') {
18381890
throw new ERR_INVALID_ARG_VALUE('options.endStream', options.endStream);
18391891
}
1892+
}
18401893

1894+
_requestFinalize(headers, options) {
18411895
const headersList = mapToHeaders(headers);
18421896

18431897
// eslint-disable-next-line no-use-before-define
@@ -1853,6 +1907,13 @@ class ClientHttp2Session extends Http2Session {
18531907
if (options.waitForTrailers)
18541908
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
18551909

1910+
if (onClientStreamCreatedChannel.hasSubscribers) {
1911+
onClientStreamCreatedChannel.publish({
1912+
stream,
1913+
headers,
1914+
});
1915+
}
1916+
18561917
const { signal } = options;
18571918
if (signal) {
18581919
validateAbortSignal(signal, 'options.signal');
@@ -1883,17 +1944,11 @@ class ClientHttp2Session extends Http2Session {
18831944
onConnect();
18841945
}
18851946

1886-
if (onClientStreamCreatedChannel.hasSubscribers) {
1887-
onClientStreamCreatedChannel.publish({
1888-
stream,
1889-
headers,
1890-
});
1891-
}
1892-
18931947
return stream;
18941948
}
18951949
}
18961950

1951+
18971952
function trackWriteState(stream, bytes) {
18981953
const session = stream[kSession];
18991954
stream[kState].writeQueueSize += bytes;
@@ -1996,18 +2051,31 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
19962051
stream.once('finish', finishFn);
19972052
}
19982053

2054+
const context = stream[kTracingContext];
19992055
if (type === NGHTTP2_SESSION_CLIENT) {
20002056
if (onClientStreamCloseChannel.hasSubscribers) {
20012057
onClientStreamCloseChannel.publish({
20022058
stream,
20032059
code,
20042060
});
20052061
}
2006-
} else if (onServerStreamCloseChannel.hasSubscribers) {
2007-
onServerStreamCloseChannel.publish({
2008-
stream,
2009-
code,
2010-
});
2062+
2063+
if (context) {
2064+
context.result = code;
2065+
clientTracingChannels.asyncEnd.publish(context);
2066+
}
2067+
} else {
2068+
if (onServerStreamCloseChannel.hasSubscribers) {
2069+
onServerStreamCloseChannel.publish({
2070+
stream,
2071+
code,
2072+
});
2073+
}
2074+
2075+
if (context) {
2076+
context.result = code;
2077+
serverTracingChannels.asyncEnd.publish(context);
2078+
}
20112079
}
20122080
}
20132081

@@ -2404,6 +2472,41 @@ class Http2Stream extends Duplex {
24042472
}
24052473
const hasHandle = handle !== undefined;
24062474

2475+
// RST code 8 not emitted as an error as its used by clients to signify
2476+
// abort and is already covered by aborted event, also allows more
2477+
// seamless compatibility with http1
2478+
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2479+
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2480+
2481+
if (err) {
2482+
const context = this[kTracingContext];
2483+
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2484+
if (onClientStreamErrorChannel.hasSubscribers) {
2485+
onClientStreamErrorChannel.publish({
2486+
stream: this,
2487+
error: err,
2488+
});
2489+
}
2490+
2491+
if (context) {
2492+
context.error = err;
2493+
clientTracingChannels.error.publish(context);
2494+
}
2495+
} else {
2496+
if (onServerStreamErrorChannel.hasSubscribers) {
2497+
onServerStreamErrorChannel.publish({
2498+
stream: this,
2499+
error: err,
2500+
});
2501+
}
2502+
2503+
if (context) {
2504+
context.error = err;
2505+
serverTracingChannels.error.publish(context);
2506+
}
2507+
}
2508+
}
2509+
24072510
if (!this.closed)
24082511
closeStream(this, code, hasHandle ? kForceRstStream : kNoRstStream);
24092512
this.push(null);
@@ -2419,12 +2522,6 @@ class Http2Stream extends Duplex {
24192522
sessionState.writeQueueSize -= state.writeQueueSize;
24202523
state.writeQueueSize = 0;
24212524

2422-
// RST code 8 not emitted as an error as its used by clients to signify
2423-
// abort and is already covered by aborted event, also allows more
2424-
// seamless compatibility with http1
2425-
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2426-
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2427-
24282525
this[kSession] = undefined;
24292526
this[kHandle] = undefined;
24302527

@@ -2433,21 +2530,6 @@ class Http2Stream extends Duplex {
24332530
// will destroy if it has been closed and there are no other open or
24342531
// pending streams.
24352532
session[kMaybeDestroy]();
2436-
if (err) {
2437-
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2438-
if (onClientStreamErrorChannel.hasSubscribers) {
2439-
onClientStreamErrorChannel.publish({
2440-
stream: this,
2441-
error: err,
2442-
});
2443-
}
2444-
} else if (onServerStreamErrorChannel.hasSubscribers) {
2445-
onServerStreamErrorChannel.publish({
2446-
stream: this,
2447-
error: err,
2448-
});
2449-
}
2450-
}
24512533
callback(err);
24522534
}
24532535
// The Http2Stream can be destroyed if it has closed and if the readable

0 commit comments

Comments
 (0)