Class: Pipeline
- Inherits:
  - Object
    - Pipeline
- Defined in:
- lib/pipelines.rb
Instance Attribute Summary collapse
-
#dir ⇒ Object
readonly
Returns the value of attribute dir.
-
#ended_at ⇒ Object
readonly
Returns the value of attribute ended_at.
-
#exception ⇒ Object
readonly
Returns the value of attribute exception.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#invocations ⇒ Object
readonly
Returns the value of attribute invocations.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#output ⇒ Object
readonly
Returns the value of attribute output.
-
#started_at ⇒ Object
readonly
Returns the value of attribute started_at.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#thread_lock ⇒ Object
readonly
Returns the value of attribute thread_lock.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
-
#topdir ⇒ Object
readonly
Returns the value of attribute topdir.
Instance Method Summary collapse
- #description(klass) ⇒ Object
-
#initialize(*args) ⇒ Pipeline
constructor
A new instance of Pipeline.
- #invoke(klass, *args) ⇒ Object
- #parallel(args = nil, &block) ⇒ Object
- #puts(string = '') ⇒ Object
- #serial(args = nil, &block) ⇒ Object
- #unlocked_puts(string = '') ⇒ Object
Constructor Details
#initialize(*args) ⇒ Pipeline
Returns a new instance of Pipeline.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/pipelines.rb', line 18
# Builds a pipeline (or a segment nested inside a parent pipeline).
#
# @param args [Array] an optional directory path as the first element and an
#   optional trailing Hash of options. The keys read here are :type, :parent,
#   :order, :input and :started_at; whatever remains is kept in @options.
# @return [Pipeline]
def initialize(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  dir = args.first

  # @parent has to be assigned before anything that inherits from it;
  # previously @topdir was computed first and therefore always fell back to dir.
  @parent = options.delete(:parent) # This is nil only for the top level pipeline.
  @topdir = @parent ? @parent.topdir : dir
  @dir = dir
  @type = options.delete(:type) || :serial

  @serial_count = 0
  @parallel_count = 0

  # The lock and the stats hash are shared with the parent when there is one.
  @thread_lock = @parent ? @parent.thread_lock : Monitor.new
  @stats = @parent ? @parent.stats : {}

  @name = underscore self.class.name.split('::')[-1]
  @order = options.delete(:order)
  @input = options.delete(:input)
  @output = serial? ? nil : []
  @threads = []

  # NOTE(review): this is a process-wide setting, not a per-pipeline one.
  Thread.abort_on_exception = true

  @options = options
  @step = nil
  @ended_at = nil
  @started_at = options[:started_at] || Time.now
  @invocations = 0

  unless @dir.nil?
    # Dir.exist? replaces Dir.exists?, which was removed in Ruby 3.2.
    Dir.mkdir(@dir) unless Dir.exist?(@dir)
  end

  if @parent.nil?
    # This is the top level pipeline.
    class << self
      alias_method :unlocked_run, :run

      # Wraps the instance's own #run between lock and unlock/notify.
      # NOTE(review): unlock is skipped if unlocked_run raises — confirm
      # whether that is intended before moving it into an ensure.
      # NOTE(review): the flattened listing reads "notify output"; it is
      # reconstructed here as two statements — confirm against the file.
      def run(*args, &block)
        lock
        output = unlocked_run(*args, &block)
        unlock
        notify
        output
      end
    end
  end
end
Instance Attribute Details
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
5 6 7 |
# File 'lib/pipelines.rb', line 5
# Returns the value of the @dir instance variable (nil when it is unset).
#
# @return [Object]
def dir
  @dir
end
#ended_at ⇒ Object (readonly)
Returns the value of attribute ended_at.
6 7 8 |
# File 'lib/pipelines.rb', line 6
# Returns the value of the @ended_at instance variable (nil when it is unset).
#
# @return [Object]
def ended_at
  @ended_at
end
#exception ⇒ Object (readonly)
Returns the value of attribute exception.
7 8 9 |
# File 'lib/pipelines.rb', line 7
# Returns the value of the @exception instance variable (nil when it is unset).
#
# @return [Object]
def exception
  @exception
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
10 11 12 |
# File 'lib/pipelines.rb', line 10
# Returns the value of the @input instance variable (nil when it is unset).
#
# @return [Object]
def input
  @input
end
#invocations ⇒ Object (readonly)
Returns the value of attribute invocations.
8 9 10 |
# File 'lib/pipelines.rb', line 8
# Returns the value of the @invocations instance variable (nil when it is unset).
#
# @return [Object]
def invocations
  @invocations
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
9 10 11 |
# File 'lib/pipelines.rb', line 9
# Returns the value of the @name instance variable (nil when it is unset).
#
# @return [Object]
def name
  @name
end
#output ⇒ Object (readonly)
Returns the value of attribute output.
11 12 13 |
# File 'lib/pipelines.rb', line 11
# Returns the value of the @output instance variable (nil when it is unset).
#
# @return [Object]
def output
  @output
end
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
12 13 14 |
# File 'lib/pipelines.rb', line 12
# Returns the value of the @started_at instance variable (nil when it is unset).
#
# @return [Object]
def started_at
  @started_at
end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
13 14 15 |
# File 'lib/pipelines.rb', line 13
# Returns the value of the @stats instance variable (nil when it is unset).
#
# @return [Object]
def stats
  @stats
end
#thread_lock ⇒ Object (readonly)
Returns the value of attribute thread_lock.
14 15 16 |
# File 'lib/pipelines.rb', line 14
# Returns the value of the @thread_lock instance variable (nil when it is unset).
#
# @return [Object]
def thread_lock
  @thread_lock
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
15 16 17 |
# File 'lib/pipelines.rb', line 15
# Returns the value of the @threads instance variable (nil when it is unset).
#
# @return [Object]
def threads
  @threads
end
#topdir ⇒ Object (readonly)
Returns the value of attribute topdir.
16 17 18 |
# File 'lib/pipelines.rb', line 16
# Returns the value of the @topdir instance variable (nil when it is unset).
#
# @return [Object]
def topdir
  @topdir
end
Instance Method Details
#description(klass) ⇒ Object
114 115 116 117 118 119 120 |
# File 'lib/pipelines.rb', line 114
# Builds the label for a segment of the given class: the last "::"-separated
# part of the class name, passed through the `underscore` helper and prefixed
# with the current @invocations count. When @order is not nil it is
# prepended as well.
#
# @param klass [Class] only klass.name is read
# @return [String] "<invocations>-<name>" or "<order>-<invocations>-<name>"
def description(klass)
  label = "#{@invocations}-#{underscore(klass.name.split('::').last)}"
  @order.nil? ? label : "#{@order}-#{label}"
end
#invoke(klass, *args) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/pipelines.rb', line 79
# Instantiates a segment of the given class and either reuses its cached
# output or runs it.
#
# When the path returned by segment_cache exists, the :data entry of that
# YAML file is taken as the segment's output. Otherwise the segment is
# dispatched: inline for a serial pipeline, on a new Thread (collected in
# @threads) for a parallel one.
#
# @param klass [Class] segment class; instantiated with (dir, options hash)
# @param args [Array] extra arguments forwarded to dispatch
def invoke(klass, *args)
  @invocations += 1
  dir = File.join(@dir, description(klass)) unless @dir.nil?
  segment = klass.new dir, :order => @order, :parent => self
  output_file = segment_cache segment

  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if output_file && File.exist?(output_file)
    segment.puts "\033[0;35mSkipping\033[0m"
    # NOTE(review): YAML.load on the cache file — newer Psych versions
    # restrict which classes it will deserialize, older ones do not;
    # confirm the cache only ever holds trusted, plain data.
    output = YAML.load(File.read(output_file))[:data]
    if serial?
      @output = output
      @input = output
    elsif parallel?
      # @output is appended to under the shared lock in parallel mode.
      @thread_lock.synchronize do
        @output << output
      end
    end
  else
    segment.puts "\033[0;32mRunning\033[0m"
    if serial?
      dispatch(segment, output_file, *args)
      # The next serial step receives this step's output as its input.
      @input = @output
    elsif parallel?
      thread = Thread.new do
        dispatch(segment, output_file, *args)
      end
      @threads << thread
    end
  end
end
#parallel(args = nil, &block) ⇒ Object
74 75 76 |
# File 'lib/pipelines.rb', line 74
# Delegates to #pipeline with :parallel as the type, forwarding args and
# the block unchanged.
#
# @param args [Object, nil] passed through to #pipeline
# @return [Object] whatever #pipeline returns
def parallel(args=nil, &block)
  pipeline(:parallel, args, &block)
end
#puts(string = '') ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/pipelines.rb', line 123
# Prints a line through #unlocked_puts while holding @thread_lock.
#
# @param string [Object] text to print; defaults to an empty string
# @return [nil]
def puts(string='')
  @thread_lock.synchronize do
    unlocked_puts(string)
  end
  nil # Behave like Kernel.puts
end
#serial(args = nil, &block) ⇒ Object
69 70 71 |
# File 'lib/pipelines.rb', line 69
# Delegates to #pipeline with :serial as the type, forwarding args and the
# block unchanged.
#
# @param args [Object, nil] passed through to #pipeline
# @return [Object] whatever #pipeline returns
def serial(args=nil, &block)
  pipeline(:serial, args, &block)
end
#unlocked_puts(string = '') ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/pipelines.rb', line 132
# Prints one line with a coloured bracketed prefix and flushes the output
# stream. Does no locking of its own.
#
# The prefix is "[order][invocations]" / "[invocations]" when this object is
# a plain Pipeline or has no @parent, and "[order][name][parent invocations]"
# / "[name][parent invocations]" otherwise.
#
# @param string [Object] text appended after the prefix
def unlocked_puts(string='')
  prefix =
    if self.class == Pipeline || @parent.nil?
      # If @parent is missing, it is in the top level block.
      if @order
        "\033[32m[#{@order}][#{@invocations}]\033[0m"
      else
        "\033[33m[#{@invocations}]\033[0m"
      end
    elsif @order
      "\033[32m[#{@order}][#{@name}][#{@parent.invocations}]\033[0m"
    else
      "\033[32m[#{@name}][#{@parent.invocations}]\033[0m"
    end

  Kernel.puts "#{prefix} #{string}"
  # Flush the stream Kernel.puts actually writes to ($stdout), so the flush
  # still applies when $stdout has been reassigned.
  $stdout.flush
end