Skip to content

Commit ce62f8d

Browse files
authored
Merge pull request #549 from wendall-robinson/Add_BufferSize_to_JetStream_Output
Added BufferSize to Jetsream and NATS outputs
2 parents 36634dd + edeba06 commit ce62f8d

File tree

4 files changed

+12
-2
lines changed

4 files changed

+12
-2
lines changed

docs/user_guide/outputs/jetstream_output.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ outputs:
122122
write-timeout: 5s
123123
# boolean, enables extra logging for the nats output
124124
debug: false
125+
# integer, sets the size of the local buffer where received
126+
# NATS messages are stored before being sent to outputs.
127+
# This value is set per worker. Defaults to 0 messages
128+
buffer-size: 0
125129
# boolean, enables the collection and export (via prometheus) of output specific metrics
126130
enable-metrics: false
127131
# list of processors to apply to the message before writing

docs/user_guide/outputs/nats_output.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ outputs:
7272
write-timeout: 5s
7373
# boolean, enables extra logging for the nats output
7474
debug: false
75+
# integer, sets the size of the local buffer where received
76+
# NATS messages are stored before being sent to outputs.
77+
# This value is set per worker. Defaults to 0 messages
78+
buffer-size: 0
7579
# boolean, enables the collection and export (via prometheus) of output specific metrics
7680
enable-metrics: false
7781
# list of processors to apply on the message before writing

pkg/outputs/nats_outputs/jetstream/jetstream_output.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type config struct {
8787
NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"`
8888
WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty" json:"write-timeout,omitempty"`
8989
Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"`
90+
BufferSize uint `mapstructure:"buffer-size,omitempty"`
9091
EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"`
9192
EventProcessors []string `mapstructure:"event-processors,omitempty" json:"event-processors,omitempty"`
9293
}
@@ -136,7 +137,7 @@ func (n *jetstreamOutput) Init(ctx context.Context, name string, cfg map[string]
136137
return err
137138
}
138139

139-
n.msgChan = make(chan *outputs.ProtoMsg)
140+
n.msgChan = make(chan *outputs.ProtoMsg, n.Cfg.BufferSize)
140141
initMetrics()
141142
n.mo = &formatters.MarshalOptions{
142143
Format: n.Cfg.Format,

pkg/outputs/nats_outputs/nats/nats_output.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type Config struct {
8787
NumWorkers int `mapstructure:"num-workers,omitempty"`
8888
WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty"`
8989
Debug bool `mapstructure:"debug,omitempty"`
90+
BufferSize uint `mapstructure:"buffer-size,omitempty"`
9091
EnableMetrics bool `mapstructure:"enable-metrics,omitempty"`
9192
EventProcessors []string `mapstructure:"event-processors,omitempty"`
9293
}
@@ -145,7 +146,7 @@ func (n *NatsOutput) Init(ctx context.Context, name string, cfg map[string]inter
145146
return err
146147
}
147148

148-
n.msgChan = make(chan *outputs.ProtoMsg)
149+
n.msgChan = make(chan *outputs.ProtoMsg, n.Cfg.BufferSize)
149150
initMetrics()
150151
n.mo = &formatters.MarshalOptions{
151152
Format: n.Cfg.Format,

0 commit comments

Comments
 (0)