Class: Wukong::Load::SourceDriver
- Inherits:
-
Wukong::Local::StdioDriver
- Object
- Wukong::Local::StdioDriver
- Wukong::Load::SourceDriver
- Includes:
- Logging
- Defined in:
- lib/wukong-load/source_driver.rb
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#index ⇒ Object
Returns the value of attribute index.
Class Method Summary collapse
Instance Method Summary collapse
Instance Attribute Details
#batch_size ⇒ Object
Returns the value of attribute batch_size.
6 7 8 |
# File 'lib/wukong-load/source_driver.rb', line 6 def batch_size @batch_size end |
#index ⇒ Object
Returns the value of attribute index.
6 7 8 |
# File 'lib/wukong-load/source_driver.rb', line 6 def index @index end |
Class Method Details
.start(label, settings = {}) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/wukong-load/source_driver.rb', line 14 def self.start(label, settings={}) driver = new(:foobar, label, settings) driver.post_init period = case when settings[:period] then settings[:period] when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0 else 1.0 end driver.create_event EventMachine::PeriodicTimer.new(period) { driver.create_event } end |
Instance Method Details
#create_event ⇒ Object
27 28 29 30 31 |
# File 'lib/wukong-load/source_driver.rb', line 27 def create_event receive_line(index.to_s) self.index += 1 finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0 end |
#post_init ⇒ Object
8 9 10 11 12 |
# File 'lib/wukong-load/source_driver.rb', line 8 def post_init super() self.index = 1 self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0 end |
#process(record) ⇒ Object
:nodoc:
Not sure why I have to add the call to $stdout.flush at the end of this method. Supposedly $stdout.sync is called during the #setup method in StdoutProcessor in wukong/widget/processors. Doesn't that do this?
39 40 41 42 |
# File 'lib/wukong-load/source_driver.rb', line 39 def process record $stdout.puts record $stdout.flush end |