Skip to content

Commit dbfa73d

Browse files
Added BufferSize to Jetsream and NATS outputs
* follow-up to Parallel processing with Jetstream #342 * added option to provide a buffer size to go channel on output * updaed user guides for Jetstream and NATS outputs
1 parent cc35c12 commit dbfa73d

File tree

4 files changed

+12
-2
lines changed

4 files changed

+12
-2
lines changed

docs/user_guide/outputs/jetstream_output.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ outputs:
123123
# boolean, enables extra logging for the nats output
124124
debug: false
125125
# boolean, enables the collection and export (via prometheus) of output specific metrics
126+
# integer, sets the size of the local buffer where received
127+
# NATS messages are stored before being sent to outputs.
128+
# This value is set per worker. Defaults to 100 messages
129+
buffer-size: 100
126130
enable-metrics: false
127131
# list of processors to apply to the message before writing
128132
event-processors:

docs/user_guide/outputs/nats_output.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ outputs:
7373
# boolean, enables extra logging for the nats output
7474
debug: false
7575
# boolean, enables the collection and export (via prometheus) of output specific metrics
76+
# integer, sets the size of the local buffer where received
77+
# NATS messages are stored before being sent to outputs.
78+
# This value is set per worker. Defaults to 100 messages
79+
buffer-size: 100
7680
enable-metrics: false
7781
# list of processors to apply on the message before writing
7882
event-processors:

pkg/outputs/nats_outputs/jetstream/jetstream_output.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type config struct {
8787
NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"`
8888
WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty" json:"write-timeout,omitempty"`
8989
Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"`
90+
BufferSize int `mapstructure:"buffer-size,omitempty"`
9091
EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"`
9192
EventProcessors []string `mapstructure:"event-processors,omitempty" json:"event-processors,omitempty"`
9293
}
@@ -136,7 +137,7 @@ func (n *jetstreamOutput) Init(ctx context.Context, name string, cfg map[string]
136137
return err
137138
}
138139

139-
n.msgChan = make(chan *outputs.ProtoMsg)
140+
n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize))
140141
initMetrics()
141142
n.mo = &formatters.MarshalOptions{
142143
Format: n.Cfg.Format,

pkg/outputs/nats_outputs/nats/nats_output.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type Config struct {
8787
NumWorkers int `mapstructure:"num-workers,omitempty"`
8888
WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty"`
8989
Debug bool `mapstructure:"debug,omitempty"`
90+
BufferSize int `mapstructure:"buffer-size,omitempty"`
9091
EnableMetrics bool `mapstructure:"enable-metrics,omitempty"`
9192
EventProcessors []string `mapstructure:"event-processors,omitempty"`
9293
}
@@ -145,7 +146,7 @@ func (n *NatsOutput) Init(ctx context.Context, name string, cfg map[string]inter
145146
return err
146147
}
147148

148-
n.msgChan = make(chan *outputs.ProtoMsg)
149+
n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize))
149150
initMetrics()
150151
n.mo = &formatters.MarshalOptions{
151152
Format: n.Cfg.Format,

0 commit comments

Comments
 (0)