Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: HTTPX tracing parallel requests #1404

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,111 @@ module HTTPX
module Plugin
# Instruments around HTTPX's request/response lifecycle in order to generate
# an OTEL trace.
class RequestTracer
# Constant for the HTTP status range
HTTP_STATUS_SUCCESS_RANGE = (100..399)
module RequestTracer
module_function

# initializes tracing on the +request+.
def call(request)
span = nil

# request objects are reused, when already buffered requests get rerouted to a different
# connection due to connection issues, or when they already got a response, but need to
# be retried. In such situations, the original span needs to be extended for the former,
# while a new is required for the latter.
request.on(:idle) do
span = nil
end
# the span is initialized when the request is buffered in the parser, which is the closest
# one gets to actually sending the request.
request.on(:headers) do
next if span

span = initialize_span(request)
end

request.on(:response) do |response|
unless span
next unless response.is_a?(::HTTPX::ErrorResponse) && response.error.respond_to?(:connection)

def initialize(request)
@request = request
# handles the case when the +error+ happened during name resolution, which means
# that the tracing start point hasn't been triggered yet; in such cases, the approximate
# initial resolving time is collected from the connection, and used as span start time,
# and the tracing object in inserted before the on response callback is called.
span = initialize_span(request, response.error.connection.init_time)

end

finish(response, span)
end
end

def call
@request.on(:response, &method(:finish)) # rubocop:disable Performance/MethodObjectAsBlock
def finish(response, span)
if response.is_a?(::HTTPX::ErrorResponse)
if response.response
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, response.response.status)
end

uri = @request.uri
request_method = @request.verb
span_name = "HTTP #{request_method}"
span.record_exception(response.error)
span.status = Trace::Status.error(response.error.to_s)
else
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, response.status)
end

span.finish
end

# return a span initialized with the +request+ state.
def initialize_span(request, start_time = ::Time.now)
verb = request.verb
uri = request.uri

config = HTTPX::Instrumentation.instance.config

attributes = {
OpenTelemetry::SemanticConventions::Trace::HTTP_HOST => uri.host,
OpenTelemetry::SemanticConventions::Trace::HTTP_METHOD => request_method,
OpenTelemetry::SemanticConventions::Trace::HTTP_METHOD => verb,
OpenTelemetry::SemanticConventions::Trace::HTTP_SCHEME => uri.scheme,
OpenTelemetry::SemanticConventions::Trace::HTTP_TARGET => uri.path,
OpenTelemetry::SemanticConventions::Trace::HTTP_URL => "#{uri.scheme}://#{uri.host}",
OpenTelemetry::SemanticConventions::Trace::NET_PEER_NAME => uri.host,
OpenTelemetry::SemanticConventions::Trace::NET_PEER_PORT => uri.port
}
config = HTTPX::Instrumentation.instance.config
attributes[OpenTelemetry::SemanticConventions::Trace::PEER_SERVICE] = config[:peer_service] if config[:peer_service]
attributes.merge!(
OpenTelemetry::Common::HTTP::ClientContext.attributes
)

@span = tracer.start_span(span_name, attributes: attributes, kind: :client)
trace_ctx = OpenTelemetry::Trace.context_with_span(@span)
@trace_token = OpenTelemetry::Context.attach(trace_ctx)

OpenTelemetry.propagation.inject(@request.headers)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
attributes[OpenTelemetry::SemanticConventions::Trace::PEER_SERVICE] = config[:peer_service] if config[:peer_service]
attributes.merge!(OpenTelemetry::Common::HTTP::ClientContext.attributes)

def finish(response)
return unless @span
span = tracer.start_span("HTTP #{verb}", attributes: attributes, kind: :client, start_timestamp: start_time)

if response.is_a?(::HTTPX::ErrorResponse)
@span.record_exception(response.error)
@span.status = Trace::Status.error("Unhandled exception of type: #{response.error.class}")
else
@span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, response.status)
@span.status = Trace::Status.error unless HTTP_STATUS_SUCCESS_RANGE.cover?(response.status)
OpenTelemetry::Trace.with_span(span) do
OpenTelemetry.propagation.inject(request.headers)
end

OpenTelemetry::Context.detach(@trace_token) if @trace_token
@span.finish
span
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

private

def tracer
HTTPX::Instrumentation.instance.tracer
end
end

# HTTPX::Request overrides
module RequestMethods
def __otel_enable_trace!
return if @__otel_enable_trace
# intercepts request initialization to inject the tracing logic.
def initialize(*)
super

RequestTracer.new(self).call
@__otel_enable_trace = true
RequestTracer.call(self)
end
end

# HTTPX::Connection overrides
module ConnectionMethods
def send(request)
request.__otel_enable_trace!
attr_reader :init_time

def initialize(*)
super

@init_time = ::Time.now
end
end
end
Expand Down
Loading