Module: Concurrent::Concern::Observable

Included in:
Agent, Atom, IVar, TimerTask
Defined in:
lib/concurrent-ruby/concurrent/concern/observable.rb

Overview

The [observer pattern](en.wikipedia.org/wiki/Observer_pattern) is one of the most useful design patterns.

The workflow is very simple:

  • an ‘observer` can register itself to a `subject` via a callback

  • many ‘observers` can be registered to the same `subject`

  • the ‘subject` notifies all registered observers when its status changes

  • an ‘observer` can deregister itself when is no more interested to receive

    event notifications
    

In a single threaded environment the whole pattern is very easy: the ‘subject` can use a simple data structure to manage all its subscribed `observer`s and every `observer` can react directly to every event without caring about synchronization.

In a multi threaded environment things are more complex. The ‘subject` must synchronize the access to its data structure and to do so currently we’re using two specialized ObserverSet: CopyOnWriteObserverSet and CopyOnNotifyObserverSet.

When implementing and ‘observer` there’s a very important rule to remember: **there are no guarantees about the thread that will execute the callback**

Let’s take this example “‘ class Observer

def initialize
  @count = 0
end

def update
  @count += 1
end

end

obs = Observer.new [obj1, obj2, obj3, obj4].each { |o| o.add_observer(obs) } # execute [obj1, obj2, obj3, obj4] “‘

‘obs` is wrong because the variable `@count` can be accessed by different threads at the same time, so it should be synchronized (using either a Mutex or an AtomicFixum)

Instance Method Summary collapse

Instance Method Details

#add_observer(observer = nil, func = :update, &block) ⇒ Object

Adds an observer to this set. If a block is passed, the observer will be created by this method and no other params should be passed.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification. Default is :update

Returns:

  • (Object)

    the added observer



61
62
63
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 61

def add_observer(observer = nil, func = :update, &block)
  observers.add_observer(observer, func, &block)
end

#count_observersInteger

Return the number of observers associated with this object.

Returns:

  • (Integer)

    the observers count



101
102
103
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 101

def count_observers
  observers.count_observers
end

#delete_observer(observer) ⇒ Object

Remove ‘observer` as an observer on this object so that it will no longer receive notifications.

Parameters:

  • observer (Object)

    the observer to remove

Returns:

  • (Object)

    the deleted observer



82
83
84
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 82

def delete_observer(observer)
  observers.delete_observer(observer)
end

#delete_observersObservable

Remove all observers associated with this object.

Returns:



91
92
93
94
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 91

def delete_observers
  observers.delete_observers
  self
end

#with_observer(observer = nil, func = :update, &block) ⇒ Observable

As ‘#add_observer` but can be used for chaining.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification.

Returns:



70
71
72
73
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 70

def with_observer(observer = nil, func = :update, &block)
  add_observer(observer, func, &block)
  self
end