Skip to content

Commit 3af0e4e

Browse files
authored
Merge pull request syslog-ng#4849 from bazsi/fix-proto-0-for-framed-syslog
logproto: save aux for buffered data
2 parents 10b021e + f22b2e4 commit 3af0e4e

File tree

1 file changed

+29
-25
lines changed

1 file changed

+29
-25
lines changed

lib/logproto/logproto-framed-server.c

+29-25
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ typedef struct _LogProtoFramedServer
5959
guint32 frame_len;
6060
gboolean half_message_in_buffer;
6161
guint32 fetch_counter;
62+
63+
/* auxiliary data (e.g. GSockAddr, other transport related meta
64+
* data) associated with the already buffered data */
65+
LogTransportAuxData buffer_aux;
6266
} LogProtoFramedServer;
6367

6468
static LogProtoPrepareAction
@@ -90,7 +94,7 @@ log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *tim
9094
*/
9195
static gboolean
9296
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read,
93-
LogTransportAuxData *aux, LogProtoStatus *status)
97+
LogProtoStatus *status)
9498
{
9599
gint rc;
96100
*status = LPS_SUCCESS;
@@ -104,8 +108,9 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
104108
if (self->fetch_counter++ >= MAX_FETCH_COUNT)
105109
return FALSE;
106110

111+
log_transport_aux_data_reinit(&self->buffer_aux);
107112
rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
108-
aux);
113+
&self->buffer_aux);
109114

110115
if (rc < 0)
111116
{
@@ -114,7 +119,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
114119
msg_error("Error reading RFC6587 style framed data",
115120
evt_tag_int("fd", self->super.transport->fd),
116121
evt_tag_error("error"));
117-
log_transport_aux_data_reinit(aux);
118122
*status = LPS_ERROR;
119123
}
120124
else
@@ -129,7 +133,6 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
129133
{
130134
msg_trace("EOF occurred while reading",
131135
evt_tag_int(EVT_TAG_FD, self->super.transport->fd));
132-
log_transport_aux_data_reinit(aux);
133136
*status = LPS_EOF;
134137
return FALSE;
135138
}
@@ -190,8 +193,7 @@ _is_trimmed_part_completely_fetched(LogProtoFramedServer *self)
190193
/* Returns TRUE if successfully finished consuming the data. Otherwise it is not finished, but
191194
* there is nothing left to read (or there was a read error) and expects to be called again. */
192195
static gboolean
193-
_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read,
194-
LogTransportAuxData *aux, LogProtoStatus *status)
196+
_consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
195197
{
196198
/* Since trimming requires a full (buffer sized) message, the consuming
197199
* always starts at the beginning of the buffer, with a new read. */
@@ -200,7 +202,7 @@ _consume_trimmed_part(LogProtoFramedServer *self, gboolean *may_read,
200202

201203
while (1)
202204
{
203-
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
205+
if (!log_proto_framed_server_fetch_data(self, may_read, status))
204206
return FALSE;
205207

206208
if (_is_trimmed_part_completely_fetched(self))
@@ -225,24 +227,23 @@ _ensure_buffer(LogProtoFramedServer *self)
225227
}
226228

227229
static LogProtoFramedServerStateControl
228-
_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
230+
_on_frame_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
229231
{
230-
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
232+
if (!log_proto_framed_server_fetch_data(self, may_read, status))
231233
return LPFSSCTRL_RETURN_WITH_STATUS;
232234

233235
self->state = LPFSS_FRAME_EXTRACT;
234236
return LPFSSCTRL_NEXT_STATE;
235237
}
236238

237239
static LogProtoFramedServerStateControl
238-
_on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProtoStatus *status)
240+
_on_frame_extract(LogProtoFramedServer *self, LogProtoStatus *status)
239241
{
240242
gboolean need_more_data = FALSE;
241243

242244
if (!log_proto_framed_server_extract_frame_length(self, &need_more_data))
243245
{
244246
/* invalid frame header */
245-
log_transport_aux_data_reinit(aux);
246247
*status = LPS_ERROR;
247248
return LPFSSCTRL_RETURN_WITH_STATUS;
248249
}
@@ -270,7 +271,6 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto
270271
msg_error("Incoming frame larger than log_msg_size()",
271272
evt_tag_int("log_msg_size", self->super.options->max_msg_size),
272273
evt_tag_int("frame_length", self->frame_len));
273-
log_transport_aux_data_reinit(aux);
274274
*status = LPS_ERROR;
275275
return LPFSSCTRL_RETURN_WITH_STATUS;
276276
}
@@ -281,9 +281,9 @@ _on_frame_extract(LogProtoFramedServer *self, LogTransportAuxData *aux, LogProto
281281
}
282282

283283
static LogProtoFramedServerStateControl
284-
_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
284+
_on_trim_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
285285
{
286-
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
286+
if (!log_proto_framed_server_fetch_data(self, may_read, status))
287287
return LPFSSCTRL_RETURN_WITH_STATUS;
288288
self->state = LPFSS_TRIM_MESSAGE;
289289

@@ -312,9 +312,9 @@ _on_trim_message(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len,
312312
}
313313

314314
static LogProtoFramedServerStateControl
315-
_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
315+
_on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
316316
{
317-
if (_consume_trimmed_part(self, may_read, aux, status))
317+
if (_consume_trimmed_part(self, may_read, status))
318318
{
319319
self->state = LPFSS_FRAME_EXTRACT;
320320
/* If there is data in the buffer, try to process it immediately.
@@ -330,9 +330,9 @@ _on_consume_trimmed(LogProtoFramedServer *self, gboolean *may_read, LogTransport
330330
}
331331

332332
static LogProtoFramedServerStateControl
333-
_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogTransportAuxData *aux, LogProtoStatus *status)
333+
_on_message_read(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status)
334334
{
335-
if (!log_proto_framed_server_fetch_data(self, may_read, aux, status))
335+
if (!log_proto_framed_server_fetch_data(self, may_read, status))
336336
return LPFSSCTRL_RETURN_WITH_STATUS;
337337

338338
self->state = LPFSS_MESSAGE_EXTRACT;
@@ -364,27 +364,27 @@ _on_message_extract(LogProtoFramedServer *self, const guchar **msg, gsize *msg_l
364364

365365
static LogProtoFramedServerStateControl
366366
_step_state_machine(LogProtoFramedServer *self, const guchar **msg, gsize *msg_len, gboolean *may_read,
367-
LogTransportAuxData *aux, LogProtoStatus *status)
367+
LogProtoStatus *status)
368368
{
369369
switch (self->state)
370370
{
371371
case LPFSS_FRAME_READ:
372-
return _on_frame_read(self, may_read, aux, status);
372+
return _on_frame_read(self, may_read, status);
373373

374374
case LPFSS_FRAME_EXTRACT:
375-
return _on_frame_extract(self, aux, status);
375+
return _on_frame_extract(self, status);
376376

377377
case LPFSS_TRIM_MESSAGE_READ:
378-
return _on_trim_message_read(self, may_read, aux, status);
378+
return _on_trim_message_read(self, may_read, status);
379379

380380
case LPFSS_TRIM_MESSAGE:
381381
return _on_trim_message(self, msg, msg_len, status);
382382

383383
case LPFSS_CONSUME_TRIMMED:
384-
return _on_consume_trimmed(self, may_read, aux, status);
384+
return _on_consume_trimmed(self, may_read, status);
385385

386386
case LPFSS_MESSAGE_READ:
387-
return _on_message_read(self, may_read, aux, status);
387+
return _on_message_read(self, may_read, status);
388388

389389
case LPFSS_MESSAGE_EXTRACT:
390390
return _on_message_extract(self, msg, msg_len, status);
@@ -404,8 +404,11 @@ log_proto_framed_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_
404404
_ensure_buffer(self);
405405

406406
self->fetch_counter = 0;
407-
while (_step_state_machine(self, msg, msg_len, may_read, aux, &status) != LPFSSCTRL_RETURN_WITH_STATUS) ;
407+
while (_step_state_machine(self, msg, msg_len, may_read, &status) != LPFSSCTRL_RETURN_WITH_STATUS)
408+
;
408409

410+
if (status == LPS_SUCCESS && aux)
411+
log_transport_aux_data_copy(aux, &self->buffer_aux);
409412
return status;
410413
}
411414

@@ -415,6 +418,7 @@ log_proto_framed_server_free(LogProtoServer *s)
415418
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
416419
g_free(self->buffer);
417420

421+
log_transport_aux_data_destroy(&self->buffer_aux);
418422
log_proto_server_free_method(s);
419423
}
420424

0 commit comments

Comments
 (0)