Class: Sqspoller::QueueController
- Inherits:
-
Object
- Object
- Sqspoller::QueueController
- Defined in:
- lib/sqspoller/queue_controller.rb
Constant Summary collapse
- REGEXP =
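Expects the queue name to have the form <environment>-outbound-messages-<window_identifier> (the original note calls the trailing part the company id); the identifier may itself contain hyphens.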
/(?<environment>\w+)-outbound-messages-(?<window_identifier>[\w-]+)/
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
-
#threads ⇒ Object
Returns the value of attribute threads.
Instance Method Summary collapse
- #all_threads_alive? ⇒ Boolean
- #block_on_maintenance_window ⇒ Object
- #delete_message(receipt_handle) ⇒ Object
-
#initialize(args) ⇒ QueueController
constructor
A new instance of QueueController.
- #start ⇒ Object
- #start_thread(queue_url) ⇒ Object
Constructor Details
#initialize(args) ⇒ QueueController
Returns a new instance of QueueController.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/sqspoller/queue_controller.rb', line 15
# Builds a controller for a single SQS queue.
#
# @param args [Hash] options, read by key: :logger, :queue_name,
#   :polling_threads_count, :access_key_id, :secret_access_key, :region
#   and :task_delegator
# @return [QueueController]
def initialize(args)
  @logger = args[:logger]
  self.queue_name = args[:queue_name]
  @polling_threads_count = args[:polling_threads_count]
  @sqs = Aws::SQS::Client.new access_key_id: args[:access_key_id],
                              secret_access_key: args[:secret_access_key],
                              region: args[:region]
  # Looked up once at construction; #start reads queue_url from this response.
  @queue_details = @sqs.get_queue_url(queue_name: queue_name)
  self.threads = []
  @task_delegator = args[:task_delegator]

  # Maintenance-window handling is enabled only for queue names that match
  # REGEXP; for any other name @cache_key and @window_identifier stay unset.
  match_data = REGEXP.match queue_name
  @maintenance_window =
    if match_data
      @cache_key = "#{match_data[:environment]}::OutboundMessages::MaintenanceWindowOpen::#{match_data[:window_identifier]}"
      @window_identifier = match_data[:window_identifier]
      true
    else
      false
    end
end
Instance Attribute Details
#queue_name ⇒ Object
Returns the value of attribute queue_name.
12 13 14 |
# File 'lib/sqspoller/queue_controller.rb', line 12
# Reader for the queue name stored in @queue_name.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end
#threads ⇒ Object
Returns the value of attribute threads.
12 13 14 |
# File 'lib/sqspoller/queue_controller.rb', line 12
# Reader for the collection stored in @threads.
#
# @return [Object] the current value of @threads
def threads
  @threads
end
Instance Method Details
#all_threads_alive? ⇒ Boolean
35 36 37 |
# File 'lib/sqspoller/queue_controller.rb', line 35
# Reports whether every entry in #threads responds true to #alive?.
# An empty thread list yields true.
#
# @return [Boolean]
def all_threads_alive?
  threads.all?(&:alive?)
end
#block_on_maintenance_window ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/sqspoller/queue_controller.rb', line 100
# Blocks the calling thread while a maintenance window is flagged as open.
#
# Does nothing when @maintenance_window is falsy. Otherwise it reads
# @cache_key from REDIS in a loop: any truthy value is treated as "window
# open" and the thread sleeps 300 seconds before checking again; a nil/false
# value ends the loop.
#
# @return [nil]
def block_on_maintenance_window
  return unless @maintenance_window

  loop do
    @logger.info " Checking if maintenance window is open"

    unless REDIS.get(@cache_key)
      @logger.info " No maintenance window is open, continuing"
      break
    end

    @logger.info " Maintenance Window is open for #{@window_identifier}, sleeping for 5 minutes"
    sleep 300
  end
end
#delete_message(receipt_handle) ⇒ Object
95 96 97 98 |
# File 'lib/sqspoller/queue_controller.rb', line 95
# Deletes one message from this controller's queue via the SQS client.
#
# @param receipt_handle [Object] receipt handle of the message to delete,
#   passed through unchanged to the client
# @return [Object] whatever the client's delete_message call returns
def delete_message(receipt_handle)
  @sqs.delete_message queue_url: @queue_details.queue_url,
                      receipt_handle: receipt_handle
end
#start ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/sqspoller/queue_controller.rb', line 39
# Starts @polling_threads_count polling threads for the queue and appends
# each value returned by #start_thread to #threads.
#
# @return [Object] the result of @polling_threads_count.times
def start
  queue_url = @queue_details.queue_url
  @logger.info "Going to start polling threads for queue: #{queue_url}"

  @polling_threads_count.times do
    threads << start_thread(queue_url)
  end
end
#start_thread(queue_url) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/sqspoller/queue_controller.rb', line 47
# Spawns one thread that polls the given queue with Aws::SQS::QueuePoller and
# hands every received message to @task_delegator.
#
# Before each poll request the thread calls #block_on_maintenance_window.
# If the delegator raises, the error is logged and :skip_delete is thrown.
#
# @param queue_url [String] URL of the queue to poll
# @return [Thread] the polling thread when Thread.new succeeds
#
# NOTE(review): the method-level rescue clauses only cover errors raised while
# creating the thread; in that case the method returns the logger's result,
# not a Thread, and the caller will still store it — confirm this is intended.
def start_thread(queue_url)
  Thread.new do
    begin
      poller = Aws::SQS::QueuePoller.new(queue_url)

      poller.before_request do |_stats|
        block_on_maintenance_window
        @logger.info " Polling queue #{queue_name} for messages"
      end

      poller.poll do |received_message|
        @logger.info " Received message from #{@queue_name} : #{received_message.message_id}"
        begin
          @task_delegator.process self, received_message, queue_name
        rescue Exception => e
          # NOTE(review): rescuing Exception also traps signals and exits;
          # presumably deliberate so a worker thread never dies — confirm.
          @logger.info " Encountered error #{e.message} while submitting message to worker from queue #{queue_name}"
          throw :skip_delete
        end
        @logger.info " Completed processing received_message: #{received_message}"
      end

      # loop do
      #   block_on_maintenance_window
      #   @logger.info " Polling queue #{queue_name} for messages"
      #   begin
      #     msgs = @sqs.receive_message queue_url: queue_url, wait_time_seconds: 20
      #   rescue Exception => e
      #     @logger.info "Error receiving messages from queue #{@queue_name}: #{e.message}"
      #     next
      #   end
      #   msgs.messages.each do |received_message|
      #     begin
      #       @logger.info "Received message from #{@queue_name} : #{received_message.message_id}"
      #       @task_delegator.process self, received_message, @queue_name
      #     rescue Exception => e
      #       @logger.info "Encountered error #{e.message} while submitting message from queue #{queue_url}"
      #     end
      #   end
      # end
    rescue Aws::SQS::Errors::ServiceError => e
      @logger.info "AWS SQS Encountered the error: #{e.inspect}"
    end
    # Reached when poll returns or after a ServiceError has been logged.
    @logger.info "Exiting thread for queue_url #{queue_url}, should never get here"
  end
rescue ThreadError => e
  @logger.info " Encountered Thread error #{e.message} while starting thread for queue_url #{queue_url}"
rescue => e
  @logger.info " Encountered error #{e.message} while starting thread for queue_url #{queue_url}"
end