3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 3.5.0
- Introduce opt-in "yaml_load_strategy => streaming" to stream parse YAML dictionaries [#106](https://github.com/logstash-plugins/logstash-filter-translate/pull/106)

## 3.4.3
- Allow YamlFile's Psych::Parser and Visitor instances to be garbage collected [#104](https://github.com/logstash-plugins/logstash-filter-translate/pull/104)

17 changes: 17 additions & 0 deletions docs/index.asciidoc
@@ -109,6 +109,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-refresh_behaviour>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-yaml_dictionary_code_point_limit>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-yaml_load_strategy>> |<<string,string>>, one of `["one_shot", "streaming"]`|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
@@ -432,5 +433,21 @@ the filter will succeed. This will clobber the old value of the source field!
The maximum number of code points in the YAML file referenced by `dictionary_path`. Be aware that the corresponding byte limit depends on the encoding.
This setting applies to YAML files only. A YAML file exceeding the limit raises an exception.

[id="plugins-{type}s-{plugin}-yaml_load_strategy"]
===== `yaml_load_strategy`

* Value can be any of: `one_shot`, `streaming`
* Default value is `one_shot`

How the YAML file is loaded and parsed. The default, `one_shot`, loads the entire
YAML file into the parser in one go and builds the final dictionary from the fully parsed YAML document.

Setting this to `streaming` instead instructs the parser to emit one YAML event at a time, constructing the dictionary
during parsing. This mode drastically reduces the amount of memory required to load or refresh the dictionary, and it is also faster.

Due to underlying implementation differences, this mode only supports basic types such as arrays, objects, strings, numbers, and booleans, and it does not support YAML tags.

If you have many translate filters with large YAML documents, consider switching this setting to `streaming`.
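
For example, a minimal pipeline configuration enabling the streaming strategy could look like the sketch below (the dictionary path is hypothetical):

[source,ruby]
----
filter {
  translate {
    source             => "status"
    target             => "translation"
    dictionary_path    => "/etc/logstash/dictionaries/statuses.yml"
    yaml_load_strategy => "streaming"
  }
}
----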

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
111 changes: 111 additions & 0 deletions lib/logstash/filters/dictionary/streaming_yaml_parser.rb
@@ -0,0 +1,111 @@
module LogStash module Filters module Dictionary
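  # Event-based YAML reader used by the "streaming" yaml_load_strategy.
  # It walks the top-level mapping of the dictionary file with the SnakeYAML Engine
  # parser and yields one key/value pair at a time, so the whole document never has
  # to be materialized as a single object tree in memory.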
  class StreamingYamlDictParser
    def snakeYamlEngineV2
      Java::org.snakeyaml.engine.v2
    end

    def snakeYamlEngineV2Events
      snakeYamlEngineV2.events
    end

    def initialize(filename, yaml_code_point_limit)
      settings = snakeYamlEngineV2.api.LoadSettings.builder
                   .set_code_point_limit(yaml_code_point_limit)
                   .build

      stream = Java::java.io.FileInputStream.new(filename)
      reader = Java::java.io.InputStreamReader.new(stream, Java::java.nio.charset.StandardCharsets::UTF_8)
      stream_reader = snakeYamlEngineV2.scanner.StreamReader.new(reader, settings)

      @parser = snakeYamlEngineV2.parser.ParserImpl.new(stream_reader, settings)

      skip_until(snakeYamlEngineV2Events.MappingStartEvent)
    end

    def each_pair
      while peek_event && !peek_event.is_a?(snakeYamlEngineV2Events.MappingEndEvent)
        key = parse_node
        value = parse_node
        yield(key, value)
      end
    end

    private

    def next_event
      @parser.next
    end

    def peek_event
      @parser.peek_event
    end

    def skip_until(event_class)
      while @parser.has_next
        evt = @parser.next
        return if event_class === evt
      end
    end

    def parse_node
      event = next_event

      case event
      when snakeYamlEngineV2Events.ScalarEvent
        parse_scalar(event)
      when snakeYamlEngineV2Events.MappingStartEvent
        parse_mapping
      when snakeYamlEngineV2Events.SequenceStartEvent
        parse_sequence
      else
        raise "Unexpected event: #{event.class}"
      end
    end

    def parse_mapping
      hash = {}
      while peek_event && !peek_event.is_a?(snakeYamlEngineV2Events.MappingEndEvent)
        key = parse_node
        value = parse_node
        hash[key] = value
      end
      next_event
      hash
    end

    def parse_sequence
      array = []
      while peek_event && !peek_event.is_a?(snakeYamlEngineV2Events.SequenceEndEvent)
        array << parse_node
      end
      next_event
      array
    end

    def parse_scalar(scalar)
      value = scalar.value
      # return quoted scalars as they are
      # e.g. don't convert "true" to true
      return value unless scalar.is_plain

      # otherwise let's do some checking and conversions
      case value
      when 'null', '', '~' then nil
      when 'true' then true
      when 'false' then false
      else
        # Try to convert to integer or float
        if value.match?(/\A-?\d+\z/)
          value.to_i
        elsif value.match?(/\A-?\d+\.\d+\z/)
          value.to_f
        else
          value
        end
      end
    end
  end
end end end
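
A minimal usage sketch of the new parser (the file path and code point limit are hypothetical example values; assumes a JRuby runtime with snakeyaml-engine on the classpath, as is the case inside Logstash):

  require "logstash/filters/dictionary/streaming_yaml_parser"

  # build a plain Ruby hash from the top-level mapping of a YAML file, one pair at a time
  parser = LogStash::Filters::Dictionary::StreamingYamlDictParser.new("/tmp/dict.yml", 128 * 1024 * 1024)
  dictionary = {}
  parser.each_pair { |key, value| dictionary[key] = value }
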
25 changes: 16 additions & 9 deletions lib/logstash/filters/dictionary/yaml_file.rb
@@ -1,6 +1,7 @@
# encoding: utf-8

require_relative "yaml_visitor"
require_relative "streaming_yaml_parser"

module LogStash module Filters module Dictionary
class YamlFile < File
@@ -9,18 +10,24 @@ class YamlFile < File

def initialize_for_file_type(**file_type_args)
@yaml_code_point_limit = file_type_args[:yaml_code_point_limit]
@yaml_load_strategy = file_type_args[:yaml_load_strategy]
end

def read_file_into_dictionary
visitor = YamlVisitor.create
parser = Psych::Parser.new(Psych::TreeBuilder.new)
parser.code_point_limit = @yaml_code_point_limit
# low level YAML read that tries to create as
# few intermediate objects as possible
# this overwrites the value at key
yaml_string = IO.read(@dictionary_path, :mode => 'r:bom|utf-8')
parser.parse(yaml_string, @dictionary_path)
visitor.accept_with_dictionary(@dictionary, parser.handler.root)
if @yaml_load_strategy == "one_shot"
visitor = YamlVisitor.create
parser = Psych::Parser.new(Psych::TreeBuilder.new)
parser.code_point_limit = @yaml_code_point_limit
# low level YAML read that tries to create as
# few intermediate objects as possible
# this overwrites the value at key
yaml_string = IO.read(@dictionary_path, :mode => 'r:bom|utf-8')
parser.parse(yaml_string, @dictionary_path)
visitor.accept_with_dictionary(@dictionary, parser.handler.root)
else # stream parse it
parser = StreamingYamlDictParser.new(@dictionary_path, @yaml_code_point_limit)
parser.each_pair {|key, value| @dictionary[key] = value }
end
end
end
end end end
6 changes: 5 additions & 1 deletion lib/logstash/filters/translate.rb
@@ -108,6 +108,10 @@ class Translate < LogStash::Filters::Base
# The default value is 128MB for code points of size 1 byte
config :yaml_dictionary_code_point_limit, :validate => :number

# "one_shot" loads the entire YAML document into memory before building the in-memory dictionary;
# "streaming" instead builds the dictionary gradually while parsing, with little memory overhead
config :yaml_load_strategy, :validate => ["streaming", "one_shot"], :default => "one_shot"

# When using a dictionary file, this setting will indicate how frequently
# (in seconds) logstash will check the dictionary file for updates.
config :refresh_interval, :validate => :number, :default => 300
@@ -195,7 +199,7 @@ def register
if @yaml_dictionary_code_point_limit <= 0
raise LogStash::ConfigurationError, "Please set a positive number in `yaml_dictionary_code_point_limit => #{@yaml_dictionary_code_point_limit}`."
else
@lookup = Dictionary::File.create(@dictionary_path, @refresh_interval, @refresh_behaviour, @exact, @regex, yaml_code_point_limit: @yaml_dictionary_code_point_limit)
@lookup = Dictionary::File.create(@dictionary_path, @refresh_interval, @refresh_behaviour, @exact, @regex, yaml_code_point_limit: @yaml_dictionary_code_point_limit, yaml_load_strategy: @yaml_load_strategy)
end
elsif @yaml_dictionary_code_point_limit != nil
raise LogStash::ConfigurationError, "Please remove `yaml_dictionary_code_point_limit` for dictionary file in JSON or CSV format"
2 changes: 1 addition & 1 deletion logstash-filter-translate.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-translate'
s.version = '3.4.3'
s.version = '3.5.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Replaces field contents based on a hash or YAML file"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
25 changes: 20 additions & 5 deletions spec/filters/translate_spec.rb
@@ -238,6 +238,21 @@ def self.build_fixture_path(filename)
subject.filter(event)
expect(event.get("translation")).to eq(1)
end

describe "yaml_load_strategy" do
let(:one_shot_parse_filter) { subject }
let(:streaming_parse_filter) { described_class.new(config.merge("yaml_load_strategy" => 'streaming')) }

before(:each) do
subject.register
streaming_parse_filter.register
end
let(:one_shot_dictionary) { one_shot_parse_filter.lookup.dictionary }
let(:streaming_dictionary) { streaming_parse_filter.lookup.dictionary }
it "produces an equivalent dictionary for both strategies" do
expect(one_shot_dictionary).to eq(streaming_dictionary)
end
end
end

describe "when using a yml dictionary with code point limit" do
@@ -246,23 +261,23 @@ def self.build_fixture_path(filename)
"source" => "status",
"target" => "translation",
"dictionary_path" => dictionary_path,
"yaml_dictionary_code_point_limit" => dictionary_size # the file is 18 bytes
"yaml_dictionary_code_point_limit" => codepoint_limit
}
end
let(:dictionary_path) { TranslateUtil.build_fixture_path("dict.yml") }
let(:dictionary_size) { IO.read(dictionary_path).size }
let(:event) { LogStash::Event.new("status" => "a") }
let(:codepoint_limit) { dictionary_size }

context "dictionary is over limit" do
let(:dictionary_size) { 17 }
context "codepoint limit under dictionary size" do
let(:codepoint_limit) { dictionary_size / 2 }

it "raises exception" do
expect { subject.register }.to raise_error(/The incoming YAML document exceeds/)
end
end

context "dictionary is within limit" do
let(:dictionary_size) { 18 }

it "returns the exact translation" do
subject.register
subject.filter(event)
1 change: 1 addition & 0 deletions spec/fixtures/dict.yml
@@ -1,3 +1,4 @@
a : 1
b : 2
c : 3
d : { "e": [1, "hello", true, "false", "1", "1.1"] }