
Commit d678408

jsvd committed with JAndritsch, robbavey, and karenzone

Introduce cursor tracking akin to jdbc input (logstash-plugins#205)

Provide field value tracking, persisted to disk on each search_after page.
Adds `:last_value` and `:present` placeholders, allowing the plugin to inject
the cursor value and now-30 seconds, respectively, into the query string.
Useful for tracking new data being written to an index or series of indices.
Works best with nanosecond-precision timestamps added by Elasticsearch's
Ingest Pipelines.

Co-authored-by: Joel Andritsch <[email protected]>
Co-authored-by: Rob Bavey <[email protected]>
Co-authored-by: Karen Metts <[email protected]>

1 parent 7b76df7 commit d678408

File tree

10 files changed
+411
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -1,3 +1,6 @@
+## 4.22.0
+ - Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)
+
 ## 4.21.2
  - Add elastic-transport client support used in elasticsearch-ruby 8.x [#225](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/225)
docs/index.asciidoc

Lines changed: 178 additions & 1 deletion
@@ -48,7 +48,7 @@ This would create an Elasticsearch query with the following format:
     "sort": [ "_doc" ]
   }'

-
+[id="plugins-{type}s-{plugin}-scheduling"]
 ==== Scheduling

 Input from this plugin can be scheduled to run periodically according to a specific

@@ -103,6 +103,133 @@ Common causes are:
 - When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
 - When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
+[id="plugins-{type}s-{plugin}-cursor"]
+==== Tracking a field's value across runs
+
+.Technical Preview: Tracking a field's value
+****
+The feature that allows tracking a field's value across runs is in _Technical Preview_.
+Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
+****
+
+Some use cases require tracking the value of a particular field between two jobs.
+Examples include:
+
+* avoiding the need to re-process the entire result set of a long query after an unplanned restart
+* grabbing only new data from an index instead of processing the entire set on each job
+
+The Elasticsearch input plugin provides the <<plugins-{type}s-{plugin}-tracking_field>> and <<plugins-{type}s-{plugin}-tracking_field_seed>> options.
+When <<plugins-{type}s-{plugin}-tracking_field>> is set, the plugin records the value of that field for the last document retrieved in a run into a file.
+(The file location defaults to <<plugins-{type}s-{plugin}-last_run_metadata_path>>.)
+
+You can then inject this value into the query using the placeholder `:last_value`.
+The value is injected into the query before execution and updated after the query completes if new data was found.
+
+This feature works best when:
+
+* the query sorts by the tracking field,
+* the timestamp field is added by {es}, and
+* the field type has enough resolution so that two events are unlikely to have the same value.
+
+Consider using a tracking field whose type is https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds].
+If the tracking field is of this data type, you can use an extra placeholder called `:present` to inject the nanosecond-based value of "now-30s".
+This placeholder is useful as the right-hand side of a range filter, allowing the collection of new data but leaving partially-searchable bulk request data to the next scheduled job.
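The placeholder mechanics can be pictured as plain string substitution before the query runs. A minimal sketch (the helper below is hypothetical and for illustration only; it is not the plugin's internal `CursorTracker` API):

```ruby
# Hypothetical sketch of placeholder injection, assuming simple string
# substitution; the plugin's actual implementation may differ.
def inject_cursor(query, last_value, now = Time.now.utc)
  # ":present" stands for now-30s, rendered at nanosecond precision
  present = (now - 30).strftime("%Y-%m-%dT%H:%M:%S.%9NZ")
  query.gsub(":last_value", last_value).gsub(":present", present)
end

query = '{ "range": { "event.ingested": { "gt": ":last_value", "lt": ":present" } } }'
puts inject_cursor(query, "1970-01-01T00:00:00.000000000Z")
```

On the next run, the value recorded from the previous run replaces `:last_value`, so only newer documents match the range filter.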
+[id="plugins-{type}s-{plugin}-tracking-sample"]
+===== Sample configuration: Track field value across runs
+
+This section contains a series of steps to help you set up the "tailing" of data being written to a set of indices, using a date nanosecond field added by an Elasticsearch ingest pipeline and the `tracking_field` capability of this plugin.
+
+. Create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`:
++
+[source, json]
+PUT _ingest/pipeline/my-pipeline
+{
+  "processors": [
+    {
+      "script": {
+        "lang": "painless",
+        "source": "ctx.putIfAbsent(\"event\", [:]); ctx.event.ingested = metadata().now.format(DateTimeFormatter.ISO_INSTANT);"
+      }
+    }
+  ]
+}
+
+[start=2]
+. Create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:
++
+[source, json]
+PUT /_template/my_template
+{
+  "index_patterns": ["test-*"],
+  "settings": {
+    "index.default_pipeline": "my-pipeline"
+  },
+  "mappings": {
+    "properties": {
+      "event": {
+        "properties": {
+          "ingested": {
+            "type": "date_nanos",
+            "format": "strict_date_optional_time_nanos"
+          }
+        }
+      }
+    }
+  }
+}
+
+[start=3]
+. Define a query that looks at all data of the indices, sorted by the tracking field, with a range filter from the last value seen until the present:
++
+[source,json]
+{
+  "query": {
+    "range": {
+      "event.ingested": {
+        "gt": ":last_value",
+        "lt": ":present"
+      }
+    }
+  },
+  "sort": [
+    {
+      "event.ingested": {
+        "order": "asc",
+        "format": "strict_date_optional_time_nanos",
+        "numeric_type": "date_nanos"
+      }
+    }
+  ]
+}
+
+[start=4]
+. Configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field:
++
+[source, ruby]
+input {
+  elasticsearch {
+    id => tail_test_index
+    hosts => [ 'https://..']
+    api_key => '....'
+    index => 'test-*'
+    query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
+    tracking_field => "[event][ingested]"
+    slices => 5 # optional use of slices to speed up data processing; should be equal to or less than the number of primary shards
+    schedule => '* * * * *' # every minute
+    schedule_overlap => false # don't accumulate jobs if one takes longer than 1 minute
+  }
+}
+
+With this sample setup, new documents are indexed into a `test-*` index.
+The next scheduled run:
+
+* selects all new documents since the last observed value of the tracking field,
+* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
+* updates the value of the field at the end of the pagination.
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Input configuration options

@@ -123,12 +250,14 @@ This plugin supports the following configuration options plus the <<plugins-{typ
 | <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-hosts>> |<<array,array>>|No
 | <<plugins-{type}s-{plugin}-index>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-last_run_metadata_path>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
 | <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
 | <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
 | <<plugins-{type}s-{plugin}-scroll>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-search_api>> |<<string,string>>, one of `["auto", "search_after", "scroll"]`|No
 | <<plugins-{type}s-{plugin}-size>> |<<number,number>>|No

@@ -148,6 +277,8 @@ This plugin supports the following configuration options plus the <<plugins-{typ
 | <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>, one of `["full", "none"]`|No
 | <<plugins-{type}s-{plugin}-socket_timeout_seconds>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-target>> | {logstash-ref}/field-references-deepdive.html[field reference] | No
+| <<plugins-{type}s-{plugin}-tracking_field>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-tracking_field_seed>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-retries>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
 |=======================================================================
@@ -327,6 +458,17 @@ Check out {ref}/api-conventions.html#api-multi-index[Multi Indices
 documentation] in the Elasticsearch documentation for info on
 referencing multiple indices.

+[id="plugins-{type}s-{plugin}-last_run_metadata_path"]
+===== `last_run_metadata_path`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+The path to store the last observed value of the tracking field, when used.
+By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value`.
+
+This setting should point to a file, not a directory, and Logstash must have read+write access to this file.
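For operators, the metadata file is plain to inspect and to seed by hand. A hedged sketch of creating and reading such a file (the temporary directory stands in for Logstash's `path.data`; the exact file layout is an implementation detail of the plugin):

```ruby
# Illustrative only: emulate how a last-run value file could be created,
# written, and read back.
require 'fileutils'
require 'tmpdir'

base = Dir.mktmpdir # stands in for Logstash's path.data
path = File.join(base, "plugins", "inputs", "elasticsearch", "main", "last_run_value")
FileUtils.mkdir_p(File.dirname(path)) # Logstash needs read+write access here
File.write(path, "2024-05-01T12:00:00.000000000Z")
puts File.read(path)
```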
 [id="plugins-{type}s-{plugin}-password"]
 ===== `password`

@@ -407,6 +549,19 @@ for example: "* * * * *" (execute query every minute, on the minute)
 There is no schedule by default. If no schedule is given, then the statement is run
 exactly once.
+[id="plugins-{type}s-{plugin}-schedule_overlap"]
+===== `schedule_overlap`
+
+* Value type is <<boolean,boolean>>
+* Default value is `true`
+
+Whether to allow queuing of a scheduled run while a previous run is still in progress.
+Allowing overlap ensures a new run starts immediately after the previous one finishes when there is a lot of work to do, but because the queue is unbounded, it may lead to an out-of-memory condition over long periods of time if the queue grows continuously.
+
+When in doubt, set `schedule_overlap` to `false` (it may become the default value in the future).
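To see why the unbounded queue matters, consider a toy back-of-the-envelope model (the numbers are assumptions, and this is not scheduler code): if the cron fires every 60 seconds but each run takes 90 seconds, the backlog grows by roughly one queued run every three minutes.

```ruby
# Toy model of overlap-induced backlog; not rufus-scheduler code.
interval = 60.0  # cron fires every minute
run_time = 90.0  # each run takes 90 seconds (assumption)
ticks = 12       # simulate 12 minutes

enqueued  = ticks                                 # one run queued per tick
completed = ((ticks * interval) / run_time).floor # runs that can finish back-to-back
puts enqueued - completed # backlog after 12 minutes
```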
 [id="plugins-{type}s-{plugin}-scroll"]
 ===== `scroll`

@@ -617,6 +772,28 @@ When the `target` is set to a field reference, the `_source` of the hit is place
 This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced.
 It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][_source]"`).
+[id="plugins-{type}s-{plugin}-tracking_field"]
+===== `tracking_field`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+Which field from the last event of a previous run will be used as the cursor value for the following run.
+The value of this field is injected into each query if the query uses the placeholder `:last_value`.
+For the first query after a pipeline is started, the value used is either read from the <<plugins-{type}s-{plugin}-last_run_metadata_path>> file or taken from the <<plugins-{type}s-{plugin}-tracking_field_seed>> setting.
+
+Note: The tracking value is updated after each page is read and at the end of each Point in Time. In case of a crash, the last saved value will be used, so some duplication of data can occur. For this reason, the use of unique document IDs for each event is recommended in the downstream destination.
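One common way to make such replays harmless is to write events downstream with a deterministic document ID, so a replayed event overwrites its duplicate instead of creating a new one. A hedged configuration sketch (it assumes <<plugins-{type}s-{plugin}-docinfo>> is enabled with an ECS-style docinfo target of `[@metadata][input][elasticsearch]`; adjust the field reference to match your `docinfo_target`):

```ruby
output {
  elasticsearch {
    hosts => [ 'https://..' ]
    # reuse the source document id so a replayed event overwrites, not duplicates
    document_id => "%{[@metadata][input][elasticsearch][_id]}"
  }
}
```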
+[id="plugins-{type}s-{plugin}-tracking_field_seed"]
+===== `tracking_field_seed`
+
+* Value type is <<string,string>>
+* Default value is `"1970-01-01T00:00:00.000000000Z"`
+
+The starting value for the <<plugins-{type}s-{plugin}-tracking_field>> if there is no <<plugins-{type}s-{plugin}-last_run_metadata_path>> file already.
+This field defaults to the nanosecond-precision ISO8601 representation of the epoch, or "1970-01-01T00:00:00.000000000Z", given that nanosecond-precision timestamps are the most reliable data format to use for this feature.
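The default seed is simply the Unix epoch rendered at nanosecond precision; as a sanity check, the same string can be produced in plain Ruby (illustrative, not plugin code):

```ruby
# Render the Unix epoch at nanosecond precision, matching the default seed.
# %9N formats the fractional second as nine digits (nanoseconds).
seed = Time.at(0).utc.strftime("%Y-%m-%dT%H:%M:%S.%9NZ")
puts seed # 1970-01-01T00:00:00.000000000Z
```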
 [id="plugins-{type}s-{plugin}-user"]
 ===== `user`

lib/logstash/inputs/elasticsearch.rb

Lines changed: 62 additions & 2 deletions
@@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base

   require 'logstash/inputs/elasticsearch/paginated_search'
   require 'logstash/inputs/elasticsearch/aggregation'
+  require 'logstash/inputs/elasticsearch/cursor_tracker'

   include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
   include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -124,6 +125,20 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # by this pipeline input.
   config :slices, :validate => :number

+  # Enable tracking the value of a given field to be used as a cursor
+  # Main concerns:
+  #   * using anything other than _event.timestamp easily leads to data loss
+  #   * the first "synchronization run" can take a long time
+  config :tracking_field, :validate => :string
+
+  # Define the initial seed value of the tracking_field
+  config :tracking_field_seed, :validate => :string, :default => "1970-01-01T00:00:00.000000000Z"
+
+  # The location where the tracking field value will be stored
+  # The value is persisted after each scheduled run (and not per result)
+  # If it's not set, it defaults to '${path.data}/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value'
+  config :last_run_metadata_path, :validate => :string
+
   # If set, include Elasticsearch document information such as index, type, and
   # the id in the event.
   #
@@ -262,6 +277,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # exactly once.
   config :schedule, :validate => :string

+  # Allow scheduled runs to overlap (enabled by default). Setting to false will
+  # only start a new scheduled run after the previous one completes.
+  config :schedule_overlap, :validate => :boolean

   # If set, the _source of each hit will be added nested under the target instead of at the top-level
   config :target, :validate => :field_reference

@@ -335,16 +354,30 @@ def register

     setup_query_executor

+    setup_cursor_tracker
+
     @client
   end

   def run(output_queue)
     if @schedule
-      scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
+      scheduler.cron(@schedule, :overlap => @schedule_overlap) do
+        @query_executor.do_run(output_queue, get_query_object())
+      end
       scheduler.join
     else
-      @query_executor.do_run(output_queue)
+      @query_executor.do_run(output_queue, get_query_object())
+    end
+  end
+
+  def get_query_object
+    if @cursor_tracker
+      query = @cursor_tracker.inject_cursor(@query)
+      @logger.debug("new query is #{query}")
+    else
+      query = @query
     end
+    LogStash::Json.load(query)
   end

   ##
@@ -354,6 +387,11 @@ def push_hit(hit, output_queue, root_field = '_source')
     event = event_from_hit(hit, root_field)
     decorate(event)
     output_queue << event
+    record_last_value(event)
+  end
+
+  def record_last_value(event)
+    @cursor_tracker.record_last_value(event) if @tracking_field
   end

   def event_from_hit(hit, root_field)
@@ -676,6 +714,28 @@ def setup_query_executor
     end
   end

+  def setup_cursor_tracker
+    return unless @tracking_field
+    return unless @query_executor.is_a?(LogStash::Inputs::Elasticsearch::SearchAfter)
+
+    if @resolved_search_api != "search_after" || @response_type != "hits"
+      raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries")
+    end
+
+    @cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path,
+                                        tracking_field: @tracking_field,
+                                        tracking_field_seed: @tracking_field_seed)
+    @query_executor.cursor_tracker = @cursor_tracker
+  end
+
+  def last_run_metadata_path
+    return @last_run_metadata_path if @last_run_metadata_path
+
+    last_run_metadata_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", pipeline_id, "last_run_value")
+    FileUtils.mkdir_p ::File.dirname(last_run_metadata_path)
+    last_run_metadata_path
+  end
+
   def get_transport_client_class
     # LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
     # And now `elasticsearch-transport` is old, instead we have `elastic-transport`.

lib/logstash/inputs/elasticsearch/aggregation.rb

Lines changed: 11 additions & 8 deletions
@@ -12,14 +12,9 @@ def initialize(client, plugin)
     @client = client
     @plugin_params = plugin.params

+    @index = @plugin_params["index"]
     @size = @plugin_params["size"]
-    @query = @plugin_params["query"]
     @retries = @plugin_params["retries"]
-    @agg_options = {
-      :index => @plugin_params["index"],
-      :size => 0
-    }.merge(:body => @query)
-
     @plugin = plugin
   end

@@ -33,10 +28,18 @@ def retryable(job_name, &block)
     false
   end

-  def do_run(output_queue)
+  def aggregation_options(query_object)
+    {
+      :index => @index,
+      :size => 0,
+      :body => query_object
+    }
+  end
+
+  def do_run(output_queue, query_object)
     logger.info("Aggregation starting")
     r = retryable(AGGREGATION_JOB) do
-      @client.search(@agg_options)
+      @client.search(aggregation_options(query_object))
     end
     @plugin.push_hit(r, output_queue, 'aggregations') if r
   end
