From 09bf384ebdf5852f4c05d73aafaf00756d606a70 Mon Sep 17 00:00:00 2001 From: Bert Date: Sun, 9 Jun 2024 12:39:12 -0300 Subject: [PATCH] Adds histogram prometheus metrics (#112) * histogram * user can pass buckets * bucket size arg parsing works * code cleanup and added exmaple * comments * made example more comprehensive and add support for chars * Update pkg/stats/stats.go Co-authored-by: Krisztian Fekete <103492698+krisztianfekete@users.noreply.github.com> * Update pkg/stats/stats.go Co-authored-by: Krisztian Fekete <103492698+krisztianfekete@users.noreply.github.com> * Update pkg/stats/stats.go Co-authored-by: Krisztian Fekete <103492698+krisztianfekete@users.noreply.github.com> * fix compiler error * added GPL license identifier to example * added attribution comment to example * Update pkg/cli/internal/commands/run/run.go Co-authored-by: Krisztian Fekete <103492698+krisztianfekete@users.noreply.github.com> * fixed bug where couldn't set histogram key --------- Co-authored-by: Krisztian Fekete <103492698+krisztianfekete@users.noreply.github.com> --- examples/nfsstats/nfsstats.c | 129 +++++++++++++++++++++++++++ pkg/cli/internal/commands/run/run.go | 87 ++++++++++++++++-- pkg/decoder/decoder.go | 18 ++++ pkg/loader/loader.go | 125 +++++++++++++++++++++++--- pkg/stats/stats.go | 31 ++++++- 5 files changed, 372 insertions(+), 18 deletions(-) create mode 100644 examples/nfsstats/nfsstats.c diff --git a/examples/nfsstats/nfsstats.c b/examples/nfsstats/nfsstats.c new file mode 100644 index 0000000..c86b1d7 --- /dev/null +++ b/examples/nfsstats/nfsstats.c @@ -0,0 +1,129 @@ +// Based on: https://github.com/iovisor/bcc/blob/master/libbpf-tools/fsslower.bpf.c +// SPDX-License-Identifier: GPL-2.0 + +#include "vmlinux.h" +#include "bpf/bpf_helpers.h" +#include "bpf/bpf_core_read.h" +#include "bpf/bpf_tracing.h" +#include "solo_types.h" + +// Example for tracing NFS operation file duration using histogram metrics + +char __license[] SEC("license") = "Dual MIT/GPL"; + +struct event { + char fname[255]; + char op; + u64 le; +}; + +struct event_start { + u64 ts; + struct file *fp; +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __uint(max_entries, 4096); + __type(key, u32); + __type(value, struct event_start); +} start SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 1 << 24); + __type(value, struct event); +} hist_nfs_op_time_us SEC(".maps"); + +static __always_inline int +probe_entry(struct file *fp) +{ + bpf_printk("nfs_file_read happened"); + + struct event_start evt = {}; + + u32 tgid = bpf_get_current_pid_tgid() >> 32; + u64 ts = bpf_ktime_get_ns(); + + evt.ts = ts; + evt.fp = fp; + bpf_map_update_elem(&start, &tgid, &evt, 0); + + return 0; +} + +static __always_inline int +probe_exit(char op) { + struct event evt = {}; + struct file *fp; + struct dentry *dentry; + const __u8 *file_name; + + u32 tgid = bpf_get_current_pid_tgid() >> 32; + struct event_start *rs; + + rs = bpf_map_lookup_elem(&start, &tgid); + if (!rs) + return 0; + + u64 ts = bpf_ktime_get_ns(); + u64 duration = (ts - rs->ts) / 1000; + + bpf_printk("nfs operation duration: %lld", duration); + + evt.le = duration; + evt.op = op; + + // decode filename + fp = rs->fp; + dentry = BPF_CORE_READ(fp, f_path.dentry); + file_name = BPF_CORE_READ(dentry, d_name.name); + bpf_probe_read_kernel_str(evt.fname, sizeof(evt.fname), file_name); + bpf_printk("nfs op file_name: %s", evt.fname); + + struct event *ring_val; + ring_val = bpf_ringbuf_reserve(&hist_nfs_op_time_us, sizeof(evt), 0); + if (!ring_val) + return 0; + + memcpy(ring_val, &evt, sizeof(evt)); + bpf_ringbuf_submit(ring_val, 0); +} + +SEC("kprobe/nfs_file_read") +int BPF_KPROBE(nfs_file_read, struct kiocb *iocb) { + bpf_printk("nfs_file_read happened"); + struct file *fp = BPF_CORE_READ(iocb, ki_filp); + return probe_entry(fp); +} + +SEC("kretprobe/nfs_file_read") +int BPF_KRETPROBE(nfs_file_read_ret, ssize_t ret) { + bpf_printk("nfs_file_read returtned"); + return probe_exit('r'); +} + +SEC("kprobe/nfs_file_write") +int BPF_KPROBE(nfs_file_write, struct kiocb *iocb) { + bpf_printk("nfs_file_write happened"); + struct file *fp = BPF_CORE_READ(iocb, ki_filp); + return probe_entry(fp); +} + +SEC("kretprobe/nfs_file_write") +int BPF_KRETPROBE(nfs_file_write_ret, ssize_t ret) { + bpf_printk("nfs_file_write returtned"); + return probe_exit('w'); +} + +SEC("kprobe/nfs_file_open") +int BPF_KPROBE(nfs_file_open, struct inode *inode, struct file *fp) { + bpf_printk("nfs_file_open happened"); + return probe_entry(fp); +} + +SEC("kretprobe/nfs_file_open") +int BPF_KRETPROBE(nfs_file_open_ret, struct file *fp) { + bpf_printk("nfs_file_open returtned"); + return probe_exit('o'); +} diff --git a/pkg/cli/internal/commands/run/run.go b/pkg/cli/internal/commands/run/run.go index 632076a..21d66ac 100644 --- a/pkg/cli/internal/commands/run/run.go +++ b/pkg/cli/internal/commands/run/run.go @@ -7,6 +7,8 @@ import ( "io" "os" "os/signal" + "strconv" + "strings" "syscall" "github.com/cilium/ebpf/rlimit" @@ -27,14 +29,19 @@ import ( type runOptions struct { general *options.GeneralOptions - debug bool - filter []string - notty bool - pinMaps string - pinProgs string - promPort uint32 + debug bool + filter []string + histBuckets []string + histValueKey []string + notty bool + pinMaps string + pinProgs string + promPort uint32 } +const histBucketsDescription string = "Buckets to use for histogram maps. Format is \"map_name,\"" + + "where is a comma separated list of bucket limits. For example: \"events,[1,2,3,4,5]\"" + const filterDescription string = "Filter to apply to output from maps. Format is \"map_name,key_name,regex\" " + "You can define a filter per map, if more than one defined, the last defined filter will take precedence" @@ -43,6 +50,8 @@ var stopper chan os.Signal func addToFlags(flags *pflag.FlagSet, opts *runOptions) { flags.BoolVarP(&opts.debug, "debug", "d", false, "Create a log file 'debug.log' that provides debug logs of loader and TUI execution") flags.StringSliceVarP(&opts.filter, "filter", "f", []string{}, filterDescription) + flags.StringArrayVarP(&opts.histBuckets, "hist-buckets", "b", []string{}, histBucketsDescription) + flags.StringArrayVarP(&opts.histValueKey, "hist-value-key", "k", []string{}, "Key to use for histogram maps. Format is \"map_name,key_name\"") flags.BoolVar(&opts.notty, "no-tty", false, "Set to true for running without a tty allocated, so no interaction will be expected or rich output will done") flags.StringVar(&opts.pinMaps, "pin-maps", "", "Directory to pin maps to, left unpinned if empty") flags.StringVar(&opts.pinProgs, "pin-progs", "", "Directory to pin progs to, left unpinned if empty") @@ -72,6 +81,10 @@ $ bee run -f="events,comm,node" ghcr.io/solo-io/bumblebee/opensnoop:0.0.7 To run with multiple filters, use the --filter (or -f) flag multiple times: $ bee run -f="events_hash,daddr,1.1.1.1" -f="events_ring,daddr,1.1.1.1" ghcr.io/solo-io/bumblebee/tcpconnect:0.0.7 + +If your program has histogram output, you can supply the buckets using --buckets (or -b) flag: +TODO(albertlockett) add a program w/ histogram buckets as example +$ bee run -b="events,[1,2,3,4,5]" ghcr.io/solo-io/bumblebee/TODO:0.0.7 `, Args: cobra.ExactArgs(1), // Filename or image RunE: func(cmd *cobra.Command, args []string) error { @@ -118,6 +131,13 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { return fmt.Errorf("could not parse BPF program: %w", err) } + watchMapOptions, err := parseWatchMapOptions(opts) + if err != nil { + contextutils.LoggerFrom(ctx).Errorf("could not parse watch map options: %v", err) + return err + } + parsedELF.WatchedMapOptions = watchMapOptions + tuiApp, err := buildTuiApp(&progLoader, progLocation, opts.filter, parsedELF) if err != nil { return err @@ -244,3 +264,58 @@ func buildContext(ctx context.Context, debug bool) (context.Context, error) { return ctx, nil } + +func parseWatchMapOptions(runOpts *runOptions) (map[string]loader.WatchedMapOptions, error) { + watchMapOptions := make(map[string]loader.WatchedMapOptions) + + for _, bucket := range runOpts.histBuckets { + mapName, bucketLimits, err := parseBucket(bucket) + if err != nil { + return nil, err + } + watchMapOptions[mapName] = loader.WatchedMapOptions{ + HistBuckets: bucketLimits, + } + } + + for _, key := range runOpts.histValueKey { + split := strings.Index(key, ",") + if split == -1 { + return nil, fmt.Errorf("could not parse hist-value-key: %s", key) + } + mapName := key[:split] + valueKey := key[split+1:] + if _, ok := watchMapOptions[mapName]; !ok { + watchMapOptions[mapName] = loader.WatchedMapOptions{} + } + w := watchMapOptions[mapName] + w.HistValueKey = valueKey + watchMapOptions[mapName] = w + } + + return watchMapOptions, nil +} + +func parseBucket(bucket string) (string, []float64, error) { + split := strings.Index(bucket, ",") + if split == -1 { + return "", nil, fmt.Errorf("could not parse bucket: %s", bucket) + } + + mapName := bucket[:split] + bucketLimits := bucket[split+1:] + bucketLimits = strings.TrimPrefix(bucketLimits, "[") + bucketLimits = strings.TrimSuffix(bucketLimits, "]") + buckets := []float64{} + + for _, limit := range strings.Split(bucketLimits, ",") { + bval, err := strconv.ParseFloat(limit, 64) + if err != nil { + return "", nil, fmt.Errorf("could not parse bucket: %s from buckets %s", limit, bucket) + } + buckets = append(buckets, bval) + } + + return mapName, buckets, nil + +} diff --git a/pkg/decoder/decoder.go b/pkg/decoder/decoder.go index 795b0ac..97c349a 100644 --- a/pkg/decoder/decoder.go +++ b/pkg/decoder/decoder.go @@ -92,6 +92,9 @@ func (d *decoder) processSingleType(typ btf.Type) (interface{}, error) { case *btf.Int: switch typedMember.Encoding { case btf.Signed: + if typedMember.Name == "char" { + return d.handleChar(typedMember) + } return d.handleInt(typedMember) case btf.Bool: // TODO @@ -100,6 +103,9 @@ func (d *decoder) processSingleType(typ btf.Type) (interface{}, error) { // TODO return "", nil default: + if typedMember.Name == "unsigned char" { + return d.handleChar(typedMember) + } // Default encoding seems to be unsigned return d.handleUint(typedMember) } @@ -221,6 +227,18 @@ func (d *decoder) handleUint( return nil, errors.New("this should never happen") } +func (d *decoder) handleChar( + typedMember *btf.Int, +) (interface{}, error) { + buf := bytes.NewBuffer(d.raw[d.offset : d.offset+1]) + d.offset += 1 + var val byte + if err := binary.Read(buf, Endianess, &val); err != nil { + return nil, err + } + return string([]byte{val}), nil +} + func (d *decoder) handleInt( typedMember *btf.Int, ) (interface{}, error) { diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 509e7cb..e1f8c7c 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -23,8 +23,9 @@ import ( ) type ParsedELF struct { - Spec *ebpf.CollectionSpec - WatchedMaps map[string]WatchedMap + Spec *ebpf.CollectionSpec + WatchedMaps map[string]WatchedMap + WatchedMapOptions map[string]WatchedMapOptions } type LoadOptions struct { @@ -37,7 +38,7 @@ type LoadOptions struct { type Loader interface { Parse(ctx context.Context, reader io.ReaderAt) (*ParsedELF, error) Load(ctx context.Context, opts *LoadOptions) error - WatchMaps(ctx context.Context, watchedMaps map[string]WatchedMap, coll map[string]*ebpf.Map, watcher MapWatcher) error + WatchMaps(ctx context.Context, watchedMaps map[string]WatchedMap, watchedMapOptions map[string]WatchedMapOptions, coll map[string]*ebpf.Map, watcher MapWatcher) error } type WatchedMap struct { @@ -50,6 +51,11 @@ type WatchedMap struct { valueStruct *btf.Struct } +type WatchedMapOptions struct { + HistValueKey string + HistBuckets []float64 +} + type loader struct { decoderFactory decoder.DecoderFactory metricsProvider stats.MetricsProvider @@ -66,9 +72,10 @@ func NewLoader( } const ( - counterMapPrefix = "counter_" - gaugeMapPrefix = "gauge_" - printMapPrefix = "print_" + counterMapPrefix = "counter_" + gaugeMapPrefix = "gauge_" + histogramMapPrefix = "hist_" + printMapPrefix = "print_" ) func isPrintMap(spec *ebpf.MapSpec) bool { @@ -83,8 +90,12 @@ func isCounterMap(spec *ebpf.MapSpec) bool { return strings.HasPrefix(spec.Name, counterMapPrefix) } +func isHistogramMap(spec *ebpf.MapSpec) bool { + return strings.HasPrefix(spec.Name, histogramMapPrefix) +} + func isTrackedMap(spec *ebpf.MapSpec) bool { - return isCounterMap(spec) || isGaugeMap(spec) || isPrintMap(spec) + return isCounterMap(spec) || isGaugeMap(spec) || isHistogramMap(spec) || isPrintMap(spec) } func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { @@ -238,12 +249,13 @@ func (l *loader) Load(ctx context.Context, opts *LoadOptions) error { } } - return l.WatchMaps(ctx, opts.ParsedELF.WatchedMaps, coll.Maps, opts.Watcher) + return l.WatchMaps(ctx, opts.ParsedELF.WatchedMaps, opts.ParsedELF.WatchedMapOptions, coll.Maps, opts.Watcher) } func (l *loader) WatchMaps( ctx context.Context, watchedMaps map[string]WatchedMap, + watchedMapOptions map[string]WatchedMapOptions, maps map[string]*ebpf.Map, watcher MapWatcher, ) error { @@ -256,14 +268,40 @@ func (l *loader) WatchMaps( switch bpfMap.mapType { case ebpf.RingBuf: var increment stats.IncrementInstrument + var setIncrement stats.SetInstrument + var setKeyName string + if isCounterMap(bpfMap.mapSpec) { increment = l.metricsProvider.NewIncrementCounter(name, bpfMap.Labels) + } else if isHistogramMap(bpfMap.mapSpec) { + setKeyName = "le" + buckets := []float64{0, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000} + if opts, ok := watchedMapOptions[name]; ok { + if opts.HistBuckets != nil { + buckets = opts.HistBuckets + } + if opts.HistValueKey != "" { + setKeyName = opts.HistValueKey + } + } + histLabels := []string{} + for _, label := range bpfMap.Labels { + if label != setKeyName { + histLabels = append(histLabels, label) + } + } + + setIncrement = l.metricsProvider.NewHistogram(name, histLabels, buckets) } else if isPrintMap(bpfMap.mapSpec) { increment = &noop{} } eg.Go(func() error { watcher.NewRingBuf(name, bpfMap.Labels) - return l.startRingBuf(ctx, bpfMap.valueStruct, maps[name], increment, name, watcher) + if setIncrement != nil { + return l.startRingBufSet(ctx, bpfMap.valueStruct, maps[name], setIncrement, name, setKeyName, watcher) + } else { + return l.startRingBufIncrement(ctx, bpfMap.valueStruct, maps[name], increment, name, watcher) + } }) case ebpf.Array: fallthrough @@ -293,7 +331,7 @@ func (l *loader) WatchMaps( return err } -func (l *loader) startRingBuf( +func (l *loader) startRingBufIncrement( ctx context.Context, valueStruct *btf.Struct, liveMap *ebpf.Map, @@ -349,6 +387,73 @@ func (l *loader) startRingBuf( } } +func (l *loader) startRingBufSet( + ctx context.Context, + valueStruct *btf.Struct, + liveMap *ebpf.Map, + instrument stats.SetInstrument, + name string, + valueKey string, + watcher MapWatcher, +) error { + // Initialize decoder + d := l.decoderFactory() + logger := contextutils.LoggerFrom(ctx) + + // Open a ringbuf reader from userspace RINGBUF map described in the + // eBPF C program. + rd, err := ringbuf.NewReader(liveMap) + if err != nil { + return fmt.Errorf("opening ringbuf reader: %v", err) + } + defer rd.Close() + + // Close the reader when the process receives a signal, which will exit + // the read loop. + go func() { + <-ctx.Done() + logger.Info("in ringbuf set watcher, got done...") + if err := rd.Close(); err != nil { + logger.Infof("error while closing ringbuf '%s' reader: %s", name, err) + } + logger.Info("after reader.Close()") + }() + + for { + record, err := rd.Read() + if err != nil { + if errors.Is(err, ringbuf.ErrClosed) { + logger.Info("ringbuf closed...") + return nil + } + logger.Infof("error while reading from ringbuf '%s' reader: %s", name, err) + continue + } + result, err := d.DecodeBtfBinary(ctx, valueStruct, record.RawSample) + if err != nil { + return err + } + + intVal, ok := result[valueKey].(uint64) + if !ok { + return fmt.Errorf("value key '%s' is not a uint64", valueKey) + } + + stringLabels := stringify(result) + watcher.SendEntry(MapEntry{ + Name: name, + Entry: KvPair{ + Key: stringLabels, + Value: fmt.Sprint(intVal), + }, + }) + + delete(result, valueKey) + instrument.Set(ctx, int64(intVal), stringify(result)) + } + +} + func (l *loader) startHashMap( ctx context.Context, mapSpec *ebpf.MapSpec, diff --git a/pkg/stats/stats.go b/pkg/stats/stats.go index a2ca759..995e509 100644 --- a/pkg/stats/stats.go +++ b/pkg/stats/stats.go @@ -20,7 +20,7 @@ const ( type PrometheusOpts struct { Port uint32 MetricsPath string - Registry *prometheus.Registry + Registry *prometheus.Registry } func (p *PrometheusOpts) initDefaults() { @@ -66,6 +66,7 @@ type MetricsProvider interface { NewSetCounter(name string, labels []string) SetInstrument NewIncrementCounter(name string, labels []string) IncrementInstrument NewGauge(name string, labels []string) SetInstrument + NewHistogram(name string, labels []string, buckets []float64) SetInstrument } type IncrementInstrument interface { @@ -76,7 +77,7 @@ type SetInstrument interface { Set(ctx context.Context, val int64, labels map[string]string) } -type metricsProvider struct{ +type metricsProvider struct { registry *prometheus.Registry } @@ -117,6 +118,20 @@ func (m *metricsProvider) NewGauge(name string, labels []string) SetInstrument { } } +func (m *metricsProvider) NewHistogram(name string, labels []string, buckets []float64) SetInstrument { + h := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: ebpfNamespace, + Name: name, + Buckets: buckets, + }, labels) + + m.register(h) + return &histogram{ + histogram: h, + } + +} + func (m *metricsProvider) register(collectors ...prometheus.Collector) { if m.registry != nil { m.registry.MustRegister(collectors...) @@ -172,3 +187,15 @@ func (g *gauge) Set( ) { g.gauge.With(prometheus.Labels(decodedKey)).Set(float64(intVal)) } + +type histogram struct { + histogram *prometheus.HistogramVec +} + +func (h *histogram) Set( + ctx context.Context, + intVal int64, + decodedKey map[string]string, +) { + h.histogram.With(prometheus.Labels(decodedKey)).Observe(float64(intVal)) +}