Class: RedStorm::SimpleBolt
- Inherits: Object
- Inheritance hierarchy: Object → RedStorm::SimpleBolt
- Defined in: lib/red_storm/simple_bolt.rb
Instance Attribute Summary
-
#collector ⇒ Object
readonly
Returns the value of attribute collector.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
Class Method Summary
- .configure(&configure_block) ⇒ Object
-
.log ⇒ Object
DSL class methods.
- .on_close(method_name = nil, &close_block) ⇒ Object
- .on_init(method_name = nil, &on_init_block) ⇒ Object
- .on_receive(*args, &on_receive_block) ⇒ Object
- .output_fields(*fields) ⇒ Object
Instance Method Summary
- #ack(tuple) ⇒ Object
- #anchored_emit(tuple, *values) ⇒ Object
- #cleanup ⇒ Object
- #declare_output_fields(declarer) ⇒ Object
-
#execute(tuple) ⇒ Object
Bolt proxy interface.
- #fail(tuple) ⇒ Object
- #get_component_configuration ⇒ Object
-
#log ⇒ Object
DSL instance methods.
- #prepare(config, context, collector) ⇒ Object
- #unanchored_emit(*values) ⇒ Object
Instance Attribute Details
#collector ⇒ Object (readonly)
Returns the value of attribute collector.
7 8 9 |
# File 'lib/red_storm/simple_bolt.rb', line 7 def collector @collector end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
7 8 9 |
# File 'lib/red_storm/simple_bolt.rb', line 7 def config @config end |
#context ⇒ Object (readonly)
Returns the value of attribute context.
7 8 9 |
# File 'lib/red_storm/simple_bolt.rb', line 7 def context @context end |
Class Method Details
.configure(&configure_block) ⇒ Object
19 20 21 |
# File 'lib/red_storm/simple_bolt.rb', line 19 def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end |
.log ⇒ Object
DSL class methods
11 12 13 |
# File 'lib/red_storm/simple_bolt.rb', line 11 def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end |
.on_close(method_name = nil, &close_block) ⇒ Object
35 36 37 |
# File 'lib/red_storm/simple_bolt.rb', line 35 def self.on_close(method_name = nil, &close_block) @close_block = block_given? ? close_block : lambda {self.send(method_name || :on_close)} end |
.on_init(method_name = nil, &on_init_block) ⇒ Object
31 32 33 |
# File 'lib/red_storm/simple_bolt.rb', line 31 def self.on_init(method_name = nil, &on_init_block) @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} end |
.on_receive(*args, &on_receive_block) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/red_storm/simple_bolt.rb', line 23 def self.on_receive(*args, &on_receive_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first self.receive_options.merge!(options) @on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)} end |
.output_fields(*fields) ⇒ Object
15 16 17 |
# File 'lib/red_storm/simple_bolt.rb', line 15 def self.output_fields(*fields) @fields = fields.map(&:to_s) end |
Instance Method Details
#ack(tuple) ⇒ Object
53 54 55 |
# File 'lib/red_storm/simple_bolt.rb', line 53 def ack(tuple) @collector.ack(tuple) end |
#anchored_emit(tuple, *values) ⇒ Object
49 50 51 |
# File 'lib/red_storm/simple_bolt.rb', line 49 def anchored_emit(tuple, *values) @collector.emit(tuple, Values.new(*values)) end |
#cleanup ⇒ Object
79 80 81 |
# File 'lib/red_storm/simple_bolt.rb', line 79 def cleanup instance_exec(&self.class.close_block) end |
#declare_output_fields(declarer) ⇒ Object
83 84 85 |
# File 'lib/red_storm/simple_bolt.rb', line 83 def declare_output_fields(declarer) declarer.declare(Fields.new(self.class.fields)) end |
#execute(tuple) ⇒ Object
Bolt proxy interface
63 64 65 66 67 68 69 70 |
# File 'lib/red_storm/simple_bolt.rb', line 63 def execute(tuple) output = instance_exec(tuple, &self.class.on_receive_block) if output && self.class.emit? values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)} @collector.ack(tuple) if self.class.ack? end end |
#fail(tuple) ⇒ Object
57 58 59 |
# File 'lib/red_storm/simple_bolt.rb', line 57 def fail(tuple) @collector.fail(tuple) end |
#get_component_configuration ⇒ Object
87 88 89 90 91 |
# File 'lib/red_storm/simple_bolt.rb', line 87 def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) configurator.config end |
#log ⇒ Object
DSL instance methods
41 42 43 |
# File 'lib/red_storm/simple_bolt.rb', line 41 def log self.class.log end |
#prepare(config, context, collector) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/red_storm/simple_bolt.rb', line 72 def prepare(config, context, collector) @collector = collector @context = context @config = config instance_exec(&self.class.on_init_block) end |
#unanchored_emit(*values) ⇒ Object
45 46 47 |
# File 'lib/red_storm/simple_bolt.rb', line 45 def unanchored_emit(*values) @collector.emit(Values.new(*values)) end |