Class: Ears::Consumer Abstract

Inherits:
Object
  • Object
show all
Defined in:
lib/ears/consumer.rb

Overview

This class is abstract.

Subclass and override #work to implement.

The abstract base class for consumers processing messages from queues.

Defined Under Namespace

Classes: InvalidReturnError

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.durable_exchangeObject

Returns the value of attribute durable_exchange.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def durable_exchange
  @durable_exchange
end

.exchangeObject

Returns the value of attribute exchange.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def exchange
  @exchange
end

.exchange_typeObject

Returns the value of attribute exchange_type.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def exchange_type
  @exchange_type
end

.queueObject

Returns the value of attribute queue.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def queue
  @queue
end

.queue_optionsObject

Returns the value of attribute queue_options.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def queue_options
  @queue_options
end

.routing_keysObject

Returns the value of attribute routing_keys.



128
129
130
# File 'lib/ears/consumer.rb', line 128

def routing_keys
  @routing_keys
end

Class Method Details

.configure(opts = {}) ⇒ Object

Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.

Parameters:

  • opts (Hash) (defaults to: {})

    The options to configure the consumer with.

Options Hash (opts):

  • :queue (String)

    The name of the queue to consume from.

  • :exchange (String)

    The name of the exchange the queue should be bound to.

  • :routing_keys (Array)

    The routing keys used for the queue binding.

  • :durable_queue (Boolean) — default: true

    Whether the queue should be durable.

  • :retry_queue (Boolean) — default: false

    Whether a retry queue should be provided.

  • :retry_delay (Integer) — default: 5000

    The delay in milliseconds before retrying a message.

  • :error_queue (Boolean) — default: false

    Whether an error queue should be provided.

  • :durable_exchange (Boolean) — default: true

    Whether the exchange should be durable.

  • :exchange_type (Symbol) — default: :topic

    The type of exchange to use.



43
44
45
46
47
48
49
50
# File 'lib/ears/consumer.rb', line 43

def self.configure(opts = {})
  self.queue = opts.fetch(:queue)
  self.exchange = opts.fetch(:exchange)
  self.routing_keys = opts.fetch(:routing_keys)
  self.queue_options = queue_options_from(opts: opts)
  self.durable_exchange = opts.fetch(:durable_exchange, true)
  self.exchange_type = opts.fetch(:exchange_type, :topic)
end

.middlewaresArray<Ears::Middleware>

List of registered middlewares. Register new middlewares with use.

Returns:



18
19
20
# File 'lib/ears/consumer.rb', line 18

def self.middlewares
  @middlewares ||= []
end

.use(middleware, opts = {}) ⇒ Object

Registers a new middleware by instantiating +middleware+ and passing it +opts+.

Parameters:

  • middleware (Class<Ears::Middleware>)

    The middleware class to instantiate and register.

  • opts (Hash) (defaults to: {})

    The options for instantiating the middleware.



26
27
28
# File 'lib/ears/consumer.rb', line 26

def self.use(middleware, opts = {})
  middlewares << middleware.new(opts)
end

Instance Method Details

#process_delivery(delivery_info, metadata, payload) ⇒ Object

Wraps #work to add middlewares. This is being called by Ears when a message is received for the consumer.

Parameters:

  • delivery_info (Bunny::DeliveryInfo)

    The delivery info of the received message.

  • metadata (Bunny::MessageProperties)

    The metadata of the received message.

  • payload (String)

    The payload of the received message.

Raises:



70
71
72
73
74
75
76
77
78
79
# File 'lib/ears/consumer.rb', line 70

def process_delivery(delivery_info, , payload)
  self
    .class
    .middlewares
    .reverse
    .reduce(work_proc) do |next_middleware, middleware|
      nest_middleware(middleware, next_middleware)
    end
    .call(delivery_info, , payload)
end

#work(delivery_info, metadata, payload) ⇒ :ack, ...

The method that is called when a message from the queue is received. Keep in mind that the parameters received can be altered by middlewares!

Parameters:

  • delivery_info (Bunny::DeliveryInfo)

    The delivery info of the message.

  • metadata (Bunny::MessageProperties)

    The metadata of the message.

  • payload (String)

    The payload of the message.

Returns:

  • (:ack, :reject, :requeue)

    A symbol denoting what should be done with the message.

Raises:

  • (NotImplementedError)


60
61
62
# File 'lib/ears/consumer.rb', line 60

def work(delivery_info, , payload)
  raise NotImplementedError
end