diff --git a/go.mod b/go.mod index 3917f59f6e..5bcb86d9a3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/grafana/alloy go 1.25.1 require ( + buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2 + buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1 cloud.google.com/go/pubsub v1.50.1 cloud.google.com/go/pubsub/v2 v2.0.0 connectrpc.com/connect v1.18.1 @@ -38,7 +40,7 @@ require ( github.com/docker/docker v28.5.1+incompatible github.com/docker/go-connections v0.6.0 github.com/drone/envsubst/v2 v2.0.0-20210730161058-179042472c46 - github.com/elastic/go-freelru v0.16.0 // indirect + github.com/elastic/go-freelru v0.16.0 github.com/fatih/color v1.18.0 github.com/fortytw2/leaktest v1.3.0 github.com/fsnotify/fsnotify v1.9.0 @@ -77,7 +79,7 @@ require ( github.com/grafana/pyroscope-go/godeltaprof v0.1.8 github.com/grafana/pyroscope/api v1.2.0 github.com/grafana/pyroscope/ebpf v0.4.11 - github.com/grafana/pyroscope/lidia v0.0.0-20250716102313-506840f4afcd + github.com/grafana/pyroscope/lidia v0.0.0-20250716102313-506840f4afcd // indirect github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 github.com/grafana/snowflake-prometheus-exporter v0.0.0-20251023151319-9baba332b98a github.com/grafana/vmware_exporter v0.0.5-beta.0.20250218170317-73398ba08329 @@ -304,10 +306,10 @@ require ( gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible - k8s.io/api v0.34.1 - k8s.io/apimachinery v0.34.1 - k8s.io/client-go v0.34.1 - k8s.io/component-base v0.34.1 + k8s.io/api v0.34.2 + k8s.io/apimachinery v0.34.2 + k8s.io/client-go v0.34.2 + k8s.io/component-base v0.34.2 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 sigs.k8s.io/controller-runtime v0.22.2 diff --git a/go.sum b/go.sum index 115ffd7c8d..931d7f20bd 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2 h1:AiTEEAQ8AHE5udDizcA2r3GNjxxlk4f2Buot/hA2oYM= +buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2/go.mod h1:U8BtFPtz71GSALR7K7ALn39RnvrKsD++lJzMI5Gf4fs= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1 h1:gXHJuGlWoXUkH9O9Qxw9skNAhIDgNgJx0tYSbI8fjIo= +buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1/go.mod h1:58eXMQL4tavOGzyXIveWU4f3yTFGmLIHxM9uk11DOIo= cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -3237,16 +3241,16 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= -k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= -k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY= +k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw= k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= -k8s.io/apimachinery v0.34.1 
h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= -k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= -k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= -k8s.io/component-base v0.34.1 h1:v7xFgG+ONhytZNFpIz5/kecwD+sUhVE6HU7qQUiRM4A= -k8s.io/component-base v0.34.1/go.mod h1:mknCpLlTSKHzAQJJnnHVKqjxR7gBeHRv0rPXA7gdtQ0= +k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= +k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= +k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= +k8s.io/component-base v0.34.2 h1:HQRqK9x2sSAsd8+R4xxRirlTjowsg6fWCPwWYeSvogQ= +k8s.io/component-base v0.34.2/go.mod h1:9xw2FHJavUHBFpiGkZoKuYZ5pdtLKe97DEByaA+hHbM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index a551263c2c..75ce18e74b 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -28,6 +28,7 @@ type Appendable interface { type Appender interface { Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error AppendIngest(ctx context.Context, profile *IncomingProfile) error + UploadDebugInfo(ctx context.Context, arg DebugInfoData) } type RawSample struct { @@ -116,6 +117,12 @@ type appender struct { writeLatency prometheus.Histogram } +func (a *appender) UploadDebugInfo(ctx context.Context, arg DebugInfoData) { + for _, c := range a.children { + c.UploadDebugInfo(ctx, arg) + } +} + // Append satisfies the Appender interface. 
func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { now := time.Now() @@ -155,34 +162,3 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e } return multiErr } - -type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error - -func (f AppendableFunc) Appender() Appender { - return f -} - -func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { - return f(ctx, labels, samples) -} - -func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) error { - // This is a no-op implementation - return nil -} - -// For testing AppendIngest operations -type AppendableIngestFunc func(ctx context.Context, profile *IncomingProfile) error - -func (f AppendableIngestFunc) Appender() Appender { - return f -} - -func (f AppendableIngestFunc) AppendIngest(ctx context.Context, p *IncomingProfile) error { - return f(ctx, p) -} - -func (f AppendableIngestFunc) Append(_ context.Context, _ labels.Labels, _ []*RawSample) error { - // This is a no-op implementation - return nil -} diff --git a/internal/component/pyroscope/appender_mock.go b/internal/component/pyroscope/appender_mock.go new file mode 100644 index 0000000000..c9a7f4d655 --- /dev/null +++ b/internal/component/pyroscope/appender_mock.go @@ -0,0 +1,41 @@ +package pyroscope + +import ( + "context" + + "github.com/prometheus/prometheus/model/labels" +) + +type AppenderMock struct { + AppendIngestFunc func(ctx context.Context, profile *IncomingProfile) error + AppendFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error + UploadDebugInfoFunc func(ctx context.Context, arg DebugInfoData) +} + +func (a AppenderMock) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { + return a.AppendFunc(ctx, labels, samples) +} + +func (a AppenderMock) AppendIngest(ctx context.Context, profile *IncomingProfile) error { + return a.AppendIngestFunc(ctx, profile) +} + +func (a AppenderMock) UploadDebugInfo(ctx context.Context, arg DebugInfoData) { + a.UploadDebugInfoFunc(ctx, arg) +} + +func (a AppenderMock) Appender() Appender { + return a +} + +func AppendableFunc(f func(ctx context.Context, labels labels.Labels, samples []*RawSample) error) AppenderMock { + return AppenderMock{ + AppendFunc: f, + } +} + +func AppendableIngestFunc(f func(ctx context.Context, profile *IncomingProfile) error) AppenderMock { + return AppenderMock{ + AppendIngestFunc: f, + } +} diff --git a/internal/component/pyroscope/debuginfo_impl.go b/internal/component/pyroscope/debuginfo_impl.go new file mode 100644 index 0000000000..8d6d990381 --- /dev/null +++ b/internal/component/pyroscope/debuginfo_impl.go @@ -0,0 +1,15 @@ +//go:build linux && (arm64 || amd64) + +package pyroscope + +import ( + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/process" +) + +type DebugInfoData struct { + FileID libpf.FileID + FileName string + BuildID string + Open func() (process.ReadAtCloser, error) +} diff --git a/internal/component/pyroscope/debuginfo_stub.go b/internal/component/pyroscope/debuginfo_stub.go new file mode 100644 index 0000000000..0d5af7a259 --- /dev/null +++ b/internal/component/pyroscope/debuginfo_stub.go @@ -0,0 +1,5 @@ +//go:build !(linux && (arm64 || amd64)) + +package pyroscope + +type DebugInfoData struct{} diff --git a/internal/component/pyroscope/ebpf/ebpf_linux.go b/internal/component/pyroscope/ebpf/ebpf_linux.go index 
49fea0c1b5..34bd795f70 100644
--- a/internal/component/pyroscope/ebpf/ebpf_linux.go
+++ b/internal/component/pyroscope/ebpf/ebpf_linux.go
@@ -17,16 +17,17 @@ import (
 	"github.com/grafana/alloy/internal/component/pyroscope"
 	"github.com/grafana/alloy/internal/component/pyroscope/ebpf/reporter"
 	"github.com/grafana/alloy/internal/featuregate"
-	"github.com/grafana/pyroscope/lidia"
 	"github.com/oklog/run"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	"go.opentelemetry.io/ebpf-profiler/interpreter/python"
 	ebpfmetrics "go.opentelemetry.io/ebpf-profiler/metrics"
+	"go.opentelemetry.io/ebpf-profiler/process"
 	discovery2 "go.opentelemetry.io/ebpf-profiler/pyroscope/discovery"
 	"go.opentelemetry.io/ebpf-profiler/pyroscope/dynamicprofiling"
 	"go.opentelemetry.io/ebpf-profiler/pyroscope/internalshim/controller"
 	"go.opentelemetry.io/ebpf-profiler/pyroscope/symb/irsymcache"
+	reporter2 "go.opentelemetry.io/ebpf-profiler/reporter"
 	metricnoop "go.opentelemetry.io/otel/metric/noop"
 )
@@ -57,20 +58,6 @@ func New(logger log.Logger, reg prometheus.Registerer, id string, args Arguments
 	appendable := pyroscope.NewFanout(args.ForwardTo, id, reg)
 
-	nfs, err := irsymcache.NewFSCache(irsymcache.TableTableFactory{
-		Options: []lidia.Option{
-			lidia.WithFiles(),
-			lidia.WithLines(),
-		},
-	}, irsymcache.Options{
-		SizeEntries: uint32(args.SymbCacheSizeEntries),
-		Path:        args.SymbCachePath,
-	})
-	if err != nil {
-		return nil, err
-	}
-	cfg.ExecutableReporter = nfs
-
 	if dynamicProfilingPolicy {
 		cfg.Policy = &dynamicprofiling.ServiceDiscoveryTargetsOnlyPolicy{Discovery: discovery}
 	} else {
@@ -88,16 +75,45 @@ func New(logger log.Logger, reg prometheus.Registerer, id string, args Arguments
 		argsUpdate: make(chan Arguments, 4),
 	}
-	cfg.Reporter = reporter.NewPPROF(logger, &reporter.Config{
+	r := reporter.NewPPROF(logger, &reporter.Config{
 		ReportInterval:            cfg.ReporterInterval,
 		SamplesPerSecond:          int64(cfg.SamplesPerSecond),
 		Demangle:                  args.Demangle,
 		ReporterUnsymbolizedStubs: args.ReporterUnsymbolizedStubs,
-		ExtraNativeSymbolResolver: nfs,
-		Consumer: reporter.PPROFConsumerFunc(func(ctx context.Context, ps []reporter.PPROF) {
-			res.sendProfiles(ctx, ps)
-		}),
-	}, discovery)
+	}, discovery, func(ctx context.Context, ps []reporter.PPROF) {
+		res.sendProfiles(ctx, ps)
+	})
+	cfg.Reporter = r
+	cfg.ExecutableReporter = ExecutableReporterFunc(func(args *reporter2.ExecutableMetadata) {
+		if !args.MappingFile.Valid() {
+			return
+		}
+		mf := args.MappingFile.Value()
+		arg := pyroscope.DebugInfoData{
+			FileID:   mf.FileID,
+			FileName: mf.FileName.String(),
+			BuildID:  mf.GnuBuildID,
+			Open: func() (process.ReadAtCloser, error) {
+				fallback := func() (process.ReadAtCloser, error) {
+					return args.Process.OpenMappingFile(args.Mapping)
+				}
+				if args.DebuglinkFileName == "" {
+					return fallback()
+				}
+				file, err := args.Process.ExtractAsFile(args.DebuglinkFileName)
+				if err != nil {
+					return fallback()
+				}
+				f, err := os.Open(file)
+				if err != nil {
+					return fallback()
+				}
+				return f, nil
+			},
+		}
+		res.appendable.Appender().UploadDebugInfo(context.Background(), arg)
+	})
+	// TODO: should we keep the on-target lidia symbolizer for a while?
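+	// Note: UploadDebugInfo is fire-and-forget; it returns no error, so upload
+	// failures surface only through the uploader's own logging.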
if cfg.VerboseMode { logrus.SetLevel(logrus.DebugLevel) } @@ -105,6 +121,12 @@ func New(logger log.Logger, reg prometheus.Registerer, id string, args Arguments return res, nil } +type ExecutableReporterFunc func(md *reporter2.ExecutableMetadata) + +func (e ExecutableReporterFunc) ReportExecutable(md *reporter2.ExecutableMetadata) { + e(md) +} + type Component struct { logger log.Logger args Arguments @@ -158,6 +180,7 @@ func (c *Component) Run(ctx context.Context) error { }() var g run.Group + g.Add(func() error { for { select { diff --git a/internal/component/pyroscope/ebpf/metrics.go b/internal/component/pyroscope/ebpf/metrics.go index 259ba78243..da0fa8691d 100644 --- a/internal/component/pyroscope/ebpf/metrics.go +++ b/internal/component/pyroscope/ebpf/metrics.go @@ -19,6 +19,7 @@ type metrics struct { pprofSamplesTotal *prometheus.CounterVec ebpfMetrics *ebpfmetrics.Metrics pprofsDroppedTotal prometheus.Counter + //debugInfoUploadBytes prometheus.Counter } func newMetrics(reg prometheus.Registerer) *metrics { @@ -51,6 +52,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "pyroscope_ebpf_pprof_samples_total", Help: "Total number of pprof profiles collected by the ebpf component", }, []string{"service_name"}), + //debugInfoUploadBytes: prometheus.NewCounter(prometheus.CounterOpts{ + // Name: "pyroscope_ebpf_debug_info_upload_bytes_total", + // Help: "Total number of bytes uploaded to the debug info endpoint", + //}), ebpfMetrics: ebpfmetrics.New(reg), } @@ -62,6 +67,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { m.pprofBytesTotal = util.MustRegisterOrGet(reg, m.pprofBytesTotal).(*prometheus.CounterVec) m.pprofSamplesTotal = util.MustRegisterOrGet(reg, m.pprofSamplesTotal).(*prometheus.CounterVec) m.pprofsDroppedTotal = util.MustRegisterOrGet(reg, m.pprofsDroppedTotal).(prometheus.Counter) + //m.debugInfoUploadBytes = util.MustRegisterOrGet(reg, m.debugInfoUploadBytes).(prometheus.Counter) } return m diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/elfwriter.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/elfwriter.go new file mode 100644 index 0000000000..813ce54656 --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/elfwriter.go @@ -0,0 +1,857 @@ +// Copyright (C) 2025 go-delve, parca-agent +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+// -----------------------------------------------------------------------------
+
+// Package elfwriter is a package to write ELF files without having their entire
+// contents in memory at any one time.
+//
+// Original work started from https://github.com/go-delve/delve/blob/master/pkg/elfwriter/writer.go
+// and additional functionality added on top.
+//
+// This package does not provide completeness guarantees. Some of the missing features:
+// - Consistency and soundness of relocations
+// - Consistency and preservation of linked sections (when target removed (sh_link)) - partially supported
+// - Consistency and existence of overlapping segments when a section removed (offset, range check)
+package elfwriter
+
+import (
+	"bytes"
+	"debug/elf"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+
+	"golang.org/x/sys/unix"
+)
+
+const sectionHeaderStrTable = ".shstrtab"
+
+// http://www.sco.com/developers/gabi/2003-12-17/ch4.sheader.html#special_sections
+// - Figure 4-12
+// The list is incomplete.
+var specialSectionLinks = map[string]string{
+	// Source - Target
+	".symtab": ".strtab",
+	".dynsym": ".dynstr",
+}
+
+// TODO(kakkoyun): Remove FilteringWriter and remove this interface and pattern.
+type sectionReaderProvider interface {
+	sectionReader(section elf.Section) (io.Reader, error)
+}
+
+type sectionReaderProviderFn func(section elf.Section) (io.Reader, error)
+
+func (fn sectionReaderProviderFn) sectionReader(section elf.Section) (io.Reader, error) {
+	return fn(section)
+}
+
+// Writer writes ELF files.
+type Writer struct {
+	dst                   io.WriteSeeker
+	fhdr                  *elf.FileHeader
+	compressDWARFSections bool
+
+	srProvider sectionReaderProvider
+
+	// Program headers to write in the underlying io.WriteSeeker.
+	progs []*elf.Prog
+	// sections to write in the underlying io.WriteSeeker.
+	sections []*elf.Section
+	// sections to write in the underlying io.WriteSeeker without data.
+	sectionHeaders []elf.SectionHeader
+	// additional notes to write in the underlying io.WriteSeeker.
+	additionalNotes []Note
+
+	// Source - Target
+	sectionLinks map[string]string
+
+	err error
+
+	seekProgHeader       int64 // position of phoff
+	seekProgNum          int64 // position of phnum
+	seekSectionHeader    int64 // position of shoff
+	seekSectionNum       int64 // position of shnum
+	seekSectionStringIdx int64 // position of shstrndx
+	seekSectionEntrySize int64
+
+	// For validation.
+	ehsize, phentsize, shentsize uint16
+	shnum, shoff, shstrndx       int
+
+	shStrIdx map[string]int
+}
+
+// newWriter creates a new Writer.
+func newWriter(w io.WriteSeeker, fhdr *elf.FileHeader, srp sectionReaderProvider, opts ...Option) (*Writer, error) {
+	if fhdr.ByteOrder == nil {
+		return nil, errors.New("byte order has to be specified")
+	}
+
+	switch fhdr.Class {
+	case elf.ELFCLASS32:
+	case elf.ELFCLASS64:
+		// Ok
+	case elf.ELFCLASSNONE:
+		fallthrough
+	default:
+		return nil, errors.New("unknown ELF class")
+	}
+
+	sectionLinks := make(map[string]string)
+	for k, v := range specialSectionLinks {
+		sectionLinks[k] = v
+	}
+	wrt := &Writer{
+		dst:        w,
+		fhdr:       fhdr,
+		srProvider: srp,
+
+		shStrIdx:     make(map[string]int),
+		sectionLinks: sectionLinks,
+	}
+	for _, opt := range opts {
+		opt(wrt)
+	}
+	return wrt, nil
+}
+
+type Note struct {
+	Type elf.NType
+	Name string
+	Data []byte
+}
+
+// Flush writes any buffered data to the underlying io.WriteSeeker.
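+// It lays the file out as header, program header table, section contents, and
+// section header table, then validates the resulting shoff/shnum/shstrndx bookkeeping.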
+func (w *Writer) Flush() error {
+	// +-------------------------------+
+	// | ELF File Header               |
+	// +-------------------------------+
+	// | Program Header for segment #1 |
+	// +-------------------------------+
+	// | Program Header for segment #2 |
+	// +-------------------------------+
+	// | ...                           |
+	// +-------------------------------+
+	// | Contents (Byte Stream)        |
+	// | ...                           |
+	// +-------------------------------+
+	// | Section Header for section #1 |
+	// +-------------------------------+
+	// | Section Header for section #2 |
+	// +-------------------------------+
+	// | ...                           |
+	// +-------------------------------+
+	// | ".shstrtab" section           |
+	// +-------------------------------+
+	// | ".symtab" section             |
+	// +-------------------------------+
+	// | ".strtab" section             |
+	// +-------------------------------+
+
+	// 1. File Header
+	// 2. Program Header Table
+	// 3. sections
+	// 4. Section Header Table
+	w.writeFileHeader()
+	if w.err != nil {
+		return fmt.Errorf("failed to write file header: %w", w.err)
+	}
+	w.writeNotes()
+	if w.err != nil {
+		return fmt.Errorf("failed to write notes: %w", w.err)
+	}
+	w.writeSegments()
+	if w.err != nil {
+		return fmt.Errorf("failed to write segments: %w", w.err)
+	}
+	w.writeSections()
+	if w.err != nil {
+		return fmt.Errorf("failed to write sections: %w", w.err)
+	}
+
+	if w.shoff == 0 && w.shnum != 0 {
+		return fmt.Errorf("invalid ELF shnum=%d for shoff=0", w.shnum)
+	}
+	if w.shnum > 0 && w.shstrndx >= w.shnum {
+		return fmt.Errorf("invalid ELF shstrndx=%d", w.shstrndx)
+	}
+	return nil
+}
+
+// Reset discards any unflushed buffered data, clears any error, and resets
+// the writer to write its output to ws.
+func (w *Writer) Reset(ws io.WriteSeeker) {
+	w.dst = ws
+	w.err = nil
+
+	w.progs = nil
+	w.sections = nil
+	w.sectionHeaders = nil
+	w.additionalNotes = nil
+
+	w.seekProgHeader = 0
+	w.seekProgNum = 0
+	w.seekSectionHeader = 0
+	w.seekSectionNum = 0
+	w.seekSectionStringIdx = 0
+	w.seekSectionEntrySize = 0
+
+	w.ehsize = 0
+	w.phentsize = 0
+	w.shentsize = 0
+
+	w.shnum = 0
+	w.shoff = 0
+	w.shstrndx = 0
+	w.shStrIdx = make(map[string]int)
+}
+
+// AddNotes adds additional notes to write in the underlying io.WriteSeeker.
+func (w *Writer) AddNotes(additionalNotes ...Note) {
+	w.additionalNotes = append(w.additionalNotes, additionalNotes...)
+}
+
+// writeFileHeader writes the initial file header using the given information.
+func (w *Writer) writeFileHeader() {
+	fhdr := w.fhdr
+
+	switch fhdr.Class {
+	case elf.ELFCLASS32:
+		w.ehsize = 52
+		w.phentsize = 32
+		w.shentsize = 40
+	case elf.ELFCLASS64:
+		w.ehsize = 64
+		w.phentsize = 56
+		w.shentsize = 64
+	case elf.ELFCLASSNONE:
+		fallthrough
+	default:
+		w.err = fmt.Errorf("unknown ELF class: %v", w.fhdr.Class)
+		return
+	}
+
+	// e_ident
+	w.write([]byte{
+		0x7f, 'E', 'L', 'F', // Magic number
+		byte(fhdr.Class),
+		byte(fhdr.Data),
+		byte(fhdr.Version),
+		byte(fhdr.OSABI),
+		fhdr.ABIVersion,
+		0, 0, 0, 0, 0, 0, 0, // Padding
+	})
+
+	switch fhdr.Class {
+	case elf.ELFCLASS32:
+		// type Header32 struct {
+		//	Ident     [EI_NIDENT]byte /* File identification. */
+		//	Type      uint16          /* File type. */
+		//	Machine   uint16          /* Machine architecture. */
+		//	Version   uint32          /* ELF format version. */
+		//	Entry     uint32          /* Entry point. */
+		//	Phoff     uint32          /* Program header file offset. */
+		//	Shoff     uint32          /* Section header file offset. */
+		//	Flags     uint32          /* Architecture-specific flags. */
+		//	Ehsize    uint16          /* Size of ELF header in bytes. */
+		//	Phentsize uint16          /* Size of program header entry.
*/ + // Phnum uint16 /* Number of program header entries. */ + // Shentsize uint16 /* Size of section header entry. */ + // Shnum uint16 /* Number of section header entries. */ + // Shstrndx uint16 /* Section name strings section. */ + // } + w.u16(uint16(fhdr.Type)) // e_type + w.u16(uint16(fhdr.Machine)) // e_machine + w.u32(uint32(fhdr.Version)) // e_version + w.u32(uint32(0)) // e_entry + w.seekProgHeader = w.here() + w.u32(uint32(0)) // e_phoff + w.seekSectionHeader = w.here() + w.u32(uint32(0)) // e_shoff + w.u32(uint32(0)) // e_flags + w.u16(w.ehsize) // e_ehsize + w.u16(w.phentsize) // e_phentsize + w.seekProgNum = w.here() + w.u16(uint16(0)) // e_phnum + w.seekSectionEntrySize = w.here() + w.u16(w.shentsize) // e_shentsize + w.seekSectionNum = w.here() + w.u16(uint16(0)) // e_shnum + w.seekSectionStringIdx = w.here() + w.u16(uint16(elf.SHN_UNDEF)) // e_shstrndx + case elf.ELFCLASS64: + // type Header64 struct { + // Ident [EI_NIDENT]byte /* File identification. */ + // Type uint16 /* File type. */ + // Machine uint16 /* Machine architecture. */ + // Version uint32 /* ELF format version. */ + // Entry uint64 /* Entry point. */ + // Phoff uint64 /* Program header file offset. */ + // Shoff uint64 /* Section header file offset. */ + // Flags uint32 /* Architecture-specific flags. */ + // Ehsize uint16 /* Size of ELF header in bytes. */ + // Phentsize uint16 /* Size of program header entry. */ + // Phnum uint16 /* Number of program header entries. */ + // Shentsize uint16 /* Size of section header entry. */ + // Shnum uint16 /* Number of section header entries. */ + // Shstrndx uint16 /* Section name strings section. */ + // } + w.u16(uint16(fhdr.Type)) // e_type + w.u16(uint16(fhdr.Machine)) // e_machine + w.u32(uint32(fhdr.Version)) // e_version + w.u64(uint64(0)) // e_entry + w.seekProgHeader = w.here() + w.u64(uint64(0)) // e_phoff + w.seekSectionHeader = w.here() + w.u64(uint64(0)) // e_shoff + w.u32(uint32(0)) // e_flags + w.u16(w.ehsize) // e_ehsize + w.u16(w.phentsize) // e_phentsize + w.seekProgNum = w.here() + w.u16(uint16(0)) // e_phnum + w.seekSectionEntrySize = w.here() + w.u16(w.shentsize) // e_shentsize + w.seekSectionNum = w.here() + w.u16(uint16(0)) // e_shnum + w.seekSectionStringIdx = w.here() + w.u16(uint16(elf.SHN_UNDEF)) // e_shstrndx + case elf.ELFCLASSNONE: + fallthrough + default: + w.err = fmt.Errorf("unknown ELF class: %v", w.fhdr.Class) + } + + // Sanity check, size of file header should be the same as ehsize + if sz, _ := w.dst.Seek(0, io.SeekCurrent); sz != int64(w.ehsize) { + w.err = errors.New("internal error, ELF header size") + } +} + +// writeNotes writes notes to the current location, and adds a ProgHeader describing the notes. 
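+// All notes are written back to back and described by a single PT_NOTE program
+// header appended to w.progs.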
+func (w *Writer) writeNotes() { + // http://www.sco.com/developers/gabi/latest/ch5.pheader.html#note_section + if len(w.additionalNotes) == 0 { + return + } + + notes := w.additionalNotes + h := &elf.ProgHeader{ + Type: elf.PT_NOTE, + } + + write32 := func(note *Note) { + // Note header in a PT_NOTE section + // typedef struct elf32_note { + // Elf32_Word n_namesz; /* Name size */ + // Elf32_Word n_descsz; /* Content size */ + // Elf32_Word n_type; /* Content type */ + // } Elf32_Nhdr; + // + align := uint64(4) + h.Align = align + w.align(int64(align)) + if h.Off == 0 { + h.Off = uint64(w.here()) + } + w.u32(uint32(len(note.Name))) // n_namesz + w.u32(uint32(len(note.Data))) // n_descsz + w.u32(uint32(note.Type)) // n_type + w.write([]byte(note.Name)) + w.align(int64(align)) + w.write(note.Data) + } + + write64 := func(note *Note) { + // TODO: This might be incorrect. (At least for Go). + // - https://github.com/google/pprof/blob/d04f2422c8a17569c14e84da0fae252d9529826b/internal/elfexec/elfexec.go#L56-L58 + + // Note header in a PT_NOTE section + // typedef struct elf64_note { + // Elf64_Word n_namesz; /* Name size */ + // Elf64_Word n_descsz; /* Content size */ + // Elf64_Word n_type; /* Content type */ + // } Elf64_Nhdr; + align := uint64(8) + h.Align = align + w.align(int64(align)) + if h.Off == 0 { + h.Off = uint64(w.here()) + } + w.u64(uint64(len(note.Name))) // n_namesz + w.u64(uint64(len(note.Data))) // n_descsz + w.u64(uint64(note.Type)) // n_type + w.write([]byte(note.Name)) + w.align(int64(align)) + w.write(note.Data) + } + + var write func(note *Note) + switch w.fhdr.Class { + case elf.ELFCLASS32: + write = write32 + case elf.ELFCLASS64: + write = write64 + case elf.ELFCLASSNONE: + fallthrough + default: + w.err = fmt.Errorf("unknown ELF class: %v", w.fhdr.Class) + } + + for i := range notes { + write(¬es[i]) + } + h.Filesz = uint64(w.here()) - h.Off + w.progs = append(w.progs, &elf.Prog{ProgHeader: *h}) +} + +// writeSegments writes the program headers at the current location +// and patches the file header accordingly. +func (w *Writer) writeSegments() { + if len(w.progs) == 0 { + return + } + + // http://www.sco.com/developers/gabi/latest/ch5.pheader.html + phoff := w.here() + phnum := uint64(len(w.progs)) + + // Patch file header. + w.seek(w.seekProgHeader, io.SeekStart) + w.u64(uint64(phoff)) + w.seek(w.seekProgNum, io.SeekStart) + w.u64(phnum) // e_phnum + w.seek(0, io.SeekEnd) + + writePH32 := func(prog *elf.Prog) { + // ELF32 Program header. + // type Prog32 struct { + // Type uint32 /* Entry type. */ + // Off uint32 /* File offset of contents. */ + // Vaddr uint32 /* Virtual address in memory image. */ + // Paddr uint32 /* Physical address (not used). */ + // Filesz uint32 /* Size of contents in file. */ + // Memsz uint32 /* Size of contents in memory. */ + // Flags uint32 /* Access permission flags. */ + // Align uint32 /* Alignment in memory and file. */ + // } + w.u32(uint32(prog.Type)) + w.u32(uint32(prog.Off)) + w.u32(uint32(prog.Vaddr)) + w.u32(uint32(prog.Paddr)) + w.u32(uint32(prog.Filesz)) + w.u32(uint32(prog.Memsz)) + w.u32(uint32(prog.Flags)) + w.u32(uint32(prog.Align)) + } + + writePH64 := func(prog *elf.Prog) { + // ELF64 Program header. + // type Prog64 struct { + // Type uint32 /* Entry type. */ + // Flags uint32 /* Access permission flags. */ + // Off uint64 /* File offset of contents. */ + // Vaddr uint64 /* Virtual address in memory image. */ + // Paddr uint64 /* Physical address (not used). */ + // Filesz uint64 /* Size of contents in file. 
*/
+	//	Memsz  uint64 /* Size of contents in memory. */
+	//	Align  uint64 /* Alignment in memory and file. */
+	// }
+		w.u32(uint32(prog.Type))
+		w.u32(uint32(prog.Flags))
+		w.u64(prog.Off)
+		w.u64(prog.Vaddr)
+		w.u64(prog.Paddr)
+		w.u64(prog.Filesz)
+		w.u64(prog.Memsz)
+		w.u64(prog.Align)
+	}
+
+	var writeProgramHeader func(prog *elf.Prog)
+	switch w.fhdr.Class {
+	case elf.ELFCLASS32:
+		writeProgramHeader = writePH32
+	case elf.ELFCLASS64:
+		writeProgramHeader = writePH64
+	case elf.ELFCLASSNONE:
+		fallthrough
+	default:
+		w.err = fmt.Errorf("unknown ELF class: %v", w.fhdr.Class)
+	}
+
+	for _, prog := range w.progs {
+		// Write program header to program header table.
+		writeProgramHeader(prog)
+	}
+}
+
+// writeSections writes the sections at the current location
+// and patches the file header accordingly.
+func (w *Writer) writeSections() {
+	if len(w.sections) == 0 {
+		return
+	}
+	// http://www.sco.com/developers/gabi/2003-12-17/ch4.sheader.html
+	//             +-------------------+
+	//             | ELF header        |---+ e_shoff
+	//             +-------------------+   |
+	//             | Section 0         |<-----+
+	//             +-------------------+   |  | sh_offset
+	//             | Section 1         |<--|-----+
+	//             +-------------------+   |  |  |
+	//             | Section 2         |<--|--|--|--+
+	// +---------> +-------------------+   |  |  |  |
+	// |           |                   |<--+  |  |  |
+	// | Section   | Section header 0  |      |  |  |
+	// |           |                   |<-----+  |  |
+	// | Header    +-------------------+         |  |
+	// |           | Section header 1  |<--------+  |
+	// | Table     +-------------------+            |
+	// |           | Section header 2  |------------+ sh_offset
+	// +---------> +-------------------+
+
+	// Shallow copy the section for further editing.
+	copySection := func(s *elf.Section) *elf.Section {
+		clone := new(elf.Section)
+		*clone = *s
+		return clone
+	}
+
+	// sections that will end up in the output.
+	stw := make([]*elf.Section, 0, len(w.sections)+2)
+
+	// Build section header string table.
+	shstrtab := new(elf.Section)
+	shstrtab.Name = sectionHeaderStrTable
+	shstrtab.Type = elf.SHT_STRTAB
+	shstrtab.Addralign = 1
+
+	sectionNameIdx := make(map[string]int)
+	i := 0
+	for _, sec := range w.sections {
+		if i == 0 {
+			if sec.Type == elf.SHT_NULL {
+				stw = append(stw, copySection(sec))
+				i++
+				continue
+			}
+			s := new(elf.Section)
+			s.Type = elf.SHT_NULL
+			stw = append(stw, s)
+			i++
+		}
+		// There can be more than one section with the same name.
+		// The relevant section header string table is the one denoted by the elf header's shstrndx field.
+		// For our use case, we can skip the section header string table,
+		// because it will be replaced by the one we are building.
+		// http://www.sco.com/developers/gabi/latest/ch4.sheader.html#shstrndx
+		if isSectionStringTable(sec) {
+			continue
+		}
+		stw = append(stw, copySection(sec))
+		sectionNameIdx[sec.Name] = i
+		i++
+	}
+	for _, sh := range w.sectionHeaders {
+		// NOTICE: elf.Section.Open is supposed to return a zero reader for SHT_NOBITS sections.
+		// However, it doesn't respect SHT_NOBITS, so it is better to set the size to 0.
+		sh.Type = elf.SHT_NOBITS
+		sh.Size = 0
+		sh.FileSize = 0
+		i, ok := sectionNameIdx[sh.Name]
+		if ok {
+			stw[i] = &elf.Section{SectionHeader: sh}
+		} else {
+			stw = append(stw, &elf.Section{SectionHeader: sh})
+		}
+	}
+	if w.shstrndx == 0 {
+		stw = append(stw, shstrtab)
+		w.shstrndx = len(stw) - 1
+		sectionNameIdx[sectionHeaderStrTable] = w.shstrndx
+	}
+
+	shnum := len(stw)
+	w.shnum = shnum
+
+	names := make([]string, shnum)
+	for i, sec := range stw {
+		if sec.Name != "" {
+			names[i] = sec.Name
+		}
+	}
+
+	// Start writing actual data for sections.
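+	// Each section's final Offset and FileSize are recorded as its data is
+	// written, since compression may change the on-disk size.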
+ for i, sec := range stw { + var ( + written int64 + entryOffset = w.here() + ) + // The section header string section is reserved for section header string table. + if i == w.shstrndx { + w.writeStringTable(names) + sec.Size = uint64(w.here() - entryOffset) + } else { + if sec.Type == elf.SHT_NULL { + continue + } + + sr, err := w.srProvider.sectionReader(*sec) + if err != nil && w.err == nil { + w.err = err + } + + if sr != nil { + if w.compressDWARFSections && isDWARF(sec) && !isCompressed(sec) { + // Compress DWARF sections. + cw := &countingWriter{w: io.Discard} + tr := io.TeeReader(sr, cw) + + buf := bytes.NewBuffer(nil) + bufWritten, err := copyCompressed(buf, tr) + if err != nil && w.err == nil { + w.err = err + } + + ch := compressionHeader{ + byteOrder: w.fhdr.ByteOrder, + class: w.fhdr.Class, + Type: uint32(elf.COMPRESS_ZLIB), + Size: uint64(cw.written), // read bytes from the section reader. + Addralign: sec.SectionHeader.Addralign, + } + hdrWritten, err := ch.WriteTo(w.dst) + if err != nil && w.err == nil { + w.err = err + } + + dataWritten, err := buf.WriteTo(w.dst) + if err != nil && w.err == nil { + w.err = err + } + + if bufWritten != dataWritten { + w.err = fmt.Errorf("section %s: expected %d bytes, wrote %d bytes", sec.Name, bufWritten, dataWritten) + } + + written = hdrWritten + dataWritten + sec.Flags |= elf.SHF_COMPRESSED + } else { + // Write as is. + written, err = io.Copy(w.dst, sr) + if err != nil && w.err == nil { + w.err = err + } + } + } + + diff := (w.here() - entryOffset) + if written != diff { + w.err = fmt.Errorf("section %s: expected %d bytes, wrote %d bytes", sec.Name, written, diff) + } + } + + sec.Offset = uint64(entryOffset) + sec.FileSize = uint64(w.here() - entryOffset) + if w.err != nil { + // Early exit if there is an error. + return + } + } + + // Start writing the section header table. + shoff := w.here() + w.shoff = int(shoff) + // First, patch file header. + w.seek(w.seekSectionHeader, io.SeekStart) + w.u64(uint64(shoff)) + w.seek(w.seekSectionNum, io.SeekStart) + w.u16(uint16(shnum)) // e_shnum + w.seek(w.seekSectionStringIdx, io.SeekStart) + w.u16(uint16(w.shstrndx)) + w.seek(w.seekSectionEntrySize, io.SeekStart) + w.u16(w.shentsize) // e_shentsize + w.seek(0, io.SeekEnd) + + writeLink := func(sec *elf.Section) { + if sec.Link > 0 { + target, ok := w.sectionLinks[sec.Name] + if ok { + w.u32(uint32(sectionNameIdx[target])) + } else { + w.u32(uint32(0)) + } + } else { + w.u32(uint32(0)) + } + } + writeSH32 := func(shstrndx int, sec *elf.Section) { + // ELF32 Section header. + // type Section32 struct { + // Name uint32 /* Section name (index into the section header string table). */ + // Type uint32 /* Section type. */ + // Flags uint32 /* Section flags. */ + // Addr uint32 /* Address in memory image. */ + // Off uint32 /* Offset in file. */ + // Size uint32 /* Size in bytes. */ + // Link uint32 /* Index of a related section. */ + // Info uint32 /* Depends on section type. */ + // Addralign uint32 /* Alignment in bytes. */ + // Entsize uint32 /* Size of each entry in section. */ + // } + w.u32(uint32(shstrndx)) + w.u32(uint32(sec.Type)) + w.u32(uint32(sec.Flags)) + w.u32(uint32(sec.Addr)) + w.u32(uint32(sec.Offset)) + w.u32(uint32(sec.FileSize)) + writeLink(sec) + w.u32(sec.Info) + w.u32(uint32(sec.Addralign)) + w.u32(uint32(sec.Entsize)) + } + + writeSH64 := func(shstrndx int, sec *elf.Section) { + // ELF64 Section header. + // type Section64 struct { + // Name uint32 /* Section name (index into the section header string table). 
*/ + // Type uint32 /* Section type. */ + // Flags uint64 /* Section flags. */ + // Addr uint64 /* Address in memory image. */ + // Off uint64 /* Offset in file. */ + // Size uint64 /* Size in bytes. */ + // Link uint32 /* Index of a related section. */ + // Info uint32 /* Depends on section type. */ + // Addralign uint64 /* Alignment in bytes. */ + // Entsize uint64 /* Size of each entry in section. */ + // } + w.u32(uint32(shstrndx)) + w.u32(uint32(sec.Type)) + w.u64(uint64(sec.Flags)) + w.u64(sec.Addr) + w.u64(sec.Offset) + w.u64(sec.FileSize) + writeLink(sec) + w.u32(sec.Info) + w.u64(sec.Addralign) + w.u64(sec.Entsize) + } + + // shstrndx index of the entry in the section header string table. + // 0 reserved for null string. + var writeSectionHeader func(shstrndx int, sec *elf.Section) + switch w.fhdr.Class { + case elf.ELFCLASS32: + writeSectionHeader = writeSH32 + case elf.ELFCLASS64: + writeSectionHeader = writeSH64 + case elf.ELFCLASSNONE: + fallthrough + default: + w.err = fmt.Errorf("unknown ELF class: %v", w.fhdr.Class) + } + + for _, sec := range stw { + if sec.Name == "" { + writeSectionHeader(0, sec) + continue + } + writeSectionHeader(w.shStrIdx[sec.Name], sec) + } +} + +// here returns the current seek offset from the start of the file. +func (w *Writer) here() int64 { + r, err := w.dst.Seek(0, io.SeekCurrent) + if err != nil && w.err == nil { + w.err = err + } + return r +} + +// seek moves the cursor to the point calculated using offset and starting point. +func (w *Writer) seek(offset int64, whence int) { + _, err := w.dst.Seek(offset, whence) + if err != nil && w.err == nil { + w.err = err + } +} + +// align writes as many padding bytes as needed to make the current file +// offset a multiple of align. +func (w *Writer) align(align int64) { + off := w.here() + alignOff := (off + (align - 1)) &^ (align - 1) + if alignOff-off > 0 { + w.write(make([]byte, alignOff-off)) + } +} + +func (w *Writer) write(buf []byte) { + _, err := w.dst.Write(buf) + if err != nil && w.err == nil { + w.err = err + } +} + +func (w *Writer) u16(n uint16) { + err := binary.Write(w.dst, w.fhdr.ByteOrder, n) + if err != nil && w.err == nil { + w.err = err + } +} + +func (w *Writer) u32(n uint32) { + err := binary.Write(w.dst, w.fhdr.ByteOrder, n) + if err != nil && w.err == nil { + w.err = err + } +} + +func (w *Writer) u64(n uint64) { + err := binary.Write(w.dst, w.fhdr.ByteOrder, n) + if err != nil && w.err == nil { + w.err = err + } +} + +// writeStringTable writes given strings in string table format. 
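+// Offsets of the written names are recorded in w.shStrIdx so the section
+// headers can reference them.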
+func (w *Writer) writeStringTable(strs []string) { + // http://www.sco.com/developers/gabi/2003-12-17/ch4.strtab.html + w.write([]byte{0}) + i := 1 + for _, s := range strs { + if s == "" { + continue + } + data, err := unix.ByteSliceFromString(s) + if err != nil && w.err == nil { + w.err = err + break + } + w.shStrIdx[s] = i + w.write(data) + i += len(data) + } +} + +func isSectionStringTable(sec *elf.Section) bool { + return sec.Type == elf.SHT_STRTAB && sec.Name == sectionHeaderStrTable +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/extract.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/extract.go new file mode 100644 index 0000000000..6e24db8953 --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/extract.go @@ -0,0 +1,39 @@ +package elfwriter + +import ( + "debug/elf" + "fmt" + "io" +) + +type ReadAtCloser interface { + io.ReaderAt + io.Closer +} + +func OnlyKeepDebug(dst io.WriteSeeker, src ReadAtCloser) error { + w, err := NewNullifyingWriter(dst, src) + if err != nil { + return fmt.Errorf("initialize nullifying writer: %w", err) + } + w.FilterPrograms(func(p *elf.Prog) bool { + return p.Type == elf.PT_NOTE + }) + w.KeepSections( + isDWARF, + isSymbolTable, + isGoSymbolTable, + isPltSymbolTable, // NOTICE: gostd debug/elf.DWARF applies relocations. + func(s *elf.Section) bool { + return s.Name == ".comment" + }, + func(s *elf.Section) bool { + return s.Type == elf.SHT_NOTE + }, + ) + + if err := w.Flush(); err != nil { + return fmt.Errorf("flush ELF file: %w", err) + } + return nil +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/helpers.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/helpers.go new file mode 100644 index 0000000000..bfdbe8c5eb --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/helpers.go @@ -0,0 +1,206 @@ +// Copyright 2022-2024 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +package elfwriter + +import ( + "debug/elf" + "encoding/binary" + "errors" + "fmt" + "io" + "strings" + + "github.com/klauspost/compress/zlib" +) + +type countingWriter struct { + w io.Writer + written int64 +} + +func (cw *countingWriter) Write(p []byte) (int, error) { + n, err := cw.w.Write(p) + cw.written += int64(n) + return n, err +} + +func isCompressed(sec *elf.Section) bool { + return sec.Type == elf.SHT_PROGBITS && + (sec.Flags&elf.SHF_COMPRESSED != 0 || strings.HasPrefix(sec.Name, ".zdebug_")) +} + +type compressionHeader struct { + byteOrder binary.ByteOrder + class elf.Class + headerSize int + + Type uint32 + Size uint64 + Addralign uint64 +} + +func NewCompressionHeaderFromSource(fhdr *elf.FileHeader, src io.ReaderAt, offset int64) (*compressionHeader, error) { + hdr := &compressionHeader{} + + switch fhdr.Class { + case elf.ELFCLASS32: + ch := new(elf.Chdr32) + hdr.headerSize = binary.Size(ch) + sr := io.NewSectionReader(src, offset, int64(hdr.headerSize)) + if err := binary.Read(sr, fhdr.ByteOrder, ch); err != nil { + return nil, err + } + hdr.class = elf.ELFCLASS32 + hdr.Type = ch.Type + hdr.Size = uint64(ch.Size) + hdr.Addralign = uint64(ch.Addralign) + hdr.byteOrder = fhdr.ByteOrder + case elf.ELFCLASS64: + ch := new(elf.Chdr64) + hdr.headerSize = binary.Size(ch) + sr := io.NewSectionReader(src, offset, int64(hdr.headerSize)) + if err := binary.Read(sr, fhdr.ByteOrder, ch); err != nil { + return nil, err + } + hdr.class = elf.ELFCLASS64 + hdr.Type = ch.Type + hdr.Size = ch.Size + hdr.Addralign = ch.Addralign + hdr.byteOrder = fhdr.ByteOrder + case elf.ELFCLASSNONE: + fallthrough + default: + return nil, fmt.Errorf("unknown ELF class: %v", fhdr.Class) + } + + if elf.CompressionType(hdr.Type) != elf.COMPRESS_ZLIB { + // TODO(kakkoyun): COMPRESS_ZSTD + // https://github.com/golang/go/issues/55107 + return nil, errors.New("section should be zlib compressed, we are reading from the wrong offset or debug data is corrupt") + } + + return hdr, nil +} + +func (hdr compressionHeader) WriteTo(w io.Writer) (int64, error) { + var written int + switch hdr.class { + case elf.ELFCLASS32: + ch := new(elf.Chdr32) + ch.Type = uint32(elf.COMPRESS_ZLIB) + ch.Size = uint32(hdr.Size) + ch.Addralign = uint32(hdr.Addralign) + if err := binary.Write(w, hdr.byteOrder, ch); err != nil { + return 0, err + } + written = binary.Size(ch) // headerSize + case elf.ELFCLASS64: + ch := new(elf.Chdr64) + ch.Type = uint32(elf.COMPRESS_ZLIB) + ch.Size = hdr.Size + ch.Addralign = hdr.Addralign + if err := binary.Write(w, hdr.byteOrder, ch); err != nil { + return 0, err + } + written = binary.Size(ch) // headerSize + case elf.ELFCLASSNONE: + fallthrough + default: + return 0, fmt.Errorf("unknown ELF class: %v", hdr.class) + } + + return int64(written), nil +} + +func copyCompressed(w io.Writer, r io.Reader) (int64, error) { + if r == nil { + return 0, errors.New("reader is nil") + } + + pr, pw := io.Pipe() + + // write in writer end of pipe. + var wErr error + go func() { + defer pw.Close() + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if ok { + wErr = fmt.Errorf("panic occurred: %w", err) + } + } + }() + _, wErr = io.Copy(pw, r) + }() + + // read from reader end of pipe. 
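+	// Closing pr on return also unblocks the writer goroutine if compression
+	// fails partway through.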
+ defer pr.Close() + + cw := &countingWriter{w: w} + zw := zlib.NewWriter(cw) + _, err := io.Copy(zw, pr) + if err != nil { + zw.Close() + return 0, err + } + zw.Close() + + if wErr != nil { + return 0, wErr + } + return cw.written, nil +} + +func isDWARF(s *elf.Section) bool { + return strings.HasPrefix(s.Name, ".debug_") || + strings.HasPrefix(s.Name, ".zdebug_") || + strings.HasPrefix(s.Name, "__debug_") // macos +} + +func isSymbolTable(s *elf.Section) bool { + return s.Type == elf.SHT_SYMTAB || s.Type == elf.SHT_DYNSYM || + s.Type == elf.SHT_STRTAB || + s.Name == ".symtab" || + s.Name == ".dynsym" || + s.Name == ".strtab" || + s.Name == ".dynstr" +} + +func isGoSymbolTable(s *elf.Section) bool { + return s.Name == ".gosymtab" || + s.Name == ".gopclntab" || + s.Name == ".go.buildinfo" || + s.Name == ".data.rel.ro.gosymtab" || + s.Name == ".data.rel.ro.gopclntab" +} + +func isPltSymbolTable(s *elf.Section) bool { + return s.Type == elf.SHT_RELA || s.Type == elf.SHT_REL || // nolint:misspell + // Redundant + s.Name == ".plt" || + s.Name == ".plt.got" || + s.Name == ".rela.plt" || + s.Name == ".rela.dyn" // nolint:goconst +} + +func match[T *elf.Prog | *elf.Section | *elf.SectionHeader](elem T, predicates ...func(T) bool) bool { + for _, pred := range predicates { + if pred(elem) { + return true + } + } + return false +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/nullifying_elfwriter.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/nullifying_elfwriter.go new file mode 100644 index 0000000000..0085ea4915 --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/nullifying_elfwriter.go @@ -0,0 +1,95 @@ +// Copyright 2022-2024 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package elfwriter + +import ( + "debug/elf" + "fmt" + "io" +) + +// NullifyingWriter is a wrapper around another Writer that nullifies all the sections +// except the whitelisted ones. +type NullifyingWriter struct { + Writer + src io.ReaderAt + + progPredicates []func(*elf.Prog) bool + sectionPredicates []func(*elf.Section) bool +} + +// NewNullifyingWriter creates a new NullifyingWriter. +func NewNullifyingWriter(dst io.WriteSeeker, src io.ReaderAt, opts ...Option) (*NullifyingWriter, error) { + f, err := elf.NewFile(src) + if err != nil { + return nil, fmt.Errorf("error reading ELF file: %w", err) + } + defer f.Close() + + w, err := newWriter(dst, &f.FileHeader, newNullifyingWriterSectionReader(src), opts...) + if err != nil { + return nil, err + } + w.progs = f.Progs + w.sections = f.Sections + + return &NullifyingWriter{ + Writer: *w, + src: src, + }, nil +} + +// FilterPrograms filters out programs from the source. +func (w *NullifyingWriter) FilterPrograms(predicates ...func(*elf.Prog) bool) { + w.progPredicates = append(w.progPredicates, predicates...) +} + +// KeepSections keeps only the sections that match the predicates. +// If no predicates are given, all sections are nullified. 
+func (w *NullifyingWriter) KeepSections(predicates ...func(*elf.Section) bool) { + w.sectionPredicates = append(w.sectionPredicates, predicates...) +} + +func newNullifyingWriterSectionReader(src io.ReaderAt) sectionReaderProviderFn { + return func(sec elf.Section) (io.Reader, error) { + if sec.Type == elf.SHT_NOBITS { + return nil, nil + } + return io.NewSectionReader(src, int64(sec.Offset), int64(sec.FileSize)), nil + } +} + +func (w *NullifyingWriter) Flush() error { + if len(w.progPredicates) > 0 { + newProgs := []*elf.Prog{} + for _, prog := range w.progs { + if match(prog, w.progPredicates...) { + newProgs = append(newProgs, prog) + } + } + w.progs = newProgs + } + + for _, sec := range w.sections { + if match(sec, w.sectionPredicates...) || + sec.Type == elf.SHT_NOBITS || sec.Type == elf.SHT_NULL || isSectionStringTable(sec) { + + continue + } + sec.Type = elf.SHT_NOBITS + } + + return w.Writer.Flush() +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/options.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/options.go new file mode 100644 index 0000000000..52f3293510 --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter/options.go @@ -0,0 +1,24 @@ +// Copyright 2022-2024 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package elfwriter + +type Option func(w *Writer) + +// WithCompressDWARFSections compresses DWARF sections. +func WithCompressDWARFSections() Option { + return func(w *Writer) { + w.compressDWARFSections = true + } +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/grpc_upload_client.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/grpc_upload_client.go new file mode 100644 index 0000000000..1094609277 --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/grpc_upload_client.go @@ -0,0 +1,148 @@ +//go:build linux && (arm64 || amd64) + +// Copyright 2022-2024 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package reporter + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + + debuginfogrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/debuginfo/v1alpha1/debuginfov1alpha1grpc" + debuginfopb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/debuginfo/v1alpha1" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ErrDebuginfoAlreadyExists = errors.New("debug info already exists") + +const ( + // ChunkSize 8MB is the size of the chunks in which debuginfo files are + // uploaded and downloaded. AWS S3 has a minimum of 5MB for multi-part uploads + // and a maximum of 15MB, and a default of 8MB. + ChunkSize = 1024 * 1024 * 8 + // MaxMsgSize is the maximum message size the server can receive or send. By default, it is 64MB. + MaxMsgSize = 1024 * 1024 * 64 +) + +type GrpcDebuginfoUploadServiceClient interface { + Upload(ctx context.Context, opts ...grpc.CallOption) (debuginfogrpc.DebuginfoService_UploadClient, error) +} + +type GrpcUploadClient struct { + GrpcDebuginfoUploadServiceClient +} + +func NewGrpcUploadClient(client GrpcDebuginfoUploadServiceClient) *GrpcUploadClient { + return &GrpcUploadClient{client} +} + +func (c *GrpcUploadClient) Upload(ctx context.Context, uploadInstructions *debuginfopb.UploadInstructions, r io.Reader) (uint64, error) { + return c.grpcUpload(ctx, uploadInstructions, r) +} + +func (c *GrpcUploadClient) grpcUpload(ctx context.Context, uploadInstructions *debuginfopb.UploadInstructions, r io.Reader) (uint64, error) { + stream, err := c.GrpcDebuginfoUploadServiceClient.Upload(ctx, grpc.MaxCallSendMsgSize(MaxMsgSize)) + if err != nil { + return 0, fmt.Errorf("initiate upload: %w", err) + } + + defer func() { + if stream != nil { + _, _ = stream.CloseAndRecv() + } + }() + + err = stream.Send(&debuginfopb.UploadRequest{ + Data: &debuginfopb.UploadRequest_Info{ + Info: &debuginfopb.UploadInfo{ + UploadId: uploadInstructions.UploadId, + BuildId: uploadInstructions.BuildId, + Type: uploadInstructions.Type, + }, + }, + }) + if err != nil { + if err := sentinelError(err); err != nil { + return 0, err + } + return 0, fmt.Errorf("send upload info: %w", err) + } + + reader := bufio.NewReader(r) + + buffer := make([]byte, ChunkSize) + + bytesSent := 0 + for { + n, err := reader.Read(buffer) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return 0, fmt.Errorf("read next chunk (%d bytes sent so far): %w", bytesSent, err) + } + + err = stream.Send(&debuginfopb.UploadRequest{ + Data: &debuginfopb.UploadRequest_ChunkData{ + ChunkData: buffer[:n], + }, + }) + bytesSent += n + if errors.Is(err, io.EOF) { + // When the stream is closed, the server will send an EOF. + // To get the correct error code, we need the status. + // So receive the message and check the status. + err = stream.RecvMsg(nil) + if err := sentinelError(err); err != nil { + return 0, err + } + return 0, fmt.Errorf("send chunk: %w", err) + } + if err != nil { + return 0, fmt.Errorf("send next chunk (%d bytes sent so far): %w", bytesSent, err) + } + } + + // It returns io.EOF when the stream completes successfully. + res, err := stream.CloseAndRecv() + if errors.Is(err, io.EOF) { + return res.Size, nil + } + if err != nil { + // On any other error, the stream is aborted and the error contains the RPC status. 
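+		// sentinelError maps codes.AlreadyExists onto ErrDebuginfoAlreadyExists
+		// and passes codes.FailedPrecondition through unchanged.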
+ if err := sentinelError(err); err != nil { + return 0, err + } + return 0, fmt.Errorf("close and receive: %w", err) + } + return res.Size, nil +} + +// sentinelError checks underlying error for grpc.StatusCode and returns if it's a known and expected error. +func sentinelError(err error) error { + if sts, ok := status.FromError(err); ok { + if sts.Code() == codes.AlreadyExists { + return ErrDebuginfoAlreadyExists + } + if sts.Code() == codes.FailedPrecondition { + return err + } + } + return nil +} diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader.go new file mode 100644 index 0000000000..2c45a1886b --- /dev/null +++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader.go @@ -0,0 +1,462 @@ +package reporter + +import ( + "bufio" + "context" + "fmt" + "io" + "maps" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + "github.com/grafana/alloy/internal/component/pyroscope/ebpf/reporter/parca/reporter/elfwriter" + + debuginfogrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/debuginfo/v1alpha1/debuginfov1alpha1grpc" + debuginfopb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/debuginfo/v1alpha1" + + lru "github.com/elastic/go-freelru" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" //nolint:depguard + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/process" +) + +type uploadRequest struct { + fileID libpf.FileID + fileName string + buildID string + open func() (process.ReadAtCloser, error) +} + +type ParcaSymbolUploader struct { + client debuginfogrpc.DebuginfoServiceClient + grpcUploadClient *GrpcUploadClient + httpClient *http.Client + + retry *lru.SyncedLRU[libpf.FileID, struct{}] + + stripTextSection bool + tmp string + + queue chan uploadRequest + inProgressTracker *inProgressTracker + workerNum int + + uploadRequestBytes prometheus.Counter +} + +func NewParcaSymbolUploader( + client debuginfogrpc.DebuginfoServiceClient, + cacheSize uint32, + stripTextSection bool, + queueSize uint32, + workerNum int, + cacheDir string, + uploadRequestBytes prometheus.Counter, +) (*ParcaSymbolUploader, error) { + + retryCache, err := lru.NewSynced[libpf.FileID, struct{}](cacheSize, libpf.FileID.Hash32) + if err != nil { + return nil, err + } + + cacheDirectory := filepath.Join(cacheDir, "symuploader") + if _, err := os.Stat(cacheDirectory); os.IsNotExist(err) { + log.Debugf("Creating cache directory '%s'", cacheDirectory) + if err := os.MkdirAll(cacheDirectory, os.ModePerm); err != nil { + return nil, fmt.Errorf("failed to create cache directory (%s): %s", cacheDirectory, err) + } + } + + if err := filepath.Walk(cacheDirectory, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + + if os.Remove(path) != nil { + log.Warnf("Failed to remove cached file: %s", path) + } + + return nil + }); err != nil { + return nil, fmt.Errorf("failed to clean cache directory (%s): %s", cacheDirectory, err) + } + + return &ParcaSymbolUploader{ + httpClient: http.DefaultClient, + client: client, + grpcUploadClient: NewGrpcUploadClient(client), + retry: retryCache, + stripTextSection: stripTextSection, + tmp: cacheDirectory, + queue: make(chan uploadRequest, queueSize), + inProgressTracker: newInProgressTracker(0.2), + workerNum: workerNum, + uploadRequestBytes: 
uploadRequestBytes,
+	}, nil
+}
+
+const (
+	ReasonUploadInProgress = "A previous upload is still in-progress and not stale yet (only stale uploads can be retried)."
+)
+
+// inProgressTracker is a simple in-progress tracker that keeps track of which
+// fileIDs are currently in-progress/enqueued to be uploaded.
+type inProgressTracker struct {
+	mu sync.Mutex
+	m  map[libpf.FileID]struct{}
+
+	// tracking metadata to know when to shrink the map, as otherwise the map
+	// may grow indefinitely.
+	maxSizeSeen      int
+	shrinkLimitRatio float64
+}
+
+// newInProgressTracker returns a new in-progress tracker that shrinks the
+// tracking map when the maximum size seen exceeds the current size by more
+// than the shrinkLimitRatio.
+func newInProgressTracker(shrinkLimitRatio float64) *inProgressTracker {
+	return &inProgressTracker{
+		m:                make(map[libpf.FileID]struct{}),
+		shrinkLimitRatio: shrinkLimitRatio,
+	}
+}
+
+// GetOrAdd ensures that the fileID is in the in-progress state. If the
+// fileID was already in the in-progress state it returns true.
+func (i *inProgressTracker) GetOrAdd(fileID libpf.FileID) (alreadyInProgress bool) {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	_, alreadyInProgress = i.m[fileID]
+	i.m[fileID] = struct{}{}
+
+	if len(i.m) > i.maxSizeSeen {
+		i.maxSizeSeen = len(i.m)
+	}
+
+	return
+}
+
+// Remove removes the fileID from the in-progress state.
+func (i *inProgressTracker) Remove(fileID libpf.FileID) {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	delete(i.m, fileID)
+
+	if i.shrinkLimitRatio > 0 &&
+		int(float64(len(i.m))+float64(len(i.m))*i.shrinkLimitRatio) < i.maxSizeSeen {
+
+		i.m = maps.Clone(i.m)
+		i.maxSizeSeen = len(i.m)
+	}
+}
+
+// Run starts the upload workers and blocks until ctx is cancelled.
+func (u *ParcaSymbolUploader) Run(ctx context.Context) error {
+	var g errgroup.Group
+
+	for i := 0; i < u.workerNum; i++ {
+		g.Go(func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					return nil
+				case req := <-u.queue:
+					if err := u.attemptUpload(ctx, req.fileID, req.fileName, req.buildID, req.open); err != nil {
+						log.Warnf("Failed to upload with fileName '%s' and buildID '%s': %v", req.fileName, req.buildID, err)
+					}
+				}
+			}
+		})
+	}
+
+	return g.Wait()
+}
+
+// Upload enqueues a file for upload unless an upload for it is already
+// enqueued or in progress, or the fileID has been marked as not to be
+// retried.
+func (u *ParcaSymbolUploader) Upload(ctx context.Context, fileID libpf.FileID, fileName string, buildID string,
+	open func() (process.ReadAtCloser, error)) {
+
+	_, ok := u.retry.Get(fileID)
+	if ok {
+		return
+	}
+
+	// Attempt to enqueue each fileID only once.
+	alreadyInProgress := u.inProgressTracker.GetOrAdd(fileID)
+	if alreadyInProgress {
+		return
+	}
+
+	select {
+	case <-ctx.Done():
+		u.inProgressTracker.Remove(fileID)
+	case u.queue <- uploadRequest{fileID: fileID, fileName: fileName, buildID: buildID, open: open}:
+		// Nothing to do, we enqueued the request successfully.
+	default:
+		// The queue is full, we can't enqueue the request.
+		u.inProgressTracker.Remove(fileID)
+		log.Warnf("Failed to enqueue upload request with fileName '%s' and buildID '%s': queue is full", fileName, buildID)
+	}
+}
+
+// attemptUpload attempts to upload the file with the given fileID and buildID.
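+// The flow follows the Parca debuginfo protocol: first ask the server via
+// ShouldInitiateUpload whether this build is wanted, optionally strip the
+// binary down to its debug sections with elfwriter.OnlyKeepDebug, then call
+// InitiateUpload and push the bytes through a signed URL or the gRPC stream,
+// and finally MarkUploadFinished. Terminal outcomes are recorded in the retry
+// LRU so the same fileID is not attempted over and over.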
+func (u *ParcaSymbolUploader) attemptUpload(ctx context.Context, fileID libpf.FileID, fileName string, buildID string,
+	open func() (process.ReadAtCloser, error)) error {
+
+	defer u.inProgressTracker.Remove(fileID)
+
+	buildIDType := debuginfopb.BuildIDType_BUILD_ID_TYPE_GNU
+	if buildID == "" {
+		buildIDType = debuginfopb.BuildIDType_BUILD_ID_TYPE_HASH
+		buildID = fileID.StringNoQuotes()
+	}
+
+	shouldInitiateUploadResp, err := u.client.ShouldInitiateUpload(ctx, &debuginfopb.ShouldInitiateUploadRequest{
+		BuildId:     buildID,
+		BuildIdType: buildIDType,
+		Type:        debuginfopb.DebuginfoType_DEBUGINFO_TYPE_DEBUGINFO_UNSPECIFIED,
+	})
+	if err != nil {
+		return err
+	}
+
+	if !shouldInitiateUploadResp.ShouldInitiateUpload {
+		// This can happen when two agents simultaneously try to upload the
+		// same file. The other agent already started the upload so we don't
+		// need to do it again, however the upload may fail so we should retry
+		// after a while.
+		if shouldInitiateUploadResp.Reason == ReasonUploadInProgress {
+			u.retry.AddWithLifetime(fileID, struct{}{}, 5*time.Minute)
+			return nil
+		}
+		u.retry.Add(fileID, struct{}{})
+		return nil
+	}
+
+	var (
+		r    io.Reader
+		size int64
+	)
+	if !u.stripTextSection {
+		// We're not stripping the text section so we can upload the original file.
+		f, err := open()
+		if err != nil {
+			if os.IsNotExist(err) {
+				// File doesn't exist, likely because the process is already
+				// gone.
+				return nil
+			}
+			if err.Error() == "no backing file for anonymous memory" {
+				// This is an anonymous memory mapping, it's not backed by
+				// a file so we will never be able to extract debuginfo.
+				u.retry.Add(fileID, struct{}{})
+				return nil
+			}
+			return fmt.Errorf("open file: %w", err)
+		}
+		defer f.Close()
+
+		size, err = readAtCloserSize(f)
+		if err != nil {
+			return err
+		}
+		if size == 0 {
+			// The original file is empty; no need to ever upload it.
+			u.retry.Add(fileID, struct{}{})
+			return nil
+		}
+
+		r = io.NewSectionReader(f, 0, size)
+	} else {
+		f, err := os.Create(filepath.Join(u.tmp, fileID.StringNoQuotes()))
+		if err != nil {
+			// f is nil when os.Create fails, so there is nothing to remove here.
+			return fmt.Errorf("create file: %w", err)
+		}
+		// The deferred remove below covers all error paths from here on.
+		defer os.Remove(f.Name())
+		defer f.Close()
+
+		original, err := open()
+		if err != nil {
+			if os.IsNotExist(err) {
+				// Original file doesn't exist; the process is likely
+				// already gone.
+				return nil
+			}
+			if err.Error() == "no backing file for anonymous memory" {
+				// This is an anonymous memory mapping, it's not backed by
+				// a file so we will never be able to extract debuginfo.
+				u.retry.Add(fileID, struct{}{})
+				return nil
+			}
+			return fmt.Errorf("open original file: %w", err)
+		}
+		defer original.Close()
+
+		if err := elfwriter.OnlyKeepDebug(f, original); err != nil {
+			// If we can't extract the debuginfo we can't upload the file.
+			u.retry.Add(fileID, struct{}{})
+			return fmt.Errorf("extract debuginfo: %w", err)
+		}
+
+		if _, err := f.Seek(0, io.SeekStart); err != nil {
+			// Something is probably seriously wrong so don't retry.
+			u.retry.Add(fileID, struct{}{})
+			return fmt.Errorf("seek extracted debuginfo to start: %w", err)
+		}
+
+		stat, err := f.Stat()
+		if err != nil {
+			// Something is probably seriously wrong so don't retry.
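+			// Note that an entry in the retry LRU means "do not attempt this
+			// fileID again": Upload consults it before enqueueing. Entries
+			// added with AddWithLifetime expire and thereby allow a later
+			// retry, while plain Add suppresses the fileID until eviction.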
+			u.retry.Add(fileID, struct{}{})
+			return fmt.Errorf("stat file to upload: %w", err)
+		}
+		size = stat.Size()
+
+		if size == 0 {
+			// Extraction is a deterministic process, so if the file is empty
+			// we will never be able to extract non-zero debuginfo from the
+			// original binary.
+			u.retry.Add(fileID, struct{}{})
+			return nil
+		}
+
+		r = f
+	}
+
+	log.Infof("Attempting to upload with fileName '%s' and buildID '%s'", fileName, buildID)
+	initiateUploadResp, err := u.client.InitiateUpload(ctx, &debuginfopb.InitiateUploadRequest{
+		BuildId:     buildID,
+		BuildIdType: buildIDType,
+		Type:        debuginfopb.DebuginfoType_DEBUGINFO_TYPE_DEBUGINFO_UNSPECIFIED,
+		Hash:        fileID.StringNoQuotes(),
+		Size:        size,
+	})
+	if err != nil {
+		if status.Code(err) == codes.FailedPrecondition {
+			// This is a race that can happen when multiple agents are trying
+			// to upload the same file. This happens when another upload is
+			// still in progress. Since we don't know if it will succeed or not
+			// we retry after a while.
+			u.retry.AddWithLifetime(fileID, struct{}{}, 5*time.Minute)
+			return nil
+		}
+		if status.Code(err) == codes.AlreadyExists {
+			// This is a race that can happen when multiple agents are trying
+			// to upload the same file. The other upload already succeeded so
+			// we don't need to upload it again.
+			u.retry.Add(fileID, struct{}{})
+			return nil
+		}
+		if status.Code(err) == codes.InvalidArgument {
+			// This will never succeed, no need to retry.
+			u.retry.Add(fileID, struct{}{})
+			return nil
+		}
+		return err
+	}
+
+	if initiateUploadResp.UploadInstructions == nil {
+		u.retry.Add(fileID, struct{}{})
+		return nil
+	}
+
+	instructions := initiateUploadResp.UploadInstructions
+	var uploadedBytes uint64
+	switch instructions.UploadStrategy {
+	case debuginfopb.UploadInstructions_UPLOAD_STRATEGY_SIGNED_URL:
+		if err := u.uploadViaSignedURL(ctx, instructions.SignedUrl, r, size); err != nil {
+			return err
+		}
+		uploadedBytes = uint64(size)
+	case debuginfopb.UploadInstructions_UPLOAD_STRATEGY_GRPC:
+		var err error
+		uploadedBytes, err = u.grpcUploadClient.Upload(ctx, instructions, r)
+		if err != nil {
+			return err
+		}
+	default:
+		// No clue what to do with this upload strategy.
+		log.Warnf("Unknown upload strategy: %v", instructions.UploadStrategy)
+		u.retry.Add(fileID, struct{}{})
+		return nil
+	}
+
+	u.uploadRequestBytes.Add(float64(uploadedBytes))
+
+	_, err = u.client.MarkUploadFinished(ctx, &debuginfopb.MarkUploadFinishedRequest{
+		BuildId:  buildID,
+		UploadId: initiateUploadResp.UploadInstructions.UploadId,
+	})
+	if err != nil {
+		return err
+	}
+
+	u.retry.Add(fileID, struct{}{})
+	return nil
+}
+
+type Stater interface {
+	Stat() (os.FileInfo, error)
+}
+
+// readAtCloserSize attempts to determine the size of the reader.
+func readAtCloserSize(r process.ReadAtCloser) (int64, error) {
+	stater, ok := r.(Stater)
+	if !ok {
+		log.Debugf("ReadAtCloser is not a Stater, can't determine size")
+		return 0, nil
+	}
+
+	stat, err := stater.Stat()
+	if err != nil {
+		return 0, fmt.Errorf("stat file to upload: %w", err)
+	}
+
+	return stat.Size(), nil
+}
+
+// uploadViaSignedURL uploads the reader to the signed URL.
+func (u *ParcaSymbolUploader) uploadViaSignedURL(ctx context.Context, url string, r io.Reader, size int64) error {
+	// The HTTP client closes the request body when it implements io.Closer.
+	// Wrap the reader so the caller's file isn't closed underneath us and we
+	// retain full control over it.
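+	// bufio.NewReader also hides any Seeker/Closer methods from
+	// http.NewRequestWithContext, so the request relies on the explicitly
+	// set ContentLength below rather than sniffing the body type.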
+	r = bufio.NewReader(r)
+	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, r)
+	if err != nil {
+		return fmt.Errorf("create request: %w", err)
+	}
+
+	req.ContentLength = size
+	resp, err := u.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("do upload request: %w", err)
+	}
+	defer func() {
+		_, _ = io.Copy(io.Discard, resp.Body)
+		_ = resp.Body.Close()
+	}()
+
+	if resp.StatusCode/100 != 2 {
+		data, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("unexpected status code: %d, msg: %s", resp.StatusCode, string(data))
+	}
+
+	return nil
+}
diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader_test.go b/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader_test.go
new file mode 100644
index 0000000000..860f4ecdbb
--- /dev/null
+++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/parca_uploader_test.go
@@ -0,0 +1,61 @@
+//go:build linux && (arm64 || amd64)
+
+package reporter
+
+import (
+	"math/rand"
+	"testing"
+
+	"go.opentelemetry.io/ebpf-profiler/libpf"
+)
+
+func TestMapShrink(t *testing.T) {
+	tr := newInProgressTracker(0.2)
+	r := rand.New(rand.NewSource(0))
+
+	items := make([]libpf.FileID, 100)
+	for i := 0; i < 100; i++ {
+		items[i] = libpf.NewFileID(
+			r.Uint64(),
+			r.Uint64(),
+		)
+
+		tr.GetOrAdd(items[i])
+	}
+
+	if tr.maxSizeSeen != 100 {
+		t.Errorf("expected 100, got %d", tr.maxSizeSeen)
+	}
+
+	for i := 0; i < 10; i++ {
+		tr.Remove(items[i])
+	}
+
+	if tr.maxSizeSeen != 100 {
+		t.Errorf("expected 100, got %d", tr.maxSizeSeen)
+	}
+
+	for i := 10; i < 20; i++ {
+		tr.Remove(items[i])
+	}
+
+	if tr.maxSizeSeen != 83 {
+		t.Errorf("expected 83, got %d", tr.maxSizeSeen)
+	}
+
+	// adding back up to 83 items doesn't change anything
+	for i := 10; i < 13; i++ {
+		tr.GetOrAdd(items[i])
+	}
+
+	if tr.maxSizeSeen != 83 {
+		t.Errorf("expected 83, got %d", tr.maxSizeSeen)
+	}
+
+	// adding the 84th item should increase the max size
+	tr.GetOrAdd(items[13])
+
+	if tr.maxSizeSeen != 84 {
+		t.Errorf("expected 84, got %d", tr.maxSizeSeen)
+	}
+}
diff --git a/internal/component/pyroscope/ebpf/reporter/parca/reporter/readme.txt b/internal/component/pyroscope/ebpf/reporter/parca/reporter/readme.txt
new file mode 100644
index 0000000000..9ca6ed49ba
--- /dev/null
+++ b/internal/component/pyroscope/ebpf/reporter/parca/reporter/readme.txt
@@ -0,0 +1 @@
+copy-paste from https://github.com/parca-dev/parca-agent/tree/86e9befa15f6078163746421f85a46ca63119376/reporter
diff --git a/internal/component/pyroscope/ebpf/reporter/pprof.go b/internal/component/pyroscope/ebpf/reporter/pprof.go
index 591633bb14..cc49e36260 100644
--- a/internal/component/pyroscope/ebpf/reporter/pprof.go
+++ b/internal/component/pyroscope/ebpf/reporter/pprof.go
@@ -10,16 +10,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/grafana/alloy/internal/runtime/logging/level"
-
 	"github.com/go-kit/log"
 	"github.com/google/pprof/profile"
+	"github.com/grafana/alloy/internal/runtime/logging/level"
 	"github.com/prometheus/prometheus/model/labels"
 	"go.opentelemetry.io/ebpf-profiler/libpf"
 	"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
-	"go.opentelemetry.io/ebpf-profiler/process"
 	"go.opentelemetry.io/ebpf-profiler/pyroscope/discovery"
-	"go.opentelemetry.io/ebpf-profiler/pyroscope/symb/irsymcache"
 	"go.opentelemetry.io/ebpf-profiler/reporter/samples"
 	"go.opentelemetry.io/ebpf-profiler/support"
 )
@@ -29,41 +26,31 @@ type PPROF struct {
 	Labels labels.Labels
 	Origin libpf.Origin
 }
-type PPROFConsumer interface {
-	ConsumePprofProfiles(ctx context.Context, p []PPROF)
-}
-
-type 
PPROFConsumerFunc func(ctx context.Context, p []PPROF) -func (f PPROFConsumerFunc) ConsumePprofProfiles(ctx context.Context, p []PPROF) { - f(ctx, p) -} +type PPROFConsumer func(ctx context.Context, p []PPROF) type Config struct { ReportInterval time.Duration SamplesPerSecond int64 Demangle string ReporterUnsymbolizedStubs bool - - ExtraNativeSymbolResolver irsymcache.NativeSymbolResolver - Consumer PPROFConsumer } type PPROFReporter struct { cfg *Config log log.Logger + consumer PPROFConsumer + traceEvents xsync.RWMutex[samples.TraceEventsTree] - sd discovery.TargetProducer + sd discovery.TargetProducer + wg sync.WaitGroup cancelReporting context.CancelFunc } -func NewPPROF( - log log.Logger, - cfg *Config, - sd discovery.TargetProducer, -) *PPROFReporter { +func NewPPROF(log log.Logger, + cfg *Config, sd discovery.TargetProducer, consumer PPROFConsumer) *PPROFReporter { tree := make(samples.TraceEventsTree) return &PPROFReporter{ @@ -71,6 +58,7 @@ func NewPPROF( log: log, traceEvents: xsync.NewRWMutex(tree), sd: sd, + consumer: consumer, } } @@ -169,7 +157,7 @@ func (p *PPROFReporter) reportProfile(ctx context.Context) { } } - p.cfg.Consumer.ConsumePprofProfiles(ctx, profiles) + p.consumer(ctx, profiles) sz := 0 for _, it := range profiles { sz += len(it.Raw) @@ -178,12 +166,6 @@ func (p *PPROFReporter) reportProfile(ctx context.Context) { } func (p *PPROFReporter) createProfile(containerID samples.ContainerID, origin libpf.Origin, events map[samples.TraceAndMetaKey]*samples.TraceEvents) []PPROF { - defer func() { - if p.cfg.ExtraNativeSymbolResolver != nil { - p.cfg.ExtraNativeSymbolResolver.Cleanup() - } - }() - bs := NewProfileBuilders(BuildersOptions{ SampleRate: p.cfg.SamplesPerSecond, PerPIDProfile: true, @@ -241,7 +223,6 @@ func (p *PPROFReporter) createProfile(containerID samples.ContainerID, origin li switch fr.Type { case libpf.NativeFrame: if fr.FunctionName == libpf.NullString { - p.symbolizeNativeFrame(b, location, fr) if location.Line == nil && p.cfg.ReporterUnsymbolizedStubs { p.symbolizeStub(b, location, fr) } @@ -314,34 +295,6 @@ func (p *PPROFReporter) createProfile(containerID samples.ContainerID, origin li return res } -func (p *PPROFReporter) symbolizeNativeFrame( - b *ProfileBuilder, - loc *profile.Location, - fr libpf.Frame, -) { - - if !fr.MappingFile.Valid() { - return - } - mappingFile := fr.MappingFile.Value() - if mappingFile.FileName == process.VdsoPathName { - return - } - if p.cfg.ExtraNativeSymbolResolver == nil { - return - } - irsymcache.SymbolizeNativeFrame(p.cfg.ExtraNativeSymbolResolver, mappingFile.FileName, fr.AddressOrLineno, mappingFile.FileID, func(si irsymcache.SourceInfo) { - name := si.FunctionName - if name == libpf.NullString && si.FilePath == libpf.NullString { - return - } - name = p.demangle(name) - loc.Mapping.HasFunctions = true - line := profile.Line{Function: b.Function(name, si.FilePath)} - loc.Line = append(loc.Line, line) - }) -} - func (p *PPROFReporter) symbolizeStub(b *ProfileBuilder, location *profile.Location, fr libpf.Frame) { if location.Mapping.File == "" { return diff --git a/internal/component/pyroscope/ebpf/reporter/pprof_test.go b/internal/component/pyroscope/ebpf/reporter/pprof_test.go index 112b26a72b..7823a3471d 100644 --- a/internal/component/pyroscope/ebpf/reporter/pprof_test.go +++ b/internal/component/pyroscope/ebpf/reporter/pprof_test.go @@ -4,19 +4,18 @@ package reporter import ( "bytes" + "context" "testing" "github.com/google/pprof/profile" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" - "go.opentelemetry.io/ebpf-profiler/libpf/pfelf" "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/reporter/samples" "go.opentelemetry.io/ebpf-profiler/support" discovery "go.opentelemetry.io/ebpf-profiler/pyroscope/discovery" - "go.opentelemetry.io/ebpf-profiler/pyroscope/symb/irsymcache" ) func singleFrameTrace(ty libpf.FrameType, mappingFile libpf.FrameMappingFile, lineno libpf.AddressOrLineno, funcName, sourceFile string, sourceLine libpf.SourceLineno) libpf.Frames { @@ -41,14 +40,11 @@ func newReporter() *PPROFReporter { }, }, }) - return NewPPROF( - nil, - &Config{ - SamplesPerSecond: 97, - ExtraNativeSymbolResolver: nil, - }, - tp, - ) + return NewPPROF(nil, &Config{ + SamplesPerSecond: 97, + }, tp, func(ctx context.Context, p []PPROF) { + + }) } func TestPPROFReporter_StringAndFunctionTablePopulation(t *testing.T) { @@ -260,98 +256,10 @@ Mappings assert.Equal(t, expected, p.String()) } -func TestPPROFReporter_Demangle(t *testing.T) { - fid := libpf.NewFileID(7, 13) - key := symbolizerKey{ - fid: fid, - addr: 0xcafe00de, - } - rep := newReporter() - rep.cfg.ExtraNativeSymbolResolver = &symbolizer{ - symbols: map[symbolizerKey]irsymcache.SourceInfo{ - key: { - LineNumber: 9, - FunctionName: libpf.Intern("_ZN15PlatformMonitor4waitEm"), - }, - }, - } - rep.cfg.Demangle = "full" - - frames := make(libpf.Frames, 0, 1) - frames.Append(&libpf.Frame{ - Type: libpf.KernelFrame, - AddressOrLineno: 0x2000, - }) - frames.Append(&libpf.Frame{ // a native frame without a valid mapping should not be symbolized - Type: libpf.NativeFrame, - AddressOrLineno: 0xface000, - }) - frames.Append(&libpf.Frame{ // a native frame with a mapping, already symbolized, should not be symbolized again - Type: libpf.NativeFrame, - FunctionName: libpf.Intern("_ZN18ConcurrentGCThread3runEv"), - AddressOrLineno: 0xcafe00ef, - MappingStart: 0xcafe0000, - MappingEnd: 0xcafe1000, - MappingFile: libpf.NewFrameMappingFile(libpf.FrameMappingFileData{ - FileID: fid, - FileName: libpf.Intern("libfoo.so"), - }), - }) - frames.Append(&libpf.Frame{ // a native frame with a mapping should be symbolized - Type: libpf.NativeFrame, - FunctionName: libpf.NullString, - AddressOrLineno: 0xcafe00de, - MappingStart: 0xcafe0000, - MappingEnd: 0xcafe1000, - MappingFile: libpf.NewFrameMappingFile(libpf.FrameMappingFileData{ - FileID: fid, - FileName: libpf.Intern("libfoo.so"), - }), - }) - - traceKey := samples.TraceAndMetaKey{ - Pid: 123, - } - events := samples.KeyToEventMapping{ - traceKey: &samples.TraceEvents{ - Frames: frames, - Timestamps: []uint64{42}, - }, - } - - profiles := rep.createProfile( - samples.ContainerID(""), - support.TraceOriginSampling, - events, - ) - require.Len(t, profiles, 1) - assert.Equal(t, "service_a", profiles[0].Labels.Get("service_name")) - - p, err := profile.Parse(bytes.NewReader(profiles[0].Raw)) - require.NoError(t, err) - - p.TimeNanos = 0 - expected := `PeriodType: cpu nanoseconds -Period: 10309278 -Samples: -cpu/nanoseconds - 10309278: 1 2 3 4 -Locations - 1: 0x2000 M=1 - 2: 0xface000 M=1 - 3: 0xcafe00ef M=2 ConcurrentGCThread::run() :0:0 s=0() - 4: 0xcafe00de M=2 PlatformMonitor::wait(unsigned long) :0:0 s=0() -Mappings -1: 0x0/0x0/0x0 -2: 0xcafe0000/0xcafe1000/0x0 libfoo.so [FN] -` - assert.Equal(t, expected, p.String()) -} - func TestPPROFReporter_UnsymbolizedStub(t *testing.T) { rep := newReporter() - rep.cfg.ExtraNativeSymbolResolver = &symbolizer{} - rep.cfg.ReporterUnsymbolizedStubs = true + rep.cfg. 
+ ReporterUnsymbolizedStubs = true frames := make(libpf.Frames, 0, 1) frames.Append(&libpf.Frame{ @@ -410,28 +318,3 @@ Mappings ` assert.Equal(t, expected, p.String()) } - -type symbolizer struct { - symbols map[symbolizerKey]irsymcache.SourceInfo -} - -type symbolizerKey struct { - fid libpf.FileID - addr uint64 -} - -func (s symbolizer) ExecutableKnown(id libpf.FileID) bool { - return true -} - -func (s symbolizer) ObserveExecutable(id libpf.FileID, ref *pfelf.Reference) error { - return nil -} - -func (s symbolizer) ResolveAddress(file libpf.FileID, addr uint64) (irsymcache.SourceInfo, error) { - return s.symbols[symbolizerKey{fid: file, addr: addr}], nil -} - -func (s symbolizer) Cleanup() { - -} diff --git a/internal/component/pyroscope/enrich/enrich.go b/internal/component/pyroscope/enrich/enrich.go index 599b58c3a7..630d8321d0 100644 --- a/internal/component/pyroscope/enrich/enrich.go +++ b/internal/component/pyroscope/enrich/enrich.go @@ -5,12 +5,11 @@ import ( "context" "sync" - "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/featuregate" + "github.com/prometheus/prometheus/model/labels" ) func init() { @@ -107,6 +106,10 @@ func (e *enrichAppendable) AppendIngest(ctx context.Context, profile *pyroscope. return e.component.fanout.Appender().AppendIngest(ctx, enrichedProfile) } +func (e *enrichAppendable) UploadDebugInfo(ctx context.Context, arg pyroscope.DebugInfoData) { + e.component.fanout.Appender().UploadDebugInfo(ctx, arg) +} + // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { <-ctx.Done() diff --git a/internal/component/pyroscope/java/loop_test.go b/internal/component/pyroscope/java/loop_test.go index 3188c26458..538c699d4a 100644 --- a/internal/component/pyroscope/java/loop_test.go +++ b/internal/component/pyroscope/java/loop_test.go @@ -11,13 +11,12 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/pyroscope" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/component/pyroscope" ) type mockProfiler struct { @@ -52,6 +51,10 @@ func (m *mockAppendable) AppendIngest(ctx context.Context, profile *pyroscope.In return args.Error(0) } +func (m *mockAppendable) UploadDebugInfo(_ context.Context, _ pyroscope.DebugInfoData) { + +} + func newTestProfilingLoop(_ *testing.T, profiler *mockProfiler, appendable pyroscope.Appendable) *profilingLoop { reg := prometheus.NewRegistry() output := pyroscope.NewFanout([]pyroscope.Appendable{appendable}, "test-appendable", reg) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 9be14f13dd..200b342ac0 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -545,6 +545,10 @@ func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.Incomi return a.appendErr } +func (a *testAppender) UploadDebugInfo(_ context.Context, _ pyroscope.DebugInfoData) { + +} + // TestUpdateArgs verifies that the component can be 
updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. func TestUpdateArgs(t *testing.T) { ports, err := freeport.GetFreePorts(2) diff --git a/internal/component/pyroscope/relabel/relabel.go b/internal/component/pyroscope/relabel/relabel.go index 6bbceca0f3..510db9e3b9 100644 --- a/internal/component/pyroscope/relabel/relabel.go +++ b/internal/component/pyroscope/relabel/relabel.go @@ -205,6 +205,10 @@ func (c *Component) AppendIngest(ctx context.Context, profile *pyroscope.Incomin return c.fanout.Appender().AppendIngest(ctx, profile) } +func (c *Component) UploadDebugInfo(ctx context.Context, arg pyroscope.DebugInfoData) { + c.fanout.Appender().UploadDebugInfo(ctx, arg) +} + func (c *Component) Appender() pyroscope.Appender { return c } diff --git a/internal/component/pyroscope/relabel/relabel_test.go b/internal/component/pyroscope/relabel/relabel_test.go index c331fac9bd..5ae2232e60 100644 --- a/internal/component/pyroscope/relabel/relabel_test.go +++ b/internal/component/pyroscope/relabel/relabel_test.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/alloy/internal/util" "github.com/grafana/pyroscope/api/model/labelset" "github.com/grafana/regexp" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" @@ -447,6 +446,10 @@ type TestAppender struct { profiles []*pyroscope.IncomingProfile } +func (t *TestAppender) UploadDebugInfo(_ context.Context, _ pyroscope.DebugInfoData) { + +} + func NewTestAppender() *TestAppender { return &TestAppender{ profiles: make([]*pyroscope.IncomingProfile, 0), diff --git a/internal/component/pyroscope/scrape/delta_profiles.go b/internal/component/pyroscope/scrape/delta_profiles.go index c2de9154f8..bbcaba19a9 100644 --- a/internal/component/pyroscope/scrape/delta_profiles.go +++ b/internal/component/pyroscope/scrape/delta_profiles.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/scrape/internal/fastdelta" + "github.com/klauspost/compress/gzip" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -60,6 +61,10 @@ type deltaAppender struct { initialized bool } +func (d *deltaAppender) UploadDebugInfo(_ context.Context, _ pyroscope.DebugInfoData) { + +} + type gzipBuffer struct { gzr gzip.Reader gzw *gzip.Writer diff --git a/internal/component/pyroscope/testutil/components.go b/internal/component/pyroscope/testutil/components.go index fbf0ef75eb..f933cb8c85 100644 --- a/internal/component/pyroscope/testutil/components.go +++ b/internal/component/pyroscope/testutil/components.go @@ -17,6 +17,7 @@ func CreateWriteComponent(l log.Logger, reg prometheus.Registerer, endpoint stri var receiver pyroscope.Appendable e := write.GetDefaultEndpointOptions() e.URL = endpoint + e.DebugInfo.Enabled = true _, err := write.New( log.With(l, "component", "pyroscope.write"), diff --git a/internal/component/pyroscope/write/debuginfo_impl.go b/internal/component/pyroscope/write/debuginfo_impl.go new file mode 100644 index 0000000000..891adfb049 --- /dev/null +++ b/internal/component/pyroscope/write/debuginfo_impl.go @@ -0,0 +1,121 @@ +//go:build linux && (arm64 || amd64) + +package write + +import ( + "context" + "crypto/tls" + "encoding/base64" + "fmt" + "net/url" + "os" + "strings" + + debuginfogrpc 
"buf.build/gen/go/parca-dev/parca/grpc/go/parca/debuginfo/v1alpha1/debuginfov1alpha1grpc" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/ebpf/reporter/parca/reporter" + commonconfig "github.com/prometheus/common/config" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +func newDebugInfoUpload(u *url.URL, metrics *metrics, e *EndpointOptions) (debugInfoUploader, error) { + if !e.DebugInfo.Enabled { + return nil, nil + } + + var creds credentials.TransportCredentials + switch u.Scheme { + case "http": + creds = insecure.NewCredentials() + case "https": + if promTLSConfig := e.HTTPClientConfig.TLSConfig.Convert(); promTLSConfig != nil { + tlsConf, err := commonconfig.NewTLSConfig(promTLSConfig) + if err != nil { + return nil, err + } + creds = credentials.NewTLS(tlsConf) + } else { + creds = credentials.NewTLS(&tls.Config{}) + } + default: + return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme) + } + + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + } + if auth, err := newGrpcBasicAuthCredentials(e); err != nil { + return nil, err + } else if auth != nil { + opts = append(opts, grpc.WithPerRPCCredentials(auth)) + } + cc, err := grpc.NewClient(fmt.Sprintf("%s:%s", u.Hostname(), u.Port()), opts...) + if err != nil { + return nil, err + } + + impl, err := reporter.NewParcaSymbolUploader( + debuginfogrpc.NewDebuginfoServiceClient(cc), + e.DebugInfo.CacheSize, + e.DebugInfo.StripTextSection, + e.DebugInfo.QueueSize, + e.DebugInfo.WorkerNum, + e.DebugInfo.CachePath, + metrics.debugInfoUploadBytes, + ) + if err != nil { + return nil, err + } + return &debugInfoUploaderImpl{impl}, nil +} + +func newGrpcBasicAuthCredentials(e *EndpointOptions) (*basicAuthCredential, error) { + auth := e.HTTPClientConfig.BasicAuth + if auth == nil || auth.Username == "" { + return nil, nil + } + if auth.Password != "" { + return &basicAuthCredential{ + username: auth.Username, + password: string(auth.Password), + }, nil + } + if auth.PasswordFile != "" { + passwordBytes, err := os.ReadFile(auth.PasswordFile) + if err != nil { + return nil, err + } + return &basicAuthCredential{ + username: auth.Username, + password: strings.TrimSpace(string(passwordBytes)), + }, nil + } + return nil, nil +} + +type basicAuthCredential struct { + username string + password string +} + +func (b *basicAuthCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { + auth := b.username + ":" + b.password + encodedAuth := base64.StdEncoding.EncodeToString([]byte(auth)) + return map[string]string{ + "authorization": "Basic " + encodedAuth, + }, nil +} + +func (b *basicAuthCredential) RequireTransportSecurity() bool { + return true +} + +type debugInfoUploaderImpl struct { + *reporter.ParcaSymbolUploader +} + +func (r *debugInfoUploaderImpl) UploadDebugInfo(ctx context.Context, arg pyroscope.DebugInfoData) { + r.ParcaSymbolUploader.Upload(ctx, arg.FileID, arg.FileName, arg.BuildID, arg.Open) +} diff --git a/internal/component/pyroscope/write/debuginfo_stub.go b/internal/component/pyroscope/write/debuginfo_stub.go new file mode 100644 index 0000000000..61d8ae7c14 --- /dev/null +++ b/internal/component/pyroscope/write/debuginfo_stub.go @@ -0,0 +1,11 @@ +//go:build !(linux && (arm64 || amd64)) + +package write + +import ( + "net/url" +) + +func newDebugInfoUpload(u *url.URL, metrics *metrics, e *EndpointOptions) (debugInfoUploader, error) { + return 
nil, nil +} diff --git a/internal/component/pyroscope/write/metrics.go b/internal/component/pyroscope/write/metrics.go index b6256808c7..6b6b5c9891 100644 --- a/internal/component/pyroscope/write/metrics.go +++ b/internal/component/pyroscope/write/metrics.go @@ -6,12 +6,13 @@ import ( ) type metrics struct { - sentBytes *prometheus.CounterVec - droppedBytes *prometheus.CounterVec - sentProfiles *prometheus.CounterVec - droppedProfiles *prometheus.CounterVec - retries *prometheus.CounterVec - latency *prometheus.HistogramVec + sentBytes *prometheus.CounterVec + droppedBytes *prometheus.CounterVec + sentProfiles *prometheus.CounterVec + droppedProfiles *prometheus.CounterVec + retries *prometheus.CounterVec + latency *prometheus.HistogramVec + debugInfoUploadBytes prometheus.Counter } func newMetrics(reg prometheus.Registerer) *metrics { @@ -40,6 +41,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "pyroscope_write_latency", Help: "Write latency for sending profiles to pyroscope", }, []string{"endpoint", "type"}), + debugInfoUploadBytes: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pyroscope_ebpf_debug_info_upload_bytes_total", + Help: "Total number of bytes uploaded to the debug info endpoint", + }), } if reg != nil { @@ -49,6 +54,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { m.droppedProfiles = pyrometricsutil.MustRegisterOrGet(reg, m.droppedProfiles).(*prometheus.CounterVec) m.retries = pyrometricsutil.MustRegisterOrGet(reg, m.retries).(*prometheus.CounterVec) m.latency = pyrometricsutil.MustRegisterOrGet(reg, m.latency).(*prometheus.HistogramVec) + m.debugInfoUploadBytes = pyrometricsutil.MustRegisterOrGet(reg, m.debugInfoUploadBytes).(prometheus.Counter) } return m diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 18e071f345..cab20425a1 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -74,6 +74,17 @@ type EndpointOptions struct { MinBackoff time.Duration `alloy:"min_backoff_period,attr,optional"` // start backoff at this level MaxBackoff time.Duration `alloy:"max_backoff_period,attr,optional"` // increase exponentially to this level MaxBackoffRetries int `alloy:"max_backoff_retries,attr,optional"` // give up after this many; zero means infinite retries + + DebugInfo DebugInfoOptions `alloy:"debug_info,block,optional"` +} + +type DebugInfoOptions struct { + Enabled bool `alloy:"enabled,attr,optional"` + CacheSize uint32 `alloy:"cache_size,attr,optional"` + StripTextSection bool `alloy:"strip_text_section,attr,optional"` + QueueSize uint32 + WorkerNum int + CachePath string } func GetDefaultEndpointOptions() EndpointOptions { @@ -83,6 +94,14 @@ func GetDefaultEndpointOptions() EndpointOptions { MaxBackoff: 5 * time.Minute, MaxBackoffRetries: 10, HTTPClientConfig: config.CloneDefaultHTTPClientConfig(), + DebugInfo: DebugInfoOptions{ + Enabled: false, + CacheSize: 8 * 1024, + StripTextSection: true, + QueueSize: 256, + WorkerNum: 8, + CachePath: "/tmp/symb-cache/parca-symbols-uploader", + }, } return defaultEndpointOptions @@ -112,6 +131,7 @@ type Component struct { metrics *metrics userAgent string uid string + receiver *fanOutClient } // Exports are the set of fields exposed by the pyroscope.write component. 
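
A minimal sketch (not part of this patch) of how the new debug_info options
are meant to be switched on from Go, mirroring the testutil/components.go
change above; the package name, endpoint URL, and function name here are
hypothetical:

	package example

	import "github.com/grafana/alloy/internal/component/pyroscope/write"

	// exampleEndpointOptions enables debug-info upload on an endpoint; the
	// remaining DebugInfo fields keep the defaults from
	// GetDefaultEndpointOptions (StripTextSection=true, QueueSize=256,
	// WorkerNum=8, CachePath under /tmp).
	func exampleEndpointOptions(endpointURL string) write.EndpointOptions {
		e := write.GetDefaultEndpointOptions()
		e.URL = endpointURL
		e.DebugInfo.Enabled = true // off by default
		return e
	}
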
@@ -145,6 +165,7 @@ func New(
 		metrics:   m,
 		userAgent: userAgent,
 		uid:       uid,
+		receiver:  receiver,
 	}, nil
 }
@@ -162,25 +183,39 @@ func (c *Component) Update(newConfig Arguments) error {
 		return err
 	}
 	c.onStateChange(Exports{Receiver: receiver})
+	// Cancel the previous fan-out's background debug-info workers before
+	// swapping in the new receiver; cancelling after the swap would stop the
+	// freshly started uploaders instead.
+	c.receiver.cancel()
+	c.receiver = receiver
 	return nil
 }
 
 type fanOutClient struct {
 	// The list of push clients to fan out to.
 	pushClients   []pushv1connect.PusherServiceClient
+	debugInfo     []debugInfoUploader
 	ingestClients map[*EndpointOptions]*http.Client
 	config        Arguments
 	metrics       *metrics
 	tracer        trace.Tracer
 	logger        log.Logger
+	cancel        context.CancelFunc
+}
+
+type debugInfoUploader interface {
+	UploadDebugInfo(ctx context.Context, arg pyroscope.DebugInfoData)
+	Run(ctx context.Context) error
 }
 
 // newFanOut creates a new fan out client that will fan out to all endpoints.
 func newFanOut(logger log.Logger, tracer trace.Tracer, config Arguments, metrics *metrics, userAgent string, uid string) (*fanOutClient, error) {
 	pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
+	debugInfoClients := make([]debugInfoUploader, 0, len(config.Endpoints))
 	ingestClients := make(map[*EndpointOptions]*http.Client)
 	for _, endpoint := range config.Endpoints {
+		u, err := url.Parse(endpoint.URL)
+		if err != nil {
+			return nil, err
+		}
 		if endpoint.Headers == nil {
 			endpoint.Headers = map[string]string{}
 		}
@@ -196,14 +231,28 @@ func newFanOut(logger log.Logger, tracer trace.Tracer, config Arguments, metrics
 			pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)),
 		)
 		ingestClients[endpoint] = httpClient
+
+		if symbolsUploader, err := newDebugInfoUpload(u, metrics, endpoint); err != nil {
+			return nil, err
+		} else if symbolsUploader != nil {
+			debugInfoClients = append(debugInfoClients, symbolsUploader)
+		}
 	}
+	ctx, cancel := context.WithCancel(context.Background())
+	for _, c := range debugInfoClients {
+		go func() {
+			_ = c.Run(ctx)
+		}()
+	}
 	return &fanOutClient{
 		logger:        logger,
 		tracer:        tracer,
 		pushClients:   pushClients,
+		debugInfo:     debugInfoClients,
 		ingestClients: ingestClients,
 		config:        config,
 		metrics:       metrics,
+		cancel:        cancel,
 	}, nil
 }
@@ -576,6 +625,12 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
 	return errs
 }
 
+func (f *fanOutClient) UploadDebugInfo(ctx context.Context, arg pyroscope.DebugInfoData) {
+	for _, u := range f.debugInfo {
+		u.UploadDebugInfo(ctx, arg)
+	}
+}
+
 func (f *fanOutClient) observeLatency(endpoint, latencyType string) func() {
 	t := time.Now()
 	return func() {