Skip to content

Commit 5691d84

Browse files
committed
Implement CUDA kernel launch tracing using usdt probes
Add support for tracing CUDA kernel launches via USDT probes from the CUPTI-based parcagpu library (github.com/parca-dev/parcagpu). This enables profiling of GPU workloads by capturing CPU-side stack traces at kernel launch time, with correlation IDs for matching them with GPU traces. Key changes: - Add a GPU interpreter for detecting CUDA libraries and attaching probes - Implement the CUDA eBPF programs - Add a TRACE_CUDA_LAUNCH origin type for GPU-related traces - Include a ParcaGPUTraceID field for correlating CPU and GPU traces - Add an InstrumentCudaLaunch configuration option - Create a goroutine that holds GPU events in memory for correlation until the timing information arrives.
1 parent 89d71c9 commit 5691d84

File tree

22 files changed

+740
-58
lines changed

22 files changed

+740
-58
lines changed

host/host.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Trace struct {
6262
OffTime int64 // Time a task was off-cpu in nanoseconds.
6363
APMTraceID libpf.APMTraceID
6464
APMTransactionID libpf.APMTransactionID
65+
ParcaGPUTraceID uint32
6566
CPU int
6667
EnvVars map[string]string
6768
CustomLabels map[string]string

interpreter/gpu/cuda.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
2+
3+
import (
4+
"runtime"
5+
6+
log "github.com/sirupsen/logrus"
7+
"go.opentelemetry.io/ebpf-profiler/interpreter"
8+
"go.opentelemetry.io/ebpf-profiler/libpf"
9+
"go.opentelemetry.io/ebpf-profiler/libpf/pfelf"
10+
"go.opentelemetry.io/ebpf-profiler/remotememory"
11+
)
12+
13+
// data is the per-executable interpreter.Data created by Loader for a binary
// that exposes the parcagpu USDT probes.
type data struct {
	// path is the filesystem path of the instrumented executable/library.
	path string
	// link is the USDT link stored by the most recent Attach; Unload closes it.
	link interpreter.LinkCloser
	// probes are the three "parcagpu" USDT probes parsed from .note.stapsdt.
	probes []pfelf.USDTProbe
}
18+
19+
// instance is the per-process interpreter.Instance returned by Attach.
// It embeds InstanceStubs so only Detach needs a real implementation.
type instance struct {
	// path is the executable path, kept for log messages.
	path string
	interpreter.InstanceStubs
	// link is the USDT link created by Attach; released in Detach.
	link interpreter.LinkCloser
}
24+
25+
func Loader(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo) (interpreter.Data, error) {
26+
ef, err := info.GetELF()
27+
if err != nil {
28+
return nil, err
29+
}
30+
// We use the existence of the .note.stapsdt section to determine if this is a
31+
// process that has libparcagpucupti.so loaded. Its cheaper and more reliable than loading
32+
// the symbol table.
33+
if sec := ef.Section(".note.stapsdt"); sec != nil {
34+
probes, err := pfelf.ParseUSDTProbes(sec)
35+
if err != nil {
36+
return nil, err
37+
}
38+
var parcagpuProbes []pfelf.USDTProbe
39+
for _, probe := range probes {
40+
if probe.Provider == "parcagpu" {
41+
parcagpuProbes = append(parcagpuProbes, probe)
42+
}
43+
}
44+
if len(parcagpuProbes) != 3 {
45+
return nil, nil
46+
}
47+
48+
// Validate probe arguments match what cuda.ebpf.c expects
49+
validateProbeArguments(parcagpuProbes, info.FileName())
50+
51+
return &data{path: info.FileName(),
52+
probes: parcagpuProbes}, nil
53+
}
54+
return nil, nil
55+
}
56+
57+
// validateProbeArguments checks that the USDT probe arguments match the expectations
58+
// in cuda.ebpf.c and logs errors if they don't match.
59+
func validateProbeArguments(probes []pfelf.USDTProbe, path string) {
60+
var expectedProbes map[string]string
61+
62+
switch runtime.GOARCH {
63+
case "amd64":
64+
expectedProbes = map[string]string{
65+
"cuda_correlation": "4@-36(%rbp)",
66+
"kernel_executed": "8@%rax 8@%rdx 8@-40(%rbp) 4@%ecx 8@%rsi",
67+
"graph_executed": "8@%rax 8@%rdx 8@-64(%rbp) 4@%ecx 4@%esi",
68+
}
69+
case "arm64":
70+
expectedProbes = map[string]string{
71+
"cuda_correlation": "4@[sp, 36]",
72+
"kernel_executed": "8@x1 8@x2 8@[sp, 112] 4@x3 8@x0",
73+
"graph_executed": "8@x1 8@x2 8@[sp, 88] 4@x3 4@x0",
74+
}
75+
default:
76+
log.Warnf("[cuda] Unknown architecture %s, cannot validate USDT probe arguments for %s",
77+
runtime.GOARCH, path)
78+
return
79+
}
80+
81+
probeMap := make(map[string]string)
82+
for _, probe := range probes {
83+
probeMap[probe.Name] = probe.Arguments
84+
}
85+
86+
for name, expectedArgs := range expectedProbes {
87+
actualArgs, ok := probeMap[name]
88+
if !ok {
89+
log.Errorf("[cuda] Missing expected USDT probe '%s' in %s", name, path)
90+
continue
91+
}
92+
if actualArgs != expectedArgs {
93+
log.Errorf("[cuda] USDT probe '%s' in %s has incorrect arguments:\n"+
94+
" Expected: %s\n"+
95+
" Actual: %s\n"+
96+
" This will cause incorrect data collection. cuda.ebpf.c needs to be updated.",
97+
name, path, expectedArgs, actualArgs)
98+
}
99+
}
100+
}
101+
102+
// Attach installs the parcagpu USDT probes into the process identified by pid.
// Each probe is paired with its eBPF program, and the first character of the
// probe name is used as the probe's cookie:
//
//	'c' -> cuda_correlation -> usdt_parcagpu_cuda_correlation
//	'k' -> kernel_executed  -> usdt_parcagpu_cuda_kernel
//	'g' -> graph_executed   -> usdt_parcagpu_cuda_graph
//
// NOTE(review): a probe name starting with any other character leaves its
// progNames entry as "" — presumably impossible after Loader's validation,
// but confirm AttachUSDTProbes tolerates an empty program name.
func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Address,
	_ remotememory.RemoteMemory) (ii interpreter.Instance, err error) {
	// Maps usdt probe name to ebpf program name.
	// Use the first character of the probe name as a cookie.
	// 'c' -> cuda_correlation
	// 'k' -> cuda_kernel_exec
	// 'g' -> cuda_graph_exec
	cookies := make([]uint64, len(d.probes))
	progNames := make([]string, len(d.probes))
	for i, probe := range d.probes {
		cookies[i] = uint64(probe.Name[0])
		// Map probe names to specific program names for single-shot mode
		switch probe.Name[0] {
		case 'c':
			progNames[i] = "usdt_parcagpu_cuda_correlation"
		case 'k':
			progNames[i] = "usdt_parcagpu_cuda_kernel"
		case 'g':
			progNames[i] = "usdt_parcagpu_cuda_graph"
		}
	}
	lc, err := ebpf.AttachUSDTProbes(pid, d.path, "cuda_probe", d.probes, cookies, progNames, true)
	if err != nil {
		return nil, err
	}
	log.Debugf("[cuda] parcagpu USDT probes attached for %s", d.path)
	// NOTE(review): d.link is overwritten on every Attach (last-writer-wins
	// across processes sharing this executable); Unload will only close the
	// most recent link. Confirm AttachUSDTProbes returns a shared/refcounted
	// link per path, otherwise earlier links leak.
	d.link = lc

	return &instance{link: lc, path: d.path}, nil
}
132+
133+
// Detach does nothing, we want the probes attached as long as ANY process is using
134+
// our library.
135+
func (i *instance) Detach(_ interpreter.EbpfHandler, _ libpf.PID) error {
136+
if i.link != nil {
137+
log.Debugf("[cuda] parcagpu USDT probes closed for %s", i.path)
138+
if err := i.link.Detach(); err != nil {
139+
return err
140+
}
141+
}
142+
return nil
143+
}
144+
145+
func (d *data) Unload(ebpf interpreter.EbpfHandler) {
146+
if d.link != nil {
147+
log.Debugf("[cuda] parcagpu USDT probes closed for %s", d.path)
148+
if err := d.link.Unload(); err != nil {
149+
log.Errorf("error closing cuda usdt link: %s", err)
150+
}
151+
}
152+
}

libpf/frametype.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ const (
5555
AbortFrame FrameType = support.FrameMarkerAbort
5656
// LuaJITFrame identifies the LuaJIT interpreter frames.
5757
LuaJITFrame FrameType = support.FrameMarkerLuaJIT
58+
// CUDAKernelFrame identifies CUDA kernel frames.
59+
CUDAKernelFrame FrameType = support.FrameMarkerCUDAKernel
5860
)
5961

6062
const (

libpf/interpretertype.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const (
3535
LuaJIT InterpreterType = support.FrameMarkerLuaJIT
3636
// Go identifies Go code.
3737
Go InterpreterType = support.FrameMarkerGo
38+
// CUDA interpreter type for CUDA kernels.
39+
CUDA InterpreterType = support.FrameMarkerCUDAKernel
3840
)
3941

4042
// Pseudo-interpreters without a corresponding frame type.

parcagpu/parcagpu.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package parcagpu // import "go.opentelemetry.io/ebpf-profiler/parcagpu"
2+
3+
import (
	"context"
	"errors"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
	log "github.com/sirupsen/logrus"

	"go.opentelemetry.io/ebpf-profiler/host"
	"go.opentelemetry.io/ebpf-profiler/libpf"
	"go.opentelemetry.io/ebpf-profiler/support"
)
18+
)
19+
20+
// mapKey identifies one CUDA kernel launch: the launching process plus the
// correlation ID that appears both in the CPU-side trace (ParcaGPUTraceID)
// and in the GPU-side timing event.
type mapKey struct {
	pid uint32
	id  uint32
}
24+
25+
// gpuTraceFixer matches CPU-side launch traces with GPU-side timing events,
// whichever arrives first. Both maps are keyed by (pid, correlation id) and
// guarded by mu, as they are touched from two goroutines (see Start).
type gpuTraceFixer struct {
	mu sync.Mutex
	// timesAwaitingTraces holds timing events whose trace has not arrived yet.
	timesAwaitingTraces map[mapKey]kernelTimingEvent
	// tracesAwaitingTimes holds traces whose timing event has not arrived yet.
	tracesAwaitingTimes map[mapKey]*host.Trace
}
30+
31+
func (p *gpuTraceFixer) addTrace(trace *host.Trace) *host.Trace {
32+
p.mu.Lock()
33+
defer p.mu.Unlock()
34+
key := mapKey{uint32(trace.PID), trace.ParcaGPUTraceID}
35+
ev, ok := p.timesAwaitingTraces[key]
36+
if ok {
37+
delete(p.timesAwaitingTraces, key)
38+
trace.OffTime = int64(ev.end - ev.start)
39+
prepTrace(trace, &ev)
40+
return trace
41+
}
42+
p.tracesAwaitingTimes[key] = trace
43+
return nil
44+
}
45+
46+
func (p *gpuTraceFixer) addTime(key mapKey, ev *kernelTimingEvent) *host.Trace {
47+
p.mu.Lock()
48+
defer p.mu.Unlock()
49+
trace, ok := p.tracesAwaitingTimes[key]
50+
if ok {
51+
delete(p.tracesAwaitingTimes, key)
52+
prepTrace(trace, ev)
53+
return trace
54+
}
55+
p.timesAwaitingTraces[key] = *ev
56+
return nil
57+
}
58+
59+
// uprobes aren't perfect and we may miss matching timing to trace at attach boundary
60+
// so clear them if they get too big.
61+
func (p *gpuTraceFixer) clear() {
62+
p.mu.Lock()
63+
defer p.mu.Unlock()
64+
if len(p.timesAwaitingTraces) > 100 || len(p.tracesAwaitingTimes) > 100 {
65+
log.Warnf("[parcagpu] clearing gpu trace fixer maps, too many entries: %d traces, %d times",
66+
len(p.tracesAwaitingTimes), len(p.timesAwaitingTraces))
67+
p.timesAwaitingTraces = map[mapKey]kernelTimingEvent{}
68+
p.tracesAwaitingTimes = map[mapKey]*host.Trace{}
69+
}
70+
}
71+
72+
// kernelTimingEvent mirrors the perf event record emitted by cuda.ebpf.c for
// a completed kernel or graph execution.
// NOTE(review): this struct is cast directly onto the raw perf sample in
// Start via unsafe.Pointer — field order, sizes and padding must match the C
// struct exactly. TODO: have cgo generate this
type kernelTimingEvent struct {
	pid uint32 // tgid of the launching process
	id  uint32 // CUPTI correlation ID, matches Trace.ParcaGPUTraceID
	// start/end are the GPU-side execution timestamps; their difference is
	// used as the kernel runtime (presumably nanoseconds — confirm against
	// cuda.ebpf.c / CUPTI activity records).
	start, end         uint64
	dev, stream, graph uint32
	kernelName         [128]byte // NUL-padded kernel name
}
80+
81+
func prepTrace(tr *host.Trace, ev *kernelTimingEvent) {
82+
tr.OffTime = int64(ev.end - ev.start)
83+
if tr.CustomLabels == nil {
84+
tr.CustomLabels = make(map[string]string)
85+
}
86+
87+
tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.dev), 10)
88+
if ev.stream != 0 {
89+
tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.stream), 10)
90+
}
91+
if ev.graph != 0 {
92+
tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.graph), 10)
93+
}
94+
if len(ev.kernelName) > 0 {
95+
tr.CustomLabels["_temp_cuda_kernel"] = string(ev.kernelName[:])
96+
// ConvertTrace will add a pseudo-frame for the kernel.
97+
tr.Frames = append([]host.Frame{{
98+
Type: libpf.CUDAKernelFrame,
99+
}}, tr.Frames...)
100+
}
101+
}
102+
103+
// Start starts two goroutines that filter traces coming from ebpf and match them up with timing
104+
// information coming from the parcagpuKernelExecuted uprobe.
105+
func Start(ctx context.Context, traceInCh <-chan *host.Trace,
106+
gpuTimingEvents *ebpf.Map) chan *host.Trace {
107+
fixer := &gpuTraceFixer{
108+
timesAwaitingTraces: map[mapKey]kernelTimingEvent{},
109+
tracesAwaitingTimes: map[mapKey]*host.Trace{},
110+
}
111+
traceOutChan := make(chan *host.Trace, 1024)
112+
113+
// Read traces coming from ebpf and send normal traces through
114+
go func() {
115+
timer := time.NewTicker(60 * time.Second)
116+
117+
for {
118+
select {
119+
case <-timer.C:
120+
// We don't want to leak memory, so we purge the readers map every 60 seconds.
121+
fixer.clear()
122+
case <-ctx.Done():
123+
return
124+
case t := <-traceInCh:
125+
if t != nil && t.Origin == support.TraceOriginCuda {
126+
log.Debugf("[cuda]: got trace with id 0x%x for cuda from pid: %d",
127+
t.ParcaGPUTraceID, t.PID)
128+
if tr := fixer.addTrace(t); tr != nil {
129+
log.Debugf("[cuda]: trace completed with trace: 0x%x", tr.ParcaGPUTraceID)
130+
traceOutChan <- tr
131+
}
132+
} else {
133+
traceOutChan <- t
134+
}
135+
}
136+
}
137+
}()
138+
139+
eventReader, err := perf.NewReader(gpuTimingEvents, 1024 /* perCPUBufferSize */)
140+
if err != nil {
141+
log.Fatalf("Failed to setup perf reporting via %s: %v", gpuTimingEvents, err)
142+
}
143+
144+
var lostEventsCount, readErrorCount, noDataCount atomic.Uint64
145+
go func() {
146+
var data perf.Record
147+
for {
148+
select {
149+
case <-ctx.Done():
150+
return
151+
default:
152+
if err := eventReader.ReadInto(&data); err != nil {
153+
readErrorCount.Add(1)
154+
continue
155+
}
156+
if data.LostSamples != 0 {
157+
lostEventsCount.Add(data.LostSamples)
158+
continue
159+
}
160+
if len(data.RawSample) == 0 {
161+
noDataCount.Add(1)
162+
continue
163+
}
164+
ev := (*kernelTimingEvent)(unsafe.Pointer(&data.RawSample[0]))
165+
log.Debugf("[cuda]: timing info with id 0x%x for cuda from %d", ev.id, ev.pid)
166+
if tr := fixer.addTime(mapKey{ev.pid, ev.id}, ev); tr != nil {
167+
log.Debugf("[cuda]: trace completed with event: 0x%x", tr.ParcaGPUTraceID)
168+
traceOutChan <- tr
169+
}
170+
}
171+
}
172+
}()
173+
174+
return traceOutChan
175+
}

processmanager/execinfomanager/manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"go.opentelemetry.io/ebpf-profiler/interpreter/dotnet"
2020
golang "go.opentelemetry.io/ebpf-profiler/interpreter/go"
2121
"go.opentelemetry.io/ebpf-profiler/interpreter/golabels"
22+
"go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
2223
"go.opentelemetry.io/ebpf-profiler/interpreter/hotspot"
2324
"go.opentelemetry.io/ebpf-profiler/interpreter/luajit"
2425
"go.opentelemetry.io/ebpf-profiler/interpreter/nodev8"
@@ -136,7 +137,6 @@ func NewExecutableInfoManager(
136137
if includeTracers.Has(types.LuaJITTracer) {
137138
interpreterLoaders = append(interpreterLoaders, luajit.Loader)
138139
}
139-
140140
interpreterLoaders = append(interpreterLoaders, apmint.Loader)
141141
if includeTracers.Has(types.Labels) {
142142
interpreterLoaders = append(interpreterLoaders, golabels.Loader)
@@ -146,6 +146,10 @@ func NewExecutableInfoManager(
146146
}
147147
interpreterLoaders = append(interpreterLoaders, oomwatcher.Loader, rtld.Loader)
148148

149+
if includeTracers.Has(types.CUDATracer) {
150+
interpreterLoaders = append(interpreterLoaders, gpu.Loader)
151+
}
152+
149153
deferredFileIDs, err := lru.NewSynced[host.FileID, libpf.Void](deferredFileIDSize,
150154
func(id host.FileID) uint32 { return uint32(id) })
151155
if err != nil {

processmanager/manager.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,17 @@ func (pm *ProcessManager) ConvertTrace(trace *host.Trace) (newTrace *libpf.Trace
286286

287287
newTrace.AppendFrameFull(frame.Type, fileID,
288288
relativeRIP, mappingStart, mappingEnd, fileOffset)
289+
case libpf.CUDA:
290+
name := trace.CustomLabels["_temp_cuda_kernel"]
291+
if name == "" {
292+
panic("cuda_kernel frame without _temp_cuda_kernel label")
293+
}
294+
// remove the label
295+
delete(trace.CustomLabels, "_temp_cuda_kernel")
296+
newTrace.Frames.Append(&libpf.Frame{
297+
Type: frame.Type,
298+
FunctionName: libpf.Intern(name),
299+
})
289300
default:
290301
err := pm.symbolizeFrame(i, trace, &newTrace.Frames)
291302
if err != nil {

0 commit comments

Comments
 (0)