22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/sqspoller/message_delegator.rb', line 22
def process(queue_controller, message, queue_name)
@semaphore.synchronize {
@pending_schedule_tasks +=1
if @pending_schedule_tasks >= @max_allowed_queue_size
@logger.info "Entered wait state, connection_pool size reached max threshold, pending_schedule_tasks=#{@pending_schedule_tasks}"
while @connection_pool.queue_length > @worker_thread_pool_size || @pending_schedule_tasks >= @max_allowed_queue_size
sleep(0.01)
end
@logger.info "Exiting wait state, connection_pool size reached below worker_thread_pool_size, pending_schedule_tasks=#{@pending_schedule_tasks}"
end
}
@logger.info "Scheduling worker task for message: #{message.message_id}"
begin
@connection_pool.post do
begin
@logger.info "Starting worker task for message: #{message.message_id}"
@worker_task.process(message.body, message.message_id)
@logger.info "Finished worker task for message: #{message.message_id}"
queue_controller.delete_message message.receipt_handle
rescue Exception => e
@logger.info "Caught error: #{e.message}, #{e.backtrace.join("\n")} for message id: #{message.message_id}, body: #{message.body}"
end
@pending_schedule_tasks -= 1
end
rescue Concurrent::RejectedExecutionError => e
@pending_schedule_tasks -= 1
@logger.info "Caught Concurrent::RejectedExecutionError for #{e.message}"
end
end
|