Class: RedStorm::SimpleBolt

Inherits:
Object
Defined in:
lib/red_storm/simple_bolt.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collector ⇒ Object (readonly)

Returns the value of attribute collector.



7
8
9
# File 'lib/red_storm/simple_bolt.rb', line 7

# Reader for the @collector instance variable, which #prepare assigns from
# its +collector+ argument.
#
# @return [Object, nil] the stored collector (nil until @collector is set)
def collector
  @collector
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



7
8
9
# File 'lib/red_storm/simple_bolt.rb', line 7

# Reader for the @config instance variable, which #prepare assigns from its
# +config+ argument.
#
# @return [Object, nil] the stored configuration (nil until @config is set)
def config
  @config
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



7
8
9
# File 'lib/red_storm/simple_bolt.rb', line 7

# Reader for the @context instance variable, which #prepare assigns from its
# +context+ argument.
#
# @return [Object, nil] the stored context (nil until @context is set)
def context
  @context
end

Class Method Details

.configure(&configure_block) ⇒ Object



19
20
21
# File 'lib/red_storm/simple_bolt.rb', line 19

# DSL hook: remembers the block that describes this bolt's component
# configuration. The block is only stored here (in the class-level
# @configure_block); it is not called by this method.
#
# When no block is supplied, a no-op lambda is stored instead so that later
# consumers always have something callable.
#
# @yield configuration block to store
# @return [Proc] the block that was stored
def self.configure(&configure_block)
  @configure_block = configure_block || lambda {}
end

.log ⇒ Object

DSL class methods



11
12
13
# File 'lib/red_storm/simple_bolt.rb', line 11

# Class-level logger, created on first use and cached in @log.
#
# The logger is obtained from Log4j via Logger.getLogger, keyed by this
# class's name.
#
# @return [Object] the cached logger instance
def self.log
  return @log if @log

  @log = Java::OrgApacheLog4j::Logger.getLogger(name)
end

.on_close(method_name = nil, &close_block) ⇒ Object



35
36
37
# File 'lib/red_storm/simple_bolt.rb', line 35

# DSL hook: registers what should run when the bolt is closed.
#
# Either pass a block, or pass the name of a method to invoke. With neither,
# the stored lambda invokes a method named +on_close+ on whatever object it is
# later evaluated against. The result is stored in the class-level
# @close_block.
#
# @param method_name [Symbol, String, nil] method to call when no block is given
# @yield close handler to store
# @return [Proc] the handler that was stored
def self.on_close(method_name = nil, &close_block)
  @close_block = close_block || lambda { send(method_name || :on_close) }
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



31
32
33
# File 'lib/red_storm/simple_bolt.rb', line 31

# DSL hook: registers what should run when the bolt is initialised.
#
# Either pass a block, or pass the name of a method to invoke. With neither,
# the stored lambda invokes a method named +on_init+ on whatever object it is
# later evaluated against. The result is stored in the class-level
# @on_init_block.
#
# @param method_name [Symbol, String, nil] method to call when no block is given
# @yield init handler to store
# @return [Proc] the handler that was stored
def self.on_init(method_name = nil, &on_init_block)
  @on_init_block = on_init_block || lambda { send(method_name || :on_init) }
end

.on_receive(*args, &on_receive_block) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/red_storm/simple_bolt.rb', line 23

# DSL hook: registers the tuple handler and its options.
#
# Accepted call shapes:
#   on_receive { |tuple| ... }
#   on_receive :method_name
#   on_receive :method_name, :some_option => value
#   on_receive :some_option => value do |tuple| ... end
#
# A trailing Hash argument is treated as options and merged into
# +receive_options+. When no block is given, the stored lambda forwards the
# tuple to the named method, or to +on_receive+ when no name was given. The
# handler is stored in the class-level @on_receive_block.
#
# @param args [Array] optional method name, optionally followed by an options Hash
# @yield [tuple] receive handler to store
# @return [Proc] the handler that was stored
def self.on_receive(*args, &on_receive_block)
  trailing = args.last
  options = trailing.is_a?(Hash) ? args.pop : {}
  handler_name = args.first || :on_receive

  receive_options.merge!(options)
  @on_receive_block = on_receive_block || lambda { |tuple| send(handler_name, tuple) }
end

.output_fields(*fields) ⇒ Object



15
16
17
# File 'lib/red_storm/simple_bolt.rb', line 15

# DSL hook: declares the names of the fields this bolt emits.
#
# Every argument is converted with +to_s+ and the resulting list is stored in
# the class-level @fields.
#
# @param fields [Array<#to_s>] field names
# @return [Array<String>] the stored field names
def self.output_fields(*fields)
  @fields = fields.collect { |field| field.to_s }
end

Instance Method Details

#ack(tuple) ⇒ Object



53
54
55
# File 'lib/red_storm/simple_bolt.rb', line 53

# Acknowledges +tuple+ by delegating to the collector stored in @collector.
#
# @param tuple the tuple to acknowledge
# @return [Object] whatever the collector's +ack+ returns
def ack(tuple)
  target = @collector
  target.ack(tuple)
end

#anchored_emit(tuple, *values) ⇒ Object



49
50
51
# File 'lib/red_storm/simple_bolt.rb', line 49

# Emits +values+ through the collector, passing +tuple+ as the first argument
# to the collector's +emit+ (the anchor).
#
# The values are wrapped in a new Values object before being handed to the
# collector.
#
# @param tuple the tuple the emitted values are anchored to
# @param values [Array] the values to emit
# @return [Object] whatever the collector's +emit+ returns
def anchored_emit(tuple, *values)
  payload = Values.new(*values)
  @collector.emit(tuple, payload)
end

#cleanup ⇒ Object



79
80
81
# File 'lib/red_storm/simple_bolt.rb', line 79

# Runs the class-level close block with this instance as +self+.
#
# @return [Object] the close block's return value
def cleanup
  closer = self.class.close_block
  instance_exec(&closer)
end

#declare_output_fields(declarer) ⇒ Object



83
84
85
# File 'lib/red_storm/simple_bolt.rb', line 83

# Declares this bolt's output fields on +declarer+.
#
# The class-level field list is wrapped in a new Fields object and passed to
# the declarer's +declare+.
#
# @param declarer object responding to +declare+
# @return [Object] whatever +declare+ returns
def declare_output_fields(declarer)
  schema = Fields.new(self.class.fields)
  declarer.declare(schema)
end

#execute(tuple) ⇒ Object

Bolt proxy interface



63
64
65
66
67
68
69
70
# File 'lib/red_storm/simple_bolt.rb', line 63

# Bolt proxy interface: processes one incoming tuple.
#
# Runs the class-level on_receive block with this instance as +self+. When the
# block returns a truthy value and the class-level +emit?+ flag is set, that
# value is emitted through the collector after being normalised into a list
# of value-lists:
#
#   * a non-Array        => one emit carrying that single value
#   * a flat Array       => one emit carrying those values
#   * an Array of Arrays => one emit per inner Array
#
# Only the first element is inspected to tell a flat Array from a nested one.
# Each emit is anchored to +tuple+ when the class-level +anchor?+ flag is set.
#
# When the class-level +ack?+ flag is set, the tuple is acked once the receive
# block has run, whether or not anything was emitted. Previously the ack lived
# inside the emit branch, so a handler that returned nil/false (or a bolt with
# emit switched off) never acked its tuples even though auto-ack was enabled.
#
# @param tuple the incoming tuple, passed unchanged to the receive block
# @return [Object, nil] the collector's +ack+ result, or nil when +ack?+ is off
def execute(tuple)
  output = instance_exec(tuple, &self.class.on_receive_block)

  if output && self.class.emit?
    values_list =
      if !output.is_a?(Array)
        [[output]]
      elsif !output.first.is_a?(Array)
        [output]
      else
        output
      end

    values_list.each do |values|
      if self.class.anchor?
        anchored_emit(tuple, *values)
      else
        unanchored_emit(*values)
      end
    end
  end

  # Auto-ack is independent of whether the handler produced any output.
  @collector.ack(tuple) if self.class.ack?
end

#fail(tuple) ⇒ Object



57
58
59
# File 'lib/red_storm/simple_bolt.rb', line 57

# Marks +tuple+ as failed by delegating to the collector stored in @collector.
#
# Note: inside this class the method shadows Kernel#fail, so a bare +fail+
# here reports a tuple failure rather than raising; use +raise+ to raise.
#
# @param tuple the tuple to fail
# @return [Object] whatever the collector's +fail+ returns
def fail(tuple)
  target = @collector
  target.fail(tuple)
end

#get_component_configuration ⇒ Object



87
88
89
90
91
# File 'lib/red_storm/simple_bolt.rb', line 87

# Builds this bolt's component configuration.
#
# A fresh Configurator is created, the class-level configure block is
# evaluated with that configurator as +self+, and the configurator's +config+
# is returned.
#
# @return [Object] the configurator's resulting +config+
def get_component_configuration
  Configurator.new.tap do |configurator|
    configurator.instance_exec(&self.class.configure_block)
  end.config
end

#log ⇒ Object

DSL instance methods



41
42
43
# File 'lib/red_storm/simple_bolt.rb', line 41

# DSL instance method: returns the logger exposed by this instance's class,
# so instances and the class use the same logger.
#
# @return [Object] the class-level logger
def log
  owner = self.class
  owner.log
end

#prepare(config, context, collector) ⇒ Object



72
73
74
75
76
77
# File 'lib/red_storm/simple_bolt.rb', line 72

# Bolt proxy interface: stores the runtime objects handed to the bolt and then
# runs the class-level on_init block with this instance as +self+.
#
# The three arguments are kept in @config, @context and @collector before the
# init block runs, so the block can already use them.
#
# @param config the configuration object to store
# @param context the context object to store
# @param collector the collector later used for emit/ack/fail
# @return [Object] the on_init block's return value
def prepare(config, context, collector)
  @config, @context, @collector = config, context, collector
  instance_exec(&self.class.on_init_block)
end

#unanchored_emit(*values) ⇒ Object



45
46
47
# File 'lib/red_storm/simple_bolt.rb', line 45

# Emits +values+ through the collector without passing an anchor tuple.
#
# The values are wrapped in a new Values object before being handed to the
# collector.
#
# @param values [Array] the values to emit
# @return [Object] whatever the collector's +emit+ returns
def unanchored_emit(*values)
  payload = Values.new(*values)
  @collector.emit(payload)
end