Skip to content

Commit 040e892

Browse files
authored
Merge pull request #48 from matt-gp/err-group
Switched From WaitGroup -> ErrorGroup
2 parents d302b4a + e2ac1e9 commit 040e892

File tree

6 files changed

+178
-15
lines changed

6 files changed

+178
-15
lines changed

CONTRIBUTING.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,26 @@ if err != nil {
295295
logger.Error("Failed to get data", "id", id, "error", err)
296296
return nil, err
297297
}
298+
299+
// Good: Check HTTP status codes and return errors for failures
300+
if resp.StatusCode >= http.StatusBadRequest {
301+
logger.Error(
302+
ctx,
303+
logger,
304+
fmt.Sprintf("received non-success status code: %d", resp.StatusCode),
305+
log.String("status_code", strconv.Itoa(resp.StatusCode)),
306+
)
307+
return fmt.Errorf("received non-success status code: %d", resp.StatusCode)
308+
}
298309
```
299310

311+
**HTTP Error Handling:**
312+
- Treat all HTTP status codes >= 400 as errors
313+
- Log errors with relevant context (tenant, signal type, status code)
314+
- Record errors in distributed tracing spans
315+
- Return errors to enable retry logic in upstream systems
316+
- Always record metrics even for failed requests
317+
300318
### Logging
301319

302320
Use structured logging:

README.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (h *Handlers) Logs(w http.ResponseWriter, r *http.Request) {
301301
**Processor Package (`internal/processor/`):**
302302
- `New[T ResourceData]()` - Create generic processor with signal-specific callbacks
303303
- `Partition(ctx, resources)` - Partition resources by tenant from resource attributes
304-
- `Dispatch(ctx, tenantMap)` - Concurrent forwarding to backend with tenant headers
304+
- `Dispatch(ctx, tenantMap)` - Concurrent forwarding to backend with tenant headers; returns the first error encountered if any send fails or any backend responds with status >= 400
305305
- `send(ctx, tenant, resources)` - HTTP client with protobuf marshaling and metrics
306306

307307
**Handler Package (`internal/handler/`):**
@@ -564,6 +564,19 @@ When forwarding data to observability backends:
564564
5. Content-Type is set to `application/x-protobuf`
565565
6. Original protobuf format is preserved with proper headers via `addHeaders()`
566566

567+
### Error Handling
568+
569+
The proxy implements robust error handling for backend responses:
570+
571+
- **Success Responses (< 400)**: Data is successfully forwarded and metrics are recorded with the response status code
572+
- **Error Responses (>= 400)**: The proxy treats all HTTP status codes of 400 or higher as errors:
573+
- An error is logged with the status code, tenant, and signal type
574+
- The request is marked as failed in distributed tracing
575+
- An error is returned to the caller (only the first error encountered is returned when several tenants fail), which may trigger retry logic in upstream collectors
576+
- Metrics are still recorded with the error status code for observability
577+
578+
This ensures that client errors (4xx) and server errors (5xx) from the backend are properly surfaced and can be monitored through the proxy's own telemetry.
579+
567580
## Observability
568581

569582
The service exposes metrics about its operation:
@@ -802,7 +815,7 @@ go tool cover -func=coverage.out
802815
**Processor Tests (`internal/processor/processor_test.go`):**
803816
- `TestNew` - Processor creation with various configurations
804817
- `TestPartition` - Tenant partitioning logic with primary/fallback labels and defaults
805-
- `TestDispatch` - Concurrent request dispatching to multiple tenants
818+
- `TestDispatch` - Concurrent request dispatching to multiple tenants with error handling for HTTP status >= 400
806819
- `TestSend` - Individual HTTP request handling with error scenarios
807820

808821
**Handler Tests (`internal/handler/handlers_test.go`):**

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ require (
2424
go.uber.org/mock v0.6.0
2525
)
2626

27+
require golang.org/x/sync v0.20.0
28+
2729
require (
2830
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
2931
github.com/cespare/xxhash/v2 v2.3.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
7979
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
8080
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
8181
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
82+
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
83+
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
8284
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
8385
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
8486
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=

internal/processor/processor.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"net/http"
1010
"slices"
1111
"strconv"
12-
"sync"
1312
"time"
1413

1514
"github.com/matt-gp/otel-lgtm-proxy/internal/config"
@@ -26,6 +25,7 @@ import (
2625
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
2726
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
2827
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
28+
"golang.org/x/sync/errgroup"
2929
)
3030

3131
// Client is an interface for making HTTP requests.
@@ -197,7 +197,7 @@ func (p *Processor[T]) Partition(ctx context.Context, resources []T) map[string]
197197

198198
// Dispatch sends all the requests to the target.
199199
func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) error {
200-
wg := sync.WaitGroup{}
200+
errGroup, ctx := errgroup.WithContext(ctx)
201201

202202
for tenant, resources := range tenantMap {
203203
ctx, span := p.tracer.Start(
@@ -210,13 +210,8 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
210210
)
211211
defer span.End()
212212

213-
wg.Add(1)
214-
215-
go func(tenant string, resources []T) {
216-
defer wg.Done()
217-
213+
errGroup.Go(func() error {
218214
tenantAttribute := attribute.String("signal.tenant", tenant)
219-
220215
resp, err := p.send(ctx, tenant, resources)
221216
if err != nil {
222217
p.proxyRecordsMetricAdd(ctx, tenant, int64(len(resources)), nil)
@@ -230,7 +225,7 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
230225

231226
span.RecordError(err)
232227
span.SetStatus(codes.Error, "failed to send")
233-
return
228+
return err
234229
}
235230

236231
signalResponseStatusCodeAttr := attribute.String(
@@ -251,6 +246,20 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
251246
[]attribute.KeyValue{signalResponseStatusCodeAttr},
252247
)
253248

249+
if resp.StatusCode >= http.StatusBadRequest {
250+
logger.Error(
251+
ctx,
252+
p.logger,
253+
fmt.Sprintf("received non-success status code: %d", resp.StatusCode),
254+
p.signalTypeLogAttr(),
255+
log.KeyValueFromAttribute(tenantAttribute),
256+
log.KeyValueFromAttribute(signalResponseStatusCodeAttr),
257+
)
258+
259+
span.SetStatus(codes.Error, fmt.Sprintf("received non-success status code: %d", resp.StatusCode))
260+
return fmt.Errorf("received non-success status code: %d", resp.StatusCode)
261+
}
262+
254263
logger.Debug(
255264
ctx,
256265
p.logger,
@@ -273,12 +282,11 @@ func (p *Processor[T]) Dispatch(ctx context.Context, tenantMap map[string][]T) e
273282
)
274283

275284
span.SetStatus(codes.Ok, "sent successfully")
276-
}(tenant, resources)
285+
return nil
286+
})
277287
}
278288

279-
wg.Wait()
280-
281-
return nil
289+
return errGroup.Wait()
282290
}
283291

284292
// send sends an individual request to the target.

internal/processor/processor_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,126 @@ func TestDispatch(t *testing.T) {
580580
},
581581
wantErr: false,
582582
},
583+
{
584+
name: "http 400 bad request should return error",
585+
tenantMap: map[string][]*logpb.ResourceLogs{
586+
"tenant-a": {
587+
{
588+
Resource: &resourcepb.Resource{
589+
Attributes: []*commonpb.KeyValue{
590+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}},
591+
},
592+
},
593+
},
594+
},
595+
},
596+
mockResponses: []struct {
597+
statusCode int
598+
body string
599+
err error
600+
}{
601+
{statusCode: http.StatusBadRequest, body: "bad request", err: nil},
602+
},
603+
wantErr: true,
604+
},
605+
{
606+
name: "http 404 not found should return error",
607+
tenantMap: map[string][]*logpb.ResourceLogs{
608+
"tenant-a": {
609+
{
610+
Resource: &resourcepb.Resource{
611+
Attributes: []*commonpb.KeyValue{
612+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}},
613+
},
614+
},
615+
},
616+
},
617+
},
618+
mockResponses: []struct {
619+
statusCode int
620+
body string
621+
err error
622+
}{
623+
{statusCode: http.StatusNotFound, body: "not found", err: nil},
624+
},
625+
wantErr: true,
626+
},
627+
{
628+
name: "http 500 internal server error should return error",
629+
tenantMap: map[string][]*logpb.ResourceLogs{
630+
"tenant-a": {
631+
{
632+
Resource: &resourcepb.Resource{
633+
Attributes: []*commonpb.KeyValue{
634+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}},
635+
},
636+
},
637+
},
638+
},
639+
},
640+
mockResponses: []struct {
641+
statusCode int
642+
body string
643+
err error
644+
}{
645+
{statusCode: http.StatusInternalServerError, body: "server error", err: nil},
646+
},
647+
wantErr: true,
648+
},
649+
{
650+
name: "http 503 service unavailable should return error",
651+
tenantMap: map[string][]*logpb.ResourceLogs{
652+
"tenant-a": {
653+
{
654+
Resource: &resourcepb.Resource{
655+
Attributes: []*commonpb.KeyValue{
656+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}},
657+
},
658+
},
659+
},
660+
},
661+
},
662+
mockResponses: []struct {
663+
statusCode int
664+
body string
665+
err error
666+
}{
667+
{statusCode: http.StatusServiceUnavailable, body: "service unavailable", err: nil},
668+
},
669+
wantErr: true,
670+
},
671+
{
672+
name: "multiple tenants with one failing should return error",
673+
tenantMap: map[string][]*logpb.ResourceLogs{
674+
"tenant-a": {
675+
{
676+
Resource: &resourcepb.Resource{
677+
Attributes: []*commonpb.KeyValue{
678+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-a"}}},
679+
},
680+
},
681+
},
682+
},
683+
"tenant-b": {
684+
{
685+
Resource: &resourcepb.Resource{
686+
Attributes: []*commonpb.KeyValue{
687+
{Key: "tenant.id", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "tenant-b"}}},
688+
},
689+
},
690+
},
691+
},
692+
},
693+
mockResponses: []struct {
694+
statusCode int
695+
body string
696+
err error
697+
}{
698+
{statusCode: http.StatusOK, body: "ok", err: nil},
699+
{statusCode: http.StatusBadRequest, body: "bad request", err: nil},
700+
},
701+
wantErr: true,
702+
},
583703
}
584704

585705
for _, tt := range tests {

0 commit comments

Comments
 (0)