Class: Sqspoller::QueueController

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/queue_controller.rb

Constant Summary collapse

REGEXP =

expecting the queue name to be <environment>outbound_messages<company_id>

/(?<environment>\w+)-outbound-messages-(?<window_identifier>[\w-]+)/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ QueueController

Returns a new instance of QueueController.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/sqspoller/queue_controller.rb', line 15

def initialize args
  @logger = args[:logger]
  self.queue_name = args[:queue_name]
  @polling_threads_count = args[:polling_threads_count]
  @sqs = Aws::SQS::Client.new access_key_id: args[:access_key_id],
                              secret_access_key: args[:secret_access_key],
                              region: args[:region]
  @queue_details = @sqs.get_queue_url(queue_name: queue_name)
  self.threads = []
  @task_delegator = args[:task_delegator]
  match_data = REGEXP.match queue_name
  @maintenance_window = if match_data
                          @cache_key = "#{match_data[:environment]}::OutboundMessages::MaintenanceWindowOpen::#{match_data[:window_identifier]}"
                          @window_identifier = match_data[:window_identifier]
                          true
                        else
                          false
                        end
end

Instance Attribute Details

#queue_nameObject

Returns the value of attribute queue_name.



12
13
14
# File 'lib/sqspoller/queue_controller.rb', line 12

def queue_name
  @queue_name
end

#threadsObject

Returns the value of attribute threads.



12
13
14
# File 'lib/sqspoller/queue_controller.rb', line 12

def threads
  @threads
end

Instance Method Details

#all_threads_alive?Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/sqspoller/queue_controller.rb', line 35

def all_threads_alive?
  threads.all?(&:alive?)
end

#block_on_maintenance_windowObject



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/sqspoller/queue_controller.rb', line 100

def block_on_maintenance_window
  if @maintenance_window
    loop do
      @logger.info "    Checking if maintenance window is open"
      window_open = REDIS.get @cache_key
      if window_open
        @logger.info "      Maintenance Window is open for #{@window_identifier}, sleeping for 5 minutes"
        sleep 300
      else
        @logger.info "      No maintenance window is open, continuing"
        break
      end
    end
  end
end

#delete_message(receipt_handle) ⇒ Object



95
96
97
98
# File 'lib/sqspoller/queue_controller.rb', line 95

def delete_message(receipt_handle)
  @sqs.delete_message queue_url: @queue_details.queue_url,
                      receipt_handle: receipt_handle
end

#startObject



39
40
41
42
43
44
45
# File 'lib/sqspoller/queue_controller.rb', line 39

def start
  queue_url = @queue_details.queue_url
  @logger.info "Going to start polling threads for queue: #{queue_url}"
  @polling_threads_count.times do
    self.threads << start_thread(queue_url)
  end
end

#start_thread(queue_url) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/sqspoller/queue_controller.rb', line 47

def start_thread(queue_url)
  Thread.new do
    begin
      poller = Aws::SQS::QueuePoller.new(queue_url)
      poller.before_request do |stats|
        block_on_maintenance_window
        @logger.info "  Polling queue #{queue_name} for messages"
      end

      poller.poll do |received_message|
        @logger.info "    Received message from #{@queue_name} : #{received_message.message_id}"
        begin
          @task_delegator.process self, received_message, queue_name
        rescue Exception => e
          @logger.info "      Encountered error #{e.message} while submitting message to worker from queue #{queue_name}"
          throw :skip_delete
        end
        @logger.info "    Completed processing received_message: #{received_message}"
      end
      # loop do
      #   block_on_maintenance_window
      #   @logger.info "  Polling queue #{queue_name} for messages"
      #   begin
      #     msgs = @sqs.receive_message queue_url: queue_url, wait_time_seconds: 20
      #   rescue Exception => e
      #     @logger.info "Error receiving messages from queue #{@queue_name}: #{e.message}"
      #     next
      #   end
      #   msgs.messages.each do |received_message|
      #     begin
      #       @logger.info "Received message from #{@queue_name} : #{received_message.message_id}"
      #       @task_delegator.process self, received_message, @queue_name
      #     rescue Exception => e
      #       @logger.info "Encountered error #{e.message} while submitting message from queue #{queue_url}"
      #     end
      #   end
      # end
    rescue Aws::SQS::Errors::ServiceError => e
      @logger.info "AWS SQS Encountered the error: #{e.inspect}"
    end
    @logger.info "Exiting thread for queue_url #{queue_url}, should never get here"
  end
rescue ThreadError => e
  @logger.info "  Encountered Thread error #{e.message} while starting thread for queue_url #{queue_url}"
rescue => e
  @logger.info "  Encountered error #{e.message} while starting thread for queue_url #{queue_url}"
end