Class: Powcloud::Insurance::Comm::AmqpBroadcastChannel
- Inherits:
-
Powcloud::Insurance::Channel
- Object
- Powcloud::Insurance::Channel
- Powcloud::Insurance::Comm::AmqpBroadcastChannel
- Includes:
- Logger
- Defined in:
- lib/powcloud/insurance/comm/amqp_broadcast_channel.rb
Constant Summary collapse
- @@settings =
AMQP host to connect to
{
  :amqp_host => '', # AMQP host to connect to
  :queue_prefix => 'insurance'
}
Instance Attribute Summary
Attributes included from Logger
Instance Method Summary collapse
- #init ⇒ Object
-
#initialize(name, logger = nil) ⇒ AmqpBroadcastChannel
constructor
A new instance of AmqpBroadcastChannel.
-
#publish(message, opts = {}) ⇒ Object
message - string to publish. opts - any amqp Exchange.publish() options.
-
#receive(opts = {}, &blk) ⇒ Object
opts :round_robin - each message will be consumed by only one receiver, distributed in round robin manner.
Methods included from Logger
#init_child_logger, #init_logger, #log_exception, outputters, outputters=
Methods inherited from Powcloud::Insurance::Channel
Constructor Details
#initialize(name, logger = nil) ⇒ AmqpBroadcastChannel
Returns a new instance of AmqpBroadcastChannel.
19 20 21 22 23 24 25 |
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 19
#
# Builds a broadcast channel and immediately initializes it via #init.
#
# name   - channel name; also used to derive the exchange name "<name>.topic".
# logger - optional parent logger; when given, a child logger is set up
#          for this channel through init_child_logger.
def initialize(name, logger = nil)
  @name = name
  @exchange_name = "#{name}.topic"

  if logger
    init_child_logger(logger, name)
  end

  init
end
Instance Method Details
#init ⇒ Object
27 28 29 30 31 32 33 34 35 36 |
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 27
#
# Starts the AMQP connection if AMQP.conn reports none, then creates the
# topic exchange named by @exchange_name and stores it in @exchange.
#
# Connection options are read from @@settings[:amqp_opts]; when that entry is
# absent the host is logged as '<default>'.
# NOTE(review): the settings shown for this class define :amqp_host and
# :queue_prefix but not :amqp_opts -- confirm which key callers populate.
def init
  unless AMQP.conn
    amqp_opts = @@settings[:amqp_opts]
    host = (amqp_opts && amqp_opts[:host]) || '<default>'
    logger.warn "Connecting to #{host}" if logger
    # Pass an empty hash rather than nil when no options are configured, so
    # the "default" case does not depend on AMQP.start tolerating nil.
    AMQP.start(amqp_opts || {})
  end

  logger.warn "Initializing topic exchange: #{@exchange_name}" if logger
  @exchange = MQ.new.topic(@exchange_name) # NOTE: new is required for environments like rails.
end
#publish(message, opts = {}) ⇒ Object
message - string to publish. opts - any amqp Exchange.publish() options.
71 72 73 |
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 71
#
# Publishes a message on this channel's exchange (@exchange).
#
# message - payload to publish; converted with to_s before sending.
# opts    - options hash forwarded unchanged to the exchange's publish call.
#
# Returns whatever the exchange's publish call returns.
def publish(message, opts = {})
  @exchange.publish(message.to_s, opts)
end
#receive(opts = {}, &blk) ⇒ Object
opts :round_robin - each message will be consumed by only one receiver, distributed in round robin manner.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 40
#
# Creates a queue, binds it to this channel's exchange and, when a block is
# given, subscribes the block to incoming messages.
#
# opts - options hash:
#        :round_robin - when truthy, the queue is named after the channel
#                       (@name) and is not exclusive, so receivers share it;
#                       otherwise an exclusive queue named
#                       "<queue_prefix>.<name>.<random number>" is created.
#        :key         - routing key passed to the queue binding.
# blk  - message handler taking either (msg) or (msg, header).
#
# Returns the result of the queue's subscribe call, or nil when no block is
# given.
# Raises ArgumentError when the block takes neither one nor two arguments.
def receive(opts = {}, &blk)
  # Reject an unusable handler before talking to the broker, so a bad block
  # does not leave a declared and bound queue behind.
  if blk && ![1, 2].include?(blk.arity)
    raise ArgumentError, "Wrong number of arguments for receive block"
  end

  shared = opts[:round_robin]
  queue_name =
    if shared
      @name # Shared.
    else
      "#{@@settings[:queue_prefix]}.#{@name}.#{rand(1000000)}"
    end

  logger.warn "Creating queue: #{queue_name}" if logger
  queue = MQ.new.queue(queue_name, :auto_delete => true, :exclusive => !shared)

  logger.info "Binding queue to exchange: #{@exchange_name}" if logger
  queue.bind(@exchange_name, :key => opts[:key])

  return unless blk

  if blk.arity == 2
    # The subscription delivers (header, msg); the handler expects (msg, header).
    queue.subscribe { |header, msg| blk.call(msg, header) }
  else
    queue.subscribe { |_header, msg| blk.call(msg) }
  end
end