Class: Warren::Handler::Broadcast
Defined in: lib/warren/handler/broadcast.rb
Overview
Warren::Handler::Broadcast provides a connection pool of thread-safe RabbitMQ channels for broadcasting messages.
Defined Under Namespace
Classes: Channel
Instance Method Summary
- #<<(message) ⇒ Warren::Handler::Broadcast
  Borrows a RabbitMQ channel, sends a message, and immediately returns it again.
- #connect ⇒ true
  Opens a connection to the RabbitMQ server.
- #disconnect ⇒ true
  Closes the connection.
- #initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14) ⇒ Broadcast (constructor)
  Creates a warren but does not connect.
- #new_channel(worker_count: 1) ⇒ Object
- #with_channel {|A| ... } ⇒ void
  Yields a Channel, which is returned to the pool when the block exits.
Constructor Details
#initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14) ⇒ Broadcast
Creates a warren but does not connect.
# File 'lib/warren/handler/broadcast.rb', line 76
def initialize(exchange:, routing_key_prefix:, server: {}, pool_size: 14)
  super()
  @server = server
  @exchange_name = exchange
  @pool_size = pool_size
  @routing_key_prefix = routing_key_prefix
end
Instance Method Details
#<<(message) ⇒ Warren::Handler::Broadcast
Borrows a RabbitMQ channel, sends a message, and immediately returns it again. Useful if you only need to send one message.
# File 'lib/warren/handler/broadcast.rb', line 125
def <<(message)
  with_channel { |channel| channel << message }
  self
end
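Because #<< returns the handler itself, calls can be chained. The following is a minimal sketch of that behaviour, not Warren's own code: FakeChannel and FakeBroadcast are hypothetical stand-ins that record messages in memory instead of publishing to RabbitMQ, and the single shared channel replaces the real pool.

```ruby
# Hypothetical stand-in for a pooled channel: records messages instead of publishing.
class FakeChannel
  attr_reader :sent

  def initialize
    @sent = []
  end

  def <<(message)
    @sent << message
    self
  end
end

# Hypothetical stand-in for the handler, mirroring the documented
# #<< and #with_channel shape with one reusable channel.
class FakeBroadcast
  attr_reader :channel

  def initialize
    @channel = FakeChannel.new
  end

  def with_channel
    yield @channel
  end

  def <<(message)
    with_channel { |channel| channel << message }
    self # returning the handler is what makes chaining possible
  end
end

broadcast = FakeBroadcast.new
broadcast << 'first' << 'second'
```

Each << borrows and returns a channel separately, so for a large batch it is likely cheaper to hold one channel via #with_channel and send every message inside the block.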
#connect ⇒ true
Opens a connection to the RabbitMQ server. Will need to be re-initialized after forking.
# File 'lib/warren/handler/broadcast.rb', line 89
def connect
  reset_pool
  start_session
end
#disconnect ⇒ true
Closes the connection. Call before forking to avoid leaking connections.
# File 'lib/warren/handler/broadcast.rb', line 100
def disconnect
  close_session
end
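The forking guidance for #connect and #disconnect can be illustrated with a rough sketch. FakeHandler is a hypothetical stand-in that only tracks a connected flag; it does not talk to RabbitMQ, and the sketch assumes a platform where Kernel#fork is available.

```ruby
# Hypothetical stand-in that tracks connection state only; no RabbitMQ involved.
class FakeHandler
  def initialize
    @connected = false # like the real constructor, creating it does not connect
  end

  def connect
    @connected = true
  end

  def disconnect
    @connected = false
    true
  end

  def connected?
    @connected
  end
end

handler = FakeHandler.new
handler.connect
# ... the parent process would publish here ...
handler.disconnect # close before forking so the child inherits no open connection

reader, writer = IO.pipe
pid = fork do
  reader.close
  handler.connect # the child opens its own connection after the fork
  writer.puts(handler.connected?)
  writer.close
end
writer.close
Process.wait(pid)
child_state = reader.read.strip
reader.close
```

The parent's handler stays disconnected after the fork; only the child's copy reports a connection, which is the separation the documentation asks for.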
#new_channel(worker_count: 1) ⇒ Object
# File 'lib/warren/handler/broadcast.rb', line 130
def new_channel(worker_count: 1)
  Channel.new(session.create_channel(nil, worker_count),
              exchange: @exchange_name, routing_key_prefix: @routing_key_prefix)
end
#with_channel {|A| ... } ⇒ void
This method returns an undefined value.
Yields a Channel, which is returned to the pool when the block exits.
# File 'lib/warren/handler/broadcast.rb', line 111
def with_channel(&block)
  connection_pool.with(&block)
end
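The borrow-and-return behaviour that #with_channel delegates to its pool can be sketched with Ruby's thread-safe Queue. This is an illustration of the general pattern only, not the pool Warren uses: TinyPool is a hypothetical class, and plain arrays stand in for channels.

```ruby
# A tiny blocking pool built on Ruby's thread-safe Queue.
class TinyPool
  def initialize(resources)
    @available = Queue.new
    resources.each { |resource| @available << resource }
  end

  # Borrow a resource, yield it, and always hand it back,
  # even if the block raises.
  def with
    resource = @available.pop # blocks while every resource is checked out
    yield resource
  ensure
    @available << resource if resource
  end

  def available
    @available.size
  end
end

channels = Array.new(2) { [] } # plain arrays stand in for channels
pool = TinyPool.new(channels)

# Four threads share two "channels"; each waits its turn if none is free.
threads = 4.times.map do |i|
  Thread.new { pool.with { |channel| channel << "message #{i}" } }
end
threads.each(&:join)
```

Since a resource is held by at most one thread at a time, the block can use it without extra locking, and the ensure clause guarantees it goes back to the pool.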