feat: add max pending entries option to batch-processor #12338
Conversation
```lua
        return
    end

    self.processed_entries = self.processed_entries + #batch.entries
```
Please read the code above: in the "whole batch not ok" case, the failed batch is discarded after exceeding max_retry_count. In that case the batch is freed too, so its entries should be counted in processed_entries as well.
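A minimal sketch of the requested accounting, using hypothetical field names (batch.retry_count, self.max_retry_count) since the surrounding code is not quoted here:

```lua
-- When a batch has exhausted its retries it is dropped and freed,
-- so its entries must still count as processed; otherwise
-- total_pushed_entries - processed_entries never shrinks.
if batch.retry_count >= self.max_retry_count then
    core.log.warn("batch processor exceeded max_retry_count, dropping batch")
    self.processed_entries = self.processed_entries + #batch.entries
    return
end
```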
Can we add test cases for this feature? Start a mock TCP server that never reads data from the connected socket; this will accumulate pending log entries in APISIX, and we can set max_pending_entries to a small value to confirm the discard code is working.
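A minimal sketch of such a mock server, assuming LuaSocket is available (the real test would live in APISIX's t/ test suite): it accepts connections but never reads from them, so sent data backs up and pending entries accumulate:

```lua
-- mock sink: accept TCP connections but never read from them,
-- so the sender's entries pile up as pending in the batch processor
local socket = require("socket")  -- LuaSocket

local server = assert(socket.bind("127.0.0.1", 1999))
local clients = {}

while true do
    local client = server:accept()
    -- deliberately never call client:receive(); keep a reference
    -- so the connection stays open without being drained
    table.insert(clients, client)
end
```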
In this PR, I think we only need to update one plugin, kafka-logger; that should be enough. We can change the other plugins later via separate PRs.
```lua
function _M:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
    if max_pending_entries and
       self.total_pushed_entries - total_processed_entries(self) > max_pending_entries then
        core.log.error("max pending entries limit exceeded. discarding entry")
```
We need to print more useful information here, e.g. max_pending_entries and self.total_pushed_entries - total_processed_entries(self).
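A sketch of what the suggested log line could look like (the exact wording is an assumption; core.log.error concatenates its arguments):

```lua
core.log.error("max pending entries limit exceeded, discarding entry. ",
               "pending: ", self.total_pushed_entries - total_processed_entries(self),
               ", max_pending_entries: ", max_pending_entries)
```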
```lua
if max_pending_entries then
    local total_processed_entries = total_processed_entries(self)
    if self.total_pushed_entries - total_processed_entries > max_pending_entries then
        core.log.error("max pending entries limit exceeded. discarding entry.",
```
```lua
if max_pending_entries then
    local total_processed_entries = total_processed_entries(self)
    if self.total_pushed_entries - total_processed_entries > max_pending_entries then
        core.log.error("max pending entries limit exceeded. discarding entry.",
```
ditto
apisix/plugins/kafka-logger.lua (Outdated)
```lua
max_pending_entries = {
    type = "integer",
    description = "maximum number of pending entries in the batch processor",
    minimum = 0,
```
The minimum here should be 1; 0 is meaningless for an APISIX user.
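The suggested schema change, as a sketch:

```lua
max_pending_entries = {
    type = "integer",
    description = "maximum number of pending entries in the batch processor",
    minimum = 1,
},
```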
LGTM, except one minor issue
Fixes #
When the Kafka server is slow or unavailable, logger objects keep accumulating faster than they are released, causing a memory spike. The solution is to implement a limit in the batch processor manager that allows it to drop new entries pushed to it if a given number of callbacks have not yet been processed.
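With this change, the new option would sit alongside the other batch-processor options in the plugin config; a sketch, with an arbitrary threshold and placeholder broker address:

```yaml
kafka-logger:
  brokers:
    - host: 127.0.0.1   # placeholder broker address
      port: 9092
  kafka_topic: test2
  batch_max_size: 80
  max_pending_entries: 10000   # arbitrary example; new entries beyond this are dropped
```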
Reproduction steps

1. Configure kafka-logger on a route:

```yaml
kafka-logger:
  batch_max_size: 80
  brokers:
    port: 9092
  buffer_duration: 180
  cluster_name: 1
  inactive_timeout: 5
  kafka_topic: test2
  max_retry_count: 1
  meta_format: default
  meta_refresh_interval: 30
  name: service kafka logger
  producer_batch_num: 200
  producer_batch_size: 104857600
  producer_max_buffering: 50000
  producer_time_linger: 1
  producer_type: async
  required_acks: 1
  retry_delay: 1
  timeout: 5
```

2. Throttle the Kafka container so it cannot keep up with the log volume:

```shell
sudo docker update --cpus 0.1 apisix_kafka
```

3. Generate load so entries are pushed faster than they are flushed:

```shell
wrk2 -c 200 -d 60 -t 4 -R 50000 http://xxxxx/test
```