Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions modules/http-adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ set(HTTP_ADAPTERS_SOURCES
http-adapters-plugin.c
splunk-adapter.c
splunk-adapter.h
openobserve-adapter.c
openobserve-adapter.h
)

add_module(
Expand Down
2 changes: 2 additions & 0 deletions modules/http-adapters/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ modules_http_adapters_libhttp_adapters_la_SOURCES = \
modules/http-adapters/http-adapter.h \
modules/http-adapters/splunk-adapter.c \
modules/http-adapters/splunk-adapter.h \
modules/http-adapters/openobserve-adapter.c \
modules/http-adapters/openobserve-adapter.h \
modules/http-adapters/http-adapters-grammar.y \
modules/http-adapters/http-adapters-plugin.c \
modules/http-adapters/http-adapters-parser.h \
Expand Down
56 changes: 56 additions & 0 deletions modules/http-adapters/http-adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,55 @@
*/

#include "http-adapter.h"
#include "splunk-adapter.h"
#include "openobserve-adapter.h"
#include "compat/string.h"

#define HTTP_ADAPTER_PLUGIN "http-adapter"

json_object *
http_adapter_parse_response_json(GString *response)
{
struct json_object *jso;
struct json_tokener *tok;

tok = json_tokener_new();
jso = json_tokener_parse_ex(tok, response->str, response->len);
if (tok->err != json_tokener_success || !jso)
{
msg_error("http-response-adapter(): failed to parse JSON response",
evt_tag_str("input", response->str),
tok->err != json_tokener_success ? evt_tag_str("json_error", json_tokener_error_desc(tok->err)) : NULL);
json_tokener_free(tok);
return NULL;
}
json_tokener_free(tok);
return jso;
}

void
http_adapter_locate_offending_payload(HttpResponseSignalData *data)
{
const gchar *line = data->request_body->str;

for (gint i = 0; i < data->offending_message; i++)
{
const gchar *nl = strchr(line, '\n');
if (!nl)
goto notfound;
line = nl + 1;
}
data->offending_request_len = strchrnul(line, '\n') - line;
if (data->offending_request_len != 0)
{
data->offending_request_start = line - data->request_body->str;
return;
}

notfound:
data->offending_request_start = data->offending_request_len = 0;
}

static void
_on_http_response_received(HttpAdapter *self, HttpResponseSignalData *data)
{
Expand Down Expand Up @@ -57,3 +103,13 @@ http_adapter_init_instance(HttpAdapter *self)
self->super.attach = _attach;
self->super.detach = _detach;
}

HttpAdapter *
http_adapter_new_by_name(const gchar *name)
{
if (strcmp(name, "splunk") == 0)
return splunk_adapter_new();
if (strcmp(name, "openobserve") == 0)
return openobserve_adapter_new();
return NULL;
}
5 changes: 5 additions & 0 deletions modules/http-adapters/http-adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "driver.h"
#include "modules/http/http-signals.h"
#include <json.h>

typedef struct _HttpAdapter HttpAdapter;

Expand All @@ -47,5 +48,9 @@ http_adapter_free(HttpAdapter *self)
}

void http_adapter_init_instance(HttpAdapter *self);
HttpAdapter *http_adapter_new_by_name(const gchar *name);

json_object *http_adapter_parse_response_json(GString *response);
void http_adapter_locate_offending_payload(HttpResponseSignalData *data);

#endif
8 changes: 4 additions & 4 deletions modules/http-adapters/http-adapters-grammar.ym
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/

%code top {
#include "splunk-adapter.h"
#include "http-adapter.h"

}

Expand Down Expand Up @@ -73,9 +73,9 @@ driver
http_response_adapter
: KW_RESPONSE_ADAPTER '(' string ')'
{
CHECK_ERROR(strcmp($3, "splunk") == 0, @3, "Unknown response adapter \"%s\"", $3);
$$ = splunk_adapter_new();
free($3);
$$ = http_adapter_new_by_name($3);
CHECK_ERROR($$ != NULL, @3, "Unknown response adapter \"%s\"", $3);
free($3);
}
;

Expand Down
120 changes: 120 additions & 0 deletions modules/http-adapters/openobserve-adapter.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2026 Balazs Scheidler <balazs.scheidler@axoflow.com>
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/
#include "openobserve-adapter.h"
#include <json.h>

/* OpenObserve returns HTTP 200 for partial batch failures, with a JSON body of the form:
* {"code":200,"status":[{"name":"<stream>","successful":N,"failed":M,"error":"<reason>"},...]}
*
* The adapter detects failures in the status array and identifies the first failing message
* by assuming messages are processed in order: the first failure is at index total_successful.
*/
static void
_adapt_openobserve_response(HttpAdapter *self, HttpResponseSignalData *data)
{
if (data->http_code != 200)
return;

/* performance optimization:
* - unfortunately the status code does not help us narrow down to cases
* where our post generated a failure
*
* - I used heuristics to decide if we need to properly parse the response
*
* - the first "},{" filters out cases where we have multiple items in the
* "status" member, in that case we have to parse
*
* - if we have a single element in "status", then we check that the
* number of failed entries is zero
*/

if (strstr(data->response_body->str, "},{") == NULL &&
strstr(data->response_body->str, ",\"failed\":0") != NULL)
return;

data->result = HTTP_SLOT_CRITICAL_ERROR;

struct json_object *jso = http_adapter_parse_response_json(data->response_body);
if (!jso)
goto exit;

struct json_object *status_jso = NULL;
if (!json_object_object_get_ex(jso, "status", &status_jso))
goto exit;

if (!json_object_is_type(status_jso, json_type_array))
goto exit;

gint total_successful = 0;
gint total_failed = 0;
const gchar *first_error = NULL;

gint num_entries = json_object_array_length(status_jso);
for (gint i = 0; i < num_entries; i++)
{
struct json_object *entry = json_object_array_get_idx(status_jso, i);
struct json_object *successful_jso = NULL;
struct json_object *failed_jso = NULL;
struct json_object *error_jso = NULL;

json_object_object_get_ex(entry, "successful", &successful_jso);
json_object_object_get_ex(entry, "failed", &failed_jso);
json_object_object_get_ex(entry, "error", &error_jso);

if (successful_jso)
total_successful += json_object_get_int(successful_jso);
if (failed_jso)
total_failed += json_object_get_int(failed_jso);
if (error_jso && !first_error)
first_error = json_object_get_string(error_jso);
}

if (total_failed > 0)
{
msg_notice("openobserve: partial batch failure reported by server",
evt_tag_int("successful", total_successful),
evt_tag_int("failed", total_failed),
first_error ? evt_tag_str("error", first_error) : NULL);
data->offending_message = (guint) total_successful;
if (data->offending_message >= data->batch_size)
data->offending_message = 0;
else
http_adapter_locate_offending_payload(data);
}
else
{
data->result = HTTP_SLOT_SUCCESS;
}

exit:
json_object_put(jso);
}


HttpAdapter *
openobserve_adapter_new(void)
{
HttpAdapter *self = g_new0(HttpAdapter, 1);
http_adapter_init_instance(self);
self->adapt_response = _adapt_openobserve_response;
return self;
}
30 changes: 30 additions & 0 deletions modules/http-adapters/openobserve-adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2026 Balazs Scheidler <balazs.scheidler@axoflow.com>
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#ifndef OPENOBSERVE_ADAPTER_H_INCLUDED
#define OPENOBSERVE_ADAPTER_H_INCLUDED 1

#include "http-adapter.h"

HttpAdapter *openobserve_adapter_new(void);

#endif
52 changes: 4 additions & 48 deletions modules/http-adapters/splunk-adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,15 @@
*
*/
#include "http-adapter.h"
#include "compat/string.h"
#include <json.h>

static json_object *
_parse_response_json(GString *response)
{
struct json_object *jso;
struct json_tokener *tok;

tok = json_tokener_new();
jso = json_tokener_parse_ex(tok, response->str, response->len);
if (tok->err != json_tokener_success || !jso)
{
msg_error("http-response-adapter(): failed to parse JSON response",
evt_tag_str("input", response->str),
tok->err != json_tokener_success ? evt_tag_str ("json_error", json_tokener_error_desc(tok->err)) : NULL);
json_tokener_free (tok);
return NULL;
}
json_tokener_free(tok);
return jso;
}

static void
_locate_offending_payload(HttpResponseSignalData *data)
{
const gchar *line = data->request_body->str;

for (gint i = 0; i < data->offending_message; i++)
{
const gchar *nl = strchr(line, '\n');
if (!nl)
goto notfound;
line = nl + 1;
}
data->offending_request_len = strchrnul(line, '\n') - line;
if (data->offending_request_len != 0)
{
data->offending_request_start = line - data->request_body->str;
return;
}

notfound:

data->offending_request_start = data->offending_request_len = 0;
return;
}

static void
_adapt_splunk_response(HttpAdapter *self, HttpResponseSignalData *data)
{
if (data->http_code >= 400)
{
struct json_object *jso = _parse_response_json(data->response_body);
data->result = HTTP_SLOT_CRITICAL_ERROR;
struct json_object *jso = http_adapter_parse_response_json(data->response_body);
if (jso)
{
struct json_object *text_jso = NULL;
Expand All @@ -85,7 +40,8 @@ _adapt_splunk_response(HttpAdapter *self, HttpResponseSignalData *data)
data->offending_message = json_object_get_int(event_num_jso);
if (data->offending_message >= data->batch_size)
data->offending_message = 0;
_locate_offending_payload(data);
else
http_adapter_locate_offending_payload(data);
exit:
json_object_put(jso);
}
Expand Down
1 change: 1 addition & 0 deletions modules/http-adapters/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
add_unit_test(CRITERION TARGET test_splunk_adapter INCLUDES ${PROJECT_SOURCE_DIR} DEPENDS http-adapters)
add_unit_test(CRITERION TARGET test_openobserve_adapter INCLUDES ${PROJECT_SOURCE_DIR} DEPENDS http-adapters)
10 changes: 9 additions & 1 deletion modules/http-adapters/tests/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
if ENABLE_HTTP

modules_http_adapters_tests_TESTS = \
modules/http-adapters/tests/test_splunk_adapter
modules/http-adapters/tests/test_splunk_adapter \
modules/http-adapters/tests/test_openobserve_adapter

check_PROGRAMS += ${modules_http_adapters_tests_TESTS}

Expand All @@ -12,5 +13,12 @@ modules_http_adapters_tests_test_splunk_adapter_LDADD = $(TEST_LDADD)
modules_http_adapters_tests_test_splunk_adapter_LDFLAGS = \
-dlpreopen $(top_builddir)/modules/http-adapters/libhttp-adapters.la

EXTRA_modules_http_adapters_tests_test_openobserve_adapter_DEPENDENCIES = \
$(top_builddir)/modules/http-adapters/libhttp-adapters.la
modules_http_adapters_tests_test_openobserve_adapter_CFLAGS = $(TEST_CFLAGS) -I$(top_srcdir)/modules/http-adapters
modules_http_adapters_tests_test_openobserve_adapter_LDADD = $(TEST_LDADD)
modules_http_adapters_tests_test_openobserve_adapter_LDFLAGS = \
-dlpreopen $(top_builddir)/modules/http-adapters/libhttp-adapters.la


endif
Loading
Loading