Class: HerdstWorker::Queue::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/herdst_worker/queue/runner.rb

Direct Known Subclasses

Processor

Instance Method Summary collapse

Instance Method Details

#execute_message!(company_id, user_id, data) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/herdst_worker/queue/runner.rb', line 121

# Looks up the action named by data["configurationId"] and invokes it.
#
# The id is camelized into a constant name, which must be listed in
# app.config.actions["enabled"]. An action that is not enabled is raised as
# an error when app.config.is_dev? is true; otherwise it is reported through
# Sentry.capture_message and the message is skipped.
#
# @param company_id [Object] forwarded unchanged to the action's invoke
# @param user_id [Object] forwarded unchanged to the action's invoke
# @param data [Hash] message payload; "configurationId" selects the action
# @return [Object, nil] whatever invoke returns, or nil when the action is skipped
# @raise [RuntimeError] when the action is not enabled and app.config.is_dev? is true
def execute_message!(company_id, user_id, data)
    action = data["configurationId"]
    # to_s keeps a payload without a configurationId on the "invalid action"
    # path below instead of failing with NoMethodError on nil.
    action_name = action.to_s.camelize

    if app.config.actions["enabled"].include?(action_name)
        return action_name.constantize.send(:invoke, company_id, user_id, data)
    end

    message = "Invalid action. #{action} is not an enabled action. Please add this action to the config file."
    raise message if app.config.is_dev?

    Sentry.capture_message(message)
    nil
end

#process_message!(message, raw_message = nil, will_fail_permanently = false) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/herdst_worker/queue/runner.rb', line 9

# Builds the promise that handles a single queue message.
#
# The promise is only constructed here; this method never executes it.
# Three message shapes are recognised, checked in this order:
#
# * "Type" == "Notification": the "Message" payload is dispatched as a
#   "notification<type>" action unless the type is in ignored_notifications.
# * "Records" is an Array: every record is dispatched in sequence.
# * "source" starts with "aws.": a copy of the message is dispatched as an
#   "Event<Source>" action.
#
# Any other message, and any message whose "expiry" attribute (epoch
# milliseconds) lies in the past, yields a promise with an empty body.
#
# @param message [Hash] decoded message body
# @param raw_message [Object, nil] queue message responding to #attributes and
#   #message_attributes; when nil, no timestamps or expiry are available
# @param will_fail_permanently [Boolean] when true, a failure in the "Records"
#   chain calls fail_action_permanently for every record before re-raising
# @return [Concurrent::Promise]
def process_message!(message, raw_message = nil, will_fail_permanently = false)
    # raw_message defaults to nil, so fall back to empty attribute sets
    # rather than calling methods on nil.
    attributes = (raw_message && raw_message.attributes) || {}
    message_attributes = (raw_message && raw_message.message_attributes) || {}

    sent_timestamp = attributes.include?("SentTimestamp") ? attributes["SentTimestamp"].to_i : nil
    expiry = integer_message_attribute(message_attributes, "expiry")
    # The trigger time falls back to the send time when no explicit trigger was given.
    trigger_timestamp = integer_message_attribute(message_attributes, "triggerTimestamp") || sent_timestamp

    if expiry && expiry > 0 && (Time.now.utc.to_i * 1000) > expiry
        app.logger.queue.info "Job has expired, not running. #{message.inspect}"

        return Concurrent::Promise.new {}
    end

    if message["Type"] == "Notification"
        return build_notification_promise(message, sent_timestamp, trigger_timestamp)
    end

    if message["Records"].is_a?(Array)
        return build_records_promise(message["Records"], sent_timestamp, trigger_timestamp, will_fail_permanently)
    end

    source = message["source"]
    if source && source.start_with?("aws.")
        return build_aws_event_promise(message, sent_timestamp, trigger_timestamp)
    end

    Concurrent::Promise.new {}
end

# Reads the "string_value" of a named message attribute as an Integer, or nil when absent.
def integer_message_attribute(message_attributes, name)
    return nil unless message_attributes.include?(name)

    message_attributes[name]["string_value"].to_i
end

# Builds the promise for a "Type" == "Notification" message.
def build_notification_promise(message, sent_timestamp, trigger_timestamp)
    if message["Message"].is_a?(String)
        message["Message"] = JSON.parse(message["Message"])
    end
    payload = message["Message"]

    # Get the type. to_s covers both a missing and a nil "Subject".
    type = if message["Subject"].to_s.include?("Elastic Transcoder")
        "Transcoder"
    else
        payload["notificationType"]
    end

    # Update the message with sent and triggered timestamp
    payload["sentTimestamp"] = sent_timestamp
    payload["triggerTimestamp"] = trigger_timestamp

    # Update the message with configuration Id
    payload["configurationId"] = "notification#{type}"

    # Since zips take a long time to process we might need to use:
    # poller.change_message_visibility_timeout(msg, 60)
    # To make sure other workers don't pick up the job
    Concurrent::Promise.new {
        execute_message!(nil, nil, payload) unless ignored_notifications.include?(type)
    }
end

# Builds one promise chain that dispatches every entry of a "Records" array in order.
def build_records_promise(records, sent_timestamp, trigger_timestamp, will_fail_permanently)
    execution_promise = nil
    execution_data = []

    records.each do |record|
        data_source = record["eventSource"].split(":")
        data_origin = data_source.first
        data_operation = data_source.last
        record_data = record[data_operation]
        company_id = nil
        user_id = nil

        # Update the message with sent and triggered timestamp
        record_data["sentTimestamp"] = sent_timestamp
        record_data["triggerTimestamp"] = trigger_timestamp

        # If the event origin is s3 and an object was created, overwrite the configuration ID
        if data_origin == "aws" && data_operation == "s3" && record["eventName"] == "ObjectCreated:Put"
            record_data["configurationId"] = "objectCreated"
        end

        execution_data << record_data

        if data_origin == "application" && record.include?("userIdentity")
            company_id = record["userIdentity"]["companyId"]
            user_id = record["userIdentity"]["principalId"]
        end

        # company_id, user_id and record_data are block-local, so each link
        # of the chain captures the values of its own record.
        if execution_promise.nil?
            execution_promise = Concurrent::Promise.new {
                execute_message!(company_id, user_id, record_data)
            }
        else
            execution_promise = execution_promise.then {
                execute_message!(company_id, user_id, record_data)
            }
        end
    end

    return Concurrent::Promise.new {} if execution_promise.nil?

    execution_promise.rescue { |ex|
        if will_fail_permanently
            execution_data.each { |data| fail_action_permanently(ex, data) }
        end

        raise ex
    }
end

# Builds the promise for a message whose "source" starts with "aws.".
def build_aws_event_promise(message, sent_timestamp, trigger_timestamp)
    action_name = message["source"].split(".").last

    # Work on a shallow copy so the added keys do not appear on the caller's message.
    message_record = message.dup
    message_record["configurationId"] = "Event#{action_name.camelize}"
    message_record["sentTimestamp"] = sent_timestamp
    message_record["triggerTimestamp"] = trigger_timestamp

    Concurrent::Promise.new {
        execute_message!(nil, nil, message_record)
    }
end

private :integer_message_attribute, :build_notification_promise, :build_records_promise, :build_aws_event_promise