@@ -13,7 +13,8 @@ class BaseDeleteStaleItemsJob < Outbox.active_job_base_class
1313 class << self
1414 def enqueue
1515 item_classes . each do |item_class |
16- perform_later ( item_class . to_s )
16+ delay = rand ( 15 ) . seconds
17+ set ( wait : delay ) . perform_later ( item_class . to_s )
1718 end
1819 end
1920
@@ -41,12 +42,13 @@ def perform(item_class_name)
4142
4243 lock_manager . lock ( "#{ self . class . name } :#{ item_class_name } :lock" , LOCK_TTL ) do |locked |
4344 if locked
44- duration = item_class . config . retention
45+ duration_failed = item_class . config . retention
46+ duration_delivered = item_class . config . retention_delivered_items
4547
46- validate_retention! ( duration )
48+ validate_retention! ( duration_failed )
4749
4850 logger . with_tags ( box_type : box_type , box_name : box_name ) do
49- delete_stale_items ( Time . current - duration )
51+ delete_stale_items ( Time . current - duration_failed , Time . current - duration_delivered )
5052 end
5153 else
5254 logger . log_info ( "Failed to acquire lock #{ self . class . name } :#{ item_class_name } " )
@@ -58,25 +60,25 @@ def perform(item_class_name)
5860
5961 private
6062
61- def validate_retention! ( duration )
62- return if duration >= MIN_RETENTION_PERIOD
63+ def validate_retention! ( duration_failed )
64+ return if duration_failed >= MIN_RETENTION_PERIOD
6365
6466 raise "Retention period for #{ box_name } must be longer than #{ MIN_RETENTION_PERIOD . inspect } "
6567 end
6668
67- def delete_stale_items ( waterline )
68- logger . log_info ( "Start deleting #{ box_type } items for #{ box_name } older than #{ waterline } " )
69+ def delete_stale_items ( waterline_failed , waterline_delivered )
70+ logger . log_info ( "Start deleting #{ box_type } items for #{ box_name } older than: failed and discarded items #{ waterline_failed } and delivered items #{ waterline_delivered } " )
6971
7072 case database_type
7173 when :postgresql
72- postgres_delete_in_batches ( waterline )
74+ postgres_delete_in_batches ( waterline_failed , waterline_delivered )
7375 when :mysql
74- mysql_delete_in_batches ( waterline )
76+ mysql_delete_in_batches ( waterline_failed , waterline_delivered )
7577 else
7678 raise "Unsupported database type"
7779 end
7880
79- logger . log_info ( "Successfully deleted #{ box_type } items for #{ box_name } older than #{ waterline } " )
81+ logger . log_info ( "Successfully deleted #{ box_type } items for #{ box_name } older than: failed and discarded items #{ waterline_failed } and delivered items #{ waterline_delivered } " )
8082 end
8183
8284 # Deletes stale items from PostgreSQL database in batches
@@ -90,12 +92,22 @@ def delete_stale_items(waterline)
9092 # WHERE "items"."id" IN (
9193 # SELECT "items"."id"
9294 # FROM "items"
93- # WHERE "items"."created_at" < '2023-05-01 00:00:00'
95+ # WHERE (
96+ # "items"."status" = 1 AND "items"."created_at" < '2023-05-01 00:00:00'
97+ # )
9498 # LIMIT 1000
9599 # )
96- def postgres_delete_in_batches ( waterline )
100+ def postgres_delete_in_batches ( waterline_failed , waterline_delivered )
97101 table = item_class . arel_table
98- condition = table [ :created_at ] . lt ( waterline )
102+
103+ status_delivered = item_class . statuses [ :delivered ]
104+ status_failed_discarded = item_class . statuses . values_at ( :failed , :discarded )
105+
106+ delete_items_in_batches ( table , table [ :status ] . eq ( status_delivered ) . and ( table [ :created_at ] . lt ( waterline_delivered ) ) )
107+ delete_items_in_batches ( table , table [ :status ] . in ( status_failed_discarded ) . and ( table [ :created_at ] . lt ( waterline_failed ) ) )
108+ end
109+
110+ def delete_items_in_batches ( table , condition )
99111 subquery = table
100112 . project ( table [ :id ] )
101113 . where ( condition )
@@ -129,14 +141,25 @@ def postgres_delete_in_batches(waterline)
129141 #
130142 # Example SQL generated for deletion:
131143 # DELETE FROM `items`
132- # WHERE `items`.`created_at` < '2023-05-01 00:00:00'
144+ # WHERE (
145+ # `items`.`status` = 1 AND `items`.`created_at` < '2023-05-01 00:00:00'
146+ # )
133147 # LIMIT 1000
134- def mysql_delete_in_batches ( waterline )
148+ def mysql_delete_in_batches ( waterline_failed , waterline_delivered )
149+ status_delivered = item_class . statuses [ :delivered ]
150+ status_failed_discarded = [ item_class . statuses . values_at ( :failed , :discarded ) ]
151+
152+ delete_items_in_batches_mysql (
153+ item_class . where ( status : status_delivered , created_at : ...waterline_delivered )
154+ )
155+ delete_items_in_batches_mysql (
156+ item_class . where ( status : status_failed_discarded ) . where ( created_at : ...waterline_failed )
157+ )
158+ end
159+
160+ def delete_items_in_batches_mysql ( query )
135161 loop do
136- deleted_count = item_class
137- . where ( created_at : ...waterline )
138- . limit ( BATCH_SIZE )
139- . delete_all
162+ deleted_count = query . limit ( BATCH_SIZE ) . delete_all
140163
141164 logger . log_info ( "Deleted #{ deleted_count } #{ box_type } items for #{ box_name } items" )
142165 break if deleted_count == 0
0 commit comments