From a307db2d77708bc5b14a78aed679a1e2c2742e10 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 8 Apr 2025 07:36:21 -0700 Subject: [PATCH 01/22] ES|QL support: ESQL executor implementation, response type to accept esql option, validations to make sure both LS and ES support the ESQL execution. --- CHANGELOG.md | 3 + lib/logstash/inputs/elasticsearch.rb | 54 ++++++++++- lib/logstash/inputs/elasticsearch/esql.rb | 79 ++++++++++++++++ logstash-input-elasticsearch.gemspec | 2 +- spec/inputs/elasticsearch_esql_spec.rb | 96 ++++++++++++++++++++ spec/inputs/elasticsearch_spec.rb | 106 ++++++++++++++++++++++ 6 files changed, 334 insertions(+), 6 deletions(-) create mode 100644 lib/logstash/inputs/elasticsearch/esql.rb create mode 100644 spec/inputs/elasticsearch_esql_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index f4b4dc2..d63c5df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.0 + - ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233) + ## 5.1.0 - Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 564acc6..736c25d 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base require 'logstash/inputs/elasticsearch/paginated_search' require 'logstash/inputs/elasticsearch/aggregation' require 'logstash/inputs/elasticsearch/cursor_tracker' + require 'logstash/inputs/elasticsearch/esql' include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck @@ -104,7 +105,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # This allows you to speccify the response type: either hits or aggregations # where hits: normal search request # 
aggregations: aggregation request - config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits' + config :response_type, :validate => %w[hits aggregations esql], :default => 'hits' # This allows you to set the maximum number of hits returned per scroll. config :size, :validate => :number, :default => 1000 @@ -302,10 +303,17 @@ def register fill_hosts_from_cloud_id setup_ssl_params! - @base_query = LogStash::Json.load(@query) - if @slices - @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") - @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + if @response_type == 'esql' + validate_ls_version_for_esql_support! + validate_esql_query! + inform_ineffective_esql_params + else + # for the ES|QL, plugin accepts raw string query but JSON for others + @base_query = LogStash::Json.load(@query) + if @slices + @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") + @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + end end @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`") @@ -341,6 +349,9 @@ def register test_connection! + # make sure connected ES supports ES|QL (8.11+) + validate_es_for_esql_support! 
if @response_type == 'esql' + setup_serverless setup_search_api @@ -398,6 +409,12 @@ def event_from_hit(hit, root_field) return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure']) end + def decorate_and_push_to_queue(output_queue, mapped_entry) + event = targeted_event_factory.new_event mapped_entry + decorate(event) + output_queue << event + end + def set_docinfo_fields(hit, event) # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. docinfo_target = event.get(@docinfo_target) || {} @@ -675,6 +692,8 @@ def setup_query_executor end when 'aggregations' LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) + when 'esql' + LogStash::Inputs::Elasticsearch::Esql.new(@client, self) end end @@ -714,6 +733,31 @@ def get_transport_client_class ::Elastic::Transport::Transport::HTTP::Manticore end + def validate_ls_version_for_esql_support! + # LS 8.17.4+ has elasticsearch-ruby 8.17 client + # elasticsearch-ruby 8.11+ supports ES|QL + if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create("8.17.4") + fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4") + end + end + + def validate_esql_query! + fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty? + source_commands = %w[FROM ROW SHOW] + contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) } + fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command + end + + def inform_ineffective_esql_params + ineffective_options = original_params.keys & %w(target size slices search_api) + @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1 + end + + def validate_es_for_esql_support! 
+ es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create("8.11") + fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. Please upgrade it.") unless es_supports_esql + end + module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb new file mode 100644 index 0000000..dbcaacf --- /dev/null +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -0,0 +1,79 @@ +require 'logstash/helpers/loggable_try' + +module LogStash + module Inputs + class Elasticsearch + class Esql + include LogStash::Util::Loggable + + ESQL_JOB = "ES|QL job" + + # Initialize the ESQL query executor + # @param client [Elasticsearch::Client] The Elasticsearch client instance + # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance + def initialize(client, plugin) + @client = client + @plugin_params = plugin.params + @plugin = plugin + @retries = @plugin_params["retries"] + @query = @plugin_params["query"] + end + + # Execute the ESQL query and process results + # @param output_queue [Queue] The queue to push processed events to + def do_run(output_queue) + logger.info("ES|QL executor starting") + response = retryable(ESQL_JOB) do + @client.esql.query({ body: { query: @query }, format: 'json' }) + end + # retriable already printed error details + return if response == false + + if response&.headers&.dig("warning") + logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]}) + end + if response['values'] && response['columns'] + process_response(response['values'], response['columns'], output_queue) + end + end + + # Execute a retryable operation with proper error handling + # @param job_name [String] Name of the job for logging purposes + # @yield The block to execute + # @return [Boolean] true if successful, false otherwise + def retryable(job_name, &block) + stud_try = 
::LogStash::Helpers::LoggableTry.new(logger, job_name) + stud_try.try((@retries + 1).times) { yield } + rescue => e + error_details = {:message => e.message, :cause => e.cause} + error_details[:backtrace] = e.backtrace if logger.debug? + logger.error("#{job_name} failed with ", error_details) + false + end + + private + + # Process the ESQL response and push events to the output queue + # @param values [Array[Array]] The ESQL query response hits + # @param columns [Array[Hash]] The ESQL query response columns + # @param output_queue [Queue] The queue to push processed events to + def process_response(values, columns, output_queue) + values.each do |value| + mapped_data = map_column_and_values(columns, value) + @plugin.decorate_and_push_to_queue(output_queue, mapped_data) + end + end + + # Map column names to their corresponding values + # @param columns [Array] Array of column definitions + # @param values [Array] Array of values for the current row + # @return [Hash] Mapped data with column names as keys + def map_column_and_values(columns, values) + columns.each_with_index.with_object({}) do |(column, index), mapped_data| + mapped_data[column["name"]] = values[index] + end + end + end + end + end +end \ No newline at end of file diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec index 469f4d8..976b3ef 100644 --- a/logstash-input-elasticsearch.gemspec +++ b/logstash-input-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-elasticsearch' - s.version = '5.1.0' + s.version = '5.2.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads query results from an Elasticsearch cluster" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb new file mode 100644 index 0000000..8629fab --- /dev/null +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -0,0 +1,96 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/elasticsearch" +require "elasticsearch" + +describe LogStash::Inputs::Elasticsearch::Esql do + let(:client) { instance_double(Elasticsearch::Client) } + let(:esql_client) { double("esql-client") } + let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: plugin_config) } + let(:plugin_config) do + { + "query" => "FROM test-index | STATS count() BY field", + "retries" => 3 + } + end + let(:esql_executor) { described_class.new(client, plugin) } + + describe "when initializes" do + it "sets up the ESQL client with correct parameters" do + expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"]) + expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"]) + end + end + + describe "when faces error while retrying" do + it "retries the given block the specified number of times" do + attempts = 0 + result = esql_executor.retryable("Test Job") do + attempts += 1 + raise StandardError if attempts < 3 + "success" + end + expect(attempts).to eq(3) + expect(result).to eq("success") + end + + it "returns false if the block fails all attempts" do + result = esql_executor.retryable("Test Job") do + raise StandardError + end + expect(result).to eq(false) + end + end + + describe "when executing chain of processes" do + let(:output_queue) { Queue.new } + let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id'}, { 'name' => 'val'}] } } + + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(plugin).to receive(:decorate_and_push_to_queue) + end + + it 
"executes the ESQL query and processes the results" do + allow(response).to receive(:headers).and_return({}) + esql_executor.do_run(output_queue) + expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'}) + end + + it "logs a warning if the response contains a warning header" do + allow(response).to receive(:headers).and_return({"warning" => "some warning"}) + expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", {:message => "some warning"}) + esql_executor.do_run(output_queue) + end + + it "does not log a warning if the response does not contain a warning header" do + allow(response).to receive(:headers).and_return({}) + expect(esql_executor.logger).not_to receive(:warn) + esql_executor.do_run(output_queue) + end + end + + + describe "when starts processing the response" do + let(:output_queue) { Queue.new } + let(:values) { [%w[foo bar]] } + let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] } + + it "processes the ESQL response and pushes events to the output queue" do + allow(plugin).to receive(:decorate_and_push_to_queue) + esql_executor.send(:process_response, values, columns, output_queue) + expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'}) + end + end + + describe "when maps column and values" do + let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] } + let(:values) { %w[foo bar] } + + it "maps column names to their corresponding values" do + result = esql_executor.send(:map_column_and_values, columns, values) + expect(result).to eq({'id' => 'foo', 'val' => 'bar'}) + end + end +end \ No newline at end of file diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 0c07992..d0a8123 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1370,4 +1370,110 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie 
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport end + context "#ESQL" do + let(:config) do + { + "query" => "FROM test-index | STATS count() BY field", + "response_type" => "esql", + "retries" => 3 + } + end + let(:es_version) { "8.11.0" } + + before(:each) do + # ES|QL supported |elasticsearch-ruby v8 client is available from 8.17.4 + # this is a safeguard to let tests succeed in <8.17.4 versions, see validation test cases for unsupported behavior + stub_const("LOGSTASH_VERSION", "8.17.4") + end + + describe "#initialize" do + it "sets up the ESQL client with correct parameters" do + expect(plugin.instance_variable_get(:@query)).to eq(config["query"]) + expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"]) + expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"]) + end + end + + describe "#register" do + before(:each) do + Elasticsearch::Client.send(:define_method, :ping) { } + allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info) + end + it "creates ES|QL executor" do + plugin.register + expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql) + end + end + + describe "#validation" do + + describe "LS version" do + context "when compatible" do + + it "does not raise an error" do + expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error + end + end + + context "when incompatible" do + before(:each) do + stub_const("LOGSTASH_VERSION", "8.10.0") + end + + it "raises a runtime error" do + expect { plugin.send(:validate_ls_version_for_esql_support!) } + .to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. 
Please upgrade Logstash to at least 8.17.4/) + end + end + end + + describe "ES version" do + before(:each) do + allow(plugin).to receive(:es_version).and_return("8.10.5") + end + + context "when incompatible" do + it "raises a runtime error" do + expect { plugin.send(:validate_es_for_esql_support!) } + .to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./) + end + end + end + + describe "ES|QL query" do + context "when query is valid" do + it "does not raise an error" do + expect { plugin.send(:validate_esql_query!) }.not_to raise_error + end + end + + context "when query is empty" do + let(:config) do + { + "query" => " " + } + end + + it "raises a configuration error" do + expect { plugin.send(:validate_esql_query!) } + .to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/) + end + end + + context "when query doesn't align with ES syntax" do + let(:config) do + { + "query" => "RANDOM query" + } + end + + it "raises a configuration error" do + source_commands = %w[FROM ROW SHOW] + expect { plugin.send(:validate_esql_query!) } + .to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") + end + end + end + end + end end From 9c35f22a5f0c7d539bcc6de1b5f37f709f215ef9 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 8 Apr 2025 17:14:51 -0700 Subject: [PATCH 02/22] Merge with upstream, warn if query doesn't include METADATA which DSL adds by default - might be users are looking for by default. 
--- lib/logstash/inputs/elasticsearch.rb | 5 +++-- lib/logstash/inputs/elasticsearch/esql.rb | 8 ++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 736c25d..24a72c9 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -366,15 +366,16 @@ def register def run(output_queue) if @schedule scheduler.cron(@schedule, :overlap => @schedule_overlap) do - @query_executor.do_run(output_queue, get_query_object()) + @query_executor.do_run(output_queue, get_query_object) end scheduler.join else - @query_executor.do_run(output_queue, get_query_object()) + @query_executor.do_run(output_queue, get_query_object) end end def get_query_object + return @query if @response_type == 'esql' if @cursor_tracker query = @cursor_tracker.inject_cursor(@query) @logger.debug("new query is #{query}") diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index dbcaacf..3cf50c1 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -17,14 +17,18 @@ def initialize(client, plugin) @plugin = plugin @retries = @plugin_params["retries"] @query = @plugin_params["query"] + unless @query.include?('METADATA') + logger.warn("The query doesn't have METADATA keyword. 
Including it makes _id and _version available in the documents", {:query => @query}) + end end # Execute the ESQL query and process results # @param output_queue [Queue] The queue to push processed events to - def do_run(output_queue) + # @param query A query to be executed + def do_run(output_queue, query) logger.info("ES|QL executor starting") response = retryable(ESQL_JOB) do - @client.esql.query({ body: { query: @query }, format: 'json' }) + @client.esql.query({ body: { query: query }, format: 'json' }) end # retriable already printed error details return if response == false From 6f99055b23fb092324f4acd5cc124d2f8b47c700 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 9 Apr 2025 23:57:55 -0700 Subject: [PATCH 03/22] Run unit tests with the LS version which actually supports the ES|QL. --- spec/inputs/elasticsearch_esql_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 8629fab..411840e 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -54,20 +54,20 @@ it "executes the ESQL query and processes the results" do allow(response).to receive(:headers).and_return({}) - esql_executor.do_run(output_queue) + esql_executor.do_run(output_queue, plugin_config["query"]) expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'}) end it "logs a warning if the response contains a warning header" do allow(response).to receive(:headers).and_return({"warning" => "some warning"}) expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", {:message => "some warning"}) - esql_executor.do_run(output_queue) + esql_executor.do_run(output_queue, plugin_config["query"]) end it "does not log a warning if the response does not contain a warning header" do allow(response).to receive(:headers).and_return({}) expect(esql_executor.logger).not_to 
receive(:warn) - esql_executor.do_run(output_queue) + esql_executor.do_run(output_queue, plugin_config["query"]) end end @@ -93,4 +93,4 @@ expect(result).to eq({'id' => 'foo', 'val' => 'bar'}) end end -end \ No newline at end of file +end if LOGSTASH_VERSION >= '8.17.5' \ No newline at end of file From 086a5921246caad321cc628d5019178fb3ca6b3b Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 10 Apr 2025 11:18:16 -0700 Subject: [PATCH 04/22] Add query type to the agent. DRY of supported ES/LS versions. --- lib/logstash/inputs/elasticsearch.rb | 23 ++++++++++++----------- spec/inputs/elasticsearch_esql_spec.rb | 2 +- spec/inputs/elasticsearch_spec.rb | 11 +++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 24a72c9..e052e53 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -287,6 +287,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze + LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8 + ES_ESQL_SUPPORT_VERSION = "8.11.0" + def initialize(params={}) super(params) @@ -308,7 +311,6 @@ def register validate_esql_query! inform_ineffective_esql_params else - # for the ES|QL, plugin accepts raw string query but JSON for others @base_query = LogStash::Json.load(@query) if @slices @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") @@ -349,8 +351,7 @@ def register test_connection! - # make sure connected ES supports ES|QL (8.11+) - validate_es_for_esql_support! if @response_type == 'esql' + validate_es_for_esql_support! 
setup_serverless @@ -571,8 +572,8 @@ def prepare_user_agent jvm_version = java.lang.System.getProperty('java.version') plugin_version = Gem.loaded_specs["logstash-input-elasticsearch"].version - # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0 - "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}" + # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0 query_type/DSL + "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version} query_type/#{@response_type}" end def fill_user_password_from_cloud_auth @@ -735,10 +736,8 @@ def get_transport_client_class end def validate_ls_version_for_esql_support! - # LS 8.17.4+ has elasticsearch-ruby 8.17 client - # elasticsearch-ruby 8.11+ supports ES|QL - if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create("8.17.4") - fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4") + if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION) + fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}") end end @@ -755,8 +754,10 @@ def inform_ineffective_esql_params end def validate_es_for_esql_support! - es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create("8.11") - fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. 
Please upgrade it.") unless es_supports_esql + return unless @response_type == 'esql' + # make sure connected ES supports ES|QL (8.11+) + es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION) + fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql end module URIOrEmptyValidator diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 411840e..2625544 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -93,4 +93,4 @@ expect(result).to eq({'id' => 'foo', 'val' => 'bar'}) end end -end if LOGSTASH_VERSION >= '8.17.5' \ No newline at end of file +end if LOGSTASH_VERSION >= LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION \ No newline at end of file diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index d0a8123..104972e 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1378,12 +1378,11 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie "retries" => 3 } end - let(:es_version) { "8.11.0" } + let(:es_version) { LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION } + let(:ls_version) { LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION } before(:each) do - # ES|QL supported |elasticsearch-ruby v8 client is available from 8.17.4 - # this is a safeguard to let tests succeed in <8.17.4 versions, see validation test cases for unsupported behavior - stub_const("LOGSTASH_VERSION", "8.17.4") + stub_const("LOGSTASH_VERSION", ls_version) end describe "#initialize" do @@ -1422,7 +1421,7 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie it "raises a runtime error" do expect { plugin.send(:validate_ls_version_for_esql_support!) 
} - .to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/) + .to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{ls_version}/) end end end @@ -1435,7 +1434,7 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie context "when incompatible" do it "raises a runtime error" do expect { plugin.send(:validate_es_for_esql_support!) } - .to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./) + .to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{es_version} version./) end end end From e30e0f9d58004019dbb39a28724a350bb7ccc4e8 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 10 Apr 2025 14:15:23 -0700 Subject: [PATCH 05/22] Remove query type from user-agent since it is useless, put back accidental method formatting. 
--- lib/logstash/inputs/elasticsearch.rb | 8 ++++---- lib/logstash/inputs/elasticsearch/esql.rb | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index e052e53..4b007bc 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -367,11 +367,11 @@ def register def run(output_queue) if @schedule scheduler.cron(@schedule, :overlap => @schedule_overlap) do - @query_executor.do_run(output_queue, get_query_object) + @query_executor.do_run(output_queue, get_query_object()) end scheduler.join else - @query_executor.do_run(output_queue, get_query_object) + @query_executor.do_run(output_queue, get_query_object()) end end @@ -572,8 +572,8 @@ def prepare_user_agent jvm_version = java.lang.System.getProperty('java.version') plugin_version = Gem.loaded_specs["logstash-input-elasticsearch"].version - # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0 query_type/DSL - "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version} query_type/#{@response_type}" + # example: logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-input-elasticsearch/4.10.0 + "logstash/#{LOGSTASH_VERSION} (OS=#{os_name}-#{os_version}-#{os_arch}; JVM=#{jvm_vendor}-#{jvm_version}) logstash-#{@plugin_type}-#{config_name}/#{plugin_version}" end def fill_user_password_from_cloud_auth diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 3cf50c1..bf7ff4c 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -27,6 +27,7 @@ def initialize(client, plugin) # @param query A query to be executed def do_run(output_queue, query) logger.info("ES|QL executor starting") + puts "Query: 
#{query}" response = retryable(ESQL_JOB) do @client.esql.query({ body: { query: query }, format: 'json' }) end From 7746c14c80ca4d16bada74d9dca9a33747c29aea Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 10 Apr 2025 16:29:24 -0700 Subject: [PATCH 06/22] Initial docs added for ES|QL. --- docs/index.asciidoc | 71 ++++++++++++++++++++++- lib/logstash/inputs/elasticsearch/esql.rb | 1 - 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 6478dff..722e78e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -230,6 +230,40 @@ The next scheduled run: * uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and * updates the value of the field at the end of the pagination. +[id="plugins-{type}s-{plugin}-esql"] +==== ES|QL support +{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data. + +To utilize the ES|QL feature with this plugin, the following version requirements must be met: +[cols="1,2",options="header"] +|=== +|Component |Minimum version +|{es} |8.11.0 or newer +|{ls} |8.17.4 or newer +|This plugin |4.23.0+ (4.x series) or 5.2.0+ (5.x series) +|=== + +To configure ES|QL query in the plugin, set the `response_type` to `esql` and provide your ES|QL query in the `query` parameter. + +IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. + +The following is a basic scheduled ES|QL query that runs hourly: +[source, ruby] + input { + elasticsearch { + id => hourly_cron_job + hosts => [ 'https://..'] + api_key => '....' 
+ response_type => 'esql' + query => 'FROM my-index | WHERE @timestamp > NOW() - 1 hour | LIMIT 500' + schedule => '0 * * * *' # every hour at min 0 + } + } + +NOTE: With ES|QL query, {ls} doesn't generate `event.original` + +For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation]. + [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Input configuration options @@ -257,7 +291,7 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>, one of `["hits","aggregations"]`|No +| <> |<>, one of `["hits","aggregations","esql"]`|No | <> | <>|No | <> |<>|No | <> |<>|No @@ -507,17 +541,50 @@ the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the qu [id="plugins-{type}s-{plugin}-response_type"] ===== `response_type` - * Value can be any of: `hits`, `aggregations` + * Value can be any of: `hits`, `aggregations`, `esql` * Default value is `hits` Which part of the result to transform into Logstash events when processing the response from the query. + The default `hits` will generate one event per returned document (i.e. "hit"). + When set to `aggregations`, a single Logstash event will be generated with the contents of the `aggregations` object of the query's response. In this case the `hits` object will be ignored. The parameter `size` will be always be set to 0 regardless of the default or user-defined value set in this plugin. +When using the `esql` setting, the query parameter must be a valid plaintext ES|QL string. +When this setting is active, `target`, `size`, `slices` and `search_api` parameters are ignored. +ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). +The plugin maps each value entry to an event, populating corresponding fields. 
+For example, a query might produce a table like: + +[cols="2,1,1,2",options="header"] +|=== +|`timestamp` |`user_id` | `action` | `status_code` + +|2025-04-10T12:00:00 |123 |login |200 +|2025-04-10T12:05:00 |456 |purchase |403 +|=== + +For this case, the plugin emits two events look like +[source, json] +[ + { + "timestamp": "2025-04-10T12:00:00", + "user_id": 123, + "action": "login", + "status_code": 200 + }, + { + "timestamp": "2025-04-10T12:05:00", + "user_id": 456, + "action": "purchase", + "status_code": 403 + } +] + [id="plugins-{type}s-{plugin}-request_timeout_seconds"] ===== `request_timeout_seconds` diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index bf7ff4c..3cf50c1 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -27,7 +27,6 @@ def initialize(client, plugin) # @param query A query to be executed def do_run(output_queue, query) logger.info("ES|QL executor starting") - puts "Query: #{query}" response = retryable(ESQL_JOB) do @client.esql.query({ body: { query: query }, format: 'json' }) end From 76303d815aed2d72a14d7f2d642c3786d655d287 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 10 Apr 2025 17:08:46 -0700 Subject: [PATCH 07/22] Update query to include condition with string. --- docs/index.asciidoc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 722e78e..589dc56 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -255,7 +255,10 @@ The following is a basic scheduled ES|QL query that runs hourly: hosts => [ 'https://..'] api_key => '....' 
response_type => 'esql' - query => 'FROM my-index | WHERE @timestamp > NOW() - 1 hour | LIMIT 500' + query => ' + FROM food-index + | WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour + | LIMIT 500' schedule => '0 * * * *' # every hour at min 0 } } From 1fb29f7e112f5bf533411dc7e3d4e3203a80fc69 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Fri, 11 Apr 2025 22:21:44 -0700 Subject: [PATCH 08/22] Tested escaped chars cases, uses original query. --- docs/index.asciidoc | 5 ++++- lib/logstash/inputs/elasticsearch/esql.rb | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 589dc56..a0bfa1e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -258,11 +258,14 @@ The following is a basic scheduled ES|QL query that runs hourly: query => ' FROM food-index | WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour - | LIMIT 500' + | LIMIT 500 + ' schedule => '0 * * * *' # every hour at min 0 } } +Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query. + NOTE: With ES|QL query, {ls} doesn't generate `event.original` For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].
diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 3cf50c1..5cdce78 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -13,10 +13,10 @@ class Esql # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance def initialize(client, plugin) @client = client - @plugin_params = plugin.params @plugin = plugin - @retries = @plugin_params["retries"] - @query = @plugin_params["query"] + @retries = plugin.params["retries"] + + @query = plugin.params["query"] unless @query.include?('METADATA') logger.warn("The query doesn't have METADATA keyword. Including it makes _id and _version available in the documents", {:query => @query}) end @@ -24,11 +24,11 @@ def initialize(client, plugin) # Execute the ESQL query and process results # @param output_queue [Queue] The queue to push processed events to - # @param query A query to be executed + # @param query A query (to obey interface definition) def do_run(output_queue, query) logger.info("ES|QL executor starting") response = retryable(ESQL_JOB) do - @client.esql.query({ body: { query: query }, format: 'json' }) + @client.esql.query({ body: { query: @query }, format: 'json' }) end # retriable already printed error details return if response == false From 5d47f2f60a07971d4c1a5bfad6721a0f54c920f6 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 14 Apr 2025 15:05:43 -0700 Subject: [PATCH 09/22] Integration tests added. 
--- .../integration/elasticsearch_esql_spec.rb | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 spec/inputs/integration/elasticsearch_esql_spec.rb diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb new file mode 100644 index 0000000..2c27bd8 --- /dev/null +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -0,0 +1,148 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/elasticsearch" +require "elasticsearch" +require_relative "../../../spec/es_helper" + +describe LogStash::Inputs::Elasticsearch, integration: true do + + SECURE_INTEGRATION = ENV['SECURE_INTEGRATION'].eql? 'true' + ES_HOSTS = ["http#{SECURE_INTEGRATION ? 's' : nil}://#{ESHelper.get_host_port}"] + + let(:plugin) { described_class.new(config) } + let(:es_version) { LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION } + let(:es_index) { "logstash-esql-integration-#{rand(1000)}" } + let(:test_documents) do + [ + { "message" => "test message 1", "type" => "a", "count" => 1 }, + { "message" => "test message 2", "type" => "a", "count" => 2 }, + { "message" => "test message 3", "type" => "b", "count" => 3 }, + { "message" => "test message 4", "type" => "b", "count" => 4 }, + { "message" => "test message 5", "type" => "c", "count" => 5 } + ] + end + let(:config) do + { + "hosts" => ES_HOSTS, + "response_type" => "esql" + } + end + let(:es_client) do + Elasticsearch::Client.new(hosts: ES_HOSTS) + end + + before(:all) do + # Skip tests if ES version doesn't support ES||QL + es_client = Elasticsearch::Client.new(hosts: ES_HOSTS) # need to separately create since let isn't allowed in before(:context) + es_version_info = es_client.info["version"] + es_gem_version = Gem::Version.create(es_version_info["number"]) + skip "Elasticsearch version does not support ES|QL" if es_gem_version.nil? 
|| es_gem_version < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION) + end + + before(:each) do + # Create index with test documents + es_client.indices.create(index: es_index, body: {}) unless es_client.indices.exists?(index: es_index) + + test_documents.each do |doc| + es_client.index(index: es_index, body: doc, refresh: true) + end + end + + after(:each) do + es_client.indices.delete(index: es_index) if es_client.indices.exists?(index: es_index) + end + + context "#run ES|QL queries" do + + before do + stub_const("LOGSTASH_VERSION", LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION) + allow_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:exit_plugin?).and_return false, true + end + + before(:each) do + plugin.register + end + + shared_examples "ESQL query execution" do |expected_count| + it "correctly retrieves documents" do + queue = Queue.new + plugin.run(queue) + + event_count = 0 + expected_count.times do |i| + event = queue.pop + expect(event).to be_a(LogStash::Event) + event_count += 1 + end + expect(event_count).to eq(expected_count) + end + end + + context "#FROM query" do + let(:config) do + super().merge("query" => "FROM #{es_index} | SORT count") + end + + include_examples "ESQL query execution", 5 + end + + context "#FROM query and WHERE clause" do + let(:config) do + super().merge("query" => "FROM #{es_index} | WHERE type == \"a\" | SORT count") + end + + include_examples "ESQL query execution", 2 + end + + context "#STATS aggregation" do + let(:config) do + super().merge("query" => "FROM #{es_index} | STATS avg(count) BY type") + end + + it "retrieves aggregated stats" do + queue = Queue.new + plugin.run(queue) + results = [] + 3.times do + event = queue.pop + expect(event).to be_a(LogStash::Event) + results << event.get("avg(count)") + end + + expected_averages = [1.5, 3.5, 5.0] + expect(results.sort).to eq(expected_averages) + end + end + + context "#METADATA included" do + let(:config) do + 
super().merge("query" => "FROM #{es_index} METADATA _index, _id, _version | SORT count") + end + + it "includes document metadata" do + queue = Queue.new + plugin.run(queue) + + 5.times do + event = queue.pop + expect(event).to be_a(LogStash::Event) + expect(event.get("_index")).not_to be_nil + expect(event.get("_id")).not_to be_nil + expect(event.get("_version")).not_to be_nil + end + end + end + + context "#invalid ES|QL query" do + let(:config) do + super().merge("query" => "FROM undefined index | LIMIT 1") + end + + it "doesn't produce events" do + queue = Queue.new + plugin.run(queue) + expect(queue.empty?).to eq(true) + end + end + end +end \ No newline at end of file From c291e24e7dacfceb9285949f87598f0574b13795 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 14 Apr 2025 15:42:13 -0700 Subject: [PATCH 10/22] Skip the ESQL test if LS with the ES client which doesn't support ESQL feature. --- spec/inputs/integration/elasticsearch_esql_spec.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb index 2c27bd8..36d4e48 100644 --- a/spec/inputs/integration/elasticsearch_esql_spec.rb +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -10,7 +10,6 @@ ES_HOSTS = ["http#{SECURE_INTEGRATION ? 
's' : nil}://#{ESHelper.get_host_port}"] let(:plugin) { described_class.new(config) } - let(:es_version) { LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION } let(:es_index) { "logstash-esql-integration-#{rand(1000)}" } let(:test_documents) do [ @@ -32,11 +31,14 @@ end before(:all) do + is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION) + skip "LS version does not have ES client which supports ES|QL" unless is_ls_with_esql_supported_client + # Skip tests if ES version doesn't support ES||QL es_client = Elasticsearch::Client.new(hosts: ES_HOSTS) # need to separately create since let isn't allowed in before(:context) es_version_info = es_client.info["version"] es_gem_version = Gem::Version.create(es_version_info["number"]) - skip "Elasticsearch version does not support ES|QL" if es_gem_version.nil? || es_gem_version < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION) + skip "ES version does not support ES|QL" if es_gem_version.nil? || es_gem_version < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION) end before(:each) do From 22e72e9a41d0f05d62827ffa0dd8573c25cc171f Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 14 Apr 2025 16:53:40 -0700 Subject: [PATCH 11/22] Add comments on response type and query params about ES|QL acceptance/info and add docinfo* fields in ineffective fields list. --- lib/logstash/inputs/elasticsearch.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 4b007bc..3cff066 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -97,14 +97,17 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # The index or alias to search. config :index, :validate => :string, :default => "logstash-*" - # The query to be executed. 
Read the Elasticsearch query DSL documentation - # for more info - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query type is accepted. + # Read the following documentations for more info + # Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }' - # This allows you to speccify the response type: either hits or aggregations - # where hits: normal search request - # aggregations: aggregation request + # This allows you to speccify the response type: one of [hits, aggregations, esql] + # where + # hits: normal search request + # aggregations: aggregation request + # esql: ES|QL request config :response_type, :validate => %w[hits aggregations esql], :default => 'hits' # This allows you to set the maximum number of hits returned per scroll. @@ -749,7 +752,7 @@ def validate_esql_query! end def inform_ineffective_esql_params - ineffective_options = original_params.keys & %w(target size slices search_api) + ineffective_options = original_params.keys & %w(index target size slices search_api, docinfo, docinfo_target, docinfo_fields) @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1 end From af6e24aa50bfde0e196d733a3c2c378650499eeb Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Mon, 21 Apr 2025 14:20:43 -0700 Subject: [PATCH 12/22] Update spec/inputs/integration/elasticsearch_esql_spec.rb Fix the condition to correctly compares supported LS version. 
--- spec/inputs/integration/elasticsearch_esql_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb index 36d4e48..3c9ff7a 100644 --- a/spec/inputs/integration/elasticsearch_esql_spec.rb +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -31,7 +31,7 @@ end before(:all) do - is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LogStash::Inputs::Elasticsearch::ES_ESQL_SUPPORT_VERSION) + is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION) skip "LS version does not have ES client which supports ES|QL" unless is_ls_with_esql_supported_client # Skip tests if ES version doesn't support ES||QL From 4ce6fa4a113c6fcb62247bcfebd625a80caf0b46 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 21 Apr 2025 14:29:06 -0700 Subject: [PATCH 13/22] Integration test skip condition correction. 
--- spec/inputs/integration/elasticsearch_esql_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb index 3c9ff7a..819fac4 100644 --- a/spec/inputs/integration/elasticsearch_esql_spec.rb +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -31,7 +31,7 @@ end before(:all) do - is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION) + is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create(LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION) skip "LS version does not have ES client which supports ES|QL" unless is_ls_with_esql_supported_client # Skip tests if ES version doesn't support ES||QL From 4ed69ff9ddf33174d45f66e0137ac94b2f6c6f99 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 24 Apr 2025 16:26:59 -0700 Subject: [PATCH 14/22] Introduce query_params option to accept drop_null_columns, set default timestampt converter to LogStash::Timestamp, dotted fields extended to nested fields. --- docs/index.asciidoc | 34 ++++++++++++++++++++++- lib/logstash/inputs/elasticsearch.rb | 16 +++++++++++ lib/logstash/inputs/elasticsearch/esql.rb | 31 +++++++++++++++++++-- spec/inputs/elasticsearch_esql_spec.rb | 9 +++--- spec/inputs/elasticsearch_spec.rb | 32 +++++++++++++++++++++ 5 files changed, 113 insertions(+), 9 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index a0bfa1e..bdf86c9 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -266,7 +266,13 @@ The following is a basic scheduled ES|QL query that runs hourly: Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query. -NOTE: With ES|QL query, {ls} doesn't generate `event.original` +NOTE: With ES|QL query, {ls} doesn't generate `event.original`. 
+ +Consider the following caveat scenarios: + +- ES|QL by default returns entire columns even if their values are `null`. The plugin provides a `drop_null_columns` option via <>. Enabling this parameter instructs {es} to automatically exclude columns with null values from query results. +- If your {es} index uses https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] mapping(s), ES|QL query fetches all parent and sub-fields fields. Since {ls} events cannot contain parent field's concrete value and sub-field values together, we recommend using the `DROP` keyword in your ES|QL query explicitly remove sub-fields. +- If your {es} index contains top level `tags` field, this will conflict with {ls} event's reserved `tags` field. {ls} moves `tags` field values to the `_tags` and populates `tags` with `["_tagsparsefailure"]`. For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation]. @@ -297,6 +303,7 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>, one of `["hits","aggregations","esql"]`|No | <> | <>|No | <> |<>|No @@ -544,6 +551,31 @@ documentation] for more information. When <> resolves to `search_after` and the query does not specify `sort`, the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more. +[id="plugins-{type}s-{plugin}-query_params"] +===== `query_params` +Parameters to send to {es} together with <>. 
+ +Accepted options: +[cols="2,1,3",options="header"] +|=== +|Option name |Default value | Description + +|`drop_null_columns` |`false` | Requests {es} to filter out `null` columns +|=== + +Example +[source, ruby] + input { + elasticsearch { + response_type => 'esql' + query => 'FROM access-logs* | WHERE type="apache"' + query_params => { + drop_null_columns => true + } + } + } + + [id="plugins-{type}s-{plugin}-response_type"] ===== `response_type` diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 3cff066..c5ef818 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -276,6 +276,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference + # Parameters query or query APIs can use + # current acceptable params: drop_null_columns => true|false (for ES|QL) + config :query_params, :validate => :hash, :default => {} + # Obsolete Settings config :ssl, :obsolete => "Set 'ssl_enabled' instead." config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead." @@ -323,6 +327,7 @@ def register @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`") + validate_query_params! validate_authentication fill_user_password_from_cloud_auth @@ -751,6 +756,17 @@ def validate_esql_query! fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command end + def validate_query_params! 
+ # keep the original, remove ES|QL accepted params and validate + cloned_query_params = @query_params.clone + if @response_type == 'esql' + cloned_query_params.delete("drop_null_columns") + fail(LogStash::ConfigurationError, "#{cloned_query_params} not accepted when `response_type => 'esql'`") if cloned_query_params.any? + else + fail(LogStash::ConfigurationError, "#{@query_params} not accepted when `response_type => #{@response_type}`") if @query_params.any? + end + end + def inform_ineffective_esql_params ineffective_options = original_params.keys & %w(index target size slices search_api, docinfo, docinfo_target, docinfo_fields) @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1 diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 5cdce78..948cbb0 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -8,6 +8,10 @@ class Esql ESQL_JOB = "ES|QL job" + ESQL_PARSERS_BY_TYPE = Hash.new(lambda { |x| x }).merge( + 'date' => ->(value) { value && LogStash::Timestamp.new(value) }, + ) + # Initialize the ESQL query executor # @param client [Elasticsearch::Client] The Elasticsearch client instance # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance @@ -20,6 +24,9 @@ def initialize(client, plugin) unless @query.include?('METADATA') logger.warn("The query doesn't have METADATA keyword. 
Including it makes _id and _version available in the documents", {:query => @query}) end + + params = plugin.params["query_params"] || {} + @drop_null_columns = params["drop_null_columns"] || false end # Execute the ESQL query and process results @@ -28,7 +35,7 @@ def initialize(client, plugin) def do_run(output_queue, query) logger.info("ES|QL executor starting") response = retryable(ESQL_JOB) do - @client.esql.query({ body: { query: @query }, format: 'json' }) + @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: @drop_null_columns }) end # retriable already printed error details return if response == false @@ -64,7 +71,13 @@ def retryable(job_name, &block) def process_response(values, columns, output_queue) values.each do |value| mapped_data = map_column_and_values(columns, value) - @plugin.decorate_and_push_to_queue(output_queue, mapped_data) + nest_structured_data = nest_keys(mapped_data) + @plugin.decorate_and_push_to_queue(output_queue, nest_structured_data) + rescue => e + # if event creation fails with whatever reason, inform user and tag with failure and return entry as it is + logger.warn("Event creation error, ", message: e.message, exception: e.class, data: { "columns" => columns, "values" => [value] }) + failed_event = LogStash::Event.new("columns" => columns, "values" => [value], "tags" => ['_elasticsearch_input_failure']) + output_queue << failed_event end end @@ -74,7 +87,19 @@ def process_response(values, columns, output_queue) # @return [Hash] Mapped data with column names as keys def map_column_and_values(columns, values) columns.each_with_index.with_object({}) do |(column, index), mapped_data| - mapped_data[column["name"]] = values[index] + mapped_data[column["name"]] = ESQL_PARSERS_BY_TYPE[column["type"]].call(values[index]) + end + end + + # Transforms dotted keys to nested JSON shape + # @param dot_keyed_hash [Hash] whose keys are dotted (example 'a.b.c.d': 'val') + # @return [Hash] whose keys are nested with 
value mapped ({'a':{'b':{'c':{'d':'val'}}}}) + def nest_keys(dot_keyed_hash) + dot_keyed_hash.each_with_object({}) do |(key, value), result| + key_parts = key.to_s.split('.') + *path, leaf = key_parts + leaf_scope = path.inject(result) { |scope, part| scope[part] ||= {} } + leaf_scope[leaf] = value end end end diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 2625544..3980a8d 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -44,7 +44,7 @@ describe "when executing chain of processes" do let(:output_queue) { Queue.new } - let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id'}, { 'name' => 'val'}] } } + let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'a.b.1.d'}, { 'name' => 'h_g.k$l.m.0'}] } } before do allow(esql_executor).to receive(:retryable).and_yield @@ -55,7 +55,7 @@ it "executes the ESQL query and processes the results" do allow(response).to receive(:headers).and_return({}) esql_executor.do_run(output_queue, plugin_config["query"]) - expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'}) + expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"a"=>{"b"=>{"1"=>{"d"=>"foo"}}}, "h_g"=>{"k$l"=>{"m"=>{"0"=>"bar"}}}}) end it "logs a warning if the response contains a warning header" do @@ -71,16 +71,15 @@ end end - describe "when starts processing the response" do let(:output_queue) { Queue.new } let(:values) { [%w[foo bar]] } - let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] } + let(:columns) { [{'name' => 'some.id'}, {'name' => 'some.val'}] } it "processes the ESQL response and pushes events to the output queue" do allow(plugin).to receive(:decorate_and_push_to_queue) esql_executor.send(:process_response, values, columns, output_queue) - expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 
'foo', 'val' => 'bar'}) + expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"some"=>{"id"=>"foo", "val"=>"bar"}}) end end diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 104972e..b7c97e0 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1473,6 +1473,38 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie end end end + + describe "with extra params" do + context "empty `query_params`" do + let(:config) { + super().merge('query_params' => {}) + } + + it "does not raise a configuration error" do + expect { plugin.send(:validate_query_params!) }.not_to raise_error + end + end + + context "with actual `drop_null_columns` value" do + let(:config) { + super().merge('query_params' => { 'drop_null_columns' => true }) + } + + it "does not raise a configuration error" do + expect { plugin.send(:validate_query_params!) }.not_to raise_error + end + end + + context "with extra non ES|QL params" do + let(:config) { + super().merge('query_params' => { 'drop_null_columns' => true, 'test' => 'hi'}) + } + + it "does not raise a configuration error" do + expect { plugin.send(:validate_query_params!) }.to raise_error(LogStash::ConfigurationError, "{\"test\"=>\"hi\"} not accepted when `response_type => 'esql'`") + end + end + end end end end From 0725f98a10d849714e342436359a3e57522b6f49 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 24 Apr 2025 21:57:44 -0700 Subject: [PATCH 15/22] Fix the failed integration test. 
--- spec/inputs/integration/elasticsearch_esql_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb index 819fac4..b27b8dd 100644 --- a/spec/inputs/integration/elasticsearch_esql_spec.rb +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -116,9 +116,9 @@ end end - context "#METADATA included" do + context "#METADATA" do let(:config) do - super().merge("query" => "FROM #{es_index} METADATA _index, _id, _version | SORT count") + super().merge("query" => "FROM #{es_index} METADATA _index, _id, _version | DROP message.keyword, type.keyword | SORT count") end it "includes document metadata" do From cfb36f37d5bf1dd75f17185cfca3a7cac9daad3a Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 30 Apr 2025 18:53:12 -0700 Subject: [PATCH 16/22] Request dropping null columns and filter out null values. Consider setting the result into target if defined. Debug logs added which can help to investigate query and its result. --- docs/index.asciidoc | 125 +++++++++-------- lib/logstash/inputs/elasticsearch.rb | 60 +++----- lib/logstash/inputs/elasticsearch/esql.rb | 80 ++++++----- spec/inputs/elasticsearch_esql_spec.rb | 163 +++++++++++++++------- spec/inputs/elasticsearch_spec.rb | 34 +---- 5 files changed, 238 insertions(+), 224 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index bdf86c9..4b57ad0 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -268,13 +268,67 @@ Set `config.support_escapes: true` in `logstash.yml` if you need to escape speci NOTE: With ES|QL query, {ls} doesn't generate `event.original`. -Consider the following caveat scenarios: +[id="plugins-{type}s-{plugin}-esql-event-mapping"] +===== Mapping ES|QL result to {ls} event +ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). 
+The plugin maps each value entry to an event, populating corresponding fields. +For example, a query might produce a table like: + +[cols="2,1,1,1,2",options="header"] +|=== +|`timestamp` |`user_id` | `action` | `status.code` | `status.desc` + +|2025-04-10T12:00:00 |123 |login |200 | Success +|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user) +|=== + +For this case, the plugin emits two events that look like +[source, json] +[ + { + "timestamp": "2025-04-10T12:00:00", + "user_id": 123, + "action": "login", + "status": { + "code": 200, + "desc": "Success" + } + }, + { + "timestamp": "2025-04-10T12:05:00", + "user_id": 456, + "action": "purchase", + "status": { + "code": 403, + "desc": "Forbidden (unauthorized user)" + } + } +] -- ES|QL by default returns entire columns even if their values are `null`. The plugin provides a `drop_null_columns` option via <>. Enabling this parameter instructs {es} to automatically exclude columns with null values from query results. -- If your {es} index uses https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] mapping(s), ES|QL query fetches all parent and sub-fields fields. Since {ls} events cannot contain parent field's concrete value and sub-field values together, we recommend using the `DROP` keyword in your ES|QL query explicitly remove sub-fields. -- If your {es} index contains top level `tags` field, this will conflict with {ls} event's reserved `tags` field. {ls} moves `tags` field values to the `_tags` and populates `tags` with `["_tagsparsefailure"]`. +NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` are actually dotted fields, they appear in {ls} events as a nested structure. -For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].
+[id="plugins-{type}s-{plugin}-esql-multifields"] +===== Conflict on multi-fields + +ES|QL query fetches all parent and sub-fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects]. +Since {ls} events cannot contain a parent field's concrete value and sub-field values together, the plugin cannot map the result to a {ls} event and produces a failed event tagged with `_elasticsearch_input_failure`. +We recommend using the `RENAME` (or `DROP`) keyword in your ES|QL query to explicitly rename the fields to overcome this issue. +To illustrate the situation with an example, assume your mapping has a `time` field with `time.min` and `time.max` sub-fields as follows: +[source, ruby] + "properties": { + "time": { "type": "long" }, + "time.min": { "type": "long" }, + "time.max": { "type": "long" } + } + +The ES|QL result will contain all three fields but the plugin cannot map them into a {ls} event. +To avoid this, you can use the `RENAME` keyword to rename the `time` parent field so that all three fields have unique names. +[source, ruby] + ... + query => 'FROM my-index | RENAME time AS time.current' + ... + +For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{es} ES|QL documentation]. [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Input configuration options @@ -303,7 +357,6 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>|No | <> |<>, one of `["hits","aggregations","esql"]`|No | <> | <>|No | <> |<>|No @@ -545,37 +598,13 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`. * Value type is <> * Default value is `'{ "sort": [ "_doc" ] }'` -The query to be executed. Read the {ref}/query-dsl.html[Elasticsearch query DSL -documentation] for more information.
+The query to be executed. +Accepted query shape is DSL or ES|QL (when `response_type => 'esql'`). +Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{es} ES|QL documentation] for more information. When <> resolves to `search_after` and the query does not specify `sort`, the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more. -[id="plugins-{type}s-{plugin}-query_params"] -===== `query_params` -Parameters to send to {es} together with <>. - -Accepted options: -[cols="2,1,3",options="header"] -|=== -|Option name |Default value | Description - -|`drop_null_columns` |`false` | Requests {es} to filter out `null` columns -|=== - -Example -[source, ruby] - input { - elasticsearch { - response_type => 'esql' - query => 'FROM access-logs* | WHERE type="apache"' - query_params => { - drop_null_columns => true - } - } - } - - [id="plugins-{type}s-{plugin}-response_type"] ===== `response_type` @@ -592,36 +621,8 @@ contents of the `aggregations` object of the query's response. In this case the `hits` object will be ignored. The parameter `size` will be always be set to 0 regardless of the default or user-defined value set in this plugin. -When using the `esql` setting, the query parameter must be a valid plaintext ES|QL string. +When using the `esql` setting, the query must be a valid ES|QL string. When this setting is active, `target`, `size`, `slices` and `search_api` parameters are ignored. -ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). -The plugin maps each value entry to an event, populating corresponding fields. 
-For example, a query might produce a table like: - -[cols="2,1,1,2",options="header"] -|=== -|`timestamp` |`user_id` | `action` | `status_code` - -|2025-04-10T12:00:00 |123 |login |200 -|2025-04-10T12:05:00 |456 |purchase |403 -|=== - -For this case, the plugin emits two events look like -[source, json] -[ - { - "timestamp": "2025-04-10T12:00:00", - "user_id": 123, - "action": "login", - "status_code": 200 - }, - { - "timestamp": "2025-04-10T12:05:00", - "user_id": 456, - "action": "purchase", - "status_code": 403 - } -] [id="plugins-{type}s-{plugin}-request_timeout_seconds"] ===== `request_timeout_seconds` diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index c5ef818..c5f596e 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -97,7 +97,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # The index or alias to search. config :index, :validate => :string, :default => "logstash-*" - # The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query type is accepted. + # The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query shape is accepted. # Read the following documentations for more info # Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html # ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html @@ -276,10 +276,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference - # Parameters query or query APIs can use - # current acceptable params: drop_null_columns => true|false (for ES|QL) - config :query_params, :validate => :hash, :default => {} - # Obsolete Settings config :ssl, :obsolete => "Set 'ssl_enabled' instead." config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead." 
@@ -316,7 +312,8 @@ def register if @response_type == 'esql' validate_ls_version_for_esql_support! validate_esql_query! - inform_ineffective_esql_params + ignored_options = original_params.keys & %w(index size slices search_api, docinfo, docinfo_target, docinfo_fields) + @logger.info("Configured #{ignored_options} params are ignored in ES|QL query") if ignored_options&.size > 1 else @base_query = LogStash::Json.load(@query) if @slices @@ -327,7 +324,6 @@ def register @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`") - validate_query_params! validate_authentication fill_user_password_from_cloud_auth @@ -383,6 +379,22 @@ def run(output_queue) end end + ## + # This can be called externally from the query_executor + public + def push_hit(hit, output_queue, root_field = '_source') + event = event_from_hit(hit, root_field) + decorate(event) + output_queue << event + record_last_value(event) + end + + def decorate_event(event) + decorate(event) + end + + private + def get_query_object return @query if @response_type == 'esql' if @cursor_tracker @@ -394,16 +406,6 @@ def get_query_object LogStash::Json.load(query) end - ## - # This can be called externally from the query_executor - public - def push_hit(hit, output_queue, root_field = '_source') - event = event_from_hit(hit, root_field) - decorate(event) - output_queue << event - record_last_value(event) - end - def record_last_value(event) @cursor_tracker.record_last_value(event) if @tracking_field end @@ -419,12 +421,6 @@ def event_from_hit(hit, root_field) return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure']) end - def decorate_and_push_to_queue(output_queue, mapped_entry) - event = targeted_event_factory.new_event mapped_entry - decorate(event) - output_queue << event - end - def set_docinfo_fields(hit, event) # do not assume event[@docinfo_target] to 
be in-place updatable. first get it, update it, then at the end set it in the event. docinfo_target = event.get(@docinfo_target) || {} @@ -441,8 +437,6 @@ def set_docinfo_fields(hit, event) event.set(@docinfo_target, docinfo_target) end - private - def hosts_default?(hosts) hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? ) end @@ -756,22 +750,6 @@ def validate_esql_query! fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command end - def validate_query_params! - # keep the original, remove ES|QL accepted params and validate - cloned_query_params = @query_params.clone - if @response_type == 'esql' - cloned_query_params.delete("drop_null_columns") - fail(LogStash::ConfigurationError, "#{cloned_query_params} not accepted when `response_type => 'esql'`") if cloned_query_params.any? - else - fail(LogStash::ConfigurationError, "#{@query_params} not accepted when `response_type => #{@response_type}`") if @query_params.any? - end - end - - def inform_ineffective_esql_params - ineffective_options = original_params.keys & %w(index target size slices search_api, docinfo, docinfo_target, docinfo_fields) - @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1 - end - def validate_es_for_esql_support! return unless @response_type == 'esql' # make sure connected ES supports ES|QL (8.11+) diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 948cbb0..a6b9ed2 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -18,24 +18,23 @@ class Esql def initialize(client, plugin) @client = client @plugin = plugin + @target_field = plugin.params["target"] @retries = plugin.params["retries"] @query = plugin.params["query"] unless @query.include?('METADATA') - logger.warn("The query doesn't have METADATA keyword. 
Including it makes _id and _version available in the documents", {:query => @query}) + logger.info("`METADATA` not found the query. `_id`, `_version` and `_index` will not be available in the result", {:query => @query}) end - - params = plugin.params["query_params"] || {} - @drop_null_columns = params["drop_null_columns"] || false + logger.debug("ES|QL executor initialized with", {:query => @query}) end # Execute the ESQL query and process results # @param output_queue [Queue] The queue to push processed events to # @param query A query (to obey interface definition) def do_run(output_queue, query) - logger.info("ES|QL executor starting") + logger.info("ES|QL executor has started") response = retryable(ESQL_JOB) do - @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: @drop_null_columns }) + @client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: true }) end # retriable already printed error details return if response == false @@ -43,9 +42,11 @@ def do_run(output_queue, query) if response&.headers&.dig("warning") logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]}) end - if response['values'] && response['columns'] - process_response(response['values'], response['columns'], output_queue) - end + columns = response['columns']&.freeze + values = response['values']&.freeze + logger.debug("ES|QL query response size: #{values&.size}") + + process_response(columns, values, output_queue) if columns && values end # Execute a retryable operation with proper error handling @@ -65,42 +66,51 @@ def retryable(job_name, &block) private # Process the ESQL response and push events to the output queue - # @param values [Array[Array]] The ESQL query response hits # @param columns [Array[Hash]] The ESQL query response columns + # @param values [Array[Array]] The ESQL query response hits # @param output_queue [Queue] The queue to push processed events to - def process_response(values, columns, 
output_queue) - values.each do |value| - mapped_data = map_column_and_values(columns, value) - nest_structured_data = nest_keys(mapped_data) - @plugin.decorate_and_push_to_queue(output_queue, nest_structured_data) + def process_response(columns, values, output_queue) + column_specs = columns.map { |column| ColumnSpec.new(column) } + values.each do |row| + event = column_specs.zip(row).each_with_object(LogStash::Event.new) do |(column, value), event| + # `unless value.nil?` is a part of `drop_null_columns` that if some of columns' values are not `nil`, `nil` values appear + # we should continuously filter out them to achieve full `drop_null_columns` on each individual row (ideal `LIMIT 1` result) + unless value.nil? + field_reference = @target_field.nil? ? column.field_reference : "[#{@target_field}][#{column.field_reference}]" + event.set(field_reference, ESQL_PARSERS_BY_TYPE[column.type].call(value)) + end + end + @plugin.decorate_event(event) + output_queue << event rescue => e # if event creation fails with whatever reason, inform user and tag with failure and return entry as it is - logger.warn("Event creation error, ", message: e.message, exception: e.class, data: { "columns" => columns, "values" => [value] }) - failed_event = LogStash::Event.new("columns" => columns, "values" => [value], "tags" => ['_elasticsearch_input_failure']) + logger.warn("Event creation error, ", message: e.message, exception: e.class, data: { "columns" => columns, "values" => [row] }) + failed_event = LogStash::Event.new("columns" => columns, "values" => [row], "tags" => ['_elasticsearch_input_failure']) output_queue << failed_event end end + end + + # Class representing a column specification in the ESQL response['columns'] + # The class's main purpose is to provide a structure for the event key + # columns is an array with `name` and `type` pair (example: `{"name"=>"@timestamp", "type"=>"date"}`) + # @attr_reader :name [String] The name of the column + # @attr_reader :type [String] 
The type of the column + class ColumnSpec + attr_reader :name, :type + + def initialize(spec) + @name = isolate(spec.fetch('name')) + @type = isolate(spec.fetch('type')) + end - # Map column names to their corresponding values - # @param columns [Array] Array of column definitions - # @param values [Array] Array of values for the current row - # @return [Hash] Mapped data with column names as keys - def map_column_and_values(columns, values) - columns.each_with_index.with_object({}) do |(column, index), mapped_data| - mapped_data[column["name"]] = ESQL_PARSERS_BY_TYPE[column["type"]].call(values[index]) - end + def field_reference + @_field_reference ||= '[' + name.gsub('.', '][') + ']' end - # Transforms dotted keys to nested JSON shape - # @param dot_keyed_hash [Hash] whose keys are dotted (example 'a.b.c.d': 'val') - # @return [Hash] whose keys are nested with value mapped ({'a':{'b':{'c':{'d':'val'}}}}) - def nest_keys(dot_keyed_hash) - dot_keyed_hash.each_with_object({}) do |(key, value), result| - key_parts = key.to_s.split('.') - *path, leaf = key_parts - leaf_scope = path.inject(result) { |scope, part| scope[part] ||= {} } - leaf_scope[leaf] = value - end + private + def isolate(value) + value.frozen? ? 
value : value.clone.freeze end end end diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 3980a8d..1d6a12c 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -15,81 +15,138 @@ end let(:esql_executor) { described_class.new(client, plugin) } - describe "when initializes" do + describe "#initialization" do it "sets up the ESQL client with correct parameters" do expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"]) expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"]) + expect(esql_executor.instance_variable_get(:@plugin)).to eq(plugin) + expect(esql_executor.instance_variable_get(:@target_field)).to eq(nil) end end - describe "when faces error while retrying" do - it "retries the given block the specified number of times" do - attempts = 0 - result = esql_executor.retryable("Test Job") do - attempts += 1 - raise StandardError if attempts < 3 - "success" + describe "#execution" do + let(:output_queue) { Queue.new } + + context "when faces error while retrying" do + it "retries the given block the specified number of times" do + attempts = 0 + result = esql_executor.retryable("Test Job") do + attempts += 1 + raise StandardError if attempts < 3 + "success" + end + expect(attempts).to eq(3) + expect(result).to eq("success") end - expect(attempts).to eq(3) - expect(result).to eq("success") - end - it "returns false if the block fails all attempts" do - result = esql_executor.retryable("Test Job") do - raise StandardError + it "returns false if the block fails all attempts" do + result = esql_executor.retryable("Test Job") do + raise StandardError + end + expect(result).to eq(false) end - expect(result).to eq(false) end - end - describe "when executing chain of processes" do - let(:output_queue) { Queue.new } - let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'a.b.1.d'}, { 'name' => 
'h_g.k$l.m.0'}] } } + context "when executing chain of processes" do + let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'a.b.1.d', 'type' => 'keyword' }, + { 'name' => 'h_g.k$l.m.0', 'type' => 'keyword' }] } } - before do - allow(esql_executor).to receive(:retryable).and_yield - allow(client).to receive_message_chain(:esql, :query).and_return(response) - allow(plugin).to receive(:decorate_and_push_to_queue) - end + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(plugin).to receive(:decorate_event) + end - it "executes the ESQL query and processes the results" do - allow(response).to receive(:headers).and_return({}) - esql_executor.do_run(output_queue, plugin_config["query"]) - expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"a"=>{"b"=>{"1"=>{"d"=>"foo"}}}, "h_g"=>{"k$l"=>{"m"=>{"0"=>"bar"}}}}) - end + it "executes the ESQL query and processes the results" do + allow(response).to receive(:headers).and_return({}) + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(1) - it "logs a warning if the response contains a warning header" do - allow(response).to receive(:headers).and_return({"warning" => "some warning"}) - expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", {:message => "some warning"}) - esql_executor.do_run(output_queue, plugin_config["query"]) - end + event = output_queue.pop + expect(event.get('[a][b][1][d]')).to eq('foo') + expect(event.get('[h_g][k$l][m][0]')).to eq('bar') + end + + it "logs a warning if the response contains a warning header" do + allow(response).to receive(:headers).and_return({ "warning" => "some warning" }) + expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", { :message => "some warning" }) + esql_executor.do_run(output_queue, plugin_config["query"]) + end - it 
"does not log a warning if the response does not contain a warning header" do - allow(response).to receive(:headers).and_return({}) - expect(esql_executor.logger).not_to receive(:warn) - esql_executor.do_run(output_queue, plugin_config["query"]) + it "does not log a warning if the response does not contain a warning header" do + allow(response).to receive(:headers).and_return({}) + expect(esql_executor.logger).not_to receive(:warn) + esql_executor.do_run(output_queue, plugin_config["query"]) + end end - end - describe "when starts processing the response" do - let(:output_queue) { Queue.new } - let(:values) { [%w[foo bar]] } - let(:columns) { [{'name' => 'some.id'}, {'name' => 'some.val'}] } + describe "multiple rows in the result" do + let(:response) { { 'values' => rows, 'columns' => [{ 'name' => 'key.1', 'type' => 'keyword' }, + { 'name' => 'key.2', 'type' => 'keyword' }] } } + + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(plugin).to receive(:decorate_event) + allow(response).to receive(:headers).and_return({}) + end + + context "when mapping" do + let(:rows) { [%w[foo bar], %w[hello world]] } + + it "1:1 maps rows to events" do + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(2) + + event_1 = output_queue.pop + expect(event_1.get('[key][1]')).to eq('foo') + expect(event_1.get('[key][2]')).to eq('bar') + + event_2 = output_queue.pop + expect(event_2.get('[key][1]')).to eq('hello') + expect(event_2.get('[key][2]')).to eq('world') + end + end + + context "when partial nil values appear" do + let(:rows) { [[nil, "bar"], ["hello", nil]] } + + it "ignores the nil values" do + esql_executor.do_run(output_queue, plugin_config["query"]) + expect(output_queue.size).to eq(2) - it "processes the ESQL response and pushes events to the output queue" do - allow(plugin).to receive(:decorate_and_push_to_queue) - 
esql_executor.send(:process_response, values, columns, output_queue) - expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"some"=>{"id"=>"foo", "val"=>"bar"}}) + event_1 = output_queue.pop + expect(event_1.get('[key][1]')).to eq(nil) + expect(event_1.get('[key][2]')).to eq('bar') + + event_2 = output_queue.pop + expect(event_2.get('[key][1]')).to eq('hello') + expect(event_2.get('[key][2]')).to eq(nil) + end + end end end - describe "when maps column and values" do - let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] } - let(:values) { %w[foo bar] } + describe "#column spec" do + let(:valid_spec) { { 'name' => 'field.name', 'type' => 'keyword' } } + let(:column_spec) { LogStash::Inputs::Elasticsearch::ColumnSpec.new(valid_spec) } + + context "when initializes" do + it "sets the name and type attributes" do + expect(column_spec.name).to eq("field.name") + expect(column_spec.type).to eq("keyword") + end + + it "freezes the name and type attributes" do + expect(column_spec.name).to be_frozen + expect(column_spec.type).to be_frozen + end + end - it "maps column names to their corresponding values" do - result = esql_executor.send(:map_column_and_values, columns, values) - expect(result).to eq({'id' => 'foo', 'val' => 'bar'}) + context "when calls the field reference" do + it "returns the correct field reference format" do + expect(column_spec.field_reference).to eq("[field][name]") + end end end end if LOGSTASH_VERSION >= LogStash::Inputs::Elasticsearch::LS_ESQL_SUPPORT_VERSION \ No newline at end of file diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index b7c97e0..bc404a5 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1370,7 +1370,7 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie client.transport.respond_to?(:transport) ? 
client.transport.transport : client.transport end - context "#ESQL" do + describe "#ESQL" do let(:config) do { "query" => "FROM test-index | STATS count() BY field", @@ -1473,38 +1473,6 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie end end end - - describe "with extra params" do - context "empty `query_params`" do - let(:config) { - super().merge('query_params' => {}) - } - - it "does not raise a configuration error" do - expect { plugin.send(:validate_query_params!) }.not_to raise_error - end - end - - context "with actual `drop_null_columns` value" do - let(:config) { - super().merge('query_params' => { 'drop_null_columns' => true }) - } - - it "does not raise a configuration error" do - expect { plugin.send(:validate_query_params!) }.not_to raise_error - end - end - - context "with extra non ES|QL params" do - let(:config) { - super().merge('query_params' => { 'drop_null_columns' => true, 'test' => 'hi'}) - } - - it "does not raise a configuration error" do - expect { plugin.send(:validate_query_params!) }.to raise_error(LogStash::ConfigurationError, "{\"test\"=>\"hi\"} not accepted when `response_type => 'esql'`") - end - end - end end end end From a92a71ef94c23f6717c9b7f1a7bc86fe13d6b10d Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Tue, 6 May 2025 17:40:05 -0700 Subject: [PATCH 17/22] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Rye Biesemeyer Co-authored-by: João Duarte --- docs/index.asciidoc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 4b57ad0..a7ea2c1 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -234,18 +234,12 @@ The next scheduled run: ==== ES|QL support {es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data. 
-To utilize the ES|QL feature with this plugin, the following version requirements must be met: -[cols="1,2",options="header"] -|=== -|Component |Minimum version -|{es} |8.11.0 or newer -|{ls} |8.17.4 or newer -|This plugin |4.23.0+ (4.x series) or 5.2.0+ (5.x series) +To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer. |=== -To configure ES|QL query in the plugin, set the `response_type` to `esql` and provide your ES|QL query in the `query` parameter. +To configure {esql} query in the plugin, set the `response_type` to `esql` and provide your {esql} query in the `query` parameter. -IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. +IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. The following is a basic scheduled ES|QL query that runs hourly: [source, ruby] @@ -322,6 +316,8 @@ To illustrate the situation with example, assuming your mapping has a time `time } The ES|QL result will contain all three fields but the plugin cannot map them into {ls} event. + +This a common occurence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field. In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field` . To avoid this, you can use the `RENAME` keyword to rename the `time` parent field to get all three fields with unique fields. [source, ruby] ... 
@@ -622,7 +618,7 @@ contents of the `aggregations` object of the query's response. In this case the 0 regardless of the default or user-defined value set in this plugin. When using the `esql` setting, the query must be a valid ES|QL string. -When this setting is active, `target`, `size`, `slices` and `search_api` parameters are ignored. +When this setting is active, `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target` and `docinfo_fields` parameters are not allowed. [id="plugins-{type}s-{plugin}-request_timeout_seconds"] ===== `request_timeout_seconds` From d4f559d4d6051855f0c0fe6923c90ba34f910693 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 6 May 2025 23:03:16 -0700 Subject: [PATCH 18/22] Apply code review suggestions: to use decorator as a proc call, doc syntax fix, unit test errors fix. --- docs/index.asciidoc | 1 - lib/logstash/inputs/elasticsearch.rb | 4 ++-- lib/logstash/inputs/elasticsearch/esql.rb | 4 ++-- spec/inputs/elasticsearch_esql_spec.rb | 4 ++-- spec/inputs/elasticsearch_spec.rb | 18 ++++++++++++++++++ 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index a7ea2c1..832341e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -235,7 +235,6 @@ The next scheduled run: {es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data. To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer. -|=== To configure {esql} query in the plugin, set the `response_type` to `esql` and provide your {esql} query in the `query` parameter. diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index c5f596e..43d5592 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -312,8 +312,8 @@ def register if @response_type == 'esql' validate_ls_version_for_esql_support! validate_esql_query! 
- ignored_options = original_params.keys & %w(index size slices search_api, docinfo, docinfo_target, docinfo_fields) - @logger.info("Configured #{ignored_options} params are ignored in ES|QL query") if ignored_options&.size > 1 + not_allowed_options = original_params.keys & %w(index size slices search_api, docinfo, docinfo_target, docinfo_fields) + raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1 else @base_query = LogStash::Json.load(@query) if @slices diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index a6b9ed2..3a5ac28 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -17,7 +17,7 @@ class Esql # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance def initialize(client, plugin) @client = client - @plugin = plugin + @event_decorator = plugin.method(:decorate_event) @target_field = plugin.params["target"] @retries = plugin.params["retries"] @@ -80,7 +80,7 @@ def process_response(columns, values, output_queue) event.set(field_reference, ESQL_PARSERS_BY_TYPE[column.type].call(value)) end end - @plugin.decorate_event(event) + @event_decorator.call(event) output_queue << event rescue => e # if event creation fails with whatever reason, inform user and tag with failure and return entry as it is diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 1d6a12c..6355db9 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -6,7 +6,8 @@ describe LogStash::Inputs::Elasticsearch::Esql do let(:client) { instance_double(Elasticsearch::Client) } let(:esql_client) { double("esql-client") } - let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: plugin_config) } + + let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: 
plugin_config, decorate_event: nil) } let(:plugin_config) do { "query" => "FROM test-index | STATS count() BY field", @@ -19,7 +20,6 @@ it "sets up the ESQL client with correct parameters" do expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"]) expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"]) - expect(esql_executor.instance_variable_get(:@plugin)).to eq(plugin) expect(esql_executor.instance_variable_get(:@target_field)).to eq(nil) end end diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index bc404a5..77703d2 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1439,6 +1439,24 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie end end + context "ES|QL query and DSL params used together" do + let(:config) { + super().merge({ + "index" => "my-index", + "size" => 1, + "slices" => 1, + "search_api" => "auto", + "docinfo" => true, + "docinfo_target" => "[@metadata][docinfo]", + "docinfo_fields" => ["_index"], + })} + + it "raises a config error" do + mixed_fields = %w[index size slices docinfo_fields] + expect { plugin.register }.to raise_error(LogStash::ConfigurationError, /Configured #{mixed_fields} params are not allowed while using ES|QL query/) + end + end + describe "ES|QL query" do context "when query is valid" do it "does not raise an error" do From 65eb675c3bfe49dd5169be36ae95234b4e598573 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 8 May 2025 09:30:18 -0700 Subject: [PATCH 19/22] Rename warning msg field name to avoid conflicts. Generate a target apply method to avoid null checks at runtime. 
--- lib/logstash/inputs/elasticsearch/esql.rb | 12 +++++++++--- spec/inputs/elasticsearch_esql_spec.rb | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 3a5ac28..9f95b4d 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -18,9 +18,15 @@ class Esql def initialize(client, plugin) @client = client @event_decorator = plugin.method(:decorate_event) - @target_field = plugin.params["target"] @retries = plugin.params["retries"] + target_field = plugin.params["target"] + if target_field + def self.apply_target(path) = "[#{target_field}][#{path}]" + else + def self.apply_target(path) = path + end + @query = plugin.params["query"] unless @query.include?('METADATA') logger.info("`METADATA` not found the query. `_id`, `_version` and `_index` will not be available in the result", {:query => @query}) @@ -40,7 +46,7 @@ def do_run(output_queue, query) return if response == false if response&.headers&.dig("warning") - logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]}) + logger.warn("ES|QL executor received warning", {:warning_message => response.headers["warning"]}) end columns = response['columns']&.freeze values = response['values']&.freeze @@ -76,7 +82,7 @@ def process_response(columns, values, output_queue) # `unless value.nil?` is a part of `drop_null_columns` that if some of columns' values are not `nil`, `nil` values appear # we should continuously filter out them to achieve full `drop_null_columns` on each individual row (ideal `LIMIT 1` result) unless value.nil? - field_reference = @target_field.nil? ? 
column.field_reference : "[#{@target_field}][#{column.field_reference}]" + field_reference = apply_target(column.field_reference) event.set(field_reference, ESQL_PARSERS_BY_TYPE[column.type].call(value)) end end diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index 6355db9..dcc53c5 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -69,7 +69,7 @@ it "logs a warning if the response contains a warning header" do allow(response).to receive(:headers).and_return({ "warning" => "some warning" }) - expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", { :message => "some warning" }) + expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", { :warning_message => "some warning" }) esql_executor.do_run(output_queue, plugin_config["query"]) end From 789f4676a3ffbc535bfa43d90fa328939c74a2ce Mon Sep 17 00:00:00 2001 From: Mashhur Date: Thu, 8 May 2025 12:17:31 -0700 Subject: [PATCH 20/22] Ignore sub-fields with warninigs and keep only parent. --- docs/index.asciidoc | 10 ++++--- lib/logstash/inputs/elasticsearch/esql.rb | 35 +++++++++++++++++++++-- spec/inputs/elasticsearch_esql_spec.rb | 32 +++++++++++++++++++-- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 832341e..5f38d12 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -304,8 +304,12 @@ NOTE: If your index has a mapping with sub-objects where `status.code` and `stat ===== Conflict on multi-fields ES|QL query fetches all parent and sub-fields fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects]. 
-Since {ls} events cannot contain parent field's concrete value and sub-field values together, the plugin cannot map the result to {ls} event and produces `_elasticsearch_input_failure` tagged failed event. -We recommend using the `RENAME` (or `DROP`) keyword in your ES|QL query explicitly rename the fields to overcome this issue. +Since {ls} events cannot contain parent field's concrete value and sub-field values together, the plugin ignores sub-fields with warning and includes parent. +We recommend using the `RENAME` (or `DROP` to avoid warnings) keyword in your ES|QL query explicitly rename the fields to include sub-fields into the event. + +This a common occurrence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field. +In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field`. + To illustrate the situation with example, assuming your mapping has a time `time` field with `time.min` and `time.max` sub-fields as following: [source, ruby] "properties": { @@ -315,8 +319,6 @@ To illustrate the situation with example, assuming your mapping has a time `time } The ES|QL result will contain all three fields but the plugin cannot map them into {ls} event. - -This a common occurence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field. In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field` . To avoid this, you can use the `RENAME` keyword to rename the `time` parent field to get all three fields with unique fields. [source, ruby] ... 
diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb index 9f95b4d..30afb72 100644 --- a/lib/logstash/inputs/elasticsearch/esql.rb +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -22,9 +22,9 @@ def initialize(client, plugin) target_field = plugin.params["target"] if target_field - def self.apply_target(path) = "[#{target_field}][#{path}]" + def self.apply_target(path); "[#{target_field}][#{path}]"; end else - def self.apply_target(path) = path + def self.apply_target(path); path; end end @query = plugin.params["query"] @@ -77,11 +77,16 @@ def retryable(job_name, &block) # @param output_queue [Queue] The queue to push processed events to def process_response(columns, values, output_queue) column_specs = columns.map { |column| ColumnSpec.new(column) } + sub_element_mark_map = mark_sub_elements(column_specs) + multi_fields = sub_element_mark_map.filter_map { |key, val| key.name if val == true } + logger.warn("Multi-fields found in ES|QL result and they will not be available in the event. Please use `RENAME` command if you want to include them.", { :detected_multi_fields => multi_fields }) if multi_fields.any? + values.each do |row| event = column_specs.zip(row).each_with_object(LogStash::Event.new) do |(column, value), event| # `unless value.nil?` is a part of `drop_null_columns` that if some of columns' values are not `nil`, `nil` values appear # we should continuously filter out them to achieve full `drop_null_columns` on each individual row (ideal `LIMIT 1` result) - unless value.nil? 
+ # we also exclude sub-elements of main field + if value && sub_element_mark_map[column] == false field_reference = apply_target(column.field_reference) event.set(field_reference, ESQL_PARSERS_BY_TYPE[column.type].call(value)) end @@ -95,6 +100,30 @@ def process_response(columns, values, output_queue) output_queue << failed_event end end + + # Determines whether each column in a collection is a nested sub-element (example "user.age") + # of another column in the same collection (example "user"). + # + # @param columns [Array] An array of objects with a `name` attribute representing field paths. + # @return [Hash] A hash mapping each column to `true` if it is a sub-element of another field, `false` otherwise. + # Time complexity: (O(NlogN+N*K)) where K is the number of conflict depth + # without (`prefix_set`) memoization, it would be O(N^2) + def mark_sub_elements(columns) + # Sort columns by name length (ascending) + sorted_columns = columns.sort_by { |c| c.name.length } + prefix_set = Set.new # memoization set + + sorted_columns.each_with_object({}) do |column, memo| + # Split the column name into parts (e.g., "user.profile.age" → ["user", "profile", "age"]) + parts = column.name.split('.') + + # Generate all possible parent prefixes (e.g., "user", "user.profile") + # and check if any parent prefix exists in the set + parent_prefixes = (0...parts.size - 1).map { |i| parts[0..i].join('.') } + memo[column] = parent_prefixes.any? 
{ |prefix| prefix_set.include?(prefix) } + prefix_set.add(column.name) + end + end end # Class representing a column specification in the ESQL response['columns'] diff --git a/spec/inputs/elasticsearch_esql_spec.rb b/spec/inputs/elasticsearch_esql_spec.rb index dcc53c5..f958dea 100644 --- a/spec/inputs/elasticsearch_esql_spec.rb +++ b/spec/inputs/elasticsearch_esql_spec.rb @@ -54,7 +54,6 @@ before do allow(esql_executor).to receive(:retryable).and_yield allow(client).to receive_message_chain(:esql, :query).and_return(response) - allow(plugin).to receive(:decorate_event) end it "executes the ESQL query and processes the results" do @@ -87,7 +86,6 @@ before do allow(esql_executor).to receive(:retryable).and_yield allow(client).to receive_message_chain(:esql, :query).and_return(response) - allow(plugin).to receive(:decorate_event) allow(response).to receive(:headers).and_return({}) end @@ -125,6 +123,36 @@ end end end + + context "when sub-elements occur in the result" do + let(:response) { { + 'values' => [[50, 1, 100], [50, 0, 1000], [50, 9, 99999]], + 'columns' => + [ + { 'name' => 'time', 'type' => 'long' }, + { 'name' => 'time.min', 'type' => 'long' }, + { 'name' => 'time.max', 'type' => 'long' }, + ] + } } + + before do + allow(esql_executor).to receive(:retryable).and_yield + allow(client).to receive_message_chain(:esql, :query).and_return(response) + allow(response).to receive(:headers).and_return({}) + end + + it "includes 1st depth elements into event" do + esql_executor.do_run(output_queue, plugin_config["query"]) + + expect(output_queue.size).to eq(3) + 3.times do + event = output_queue.pop + expect(event.get('time')).to eq(50) + expect(event.get('[time][min]')).to eq(nil) + expect(event.get('[time][max]')).to eq(nil) + end + end + end end describe "#column spec" do From fefe6a070913c59293bce49cc0ff677c8bef1aa6 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 26 May 2025 11:03:01 -0700 Subject: [PATCH 21/22] Introduce at high level which other params such 
as , etc.. follow it. It validates the shape to send a valid query type to the ES. --- docs/index.asciidoc | 46 +++++++++++-------- lib/logstash/inputs/elasticsearch.rb | 45 +++++++++--------- spec/inputs/elasticsearch_spec.rb | 8 ++-- .../integration/elasticsearch_esql_spec.rb | 2 +- 4 files changed, 55 insertions(+), 46 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 5f38d12..2ea3acd 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -231,23 +231,23 @@ The next scheduled run: * updates the value of the field at the end of the pagination. [id="plugins-{type}s-{plugin}-esql"] -==== ES|QL support -{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data. +==== {esql} support +{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data. To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer. -To configure {esql} query in the plugin, set the `response_type` to `esql` and provide your {esql} query in the `query` parameter. +To configure {esql} query in the plugin, set the `query_type` to `esql` and provide your {esql} query in the `query` parameter. IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. -The following is a basic scheduled ES|QL query that runs hourly: +The following is a basic scheduled {esql} query that runs hourly: [source, ruby] input { elasticsearch { id => hourly_cron_job hosts => [ 'https://..'] api_key => '....' 
- response_type => 'esql' + query_type => 'esql' query => ' FROM food-index | WHERE spicy_level = "hot" AND @timestamp > NOW() - 1 hour @@ -259,11 +259,11 @@ The following is a basic scheduled ES|QL query that runs hourly: Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query. -NOTE: With ES|QL query, {ls} doesn't generate `event.original`. +NOTE: With {esql} query, {ls} doesn't generate `event.original`. [id="plugins-{type}s-{plugin}-esql-event-mapping"] -===== Mapping ES|QL result to {ls} event -ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). +===== Mapping {esql} result to {ls} event +{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). The plugin maps each value entry to an event, populating corresponding fields. For example, a query might produce a table like: @@ -303,9 +303,9 @@ NOTE: If your index has a mapping with sub-objects where `status.code` and `stat [id="plugins-{type}s-{plugin}-esql-multifields"] ===== Conflict on multi-fields -ES|QL query fetches all parent and sub-fields fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects]. +{esql} query fetches all parent and sub-fields fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects]. Since {ls} events cannot contain parent field's concrete value and sub-field values together, the plugin ignores sub-fields with warning and includes parent. 
-We recommend using the `RENAME` (or `DROP` to avoid warnings) keyword in your ES|QL query explicitly rename the fields to include sub-fields into the event. +We recommend using the `RENAME` (or `DROP` to avoid warnings) keyword in your {esql} query explicitly rename the fields to include sub-fields into the event. This a common occurrence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field. In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field`. @@ -318,14 +318,14 @@ To illustrate the situation with example, assuming your mapping has a time `time "time.max": { "type": "long" } } -The ES|QL result will contain all three fields but the plugin cannot map them into {ls} event. +The {esql} result will contain all three fields but the plugin cannot map them into {ls} event. To avoid this, you can use the `RENAME` keyword to rename the `time` parent field to get all three fields with unique fields. [source, ruby] ... query => 'FROM my-index | RENAME time AS time.current' ... -For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{es} ES|QL documentation]. +For comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation]. [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Input configuration options @@ -354,7 +354,8 @@ Please check out <> for details. 
| <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>, one of `["hits","aggregations","esql"]`|No +| <> |<>, one of `["dsl","esql"]`|No +| <> |<>, one of `["hits","aggregations"]`|No | <> | <>|No | <> |<>|No | <> |<>|No @@ -596,12 +597,22 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`. * Default value is `'{ "sort": [ "_doc" ] }'` The query to be executed. -Accepted query shape is DSL or ES|QL (when `response_type => 'esql'`). -Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{es} ES|QL documentation] for more information. +Accepted query shape is DSL or {esql} (when `query_type => 'esql'`). +Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{esql} documentation] for more information. When <> resolves to `search_after` and the query does not specify `sort`, the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more. +[id="plugins-{type}s-{plugin}-query_type"] +===== `query_type` + +* Value can be `dsl` or `esql` +* Default value is `dsl` + +Defines the <> shape. +When `dsl`, the query shape must be valid {es} JSON-style string. +When `esql`, the query shape must be a valid {esql} string and `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target`, `docinfo_fields`, `response_type` and `tracking_field` parameters are not allowed. + [id="plugins-{type}s-{plugin}-response_type"] ===== `response_type` @@ -613,14 +624,11 @@ response from the query. The default `hits` will generate one event per returned document (i.e. "hit"). -When set to `aggregations`, a single Logstash event will be generated with the +When set to `aggregations`, a single {ls} event will be generated with the contents of the `aggregations` object of the query's response. In this case the `hits` object will be ignored. 
The parameter `size` will be always be set to 0 regardless of the default or user-defined value set in this plugin. -When using the `esql` setting, the query must be a valid ES|QL string. -When this setting is active, `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target` and `docinfo_fields` parameters are not allowed. - [id="plugins-{type}s-{plugin}-request_timeout_seconds"] ===== `request_timeout_seconds` diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 43d5592..b7b477b 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -97,18 +97,21 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # The index or alias to search. config :index, :validate => :string, :default => "logstash-*" - # The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query shape is accepted. + # A type of Elasticsearch query, provided by @query. This will validate query shape and other params. + config :query_type, :validate => %w[dsl esql], :default => 'dsl' + + # The query to be executed. DSL or ES|QL (when `query_type => 'esql'`) query shape is accepted. 
# Read the following documentations for more info # Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html # ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }' - # This allows you to speccify the response type: one of [hits, aggregations, esql] + # This allows you to specify the DSL response type: one of [hits, aggregations] # where # hits: normal search request # aggregations: aggregation request - # esql: ES|QL request - config :response_type, :validate => %w[hits aggregations esql], :default => 'hits' + # Note that this param is invalid when `query_type => 'esql'`, ES|QL response shape is always a tabular format + config :response_type, :validate => %w[hits aggregations], :default => 'hits' # This allows you to set the maximum number of hits returned per scroll. config :size, :validate => :number, :default => 1000 @@ -309,10 +312,10 @@ def register fill_hosts_from_cloud_id setup_ssl_params! - if @response_type == 'esql' + if @query_type == 'esql' validate_ls_version_for_esql_support! validate_esql_query! 
- not_allowed_options = original_params.keys & %w(index size slices search_api, docinfo, docinfo_target, docinfo_fields) + not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field) raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1 else @base_query = LogStash::Json.load(@query) @@ -361,7 +364,7 @@ def register setup_search_api - setup_query_executor + @query_executor = create_query_executor setup_cursor_tracker @@ -396,7 +399,7 @@ def decorate_event(event) private def get_query_object - return @query if @response_type == 'esql' + return @query if @query_type == 'esql' if @cursor_tracker query = @cursor_tracker.inject_cursor(@query) @logger.debug("new query is #{query}") @@ -685,20 +688,16 @@ def setup_search_api end - def setup_query_executor - @query_executor = case @response_type - when 'hits' - if @resolved_search_api == "search_after" - LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) - else - logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8 - LogStash::Inputs::Elasticsearch::Scroll.new(@client, self) - end - when 'aggregations' - LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) - when 'esql' - LogStash::Inputs::Elasticsearch::Esql.new(@client, self) - end + def create_query_executor + return LogStash::Inputs::Elasticsearch::Esql.new(@client, self) if @query_type == 'esql' + + # DSL query executor + return LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) if @response_type == 'aggregations' + # response_type is hits, executor can be search_after or scroll type + return LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) if @resolved_search_api == "search_after" + + logger.warn("scroll API is no longer recommended for pagination. 
Consider using search_after instead.") if es_major_version >= 8 + LogStash::Inputs::Elasticsearch::Scroll.new(@client, self) end def setup_cursor_tracker @@ -751,7 +750,7 @@ def validate_esql_query! end def validate_es_for_esql_support! - return unless @response_type == 'esql' + return unless @query_type == 'esql' # make sure connected ES supports ES|QL (8.11+) es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION) fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 77703d2..02787df 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1374,7 +1374,7 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie let(:config) do { "query" => "FROM test-index | STATS count() BY field", - "response_type" => "esql", + "query_type" => "esql", "retries" => 3 } end @@ -1387,8 +1387,8 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie describe "#initialize" do it "sets up the ESQL client with correct parameters" do + expect(plugin.instance_variable_get(:@query_type)).to eq(config["query_type"]) expect(plugin.instance_variable_get(:@query)).to eq(config["query"]) - expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"]) expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"]) end end @@ -1449,10 +1449,12 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie "docinfo" => true, "docinfo_target" => "[@metadata][docinfo]", "docinfo_fields" => ["_index"], + "response_type" => "hits", + "tracking_field" => "[@metadata][tracking]" })} it "raises a config error" do - mixed_fields = %w[index size slices docinfo_fields] + mixed_fields = %w[index size slices 
docinfo_fields response_type tracking_field] expect { plugin.register }.to raise_error(LogStash::ConfigurationError, /Configured #{mixed_fields} params are not allowed while using ES|QL query/) end end diff --git a/spec/inputs/integration/elasticsearch_esql_spec.rb b/spec/inputs/integration/elasticsearch_esql_spec.rb index b27b8dd..b25f65a 100644 --- a/spec/inputs/integration/elasticsearch_esql_spec.rb +++ b/spec/inputs/integration/elasticsearch_esql_spec.rb @@ -23,7 +23,7 @@ let(:config) do { "hosts" => ES_HOSTS, - "response_type" => "esql" + "query_type" => "esql" } end let(:es_client) do From e108c87d94d013a9f481740bfed72919d6c5b005 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 28 May 2025 11:06:25 -0700 Subject: [PATCH 22/22] Add a tech preview fior the ESQL. --- docs/index.asciidoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 2ea3acd..677b11e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -232,6 +232,13 @@ The next scheduled run: [id="plugins-{type}s-{plugin}-esql"] ==== {esql} support + +.Technical Preview +**** +The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview. +Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings. +**** + {es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data. To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer.