Class: Volt::MessageBus::Redis
- Inherits: BaseMessageBus
- Inheritance chain: Object > BaseMessageBus > Volt::MessageBus::Redis
- Defined in: lib/volt/message_bus/redis_message_bus.rb
Defined Under Namespace
Classes: Subscription
Instance Method Summary
-
#disconnect! ⇒ Object
Waits for all messages to be flushed and closes connections.
-
#initialize(volt_app) ⇒ Redis
Constructor. Returns a new instance of Redis.
-
#new_connection ⇒ Object
Creates a new Redis connection.
-
#publish(channel_name, message) ⇒ Object
Publishes a message to all subscribers of the channel within the Volt cluster.
-
#subscribe(channel_name, &block) ⇒ Object
Subscribes to a channel and returns an object that you can call .remove on to stop the subscription.
Constructor Details
#initialize(volt_app) ⇒ Redis
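Returns a new instance of Redis. The constructor opens a Redis connection through #new_connection and stores it in @redis; the volt_app argument is not used in the method body.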
# File 'lib/volt/message_bus/redis_message_bus.rb', line 17
def initialize(volt_app)
  @redis = new_connection
end
Instance Method Details
#disconnect! ⇒ Object
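Waits for all messages to be flushed and closes connections. This class does not implement the behavior yet: calling the method raises a RuntimeError with the message "Not implemented".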
# File 'lib/volt/message_bus/redis_message_bus.rb', line 57
def disconnect!
  raise "Not implemented"
end
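#new_connection ⇒ Object
Creates a new Redis client. Connection settings are resolved in order: options read from Volt.config, then the REDIS_URL environment variable, then the Redis client defaults.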
# File 'lib/volt/message_bus/redis_message_bus.rb', line 45
def new_connection
  msg_bus = Volt.config.
  if msg_bus && (opts = msg_bus.)
    ::Redis.new(opts)
  elsif ENV['REDIS_URL']
    ::Redis.new(url: ENV["REDIS_URL"])
  else
    ::Redis.new
  end
end
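The fallback order in new_connection can be sketched as a pure function that opens no connection. This is a minimal illustration rather than Volt's own code: resolve_redis_options is a hypothetical helper, and the configured options are assumed to arrive as a plain hash (or nil) instead of being read from Volt.config.

```ruby
# Hypothetical helper (not part of Volt): mirrors the fallback order used by
# new_connection and returns the options hash that would go to ::Redis.new.
def resolve_redis_options(config_opts, env = ENV)
  if config_opts
    # 1. Explicitly configured options take priority.
    config_opts
  elsif env['REDIS_URL']
    # 2. Otherwise fall back to the REDIS_URL environment variable.
    { url: env['REDIS_URL'] }
  else
    # 3. Otherwise pass no options so the client uses its defaults.
    {}
  end
end

# Usage: the second argument stands in for ENV so the example is repeatable.
from_config  = resolve_redis_options({ host: '10.0.0.5' }, {})
from_env     = resolve_redis_options(nil, { 'REDIS_URL' => 'redis://localhost:6379/1' })
from_default = resolve_redis_options(nil, {})
p from_config, from_env, from_default
```

Passing the environment as a parameter is only a convenience for testing the order of precedence; the documented method reads ENV directly.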
#publish(channel_name, message) ⇒ Object
Publishes a message to all subscribers of the channel within the Volt cluster.
# File 'lib/volt/message_bus/redis_message_bus.rb', line 41
def publish(channel_name, message)
  @redis.publish(channel_name.to_sym, message)
end
#subscribe(channel_name, &block) ⇒ Object
Subscribes to a channel and returns a Subscription object that you can call .remove on to stop the subscription. Each call opens its own Redis connection and listens on a separate thread.
# File 'lib/volt/message_bus/redis_message_bus.rb', line 23
def subscribe(channel_name, &block)
  sub_redis = new_connection
  Thread.new do
    # Since the Redis driver does not have a connection pool, we create a
    # new connection each time we subscribe.
    # Note: internally volt does only 1 subscription.
    sub_redis.subscribe(channel_name.to_sym) do |on|
      on.message do |channel_name, message|
        block.call(message)
      end
    end
  end
  Subscription.new(sub_redis)
end
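The subscribe/publish/remove contract described above can be tried without a Redis server using a small stand-in. The sketch below is an assumption-laden illustration, not Volt's implementation: InMemoryBus and its nested Subscription are hypothetical classes, delivery is synchronous and limited to a single process, and no threads or network connections are involved.

```ruby
# Hypothetical in-memory bus (not part of Volt) with the same call shape as
# Volt::MessageBus::Redis: subscribe returns an object that responds to remove.
class InMemoryBus
  # Handle returned by subscribe; remove detaches the callback.
  class Subscription
    def initialize(callbacks, callback)
      @callbacks = callbacks
      @callback = callback
    end

    def remove
      @callbacks.delete(@callback)
    end
  end

  def initialize
    # Maps a channel name (as a symbol) to the blocks subscribed to it.
    @channels = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel_name, &block)
    callbacks = @channels[channel_name.to_sym]
    callbacks << block
    Subscription.new(callbacks, block)
  end

  def publish(channel_name, message)
    # Iterate over a copy so a callback may remove itself while running.
    @channels[channel_name.to_sym].dup.each { |callback| callback.call(message) }
  end
end

bus = InMemoryBus.new
received = []
subscription = bus.subscribe('chat') { |message| received << message }

bus.publish(:chat, 'hello')   # delivered to the block above
subscription.remove           # stop the subscription
bus.publish(:chat, 'ignored') # no subscribers remain
p received                    # prints ["hello"]
```

As in the documented methods, channel names are normalized with to_sym, so 'chat' and :chat refer to the same channel.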