Skip to content

Commit f7441a1

Browse files
committed
Track Probe collection+closers in Manager with ProbeReference struct
1 parent 3264e35 commit f7441a1

File tree

2 files changed

+67
-101
lines changed

2 files changed

+67
-101
lines changed

internal/pkg/instrumentation/manager.go

+52-38
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const (
4747
// Manager handles the management of [probe.Probe] instances.
4848
type Manager struct {
4949
logger *slog.Logger
50-
probes map[probe.ID]probe.Probe
50+
probes map[probe.ID]ProbeReference
5151
otelController *opentelemetry.Controller
5252
cp ConfigProvider
5353
exe *link.Executable
@@ -61,6 +61,14 @@ type Manager struct {
6161
collectionOpts *ebpf.CollectionOptions
6262
}
6363

64+
// ProbeReference is used by the Manager to track an initialized reference
65+
// to a Probe and its related resources such as its ebpf.Collection and io.Closers.
66+
type ProbeReference struct {
67+
probe probe.Probe
68+
collection *ebpf.Collection
69+
closers []io.Closer
70+
}
71+
6472
// NewManager returns a new [Manager].
6573
func NewManager(
6674
logger *slog.Logger,
@@ -71,7 +79,7 @@ func NewManager(
7179
) (*Manager, error) {
7280
m := &Manager{
7381
logger: logger,
74-
probes: make(map[probe.ID]probe.Probe),
82+
probes: make(map[probe.ID]ProbeReference),
7583
otelController: otelController,
7684
cp: cp,
7785
}
@@ -99,12 +107,6 @@ func NewManager(
99107
},
100108
}
101109

102-
alloc, err := process.Allocate(logger, pid)
103-
if err != nil {
104-
return nil, err
105-
}
106-
m.proc.Allocation = alloc
107-
108110
m.logger.Info("loaded process info", "process", m.proc)
109111

110112
m.filterUnusedProbes()
@@ -145,7 +147,10 @@ func (m *Manager) registerProbe(p probe.Probe) error {
145147
return err
146148
}
147149

148-
m.probes[id] = p
150+
m.probes[id] = ProbeReference{
151+
probe: p,
152+
closers: make([]io.Closer, 0),
153+
}
149154
return nil
150155
}
151156

@@ -159,7 +164,7 @@ func (m *Manager) filterUnusedProbes() {
159164

160165
for name, inst := range m.probes {
161166
funcsFound := false
162-
for _, s := range inst.Manifest().Symbols {
167+
for _, s := range inst.probe.Manifest().Symbols {
163168
if len(s.DependsOn) == 0 {
164169
if _, exists := existingFuncMap[s.Symbol]; exists {
165170
funcsFound = true
@@ -226,15 +231,15 @@ func (m *Manager) applyConfig(c Config) error {
226231

227232
if currentlyEnabled && !newEnabled {
228233
m.logger.Info("Disabling probe", "id", id)
229-
err = errors.Join(err, p.Close())
234+
err = errors.Join(err, m.CloseProbe(p))
230235
continue
231236
}
232237

233238
if !currentlyEnabled && newEnabled {
234239
m.logger.Info("Enabling probe", "id", id)
235240
err = errors.Join(err, m.LoadProbe(p, id, c))
236241
if err == nil {
237-
m.runProbe(p)
242+
m.runProbe(p.probe)
238243
}
239244
continue
240245
}
@@ -313,7 +318,7 @@ func (m *Manager) runProbes(ctx context.Context) (context.Context, error) {
313318

314319
for id, p := range m.probes {
315320
if isProbeEnabled(id, m.currentConfig) {
316-
m.runProbe(p)
321+
m.runProbe(p.probe)
317322
}
318323
}
319324

@@ -414,29 +419,31 @@ func (m *Manager) loadProbes() error {
414419
return nil
415420
}
416421

417-
func (m *Manager) LoadProbe(i probe.Probe, name probe.ID, cfg Config) error {
422+
func (m *Manager) LoadProbe(i ProbeReference, name probe.ID, cfg Config) error {
418423
m.logger.Info("loading probe", "name", name)
419424

420-
err := i.Init(cfg.SamplingConfig)
425+
spec, err := i.probe.Spec()
421426
if err != nil {
422427
return errors.Join(err, m.cleanup())
423428
}
424429

425-
spec, err := i.Spec()
430+
err = m.InjectProbeConsts(i.probe, spec)
426431
if err != nil {
427432
return errors.Join(err, m.cleanup())
428433
}
429434

430-
err = m.InjectProbeConsts(i, spec)
435+
c, err := utils.InitializeEBPFCollection(spec, m.collectionOpts)
431436
if err != nil {
432-
return err
437+
return errors.Join(err, m.cleanup())
433438
}
439+
i.collection = c
434440

435-
c, err := utils.InitializeEBPFCollection(spec, m.collectionOpts)
441+
reader, err := i.probe.InitStartupConfig(c, cfg.SamplingConfig)
436442
if err != nil {
437-
return err
443+
return errors.Join(err, m.cleanup())
438444
}
439-
i.SetCollection(c)
445+
i.closers = append(i.closers, reader)
446+
440447
return nil
441448
}
442449

@@ -461,8 +468,8 @@ func (m *Manager) InjectProbeConsts(i probe.Probe, spec *ebpf.CollectionSpec) er
461468
return inject.Constants(spec, opts...)
462469
}
463470

464-
func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
465-
for _, up := range i.GetUprobes() {
471+
func (m *Manager) loadUprobesFromProbe(i ProbeReference) error {
472+
for _, up := range i.probe.GetUprobes() {
466473
var skip bool
467474
for _, pc := range up.PackageConstraints {
468475
if pc.Constraints.Check(m.proc.Modules[pc.Package]) {
@@ -472,9 +479,9 @@ func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
472479
var logFn func(string, ...any)
473480
switch pc.FailureMode {
474481
case probe.FailureModeIgnore:
475-
logFn = i.GetLogger().Debug
482+
logFn = i.probe.GetLogger().Debug
476483
case probe.FailureModeWarn:
477-
logFn = i.GetLogger().Warn
484+
logFn = i.probe.GetLogger().Warn
478485
default:
479486
// Unknown and FailureModeError.
480487
return fmt.Errorf(
@@ -486,7 +493,7 @@ func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
486493

487494
logFn(
488495
"package constraint not meet, skipping uprobe",
489-
"probe", i.Manifest().ID,
496+
"probe", i.probe.Manifest().ID,
490497
"symbol", up.Sym,
491498
"package", pc.Package,
492499
"constraint", pc.Constraints.String(),
@@ -500,22 +507,23 @@ func (m *Manager) loadUprobesFromProbe(i probe.Probe) error {
500507
continue
501508
}
502509

503-
err := m.loadUprobe(up, i.GetCollection())
510+
err := m.loadUprobe(up, i.collection)
504511
if err != nil {
505512
var logFn func(string, ...any)
506513
switch up.FailureMode {
507514
case probe.FailureModeIgnore:
508-
logFn = i.GetLogger().Debug
515+
logFn = i.probe.GetLogger().Debug
509516
case probe.FailureModeWarn:
510-
logFn = i.GetLogger().Warn
517+
logFn = i.probe.GetLogger().Warn
511518
default:
512519
// Unknown and FailureModeError.
513520
return err
514521
}
515-
logFn("failed to load uprobe", "probe", i.Manifest().ID, "symbol", up.Sym, "error", err)
522+
logFn("failed to load uprobe", "probe", i.probe.Manifest().ID, "symbol", up.Sym, "error", err)
516523
continue
517524
}
518-
_ = i.UpdateClosers(up)
525+
526+
i.closers = append(i.closers, up)
519527
}
520528
return nil
521529
}
@@ -574,20 +582,26 @@ func (m *Manager) loadUprobe(u *probe.Uprobe, c *ebpf.Collection) error {
574582
return nil
575583
}
576584

577-
func (m *Manager) mount() error {
578-
if m.proc.Allocation != nil {
579-
m.logger.Debug("Mounting bpffs", "allocation", m.proc.Allocation)
580-
} else {
581-
m.logger.Debug("Mounting bpffs")
585+
func (m *Manager) CloseProbe(p ProbeReference) error {
586+
if p.collection != nil {
587+
p.collection.Close()
582588
}
583-
return bpffsMount(m.proc)
589+
590+
var err error
591+
for _, c := range p.closers {
592+
err = errors.Join(err, c.Close())
593+
}
594+
if err == nil {
595+
p.probe.GetLogger().Debug("Closed", "Probe", p.probe.Manifest().ID)
596+
}
597+
return err
584598
}
585599

586600
func (m *Manager) cleanup() error {
587601
ctx := context.Background()
588602
err := m.cp.Shutdown(context.Background())
589603
for _, i := range m.probes {
590-
err = errors.Join(err, i.Close())
604+
err = errors.Join(err, m.CloseProbe(i))
591605
}
592606

593607
// Wait for all probes to close so we know there is no more telemetry being

internal/pkg/instrumentation/probe/probe.go

+15-63
Original file line numberDiff line numberDiff line change
@@ -34,36 +34,25 @@ type Probe interface {
3434
// the information about the package the Probe instruments.
3535
Manifest() Manifest
3636

37-
// Init initializes the Probe, setting up the reader, closers, and sampling config.
38-
Init(*sampling.Config) error
37+
// InitStartupConfig sets up initialization config options for the Probe,
38+
// such as its sampling config, sets up its BPFObj as a closer, and initializes
39+
// the Probe's reader, returning it as an io.Closer.
40+
InitStartupConfig(*ebpf.Collection, *sampling.Config) (io.Closer, error)
3941

4042
// Run runs the events processing loop.
4143
Run(func(ptrace.ScopeSpans))
4244

43-
// Close stops the Probe.
44-
Close() error
45-
4645
// GetLogger returns the *slog.Logger associated with the Probe.
4746
GetLogger() *slog.Logger
4847

4948
// Spec returns the *ebpf.CollectionSpec for the Probe.
5049
Spec() (*ebpf.CollectionSpec, error)
5150

52-
// SetCollection sets the *ebpf.Collection for the Probe.
53-
SetCollection(*ebpf.Collection)
54-
55-
// GetCollection returns the *ebpf.Collection for the Probe.
56-
GetCollection() *ebpf.Collection
57-
5851
// GetUprobes returns a list of *Uprobes for the Probe.
5952
GetUprobes() []*Uprobe
6053

6154
// GetConsts returns a list of Consts for the Probe.
6255
GetConsts() []Const
63-
64-
// UpdateClosers updates the closers for the Probe to the io.Closers passed to it,
65-
// and returns the new list of io.Closers for the Probe.
66-
UpdateClosers(...io.Closer) []io.Closer
6756
}
6857

6958
// Base is a base implementation of [Probe].
@@ -93,7 +82,6 @@ type Base[BPFObj any, BPFEvent any] struct {
9382
ProcessRecord func(perf.Record) (*BPFEvent, error)
9483

9584
reader *perf.Reader
96-
collection *ebpf.Collection
9785
closers []io.Closer
9886
samplingManager *sampling.Manager
9987
}
@@ -131,40 +119,33 @@ func (i *Base[BPFObj, BPFEvent]) Spec() (*ebpf.CollectionSpec, error) {
131119
return i.SpecFn()
132120
}
133121

134-
// Init initializes the Probe, setting up the io.Closers, Reader, and Sampling config.
135-
func (i *Base[BPFObj, BPFEvent]) Init(sampler *sampling.Config) error {
122+
func (i *Base[BPFObj, BPFEvent]) InitStartupConfig(c *ebpf.Collection, sampler *sampling.Config) (io.Closer, error) {
136123
obj := new(BPFObj)
137124
if c, ok := ((interface{})(obj)).(io.Closer); ok {
138125
i.closers = append(i.closers, c)
139126
}
140127

141-
err := i.initReader()
128+
samplingManager, err := sampling.NewSamplingManager(c, sampler)
142129
if err != nil {
143-
return err
130+
return nil, err
144131
}
132+
i.samplingManager = samplingManager
145133

146-
i.samplingManager, err = sampling.NewSamplingManager(i.collection, sampler)
134+
buf, ok := c.Maps[DefaultBufferMapName]
135+
if !ok {
136+
return nil, fmt.Errorf("%s map not found", DefaultBufferMapName)
137+
}
138+
i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize())
147139
if err != nil {
148-
return err
140+
return nil, err
149141
}
150-
151-
i.closers = append(i.closers, i.reader)
152-
153-
return nil
142+
return i.reader, nil
154143
}
155144

156145
func (i *Base[BPFObj, BPFEvent]) GetLogger() *slog.Logger {
157146
return i.Logger
158147
}
159148

160-
func (i *Base[BPFObj, BPFEvent]) SetCollection(c *ebpf.Collection) {
161-
i.collection = c
162-
}
163-
164-
func (i *Base[BPFObj, BPFEvent]) GetCollection() *ebpf.Collection {
165-
return i.collection
166-
}
167-
168149
func (i *Base[BPFObj, BPFEvent]) GetUprobes() []*Uprobe {
169150
return i.Uprobes
170151
}
@@ -178,20 +159,6 @@ func (i *Base[BPFObj, BPFEvent]) GetConsts() []Const {
178159
return i.Consts
179160
}
180161

181-
func (i *Base[BPFObj, BPFEvent]) initReader() error {
182-
buf, ok := i.collection.Maps[DefaultBufferMapName]
183-
if !ok {
184-
return fmt.Errorf("%s map not found", DefaultBufferMapName)
185-
}
186-
var err error
187-
i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize())
188-
if err != nil {
189-
return err
190-
}
191-
i.closers = append(i.closers, i.reader)
192-
return nil
193-
}
194-
195162
// read reads a new BPFEvent from the perf Reader.
196163
func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) {
197164
record, err := i.reader.Read()
@@ -222,21 +189,6 @@ func (i *Base[BPFObj, BPFEvent]) read() (*BPFEvent, error) {
222189
return event, nil
223190
}
224191

225-
// Close stops the Probe.
226-
func (i *Base[BPFObj, BPFEvent]) Close() error {
227-
if i.collection != nil {
228-
i.collection.Close()
229-
}
230-
var err error
231-
for _, c := range i.closers {
232-
err = errors.Join(err, c.Close())
233-
}
234-
if err == nil {
235-
i.Logger.Debug("Closed", "Probe", i.ID)
236-
}
237-
return err
238-
}
239-
240192
type SpanProducer[BPFObj any, BPFEvent any] struct {
241193
Base[BPFObj, BPFEvent]
242194

0 commit comments

Comments
 (0)