11# frozen_string_literal: true
22
33require "sbmt/outbox/metrics/utils"
4+ require "sbmt/outbox/v2/redis_item_meta"
45
56module Sbmt
67 module Outbox
78 class ProcessItem < Sbmt ::Outbox ::DryInteractor
89 param :item_class , reader : :private
910 param :item_id , reader : :private
1011 option :worker_version , reader : :private , optional : true , default : -> { 1 }
12+ option :cache_ttl_sec , reader : :private , optional : true , default : -> { 5 * 60 }
13+ option :redis , reader : :private , optional : true , default : -> { }
1114
1215 METRICS_COUNTERS = %i[ error_counter retry_counter sent_counter fetch_error_counter discarded_counter ] . freeze
1316
14- delegate :log_success , :log_info , :log_failure , to : "Sbmt::Outbox.logger"
17+ delegate :log_success , :log_info , :log_failure , :log_debug , to : "Sbmt::Outbox.logger"
1518 delegate :item_process_middlewares , to : "Sbmt::Outbox"
1619 delegate :box_type , :box_name , :owner , to : :item_class
1720
18- attr_accessor :process_latency
21+ attr_accessor :process_latency , :retry_latency
1922
2023 def call
2124 log_success (
@@ -26,9 +29,23 @@ def call
2629 item = nil
2730
2831 item_class . transaction do
29- item = yield fetch_item
32+ item = yield fetch_item_and_lock_for_update
33+
34+ cached_item = fetch_redis_item_meta ( redis_item_key ( item_id ) )
35+ if cached_retries_exceeded? ( cached_item )
36+ msg = "max retries exceeded: marking item as failed based on cached data: #{ cached_item } "
37+ item . set_errors_count ( cached_item . errors_count )
38+ track_failed ( msg , item )
39+ next Failure ( msg )
40+ end
41+
42+ if cached_greater_errors_count? ( item , cached_item )
43+ log_failure ( "inconsistent item: cached_errors_count:#{ cached_item . errors_count } > db_errors_count:#{ item . errors_count } : setting errors_count based on cached data:#{ cached_item } " )
44+ item . set_errors_count ( cached_item . errors_count )
45+ end
3046
3147 if item . processed_at?
48+ self . retry_latency = Time . current - item . created_at
3249 item . config . retry_strategies . each do |retry_strategy |
3350 yield check_retry_strategy ( item , retry_strategy )
3451 end
@@ -62,7 +79,48 @@ def call
6279
6380 private
6481
65- def fetch_item
82+ def cached_retries_exceeded? ( cached_item )
83+ return false unless cached_item
84+
85+ item_class . max_retries_exceeded? ( cached_item . errors_count )
86+ end
87+
88+ def cached_greater_errors_count? ( db_item , cached_item )
89+ return false unless cached_item
90+
91+ cached_item . errors_count > db_item . errors_count
92+ end
93+
94+ def fetch_redis_item_meta ( redis_key )
95+ return if worker_version < 2
96+
97+ data = redis . call ( "GET" , redis_key )
98+ return if data . blank?
99+
100+ Sbmt ::Outbox ::V2 ::RedisItemMeta . deserialize! ( data )
101+ rescue => ex
102+ log_debug ( "error while fetching redis meta: #{ ex . message } " )
103+ nil
104+ end
105+
106+ def set_redis_item_meta ( item , ex )
107+ return if worker_version < 2
108+ return if item . nil?
109+
110+ redis_key = redis_item_key ( item . id )
111+ error_msg = format_exception_error ( ex , extract_cause : false )
112+ data = Sbmt ::Outbox ::V2 ::RedisItemMeta . new ( errors_count : item . errors_count , error_msg : error_msg )
113+ redis . call ( "SET" , redis_key , data . to_s , "EX" , cache_ttl_sec )
114+ rescue => ex
115+ log_debug ( "error while fetching redis meta: #{ ex . message } " )
116+ nil
117+ end
118+
119+ def redis_item_key ( item_id )
120+ "#{ box_type } :#{ item_class . box_name } :#{ item_id } "
121+ end
122+
123+ def fetch_item_and_lock_for_update
66124 item = item_class
67125 . lock ( "FOR UPDATE" )
68126 . find_by ( id : item_id )
@@ -171,6 +229,7 @@ def track_failed(ex_or_msg, item = nil)
171229 item . pending!
172230 end
173231 rescue => e
232+ set_redis_item_meta ( item , e )
174233 log_error_handling_error ( e , item )
175234 end
176235
@@ -259,6 +318,7 @@ def report_metrics(item)
259318 end
260319
261320 track_process_latency ( labels ) if process_latency
321+ track_retry_latency ( labels ) if retry_latency
262322
263323 return unless counters [ :sent_counter ] . positive?
264324
@@ -279,6 +339,10 @@ def counters
279339 def track_process_latency ( labels )
280340 Yabeda . outbox . process_latency . measure ( labels , process_latency . round ( 3 ) )
281341 end
342+
343+ def track_retry_latency ( labels )
344+ Yabeda . outbox . retry_latency . measure ( labels , retry_latency . round ( 3 ) )
345+ end
282346 end
283347 end
284348end
0 commit comments