Class: Volt::MessageBus::Redis

Inherits:
BaseMessageBus
  • Object
show all
Defined in:
lib/volt/message_bus/redis_message_bus.rb

Defined Under Namespace

Classes: Subscription

Instance Method Summary collapse

Constructor Details

#initialize(volt_app) ⇒ Redis

Returns a new instance of Redis.



17
18
19
# File 'lib/volt/message_bus/redis_message_bus.rb', line 17

# Builds the Redis-backed message bus and opens the connection that
# #publish sends through.
#
# @param volt_app presumably the running Volt application (TODO confirm);
#   it is not referenced anywhere in this constructor.
def initialize(volt_app)
  # The connection settings are resolved by #new_connection.
  @redis = new_connection
end

Instance Method Details

#disconnect! ⇒ Object

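Intended to wait for all messages to be flushed and then close the connections. Not yet implemented for the Redis bus: the method currently always raises "Not implemented".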



57
58
59
# File 'lib/volt/message_bus/redis_message_bus.rb', line 57

# Placeholder for the bus shutdown hook. The Redis bus does not implement
# it yet, so every call fails.
#
# @raise [RuntimeError] always, with the message "Not implemented"
def disconnect!
  raise RuntimeError, "Not implemented"
end

#new_connection ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/volt/message_bus/redis_message_bus.rb', line 45

# Opens a new Redis client. The settings are chosen in this order:
#
# 1. the connect_options of Volt.config.message_bus, when both are set;
# 2. the REDIS_URL environment variable, passed as the :url option;
# 3. no arguments at all, leaving everything to the client's defaults.
#
# @return [::Redis] a freshly created client
def new_connection
  bus_config = Volt.config.message_bus
  explicit_opts = bus_config && bus_config.connect_options
  return ::Redis.new(explicit_opts) if explicit_opts

  redis_url = ENV['REDIS_URL']
  return ::Redis.new(url: redis_url) if redis_url

  ::Redis.new
end

#publish(channel_name, message) ⇒ Object

Publishes the message to every subscriber of the channel within the Volt cluster.



41
42
43
# File 'lib/volt/message_bus/redis_message_bus.rb', line 41

# Sends +message+ on the given channel through the bus's own Redis
# connection (@redis).
#
# @param channel_name [#to_sym] channel to publish on; converted to a Symbol
# @param message the payload, handed to the Redis client unchanged
# @return whatever the Redis client's #publish call returns
def publish(channel_name, message)
  channel = channel_name.to_sym
  @redis.publish(channel, message)
end

#subscribe(channel_name, &block) ⇒ Object

Subscribe should return an object that you can call .remove on to stop the subscription.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/volt/message_bus/redis_message_bus.rb', line 23

# Registers +block+ as the handler for messages arriving on +channel_name+.
#
# Each call opens its own connection through #new_connection and runs the
# Redis subscribe call on a newly started thread.
#
# @param channel_name [#to_sym] channel to listen on; converted to a Symbol
# @yieldparam message the payload of each received message (the channel the
#   message arrived on is not passed to the block)
# @return [Subscription] wrapper built around the dedicated connection
# @raise [ArgumentError] if no block is given
def subscribe(channel_name, &block)
  # Fail fast: without a handler the connection and thread would still be
  # created, and the error would only surface inside the background thread
  # when the first message arrived.
  raise ArgumentError, 'subscribe requires a block' unless block

  sub_redis = new_connection

  Thread.new do
    # Since the Redis driver does not have a connection pool, we create a
    # new connection each time we subscribe.
    # Note: internally volt does only 1 subscription.
    sub_redis.subscribe(channel_name.to_sym) do |on|
      # The channel argument is unused; the underscore name avoids shadowing
      # the method's own channel_name parameter.
      on.message do |_channel, message|
        block.call(message)
      end
    end
  end

  Subscription.new(sub_redis)
end