Class: Pipeliner::Pipeline
- Inherits:
-
Object
- Object
- Pipeliner::Pipeline
- Defined in:
- lib/pipeliner/Pipeline.rb
Instance Method Summary collapse
-
#<<(object) ⇒ Object
- object
-
Object to send down the pipeline Send an object down the pipeline.
-
#clear ⇒ Object
Remove all hooks from the pipeline.
-
#close ⇒ Object
Close the pipeline Note: This is important to call at the end of a script when not providing an ActionPool thread pool to the pipeline.
-
#filters ⇒ Object
Returns current FilterManager.
-
#filters=(fm) ⇒ Object
- fm
-
FilterManager Set the FilterManager the Pipeline should use.
-
#hook(type, object = nil, method = nil, &block) ⇒ Object
- type
- Type of Objects to pass to object object
- Object to hook to pipeline method
- Method to call on object block
-
Block to apply to object (called without object and method set) or conditional Hooks an Object into the pipeline for objects of a given type.
-
#hooks ⇒ Object
Return current hooks hash.
-
#initialize(args = {}) ⇒ Pipeline
constructor
- args
- Hash containing initializations :pool =>
- ActionPool::Pool for the Pipeline to use :filters =>
-
FilterManager for the Pipeline to use Create a new Pipeline.
-
#open ⇒ Object
Open the pipeline.
-
#unhook(object, type = nil, method = nil) ⇒ Object
- object
- Object or Proc to unhook from the pipeline type
- Type of Objects being received method
-
method registered to call Remove a hook from the pipeline.
Constructor Details
#initialize(args = {}) ⇒ Pipeline
- args
-
Hash containing initializations
- :pool =>
-
ActionPool::Pool for the Pipeline to use
- :filters =>
-
FilterManager for the Pipeline to use
Create a new Pipeline
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pipeliner/Pipeline.rb', line 13 def initialize(args={}) @pool = args[:pool] ? args[:pool] : ActionPool::Pool.new @hooks = {} @lock = Mutex.new @filters = args[:filters] ? args[:filters] : FilterManager.new if(!args[:pool]) Kernel.at_exit do close end end end |
Instance Method Details
#<<(object) ⇒ Object
- object
-
Object to send down the pipeline
Send an object down the pipeline
146 147 148 149 150 151 152 153 |
# File 'lib/pipeliner/Pipeline.rb', line 146 def <<(object) raise StandardError.new('Pipeline is currently closed') if @pool.nil? object = @filters.apply_filters(object) if(object) object.freeze @pool.process{ flush(object) } end end |
#clear ⇒ Object
Remove all hooks from the pipeline
140 141 142 |
# File 'lib/pipeliner/Pipeline.rb', line 140 def clear @lock.synchronize{ @hooks.clear } end |
#close ⇒ Object
Close the pipeline Note: This is important to call at the end of a script when not
providing an ActionPool thread pool to the pipeline.
This will ensure the thread pool is properly shutdown
and avoid the script hanging.
30 31 32 33 |
# File 'lib/pipeliner/Pipeline.rb', line 30 def close @pool.shutdown clear end |
#filters ⇒ Object
Returns current FilterManager
41 42 43 |
# File 'lib/pipeliner/Pipeline.rb', line 41 def filters @filters end |
#filters=(fm) ⇒ Object
- fm
-
FilterManager
Set the FilterManager the Pipeline should use
47 48 49 50 |
# File 'lib/pipeliner/Pipeline.rb', line 47 def filters=(fm) raise ArgumentError.new('Expecting a FilterManager') unless fm.is_a?(FilterManager) @filters = fm end |
#hook(type, object = nil, method = nil, &block) ⇒ Object
- type
-
Type of Objects to pass to object
- object
-
Object to hook to pipeline
- method
-
Method to call on object
- block
-
Block to apply to object (called without object and method set) or conditional
Hooks an Object into the pipeline for objects of a given type. The block can serve two purposes here. First, we can hook a block to a type like so:
pipeline.hook(String){|s| puts s }
Or, we can use the block as a conditional for calling an object’s method:
pipeline.hook(String, obj, :method){|s| s == 'test' }
In the second example, this hook will only be called if the String type object matches the conditional in the block, meaning the string must be ‘test’
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/pipeliner/Pipeline.rb', line 63 def hook(type, object=nil, method=nil, &block) raise ArgumentError.new 'Type must be provided' if type.nil? if(block && (block.arity > 1 || block.arity == 0)) raise ArgumentError.new('Block must accept a parameter') end if((object && method.nil?) || (object.nil? && method)) raise ArgumentError.new('Object AND method must be provided') end if(!block_given? && object.nil? && method.nil?) raise ArgumentError.new('No object information or block provided for hook') end @lock.synchronize do const = Splib.find_const(type) type = const unless const.nil? @hooks[type] ||= {} if(block_given? && object.nil? && method.nil?) @hooks[type][:procs] ||= [] @hooks[type][:procs] << block else name = object.class method = method.to_sym raise ArgumentError.new('Given object does not respond to given method') unless object.respond_to?(method) @hooks[type][name] ||= [] @hooks[type][name] << {:object => object, :method => method, :req => !block_given? ? lambda{|x|true} : block} end end block_given? ? block : nil end |
#hooks ⇒ Object
Return current hooks hash
135 136 137 |
# File 'lib/pipeliner/Pipeline.rb', line 135 def hooks @lock.synchronize{ @hooks.dup } end |
#open ⇒ Object
Open the pipeline
36 37 38 |
# File 'lib/pipeliner/Pipeline.rb', line 36 def open @pool = ActionPool::Pool.new unless @pool end |
#unhook(object, type = nil, method = nil) ⇒ Object
- object
-
Object or Proc to unhook from the pipeline
- type
-
Type of Objects being received
- method
-
method registered to call
Remove a hook from the pipeline. If the method and type are not specified, the given object will be removed from all hooks
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/pipeliner/Pipeline.rb', line 97 def unhook(object, type=nil, method=nil) raise ArgumentError.new('Object method provided for a Proc') if object.is_a?(Proc) && method @lock.synchronize do case object when Proc if(type) @hooks[type][:procs].delete(object) @hooks[type].delete(:procs) if @hooks[type][:procs].empty? else @hooks.each_value do |h| h[:procs].delete(object) h.delete(:procs) if h[:procs].empty? end end else if(method.nil? && type.nil?) @hooks.each_value{|v|v.delete_if{|k,z| k == object.class}} else raise ArgumentError.new('Type must be provided') if type.nil? const = Splib.find_const(type) type = const unless const.nil? method = method.to_sym if method name = object.class raise NameError.new('Uninitialized hook type given') unless @hooks[type] raise StandardError.new('No hooks found for given object') unless @hooks[type][name] if(method) @hooks[type][name].delete_if{|x|x[:method] == method} @hooks[type].delete(name) if @hooks[type][name].empty? else @hooks[type].delete(name) end end end @hooks.delete_if{|k,v|v.empty?} end end |