Class: Ears::Setup

Inherits:
Object
Defined in:
lib/ears/setup.rb

Overview

Contains the methods used during setup to declare your exchanges, queues and consumers.

Constant Summary

QUEUE_PARAMS = %i[retry_queue retry_delay error_queue]

Instance Method Details

#consumer(queue, consumer_class, threads = 1, args = {}) ⇒ Object

Creates and starts one or many consumers bound to the given queue.

Parameters:

  • queue (Bunny::Queue)

    The queue the consumers should be subscribed to.

  • consumer_class (Class<Ears::Consumer>)

    A class implementing Consumer that holds the consumer behavior.

  • threads (Integer) (defaults to: 1)

    The number of threads that should be used to process messages from the queue.

  • args (Hash) (defaults to: {})

    The arguments for the consumer. These are passed on to +Bunny::Consumer.new+.

Options Hash (args):

  • :prefetch (Integer) — default: 1

    The prefetch count used for this consumer.



# File 'lib/ears/setup.rb', line 50

def consumer(queue, consumer_class, threads = 1, args = {})
  threads.times do |n|
    consumer_queue = create_consumer_queue(queue, args)
    consumer = create_consumer(consumer_queue, consumer_class, args, n + 1)
    consumer.on_delivery do |delivery_info, metadata, payload|
      consumer.process_delivery(delivery_info, metadata, payload)
    end
    consumer_queue.subscribe_with(consumer)
  end
end
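
The loop above can be explored without a broker. The following is a minimal sketch, not the Ears implementation: `FakeConsumer`, `FakeQueue` and `start_fake_consumers` are hypothetical stand-ins written for this illustration, and they replace the private helpers `create_consumer_queue` and `create_consumer`, whose behavior is not shown on this page. As a simplification, every consumer subscribes to the same fake queue.

```ruby
# Stand-in for a consumer (hypothetical, not a Bunny class):
# it stores one delivery handler and records processed payloads.
class FakeConsumer
  attr_reader :number, :processed

  def initialize(number)
    @number = number
    @processed = []
  end

  # Registers the block that is called for each delivery.
  def on_delivery(&block)
    @handler = block
  end

  def process_delivery(_delivery_info, _metadata, payload)
    @processed << payload
  end

  # Simulates the broker handing a message to this consumer.
  def deliver(delivery_info, metadata, payload)
    @handler.call(delivery_info, metadata, payload)
  end
end

# Stand-in for a queue (hypothetical): it only remembers its subscribers.
class FakeQueue
  attr_reader :subscribers

  def initialize
    @subscribers = []
  end

  def subscribe_with(consumer)
    @subscribers << consumer
  end
end

# Same loop shape as Ears::Setup#consumer, with the private helpers
# replaced by the stand-ins above.
def start_fake_consumers(queue, threads = 1)
  threads.times do |n|
    consumer = FakeConsumer.new(n + 1)
    consumer.on_delivery do |delivery_info, metadata, payload|
      consumer.process_delivery(delivery_info, metadata, payload)
    end
    queue.subscribe_with(consumer)
  end
end

queue = FakeQueue.new
start_fake_consumers(queue, 3)
queue.subscribers.first.deliver(nil, nil, 'hello')

p queue.subscribers.map(&:number)   # one consumer per thread, numbered from 1
p queue.subscribers.first.processed # the payload reached process_delivery
```

The sketch shows two things: `threads` controls how many consumers are created, and each consumer's delivery handler simply forwards all three delivery arguments to its own `process_delivery`.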

#exchange(name, type, opts = {}) ⇒ Bunny::Exchange

Creates a new exchange if it does not already exist.

Parameters:

  • name (String)

    The name of the exchange.

  • type (Symbol)

    The type of the exchange (:direct, :fanout, :topic or :headers).

  • opts (Hash) (defaults to: {})

    The options for the exchange. These are passed on to +Bunny::Exchange.new+.

Returns:

  • (Bunny::Exchange)

    The exchange that was either newly created or was already there.



# File 'lib/ears/setup.rb', line 16

def exchange(name, type, opts = {})
  Bunny::Exchange.new(Ears.channel, type, name, opts)
end

#queue(name, opts = {}) ⇒ Bunny::Queue

Creates a new queue if it does not already exist.

Parameters:

  • name (String)

    The name of the queue.

  • opts (Hash) (defaults to: {})

    The options for the queue. Keys listed in QUEUE_PARAMS are handled by Ears itself; the remaining options are passed on to +Bunny::Queue.new+.

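Options Hash (opts):

  • :retry_queue

    When truthy, a retry queue is created alongside the queue.

  • :retry_delay (Integer) (defaults to: 5000)

    The delay used for the retry queue.

  • :error_queue

    When truthy, an error queue is created alongside the queue.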

Returns:

  • (Bunny::Queue)

    The queue that was either newly created or was already there.



# File 'lib/ears/setup.rb', line 28

def queue(name, opts = {})
  bunny_opts = opts.reject { |k, _| QUEUE_PARAMS.include?(k) }
  retry_args = retry_arguments(name, opts)
  retry_delay = opts.fetch(:retry_delay, 5000)

  create_retry_queue(name, retry_delay, bunny_opts) if opts[:retry_queue]
  create_error_queue(name, bunny_opts) if opts[:error_queue]

  Bunny::Queue.new(
    Ears.channel,
    name,
    queue_options(bunny_opts, retry_args),
  )
end
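
The option handling in the first lines of this method can be tried in isolation. The sketch below is plain Ruby with no Bunny or RabbitMQ dependency; the `QueueOptionsSketch` module and its `split` method are hypothetical names introduced for illustration and are not part of Ears. It separates the Ears-specific keys from the options that are left for Bunny.

```ruby
# Hypothetical helper module (not part of Ears) that mirrors the
# option handling at the top of Ears::Setup#queue.
module QueueOptionsSketch
  # Same keys as Ears::Setup::QUEUE_PARAMS.
  QUEUE_PARAMS = %i[retry_queue retry_delay error_queue].freeze

  module_function

  def split(opts = {})
    {
      # Everything that is not an Ears-specific key is left for Bunny.
      bunny_opts: opts.reject { |k, _| QUEUE_PARAMS.include?(k) },
      # Falls back to 5000 when :retry_delay is not given.
      retry_delay: opts.fetch(:retry_delay, 5000),
      # The real method creates the extra queues when these are truthy.
      retry_queue: opts[:retry_queue] ? true : false,
      error_queue: opts[:error_queue] ? true : false,
    }
  end
end

result = QueueOptionsSketch.split({ durable: true, retry_queue: true, retry_delay: 1000 })

p result[:bunny_opts]  # only the non-Ears key :durable remains
p result[:retry_delay] # the explicitly passed delay
p QueueOptionsSketch.split({})[:retry_delay] # the fallback delay
```

Because the Ears-specific keys are filtered out first, they can be mixed freely with ordinary queue options in a single hash.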

#setup_consumers(*consumer_classes) ⇒ Object

Sets up consumers, including bindings to exchanges and queues.

Parameters:

  • consumer_classes (Array<Class<Ears::Consumer>>)

    One or more subclasses of Consumer, passed as separate arguments, that call Consumer#configure in their class definition.



# File 'lib/ears/setup.rb', line 64

def setup_consumers(*consumer_classes)
  consumer_classes.each { |consumer_class| setup_consumer(consumer_class) }
end