Class: Powcloud::Insurance::Comm::AmqpBroadcastChannel

Inherits:
Powcloud::Insurance::Channel show all
Includes:
Logger
Defined in:
lib/powcloud/insurance/comm/amqp_broadcast_channel.rb

Constant Summary collapse

@@settings =

AMQP host to connect to

{:amqp_host => '', # AMQP host to connect to
:queue_prefix => 'insurance'}

Instance Attribute Summary

Attributes included from Logger

#logger

Instance Method Summary collapse

Methods included from Logger

#init_child_logger, #init_logger, #log_exception, outputters, outputters=

Methods inherited from Powcloud::Insurance::Channel

close_connections

Constructor Details

#initialize(name, logger = nil) ⇒ AmqpBroadcastChannel

Returns a new instance of AmqpBroadcastChannel.



19
20
21
22
23
24
25
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 19

# Builds a broadcast channel for the given name.
#
# name   - channel name; stored as-is and also used to derive the topic
#          exchange name ("<name>.topic").
# logger - optional parent logger. When given, a child logger is set up via
#          init_child_logger; when omitted, no logger initialisation is done here.
#
# Finishes by calling init.
def initialize(name, logger = nil)
  @name = name
  # Interpolation already converts name with to_s.
  @exchange_name = "#{name}.topic"

  init_child_logger(logger, name) if logger

  init
end

Instance Method Details

#init ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 27

# Starts AMQP when AMQP.conn is not set, then declares this channel's topic
# exchange and stores it in @exchange.
#
# NOTE(review): the connection options are read from @@settings[:amqp_opts];
# confirm where that key is populated.
def init
  unless AMQP.conn
    connection_opts = @@settings[:amqp_opts]
    # Used only for the log line below; falls back to a placeholder when no
    # host option is present.
    target_host = (connection_opts && connection_opts[:host]) || '<default>'
    logger.warn("Connecting to #{target_host}") if logger
    AMQP.start(connection_opts)
  end

  logger.warn("Initializing topic exchange: #{@exchange_name}") if logger

  # NOTE: a new MQ instance is required for environments like rails.
  @exchange = MQ.new.topic(@exchange_name)
end

#publish(message, opts = {}) ⇒ Object

message - string to publish; opts - any AMQP Exchange#publish options



71
72
73
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 71

# Publishes a message on this channel's exchange.
#
# message - the message; converted with to_s before being published.
# opts    - options hash handed unchanged to the exchange's publish call.
#
# Returns whatever the exchange's publish call returns.
def publish(message, opts = {})
  payload = message.to_s
  @exchange.publish(payload, opts)
end

#receive(opts = {}, &blk) ⇒ Object

opts :round_robin - each message will be consumed by only one receiver, distributed in round robin manner.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/powcloud/insurance/comm/amqp_broadcast_channel.rb', line 40

# Creates a queue, binds it to this channel's topic exchange and, when a block
# is given, subscribes the block to the queue.
#
# opts:
#   :round_robin - when truthy, the queue is named after the channel and is
#                  declared non-exclusive, so each message is consumed by
#                  only one receiver. Otherwise a queue with a randomised
#                  name is declared exclusive.
#   :key         - passed as the :key option of the queue binding.
#
# The block may take one argument (msg) or two (msg, header). Without a block
# the queue is still declared and bound, but nothing is subscribed.
#
# Returns the result of the queue's subscribe call, or nil without a block.
#
# Raises ArgumentError when the block takes any other number of arguments.
# The check runs before anything is declared, so a rejected call does not
# leave behind a bound queue with no subscriber.
def receive(opts = {}, &blk)
  if blk && ![1, 2].include?(blk.arity)
    raise ArgumentError, "Wrong number of arguments for receive block"
  end

  if opts[:round_robin]
    queue_name = @name # Shared.
  else
    # The random suffix gives this receiver its own queue.
    queue_name = "#{@@settings[:queue_prefix]}.#{@name}.#{rand(1000000)}"
  end

  logger.warn "Creating queue: #{queue_name}" if logger
  queue = MQ.new.queue(queue_name, :auto_delete => true, :exclusive => !opts[:round_robin])

  logger.info "Binding queue to exchange: #{@exchange_name}" if logger
  queue.bind(@exchange_name, :key => opts[:key])

  return nil unless blk

  if blk.arity == 2
    # The subscription delivers (header, msg); the caller's block gets (msg, header).
    queue.subscribe { |header, msg| blk.call(msg, header) }
  else
    queue.subscribe { |_header, msg| blk.call(msg) }
  end
end