From 40ddea603d0b4ef031c088862b9a7c1661cb3cc1 Mon Sep 17 00:00:00 2001 From: matt-gp Date: Wed, 11 Mar 2026 22:08:27 +0000 Subject: [PATCH] Added Tenant Extraction Function --- internal/processor/processor.go | 145 +++++++------- internal/processor/processor_test.go | 283 ++++++++++++++++++++++++--- 2 files changed, 335 insertions(+), 93 deletions(-) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 11c9e45..1744772 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -123,20 +123,19 @@ func New[T ResourceData]( }, nil } +// signalTypeAttr returns a common attribute for the signal type to be used in metrics and traces. func (p *Processor[T]) signalTypeAttr() attribute.KeyValue { return attribute.String("signal.type", p.signalType) } +// signalTypeLogAttr returns a common log attribute for the signal type to be used in logs. func (p *Processor[T]) signalTypeLogAttr() log.KeyValue { return log.String("signal.type", p.signalType) } -func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, opts ...attribute.KeyValue) { - attrs := []attribute.KeyValue{ - attribute.String("signal.tenant", tenant), - p.signalTypeAttr(), - } - attrs = append(attrs, opts...) +// proxyRecordsMetricAdd adds the given count to the proxy records metric with common attributes. +func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, attrs []attribute.KeyValue) { + attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr()) p.proxyRecordsMetric.Add( ctx, count, @@ -144,12 +143,9 @@ func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, ) } -func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, opts ...attribute.KeyValue) { - attrs := []attribute.KeyValue{ - attribute.String("signal.tenant", tenant), - p.signalTypeAttr(), - } - attrs = append(attrs, opts...) +// proxyRequestsMetricAdd adds 1 to the proxy requests metric with common attributes. +func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, attrs []attribute.KeyValue) { + attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr()) p.proxyRequestsMetric.Add( ctx, 1, @@ -157,12 +153,9 @@ func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string ) } -func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, opts ...attribute.KeyValue) { - attrs := []attribute.KeyValue{ - attribute.String("signal.tenant", tenant), - p.signalTypeAttr(), - } - attrs = append(attrs, opts...) +// proxyLatencyMetricRecord records the given latency to the proxy latency metric with common attributes. +func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, attrs []attribute.KeyValue) { + attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr()) p.proxyLatencyMetric.Record( ctx, latency, @@ -184,52 +177,15 @@ func (p *Processor[T]) Partition(ctx context.Context, resources []T) map[string] tenantMap := make(map[string][]T) for _, resourceData := range resources { - resource := p.getResource(resourceData) - logger.Trace( - ctx, - p.logger, - fmt.Sprintf("%+v", resource), - p.signalTypeLogAttr(), - ) - - tenant := "" - - // First, check for the dedicated tenant label - if p.config.Tenant.Label != "" { - for _, attr := range resource.GetAttributes() { - if attr.GetKey() == p.config.Tenant.Label { - tenant = attr.GetValue().GetStringValue() - break - } - } - } - - // If not found and we have additional labels, check those - if tenant == "" && len(p.config.Tenant.Labels) > 0 { - for _, attr := range resource.GetAttributes() { - if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) { - tenant = attr.GetValue().GetStringValue() - break - } - } - } - + tenant := p.extractTenantFromResource(resourceData) if tenant == "" { - if p.config.Tenant.Default == "" { - logger.Warn( - ctx, - p.logger, - "No tenant found in attributes and no default tenant configured", - p.signalTypeLogAttr(), - ) - continue - } - - tenant = p.config.Tenant.Default - resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{ - Key: p.config.Tenant.Label, - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}}, - }) + logger.Warn( + ctx, + p.logger, + "No tenant found in attributes and no default tenant configured", + p.signalTypeLogAttr(), + ) + continue } tenantMap[tenant] = append(tenantMap[tenant], resourceData) @@ -263,7 +219,7 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e resp, err := p.send(ctx, tenant, resources) if err != nil { - p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources))) + p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), nil) logger.Error( ctx, @@ -282,9 +238,18 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e strconv.Itoa(resp.StatusCode), ) - p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), signalResponseStatusCodeAttr) + p.proxyRecordsMetricAdd( + ctx, + tenant, + int64(len(resources)), + []attribute.KeyValue{signalResponseStatusCodeAttr}, + ) - p.proxyRequestsMetricAdd(ctx, tenant, signalResponseStatusCodeAttr) + p.proxyRequestsMetricAdd( + ctx, + tenant, + []attribute.KeyValue{signalResponseStatusCodeAttr}, + ) logger.Debug( ctx, @@ -414,7 +379,53 @@ func (p *Processor[T]) send( span.SetAttributes(signalResponseStatusCodeAttr) span.SetStatus(codes.Ok, "sent successfully") - p.proxyLatencyMetricRecord(ctx, tenant, time.Since(start).Milliseconds(), signalResponseStatusCodeAttr) + p.proxyLatencyMetricRecord( + ctx, + tenant, + time.Since(start).Milliseconds(), + []attribute.KeyValue{signalResponseStatusCodeAttr}, + ) return *resp, nil } + +// extractTenantFromResource extracts the tenant information from the resource attributes +// based on the configured tenant labels and returns it. +func (p *Processor[T]) extractTenantFromResource(resourceData T) string { + tenant := "" + resource := p.getResource(resourceData) + + // First, check for the dedicated tenant label + if p.config.Tenant.Label != "" { + for _, attr := range resource.GetAttributes() { + if attr.GetKey() == p.config.Tenant.Label { + tenant = attr.GetValue().GetStringValue() + break + } + } + } + + // If not found and we have additional labels, check those + if tenant == "" && len(p.config.Tenant.Labels) > 0 { + for _, attr := range resource.GetAttributes() { + if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) { + tenant = attr.GetValue().GetStringValue() + break + } + } + } + + if tenant == "" { + if p.config.Tenant.Default == "" { + return "" + } + + tenant = p.config.Tenant.Default + resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{ + Key: p.config.Tenant.Label, + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}}, + }) + } + + return tenant +} diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index d16b341..673ccd8 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -110,6 +110,202 @@ func TestNew(t *testing.T) { } } +func TestExtractTenantFromResource(t *testing.T) { + tests := []struct { + name string + resource *logpb.ResourceLogs + config *config.Config + expectedTenant string + expectModified bool // whether the resource should be modified with default tenant + }{ + { + name: "extract from primary label", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}}, + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Default: "default", + }, + }, + expectedTenant: "tenant-a", + expectModified: false, + }, + { + name: "extract from first secondary label", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenantId", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-b"}}}, + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Labels: []string{"tenantId", "tenant_id"}, + Default: "default", + }, + }, + expectedTenant: "tenant-b", + expectModified: false, + }, + { + name: "extract from second secondary label", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant_id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-c"}}}, + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Labels: []string{"tenantId", "tenant_id"}, + Default: "default", + }, + }, + expectedTenant: "tenant-c", + expectModified: false, + }, + { + name: "use default tenant when no tenant attribute", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Default: "shared", + }, + }, + expectedTenant: "shared", + expectModified: true, + }, + { + name: "return empty when no tenant and no default", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Default: "", + }, + }, + expectedTenant: "", + expectModified: false, + }, + { + name: "primary label takes precedence over secondary labels", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "primary-tenant"}}}, + {Key: "tenantId", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "secondary-tenant"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Labels: []string{"tenantId"}, + Default: "default", + }, + }, + expectedTenant: "primary-tenant", + expectModified: false, + }, + { + name: "empty primary label not configured", + resource: &logpb.ResourceLogs{ + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenantId", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-d"}}}, + }, + }, + }, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "", + Labels: []string{"tenantId"}, + Default: "default", + }, + }, + expectedTenant: "tenant-d", + expectModified: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := noop.NewLoggerProvider().Logger("test") + meter := noopmetric.NewMeterProvider().Meter("test") + tracer := nooptrace.NewTracerProvider().Tracer("test") + + getResource := func(rl *logpb.ResourceLogs) *resourcepb.Resource { + return rl.GetResource() + } + marshalResources := func(resources []*logpb.ResourceLogs) ([]byte, error) { + return []byte{}, nil + } + + proc, err := New( + tt.config, + &config.Endpoint{Address: "http://localhost:3100"}, + "logs", + &http.Client{}, + logger, + meter, + tracer, + getResource, + marshalResources, + ) + require.NoError(t, err) + + originalAttrCount := len(tt.resource.Resource.Attributes) + tenant := proc.extractTenantFromResource(tt.resource) + + assert.Equal(t, tt.expectedTenant, tenant) + + if tt.expectModified { + // Should have added the tenant label to attributes + assert.Equal(t, originalAttrCount+1, len(tt.resource.Resource.Attributes)) + // Verify the added attribute + found := false + for _, attr := range tt.resource.Resource.Attributes { + if attr.Key == tt.config.Tenant.Label { + assert.Equal(t, tt.expectedTenant, attr.GetValue().GetStringValue()) + found = true + break + } + } + assert.True(t, found, "tenant label should be added to attributes") + } else { + // Should not have modified the attributes + assert.Equal(t, originalAttrCount, len(tt.resource.Resource.Attributes)) + } + }) + } +} + func TestPartition(t *testing.T) { tests := []struct { name string @@ -118,7 +314,18 @@ func TestPartition(t *testing.T) { expectedTenants map[string]int // tenant -> number of resources }{ { - name: "single tenant with primary label", + name: "empty resources returns empty map", + resources: []*logpb.ResourceLogs{}, + config: &config.Config{ + Tenant: config.Tenant{ + Label: "tenant.id", + Default: "default", + }, + }, + expectedTenants: map[string]int{}, + }, + { + name: "single tenant groups all resources together", resources: []*logpb.ResourceLogs{ { Resource: &resourcepb.Resource{ @@ -134,6 +341,13 @@ func TestPartition(t *testing.T) { }, }, }, + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}}, + }, + }, + }, }, config: &config.Config{ Tenant: config.Tenant{ @@ -142,11 +356,11 @@ func TestPartition(t *testing.T) { }, }, expectedTenants: map[string]int{ - "tenant-a": 2, + "tenant-a": 3, }, }, { - name: "multiple tenants with primary label", + name: "multiple tenants partition resources correctly", resources: []*logpb.ResourceLogs{ { Resource: &resourcepb.Resource{ @@ -169,25 +383,17 @@ func TestPartition(t *testing.T) { }, }, }, - }, - config: &config.Config{ - Tenant: config.Tenant{ - Label: "tenant.id", - Default: "default", + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-c"}}}, + }, + }, }, - }, - expectedTenants: map[string]int{ - "tenant-a": 2, - "tenant-b": 1, - }, - }, - { - name: "fallback to secondary label", - resources: []*logpb.ResourceLogs{ { Resource: &resourcepb.Resource{ Attributes: []*commonpb.KeyValue{ - {Key: "tenantId", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-c"}}}, + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-b"}}}, }, }, }, @@ -195,17 +401,25 @@ func TestPartition(t *testing.T) { config: &config.Config{ Tenant: config.Tenant{ Label: "tenant.id", - Labels: []string{"tenantId", "tenant_id"}, Default: "default", }, }, expectedTenants: map[string]int{ + "tenant-a": 2, + "tenant-b": 2, "tenant-c": 1, }, }, { - name: "use default tenant when no tenant attribute", + name: "resources without tenant are skipped", resources: []*logpb.ResourceLogs{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}}, + }, + }, + }, { Resource: &resourcepb.Resource{ Attributes: []*commonpb.KeyValue{ @@ -213,24 +427,39 @@ func TestPartition(t *testing.T) { }, }, }, + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-b"}}}, + }, + }, + }, }, config: &config.Config{ Tenant: config.Tenant{ Label: "tenant.id", - Default: "shared", + Default: "", // No default, so resource without tenant should be skipped }, }, expectedTenants: map[string]int{ - "shared": 1, + "tenant-a": 1, + "tenant-b": 1, }, }, { - name: "skip resource when no tenant and no default", + name: "resources with default tenant are grouped", resources: []*logpb.ResourceLogs{ { Resource: &resourcepb.Resource{ Attributes: []*commonpb.KeyValue{ - {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "service-1"}}}, + }, + }, + }, + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "service-2"}}}, }, }, }, @@ -238,10 +467,12 @@ func TestPartition(t *testing.T) { config: &config.Config{ Tenant: config.Tenant{ Label: "tenant.id", - Default: "", + Default: "shared", }, }, - expectedTenants: map[string]int{}, + expectedTenants: map[string]int{ + "shared": 2, + }, }, }