diff --git a/CMakeLists.txt b/CMakeLists.txt index 575b004..c1c85bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,13 +1,17 @@ cmake_minimum_required(VERSION 3.19...3.28 FATAL_ERROR) -project(trim - DESCRIPTION "Trim operator plugin for Tenzir" +project(example + DESCRIPTION "Example plugin for Tenzir" LANGUAGES CXX) find_package(Tenzir REQUIRED PATHS "/opt/tenzir") +# This plugin is set up in a way where every builtin/*.cpp file may define its +# own plugin. The main src/plugin.cpp file is just a stub that acts as a parent +# to all the other plugins. TenzirRegisterPlugin( - TARGET trim + TARGET example ENTRYPOINT "src/plugin.cpp" SOURCES GLOB "src/*.cpp" + BUILTINS GLOB "builtins/*.cpp" INCLUDE_DIRECTORIES "include") diff --git a/Dockerfile b/Dockerfile index 68fea62..860ca7a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,18 +1,16 @@ ARG TENZIR_VERSION=main FROM ghcr.io/tenzir/tenzir-dev:${TENZIR_VERSION} AS builder -COPY . /tmp/trim/ +COPY . /tmp/example/ -RUN cmake -S /tmp/trim -B /tmp/trim/build -G Ninja -D CMAKE_INSTALL_PREFIX:STRING="$PREFIX" -RUN cmake --build /tmp/trim/build --parallel -RUN cmake --install /tmp/trim/build --strip --component Runtime --prefix /tmp/trim/install +RUN cmake -S /tmp/example -B /tmp/example/build -G Ninja -D CMAKE_INSTALL_PREFIX:STRING="$PREFIX" +RUN cmake --build /tmp/example/build --parallel +RUN cmake --install /tmp/example/build --strip --component Runtime --prefix /tmp/example/install FROM builder AS test -ENV BATS_LIB_PATH=/tmp/tenzir/tenzir/integration/lib - -ENTRYPOINT cmake --build /tmp/trim/build --target update-integration +ENTRYPOINT cmake --build /tmp/example/build --target update-integration FROM ghcr.io/tenzir/tenzir:${TENZIR_VERSION} -COPY --from=builder --chown=tenzir:tenzir /tmp/trim/install /opt/tenzir +COPY --from=builder --chown=tenzir:tenzir /tmp/example/install /opt/tenzir diff --git a/README.md b/README.md index a9f882c..4950161 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,12 @@ # Tenzir 
Example Plugin -This is an example plugin for Tenzir, adding a `trim` operaotr that removes -whitespace from string fields. +We think that learning how to build a plugin is best done by example. This +example plugin implements a simple operator `read_custom_log` that parses a +[custom line-based log format](integration/data/inputs/sample.log). + +The operator's C++ implementation can be found in +[`builtins/operators/read_custom_log.cpp`](builtins/operators/read_custom_log.cpp) +and is extensively commented. ## Build and run @@ -10,22 +15,13 @@ your additional plugin. Use `docker compose run --build tenzir ''` to interact with the node on the command-line, or set the following environment variables connect your -node to app.tenzir.com: +node to [app.tenzir.com][app]: -``` +```bash export TENZIR_PLUGINS__PLATFORM__API_KEY='' export TENZIR_PLUGINS__PLATFORM__TENANT_ID='' ``` -## Learn how to write a plugin - -We think that learning how to build a plugin is best done by example. Tenzir -ships with a variety of [plugins][plugins-source] and -[builtins][builtins-source] to get inspired by and to learn from. - -If you have any questions, feel free to reach out in the [#developers channel -on Discord][discord]. - ## Run tests Every plugin defines additional tests using @@ -33,15 +29,25 @@ Every plugin defines additional tests using `docker compose run --build tests` to execute your tests and update the reference files automatically. +## Further Resources + +Tenzir ships with a variety of [plugins][plugins-source] and +[builtins][builtins-source] to get inspired by and to learn from. + +If you have any questions, feel free to reach out in the [#developers channel +on Discord][discord]. + ## Contribute your plugin If you want to upstream your plugin so that it is bundled with every Tenzir installation, open a PR that adds it to the [`plugins/` directory in the -`tenzir/tenzir` repository][plugins-source]. 
If your plugin has no dependencies, consider contributing it as a builtin instead. Builtins are located in the [`libtenzir/builtins/` directory in the `tenzir/tenzir` repository][builtins-source]. +[tenzir]: https://github.com/tenzir/tenzir +[app]: https://app.tenzir.com [plugins-source]: https://github.com/tenzir/tenzir/tree/main/plugins [builtins-source]: https://github.com/tenzir/tenzir/tree/main/libtenzir/builtins [discord]: https://docs.tenzir.com/discord diff --git a/builtins/operators/read_custom_log.cpp b/builtins/operators/read_custom_log.cpp new file mode 100644 index 0000000..642026a --- /dev/null +++ b/builtins/operators/read_custom_log.cpp @@ -0,0 +1,297 @@ +// _ _____ __________ +// | | / / _ | / __/_ __/ Visibility +// | |/ / __ |_\ \ / / Across +// |___/_/ |_/___/ /_/ Space and Time +// +// SPDX-FileCopyrightText: (c) 2025 The Tenzir Contributors +// SPDX-License-Identifier: BSD-3-Clause +
+// This operator reads the "custom log" format, which is a simple key-value +// format that doesn't actually exist, but that we'll use for demo purposes. Our +// made up log looks as follows: +// +// [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE +// +// Some example log lines: +// +// [2025-01-07T17:00:00] [INFO] [user123] [CREATE_POST] - User created a new +// blog post titled "Understanding AI". +// +// [2025-01-07T17:05:00] [INFO] [user123] [EDIT_POST] - User edited the blog +// post "Understanding AI". +// +// [2025-01-07T17:10:00] [INFO] [user456] [COMMENT] - User commented on +// "Understanding AI": "Great insights!". +// +// [2025-01-07T17:15:00] [ERROR] [user123] [DELETE_POST] - User attempted to +// delete a post that does not exist. +// +// [2025-01-07T17:20:00] [INFO] [user789] [LIKE] - User liked the blog post +// "Understanding AI". +// +// Throughout the file, we'll explain things piece-by-piece. Let's start with +// some includes. 
+ +#include +#include +#include +#include +#include + +// Next, jump to the bottom to read the description of the +// `read_custom_log_plugin` class. + +namespace tenzir::plugins::example { +namespace { + +// The operator instance for the `read_custom_log` operator. This class +// implements the `crtp_operator`, which in turn implements most of the +// `operator_base` interface for us. We only need to implement `operator()`, +// mapping a generator of elements to another generator of elements. +// +// Note that many other functions are available to be overridden, such as +// `location` for forcing an operator to run inside a node, or `detached` to +// mark an operator as requiring its own thread. Check the base class for more +// information. +class read_custom_log_operator final + : public crtp_operator { +public: + // This provides a constructor for the operator. Note that operator instances + // _must_ be default-constructible without any arguments so that they can be + // transferred between processes. + read_custom_log_operator() = default; + + // Since our operator also accepts a parameter, we provide a second + // constructor accepting that parameter. This is what we will use in our code. + read_custom_log_operator(tenzir::duration time_offset) + : time_offset_{time_offset} { + } + + // The name of the operator. Must be unique. + auto name() const -> std::string override { return "read_custom_log"; } + + // Specifies how the operator handles optimization. Check the + // `operator_base::optimize` documentation for more information. For this + // particular operator, we don't do any optimization and choose to act as an + // optimization barrier in the pipeline. + auto optimize(const expression& filter, + event_order order) const -> optimize_result override { + TENZIR_UNUSED(filter, order); + return do_not_optimize(*this); + } + + // Handles serialization and deserialization of the operator instance. 
This + // function _must_ capture all member variables of the instance. + friend auto inspect(auto& f, read_custom_log_operator& x) -> bool { + return f.object(x).fields( + f.field("time_offset", x.time_offset_) + // If there were further members, they _must_ be added here. + ); + } + + // This is the main run-loop of the operator instance. It must have one of the + // following signatures: + // + // (generator input, operator_control_plane &ctrl) -> generator + // (operator_control_plane &ctrl) -> generator + // + // `T` may be either `table_slice` or `chunk_ptr`, and `U` may be either + // `table_slice`, `chunk_ptr`, or `std::monostate`. + // + // A table slice is a series of events. A chunk is a series of bytes. The + // absence of the input denotes that an operator is a source, and returning a + // generator of monostate denotes that an operator is a sink. + // + // The operator control plane is an escape hatch that allows operators to + // interact with whatever resides outside of the *data plane*. The input + // stream, the operator's dynamic state and the output stream are considered + // the data plane. The operator control plane on the other hand contains + // things related to workings of Tenzir. Most importantly, the operator + // control plane provides access to a diagnostic handler, which can be used to + // emit diagnostics. + // + // In this case, we're reading a custom line-based log format, so we'll be + // taking in bytes and returning events. + // + // Note that this function is marked `const`, which means that it is not + // allowed to modify any members of the operator instance. Store mutable state + // in the function instead. 
+ auto + operator()(generator input, + operator_control_plane& ctrl) const -> generator { + // Since we have a line-based format, we'll adapt our generator of chunks + // into a generator of views onto lines using the `to_lines` function from + // libtenzir, and then for readability will continue with the `read_lines` + // function below. + return read_lines(to_lines(std::move(input)), ctrl); + } + +private: + // This function accepts lines, which are much easier to work with for our + // format. We dispatch to this from `operator()`. + auto + read_lines(generator> input, + operator_control_plane& ctrl) const -> generator { + // We set up a builder to create events in, and give it a fixed schema. For + // more advanced use cases, consider using the `multi_series_builder` + // instead. As a reminder, this is what our log format looks like: + // [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE + auto builder = series_builder{type{ + "custom_log", + record_type{ + {"timestamp", time_type{}}, + {"log_level", string_type{}}, + {"user_id", string_type{}}, + {"action_type", string_type{}}, + {"message", string_type{}}, + }, + }}; + // We want to buffer events for no more than 250ms before returning them. + // For this, we need to store when we last returned events. + // Without this logic, the operator would not yield any results until all + // input has been processed. + auto last_yield = std::chrono::steady_clock::now(); + // This is the main input loop, which will run until the input stream ends. + for (auto &&line : input) { + // Whenever we read a new line, we first check if we've passed our + // timeout. + if (last_yield + std::chrono::milliseconds{250} < + std::chrono::steady_clock::now()) { + // If we have, we yield the events we've built so far. + for (auto events : builder.finish_as_table_slice()) { + co_yield std::move(events); + } + // And reset the timer. 
+ last_yield = std::chrono::steady_clock::now(); + } + if (!line) { + // If we did not get a line, we yield control back to the scheduler. + co_yield {}; + continue; + } + // Let's skip empty lines. + if (line->empty()) { + continue; + } + // Now, we parse each line one-by-one. We pass in the diagnostics handler + // so that we can tell the user about parse failures. + parse_line(*line, builder, ctrl.diagnostics()); + } + // At the end of the input, we flush the builder to get the final events. + for (auto events : builder.finish_as_table_slice()) { + co_yield std::move(events); + } + } + + auto parse_line(std::string_view line, series_builder& builder, + diagnostic_handler& dh) const -> void { + // Here, we can now finally parse the line piece by piece. Tenzir also ships + // with a parser combinator library, which can be used to parse more complex + // formats. But for this one, we'll just go through the line iteratively. + // We'll start by splitting the line into its components. Here's the format + // again: + // + // [TIMESTAMP] [LOG_LEVEL] [USER_ID] [ACTION_TYPE] - MESSAGE + // + // We'll split by the four first spaces initially: + auto parts = detail::split(line, " ", 4); + // If we don't have enough parts, we'll emit a diagnostic and skip this + // line. + if (parts.size() != 5) { + diagnostic::warning("unexpected log format: expected at least 4 spaces") + .note("got `{}`", line) + .emit(dh); + return; + } + // Now, let's check if the first four sections are wrapped in square + // brackets, and if they are remove them. + for (auto i = 0; i < 4; ++i) { + if (parts[i].front() != '[' || parts[i].back() != ']') { + diagnostic::warning("unexpected log format: expected square brackets") + .note("got `{}`", parts[i]) + .emit(dh); + return; + } + parts[i].remove_prefix(1); + parts[i].remove_suffix(1); + } + // For the last section, we'll check whether we begin with a dash and a + // space and will then just leave the rest as-is. 
+ if (not parts[4].starts_with("- ")) { + diagnostic::warning("unexpected log format: expected a dash and a space") + .note("got `{}`", parts[4]) + .emit(dh); + return; + } + parts[4].remove_prefix(2); + // For the first section, we need to additionally parse the timestamp. We'll + // use Tenzir's built-in timestamp parser for this. + auto timestamp = time{}; + if (not parsers::time(parts[0], timestamp)) { + diagnostic::warning("unexpected log format: expected a timestamp") + .note("got `{}`", parts[0]) + .emit(dh); + return; + } + // Apply our time offset. + timestamp += time_offset_; + // Now, we can finally build the event. + auto event = builder.record(); + event.field("timestamp", timestamp); + event.field("log_level", parts[1]); + event.field("user_id", parts[2]); + event.field("action_type", parts[3]); + event.field("message", parts[4]); + // Nested fields could be added by calling + // `event.field("my_field").record()` and operating on that. + // + // We write a simple log message for debugging purposes. + TENZIR_TRACE("parsed line {}", line); + } + + // Our data member for the time offset. + tenzir::duration time_offset_{}; +}; + +// The `read_custom_log_plugin` class is the plugin that registers the operator +// for us. It's a subclass of `operator_plugin2`, which is a plugin that defines +// an operator for TQL2. The plugin is responsible for creating instances of the +// operator from invocations. +// +// Note that a plugin can inherit from any number of plugin types, but the name +// of a plugin must be unique, or Tenzir will fail to start. For this particular +// plugin, the `name()` function is overridden automatically by the +// `operator_plugin2` plugin, which infers the name from its template parameter. +// For most other plugins, you'll need to override the `name()` function +// manually. 
+class read_custom_log_plugin final + : public virtual operator_plugin2 { +public: + auto make(invocation inv, session ctx) const + -> failure_or override { + using namespace std::chrono_literals; + // We create an `argument_parser2` to parse arguments. + auto parser = argument_parser2::operator_(name()); + // Our operator will accept an optional duration to offset the timestamp in + // the log. The default for this offset will be zero. + // Using `named_optional`, we add our argument. You can also use the + // `positional` and/or `named`. See `argument_parser2.hpp`. If you don't need + // any arguments, you can skip this part. + auto time_offset = std::optional{0s}; + parser.named("time_offset", time_offset); + // We let the argument_parser parse our arguments. The TRY macro ensures + // that a parsing failure will stop the setup. + TRY(parser.parse(inv, ctx)); + // Create the operator instance, passing in any arguments that the operator + // requires. + return std::make_unique(*time_offset); + } +}; + +} // namespace +} // namespace tenzir::plugins::example + +// Lastly, register our plugin. +TENZIR_REGISTER_PLUGIN(tenzir::plugins::example::read_custom_log_plugin) + +// Now, jump back up to read the operator instance's description. diff --git a/docker-compose.yaml b/docker-compose.yaml index 1f94bf2..b85900c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -42,7 +42,7 @@ services: profiles: - donotstart volumes: - - ./integration/data/reference/:/plugins/example/integration/data/reference/ + - ./integration/data/reference/:/tmp/example/integration/data/reference/ volumes: tenzir-lib: diff --git a/integration/data/inputs/sample.log b/integration/data/inputs/sample.log new file mode 100644 index 0000000..0f6c97e --- /dev/null +++ b/integration/data/inputs/sample.log @@ -0,0 +1,5 @@ +[2025-01-07T17:00:00] [INFO] [user123] [CREATE_POST] - User created a new blog post titled "Understanding AI". 
+[2025-01-07T17:05:00] [INFO] [user123] [EDIT_POST] - User edited the blog post "Understanding AI". +[2025-01-07T17:10:00] [INFO] [user456] [COMMENT] - User commented on "Understanding AI": "Great insights!". +[2025-01-07T17:15:00] [ERROR] [user123] [DELETE_POST] - User attempted to delete a post that does not exist. +[2025-01-07T17:20:00] [INFO] [user789] [LIKE] - User liked the blog post "Understanding AI". diff --git a/integration/data/reference/tests/test_parse_example_logs/step_00.ref b/integration/data/reference/tests/test_parse_example_logs/step_00.ref new file mode 100644 index 0000000..765dc29 --- /dev/null +++ b/integration/data/reference/tests/test_parse_example_logs/step_00.ref @@ -0,0 +1,5 @@ +{"timestamp": "2025-01-07T17:00:00.000000", "log_level": "INFO", "user_id": "user123", "action_type": "CREATE_POST", "message": "User created a new blog post titled \"Understanding AI\"."} +{"timestamp": "2025-01-07T17:05:00.000000", "log_level": "INFO", "user_id": "user123", "action_type": "EDIT_POST", "message": "User edited the blog post \"Understanding AI\"."} +{"timestamp": "2025-01-07T17:10:00.000000", "log_level": "INFO", "user_id": "user456", "action_type": "COMMENT", "message": "User commented on \"Understanding AI\": \"Great insights!\"."} +{"timestamp": "2025-01-07T17:15:00.000000", "log_level": "ERROR", "user_id": "user123", "action_type": "DELETE_POST", "message": "User attempted to delete a post that does not exist."} +{"timestamp": "2025-01-07T17:20:00.000000", "log_level": "INFO", "user_id": "user789", "action_type": "LIKE", "message": "User liked the blog post \"Understanding AI\"."} \ No newline at end of file diff --git a/integration/data/reference/tests/test_parse_example_logs_with_argument/step_00.ref b/integration/data/reference/tests/test_parse_example_logs_with_argument/step_00.ref new file mode 100644 index 0000000..c5d67ab --- /dev/null +++ b/integration/data/reference/tests/test_parse_example_logs_with_argument/step_00.ref @@ -0,0 +1,5 
@@ +{"timestamp": "2025-01-07T18:00:00.000000", "log_level": "INFO", "user_id": "user123", "action_type": "CREATE_POST", "message": "User created a new blog post titled \"Understanding AI\"."} +{"timestamp": "2025-01-07T18:05:00.000000", "log_level": "INFO", "user_id": "user123", "action_type": "EDIT_POST", "message": "User edited the blog post \"Understanding AI\"."} +{"timestamp": "2025-01-07T18:10:00.000000", "log_level": "INFO", "user_id": "user456", "action_type": "COMMENT", "message": "User commented on \"Understanding AI\": \"Great insights!\"."} +{"timestamp": "2025-01-07T18:15:00.000000", "log_level": "ERROR", "user_id": "user123", "action_type": "DELETE_POST", "message": "User attempted to delete a post that does not exist."} +{"timestamp": "2025-01-07T18:20:00.000000", "log_level": "INFO", "user_id": "user789", "action_type": "LIKE", "message": "User liked the blog post \"Understanding AI\"."} \ No newline at end of file diff --git a/integration/data/reference/tests/test_trim_strings/step_00.ref b/integration/data/reference/tests/test_trim_strings/step_00.ref deleted file mode 100644 index 782ebb2..0000000 --- a/integration/data/reference/tests/test_trim_strings/step_00.ref +++ /dev/null @@ -1 +0,0 @@ -{"foo": "foo"} \ No newline at end of file diff --git a/integration/data/reference/tests/test_trim_strings/step_01.ref b/integration/data/reference/tests/test_trim_strings/step_01.ref deleted file mode 100644 index b8c7899..0000000 --- a/integration/data/reference/tests/test_trim_strings/step_01.ref +++ /dev/null @@ -1 +0,0 @@ -{"foo": "foo", "bar": " bar"} \ No newline at end of file diff --git a/integration/data/reference/tests/test_trim_strings/step_02.ref b/integration/data/reference/tests/test_trim_strings/step_02.ref deleted file mode 100644 index 0a72233..0000000 --- a/integration/data/reference/tests/test_trim_strings/step_02.ref +++ /dev/null @@ -1 +0,0 @@ -{"foo": "foo", "bar": "bar"} \ No newline at end of file diff --git 
a/integration/data/reference/tests/test_trimming_non-2dstring_fields_fails/step_00.ref b/integration/data/reference/tests/test_trimming_non-2dstring_fields_fails/step_00.ref deleted file mode 100644 index 80f7e52..0000000 --- a/integration/data/reference/tests/test_trimming_non-2dstring_fields_fails/step_00.ref +++ /dev/null @@ -1,6 +0,0 @@ -error: NotImplemented: Function 'utf8_trim' has no kernel matching input types (uint64) - --> :1:16 - | -1 | version | trim :uint64 - | ^^^^^^^ - | \ No newline at end of file diff --git a/integration/tests/setup_suite.bash b/integration/tests/setup_suite.bash index 1752ee6..9769d95 100644 --- a/integration/tests/setup_suite.bash +++ b/integration/tests/setup_suite.bash @@ -4,3 +4,5 @@ setup_suite() { export_default_paths } + +export BATS_LIB_PATH=${BATS_LIB_PATH:+${BATS_LIB_PATH}:}/tmp/tenzir/tenzir/integration/lib diff --git a/integration/tests/tests.bats b/integration/tests/tests.bats index fb7edb3..75ed648 100644 --- a/integration/tests/tests.bats +++ b/integration/tests/tests.bats @@ -6,7 +6,8 @@ setup() { bats_load_library bats-tenzir export_default_node_config - export TENZIR_PLUGINS="trim" + export TENZIR_PLUGINS="example" + export TENZIR_TQL2=true setup_node } @@ -14,12 +15,12 @@ teardown() { teardown_node } -@test "trim strings" { - check tenzir 'version | put foo=" foo " | trim foo' - check tenzir 'version | put foo=" foo ", bar=" bar" | trim foo' - check tenzir 'version | put foo=" foo ", bar=" bar" | trim :string' +@test "parse example logs" { + cat "${BATS_TENZIR_INPUTSDIR}/sample.log" | + check tenzir 'read_custom_log' } -@test "trimming non-string fields fails" { - check ! 
tenzir 'version | trim :uint64' +@test "parse example logs with argument" { + cat "${BATS_TENZIR_INPUTSDIR}/sample.log" | + check tenzir 'read_custom_log time_offset=1h' } diff --git a/src/plugin.cpp b/src/plugin.cpp index 281ab26..2868b0e 100644 --- a/src/plugin.cpp +++ b/src/plugin.cpp @@ -1,147 +1,16 @@ -#include -#include -#include #include -#include -#include - -#include - -namespace tenzir::plugins::trim { +namespace tenzir::plugins::example { namespace { -class trim_operator final : public crtp_operator { -public: - trim_operator() = default; - - explicit trim_operator(located field) - : field_{std::move(field)} { - } - - // This function contains the logic of the trim operator when instantiated. - // It maps from a generator of table slices to a generator of table slices. A - // table slice is simply a batch of events. The operator control plane is an - // escape hatch from the operator, utilized for emitting diagnostics or - // running asynchronous code outside of the operator's generator. - auto operator()(generator input, - operator_control_plane& ctrl) const - -> generator { - // All code up to the first yield is run synchronously in an operator and - // is considered the start up phase. This operator doesn't do anything - // special in this case, so we can signal a successful startup immediately. - co_yield {}; - // The main loop of the operator exits once the previous operator has - // finished. Utilize this for control flow. For example, we keep a list - // of schemas outside of the loop that we already warned for. - auto warned_for_schemas = std::unordered_set{}; - for (auto events : input) { - // There's one important contract that an operator must always adhere to: - // if an input batch is empty, the operator must yield. In all other - // situations, the operator may continue without yielding. - if (events.rows() == 0) { - co_yield {}; - continue; - } - // We can now start processing the events in the batch. 
First, we resolve - // the field for the batch's schema to a set of indices pointing to the - // field(s) within the schema. This transparently supports resolving - // concepts and field extractors. - const auto indices = collect(events.schema().resolve(field_.inner)); - // If the field didn't resolve, we can't do anything with the batch. We - // warn the user about it and return the input unchanged. We warn only - // once per schema, and dwe only warn when the specified field was not a - // type extractor. - if (indices.empty()) { - const auto [_, inserted] = warned_for_schemas.insert( - std::string{events.schema().name()}); - if (inserted and not field_.inner.starts_with(':')) { - diagnostic::warning("field did not resolve for schema `{}`", - events.schema()) - .primary(field_.source) - .emit(ctrl.diagnostics()); - } - co_yield std::move(events); - continue; - } - // Now we can transform the events in the batch. We're utilizing Apache - // Arrow's compute function 'utf8_trim' for this, confuring it to trim - // whitespace. - const auto trim = [&](struct record_type::field field, - std::shared_ptr array) - -> indexed_transformation::result_type { - static const auto options = arrow::compute::TrimOptions{" \t\n\v\f\r"}; - auto trimmed_array = arrow::compute::CallFunction( - "utf8_trim", {array}, &options); - if (not trimmed_array.ok()) { - diagnostic::error("{}", trimmed_array.status().ToString()) - .primary(field_.source) - .throw_(); - } - return { - {field, trimmed_array.MoveValueUnsafe().make_array()}, - }; - }; - // Lastly, we apply the transformation to all indices and then return the - // transformed batch. - auto transformations = std::vector{}; - for (auto index : indices) { - transformations.push_back({index, trim}); - } - co_yield transform_columns(std::move(events), std::move(transformations)); - } - } - - // Return the user-facing name of the operator. Must be a valid identifier. 
- auto name() const -> std::string override { - return "trim"; - } - - // Specify how optimizations affect the operator. - auto optimize(const expression& filter, event_order order) const - -> optimize_result override { - (void)order; - (void)filter; - return do_not_optimize(*this); - } - - // List all fields so that the operator can successfully be transmitted - // between nodes. - friend auto inspect(auto& f, trim_operator& x) -> bool { - return f.object(x).fields(f.field("field", x.field_)); - } - -private: - located field_ = {}; -}; - -class plugin final : public virtual operator_plugin { +class example_plugin final : public virtual plugin { public: - // Provide the signature of the operator for `show operators`. - auto signature() const -> operator_signature override { - return { - .source = false, - .transformation = true, - .sink = false, - }; - } - - // Parse the operator from the parser interface. - auto parse_operator(parser_interface& p) const -> operator_ptr override { - auto parser = argument_parser{ - "trim", - "https://github.com/tenzir/example-plugin", - }; - auto field = located{}; - parser.add(field, ""); - parser.parse(p); - return std::make_unique(field); - } + auto name() const -> std::string override { return "example"; } }; } // namespace -} // namespace tenzir::plugins::trim +} // namespace tenzir::plugins::example -TENZIR_REGISTER_PLUGIN(tenzir::plugins::trim::plugin) +TENZIR_REGISTER_PLUGIN(tenzir::plugins::example::example_plugin)