Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 78 additions & 67 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,46 +123,39 @@ func New[T ResourceData](
}, nil
}

// signalTypeAttr returns a common attribute for the signal type to be used in metrics and traces.
func (p *Processor[T]) signalTypeAttr() attribute.KeyValue {
return attribute.String("signal.type", p.signalType)
}

// signalTypeLogAttr returns a common log attribute for the signal type to be used in logs.
func (p *Processor[T]) signalTypeLogAttr() log.KeyValue {
return log.String("signal.type", p.signalType)
}

func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, opts ...attribute.KeyValue) {
attrs := []attribute.KeyValue{
attribute.String("signal.tenant", tenant),
p.signalTypeAttr(),
}
attrs = append(attrs, opts...)
// proxyRecordsMetricAdd adds the given count to the proxy records metric with common attributes.
func (p *Processor[T]) proxyRecordsMetricAdd(ctx context.Context, tenant string, count int64, attrs []attribute.KeyValue) {
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
p.proxyRecordsMetric.Add(
ctx,
count,
metric.WithAttributes(attrs...),
)
}

func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, opts ...attribute.KeyValue) {
attrs := []attribute.KeyValue{
attribute.String("signal.tenant", tenant),
p.signalTypeAttr(),
}
attrs = append(attrs, opts...)
// proxyRequestsMetricAdd adds 1 to the proxy requests metric with common attributes.
func (p *Processor[T]) proxyRequestsMetricAdd(ctx context.Context, tenant string, attrs []attribute.KeyValue) {
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
p.proxyRequestsMetric.Add(
ctx,
1,
metric.WithAttributes(attrs...),
)
}

func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, opts ...attribute.KeyValue) {
attrs := []attribute.KeyValue{
attribute.String("signal.tenant", tenant),
p.signalTypeAttr(),
}
attrs = append(attrs, opts...)
// proxyLatencyMetricRecord records the given latency to the proxy latency metric with common attributes.
func (p *Processor[T]) proxyLatencyMetricRecord(ctx context.Context, tenant string, latency int64, attrs []attribute.KeyValue) {
attrs = append(attrs, attribute.String("signal.tenant", tenant), p.signalTypeAttr())
p.proxyLatencyMetric.Record(
ctx,
latency,
Expand All @@ -184,52 +177,15 @@ func (p *Processor[T]) Partition(ctx context.Context, resources []T) map[string]
tenantMap := make(map[string][]T)

for _, resourceData := range resources {
resource := p.getResource(resourceData)
logger.Trace(
ctx,
p.logger,
fmt.Sprintf("%+v", resource),
p.signalTypeLogAttr(),
)

tenant := ""

// First, check for the dedicated tenant label
if p.config.Tenant.Label != "" {
for _, attr := range resource.GetAttributes() {
if attr.GetKey() == p.config.Tenant.Label {
tenant = attr.GetValue().GetStringValue()
break
}
}
}

// If not found and we have additional labels, check those
if tenant == "" && len(p.config.Tenant.Labels) > 0 {
for _, attr := range resource.GetAttributes() {
if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) {
tenant = attr.GetValue().GetStringValue()
break
}
}
}

tenant := p.extractTenantFromResource(resourceData)
if tenant == "" {
if p.config.Tenant.Default == "" {
logger.Warn(
ctx,
p.logger,
"No tenant found in attributes and no default tenant configured",
p.signalTypeLogAttr(),
)
continue
}

tenant = p.config.Tenant.Default
resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{
Key: p.config.Tenant.Label,
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}},
})
logger.Warn(
ctx,
p.logger,
"No tenant found in attributes and no default tenant configured",
p.signalTypeLogAttr(),
)
continue
}

tenantMap[tenant] = append(tenantMap[tenant], resourceData)
Expand Down Expand Up @@ -263,7 +219,7 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e

resp, err := p.send(ctx, tenant, resources)
if err != nil {
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)))
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), nil)

logger.Error(
ctx,
Expand All @@ -282,9 +238,18 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
strconv.Itoa(resp.StatusCode),
)

p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), signalResponseStatusCodeAttr)
p.proxyRecordsMetricAdd(
ctx,
tenant,
int64(len(resources)),
[]attribute.KeyValue{signalResponseStatusCodeAttr},
)

p.proxyRequestsMetricAdd(ctx, tenant, signalResponseStatusCodeAttr)
p.proxyRequestsMetricAdd(
ctx,
tenant,
[]attribute.KeyValue{signalResponseStatusCodeAttr},
)

logger.Debug(
ctx,
Expand Down Expand Up @@ -414,7 +379,53 @@ func (p *Processor[T]) send(
span.SetAttributes(signalResponseStatusCodeAttr)
span.SetStatus(codes.Ok, "sent successfully")

p.proxyLatencyMetricRecord(ctx, tenant, time.Since(start).Milliseconds(), signalResponseStatusCodeAttr)
p.proxyLatencyMetricRecord(
ctx,
tenant,
time.Since(start).Milliseconds(),
[]attribute.KeyValue{signalResponseStatusCodeAttr},
)

return *resp, nil
}

// extractTenantFromResource extracts the tenant information from the resource attributes
// based on the configured tenant labels and returns it.
func (p *Processor[T]) extractTenantFromResource(resourceData T) string {
tenant := ""
resource := p.getResource(resourceData)

// First, check for the dedicated tenant label
if p.config.Tenant.Label != "" {
for _, attr := range resource.GetAttributes() {
if attr.GetKey() == p.config.Tenant.Label {
tenant = attr.GetValue().GetStringValue()
break
}
}
}

// If not found and we have additional labels, check those
if tenant == "" && len(p.config.Tenant.Labels) > 0 {
for _, attr := range resource.GetAttributes() {
if slices.Contains(p.config.Tenant.Labels, attr.GetKey()) {
tenant = attr.GetValue().GetStringValue()
break
}
}
}

if tenant == "" {
if p.config.Tenant.Default == "" {
return ""
}

tenant = p.config.Tenant.Default
resource.Attributes = append(resource.Attributes, &commonpb.KeyValue{
Key: p.config.Tenant.Label,
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: tenant}},
})
}

return tenant
}
Loading
Loading