Skip to content

Commit d302b4a

Browse files
authored
Merge pull request #47 from matt-gp/extract-tenant-function
Added Tenant Extraction Function
2 parents f7642d1 + 40ddea6 commit d302b4a

2 files changed

Lines changed: 335 additions & 93 deletions

File tree

internal/processor/processor.go

Lines changed: 78 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -123,46 +123,39 @@ func New[T ResourceData](
123123
}, nil
124124
}
125125

126+
// signalTypeAttr returns a common attribute for the signal type to be used in metrics and traces.
126127
func (p *Processor[T]) signalTypeAttr() attribute.KeyValue {
127128
return attribute.String("signal.type", p.signalType)
128129
}
129130

131+
// signalTypeLogAttr returns a common log attribute for the signal type to be used in logs.
130132
func (p *Processor[T]) signalTypeLogAttr() log.KeyValue {
131133
return log.String("signal.type", p.signalType)
132134
}
133135

134-
func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, opts ...attribute.KeyValue) {
135-
attrs := []attribute.KeyValue{
136-
attribute.String("signal.tenant", tenant),
137-
p.signalTypeAttr(),
138-
}
139-
attrs = append(attrs, opts...)
136+
// proxyRecordsMetricAdd adds the given count to the proxy records metric with common attributes.
137+
func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, attrs []attribute.KeyValue) {
138+
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
140139
p.proxyRecordsMetric.Add(
141140
ctx,
142141
count,
143142
metric.WithAttributes(attrs...),
144143
)
145144
}
146145

147-
func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, opts ...attribute.KeyValue) {
148-
attrs := []attribute.KeyValue{
149-
attribute.String("signal.tenant", tenant),
150-
p.signalTypeAttr(),
151-
}
152-
attrs = append(attrs, opts...)
146+
// proxyRequestsMetricAdd adds 1 to the proxy requests metric with common attributes.
147+
func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, attrs []attribute.KeyValue) {
148+
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
153149
p.proxyRequestsMetric.Add(
154150
ctx,
155151
1,
156152
metric.WithAttributes(attrs...),
157153
)
158154
}
159155

160-
func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, opts ...attribute.KeyValue) {
161-
attrs := []attribute.KeyValue{
162-
attribute.String("signal.tenant", tenant),
163-
p.signalTypeAttr(),
164-
}
165-
attrs = append(attrs, opts...)
156+
// proxyLatencyMetricRecord records the given latency to the proxy latency metric with common attributes.
157+
func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, attrs []attribute.KeyValue) {
158+
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
166159
p.proxyLatencyMetric.Record(
167160
ctx,
168161
latency,
@@ -184,52 +177,15 @@ func (p *Processor[T]) Partition(ctx context.Context, resources []T) map[string]
184177
tenantMap := make(map[string][]T)
185178

186179
for _, resourceData := range resources {
187-
resource := p.getResource(resourceData)
188-
logger.Trace(
189-
ctx,
190-
p.logger,
191-
fmt.Sprintf("%+v", resource),
192-
p.signalTypeLogAttr(),
193-
)
194-
195-
tenant := ""
196-
197-
// First, check for the dedicated tenant label
198-
if p.config.Tenant.Label != "" {
199-
for _, attr := range resource.GetAttributes() {
200-
if attr.GetKey() == p.config.Tenant.Label {
201-
tenant = attr.GetValue().GetStringValue()
202-
break
203-
}
204-
}
205-
}
206-
207-
// If not found and we have additional labels, check those
208-
if tenant == "" && len(p.config.Tenant.Labels) > 0 {
209-
for _, attr := range resource.GetAttributes() {
210-
if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) {
211-
tenant = attr.GetValue().GetStringValue()
212-
break
213-
}
214-
}
215-
}
216-
180+
tenant := p.extractTenantFromResource(resourceData)
217181
if tenant == "" {
218-
if p.config.Tenant.Default == "" {
219-
logger.Warn(
220-
ctx,
221-
p.logger,
222-
"No tenant found in attributes and no default tenant configured",
223-
p.signalTypeLogAttr(),
224-
)
225-
continue
226-
}
227-
228-
tenant = p.config.Tenant.Default
229-
resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{
230-
Key: p.config.Tenant.Label,
231-
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}},
232-
})
182+
logger.Warn(
183+
ctx,
184+
p.logger,
185+
"No tenant found in attributes and no default tenant configured",
186+
p.signalTypeLogAttr(),
187+
)
188+
continue
233189
}
234190

235191
tenantMap[tenant] = append(tenantMap[tenant], resourceData)
@@ -263,7 +219,7 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
263219

264220
resp, err := p.send(ctx, tenant, resources)
265221
if err != nil {
266-
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)))
222+
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), nil)
267223

268224
logger.Error(
269225
ctx,
@@ -282,9 +238,18 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
282238
strconv.Itoa(resp.StatusCode),
283239
)
284240

285-
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), signalResponseStatusCodeAttr)
241+
p.proxyRecordsMetricAdd(
242+
ctx,
243+
tenant,
244+
int64(len(resources)),
245+
[]attribute.KeyValue{signalResponseStatusCodeAttr},
246+
)
286247

287-
p.proxyRequestsMetricAdd(ctx, tenant, signalResponseStatusCodeAttr)
248+
p.proxyRequestsMetricAdd(
249+
ctx,
250+
tenant,
251+
[]attribute.KeyValue{signalResponseStatusCodeAttr},
252+
)
288253

289254
logger.Debug(
290255
ctx,
@@ -414,7 +379,53 @@ func (p *Processor[T]) send(
414379
span.SetAttributes(signalResponseStatusCodeAttr)
415380
span.SetStatus(codes.Ok, "sent successfully")
416381

417-
p.proxyLatencyMetricRecord(ctx, tenant, time.Since(start).Milliseconds(), signalResponseStatusCodeAttr)
382+
p.proxyLatencyMetricRecord(
383+
ctx,
384+
tenant,
385+
time.Since(start).Milliseconds(),
386+
[]attribute.KeyValue{signalResponseStatusCodeAttr},
387+
)
418388

419389
return *resp, nil
420390
}
391+
392+
// extractTenantFromResource extracts the tenant information from the resource attributes
393+
// based on the configured tenant labels and returns it.
394+
func (p *Processor[T]) extractTenantFromResource(resourceData T) string {
395+
tenant := ""
396+
resource := p.getResource(resourceData)
397+
398+
// First, check for the dedicated tenant label
399+
if p.config.Tenant.Label != "" {
400+
for _, attr := range resource.GetAttributes() {
401+
if attr.GetKey() == p.config.Tenant.Label {
402+
tenant = attr.GetValue().GetStringValue()
403+
break
404+
}
405+
}
406+
}
407+
408+
// If not found and we have additional labels, check those
409+
if tenant == "" && len(p.config.Tenant.Labels) > 0 {
410+
for _, attr := range resource.GetAttributes() {
411+
if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) {
412+
tenant = attr.GetValue().GetStringValue()
413+
break
414+
}
415+
}
416+
}
417+
418+
if tenant == "" {
419+
if p.config.Tenant.Default == "" {
420+
return ""
421+
}
422+
423+
tenant = p.config.Tenant.Default
424+
resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{
425+
Key: p.config.Tenant.Label,
426+
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}},
427+
})
428+
}
429+
430+
return tenant
431+
}

0 commit comments

Comments
 (0)