input: input_chunk: Handle flow with events metrics per window#11698
cosmo0920 wants to merge 8 commits
📝 Walkthrough
Adds a rate-gate feature for inputs: per-window ingestion accounting, rate-based pausing/resuming with hysteresis and optional backpressure, CMetrics gauges for rates and gate state, public rate-control APIs, and integration + unit tests exercising burst, fanout, filesystem, and memrb scenarios.
Sequence Diagram(s)
sequenceDiagram
participant Chunk as Input Chunk
participant Input as Input Instance
participant RateGate as Rate-Gate Control
participant Metrics as CMetrics
participant Pauser as Pause/Resume Handler
Chunk->>Input: flb_input_rate_update(timestamp, records, bytes)
Input->>RateGate: accumulate window counts
alt window elapsed
RateGate->>RateGate: compute per-second rates
RateGate->>Metrics: publish rate gauges
end
Chunk->>Input: flb_input_chunk_protect()
Input->>RateGate: flb_input_rate_gate_protect()
RateGate->>RateGate: sample backpressure (busy chunks, retries)
RateGate->>RateGate: evaluate limits
alt limits exceeded && backpressure enabled
RateGate->>Input: set rate_gate_status = PAUSED
RateGate->>Metrics: update gate gauges
RateGate-->>Input: return FLB_TRUE (pause)
else within limits
RateGate-->>Input: return FLB_FALSE
end
Chunk->>Input: flb_input_chunk_set_limits()
Input->>RateGate: flb_input_rate_gate_maybe_resume()
RateGate->>RateGate: apply resume_ratio hysteresis
alt rates below resume threshold
RateGate->>Input: set rate_gate_status = RUNNING
RateGate->>Pauser: signal resume (flb_input_resume)
RateGate->>Metrics: update gate gauges
end
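The first leg of the diagram (accumulate window counts, then compute per-second rates once the window has elapsed) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from this PR: `struct rate_window`, `rate_window_update()` and `NSEC_IN_SEC` are hypothetical names, and the choice to reset the counters on rollover is an assumption.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NSEC_IN_SEC 1000000000ULL

/* Hypothetical window state; not the real flb_input_instance fields. */
struct rate_window {
    uint64_t window_start;  /* ns timestamp at which the window opened */
    uint64_t window_size;   /* window length in ns */
    size_t   records;       /* records accumulated in the open window */
    size_t   bytes;         /* bytes accumulated in the open window */
    double   rate_records;  /* last computed records per second */
    double   rate_bytes;    /* last computed bytes per second */
};

/*
 * Add one append to the open window. Returns 1 when the window has
 * elapsed and the per-second rates were recomputed, 0 otherwise.
 */
static int rate_window_update(struct rate_window *w, uint64_t now,
                              size_t records, size_t bytes)
{
    uint64_t elapsed;
    double seconds;

    assert(w != NULL && w->window_size > 0);

    w->records += records;
    w->bytes += bytes;

    /* Treat a timestamp older than the window start as zero elapsed time. */
    elapsed = (now >= w->window_start) ? (now - w->window_start) : 0;
    if (elapsed < w->window_size) {
        return 0;
    }

    /* Window elapsed: convert the totals into per-second rates. */
    seconds = (double) elapsed / (double) NSEC_IN_SEC;
    w->rate_records = (double) w->records / seconds;
    w->rate_bytes = (double) w->bytes / seconds;

    /* Assumption: open a fresh window starting at the current timestamp. */
    w->window_start = now;
    w->records = 0;
    w->bytes = 0;
    return 1;
}
```

Dividing by the elapsed time rather than the nominal window size keeps the rate meaningful when updates arrive late, which is one plausible reading of "compute per-second rates" in the diagram.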
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
1ba07dc to 28ad2e6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)
2451-2497: ⚠️ Potential issue | 🟠 Major
The new rate-gate guard creates a circular wait.
These resume branches now require `rate_gate_status == FLB_INPUT_RUNNING`, but `flb_input_rate_gate_maybe_resume()` only clears the rate gate after `mem_buf_status` and `storage_buf_status` are already running. In a mixed-pause state, neither side can transition first, so scheduler-driven recovery stalls.
Suggested fix
-        in->mem_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->mem_buf_status == FLB_INPUT_PAUSED) {
         in->mem_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (mem buf overlimit - buf size %zuB now below limit %zuB)",
                      flb_input_name(in), in->mem_chunks_size, in->mem_buf_limit);
         }
     }
@@
-        in->storage_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->storage_buf_status == FLB_INPUT_PAUSED) {
         in->storage_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
                      flb_input_name(in),
                      ((struct flb_storage_input *) in->storage)->cio->total_chunks_up,
                      ((struct flb_storage_input *) in->storage)->cio->max_chunks_up);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 2451 - 2497, The new guard requiring rate_gate_status == FLB_INPUT_RUNNING causes a circular wait between the mem/storage resume checks and the rate-gate clearance: call flb_input_rate_gate_maybe_resume() before the memory/storage resume blocks (or alternatively remove the rate_gate_status == FLB_INPUT_RUNNING check from the mem/storage resume branches) so the rate gate can clear first and allow flb_input_chunk_is_mem_overlimit, flb_input_chunk_is_storage_overlimit and subsequent flb_input_resume() calls to proceed; update the logic around in->rate_gate_status, flb_input_rate_gate_maybe_resume(), flb_input_resume(), flb_input_paused(), and the two checks that use rate_gate_status to break the deadlock.
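The circular wait described in this finding can be reproduced with a toy state machine. The sketch below is a simplified model rather than Fluent Bit code: `struct input_state` and the three functions are hypothetical, only the memory buffer is modeled (the storage buffer is analogous), and a counter stands in for the `flb_input_resume()` callback.

```c
#include <assert.h>

enum status { RUNNING = 0, PAUSED = 1 };

/* Hypothetical, reduced view of an input instance. */
struct input_state {
    enum status mem_buf_status;
    enum status rate_gate_status;
    int mem_over_limit;   /* 1 while buffered memory exceeds its limit */
    int rate_over_limit;  /* 1 while the rate exceeds the resume threshold */
    int resumed;          /* number of simulated resume callbacks */
};

/* Per the review: the gate clears only once the buffer status is running. */
static void rate_gate_maybe_resume(struct input_state *in)
{
    if (in->rate_gate_status == PAUSED && !in->rate_over_limit &&
        in->mem_buf_status == RUNNING) {
        in->rate_gate_status = RUNNING;
        in->resumed++;
    }
}

/* Flagged variant: the status flip itself waits on the rate gate. */
static void mem_resume_guarded(struct input_state *in)
{
    if (!in->mem_over_limit && in->mem_buf_status == PAUSED &&
        in->rate_gate_status == RUNNING) {
        in->mem_buf_status = RUNNING;
        in->resumed++;
    }
}

/* Suggested variant: always flip the status, gate only the callback. */
static void mem_resume_fixed(struct input_state *in)
{
    if (!in->mem_over_limit && in->mem_buf_status == PAUSED) {
        in->mem_buf_status = RUNNING;
        if (in->rate_gate_status == RUNNING) {
            in->resumed++;
        }
    }
}
```

With both statuses paused and both limits cleared, repeated calls to the guarded pair never change state, while the fixed variant lets the buffer status clear first so the gate can follow and issue a single resume.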
1103-1116: ⚠️ Potential issue | 🟠 Major
Don't count restored backlog as fresh ingress.
`flb_input_chunk_map()` marks these chunks as `fs_backlog`, so feeding them into `flb_input_rate_update()` makes old filesystem backlog count against the live rate gate. A large backlog can pause the input on startup/rollout before any new events arrive.
Suggested fix
- flb_input_rate_update(in, ts, ic->total_records, buf_size);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 1103 - 1116, The code is updating input rate metrics for chunks restored from disk, which were marked as fs_backlog by flb_input_chunk_map(), causing old backlog to be counted as fresh ingress; change the logic so flb_input_rate_update() is not called for fs_backlog chunks (check the chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding indicator set by flb_input_chunk_map()) while retaining the counters if desired—wrap the flb_input_rate_update(in, ts, ic->total_records, buf_size) call in a conditional that skips it when the chunk is a filesystem backlog.
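The conditional suggested in this prompt might look like the following sketch. It assumes a boolean backlog flag on the chunk, as the finding suggests; `struct chunk`, `struct window_counters` and `account_chunk()` are hypothetical stand-ins and do not mirror the real `flb_input_chunk` layout or the `flb_input_rate_update()` signature.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical chunk view: only the fields this sketch needs. */
struct chunk {
    int    fs_backlog;     /* non-zero when restored from the filesystem */
    size_t total_records;
    size_t bytes;
};

/* Hypothetical per-window counters feeding the rate gate. */
struct window_counters {
    size_t records;
    size_t bytes;
};

/* Count a chunk toward the live rate window unless it is restored backlog. */
static void account_chunk(struct window_counters *c, const struct chunk *ic)
{
    assert(c != NULL && ic != NULL);

    if (ic->fs_backlog) {
        /* Old data: skip it so startup backlog cannot trip the gate. */
        return;
    }

    c->records += ic->total_records;
    c->bytes += ic->bytes;
}
```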
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)
326-335: Add a backlog-vs-live rate-gate regression.
The new accounting touches both live appends and chunk restoration, but this suite never exercises an `fs_backlog` chunk being mapped before live traffic. A focused case here would catch startup/filesystem false positives.
Based on learnings, "Applies to tests/internal/**/*.{c,h} : Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/input_rate_gate.c` around lines 326 - 335, Add a regression test that exercises an fs_backlog chunk being mapped before any live appends to ensure rate-gate accounting treats backlog vs live correctly: implement a new test function (e.g., test_rate_gate_backlog_vs_live) that maps/ restores a backlog chunk into the filesystem, then starts live append traffic and asserts the rate-gate metrics/state (pause/resume/hysteresis or counters) reflect both backlog and live paths without falsely throttling new live writes; add this test symbol to TEST_LIST (alongside existing test_rate_gate_* entries) so the suite runs it and ensure the test covers mixed signals (backlog mapping then live ingestion) and verifies parity between backlog accounting and live append accounting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1b590a09-ca8b-412f-a77d-f630e579ae40
📒 Files selected for processing (12)
- include/fluent-bit/flb_input.h
- src/flb_input.c
- src/flb_input_chunk.c
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
- tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
- tests/internal/CMakeLists.txt
- tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (7)
- tests/internal/CMakeLists.txt
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
b935e4b to 03c0db8
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)
233-266: Test relies on implicit default values for resume conditions.
Looking at `flb_input_rate_gate_maybe_resume()` in `src/flb_input.c`, the resume check requires multiple conditions beyond just the rate threshold:
if (can_resume == FLB_TRUE &&
    ins->mem_buf_status == FLB_INPUT_RUNNING &&
    ins->storage_buf_status == FLB_INPUT_RUNNING &&
    ins->config->is_running == FLB_TRUE &&
    ins->config->is_ingestion_active == FLB_TRUE) {
The test at lines 262-263 expects `rate_gate_status` to become `FLB_INPUT_RUNNING`, but doesn't explicitly set `mem_buf_status`, `storage_buf_status`, `config->is_running`, or `config->is_ingestion_active`. This relies on default initialization values being correct.
For test robustness and clarity, consider explicitly setting these preconditions:
🔧 Suggested improvement for explicit preconditions
 ctx.input->rate_gate_max_bytes = 100;
 ctx.input->rate_gate_resume_ratio = 0.80;
 ctx.input->rate_window_start = cfl_time_now();
 ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC;
+ctx.input->mem_buf_status = FLB_INPUT_RUNNING;
+ctx.input->storage_buf_status = FLB_INPUT_RUNNING;
+ctx.config->is_running = FLB_TRUE;
+ctx.config->is_ingestion_active = FLB_TRUE;
 ctx.input->rate_bytes = 110.0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/input_rate_gate.c` around lines 233 - 266, The test test_rate_gate_hysteresis_resume relies on implicit defaults when calling flb_input_rate_gate_maybe_resume; explicitly set the preconditions used in the resume check: set ctx.input->mem_buf_status = FLB_INPUT_RUNNING, ctx.input->storage_buf_status = FLB_INPUT_RUNNING, ctx.input->config->is_running = FLB_TRUE, and ctx.input->config->is_ingestion_active = FLB_TRUE before calling flb_input_rate_gate_maybe_resume so the resume path in flb_input_rate_gate_maybe_resume() can be exercised deterministically.
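To show why the explicit preconditions matter, here is a reduced model of a hysteresis resume check. It is a sketch under assumptions, not the body of `flb_input_rate_gate_maybe_resume()`: `struct gate` and `gate_maybe_resume()` are hypothetical, and resuming when the rate is at or below `max_bytes * resume_ratio` is an assumed reading of the hysteresis rule.

```c
#include <assert.h>
#include <stddef.h>

#define GATE_RUNNING 0
#define GATE_PAUSED  1

/* Hypothetical, flattened stand-in for the input and config state. */
struct gate {
    int    rate_gate_status;
    int    mem_buf_status;
    int    storage_buf_status;
    int    is_running;
    int    is_ingestion_active;
    size_t max_bytes;      /* configured bytes-per-second limit */
    double resume_ratio;   /* fraction of the limit required to resume */
    double rate_bytes;     /* last measured bytes per second */
};

/* Returns 1 if the gate moved from paused to running, 0 otherwise. */
static int gate_maybe_resume(struct gate *g)
{
    double threshold;

    assert(g != NULL && g->resume_ratio > 0.0 && g->resume_ratio <= 1.0);

    if (g->rate_gate_status != GATE_PAUSED) {
        return 0;
    }

    /* Hysteresis: the rate must fall below a fraction of the limit. */
    threshold = (double) g->max_bytes * g->resume_ratio;
    if (g->rate_bytes > threshold) {
        return 0;
    }

    /* Every other precondition must hold before the gate may clear. */
    if (g->mem_buf_status != GATE_RUNNING ||
        g->storage_buf_status != GATE_RUNNING ||
        !g->is_running || !g->is_ingestion_active) {
        return 0;
    }

    g->rate_gate_status = GATE_RUNNING;
    return 1;
}
```

In this model a test that drops the rate below the threshold still fails to resume if any of the other fields happens to be initialized to a paused or inactive value, which is the fragility the comment points at.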
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 1116-1118: The mapped-chunk path never triggers
flb_input_rate_update because ic->fs_backlog is left set by
flb_input_chunk_map(); to fix, ensure mapped/restored chunks get the same
rate-gate accounting: either clear ic->fs_backlog (set to FLB_FALSE) after
flb_input_chunk_map() when the chunk is restored into the active route, or
explicitly call flb_input_rate_update(in, ts, ic->total_records, buf_size) for
mapped chunks before this conditional; locate flb_input_chunk_map(), the
ic->fs_backlog field, and the flb_input_rate_update(...) call to implement the
chosen change so restored chunks preserve route state and accounting parity with
live-ingested chunks.
In `@src/flb_input.c`:
- Around line 981-987: The code currently parses rate_gate.max_records using
atoi (in the prop_key_check(...) branch) which silently accepts malformed
strings; replace atoi with strtol to strictly validate the entire tmp string:
call strtol(tmp, &endptr, 10), check errno for ERANGE, ensure endptr points to
the terminating NUL (no leftover characters), and verify the parsed value is >=
0 and fits into size_t (reject negatives and out-of-range values); on any
validation failure free tmp with flb_sds_destroy(tmp) and return -1, otherwise
assign ins->rate_gate_max_records = (size_t) parsed_value.
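The strict validation described for `rate_gate.max_records` can be sketched with standard `strtol` semantics. The helper name `parse_size_strict()` is hypothetical and the sketch leaves out the `flb_sds_destroy(tmp)` cleanup, which belongs to the caller in the reviewed code.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical helper: parse a non-negative base-10 integer into *out.
 * Returns 0 on success and -1 on empty, malformed, negative or
 * out-of-range input. Leading whitespace is still accepted because
 * strtol() skips it.
 */
static int parse_size_strict(const char *str, size_t *out)
{
    char *end = NULL;
    long val;

    assert(out != NULL);

    if (str == NULL || *str == '\0') {
        return -1;
    }

    errno = 0;
    val = strtol(str, &end, 10);

    if (errno == ERANGE) {
        return -1;  /* value does not fit in a long */
    }
    if (end == str || *end != '\0') {
        return -1;  /* no digits consumed, or trailing characters */
    }
    if (val < 0) {
        return -1;  /* negative limits are rejected */
    }
    if ((uintmax_t) val > (uintmax_t) SIZE_MAX) {
        return -1;  /* guard for platforms where long is wider than size_t */
    }

    *out = (size_t) val;
    return 0;
}
```

Unlike `atoi`, which returns 100 for `"100abc"` and 0 for `"abc"`, this rejects both, so a typo in the configuration surfaces as an error instead of a silently wrong limit.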
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
This branch adds hysteresis-based resume control for input rate gating (rate_gate.resume_ratio), centralizes backpressure-adjusted effective limit computation, and applies those limits consistently to pause/resume decisions. It also fixes filesystem-mode enforcement gaps and adds periodic scheduler-driven resume checks so paused inputs can recover without requiring new ingestion events.
Testing
Includes expanded internal tests for parsing/window/hysteresis/pause-resume safety and updated config_rate_gate integration scenarios for fanout+retry and rollout paths across memrb/filesystem cases.
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
Using the integration tests' configurations, there are no memory leaks:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test label to test for all targets (requires maintainer to do).
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.