diff --git a/CHANGELOG.md b/CHANGELOG.md index f4b4dc2..d63c5df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.0 + - ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233) + ## 5.1.0 - Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 6478dff..677b11e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -230,6 +230,110 @@ The next scheduled run: * uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and * updates the value of the field at the end of the pagination. +[id="plugins-{type}s-{plugin}-esql"] +==== {esql} support + +.Technical Preview +**** +The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview. +Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings. +**** + +{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data. + +To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer. + +To configure an {esql} query in the plugin, set `query_type` to `esql` and provide your {esql} query in the `query` parameter. + +IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. + +The following is a basic scheduled {esql} query that runs hourly: +[source, ruby] + input { + elasticsearch { + id => 'hourly_cron_job' + hosts => [ 'https://..'] + api_key => '....' 
+ query_type => 'esql' + query => ' + FROM food-index + | WHERE spicy_level = "hot" AND @timestamp > NOW() - 1 hour + | LIMIT 500 + ' + schedule => '0 * * * *' # every hour at min 0 + } + } + +Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query. + +NOTE: With an {esql} query, {ls} doesn't generate `event.original`. + +[id="plugins-{type}s-{plugin}-esql-event-mapping"] +===== Mapping {esql} result to {ls} event +{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). +The plugin maps each value entry to an event, populating corresponding fields. +For example, a query might produce a table like: + +[cols="2,1,1,1,2",options="header"] +|=== +|`timestamp` |`user_id` | `action` | `status.code` | `status.desc` + +|2025-04-10T12:00:00 |123 |login |200 | Success +|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user) +|=== + +For this case, the plugin emits two events that look like: +[source, json] +[ + { + "timestamp": "2025-04-10T12:00:00", + "user_id": 123, + "action": "login", + "status": { + "code": 200, + "desc": "Success" + } + }, + { + "timestamp": "2025-04-10T12:05:00", + "user_id": 456, + "action": "purchase", + "status": { + "code": 403, + "desc": "Forbidden (unauthorized user)" + } + } +] + +NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` are actually dotted fields, they appear in {ls} events as a nested structure. + +[id="plugins-{type}s-{plugin}-esql-multifields"] +===== Conflict on multi-fields + +An {esql} query fetches all parent and sub-fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects]. 
+Since {ls} events cannot contain a parent field's concrete value and sub-field values together, the plugin ignores sub-fields with a warning and includes the parent. +We recommend using the `RENAME` (or `DROP` to avoid warnings) keyword in your {esql} query to explicitly rename the fields if you want to include sub-fields in the event. + +This is a common occurrence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + "keyword" (`field.keyword`) multi-field. +In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield, as the engine will optimize and retrieve the keyword; otherwise you can do `KEEP field.keyword | RENAME field.keyword as field`. + +To illustrate the situation with an example, assume your mapping has a `time` field with `time.min` and `time.max` sub-fields as follows: +[source, ruby] + "properties": { + "time": { "type": "long" }, + "time.min": { "type": "long" }, + "time.max": { "type": "long" } + } + +The {esql} result will contain all three fields but the plugin cannot map them into a {ls} event. +To avoid this, you can use the `RENAME` keyword to rename the `time` parent field so that all three fields have unique names. +[source, ruby] + ... + query => 'FROM my-index | RENAME time AS time.current' + ... + +For comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation]. + [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Input configuration options @@ -257,6 +361,7 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>, one of `["dsl","esql"]`|No | <> |<>, one of `["hits","aggregations"]`|No | <> | <>|No | <> |<>|No @@ -498,22 +603,35 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`. * Value type is <> * Default value is `'{ "sort": [ "_doc" ] }'` -The query to be executed. 
Read the {ref}/query-dsl.html[Elasticsearch query DSL -documentation] for more information. +The query to be executed. +Accepted query shape is DSL or {esql} (when `query_type => 'esql'`). +Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{esql} documentation] for more information. When <> resolves to `search_after` and the query does not specify `sort`, the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more. +[id="plugins-{type}s-{plugin}-query_type"] +===== `query_type` + +* Value can be `dsl` or `esql` +* Default value is `dsl` + +Defines the <> shape. +When `dsl`, the query shape must be a valid {es} JSON-style string. +When `esql`, the query shape must be a valid {esql} string and the `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target`, `docinfo_fields`, `response_type` and `tracking_field` parameters are not allowed. + [id="plugins-{type}s-{plugin}-response_type"] ===== `response_type` * Value can be any of: `hits`, `aggregations` * Default value is `hits` Which part of the result to transform into Logstash events when processing the response from the query. + The default `hits` will generate one event per returned document (i.e. "hit"). + +When set to `aggregations`, a single {ls} event will be generated with the contents of the `aggregations` object of the query's response. In this case the `hits` object will be ignored. The parameter `size` will always be set to 0 regardless of the default or user-defined value set in this plugin. 
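The column/value-to-event mapping and the multi-field skipping described in the docs above can be sketched in plain Ruby. This is an illustrative sketch only: `esql_rows_to_events` is a hypothetical helper, not the plugin's implementation, and the `columns`/`values` shapes simply mirror the ES|QL JSON response.

```ruby
require 'set'

# Convert an ES|QL tabular response (columns + values) into nested hashes,
# skipping nil cells and any sub-field column (e.g. "time.min") whose
# parent column (e.g. "time") is also present -- mirroring the conflict
# handling the docs above describe.
def esql_rows_to_events(columns, values)
  names = columns.map { |c| c['name'] }
  all_names = Set.new(names)

  # A column is a conflicting sub-field when any dotted parent prefix
  # of its name is itself a column in the result.
  sub_field = names.map do |name|
    parts = name.split('.')
    (0...parts.size - 1).any? { |i| all_names.include?(parts[0..i].join('.')) }
  end

  values.map do |row|
    event = {}
    names.each_with_index do |name, i|
      next if row[i].nil? || sub_field[i]
      *parents, leaf = name.split('.')
      parents.reduce(event) { |h, k| h[k] ||= {} }[leaf] = row[i]
    end
    event
  end
end

columns = [{ 'name' => 'status.code' }, { 'name' => 'status.desc' }]
values  = [[200, 'Success'], [403, 'Forbidden']]
events  = esql_rows_to_events(columns, values)
# each row becomes one nested event, e.g. events[0]['status']['code'] == 200
```

With a `time`/`time.min`/`time.max` result as in the docs, only the parent `time` column survives, which is why the docs recommend `RENAME` to disambiguate.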
diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 564acc6..b7b477b 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base require 'logstash/inputs/elasticsearch/paginated_search' require 'logstash/inputs/elasticsearch/aggregation' require 'logstash/inputs/elasticsearch/cursor_tracker' + require 'logstash/inputs/elasticsearch/esql' include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck @@ -96,15 +97,21 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # The index or alias to search. config :index, :validate => :string, :default => "logstash-*" - # The query to be executed. Read the Elasticsearch query DSL documentation - # for more info - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # The type of the Elasticsearch query given in `query`. Determines how the query shape and related params are validated. + config :query_type, :validate => %w[dsl esql], :default => 'dsl' + + # The query to be executed. Either a DSL query or, when `query_type => 'esql'`, an ES|QL query is accepted. 
+ # Read the following documentation for more info + # Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }' - # This allows you to speccify the response type: either hits or aggregations - where hits: normal search request - aggregations: aggregation request - config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits' + # This allows you to specify the DSL response type: one of [hits, aggregations] + # where + # hits: normal search request + # aggregations: aggregation request + # Note that this param is not allowed when `query_type => 'esql'`; the ES|QL response shape is always tabular + config :response_type, :validate => %w[hits aggregations], :default => 'hits' # This allows you to set the maximum number of hits returned per scroll. config :size, :validate => :number, :default => 1000 @@ -286,6 +293,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze + LS_ESQL_SUPPORT_VERSION = "8.17.4" # the first version that shipped elasticsearch-ruby v8 + ES_ESQL_SUPPORT_VERSION = "8.11.0" + def initialize(params={}) super(params) @@ -302,10 +312,17 @@ def register fill_hosts_from_cloud_id setup_ssl_params! - @base_query = LogStash::Json.load(@query) - if @slices - @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") - @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + if @query_type == 'esql' + validate_ls_version_for_esql_support! 
+ validate_esql_query! + not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field) + raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options.any? + else + @base_query = LogStash::Json.load(@query) + if @slices + @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") + @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + end end @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`") @@ -341,11 +358,13 @@ def register test_connection! + validate_es_for_esql_support! + setup_serverless setup_search_api - setup_query_executor + @query_executor = create_query_executor setup_cursor_tracker @@ -363,16 +382,6 @@ def run(output_queue) end end - def get_query_object - if @cursor_tracker - query = @cursor_tracker.inject_cursor(@query) - @logger.debug("new query is #{query}") - else - query = @query - end - LogStash::Json.load(query) - end - ## # This can be called externally from the query_executor public @@ -383,6 +392,23 @@ def push_hit(hit, output_queue, root_field = '_source') record_last_value(event) end + def decorate_event(event) + decorate(event) + end + + private + + def get_query_object + return @query if @query_type == 'esql' + if @cursor_tracker + query = @cursor_tracker.inject_cursor(@query) + @logger.debug("new query is #{query}") + else + query = @query + end + LogStash::Json.load(query) + end + def record_last_value(event) @cursor_tracker.record_last_value(event) if @tracking_field end @@ -414,8 +440,6 @@ def set_docinfo_fields(hit, event) 
event.set(@docinfo_target, docinfo_target) end - private - def hosts_default?(hosts) hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? ) end @@ -664,18 +688,16 @@ def setup_search_api end - def setup_query_executor - @query_executor = case @response_type - when 'hits' - if @resolved_search_api == "search_after" - LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) - else - logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8 - LogStash::Inputs::Elasticsearch::Scroll.new(@client, self) - end - when 'aggregations' - LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) - end + def create_query_executor + return LogStash::Inputs::Elasticsearch::Esql.new(@client, self) if @query_type == 'esql' + + # DSL query executor + return LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) if @response_type == 'aggregations' + # response_type is hits, executor can be search_after or scroll type + return LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) if @resolved_search_api == "search_after" + + logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8 + LogStash::Inputs::Elasticsearch::Scroll.new(@client, self) end def setup_cursor_tracker @@ -714,6 +736,26 @@ def get_transport_client_class ::Elastic::Transport::Transport::HTTP::Manticore end + def validate_ls_version_for_esql_support! + if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION) + fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}") + end + end + + def validate_esql_query! + fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty? + source_commands = %w[FROM ROW SHOW] + contains_source_command = source_commands.any? 
{ |source_command| @query.strip.start_with?(source_command) } + fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command + end + + def validate_es_for_esql_support! + return unless @query_type == 'esql' + # make sure connected ES supports ES|QL (8.11+) + es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION) + fail("Connected Elasticsearch #{es_version} version does not support ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql + end + module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb new file mode 100644 index 0000000..30afb72 --- /dev/null +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -0,0 +1,153 @@ +require 'logstash/helpers/loggable_try' + +module LogStash + module Inputs + class Elasticsearch + class Esql + include LogStash::Util::Loggable + + ESQL_JOB = "ES|QL job" + + ESQL_PARSERS_BY_TYPE = Hash.new(lambda { |x| x }).merge( + 'date' => ->(value) { value && LogStash::Timestamp.new(value) }, + ) + + # Initialize the ESQL query executor + # @param client [Elasticsearch::Client] The Elasticsearch client instance + # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance + def initialize(client, plugin) + @client = client + @event_decorator = plugin.method(:decorate_event) + @retries = plugin.params["retries"] + + target_field = plugin.params["target"] + # use `define_singleton_method` so the block captures the local `target_field` (a plain `def` opens a new scope and would not) + if target_field + define_singleton_method(:apply_target) { |path| "[#{target_field}][#{path}]" } + else + define_singleton_method(:apply_target) { |path| path } + end + + @query = plugin.params["query"] + unless @query.include?('METADATA') + logger.info("`METADATA` not found in the query. 
`_id`, `_version` and `_index` will not be available in the result", {:query => @query}) + end + logger.debug("ES|QL executor initialized with", {:query => @query}) + end + + # Execute the ESQL query and process results + # @param output_queue [Queue] The queue to push processed events to + # @param query A query (to obey interface definition) + def do_run(output_queue, query) + logger.info("ES|QL executor has started") + response = retryable(ESQL_JOB) do + @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: true }) + end + # retriable already printed error details + return if response == false + + if response&.headers&.dig("warning") + logger.warn("ES|QL executor received warning", {:warning_message => response.headers["warning"]}) + end + columns = response['columns']&.freeze + values = response['values']&.freeze + logger.debug("ES|QL query response size: #{values&.size}") + + process_response(columns, values, output_queue) if columns && values + end + + # Execute a retryable operation with proper error handling + # @param job_name [String] Name of the job for logging purposes + # @yield The block to execute + # @return [Boolean] true if successful, false otherwise + def retryable(job_name, &block) + stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name) + stud_try.try((@retries + 1).times) { yield } + rescue => e + error_details = {:message => e.message, :cause => e.cause} + error_details[:backtrace] = e.backtrace if logger.debug? 
+ logger.error("#{job_name} failed with", error_details) + false + end + + private + + # Process the ESQL response and push events to the output queue + # @param columns [Array[Hash]] The ESQL query response columns + # @param values [Array[Array]] The ESQL query response hits + # @param output_queue [Queue] The queue to push processed events to + def process_response(columns, values, output_queue) + column_specs = columns.map { |column| ColumnSpec.new(column) } + sub_element_mark_map = mark_sub_elements(column_specs) + multi_fields = sub_element_mark_map.filter_map { |key, val| key.name if val == true } + logger.warn("Multi-fields found in ES|QL result and they will not be available in the event. Please use `RENAME` command if you want to include them.", { :detected_multi_fields => multi_fields }) if multi_fields.any? + + values.each do |row| + event = column_specs.zip(row).each_with_object(LogStash::Event.new) do |(column, value), event| + # `drop_null_columns` only drops columns that are `nil` in every row, so individual rows may still contain `nil` values + # filter them out per row to achieve full `drop_null_columns` behavior (the ideal `LIMIT 1`-style result) + # we also exclude sub-elements of the main field + if value && sub_element_mark_map[column] == false + field_reference = apply_target(column.field_reference) + event.set(field_reference, ESQL_PARSERS_BY_TYPE[column.type].call(value)) + end + end + @event_decorator.call(event) + output_queue << event + rescue => e + # if event creation fails for whatever reason, inform the user, tag with failure and emit the entry as it is + logger.warn("Event creation error", message: e.message, exception: e.class, data: { "columns" => columns, "values" => [row] }) + failed_event = LogStash::Event.new("columns" => columns, "values" => [row], "tags" => ['_elasticsearch_input_failure']) + output_queue << failed_event + end + end + + # Determines whether each column in a collection is a nested 
sub-element (example "user.age") + # of another column in the same collection (example "user"). + # + # @param columns [Array] An array of objects with a `name` attribute representing field paths. + # @return [Hash] A hash mapping each column to `true` if it is a sub-element of another field, `false` otherwise. + # Time complexity: O(N*logN + N*K) where K is the maximum nesting depth + # without the `prefix_set` memoization, it would be O(N^2) + def mark_sub_elements(columns) + # Sort columns by name length (ascending) + sorted_columns = columns.sort_by { |c| c.name.length } + prefix_set = Set.new # memoization set + + sorted_columns.each_with_object({}) do |column, memo| + # Split the column name into parts (e.g., "user.profile.age" → ["user", "profile", "age"]) + parts = column.name.split('.') + + # Generate all possible parent prefixes (e.g., "user", "user.profile") + # and check if any parent prefix exists in the set + parent_prefixes = (0...parts.size - 1).map { |i| parts[0..i].join('.') } + memo[column] = parent_prefixes.any? { |prefix| prefix_set.include?(prefix) } + prefix_set.add(column.name) + end + end + end + + # Class representing a column specification in the ESQL response['columns'] + # The class's main purpose is to provide a structure for the event key + # columns is an array with `name` and `type` pair (example: `{"name"=>"@timestamp", "type"=>"date"}`) + # @attr_reader :name [String] The name of the column + # @attr_reader :type [String] The type of the column + class ColumnSpec + attr_reader :name, :type + + def initialize(spec) + @name = isolate(spec.fetch('name')) + @type = isolate(spec.fetch('type')) + end + + def field_reference + @_field_reference ||= '[' + name.gsub('.', '][') + ']' + end + + private + def isolate(value) + value.frozen? ? 
value : value.clone.freeze + end + end + end + end +end \ No newline at end of file diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec index 469f4d8..976b3ef 100644 --- a/logstash-input-elasticsearch.gemspec +++ b/logstash-input-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-elasticsearch' - s.version = '5.1.0' + s.version = '5.2.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads query results from an Elasticsearch cluster" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb new file mode 100644 index 0000000..f958dea --- /dev/null +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -0,0 +1,180 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/elasticsearch" +require "elasticsearch" + +describe LogStash::Inputs::Elasticsearch::Esql do + let(:client) { instance_double(Elasticsearch::Client) } + let(:esql_client) { double("esql-client") } + + let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: plugin_config, decorate_event: nil) } + let(:plugin_config) do + { + "query" => "FROM test-index | STATS count() BY field", + "retries" => 3 + } + end + let(:esql_executor) { described_class.new(client, plugin) } + + describe "#initialization" do + it "sets up the ESQL client with correct parameters" do + expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"]) + expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"]) + expect(esql_executor.instance_variable_get(:@target_field)).to eq(nil) + end + end + + describe "#execution" do + let(:output_queue) { Queue.new } + + context "when faces error while retrying" do + it "retries the 
given block the specified number of times" do + attempts = 0 + result = esql_executor.retryable("Test Job") do + attempts += 1 + raise StandardError if attempts < 3 + "success" + end + expect(attempts).to eq(3) + expect(result).to eq("success") + end + + it "returns false if the block fails all attempts" do + result = esql_executor.retryable("Test Job") do + raise StandardError + end + expect(result).to eq(false) + end + end + + context "when executing chain of processes" do + let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'a.b.1.d', 'type' => 'keyword' }, + { 'name' => 'h_g.k$l.m.0', 'type' => 'keyword' }] } } + + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + end + + it "executes the ESQL query and processes the results" do + allow(response).to receive(:headers).and_return({}) + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(1) + + event = output_queue.pop + expect(event.get('[a][b][1][d]')).to eq('foo') + expect(event.get('[h_g][k$l][m][0]')).to eq('bar') + end + + it "logs a warning if the response contains a warning header" do + allow(response).to receive(:headers).and_return({ "warning" => "some warning" }) + expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", { :warning_message => "some warning" }) + esql_executor.do_run(output_queue, plugin_config["query"]) + end + + it "does not log a warning if the response does not contain a warning header" do + allow(response).to receive(:headers).and_return({}) + expect(esql_executor.logger).not_to receive(:warn) + esql_executor.do_run(output_queue, plugin_config["query"]) + end + end + + describe "multiple rows in the result" do + let(:response) { { 'values' => rows, 'columns' => [{ 'name' => 'key.1', 'type' => 'keyword' }, + { 'name' => 'key.2', 'type' => 'keyword' }] } } + + before do + allow(esql_executor).to 
receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(response).to receive(:headers).and_return({}) + end + + context "when mapping" do + let(:rows) { [%w[foo bar], %w[hello world]] } + + it "1:1 maps rows to events" do + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(2) + + event_1 = output_queue.pop + expect(event_1.get('[key][1]')).to eq('foo') + expect(event_1.get('[key][2]')).to eq('bar') + + event_2 = output_queue.pop + expect(event_2.get('[key][1]')).to eq('hello') + expect(event_2.get('[key][2]')).to eq('world') + end + end + + context "when partial nil values appear" do + let(:rows) { [[nil, "bar"], ["hello", nil]] } + + it "ignores the nil values" do + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(2) + + event_1 = output_queue.pop + expect(event_1.get('[key][1]')).to eq(nil) + expect(event_1.get('[key][2]')).to eq('bar') + + event_2 = output_queue.pop + expect(event_2.get('[key][1]')).to eq('hello') + expect(event_2.get('[key][2]')).to eq(nil) + end + end + end + + context "when sub-elements occur in the result" do + let(:response) { { + 'values' => [[50, 1, 100], [50, 0, 1000], [50, 9, 99999]], + 'columns' => + [ + { 'name' => 'time', 'type' => 'long' }, + { 'name' => 'time.min', 'type' => 'long' }, + { 'name' => 'time.max', 'type' => 'long' }, + ] + } } + + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(response).to receive(:headers).and_return({}) + end + + it "includes 1st depth elements into event" do + esql_executor.do_run(output_queue, plugin_config["query"]) + + expect(output_queue.size).to eq(3) + 3.times do + event = output_queue.pop + expect(event.get('time')).to eq(50) + expect(event.get('[time][min]')).to eq(nil) + expect(event.get('[time][max]')).to eq(nil) + end + end + end + end + + 
describe "#column spec" do + let(:valid_spec) { { 'name' => 'field.name', 'type' => 'keyword' } } + let(:column_spec) { LogStash::Inputs::Elasticsearch::ColumnSpec.new(valid_spec) } + + context "when initialized" do + it "sets the name and type attributes" do + expect(column_spec.name).to eq("field.name") + expect(column_spec.type).to eq("keyword") + end + + it "freezes the name and type attributes" do + expect(column_spec.name).to be_frozen + expect(column_spec.type).to be_frozen + end + end + + context "when calling the field reference" do + it "returns the correct field reference format" do + expect(column_spec.field_reference).to eq("[field][name]") + end + end + end +end if Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create(LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION) \ No newline at end of file diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 0c07992..02787df 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1370,4 +1370,129 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie client.transport.respond_to?(:transport) ? 
     client.transport.transport : client.transport
   end
 
+  describe "#ESQL" do
+    let(:config) do
+      {
+        "query" => "FROM test-index | STATS count() BY field",
+        "query_type" => "esql",
+        "retries" => 3
+      }
+    end
+    let(:es_version) { LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION }
+    let(:ls_version) { LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION }
+
+    before(:each) do
+      stub_const("LOGSTASH_VERSION", ls_version)
+    end
+
+    describe "#initialize" do
+      it "sets up the ESQL client with correct parameters" do
+        expect(plugin.instance_variable_get(:@query_type)).to eq(config["query_type"])
+        expect(plugin.instance_variable_get(:@query)).to eq(config["query"])
+        expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"])
+      end
+    end
+
+    describe "#register" do
+      before(:each) do
+        Elasticsearch::Client.send(:define_method, :ping) { }
+        allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
+      end
+
+      it "creates ES|QL executor" do
+        plugin.register
+        expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql)
+      end
+    end
+
+    describe "#validation" do
+
+      describe "LS version" do
+        context "when compatible" do
+          it "does not raise an error" do
+            expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error
+          end
+        end
+
+        context "when incompatible" do
+          before(:each) do
+            stub_const("LOGSTASH_VERSION", "8.10.0")
+          end
+
+          it "raises a runtime error" do
+            expect { plugin.send(:validate_ls_version_for_esql_support!) }
+              .to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES\|QL\. Please upgrade Logstash to at least #{ls_version}/)
+          end
+        end
+      end
+
+      describe "ES version" do
+        before(:each) do
+          allow(plugin).to receive(:es_version).and_return("8.10.5")
+        end
+
+        context "when incompatible" do
+          it "raises a runtime error" do
+            expect { plugin.send(:validate_es_for_esql_support!) }
+              .to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES\|QL\. ES\|QL feature requires at least Elasticsearch #{es_version} version\./)
+          end
+        end
+      end
+
+      context "ES|QL query and DSL params used together" do
+        let(:config) do
+          super().merge({
+            "index" => "my-index",
+            "size" => 1,
+            "slices" => 1,
+            "search_api" => "auto",
+            "docinfo" => true,
+            "docinfo_target" => "[@metadata][docinfo]",
+            "docinfo_fields" => ["_index"],
+            "response_type" => "hits",
+            "tracking_field" => "[@metadata][tracking]"
+          })
+        end
+
+        it "raises a config error" do
+          mixed_fields = %w[index size slices docinfo_fields response_type tracking_field]
+          expect { plugin.register }
+            .to raise_error(LogStash::ConfigurationError, /Configured #{Regexp.escape(mixed_fields.to_s)} params are not allowed while using ES\|QL query/)
+        end
+      end
+
+      describe "ES|QL query" do
+        context "when query is valid" do
+          it "does not raise an error" do
+            expect { plugin.send(:validate_esql_query!) }.not_to raise_error
+          end
+        end
+
+        context "when query is empty" do
+          let(:config) do
+            {
+              "query" => " "
+            }
+          end
+
+          it "raises a configuration error" do
+            expect { plugin.send(:validate_esql_query!) }
+              .to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/)
+          end
+        end
+
+        context "when query doesn't align with ES syntax" do
+          let(:config) do
+            {
+              "query" => "RANDOM query"
+            }
+          end
+
+          it "raises a configuration error" do
+            source_commands = %w[FROM ROW SHOW]
+            expect { plugin.send(:validate_esql_query!) }
+              .to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}")
+          end
+        end
+      end
+    end
+  end
 end
diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb
new file mode 100644
index 0000000..b25f65a
--- /dev/null
+++ b/spec/inputs/integration/elasticsearch_esql_spec.rb
@@ -0,0 +1,150 @@
+# encoding: utf-8
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/inputs/elasticsearch"
+require "elasticsearch"
+require_relative "../../../spec/es_helper"
+
+describe LogStash::Inputs::Elasticsearch, integration: true do
+
+  SECURE_INTEGRATION = ENV['SECURE_INTEGRATION'].eql? 'true'
+  ES_HOSTS = ["http#{SECURE_INTEGRATION ? 's' : nil}://#{ESHelper.get_host_port}"]
+
+  let(:plugin) { described_class.new(config) }
+  let(:es_index) { "logstash-esql-integration-#{rand(1000)}" }
+  let(:test_documents) do
+    [
+      { "message" => "test message 1", "type" => "a", "count" => 1 },
+      { "message" => "test message 2", "type" => "a", "count" => 2 },
+      { "message" => "test message 3", "type" => "b", "count" => 3 },
+      { "message" => "test message 4", "type" => "b", "count" => 4 },
+      { "message" => "test message 5", "type" => "c", "count" => 5 }
+    ]
+  end
+  let(:config) do
+    {
+      "hosts" => ES_HOSTS,
+      "query_type" => "esql"
+    }
+  end
+  let(:es_client) do
+    Elasticsearch::Client.new(hosts: ES_HOSTS)
+  end
+
+  before(:all) do
+    is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create(LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION)
+    skip "LS version does not have ES client which supports ES|QL" unless is_ls_with_esql_supported_client
+
+    # Skip tests if the ES version doesn't support ES|QL
+    es_client = Elasticsearch::Client.new(hosts: ES_HOSTS) # created separately since `let` isn't allowed in before(:context)
+    es_version_info = es_client.info["version"]
+    es_gem_version = Gem::Version.create(es_version_info["number"])
+    skip "ES version does not support ES|QL" if es_gem_version.nil? || es_gem_version < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION)
+  end
+
+  before(:each) do
+    # Create the index and load the test documents
+    es_client.indices.create(index: es_index, body: {}) unless es_client.indices.exists?(index: es_index)
+
+    test_documents.each do |doc|
+      es_client.index(index: es_index, body: doc, refresh: true)
+    end
+  end
+
+  after(:each) do
+    es_client.indices.delete(index: es_index) if es_client.indices.exists?(index: es_index)
+  end
+
+  context "#run ES|QL queries" do
+
+    before do
+      stub_const("LOGSTASH_VERSION", LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION)
+      allow_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:exit_plugin?).and_return false, true
+    end
+
+    before(:each) do
+      plugin.register
+    end
+
+    shared_examples "ESQL query execution" do |expected_count|
+      it "correctly retrieves documents" do
+        queue = Queue.new
+        plugin.run(queue)
+
+        event_count = 0
+        expected_count.times do
+          event = queue.pop
+          expect(event).to be_a(LogStash::Event)
+          event_count += 1
+        end
+        expect(event_count).to eq(expected_count)
+      end
+    end
+
+    context "#FROM query" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | SORT count")
+      end
+
+      include_examples "ESQL query execution", 5
+    end
+
+    context "#FROM query and WHERE clause" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | WHERE type == \"a\" | SORT count")
+      end
+
+      include_examples "ESQL query execution", 2
+    end
+
+    context "#STATS aggregation" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | STATS avg(count) BY type")
+      end
+
+      it "retrieves aggregated stats" do
+        queue = Queue.new
+        plugin.run(queue)
+
+        results = []
+        3.times do
+          event = queue.pop
+          expect(event).to be_a(LogStash::Event)
+          results << event.get("avg(count)")
+        end
+
+        expected_averages = [1.5, 3.5, 5.0]
+        expect(results.sort).to eq(expected_averages)
+      end
+    end
+
+    context "#METADATA" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} METADATA _index, _id, _version | DROP message.keyword, type.keyword | SORT count")
+      end
+
+      it "includes document metadata" do
+        queue = Queue.new
+        plugin.run(queue)
+
+        5.times do
+          event = queue.pop
+          expect(event).to be_a(LogStash::Event)
+          expect(event.get("_index")).not_to be_nil
+          expect(event.get("_id")).not_to be_nil
+          expect(event.get("_version")).not_to be_nil
+        end
+      end
+    end
+
+    context "#invalid ES|QL query" do
+      let(:config) do
+        super().merge("query" => "FROM undefined index | LIMIT 1")
+      end
+
+      it "doesn't produce events" do
+        queue = Queue.new
+        plugin.run(queue)
+        expect(queue.empty?).to eq(true)
+      end
+    end
+  end
+end
\ No newline at end of file