Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Use dynamodb for sincedb storage #55

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 129 additions & 18 deletions lib/logstash/inputs/cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "aws-sdk"
require "logstash/inputs/cloudwatch_logs/patch"
require "fileutils"
require "json"

Aws.eager_autoload!

Expand Down Expand Up @@ -51,6 +52,9 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
config :start_position, :default => 'beginning'


# use dynamodb to store the sincedb
config :dynamodb_sincedb_table, :validate => :string, :default => nil

# def register
public
def register
Expand All @@ -60,11 +64,33 @@ def register
@sincedb = {}

check_start_position_validity

Aws::ConfigService::Client.new(aws_options_hash)
@cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash)

if @sincedb_path.nil?
aws_options = aws_options_hash
aws_options[:region]=ENV["AWS_REGION"]

Aws::ConfigService::Client.new(aws_options)
@cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options)

if !@dynamodb_sincedb_table.nil?
# use dynamodb for sincedb stuff
@ddb=Aws::DynamoDB::Client.new(aws_options)
begin
resp = @ddb.describe_table(
{
table_name: @dynamodb_sincedb_table,
})
if resp.nil?
@logger.debug("dynamodb table not found - create it #{@dynamodb_sincedb_table}")
create_ddb_table
else
@logger.debug("dynamodb table found #{@dynamodb_sincedb_table} - #{resp.to_json}")
end
rescue Aws::DynamoDB::Errors::ResourceNotFoundException
# table doesn't exist - create it
@logger.debug("dynamodb table needs creating #{@dynamodb_sincedb_table}")
create_ddb_table
end
end
if @dynamodb_sincedb_table.nil? && @sincedb_path.nil?
if settings
datapath = File.join(settings.get_value("path.data"), "plugins", "inputs", "cloudwatch_logs")
# Ensure that the filepath exists before writing, since it's deeply nested.
Expand All @@ -75,7 +101,7 @@ def register

# This section is going to be deprecated eventually, as path.data will be
# the default, not an environment variable (SINCEDB_DIR or HOME)
if @sincedb_path.nil? # If it is _still_ nil...
if @dynamodb_sincedb_table.nil? && @sincedb_path.nil? # If it is _still_ nil...
if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
@logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
"to keep track of the files I'm watching. Either set " \
Expand Down Expand Up @@ -158,6 +184,54 @@ def priority_of(group)
@priority.index(group) || -1
end

public
def create_ddb_table
@logger.debug("Creating dynamodb table: #{@dynamodb_sincedb_table}")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any prior art for official Logstash plugins using DynamoDB for state?

I see the value in supporting dynamo, but am not sure I like the auto-creation behaviour. I think I would prefer we error and boot, and require the user to correctly create the table.

We could provide documentation on the correct schema, but not doing it automatically would prevent any issues arising from misconfiguration which may cause it to go back and re-read histories, which could be unintended.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not seen anything else use DDB, so I just went with what worked for us.

Regarding autocreation - thinking analogously: a sincedb filesystem file based record is also created automatically? I like autocreation because it makes my deployment simpler, especially as we may end up with quite a few of these.

However, I see your point too - perhaps we should have a configuration parameter which switches between autocreate and error behaviours?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with a configuration to change the behaviour - defaulting to auto-create will probably reduce number of issues logged, but also allow more control for the situation I was concerned about.

resp = @ddb.create_table(
{
attribute_definitions: [
{
attribute_name: "GroupName",
attribute_type: "S",
},
],
key_schema: [
{
attribute_name: "GroupName",
key_type: "HASH",
},
],
provisioned_throughput: {
read_capacity_units: 1,
write_capacity_units: 1,
},
table_name: @dynamodb_sincedb_table,
})
begin
resp = @ddb.describe_table(
{
table_name: @dynamodb_sincedb_table,
})
rescue Aws::DynamoDB::Errors::ResourceNotFoundException
@logger.debug("not found checking dynamodb table: #{@dynamodb_sincedb_table}")

end

until !resp.nil? && resp.table.table_status == "ACTIVE"
@logger.debug("waiting before checking dynamodb table: #{@dynamodb_sincedb_table}")
sleep 5
begin
resp = @ddb.describe_table(
{
table_name: @dynamodb_sincedb_table,
})
rescue Aws::DynamoDB::Errors::ResourceNotFoundException
@logger.debug("not found again checking dynamodb table: #{@dynamodb_sincedb_table}")
end
end
@logger.debug("Done creating dynamodb table: #{@dynamodb_sincedb_table}")
end

public
def determine_start_position(groups, sincedb)
groups.each do |group|
Expand Down Expand Up @@ -195,8 +269,12 @@ def process_group(group)
process_log(event, group)
end

_sincedb_write

unless @dynamodb_sincedb_table.nil?
update_ddb_group(group)
else
_sincedb_write
end

next_token = resp.next_token
break if next_token.nil?
end
Expand Down Expand Up @@ -229,23 +307,56 @@ def parse_time(data)

private
def _sincedb_open
begin
File.open(@sincedb_path) do |db|
@logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}")
db.each do |line|
group, pos = line.split(" ", 2)
@logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}")
@sincedb[group] = pos.to_i
unless @dynamodb_sincedb_table.nil?
#get the sincedb from ddb
begin
@logger.debug? && @logger.debug("_sincedb_open: reading from DynmanoDB: #{@dynamodb_sincedb_table}")
resp = @ddb.scan(
{
table_name: @dynamodb_sincedb_table,
}
)
resp.items.each do |item|
@logger.debug? && @logger.debug("_sincedb_open: setting #{item["GroupName"]} to #{item["StartPosition"]}")
@sincedb[item["GroupName"]] = item["StartPosition"].to_i
end
rescue
#problem opening DynamoDB
@logger.debug? && @logger.debug("_sincedb_open: error: #{@dynamodb_sincedb_table}: #{$!}")
end
else
begin
File.open(@sincedb_path) do |db|
@logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}")
db.each do |line|
group, pos = line.split(" ", 2)
@logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}")
@sincedb[group] = pos.to_i
end
end
rescue
#No existing sincedb to load
@logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}")
end
rescue
#No existing sincedb to load
@logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}")
end
end # def _sincedb_open

private
# Persist the current read position of one log group to the DynamoDB
# sincedb table (one item per group, keyed by "GroupName").
#
# A failed write (for example when DynamoDB throttles the request) is logged
# as a warning and swallowed instead of being raised. The position is still
# held in @sincedb and this method is invoked again after each processed
# batch, so a later successful write overwrites the stale item; a transient
# DynamoDB failure therefore should not abort processing of the group.
# Errors that are not AWS service errors still propagate to the caller.
#
# @param group [String] name of the log group whose position is stored
# @return [Object, nil] the put_item response, or nil when the write failed
def update_ddb_group(group)
  @ddb.put_item(
    {
      table_name: @dynamodb_sincedb_table,
      item: {
        "GroupName" => group,
        # Stored as a string; _sincedb_open converts it back with to_i.
        "StartPosition" => @sincedb[group].to_s
      }
    }
  )
rescue Aws::Errors::ServiceError => e
  @logger.warn("update_ddb_group: failed to store position for #{group} " \
               "in #{@dynamodb_sincedb_table}: #{e.class}: #{e.message}")
  nil
end
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if we errored here due to something like dynamo throttles? Should special handling be created for that case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I should have put in some error checking here :)


private
def _sincedb_write

begin
IO.write(@sincedb_path, serialize_sincedb, 0)
rescue Errno::EACCES
Expand Down