-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_custom_log.cpp
268 lines (253 loc) · 11 KB
/
read_custom_log.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// _ _____ __________
// | | / / _ | / __/_ __/ Visibility
// | |/ / __ |_\ \ / / Across
// |___/_/ |_/___/ /_/ Space and Time
//
// SPDX-FileCopyrightText: (c) 2025 The Tenzir Contributors
// SPDX-License-Identifier: BSD-3-Clause
// This operator reads the "custom log" format, a simple line-based log format
// that doesn't actually exist, but that we'll use for demo purposes. Our
// made-up log looks as follows:
//
// [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE
//
// Some example log lines:
//
// [2025-01-07T17:00:00] [INFO] [user123] [CREATE_POST] - User created a new
// blog post titled "Understanding AI".
//
// [2025-01-07T17:05:00] [INFO] [user123] [EDIT_POST] - User edited the blog
// post "Understanding AI".
//
// [2025-01-07T17:10:00] [INFO] [user456] [COMMENT] - User commented on
// "Understanding AI": "Great insights!".
//
// [2025-01-07T17:15:00] [ERROR] [user123] [DELETE_POST] - User attempted to
// delete a post that does not exist.
//
// [2025-01-07T17:20:00] [INFO] [user789] [LIKE] - User liked the blog post
// "Understanding AI".
//
// Throughout the file, we'll explain things piece-by-piece. Let's start with
// some includes.
#include <tenzir/concept/parseable/tenzir/time.hpp>
#include <tenzir/detail/string.hpp>
#include <tenzir/series_builder.hpp>
#include <tenzir/to_lines.hpp>
#include <tenzir/tql2/plugin.hpp>
// Next, please jump to the bottom to read the description of the
// `read_custom_log_plugin` class.
namespace tenzir::plugins::example {
namespace {
// The operator instance for the `read_custom_log` operator. This class
// derives from `crtp_operator`, which in turn implements most of the
// `operator_base` interface for us. We only need to implement `operator()`,
// mapping a generator of elements to another generator of elements.
//
// Note that many other functions are available to be overridden, such as
// `location` for forcing an operator to run inside a node, or `detached` to
// mark an operator as requiring its own thread. Check the base class for more
// information.
class read_custom_log_operator final
  : public crtp_operator<read_custom_log_operator> {
public:
  // This provides a constructor for the operator. Note that operator instances
  // _must_ be default-constructible without any arguments so that they can be
  // transferred between processes. If you wish to provide arguments from the
  // corresponding plugin class's `make` function, you can do so by providing an
  // additional constructor that takes additional arguments.
  read_custom_log_operator() = default;

  // The name of the operator. Must be unique.
  auto name() const -> std::string override {
    return "read_custom_log";
  }

  // Specifies how the operator handles optimization. Check the
  // `operator_base::optimize` documentation for more information. For this
  // particular operator, we don't do any optimization and choose to act as an
  // optimization barrier in the pipeline.
  auto optimize(const expression& filter, event_order order) const
    -> optimize_result override {
    TENZIR_UNUSED(filter, order);
    return do_not_optimize(*this);
  }

  // Handles serialization and deserialization of the operator instance. This
  // function _must_ capture all member variables of the instance.
  friend auto inspect(auto& f, read_custom_log_operator& x) -> bool {
    return f.object(x).fields(
      // If we had any members to serialize here, we'd list them as follows:
      // f.field("foo", x.foo),
      // f.field("bar", x.bar),
      // ...
    );
  }

  // This is the main run-loop of the operator instance. It must have one of the
  // following signatures:
  //
  //   (generator<T> input, operator_control_plane &ctrl) -> generator<U>
  //   (operator_control_plane &ctrl) -> generator<U>
  //
  // `T` may be either `table_slice` or `chunk_ptr`, and `U` may be either
  // `table_slice`, `chunk_ptr`, or `std::monostate`.
  //
  // A table slice is a series of events. A chunk is a series of bytes. The
  // absence of the input denotes that an operator is a source, and returning a
  // generator of monostate denotes that an operator is a sink.
  //
  // The operator control plane is an escape hatch that allows operators to
  // interact with whatever resides outside of the data plane.
  //
  // In this case, we're reading a custom line-based log format, so we'll be
  // taking in bytes and returning events.
  //
  // Note that this function is marked `const`, which means that it is not
  // allowed to modify any members of the operator instance. Store mutable state
  // in the function instead.
  auto operator()(generator<chunk_ptr> input,
                  operator_control_plane& ctrl) const
    -> generator<table_slice> {
    // Since we have a line-based format, we'll adapt our generator of chunks
    // into a generator of views onto lines using the `to_lines` function from
    // libtenzir, and then for readability will continue with the `read_lines`
    // function below.
    return read_lines(to_lines(std::move(input)), ctrl);
  }

private:
  // Turns a stream of (optional) line views into batches of events. An empty
  // optional means no line is available right now.
  auto read_lines(generator<std::optional<std::string_view>> input,
                  operator_control_plane& ctrl) const
    -> generator<table_slice> {
    // We set up a builder to create events in, and give it a fixed schema. For
    // more advanced use cases, consider using the `multi_series_builder`
    // instead. As a reminder, this is what our log format looks like:
    //   [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE
    auto builder = series_builder{type{
      "custom_log",
      record_type{
        {"timestamp", time_type{}},
        {"log_level", string_type{}},
        {"user_id", string_type{}},
        {"action_type", string_type{}},
        {"message", string_type{}},
      },
    }};
    // We want to buffer events for no more than 250ms before returning them.
    // For this, we need to store when we last returned events. Note that the
    // check below only runs when the input generator produces a value (a line
    // or an empty optional).
    auto last_yield = std::chrono::steady_clock::now();
    for (auto&& line : input) {
      // Whenever we read a new line, we first check if we've passed our
      // timeout.
      if (last_yield + std::chrono::milliseconds{250}
          < std::chrono::steady_clock::now()) {
        // If we have, we yield the events we've built so far. Binding with
        // `auto&&` lets us move each slice out instead of copying it.
        for (auto&& events : builder.finish_as_table_slice()) {
          co_yield std::move(events);
        }
        // And reset the timer.
        last_yield = std::chrono::steady_clock::now();
      }
      if (!line) {
        // We're playing friendly and yielding control back to the scheduler if
        // we can't read a line.
        co_yield {};
        continue;
      }
      // Let's skip empty lines.
      if (line->empty()) {
        continue;
      }
      // Now, we parse each line one-by-one. We pass in the diagnostics handler
      // so that we can tell the user about parse failures.
      parse_line(*line, builder, ctrl.diagnostics());
    }
    // At the end of the input, we flush the builder to get the final events.
    for (auto&& events : builder.finish_as_table_slice()) {
      co_yield std::move(events);
    }
  }

  // Parses a single non-empty line and appends it as one event to `builder`.
  // On malformed input, emits a warning to `dh` and appends nothing.
  auto parse_line(std::string_view line, series_builder& builder,
                  diagnostic_handler& dh) const -> void {
    // Here, we can now finally parse the line piece by piece. Tenzir also ships
    // with a parser combinator library, which can be used to parse more complex
    // formats. But for this one, we'll just go through the line iteratively.
    // We'll start by splitting the line into its components. Here's the format
    // again:
    //
    //   [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE
    //
    // We'll split by the four first spaces initially:
    auto parts = detail::split(line, " ", 4);
    // If we don't have enough parts, we'll emit a diagnostic and skip this
    // line.
    if (parts.size() != 5) {
      diagnostic::warning("unexpected log format: expected at least 4 spaces")
        .note("got `{}`", line)
        .emit(dh);
      return;
    }
    // Now, let's check if the first four sections are wrapped in square
    // brackets, and if they are remove them.
    for (auto i = size_t{0}; i < 4; ++i) {
      auto& part = parts[i];
      // A section can be empty (e.g., when the line contains two consecutive
      // spaces). Calling `front()`/`back()` on an empty view is undefined
      // behavior, so we require at least the two bracket characters first.
      if (part.size() < 2 || part.front() != '[' || part.back() != ']') {
        diagnostic::warning("unexpected log format: expected square brackets")
          .note("got `{}`", part)
          .emit(dh);
        return;
      }
      part = part.substr(1, part.size() - 2);
    }
    // For the last section, we'll check whether we begin with a dash and a
    // space and will then just leave the rest as-is.
    if (not parts[4].starts_with("- ")) {
      diagnostic::warning("unexpected log format: expected a dash and a space")
        .note("got `{}`", parts[4])
        .emit(dh);
      return;
    }
    parts[4] = parts[4].substr(2);
    // For the first section, we need to additionally parse the timestamp. We'll
    // use Tenzir's built-in timestamp parser for this.
    auto timestamp = time{};
    if (not parsers::time(parts[0], timestamp)) {
      diagnostic::warning("unexpected log format: expected a timestamp")
        .note("got `{}`", parts[0])
        .emit(dh);
      return;
    }
    // Now, we can finally build the event. Successfully parsed lines are not
    // logged; only malformed ones produce diagnostics above.
    auto event = builder.record();
    event.field("timestamp", timestamp);
    event.field("log_level", parts[1]);
    event.field("user_id", parts[2]);
    event.field("action_type", parts[3]);
    event.field("message", parts[4]);
  }
};
// The `read_custom_log_plugin` class is the plugin that registers the operator
// for us. It's a subclass of `operator_plugin2`, which is a plugin that defines
// an operator for TQL2. The plugin is responsible for creating instances of the
// operator from invocations.
//
// Note that a plugin can inherit from any number of plugin types, but the name
// of a plugin must be unique, or Tenzir will fail to start. For this particular
// plugin, the `name()` function is overridden automatically by the
// `operator_plugin2` plugin, which infers the name from its template parameter.
// For most other plugins, you'll need to override the `name()` function
// manually.
class read_custom_log_plugin final
  : public virtual operator_plugin2<read_custom_log_operator> {
public:
  // Turns an invocation of the `read_custom_log` operator into an operator
  // instance.
  //
  // The operator takes no arguments, so the invocation is checked against a
  // parser with nothing registered on it. To support positional or named
  // arguments, register them with `.positional(...)` / `.named(...)` before
  // calling `.parse(...)`, and pass the parsed values to an additional
  // operator constructor below.
  auto make(invocation inv, session ctx) const
    -> failure_or<operator_ptr> override {
    TRY(argument_parser2::operator_(name()).parse(inv, ctx));
    // Hand out a fresh, default-constructed operator instance.
    return std::make_unique<read_custom_log_operator>();
  }
};
} // namespace
} // namespace tenzir::plugins::example
// Lastly, register our plugin with Tenzir via `TENZIR_REGISTER_PLUGIN`. This
// line sits outside of `tenzir::plugins::example`, so the plugin class must be
// named with its fully qualified name.
TENZIR_REGISTER_PLUGIN(tenzir::plugins::example::read_custom_log_plugin)
// Now, jump back up to read the operator instance's description.