Class: SqsPoller::Process::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/process/task_worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(worker_name, task_queue, task_finalizer, message_handler) ⇒ Worker

Returns a new instance of Worker.



17
18
19
20
21
22
23
# File 'lib/sqspoller/process/task_worker.rb', line 17

# Builds a worker wired to a task queue, a task finalizer and a message handler.
#
# @param worker_name [#to_s] label for this worker; interpolated into the logger name
# @param task_queue [#pop] source of the tasks consumed by #run
# @param task_finalizer [#finalize] called by #run for each task handled successfully
# @param message_handler [#handle] called by #run with each message's body and id
def initialize(worker_name, task_queue, task_finalizer, message_handler)
  # Collaborators are stored in the same order as the parameter list.
  @worker_name = worker_name
  @task_queue = task_queue
  @task_finalizer = task_finalizer
  @message_handler = message_handler

  # The logger is named "<class name>-<worker name>".
  logger_label = "#{self.class.name}-#{worker_name}"
  @logger = SqsPoller::Logger.get_new_logger(logger_label)
end

Instance Method Details

#run ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/sqspoller/process/task_worker.rb', line 25

# Runs the worker loop: pops a task from the task queue, passes its message to
# the message handler, logs and records the outcome, and finalizes the task
# when handling succeeded. Tasks whose handling failed are not finalized.
#
# Each task is read with +[]+ using the keys +:message+, +:queue_time+,
# +:queue_name+ and +:index+; the message is read through +message_id+ and
# +body+.
#
# The loop has no exit condition of its own. It ends only when an exception
# propagates out of it (+Kernel#loop+ returns quietly on +StopIteration+).
# NOTE(review): errors raised by the queue pop, the metrics recorder or the
# finalizer are not rescued here and will end the loop — confirm that is
# intended.
#
# @raise [SystemExit, SignalException, NoMemoryError] re-raised untouched when
#   raised while handling a message, so shutdown requests are never swallowed
def run
  loop do
    task = @task_queue.pop
    message = task[:message]
    success = false
    timer = SqsPoller::Common::Utils.start_timer
    # Reported as-is when the handler fails before returning a type.
    message_type = "UNKNOWN"
    begin
      @logger.debug "Starting worker task for message: #{message.message_id}"
      message_type = @message_handler.handle(message.body, message.message_id)
      @logger.debug "Finished worker task for message: #{message.message_id}"
      success = true
    rescue SystemExit, SignalException, NoMemoryError
      # Process-control and out-of-memory conditions must reach the caller;
      # treating them as a failed message would block a clean shutdown.
      raise
    rescue Exception => e
      # Deliberately broad: any other failure is contained so that one bad
      # message cannot take the worker down.
      # Array() guards against an exception that carries no backtrace.
      @logger.error "Caught error: #{e.message}, #{Array(e.backtrace).join("\n")} for message id: #{message.message_id}, body: #{message.body}"
    end
    elapsed_time = timer.stop
    # Gap between the task's recorded queue time and the start of handling.
    queue_wait_time = timer.start_time - task[:queue_time]
    @logger.info "Task Completed queue_name: #{task[:queue_name]}, message_id: #{message.message_id}, message_type: #{message_type}, elapsed_time: #{elapsed_time}, queue_wait_time: #{queue_wait_time} message_count: #{task[:index]}"
    SqsPoller::Metrics.record(task, success, timer, elapsed_time)
    @task_finalizer.finalize(task) if success
  end
end