Class: Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/pipelines.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Pipeline

Returns a new instance of Pipeline.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pipelines.rb', line 18

# Builds a pipeline, optionally rooted at a directory.
#
# Accepts an optional directory path as the first positional argument and an
# optional trailing options Hash. The keys :type, :parent, :order and :input
# are removed from that Hash (it is mutated in place); :started_at is read
# but left in; whatever remains is kept in @options.
#
# A pipeline built without a :parent is the top-level pipeline: it creates
# the Monitor and stats Hash that its children share, and its +run+ is
# wrapped on the singleton class with lock / unlock / notify calls.
#
# @param args [Array] +[dir]+, +[dir, options]+, +[options]+ or nothing
# @return [Pipeline]
def initialize(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  dir = args.first

  # @parent has to be known before anything derived from it. It used to be
  # assigned after @topdir, so @topdir never picked up the parent's value.
  @parent = options.delete(:parent) # This is nil only for the top level pipeline.
  @topdir = @parent ? @parent.topdir : dir
  @dir = dir
  @type = options.delete(:type) || :serial

  @serial_count = 0
  @parallel_count = 0

  # Lock and stats are shared objects across the whole pipeline tree.
  @thread_lock = @parent ? @parent.thread_lock : Monitor.new
  @stats = @parent ? @parent.stats : {}

  @name = underscore self.class.name.split('::')[-1]
  @order = options.delete(:order)

  @input = options.delete(:input)
  @output = serial? ? nil : []

  @threads = []
  # Process-wide setting: an exception in any thread aborts the program.
  Thread.abort_on_exception = true

  @options = options
  @step = nil
  @ended_at = nil
  @started_at = options[:started_at] || Time.now
  @invocations = 0

  # Dir.exist? replaces Dir.exists?, which was removed in Ruby 3.2.
  Dir.mkdir(@dir) unless @dir.nil? || Dir.exist?(@dir)

  if @parent.nil? # This is the top level pipeline.
    # Wrap +run+ for this one instance only; nested pipelines keep the plain +run+.
    class << self
      alias_method :unlocked_run, :run

      # NOTE(review): unlock and notify are skipped when unlocked_run raises;
      # confirm whether that is intended.
      def run(*args, &block)
        lock
        output = unlocked_run(*args, &block)
        unlock
        notify

        output
      end
    end
  end
end

Instance Attribute Details

#dir ⇒ Object (readonly)

Returns the value of attribute dir.



5
6
7
# File 'lib/pipelines.rb', line 5

# Directory this pipeline was constructed with (the first positional
# argument to the constructor). May be nil; in that case the constructor
# creates no directory and +invoke+ passes a nil directory to the segments
# it instantiates.
def dir
  @dir
end

#ended_at ⇒ Object (readonly)

Returns the value of attribute ended_at.



6
7
8
# File 'lib/pipelines.rb', line 6

# Set to nil by the constructor. Nothing in the code visible here assigns it
# afterwards; presumably it receives the finish time from code elsewhere in
# the class -- TODO confirm.
def ended_at
  @ended_at
end

#exception ⇒ Object (readonly)

Returns the value of attribute exception.



7
8
9
# File 'lib/pipelines.rb', line 7

# Returns @exception. The constructor does not initialise this variable, so
# the value is nil until code outside this excerpt assigns it (presumably
# when a run fails -- TODO confirm).
def exception
  @exception
end

#input ⇒ Object (readonly)

Returns the value of attribute input.



10
11
12
# File 'lib/pipelines.rb', line 10

# Current input value. Starts as the :input option given to the constructor
# (nil when absent). In a serial pipeline +invoke+ overwrites it with the
# pipeline's output after every segment, cached or freshly run.
def input
  @input
end

#invocations ⇒ Object (readonly)

Returns the value of attribute invocations.



8
9
10
# File 'lib/pipelines.rb', line 8

# Number of times +invoke+ has been called on this pipeline. Starts at 0 and
# is incremented as the first step of +invoke+; it also appears in segment
# directory names and in log-line prefixes.
def invocations
  @invocations
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/pipelines.rb', line 9

# Name computed once in the constructor: the last '::'-separated component
# of the receiver's class name, passed through +underscore+ (defined outside
# this excerpt; presumably converts to snake_case -- TODO confirm).
def name
  @name
end

#output ⇒ Object (readonly)

Returns the value of attribute output.



11
12
13
# File 'lib/pipelines.rb', line 11

# Result holder. The constructor sets it to nil when +serial?+ is true and
# to an empty Array otherwise. On a cache hit +invoke+ replaces it with the
# cached data (serial) or appends the cached data to it while holding the
# thread lock (parallel). Presumably +dispatch+ writes to it as well -- that
# method is not visible here.
def output
  @output
end

#started_at ⇒ Object (readonly)

Returns the value of attribute started_at.



12
13
14
# File 'lib/pipelines.rb', line 12

# The :started_at option when one was passed to the constructor, otherwise
# the value of Time.now at construction.
def started_at
  @started_at
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



13
14
15
# File 'lib/pipelines.rb', line 13

# Stats Hash. A pipeline with a parent holds the very same Hash object as
# its parent (not a copy); the top-level pipeline starts with an empty Hash.
def stats
  @stats
end

#thread_lock ⇒ Object (readonly)

Returns the value of attribute thread_lock.



14
15
16
# File 'lib/pipelines.rb', line 14

# Monitor used to serialise +puts+ output and, in parallel pipelines, the
# appends +invoke+ makes to the output Array. The top-level pipeline creates
# it; every pipeline with a parent reuses the parent's instance.
def thread_lock
  @thread_lock
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



15
16
17
# File 'lib/pipelines.rb', line 15

# Threads started by +invoke+ for segments that had to be run in a parallel
# pipeline. Empty Array at construction.
def threads
  @threads
end

#topdir ⇒ Object (readonly)

Returns the value of attribute topdir.



16
17
18
# File 'lib/pipelines.rb', line 16

# Top directory recorded by the constructor: taken from the parent
# pipeline's +topdir+ when a parent is known at that point, otherwise the
# directory this pipeline itself was constructed with.
def topdir
  @topdir
end

Instance Method Details

#description(klass) ⇒ Object



114
115
116
117
118
119
120
# File 'lib/pipelines.rb', line 114

# Label for a segment of class +klass+ at the current invocation count.
#
# The label is "<invocations>-<name>", prefixed with "<order>-" when @order
# is set. <name> is the last '::' component of the class name passed through
# +underscore+. +invoke+ uses the label as the segment's directory name.
#
# @param klass [Class] segment class to describe
# @return [String]
def description(klass)
  label = "#{@invocations}-#{underscore(klass.name.split('::').last)}"
  @order.nil? ? label : "#{@order}-#{label}"
end

#invoke(klass, *args) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/pipelines.rb', line 79

# Runs one segment of the pipeline, or reuses its cached result.
#
# Increments the invocation counter, instantiates +klass+ with a directory
# derived from +description+ (nil when this pipeline has no directory), and
# asks +segment_cache+ for the segment's output file. If that file exists
# the cached :data entry is used instead of running the segment; otherwise
# the segment is handed to +dispatch+, inline for a serial pipeline or in a
# new Thread (recorded in @threads) for a parallel one.
#
# @param klass [Class] segment class, built as
#   klass.new(dir, :order => @order, :parent => self)
# @param args extra arguments forwarded unchanged to +dispatch+
# @return [Object] whatever the branch taken last evaluated; not a
#   meaningful value for callers
def invoke(klass, *args)
  @invocations += 1

  dir = File.join(@dir, description(klass)) unless @dir.nil?
  segment = klass.new dir, :order => @order, :parent => self

  output_file = segment_cache segment
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if output_file && File.exist?(output_file)
    segment.puts "\033[0;35mSkipping\033[0m"

    raw = File.read(output_file)
    # Psych 4 (Ruby 3.1+) turned YAML.load into a safe load that rejects the
    # Symbol key read below, so prefer unsafe_load where it exists.
    # NOTE(review): this assumes cache files are only ever written by the
    # pipeline itself -- confirm before pointing it at untrusted files.
    cached = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(raw) : YAML.load(raw)
    output = cached[:data]

    if serial?
      @output = output
      @input = output
    elsif parallel?
      # Other segments may be appending from their own threads.
      @thread_lock.synchronize do
        @output << output
      end
    end
  else
    segment.puts "\033[0;32mRunning\033[0m"

    if serial?
      dispatch(segment, output_file, *args)
      @input = @output
    elsif parallel?
      thread = Thread.new do
        dispatch(segment, output_file, *args)
      end
      @threads << thread
    end
  end
end

#parallel(args = nil, &block) ⇒ Object



74
75
76
# File 'lib/pipelines.rb', line 74

# Hands +args+ and the block to +pipeline+ with the :parallel type.
#
# @param args passed through unchanged (nil by default)
# @return whatever +pipeline+ returns
def parallel(args = nil, &block)
  pipeline :parallel, args, &block
end

#puts(string = '') ⇒ Object



123
124
125
126
127
128
129
# File 'lib/pipelines.rb', line 123

# Prints +string+ through +unlocked_puts+ while holding the shared thread
# lock.
#
# @param string text to print (empty by default)
# @return [nil]
def puts(string = '')
  @thread_lock.synchronize { unlocked_puts(string) }
  nil # Behave like Kernel.puts
end

#serial(args = nil, &block) ⇒ Object



69
70
71
# File 'lib/pipelines.rb', line 69

# Hands +args+ and the block to +pipeline+ with the :serial type.
#
# @param args passed through unchanged (nil by default)
# @return whatever +pipeline+ returns
def serial(args = nil, &block)
  pipeline :serial, args, &block
end

#unlocked_puts(string = '') ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/pipelines.rb', line 132

# Writes one prefixed, colourised line to standard output without taking the
# thread lock, then flushes STDOUT.
#
# A plain Pipeline instance, or any instance without a parent, is prefixed
# with its own invocation count. Every other instance is prefixed with its
# name and the parent's invocation count. @order, when set, leads the prefix.
#
# @param string text to print after the prefix (empty by default)
# @return the result of STDOUT.flush
def unlocked_puts(string = '')
  # If @parent is missing, it is in the top level block.
  top_level = self.class == Pipeline || @parent.nil?

  line =
    if top_level && @order
      "\033[32m[#{@order}][#{@invocations}]\033[0m #{string}"
    elsif top_level
      "\033[33m[#{@invocations}]\033[0m #{string}"
    elsif @order
      "\033[32m[#{@order}][#{@name}][#{@parent.invocations}]\033[0m #{string}"
    else
      "\033[32m[#{@name}][#{@parent.invocations}]\033[0m #{string}"
    end

  Kernel.puts line
  STDOUT.flush
end