Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions lib/logsource.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@

gboolean accurate_nanosleep = FALSE;

/* Batch source wakeups: only re-arm the source once this many slots have
* been freed up in the flow-control window since it ran out. Amortizes
* wakeup overhead under high message rates. Clamped to the source's
* initial_window_size at use, so small windows still get woken. */
#define LOG_SOURCE_WAKEUP_BATCH_SIZE 100

void
log_source_wakeup(LogSource *self)
{
Expand Down Expand Up @@ -77,6 +83,7 @@ _flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment,
window_size_increment = log_source_gather_dynamic_window_reclamation(self, window_size_increment);

gsize old_window_size = _window_size_add(self, window_size_increment, &suspended);
gsize new_window_size = old_window_size + window_size_increment;

msg_diagnostics("Window size adjustment",
evt_tag_int("old_window_size", old_window_size),
Expand All @@ -87,7 +94,22 @@ _flow_control_window_size_adjust(LogSource *self, guint32 window_size_increment,
gboolean need_to_resume_counter = !last_ack_type_is_suspended && suspended;
if (need_to_resume_counter)
window_size_counter_resume(&self->window_size);
if ((window_size_increment != 0 && old_window_size == 0) || need_to_resume_counter)

/* Wake the source on one of two transitions:
* - the explicit destination-side suspend bit is being lifted (back-pressure
* released — old_window_size > 0 distinguishes this from the natural drain
* case where _is_suspended() was true only because the counter hit zero);
*
* - enough slots have accumulated past the batching threshold since the
* window last ran out. Threshold is clamped to a quarter of
* initial_window_size so sources with small windows still wake.
*/

gboolean threshold_crossed = window_size_increment != 0 &&
old_window_size <= self->wakeup_threshold &&
new_window_size > self->wakeup_threshold;
gboolean explicit_suspend_lifted = need_to_resume_counter && old_window_size > 0;
if (threshold_crossed || explicit_suspend_lifted)
Comment thread
bazsi marked this conversation as resolved.
log_source_wakeup(self);
}

Expand Down Expand Up @@ -656,11 +678,12 @@ log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options
static void
_initialize_window(LogSource *self, gint init_window_size)
{
self->window_initialized = TRUE;
window_size_counter_set(&self->window_size, init_window_size);

self->initial_window_size = init_window_size;
self->full_window_size = init_window_size;
self->wakeup_threshold = MIN((gsize) LOG_SOURCE_WAKEUP_BATCH_SIZE, init_window_size / 4);

self->window_initialized = TRUE;
}

static gboolean
Expand Down
1 change: 1 addition & 0 deletions lib/logsource.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct _LogSource
gchar *stats_id;
DynamicWindow dynamic_window;
gsize initial_window_size;
gsize wakeup_threshold;
/* full_window_size = static + dynamic */
gsize full_window_size;
atomic_gssize window_size_to_be_reclaimed;
Expand Down
45 changes: 41 additions & 4 deletions lib/tests/test_logsource.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,28 +309,65 @@ Test(log_source, test_suspend)

Test(log_source, test_wakeup)
{
source_options.init_window_size = 3;
/* Wakeup is batched: source wakes only once at least LOG_SOURCE_WAKEUP_BATCH_SIZE
* or init_window_size/4 slots have freed up since the window ran out. */
source_options.init_window_size = 20;

LogSource *source = test_source_init(&source_options);
TestPipe *next_pipe = test_pipe_init();
log_pipe_append(&source->super, &next_pipe->super);

_post_messages(source, 3);
_post_messages(source, 20);
cr_expect_not(log_source_free_to_send(source));

test_pipe_ack_messages(next_pipe, 1);
cr_assert_eq(((TestSource *) source)->wakeup_count, 0);
cr_expect(log_source_free_to_send(source));

test_pipe_ack_messages(next_pipe, 1);
cr_assert_eq(((TestSource *) source)->wakeup_count, 0);
cr_expect(log_source_free_to_send(source));

/* crossing the wakeup threshold, which is 20/4 == 5 in our case */
test_pipe_ack_messages(next_pipe, 4);
cr_assert_eq(((TestSource *) source)->wakeup_count, 1);
cr_expect(log_source_free_to_send(source));

_post_messages(source, 1);
test_pipe_ack_messages(next_pipe, 14);
cr_assert_eq(((TestSource *) source)->wakeup_count, 1);
cr_expect(log_source_free_to_send(source));

_post_messages(source, 20);
cr_expect_not(log_source_free_to_send(source));

test_pipe_ack_messages(next_pipe, 3);
test_pipe_ack_messages(next_pipe, 20);
cr_assert_eq(((TestSource *) source)->wakeup_count, 2);

test_pipe_destroy(next_pipe);
test_source_destroy(source);
}

Test(log_source, test_wakeup_small_window)
{
source_options.init_window_size = 3;

LogSource *source = test_source_init(&source_options);
TestPipe *next_pipe = test_pipe_init();
log_pipe_append(&source->super, &next_pipe->super);

_post_messages(source, 3);
cr_expect_not(log_source_free_to_send(source));

test_pipe_ack_messages(next_pipe, 2);
cr_assert_eq(((TestSource *) source)->wakeup_count, 1);

test_pipe_ack_messages(next_pipe, 1);
cr_assert_eq(((TestSource *) source)->wakeup_count, 1);

test_pipe_destroy(next_pipe);
test_source_destroy(source);
}

Test(log_source, test_forced_suspend_and_wakeup)
{
LogSource *source = test_source_init(&source_options);
Expand Down
Loading