Module: RabbitJobs::AmqpHelpers
Instance Method Summary collapse
- #amqp_with_exchange(&block) ⇒ Object
Calls the given block with an open AMQP connection and the configured direct exchange.
- #amqp_with_queue(routing_key, &block) ⇒ Object
- #make_queue(exchange, routing_key) ⇒ Object
Instance Method Details
#amqp_with_exchange(&block) ⇒ Object
Calls the given block with an open AMQP connection and the configured direct exchange.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 8
#
# Starts AMQP against RabbitJobs.config.host, opens a channel on the
# resulting connection, declares the direct exchange named by
# RabbitJobs.config[:exchange] (with RabbitJobs.config[:exchange_params])
# and calls the given block with the connection and that exchange.
#
# @yieldparam connection the connection handed to the AMQP.start block
# @yieldparam exchange the direct exchange declared on the new channel
# @raise [ArgumentError] if no block is given
def amqp_with_exchange(&block)
  raise ArgumentError, 'a block is required' unless block

  AMQP.start(host: RabbitJobs.config.host) do |connection|
    channel = AMQP::Channel.new(connection)

    # On a channel-level error: report the broker's reply text, then close
    # the connection. EM.stop is passed as the block to connection.close
    # (presumably invoked once the connection has finished closing).
    channel.on_error do |_ch, channel_close|
      puts "Channel-level error: #{channel_close.reply_text}, shutting down..."
      connection.close { EM.stop }
    end

    exchange = channel.direct(RabbitJobs.config[:exchange], RabbitJobs.config[:exchange_params])

    # go work
    block.call(connection, exchange)
  end
end
#amqp_with_queue(routing_key, &block) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 26
#
# Runs amqp_with_exchange, declares the queue for +routing_key+ and binds it
# to the exchange (via make_queue), then calls the given block with the
# connection and that queue.
#
# @param routing_key key used to look up the queue name/options in
#   RabbitJobs.config and to bind the queue to the exchange
# @yieldparam connection the connection yielded by amqp_with_exchange
# @yieldparam queue the queue returned by make_queue
# @raise [ArgumentError] if +routing_key+ is nil/false or no block is given
def amqp_with_queue(routing_key, &block)
  raise ArgumentError, 'routing_key is required' unless routing_key
  raise ArgumentError, 'a block is required' unless block

  amqp_with_exchange do |connection, exchange|
    # make_queue performs the same declare-and-bind steps that used to be
    # duplicated inline here.
    queue = make_queue(exchange, routing_key)

    # go work
    block.call(connection, queue)
  end
end
#make_queue(exchange, routing_key) ⇒ Object
39 40 41 42 43 |
# File 'lib/rabbit_jobs/amqp_helpers.rb', line 39
#
# Declares the queue for +routing_key+ on the exchange's channel, binds it to
# +exchange+ under that routing key, and returns the queue.
#
# @param exchange object responding to #channel; the queue is bound to it
# @param routing_key key used for RabbitJobs.config.queue_name, for the
#   RabbitJobs.config[:queues] lookup and for the binding
# @return the declared queue (not the result of #bind)
def make_queue(exchange, routing_key)
  config = RabbitJobs.config
  name = config.queue_name(routing_key)
  options = config[:queues][routing_key]

  exchange.channel.queue(name, options).tap do |declared|
    declared.bind(exchange, :routing_key => routing_key)
  end
end