Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/beholder/batch_emitter_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg
if drainTimeout == 0 {
drainTimeout = defaults.ChipIngressDrainTimeout
}
maxGRPCRequestSize := int(cfg.ChipIngressMaxGRPCRequestSize)
if maxGRPCRequestSize == 0 {
maxGRPCRequestSize = int(defaults.ChipIngressMaxGRPCRequestSize)
}

meter := otel.Meter("beholder/chip_ingress_batch_emitter")
metrics, err := newBatchEmitterMetrics(meter)
Expand All @@ -77,6 +81,7 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg
batch.WithMaxPublishTimeout(sendTimeout),
batch.WithShutdownTimeout(drainTimeout),
batch.WithMaxConcurrentSends(maxConcurrentSends),
batch.WithMaxGRPCRequestSize(maxGRPCRequestSize),
batch.WithEventClone(false),
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/beholder/batch_emitter_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func newTestConfig() beholder.Config {
ChipIngressSendInterval: 50 * time.Millisecond,
ChipIngressSendTimeout: 5 * time.Second,
ChipIngressDrainTimeout: 5 * time.Second,
ChipIngressMaxGRPCRequestSize: 1024 * 1024,
}
}

Expand Down Expand Up @@ -573,6 +574,7 @@ func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) {
ChipIngressSendInterval: time.Hour,
ChipIngressSendTimeout: 5 * time.Second,
ChipIngressDrainTimeout: 5 * time.Second,
ChipIngressMaxGRPCRequestSize: 1024 * 1024,
}
emitter, err := beholder.NewChipIngressBatchEmitterService(&chipingress.NoopClient{}, cfg, logger.Test(b))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Config struct {
ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s)
ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s)
ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10)
ChipIngressMaxGRPCRequestSize uint // Max serialized PublishBatch request size in bytes (default 10 MiB)
ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true

// OTel Log
Expand Down Expand Up @@ -158,6 +159,7 @@ func DefaultConfig() Config {
ChipIngressSendTimeout: 3 * time.Second,
ChipIngressDrainTimeout: 10 * time.Second,
ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends,
ChipIngressMaxGRPCRequestSize: 10 * 1024 * 1024, // 10 MiB
// Auth (defaults to static auth mode with TTL=0)
AuthHeadersTTL: 0,
}
Expand Down
1 change: 1 addition & 0 deletions pkg/beholder/testdata/config-example.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"ChipIngressSendTimeout": 0,
"ChipIngressDrainTimeout": 0,
"ChipIngressMaxConcurrentSends": 0,
"ChipIngressMaxGRPCRequestSize": 0,
"ChipIngressLogger": null,
"LogExportTimeout": 1000000000,
"LogExportInterval": 1000000000,
Expand Down
63 changes: 63 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@
envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED"

envChipIngressBufferSize = "CL_CHIP_INGRESS_BUFFER_SIZE"
envChipIngressMaxBatchSize = "CL_CHIP_INGRESS_MAX_BATCH_SIZE"
envChipIngressMaxConcurrentSends = "CL_CHIP_INGRESS_MAX_CONCURRENT_SENDS"
envChipIngressSendInterval = "CL_CHIP_INGRESS_SEND_INTERVAL"
envChipIngressSendTimeout = "CL_CHIP_INGRESS_SEND_TIMEOUT"
envChipIngressDrainTimeout = "CL_CHIP_INGRESS_DRAIN_TIMEOUT"
envChipIngressMaxGRPCRequestSize = "CL_CHIP_INGRESS_MAX_GRPC_REQUEST_SIZE"

envCRESettings = cresettings.EnvNameSettings
envCRESettingsDefault = cresettings.EnvNameSettingsDefault
)
Expand All @@ -105,6 +113,14 @@
ChipIngressBatchEmitterEnabled bool
ChipIngressDurableEmitterEnabled bool

ChipIngressBufferSize uint
ChipIngressMaxBatchSize uint
ChipIngressMaxConcurrentSends int
ChipIngressSendInterval time.Duration
ChipIngressSendTimeout time.Duration
ChipIngressDrainTimeout time.Duration
ChipIngressMaxGRPCRequestSize uint

CRESettings string
CRESettingsDefault string

Expand Down Expand Up @@ -269,6 +285,13 @@
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled))
add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled))
add(envChipIngressBufferSize, strconv.FormatUint(uint64(e.ChipIngressBufferSize), 10))
add(envChipIngressMaxBatchSize, strconv.FormatUint(uint64(e.ChipIngressMaxBatchSize), 10))
add(envChipIngressMaxConcurrentSends, strconv.Itoa(e.ChipIngressMaxConcurrentSends))
add(envChipIngressSendInterval, e.ChipIngressSendInterval.String())
add(envChipIngressSendTimeout, e.ChipIngressSendTimeout.String())
add(envChipIngressDrainTimeout, e.ChipIngressDrainTimeout.String())
add(envChipIngressMaxGRPCRequestSize, strconv.FormatUint(uint64(e.ChipIngressMaxGRPCRequestSize), 10))

if e.CRESettings != "" {
add(envCRESettings, e.CRESettings)
Expand Down Expand Up @@ -513,6 +536,34 @@
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err)
}
e.ChipIngressBufferSize, err = getUint(envChipIngressBufferSize)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressBufferSize, err)
}
e.ChipIngressMaxBatchSize, err = getUint(envChipIngressMaxBatchSize)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxBatchSize, err)
}
e.ChipIngressMaxConcurrentSends, err = getInt(envChipIngressMaxConcurrentSends)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxConcurrentSends, err)
}
e.ChipIngressSendInterval, err = getDuration(envChipIngressSendInterval)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressSendInterval, err)
}
e.ChipIngressSendTimeout, err = getDuration(envChipIngressSendTimeout)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressSendTimeout, err)
}
e.ChipIngressDrainTimeout, err = getDuration(envChipIngressDrainTimeout)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressDrainTimeout, err)
}
e.ChipIngressMaxGRPCRequestSize, err = getUint(envChipIngressMaxGRPCRequestSize)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxGRPCRequestSize, err)
}
}

e.CRESettings = os.Getenv(envCRESettings)
Expand Down Expand Up @@ -605,6 +656,18 @@
return
}

func getUint(envKey string) (uint, error) {
s := os.Getenv(envKey)
if s == "" {
return 0, nil
}
u, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, err
}
return uint(u), nil

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 64-bit integer from
strconv.ParseUint
to a lower bit size type uint without an upper bound check.
}

func getInt(envKey string) (int, error) {
s := os.Getenv(envKey)
if s == "" {
Expand Down
33 changes: 28 additions & 5 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ func TestEnvConfig_parse(t *testing.T) {
envTelemetryPrometheusBridgeEnabled: "true",
envTelemetryPrometheusBridgePrefixes: "foo,bar",

envChipIngressEndpoint: "chip-ingress.example.com:50051",
envChipIngressInsecureConnection: "true",
envChipIngressBatchEmitterEnabled: "false",

envCRESettings: `{"global":{}}`,
envChipIngressEndpoint: "chip-ingress.example.com:50051",
envChipIngressInsecureConnection: "true",
envChipIngressBatchEmitterEnabled: "false",

envChipIngressBufferSize: "1000",
envChipIngressMaxBatchSize: "500",
envChipIngressMaxConcurrentSends: "10",
envChipIngressSendInterval: "100ms",
envChipIngressSendTimeout: "3s",
envChipIngressDrainTimeout: "10s",
envChipIngressMaxGRPCRequestSize: "10485760",

envCRESettings: `{"global":{}}`,
envCRESettingsDefault: `{"foo":"bar"}`,
},
expectError: false,
Expand Down Expand Up @@ -205,6 +213,14 @@ var envCfgFull = EnvConfig{
ChipIngressBatchEmitterEnabled: false,
ChipIngressDurableEmitterEnabled: false,

ChipIngressBufferSize: 1000,
ChipIngressMaxBatchSize: 500,
ChipIngressMaxConcurrentSends: 10,
ChipIngressSendInterval: 100 * time.Millisecond,
ChipIngressSendTimeout: 3 * time.Second,
ChipIngressDrainTimeout: 10 * time.Second,
ChipIngressMaxGRPCRequestSize: 10485760,

CRESettings: `{"global":{}}`,
CRESettingsDefault: `{"foo":"bar"}`,
}
Expand Down Expand Up @@ -269,6 +285,13 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
assert.Equal(t, "true", got[envChipIngressInsecureConnection])
assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled])
assert.Equal(t, "1000", got[envChipIngressBufferSize])
assert.Equal(t, "500", got[envChipIngressMaxBatchSize])
assert.Equal(t, "10", got[envChipIngressMaxConcurrentSends])
assert.Equal(t, "100ms", got[envChipIngressSendInterval])
assert.Equal(t, "3s", got[envChipIngressSendTimeout])
assert.Equal(t, "10s", got[envChipIngressDrainTimeout])
assert.Equal(t, "10485760", got[envChipIngressMaxGRPCRequestSize])

assert.JSONEq(t, `{"global":{}}`, got[envCRESettings])
assert.JSONEq(t, `{"foo":"bar"}`, got[envCRESettingsDefault])
Expand Down
7 changes: 7 additions & 0 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ func (s *Server) start(opts ...ServerOpt) error {
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled,
ChipIngressBufferSize: s.EnvConfig.ChipIngressBufferSize,
ChipIngressMaxBatchSize: s.EnvConfig.ChipIngressMaxBatchSize,
ChipIngressMaxConcurrentSends: s.EnvConfig.ChipIngressMaxConcurrentSends,
ChipIngressSendInterval: s.EnvConfig.ChipIngressSendInterval,
ChipIngressSendTimeout: s.EnvConfig.ChipIngressSendTimeout,
ChipIngressDrainTimeout: s.EnvConfig.ChipIngressDrainTimeout,
ChipIngressMaxGRPCRequestSize: s.EnvConfig.ChipIngressMaxGRPCRequestSize,
ChipIngressLogger: s.Logger,
MetricCompressor: s.EnvConfig.TelemetryMetricCompressor,
}
Expand Down
Loading