Class: Mimi::Messaging::SQS_SNS::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/mimi/messaging/sqs_sns/consumer.rb

Overview

Message consumer for SQS queues

Constant Summary collapse

NACK_VISIBILITY_TIMEOUT =

(seconds) determines how soon the NACK-ed message becomes visible to other consumers

1

Instance Method Summary collapse

Constructor Details

#initialize(adapter, queue_url, worker_pool = nil, &block) ⇒ Consumer

Creates a new Consumer to read and process messages from the given queue.

Parameters:

  • adapter (Mimi::Messaging::SQS_SNS::Adapter)
  • queue_url (String)
  • worker_pool (Concurrent::ThreadPoolExecutor, nil) (defaults to: nil)

    an optional worker pool to be used



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/mimi/messaging/sqs_sns/consumer.rb', line 19

def initialize(adapter, queue_url, worker_pool = nil, &block)
  @stop_requested = false
  @worker_pool = worker_pool
  Mimi::Messaging.log "Starting consumer for: #{queue_url}, " \
    " worker_pool: #{worker_pool ? "yes" : "no"}"
  @consumer_thread = Thread.new do
    while not @stop_requested
      read_and_process_message(adapter, queue_url, worker_pool, block)
    end
    Mimi::Messaging.log "Stopping consumer for: #{queue_url}"
  end
end

Instance Method Details

#signal_stopObject

Requests the Consumer to stop, without actually waiting for it



34
35
36
# File 'lib/mimi/messaging/sqs_sns/consumer.rb', line 34

def signal_stop
  @stop_requested = true
end

#stopObject

Requests the Consumer to stop AND waits until it does



40
41
42
43
# File 'lib/mimi/messaging/sqs_sns/consumer.rb', line 40

def stop
  @stop_requested = true
  @consumer_thread.join
end