@@ -8,32 +8,77 @@ class EventImportWorker
88 def perform ( sqs_message = nil , data = nil )
99 log_prefix = "[Events:EventImportWorker]"
1010
11- if data . blank?
11+ event_data = event_data ( data )
12+
13+ if event_data . nil?
1214 Rails . logger . info ( "#{ log_prefix } Message data was blank" )
1315 return
1416 end
1517
16- log_identifier = "subj_id: #{ data [ "subjId" ] } , " \
17- "obj_id: #{ data [ "objId" ] } , " \
18- "source_id: #{ data [ "sourceId" ] } , " \
19- "relation_type_id: #{ data [ "relationTypeId" ] } "
18+ log_identifier = log_identifier ( event_data )
2019
2120 Rails . logger . info ( "#{ log_prefix } Start of event message processing for #{ log_identifier } " )
22- Rails . logger . info ( "#{ log_prefix } Searching for event with #{ log_identifier } " )
21+ Rails . logger . info ( "#{ log_prefix } Searching for event for #{ log_identifier } " )
2322
24- event = Event . find_by (
25- subj_id : data [ "subjId" ] ,
26- obj_id : data [ "objId" ] ,
27- source_id : data [ "sourceId" ] ,
28- relation_type_id : data [ "relationTypeId" ] ,
29- )
23+ event = find_event ( event_data )
3024
31- if event . blank ?
32- Rails . logger . info ( " #{ log_prefix } Creating a new event with #{ log_identifier } " )
25+ if event . nil ?
26+ create_event ( event_data , log_prefix , log_identifier )
3327 else
34- Rails . logger . info ( " #{ log_prefix } Update an existing event with #{ log_identifier } " )
28+ update_event ( event , event_data , log_prefix , log_identifier )
3529 end
3630
37- Rails . logger . info ( "#{ log_prefix } End of event message processing for #{ log_identifier } " )
31+ Rails . logger . info ( "#{ log_prefix } Completed processing event for #{ log_identifier } " )
32+ end
33+
34+ private
35+
36+ # Returns the SQS event data as a hash.
37+ # Will return nil if either a data or attributes field is missing.
38+ def event_data ( data )
39+ data_hash = JSON . parse ( data )
40+
41+ data_hash . dig ( "data" , "attributes" )
42+ end
43+
44+ # Returns a string which serves as an identifier for this event import worker run.
45+ # Consists of the event subj_id, obj_id, source_id and relation_type_id.
46+ def log_identifier ( event_data )
47+ "subj_id: #{ event_data [ "subjId" ] } , " \
48+ "obj_id: #{ event_data [ "objId" ] } , " \
49+ "source_id: #{ event_data [ "sourceId" ] } , " \
50+ "relation_type_id: #{ event_data [ "relationTypeId" ] } "
51+ end
52+
53+ # Searchs for an existing event using subj_id, obj_id, source_id and relation_type_id
54+ def find_event ( event_data )
55+ Event . find_by (
56+ subj_id : event_data [ "subjId" ] ,
57+ obj_id : event_data [ "objId" ] ,
58+ source_id : event_data [ "sourceId" ] ,
59+ relation_type_id : event_data [ "relationTypeId" ] ,
60+ )
61+ end
62+
63+ def create_event ( event_data , log_prefix , log_identifier )
64+ Rails . logger . info ( "#{ log_prefix } Creating a new event for #{ log_identifier } " )
65+
66+ event = EventFactory . create_from_sqs ( event_data )
67+
68+ if event . save
69+ Rails . logger . info ( "#{ log_prefix } Event successfully created for #{ log_identifier } " )
70+ elsif event . errors . any?
71+ Rails . logger . error ( "#{ log_prefix } Creating event failed for #{ log_identifier } : #{ event . errors . inspect } " )
72+ end
73+ end
74+
75+ def update_event ( event , event_data , log_prefix , log_identifier )
76+ Rails . logger . info ( "#{ log_prefix } Update an existing event for #{ log_identifier } " )
77+
78+ if event . update ( data )
79+ Rails . logger . info ( "#{ log_prefix } Event successfully updated for #{ log_identifier } " )
80+ else
81+ Rails . logger . error ( "#{ log_prefix } Updating event failed for #{ log_identifier } : #{ event . errors . inspect } " )
82+ end
3883 end
3984end
0 commit comments