Skip to content

Commit f504d9b

Browse files
Phase 3: Transaction tracing — protobuf context, PeerImp, NetworkOPs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 051a219 commit f504d9b

File tree

7 files changed

+288
-1
lines changed

7 files changed

+288
-1
lines changed

.github/scripts/levelization/results/ordering.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ xrpld.app > xrpl.basics
220220
xrpld.app > xrpl.core
221221
xrpld.app > xrpld.consensus
222222
xrpld.app > xrpld.core
223+
xrpld.app > xrpld.telemetry
223224
xrpld.app > xrpl.json
224225
xrpld.app > xrpl.ledger
225226
xrpld.app > xrpl.net
@@ -244,6 +245,7 @@ xrpld.overlay > xrpl.basics
244245
xrpld.overlay > xrpl.core
245246
xrpld.overlay > xrpld.core
246247
xrpld.overlay > xrpld.peerfinder
248+
xrpld.overlay > xrpld.telemetry
247249
xrpld.overlay > xrpl.json
248250
xrpld.overlay > xrpl.protocol
249251
xrpld.overlay > xrpl.rdb

cspell.config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ words:
8787
- daria
8888
- dcmake
8989
- dearmor
90-
- dedup
9190
- Dedup
91+
- dedup
9292
- deleteme
9393
- demultiplexer
9494
- deserializaton

include/xrpl/proto/xrpl.proto

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ message TMPublicKey {
8585
// If you want to send an amount that is greater than any single address of yours
8686
// you must first combine coins from one address to another.
8787

88+
// Trace context for OpenTelemetry distributed tracing across nodes.
89+
// Mirrors the W3C Trace Context fields (trace-id, parent-id, trace-flags,
// tracestate); the two identifiers are carried as raw bytes, not hex text.
// Every field is optional so peers that do not trace can omit the message.
90+
message TraceContext {
91+
optional bytes trace_id = 1; // 16-byte trace identifier (raw bytes)
92+
optional bytes span_id = 2; // 8-byte identifier of the sender's (parent) span
93+
optional uint32 trace_flags = 3; // W3C trace-flags; bit 0 = sampled
94+
optional string trace_state = 4; // W3C tracestate header value (vendor key/value list)
95+
}
96+
8897
enum TransactionStatus {
8998
tsNEW = 1; // origin node did/could not validate
9099
tsCURRENT = 2; // scheduled to go in this ledger
@@ -101,6 +110,9 @@ message TMTransaction {
101110
required TransactionStatus status = 2;
102111
optional uint64 receiveTimestamp = 3;
103112
optional bool deferred = 4; // not applied to open ledger
113+
114+
// Optional trace context for OpenTelemetry distributed tracing
115+
optional TraceContext trace_context = 1001;
104116
}
105117

106118
message TMTransactions {
@@ -149,6 +161,9 @@ message TMProposeSet {
149161

150162
// Number of hops traveled
151163
optional uint32 hops = 12 [deprecated = true];
164+
165+
// Optional trace context for OpenTelemetry distributed tracing
166+
optional TraceContext trace_context = 1001;
152167
}
153168

154169
enum TxSetStatus {
@@ -194,6 +209,9 @@ message TMValidation {
194209

195210
// Number of hops traveled
196211
optional uint32 hops = 3 [deprecated = true];
212+
213+
// Optional trace context for OpenTelemetry distributed tracing
214+
optional TraceContext trace_context = 1001;
197215
}
198216

199217
// An array of Endpoint messages
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#pragma once
2+
3+
/** Utilities for trace context propagation across nodes.
4+
5+
Provides serialization/deserialization of OTel trace context to/from
6+
Protocol Buffer TraceContext messages (P2P cross-node propagation).
7+
8+
Only compiled when XRPL_ENABLE_TELEMETRY is defined.
9+
*/
10+
11+
#ifdef XRPL_ENABLE_TELEMETRY
12+
13+
#include <xrpl/proto/xrpl.pb.h>

#include <opentelemetry/context/context.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/default_span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/trace_flags.h>
#include <opentelemetry/trace/trace_id.h>
#include <opentelemetry/trace/trace_state.h>

#include <cstdint>
23+
24+
namespace xrpl {
25+
namespace telemetry {
26+
27+
/** Extract OTel context from a protobuf TraceContext message.
28+
29+
@param proto The protobuf TraceContext received from a peer.
30+
@return An OTel Context with the extracted parent span, or an empty
31+
context if the protobuf fields are missing or invalid.
32+
*/
33+
inline opentelemetry::context::Context
34+
extractFromProtobuf(protocol::TraceContext const& proto)
35+
{
36+
namespace trace = opentelemetry::trace;
37+
38+
if (!proto.has_trace_id() || proto.trace_id().size() != 16 || !proto.has_span_id() ||
39+
proto.span_id().size() != 8)
40+
{
41+
return opentelemetry::context::Context{};
42+
}
43+
44+
auto const* rawTraceId = reinterpret_cast<std::uint8_t const*>(proto.trace_id().data());
45+
auto const* rawSpanId = reinterpret_cast<std::uint8_t const*>(proto.span_id().data());
46+
trace::TraceId traceId(opentelemetry::nostd::span<std::uint8_t const, 16>(rawTraceId, 16));
47+
trace::SpanId spanId(opentelemetry::nostd::span<std::uint8_t const, 8>(rawSpanId, 8));
48+
// Default to not-sampled (0x00) per W3C Trace Context spec when
49+
// the trace_flags field is absent.
50+
trace::TraceFlags flags(
51+
proto.has_trace_flags() ? static_cast<std::uint8_t>(proto.trace_flags())
52+
: static_cast<std::uint8_t>(0));
53+
54+
trace::SpanContext spanCtx(traceId, spanId, flags, /* remote = */ true);
55+
56+
return opentelemetry::context::Context{}.SetValue(
57+
trace::kSpanKey,
58+
opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(spanCtx)));
59+
}
60+
61+
/** Inject the current span's trace context into a protobuf TraceContext.
62+
63+
@param ctx The OTel context containing the span to propagate.
64+
@param proto The protobuf TraceContext to populate.
65+
*/
66+
inline void
67+
injectToProtobuf(opentelemetry::context::Context const& ctx, protocol::TraceContext& proto)
68+
{
69+
namespace trace = opentelemetry::trace;
70+
71+
auto span = trace::GetSpan(ctx);
72+
if (!span)
73+
return;
74+
75+
auto const& spanCtx = span->GetContext();
76+
if (!spanCtx.IsValid())
77+
return;
78+
79+
// Serialize trace_id (16 bytes)
80+
auto const& traceId = spanCtx.trace_id();
81+
proto.set_trace_id(traceId.Id().data(), trace::TraceId::kSize);
82+
83+
// Serialize span_id (8 bytes)
84+
auto const& spanId = spanCtx.span_id();
85+
proto.set_span_id(spanId.Id().data(), trace::SpanId::kSize);
86+
87+
// Serialize flags
88+
proto.set_trace_flags(spanCtx.trace_flags().flags());
89+
}
90+
91+
} // namespace telemetry
92+
} // namespace xrpl
93+
94+
#endif // XRPL_ENABLE_TELEMETRY
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#include <gtest/gtest.h>
2+
3+
#ifdef XRPL_ENABLE_TELEMETRY
4+
5+
#include <xrpl/telemetry/TraceContextPropagator.h>
6+
7+
#include <opentelemetry/context/context.h>
8+
#include <opentelemetry/nostd/span.h>
9+
#include <opentelemetry/trace/context.h>
10+
#include <opentelemetry/trace/default_span.h>
11+
#include <opentelemetry/trace/span_context.h>
12+
#include <opentelemetry/trace/trace_flags.h>
13+
#include <opentelemetry/trace/trace_id.h>
14+
15+
#include <cstring>
16+
17+
namespace trace = opentelemetry::trace;
18+
19+
// Injecting a sampled remote span and extracting it again must reproduce
// the same trace id, span id, sampled flag and remote marker.
TEST(TraceContextPropagator, round_trip)
{
    namespace nostd = opentelemetry::nostd;

    std::uint8_t rawTrace[16] = {
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
        0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
    std::uint8_t rawSpan[8] = {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x11, 0x22};

    trace::TraceId expectedTraceId(nostd::span<uint8_t const, 16>(rawTrace, 16));
    trace::SpanId expectedSpanId(nostd::span<uint8_t const, 8>(rawSpan, 8));
    trace::TraceFlags sampled(trace::TraceFlags::kIsSampled);
    trace::SpanContext original(expectedTraceId, expectedSpanId, sampled, true);

    nostd::shared_ptr<trace::Span> holder(new trace::DefaultSpan(original));
    auto sourceCtx = opentelemetry::context::Context{}.SetValue(trace::kSpanKey, holder);

    // --- inject: OTel context -> wire message ---
    protocol::TraceContext wire;
    xrpl::telemetry::injectToProtobuf(sourceCtx, wire);

    EXPECT_TRUE(wire.has_trace_id());
    EXPECT_EQ(wire.trace_id().size(), 16u);
    EXPECT_TRUE(wire.has_span_id());
    EXPECT_EQ(wire.span_id().size(), 8u);
    EXPECT_EQ(wire.trace_flags(), static_cast<uint32_t>(trace::TraceFlags::kIsSampled));
    EXPECT_EQ(std::memcmp(wire.trace_id().data(), rawTrace, 16), 0);
    EXPECT_EQ(std::memcmp(wire.span_id().data(), rawSpan, 8), 0);

    // --- extract: wire message -> OTel context ---
    auto restoredCtx = xrpl::telemetry::extractFromProtobuf(wire);
    auto restoredSpan = trace::GetSpan(restoredCtx);
    ASSERT_NE(restoredSpan, nullptr);

    auto const& restored = restoredSpan->GetContext();
    EXPECT_TRUE(restored.IsValid());
    EXPECT_TRUE(restored.IsRemote());
    EXPECT_EQ(restored.trace_id(), expectedTraceId);
    EXPECT_EQ(restored.span_id(), expectedSpanId);
    EXPECT_TRUE(restored.trace_flags().IsSampled());
}
71+
72+
// A message with no fields set carries no identifiers, so extraction must
// not produce a valid parent span.
TEST(TraceContextPropagator, extract_empty_protobuf)
{
    protocol::TraceContext proto;
    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert rather than branch on the pointer so the validity check below
    // always runs and the test cannot pass vacuously.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
82+
83+
// A trace_id of 8 bytes (16 required) must be rejected even when the
// span_id has the correct length.
TEST(TraceContextPropagator, extract_wrong_size_trace_id)
{
    protocol::TraceContext proto;
    proto.set_trace_id(std::string(8, '\x01'));
    proto.set_span_id(std::string(8, '\xaa'));

    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert rather than branch on the pointer so the validity check below
    // always runs and the test cannot pass vacuously.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
96+
97+
// A span_id of 4 bytes (8 required) must be rejected even when the
// trace_id has the correct length.
TEST(TraceContextPropagator, extract_wrong_size_span_id)
{
    protocol::TraceContext proto;
    proto.set_trace_id(std::string(16, '\x01'));
    proto.set_span_id(std::string(4, '\xaa'));

    auto ctx = xrpl::telemetry::extractFromProtobuf(proto);
    auto span = trace::GetSpan(ctx);

    // Assert rather than branch on the pointer so the validity check below
    // always runs and the test cannot pass vacuously.
    ASSERT_NE(span, nullptr);
    EXPECT_FALSE(span->GetContext().IsValid());
}
110+
111+
// Injecting from a context that holds no span must leave the message's
// identifier fields unset.
TEST(TraceContextPropagator, inject_invalid_span)
{
    protocol::TraceContext wire;
    opentelemetry::context::Context noSpanCtx{};

    xrpl::telemetry::injectToProtobuf(noSpanCtx, wire);

    EXPECT_FALSE(wire.has_trace_id());
    EXPECT_FALSE(wire.has_span_id());
}
120+
121+
// A span that is NOT sampled (flags == 0) must stay not-sampled after an
// inject/extract round trip.
TEST(TraceContextPropagator, flags_preservation)
{
    namespace nostd = opentelemetry::nostd;

    std::uint8_t rawTrace[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    std::uint8_t rawSpan[8] = {1, 2, 3, 4, 5, 6, 7, 8};

    trace::TraceId unsampledTraceId(nostd::span<uint8_t const, 16>(rawTrace, 16));
    trace::SpanId unsampledSpanId(nostd::span<uint8_t const, 8>(rawSpan, 8));
    trace::TraceFlags notSampled(0);
    trace::SpanContext original(unsampledTraceId, unsampledSpanId, notSampled, true);

    nostd::shared_ptr<trace::Span> holder(new trace::DefaultSpan(original));
    auto sourceCtx = opentelemetry::context::Context{}.SetValue(trace::kSpanKey, holder);

    protocol::TraceContext wire;
    xrpl::telemetry::injectToProtobuf(sourceCtx, wire);
    EXPECT_EQ(wire.trace_flags(), 0u);

    auto restoredCtx = xrpl::telemetry::extractFromProtobuf(wire);
    auto restoredSpan = trace::GetSpan(restoredCtx);
    ASSERT_NE(restoredSpan, nullptr);
    EXPECT_FALSE(restoredSpan->GetContext().trace_flags().IsSampled());
}
147+
148+
#else // XRPL_ENABLE_TELEMETRY not defined
149+
150+
// Placeholder used when XRPL_ENABLE_TELEMETRY is not defined: it keeps a
// TraceContextPropagator test in this translation unit and always succeeds.
TEST(TraceContextPropagator, compiles_without_telemetry)
151+
{
152+
SUCCEED();
153+
}
154+
155+
#endif // XRPL_ENABLE_TELEMETRY

src/xrpld/app/misc/NetworkOPs.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <xrpld/rpc/DeliveredAmount.h>
3030
#include <xrpld/rpc/MPTokenIssuanceID.h>
3131
#include <xrpld/rpc/ServerHandler.h>
32+
#include <xrpld/telemetry/TracingInstrumentation.h>
3233

3334
#include <xrpl/basics/UptimeClock.h>
3435
#include <xrpl/basics/mulDiv.h>
@@ -1223,16 +1224,26 @@ NetworkOPsImp::processTransaction(
12231224
bool bLocal,
12241225
FailHard failType)
12251226
{
1227+
XRPL_TRACE_TX(registry_.getTelemetry(), "tx.process");
1228+
XRPL_TRACE_SET_ATTR("xrpl.tx.hash", to_string(transaction->getID()).c_str());
1229+
XRPL_TRACE_SET_ATTR("xrpl.tx.local", bLocal);
1230+
12261231
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
12271232

12281233
// preProcessTransaction can change our pointer
12291234
if (!preProcessTransaction(transaction))
12301235
return;
12311236

12321237
if (bLocal)
1238+
{
1239+
XRPL_TRACE_SET_ATTR("xrpl.tx.path", "sync");
12331240
doTransactionSync(transaction, bUnlimited, failType);
1241+
}
12341242
else
1243+
{
1244+
XRPL_TRACE_SET_ATTR("xrpl.tx.path", "async");
12351245
doTransactionAsync(transaction, bUnlimited, failType);
1246+
}
12361247
}
12371248

12381249
void

src/xrpld/overlay/detail/PeerImp.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <xrpld/overlay/Cluster.h>
99
#include <xrpld/overlay/detail/PeerImp.h>
1010
#include <xrpld/overlay/detail/Tuning.h>
11+
#include <xrpld/telemetry/TracingInstrumentation.h>
1112

1213
#include <xrpl/basics/UptimeClock.h>
1314
#include <xrpl/basics/base64.h>
@@ -1269,6 +1270,9 @@ PeerImp::handleTransaction(
12691270
bool eraseTxQueue,
12701271
bool batch)
12711272
{
1273+
XRPL_TRACE_TX(app_.getTelemetry(), "tx.receive");
1274+
XRPL_TRACE_SET_ATTR("xrpl.peer.id", static_cast<int64_t>(id_));
1275+
12721276
XRPL_ASSERT(eraseTxQueue != batch, ("xrpl::PeerImp::handleTransaction : valid inputs"));
12731277
if (tracking_.load() == Tracking::diverged)
12741278
return;
@@ -1287,6 +1291,7 @@ PeerImp::handleTransaction(
12871291
{
12881292
auto stx = std::make_shared<STTx const>(sit);
12891293
uint256 txID = stx->getTransactionID();
1294+
XRPL_TRACE_SET_ATTR("xrpl.tx.hash", to_string(txID).c_str());
12901295

12911296
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
12921297
// LCOV_EXCL_START
@@ -1320,9 +1325,11 @@ PeerImp::handleTransaction(
13201325

13211326
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
13221327
{
1328+
XRPL_TRACE_SET_ATTR("xrpl.tx.suppressed", true);
13231329
// we have seen this transaction recently
13241330
if (any(flags & HashRouterFlags::BAD))
13251331
{
1332+
XRPL_TRACE_SET_ATTR("xrpl.tx.status", "known_bad");
13261333
fee_.update(Resource::feeUselessData, "known bad");
13271334
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
13281335
}

0 commit comments

Comments
 (0)