Skip to content

Commit c6d7ffa

Browse files
* Initial commit
1 parent 0e9f300 commit c6d7ffa

10 files changed

+543
-379
lines changed

build.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export LOGSTASH_SOURCE=1
2+
export LOGSTASH_PATH=/softwares/logstash
3+
export JRUBY_HOME=$LOGSTASH_PATH/vendor/jruby
4+
export JAVA_HOME=$LOGSTASH_PATH/jdk
5+
export PATH=$PATH:/softwares/logstash/vendor/jruby/bin:/softwares/logstash/bin
6+
jruby -S gem install bundler -v 2.4.19
7+
jruby -S bundle install
8+
gem build *.gemspec
9+
rm Gemfile.lock
10+
/softwares/logstash/bin/logstash-plugin uninstall logstash-output-kusto
11+
/softwares/logstash/bin/logstash-plugin install logstash-output-kusto-3.0.0-java.gem

lib/logstash/outputs/kusto.rb

Lines changed: 85 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -4,153 +4,119 @@
44
require 'logstash/namespace'
55
require 'logstash/errors'
66

7-
require 'logstash/outputs/kusto/ingestor'
8-
require 'logstash/outputs/kusto/custom_size_based_buffer'
7+
require 'logstash/outputs/kusto/customSizeBasedBuffer'
98
require 'logstash/outputs/kusto/kustoLogstashConfiguration'
9+
require 'logstash/outputs/kusto/logStashFlushBuffer'
1010

1111
##
1212
# This plugin sends messages to Azure Kusto in batches.
1313
#
1414
class LogStash::Outputs::Kusto < LogStash::Outputs::Base
15-
config_name 'kusto'
16-
concurrency :shared
15+
config_name 'kusto'
16+
concurrency :shared
1717

18-
FIELD_REF = /%\{[^}]+\}/
18+
FIELD_REF = /%\{[^}]+\}/
1919

20-
attr_reader :failure_path
20+
attr_reader :failure_path
2121

22-
# The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal.
23-
config :ingest_url, validate: :string, required: true
22+
# The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal.
23+
config :ingest_url, validate: :string, required: true
24+
# The following are the credentials used to connect to the Kusto service
25+
# application id
26+
config :app_id, validate: :string, required: false
27+
# application key (secret)
28+
config :app_key, validate: :password, required: false
29+
# aad tenant id
30+
config :app_tenant, validate: :string, default: nil
31+
# managed identity id
32+
config :managed_identity, validate: :string, default: nil
33+
# CLI credentials for dev-test
34+
config :cli_auth, validate: :boolean, default: false
35+
# The following are the data settings that impact where events are written to
36+
# Database name
37+
config :database, validate: :string, required: true
38+
# Target table name
39+
config :table, validate: :string, required: true
40+
# Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
41+
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
42+
# Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
43+
config :json_mapping, validate: :string, default: nil
2444

25-
# The following are the credentials used to connect to the Kusto service
26-
# application id
27-
config :app_id, validate: :string, required: false
28-
# application key (secret)
29-
config :app_key, validate: :password, required: false
30-
# aad tenant id
31-
config :app_tenant, validate: :string, default: nil
32-
# managed identity id
33-
config :managed_identity, validate: :string, default: nil
34-
# CLI credentials for dev-test
35-
config :cli_auth, validate: :boolean, default: false
36-
# The following are the data settings that impact where events are written to
37-
# Database name
38-
config :database, validate: :string, required: true
39-
# Target table name
40-
config :table, validate: :string, required: true
41-
# Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
42-
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
43-
# Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
44-
config :json_mapping, validate: :string, default: nil
45+
# Mapping name - deprecated, use json_mapping
46+
config :mapping, validate: :string, deprecated: true
4547

46-
# Mapping name - deprecated, use json_mapping
47-
config :mapping, validate: :string, deprecated: true
4848

49-
# Path - deprecated
50-
config :path, validate: :string, deprecated: true
49+
# TODO: will be used to route events to many tables according to event properties
50+
config :dynamic_event_routing, validate: :boolean, default: false
5151

52-
# TODO: will be used to route events to many tables according to event properties
53-
config :dynamic_event_routing, validate: :boolean, default: false
52+
# Specify how many files can be uploaded concurrently
53+
config :upload_concurrent_count, validate: :number, default: 3
5454

55-
# Specify how many files can be uploaded concurrently
56-
config :upload_concurrent_count, validate: :number, default: 3
55+
# Specify how many files can be kept in the upload queue before the main process
56+
# starts processing them in the main thread (not healthy)
57+
config :upload_queue_size, validate: :number, default: 30
5758

58-
# Specify how many files can be kept in the upload queue before the main process
59-
# starts processing them in the main thread (not healthy)
60-
config :upload_queue_size, validate: :number, default: 30
59+
# Host of the proxy , is an optional field. Can connect directly
60+
config :proxy_host, validate: :string, required: false
6161

62-
# Host of the proxy , is an optional field. Can connect directly
63-
config :proxy_host, validate: :string, required: false
62+
# Port where the proxy runs , defaults to 80. Usually a value like 3128
63+
config :proxy_port, validate: :number, required: false , default: 80
6464

65-
# Port where the proxy runs , defaults to 80. Usually a value like 3128
66-
config :proxy_port, validate: :number, required: false , default: 80
65+
# Check Proxy URL can be over http or https. Do we need it this way or ignore this & remove this
66+
config :proxy_protocol, validate: :string, required: false , default: 'https'
6767

68-
# Check Proxy URL can be over http or https. Do we need it this way or ignore this & remove this
69-
config :proxy_protocol, validate: :string, required: false , default: 'http'
68+
# Maximum size of the buffer before it gets flushed, defaults to 10MB
69+
config :max_batch_size, validate: :number, required: false , default: 10
7070

71-
# Maximum size of the buffer before it gets flushed, defaults to 10MB
72-
config :max_size, validate: :number, required: false , default: 10
71+
# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
72+
config :plugin_flush_interval, validate: :number, required: false , default: 10
7373

74-
# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
75-
config :max_interval, validate: :number, required: false , default: 10
74+
# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
75+
config :max_items, validate: :number, required: false , default: 100
7676

77-
# Latch timeout in seconds, defaults to 60
78-
config :latch_timeout, validate: :number, required: false, default: 60
7977

80-
81-
default :codec, 'json_lines'
78+
default :codec, 'json_lines'
8279

8380
def register
84-
@io_mutex = Mutex.new
85-
86-
final_mapping = json_mapping
87-
final_mapping = mapping if final_mapping.nil? || final_mapping.empty?
88-
89-
executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
90-
max_threads: upload_concurrent_count,
91-
max_queue: upload_queue_size,
92-
fallback_policy: :caller_runs)
93-
94-
kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
81+
kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping)
9582
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
9683
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
97-
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
98-
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, latch_timeout, executor)
99-
100-
# Deprecation warning for path
101-
if @path
102-
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
103-
end
104-
sleep(30)
84+
kusto_flush_config = LogStash::Outputs::KustoInternal::KustoFlushConfiguration.new(max_items, plugin_flush_interval,max_batch_size)
85+
kusto_upload_config = LogStash::Outputs::KustoInternal::KustoUploadConfiguration.new(upload_concurrent_count, upload_queue_size)
86+
kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, kusto_flush_config, kusto_upload_config,@logger)
87+
kusto_logstash_configuration.validate_config
10588
# Initialize the custom buffer with size and interval
106-
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
107-
flush_buffer(events)
108-
end
109-
end
110-
111-
112-
public
113-
def multi_receive_encoded(events_and_encoded)
114-
events_and_encoded.each do |event, encoded|
115-
begin
116-
@buffer << encoded
117-
rescue => e
118-
@logger.error("Error processing event: #{e.message}")
119-
end
120-
end
89+
@buffer = LogStash::Outputs::KustoOutputInternal::LogStashEventsBatcher.new(kusto_logstash_configuration,@logger)
12190
end
12291

123-
def close
124-
@logger.info("Closing Kusto output plugin")
125-
begin
126-
@buffer.shutdown unless @buffer.nil?
127-
@logger.info("Buffer shutdown") unless @buffer.nil?
128-
rescue => e
129-
@logger.error("Error shutting down buffer: #{e.message}")
130-
@logger.error(e.backtrace.join("\n"))
131-
end
132-
133-
begin
134-
@ingestor.stop unless @ingestor.nil?
135-
@logger.info("Ingestor stopped") unless @ingestor.nil?
136-
rescue => e
137-
@logger.error("Error stopping ingestor: #{e.message}")
138-
@logger.error(e.backtrace.join("\n"))
139-
end
140-
@logger.info("Kusto output plugin Closed")
141-
end
142-
143-
public
144-
def flush_buffer(events)
145-
return if events.empty?
146-
@logger.info("flush_buffer with #{events.size} events")
147-
begin
148-
@ingestor.upload_async(events.join)
149-
rescue => e
150-
@logger.error("Error during flush: #{e.message}")
151-
@logger.error(e.backtrace.join("\n"))
152-
raise e # Exception is raised to trigger the rescue block in buffer_flush
153-
end
154-
end
15592

93+
public
94+
def multi_receive_encoded(events_and_encoded)
95+
events_and_encoded.each do |event, encoded|
96+
begin
97+
@buffer.batch_event(event.to_hash)
98+
rescue => e
99+
@logger.error("Error processing event: #{e.message}")
100+
end
101+
end
102+
end
103+
104+
def close
105+
# @logger.info("Closing Kusto output plugin")
106+
# begin
107+
# @buffer.shutdown unless @buffer.nil?
108+
# @logger.info("Buffer shutdown") unless @buffer.nil?
109+
# rescue => e
110+
# @logger.error("Error shutting down buffer: #{e.message}")
111+
# @logger.error(e.backtrace.join("\n"))
112+
# end
113+
# begin
114+
# @ingestor.stop unless @ingestor.nil?
115+
# @logger.info("Ingestor stopped") unless @ingestor.nil?
116+
# rescue => e
117+
# @logger.error("Error stopping ingestor: #{e.message}")
118+
# @logger.error(e.backtrace.join("\n"))
119+
# end
120+
# @logger.info("Kusto output plugin Closed")
121+
end
156122
end

0 commit comments

Comments
 (0)