Skip to content

Commit 1f54ac6

Browse files
committed
feat: add chip-ingress batch config keys
Add 7 chip-ingress batch configuration keys to enable tunable batch emitter behavior: - BufferSize (default 1000): in-memory queue size for chip-ingress events - MaxBatchSize (default 500): max events per PublishBatch RPC - MaxConcurrentSends (default 10): limits parallel PublishBatch calls - SendInterval (default 100ms): max wait before flushing incomplete batch - SendTimeout (default 3s): per-RPC timeout for PublishBatch - DrainTimeout (default 10s): max shutdown wait to flush queued events - MaxGRPCRequestSize (default 10 MiB): max serialized request size before batch splitting (min 1 MiB enforced by batch client) Wired through beholder.Config, batch_emitter_service.go, and loop EnvConfig (env vars, serialization, parsing) for LOOP plugin support.
1 parent 6bcb242 commit 1f54ac6

7 files changed

Lines changed: 108 additions & 5 deletions

File tree

pkg/beholder/batch_emitter_service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg
6363
if drainTimeout == 0 {
6464
drainTimeout = defaults.ChipIngressDrainTimeout
6565
}
66+
maxGRPCRequestSize := int(cfg.ChipIngressMaxGRPCRequestSize)
67+
if maxGRPCRequestSize == 0 {
68+
maxGRPCRequestSize = int(defaults.ChipIngressMaxGRPCRequestSize)
69+
}
6670

6771
meter := otel.Meter("beholder/chip_ingress_batch_emitter")
6872
metrics, err := newBatchEmitterMetrics(meter)
@@ -77,6 +81,7 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg
7781
batch.WithMaxPublishTimeout(sendTimeout),
7882
batch.WithShutdownTimeout(drainTimeout),
7983
batch.WithMaxConcurrentSends(maxConcurrentSends),
84+
batch.WithMaxGRPCRequestSize(maxGRPCRequestSize),
8085
batch.WithEventClone(false),
8186
)
8287
if err != nil {

pkg/beholder/batch_emitter_service_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func newTestConfig() beholder.Config {
3131
ChipIngressSendInterval: 50 * time.Millisecond,
3232
ChipIngressSendTimeout: 5 * time.Second,
3333
ChipIngressDrainTimeout: 5 * time.Second,
34+
ChipIngressMaxGRPCRequestSize: 1024 * 1024,
3435
}
3536
}
3637

@@ -573,6 +574,7 @@ func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) {
573574
ChipIngressSendInterval: time.Hour,
574575
ChipIngressSendTimeout: 5 * time.Second,
575576
ChipIngressDrainTimeout: 5 * time.Second,
577+
ChipIngressMaxGRPCRequestSize: 1024 * 1024,
576578
}
577579
emitter, err := beholder.NewChipIngressBatchEmitterService(&chipingress.NoopClient{}, cfg, logger.Test(b))
578580
if err != nil {

pkg/beholder/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type Config struct {
5757
ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s)
5858
ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s)
5959
ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10)
60+
ChipIngressMaxGRPCRequestSize uint // Max serialized PublishBatch request size in bytes (default 10 MiB)
6061
ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true
6162

6263
// OTel Log
@@ -158,6 +159,7 @@ func DefaultConfig() Config {
158159
ChipIngressSendTimeout: 3 * time.Second,
159160
ChipIngressDrainTimeout: 10 * time.Second,
160161
ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends,
162+
ChipIngressMaxGRPCRequestSize: 10 * 1024 * 1024, // 10 MiB
161163
// Auth (defaults to static auth mode with TTL=0)
162164
AuthHeadersTTL: 0,
163165
}

pkg/beholder/testdata/config-example.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"ChipIngressSendTimeout": 0,
4545
"ChipIngressDrainTimeout": 0,
4646
"ChipIngressMaxConcurrentSends": 0,
47+
"ChipIngressMaxGRPCRequestSize": 0,
4748
"ChipIngressLogger": null,
4849
"LogExportTimeout": 1000000000,
4950
"LogExportInterval": 1000000000,

pkg/loop/config.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ const (
9191
envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
9292
envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED"
9393

94+
envChipIngressBufferSize = "CL_CHIP_INGRESS_BUFFER_SIZE"
95+
envChipIngressMaxBatchSize = "CL_CHIP_INGRESS_MAX_BATCH_SIZE"
96+
envChipIngressMaxConcurrentSends = "CL_CHIP_INGRESS_MAX_CONCURRENT_SENDS"
97+
envChipIngressSendInterval = "CL_CHIP_INGRESS_SEND_INTERVAL"
98+
envChipIngressSendTimeout = "CL_CHIP_INGRESS_SEND_TIMEOUT"
99+
envChipIngressDrainTimeout = "CL_CHIP_INGRESS_DRAIN_TIMEOUT"
100+
envChipIngressMaxGRPCRequestSize = "CL_CHIP_INGRESS_MAX_GRPC_REQUEST_SIZE"
101+
94102
envCRESettings = cresettings.EnvNameSettings
95103
envCRESettingsDefault = cresettings.EnvNameSettingsDefault
96104
)
@@ -105,6 +113,14 @@ type EnvConfig struct {
105113
ChipIngressBatchEmitterEnabled bool
106114
ChipIngressDurableEmitterEnabled bool
107115

116+
ChipIngressBufferSize uint
117+
ChipIngressMaxBatchSize uint
118+
ChipIngressMaxConcurrentSends int
119+
ChipIngressSendInterval time.Duration
120+
ChipIngressSendTimeout time.Duration
121+
ChipIngressDrainTimeout time.Duration
122+
ChipIngressMaxGRPCRequestSize uint
123+
108124
CRESettings string
109125
CRESettingsDefault string
110126

@@ -269,6 +285,13 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
269285
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
270286
add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled))
271287
add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled))
288+
add(envChipIngressBufferSize, strconv.FormatUint(uint64(e.ChipIngressBufferSize), 10))
289+
add(envChipIngressMaxBatchSize, strconv.FormatUint(uint64(e.ChipIngressMaxBatchSize), 10))
290+
add(envChipIngressMaxConcurrentSends, strconv.Itoa(e.ChipIngressMaxConcurrentSends))
291+
add(envChipIngressSendInterval, e.ChipIngressSendInterval.String())
292+
add(envChipIngressSendTimeout, e.ChipIngressSendTimeout.String())
293+
add(envChipIngressDrainTimeout, e.ChipIngressDrainTimeout.String())
294+
add(envChipIngressMaxGRPCRequestSize, strconv.FormatUint(uint64(e.ChipIngressMaxGRPCRequestSize), 10))
272295

273296
if e.CRESettings != "" {
274297
add(envCRESettings, e.CRESettings)
@@ -513,6 +536,34 @@ func (e *EnvConfig) parse() error {
513536
if err != nil {
514537
return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err)
515538
}
539+
e.ChipIngressBufferSize, err = getUint(envChipIngressBufferSize)
540+
if err != nil {
541+
return fmt.Errorf("failed to parse %s: %w", envChipIngressBufferSize, err)
542+
}
543+
e.ChipIngressMaxBatchSize, err = getUint(envChipIngressMaxBatchSize)
544+
if err != nil {
545+
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxBatchSize, err)
546+
}
547+
e.ChipIngressMaxConcurrentSends, err = getInt(envChipIngressMaxConcurrentSends)
548+
if err != nil {
549+
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxConcurrentSends, err)
550+
}
551+
e.ChipIngressSendInterval, err = getDuration(envChipIngressSendInterval)
552+
if err != nil {
553+
return fmt.Errorf("failed to parse %s: %w", envChipIngressSendInterval, err)
554+
}
555+
e.ChipIngressSendTimeout, err = getDuration(envChipIngressSendTimeout)
556+
if err != nil {
557+
return fmt.Errorf("failed to parse %s: %w", envChipIngressSendTimeout, err)
558+
}
559+
e.ChipIngressDrainTimeout, err = getDuration(envChipIngressDrainTimeout)
560+
if err != nil {
561+
return fmt.Errorf("failed to parse %s: %w", envChipIngressDrainTimeout, err)
562+
}
563+
e.ChipIngressMaxGRPCRequestSize, err = getUint(envChipIngressMaxGRPCRequestSize)
564+
if err != nil {
565+
return fmt.Errorf("failed to parse %s: %w", envChipIngressMaxGRPCRequestSize, err)
566+
}
516567
}
517568

518569
e.CRESettings = os.Getenv(envCRESettings)
@@ -605,6 +656,18 @@ func getEnv[T any](key string, parse func(string) (T, error)) (t T, err error) {
605656
return
606657
}
607658

659+
func getUint(envKey string) (uint, error) {
660+
s := os.Getenv(envKey)
661+
if s == "" {
662+
return 0, nil
663+
}
664+
u, err := strconv.ParseUint(s, 10, 64)
665+
if err != nil {
666+
return 0, err
667+
}
668+
return uint(u), nil
669+
}
670+
608671
func getInt(envKey string) (int, error) {
609672
s := os.Getenv(envKey)
610673
if s == "" {

pkg/loop/config_test.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,19 @@ func TestEnvConfig_parse(t *testing.T) {
8686
envTelemetryPrometheusBridgeEnabled: "true",
8787
envTelemetryPrometheusBridgePrefixes: "foo,bar",
8888

89-
envChipIngressEndpoint: "chip-ingress.example.com:50051",
90-
envChipIngressInsecureConnection: "true",
91-
envChipIngressBatchEmitterEnabled: "false",
92-
93-
envCRESettings: `{"global":{}}`,
89+
envChipIngressEndpoint: "chip-ingress.example.com:50051",
90+
envChipIngressInsecureConnection: "true",
91+
envChipIngressBatchEmitterEnabled: "false",
92+
93+
envChipIngressBufferSize: "1000",
94+
envChipIngressMaxBatchSize: "500",
95+
envChipIngressMaxConcurrentSends: "10",
96+
envChipIngressSendInterval: "100ms",
97+
envChipIngressSendTimeout: "3s",
98+
envChipIngressDrainTimeout: "10s",
99+
envChipIngressMaxGRPCRequestSize: "10485760",
100+
101+
envCRESettings: `{"global":{}}`,
94102
envCRESettingsDefault: `{"foo":"bar"}`,
95103
},
96104
expectError: false,
@@ -205,6 +213,14 @@ var envCfgFull = EnvConfig{
205213
ChipIngressBatchEmitterEnabled: false,
206214
ChipIngressDurableEmitterEnabled: false,
207215

216+
ChipIngressBufferSize: 1000,
217+
ChipIngressMaxBatchSize: 500,
218+
ChipIngressMaxConcurrentSends: 10,
219+
ChipIngressSendInterval: 100 * time.Millisecond,
220+
ChipIngressSendTimeout: 3 * time.Second,
221+
ChipIngressDrainTimeout: 10 * time.Second,
222+
ChipIngressMaxGRPCRequestSize: 10485760,
223+
208224
CRESettings: `{"global":{}}`,
209225
CRESettingsDefault: `{"foo":"bar"}`,
210226
}
@@ -269,6 +285,13 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
269285
assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
270286
assert.Equal(t, "true", got[envChipIngressInsecureConnection])
271287
assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled])
288+
assert.Equal(t, "1000", got[envChipIngressBufferSize])
289+
assert.Equal(t, "500", got[envChipIngressMaxBatchSize])
290+
assert.Equal(t, "10", got[envChipIngressMaxConcurrentSends])
291+
assert.Equal(t, "100ms", got[envChipIngressSendInterval])
292+
assert.Equal(t, "3s", got[envChipIngressSendTimeout])
293+
assert.Equal(t, "10s", got[envChipIngressDrainTimeout])
294+
assert.Equal(t, "10485760", got[envChipIngressMaxGRPCRequestSize])
272295

273296
assert.JSONEq(t, `{"global":{}}`, got[envCRESettings])
274297
assert.JSONEq(t, `{"foo":"bar"}`, got[envCRESettingsDefault])

pkg/loop/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,13 @@ func (s *Server) start(opts ...ServerOpt) error {
189189
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
190190
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
191191
ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled,
192+
ChipIngressBufferSize: s.EnvConfig.ChipIngressBufferSize,
193+
ChipIngressMaxBatchSize: s.EnvConfig.ChipIngressMaxBatchSize,
194+
ChipIngressMaxConcurrentSends: s.EnvConfig.ChipIngressMaxConcurrentSends,
195+
ChipIngressSendInterval: s.EnvConfig.ChipIngressSendInterval,
196+
ChipIngressSendTimeout: s.EnvConfig.ChipIngressSendTimeout,
197+
ChipIngressDrainTimeout: s.EnvConfig.ChipIngressDrainTimeout,
198+
ChipIngressMaxGRPCRequestSize: s.EnvConfig.ChipIngressMaxGRPCRequestSize,
192199
ChipIngressLogger: s.Logger,
193200
MetricCompressor: s.EnvConfig.TelemetryMetricCompressor,
194201
}

0 commit comments

Comments
 (0)