Skip to content

Commit 138f7f5

Browse files
authored
[cmd/telemetrygen] Add span links support to telemetrygen (open-telemetry#43007)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR adds span links support to telemetrygen, enabling the generation of spans with links to previously created spans. The new `--span-links` flag allows users to specify the number of span links to generate for each span, creating relationships between spans for testing distributed tracing scenarios. Key features: - New `--span-links` command line flag to control the number of links per span - Links are created to random existing span contexts from previously generated spans - Each link includes attributes for link type and index identification - Both parent and child spans can have links generated - Thread-safe collection and access of span contexts for linking <!--Describe what testing was performed and which tests were added.--> #### Testing - Added unit tests for span link generation functionality - Tested with various `--span-links` values (0, 1, 5, 10) - Verified links are properly attached to both parent and child spans - Confirmed thread-safe operation with multiple workers - Validated link attributes are correctly set <!--Describe the documentation added.--> #### Documentation - Updated command line help text for the new `--span-links` flag - Added configuration documentation for the NumSpanLinks field <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 38e9bd6 commit 138f7f5

File tree

5 files changed

+243
-3
lines changed

5 files changed

+243
-3
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: telemetrygen
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add span links support to telemetrygen
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43007]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The new --span-links flag allows generating spans with links to previously created spans.
20+
Each span can link to random existing span contexts, creating relationships between spans for testing
21+
distributed tracing scenarios. Links include attributes for link type and index identification.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

cmd/telemetrygen/pkg/traces/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Config struct {
2121
PropagateContext bool
2222
StatusCode string
2323
Batch bool
24+
NumSpanLinks int
2425

2526
SpanDuration time.Duration
2627
}
@@ -42,6 +43,7 @@ func (c *Config) Flags(fs *pflag.FlagSet) {
4243
fs.BoolVar(&c.PropagateContext, "marshal", c.PropagateContext, "Whether to marshal trace context via HTTP headers")
4344
fs.StringVar(&c.StatusCode, "status-code", c.StatusCode, "Status code to use for the spans, one of (Unset, Error, Ok) or the equivalent integer (0,1,2)")
4445
fs.BoolVar(&c.Batch, "batch", c.Batch, "Whether to batch traces")
46+
fs.IntVar(&c.NumSpanLinks, "span-links", c.NumSpanLinks, "Number of span links to generate for each span")
4547
fs.DurationVar(&c.SpanDuration, "span-duration", c.SpanDuration, "The duration of each generated span.")
4648
}
4749

@@ -57,6 +59,7 @@ func (c *Config) SetDefaults() {
5759
c.PropagateContext = false
5860
c.StatusCode = "0"
5961
c.Batch = true
62+
c.NumSpanLinks = 0
6063
c.SpanDuration = 123 * time.Microsecond
6164
}
6265

cmd/telemetrygen/pkg/traces/traces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"go.opentelemetry.io/otel/sdk/resource"
2222
sdktrace "go.opentelemetry.io/otel/sdk/trace"
2323
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
24+
"go.opentelemetry.io/otel/trace"
2425
"go.uber.org/zap"
2526
"golang.org/x/time/rate"
2627

@@ -153,6 +154,8 @@ func run(c *Config, logger *zap.Logger) error {
153154
loadSize: c.LoadSize,
154155
spanDuration: c.SpanDuration,
155156
allowFailures: c.AllowExportFailures,
157+
numSpanLinks: c.NumSpanLinks,
158+
spanContexts: make([]trace.SpanContext, 0),
156159
}
157160

158161
go w.simulateTraces(telemetryAttributes)

cmd/telemetrygen/pkg/traces/traces_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ package traces
55

66
import (
77
"encoding/pem"
8+
"fmt"
89
"net/http"
910
"net/http/httptest"
1011
"net/url"
1112
"os"
1213
"path/filepath"
14+
"sync"
1315
"testing"
1416
"time"
1517

1618
"github.com/stretchr/testify/assert"
1719
"github.com/stretchr/testify/require"
1820
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
21+
"go.opentelemetry.io/otel/trace"
1922
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
2023

2124
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
@@ -227,6 +230,18 @@ func TestConfigValidation(t *testing.T) {
227230
expectError: true,
228231
description: "Config with negative traces should be invalid",
229232
},
233+
{
234+
name: "Valid config with span links",
235+
config: Config{
236+
Config: common.Config{
237+
WorkerCount: 1,
238+
},
239+
NumTraces: 5,
240+
NumSpanLinks: 2,
241+
},
242+
expectError: false,
243+
description: "Config with span links should be valid",
244+
},
230245
}
231246

232247
for _, tt := range tests {
@@ -411,3 +426,126 @@ func TestHTTPExporterOptions_HTTP(t *testing.T) {
411426
})
412427
}
413428
}
429+
430+
// TestSpanLinksGeneration tests the span links generation functionality
431+
func TestSpanLinksGeneration(t *testing.T) {
432+
tests := []struct {
433+
name string
434+
numSpanLinks int
435+
existingContexts int
436+
expectedLinkCount int
437+
description string
438+
}{
439+
{
440+
name: "No span links",
441+
numSpanLinks: 0,
442+
existingContexts: 5,
443+
expectedLinkCount: 0,
444+
description: "Should generate no links when numSpanLinks is 0",
445+
},
446+
{
447+
name: "With existing contexts",
448+
numSpanLinks: 3,
449+
existingContexts: 5,
450+
expectedLinkCount: 3,
451+
description: "Should generate links to random existing contexts",
452+
},
453+
{
454+
name: "No existing contexts",
455+
numSpanLinks: 3,
456+
existingContexts: 0,
457+
expectedLinkCount: 0,
458+
description: "Should generate no links when no existing contexts",
459+
},
460+
{
461+
name: "Fewer contexts than requested links",
462+
numSpanLinks: 5,
463+
existingContexts: 3,
464+
expectedLinkCount: 5,
465+
description: "Should generate requested number of links (allows duplicates)",
466+
},
467+
}
468+
469+
for _, tt := range tests {
470+
t.Run(tt.name, func(t *testing.T) {
471+
w := &worker{
472+
numSpanLinks: tt.numSpanLinks,
473+
spanContexts: make([]trace.SpanContext, 0),
474+
spanContextsMu: sync.RWMutex{},
475+
}
476+
477+
// Add existing contexts for testing
478+
for i := 0; i < tt.existingContexts; i++ {
479+
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
480+
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
481+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
482+
TraceID: traceID,
483+
SpanID: spanID,
484+
TraceFlags: trace.FlagsSampled,
485+
})
486+
w.addSpanContext(spanCtx)
487+
}
488+
489+
links := w.generateSpanLinks()
490+
491+
assert.Len(t, links, tt.expectedLinkCount, tt.description)
492+
493+
// Verify all links have random type and correct index
494+
for i, link := range links {
495+
// Verify link.type attribute is 'random'
496+
found := false
497+
for _, attr := range link.Attributes {
498+
if attr.Key == "link.type" && attr.Value.AsString() == "random" {
499+
found = true
500+
break
501+
}
502+
}
503+
assert.True(t, found, "Link should have 'link.type=random' attribute")
504+
505+
// Verify link.index attribute is present
506+
foundIndex := false
507+
for _, attr := range link.Attributes {
508+
if attr.Key == "link.index" && attr.Value.AsInt64() == int64(i) {
509+
foundIndex = true
510+
break
511+
}
512+
}
513+
assert.True(t, foundIndex, "Link should have correct 'link.index' attribute")
514+
}
515+
})
516+
}
517+
}
518+
519+
// TestDefaultSpanLinksConfiguration tests that the default span links configuration is correct
520+
func TestDefaultSpanLinksConfiguration(t *testing.T) {
521+
cfg := NewConfig()
522+
523+
assert.Equal(t, 0, cfg.NumSpanLinks, "Default NumSpanLinks should be 0")
524+
}
525+
526+
func TestSpanContextsBufferLimit(t *testing.T) {
527+
w := &worker{
528+
numSpanLinks: 2,
529+
spanContexts: make([]trace.SpanContext, 0),
530+
spanContextsMu: sync.RWMutex{},
531+
}
532+
533+
// Add more span contexts than the buffer limit
534+
for i := 0; i < 1200; i++ {
535+
traceID, _ := trace.TraceIDFromHex(fmt.Sprintf("%032d", i))
536+
spanID, _ := trace.SpanIDFromHex(fmt.Sprintf("%016d", i))
537+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
538+
TraceID: traceID,
539+
SpanID: spanID,
540+
TraceFlags: trace.FlagsSampled,
541+
})
542+
w.addSpanContext(spanCtx)
543+
}
544+
545+
// Verify the buffer doesn't exceed the maximum size
546+
assert.LessOrEqual(t, len(w.spanContexts), 1000, "Span contexts buffer should not exceed maximum size")
547+
548+
// Verify we can still generate links with the buffered contexts
549+
links := w.generateSpanLinks()
550+
assert.Len(t, links, 2, "Should generate correct number of links even with buffer limit")
551+
}

cmd/telemetrygen/pkg/traces/worker.go

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"context"
88
"fmt"
9+
"math/rand/v2"
910
"strconv"
1011
"sync"
1112
"sync/atomic"
@@ -35,15 +36,66 @@ type worker struct {
3536
wg *sync.WaitGroup // notify when done
3637
loadSize int // desired minimum size in MB of string data for each generated trace
3738
spanDuration time.Duration // duration of generated spans
39+
numSpanLinks int // number of span links to generate per span
3840
logger *zap.Logger
39-
allowFailures bool // whether to continue on export failures
41+
allowFailures bool // whether to continue on export failures
42+
spanContexts []trace.SpanContext // collection of span contexts for linking
43+
spanContextsMu sync.RWMutex // mutex for spanContexts slice
4044
}
4145

4246
const (
43-
fakeIP string = "1.2.3.4"
47+
fakeIP string = "1.2.3.4"
48+
maxSpanContextsBuffer int = 1000 // Maximum number of span contexts to keep for linking
4449
)
4550

46-
func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
51+
// addSpanContext safely adds a span context to the worker's collection
52+
// Maintains a circular buffer to prevent unbounded memory growth
53+
func (w *worker) addSpanContext(spanCtx trace.SpanContext) {
54+
w.spanContextsMu.Lock()
55+
defer w.spanContextsMu.Unlock()
56+
57+
w.spanContexts = append(w.spanContexts, spanCtx)
58+
59+
// Keep only the most recent span contexts to prevent memory growth
60+
if len(w.spanContexts) > maxSpanContextsBuffer {
61+
w.spanContexts = w.spanContexts[1 : maxSpanContextsBuffer+1]
62+
}
63+
}
64+
65+
// generateSpanLinks creates span links to random existing span contexts
66+
func (w *worker) generateSpanLinks() []trace.Link {
67+
if w.numSpanLinks <= 0 {
68+
return nil
69+
}
70+
71+
w.spanContextsMu.RLock()
72+
defer w.spanContextsMu.RUnlock()
73+
74+
availableContexts := len(w.spanContexts)
75+
if availableContexts == 0 {
76+
return nil
77+
}
78+
79+
links := make([]trace.Link, 0, w.numSpanLinks)
80+
81+
// Generate links to random existing span contexts
82+
for i := 0; i < w.numSpanLinks; i++ {
83+
randomIndex := rand.IntN(availableContexts)
84+
spanCtx := w.spanContexts[randomIndex]
85+
86+
links = append(links, trace.Link{
87+
SpanContext: spanCtx,
88+
Attributes: []attribute.KeyValue{
89+
attribute.String("link.type", "random"),
90+
attribute.Int("link.index", i),
91+
},
92+
})
93+
}
94+
95+
return links
96+
}
97+
98+
func (w *worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
4799
tracer := otel.Tracer("telemetrygen")
48100
limiter := rate.NewLimiter(w.limitPerSecond, 1)
49101
var i int
@@ -56,18 +108,25 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
56108
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
57109
}
58110

111+
// Generate span links for the parent span
112+
parentLinks := w.generateSpanLinks()
113+
59114
ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
60115
semconv.NetworkPeerAddress(fakeIP),
61116
semconv.PeerService("telemetrygen-server"),
62117
),
63118
trace.WithSpanKind(trace.SpanKindClient),
64119
trace.WithTimestamp(spanStart),
120+
trace.WithLinks(parentLinks...),
65121
)
66122
sp.SetAttributes(telemetryAttributes...)
67123
for j := 0; j < w.loadSize; j++ {
68124
sp.SetAttributes(common.CreateLoadAttribute(fmt.Sprintf("load-%v", j), 1))
69125
}
70126

127+
// Store the parent span context for potential future linking
128+
w.addSpanContext(sp.SpanContext())
129+
71130
childCtx := ctx
72131
if w.propagateContext {
73132
header := propagation.HeaderCarrier{}
@@ -84,15 +143,22 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
84143
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
85144
}
86145

146+
// Generate span links for child spans
147+
childLinks := w.generateSpanLinks()
148+
87149
_, child := tracer.Start(childCtx, "okey-dokey-"+strconv.Itoa(j), trace.WithAttributes(
88150
semconv.NetworkPeerAddress(fakeIP),
89151
semconv.PeerService("telemetrygen-client"),
90152
),
91153
trace.WithSpanKind(trace.SpanKindServer),
92154
trace.WithTimestamp(spanStart),
155+
trace.WithLinks(childLinks...),
93156
)
94157
child.SetAttributes(telemetryAttributes...)
95158

159+
// Store the child span context for potential future linking
160+
w.addSpanContext(child.SpanContext())
161+
96162
endTimestamp = trace.WithTimestamp(spanEnd)
97163
child.SetStatus(w.statusCode, "")
98164
child.End(endTimestamp)

0 commit comments

Comments
 (0)