Skip to content

Commit bfa3f6c

Browse files
committed
parse labels & inject tenant
1 parent 84982e3 commit bfa3f6c

File tree

5 files changed

+372
-47
lines changed

5 files changed

+372
-47
lines changed

pkg/api/api.go

-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect"
3636
"github.com/grafana/pyroscope/api/gen/proto/go/version/v1/versionv1connect"
3737
"github.com/grafana/pyroscope/api/openapiv2"
38-
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
3938
"github.com/grafana/pyroscope/pkg/adhocprofiles"
4039
"github.com/grafana/pyroscope/pkg/compactor"
4140
"github.com/grafana/pyroscope/pkg/distributor"
@@ -223,8 +222,6 @@ func (a *API) RegisterDistributor(d *distributor.Distributor) {
223222
{Desc: "Ring status", Path: "/distributor/ring"},
224223
})
225224

226-
pprofileotlp.RegisterProfilesServiceServer(a.server.GRPCOnHTTPServer, otlpHandler)
227-
228225
a.RegisterRoute("/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export", otlpHandler, true, true, "POST")
229226

230227
// TODO(@petethepig): implement http/protobuf and http/json support

pkg/ingester/otlp/convert.go

-7
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,6 @@ func ConvertOtelToGoogle(src *otelProfile.Profile) *googleProfile.Profile {
9292
dst.DefaultSampleType = int64(len(dst.StringTable) - 2)
9393
}
9494

95-
//b, _ := json.MarshalIndent(src, "", " ")
96-
//fmt.Println("src:")
97-
//_, _ = fmt.Fprintln(os.Stdout, string(b))
98-
//b, _ = json.MarshalIndent(dst, "", " ")
99-
//fmt.Println("dst:")
100-
//_, _ = fmt.Fprintln(os.Stdout, string(b))
101-
10295
return dst
10396
}
10497

pkg/ingester/otlp/ingest_handler.go

+126-27
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import (
99

1010
"connectrpc.com/connect"
1111
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
1213
"github.com/google/uuid"
1314
"google.golang.org/grpc"
1415

16+
"github.com/grafana/dskit/user"
1517
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
1618
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1719
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
18-
"github.com/grafana/pyroscope/pkg/tenant"
20+
v1 "github.com/grafana/pyroscope/api/otlp/common/v1"
21+
"github.com/grafana/pyroscope/api/otlp/profiles/v1experimental"
1922
)
2023

2124
type ingestHandler struct {
@@ -25,7 +28,6 @@ type ingestHandler struct {
2528
handler http.Handler
2629
}
2730

28-
// TODO(@petethepig): split http and grpc
2931
type Handler interface {
3032
http.Handler
3133
pprofileotlp.ProfilesServiceServer
@@ -65,41 +67,45 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6567

6668
// TODO(@petethepig): split http and grpc
6769
func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfilesServiceRequest) (*pprofileotlp.ExportProfilesServiceResponse, error) {
68-
69-
// TODO(@petethepig): make it tenant-aware
70-
ctx = tenant.InjectTenantID(ctx, tenant.DefaultTenantID)
71-
72-
h.log.Log("msg", "Export called")
70+
// Extracts user ID from the request metadata and returns and injects the user ID in the context
71+
_, ctx, err := user.ExtractFromGRPCRequest(ctx)
72+
if err != nil {
73+
level.Error(h.log).Log("msg", "failed to extract tenant ID from GRPC request", "err", err)
74+
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to extract tenant ID from GRPC request: %w", err)
75+
}
7376

7477
rps := er.ResourceProfiles
7578
for i := 0; i < len(rps); i++ {
7679
rp := rps[i]
7780

78-
labelsDst := []*typesv1.LabelPair{}
79-
// TODO(@petethepig): make labels work
80-
labelsDst = append(labelsDst, &typesv1.LabelPair{
81-
Name: "__name__",
82-
Value: "process_cpu",
83-
})
84-
labelsDst = append(labelsDst, &typesv1.LabelPair{
85-
Name: "service_name",
86-
Value: "otlp_test_app4",
87-
})
88-
labelsDst = append(labelsDst, &typesv1.LabelPair{
89-
Name: "__delta__",
90-
Value: "false",
91-
})
92-
labelsDst = append(labelsDst, &typesv1.LabelPair{
93-
Name: "pyroscope_spy",
94-
Value: "unknown",
95-
})
81+
// Get service name
82+
serviceName := getServiceNameFromAttributes(rp.Resource.GetAttributes())
83+
84+
// Start with default labels
85+
labels := getDefaultLabels(serviceName)
86+
87+
// Track processed attribute keys to avoid duplicates across levels
88+
processedKeys := make(map[string]bool)
89+
90+
// Add resource attributes
91+
labels = appendAttributesUnique(labels, rp.Resource.GetAttributes(), processedKeys)
9692

9793
sps := rp.ScopeProfiles
9894
for j := 0; j < len(sps); j++ {
9995
sp := sps[j]
96+
97+
// Add scope attributes
98+
labels = appendAttributesUnique(labels, sp.Scope.GetAttributes(), processedKeys)
99+
100100
for k := 0; k < len(sp.Profiles); k++ {
101101
p := sp.Profiles[k]
102102

103+
// Add profile attributes
104+
labels = appendAttributesUnique(labels, p.GetAttributes(), processedKeys)
105+
106+
// Add profile-specific attributes from samples/attributetable
107+
labels = appendProfileLabels(labels, p.Profile, processedKeys)
108+
103109
pprofBytes, err := OprofToPprof(p.Profile)
104110
if err != nil {
105111
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to convert from OTLP to legacy pprof: %w", err)
@@ -110,7 +116,7 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
110116
req := &pushv1.PushRequest{
111117
Series: []*pushv1.RawProfileSeries{
112118
{
113-
Labels: labelsDst,
119+
Labels: labels,
114120
Samples: []*pushv1.RawSample{{
115121
RawProfile: pprofBytes,
116122
ID: uuid.New().String(),
@@ -125,8 +131,101 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
125131
}
126132
}
127133
}
128-
129134
}
130135

131136
return &pprofileotlp.ExportProfilesServiceResponse{}, nil
132137
}
138+
139+
// getServiceNameFromAttributes extracts service name from OTLP resource attributes.
140+
// Returns "unknown" if service name is not found or empty.
141+
func getServiceNameFromAttributes(attrs []v1.KeyValue) string {
142+
for _, attr := range attrs {
143+
if attr.Key == "service.name" {
144+
val := attr.GetValue()
145+
if sv := val.GetStringValue(); sv != "" {
146+
return sv
147+
}
148+
break
149+
}
150+
}
151+
return "unknown"
152+
}
153+
154+
// getDefaultLabels returns the required base labels for Pyroscope profiles
155+
func getDefaultLabels(serviceName string) []*typesv1.LabelPair {
156+
return []*typesv1.LabelPair{
157+
{
158+
Name: "__name__",
159+
Value: "process_cpu",
160+
},
161+
{
162+
Name: "service_name",
163+
Value: serviceName,
164+
},
165+
{
166+
Name: "__delta__",
167+
Value: "false",
168+
},
169+
{
170+
Name: "pyroscope_spy",
171+
Value: "unknown",
172+
},
173+
}
174+
}
175+
176+
func appendAttributesUnique(labels []*typesv1.LabelPair, attrs []v1.KeyValue, processedKeys map[string]bool) []*typesv1.LabelPair {
177+
for _, attr := range attrs {
178+
// Skip if we've already seen this key at any level
179+
if processedKeys[attr.Key] {
180+
continue
181+
}
182+
183+
val := attr.GetValue()
184+
if sv := val.GetStringValue(); sv != "" {
185+
labels = append(labels, &typesv1.LabelPair{
186+
Name: attr.Key,
187+
Value: sv,
188+
})
189+
processedKeys[attr.Key] = true
190+
}
191+
}
192+
return labels
193+
}
194+
195+
func appendProfileLabels(labels []*typesv1.LabelPair, profile *v1experimental.Profile, processedKeys map[string]bool) []*typesv1.LabelPair {
196+
if profile == nil {
197+
return labels
198+
}
199+
200+
// Create mapping of attribute indices to their values
201+
attrMap := make(map[uint64]v1.AnyValue)
202+
for i, attr := range profile.GetAttributeTable() {
203+
val := attr.GetValue()
204+
if val.GetValue() != nil {
205+
attrMap[uint64(i)] = val
206+
}
207+
}
208+
209+
// Process only attributes referenced in samples
210+
for _, sample := range profile.Sample {
211+
for _, attrIdx := range sample.GetAttributes() {
212+
attr := profile.AttributeTable[attrIdx]
213+
// Skip if we've already processed this key at any level
214+
if processedKeys[attr.Key] {
215+
continue
216+
}
217+
218+
if value, exists := attrMap[attrIdx]; exists {
219+
if sv := value.GetStringValue(); sv != "" {
220+
labels = append(labels, &typesv1.LabelPair{
221+
Name: attr.Key,
222+
Value: sv,
223+
})
224+
processedKeys[attr.Key] = true
225+
}
226+
}
227+
}
228+
}
229+
230+
return labels
231+
}

0 commit comments

Comments
 (0)