Module: Tusk::Observable::PG

Defined in:
lib/tusk/observable/pg.rb

Overview

An observer implementation for PostgreSQL. This module requires that your class implement a `connection` method that returns a database connection that this module can use.

This observer works across processes.

Example:

require 'pg'
require 'tusk/observable/pg'

class Timer
  include Tusk::Observable::PG

  def tick
    changed
    notify_observers
  end

  def connection
    Thread.current[:conn] ||= ::PG::Connection.new :dbname => 'postgres'
  end
end

class Listener
  def update
    puts "got update"
  end
end

timer = Timer.new

fork do
  timer.add_observer Listener.new
  sleep # put the process to sleep so it doesn't exit
end

loop do
  timer.tick
  sleep 1
end

Instance Attribute Details

#subscribers ⇒ Object (readonly)

Returns the value of attribute subscribers.



# File 'lib/tusk/observable/pg.rb', line 62

def subscribers
  @subscribers
end

Class Method Details

.extended(klass) ⇒ Object

Called when an object extends this module. Sets up the same observer state on the extending object that #initialize sets up for instances of an including class.



# File 'lib/tusk/observable/pg.rb', line 50

def self.extended klass
  super

  klass.instance_eval do
    @sub_lock       = Mutex.new
    @observer_state = false
    @subscribers    = {}
    @_listener      = nil
    @observing      = Latch.new
  end
end

Instance Method Details

#add_observer(object, func = :update) ⇒ Object

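Add object as an observer of this object. The observer will receive a notification when #changed? returns true and #notify_observers is called.

The method named by func (:update by default) is called on object when notifications are sent. The call blocks until the listener thread has issued LISTEN on this object's channel.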



# File 'lib/tusk/observable/pg.rb', line 115

def add_observer object, func = :update
  @sub_lock.synchronize do
    subscribers.fetch(channel) { |k|
      Thread.new {
        start_listener
        unwrap(connection).exec "LISTEN #{channel}"
        @observing.release
      }
      subscribers[k] = {}
    }[object] = func
  end

  @observing.await
end
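
The source above waits on @observing.await until the listener thread has run LISTEN and called @observing.release. The Latch class itself is not shown on this page, so the following is only a minimal sketch of a one-shot latch with that behavior. SketchLatch and the events array are hypothetical names used for illustration, not part of Tusk.

```ruby
# Hypothetical one-shot latch; Tusk's own Latch class is not shown on this page.
class SketchLatch
  def initialize
    @lock     = Mutex.new
    @cond     = ConditionVariable.new
    @released = false
  end

  # Open the gate and wake every thread blocked in #await.
  def release
    @lock.synchronize do
      @released = true
      @cond.broadcast
    end
  end

  # Block until #release has been called; return immediately afterwards.
  def await
    @lock.synchronize do
      @cond.wait(@lock) until @released
    end
  end
end

latch  = SketchLatch.new
events = []

worker = Thread.new do
  events << :listening # stands in for exec "LISTEN ..."
  latch.release
end

latch.await                      # returns only after the worker has released
events << :add_observer_returned
worker.join
```

Under these assumptions, the caller cannot proceed until the subscription step has completed, which is presumably why add_observer waits on the latch before returning.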

#changed(state = true) ⇒ Object

Set the changed state of this object. Notifications will be sent only if the changed state is a truthy object.



# File 'lib/tusk/observable/pg.rb', line 96

def changed state = true
  @observer_state = state
end

#changed? ⇒ Boolean

Returns true if this object’s state has been changed since the last call to #notify_observers.

Returns:

  • (Boolean)


# File 'lib/tusk/observable/pg.rb', line 90

def changed?
  @observer_state
end

#count_observers ⇒ Object

Returns the number of observers associated with this object *in the current process*. If the object is observed across multiple processes, the returned count will not reflect the other processes.



# File 'lib/tusk/observable/pg.rb', line 77

def count_observers
  @sub_lock.synchronize { subscribers.fetch(channel, {}).length }
end

#delete_observer(o) ⇒ Object

Remove observer o so that it will no longer receive notifications *in the current process*.



# File 'lib/tusk/observable/pg.rb', line 131

def delete_observer o
  @sub_lock.synchronize do
    subscribers.fetch(channel, {}).delete o
  end
end

#delete_observers ⇒ Object

Remove all observers associated with this object *in the current process*. This method will not impact observers of this object in other processes.



# File 'lib/tusk/observable/pg.rb', line 84

def delete_observers
  @sub_lock.synchronize { subscribers.delete channel }
end
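
The bookkeeping behind #count_observers, #delete_observer and #delete_observers is a per-process hash of the form channel => { observer => method name }. The sketch below replays those hash operations in isolation, without a database. The channel name and the listener objects are hypothetical placeholders.

```ruby
subscribers = {}
lock        = Mutex.new
channel     = "timer_channel" # hypothetical channel name

listener_a = Object.new
listener_b = Object.new

# As in add_observer: create the channel's table on first use, then register.
lock.synchronize do
  subscribers.fetch(channel) { |k| subscribers[k] = {} }[listener_a] = :update
  subscribers.fetch(channel) { |k| subscribers[k] = {} }[listener_b] = :refresh
end

# As in count_observers: an unknown channel counts as zero observers.
count_after_add = lock.synchronize { subscribers.fetch(channel, {}).length }

# As in delete_observer: remove a single observer from this process.
lock.synchronize { subscribers.fetch(channel, {}).delete listener_a }
count_after_delete = lock.synchronize { subscribers.fetch(channel, {}).length }

# As in delete_observers: drop the whole table for the channel.
lock.synchronize { subscribers.delete channel }
count_after_clear = lock.synchronize { subscribers.fetch(channel, {}).length }

p [count_after_add, count_after_delete, count_after_clear] # => [2, 1, 0]
```

Because this hash lives in process memory, none of these operations can see or affect observers registered in another process, which matches the "in the current process" caveat above.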

#initialize(*args) ⇒ Object

Passes its arguments to super, then sets up the lock, changed state, subscriber table, and latch used by the other methods.



# File 'lib/tusk/observable/pg.rb', line 64

def initialize *args
  super

  @sub_lock       = Mutex.new
  @observer_state = false
  @subscribers    = {}
  @_listener      = nil
  @observing      = Latch.new
end

#notify_observers ⇒ Object

If this object’s #changed? state is true, this method will notify observing objects.



# File 'lib/tusk/observable/pg.rb', line 102

def notify_observers
  return unless changed?

  unwrap(connection).exec "NOTIFY #{channel}"

  changed false
end
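
To illustrate how #changed, #changed? and #notify_observers interact, here is a minimal sketch that mirrors the flow above but records SQL instead of sending it to PostgreSQL. RecordingConnection, SketchNotifier and the channel name are hypothetical; the real module obtains its connection from your `connection` method and passes it through unwrap, which is not shown on this page.

```ruby
# Hypothetical stand-in for a database connection: records SQL instead of sending it.
class RecordingConnection
  attr_reader :statements

  def initialize
    @statements = []
  end

  def exec(sql)
    @statements << sql
  end
end

# Illustrative class mirroring the changed / changed? / notify_observers flow.
class SketchNotifier
  attr_reader :connection

  def initialize(channel)
    @channel        = channel
    @observer_state = false
    @connection     = RecordingConnection.new
  end

  def changed(state = true)
    @observer_state = state
  end

  def changed?
    @observer_state
  end

  def notify_observers
    return unless changed?

    connection.exec "NOTIFY #{@channel}"
    changed false # reset so the next call is a no-op until changed is called again
  end
end

notifier = SketchNotifier.new("timer_channel")
notifier.notify_observers        # no-op: nothing has changed yet
notifier.changed
notifier.notify_observers        # records one NOTIFY and resets the flag
notifier.notify_observers        # no-op again
p notifier.connection.statements # => ["NOTIFY timer_channel"]
```

In this sketch a single NOTIFY is recorded per changed/notify_observers pair, which is the behavior the guard clause and the final changed false call are there to produce.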