Skip to content

Commit 1d3cd8f

Browse files
authored
Merge pull request #140 from parca-dev/rem-observer
2 parents 7c32e7f + 99907e0 commit 1d3cd8f

File tree

6 files changed

+14
-177
lines changed

6 files changed

+14
-177
lines changed

interpreter/instancestubs.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,28 @@ import (
1212
"go.opentelemetry.io/ebpf-profiler/tpbase"
1313
)
1414

15-
// ObserverStubs provides empty implementations of Observer hooks that are
16-
// not mandatory for an Observer implementation.
17-
type ObserverStubs struct {
15+
// InstanceStubs provides empty implementations of Instance hooks that are
16+
// not mandatory for a Instance implementation.
17+
type InstanceStubs struct {
1818
}
1919

20-
func (os *ObserverStubs) Detach(EbpfHandler, libpf.PID) error {
20+
func (is *InstanceStubs) Detach(EbpfHandler, libpf.PID) error {
2121
return nil
2222
}
2323

24-
func (os *ObserverStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process,
24+
func (is *InstanceStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process,
2525
[]process.Mapping) error {
2626
return nil
2727
}
2828

29-
func (os *ObserverStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error {
29+
func (is *InstanceStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error {
3030
return nil
3131
}
3232

33-
func (os *ObserverStubs) GetAndResetMetrics() ([]metrics.Metric, error) {
33+
func (is *InstanceStubs) GetAndResetMetrics() ([]metrics.Metric, error) {
3434
return []metrics.Metric{}, nil
3535
}
3636

37-
// InstanceStubs provides empty implementations of Instance hooks that are
38-
// not mandatory for a Instance implementation.
39-
type InstanceStubs struct {
40-
ObserverStubs
41-
}
42-
4337
func (is *InstanceStubs) Symbolize(*host.Frame, *libpf.Frames) error {
4438
return ErrMismatchInterpreterType
4539
}

interpreter/types.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,14 @@ type Data interface {
136136
Unload(ebpf EbpfHandler)
137137
}
138138

139-
// Observer is the base interface for observing per-PID data without symbolization.
140-
// This interface is useful for components that need to observe processes without
141-
// providing frame symbolization capabilities.
142-
type Observer interface {
139+
// Instance is the interface to operate on per-PID data.
140+
type Instance interface {
143141
// Detach removes any information from the ebpf maps. The pid is given as argument so
144-
// simple observers can use the global Data also as the Observer implementation.
142+
// simple interpreters can use the global Data also as the Instance implementation.
145143
Detach(ebpf EbpfHandler, pid libpf.PID) error
146144

147145
// SynchronizeMappings is called when the processmanager has reread process memory
148-
// mappings. Observers not needing to process these events can simply ignore them
146+
// mappings. Interpreters not needing to process these events can simply ignore them
149147
// by just returning a nil.
150148
SynchronizeMappings(ebpf EbpfHandler, symbolReporter reporter.SymbolReporter,
151149
pr process.Process, mappings []process.Mapping) error
@@ -154,16 +152,6 @@ type Observer interface {
154152
// introspection data has been updated.
155153
UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpbase.TSDInfo) error
156154

157-
// GetAndResetMetrics collects the metrics from the Observer and resets
158-
// the counters to their initial value.
159-
GetAndResetMetrics() ([]metrics.Metric, error)
160-
}
161-
162-
// Instance is the interface to operate on per-PID data with symbolization support.
163-
// It extends Observer with the ability to symbolize frames.
164-
type Instance interface {
165-
Observer
166-
167155
// Symbolize converts one ebpf frame to one or more (if inlining was expanded) libpf.Frame.
168156
// The resulting libpf.Frame values are appended to frames.
169157
Symbolize(ebpfFrame *host.Frame, frames *libpf.Frames) error

processmanager/execinfomanager/manager.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ type ExecutableInfo struct {
6666
// instance belongs to was previously identified as an interpreter. Otherwise,
6767
// this field is nil.
6868
Data interpreter.Data
69-
// Observers stores per-executable observer information. Multiple observers
70-
// can be associated with a single executable.
71-
Observers []interpreter.Data
7269
// TSDInfo stores TSD information if the executable is libc, otherwise nil.
7370
TSDInfo *tpbase.TSDInfo
7471
}
@@ -146,12 +143,7 @@ func NewExecutableInfoManager(
146143
if collectCustomLabels {
147144
interpreterLoaders = append(interpreterLoaders, customlabels.Loader)
148145
}
149-
150-
// Initialize observer loaders
151-
observerLoaders := []interpreter.Loader{
152-
oomwatcher.Loader,
153-
// Additional observers can be added here
154-
}
146+
interpreterLoaders = append(interpreterLoaders, oomwatcher.Loader)
155147

156148
deferredFileIDs, err := lru.NewSynced[host.FileID, libpf.Void](deferredFileIDSize,
157149
func(id host.FileID) uint32 { return uint32(id) })
@@ -164,7 +156,6 @@ func NewExecutableInfoManager(
164156
sdp: sdp,
165157
state: xsync.NewRWMutex(executableInfoManagerState{
166158
interpreterLoaders: interpreterLoaders,
167-
observerLoaders: observerLoaders,
168159
executables: map[host.FileID]*entry{},
169160
unwindInfoIndex: map[sdtypes.UnwindInfo]uint16{},
170161
ebpf: ebpf,
@@ -247,9 +238,8 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID,
247238
// Insert a corresponding record into our map.
248239
info = &entry{
249240
ExecutableInfo: ExecutableInfo{
250-
Data: state.detectAndLoadInterpData(loaderInfo),
251-
Observers: state.detectAndLoadObservers(loaderInfo),
252-
TSDInfo: tsdInfo,
241+
Data: state.detectAndLoadInterpData(loaderInfo),
242+
TSDInfo: tsdInfo,
253243
},
254244
mapRef: ref,
255245
rc: 1,
@@ -321,11 +311,6 @@ type executableInfoManagerState struct {
321311
// for loading the host agent support for a specific interpreter type.
322312
interpreterLoaders []interpreter.Loader
323313

324-
// observerLoaders is a list of instances of an interface that provide functionality
325-
// for loading observers for executables. Unlike interpreters, multiple observers
326-
// can be associated with a single executable.
327-
observerLoaders []interpreter.Loader
328-
329314
// ebpf provides the interface to manipulate eBPF maps.
330315
ebpf pmebpf.EbpfHandler
331316

@@ -390,38 +375,6 @@ func (state *executableInfoManagerState) detectAndLoadInterpData(
390375
}
391376
}
392377

393-
// detectAndLoadObservers attempts to detect observers for the given executable.
394-
// Unlike interpreters, multiple observers can be loaded for a single executable.
395-
func (state *executableInfoManagerState) detectAndLoadObservers(
396-
loaderInfo *interpreter.LoaderInfo) []interpreter.Data {
397-
var observers []interpreter.Data
398-
399-
// Ask all observer loaders to check this executable
400-
for _, loader := range state.observerLoaders {
401-
data, err := loader(state.ebpf, loaderInfo)
402-
if err != nil {
403-
if errors.Is(err, os.ErrNotExist) {
404-
// Very common if the process exited when we tried to analyze it.
405-
log.Debugf("Failed to load observer for %v (%#016x): file not found",
406-
loaderInfo.FileName(), loaderInfo.FileID())
407-
} else {
408-
log.Errorf("Loader for %v (%#016x) failed: %v",
409-
loaderInfo.FileName(), loaderInfo.FileID(), err)
410-
}
411-
continue
412-
}
413-
414-
// All observers return a data instance (possibly no-op)
415-
if data != nil {
416-
log.Debugf("Observer data %v for %v (%#016x)",
417-
data, loaderInfo.FileName(), loaderInfo.FileID())
418-
observers = append(observers, data)
419-
}
420-
}
421-
422-
return observers
423-
}
424-
425378
// loadDeltas converts the sdtypes.StackDelta to StackDeltaEBPF and passes that to
426379
// the ebpf interface to be loaded to kernel maps. While converting the deltas, it
427380
// also creates a list of all large gaps in the executable.

processmanager/manager.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ func New(ctx context.Context, includeTracers types.IncludedTracers, monitorInter
8989
}
9090

9191
interpreters := make(map[libpf.PID]map[util.OnDiskFileIdentifier]interpreter.Instance)
92-
observers := make(map[libpf.PID]map[util.OnDiskFileIdentifier][]interpreter.Observer)
9392

9493
pm := &ProcessManager{
9594
interpreterTracerEnabled: em.NumInterpreterLoaders() > 0,
9695
eim: em,
9796
interpreters: interpreters,
98-
observers: observers,
9997
exitEvents: make(map[libpf.PID]times.KTime),
10098
pidToProcessInfo: make(map[libpf.PID]*processInfo),
10199
ebpf: ebpf,

processmanager/processinfo.go

Lines changed: 0 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -283,60 +283,6 @@ func (pm *ProcessManager) handleNewInterpreter(pr process.Process, m *Mapping,
283283
return nil
284284
}
285285

286-
// handleNewObservers processes observers for a new executable mapping.
287-
// Unlike interpreters where only one can handle an executable, multiple observers
288-
// can be attached to the same executable.
289-
//
290-
// The caller is responsible to hold the ProcessManager lock to avoid race conditions.
291-
func (pm *ProcessManager) handleNewObservers(pr process.Process, m *Mapping,
292-
ei *eim.ExecutableInfo) {
293-
if len(ei.Observers) == 0 {
294-
return
295-
}
296-
297-
pid := pr.PID()
298-
key := m.GetOnDiskFileIdentifier()
299-
300-
// Initialize observer map for this PID if needed
301-
if _, ok := pm.observers[pid]; !ok {
302-
pm.observers[pid] = make(map[util.OnDiskFileIdentifier][]interpreter.Observer)
303-
}
304-
305-
// Check if observers are already attached for this key
306-
if _, ok := pm.observers[pid][key]; ok {
307-
// Already handled
308-
return
309-
}
310-
311-
// Attach all observers
312-
var observers []interpreter.Observer //nolint:prealloc
313-
for _, observerData := range ei.Observers {
314-
observer, err := observerData.Attach(pm.ebpf, pid, libpf.Address(m.Bias),
315-
pr.GetRemoteMemory())
316-
if err != nil {
317-
log.Errorf("Failed to attach observer %v to PID %v: %v", observerData, pid, err)
318-
continue
319-
}
320-
if observer == nil {
321-
continue
322-
}
323-
324-
log.Debugf("Attached observer %v to PID %v", observerData, pid)
325-
observers = append(observers, observer)
326-
327-
// Update TSD info if available
328-
if tsdInfo := pm.getTSDInfo(pid); tsdInfo != nil {
329-
if err := observer.UpdateTSDInfo(pm.ebpf, pid, *tsdInfo); err != nil {
330-
log.Errorf("Failed to update observer TSDInfo for PID %v: %v", pid, err)
331-
}
332-
}
333-
}
334-
335-
if len(observers) > 0 {
336-
pm.observers[pid][key] = observers
337-
}
338-
}
339-
340286
// handleNewMapping processes new file backed mappings
341287
func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping,
342288
elfRef *pfelf.Reference) error {
@@ -369,9 +315,6 @@ func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping,
369315
}
370316
}
371317

372-
// Handle observers if present
373-
pm.handleNewObservers(pr, m, &ei)
374-
375318
return nil
376319
}
377320

@@ -533,26 +476,6 @@ func (pm *ProcessManager) processRemovedMappings(pid libpf.PID, mappings []libpf
533476
// remove the entry.
534477
delete(pm.interpreters, pid)
535478
}
536-
537-
// Clean up observers that are no longer valid
538-
for key, observerList := range pm.observers[pid] {
539-
if _, ok := interpretersValid[key]; ok {
540-
continue
541-
}
542-
for _, observer := range observerList {
543-
if err := observer.Detach(pm.ebpf, pid); err != nil {
544-
log.Errorf("Failed to unload observer for PID %d: %v",
545-
pid, err)
546-
}
547-
}
548-
delete(pm.observers[pid], key)
549-
}
550-
551-
if len(pm.observers[pid]) == 0 {
552-
// There are no longer any mapped observers in the process, therefore we can
553-
// remove the entry.
554-
delete(pm.observers, pid)
555-
}
556479
}
557480

558481
// synchronizeMappings synchronizes executable mappings for the given PID.
@@ -857,18 +780,6 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) {
857780
}
858781
delete(pm.interpreters, pid)
859782

860-
// Detach all observers for this PID
861-
for _, observerList := range pm.observers[pid] {
862-
for _, observer := range observerList {
863-
if err2 := observer.Detach(pm.ebpf, pid); err2 != nil {
864-
err = errors.Join(err,
865-
fmt.Errorf("failed to handle observer process exit for PID %d: %v",
866-
pid, err2))
867-
}
868-
}
869-
}
870-
delete(pm.observers, pid)
871-
872783
delete(pm.exitEvents, pid)
873784
log.Debugf("PID %v exit latency %v ms", pid, (nowKTime-pidExitKTime)/1e6)
874785
}

processmanager/types.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,6 @@ type ProcessManager struct {
5050
// the unique on-disk identifier of the interpreter DSO.
5151
interpreters map[libpf.PID]map[util.OnDiskFileIdentifier]interpreter.Instance
5252

53-
// observers records the interpreter.Observer interface which contains hooks for
54-
// process monitoring without symbolization. Unlike interpreters, multiple observers
55-
// can be associated with each executable.
56-
// The key of the first map is a process ID, while the key of the second map is
57-
// the unique on-disk identifier of the observed DSO.
58-
observers map[libpf.PID]map[util.OnDiskFileIdentifier][]interpreter.Observer
59-
6053
// pidToProcessInfo keeps track of the executable memory mappings.
6154
pidToProcessInfo map[libpf.PID]*processInfo
6255

0 commit comments

Comments
 (0)