Class: Warren::Handler::Broadcast

Inherits:
Base
  • Object
Defined in:
lib/warren/handler/broadcast.rb

Overview

Warren::Handler::Broadcast provides a connection pool of thread-safe RabbitMQ channels for broadcasting messages.

Defined Under Namespace

Classes: Channel

Instance Method Summary

Constructor Details

#initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14) ⇒ Broadcast

Creates a warren but does not connect.

Parameters:

  • server (Hash) (defaults to: {})

    Server config options passed straight to Bunny

  • exchange (String)

    The name of the exchange to connect to

  • pool_size (Integer) (defaults to: 14)

    The connection pool size

  • routing_key_prefix (String, nil)

    The prefix placed before the routing key. Can be used to keep environments distinct.



# File 'lib/warren/handler/broadcast.rb', line 76

def initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14)
  super()
  @server = server
  @exchange_name = exchange
  @pool_size = pool_size
  @routing_key_prefix = routing_key_prefix
end
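
As a sketch of what the signature above implies for callers: `exchange:` and `routing_key_prefix:` are required keywords, so a caller who wants no prefix still has to pass `routing_key_prefix: nil` explicitly. `ConfigSketch` below is a hypothetical stand-in that only mirrors the constructor shown above; it is not part of Warren and never talks to RabbitMQ. The exchange name, prefix, and server options are illustrative values.

```ruby
# Hypothetical stand-in mirroring the documented constructor signature.
# It only stores its arguments, just as the real constructor does not connect.
class ConfigSketch
  attr_reader :exchange_name, :routing_key_prefix, :server, :pool_size

  def initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14)
    @server = server
    @exchange_name = exchange
    @pool_size = pool_size
    @routing_key_prefix = routing_key_prefix
  end
end

# Relying on the defaults: empty server options and a pool of 14.
defaults = ConfigSketch.new(exchange: 'events', routing_key_prefix: nil)

# Overriding everything (values here are illustrative only).
custom = ConfigSketch.new(
  exchange: 'events',
  routing_key_prefix: 'staging',
  server: { host: 'localhost' },
  pool_size: 5
)

# Omitting a required keyword raises ArgumentError.
missing_prefix_error =
  begin
    ConfigSketch.new(exchange: 'events')
    nil
  rescue ArgumentError => e
    e.message
  end
```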

Instance Method Details

#<<(message) ⇒ Warren::Handler::Broadcast

Borrows a RabbitMQ channel from the pool, sends a message, and immediately returns the channel to the pool. Useful if you only need to send one message.

Parameters:

  • message (Warren::Message)

    The message to broadcast. Must respond to #routing_key and #payload

Returns:

  • (Warren::Handler::Broadcast)

    Returns itself to allow chaining. When sending several messages, #with_channel is probably the better choice, since each chained #<< borrows a channel separately.



# File 'lib/warren/handler/broadcast.rb', line 125

def <<(message)
  with_channel { |channel| channel << message }
  self
end
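
The difference between chaining `<<` and batching with `#with_channel` can be sketched as follows. `ChainSketch`, `RecordingChannel`, and `SketchMessage` are hypothetical stand-ins, not Warren classes: the stand-in channel records messages instead of publishing them, and a counter tracks how often a channel is borrowed. The `<<` method body is copied from the source above.

```ruby
# Hypothetical message type; responds to #routing_key and #payload.
SketchMessage = Struct.new(:routing_key, :payload)

# Hypothetical channel that records messages instead of publishing them.
class RecordingChannel
  attr_reader :sent

  def initialize
    @sent = []
  end

  def <<(message)
    @sent << [message.routing_key, message.payload]
    self
  end
end

# Hypothetical handler with a single channel and a checkout counter.
class ChainSketch
  attr_reader :channel, :checkouts

  def initialize
    @channel = RecordingChannel.new
    @checkouts = 0
  end

  def with_channel
    @checkouts += 1
    yield @channel
  end

  # Same body as the documented #<<.
  def <<(message)
    with_channel { |channel| channel << message }
    self
  end
end

broadcast = ChainSketch.new

# Chaining: each << borrows a channel separately (two checkouts).
broadcast << SketchMessage.new('sample.created', '{"id":1}') \
          << SketchMessage.new('sample.updated', '{"id":1}')

# Batching: one checkout covers both messages.
broadcast.with_channel do |channel|
  channel << SketchMessage.new('sample.created', '{"id":2}')
  channel << SketchMessage.new('sample.updated', '{"id":2}')
end
```

In this sketch, four messages are recorded after three checkouts: two for the chained calls and one for the batch.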

#connect ⇒ true

Opens a connection to the RabbitMQ server. The connection needs to be re-established after forking.

Returns:

  • (true)

    We’ve connected!



# File 'lib/warren/handler/broadcast.rb', line 89

def connect
  reset_pool
  start_session
end

#disconnect ⇒ true

Closes the connection. Call before forking to avoid leaking connections.

Returns:

  • (true)

    We’ve disconnected



# File 'lib/warren/handler/broadcast.rb', line 100

def disconnect
  close_session
end
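
The fork-related advice for #connect and #disconnect suggests a pattern along these lines: disconnect in the parent before forking, then connect again in each process that needs to broadcast. This is a minimal sketch under stated assumptions; `ForkSketch` is a hypothetical stand-in that only tracks a connected flag, and the example assumes a platform where `Kernel#fork` is available.

```ruby
# Hypothetical stand-in exposing the same connect/disconnect interface.
class ForkSketch
  def initialize
    @connected = false
  end

  def connect
    @connected = true
  end

  def disconnect
    @connected = false
    true
  end

  def connected?
    @connected
  end
end

handler = ForkSketch.new
handler.connect

# Close the connection before forking so the child inherits nothing open.
handler.disconnect

pid = fork do
  inherited_open = handler.connected?
  handler.connect # the child opens its own connection
  # exit! skips at_exit handlers inherited from the parent process.
  exit!(!inherited_open && handler.connected? ? 0 : 1)
end

# The parent re-opens its own connection once the fork has happened.
handler.connect

Process.wait(pid)
child_ok = $?.success?
```

In a pre-forking application server, the same two calls would typically live in whatever before-fork and after-fork hooks that server provides.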

#new_channel(worker_count: 1) ⇒ Object



# File 'lib/warren/handler/broadcast.rb', line 130

def new_channel(worker_count: 1)
  Channel.new(session.create_channel(nil, worker_count), exchange: @exchange_name,
                                                         routing_key_prefix: @routing_key_prefix)
end

#with_channel {|A| ... } ⇒ void

This method returns an undefined value.

Yields a Channel, which is returned to the pool when the block exits.

Yield Parameters:



# File 'lib/warren/handler/broadcast.rb', line 111

def with_channel(&block)
  connection_pool.with(&block)
end
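
The checkout-and-return behaviour described for #with_channel can be illustrated with a small pool built on Ruby's core `Queue`. This is only a sketch of the general technique: `TinyPool` is a hypothetical stand-in, the strings stand in for channels, and the real handler delegates to its own `connection_pool` object, whose implementation is not shown here.

```ruby
# Hypothetical fixed-size pool: resources are held in a thread-safe Queue.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    size.times { |index| @queue << yield(index) }
  end

  # Borrows a resource, yields it, and always hands it back,
  # even if the block raises.
  def with
    resource = @queue.pop
    yield resource
  ensure
    @queue << resource if resource
  end

  def available
    @queue.size
  end
end

pool = TinyPool.new(2) { |index| "channel-#{index}" }

available_inside = nil
pool.with do |channel|
  # While the block runs, the borrowed channel is out of the pool.
  available_inside = pool.available
end
available_after = pool.available

# An exception inside the block still returns the channel to the pool.
begin
  pool.with { |_channel| raise 'publish failed' }
rescue RuntimeError
  # swallowed for the purposes of the sketch
end
available_after_error = pool.available
```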