Skip to content

lib: add tracing support for http2 #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 140 additions & 58 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');

const clientTracingChannels = dc.tracingChannel('http2.client');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const clientTracingChannels = dc.tracingChannel('http2.client');
const clientTracingChannels = dc.tracingChannel('http2.client');

Should we have a http2.server.session too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but at the moment I'm only tracking streams (http transactions). We could of course add support for sessions (to track actual connections). We can add those later on. At least this way we have the same support as we have for http and fetch.

const serverTracingChannels = dc.tracingChannel('http2.server');

let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
debug = fn;
});
Expand Down Expand Up @@ -261,6 +264,7 @@ const kSentHeaders = Symbol('sent-headers');
const kSentTrailers = Symbol('sent-trailers');
const kServer = Symbol('server');
const kState = Symbol('state');
const kTracingContext = Symbol('tracing-context');
const kType = Symbol('type');
const kWriteGeneric = Symbol('write-generic');

Expand Down Expand Up @@ -344,6 +348,31 @@ function emit(self, ...args) {
ReflectApply(self.emit, self, args);
}

function createServerStream(session, handle, id, options, headers, endOfStream) {
// eslint-disable-next-line no-use-before-define
const stream = new ServerHttp2Stream(session, handle, id, options, headers);
if (endOfStream) {
stream.push(null);
stream[kState].endAfterHeaders = true;
}

if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
// For head requests, there must not be a body...
// end the writable side immediately.
stream.end();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be inside an else if instead? What if we receive an endOfStream = true and HTTP2_HEADER_METHOD = head?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, to be honest I've just kept the same logic it was implemented before.

stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
}

if (onServerStreamStartChannel.hasSubscribers) {
onServerStreamStartChannel.publish({
stream,
headers,
});
}

return stream;
}

// Called when a new block of headers has been received for a given
// stream. The stream may or may not be new. If the stream is new,
// create the associated Http2Stream instance and emit the 'stream'
Expand Down Expand Up @@ -376,23 +405,24 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
}
// session[kType] can be only one of two possible values
if (type === NGHTTP2_SESSION_SERVER) {
// eslint-disable-next-line no-use-before-define
stream = new ServerHttp2Stream(session, handle, id, {}, obj);
if (endOfStream) {
stream.push(null);
}
if (obj[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) {
// For head requests, there must not be a body...
// end the writable side immediately.
stream.end();
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
}

if (onServerStreamStartChannel.hasSubscribers) {
onServerStreamStartChannel.publish({
stream,
headers: obj,
if (serverTracingChannels.hasSubscribers) {
const context = { headers: obj, flags };
serverTracingChannels.start.runStores(context, () => {
try {
stream = createServerStream(session, handle, id, {}, obj);
stream[kTracingContext] = context;
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
} catch (e) {
context.error = e;
serverTracingChannels.error.publish(context);
throw e;
} finally {
serverTracingChannels.end.publish(context);
}
});
} else {
stream = createServerStream(session, handle, id, {}, obj, endOfStream);
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
}
} else {
// eslint-disable-next-line no-use-before-define
Expand All @@ -408,10 +438,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
stream.push(null);
}
stream.end();
if (endOfStream)
stream[kState].endAfterHeaders = true;
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
}
if (endOfStream)
stream[kState].endAfterHeaders = true;
process.nextTick(emit, session, 'stream', stream, obj, flags, headers);
} else {
let event;
const status = obj[HTTP2_HEADER_STATUS];
Expand Down Expand Up @@ -452,9 +482,10 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
});
}
}
}
if (endOfStream) {
stream.push(null);

if (endOfStream) {
stream.push(null);
}
}
}

Expand Down Expand Up @@ -1791,7 +1822,31 @@ class ClientHttp2Session extends Http2Session {
// associated Http2Stream instance.
request(headers, options) {
debugSessionObj(this, 'initiating request');
headers = ObjectAssign({ __proto__: null }, headers);
options = { ...options };
if (!clientTracingChannels.hasSubscribers) {
this._requestPrepare(headers, options);
return this._requestFinalize(headers, options);
}

this._requestPrepare(headers, options);
const context = { headers, options };
return clientTracingChannels.start.runStores(context, () => {
try {
const stream = this._requestFinalize(context.headers, context.options);
stream[kTracingContext] = context;
return stream;
} catch (e) {
context.error = e;
clientTracingChannels.error.publish(context);
throw e;
} finally {
clientTracingChannels.end.publish(context);
}
});
}

_requestPrepare(headers, options) {
if (this.destroyed)
throw new ERR_HTTP2_INVALID_SESSION();

Expand All @@ -1814,9 +1869,6 @@ class ClientHttp2Session extends Http2Session {
assertIsObject(headers, 'headers');
assertIsObject(options, 'options');

headers = ObjectAssign({ __proto__: null }, headers);
options = { ...options };

if (headers[HTTP2_HEADER_METHOD] === undefined)
headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET;

Expand Down Expand Up @@ -1848,7 +1900,9 @@ class ClientHttp2Session extends Http2Session {
} else {
validateBoolean(options.endStream, 'options.endStream');
}
}

_requestFinalize(headers, options) {
const headersList = mapToHeaders(headers);

// eslint-disable-next-line no-use-before-define
Expand All @@ -1866,6 +1920,13 @@ class ClientHttp2Session extends Http2Session {
if (options.waitForTrailers)
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;

if (onClientStreamCreatedChannel.hasSubscribers) {
onClientStreamCreatedChannel.publish({
stream,
headers,
});
}

const { signal } = options;
if (signal) {
validateAbortSignal(signal, 'options.signal');
Expand Down Expand Up @@ -1895,17 +1956,11 @@ class ClientHttp2Session extends Http2Session {
onConnect();
}

if (onClientStreamCreatedChannel.hasSubscribers) {
onClientStreamCreatedChannel.publish({
stream,
headers,
});
}

return stream;
}
}


function trackWriteState(stream, bytes) {
const session = stream[kSession];
stream[kState].writeQueueSize += bytes;
Expand Down Expand Up @@ -2008,18 +2063,31 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
stream.once('finish', finishFn);
}

const context = stream[kTracingContext];
if (type === NGHTTP2_SESSION_CLIENT) {
if (onClientStreamCloseChannel.hasSubscribers) {
onClientStreamCloseChannel.publish({
stream,
code,
});
}
} else if (onServerStreamCloseChannel.hasSubscribers) {
onServerStreamCloseChannel.publish({
stream,
code,
});

if (context) {
context.result = code;
clientTracingChannels.asyncEnd.publish(context);
}
} else {
if (onServerStreamCloseChannel.hasSubscribers) {
onServerStreamCloseChannel.publish({
stream,
code,
});
}

if (context) {
context.result = code;
serverTracingChannels.asyncEnd.publish(context);
}
}
}

Expand Down Expand Up @@ -2415,6 +2483,41 @@ class Http2Stream extends Duplex {
}
const hasHandle = handle !== undefined;

// RST code 8 not emitted as an error as its used by clients to signify
// abort and is already covered by aborted event, also allows more
// seamless compatibility with http1
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);

if (err) {
const context = this[kTracingContext];
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
if (onClientStreamErrorChannel.hasSubscribers) {
onClientStreamErrorChannel.publish({
stream: this,
error: err,
});
}

if (context) {
context.error = err;
clientTracingChannels.error.publish(context);
}
} else {
if (onServerStreamErrorChannel.hasSubscribers) {
onServerStreamErrorChannel.publish({
stream: this,
error: err,
});
}

if (context) {
context.error = err;
serverTracingChannels.error.publish(context);
}
}
}

if (!this.closed)
closeStream(this, code, hasHandle ? kForceRstStream : kNoRstStream);
this.push(null);
Expand All @@ -2430,12 +2533,6 @@ class Http2Stream extends Duplex {
sessionState.writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

// RST code 8 not emitted as an error as its used by clients to signify
// abort and is already covered by aborted event, also allows more
// seamless compatibility with http1
if (err == null && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL)
err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);

this[kSession] = undefined;
this[kHandle] = undefined;

Expand All @@ -2447,21 +2544,6 @@ class Http2Stream extends Duplex {
setImmediate(() => {
session[kMaybeDestroy]();
});
if (err) {
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
if (onClientStreamErrorChannel.hasSubscribers) {
onClientStreamErrorChannel.publish({
stream: this,
error: err,
});
}
} else if (onServerStreamErrorChannel.hasSubscribers) {
onServerStreamErrorChannel.publish({
stream: this,
error: err,
});
}
}
callback(err);
}
// The Http2Stream can be destroyed if it has closed and if the readable
Expand Down
Loading
Loading