Skip to content

Commit 63a9b6f

Browse files
committed
lib: add tracing channels for http2 streams
PR-URL: #266 Reviewed-By: Juan José Arboleda <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]>
1 parent 65430e7 commit 63a9b6f

File tree

1 file changed

+140
-58
lines changed

1 file changed

+140
-58
lines changed

lib/internal/http2/core.js

Lines changed: 140 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
199199
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
200200
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');
201201

202+
const clientTracingChannels = dc.tracingChannel('http2.client');
203+
const serverTracingChannels = dc.tracingChannel('http2.server');
204+
202205
let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
203206
debug = fn;
204207
});
@@ -261,6 +264,7 @@ const kSentHeaders = Symbol('sent-headers');
261264
const kSentTrailers = Symbol('sent-trailers');
262265
const kServer = Symbol('server');
263266
const kState = Symbol('state');
267+
const kTracingContext = Symbol('tracing-context');
264268
const kType = Symbol('type');
265269
const kWriteGeneric = Symbol('write-generic');
266270

@@ -344,6 +348,31 @@ function emit(self, ...args) {
344348
ReflectApply(self.emit, self, args);
345349
}
346350

351+
function createServerStream(session, handle, id, options, headers, endOfStream) {
352+
// eslint-disable-next-line no-use-before-define
353+
const stream = new ServerHttp2Stream(session, handle, id, options, headers);
354+
if (endOfStream) {
355+
stream.push(null);
356+
stream[kState].endAfterHeaders = true;
357+
}
358+
359+
if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
360+
// For head requests, there must not be a body...
361+
// end the writable side immediately.
362+
stream.end();
363+
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
364+
}
365+
366+
if (onServerStreamStartChannel.hasSubscribers) {
367+
onServerStreamStartChannel.publish({
368+
stream,
369+
headers,
370+
});
371+
}
372+
373+
return stream;
374+
}
375+
347376
// Called when a new block of headers has been received for a given
348377
// stream. The stream may or may not be new. If the stream is new,
349378
// create the associated Http2Stream instance and emit the 'stream'
@@ -376,23 +405,24 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
376405
}
377406
// session[kType] can be only one of two possible values
378407
if (type === NGHTTP2_SESSION_SERVER) {
379-
// eslint-disable-next-line no-use-before-define
380-
stream = new ServerHttp2Stream(session, handle, id, {}, obj);
381-
if (endOfStream) {
382-
stream.push(null);
383-
}
384-
if (obj[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
385-
// For head requests, there must not be a body...
386-
// end the writable side immediately.
387-
stream.end();
388-
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
389-
}
390-
391-
if (onServerStreamStartChannel.hasSubscribers) {
392-
onServerStreamStartChannel.publish({
393-
stream,
394-
headers: obj,
408+
if (serverTracingChannels.hasSubscribers) {
409+
const context = { headers: obj, flags };
410+
serverTracingChannels.start.runStores(context, () => {
411+
try {
412+
stream = createServerStream(session, handle, id, {}, obj);
413+
stream[kTracingContext] = context;
414+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
415+
} catch (e) {
416+
context.error = e;
417+
serverTracingChannels.error.publish(context);
418+
throw e;
419+
} finally {
420+
serverTracingChannels.end.publish(context);
421+
}
395422
});
423+
} else {
424+
stream = createServerStream(session, handle, id, {}, obj, endOfStream);
425+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
396426
}
397427
} else {
398428
// eslint-disable-next-line no-use-before-define
@@ -408,10 +438,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
408438
stream.push(null);
409439
}
410440
stream.end();
441+
if (endOfStream)
442+
stream[kState].endAfterHeaders = true;
443+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
411444
}
412-
if (endOfStream)
413-
stream[kState].endAfterHeaders = true;
414-
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
415445
} else {
416446
let event;
417447
const status = obj[HTTP2_HEADER_STATUS];
@@ -452,9 +482,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
452482
});
453483
}
454484
}
455-
}
456-
if (endOfStream) {
457-
stream.push(null);
485+
486+
if (endOfStream) {
487+
stream.push(null);
488+
}
458489
}
459490
}
460491

@@ -1791,7 +1822,31 @@ class ClientHttp2Session extends Http2Session {
17911822
// associated Http2Stream instance.
17921823
request(headers, options) {
17931824
debugSessionObj(this, 'initiating request');
1825+
headers = ObjectAssign({ __proto__: null }, headers);
1826+
options = { ...options };
1827+
if (!clientTracingChannels.hasSubscribers) {
1828+
this._requestPrepare(headers, options);
1829+
return this._requestFinalize(headers, options);
1830+
}
1831+
1832+
this._requestPrepare(headers, options);
1833+
const context = { headers, options };
1834+
return clientTracingChannels.start.runStores(context, () => {
1835+
try {
1836+
const stream = this._requestFinalize(context.headers, context.options);
1837+
stream[kTracingContext] = context;
1838+
return stream;
1839+
} catch (e) {
1840+
context.error = e;
1841+
clientTracingChannels.error.publish(context);
1842+
throw e;
1843+
} finally {
1844+
clientTracingChannels.end.publish(context);
1845+
}
1846+
});
1847+
}
17941848

1849+
_requestPrepare(headers, options) {
17951850
if (this.destroyed)
17961851
throw new ERR_HTTP2_INVALID_SESSION();
17971852

@@ -1814,9 +1869,6 @@ class ClientHttp2Session extends Http2Session {
18141869
assertIsObject(headers, 'headers');
18151870
assertIsObject(options, 'options');
18161871

1817-
headers = ObjectAssign({ __proto__: null }, headers);
1818-
options = { ...options };
1819-
18201872
if (headers[HTTP2_HEADER_METHOD] === undefined)
18211873
headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET;
18221874

@@ -1848,7 +1900,9 @@ class ClientHttp2Session extends Http2Session {
18481900
} else {
18491901
validateBoolean(options.endStream, 'options.endStream');
18501902
}
1903+
}
18511904

1905+
_requestFinalize(headers, options) {
18521906
const headersList = mapToHeaders(headers);
18531907

18541908
// eslint-disable-next-line no-use-before-define
@@ -1866,6 +1920,13 @@ class ClientHttp2Session extends Http2Session {
18661920
if (options.waitForTrailers)
18671921
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
18681922

1923+
if (onClientStreamCreatedChannel.hasSubscribers) {
1924+
onClientStreamCreatedChannel.publish({
1925+
stream,
1926+
headers,
1927+
});
1928+
}
1929+
18691930
const { signal } = options;
18701931
if (signal) {
18711932
validateAbortSignal(signal, 'options.signal');
@@ -1895,17 +1956,11 @@ class ClientHttp2Session extends Http2Session {
18951956
onConnect();
18961957
}
18971958

1898-
if (onClientStreamCreatedChannel.hasSubscribers) {
1899-
onClientStreamCreatedChannel.publish({
1900-
stream,
1901-
headers,
1902-
});
1903-
}
1904-
19051959
return stream;
19061960
}
19071961
}
19081962

1963+
19091964
function trackWriteState(stream, bytes) {
19101965
const session = stream[kSession];
19111966
stream[kState].writeQueueSize += bytes;
@@ -2008,18 +2063,31 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
20082063
stream.once('finish', finishFn);
20092064
}
20102065

2066+
const context = stream[kTracingContext];
20112067
if (type === NGHTTP2_SESSION_CLIENT) {
20122068
if (onClientStreamCloseChannel.hasSubscribers) {
20132069
onClientStreamCloseChannel.publish({
20142070
stream,
20152071
code,
20162072
});
20172073
}
2018-
} else if (onServerStreamCloseChannel.hasSubscribers) {
2019-
onServerStreamCloseChannel.publish({
2020-
stream,
2021-
code,
2022-
});
2074+
2075+
if (context) {
2076+
context.result = code;
2077+
clientTracingChannels.asyncEnd.publish(context);
2078+
}
2079+
} else {
2080+
if (onServerStreamCloseChannel.hasSubscribers) {
2081+
onServerStreamCloseChannel.publish({
2082+
stream,
2083+
code,
2084+
});
2085+
}
2086+
2087+
if (context) {
2088+
context.result = code;
2089+
serverTracingChannels.asyncEnd.publish(context);
2090+
}
20232091
}
20242092
}
20252093

@@ -2415,6 +2483,41 @@ class Http2Stream extends Duplex {
24152483
}
24162484
const hasHandle = handle !== undefined;
24172485

2486+
// RST code 8 not emitted as an error as its used by clients to signify
2487+
// abort and is already covered by aborted event, also allows more
2488+
// seamless compatibility with http1
2489+
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2490+
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2491+
2492+
if (err) {
2493+
const context = this[kTracingContext];
2494+
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2495+
if (onClientStreamErrorChannel.hasSubscribers) {
2496+
onClientStreamErrorChannel.publish({
2497+
stream: this,
2498+
error: err,
2499+
});
2500+
}
2501+
2502+
if (context) {
2503+
context.error = err;
2504+
clientTracingChannels.error.publish(context);
2505+
}
2506+
} else {
2507+
if (onServerStreamErrorChannel.hasSubscribers) {
2508+
onServerStreamErrorChannel.publish({
2509+
stream: this,
2510+
error: err,
2511+
});
2512+
}
2513+
2514+
if (context) {
2515+
context.error = err;
2516+
serverTracingChannels.error.publish(context);
2517+
}
2518+
}
2519+
}
2520+
24182521
if (!this.closed)
24192522
closeStream(this, code, hasHandle ? kForceRstStream : kNoRstStream);
24202523
this.push(null);
@@ -2430,12 +2533,6 @@ class Http2Stream extends Duplex {
24302533
sessionState.writeQueueSize -= state.writeQueueSize;
24312534
state.writeQueueSize = 0;
24322535

2433-
// RST code 8 not emitted as an error as its used by clients to signify
2434-
// abort and is already covered by aborted event, also allows more
2435-
// seamless compatibility with http1
2436-
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2437-
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2438-
24392536
this[kSession] = undefined;
24402537
this[kHandle] = undefined;
24412538

@@ -2447,21 +2544,6 @@ class Http2Stream extends Duplex {
24472544
setImmediate(() => {
24482545
session[kMaybeDestroy]();
24492546
});
2450-
if (err) {
2451-
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2452-
if (onClientStreamErrorChannel.hasSubscribers) {
2453-
onClientStreamErrorChannel.publish({
2454-
stream: this,
2455-
error: err,
2456-
});
2457-
}
2458-
} else if (onServerStreamErrorChannel.hasSubscribers) {
2459-
onServerStreamErrorChannel.publish({
2460-
stream: this,
2461-
error: err,
2462-
});
2463-
}
2464-
}
24652547
callback(err);
24662548
}
24672549
// The Http2Stream can be destroyed if it has closed and if the readable

0 commit comments

Comments
 (0)