Module: Tusk::Observable::Redis
- Defined in:
- lib/tusk/observable/redis.rb
Overview
An observer implementation for Redis. This module requires that your class implement a ‘connection` method that returns a redis connection that this module can use.
This observer works across processes.
Example:
require 'redis'
require 'tusk/observable/redis'
class Timer
include Tusk::Observable::Redis
def tick
changed
notify_observers
end
def connection
Thread.current[:conn] ||= ::Redis.new
end
end
class Listener
def update
puts "got update"
end
end
timer = Timer.new
fork do
timer.add_observer Listener.new
sleep # put the process to sleep so it doesn't exit
end
loop do
timer.tick
sleep 1
end
Instance Attribute Summary collapse
-
#control_channel ⇒ Object
readonly
Returns the value of attribute control_channel.
-
#subscribers ⇒ Object
readonly
Returns the value of attribute subscribers.
Class Method Summary collapse
Instance Method Summary collapse
-
#add_observer(object, func = :update) ⇒ Object
Add
observer
as an observer to this object. -
#changed(state = true) ⇒ Object
Set the changed state of this object.
-
#changed? ⇒ Boolean
Returns true if this object’s state has been changed since the last call to #notify_observers.
-
#count_observers ⇒ Object
Returns the number of observers associated with this object *in the current process*.
-
#delete_observer(o) ⇒ Object
Remove
observer
so that it will no longer receive notifications. -
#delete_observers ⇒ Object
Remove all observers associated with this object *in the current process*.
- #initialize(*args) ⇒ Object
-
#notify_observers(*args) ⇒ Object
If this object’s #changed? state is true, this method will notify observing objects.
Instance Attribute Details
#control_channel ⇒ Object (readonly)
Returns the value of attribute control_channel.
62 63 64 |
# File 'lib/tusk/observable/redis.rb', line 62 def control_channel @control_channel end |
#subscribers ⇒ Object (readonly)
Returns the value of attribute subscribers.
62 63 64 |
# File 'lib/tusk/observable/redis.rb', line 62 def subscribers @subscribers end |
Class Method Details
.extended(klass) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/tusk/observable/redis.rb', line 50 def self.extended klass super klass.instance_eval do @sub_lock = Mutex.new @observer_state = false @subscribers = {} @_listener = nil @control_channel = SecureRandom.hex end end |
Instance Method Details
#add_observer(object, func = :update) ⇒ Object
Add observer
as an observer to this object. The object
will receive a notification when #changed? returns true and #notify_observers is called.
func
method is called on object
when notifications are sent.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/tusk/observable/redis.rb', line 114 def add_observer object, func = :update observer_set = Latch.new observing = Latch.new @sub_lock.synchronize do observing.release if subscribers.key? channel subscribers.fetch(channel) { |k| Thread.new { observer_set.await start_listener(observing) } subscribers[k] = {} }[object] = func end observer_set.release observing.await end |
#changed(state = true) ⇒ Object
Set the changed state of this object. Notifications will be sent only if the changed state
is a truthy object.
97 98 99 |
# File 'lib/tusk/observable/redis.rb', line 97 def changed state = true @observer_state = state end |
#changed? ⇒ Boolean
Returns true if this object’s state has been changed since the last call to #notify_observers.
91 92 93 |
# File 'lib/tusk/observable/redis.rb', line 91 def changed? @observer_state end |
#count_observers ⇒ Object
Returns the number of observers associated with this object *in the current process*. If the object is observed across multiple processes, the returned count will not reflect the other processes.
77 78 79 |
# File 'lib/tusk/observable/redis.rb', line 77 def count_observers @sub_lock.synchronize { subscribers.fetch(channel, {}).length } end |
#delete_observer(o) ⇒ Object
Remove observer
so that it will no longer receive notifications.
135 136 137 138 139 140 141 142 143 |
# File 'lib/tusk/observable/redis.rb', line 135 def delete_observer o @sub_lock.synchronize do subscribers.fetch(channel, {}).delete o if subscribers.fetch(channel,{}).empty? subscribers.delete channel connection.publish control_channel, 'quit' end end end |
#delete_observers ⇒ Object
Remove all observers associated with this object *in the current process*. This method will not impact observers of this object in other processes.
84 85 86 87 |
# File 'lib/tusk/observable/redis.rb', line 84 def delete_observers @sub_lock.synchronize { subscribers.delete channel } connection.publish control_channel, 'quit' end |
#initialize(*args) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/tusk/observable/redis.rb', line 64 def initialize *args super @sub_lock = Mutex.new @observer_state = false @subscribers = {} @_listener = nil @control_channel = SecureRandom.hex end |
#notify_observers(*args) ⇒ Object
If this object’s #changed? state is true, this method will notify observing objects.
103 104 105 106 107 |
# File 'lib/tusk/observable/redis.rb', line 103 def notify_observers(*args) return unless changed? connection.publish channel, payload_coder.dump(args) changed false end |