Class: RedStorm::SimpleSpout
- Inherits:
-
Object
- Object
- RedStorm::SimpleSpout
- Defined in:
- lib/red_storm/simple_spout.rb
Instance Attribute Summary collapse
-
#collector ⇒ Object
readonly
Returns the value of attribute collector.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
Class Method Summary collapse
-
.configure(&configure_block) ⇒ Object
DSL class methods.
- .log ⇒ Object
- .on_ack(method_name = nil, &on_ack_block) ⇒ Object
- .on_activate(method_name = nil, &on_activate_block) ⇒ Object
- .on_close(method_name = nil, &on_close_block) ⇒ Object
- .on_deactivate(method_name = nil, &on_deactivate_block) ⇒ Object
- .on_fail(method_name = nil, &on_fail_block) ⇒ Object
- .on_init(method_name = nil, &on_init_block) ⇒ Object
- .on_send(*args, &on_send_block) ⇒ Object
- .output_fields(*fields) ⇒ Object
Instance Method Summary collapse
- #ack(msg_id) ⇒ Object
- #activate ⇒ Object
- #close ⇒ Object
- #deactivate ⇒ Object
- #declare_output_fields(declarer) ⇒ Object
- #fail(msg_id) ⇒ Object
- #get_component_configuration ⇒ Object
- #log ⇒ Object
-
#next_tuple ⇒ Object
Spout proxy interface.
- #open(config, context, collector) ⇒ Object
-
#reliable_emit(message_id, *values) ⇒ Object
DSL instance methods.
- #unreliable_emit(*values) ⇒ Object (also: #emit)
Instance Attribute Details
#collector ⇒ Object (readonly)
Returns the value of attribute collector.
7 8 9 |
# File 'lib/red_storm/simple_spout.rb', line 7 def collector @collector end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
7 8 9 |
# File 'lib/red_storm/simple_spout.rb', line 7 def config @config end |
#context ⇒ Object (readonly)
Returns the value of attribute context.
7 8 9 |
# File 'lib/red_storm/simple_spout.rb', line 7 def context @context end |
Class Method Details
.configure(&configure_block) ⇒ Object
DSL class methods
11 12 13 |
# File 'lib/red_storm/simple_spout.rb', line 11 def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end |
.log ⇒ Object
15 16 17 |
# File 'lib/red_storm/simple_spout.rb', line 15 def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end |
.on_ack(method_name = nil, &on_ack_block) ⇒ Object
47 48 49 |
# File 'lib/red_storm/simple_spout.rb', line 47 def self.on_ack(method_name = nil, &on_ack_block) @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)} end |
.on_activate(method_name = nil, &on_activate_block) ⇒ Object
39 40 41 |
# File 'lib/red_storm/simple_spout.rb', line 39 def self.on_activate(method_name = nil, &on_activate_block) @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)} end |
.on_close(method_name = nil, &on_close_block) ⇒ Object
35 36 37 |
# File 'lib/red_storm/simple_spout.rb', line 35 def self.on_close(method_name = nil, &on_close_block) @on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)} end |
.on_deactivate(method_name = nil, &on_deactivate_block) ⇒ Object
43 44 45 |
# File 'lib/red_storm/simple_spout.rb', line 43 def self.on_deactivate(method_name = nil, &on_deactivate_block) @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)} end |
.on_fail(method_name = nil, &on_fail_block) ⇒ Object
51 52 53 |
# File 'lib/red_storm/simple_spout.rb', line 51 def self.on_fail(method_name = nil, &on_fail_block) @on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)} end |
.on_init(method_name = nil, &on_init_block) ⇒ Object
31 32 33 |
# File 'lib/red_storm/simple_spout.rb', line 31 def self.on_init(method_name = nil, &on_init_block) @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} end |
.on_send(*args, &on_send_block) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/red_storm/simple_spout.rb', line 23 def self.on_send(*args, &on_send_block) = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first self..merge!() @on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)} end |
.output_fields(*fields) ⇒ Object
19 20 21 |
# File 'lib/red_storm/simple_spout.rb', line 19 def self.output_fields(*fields) @fields = fields.map(&:to_s) end |
Instance Method Details
#ack(msg_id) ⇒ Object
112 113 114 |
# File 'lib/red_storm/simple_spout.rb', line 112 def ack(msg_id) instance_exec(msg_id, &self.class.on_ack_block) end |
#activate ⇒ Object
100 101 102 |
# File 'lib/red_storm/simple_spout.rb', line 100 def activate instance_exec(&self.class.on_activate_block) end |
#close ⇒ Object
96 97 98 |
# File 'lib/red_storm/simple_spout.rb', line 96 def close instance_exec(&self.class.on_close_block) end |
#deactivate ⇒ Object
104 105 106 |
# File 'lib/red_storm/simple_spout.rb', line 104 def deactivate instance_exec(&self.class.on_deactivate_block) end |
#declare_output_fields(declarer) ⇒ Object
108 109 110 |
# File 'lib/red_storm/simple_spout.rb', line 108 def declare_output_fields(declarer) declarer.declare(Fields.new(self.class.fields)) end |
#fail(msg_id) ⇒ Object
116 117 118 |
# File 'lib/red_storm/simple_spout.rb', line 116 def fail(msg_id) instance_exec(msg_id, &self.class.on_fail_block) end |
#get_component_configuration ⇒ Object
120 121 122 123 124 |
# File 'lib/red_storm/simple_spout.rb', line 120 def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) configurator.config end |
#log ⇒ Object
66 67 68 |
# File 'lib/red_storm/simple_spout.rb', line 66 def log self.class.log end |
#next_tuple ⇒ Object
Spout proxy interface
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/red_storm/simple_spout.rb', line 72 def next_tuple output = instance_exec(&self.class.on_send_block) if self.class.emit? if output values = [output].flatten if self.class.reliable? = values.shift reliable_emit(, *values) else unreliable_emit(*values) end else sleep(0.1) end end end |
#open(config, context, collector) ⇒ Object
89 90 91 92 93 94 |
# File 'lib/red_storm/simple_spout.rb', line 89 def open(config, context, collector) @collector = collector @context = context @config = config instance_exec(&self.class.on_init_block) end |
#reliable_emit(message_id, *values) ⇒ Object
DSL instance methods
57 58 59 |
# File 'lib/red_storm/simple_spout.rb', line 57 def reliable_emit(, *values) @collector.emit(Values.new(*values), ) end |
#unreliable_emit(*values) ⇒ Object Also known as: emit
61 62 63 |
# File 'lib/red_storm/simple_spout.rb', line 61 def unreliable_emit(*values) @collector.emit(Values.new(*values)) end |