Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/scripts/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ xrpld.app > xrpl.basics
xrpld.app > xrpl.core
xrpld.app > xrpld.consensus
xrpld.app > xrpld.core
xrpld.app > xrpld.telemetry
xrpld.app > xrpl.json
xrpld.app > xrpl.ledger
xrpld.app > xrpl.net
Expand All @@ -244,6 +245,7 @@ xrpld.overlay > xrpl.basics
xrpld.overlay > xrpl.core
xrpld.overlay > xrpld.core
xrpld.overlay > xrpld.peerfinder
xrpld.overlay > xrpld.telemetry
xrpld.overlay > xrpl.json
xrpld.overlay > xrpl.protocol
xrpld.overlay > xrpl.rdb
Expand Down
2 changes: 1 addition & 1 deletion cspell.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ words:
- daria
- dcmake
- dearmor
- dedup
- Dedup
- dedup
- deleteme
- demultiplexer
- deserializaton
Expand Down
18 changes: 18 additions & 0 deletions include/xrpl/proto/xrpl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ message TMPublicKey {
// If you want to send an amount that is greater than any single address of yours
// you must first combine coins from one address to another.

// Trace context for OpenTelemetry distributed tracing across nodes.
// Uses W3C Trace Context format internally.
//
// Carried as an optional sub-message (field 1001) on TMTransaction,
// TMProposeSet and TMValidation. Every field is optional, so a message
// without trace data stays valid.
message TraceContext {
// Raw binary identifier, not hex text. Expected to be exactly 16 bytes.
optional bytes trace_id = 1; // 16-byte trace identifier
// Raw binary identifier, not hex text. Expected to be exactly 8 bytes.
optional bytes span_id = 2; // 8-byte parent span identifier
// Declared as uint32 on the wire; only bit 0 is given a meaning here.
optional uint32 trace_flags = 3; // bit 0 = sampled
// NOTE(review): confirm which component reads and writes this field.
optional string trace_state = 4; // W3C tracestate header value
}

enum TransactionStatus {
tsNEW = 1; // origin node did/could not validate
tsCURRENT = 2; // scheduled to go in this ledger
Expand All @@ -101,6 +110,9 @@ message TMTransaction {
required TransactionStatus status = 2;
optional uint64 receiveTimestamp = 3;
optional bool deferred = 4; // not applied to open ledger

// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}

message TMTransactions {
Expand Down Expand Up @@ -149,6 +161,9 @@ message TMProposeSet {

// Number of hops traveled
optional uint32 hops = 12 [deprecated = true];

// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}

enum TxSetStatus {
Expand Down Expand Up @@ -194,6 +209,9 @@ message TMValidation {

// Number of hops traveled
optional uint32 hops = 3 [deprecated = true];

// Optional trace context for OpenTelemetry distributed tracing
optional TraceContext trace_context = 1001;
}

// An array of Endpoint messages
Expand Down
94 changes: 94 additions & 0 deletions include/xrpl/telemetry/TraceContextPropagator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

/** Utilities for trace context propagation across nodes.

Provides serialization/deserialization of OTel trace context to/from
Protocol Buffer TraceContext messages (P2P cross-node propagation).

Only compiled when XRPL_ENABLE_TELEMETRY is defined.
*/

#ifdef XRPL_ENABLE_TELEMETRY

#include <xrpl/proto/xrpl.pb.h>

#include <opentelemetry/context/context.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>

#include <cstdint>

namespace xrpl {
namespace telemetry {

/** Extract OTel context from a protobuf TraceContext message.

    The message originates from a remote peer, so it is validated before
    use: both identifiers must be present with exactly the sizes
    OpenTelemetry expects (16-byte trace id, 8-byte span id), and the
    resulting span context must be valid. Identifiers of the right length
    that OpenTelemetry still rejects (e.g. all zero bytes) are treated the
    same as missing fields.

    Only the low 8 bits of trace_flags are kept; when the field is absent
    the flags default to 0 (not sampled). trace_state is not read.

    @param proto The protobuf TraceContext received from a peer.
    @return An OTel Context with the extracted parent span (marked as
            remote), or an empty context if the protobuf fields are
            missing or invalid.
*/
inline opentelemetry::context::Context
extractFromProtobuf(protocol::TraceContext const& proto)
{
    namespace trace = opentelemetry::trace;

    // The literal sizes below must track the OTel identifier widths.
    static_assert(
        trace::TraceId::kSize == 16 && trace::SpanId::kSize == 8,
        "TraceContext wire sizes must match OpenTelemetry id sizes");

    if (!proto.has_trace_id() || proto.trace_id().size() != 16 || !proto.has_span_id() ||
        proto.span_id().size() != 8)
    {
        return opentelemetry::context::Context{};
    }

    auto const* rawTraceId = reinterpret_cast<std::uint8_t const*>(proto.trace_id().data());
    auto const* rawSpanId = reinterpret_cast<std::uint8_t const*>(proto.span_id().data());
    trace::TraceId traceId(opentelemetry::nostd::span<std::uint8_t const, 16>(rawTraceId, 16));
    trace::SpanId spanId(opentelemetry::nostd::span<std::uint8_t const, 8>(rawSpanId, 8));
    // Default to not-sampled (0x00) per W3C Trace Context spec when
    // the trace_flags field is absent.
    trace::TraceFlags flags(
        proto.has_trace_flags() ? static_cast<std::uint8_t>(proto.trace_flags())
                                : static_cast<std::uint8_t>(0));

    trace::SpanContext spanCtx(traceId, spanId, flags, /* remote = */ true);

    // Correctly sized but invalid identifiers must not produce a parent
    // span; honour the documented "empty context" contract instead.
    if (!spanCtx.IsValid())
    {
        return opentelemetry::context::Context{};
    }

    return opentelemetry::context::Context{}.SetValue(
        trace::kSpanKey,
        opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));
}

/** Inject the current span's trace context into a protobuf TraceContext.

    Writes trace_id (16 bytes), span_id (8 bytes) and trace_flags. The
    message is left untouched when the context holds no span or the span's
    context is not valid. trace_state is not written.

    @param ctx The OTel context containing the span to propagate.
    @param proto The protobuf TraceContext to populate.
*/
inline void
injectToProtobuf(opentelemetry::context::Context const& ctx, protocol::TraceContext& proto)
{
    namespace trace = opentelemetry::trace;

    auto span = trace::GetSpan(ctx);
    if (!span)
    {
        // Defensive guard. Coverage tooling (Codecov) reported this early
        // return as not covered by tests.
        return;
    }

    auto const spanCtx = span->GetContext();
    if (!spanCtx.IsValid())
        return;

    // Serialize trace_id (16 bytes)
    auto const& traceId = spanCtx.trace_id();
    proto.set_trace_id(traceId.Id().data(), trace::TraceId::kSize);

    // Serialize span_id (8 bytes)
    auto const& spanId = spanCtx.span_id();
    proto.set_span_id(spanId.Id().data(), trace::SpanId::kSize);

    // Serialize flags
    proto.set_trace_flags(spanCtx.trace_flags().flags());
}

} // namespace telemetry
} // namespace xrpl

#endif // XRPL_ENABLE_TELEMETRY
155 changes: 155 additions & 0 deletions src/tests/libxrpl/telemetry/TraceContextPropagator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include <gtest/gtest.h>

#ifdef XRPL_ENABLE_TELEMETRY

#include <xrpl/telemetry/TraceContextPropagator.h>

#include <opentelemetry/context/context.h>
#include <opentelemetry/nostd/span.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>

#include <cstring>

namespace trace = opentelemetry::trace;

// Injects a sampled, valid span context into a protobuf message, checks the
// serialized bytes, then extracts it again and compares with the original.
TEST(TraceContextPropagator, round_trip)
{
    std::uint8_t traceIdBuf[16] = {
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
        0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
    std::uint8_t spanIdBuf[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22};

    trace::TraceId const traceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16));
    trace::SpanId const spanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8));
    trace::SpanContext const original(
        traceId, spanId, trace::TraceFlags(trace::TraceFlags::kIsSampled), true);

    opentelemetry::nostd::shared_ptr<trace::Span> sourceSpan(new trace::DefaultSpan(original));
    auto ctx = opentelemetry::context::Context{}.SetValue(trace::kSpanKey, sourceSpan);

    // Inject: every field must be written with the exact bytes supplied.
    protocol::TraceContext proto;
    xrpl::telemetry::injectToProtobuf(ctx, proto);

    EXPECT_TRUE(proto.has_trace_id());
    EXPECT_EQ(proto.trace_id().size(), 16u);
    EXPECT_TRUE(proto.has_span_id());
    EXPECT_EQ(proto.span_id().size(), 8u);
    EXPECT_EQ(proto.trace_flags(), static_cast<uint32_t>(trace::TraceFlags::kIsSampled));
    EXPECT_EQ(std::memcmp(proto.trace_id().data(), traceIdBuf, 16), 0);
    EXPECT_EQ(std::memcmp(proto.span_id().data(), spanIdBuf, 8), 0);

    // Extract: the recovered span context must match and be marked remote.
    auto roundTripped = trace::GetSpan(xrpl::telemetry::extractFromProtobuf(proto));
    ASSERT_NE(roundTripped, nullptr);

    auto const& recovered = roundTripped->GetContext();
    EXPECT_TRUE(recovered.IsValid());
    EXPECT_TRUE(recovered.IsRemote());
    EXPECT_EQ(recovered.trace_id(), traceId);
    EXPECT_EQ(recovered.span_id(), spanId);
    EXPECT_TRUE(recovered.trace_flags().IsSampled());
}

// A TraceContext with no fields set must not yield a valid span context.
TEST(TraceContextPropagator, extract_empty_protobuf)
{
    protocol::TraceContext const emptyProto;
    auto span = trace::GetSpan(xrpl::telemetry::extractFromProtobuf(emptyProto));

    // No span at all is acceptable; a span, if present, must be invalid.
    if (!span)
        return;
    EXPECT_FALSE(span->GetContext().IsValid());
}

// An 8-byte trace_id (16 required) must not yield a valid span context,
// even though the span_id has the correct length.
TEST(TraceContextPropagator, extract_wrong_size_trace_id)
{
    protocol::TraceContext badProto;
    badProto.set_trace_id(std::string(8, '\x01'));
    badProto.set_span_id(std::string(8, '\xaa'));

    auto span = trace::GetSpan(xrpl::telemetry::extractFromProtobuf(badProto));

    // No span at all is acceptable; a span, if present, must be invalid.
    if (!span)
        return;
    EXPECT_FALSE(span->GetContext().IsValid());
}

// A 4-byte span_id (8 required) must not yield a valid span context,
// even though the trace_id has the correct length.
TEST(TraceContextPropagator, extract_wrong_size_span_id)
{
    protocol::TraceContext badProto;
    badProto.set_trace_id(std::string(16, '\x01'));
    badProto.set_span_id(std::string(4, '\xaa'));

    auto span = trace::GetSpan(xrpl::telemetry::extractFromProtobuf(badProto));

    // No span at all is acceptable; a span, if present, must be invalid.
    if (!span)
        return;
    EXPECT_FALSE(span->GetContext().IsValid());
}

// Injecting from a context that carries no valid span must leave the
// protobuf message completely untouched, including trace_flags.
TEST(TraceContextPropagator, inject_invalid_span)
{
    auto ctx = opentelemetry::context::Context{};
    protocol::TraceContext proto;
    xrpl::telemetry::injectToProtobuf(ctx, proto);

    EXPECT_FALSE(proto.has_trace_id());
    EXPECT_FALSE(proto.has_span_id());
    // Flags must not be written either; otherwise a peer would receive a
    // TraceContext that has flags but no identifiers.
    EXPECT_FALSE(proto.has_trace_flags());
}

// A not-sampled span context (flags == 0) must survive inject + extract
// with the flags explicitly serialized rather than merely defaulted.
TEST(TraceContextPropagator, flags_preservation)
{
    std::uint8_t traceIdBuf[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    std::uint8_t spanIdBuf[8] = {1, 2, 3, 4, 5, 6, 7, 8};

    // Test with flags NOT sampled (flags = 0)
    trace::TraceFlags flags(0);
    trace::SpanContext spanCtx(
        trace::TraceId(opentelemetry::nostd::span<uint8_t const, 16>(traceIdBuf, 16)),
        trace::SpanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBuf, 8)),
        flags,
        true);

    auto ctx = opentelemetry::context::Context{}.SetValue(
        trace::kSpanKey,
        opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));

    protocol::TraceContext proto;
    xrpl::telemetry::injectToProtobuf(ctx, proto);
    // The proto default for an unset trace_flags is also 0, so check
    // presence first: the value comparison alone cannot tell
    // "serialized as 0" apart from "never serialized".
    EXPECT_TRUE(proto.has_trace_flags());
    EXPECT_EQ(proto.trace_flags(), 0u);

    auto extracted = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(extracted);
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().trace_flags().IsSampled());
}

#else // XRPL_ENABLE_TELEMETRY not defined

// Placeholder compiled only when XRPL_ENABLE_TELEMETRY is not defined, so
// this translation unit still contributes a TraceContextPropagator test.
// It exercises no propagator code and always succeeds.
TEST(TraceContextPropagator, compiles_without_telemetry)
{
SUCCEED();
}

#endif // XRPL_ENABLE_TELEMETRY
11 changes: 11 additions & 0 deletions src/xrpld/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpld/telemetry/TracingInstrumentation.h>

#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
Expand Down Expand Up @@ -1223,16 +1224,26 @@ NetworkOPsImp::processTransaction(
bool bLocal,
FailHard failType)
{
XRPL_TRACE_TX(registry_.getTelemetry(), "tx.process");
XRPL_TRACE_SET_ATTR("xrpl.tx.hash", to_string(transaction->getID()).c_str());
XRPL_TRACE_SET_ATTR("xrpl.tx.local", bLocal);

auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");

// preProcessTransaction can change our pointer
if (!preProcessTransaction(transaction))
return;

if (bLocal)
{
XRPL_TRACE_SET_ATTR("xrpl.tx.path", "sync");
doTransactionSync(transaction, bUnlimited, failType);
}
else
{
XRPL_TRACE_SET_ATTR("xrpl.tx.path", "async");
doTransactionAsync(transaction, bUnlimited, failType);
}
}

void
Expand Down
7 changes: 7 additions & 0 deletions src/xrpld/overlay/detail/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/telemetry/TracingInstrumentation.h>

#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
Expand Down Expand Up @@ -1269,6 +1270,9 @@ PeerImp::handleTransaction(
bool eraseTxQueue,
bool batch)
{
XRPL_TRACE_TX(app_.getTelemetry(), "tx.receive");
XRPL_TRACE_SET_ATTR("xrpl.peer.id", static_cast<int64_t>(id_));

XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
if (tracking_.load() == Tracking::diverged)
return;
Expand All @@ -1287,6 +1291,7 @@ PeerImp::handleTransaction(
{
auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID();
XRPL_TRACE_SET_ATTR("xrpl.tx.hash", to_string(txID).c_str());

// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
Expand Down Expand Up @@ -1320,9 +1325,11 @@ PeerImp::handleTransaction(

if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
{
XRPL_TRACE_SET_ATTR("xrpl.tx.suppressed", true);
// we have seen this transaction recently
if (any(flags & HashRouterFlags::BAD))
{
XRPL_TRACE_SET_ATTR("xrpl.tx.status", "known_bad");
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
Expand Down
Loading