Class: Ford::Stage

Inherits:
Object
  • Object
show all
Defined in:
lib/ford/stage.rb

Overview

The class Ford::Stage can be extended by each stage in a pipeline

It has built-in structures and functions that help build a pipeline.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Stage

Initialize the stage



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/ford/stage.rb', line 80

# Build a stage instance.
#
# Recognised options (caller-supplied values override these defaults):
#   :debug      - when true the logger level is DEBUG, otherwise INFO
#   :log_to     - passed to Logger.new (defaults to STDOUT)
#   :from_stage - stage whose queue this instance pops items from
#                 (defaults to the instance's own class)
def initialize(options={})
  defaults = {
    :debug      => false,
    :log_to     => STDOUT,
    :from_stage => self.class
  }

  # Instance configuration, then a logger driven by that configuration.
  @config = Ford::Config.new(defaults.merge(options))
  @logger = Logger.new(@config.log_to)
  @logger.level = if @config.debug
                    Logger::DEBUG
                  else
                    Logger::INFO
                  end
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



27
28
29
# File 'lib/ford/stage.rb', line 27

# Returns the value of @config (the Ford::Config instance assigned in
# #initialize).
def config
  @config
end

#itemObject

Returns the value of attribute item.



27
28
29
# File 'lib/ford/stage.rb', line 27

# Returns the value of @item, which #run assigns from #pop_item on each
# iteration; nil until an item has been popped.
def item
  @item
end

#loggerObject

Returns the value of attribute logger.



27
28
29
# File 'lib/ford/stage.rb', line 27

# Returns the value of @logger (the Logger instance created in #initialize).
def logger
  @logger
end

Class Method Details

.inherited(subclass) ⇒ Object



40
41
42
# File 'lib/ford/stage.rb', line 40

# Hook Ruby invokes whenever a class inherits from this one.
#
# Assigns a fresh Queue to the new subclass through its queue= writer.
# `super` is called first so that any other `inherited` hook in the ancestor
# chain still runs.
#
# subclass - the class that was just created
def self.inherited(subclass)
  super
  subclass.queue = Queue.new
end

.init_stage(options = {}) ⇒ Object

Create a stage in thread mode



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/ford/stage.rb', line 48

# Start the stage in thread mode.
#
# Spawns options[:threads] threads (default 1). Each thread builds its own
# instance of this stage from a private copy of the options extended with
# :thread_id, then calls #run on it. Every thread is pushed onto
# Ford.threads.
#
# A StandardError raised while building or running the instance is logged as
# fatal and ends that thread only. Other exceptions (SystemExit, Interrupt,
# NoMemoryError, ...) are deliberately not swallowed.
#
# options - Hash forwarded to the constructor; :threads sets the thread count
def self.init_stage(options={})
  options = {
    :threads => 1
  }.merge(options)

  options[:threads].times do |tid|
    # Build a per-thread hash instead of reassigning the shared `options`
    # local. The thread body runs later, so reading a local that the loop
    # keeps reassigning could hand a thread another iteration's :thread_id.
    thread_options = options.merge(:thread_id => tid)

    # Passing the hash as a Thread.new argument binds it to this thread.
    t = Thread.new(thread_options) { |stage_options|
      obj = nil
      begin
        obj = self.new(stage_options)
        obj.run
      rescue StandardError => exc
        # obj is still nil when the constructor itself raised; fall back to a
        # stderr logger so the original error is reported, not a NoMethodError.
        log = obj ? obj.logger : Logger.new($stderr)
        # `self` is the stage class here; `self.class` would print "Class".
        log.fatal("\nFailed to execute the #{self}'s thread (#{tid})")
        log.fatal("was consuming: #{obj.item}") if obj
        log.fatal("#{exc}\n#{Array(exc.backtrace).join("\n")}")
      end
    }

    Ford.threads.push t
  end
end

.queueObject



33
34
35
# File 'lib/ford/stage.rb', line 33

# Returns the class-level @queue of the receiver (each class has its own
# @queue, since this is an instance variable of the class object itself).
def self.queue
  @queue
end

.queue=(queue) ⇒ Object



36
37
38
# File 'lib/ford/stage.rb', line 36

# Replace the class-level queue of the receiver.
#
# queue - the object to store in @queue
def self.queue=(queue)
  @queue = queue
end

Instance Method Details

#consumeObject

When using the default run, consume should be implemented.



109
110
111
# File 'lib/ford/stage.rb', line 109

# Process the current item. This base implementation always raises, so a
# stage relying on the default #run has to provide its own #consume.
def consume
  raise RuntimeError, 'Must implement!'
end

#pop_itemObject

Pop an item from the queue



116
117
118
# File 'lib/ford/stage.rb', line 116

# Take the next item from the queue of the configured input stage
# (@config.from_stage) and return it.
def pop_item
  source_queue = @config.from_stage.queue
  source_queue.pop
end

#runObject

Run the stage



95
96
97
98
99
100
101
102
103
104
# File 'lib/ford/stage.rb', line 95

# Main loop of the stage: keep popping items into @item and hand each one to
# #consume, logging a debug line before and after. The loop ends as soon as
# #pop_item returns nil or false.
def run
  while (@item = pop_item)
    began_at = Time.now

    logger.debug("Consuming...(#{config.thread_id})")
    consume

    elapsed = Time.now - began_at
    logger.debug("Consumed in #{elapsed} seconds (#{config.thread_id})")
  end
end

#send_back(item) ⇒ Object

Enqueue an item in the current stage’s queue



131
132
133
134
# File 'lib/ford/stage.rb', line 131

# Put an item onto the queue of this instance's own class, then log a debug
# line.
#
# item - the object to enqueue
def send_back(item)
  own_queue = self.class.queue
  own_queue.push(item)
  logger.debug("Enqueued back (#{config.thread_id})")
end

#send_to(stage_class, item) ⇒ Object

Enqueue an item in the stage’s queue



123
124
125
126
# File 'lib/ford/stage.rb', line 123

# Put an item onto another stage's queue, then log a debug line.
#
# stage_class - object responding to #queue (the destination stage)
# item        - the object to enqueue
def send_to(stage_class, item)
  target_queue = stage_class.queue
  target_queue.push(item)
  logger.debug("Enqueued into #{stage_class}'s queue (#{config.thread_id})")
end