Skip to content

Commit 6b35566

Browse files
authored
Merge pull request #758 from marco-minervino/feat-maxackpending-input-jetstream-setting
feat: capability for jetstream input to set MaxAckPending
2 parents 1b98cbf + d5dcb0d commit 6b35566

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

docs/user_guide/inputs/jetstream_input.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ inputs:
6464
# default: 500
6565
fetch-batch-size: 200
6666

67+
# integer, maximum number of allowed pending ack on the stream
68+
# default: 1000
69+
max-ack-pending: 5000
70+
6771
# optional list of output names this input writes to
6872
# outputs must be configured at the root `outputs:` section
6973
outputs:

pkg/inputs/jetstream_input/jetstream_input.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141
defaultNumWorkers = 1
4242
defaultBufferSize = 500
4343
defaultFetchBatchSize = 500
44+
defaultMaxAckPending = 1000
4445
)
4546

4647
type deliverPolicy string
@@ -113,6 +114,7 @@ type config struct {
113114
NumWorkers int `mapstructure:"num-workers,omitempty"`
114115
BufferSize int `mapstructure:"buffer-size,omitempty"`
115116
FetchBatchSize int `mapstructure:"fetch-batch-size,omitempty"`
117+
MaxAckPending *int `mapstructure:"max-ack-pending,omitempty"`
116118
Outputs []string `mapstructure:"outputs,omitempty"`
117119
EventProcessors []string `mapstructure:"event-processors,omitempty"`
118120
}
@@ -195,6 +197,7 @@ func (n *jetstreamInput) workerStart(ctx context.Context) error {
195197
AckPolicy: jetstream.AckAllPolicy,
196198
MemoryStorage: true,
197199
FilterSubjects: n.Cfg.Subjects,
200+
MaxAckPending: *n.Cfg.MaxAckPending,
198201
})
199202
if err != nil {
200203
return fmt.Errorf("failed to create consumer: %v", err)
@@ -375,6 +378,10 @@ func (n *jetstreamInput) setDefaults() error {
375378
if n.Cfg.FetchBatchSize <= 0 {
376379
n.Cfg.FetchBatchSize = defaultFetchBatchSize
377380
}
381+
if n.Cfg.MaxAckPending == nil || *n.Cfg.MaxAckPending <= -2 {
382+
v := defaultMaxAckPending
383+
n.Cfg.MaxAckPending = &v
384+
}
378385
return nil
379386
}
380387

0 commit comments

Comments
 (0)