Skip to content

Commit 13540ef

Browse files
committed
lib: add tracing channels for http2 streams
PR-URL: #266 Reviewed-By: Juan José Arboleda <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]> PR-URL: #268
1 parent 309ac77 commit 13540ef

File tree

1 file changed

+140
-58
lines changed

1 file changed

+140
-58
lines changed

lib/internal/http2/core.js

Lines changed: 140 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
197197
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
198198
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');
199199

200+
const clientTracingChannels = dc.tracingChannel('http2.client');
201+
const serverTracingChannels = dc.tracingChannel('http2.server');
202+
200203
let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
201204
debug = fn;
202205
});
@@ -258,6 +261,7 @@ const kSentHeaders = Symbol('sent-headers');
258261
const kSentTrailers = Symbol('sent-trailers');
259262
const kServer = Symbol('server');
260263
const kState = Symbol('state');
264+
const kTracingContext = Symbol('tracing-context');
261265
const kType = Symbol('type');
262266
const kWriteGeneric = Symbol('write-generic');
263267

@@ -341,6 +345,31 @@ function emit(self, ...args) {
341345
ReflectApply(self.emit, self, args);
342346
}
343347

348+
function createServerStream(session, handle, id, options, headers, endOfStream) {
349+
// eslint-disable-next-line no-use-before-define
350+
const stream = new ServerHttp2Stream(session, handle, id, options, headers);
351+
if (endOfStream) {
352+
stream.push(null);
353+
stream[kState].endAfterHeaders = true;
354+
}
355+
356+
if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
357+
// For head requests, there must not be a body...
358+
// end the writable side immediately.
359+
stream.end();
360+
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
361+
}
362+
363+
if (onServerStreamStartChannel.hasSubscribers) {
364+
onServerStreamStartChannel.publish({
365+
stream,
366+
headers,
367+
});
368+
}
369+
370+
return stream;
371+
}
372+
344373
// Called when a new block of headers has been received for a given
345374
// stream. The stream may or may not be new. If the stream is new,
346375
// create the associated Http2Stream instance and emit the 'stream'
@@ -373,23 +402,24 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
373402
}
374403
// session[kType] can be only one of two possible values
375404
if (type === NGHTTP2_SESSION_SERVER) {
376-
// eslint-disable-next-line no-use-before-define
377-
stream = new ServerHttp2Stream(session, handle, id, {}, obj);
378-
if (endOfStream) {
379-
stream.push(null);
380-
}
381-
if (obj[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
382-
// For head requests, there must not be a body...
383-
// end the writable side immediately.
384-
stream.end();
385-
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
386-
}
387-
388-
if (onServerStreamStartChannel.hasSubscribers) {
389-
onServerStreamStartChannel.publish({
390-
stream,
391-
headers: obj,
405+
if (serverTracingChannels.hasSubscribers) {
406+
const context = { headers: obj, flags };
407+
serverTracingChannels.start.runStores(context, () => {
408+
try {
409+
stream = createServerStream(session, handle, id, {}, obj);
410+
stream[kTracingContext] = context;
411+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
412+
} catch (e) {
413+
context.error = e;
414+
serverTracingChannels.error.publish(context);
415+
throw e;
416+
} finally {
417+
serverTracingChannels.end.publish(context);
418+
}
392419
});
420+
} else {
421+
stream = createServerStream(session, handle, id, {}, obj, endOfStream);
422+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
393423
}
394424
} else {
395425
// eslint-disable-next-line no-use-before-define
@@ -405,10 +435,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
405435
stream.push(null);
406436
}
407437
stream.end();
438+
if (endOfStream)
439+
stream[kState].endAfterHeaders = true;
440+
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
408441
}
409-
if (endOfStream)
410-
stream[kState].endAfterHeaders = true;
411-
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
412442
} else {
413443
let event;
414444
const status = obj[HTTP2_HEADER_STATUS];
@@ -444,9 +474,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
444474
});
445475
}
446476
}
447-
}
448-
if (endOfStream) {
449-
stream.push(null);
477+
478+
if (endOfStream) {
479+
stream.push(null);
480+
}
450481
}
451482
}
452483

@@ -1786,7 +1817,31 @@ class ClientHttp2Session extends Http2Session {
17861817
// associated Http2Stream instance.
17871818
request(headers, options) {
17881819
debugSessionObj(this, 'initiating request');
1820+
headers = ObjectAssign({ __proto__: null }, headers);
1821+
options = { ...options };
1822+
if (!clientTracingChannels.hasSubscribers) {
1823+
this._requestPrepare(headers, options);
1824+
return this._requestFinalize(headers, options);
1825+
}
1826+
1827+
this._requestPrepare(headers, options);
1828+
const context = { headers, options };
1829+
return clientTracingChannels.start.runStores(context, () => {
1830+
try {
1831+
const stream = this._requestFinalize(context.headers, context.options);
1832+
stream[kTracingContext] = context;
1833+
return stream;
1834+
} catch (e) {
1835+
context.error = e;
1836+
clientTracingChannels.error.publish(context);
1837+
throw e;
1838+
} finally {
1839+
clientTracingChannels.end.publish(context);
1840+
}
1841+
});
1842+
}
17891843

1844+
_requestPrepare(headers, options) {
17901845
if (this.destroyed)
17911846
throw new ERR_HTTP2_INVALID_SESSION();
17921847

@@ -1809,9 +1864,6 @@ class ClientHttp2Session extends Http2Session {
18091864
assertIsObject(headers, 'headers');
18101865
assertIsObject(options, 'options');
18111866

1812-
headers = ObjectAssign({ __proto__: null }, headers);
1813-
options = { ...options };
1814-
18151867
if (headers[HTTP2_HEADER_METHOD] === undefined)
18161868
headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET;
18171869

@@ -1843,7 +1895,9 @@ class ClientHttp2Session extends Http2Session {
18431895
} else {
18441896
validateBoolean(options.endStream, 'options.endStream');
18451897
}
1898+
}
18461899

1900+
_requestFinalize(headers, options) {
18471901
const headersList = mapToHeaders(headers);
18481902

18491903
// eslint-disable-next-line no-use-before-define
@@ -1859,6 +1913,13 @@ class ClientHttp2Session extends Http2Session {
18591913
if (options.waitForTrailers)
18601914
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
18611915

1916+
if (onClientStreamCreatedChannel.hasSubscribers) {
1917+
onClientStreamCreatedChannel.publish({
1918+
stream,
1919+
headers,
1920+
});
1921+
}
1922+
18621923
const { signal } = options;
18631924
if (signal) {
18641925
validateAbortSignal(signal, 'options.signal');
@@ -1888,17 +1949,11 @@ class ClientHttp2Session extends Http2Session {
18881949
onConnect();
18891950
}
18901951

1891-
if (onClientStreamCreatedChannel.hasSubscribers) {
1892-
onClientStreamCreatedChannel.publish({
1893-
stream,
1894-
headers,
1895-
});
1896-
}
1897-
18981952
return stream;
18991953
}
19001954
}
19011955

1956+
19021957
function trackWriteState(stream, bytes) {
19031958
const session = stream[kSession];
19041959
stream[kState].writeQueueSize += bytes;
@@ -2001,18 +2056,31 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
20012056
stream.once('finish', finishFn);
20022057
}
20032058

2059+
const context = stream[kTracingContext];
20042060
if (type === NGHTTP2_SESSION_CLIENT) {
20052061
if (onClientStreamCloseChannel.hasSubscribers) {
20062062
onClientStreamCloseChannel.publish({
20072063
stream,
20082064
code,
20092065
});
20102066
}
2011-
} else if (onServerStreamCloseChannel.hasSubscribers) {
2012-
onServerStreamCloseChannel.publish({
2013-
stream,
2014-
code,
2015-
});
2067+
2068+
if (context) {
2069+
context.result = code;
2070+
clientTracingChannels.asyncEnd.publish(context);
2071+
}
2072+
} else {
2073+
if (onServerStreamCloseChannel.hasSubscribers) {
2074+
onServerStreamCloseChannel.publish({
2075+
stream,
2076+
code,
2077+
});
2078+
}
2079+
2080+
if (context) {
2081+
context.result = code;
2082+
serverTracingChannels.asyncEnd.publish(context);
2083+
}
20162084
}
20172085
}
20182086

@@ -2408,6 +2476,41 @@ class Http2Stream extends Duplex {
24082476
}
24092477
const hasHandle = handle !== undefined;
24102478

2479+
// RST code 8 not emitted as an error as its used by clients to signify
2480+
// abort and is already covered by aborted event, also allows more
2481+
// seamless compatibility with http1
2482+
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2483+
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2484+
2485+
if (err) {
2486+
const context = this[kTracingContext];
2487+
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2488+
if (onClientStreamErrorChannel.hasSubscribers) {
2489+
onClientStreamErrorChannel.publish({
2490+
stream: this,
2491+
error: err,
2492+
});
2493+
}
2494+
2495+
if (context) {
2496+
context.error = err;
2497+
clientTracingChannels.error.publish(context);
2498+
}
2499+
} else {
2500+
if (onServerStreamErrorChannel.hasSubscribers) {
2501+
onServerStreamErrorChannel.publish({
2502+
stream: this,
2503+
error: err,
2504+
});
2505+
}
2506+
2507+
if (context) {
2508+
context.error = err;
2509+
serverTracingChannels.error.publish(context);
2510+
}
2511+
}
2512+
}
2513+
24112514
if (!this.closed)
24122515
closeStream(this, code, hasHandle ? kForceRstStream : kNoRstStream);
24132516
this.push(null);
@@ -2423,12 +2526,6 @@ class Http2Stream extends Duplex {
24232526
sessionState.writeQueueSize -= state.writeQueueSize;
24242527
state.writeQueueSize = 0;
24252528

2426-
// RST code 8 not emitted as an error as its used by clients to signify
2427-
// abort and is already covered by aborted event, also allows more
2428-
// seamless compatibility with http1
2429-
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
2430-
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
2431-
24322529
this[kSession] = undefined;
24332530
this[kHandle] = undefined;
24342531

@@ -2440,21 +2537,6 @@ class Http2Stream extends Duplex {
24402537
setImmediate(() => {
24412538
session[kMaybeDestroy]();
24422539
});
2443-
if (err) {
2444-
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
2445-
if (onClientStreamErrorChannel.hasSubscribers) {
2446-
onClientStreamErrorChannel.publish({
2447-
stream: this,
2448-
error: err,
2449-
});
2450-
}
2451-
} else if (onServerStreamErrorChannel.hasSubscribers) {
2452-
onServerStreamErrorChannel.publish({
2453-
stream: this,
2454-
error: err,
2455-
});
2456-
}
2457-
}
24582540
callback(err);
24592541
}
24602542
// The Http2Stream can be destroyed if it has closed and if the readable

0 commit comments

Comments
 (0)