Class: Plumbing::Pipe

Inherits:
Object
  • Object
show all
Includes:
Actor
Defined in:
lib/plumbing/pipe.rb

Overview

A basic pipe

Direct Known Subclasses

CustomFilter, Junction

Defined Under Namespace

Classes: CustomFilter, Filter, Junction

Instance Method Summary collapse

Methods included from Actor

#in_context?, included, #safely, #stop, timeout, transporter

Instance Method Details

#add_observer(observer = nil, &block) ⇒ #call

Add an observer to this pipe Either a ‘callable` or a `block` must be supplied. If the latter, it is converted to a [Proc]

Parameters:

  • callable (#call)

    (optional)

  • &block (Block)

    (optional)

Returns:

  • (#call)


27
28
29
30
31
# File 'lib/plumbing/pipe.rb', line 27

def add_observer(observer = nil, &block)
  observer ||= block.to_proc
  observers << observer.as(Callable).target
  observer
end

#is_observer?(observer) ⇒ Boolean

Test whether the given observer is observing this pipe

Parameters:

  • observer (#call)

Returns:

  • (Boolean)


42
43
44
# File 'lib/plumbing/pipe.rb', line 42

def is_observer? observer
  observers.include? observer
end

#notify(event_name, **data) ⇒ Object

Notify observers about an event

Parameters:

  • event_name (String)

    representing the type of event this is

  • data (Hash)

    representing the event-specific data to be passed to the observers



11
12
13
14
15
16
17
18
19
20
# File 'lib/plumbing/pipe.rb', line 11

def notify event_name, **data
  Plumbing.config.logger.debug { "-> #{self.class}##{__callee__} #{event_name}" }
  observers.each do |observer|
    Plumbing.config.logger.debug { "===> #{self.class}#call #{event_name}(#{data}) to #{observer}" }
    observer.call event_name, data
  rescue => ex
    Plumbing.config.logger.error { "!!!! #{self.class}##{__callee__} #{event_name} => #{ex}" }
    ex
  end
end

#remove_observer(observer) ⇒ Object

Remove an observer from this pipe

Parameters:

  • observer (#call)

    remove the observer from this pipe (where the observer was previously added by #add_observer)



35
36
37
# File 'lib/plumbing/pipe.rb', line 35

def remove_observer observer
  observers.delete observer
end

#shutdownObject

Close this pipe and perform any cleanup. Subclasses should override this to perform their own shutdown routines and call ‘super` to ensure everything is tidied up



48
49
50
51
# File 'lib/plumbing/pipe.rb', line 48

def shutdown
  observers.clear
  stop
end