Commit a1c6489

Merge pull request #157 from parca-dev/cuda-tpr

Implement CUDA kernel launch tracing using usdt probes

2 parents 89d71c9 + bc34ad4

22 files changed: +747 -58 lines changed
host/host.go

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@ type Trace struct {
 	OffTime          int64 // Time a task was off-cpu in nanoseconds.
 	APMTraceID       libpf.APMTraceID
 	APMTransactionID libpf.APMTransactionID
+	ParcaGPUTraceID  uint32
 	CPU              int
 	EnvVars          map[string]string
 	CustomLabels     map[string]string

interpreter/gpu/cuda.go

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
+
+import (
+	"fmt"
+	"runtime"
+
+	log "github.com/sirupsen/logrus"
+	"go.opentelemetry.io/ebpf-profiler/interpreter"
+	"go.opentelemetry.io/ebpf-profiler/libpf"
+	"go.opentelemetry.io/ebpf-profiler/libpf/pfelf"
+	"go.opentelemetry.io/ebpf-profiler/remotememory"
+)
+
+type data struct {
+	path   string
+	link   interpreter.LinkCloser
+	probes []pfelf.USDTProbe
+}
+
+type instance struct {
+	path string
+	interpreter.InstanceStubs
+	link interpreter.LinkCloser
+}
+
+func Loader(ebpf interpreter.EbpfHandler, info *interpreter.LoaderInfo) (interpreter.Data, error) {
+	ef, err := info.GetELF()
+	if err != nil {
+		return nil, err
+	}
+	// We use the existence of the .note.stapsdt section to determine if this is a
+	// process that has libparcagpucupti.so loaded. It's cheaper and more reliable
+	// than loading the symbol table.
+	if sec := ef.Section(".note.stapsdt"); sec != nil {
+		probes, err := pfelf.ParseUSDTProbes(sec)
+		if err != nil {
+			return nil, err
+		}
+		var parcagpuProbes []pfelf.USDTProbe
+		for _, probe := range probes {
+			if probe.Provider == "parcagpu" {
+				parcagpuProbes = append(parcagpuProbes, probe)
+			}
+		}
+		if len(parcagpuProbes) != 3 {
+			return nil, nil
+		}
+
+		// Validate that the probe arguments match what cuda.ebpf.c expects.
+		if err := validateProbeArguments(parcagpuProbes, info.FileName()); err != nil {
+			return nil, err
+		}
+
+		return &data{path: info.FileName(),
+			probes: parcagpuProbes}, nil
+	}
+	return nil, nil
+}
+
+// validateProbeArguments checks that the USDT probe arguments match the expectations
+// in cuda.ebpf.c and returns an error if they don't match.
+func validateProbeArguments(probes []pfelf.USDTProbe, path string) error {
+	var expectedProbes map[string]string
+
+	switch runtime.GOARCH {
+	case "amd64":
+		expectedProbes = map[string]string{
+			"cuda_correlation": "4@-36(%rbp)",
+			"kernel_executed":  "8@%rax 8@%rdx 8@-40(%rbp) 4@%ecx 8@%rsi",
+			"graph_executed":   "8@%rax 8@%rdx 8@-64(%rbp) 4@%ecx 4@%esi",
+		}
+	case "arm64":
+		expectedProbes = map[string]string{
+			"cuda_correlation": "4@[sp, 36]",
+			"kernel_executed":  "8@x1 8@x2 8@[sp, 112] 4@x3 8@x0",
+			"graph_executed":   "8@x1 8@x2 8@[sp, 88] 4@x3 4@x0",
+		}
+	default:
+		return fmt.Errorf("unknown architecture %s, cannot validate USDT probe arguments for %s",
			runtime.GOARCH, path)
+	}
+
+	probeMap := make(map[string]string)
+	for _, probe := range probes {
+		probeMap[probe.Name] = probe.Arguments
+	}
+
+	for name, expectedArgs := range expectedProbes {
+		actualArgs, ok := probeMap[name]
+		if !ok {
+			return fmt.Errorf("missing expected USDT probe '%s' in %s", name, path)
+		}
+		if actualArgs != expectedArgs {
+			return fmt.Errorf("USDT probe '%s' in %s has incorrect arguments: "+
+				"expected: %s, "+
+				"actual: %s",
+				name, path, expectedArgs, actualArgs)
+		}
+	}
+	return nil
+}
+
+func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Address,
+	_ remotememory.RemoteMemory) (ii interpreter.Instance, err error) {
+	// Maps usdt probe name to ebpf program name.
+	// Use the first character of the probe name as a cookie:
+	// 'c' -> cuda_correlation
+	// 'k' -> cuda_kernel_exec
+	// 'g' -> cuda_graph_exec
+	cookies := make([]uint64, len(d.probes))
+	progNames := make([]string, len(d.probes))
+	for i, probe := range d.probes {
+		cookies[i] = uint64(probe.Name[0])
+		// Map probe names to specific program names for single-shot mode.
+		switch probe.Name[0] {
+		case 'c':
+			progNames[i] = "usdt_parcagpu_cuda_correlation"
+		case 'k':
+			progNames[i] = "usdt_parcagpu_cuda_kernel"
+		case 'g':
+			progNames[i] = "usdt_parcagpu_cuda_graph"
+		}
+	}
+	lc, err := ebpf.AttachUSDTProbes(pid, d.path, "cuda_probe", d.probes, cookies, progNames, true)
+	if err != nil {
+		return nil, err
+	}
+	log.Debugf("[cuda] parcagpu USDT probes attached for %s", d.path)
+	d.link = lc
+
+	return &instance{link: lc, path: d.path}, nil
+}
+
+// Detach drops this instance's reference to the shared link; we want the probes
+// attached as long as ANY process is using our library.
+func (i *instance) Detach(_ interpreter.EbpfHandler, _ libpf.PID) error {
+	if i.link != nil {
+		log.Debugf("[cuda] parcagpu USDT probes closed for %s", i.path)
+		if err := i.link.Detach(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (d *data) Unload(ebpf interpreter.EbpfHandler) {
+	if d.link != nil {
+		log.Debugf("[cuda] parcagpu USDT probes closed for %s", d.path)
+		if err := d.link.Unload(); err != nil {
+			log.Errorf("error closing cuda usdt link: %s", err)
+		}
+	}
+}
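
For background on the strings compared above: each ELF note in the .note.stapsdt section records a probe's location, base, and semaphore addresses followed by NUL-terminated provider, name, and argument strings, and each argument uses the SystemTap SDT "size@operand" notation, e.g. 4@-36(%rbp) is a 4-byte operand 36 bytes below %rbp. The validator compares the full spec strings verbatim; purely as an illustrative sketch (parseSDTArg is an invented helper, not part of this change), decoding one spec looks like:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSDTArg splits a SystemTap SDT argument spec such as "4@-36(%rbp)" or
// "8@x1" into its operand size in bytes (negative would mean signed) and its
// location. Invented helper for illustration only.
func parseSDTArg(spec string) (size int, loc string, err error) {
	sz, loc, ok := strings.Cut(spec, "@")
	if !ok {
		return 0, "", fmt.Errorf("malformed SDT argument %q", spec)
	}
	size, err = strconv.Atoi(sz)
	return size, loc, err
}

func main() {
	// The kernel_executed spec from the amd64 table above.
	for _, spec := range strings.Fields("8@%rax 8@%rdx 8@-40(%rbp) 4@%ecx 8@%rsi") {
		size, loc, err := parseSDTArg(spec)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%d-byte operand at %s\n", size, loc)
	}
}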

libpf/frametype.go

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ const (
 	AbortFrame FrameType = support.FrameMarkerAbort
 	// LuaJITFrame identifies the LuaJIT interpreter frames.
 	LuaJITFrame FrameType = support.FrameMarkerLuaJIT
+	// CUDAKernelFrame identifies CUDA kernel frames.
+	CUDAKernelFrame FrameType = support.FrameMarkerCUDAKernel
 )
 
 const (

libpf/interpretertype.go

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ const (
 	LuaJIT InterpreterType = support.FrameMarkerLuaJIT
 	// Go identifies Go code.
 	Go InterpreterType = support.FrameMarkerGo
+	// CUDA interpreter type for CUDA kernels.
+	CUDA InterpreterType = support.FrameMarkerCUDAKernel
 )
 
 // Pseudo-interpreters without a corresponding frame type.

parcagpu/parcagpu.go

Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
+package parcagpu // import "go.opentelemetry.io/ebpf-profiler/parcagpu"
+
+import (
+	"bytes"
+	"context"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
+	"unsafe"
+
+	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/perf"
+	log "github.com/sirupsen/logrus"
+
+	"go.opentelemetry.io/ebpf-profiler/host"
+	"go.opentelemetry.io/ebpf-profiler/libpf"
+	"go.opentelemetry.io/ebpf-profiler/support"
+)
+
+type mapKey struct {
+	pid uint32
+	id  uint32
+}
+
+type gpuTraceFixer struct {
+	mu                  sync.Mutex
+	timesAwaitingTraces map[mapKey]kernelTimingEvent
+	tracesAwaitingTimes map[mapKey]*host.Trace
+}
+
+func (p *gpuTraceFixer) addTrace(trace *host.Trace) *host.Trace {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	key := mapKey{uint32(trace.PID), trace.ParcaGPUTraceID}
+	ev, ok := p.timesAwaitingTraces[key]
+	if ok {
+		delete(p.timesAwaitingTraces, key)
+		prepTrace(trace, &ev)
+		return trace
+	}
+	p.tracesAwaitingTimes[key] = trace
+	return nil
+}
+
+func (p *gpuTraceFixer) addTime(key mapKey, ev *kernelTimingEvent) *host.Trace {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	trace, ok := p.tracesAwaitingTimes[key]
+	if ok {
+		delete(p.tracesAwaitingTimes, key)
+		prepTrace(trace, ev)
+		return trace
+	}
+	p.timesAwaitingTraces[key] = *ev
+	return nil
+}
+
+// maybeClear clears the maps if they get too big. uprobes aren't perfect and
+// we may miss matching a timing event to a trace at an attach boundary.
+func (p *gpuTraceFixer) maybeClear() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if len(p.timesAwaitingTraces) > 100 || len(p.tracesAwaitingTimes) > 100 {
+		log.Warnf("[parcagpu] clearing gpu trace fixer maps, too many entries: %d traces, %d times",
+			len(p.tracesAwaitingTimes), len(p.timesAwaitingTraces))
+		p.timesAwaitingTraces = map[mapKey]kernelTimingEvent{}
+		p.tracesAwaitingTimes = map[mapKey]*host.Trace{}
+	}
+}
+
+// TODO: have cgo generate this
+type kernelTimingEvent struct {
+	pid                uint32
+	id                 uint32
+	start, end         uint64
+	dev, stream, graph uint32
+	kernelName         [128]byte
+}
+
+func prepTrace(tr *host.Trace, ev *kernelTimingEvent) {
+	tr.OffTime = int64(ev.end - ev.start)
+	if tr.CustomLabels == nil {
+		tr.CustomLabels = make(map[string]string)
+	}
+
+	tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.dev), 10)
+	if ev.stream != 0 {
+		tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.stream), 10)
+	}
+	if ev.graph != 0 {
+		tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.graph), 10)
+	}
+	if ev.kernelName[0] != 0 {
+		// TODO: is there a better way to pass this through?
+		// Trim the fixed-size buffer at its NUL terminator.
+		name := ev.kernelName[:]
+		if i := bytes.IndexByte(name, 0); i >= 0 {
+			name = name[:i]
+		}
+		tr.CustomLabels["_temp_cuda_kernel"] = string(name)
+		// ConvertTrace will add a pseudo-frame for the kernel.
+		tr.Frames = append([]host.Frame{{
+			Type: libpf.CUDAKernelFrame,
+		}}, tr.Frames...)
+	}
+}
+
+// Start starts two goroutines that filter traces coming from ebpf and match them up with timing
+// information coming from the parcagpuKernelExecuted uprobe.
+func Start(ctx context.Context, traceInCh <-chan *host.Trace,
+	gpuTimingEvents *ebpf.Map) chan *host.Trace {
+	fixer := &gpuTraceFixer{
+		timesAwaitingTraces: map[mapKey]kernelTimingEvent{},
+		tracesAwaitingTimes: map[mapKey]*host.Trace{},
+	}
+	traceOutChan := make(chan *host.Trace, 1024)
+
+	// Read traces coming from ebpf and send normal traces straight through.
+	go func() {
+		timer := time.NewTicker(60 * time.Second)
+		defer timer.Stop()
+
+		for {
+			select {
+			case <-timer.C:
+				// We don't want to leak memory, so we purge the fixer maps every 60 seconds.
+				fixer.maybeClear()
+			case <-ctx.Done():
+				return
+			case t := <-traceInCh:
+				if t != nil && t.Origin == support.TraceOriginCuda {
+					log.Debugf("[cuda]: got trace with id 0x%x for cuda from pid: %d",
+						t.ParcaGPUTraceID, t.PID)
+					if tr := fixer.addTrace(t); tr != nil {
+						log.Debugf("[cuda]: trace completed with trace: 0x%x", tr.ParcaGPUTraceID)
+						traceOutChan <- tr
+					}
+				} else {
+					traceOutChan <- t
+				}
+			}
+		}
+	}()
+
+	eventReader, err := perf.NewReader(gpuTimingEvents, 1024 /* perCPUBufferSize */)
+	if err != nil {
+		log.Fatalf("Failed to setup perf reporting via %s: %v", gpuTimingEvents, err)
+	}
+
+	var lostEventsCount, readErrorCount, noDataCount atomic.Uint64
+	go func() {
+		var data perf.Record
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if err := eventReader.ReadInto(&data); err != nil {
+					readErrorCount.Add(1)
+					continue
+				}
+				if data.LostSamples != 0 {
+					lostEventsCount.Add(data.LostSamples)
+					continue
+				}
+				if len(data.RawSample) == 0 {
+					noDataCount.Add(1)
+					continue
+				}
+				ev := (*kernelTimingEvent)(unsafe.Pointer(&data.RawSample[0]))
+				log.Debugf("[cuda]: timing info with id 0x%x for cuda from %d", ev.id, ev.pid)
+				if tr := fixer.addTime(mapKey{ev.pid, ev.id}, ev); tr != nil {
+					log.Debugf("[cuda]: trace completed with event: 0x%x", tr.ParcaGPUTraceID)
+					traceOutChan <- tr
+				}
+			}
+		}
+	}()
+
+	return traceOutChan
+}
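
Because a CUDA trace and its timing event can arrive in either order, gpuTraceFixer parks whichever half shows up first and emits the completed pair when the other half arrives. As a hedged sketch of how a caller might splice Start into a pipeline (wireGPUTraces, the raw-trace channel, and the report callback are illustrative assumptions, not this commit's actual wiring):

package main

import (
	"context"

	"github.com/cilium/ebpf"

	"go.opentelemetry.io/ebpf-profiler/host"
	"go.opentelemetry.io/ebpf-profiler/parcagpu"
)

// wireGPUTraces is hypothetical glue: it assumes the caller owns the tracer's
// raw trace channel and the eBPF map that cuda.ebpf.c fills with timing events.
func wireGPUTraces(ctx context.Context, rawTraces chan *host.Trace,
	gpuKernelEvents *ebpf.Map, report func(*host.Trace)) {
	matched := parcagpu.Start(ctx, rawTraces, gpuKernelEvents)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case tr := <-matched:
				report(tr) // e.g. hand off to the reporter
			}
		}
	}()
}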

processmanager/execinfomanager/manager.go

Lines changed: 5 additions & 1 deletion
@@ -19,6 +19,7 @@ import (
 	"go.opentelemetry.io/ebpf-profiler/interpreter/dotnet"
 	golang "go.opentelemetry.io/ebpf-profiler/interpreter/go"
 	"go.opentelemetry.io/ebpf-profiler/interpreter/golabels"
+	"go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
 	"go.opentelemetry.io/ebpf-profiler/interpreter/hotspot"
 	"go.opentelemetry.io/ebpf-profiler/interpreter/luajit"
 	"go.opentelemetry.io/ebpf-profiler/interpreter/nodev8"
@@ -136,7 +137,6 @@ func NewExecutableInfoManager(
 	if includeTracers.Has(types.LuaJITTracer) {
 		interpreterLoaders = append(interpreterLoaders, luajit.Loader)
 	}
-
 	interpreterLoaders = append(interpreterLoaders, apmint.Loader)
 	if includeTracers.Has(types.Labels) {
 		interpreterLoaders = append(interpreterLoaders, golabels.Loader)
@@ -146,6 +146,10 @@ func NewExecutableInfoManager(
 	}
 	interpreterLoaders = append(interpreterLoaders, oomwatcher.Loader, rtld.Loader)
 
+	if includeTracers.Has(types.CUDATracer) {
+		interpreterLoaders = append(interpreterLoaders, gpu.Loader)
+	}
+
 	deferredFileIDs, err := lru.NewSynced[host.FileID, libpf.Void](deferredFileIDSize,
 		func(id host.FileID) uint32 { return uint32(id) })
 	if err != nil {
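
Appending gpu.Loader when the CUDA tracer is enabled is safe even for processes without the CUPTI shim, because of the convention visible in cuda.go: a loader that does not recognize an executable returns (nil, nil) to decline it. A toy sketch of that decline pattern (simplified signature; not the profiler's real interpreter.Loader interface):

package main

import "fmt"

// loader is a simplified stand-in for interpreter.Loader: returning a nil
// result with a nil error means "not my executable, decline".
type loader func(path string) (result any, err error)

func main() {
	gpuLoader := loader(func(path string) (any, error) {
		// A real loader inspects the ELF; here we decline everything that
		// is not our hypothetical CUPTI shim library.
		if path != "libparcagpucupti.so" {
			return nil, nil // decline: no error, nothing to attach
		}
		return "attach GPU probes", nil
	})

	for _, p := range []string{"/usr/bin/ls", "libparcagpucupti.so"} {
		res, err := gpuLoader(p)
		fmt.Println(p, "->", res, err)
	}
}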
