Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle processes whose main thread has exited #376

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,10 @@ traces user-land will simply read and then clear this map on a timer.
The BPF components are responsible for notifying user-land about new and exiting
processes. An event about a new process is produced when we first interrupt it
with the unwinders. Events about exiting processes are created with a
`sched_process_exit` probe. In both cases the BPF code sends a perf event to
`sched_process_free` tracepoint. In both cases the BPF code sends a perf event to
notify user-land. We also re-report a PID if we detect execution in previously
unknown memory region to prompt re-scan of the mappings.
unknown memory region to prompt re-scan of the mappings. Finally, the profiler
can also profile processes whose main thread exits, leaving other threads running.

### Network protocol

Expand Down
4 changes: 2 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to attach scheduler monitor: %w", err)
}

// This log line is used in our system tests to verify if that the agent has started. So if you
// change this log line update also the system test.
// This log line is used in our system tests to verify if that the agent has started.
// So if you change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, c.reporter, intervals, trc,
Expand Down
69 changes: 52 additions & 17 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ import (
"go.opentelemetry.io/ebpf-profiler/stringutil"
)

// GetMappings returns this error when no mappings can be extracted.
var ErrNoMappings = errors.New("no mappings")

// systemProcess provides an implementation of the Process interface for a
// process that is currently running on this machine.
type systemProcess struct {
pid libpf.PID
tid libpf.PID

remoteMemory remotememory.RemoteMemory
mainThreadExit bool
remoteMemory remotememory.RemoteMemory

fileToMapping map[string]*Mapping
}
Expand All @@ -53,9 +58,10 @@ func init() {
}

// New returns an object with Process interface accessing it
func New(pid libpf.PID) Process {
func New(pid, tid libpf.PID) Process {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't switch Process to accept libpf.PIDTID as the latter is only used with PID events, and I'd rather not couple it here too.

return &systemProcess{
pid: pid,
tid: tid,
remoteMemory: remotememory.NewProcessVirtualMemory(pid),
}
}
Expand Down Expand Up @@ -165,9 +171,8 @@ func parseMappings(mapsFile io.Reader) ([]Mapping, uint32, error) {
path = VdsoPathName
device = 0
inode = vdsoInode
} else if path != "" {
// Ignore [vsyscall] and similar executable kernel
// pages we don't care about
} else {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No semantic change, I just inlined the logic from GetMappings here as this is the more appropriate place.

// Ignore mappings that are invalid, non-existent or are special pseudo-files
continue
}
} else {
Expand Down Expand Up @@ -229,20 +234,45 @@ func (sp *systemProcess) GetMappings() ([]Mapping, uint32, error) {
defer mapsFile.Close()

mappings, numParseErrors, err := parseMappings(mapsFile)
if err == nil {
fileToMapping := make(map[string]*Mapping, len(mappings))
for idx := range mappings {
m := &mappings[idx]
if m.Inode == 0 {
// Ignore mappings that are invalid,
// non-existent or are special pseudo-files.
continue
}
fileToMapping[m.Path] = m
if err != nil {
return mappings, numParseErrors, err
}

if len(mappings) == 0 {
// We could test for main thread exit here by checking for zombie state
// in /proc/sp.pid/stat but it's simpler to assume that this is the case
// and try extracting mappings for a different thread. Since we stopped
// processing /proc at agent startup, it's very unlikely that the agent
// will sample a process being initialized without mappings.
log.Warnf("PID: %v main thread exit", sp.pid)
sp.mainThreadExit = true
mapsFileAlt, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/maps", sp.pid, sp.tid))
// On all errors resulting from trying to get mappings from a different thread,
// return ErrNoMappings which will keep the PID tracked in processmanager and
// allow for a future iteration to try extracting mappings from a different thread.
// This is done to deal with race conditions triggered by thread exits (we do not want
// the agent to unload process metadata when a thread exits but the process is still
// alive).
if err != nil {
return mappings, numParseErrors, ErrNoMappings
}
defer mapsFileAlt.Close()

numParseErrorsAlt := uint32(0)
mappings, numParseErrorsAlt, err = parseMappings(mapsFileAlt)
numParseErrors += numParseErrorsAlt
if err != nil || len(mappings) == 0 {
return mappings, numParseErrors, ErrNoMappings
}
sp.fileToMapping = fileToMapping
}
return mappings, numParseErrors, err

fileToMapping := make(map[string]*Mapping, len(mappings))
for idx := range mappings {
m := &mappings[idx]
fileToMapping[m.Path] = m
}
sp.fileToMapping = fileToMapping
return mappings, numParseErrors, nil
}

func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) {
Expand Down Expand Up @@ -271,6 +301,11 @@ func (sp *systemProcess) getMappingFile(m *Mapping) string {
if m.IsAnonymous() || m.IsVDSO() {
return ""
}
if sp.mainThreadExit {
// Neither /proc/sp.pid/map_files nor /proc/sp.pid/task/sp.tid/map_files
// exist if main thread has exited, so we use the mapping path directly.
return m.Path
}
return fmt.Sprintf("/proc/%v/map_files/%x-%x", sp.pid, m.Vaddr, m.Vaddr+m.Length)
}

Expand Down
3 changes: 2 additions & 1 deletion process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func TestParseMappings(t *testing.T) {
}

func TestNewPIDOfSelf(t *testing.T) {
pr := New(libpf.PID(os.Getpid()))
pid := libpf.PID(os.Getpid())
pr := New(pid, pid)
assert.NotNil(t, pr)

mappings, numParseErrors, err := pr.GetMappings()
Expand Down
31 changes: 12 additions & 19 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (pm *ProcessManager) synchronizeMappings(pr process.Process,
// fast enough and this particular pid is reused again by the system.
func (pm *ProcessManager) processPIDExit(pid libpf.PID) {
exitKTime := times.GetKTime()
log.Debugf("- PID: %v", pid)
log.Warnf("- PID: %v", pid)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove these newly added warnings before merging, they should help with reviewing the PR as you don't need to run the agent with debug logs enabled and sort through a lot of irrelevant noise.


var err error
defer func() {
Expand Down Expand Up @@ -614,9 +614,16 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
return
}

if errors.Is(err, process.ErrNoMappings) {
// When no mappings can be extracted but the process is still alive,
// do not trigger a process exit to avoid unloading process metadata.
// As it's likely that a future iteration can extract mappings from a
// different thread in the process, notify eBPF to enable further notifications.
pm.ebpf.RemoveReportedPID(pid)
return
}

// All other errors imply that the process has exited.
// Clean up, and notify eBPF.
pm.processPIDExit(pid)
if os.IsNotExist(err) {
// Since listing /proc and opening files in there later is inherently racy,
// we expect to lose the race sometimes and thus expect to hit os.IsNotExist.
Expand All @@ -626,22 +633,7 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
// return ESRCH. Handle it as if the process did not exist.
pm.mappingStats.errProcESRCH.Add(1)
}
return
}
if len(mappings) == 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments are no longer relevant.

// Valid process without any (executable) mappings. All cases are
// handled as process exit. Possible causes and reasoning:
// 1. It is a kernel worker process. The eBPF does not send events from these,
// but we can see kernel threads here during startup when tracer walks
// /proc and tries to synchronize all PIDs it sees.
// The PID should not exist anywhere, but we can still double check and
// make sure the PID is not tracked.
// 2. It is a normal process executing, but we just sampled it when the kernel
// execve() is rebuilding the mappings and nothing is currently mapped.
// In this case we can handle it as process exit because everything about
// the process is changing: all mappings, comm, etc. If execve fails, we
// reaped it early. If execve succeeds, we will get new synchronization
// request soon, and handle it as a new process event.
// Clean up, and notify eBPF.
pm.processPIDExit(pid)
return
}
Expand Down Expand Up @@ -744,6 +736,7 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) {
continue
}

log.Warnf("PID %v deleted", pid)
delete(pm.pidToProcessInfo, pid)

for _, instance := range pm.interpreters[pid] {
Expand Down
13 changes: 7 additions & 6 deletions support/ebpf/interpreter_dispatcher.ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ bpf_map_def SEC("maps") reported_pids = {
//
// User space code will periodically iterate through the map and process each entry.
// Additionally, each time eBPF code writes a value into the map, user space is notified
// through event_send_trigger (which uses maps/report_events). As key we use the PID of
// the process and as value always true. When sizing this map, we are thinking about
// the maximum number of unique PIDs that could generate events we're interested in
// (process new, process exit, unknown PC) within a map monitor/processing interval,
// through event_send_trigger (which uses maps/report_events). As key we use the PID/TID
// of the process/thread and as value always true. When sizing this map, we are thinking
// about the maximum number of unique PIDs that could generate events we're interested in
// (process new, thread group exit, unknown PC) within a map monitor/processing interval,
// that we would like to support.
bpf_map_def SEC("maps") pid_events = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(u32),
.key_size = sizeof(u64),
.value_size = sizeof(bool),
.max_entries = 65536,
};
Expand Down Expand Up @@ -205,7 +205,8 @@ static inline __attribute__((__always_inline__)) int unwind_stop(struct pt_regs
// No Error
break;
case metricID_UnwindNativeErrWrongTextSection:;
if (report_pid(ctx, trace->pid, record->ratelimitAction)) {
u64 pid_tgid = (u64)trace->pid << 32 | trace->tid;
if (report_pid(ctx, pid_tgid, record->ratelimitAction)) {
increment_metric(metricID_NumUnknownPC);
}
// Fallthrough to report the error
Expand Down
27 changes: 12 additions & 15 deletions support/ebpf/sched_monitor.ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,26 @@

#include "types.h"

// tracepoint__sched_process_exit is a tracepoint attached to the scheduler that stops processes.
// Every time a processes stops this hook is triggered.
SEC("tracepoint/sched/sched_process_exit")
int tracepoint__sched_process_exit(void *ctx)
{
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = (u32)(pid_tgid >> 32);
u32 tid = (u32)(pid_tgid & 0xFFFFFFFF);
struct sched_process_free_ctx {
unsigned char skip[24];
pid_t pid;
int prio;
};

if (pid != tid) {
// Only if the thread group ID matched with the PID the process itself exits. If they don't
// match only a thread of the process stopped and we do not need to report this PID to
// userspace for further processing.
goto exit;
}
// tracepoint__sched_process_free is a tracepoint attached to the scheduler that frees processes.
// Every time a processes exits this hook is triggered.
SEC("tracepoint/sched/sched_process_free")
int tracepoint__sched_process_free(struct sched_process_free_ctx *ctx)
{
u32 pid = ctx->pid;

if (!bpf_map_lookup_elem(&reported_pids, &pid) && !pid_information_exists(ctx, pid)) {
// Only report PIDs that we explicitly track. This avoids sending kernel worker PIDs
// to userspace.
goto exit;
}

if (report_pid(ctx, pid, RATELIMIT_ACTION_RESET)) {
if (report_pid(ctx, (u64)pid << 32 | pid, RATELIMIT_ACTION_RESET)) {
increment_metric(metricID_NumProcExit);
}
exit:
Expand Down
14 changes: 8 additions & 6 deletions support/ebpf/tracemgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,24 @@ pid_event_ratelimit(u32 pid, int ratelimit_action)
// and reporting aborted if PID has been recently reported.
// Returns true if the PID was successfully reported to user space.
static inline __attribute__((__always_inline__)) bool
report_pid(void *ctx, int pid, int ratelimit_action)
report_pid(void *ctx, u64 pid_tgid, int ratelimit_action)
{
u32 key = (u32)pid;
u32 pid = pid_tgid >> 32;

if (pid_event_ratelimit(pid, ratelimit_action)) {
return false;
}

bool value = true;
int errNo = bpf_map_update_elem(&pid_events, &key, &value, BPF_ANY);
int errNo = bpf_map_update_elem(&pid_events, &pid_tgid, &value, BPF_ANY);
if (errNo != 0) {
DEBUG_PRINT("Failed to update pid_events with PID %d: %d", pid, errNo);
__attribute__((unused)) u32 tid = pid_tgid & 0xFFFFFFFF;
DEBUG_PRINT("Failed to update pid_events with PID %d TID: %d: %d", pid, tid, errNo);
increment_metric(metricID_PIDEventsErr);
return false;
}
if (ratelimit_action == RATELIMIT_ACTION_RESET || errNo != 0) {
bpf_map_delete_elem(&reported_pids, &key);
bpf_map_delete_elem(&reported_pids, &pid);
}

// Notify userspace that there is a PID waiting to be processed.
Expand Down Expand Up @@ -714,7 +715,8 @@ static inline int collect_trace(
}

if (!pid_information_exists(ctx, pid)) {
if (report_pid(ctx, pid, RATELIMIT_ACTION_DEFAULT)) {
u64 pid_tgid = (u64)pid << 32 | tid;
if (report_pid(ctx, pid_tgid, RATELIMIT_ACTION_DEFAULT)) {
increment_metric(metricID_NumProcNew);
}
return 0;
Expand Down
Binary file modified support/ebpf/tracer.ebpf.release.amd64
Binary file not shown.
Binary file modified support/ebpf/tracer.ebpf.release.arm64
Binary file not shown.
Loading