diff --git a/README.md b/README.md index 34f7bdb..bcbbc9f 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Please note that the name of the plugin when used is `clickhouse`, it only suppo * `automatic_retries` (default: 1) - number of connect retry attempts to each host in `http_hosts` * `request_tolerance` (default: 5) - number of http request send retry attempts if response status code is not 200 * `backoff_time` (default: 3) - time to wait in seconds for next retry attempt of connect or request +* `skip_unknown` (0 or 1, default: 1) - skip unknown fields when inserting into ClickHouse. The value is sent as the `input_format_skip_unknown_fields` query parameter of the insert request Default batch size is 50, with a wait of at most 5 seconds per send. These can be tweaked with the parameters `flush_size` and `idle_flush_time` respectively. @@ -39,3 +40,4 @@ To build the gem yourself, use `gem build logstash-output-clickhouse.gemspec` in To install, run the following command, assuming the gem is in the local directory: `$LOGSTASH_HOME/bin/plugin install logstash-output-clickhouse-X.Y.Z.gem` +P.S. 
Tested on Logstash 7.1.1 diff --git a/lib/logstash/outputs/clickhouse.rb b/lib/logstash/outputs/clickhouse.rb index ea675e2..6b872c4 100644 --- a/lib/logstash/outputs/clickhouse.rb +++ b/lib/logstash/outputs/clickhouse.rb @@ -21,6 +21,8 @@ class LogStash::Outputs::ClickHouse < LogStash::Outputs::Base config :table, :validate => :string, :required => true + config :skip_unknown, :validate => :number, :default => 1, :inclusion => 0..1 + # Custom headers to use # format is `headers => ["X-My-Header", "%{host}"]` config :headers, :validate => :hash @@ -73,7 +75,7 @@ def register @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } @requests = Array.new - @http_query = "/?query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow" + @http_query = "/?input_format_skip_unknown_fields=#{skip_unknown}&query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow" @hostnames_pool = parse_http_hosts(http_hosts, @@ -126,32 +128,12 @@ def receive(event) buffer_receive(event) end - def mutate( src ) - res = {} - @mutations.each_pair do |dstkey, source| - case source - when String then - scrkey = source - next unless src.key?(scrkey) - - res[dstkey] = src[scrkey] - when Array then - scrkey = source[0] - next unless src.key?(scrkey) - pattern = source[1] - replace = source[2] - res[dstkey] = src[scrkey].sub( Regexp.new(pattern), replace ) - end - end - res - end - public def flush(events, close=false) documents = "" #this is the string of hashes that we push to Fusion as documents events.each do |event| - documents << LogStash::Json.dump( mutate( event.to_hash() ) ) << "\n" + documents << LogStash::Json.dump( event ) << "\n" end hosts = get_host_addresses() diff --git a/logstash-output-clickhouse.gemspec b/logstash-output-clickhouse.gemspec index fdb1be9..ea397d6 100644 --- a/logstash-output-clickhouse.gemspec +++ b/logstash-output-clickhouse.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency 
"logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "<= 7.0.0" + s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "< 9.0.0" s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0" s.add_development_dependency 'logstash-devutils'