Module: Tusk::Observable::Redis

Defined in:
lib/tusk/observable/redis.rb

Overview

An observer implementation for Redis. This module requires that your class implement a ‘connection` method that returns a redis connection that this module can use.

This observer works across processes.

Example:

require 'redis'
require 'tusk/observable/redis'

class Timer
  include Tusk::Observable::Redis

  def tick
    changed
    notify_observers
  end

  def connection
    Thread.current[:conn] ||= ::Redis.new
  end
end

class Listener
  def update
    puts "got update"
  end
end

timer = Timer.new

fork do
  timer.add_observer Listener.new
  sleep # put the process to sleep so it doesn't exit
end

loop do
  timer.tick
  sleep 1
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#control_channelObject (readonly)

Returns the value of attribute control_channel.



62
63
64
# File 'lib/tusk/observable/redis.rb', line 62

def control_channel
  @control_channel
end

#subscribersObject (readonly)

Returns the value of attribute subscribers.



62
63
64
# File 'lib/tusk/observable/redis.rb', line 62

def subscribers
  @subscribers
end

Class Method Details

.extended(klass) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/tusk/observable/redis.rb', line 50

def self.extended klass
  super

  klass.instance_eval do
    @sub_lock        = Mutex.new
    @observer_state  = false
    @subscribers     = {}
    @_listener       = nil
    @control_channel = SecureRandom.hex
  end
end

Instance Method Details

#add_observer(object, func = :update) ⇒ Object

Add observer as an observer to this object. The object will receive a notification when #changed? returns true and #notify_observers is called.

func method is called on object when notifications are sent.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/tusk/observable/redis.rb', line 114

def add_observer object, func = :update
  observer_set = Latch.new
  observing    = Latch.new

  @sub_lock.synchronize do
    observing.release if subscribers.key? channel

    subscribers.fetch(channel) { |k|
      Thread.new {
        observer_set.await
        start_listener(observing)
      }
      subscribers[k] = {}
    }[object] = func
  end

  observer_set.release
  observing.await
end

#changed(state = true) ⇒ Object

Set the changed state of this object. Notifications will be sent only if the changed state is a truthy object.



97
98
99
# File 'lib/tusk/observable/redis.rb', line 97

def changed state = true
  @observer_state = state
end

#changed?Boolean

Returns true if this object’s state has been changed since the last call to #notify_observers.

Returns:

  • (Boolean)


91
92
93
# File 'lib/tusk/observable/redis.rb', line 91

def changed?
  @observer_state
end

#count_observersObject

Returns the number of observers associated with this object *in the current process*. If the object is observed across multiple processes, the returned count will not reflect the other processes.



77
78
79
# File 'lib/tusk/observable/redis.rb', line 77

def count_observers
  @sub_lock.synchronize { subscribers.fetch(channel, {}).length }
end

#delete_observer(o) ⇒ Object

Remove observer so that it will no longer receive notifications.



135
136
137
138
139
140
141
142
143
# File 'lib/tusk/observable/redis.rb', line 135

def delete_observer o
  @sub_lock.synchronize do
    subscribers.fetch(channel, {}).delete o
    if subscribers.fetch(channel,{}).empty?
      subscribers.delete channel
      connection.publish control_channel, 'quit'
    end
  end
end

#delete_observersObject

Remove all observers associated with this object *in the current process*. This method will not impact observers of this object in other processes.



84
85
86
87
# File 'lib/tusk/observable/redis.rb', line 84

def delete_observers
  @sub_lock.synchronize { subscribers.delete channel }
  connection.publish control_channel, 'quit'
end

#initialize(*args) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/tusk/observable/redis.rb', line 64

def initialize *args
  super

  @sub_lock        = Mutex.new
  @observer_state  = false
  @subscribers     = {}
  @_listener       = nil
  @control_channel = SecureRandom.hex
end

#notify_observers(*args) ⇒ Object

If this object’s #changed? state is true, this method will notify observing objects.



103
104
105
106
107
# File 'lib/tusk/observable/redis.rb', line 103

def notify_observers(*args)
  return unless changed?
  connection.publish channel, payload_coder.dump(args)
  changed false
end