Skip to content

Commit 7de2c45

Browse files
authored
in_premetheus_remote_write: Implement handler of payloads of prometheus remote write protocol (#8725)
in_prometheus_remote_write: Implement prometheus remote write input plugin. This plugin is able to handle the following types currently: - Counter - Gauge - Untyped - Histogram Summary type of metrics shouldn't be handled and decoded correctly for now. --------- Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 9311b43 commit 7de2c45

12 files changed

+1341
-0
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input pl
209209
option(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" Yes)
210210
option(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" Yes)
211211
option(FLB_IN_PROCESS_EXPORTER_METRICS "Enable process exporter metrics input plugin" Yes)
212+
option(FLB_IN_PROMETHEUS_REMOTE_WRITE "Enable prometheus remote write input plugin" Yes)
212213
option(FLB_OUT_AZURE "Enable Azure output plugin" Yes)
213214
option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes)
214215
option(FLB_OUT_AZURE_LOGS_INGESTION "Enable Azure Logs Ingestion output plugin" Yes)

cmake/windows-setup.cmake

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ if(FLB_WINDOWS_DEFAULTS)
6161
set(FLB_IN_PODMAN_METRICS No)
6262
set(FLB_IN_ELASTICSEARCH Yes)
6363
set(FLB_IN_SPLUNK Yes)
64+
set(FLB_IN_PROMETHEUS_REMOTE_WRITE Yes)
6465

6566
# OUTPUT plugins
6667
# ==============

plugins/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ REGISTER_IN_PLUGIN("in_opentelemetry")
229229
REGISTER_IN_PLUGIN("in_elasticsearch")
230230
REGISTER_IN_PLUGIN("in_calyptia_fleet")
231231
REGISTER_IN_PLUGIN("in_splunk")
232+
REGISTER_IN_PLUGIN("in_prometheus_remote_write")
232233

233234
# Test the event loop messaging when used in threaded mode
234235
REGISTER_IN_PLUGIN("in_event_test")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
if(NOT FLB_METRICS)
2+
message(FATAL_ERROR "Prometheus remote write input plugin requires FLB_HTTP_SERVER=On.")
3+
endif()
4+
5+
set(src
6+
prom_rw.c
7+
prom_rw_prot.c
8+
prom_rw_conn.c
9+
prom_rw_config.c
10+
)
11+
12+
FLB_PLUGIN(in_prometheus_remote_write "${src}" "monkey-core-static")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2024 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.in_in (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.in_in
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
21+
#include <fluent-bit/flb_input_plugin.h>
22+
#include <fluent-bit/flb_downstream.h>
23+
#include <fluent-bit/flb_network.h>
24+
#include <fluent-bit/flb_config.h>
25+
26+
#include "prom_rw.h"
27+
#include "prom_rw_conn.h"
28+
#include "prom_rw_prot.h"
29+
#include "prom_rw_config.h"
30+
31+
/*
32+
* For a server event, the collection event means a new client have arrived, we
33+
* accept the connection and create a new TCP instance which will wait for
34+
* JSON map messages.
35+
*/
36+
static int prom_rw_collect(struct flb_input_instance *ins,
37+
struct flb_config *config, void *in_context)
38+
{
39+
struct flb_connection *connection;
40+
struct prom_remote_write_conn *conn;
41+
struct flb_prom_remote_write *ctx;
42+
43+
ctx = in_context;
44+
45+
connection = flb_downstream_conn_get(ctx->downstream);
46+
47+
if (connection == NULL) {
48+
flb_plg_error(ctx->ins, "could not accept new connection");
49+
50+
return -1;
51+
}
52+
53+
flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);
54+
55+
conn = prom_rw_conn_add(connection, ctx);
56+
57+
if (conn == NULL) {
58+
return -1;
59+
}
60+
61+
return 0;
62+
}
63+
64+
static int prom_rw_init(struct flb_input_instance *ins,
65+
struct flb_config *config, void *data)
66+
{
67+
unsigned short int port;
68+
int ret;
69+
struct flb_prom_remote_write *ctx;
70+
71+
(void) data;
72+
73+
/* Create context and basic conf */
74+
ctx = prom_rw_config_create(ins);
75+
if (!ctx) {
76+
return -1;
77+
}
78+
ctx->collector_id = -1;
79+
80+
/* Populate context with config map defaults and incoming properties */
81+
ret = flb_input_config_map_set(ins, (void *) ctx);
82+
if (ret == -1) {
83+
flb_plg_error(ctx->ins, "configuration error");
84+
prom_rw_config_destroy(ctx);
85+
return -1;
86+
}
87+
88+
/* Set the context */
89+
flb_input_set_context(ins, ctx);
90+
91+
port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);
92+
93+
if (ctx->enable_http2) {
94+
ret = flb_http_server_init(&ctx->http_server,
95+
HTTP_PROTOCOL_AUTODETECT,
96+
(FLB_HTTP_SERVER_FLAG_KEEPALIVE | FLB_HTTP_SERVER_FLAG_AUTO_INFLATE),
97+
NULL,
98+
ins->host.listen,
99+
ins->host.port,
100+
ins->tls,
101+
ins->flags,
102+
&ins->net_setup,
103+
flb_input_event_loop_get(ins),
104+
ins->config,
105+
(void *) ctx);
106+
107+
if (ret != 0) {
108+
flb_plg_error(ctx->ins,
109+
"could not initialize http server on %s:%u. Aborting",
110+
ins->host.listen, ins->host.port);
111+
112+
prom_rw_config_destroy(ctx);
113+
114+
return -1;
115+
}
116+
117+
ret = flb_http_server_start(&ctx->http_server);
118+
119+
if (ret != 0) {
120+
flb_plg_error(ctx->ins,
121+
"could not start http server on %s:%u. Aborting",
122+
ins->host.listen, ins->host.port);
123+
124+
prom_rw_config_destroy(ctx);
125+
126+
return -1;
127+
}
128+
129+
ctx->http_server.request_callback = prom_rw_prot_handle_ng;
130+
131+
flb_input_downstream_set(ctx->http_server.downstream, ctx->ins);
132+
}
133+
else {
134+
ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
135+
ins->flags,
136+
ctx->listen,
137+
port,
138+
ins->tls,
139+
config,
140+
&ins->net_setup);
141+
142+
if (ctx->downstream == NULL) {
143+
flb_plg_error(ctx->ins,
144+
"could not initialize downstream on %s:%s. Aborting",
145+
ctx->listen, ctx->tcp_port);
146+
147+
prom_rw_config_destroy(ctx);
148+
149+
return -1;
150+
}
151+
152+
flb_input_downstream_set(ctx->downstream, ctx->ins);
153+
154+
/* Collect upon data available on the standard input */
155+
ret = flb_input_set_collector_socket(ins,
156+
prom_rw_collect,
157+
ctx->downstream->server_fd,
158+
config);
159+
if (ret == -1) {
160+
flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin");
161+
prom_rw_config_destroy(ctx);
162+
return -1;
163+
}
164+
165+
ctx->collector_id = ret;
166+
}
167+
168+
flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port);
169+
170+
if (ctx->successful_response_code != 200 &&
171+
ctx->successful_response_code != 201 &&
172+
ctx->successful_response_code != 204) {
173+
flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201",
174+
ctx->successful_response_code);
175+
ctx->successful_response_code = 201;
176+
}
177+
178+
return 0;
179+
}
180+
181+
static int prom_rw_exit(void *data, struct flb_config *config)
182+
{
183+
struct flb_prom_remote_write *ctx;
184+
185+
(void) config;
186+
187+
ctx = data;
188+
189+
if (ctx != NULL) {
190+
prom_rw_config_destroy(ctx);
191+
}
192+
193+
return 0;
194+
}
195+
196+
/* Configuration properties map */
197+
static struct flb_config_map config_map[] = {
198+
{
199+
FLB_CONFIG_MAP_BOOL, "http2", "true",
200+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, enable_http2),
201+
NULL
202+
},
203+
204+
{
205+
FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
206+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_max_size),
207+
""
208+
},
209+
210+
{
211+
FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE,
212+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_chunk_size),
213+
""
214+
},
215+
216+
{
217+
FLB_CONFIG_MAP_STR, "uri", NULL,
218+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, uri),
219+
"Specify an optional HTTP URI for the target web server, e.g: /something"
220+
},
221+
222+
{
223+
FLB_CONFIG_MAP_BOOL, "tag_from_uri", "true",
224+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, tag_from_uri),
225+
"If true, tag will be created from uri. e.g. v1_metrics from /v1/metrics ."
226+
},
227+
{
228+
FLB_CONFIG_MAP_INT, "successful_response_code", "201",
229+
0, FLB_TRUE, offsetof(struct flb_prom_remote_write, successful_response_code),
230+
"Set successful response code. 200, 201 and 204 are supported."
231+
},
232+
233+
/* EOF */
234+
{0}
235+
};
236+
237+
/* Plugin reference */
238+
struct flb_input_plugin in_prometheus_remote_write_plugin = {
239+
.name = "prometheus_remote_write",
240+
.description = "Prometheus Remote Write input",
241+
.cb_init = prom_rw_init,
242+
.cb_pre_run = NULL,
243+
.cb_collect = prom_rw_collect,
244+
.cb_flush_buf = NULL,
245+
.cb_pause = NULL,
246+
.cb_resume = NULL,
247+
.cb_exit = prom_rw_exit,
248+
.config_map = config_map,
249+
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
250+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2024 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLB_IN_PROM_RW_H
21+
#define FLB_IN_PROM_RW_H
22+
23+
#include <fluent-bit/flb_config.h>
24+
#include <fluent-bit/flb_input.h>
25+
#include <fluent-bit/flb_utils.h>
26+
27+
#include <monkey/monkey.h>
28+
#include <fluent-bit/http_server/flb_http_server.h>
29+
30+
#define HTTP_BUFFER_MAX_SIZE "4M"
31+
#define HTTP_BUFFER_CHUNK_SIZE "512K"
32+
33+
struct flb_prom_remote_write {
34+
int successful_response_code;
35+
flb_sds_t listen;
36+
flb_sds_t tcp_port;
37+
int tag_from_uri;
38+
39+
struct flb_input_instance *ins;
40+
41+
/* HTTP URI */
42+
char *uri;
43+
44+
/* New gen HTTP server */
45+
int enable_http2;
46+
struct flb_http_server http_server;
47+
48+
/* Legacy HTTP server */
49+
size_t buffer_max_size; /* Maximum buffer size */
50+
size_t buffer_chunk_size; /* Chunk allocation size */
51+
52+
int collector_id; /* Listener collector id */
53+
struct flb_downstream *downstream; /* Client manager */
54+
struct mk_list connections; /* linked list of connections */
55+
56+
struct mk_server *server;
57+
};
58+
59+
60+
#endif

0 commit comments

Comments
 (0)