Class: SqsPoller::Poller::QueuePoller
- Inherits:
-
Object
- Object
- SqsPoller::Poller::QueuePoller
- Defined in:
- lib/sqspoller/poll/queue_poller.rb
Constant Summary
- DEFAULT_SQS_WAIT_TIME_SECONDS =
20
- DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES =
10
Instance Method Summary
- #get_max_number_of_messages(queue_config) ⇒ Object
- #get_poller_stats ⇒ Object
- #get_wait_time_seconds(queue_config) ⇒ Object
-
#initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter) ⇒ QueuePoller
constructor
A new instance of QueuePoller.
- #run ⇒ Object
Constructor Details
#initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter) ⇒ QueuePoller
Returns a new instance of QueuePoller.
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/sqspoller/poll/queue_poller.rb', line 16
#
# Stores the collaborators needed to poll one SQS queue and resolves the
# polling limits from the queue configuration.
#
# @param worker_name [Object] interpolated into the logger name
# @param queue_name [Object] stored and attached to every queued task in #run
# @param queue_config [Hash] read for :wait_time_seconds and
#   :max_number_of_messages via the get_* helpers
# @param task_queue [#push] receives one task hash per message in #run
# @param sqs_client [Object] passed as `client:` to Aws::SQS::QueuePoller in #run
# @param queue_url [Object] passed to Aws::SQS::QueuePoller in #run
# @param counter [#increment] supplies the :index of each queued task in #run
def initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter)
  @worker_name = worker_name
  @queue_name = queue_name
  @queue_config = queue_config
  @task_queue = task_queue
  @wait_time_seconds = get_wait_time_seconds(@queue_config)
  @max_number_of_messages = get_max_number_of_messages(@queue_config)
  @sqs_client = sqs_client
  @queue_url = queue_url
  @logger = SqsPoller::Logger.get_new_logger("#{self.class.name}-#{@worker_name}")
  @counter = counter
  # Assigned by the before_request callback in #run; nil until then.
  @poller_stats = nil
end
Instance Method Details
#get_max_number_of_messages(queue_config) ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/sqspoller/poll/queue_poller.rb', line 63
#
# Resolves how many messages to request per receive call.
#
# Returns queue_config[:max_number_of_messages] when it lies within
# 1..DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES; otherwise (missing, out of range, or
# not comparable with an Integer) returns DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES.
#
# @param queue_config [Hash] queue configuration, read at :max_number_of_messages
# @return [Object] the configured value, or the default
def get_max_number_of_messages(queue_config)
  max_number_of_messages = queue_config[:max_number_of_messages]
  # The lower bound is 1, not 0: SQS documents 1..10 as the valid values for
  # MaxNumberOfMessages, so 0 must fall back to the default as well.
  # Range#cover? is false for nil and for values that cannot be compared.
  return max_number_of_messages if (1..DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES).cover?(max_number_of_messages)

  DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES
end
#get_poller_stats ⇒ Object
29 30 31 |
# File 'lib/sqspoller/poll/queue_poller.rb', line 29
#
# Returns the value currently held in @poller_stats.
#
# @return [Object, nil] whatever was last assigned to @poller_stats; nil if
#   nothing has been assigned yet
def get_poller_stats
  @poller_stats
end
#get_wait_time_seconds(queue_config) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/sqspoller/poll/queue_poller.rb', line 55
#
# Resolves the wait time to use for each receive call.
#
# Returns queue_config[:wait_time_seconds] when it lies within
# 0..DEFAULT_SQS_WAIT_TIME_SECONDS; otherwise (missing, out of range, or not
# comparable with an Integer) returns DEFAULT_SQS_WAIT_TIME_SECONDS.
#
# @param queue_config [Hash] queue configuration, read at :wait_time_seconds
# @return [Object] the configured value, or the default
def get_wait_time_seconds(queue_config)
  wait_time_seconds = queue_config[:wait_time_seconds]
  # Range#cover? is false for nil and for values that cannot be compared with
  # the bounds, so a malformed setting falls back instead of raising.
  return wait_time_seconds if (0..DEFAULT_SQS_WAIT_TIME_SECONDS).cover?(wait_time_seconds)

  DEFAULT_SQS_WAIT_TIME_SECONDS
end
#run ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sqspoller/poll/queue_poller.rb', line 33
#
# Polls the queue at @queue_url and pushes one task hash per received message
# onto @task_queue.
#
# Each task carries the message, the queue name, the enqueue time in
# milliseconds and an index taken from @counter. After each batch an info line
# with the batch size is logged.
#
# The poll is started with skip_delete: true.
# NOTE(review): per the AWS SDK docs this leaves message deletion to the
# consumer of the task queue; confirm against the worker code.
def run
  poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs_client })

  # Keep the most recent stats object so #get_poller_stats can expose it.
  poller.before_request do |stats|
    @poller_stats = stats
  end

  poll_options = {
    skip_delete: true,
    wait_time_seconds: @wait_time_seconds,
    max_number_of_messages: @max_number_of_messages
  }

  poller.poll(poll_options) do |messages|
    # With a batch size of 1 the block receives a single message, not a list.
    # Wrap it explicitly; Array() is avoided so the message object is not
    # coerced through its own to_a.
    messages = [messages] if @max_number_of_messages == 1

    messages.each do |message|
      task = {
        message: message,
        queue_name: @queue_name,
        queue_time: SqsPoller::Common::Utils.get_current_time_in_millis,
        index: @counter.increment
      }
      @task_queue.push task
    end

    @logger.info "Queued #{messages.size} messages from #{@queue_name}"
  end
end