9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/herdst_worker/queue/runner.rb', line 9
# Builds the promise that will run the job(s) described by a queue message.
#
# Three payload shapes are recognised, checked in this order:
#
# * notifications: message["Type"] == "Notification"; the (JSON) "Message"
#   payload is executed unless its type is listed in ignored_notifications.
# * record batches: message["Records"] is an Array; one execute_message!
#   call per record, chained so they run one after another.
# * AWS events: message["source"] starts with "aws."; a shallow copy of the
#   message is executed.
#
# Any other payload, and any message whose "expiry" attribute lies in the
# past, yields an empty promise. The promise is only constructed here;
# nothing in this method executes it.
#
# @param message [Hash] decoded message body with String keys. For
#   notifications and record batches the payload hashes inside it are
#   mutated in place (timestamps and "configurationId" are added).
# @param raw_message [#attributes, #message_attributes, nil] transport-level
#   message carrying "SentTimestamp" and the optional "triggerTimestamp" /
#   "expiry" message attributes. When nil, all three are treated as absent.
# @param will_fail_permanently [Boolean] when true, a failing record batch
#   reports every record through fail_action_permanently before the error
#   is re-raised.
# @return [Concurrent::Promise]
def process_message!(message, raw_message = nil, will_fail_permanently = false)
  # raw_message defaults to nil, so fall back to empty attribute maps
  # rather than calling methods on nil.
  attributes = (raw_message && raw_message.attributes) || {}
  message_attributes = (raw_message && raw_message.message_attributes) || {}

  # Reads a message attribute's "string_value" as an Integer, nil if absent.
  integer_attribute = lambda do |name|
    message_attributes.include?(name) ? message_attributes[name]["string_value"].to_i : nil
  end

  sent_timestamp = attributes.include?("SentTimestamp") ? attributes["SentTimestamp"].to_i : nil
  trigger_timestamp = integer_attribute.call("triggerTimestamp")
  expiry = integer_attribute.call("expiry")

  # expiry is compared against the current time in milliseconds; a missing
  # or non-positive value never expires.
  if expiry && expiry > 0 && (Time.now.utc.to_i * 1000) > expiry
    self.app.logger.queue.info "Job has expired, not running. #{message.inspect}"
    return Concurrent::Promise.new {}
  end

  if message["Type"] == "Notification"
    message["Message"] = JSON.parse(message["Message"]) if message["Message"].is_a?(String)
    notification = message["Message"]

    # A "Subject" key holding nil must not crash the transcoder check.
    subject = message["Subject"]
    transcoder = message.include?("Subject") &&
                 subject.respond_to?(:include?) &&
                 subject.include?("Elastic Transcoder")
    type = transcoder ? "Transcoder" : notification["notificationType"]

    notification["sentTimestamp"] = sent_timestamp
    notification["triggerTimestamp"] = trigger_timestamp || sent_timestamp
    notification["configurationId"] = "notification#{type}"

    return Concurrent::Promise.new {
      unless self.ignored_notifications.include?(type)
        execute_message!(nil, nil, notification)
      end
    }
  end

  if message["Records"].is_a?(Array)
    execution_promise = nil
    execution_data = []

    message["Records"].each do |record|
      # eventSource looks like "<origin>:<operation>"; the operation also
      # names the key under which the record keeps its payload.
      data_source = record["eventSource"].split(":")
      data_origin = data_source.first
      data_operation = data_source.last
      record_data = record[data_operation]
      company_id = nil
      user_id = nil

      record_data["sentTimestamp"] = sent_timestamp
      record_data["triggerTimestamp"] = trigger_timestamp || sent_timestamp
      if data_origin == "aws" && data_operation == "s3" && record["eventName"] == "ObjectCreated:Put"
        record_data["configurationId"] = "objectCreated"
      end
      execution_data << record_data

      if data_origin == "application" && record.include?("userIdentity")
        company_id = record["userIdentity"]["companyId"]
        user_id = record["userIdentity"]["principalId"]
      end

      # The first record starts the chain; later ones are appended with
      # `then`. The locals above are per-iteration, so each block keeps
      # its own record.
      execution_promise =
        if execution_promise.nil?
          Concurrent::Promise.new {
            execute_message!(company_id, user_id, record_data)
          }
        else
          execution_promise.then {
            execute_message!(company_id, user_id, record_data)
          }
        end
    end

    return Concurrent::Promise.new {} if execution_promise.nil?

    return execution_promise.rescue { |ex|
      if will_fail_permanently
        execution_data.each { |data| fail_action_permanently(ex, data) }
      end
      raise ex
    }
  end

  source = message["source"]
  if !source.nil? && source.start_with?("aws.")
    action_name = source.split(".").last
    # Shallow copy: the caller's top-level message hash is left untouched.
    message_record = message.dup
    message_record["configurationId"] = "Event#{action_name.camelize}"
    message_record["sentTimestamp"] = sent_timestamp
    message_record["triggerTimestamp"] = trigger_timestamp || sent_timestamp

    return Concurrent::Promise.new {
      execute_message!(nil, nil, message_record)
    }
  end

  Concurrent::Promise.new {}
end
|