Class: Ears::Consumer Abstract
- Inherits:
-
Object
- Object
- Ears::Consumer
- Defined in:
- lib/ears/consumer.rb
Overview
Subclass and override #work to implement.
The abstract base class for consumers processing messages from queues.
Defined Under Namespace
Classes: InvalidReturnError
Class Attribute Summary collapse
-
.durable_exchange ⇒ Object
readonly
Returns the value of attribute durable_exchange.
-
.exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
.exchange_type ⇒ Object
readonly
Returns the value of attribute exchange_type.
-
.queue ⇒ Object
readonly
Returns the value of attribute queue.
-
.queue_options ⇒ Object
readonly
Returns the value of attribute queue_options.
-
.routing_keys ⇒ Object
readonly
Returns the value of attribute routing_keys.
Class Method Summary collapse
-
.configure(opts = {}) ⇒ Object
Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
-
.middlewares ⇒ Array<Ears::Middleware>
List of registered middlewares.
-
.use(middleware, opts = {}) ⇒ Object
Registers a new middleware by instantiating +middleware+ and passing it +opts+.
Instance Method Summary collapse
-
#process_delivery(delivery_info, metadata, payload) ⇒ Object
Wraps #work to add middlewares.
-
#work(delivery_info, metadata, payload) ⇒ :ack, ...
The method that is called when a message from the queue is received.
Class Attribute Details
.durable_exchange ⇒ Object
Returns the value of attribute durable_exchange.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def durable_exchange @durable_exchange end |
.exchange ⇒ Object
Returns the value of attribute exchange.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def exchange @exchange end |
.exchange_type ⇒ Object
Returns the value of attribute exchange_type.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def exchange_type @exchange_type end |
.queue ⇒ Object
Returns the value of attribute queue.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def queue @queue end |
.queue_options ⇒ Object
Returns the value of attribute queue_options.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def @queue_options end |
.routing_keys ⇒ Object
Returns the value of attribute routing_keys.
128 129 130 |
# File 'lib/ears/consumer.rb', line 128 def routing_keys @routing_keys end |
Class Method Details
.configure(opts = {}) ⇒ Object
Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
43 44 45 46 47 48 49 50 |
# File 'lib/ears/consumer.rb', line 43 def self.configure(opts = {}) self.queue = opts.fetch(:queue) self.exchange = opts.fetch(:exchange) self.routing_keys = opts.fetch(:routing_keys) self. = (opts: opts) self.durable_exchange = opts.fetch(:durable_exchange, true) self.exchange_type = opts.fetch(:exchange_type, :topic) end |
.middlewares ⇒ Array<Ears::Middleware>
List of registered middlewares. Register new middlewares with use.
18 19 20 |
# File 'lib/ears/consumer.rb', line 18 def self.middlewares @middlewares ||= [] end |
.use(middleware, opts = {}) ⇒ Object
Registers a new middleware by instantiating +middleware+ and passing it +opts+.
26 27 28 |
# File 'lib/ears/consumer.rb', line 26 def self.use(middleware, opts = {}) middlewares << middleware.new(opts) end |
Instance Method Details
#process_delivery(delivery_info, metadata, payload) ⇒ Object
Wraps #work to add middlewares. This is being called by Ears when a message is received for the consumer.
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ears/consumer.rb', line 70 def process_delivery(delivery_info, , payload) self .class .middlewares .reverse .reduce(work_proc) do |next_middleware, middleware| nest_middleware(middleware, next_middleware) end .call(delivery_info, , payload) end |
#work(delivery_info, metadata, payload) ⇒ :ack, ...
The method that is called when a message from the queue is received. Keep in mind that the parameters received can be altered by middlewares!
60 61 62 |
# File 'lib/ears/consumer.rb', line 60 def work(delivery_info, , payload) raise NotImplementedError end |