Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
defer stop()

// Initialize handlers
h := handler.New(
h, err := handler.New(
cfg,
&http.Client{Timeout: cfg.Logs.Timeout},
&http.Client{Timeout: cfg.Metrics.Timeout},
Expand All @@ -57,6 +57,10 @@ func main() {
meterProvider,
tracerProvider,
)
if err != nil {
logger.Error(ctx, loggingProvider, err.Error())
os.Exit(1)
}

// Initialize HTTP router
router := http.NewServeMux()
Expand Down
112 changes: 96 additions & 16 deletions internal/handler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,28 @@ package handler
import (
"github.com/matt-gp/otel-lgtm-proxy/internal/config"
"github.com/matt-gp/otel-lgtm-proxy/internal/processor"
"github.com/matt-gp/otel-lgtm-proxy/internal/util/proto"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)

// Handlers contains the dependencies needed for all OTLP signal handlers.
type Handlers struct {
config *config.Config
logsClient processor.Client
metricsClient processor.Client
tracesClient processor.Client
logger log.Logger
meter metric.Meter
tracer trace.Tracer
config *config.Config
logsClient processor.Client
metricsClient processor.Client
tracesClient processor.Client
logger log.Logger
meter metric.Meter
tracer trace.Tracer
logsProcessor processor.Processor[*logpb.ResourceLogs]
metricsProcessor processor.Processor[*metricpb.ResourceMetrics]
tracesProcessor processor.Processor[*tracepb.ResourceSpans]
}

// New creates a new Handlers instance.
Expand All @@ -29,14 +37,86 @@ func New(
logger log.Logger,
meter metric.Meter,
tracer trace.Tracer,
) *Handlers {
return &Handlers{
config: config,
logsClient: logsClient,
metricsClient: metricsClient,
tracesClient: tracesClient,
logger: logger,
meter: meter,
tracer: tracer,
) (*Handlers, error) {
// Create logs processor
logsProcessor, err := processor.New(
config,
&config.Logs,
"logs",
logsClient,
logger,
meter,
tracer,
func(rl *logpb.ResourceLogs) *resourcepb.Resource {
return rl.GetResource()
},
func(resources []*logpb.ResourceLogs) ([]byte, error) {
data := &logpb.LogsData{
ResourceLogs: resources,
}
return proto.Marshal(data)
},
)
if err != nil {
return nil, err
}

// Create metrics processor
metricsProcessor, err := processor.New(
config,
&config.Metrics,
"metrics",
metricsClient,
logger,
meter,
tracer,
func(rm *metricpb.ResourceMetrics) *resourcepb.Resource {
return rm.GetResource()
},
func(resources []*metricpb.ResourceMetrics) ([]byte, error) {
data := &metricpb.MetricsData{
ResourceMetrics: resources,
}
return proto.Marshal(data)
},
)
if err != nil {
return nil, err
}

// Create traces processor
tracesProcessor, err := processor.New(
config,
&config.Traces,
"traces",
tracesClient,
logger,
meter,
tracer,
func(rs *tracepb.ResourceSpans) *resourcepb.Resource {
return rs.GetResource()
},
func(resources []*tracepb.ResourceSpans) ([]byte, error) {
data := &tracepb.TracesData{
ResourceSpans: resources,
}
return proto.Marshal(data)
},
)
if err != nil {
return nil, err
}

return &Handlers{
config: config,
logsClient: logsClient,
metricsClient: metricsClient,
tracesClient: tracesClient,
logger: logger,
meter: meter,
tracer: tracer,
logsProcessor: *logsProcessor,
metricsProcessor: *metricsProcessor,
tracesProcessor: *tracesProcessor,
}, nil
}
41 changes: 31 additions & 10 deletions internal/handler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@ func TestNew(t *testing.T) {
logsClient *http.Client
metricsClient *http.Client
tracesClient *http.Client
wantErr bool
}{
{
name: "creates handlers with all dependencies",
name: "creates handlers with all dependencies and processors",
config: &config.Config{
Tenant: config.Tenant{
Label: "tenant.id",
Default: "default",
},
Logs: config.Endpoint{
Address: "http://localhost:3100",
},
Metrics: config.Endpoint{
Address: "http://localhost:9009",
},
Traces: config.Endpoint{
Address: "http://localhost:4318",
},
},
logsClient: &http.Client{},
metricsClient: &http.Client{},
tracesClient: &http.Client{},
wantErr: false,
},
}

Expand All @@ -39,7 +50,7 @@ func TestNew(t *testing.T) {
meter := noopmetric.NewMeterProvider().Meter("test")
tracer := nooptrace.NewTracerProvider().Tracer("test")

handlers := New(
handlers, err := New(
tt.config,
tt.logsClient,
tt.metricsClient,
Expand All @@ -49,14 +60,24 @@ func TestNew(t *testing.T) {
tracer,
)

assert.NotNil(t, handlers)
assert.Equal(t, tt.config, handlers.config)
assert.Equal(t, tt.logsClient, handlers.logsClient)
assert.Equal(t, tt.metricsClient, handlers.metricsClient)
assert.Equal(t, tt.tracesClient, handlers.tracesClient)
assert.NotNil(t, handlers.logger)
assert.NotNil(t, handlers.meter)
assert.NotNil(t, handlers.tracer)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, handlers)
} else {
assert.NoError(t, err)
assert.NotNil(t, handlers)
assert.Equal(t, tt.config, handlers.config)
assert.Equal(t, tt.logsClient, handlers.logsClient)
assert.Equal(t, tt.metricsClient, handlers.metricsClient)
assert.Equal(t, tt.tracesClient, handlers.tracesClient)
assert.NotNil(t, handlers.logger)
assert.NotNil(t, handlers.meter)
assert.NotNil(t, handlers.tracer)
// Verify processors were created
assert.NotNil(t, handlers.logsProcessor)
assert.NotNil(t, handlers.metricsProcessor)
assert.NotNil(t, handlers.tracesProcessor)
}
})
}
}
32 changes: 2 additions & 30 deletions internal/handler/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"net/http"

"github.com/matt-gp/otel-lgtm-proxy/internal/logger"
"github.com/matt-gp/otel-lgtm-proxy/internal/processor"
"github.com/matt-gp/otel-lgtm-proxy/internal/util/proto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
)

// Logs handles incoming OTLP log requests.
Expand All @@ -23,6 +21,7 @@ func (h *Handlers) Logs(w http.ResponseWriter, r *http.Request) {
)
defer span.End()

// Unmarshal the incoming log data
data, err := proto.Unmarshal(r, &logpb.LogsData{})
if err != nil {
logger.Error(ctx, h.logger, err.Error())
Expand All @@ -32,35 +31,8 @@ func (h *Handlers) Logs(w http.ResponseWriter, r *http.Request) {
return
}

// Create processor for this request
proc, err := processor.New(
h.config,
&h.config.Logs,
"logs",
h.logsClient,
h.logger,
h.meter,
h.tracer,
func(rl *logpb.ResourceLogs) *resourcepb.Resource {
return rl.GetResource()
},
func(resources []*logpb.ResourceLogs) ([]byte, error) {
data := &logpb.LogsData{
ResourceLogs: resources,
}
return proto.Marshal(data)
},
)
if err != nil {
logger.Error(ctx, h.logger, err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}

// Process the log data
if err := proc.Dispatch(ctx, proc.Partition(ctx, data.GetResourceLogs())); err != nil {
if err := h.logsProcessor.Dispatch(ctx, h.logsProcessor.Partition(ctx, data.GetResourceLogs())); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
Expand Down
32 changes: 2 additions & 30 deletions internal/handler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"net/http"

"github.com/matt-gp/otel-lgtm-proxy/internal/logger"
"github.com/matt-gp/otel-lgtm-proxy/internal/processor"
"github.com/matt-gp/otel-lgtm-proxy/internal/util/proto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
)

// Metrics handles incoming OTLP metric requests.
Expand All @@ -23,6 +21,7 @@ func (h *Handlers) Metrics(w http.ResponseWriter, r *http.Request) {
)
defer span.End()

// Unmarshal the incoming metric data
data, err := proto.Unmarshal(r, &metricpb.MetricsData{})
if err != nil {
logger.Error(ctx, h.logger, err.Error())
Expand All @@ -32,35 +31,8 @@ func (h *Handlers) Metrics(w http.ResponseWriter, r *http.Request) {
return
}

// Create processor for this request
proc, err := processor.New(
h.config,
&h.config.Metrics,
"metrics",
h.metricsClient,
h.logger,
h.meter,
h.tracer,
func(rm *metricpb.ResourceMetrics) *resourcepb.Resource {
return rm.GetResource()
},
func(resources []*metricpb.ResourceMetrics) ([]byte, error) {
data := &metricpb.MetricsData{
ResourceMetrics: resources,
}
return proto.Marshal(data)
},
)
if err != nil {
logger.Error(ctx, h.logger, err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}

// Process the metric data
if err := proc.Dispatch(ctx, proc.Partition(ctx, data.GetResourceMetrics())); err != nil {
if err := h.metricsProcessor.Dispatch(ctx, h.metricsProcessor.Partition(ctx, data.GetResourceMetrics())); err != nil {
logger.Error(ctx, h.logger, err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
span.RecordError(err)
Expand Down
Loading
Loading