@@ -10,13 +10,16 @@ import (
10
10
11
11
"connectrpc.com/connect"
12
12
"github.com/go-kit/log"
13
+ "github.com/go-kit/log/level"
13
14
"github.com/google/uuid"
14
15
"google.golang.org/grpc"
15
16
17
+ "github.com/grafana/dskit/user"
16
18
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
17
19
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
18
20
pprofileotlp "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
19
- "github.com/grafana/pyroscope/pkg/tenant"
21
+ v1 "github.com/grafana/pyroscope/api/otlp/common/v1"
22
+ "github.com/grafana/pyroscope/api/otlp/profiles/v1experimental"
20
23
)
21
24
22
25
type ingestHandler struct {
@@ -26,7 +29,6 @@ type ingestHandler struct {
26
29
handler http.Handler
27
30
}
28
31
29
- // TODO(@petethepig): split http and grpc
30
32
type Handler interface {
31
33
http.Handler
32
34
pprofileotlp.ProfilesServiceServer
@@ -66,41 +68,45 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
66
68
67
69
// TODO(@petethepig): split http and grpc
68
70
func (h * ingestHandler ) Export (ctx context.Context , er * pprofileotlp.ExportProfilesServiceRequest ) (* pprofileotlp.ExportProfilesServiceResponse , error ) {
69
-
70
- // TODO(@petethepig): make it tenant-aware
71
- ctx = tenant .InjectTenantID (ctx , tenant .DefaultTenantID )
72
-
73
- h .log .Log ("msg" , "Export called" )
71
+ // Extracts user ID from the request metadata and returns and injects the user ID in the context
72
+ _ , ctx , err := user .ExtractFromGRPCRequest (ctx )
73
+ if err != nil {
74
+ level .Error (h .log ).Log ("msg" , "failed to extract tenant ID from GRPC request" , "err" , err )
75
+ return & pprofileotlp.ExportProfilesServiceResponse {}, fmt .Errorf ("failed to extract tenant ID from GRPC request: %w" , err )
76
+ }
74
77
75
78
rps := er .ResourceProfiles
76
79
for i := 0 ; i < len (rps ); i ++ {
77
80
rp := rps [i ]
78
81
79
- labelsDst := []* typesv1.LabelPair {}
80
- // TODO(@petethepig): make labels work
81
- labelsDst = append (labelsDst , & typesv1.LabelPair {
82
- Name : "__name__" ,
83
- Value : "process_cpu" ,
84
- })
85
- labelsDst = append (labelsDst , & typesv1.LabelPair {
86
- Name : "service_name" ,
87
- Value : "otlp_test_app4" ,
88
- })
89
- labelsDst = append (labelsDst , & typesv1.LabelPair {
90
- Name : "__delta__" ,
91
- Value : "false" ,
92
- })
93
- labelsDst = append (labelsDst , & typesv1.LabelPair {
94
- Name : "pyroscope_spy" ,
95
- Value : "unknown" ,
96
- })
82
+ // Get service name
83
+ serviceName := getServiceNameFromAttributes (rp .Resource .GetAttributes ())
84
+
85
+ // Start with default labels
86
+ labels := getDefaultLabels (serviceName )
87
+
88
+ // Track processed attribute keys to avoid duplicates across levels
89
+ processedKeys := make (map [string ]bool )
90
+
91
+ // Add resource attributes
92
+ labels = appendAttributesUnique (labels , rp .Resource .GetAttributes (), processedKeys )
97
93
98
94
sps := rp .ScopeProfiles
99
95
for j := 0 ; j < len (sps ); j ++ {
100
96
sp := sps [j ]
97
+
98
+ // Add scope attributes
99
+ labels = appendAttributesUnique (labels , sp .Scope .GetAttributes (), processedKeys )
100
+
101
101
for k := 0 ; k < len (sp .Profiles ); k ++ {
102
102
p := sp .Profiles [k ]
103
103
104
+ // Add profile attributes
105
+ labels = appendAttributesUnique (labels , p .GetAttributes (), processedKeys )
106
+
107
+ // Add profile-specific attributes from samples/attributetable
108
+ labels = appendProfileLabels (labels , p .Profile , processedKeys )
109
+
104
110
pprofBytes , err := OprofToPprof (p .Profile )
105
111
if err != nil {
106
112
return & pprofileotlp.ExportProfilesServiceResponse {}, fmt .Errorf ("failed to convert from OTLP to legacy pprof: %w" , err )
@@ -111,7 +117,7 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
111
117
req := & pushv1.PushRequest {
112
118
Series : []* pushv1.RawProfileSeries {
113
119
{
114
- Labels : labelsDst ,
120
+ Labels : labels ,
115
121
Samples : []* pushv1.RawSample {{
116
122
RawProfile : pprofBytes ,
117
123
ID : uuid .New ().String (),
@@ -126,8 +132,101 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
126
132
}
127
133
}
128
134
}
129
-
130
135
}
131
136
132
137
return & pprofileotlp.ExportProfilesServiceResponse {}, nil
133
138
}
139
+
140
+ // getServiceNameFromAttributes extracts service name from OTLP resource attributes.
141
+ // Returns "unknown" if service name is not found or empty.
142
+ func getServiceNameFromAttributes (attrs []v1.KeyValue ) string {
143
+ for _ , attr := range attrs {
144
+ if attr .Key == "service.name" {
145
+ val := attr .GetValue ()
146
+ if sv := val .GetStringValue (); sv != "" {
147
+ return sv
148
+ }
149
+ break
150
+ }
151
+ }
152
+ return "unknown"
153
+ }
154
+
155
+ // getDefaultLabels returns the required base labels for Pyroscope profiles
156
+ func getDefaultLabels (serviceName string ) []* typesv1.LabelPair {
157
+ return []* typesv1.LabelPair {
158
+ {
159
+ Name : "__name__" ,
160
+ Value : "process_cpu" ,
161
+ },
162
+ {
163
+ Name : "service_name" ,
164
+ Value : serviceName ,
165
+ },
166
+ {
167
+ Name : "__delta__" ,
168
+ Value : "false" ,
169
+ },
170
+ {
171
+ Name : "pyroscope_spy" ,
172
+ Value : "unknown" ,
173
+ },
174
+ }
175
+ }
176
+
177
+ func appendAttributesUnique (labels []* typesv1.LabelPair , attrs []v1.KeyValue , processedKeys map [string ]bool ) []* typesv1.LabelPair {
178
+ for _ , attr := range attrs {
179
+ // Skip if we've already seen this key at any level
180
+ if processedKeys [attr .Key ] {
181
+ continue
182
+ }
183
+
184
+ val := attr .GetValue ()
185
+ if sv := val .GetStringValue (); sv != "" {
186
+ labels = append (labels , & typesv1.LabelPair {
187
+ Name : attr .Key ,
188
+ Value : sv ,
189
+ })
190
+ processedKeys [attr .Key ] = true
191
+ }
192
+ }
193
+ return labels
194
+ }
195
+
196
+ func appendProfileLabels (labels []* typesv1.LabelPair , profile * v1experimental.Profile , processedKeys map [string ]bool ) []* typesv1.LabelPair {
197
+ if profile == nil {
198
+ return labels
199
+ }
200
+
201
+ // Create mapping of attribute indices to their values
202
+ attrMap := make (map [uint64 ]v1.AnyValue )
203
+ for i , attr := range profile .GetAttributeTable () {
204
+ val := attr .GetValue ()
205
+ if val .GetValue () != nil {
206
+ attrMap [uint64 (i )] = val
207
+ }
208
+ }
209
+
210
+ // Process only attributes referenced in samples
211
+ for _ , sample := range profile .Sample {
212
+ for _ , attrIdx := range sample .GetAttributes () {
213
+ attr := profile .AttributeTable [attrIdx ]
214
+ // Skip if we've already processed this key at any level
215
+ if processedKeys [attr .Key ] {
216
+ continue
217
+ }
218
+
219
+ if value , exists := attrMap [attrIdx ]; exists {
220
+ if sv := value .GetStringValue (); sv != "" {
221
+ labels = append (labels , & typesv1.LabelPair {
222
+ Name : attr .Key ,
223
+ Value : sv ,
224
+ })
225
+ processedKeys [attr .Key ] = true
226
+ }
227
+ }
228
+ }
229
+ }
230
+
231
+ return labels
232
+ }
0 commit comments