diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index dfbf30a743..cf9ef8a735 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" @@ -19,8 +19,12 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) +const grpcCompressorGzip = "gzip" + type Emitter interface { // Sends message with bytes and attributes to OTel Collector Emit(ctx context.Context, body []byte, attrKVs ...any) error @@ -391,6 +395,9 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede otlpmetricgrpc.WithEndpoint(config.OtelExporterGRPCEndpoint), otlpmetricgrpc.WithHeaders(config.AuthHeaders), } + if config.MetricCompressorEnabled { + opts = append(opts, otlpmetricgrpc.WithCompressor(grpcCompressorGzip)) + } if config.MetricRetryConfig != nil { // NOTE: By default, the retry is enabled in the OTel SDK opts = append(opts, otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{ diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index d08226961d..1300ba89a5 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -33,6 +33,8 @@ type Config struct { MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig MetricViews []metric.View + // MetricCompressorEnabled enables gRPC compression for metrics (uses "gzip"). + MetricCompressorEnabled bool // Custom Events via Chip Ingress Emitter ChipIngressEmitterEnabled bool diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 8bc42daa85..8704391b47 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -62,6 +62,7 @@ const ( envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE" envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE" envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED" + envTelemetryMetricCompressorEnabled = "CL_TELEMETRY_METRIC_COMPRESSOR_ENABLED" envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" @@ -118,6 +119,7 @@ type EnvConfig struct { TelemetryEmitterExportMaxBatchSize int TelemetryEmitterMaxQueueSize int TelemetryLogStreamingEnabled bool + TelemetryMetricCompressorEnabled bool ChipIngressEndpoint string ChipIngressInsecureConnection bool @@ -187,6 +189,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize)) add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize)) add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled)) + add(envTelemetryMetricCompressorEnabled, strconv.FormatBool(e.TelemetryMetricCompressorEnabled)) add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) @@ -351,6 +354,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err) } + e.TelemetryMetricCompressorEnabled, err = getBool(envTelemetryMetricCompressorEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryMetricCompressorEnabled, err) + } // Optional e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint) e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index b3e9fd8ff9..7411316909 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -129,6 +129,7 @@ func (s *Server) start() error { ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + MetricCompressorEnabled: s.EnvConfig.TelemetryMetricCompressorEnabled, } if tracingConfig.Enabled {