Class: Sqspoller::QueueController

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/queue_controller.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, polling_threads_count, wait_time_seconds, task_delegator, access_key_id, secret_access_key, region, logger_file) ⇒ QueueController

Returns a new instance of QueueController.



9
10
11
12
13
14
15
16
17
18
# File 'lib/sqspoller/queue_controller.rb', line 9

# Prepares a controller for one SQS queue: opens the log, builds the SQS
# client from the supplied credentials and resolves the queue URL once via
# +get_queue_url+ so later calls can reuse it.
#
# @param queue_name name of the queue; passed to +get_queue_url+ and used in log lines
# @param polling_threads_count [#times] number of polling threads #start will launch
# @param wait_time_seconds forwarded as +:wait_time_seconds+ on each receive call
# @param task_delegator [#process] object that is handed every received message
# @param access_key_id AWS credential given to the SQS client
# @param secret_access_key AWS credential given to the SQS client
# @param region AWS region given to the SQS client
# @param logger_file log destination given to +Logger.new+
def initialize(queue_name, polling_threads_count, wait_time_seconds, task_delegator, access_key_id, secret_access_key, region, logger_file)
  @logger = Logger.new(logger_file)

  # Plain settings consumed later by the polling threads.
  @queue_name = queue_name
  @task_delegator = task_delegator
  @polling_threads_count = polling_threads_count
  @wait_time_seconds = wait_time_seconds
  @threads = []

  # Remote set-up: create the client, then look the queue up by name.
  @sqs = Aws::SQS::Client.new(
    access_key_id: access_key_id,
    secret_access_key: secret_access_key,
    region: region
  )
  @queue_details = @sqs.get_queue_url(queue_name: queue_name)
end

Instance Method Details

#delete_message(receipt_handle) ⇒ Object



57
58
59
60
61
62
# File 'lib/sqspoller/queue_controller.rb', line 57

# Removes a single message from this controller's queue.
#
# @param receipt_handle receipt handle identifying the message to delete
# @return whatever the SQS client's +delete_message+ call returns
def delete_message(receipt_handle)
  request = {
    queue_url: @queue_details.queue_url,
    receipt_handle: receipt_handle
  }
  @sqs.delete_message(request)
end

#start ⇒ Object



20
21
22
23
24
25
26
# File 'lib/sqspoller/queue_controller.rb', line 20

# Launches the configured number of polling threads against this queue by
# calling #start_thread once per thread.
#
# @return the result of +@polling_threads_count.times+
def start
  url = @queue_details.queue_url
  @logger.info("Going to start polling threads for queue: #{url}")
  @polling_threads_count.times { start_thread(url) }
end

#start_thread(queue_url) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/sqspoller/queue_controller.rb', line 32

# Spawns one background thread that polls +queue_url+ in an endless loop and
# hands every received message to the task delegator. The new thread is
# appended to @threads.
#
# A failure while receiving, or while processing a single message, is logged
# at error level and the loop carries on with the next poll / next message.
# Only StandardError is rescued, so process-level exceptions (SystemExit,
# Interrupt, NoMemoryError, ...) propagate and end the thread instead of
# being swallowed by the loop.
#
# @param queue_url URL of the queue to poll
# @return the @threads collection, including the newly started thread
def start_thread(queue_url)
  @threads << Thread.new do
    @logger.info "Poller thread started for queue: #{queue_url}"

    loop do
      @logger.info "Polling queue #{@queue_name} for messages"
      begin
        response = @sqs.receive_message(queue_url: queue_url, wait_time_seconds: @wait_time_seconds)
      rescue StandardError => e
        @logger.error "Error receiving messages from queue #{@queue_name}: #{e.message}"
        next
      end

      response.messages.each do |received_message|
        begin
          @logger.info "Received message #{@queue_name} : #{received_message.message_id}"
          # The controller passes itself along with the message and queue name.
          @task_delegator.process self, received_message, @queue_name
        rescue StandardError => e
          # One bad message must not stop the rest of the batch.
          @logger.error "Encountered error #{e.message} while submitting message from queue #{queue_url}"
        end
      end
    end
  end
end

#threads ⇒ Object



28
29
30
# File 'lib/sqspoller/queue_controller.rb', line 28

# Exposes the collection that #start_thread appends polling threads to.
#
# @return the value of @threads
def threads
  @threads
end