Skip to content

Commit

Permalink
Remove sequence token
Browse files Browse the repository at this point in the history
  • Loading branch information
petergvizd committed Jan 27, 2023
1 parent 7287d1a commit 7a0cc76
Showing 1 changed file with 10 additions and 42 deletions.
52 changes: 10 additions & 42 deletions lib/fluent/plugin/out_cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ def start
end
options[:http_proxy] = @http_proxy if @http_proxy
@logs ||= Aws::CloudWatchLogs::Client.new(options)
@sequence_tokens = {}
@store_next_sequence_token_mutex = Mutex.new

@log_groups = {}

log.debug "Aws::CloudWatchLogs::Client initialized: log.level #{log.level} => #{options[:log_level]}"

@json_handler = case @json_handler
Expand Down Expand Up @@ -356,20 +355,6 @@ def scrub_record!(record)
end
end

def delete_sequence_token(group_name, stream_name)
@sequence_tokens[group_name].delete(stream_name)
end

def next_sequence_token(group_name, stream_name)
@sequence_tokens[group_name][stream_name]
end

def store_next_sequence_token(group_name, stream_name, token)
@store_next_sequence_token_mutex.synchronize do
@sequence_tokens[group_name][stream_name] = token
end
end

def put_events_by_chunk(group_name, stream_name, events)
chunk = []

Expand Down Expand Up @@ -413,9 +398,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
log_stream_name: stream_name,
}

token = next_sequence_token(group_name, stream_name)
args[:sequence_token] = token if token

begin
t = Time.now
response = @logs.put_log_events(args)
Expand All @@ -424,7 +406,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
"stream" => stream_name,
"events_count" => events.size,
"events_bytesize" => events_bytesize,
"sequence_token" => token,
"thread" => Thread.current.object_id,
"request_sec" => Time.now - t,
}
Expand All @@ -434,16 +415,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
else
log.debug "Called PutLogEvents API", request
end
rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException, Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => err
sleep 1 # to avoid too many API calls
store_next_sequence_token(group_name, stream_name, err.expected_sequence_token)
log.warn "updating upload sequence token forcefully because unrecoverable error occured", {
"error" => err,
"log_group" => group_name,
"log_stream" => stream_name,
"new_sequence_token" => token,
}
retry_count += 1
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => err
if @auto_create_stream && err.message == 'The specified log stream does not exist.'
log.warn 'Creating log stream because "The specified log stream does not exist." error is got', {
Expand All @@ -452,7 +423,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
"log_stream" => stream_name,
}
create_log_stream(group_name, stream_name)
delete_sequence_token(group_name, stream_name)
retry_count += 1
else
raise err
Expand Down Expand Up @@ -487,8 +457,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
if 0 < retry_count
log.warn "retry succeeded"
end

store_next_sequence_token(group_name, stream_name, response.next_sequence_token)
end

def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = nil)
Expand All @@ -497,7 +465,7 @@ def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = n
unless retention_in_days.nil?
put_retention_policy(group_name, retention_in_days)
end
@sequence_tokens[group_name] = {}
@log_groups[group_name] = []
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
log.debug "Log group '#{group_name}' already exists"
end
Expand All @@ -517,18 +485,18 @@ def put_retention_policy(group_name, retention_in_days)
def create_log_stream(group_name, stream_name)
begin
@logs.create_log_stream(log_group_name: group_name, log_stream_name: stream_name)
@sequence_tokens[group_name] ||= {}
@sequence_tokens[group_name][stream_name] = nil
@sequence_tokens[group_name] ||= []
@sequence_tokens[group_name].push(stream_name)
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
log.debug "Log stream '#{stream_name}' already exists"
end
end

def log_group_exists?(group_name)
if @sequence_tokens[group_name]
if @log_groups[group_name]
true
elsif check_log_group_existence(group_name)
@sequence_tokens[group_name] = {}
@log_groups[group_name] = []
true
else
false
Expand All @@ -547,12 +515,12 @@ def check_log_group_existence(group_name)
end

def log_stream_exists?(group_name, stream_name)
if not @sequence_tokens[group_name]
if not @log_groups[group_name]
false
elsif @sequence_tokens[group_name].has_key?(stream_name)
elsif @log_groups[group_name].include?(stream_name)
true
elsif (log_stream = find_log_stream(group_name, stream_name))
@sequence_tokens[group_name][stream_name] = log_stream.upload_sequence_token
@log_groups[group_name].push(stream_name)
true
else
false
Expand Down

0 comments on commit 7a0cc76

Please sign in to comment.