Class: HerdstWorker::Queue::Processor

Inherits:
Runner
  • Object
show all
Defined in:
lib/herdst_worker/queue/processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Runner

#execute_message!, #process_message!

Constructor Details

#initialize(app, enabled, queue_url, queue_wait_time) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/herdst_worker/queue/processor.rb', line 17

# Builds a processor bound to one SQS queue and leaves it in the
# "starting" state; polling itself only begins when #start is called.
#
# @param app [Object] application object; used here and elsewhere for
#   app.config and app.logger
# @param enabled [Boolean] whether polling may be started (see #start_poller)
# @param queue_url [String] URL handed to Aws::SQS::QueuePoller
# @param queue_wait_time [Integer] passed as :wait_time_seconds when polling
def initialize(app, enabled, queue_url, queue_wait_time)
    # Caller-supplied collaborators and settings
    self.app             = app
    self.enabled         = enabled
    self.queue_url       = queue_url
    self.queue_wait_time = queue_wait_time
    self.poller          = Aws::SQS::QueuePoller.new(queue_url)

    # Built-in defaults: nothing in flight, at most 10 concurrent jobs,
    # 6 attempts before a message is failed permanently, and a 15 second
    # visibility timeout / redelivery delay
    self.job_count             = 0
    self.max_jobs              = 10
    self.attempt_threshold     = 6
    self.visibility_timeout    = 15
    self.ignored_notifications = %w[AmazonSnsSubscriptionSucceeded]

    # Set the start time
    reset_time

    # The processor begins life in the "starting" state
    set_status("starting")

    # Run our status/stat bookkeeping ahead of every poll request
    poller.before_request { |stats| before_request(stats) }
end

Instance Attribute Details

#app ⇒ Object

Returns the value of attribute app.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Reader for the application object handed to the constructor. Other
# methods on this class reach configuration and loggers through it
# (app.config, app.logger).
#
# @return [Object] the value stored in @app
def app
  @app
end

#attempt_threshold ⇒ Object

Returns the value of attribute attempt_threshold.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

# Reader for the retry limit. The constructor initializes it to 6, and
# #process_message treats an attempt number greater than this value as a
# permanent failure.
#
# @return [Object] the value stored in @attempt_threshold
def attempt_threshold
  @attempt_threshold
end

#enabled ⇒ Object

Returns the value of attribute enabled.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Reader for the enabled flag given to the constructor. #start_poller
# raises when this value is falsy.
#
# @return [Object] the value stored in @enabled
def enabled
  @enabled
end

#ignored_notifications ⇒ Object

Returns the value of attribute ignored_notifications.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

# Reader for the list of ignored notification names. The constructor
# seeds it with ["AmazonSnsSubscriptionSucceeded"].
# NOTE(review): where this list is consulted is not visible in this
# class listing - presumably the inherited Runner message handling; confirm.
#
# @return [Object] the value stored in @ignored_notifications
def ignored_notifications
  @ignored_notifications
end

#job_count ⇒ Object

Returns the value of attribute job_count.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Reader for the in-flight job counter. It starts at 0 in the
# constructor, is incremented by #process_message when a job is started
# and decremented again when that job succeeds or fails. #before_request
# waits for it to reach 0 before moving to "idle" or "stopped".
#
# @return [Object] the value stored in @job_count
def job_count
  @job_count
end

#max_jobs ⇒ Object

Returns the value of attribute max_jobs.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Reader for the concurrency cap. The constructor initializes it to 10;
# #process_message defers a message instead of running it once job_count
# has reached this value.
#
# @return [Object] the value stored in @max_jobs
def max_jobs
  @max_jobs
end

#poller ⇒ Object

Returns the value of attribute poller.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Reader for the queue poller. The constructor stores an
# Aws::SQS::QueuePoller built from queue_url here; it is used to poll,
# to change message visibility and (via poller.client) to re-send messages.
#
# @return [Object] the value stored in @poller
def poller
  @poller
end

#processor_status ⇒ Object

Returns the value of attribute processor_status.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Reader for the current lifecycle state. #set_status only accepts
# "starting", "idle", "working", "finishing", "stopping" and "stopped".
#
# @return [Object] the value stored in @processor_status
def processor_status
  @processor_status
end

#queue_url ⇒ Object

Returns the value of attribute queue_url.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Reader for the queue URL given to the constructor, which also uses it
# to build the poller.
#
# @return [Object] the value stored in @queue_url
def queue_url
  @queue_url
end

#queue_wait_time ⇒ Object

Returns the value of attribute queue_wait_time.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Reader for the wait time given to the constructor. #start_poller
# passes it to the poller as :wait_time_seconds.
#
# @return [Object] the value stored in @queue_wait_time
def queue_wait_time
  @queue_wait_time
end

#restart_time ⇒ Object

Returns the value of attribute restart_time.



12
13
14
# File 'lib/herdst_worker/queue/processor.rb', line 12

# Reader for the time at which a working processor should begin stopping.
# #before_request compares it with Time.now.utc.to_i, so it is expected
# to hold epoch seconds.
# NOTE(review): presumably assigned by reset_time, which is not shown in
# this listing - confirm.
#
# @return [Object] the value stored in @restart_time
def restart_time
  @restart_time
end

#start_time ⇒ Object

Returns the value of attribute start_time.



12
13
14
# File 'lib/herdst_worker/queue/processor.rb', line 12

# Reader for the time the current work period began. #before_request
# subtracts it from Time.now.utc.to_i to report the runtime in seconds.
# NOTE(review): presumably assigned by reset_time, which is not shown in
# this listing - confirm.
#
# @return [Object] the value stored in @start_time
def start_time
  @start_time
end

#visibility_timeout ⇒ Object

Returns the value of attribute visibility_timeout.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

# Reader for the base delay used when a message is put back. The
# constructor initializes it to 15; #process_message uses it (or twice
# it) as the new visibility timeout for deferred messages and as
# :delay_seconds when re-sending a failed message.
#
# @return [Object] the value stored in @visibility_timeout
def visibility_timeout
  @visibility_timeout
end

Instance Method Details

#before_request(stats) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/herdst_worker/queue/processor.rb', line 105

# Hook run before each poll request (registered in the constructor via
# poller.before_request). Logs poller stats in development and advances
# the processor through its shutdown states.
#
# @param stats [Object] poller statistics; only inspected for logging
# @return [nil] when polling should continue
# @note Throws :stop_polling once the status has reached "stopped".
def before_request(stats)
    app.logger.queue_stats.info("STATS (#{processor_status}): #{stats.inspect}") if app.config.is_dev?

    now = Time.now.utc.to_i

    case processor_status
    when "working"
        # Once restart_time has passed, begin shutting down. The app is
        # expected to be restarted automatically in production.
        if now >= restart_time
            elapsed = now - start_time
            app.logger.queue.info("Stopping after #{elapsed} seconds of work")
            set_status("stopping")
        end
    when "finishing"
        # Wait for in-flight jobs to drain, then go idle
        if job_count == 0
            app.logger.queue.info("Setting processor status to idle")
            set_status("idle")
        end
    when "stopping"
        # Wait for in-flight jobs to drain, then mark as stopped.
        # Once stopped the polling will terminate (see below).
        if job_count == 0
            app.logger.queue.info("Setting processor status to stopped")
            set_status("stopped")
        end
    end

    return unless processor_status == "stopped"

    app.logger.queue.info("Exiting program, Service requested to stop")
    throw :stop_polling
end

#halt ⇒ Object

Sets the processor status to "finishing". The SQS poller's before_request callback then switches the status to "idle" once all running jobs have finished.



73
74
75
76
# File 'lib/herdst_worker/queue/processor.rb', line 73

# Asks the processor to pause: sets the status to "finishing" unless it
# already is. #before_request then moves the status on to "idle" once
# job_count has dropped to zero.
#
# @return [Object, nil] the result of #set_status, or nil when the
#   processor was already finishing
def halt
    # Plain equality, matching #stop and #start (case equality `===` is
    # meant for `case`/`when`, not for comparing two strings).
    return if self.processor_status == "finishing"
    set_status "finishing"
end

#process_message(msg) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/herdst_worker/queue/processor.rb', line 143

# Handles one message yielded by the SQS poller.
#
# While the processor is "working" and below max_jobs, the message body
# is parsed as JSON and handed to process_message!, whose result is
# chained with success/failure callbacks and executed. A failed message
# is either re-sent to the queue with an incremented "attempts"
# attribute, or - once attempt_threshold is exceeded - reported to
# Sentry and dropped.
#
# @param msg [Object] SQS message; must respond to #body and
#   #message_attributes
# @return [Object] whatever `execute` on the callback chain returns
# @raise [JSON::ParserError] when the message body is not valid JSON
# @note Throws :skip_delete (leaving the message on the queue) when the
#   processor is not "working" or is already running max_jobs jobs.
def process_message(msg)
    if self.processor_status == "working"
        # If the app is already processing the max number of jobs
        # put the message back in the queue with a short wait time
        if self.job_count >= self.max_jobs
            self.poller.change_message_visibility_timeout(msg, self.visibility_timeout)
            throw :skip_delete
        end

        # Find out how many attempts there have been already for
        # the message.
        msg_attrs = msg.message_attributes.dup
        attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
        will_fail_permanently = attempt_number > self.attempt_threshold

        # Parse BEFORE counting the job: if the body is not valid JSON the
        # error propagates to the caller, and counting first would leave
        # job_count permanently one too high.
        message = JSON.parse(msg.body)

        # Run the job and increase the job count.
        # Once successful the job count is decreased by one
        # and the message is deleted.
        # If an error occurred the job count is decreased by
        # one and the error is logged locally and with Sentry.
        self.job_count += 1
        process_message!(message, msg, will_fail_permanently).then {
            self.job_count -= 1

        }.rescue { |ex|
            begin
                if will_fail_permanently
                    self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{ex.backtrace.join("\n")}"
                    # Options are passed as keywords (no braces) so the call
                    # works whether the receiver takes an options hash or
                    # **options, on Ruby 2 and Ruby 3 alike.
                    Sentry.capture_exception(ex,
                        :level => "fatal",
                        :extra => {
                            "queue_attempts" => attempt_number,
                            "queue_message_body" => msg.body
                        }
                    )

                else
                    self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."

                    if self.app.config.is_dev?
                        puts ex.inspect
                        puts ex.backtrace
                    end

                    # The original message is deleted by the poller, so a
                    # copy carrying the new attempt count is queued instead.
                    replaced_message = {
                        :queue_url => self.poller.queue_url,
                        :message_body => msg.body,
                        :delay_seconds => self.visibility_timeout,
                        :message_attributes => msg_attrs.merge({
                            "attempts" => {
                                :string_value => attempt_number.to_s,
                                :data_type => "Number"
                            }
                        })
                    }

                    self.poller.client.send_message replaced_message
                end

                if self.app.config.is_dev?
                    self.app.logger.queue.error "Processor Error:"
                    self.app.logger.queue.error ex.message
                    self.app.logger.queue.error ex.backtrace
                end
            ensure
                # Always release the job slot, even if reporting or
                # re-queueing above raised; otherwise job_count never
                # returns to zero and the processor can never go idle
                # or stop.
                self.job_count -= 1
            end
        }.execute
    else
        # Not accepting work: hide the message for twice the usual
        # timeout and leave it on the queue.
        self.poller.change_message_visibility_timeout(msg, self.visibility_timeout * 2)
        throw :skip_delete
    end
end

#set_status(status) ⇒ Object

Sets the processor status. The status is also written to a file so that services like Capistrano can see the current status.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/herdst_worker/queue/processor.rb', line 89

# Records a new processor status and mirrors it to the "process_status"
# file under the app's temp path, where external tooling can read it.
#
# @param status [String] one of "starting", "idle", "working",
#   "finishing", "stopping", "stopped"
# @return [Integer] number of bytes written to the status file
# @raise [RuntimeError] when status is not one of the values above
def set_status(status)
    allowed = ["starting", "idle", "working", "finishing", "stopping", "stopped"]
    raise "Invalid status (#{status})" unless allowed.include?(status)

    self.processor_status = status

    # Write the current status to file for Capistrano to use
    status_path = app.config.paths.temp + "/process_status"
    File.write(status_path, status)
end

#start ⇒ Object

Starts or resets the application to a working status



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/herdst_worker/queue/processor.rb', line 57

# Puts the processor into the "working" state and resets its timers.
# On the very first call (status "starting") it also begins polling;
# when already "working" it does nothing.
#
# @return [Object, nil] the result of start_poller on first start, the
#   result of reset_time on a later resume, or nil when already working
def start
    case processor_status
    when "working"
        return
    when "starting"
        # First start: switch state, reset the clock, then enter the poll loop
        set_status("working")
        reset_time
        start_poller
    else
        # Resuming (e.g. from idle): the poller is already running
        set_status("working")
        reset_time
    end
end

#start_poller ⇒ Object

Runs the poller



45
46
47
48
49
50
51
52
53
# File 'lib/herdst_worker/queue/processor.rb', line 45

# Runs the poll loop, handing every received message to #process_message.
#
# @return [Object] whatever poller.poll returns
# @raise [RuntimeError] when the processor was created with enabled falsy
def start_poller
    raise "Cannot start a queue which is not enabled" unless enabled

    poller.poll(:wait_time_seconds => queue_wait_time, :skip_delete => false) { |msg| process_message(msg) }
end

#stop ⇒ Object

Sets the processor status to "stopping". The SQS poller's before_request callback then stops the application once all running jobs have finished.



81
82
83
84
# File 'lib/herdst_worker/queue/processor.rb', line 81

# Asks the processor to shut down: sets the status to "stopping" unless
# it already is. #before_request then marks it "stopped" and ends
# polling once job_count has dropped to zero.
#
# @return [Object, nil] the result of #set_status, or nil when the
#   processor was already stopping
def stop
    set_status("stopping") unless processor_status == "stopping"
end