Class: Ford::Stage
- Inherits:
-
Object
- Object
- Ford::Stage
- Defined in:
- lib/ford/stage.rb
Overview
The class Ford::Stage can be extended by each stage in a pipeline
It has built-in structures and functions that helps building a pipeline
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#item ⇒ Object
Returns the value of attribute item.
-
#logger ⇒ Object
Returns the value of attribute logger.
Class Method Summary collapse
- .inherited(subclass) ⇒ Object
-
.init_stage(options = {}) ⇒ Object
Create a stage in thread mode.
- .queue ⇒ Object
- .queue=(queue) ⇒ Object
Instance Method Summary collapse
-
#consume ⇒ Object
When using the default run, consume should be implemented.
-
#initialize(options = {}) ⇒ Stage
constructor
Initialize the stage.
-
#pop_item ⇒ Object
Pop an item from the queue.
-
#run ⇒ Object
Run the stage.
-
#send_back(item) ⇒ Object
Enqueue an item in the current stage’s queue.
-
#send_to(stage_class, item) ⇒ Object
Enqueue an item in the stage’s queue.
Constructor Details
#initialize(options = {}) ⇒ Stage
Initialize the stage
80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/ford/stage.rb', line 80 def initialize(={}) data = { :debug => false, # If true, logs messages during execution :log_to => STDOUT, # Logging path or IO instance :from_stage => self.class # Reference of the Stage that is used as data input (normally, itself). Will load items from its queue. }.merge() @config = Ford::Config.new(data) # instance configuration @logger = Logger.new(@config.log_to) # instance logger @logger.level = @config.debug ? Logger::DEBUG : Logger::INFO end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
27 28 29 |
# File 'lib/ford/stage.rb', line 27 def config @config end |
#item ⇒ Object
Returns the value of attribute item.
27 28 29 |
# File 'lib/ford/stage.rb', line 27 def item @item end |
#logger ⇒ Object
Returns the value of attribute logger.
27 28 29 |
# File 'lib/ford/stage.rb', line 27 def logger @logger end |
Class Method Details
.inherited(subclass) ⇒ Object
40 41 42 |
# File 'lib/ford/stage.rb', line 40 def self.inherited(subclass) subclass.queue = Queue.new end |
.init_stage(options = {}) ⇒ Object
Create a stage in thread mode
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/ford/stage.rb', line 48 def self.init_stage(={}) = { :threads => 1 }.merge() [:threads].times do |tid| = .clone [:thread_id] = tid # Create a new thread t = Thread.new { obj = nil begin obj = self.new() obj.run rescue Exception => exc obj.logger.fatal("\nFailed to execute the #{self.class}'s thread (#{tid})") obj.logger.fatal("was consuming: #{obj.item}") obj.logger.fatal("#{exc}\n#{exc.backtrace.join("\n")}") end } Ford.threads.push t end end |
.queue ⇒ Object
33 34 35 |
# File 'lib/ford/stage.rb', line 33 def self.queue @queue end |
.queue=(queue) ⇒ Object
36 37 38 |
# File 'lib/ford/stage.rb', line 36 def self.queue=queue @queue = queue end |
Instance Method Details
#consume ⇒ Object
When using the default run, consume should be implemented.
109 110 111 |
# File 'lib/ford/stage.rb', line 109 def consume raise 'Must implement!' end |
#pop_item ⇒ Object
Pop an item from the queue
116 117 118 |
# File 'lib/ford/stage.rb', line 116 def pop_item @config.from_stage.queue.pop end |
#run ⇒ Object
Run the stage
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/ford/stage.rb', line 95 def run while (@item = pop_item) start_consume_at = Time.now logger.debug("Consuming...(#{config.thread_id})") consume logger.debug("Consumed in #{Time.now - start_consume_at} seconds (#{config.thread_id})") end end |
#send_back(item) ⇒ Object
Enqueue an item in the current stage’s queue
131 132 133 134 |
# File 'lib/ford/stage.rb', line 131 def send_back(item) self.class.queue.push item logger.debug("Enqueued back (#{config.thread_id})") end |
#send_to(stage_class, item) ⇒ Object
Enqueue an item in the stage’s queue
123 124 125 126 |
# File 'lib/ford/stage.rb', line 123 def send_to(stage_class, item) stage_class.queue.push item logger.debug("Enqueued into #{stage_class}'s queue (#{config.thread_id})") end |