Module: RabbitJobs::AmqpHelpers

Included in:
Publisher, Worker
Defined in:
lib/rabbit_jobs/amqp_helpers.rb

Instance Method Summary

Instance Method Details

#amqp_with_exchange(&block) ⇒ Object

Calls the given block with an initialized AMQP connection and exchange.

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 8

# Starts AMQP and calls the given block with the connection and the
# configured direct exchange.
#
# The connection is started against +RabbitJobs.config.host+. A channel is
# opened on it and a direct exchange is declared using
# +RabbitJobs.config[:exchange]+ and +RabbitJobs.config[:exchange_params]+.
#
# If a channel-level error is reported, its reply text is printed and the
# connection is closed; +EM.stop+ is called from the close callback.
#
# @yieldparam connection the connection yielded by +AMQP.start+
# @yieldparam exchange the direct exchange declared on the new channel
# @raise [ArgumentError] if no block is given
def amqp_with_exchange(&block)
  raise ArgumentError, 'amqp_with_exchange requires a block' unless block

  AMQP.start(host: RabbitJobs.config.host) do |connection|
    channel = AMQP::Channel.new(connection)

    # The channel handed to the handler is not needed; only the close frame
    # (for its reply text) and the outer connection are used.
    channel.on_error do |_ch, channel_close|
      puts "Channel-level error: #{channel_close.reply_text}, shutting down..."
      connection.close { EM.stop }
    end

    exchange = channel.direct(RabbitJobs.config[:exchange], RabbitJobs.config[:exchange_params])

    # Hand control to the caller.
    block.call(connection, exchange)
  end
end

#amqp_with_queue(routing_key, &block) ⇒ Object

Raises:

  • (ArgumentError)


26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 26

# Starts AMQP via #amqp_with_exchange, declares and binds the queue for
# +routing_key+ via #make_queue, and calls the given block with the
# connection and that queue.
#
# @param routing_key key used both to look up the queue name/options in
#   +RabbitJobs.config+ and as the binding's routing key
# @yieldparam connection the connection yielded by #amqp_with_exchange
# @yieldparam queue the queue bound to the exchange under +routing_key+
# @raise [ArgumentError] if +routing_key+ is nil/false or no block is given
def amqp_with_queue(routing_key, &block)
  raise ArgumentError, 'routing_key is required' unless routing_key
  raise ArgumentError, 'amqp_with_queue requires a block' unless block

  amqp_with_exchange do |connection, exchange|
    # Reuse the shared helper instead of repeating the declare-and-bind steps.
    queue = make_queue(exchange, routing_key)

    # Hand control to the caller.
    block.call(connection, queue)
  end
end

#make_queue(exchange, routing_key) ⇒ Object



39
40
41
42
43
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 39

# Declares the queue for +routing_key+ on the exchange's channel, binds it to
# +exchange+ under that routing key, and returns the queue.
#
# The queue name comes from +RabbitJobs.config.queue_name(routing_key)+ and
# its options from +RabbitJobs.config[:queues][routing_key]+.
def make_queue(exchange, routing_key)
  channel = exchange.channel
  name    = RabbitJobs.config.queue_name(routing_key)
  options = RabbitJobs.config[:queues][routing_key]

  channel.queue(name, options).tap do |declared|
    declared.bind(exchange, routing_key: routing_key)
  end
end