diff --git a/cmd/main.go b/cmd/main.go index 2c4f459..d187c1e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -48,7 +48,7 @@ func main() { defer stop() // Initialize handlers - h := handler.New( + h, err := handler.New( cfg, &http.Client{Timeout: cfg.Logs.Timeout}, &http.Client{Timeout: cfg.Metrics.Timeout}, @@ -57,6 +57,10 @@ func main() { meterProvider, tracerProvider, ) + if err != nil { + logger.Error(ctx, loggingProvider, err.Error()) + os.Exit(1) + } // Initialize HTTP router router := http.NewServeMux() diff --git a/internal/handler/handlers.go b/internal/handler/handlers.go index 484c6f7..a3bee8a 100644 --- a/internal/handler/handlers.go +++ b/internal/handler/handlers.go @@ -4,20 +4,28 @@ package handler import ( "github.com/matt-gp/otel-lgtm-proxy/internal/config" "github.com/matt-gp/otel-lgtm-proxy/internal/processor" + "github.com/matt-gp/otel-lgtm-proxy/internal/util/proto" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + logpb "go.opentelemetry.io/proto/otlp/logs/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) // Handlers contains the dependencies needed for all OTLP signal handlers. type Handlers struct { - config *config.Config - logsClient processor.Client - metricsClient processor.Client - tracesClient processor.Client - logger log.Logger - meter metric.Meter - tracer trace.Tracer + config *config.Config + logsClient processor.Client + metricsClient processor.Client + tracesClient processor.Client + logger log.Logger + meter metric.Meter + tracer trace.Tracer + logsProcessor processor.Processor[*logpb.ResourceLogs] + metricsProcessor processor.Processor[*metricpb.ResourceMetrics] + tracesProcessor processor.Processor[*tracepb.ResourceSpans] } // New creates a new Handlers instance. @@ -29,14 +37,86 @@ func New( logger log.Logger, meter metric.Meter, tracer trace.Tracer, -) *Handlers { - return &Handlers{ - config: config, - logsClient: logsClient, - metricsClient: metricsClient, - tracesClient: tracesClient, - logger: logger, - meter: meter, - tracer: tracer, +) (*Handlers, error) { + // Create logs processor + logsProcessor, err := processor.New( + config, + &config.Logs, + "logs", + logsClient, + logger, + meter, + tracer, + func(rl *logpb.ResourceLogs) *resourcepb.Resource { + return rl.GetResource() + }, + func(resources []*logpb.ResourceLogs) ([]byte, error) { + data := &logpb.LogsData{ + ResourceLogs: resources, + } + return proto.Marshal(data) + }, + ) + if err != nil { + return nil, err + } + + // Create metrics processor + metricsProcessor, err := processor.New( + config, + &config.Metrics, + "metrics", + metricsClient, + logger, + meter, + tracer, + func(rm *metricpb.ResourceMetrics) *resourcepb.Resource { + return rm.GetResource() + }, + func(resources []*metricpb.ResourceMetrics) ([]byte, error) { + data := &metricpb.MetricsData{ + ResourceMetrics: resources, + } + return proto.Marshal(data) + }, + ) + if err != nil { + return nil, err } + + // Create traces processor + tracesProcessor, err := processor.New( + config, + &config.Traces, + "traces", + tracesClient, + logger, + meter, + tracer, + func(rs *tracepb.ResourceSpans) *resourcepb.Resource { + return rs.GetResource() + }, + func(resources []*tracepb.ResourceSpans) ([]byte, error) { + data := &tracepb.TracesData{ + ResourceSpans: resources, + } + return proto.Marshal(data) + }, + ) + if err != nil { + return nil, err + } + + return &Handlers{ + config: config, + logsClient: logsClient, + metricsClient: metricsClient, + tracesClient: tracesClient, + logger: logger, + meter: meter, + tracer: tracer, + logsProcessor: *logsProcessor, + metricsProcessor: *metricsProcessor, + tracesProcessor: *tracesProcessor, + }, nil } diff --git a/internal/handler/handlers_test.go b/internal/handler/handlers_test.go index 7a84bda..03d30ba 100644 --- a/internal/handler/handlers_test.go +++ b/internal/handler/handlers_test.go @@ -18,18 +18,29 @@ func TestNew(t *testing.T) { logsClient *http.Client metricsClient *http.Client tracesClient *http.Client + wantErr bool }{ { - name: "creates handlers with all dependencies", + name: "creates handlers with all dependencies and processors", config: &config.Config{ Tenant: config.Tenant{ Label: "tenant.id", Default: "default", }, + Logs: config.Endpoint{ + Address: "http://localhost:3100", + }, + Metrics: config.Endpoint{ + Address: "http://localhost:9009", + }, + Traces: config.Endpoint{ + Address: "http://localhost:4318", + }, }, logsClient: &http.Client{}, metricsClient: &http.Client{}, tracesClient: &http.Client{}, + wantErr: false, }, } @@ -39,7 +50,7 @@ func TestNew(t *testing.T) { meter := noopmetric.NewMeterProvider().Meter("test") tracer := nooptrace.NewTracerProvider().Tracer("test") - handlers := New( + handlers, err := New( tt.config, tt.logsClient, tt.metricsClient, @@ -49,14 +60,24 @@ func TestNew(t *testing.T) { tracer, ) - assert.NotNil(t, handlers) - assert.Equal(t, tt.config, handlers.config) - assert.Equal(t, tt.logsClient, handlers.logsClient) - assert.Equal(t, tt.metricsClient, handlers.metricsClient) - assert.Equal(t, tt.tracesClient, handlers.tracesClient) - assert.NotNil(t, handlers.logger) - assert.NotNil(t, handlers.meter) - assert.NotNil(t, handlers.tracer) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, handlers) + } else { + assert.NoError(t, err) + assert.NotNil(t, handlers) + assert.Equal(t, tt.config, handlers.config) + assert.Equal(t, tt.logsClient, handlers.logsClient) + assert.Equal(t, tt.metricsClient, handlers.metricsClient) + assert.Equal(t, tt.tracesClient, handlers.tracesClient) + assert.NotNil(t, handlers.logger) + assert.NotNil(t, handlers.meter) + assert.NotNil(t, handlers.tracer) + // Verify processors were created + assert.NotNil(t, handlers.logsProcessor) + assert.NotNil(t, handlers.metricsProcessor) + assert.NotNil(t, handlers.tracesProcessor) + } }) } } diff --git a/internal/handler/logs.go b/internal/handler/logs.go index e0ef08c..16c435d 100644 --- a/internal/handler/logs.go +++ b/internal/handler/logs.go @@ -5,13 +5,11 @@ import ( "net/http" "github.com/matt-gp/otel-lgtm-proxy/internal/logger" - "github.com/matt-gp/otel-lgtm-proxy/internal/processor" "github.com/matt-gp/otel-lgtm-proxy/internal/util/proto" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" logpb "go.opentelemetry.io/proto/otlp/logs/v1" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" ) // Logs handles incoming OTLP log requests. @@ -23,6 +21,7 @@ func (h *Handlers) Logs(w http.ResponseWriter, r *http.Request) { ) defer span.End() + // Unmarshal the incoming log data data, err := proto.Unmarshal(r, &logpb.LogsData{}) if err != nil { logger.Error(ctx, h.logger, err.Error()) @@ -32,35 +31,8 @@ func (h *Handlers) Logs(w http.ResponseWriter, r *http.Request) { return } - // Create processor for this request - proc, err := processor.New( - h.config, - &h.config.Logs, - "logs", - h.logsClient, - h.logger, - h.meter, - h.tracer, - func(rl *logpb.ResourceLogs) *resourcepb.Resource { - return rl.GetResource() - }, - func(resources []*logpb.ResourceLogs) ([]byte, error) { - data := &logpb.LogsData{ - ResourceLogs: resources, - } - return proto.Marshal(data) - }, - ) - if err != nil { - logger.Error(ctx, h.logger, err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return - } - // Process the log data - if err := proc.Dispatch(ctx, proc.Partition(ctx, data.GetResourceLogs())); err != nil { + if err := h.logsProcessor.Dispatch(ctx, h.logsProcessor.Partition(ctx, data.GetResourceLogs())); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return diff --git a/internal/handler/metrics.go b/internal/handler/metrics.go index ced0f65..3b67886 100644 --- a/internal/handler/metrics.go +++ b/internal/handler/metrics.go @@ -5,13 +5,11 @@ import ( "net/http" "github.com/matt-gp/otel-lgtm-proxy/internal/logger" - "github.com/matt-gp/otel-lgtm-proxy/internal/processor" "github.com/matt-gp/otel-lgtm-proxy/internal/util/proto" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" ) // Metrics handles incoming OTLP metric requests. @@ -23,6 +21,7 @@ func (h *Handlers) Metrics(w http.ResponseWriter, r *http.Request) { ) defer span.End() + // Unmarshal the incoming metric data data, err := proto.Unmarshal(r, &metricpb.MetricsData{}) if err != nil { logger.Error(ctx, h.logger, err.Error()) @@ -32,35 +31,8 @@ func (h *Handlers) Metrics(w http.ResponseWriter, r *http.Request) { return } - // Create processor for this request - proc, err := processor.New( - h.config, - &h.config.Metrics, - "metrics", - h.metricsClient, - h.logger, - h.meter, - h.tracer, - func(rm *metricpb.ResourceMetrics) *resourcepb.Resource { - return rm.GetResource() - }, - func(resources []*metricpb.ResourceMetrics) ([]byte, error) { - data := &metricpb.MetricsData{ - ResourceMetrics: resources, - } - return proto.Marshal(data) - }, - ) - if err != nil { - logger.Error(ctx, h.logger, err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return - } - // Process the metric data - if err := proc.Dispatch(ctx, proc.Partition(ctx, data.GetResourceMetrics())); err != nil { + if err := h.metricsProcessor.Dispatch(ctx, h.metricsProcessor.Partition(ctx, data.GetResourceMetrics())); err != nil { logger.Error(ctx, h.logger, err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) span.RecordError(err) diff --git a/internal/handler/traces.go b/internal/handler/traces.go index 13a5d4d..fd2cc8f 100644 --- a/internal/handler/traces.go +++ b/internal/handler/traces.go @@ -5,12 +5,10 @@ import ( "net/http" "github.com/matt-gp/otel-lgtm-proxy/internal/logger" - "github.com/matt-gp/otel-lgtm-proxy/internal/processor" "github.com/matt-gp/otel-lgtm-proxy/internal/util/proto" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -23,6 +21,7 @@ func (h *Handlers) Traces(w http.ResponseWriter, r *http.Request) { ) defer span.End() + // Unmarshal the incoming trace data data, err := proto.Unmarshal(r, &tracepb.TracesData{}) if err != nil { logger.Error(ctx, h.logger, err.Error()) @@ -32,35 +31,8 @@ func (h *Handlers) Traces(w http.ResponseWriter, r *http.Request) { return } - // Create processor for this request - proc, err := processor.New( - h.config, - &h.config.Traces, - "traces", - h.tracesClient, - h.logger, - h.meter, - h.tracer, - func(rs *tracepb.ResourceSpans) *resourcepb.Resource { - return rs.GetResource() - }, - func(resources []*tracepb.ResourceSpans) ([]byte, error) { - data := &tracepb.TracesData{ - ResourceSpans: resources, - } - return proto.Marshal(data) - }, - ) - if err != nil { - logger.Error(ctx, h.logger, err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return - } - // Process the trace data - if err := proc.Dispatch(ctx, proc.Partition(ctx, data.GetResourceSpans())); err != nil { + if err := h.tracesProcessor.Dispatch(ctx, h.tracesProcessor.Partition(ctx, data.GetResourceSpans())); err != nil { logger.Error(ctx, h.logger, err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) span.RecordError(err) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 219c984..99ff565 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -68,6 +68,7 @@ func New[T ResourceData]( getResource func(T) *resourcepb.Resource, marshalResources func([]T) ([]byte, error), ) (*Processor[T], error) { + // Create a counter for the total number of records processed by the proxy proxyRecordsMetric, err := meter.Int64Counter( "otel_lgtm_proxy_records_total", metric.WithDescription("Total number of otel lgtm proxy records processed"), @@ -76,6 +77,7 @@ func New[T ResourceData]( return nil, fmt.Errorf("failed to create otel lgtm proxy records counter: %w", err) } + // Create a counter for the total number of requests processed by the proxy proxyRequestsMetric, err := meter.Int64Counter( "otel_lgtm_proxy_requests_total", metric.WithDescription("Total number of otel lgtm proxy requests processed"), @@ -84,6 +86,7 @@ func New[T ResourceData]( return nil, fmt.Errorf("failed to create otel lgtm proxy requests counter: %w", err) } + // Create a histogram for the latency of requests processed by the proxy proxyLatencyMetric, err := meter.Int64Histogram( "otel_lgtm_proxy_request_duration_seconds", metric.WithDescription("Latency of otel lgtm proxy requests"), @@ -93,6 +96,7 @@ func New[T ResourceData]( return nil, fmt.Errorf("failed to create otel lgtm proxy latency histogram: %w", err) } + // Configure TLS if enabled if cert.TLSEnabled(&endpoint.TLS) { tlsConfig, err := cert.CreateTLSConfig(endpoint) if err != nil {