Skip to content

Commit ff60e7c

Browse files
committed
parse labels & inject tenant
1 parent 84982e3 commit ff60e7c

File tree

5 files changed

+373
-47
lines changed

5 files changed

+373
-47
lines changed

pkg/api/api.go

-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/grafana/pyroscope/api/gen/proto/go/vcs/v1/vcsv1connect"
3636
"github.com/grafana/pyroscope/api/gen/proto/go/version/v1/versionv1connect"
3737
"github.com/grafana/pyroscope/api/openapiv2"
38-
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
3938
"github.com/grafana/pyroscope/pkg/adhocprofiles"
4039
"github.com/grafana/pyroscope/pkg/compactor"
4140
"github.com/grafana/pyroscope/pkg/distributor"
@@ -223,8 +222,6 @@ func (a *API) RegisterDistributor(d *distributor.Distributor) {
223222
{Desc: "Ring status", Path: "/distributor/ring"},
224223
})
225224

226-
pprofileotlp.RegisterProfilesServiceServer(a.server.GRPCOnHTTPServer, otlpHandler)
227-
228225
a.RegisterRoute("/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export", otlpHandler, true, true, "POST")
229226

230227
// TODO(@petethepig): implement http/protobuf and http/json support

pkg/ingester/otlp/convert.go

-7
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,6 @@ func ConvertOtelToGoogle(src *otelProfile.Profile) *googleProfile.Profile {
9292
dst.DefaultSampleType = int64(len(dst.StringTable) - 2)
9393
}
9494

95-
//b, _ := json.MarshalIndent(src, "", " ")
96-
//fmt.Println("src:")
97-
//_, _ = fmt.Fprintln(os.Stdout, string(b))
98-
//b, _ = json.MarshalIndent(dst, "", " ")
99-
//fmt.Println("dst:")
100-
//_, _ = fmt.Fprintln(os.Stdout, string(b))
101-
10295
return dst
10396
}
10497

pkg/ingester/otlp/ingest_handler.go

+127-27
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import (
99

1010
"connectrpc.com/connect"
1111
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
1213
"github.com/google/uuid"
1314
"google.golang.org/grpc"
1415

16+
"github.com/grafana/dskit/user"
1517
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
1618
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1719
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
18-
"github.com/grafana/pyroscope/pkg/tenant"
20+
v1 "github.com/grafana/pyroscope/api/otlp/common/v1"
21+
"github.com/grafana/pyroscope/api/otlp/profiles/v1experimental"
1922
)
2023

2124
type ingestHandler struct {
@@ -25,7 +28,6 @@ type ingestHandler struct {
2528
handler http.Handler
2629
}
2730

28-
// TODO(@petethepig): split http and grpc
2931
type Handler interface {
3032
http.Handler
3133
pprofileotlp.ProfilesServiceServer
@@ -65,41 +67,46 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6567

6668
// TODO(@petethepig): split http and grpc
6769
func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfilesServiceRequest) (*pprofileotlp.ExportProfilesServiceResponse, error) {
68-
69-
// TODO(@petethepig): make it tenant-aware
70-
ctx = tenant.InjectTenantID(ctx, tenant.DefaultTenantID)
71-
72-
h.log.Log("msg", "Export called")
70+
// Extracts user ID from the request metadata and returns and injects the user ID in the context
71+
_, ctx, err := user.ExtractFromGRPCRequest(ctx)
72+
if err != nil {
73+
level.Error(h.log).Log("msg", "failed to extract tenant ID from GRPC request", "err", err)
74+
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to extract tenant ID from GRPC request: %w", err)
75+
}
7376

7477
rps := er.ResourceProfiles
7578
for i := 0; i < len(rps); i++ {
7679
rp := rps[i]
7780

78-
labelsDst := []*typesv1.LabelPair{}
79-
// TODO(@petethepig): make labels work
80-
labelsDst = append(labelsDst, &typesv1.LabelPair{
81-
Name: "__name__",
82-
Value: "process_cpu",
83-
})
84-
labelsDst = append(labelsDst, &typesv1.LabelPair{
85-
Name: "service_name",
86-
Value: "otlp_test_app4",
87-
})
88-
labelsDst = append(labelsDst, &typesv1.LabelPair{
89-
Name: "__delta__",
90-
Value: "false",
91-
})
92-
labelsDst = append(labelsDst, &typesv1.LabelPair{
93-
Name: "pyroscope_spy",
94-
Value: "unknown",
95-
})
81+
// Get service name
82+
serviceName := getServiceNameFromAttributes(rp.Resource.GetAttributes())
83+
fmt.Println("service.name: ", serviceName)
84+
85+
// Start with default labels
86+
labels := getDefaultLabels(serviceName)
87+
88+
// Track processed attribute keys to avoid duplicates across levels
89+
processedKeys := make(map[string]bool)
90+
91+
// Add resource attributes
92+
labels = appendAttributesUnique(labels, rp.Resource.GetAttributes(), processedKeys)
9693

9794
sps := rp.ScopeProfiles
9895
for j := 0; j < len(sps); j++ {
9996
sp := sps[j]
97+
98+
// Add scope attributes
99+
labels = appendAttributesUnique(labels, sp.Scope.GetAttributes(), processedKeys)
100+
100101
for k := 0; k < len(sp.Profiles); k++ {
101102
p := sp.Profiles[k]
102103

104+
// Add profile attributes
105+
labels = appendAttributesUnique(labels, p.GetAttributes(), processedKeys)
106+
107+
// Add profile-specific attributes from samples/attributetable
108+
labels = appendProfileLabels(labels, p.Profile, processedKeys)
109+
103110
pprofBytes, err := OprofToPprof(p.Profile)
104111
if err != nil {
105112
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to convert from OTLP to legacy pprof: %w", err)
@@ -110,7 +117,7 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
110117
req := &pushv1.PushRequest{
111118
Series: []*pushv1.RawProfileSeries{
112119
{
113-
Labels: labelsDst,
120+
Labels: labels,
114121
Samples: []*pushv1.RawSample{{
115122
RawProfile: pprofBytes,
116123
ID: uuid.New().String(),
@@ -125,8 +132,101 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
125132
}
126133
}
127134
}
128-
129135
}
130136

131137
return &pprofileotlp.ExportProfilesServiceResponse{}, nil
132138
}
139+
140+
// getServiceNameFromAttributes extracts service name from OTLP resource attributes.
141+
// Returns "unknown" if service name is not found or empty.
142+
func getServiceNameFromAttributes(attrs []v1.KeyValue) string {
143+
for _, attr := range attrs {
144+
if attr.Key == "service.name" {
145+
val := attr.GetValue()
146+
if sv := val.GetStringValue(); sv != "" {
147+
return sv
148+
}
149+
break
150+
}
151+
}
152+
return "unknown"
153+
}
154+
155+
// getDefaultLabels returns the required base labels for Pyroscope profiles
156+
func getDefaultLabels(serviceName string) []*typesv1.LabelPair {
157+
return []*typesv1.LabelPair{
158+
{
159+
Name: "__name__",
160+
Value: "process_cpu",
161+
},
162+
{
163+
Name: "service_name",
164+
Value: serviceName,
165+
},
166+
{
167+
Name: "__delta__",
168+
Value: "false",
169+
},
170+
{
171+
Name: "pyroscope_spy",
172+
Value: "unknown",
173+
},
174+
}
175+
}
176+
177+
func appendAttributesUnique(labels []*typesv1.LabelPair, attrs []v1.KeyValue, processedKeys map[string]bool) []*typesv1.LabelPair {
178+
for _, attr := range attrs {
179+
// Skip if we've already seen this key at any level
180+
if processedKeys[attr.Key] {
181+
continue
182+
}
183+
184+
val := attr.GetValue()
185+
if sv := val.GetStringValue(); sv != "" {
186+
labels = append(labels, &typesv1.LabelPair{
187+
Name: attr.Key,
188+
Value: sv,
189+
})
190+
processedKeys[attr.Key] = true
191+
}
192+
}
193+
return labels
194+
}
195+
196+
func appendProfileLabels(labels []*typesv1.LabelPair, profile *v1experimental.Profile, processedKeys map[string]bool) []*typesv1.LabelPair {
197+
if profile == nil {
198+
return labels
199+
}
200+
201+
// Create mapping of attribute indices to their values
202+
attrMap := make(map[uint64]v1.AnyValue)
203+
for i, attr := range profile.GetAttributeTable() {
204+
val := attr.GetValue()
205+
if val.GetValue() != nil {
206+
attrMap[uint64(i)] = val
207+
}
208+
}
209+
210+
// Process only attributes referenced in samples
211+
for _, sample := range profile.Sample {
212+
for _, attrIdx := range sample.GetAttributes() {
213+
attr := profile.AttributeTable[attrIdx]
214+
// Skip if we've already processed this key at any level
215+
if processedKeys[attr.Key] {
216+
continue
217+
}
218+
219+
if value, exists := attrMap[attrIdx]; exists {
220+
if sv := value.GetStringValue(); sv != "" {
221+
labels = append(labels, &typesv1.LabelPair{
222+
Name: attr.Key,
223+
Value: sv,
224+
})
225+
processedKeys[attr.Key] = true
226+
}
227+
}
228+
}
229+
}
230+
231+
return labels
232+
}

0 commit comments

Comments
 (0)