2 changes: 2 additions & 0 deletions cmd/pyroscope/help-all.txt.tmpl
@@ -15,6 +15,8 @@ Usage of ./pyroscope:
How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction). (default 15m0s)
-blocks-storage.bucket-store.tenant-sync-concurrency int
Maximum number of concurrent tenants synching blocks. (default 10)
+ -client-capability.allow-utf8-labelnames
+ [experimental] Enable reading and writing utf-8 label names. To use this feature, API calls must include `allow-utf8-labelnames=true` in the `Accept` header.
-compactor.block-ranges value
List of compaction time ranges. (default 1h0m0s,2h0m0s,8h0m0s)
-compactor.block-sync-concurrency int
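The help text above is the whole client-facing contract: the capability is sent as a parameter of the `Accept` header, and the server honours it only when `-client-capability.allow-utf8-labelnames` is also enabled (otherwise it logs a warning and ignores the request's capability). A minimal client-side sketch; the endpoint path and media type are illustrative, not taken from this diff:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Illustrative endpoint; any API call that should use UTF-8 label names
	// needs the capability advertised on its Accept header.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:4040/ingest", nil)
	if err != nil {
		panic(err)
	}
	// The capability rides as a media-type parameter of the Accept header.
	req.Header.Set("Accept", "application/json; allow-utf8-labelnames=true")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```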
@@ -160,6 +160,12 @@ tenant_settings:
# CLI flag: -tenant-settings.recording-rules.enabled
[enabled: <boolean> | default = false]

+ client_capability:
+   # Enable reading and writing utf-8 label names. To use this feature, API calls
+   # must include `allow-utf8-labelnames=true` in the `Accept` header.
+   # CLI flag: -client-capability.allow-utf8-labelnames
+   [allow_utf_8_label_names: <boolean> | default = false]
+
storage:
# Backend storage to use. Supported backends are: s3, gcs, azure, swift,
# filesystem, cos.
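The same setting expressed in a configuration file rather than as a CLI flag; a sketch of just the relevant fragment, matching the reference entry above:

```yaml
# Fragment of a server configuration enabling the experimental capability.
# Equivalent to passing -client-capability.allow-utf8-labelnames on the command line.
client_capability:
  allow_utf_8_label_names: true
```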
35 changes: 23 additions & 12 deletions pkg/distributor/distributor.go
@@ -17,6 +17,8 @@ import (
"connectrpc.com/connect"
"go.uber.org/atomic"

"github.com/grafana/pyroscope/pkg/featureflags"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -651,7 +653,7 @@ func visitSampleSeriesForIngester(profile *profilev1.Profile, labels []*typesv1.
}

func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distributormodel.ProfileSeries) (resp *connect.Response[pushv1.PushResponse], err error) {
- sampleSeries, err := d.visitSampleSeries(req, visitSampleSeriesForIngester)
+ sampleSeries, err := d.visitSampleSeries(ctx, req, visitSampleSeriesForIngester)
if err != nil {
return nil, err
}
@@ -740,7 +742,7 @@ func (d *Distributor) sendRequestsToSegmentWriter(ctx context.Context, req *dist
// NOTE(kolesnikovae): if we return early, e.g., due to a validation error,
// or if there are no series, the write path router has already seen the
// request, and could have already accounted for the size, latency, etc.
- serviceSeries, err := d.visitSampleSeries(req, visitSampleSeriesForSegmentWriter)
+ serviceSeries, err := d.visitSampleSeries(ctx, req, visitSampleSeriesForSegmentWriter)
if err != nil {
return nil, err
}
@@ -1126,15 +1128,23 @@ func injectMappingVersions(s *distributormodel.ProfileSeries) error {

type visitFunc func(*profilev1.Profile, []*typesv1.LabelPair, []*relabel.Config, *sampleSeriesVisitor) error

- func (d *Distributor) visitSampleSeries(s *distributormodel.ProfileSeries, visit visitFunc) ([]*distributormodel.ProfileSeries, error) {
+ func (d *Distributor) visitSampleSeries(ctx context.Context, s *distributormodel.ProfileSeries, visit visitFunc) ([]*distributormodel.ProfileSeries, error) {
relabelingRules := d.limits.IngestionRelabelingRules(s.TenantID)
usageConfig := d.limits.DistributorUsageGroups(s.TenantID)
var result []*distributormodel.ProfileSeries
usageGroups := d.usageGroupEvaluator.GetMatch(s.TenantID, usageConfig, s.Labels)
+
+ // Extract client capabilities from context
+ var clientCapabilities *featureflags.ClientCapabilities
+ if capabilities, ok := featureflags.GetClientCapabilities(ctx); ok {
+ clientCapabilities = &capabilities
+ }
+
visitor := &sampleSeriesVisitor{
- tenantID: s.TenantID,
- limits:   d.limits,
- profile:  s.Profile,
+ tenantID:           s.TenantID,
+ limits:             d.limits,
+ profile:            s.Profile,
+ clientCapabilities: clientCapabilities,
}
if err := visit(s.Profile.Profile, s.Labels, relabelingRules, visitor); err != nil {
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), s.TenantID).Add(float64(s.TotalProfiles))
@@ -1164,18 +1174,19 @@ func (d *Distributor) visitSampleSeries(s *distributormodel.ProfileSeries, visit
}

type sampleSeriesVisitor struct {
- tenantID string
- limits   Limits
- profile  *pprof.Profile
- exp      *pprof.SampleExporter
- series   []*distributormodel.ProfileSeries
+ tenantID           string
+ limits             Limits
+ profile            *pprof.Profile
+ exp                *pprof.SampleExporter
+ clientCapabilities *featureflags.ClientCapabilities
+ series             []*distributormodel.ProfileSeries

discardedBytes int
discardedProfiles int
}

func (v *sampleSeriesVisitor) ValidateLabels(labels phlaremodel.Labels) error {
- return validation.ValidateLabels(v.limits, v.tenantID, labels)
+ return validation.ValidateLabels(v.clientCapabilities, v.limits, v.tenantID, labels)
}

func (v *sampleSeriesVisitor) VisitProfile(labels phlaremodel.Labels) {
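The distributor only consumes what the middleware stored in the request context. A short sketch of the read side, using the `featureflags` accessor shown below; the helper function itself is illustrative and not part of the change:

```go
package example

import (
	"context"

	"github.com/grafana/pyroscope/pkg/featureflags"
)

// allowUTF8 reports whether the middleware recorded the allow-utf8-labelnames
// capability for this request. GetClientCapabilities returns ok == false when
// no capabilities were attached to the context.
func allowUTF8(ctx context.Context) bool {
	caps, ok := featureflags.GetClientCapabilities(ctx)
	return ok && caps.AllowUtf8LabelNames
}
```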
4 changes: 2 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -1228,7 +1228,7 @@ func Test_SampleLabels_Ingester(t *testing.T) {
}}, overrides, nil, log.NewLogfmtLogger(os.Stdout), nil)
require.NoError(t, err)
var series []*distributormodel.ProfileSeries
- series, err = d.visitSampleSeries(tc.pushReq, visitSampleSeriesForIngester)
+ series, err = d.visitSampleSeries(context.Background(), tc.pushReq, visitSampleSeriesForIngester)
assert.Equal(t, tc.expectBytesDropped, float64(tc.pushReq.DiscardedBytesRelabeling))
assert.Equal(t, tc.expectProfilesDropped, float64(tc.pushReq.DiscardedProfilesRelabeling))

@@ -1786,7 +1786,7 @@ func Test_SampleLabels_SegmentWriter(t *testing.T) {

require.NoError(t, err)
var series []*distributormodel.ProfileSeries
- series, err = d.visitSampleSeries(tc.pushReq, visitSampleSeriesForSegmentWriter)
+ series, err = d.visitSampleSeries(context.Background(), tc.pushReq, visitSampleSeriesForSegmentWriter)
assert.Equal(t, tc.expectBytesDropped, float64(tc.pushReq.DiscardedBytesRelabeling))
assert.Equal(t, tc.expectProfilesDropped, float64(tc.pushReq.DiscardedProfilesRelabeling))

41 changes: 33 additions & 8 deletions pkg/featureflags/client_capability.go
@@ -2,23 +2,42 @@ package featureflags

import (
"context"
"flag"
"mime"
"net/http"
"strings"

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/middleware"
"github.com/grafana/pyroscope/pkg/util"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

const (
// Capability names - update parseClientCapabilities below when new capabilities added
allowUtf8LabelNamesCapabilityName string = "allow-utf8-labelnames"
+
+ // Config
+ clientCapabilityPrefix = "client-capability."
+ allowUtf8LabelNames = clientCapabilityPrefix + allowUtf8LabelNamesCapabilityName
)

+ type ClientCapabilityConfig struct {
+ AllowUtf8LabelNames bool `yaml:"allow_utf_8_label_names" category:"experimental"`
+ }
+
+ func (cfg *ClientCapabilityConfig) RegisterFlags(fs *flag.FlagSet) {
+ fs.BoolVar(
+ &cfg.AllowUtf8LabelNames,
+ allowUtf8LabelNames,
+ false,
+ "Enable reading and writing utf-8 label names. To use this feature, API calls must "+
+ "include `allow-utf8-labelnames=true` in the `Accept` header.",
+ )
+ }
+
// Define a custom context key type to avoid collisions
type contextKey struct{}

@@ -35,7 +54,7 @@ func GetClientCapabilities(ctx context.Context) (ClientCapabilities, bool) {
return value, ok
}

- func ClientCapabilitiesGRPCMiddleware() grpc.UnaryServerInterceptor {
+ func ClientCapabilitiesGRPCMiddleware(cfg *ClientCapabilityConfig, logger log.Logger) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
@@ -56,7 +75,7 @@ func ClientCapabilitiesGRPCMiddleware() grpc.UnaryServerInterceptor {
}

// Reuse existing HTTP header parsing
- clientCapabilities, err := parseClientCapabilities(httpHeader)
+ clientCapabilities, err := parseClientCapabilities(httpHeader, cfg, logger)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
@@ -68,10 +87,10 @@ func ClientCapabilitiesGRPCMiddleware() grpc.UnaryServerInterceptor {

// ClientCapabilitiesHttpMiddleware creates middleware that extracts and parses the
// `Accept` header for capabilities the client supports
- func ClientCapabilitiesHttpMiddleware() middleware.Interface {
+ func ClientCapabilitiesHttpMiddleware(cfg *ClientCapabilityConfig, logger log.Logger) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- clientCapabilities, err := parseClientCapabilities(r.Header)
+ clientCapabilities, err := parseClientCapabilities(r.Header, cfg, logger)
if err != nil {
http.Error(w, "Invalid header format: "+err.Error(), http.StatusBadRequest)
return
@@ -83,7 +102,7 @@ func ClientCapabilitiesHttpMiddleware() middleware.Interface {
})
}

- func parseClientCapabilities(header http.Header) (ClientCapabilities, error) {
+ func parseClientCapabilities(header http.Header, cfg *ClientCapabilityConfig, logger log.Logger) (ClientCapabilities, error) {
acceptHeaderValues := header.Values("Accept")

var capabilities ClientCapabilities
@@ -100,10 +119,16 @@ func parseClientCapabilities(header http.Header) (ClientCapabilities, error) {
switch k {
case allowUtf8LabelNamesCapabilityName:
if v == "true" {
- capabilities.AllowUtf8LabelNames = true
+ if !cfg.AllowUtf8LabelNames {
+ level.Warn(logger).Log(
+ "msg", "client requested capability that is not enabled on server",
+ "capability", allowUtf8LabelNamesCapabilityName)
+ } else {
+ capabilities.AllowUtf8LabelNames = true
+ }
}
default:
- level.Debug(util.Logger).Log(
+ level.Debug(logger).Log(
"msg", "unknown capability parsed from Accept header",
"acceptHeaderKey", k,
"acceptHeaderValue", v)
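Putting the pieces together: a hedged sketch of how the config and the two middlewares could be wired into a server. The `wire` helper, the placeholder handler, and the logger are illustrative; the diff does not show Pyroscope's actual module registration.

```go
package example

import (
	"flag"
	"net/http"
	"os"

	"github.com/go-kit/log"
	"google.golang.org/grpc"

	"github.com/grafana/pyroscope/pkg/featureflags"
)

// wire decorates an HTTP handler and builds a gRPC server option so that both
// paths parse client capabilities from the Accept header before handlers run.
func wire(pushHandler http.Handler) (http.Handler, grpc.ServerOption) {
	logger := log.NewLogfmtLogger(os.Stderr)

	var cfg featureflags.ClientCapabilityConfig
	// Registers -client-capability.allow-utf8-labelnames; call flag.Parse()
	// afterwards in a real binary.
	cfg.RegisterFlags(flag.CommandLine)

	// HTTP path: parse the Accept header and stash capabilities in the request context.
	httpHandler := featureflags.ClientCapabilitiesHttpMiddleware(&cfg, logger).Wrap(pushHandler)

	// gRPC/connect path: the same parsing for unary RPCs.
	grpcOpt := grpc.ChainUnaryInterceptor(featureflags.ClientCapabilitiesGRPCMiddleware(&cfg, logger))

	return httpHandler, grpcOpt
}
```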