Module: Tusk::Observable::PG
- Defined in:
- lib/tusk/observable/pg.rb
Overview
An observer implementation for PostgreSQL. This module requires that your class implement a `connection` method that returns a database connection for this module to use.
This observer works across processes.
Example:
    require 'pg'
    require 'tusk/observable/pg'

    class Timer
      include Tusk::Observable::PG

      def tick
        changed
        notify_observers
      end

      def connection
        Thread.current[:conn] ||= ::PG::Connection.new :dbname => 'postgres'
      end
    end

    class Listener
      def update
        puts "got update"
      end
    end

    timer = Timer.new

    fork do
      timer.add_observer Listener.new
      sleep # put the process to sleep so it doesn't exit
    end

    loop do
      timer.tick
      sleep 1
    end
Instance Attribute Summary
- #subscribers ⇒ Object (readonly)
  Returns the value of attribute subscribers.
Instance Method Summary
- #add_observer(object, func = :update) ⇒ Object
  Add object as an observer to this object.
- #changed(state = true) ⇒ Object
  Set the changed state of this object.
- #changed? ⇒ Boolean
  Returns true if this object’s state has been changed since the last call to #notify_observers.
- #count_observers ⇒ Object
  Returns the number of observers associated with this object *in the current process*.
- #delete_observer(o) ⇒ Object
  Remove the observer o so that it will no longer receive notifications.
- #delete_observers ⇒ Object
  Remove all observers associated with this object *in the current process*.
- #initialize(*args) ⇒ Object
- #notify_observers ⇒ Object
  If this object’s #changed? state is true, this method will notify observing objects.
Instance Attribute Details
#subscribers ⇒ Object (readonly)
Returns the value of attribute subscribers.
    # File 'lib/tusk/observable/pg.rb', line 62
    def subscribers
      @subscribers
    end
Instance Method Details
#add_observer(object, func = :update) ⇒ Object
Add object as an observer to this object. The object will receive a notification when #changed? returns true and #notify_observers is called. The method named by func (:update by default) is called on object when notifications are sent.
    # File 'lib/tusk/observable/pg.rb', line 115
    def add_observer object, func = :update
      @sub_lock.synchronize do
        subscribers.fetch(channel) { |k|
          Thread.new {
            start_listener
            unwrap(connection).exec "LISTEN #{channel}"
            @observing.release
          }
          subscribers[k] = {}
        }[object] = func
      end

      @observing.await
    end
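To illustrate the func parameter without a database, here is a minimal sketch. It assumes that delivering a notification amounts to calling the stored method name on each registered object with no arguments, as the Listener#update method in the example above suggests. DispatchSketch, RecordingListener, and deliver are hypothetical names for illustration and are not part of Tusk.

```ruby
# Hypothetical sketch, not Tusk code: a table of observer => method name,
# similar in shape to what add_observer stores for one channel.
class DispatchSketch
  def initialize
    @observers = {}
  end

  # Register `object`; `func` names the method to call on it later.
  def add_observer(object, func = :update)
    @observers[object] = func
  end

  # Assumed delivery step: call each registered method with no arguments.
  def deliver
    @observers.each { |object, func| object.send(func) }
  end
end

# Hypothetical observer that records which of its methods were called.
class RecordingListener
  attr_reader :calls

  def initialize
    @calls = []
  end

  def update
    @calls << :update
  end

  def refresh
    @calls << :refresh
  end
end

default_listener = RecordingListener.new
custom_listener = RecordingListener.new

table = DispatchSketch.new
table.add_observer default_listener            # falls back to :update
table.add_observer custom_listener, :refresh   # custom method name
table.deliver

p default_listener.calls  # prints [:update]
p custom_listener.calls   # prints [:refresh]
```

Registering the same object twice simply overwrites its method name, because the table is keyed by the object itself.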
#changed(state = true) ⇒ Object
Set the changed state of this object. Notifications will be sent only if the changed state is truthy.
    # File 'lib/tusk/observable/pg.rb', line 96
    def changed state = true
      @observer_state = state
    end
#changed? ⇒ Boolean
Returns true if this object’s state has been changed since the last call to #notify_observers.
    # File 'lib/tusk/observable/pg.rb', line 90
    def changed?
      @observer_state
    end
#count_observers ⇒ Object
Returns the number of observers associated with this object *in the current process*. If the object is observed across multiple processes, the returned count will not reflect the other processes.
    # File 'lib/tusk/observable/pg.rb', line 77
    def count_observers
      @sub_lock.synchronize { subscribers.fetch(channel, {}).length }
    end
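The per-process behaviour follows from the subscriber table living in ordinary process memory. The sketch below shows this with plain Ruby and fork (so it needs a platform that supports fork). LocalRegistry and counts_after_fork are hypothetical stand-ins for illustration, not Tusk code, and no PostgreSQL connection is used.

```ruby
# Hypothetical in-memory table keyed by channel name, mirroring the shape
# of `subscribers` (channel => { observer => method name }).
class LocalRegistry
  def initialize
    @lock = Mutex.new
    @subscribers = {}
  end

  def add(channel, object, func = :update)
    @lock.synchronize { (@subscribers[channel] ||= {})[object] = func }
  end

  def count(channel)
    @lock.synchronize { @subscribers.fetch(channel, {}).length }
  end
end

# Returns [count seen by the parent, count seen by the child].
def counts_after_fork
  registry = LocalRegistry.new
  registry.add("timer", Object.new)      # one observer before forking

  reader, writer = IO.pipe
  pid = fork do
    reader.close
    registry.add("timer", Object.new)    # only the child's copy changes
    writer.puts registry.count("timer")
    writer.close
    exit!(0)                             # skip at_exit handlers in the child
  end

  writer.close
  child_count = reader.gets.to_i
  reader.close
  Process.wait(pid)

  [registry.count("timer"), child_count]
end

parent_count, child_count = counts_after_fork
puts "parent sees #{parent_count}, child sees #{child_count}"
# prints "parent sees 1, child sees 2"
```

The child starts with a copy of the parent's table, so an observer added after the fork is counted only in the child.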
#delete_observer(o) ⇒ Object
Remove the observer o so that it will no longer receive notifications.
    # File 'lib/tusk/observable/pg.rb', line 131
    def delete_observer o
      @sub_lock.synchronize do
        subscribers.fetch(channel, {}).delete o
      end
    end
#delete_observers ⇒ Object
Remove all observers associated with this object *in the current process*. This method will not impact observers of this object in other processes.
    # File 'lib/tusk/observable/pg.rb', line 84
    def delete_observers
      @sub_lock.synchronize { subscribers.delete channel }
    end
#initialize(*args) ⇒ Object
    # File 'lib/tusk/observable/pg.rb', line 64
    def initialize *args
      super
      @sub_lock = Mutex.new
      @observer_state = false
      @subscribers = {}
      @_listener = nil
      @observing = Latch.new
    end
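The Latch created here is what #add_observer waits on (@observing.await) until the listener thread calls @observing.release. Its implementation is not shown on this page. The following is a minimal sketch of how a one-shot latch with that interface could be built from Mutex and ConditionVariable. SketchLatch and released? are hypothetical, and the real Latch class may work differently.

```ruby
# Hypothetical one-shot latch; an illustration only, not Tusk's Latch.
class SketchLatch
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @released = false
  end

  # Open the latch and wake every thread blocked in #await.
  def release
    @lock.synchronize do
      @released = true
      @cond.broadcast
    end
  end

  # Block until the latch is open; return at once if it already is.
  def await
    @lock.synchronize do
      @cond.wait(@lock) until @released
    end
    true
  end

  def released?
    @lock.synchronize { @released }
  end
end

latch = SketchLatch.new
events = []

# Stands in for the background thread that issues LISTEN and then releases.
listener = Thread.new do
  events << :listening
  latch.release
end

latch.await                 # stands in for the end of add_observer
events << :add_observer_returns
listener.join

p events  # prints [:listening, :add_observer_returns]
```

Because the flag stays set after the first release, later calls to await return immediately, which matches a listener that is started only once per channel.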
#notify_observers ⇒ Object
If this object’s #changed? state is true, this method will notify observing objects.
    # File 'lib/tusk/observable/pg.rb', line 102
    def notify_observers
      return unless changed?

      unwrap(connection).exec "NOTIFY #{channel}"

      changed false
    end
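The gate formed by #changed, #changed?, and #notify_observers can be tried without PostgreSQL. The sketch below copies the three method bodies into a hypothetical GateSketch class and swaps the database connection for a hypothetical RecordingConnection that only stores the SQL it is given. The channel name is passed in explicitly here, since the page does not show how Tusk derives `channel`, and the `unwrap` step is left out.

```ruby
# Hypothetical connection that records SQL instead of talking to PostgreSQL.
class RecordingConnection
  attr_reader :statements

  def initialize
    @statements = []
  end

  def exec(sql)
    @statements << sql
  end
end

# Hypothetical class reproducing only the changed / notify_observers gate.
class GateSketch
  attr_reader :connection

  def initialize(channel)
    @channel = channel              # assumption: supplied by the caller
    @observer_state = false
    @connection = RecordingConnection.new
  end

  def changed(state = true)
    @observer_state = state
  end

  def changed?
    @observer_state
  end

  def notify_observers
    return unless changed?

    connection.exec "NOTIFY #{@channel}"

    changed false
  end
end

timer = GateSketch.new("timer_channel")
timer.notify_observers          # ignored: nothing has changed yet
timer.changed
timer.notify_observers          # records one NOTIFY and resets the flag
timer.notify_observers          # ignored again until changed is called
p timer.connection.statements   # prints ["NOTIFY timer_channel"]
```

Calling changed before every notify_observers, as Timer#tick does in the example, is what lets each tick through.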