Class: Sqspoller::QueueController
- Inherits:
-
Object
- Object
- Sqspoller::QueueController
- Defined in:
- lib/sqspoller/queue_controller.rb
Instance Method Summary collapse
- #delete_message(receipt_handle) ⇒ Object
-
#initialize(queue_name, polling_threads_count, wait_time_seconds, task_delegator, access_key_id, secret_access_key, region, logger_file) ⇒ QueueController
constructor
A new instance of QueueController.
- #start ⇒ Object
- #start_thread(queue_url) ⇒ Object
- #threads ⇒ Object
Constructor Details
#initialize(queue_name, polling_threads_count, wait_time_seconds, task_delegator, access_key_id, secret_access_key, region, logger_file) ⇒ QueueController
Returns a new instance of QueueController.
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/sqspoller/queue_controller.rb', line 9 def initialize(queue_name, polling_threads_count, wait_time_seconds, task_delegator, access_key_id, secret_access_key, region, logger_file) @logger = Logger.new(logger_file) @queue_name = queue_name @polling_threads_count = polling_threads_count @wait_time_seconds = wait_time_seconds @sqs = Aws::SQS::Client.new(:access_key_id => access_key_id, :secret_access_key => secret_access_key, :region => region) @queue_details = @sqs.get_queue_url(queue_name: queue_name) @threads = [] @task_delegator = task_delegator end |
Instance Method Details
#delete_message(receipt_handle) ⇒ Object
57 58 59 60 61 62 |
# File 'lib/sqspoller/queue_controller.rb', line 57 def (receipt_handle) @sqs.( queue_url: @queue_details.queue_url, receipt_handle: receipt_handle ) end |
#start ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/sqspoller/queue_controller.rb', line 20 def start queue_url = @queue_details.queue_url @logger.info "Going to start polling threads for queue: #{queue_url}" @polling_threads_count.times do start_thread queue_url end end |
#start_thread(queue_url) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/sqspoller/queue_controller.rb', line 32 def start_thread(queue_url) @threads << Thread.new do @logger.info "Poller thread started for queue: #{queue_url}" poller = Aws::SQS::QueuePoller.new(queue_url) while true @logger.info "Polling queue #{@queue_name} for messages" begin msgs = @sqs. :queue_url => queue_url, :wait_time_seconds => @wait_time_seconds rescue Exception => e @logger.info "Error receiving messages from queue #{@queue_name}: #{e.}" next end msgs..each { || begin @logger.info "Received message #{@queue_name} : #{.}" @task_delegator.process self, , @queue_name rescue Exception => e @logger.info "Encountered error #{e.} while submitting message from queue #{queue_url}" end } end end end |
#threads ⇒ Object
28 29 30 |
# File 'lib/sqspoller/queue_controller.rb', line 28 def threads return @threads end |