Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions apisix/debug_tracer.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local process = require("ngx.process")

local always_on_sampler_new = require("opentelemetry.trace.sampling.always_on_sampler").new
local exporter_client_new = require("opentelemetry.trace.exporter.http_client").new
local otlp_exporter_new = require("opentelemetry.trace.exporter.otlp").new
local batch_span_processor_new = require("opentelemetry.trace.batch_span_processor").new
local tracer_provider_new = require("opentelemetry.trace.tracer_provider").new
local resource_new = require("opentelemetry.resource").new
local attr = require("opentelemetry.attribute")
local span_kind = require("opentelemetry.trace.span_kind")

local context = require("opentelemetry.context").new()

local _M = { version = 0.1 }

local hostname

function _M.init()
if process.type() ~= "worker" then
return
end
hostname = core.utils.gethostname()
end



local debug_tracer_provider = {}
debug_tracer_provider.__index = debug_tracer_provider

function debug_tracer_provider.new(collector_config, resource_attrs)
local self = setmetatable({
collector_config = collector_config or {
address = "127.0.0.1:4318",
request_timeout = 3,
request_headers = {}
},
resource_attrs = resource_attrs or {},
spans = {}, -- Buffered spans for this tracer instance
is_reporting = false
}, debug_tracer_provider)

return self
end

function debug_tracer_provider:start_span(span_name, options)
local span_id = core.utils.uuid()
local trace_id = core.utils.uuid()
local start_time = ngx.now() * 1000000000 -- Convert to nanoseconds

local span = {
id = span_id,
trace_id = trace_id,
name = span_name,
start_time = start_time,
end_time = nil,
kind = options and options.kind or span_kind.internal,
attributes = options and options.attributes or {},
parent_span_id = options and options.parent_span_id,
status = nil,
events = {}
}

-- Store in buffered spans
self.spans[span_id] = span

return {
span_id = span_id,
trace_id = trace_id,
name = span_name,
context = context
}
end

function debug_tracer_provider:finish_span(span_token, end_time)
local span = self.spans[span_token.span_id]
if span then
span.end_time = end_time or (ngx.now() * 1000000000)
end
return span
end

function debug_tracer_provider:add_event(span_token, event_name, attributes)
local span = self.spans[span_token.span_id]
if span then
table.insert(span.events, {
name = event_name,
time = ngx.now() * 1000000000,
attributes = attributes or {}
})
end
end

function debug_tracer_provider:set_attributes(span_token, attributes)
local span = self.spans[span_token.span_id]
if span then
for k, v in pairs(attributes) do
span.attributes[k] = v
end
end
end

function debug_tracer_provider:set_status(span_token, status, description)
local span = self.spans[span_token.span_id]
if span then
span.status = {
code = status,
description = description
}
end
end

function debug_tracer_provider:report_trace(debug_session_id)
if self.is_reporting then
core.log.warn("Debug tracer is already in reporting mode")
return
end
self.is_reporting = true
local real_tracer = self:_create_real_tracer(debug_session_id)

-- Convert all buffered spans to real spans
for span_id, buffered_span in pairs(self.spans) do
if buffered_span.end_time then
self:_convert_to_real_span(real_tracer, buffered_span)
end
end

-- Force flush
real_tracer.provider:force_flush()

core.log.info("Debug trace reported for session: ", debug_session_id)
end

function debug_tracer_provider:_create_real_tracer(debug_session_id)
-- Build resource attributes
local resource_attrs = { attr.string("hostname", hostname) }

-- Add service name if not provided
if not self.resource_attrs["service.name"] then
table.insert(resource_attrs, attr.string("service.name", "APISIX-Debug"))
end

-- Add debug session ID
table.insert(resource_attrs, attr.string("debug.session.id", debug_session_id))

-- Add custom resource attributes
for k, v in pairs(self.resource_attrs) do
if type(v) == "string" then
table.insert(resource_attrs, attr.string(k, v))
elseif type(v) == "number" then
table.insert(resource_attrs, attr.double(k, v))
elseif type(v) == "boolean" then
table.insert(resource_attrs, attr.bool(k, v))
end
end

-- Create real tracer
local exporter = otlp_exporter_new(
exporter_client_new(
self.collector_config.address,
self.collector_config.request_timeout,
self.collector_config.request_headers
)
)

local batch_span_processor = batch_span_processor_new(
exporter,
self.collector_config.batch_span_processor or {}
)

local sampler = always_on_sampler_new() -- Always sample debug traces

local tp = tracer_provider_new(batch_span_processor, {
resource = resource_new(unpack(resource_attrs)),
sampler = sampler,
})

return tp:tracer("apisix-debug-tracer")
end

function debug_tracer_provider:_convert_to_real_span(real_tracer, buffered_span)
-- Start span with original timing
local span_ctx = real_tracer:start(buffered_span.name, {
kind = buffered_span.kind,
attributes = buffered_span.attributes,
start_time = buffered_span.start_time
})

local span = span_ctx:span()

-- Add events
for _, event in ipairs(buffered_span.events) do
-- Note: OpenTelemetry Lua might not have direct event API
-- We can add as attributes instead
span:set_attributes(event.attributes)
end

-- Set status
if buffered_span.status then
span:set_status(buffered_span.status.code, buffered_span.status.description)
end

-- Finish with original end time
span:finish(buffered_span.end_time)
end

function debug_tracer_provider:get_buffered_spans_count()
local count = 0
for _ in pairs(self.spans) do
count = count + 1
end
return count
end

function _M.create_tracer_provider(collector_config, resource_attrs)
return debug_tracer_provider.new(collector_config, resource_attrs)
end

return _M
26 changes: 25 additions & 1 deletion apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local resource = require("apisix.resource")
local trusted_addresses_util = require("apisix.utils.trusted-addresses")
local span_kind = require("opentelemetry.trace.span_kind")
local debug_tracer = require("apisix.debug_tracer")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
Expand Down Expand Up @@ -203,8 +205,30 @@ function _M.ssl_client_hello_phase()
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx

-- if shared_dict:get("active_debug_sessions") then (Make it conditional?)
local tracer_provider = debug_tracer.create_tracer_provider()
api_ctx.debug_tracer = tracer_provider
local tls_span = tracer_provider:start_span("TLS Handshake", {
kind = span_kind.server,
attributes = {
tls_sni = sni,
tls_ext_status_request = tls_ext_status_req ~= nil
}
})
api_ctx.tls_span = tls_span
-- end
local ok, err = router.router_ssl.match_and_set(api_ctx, true, sni)

if api_ctx.debug_tracer and api_ctx.matched_ssl then
local route_span = api_ctx.debug_tracer:start_span("SSL Route Matching", {
attributes = {
matched_sni = sni,
ssl_cert_found = api_ctx.matched_ssl ~= nil,
route_id = api_ctx.matched_ssl and api_ctx.matched_ssl.value and
api_ctx.matched_ssl.value.id or "none"
}
})
api_ctx.route_span = route_span
end
ngx_ctx.matched_ssl = api_ctx.matched_ssl
core.tablepool.release("api_ctx", api_ctx)
ngx_ctx.api_ctx = nil
Expand Down
67 changes: 56 additions & 11 deletions apisix/plugins/opentelemetry.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local plugin_name = "opentelemetry"
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local process = require("ngx.process")
local debug_tracer = require("apisix.debug_tracer")

local always_off_sampler_new = require("opentelemetry.trace.sampling.always_off_sampler").new
local always_on_sampler_new = require("opentelemetry.trace.sampling.always_on_sampler").new
Expand Down Expand Up @@ -305,6 +306,28 @@ local function inject_attributes(attributes, wanted_attributes, source, with_pre
end
end

function _M.access(conf, api_ctx)
-- Check if this request matches any debug session
local debug_session = check_debug_session_conditions(api_ctx)
if debug_session and api_ctx.debug_tracer then
api_ctx.active_debug_session = debug_session

-- We've decided to report this trace, finish spans and report
if api_ctx.tls_span then
api_ctx.debug_tracer:finish_span(api_ctx.tls_span)
end
if api_ctx.route_span then
api_ctx.debug_tracer:finish_span(api_ctx.route_span)
end
if api_ctx.debug_main_span then
api_ctx.debug_tracer:finish_span(api_ctx.debug_main_span)
end

-- Report the trace
api_ctx.debug_tracer:report_trace(debug_session.id)
api_ctx.debug_tracer = nil -- Clean up
end
end

function _M.rewrite(conf, api_ctx)
local metadata = plugin.plugin_metadata(plugin_name)
Expand Down Expand Up @@ -380,6 +403,23 @@ function _M.rewrite(conf, api_ctx)

-- inject trace context into the headers of upstream HTTP request
trace_context_propagator:inject(ctx, ngx.req)
-- implement should_enable_debug_tracing
if should_enable_debug_tracing(api_ctx) then
local metadata = plugin.plugin_metadata(plugin_name)
if metadata then
local plugin_info = metadata.value
api_ctx.debug_tracer = debug_tracer.create_tracer_provider(
plugin_info.collector,
plugin_info.resource
)
-- Start main request span in debug tracer
local debug_span = api_ctx.debug_tracer:start_span(span_name, {
kind = span_kind.server,
attributes = attributes
})
api_ctx.debug_main_span = debug_span
end
end
end


Expand Down Expand Up @@ -407,18 +447,23 @@ end
-- body_filter maybe not called because of empty http body response
-- so we need to check if the span has finished in log phase
function _M.log(conf, api_ctx)
if api_ctx.otel_context_token then
-- ctx:detach() is not necessary, because of ctx is stored in ngx.ctx
local upstream_status = core.response.get_upstream_status(api_ctx)

-- get span from current context
local span = context:current():span()
if upstream_status and upstream_status >= 500 then
span:set_status(span_status.ERROR,
"upstream response status: " .. upstream_status)
if not api_ctx.active_debug_session then
-- implement check_response_debug_session
local debug_session = check_response_debug_session(api_ctx)
if debug_session and api_ctx.debug_tracer then
-- Finish all spans and report
for span_id, span in pairs(api_ctx.debug_tracer.spans) do
if not span.end_time then
api_ctx.debug_tracer:finish_span({span_id = span_id})
end
end
api_ctx.debug_tracer:report_trace(debug_session.id)
end

span:finish()
end

-- Cleanup: if debug tracer exists but no session matched, discard
if api_ctx.debug_tracer and not api_ctx.active_debug_session then
api_ctx.debug_tracer = nil
end
end

Expand Down
Loading