Class: HerdstWorker::Queue::Processor
- Defined in:
- lib/herdst_worker/queue/processor.rb
Instance Attribute Summary collapse
-
#app ⇒ Object
Returns the value of attribute app.
-
#attempt_threshold ⇒ Object
Returns the value of attribute attempt_threshold.
-
#enabled ⇒ Object
Returns the value of attribute enabled.
-
#ignored_notifications ⇒ Object
Returns the value of attribute ignored_notifications.
-
#job_count ⇒ Object
Returns the value of attribute job_count.
-
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
-
#poller ⇒ Object
Returns the value of attribute poller.
-
#processor_status ⇒ Object
Returns the value of attribute processor_status.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
-
#queue_wait_time ⇒ Object
Returns the value of attribute queue_wait_time.
-
#restart_time ⇒ Object
Returns the value of attribute restart_time.
-
#start_time ⇒ Object
Returns the value of attribute start_time.
-
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
Instance Method Summary collapse
- #before_request(stats) ⇒ Object
-
#halt ⇒ Object
Sets the processor status to finishing.
-
#initialize(app, enabled, queue_url, queue_wait_time) ⇒ Processor
constructor
A new instance of Processor.
- #process_message(msg) ⇒ Object
-
#set_status(status) ⇒ Object
Set the processor status.
-
#start ⇒ Object
Starts or resets the application to a working status.
-
#start_poller ⇒ Object
Runs the poller.
-
#stop ⇒ Object
Sets the processor status to stopping.
Methods inherited from Runner
#execute_message!, #process_message!
Constructor Details
#initialize(app, enabled, queue_url, queue_wait_time) ⇒ Processor
Returns a new instance of Processor.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/herdst_worker/queue/processor.rb', line 17
#
# Builds a processor for a single SQS queue and leaves it in the
# "starting" state.
#
# @param app [Object] application object; stored as +app+
# @param enabled [Object] stored as +enabled+
# @param queue_url [String] URL handed to Aws::SQS::QueuePoller.new
# @param queue_wait_time [Object] stored as +queue_wait_time+
def initialize(app, enabled, queue_url, queue_wait_time)
  # Values supplied by the caller.
  self.app = app
  self.enabled = enabled
  self.queue_url = queue_url
  self.queue_wait_time = queue_wait_time
  self.poller = Aws::SQS::QueuePoller.new(queue_url)

  # Built-in defaults.
  self.job_count = 0
  self.max_jobs = 10
  self.attempt_threshold = 6
  self.visibility_timeout = 15
  self.ignored_notifications = %w[AmazonSnsSubscriptionSucceeded]

  # Set the start time.
  reset_time

  # The processor begins life as "starting" (not yet "working").
  set_status("starting")

  # Have the poller call #before_request with its stats.
  poller.before_request { |stats| before_request(stats) }
end
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11
#
# @return [Object] the current value of @app
def app
  @app
end
#attempt_threshold ⇒ Object
Returns the value of attribute attempt_threshold.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14
#
# @return [Object] the current value of @attempt_threshold
def attempt_threshold
  @attempt_threshold
end
#enabled ⇒ Object
Returns the value of attribute enabled.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11
#
# @return [Object] the current value of @enabled
def enabled
  @enabled
end
#ignored_notifications ⇒ Object
Returns the value of attribute ignored_notifications.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14
#
# @return [Object] the current value of @ignored_notifications
def ignored_notifications
  @ignored_notifications
end
#job_count ⇒ Object
Returns the value of attribute job_count.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13
#
# @return [Object] the current value of @job_count
def job_count
  @job_count
end
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13
#
# @return [Object] the current value of @max_jobs
def max_jobs
  @max_jobs
end
#poller ⇒ Object
Returns the value of attribute poller.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11
#
# @return [Object] the current value of @poller
def poller
  @poller
end
#processor_status ⇒ Object
Returns the value of attribute processor_status.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13
#
# @return [Object] the current value of @processor_status
def processor_status
  @processor_status
end
#queue_url ⇒ Object
Returns the value of attribute queue_url.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11
#
# @return [Object] the current value of @queue_url
def queue_url
  @queue_url
end
#queue_wait_time ⇒ Object
Returns the value of attribute queue_wait_time.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11
#
# @return [Object] the current value of @queue_wait_time
def queue_wait_time
  @queue_wait_time
end
#restart_time ⇒ Object
Returns the value of attribute restart_time.
12 13 14 |
# File 'lib/herdst_worker/queue/processor.rb', line 12
#
# @return [Object] the current value of @restart_time
def restart_time
  @restart_time
end
#start_time ⇒ Object
Returns the value of attribute start_time.
12 13 14 |
# File 'lib/herdst_worker/queue/processor.rb', line 12
#
# @return [Object] the current value of @start_time
def start_time
  @start_time
end
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14
#
# @return [Object] the current value of @visibility_timeout
def visibility_timeout
  @visibility_timeout
end
Instance Method Details
#before_request(stats) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/herdst_worker/queue/processor.rb', line 105
#
# Advances the processor's status and, once the status is "stopped",
# throws :stop_polling.
#
# Transitions performed here:
#   "working"   -> "stopping"  when the current time reaches restart_time
#   "finishing" -> "idle"      when job_count is 0
#   "stopping"  -> "stopped"   when job_count is 0
#
# @param stats [Object] poller statistics; only inspected for dev logging
# @return [nil] when polling should continue
def before_request(stats)
  if app.config.is_dev?
    app.logger.queue_stats.info "STATS (#{processor_status}): #{stats.inspect}"
  end

  now = Time.now.utc.to_i

  case processor_status
  when "working"
    # Past the scheduled restart time: begin shutting down. The original
    # notes that the app is restarted automatically in production.
    if now >= restart_time
      elapsed = now - start_time
      app.logger.queue.info "Stopping after #{elapsed} seconds of work"
      set_status "stopping"
    end
  when "finishing"
    # Wait for in-flight jobs to drain before going idle.
    if job_count == 0
      app.logger.queue.info "Setting processor status to idle"
      set_status "idle"
    end
  when "stopping"
    # Wait for in-flight jobs to drain before marking the processor stopped.
    if job_count == 0
      app.logger.queue.info "Setting processor status to stopped"
      set_status "stopped"
    end
  end

  # Re-read the status: a branch above may just have changed it.
  return unless processor_status == "stopped"

  app.logger.queue.info "Exiting program, Service requested to stop"
  throw :stop_polling
end
#halt ⇒ Object
Sets the processor status to finishing. The SQS poller's before_request callback then sets the status to idle once all jobs have finished.
73 74 75 76 |
# File 'lib/herdst_worker/queue/processor.rb', line 73
#
# Moves the processor to "finishing". #before_request later switches a
# "finishing" processor to "idle" once job_count is 0.
#
# @return [nil] when the processor is already "finishing"
# @return [Object] otherwise, whatever set_status returns
def halt
  # Plain equality, matching the status checks in #stop, #start and
  # #before_request (the original used `===` here only).
  return if processor_status == "finishing"

  set_status "finishing"
end
#process_message(msg) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/herdst_worker/queue/processor.rb', line 143
#
# Handles one message yielded by the poller.
#
# NOTE(review): in the published listing every identifier containing
# "message" was missing (e.g. `def (msg)`, `self.poller.(msg, ...)`,
# `msg..dup`, `ex.`). The names used below (`change_message_visibility_timeout`,
# `message_attributes`, `process_message!`, `send_message`, `ex.message` and
# the locals `message` / `replaced_message`) were restored from the class's
# method list and the AWS SDK API -- confirm against the real source file.
#
# Flow:
# * Not "working": extend the message's visibility timeout (2x) and throw
#   :skip_delete.
# * At max_jobs: extend the visibility timeout (1x) and throw :skip_delete.
# * Otherwise: run the job through process_message!; on success decrement
#   job_count; on failure either report to Sentry (attempts exhausted) or
#   re-send the message with an incremented "attempts" attribute.
#
# @param msg [Object] must respond to +body+ (JSON text) and
#   +message_attributes+ -- presumably an Aws::SQS::Types::Message, TODO confirm
# @return [Object] whatever +execute+ on the promise chain returns
# @raise [JSON::ParserError] when msg.body is not valid JSON
def process_message(msg)
  # Not accepting work right now: hand the message back for a while.
  unless self.processor_status == "working"
    self.poller.change_message_visibility_timeout(msg, self.visibility_timeout * 2)
    throw :skip_delete
  end

  # Already running the maximum number of jobs: hand the message back with
  # a short wait.
  if self.job_count >= self.max_jobs
    self.poller.change_message_visibility_timeout(msg, self.visibility_timeout)
    throw :skip_delete
  end

  # Work out which attempt this is from the "attempts" message attribute.
  msg_attrs = msg.message_attributes.dup
  attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
  will_fail_permanently = attempt_number > self.attempt_threshold

  # Parse BEFORE incrementing job_count: a malformed body raises here, and
  # previously that left job_count permanently one too high.
  message = JSON.parse(msg.body)

  self.job_count += 1
  process_message!(message, msg, will_fail_permanently).then {
    self.job_count -= 1
  }.rescue { |ex|
    begin
      if will_fail_permanently
        self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{ex.backtrace.join("\n")}"
        Sentry.capture_exception(ex, {
          :level => "fatal",
          :extra => {
            "queue_attempts" => attempt_number,
            "queue_message_body" => msg.body
          }
        })
      else
        self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."

        if self.app.config.is_dev?
          puts ex.inspect
          puts ex.backtrace
        end

        # Re-send a copy of the message carrying the updated attempt count.
        replaced_message = {
          :queue_url => self.poller.queue_url,
          :message_body => msg.body,
          :delay_seconds => self.visibility_timeout,
          :message_attributes => msg_attrs.merge({
            "attempts" => {
              :string_value => attempt_number.to_s,
              :data_type => "Number"
            }
          })
        }

        self.poller.client.send_message replaced_message
      end

      if self.app.config.is_dev?
        self.app.logger.queue.error "Processor Error:"
        self.app.logger.queue.error ex.message
        self.app.logger.queue.error ex.backtrace
      end
    ensure
      # Always release the job slot, even if reporting or re-sending raised;
      # #before_request waits for job_count to reach 0 before going idle or
      # stopped.
      self.job_count -= 1
    end
  }.execute
end
#set_status(status) ⇒ Object
Sets the processor status. The status is also written to a file so that services like Capistrano can see the current status.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/herdst_worker/queue/processor.rb', line 89
#
# Records a new processor status in memory and in a status file.
#
# @param status [String] one of "starting", "idle", "working", "finishing",
#   "stopping", "stopped"
# @return [Object] the result of writing the status file
# @raise [RuntimeError] when +status+ is not one of the values above
def set_status(status)
  known = %w[starting idle working finishing stopping stopped]
  raise "Invalid status (#{status})" unless known.include?(status)

  self.processor_status = status

  # Mirror the status into <temp>/process_status so that an external tool
  # (the original comment names Capistrano) can read it.
  status_path = app.config.paths.temp + "/process_status"
  File.open(status_path, "w") { |io| io.write(status) }
end
#start ⇒ Object
Starts or resets the application to a working status
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/herdst_worker/queue/processor.rb', line 57
#
# Puts the processor into the "working" state.
#
# * From "starting": set "working", reset the time, then run the poller.
# * From "working": nothing to do.
# * From any other status: set "working" and reset the time; the poller is
#   not started again.
#
# @return [nil] when already "working"
# @return [Object] otherwise the result of start_poller (from "starting")
#   or of reset_time
def start
  previous = processor_status
  return if previous == "working"

  set_status "working"
  reset_result = reset_time

  # Only the very first start launches the poller.
  previous == "starting" ? start_poller : reset_result
end
#start_poller ⇒ Object
Runs the poller
45 46 47 48 49 50 51 52 53 |
# File 'lib/herdst_worker/queue/processor.rb', line 45
#
# Runs the poller, handing every received message to #process_message.
#
# NOTE(review): in the published listing the block body read `(msg)`, with
# the method name missing; `process_message(msg)` is restored here because
# it is the class's only documented method taking a single +msg+ -- confirm
# against the real source file.
#
# @return [Object] whatever the poller's +poll+ returns
# @raise [RuntimeError] when the queue is not enabled
def start_poller
  raise "Cannot start a queue which is not enabled" unless enabled

  # skip_delete is passed as false; #process_message throws :skip_delete
  # when it hands a message back instead of processing it.
  poller.poll(:wait_time_seconds => queue_wait_time, :skip_delete => false) do |msg|
    process_message(msg)
  end
end
#stop ⇒ Object
Sets the processor status to stopping. The sqs before action will take care of stopping the application once all jobs have finished.
81 82 83 84 |
# File 'lib/herdst_worker/queue/processor.rb', line 81
#
# Moves the processor to "stopping". #before_request later switches a
# "stopping" processor to "stopped" once job_count is 0 and then throws
# :stop_polling.
#
# @return [nil] when the processor is already "stopping"
# @return [Object] otherwise, whatever set_status returns
def stop
  set_status "stopping" unless processor_status == "stopping"
end