Class: SqsPoller::Poller::QueueController

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/poll/queue_controller.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues_config, task_queue, aws_config) ⇒ QueueController

Returns a new instance of QueueController.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sqspoller/poll/queue_controller.rb', line 18

# Builds a controller for the given queue configuration.
#
# @param queues_config [Hash] per-queue settings, keyed by queue name
#   (start_queue_controller reads :polling_threads from each entry)
# @param task_queue [Object] stored as-is in @task_queue; not used by the
#   methods visible here — presumably consumed by the pollers, TODO confirm
# @param aws_config [Hash] must answer :access_key_id, :secret_access_key
#   and :region, which are forwarded to the SQS client
def initialize(queues_config, task_queue, aws_config)
  @queues_config = queues_config
  @task_queue = task_queue
  @aws_config = aws_config

  @logger = SqsPoller::Logger.get_new_logger(self.class.name)
  @counter = Concurrent::MutexAtomicFixnum.new(0)
  @sqs_client = Aws::SQS::Client.new(
    access_key_id: @aws_config[:access_key_id],
    secret_access_key: @aws_config[:secret_access_key],
    region: @aws_config[:region]
  )

  # Optimistic default; start_queue_controller clears it when a queue fails.
  @started = true

  # Per-queue registries, all keyed by queue name:
  @pollers_thread_pool = {} # thread pools
  @pollers = {}             # QueuePollers
  @queue_urls = {}          # resolved SQS queue URLs
end

Class Method Details

.delete_message(queue, message) ⇒ Object



42
43
44
# File 'lib/sqspoller/poll/queue_controller.rb', line 42

# Class-level convenience: deletes one message through the running
# controller instance.
#
# @param queue [Object] queue identifier passed through to the instance
# @param message [Object] message passed through to the instance
# @return [Object] whatever the instance-level #delete_message returns
# @raise [RuntimeError] (from .get) when no controller has been started
def self.delete_message(queue, message)
  controller = get
  controller.delete_message(queue, message)
end

.delete_messages(queue, messages) ⇒ Object



38
39
40
# File 'lib/sqspoller/poll/queue_controller.rb', line 38

# Class-level convenience: deletes a batch of messages through the running
# controller instance.
#
# @param queue [Object] queue identifier passed through to the instance
# @param messages [Object] collection passed through to the instance
# @return [Object] whatever the instance-level #delete_messages returns
# @raise [RuntimeError] (from .get) when no controller has been started
def self.delete_messages(queue, messages)
  controller = get
  controller.delete_messages(queue, messages)
end

.get ⇒ Object



33
34
35
36
# File 'lib/sqspoller/poll/queue_controller.rb', line 33

# Returns the controller instance created by .start.
#
# @return [Object] the stored @instance
# @raise [RuntimeError] when @instance has not been set yet
def self.get
  @instance || raise("QueueController not yet started")
end

.start(queues_config, task_queue, aws_config) ⇒ Object



75
76
77
78
79
80
81
# File 'lib/sqspoller/poll/queue_controller.rb', line 75

# Creates the controller on first call and starts its pollers and stats
# reporter; later calls return the already-created instance untouched.
#
# @param queues_config [Object] forwarded to the constructor
# @param task_queue [Object] forwarded to the constructor
# @param aws_config [Object] forwarded to the constructor
# @return [Object] the controller stored in @instance
def self.start(queues_config, task_queue, aws_config)
  unless @instance
    @instance = new(queues_config, task_queue, aws_config)
    @instance.start_queue_controller
    @instance.start_poller_stats_reporter
  end
  @instance
end

Instance Method Details

#delete_message(queue, message) ⇒ Object



59
60
61
62
63
64
65
# File 'lib/sqspoller/poll/queue_controller.rb', line 59

# Deletes a single message from the given queue.
#
# @param queue [Object] queue identifier understood by #get_queue_url
# @param message [#receipt_handle] the message to delete
# @return [Object] whatever the SQS client's delete_message call returns
def delete_message(queue, message)
  # The constructor stores the SQS client in @sqs_client; the previous code
  # used @client, which the constructor does not assign, so the call failed
  # on nil.
  @sqs_client.delete_message(
    queue_url: get_queue_url(queue),
    receipt_handle: message.receipt_handle
  )
end

#delete_messages(queue, messages) ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/sqspoller/poll/queue_controller.rb', line 46

# Deletes several messages from the given queue with one batch request.
#
# @param queue [Object] queue identifier understood by #get_queue_url
# @param messages [Enumerable<#message_id, #receipt_handle>] messages to delete
# @return [Object] whatever the SQS client's delete_message_batch call returns
#
# NOTE(review): no chunking or empty-list guard here — confirm callers stay
# within whatever batch-size limits the SQS API enforces.
def delete_messages(queue, messages)
  batch_entries = messages.map do |msg|
    { id: msg.message_id, receipt_handle: msg.receipt_handle }
  end

  @sqs_client.delete_message_batch(
    queue_url: get_queue_url(queue),
    entries: batch_entries
  )
end

#get_queue_url(queue) ⇒ Object



55
56
57
# File 'lib/sqspoller/poll/queue_controller.rb', line 55

# Looks up the SQS URL stored for a queue in @queue_urls.
#
# Entries are written by #start_queue_controller using the queue name from
# the queues configuration as the key.
#
# @param queue [Object] key to look up in @queue_urls
# @return [Object, nil] the stored URL; nil when nothing is stored for the
#   key (assuming @queue_urls is the plain Hash the constructor creates)
def get_queue_url(queue)
  @queue_urls[queue]
end

#pollers ⇒ Object



71
72
73
# File 'lib/sqspoller/poll/queue_controller.rb', line 71

# Exposes the poller registry held in @pollers.
#
# The constructor initialises it as an empty hash and describes it as a
# "hash of QueuePollers by queue name".
#
# @return [Hash] the current value of @pollers (the live object, not a copy)
def pollers
  @pollers
end

#start_poller_stats_reporter ⇒ Object



83
84
85
86
87
88
89
# File 'lib/sqspoller/poll/queue_controller.rb', line 83

# Creates the SQS stats reporter, handing it this controller under the
# :queue_controller option.
#
# @return [SqsPoller::Metrics::SqsStatsReporter] the newly built reporter
def start_poller_stats_reporter
  reporter_options = { queue_controller: self }
  SqsPoller::Metrics::SqsStatsReporter.new(reporter_options)
end

#start_queue_controller ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/sqspoller/poll/queue_controller.rb', line 91

# Resolves the SQS URL of every configured queue and starts its pollers.
#
# Queues whose :polling_threads setting is 0 are skipped with an info log.
# For every other queue the URL is fetched from the SQS client, stored in
# @queue_urls and handed to start_pollers. A failure for one queue is logged
# and clears @started, but the remaining queues are still attempted.
#
# @return [Array] the queue names from @queues_config (result of the
#   iteration; callers visible in this file ignore it)
def start_queue_controller
  @queues_config.keys.each do |queue|
    queue_config = @queues_config[queue]
    polling_threads = queue_config[:polling_threads]
    if polling_threads == 0
      @logger.info "Polling disabled for queue: #{queue}"
      next
    end

    started = false
    begin
      queue_url = @sqs_client.get_queue_url(queue_name: queue).queue_url
      # Store the URL before the pollers start so that get_queue_url(queue)
      # already answers while start_pollers is running.
      @queue_urls[queue] = queue_url
      started = start_pollers(polling_threads, queue, queue_url, queue_config)
    rescue StandardError => e
      # StandardError only: signals and exit requests (Interrupt, SystemExit,
      # ...) must propagate instead of being logged and swallowed.
      @logger.error "Failed to start Queue Pollers. Caught error: #{e.message}\n #{Array(e.backtrace).join("\n")}"
    end

    next if started

    @started = false
    @logger.error "Failed to start Queue Pollers for the queue #{queue}"
  end
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/sqspoller/poll/queue_controller.rb', line 67

# Reports the controller's start-up status flag.
#
# @return [Boolean] the value of @started — set to true by the constructor
#   and set to false by #start_queue_controller when the pollers for a queue
#   fail to start
def started?
  @started
end