Class: SqsPoller::Process::Worker
- Inherits:
-
Object
- Object
- SqsPoller::Process::Worker
- Defined in:
- lib/sqspoller/process/task_worker.rb
Instance Method Summary collapse
-
#initialize(worker_name, task_queue, task_finalizer, message_handler) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
Constructor Details
#initialize(worker_name, task_queue, task_finalizer, message_handler) ⇒ Worker
Returns a new instance of Worker.
17 18 19 20 21 22 23 |
# File 'lib/sqspoller/process/task_worker.rb', line 17 def initialize(worker_name, task_queue, task_finalizer, ) @worker_name = worker_name @task_queue = task_queue @message_handler = @task_finalizer = task_finalizer @logger = SqsPoller::Logger.get_new_logger("#{self.class.name}-#{@worker_name}") end |
Instance Method Details
#run ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/sqspoller/process/task_worker.rb', line 25 def run loop do task = @task_queue.pop = task[:message] success = false timer = SqsPoller::Common::Utils.start_timer = "UNKNOWN" begin @logger.debug "Starting worker task for message: #{.}" = @message_handler.handle(.body, .) @logger.debug "Finished worker task for message: #{.}" success = true rescue Exception => e @logger.error "Caught error: #{e.}, #{e.backtrace.join("\n")} for message id: #{.}, body: #{.body}" end elapsed_time = timer.stop queue_wait_time = timer.start_time - task[:queue_time] @logger.info "Task Completed queue_name: #{task[:queue_name]}, message_id: #{.}, message_type: #{}, elapsed_time: #{elapsed_time}, queue_wait_time: #{queue_wait_time} message_count: #{task[:index]}" SqsPoller::Metrics.record(task, success, timer, elapsed_time) @task_finalizer.finalize(task) if success end end |