Class: SqsPoller::Poller::QueuePoller

Inherits:
Object
  • Object
show all
Defined in:
lib/sqspoller/poll/queue_poller.rb

Constant Summary collapse

# Upper bound and fallback for the :wait_time_seconds queue setting
# (see get_wait_time_seconds).
DEFAULT_SQS_WAIT_TIME_SECONDS =
20
# Upper bound and fallback for the :max_number_of_messages queue setting
# (see get_max_number_of_messages).
DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES =
10

Instance Method Summary collapse

Constructor Details

#initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter) ⇒ QueuePoller

Returns a new instance of QueuePoller.



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/sqspoller/poll/queue_poller.rb', line 16

# Builds a poller bound to one queue.
#
# Stores the collaborators it is given, derives the effective wait time and
# batch size from +queue_config+ (falling back to the class defaults when the
# configured values are missing or out of range), and creates a logger whose
# name combines this class name with +worker_name+.
#
# @param worker_name [Object] identifier interpolated into the logger name
# @param queue_name [Object] name attached to every queued task and log line
# @param queue_config [#[]] settings read via +:wait_time_seconds+ and
#   +:max_number_of_messages+
# @param task_queue [#push] destination for the tasks built in +run+
# @param sqs_client [Object] client handed to Aws::SQS::QueuePoller in +run+
# @param queue_url [Object] queue URL handed to Aws::SQS::QueuePoller in +run+
# @param counter [#increment] source of the per-task +:index+ value
def initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter)
  # Identity of this poller.
  @worker_name = worker_name
  @queue_name = queue_name

  # Configuration and the values derived from it.
  @queue_config = queue_config
  @task_queue = task_queue
  @wait_time_seconds = get_wait_time_seconds(queue_config)
  @max_number_of_messages = get_max_number_of_messages(queue_config)

  # Collaborators used while polling.
  @sqs_client = sqs_client
  @queue_url = queue_url
  @logger = SqsPoller::Logger.get_new_logger("#{self.class.name}-#{worker_name}")
  @counter = counter
end

Instance Method Details

#get_max_number_of_messages(queue_config) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/sqspoller/poll/queue_poller.rb', line 63

# Resolves the batch size to request from SQS on each receive call.
#
# Reads +:max_number_of_messages+ from the queue configuration and uses it only
# when it lies within 1..DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES. A missing or
# out-of-range value falls back to DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES.
#
# @param queue_config [#[]] queue settings, looked up with the Symbol key
#   +:max_number_of_messages+
# @return [Object] the configured value when valid, otherwise the default
def get_max_number_of_messages(queue_config)
  max_number_of_messages = queue_config[:max_number_of_messages]
  # SQS ReceiveMessage only accepts MaxNumberOfMessages from 1 upwards, so a
  # configured 0 must not be passed through; treat it like any other
  # out-of-range value.
  in_range = max_number_of_messages &&
             max_number_of_messages >= 1 &&
             max_number_of_messages <= DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES
  in_range ? max_number_of_messages : DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES
end

#get_poller_stats ⇒ Object



29
30
31
# File 'lib/sqspoller/poll/queue_poller.rb', line 29

# Returns the value most recently stored in @poller_stats.
#
# +run+ assigns @poller_stats inside its +before_request+ callback, and the
# constructor does not initialise it, so this is nil until that callback has
# fired at least once.
#
# @return [Object, nil] the stats object handed to the +before_request+ callback
def get_poller_stats
  @poller_stats
end

#get_wait_time_seconds(queue_config) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/sqspoller/poll/queue_poller.rb', line 55

# Resolves the wait time to pass to the poller.
#
# Reads +:wait_time_seconds+ from the queue configuration and uses it only when
# it lies within 0..DEFAULT_SQS_WAIT_TIME_SECONDS. A missing or out-of-range
# value falls back to DEFAULT_SQS_WAIT_TIME_SECONDS.
#
# @param queue_config [#[]] queue settings, looked up with the Symbol key
#   +:wait_time_seconds+
# @return [Object] the configured value when valid, otherwise the default
def get_wait_time_seconds(queue_config)
  requested = queue_config[:wait_time_seconds]
  usable = requested &&
           requested >= 0 &&
           requested <= DEFAULT_SQS_WAIT_TIME_SECONDS
  usable ? requested : DEFAULT_SQS_WAIT_TIME_SECONDS
end

#run ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sqspoller/poll/queue_poller.rb', line 33

# Polls the queue and forwards every received message to the task queue.
#
# Creates an Aws::SQS::QueuePoller for @queue_url using @sqs_client, records the
# stats object passed to its +before_request+ callback in @poller_stats, and
# then polls with the resolved wait time and batch size. Each received message
# is wrapped in a task hash (+:message+, +:queue_name+, +:queue_time+, +:index+)
# and pushed onto @task_queue; one info line is logged per received batch.
#
# The poller is invoked with +skip_delete: true+; message deletion is
# presumably handled by whoever consumes the task queue (confirm against the
# worker code).
def run
  sqs_poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs_client })
  sqs_poller.before_request { |stats| @poller_stats = stats }

  sqs_poller.poll(
    skip_delete: true,
    wait_time_seconds: @wait_time_seconds,
    max_number_of_messages: @max_number_of_messages
  ) do |received|
    # With a batch size of 1 the block receives a bare message rather than a
    # collection, so wrap it to keep the loop below uniform.
    batch = @max_number_of_messages == 1 ? [received] : received

    batch.each do |sqs_message|
      queued_task = {
        message: sqs_message,
        queue_name: @queue_name,
        queue_time: SqsPoller::Common::Utils.get_current_time_in_millis,
        index: @counter.increment
      }
      # Push the hash as a single positional object.
      @task_queue.push queued_task
    end

    @logger.info "Queued #{batch.size} messages from #{@queue_name}"
  end
end