Skip to content

Commit 13d0997

Browse files
authored
Merge pull request syslog-ng#4943 from bazsi/filterx-refactor-eval-context-management
Filterx refactor eval context management
2 parents 0f51c01 + 831316e commit 13d0997

16 files changed

+169
-80
lines changed

lib/driver.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ static void
142142
log_driver_init_instance(LogDriver *self, GlobalConfig *cfg)
143143
{
144144
log_pipe_init_instance(&self->super, cfg);
145-
self->super.flags |= PIF_CONFIG_RELATED;
145+
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
146146
self->super.free_fn = log_driver_free;
147147
self->super.pre_init = log_driver_pre_init_method;
148148
self->super.init = log_driver_init_method;

lib/filter/filter-pipe.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ log_filter_pipe_new(FilterExprNode *expr, GlobalConfig *cfg)
122122
LogFilterPipe *self = g_new0(LogFilterPipe, 1);
123123

124124
log_pipe_init_instance(&self->super, cfg);
125-
self->super.flags |= PIF_CONFIG_RELATED;
125+
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
126126
self->super.init = log_filter_pipe_init;
127127
self->super.queue = log_filter_pipe_queue;
128128
self->super.free_fn = log_filter_pipe_free;

lib/filterx/filterx-eval.c

+65-11
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
*/
2323
#include "filterx/filterx-eval.h"
2424
#include "filterx/filterx-expr.h"
25+
#include "logpipe.h"
2526
#include "scratch-buffers.h"
2627
#include "tls-support.h"
2728

@@ -112,6 +113,33 @@ filterx_format_last_error(void)
112113
buf ? buf->str : "");
113114
}
114115

116+
117+
/*
118+
* This is not a real weakref implementation as we will never get rid off
119+
* weak references until the very end of a scope. If this wasn't the case
120+
* we would have to:
121+
* 1) run a proper GC
122+
* 2) notify weak references once the object is detroyed
123+
*
124+
* None of that exists now and I doubt ever will (but never say never).
125+
* Right now a weak ref is destroyed as a part of the scope finalization
126+
* process at which point circular references will be broken so the rest can
127+
* go too.
128+
*/
129+
void
130+
filterx_eval_store_weak_ref(FilterXObject *object)
131+
{
132+
FilterXEvalContext *context = filterx_eval_get_context();
133+
134+
if (object && !object->weak_referenced)
135+
{
136+
/* avoid putting object to the list multiple times */
137+
object->weak_referenced = TRUE;
138+
g_ptr_array_add(context->weak_refs, filterx_object_ref(object));
139+
}
140+
}
141+
142+
115143
static gboolean
116144
_evaluate_statement(FilterXExpr *expr)
117145
{
@@ -157,16 +185,10 @@ _evaluate_statement(FilterXExpr *expr)
157185

158186

159187
gboolean
160-
filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg)
188+
filterx_eval_exec_statements(FilterXEvalContext *context, GList *statements, LogMessage *msg)
161189
{
162-
FilterXEvalContext local_context =
163-
{
164-
.msgs = &msg,
165-
.num_msg = 1,
166-
.template_eval_options = &DEFAULT_TEMPLATE_EVAL_OPTIONS,
167-
.scope = scope,
168-
};
169-
filterx_eval_set_context(&local_context);
190+
context->msgs = &msg;
191+
context->num_msg = 1;
170192
gboolean success = FALSE;
171193
for (GList *l = statements; l; l = l->next)
172194
{
@@ -179,7 +201,39 @@ filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage
179201
/* NOTE: we only store the results into the message if the entire evaluation was successful */
180202
success = TRUE;
181203
fail:
182-
filterx_scope_set_dirty(scope);
183-
filterx_eval_set_context(NULL);
204+
filterx_scope_set_dirty(context->scope);
184205
return success;
185206
}
207+
208+
void
209+
filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context)
210+
{
211+
FilterXScope *scope;
212+
213+
if (previous_context)
214+
scope = filterx_scope_ref(previous_context->scope);
215+
else
216+
scope = filterx_scope_new();
217+
filterx_scope_make_writable(&scope);
218+
219+
memset(context, 0, sizeof(*context));
220+
context->template_eval_options = &DEFAULT_TEMPLATE_EVAL_OPTIONS;
221+
context->scope = scope;
222+
223+
if (previous_context)
224+
context->weak_refs = previous_context->weak_refs;
225+
else
226+
context->weak_refs = g_ptr_array_new_with_free_func((GDestroyNotify) filterx_object_unref);
227+
context->previous_context = previous_context;
228+
229+
filterx_eval_set_context(context);
230+
}
231+
232+
void
233+
filterx_eval_deinit_context(FilterXEvalContext *context)
234+
{
235+
if (!context->previous_context)
236+
g_ptr_array_free(context->weak_refs, TRUE);
237+
filterx_scope_unref(context->scope);
238+
filterx_eval_set_context(context->previous_context);
239+
}

lib/filterx/filterx-eval.h

+29-1
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,44 @@ struct _FilterXEvalContext
4242
FilterXScope *scope;
4343
FilterXError error;
4444
LogTemplateEvalOptions *template_eval_options;
45+
GPtrArray *weak_refs;
46+
FilterXEvalContext *previous_context;
4547
};
4648

4749
FilterXEvalContext *filterx_eval_get_context(void);
4850
FilterXScope *filterx_eval_get_scope(void);
4951
void filterx_eval_push_error(const gchar *message, FilterXExpr *expr, FilterXObject *object);
5052
void filterx_eval_set_context(FilterXEvalContext *context);
51-
gboolean filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg);
53+
gboolean filterx_eval_exec_statements(FilterXEvalContext *context, GList *statements, LogMessage *msg);
5254
void filterx_eval_sync_scope_and_message(FilterXScope *scope, LogMessage *msg);
5355
const gchar *filterx_eval_get_last_error(void);
5456
void filterx_eval_clear_errors(void);
5557

58+
void filterx_eval_store_weak_ref(FilterXObject *object);
59+
60+
void filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context);
61+
void filterx_eval_deinit_context(FilterXEvalContext *context);
62+
63+
static inline void
64+
filterx_eval_sync_message(FilterXEvalContext *context, LogMessage **pmsg, const LogPathOptions *path_options)
65+
{
66+
if (!context)
67+
return;
68+
69+
if (!filterx_scope_is_dirty(context->scope))
70+
return;
71+
72+
log_msg_make_writable(pmsg, path_options);
73+
filterx_scope_sync(context->scope, *pmsg);
74+
}
75+
76+
static inline void
77+
filterx_eval_prepare_for_fork(FilterXEvalContext *context, LogMessage **pmsg, const LogPathOptions *path_options)
78+
{
79+
filterx_eval_sync_message(context, pmsg, path_options);
80+
if (context)
81+
filterx_scope_write_protect(context->scope);
82+
log_msg_write_protect(*pmsg);
83+
}
5684

5785
#endif

lib/filterx/filterx-object.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ struct _FilterXObject
8787
* propagates to the inner elements lazily
8888
*
8989
*/
90-
guint thread_index:16, modified_in_place:1, readonly:1;
90+
guint thread_index:16, modified_in_place:1, readonly:1, weak_referenced:1;
9191
FilterXType *type;
9292
};
9393

lib/filterx/filterx-pipe.c

+7-13
Original file line numberDiff line numberDiff line change
@@ -49,35 +49,29 @@ static void
4949
log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
5050
{
5151
LogFilterXPipe *self = (LogFilterXPipe *) s;
52+
FilterXEvalContext eval_context;
5253
LogPathOptions local_path_options;
5354
gboolean res;
5455

5556
path_options = log_path_options_chain(&local_path_options, path_options);
56-
57-
FilterXScope *scope = filterx_scope_ref(path_options->filterx_scope);
58-
if (!scope)
59-
scope = filterx_scope_new();
60-
filterx_scope_make_writable(&scope);
57+
filterx_eval_init_context(&eval_context, path_options->filterx_context);
6158

6259
msg_trace(">>>>>> filterx rule evaluation begin",
6360
evt_tag_str("rule", self->name),
6461
log_pipe_location_tag(s),
65-
evt_tag_printf("path_scope", "%p", path_options->filterx_scope),
66-
evt_tag_printf("scope", "%p", scope),
6762
evt_tag_msg_reference(msg));
6863

6964
NVTable *payload = nv_table_ref(msg->payload);
70-
res = filterx_eval_exec_statements(scope, self->stmts, msg);
65+
res = filterx_eval_exec_statements(&eval_context, self->stmts, msg);
7166

7267
msg_trace("<<<<<< filterx rule evaluation result",
7368
evt_tag_str("result", res ? "matched" : "unmatched"),
7469
evt_tag_str("rule", self->name),
7570
log_pipe_location_tag(s),
76-
evt_tag_printf("scope", "%p", scope),
77-
evt_tag_int("dirty", filterx_scope_is_dirty(scope)),
71+
evt_tag_int("dirty", filterx_scope_is_dirty(eval_context.scope)),
7872
evt_tag_msg_reference(msg));
7973

80-
local_path_options.filterx_scope = scope;
74+
local_path_options.filterx_context = &eval_context;
8175
if (res)
8276
{
8377
log_pipe_forward_msg(s, msg, path_options);
@@ -89,7 +83,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o
8983
log_msg_drop(msg, path_options, AT_PROCESSED);
9084
}
9185

92-
filterx_scope_unref(scope);
86+
filterx_eval_deinit_context(&eval_context);
9387
nv_table_unref(payload);
9488
}
9589

@@ -120,7 +114,7 @@ log_filterx_pipe_new(GList *stmts, GlobalConfig *cfg)
120114
LogFilterXPipe *self = g_new0(LogFilterXPipe, 1);
121115

122116
log_pipe_init_instance(&self->super, cfg);
123-
self->super.flags = (self->super.flags | PIF_CONFIG_RELATED) & ~PIF_SYNC_SCOPE;
117+
self->super.flags = (self->super.flags | PIF_CONFIG_RELATED);
124118
self->super.init = log_filterx_pipe_init;
125119
self->super.queue = log_filterx_pipe_queue;
126120
self->super.free_fn = log_filterx_pipe_free;

lib/filterx/filterx-scope.c

-13
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ struct _FilterXScope
102102
{
103103
GAtomicCounter ref_cnt;
104104
GArray *variables;
105-
GPtrArray *weak_refs;
106105
gboolean write_protected;
107106
gboolean dirty;
108107
gint generation;
@@ -215,16 +214,6 @@ filterx_scope_register_variable(FilterXScope *self,
215214
return &g_array_index(self->variables, FilterXVariable, v_index);
216215
}
217216

218-
219-
void
220-
filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object)
221-
{
222-
g_assert(self->write_protected == FALSE);
223-
224-
if (object)
225-
g_ptr_array_add(self->weak_refs, filterx_object_ref(object));
226-
}
227-
228217
/*
229218
* 1) sync objects to message
230219
* 2) drop undeclared objects
@@ -307,7 +296,6 @@ filterx_scope_new(void)
307296
g_atomic_counter_set(&self->ref_cnt, 1);
308297
self->variables = g_array_sized_new(FALSE, TRUE, sizeof(FilterXVariable), 16);
309298
g_array_set_clear_func(self->variables, (GDestroyNotify) _variable_free);
310-
self->weak_refs = g_ptr_array_new_with_free_func((GDestroyNotify) filterx_object_unref);
311299
return self;
312300
}
313301

@@ -371,7 +359,6 @@ static void
371359
_free(FilterXScope *self)
372360
{
373361
g_array_free(self->variables, TRUE);
374-
g_ptr_array_free(self->weak_refs, TRUE);
375362
g_free(self);
376363
}
377364

lib/filterx/filterx-scope.h

-3
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ FilterXVariable *filterx_scope_register_variable(FilterXScope *self,
6666
FilterXVariableHandle handle,
6767
FilterXObject *initial_value);
6868

69-
void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object);
70-
71-
7269
/* copy on write */
7370
void filterx_scope_write_protect(FilterXScope *self);
7471
FilterXScope *filterx_scope_make_writable(FilterXScope **pself);

lib/filterx/filterx-weakrefs.c

+1-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@
5454
void
5555
filterx_weakref_set(FilterXWeakRef *self, FilterXObject *object)
5656
{
57-
FilterXScope *scope = filterx_eval_get_scope();
58-
if (scope)
59-
filterx_scope_store_weak_ref(scope, object);
57+
filterx_eval_store_weak_ref(object);
6058
self->object = object;
6159
}
6260

lib/filterx/object-json.c

+1-3
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,7 @@ filterx_json_convert_json_to_object_cached(FilterXObject *self, FilterXWeakRef *
131131
void
132132
filterx_json_associate_cached_object(struct json_object *jso, FilterXObject *filterx_obj)
133133
{
134-
FilterXScope *scope = filterx_eval_get_scope();
135-
136-
filterx_scope_store_weak_ref(scope, filterx_obj);
134+
filterx_eval_store_weak_ref(filterx_obj);
137135

138136
/* we are not storing a reference in userdata to avoid circular
139137
* references. That ref is retained by the filterx_scope_store_weak_ref()

lib/logmpx.c

+6-8
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,12 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op
8888
log_path_options_push_junction(&local_path_options, &matched, path_options);
8989
if (_has_multiple_arcs(self))
9090
{
91-
if (path_options->filterx_scope)
92-
{
93-
log_msg_make_writable(&msg, path_options);
94-
filterx_scope_sync(path_options->filterx_scope, msg);
95-
filterx_scope_write_protect(path_options->filterx_scope);
96-
}
97-
log_msg_write_protect(msg);
91+
/* if we are delivering to multiple branches, we need to sync the
92+
* filterx state with our message and also need to make everything
93+
* write protected so that changes in those branches don't overwrite
94+
* data we still need */
95+
96+
filterx_eval_prepare_for_fork(path_options->filterx_context, &msg, path_options);
9897
}
9998
for (fallback = 0; (fallback == 0) || (fallback == 1 && self->fallback_exists && !delivered); fallback++)
10099
{
@@ -220,7 +219,6 @@ log_multiplexer_new(GlobalConfig *cfg)
220219
LogMultiplexer *self = g_new0(LogMultiplexer, 1);
221220

222221
log_pipe_init_instance(&self->super, cfg);
223-
self->super.flags = self->super.flags & ~PIF_SYNC_SCOPE;
224222
self->super.init = log_multiplexer_init;
225223
self->super.deinit = log_multiplexer_deinit;
226224
self->super.queue = log_multiplexer_queue;

lib/logpipe.c

-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ log_pipe_init_instance(LogPipe *self, GlobalConfig *cfg)
8282
self->queue = NULL;
8383
self->free_fn = log_pipe_free_method;
8484
self->arcs = _arcs;
85-
self->flags = PIF_SYNC_SCOPE;
8685
}
8786

8887
LogPipe *

lib/logpipe.h

+6-10
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
#include "syslog-ng.h"
2929
#include "logmsg/logmsg.h"
30-
#include "filterx/filterx-scope.h"
30+
#include "filterx/filterx-eval.h"
3131
#include "cfg.h"
3232
#include "atomic.h"
3333
#include "messages.h"
@@ -72,7 +72,8 @@
7272
/* node created directly by the user */
7373
#define PIF_CONFIG_RELATED 0x0100
7474

75-
#define PIF_SYNC_SCOPE 0x0200
75+
/* sync filterx state and message in right before calling queue() */
76+
#define PIF_SYNC_FILTERX 0x0200
7677

7778
/* private flags range, to be used by other LogPipe instances for their own purposes */
7879

@@ -218,7 +219,7 @@ struct _LogPathOptions
218219

219220
gboolean *matched;
220221
const LogPathOptions *lpo_parent_junction;
221-
FilterXScope *filterx_scope;
222+
FilterXEvalContext *filterx_context;
222223
};
223224

224225
#define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL }
@@ -458,13 +459,8 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
458459
}
459460
}
460461

461-
if ((s->flags & PIF_SYNC_SCOPE) &&
462-
path_options->filterx_scope &&
463-
filterx_scope_is_dirty(path_options->filterx_scope))
464-
{
465-
log_msg_make_writable(&msg, path_options);
466-
filterx_scope_sync(path_options->filterx_scope, msg);
467-
}
462+
if ((s->flags & PIF_SYNC_FILTERX))
463+
filterx_eval_sync_message(path_options->filterx_context, &msg, path_options);
468464

469465
if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL | PIF_JUNCTION_END | PIF_CONDITIONAL_MIDPOINT)))
470466
{

lib/parser/parser-expr.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void
182182
log_parser_init_instance(LogParser *self, GlobalConfig *cfg)
183183
{
184184
log_pipe_init_instance(&self->super, cfg);
185-
self->super.flags |= PIF_CONFIG_RELATED;
185+
self->super.flags |= PIF_CONFIG_RELATED + PIF_SYNC_FILTERX;
186186
self->super.init = log_parser_init_method;
187187
self->super.deinit = log_parser_deinit_method;
188188
self->super.free_fn = log_parser_free_method;

0 commit comments

Comments
 (0)