Class: RedStorm::DSL::Bolt

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/dsl/bolt.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collector ⇒ Object (readonly)

Returns the value of attribute collector.



12
13
14
# File 'lib/red_storm/dsl/bolt.rb', line 12

# Read-only accessor for the +@collector+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@collector+
#   (nil while it has not been assigned)
def collector
  instance_variable_get(:@collector)
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/red_storm/dsl/bolt.rb', line 12

# Read-only accessor for the +@config+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@config+
#   (nil while it has not been assigned)
def config
  instance_variable_get(:@config)
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



12
13
14
# File 'lib/red_storm/dsl/bolt.rb', line 12

# Read-only accessor for the +@context+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@context+
#   (nil while it has not been assigned)
def context
  instance_variable_get(:@context)
end

Class Method Details

.configure(&configure_block) ⇒ Object



24
25
26
# File 'lib/red_storm/dsl/bolt.rb', line 24

# Stores the block that describes this component's configuration.
#
# The block is kept in the class-level +@configure_block+. When no block is
# given, an empty lambda is stored instead so the variable always holds
# something callable.
#
# @yield configuration block to remember
# @return [Proc] the stored block (or the empty lambda)
def self.configure(&configure_block)
  @configure_block = configure_block || lambda {}
end

.log ⇒ Object

DSL class methods



16
17
18
# File 'lib/red_storm/dsl/bolt.rb', line 16

# Class-level logger, created on first use and then reused.
#
# The logger is obtained from Log4j using this class's name and cached in
# the class-level +@log+.
#
# @return [Object] the cached logger
def self.log
  return @log if @log

  @log = Java::OrgApacheLog4j::Logger.getLogger(self.name)
end

.on_close(method_name = nil, &on_close_block) ⇒ Object



45
46
47
48
# File 'lib/red_storm/dsl/bolt.rb', line 45

def self.on_close(method_name = nil, &on_close_block)
  body = block_given? ? on_close_block : lambda {self.send((method_name || :on_close).to_sym)}
  define_method(:on_close, body)
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



40
41
42
43
# File 'lib/red_storm/dsl/bolt.rb', line 40

def self.on_init(method_name = nil, &on_init_block)
  body = block_given? ? on_init_block : lambda {self.send((method_name || :on_init).to_sym)}
  define_method(:on_init, body)
end

.on_receive(*args, &on_receive_block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/red_storm/dsl/bolt.rb', line 28

def self.on_receive(*args, &on_receive_block)
  options = args.last.is_a?(Hash) ? args.pop : {}
  method_name = args.first

  self.receive_options.merge!(options)

  # indirecting through a lambda defers the method lookup at invocation time
  # and the performance penalty is negligible
  body = block_given? ? on_receive_block : lambda{|tuple| self.send((method_name || :on_receive).to_sym, tuple)}
  define_method(:on_receive, body)
end

.output_fields(*fields) ⇒ Object



20
21
22
# File 'lib/red_storm/dsl/bolt.rb', line 20

# Records the names of the fields this component outputs.
#
# Each given name is converted with +to_s+ and the resulting list replaces
# the class-level +@fields+.
#
# @param fields [Array<#to_s>] field names
# @return [Array<String>] the stored list of names
def self.output_fields(*fields)
  @fields = fields.map { |field| field.to_s }
end

Instance Method Details

#ack(tuple) ⇒ Object



64
65
66
# File 'lib/red_storm/dsl/bolt.rb', line 64

# Acknowledges +tuple+ by delegating to the collector held in +@collector+.
#
# @param tuple [Object] the tuple to acknowledge
# @return [Object] whatever the collector's +ack+ returns
def ack(tuple)
  sink = @collector
  sink.ack(tuple)
end

#anchored_emit(tuple, *values) ⇒ Object



60
61
62
# File 'lib/red_storm/dsl/bolt.rb', line 60

# Emits +values+ through the collector, passing +tuple+ as the first
# argument to the collector's +emit+ (the anchor).
#
# @param tuple [Object] the tuple handed to +emit+ ahead of the values
# @param values [Array] the values to wrap in a +Values+ object
# @return [Object] whatever the collector's +emit+ returns
def anchored_emit(tuple, *values)
  payload = Values.new(*values)
  @collector.emit(tuple, payload)
end

#cleanup ⇒ Object



91
92
93
# File 'lib/red_storm/dsl/bolt.rb', line 91

# Shutdown entry point: runs the +on_close+ hook.
#
# @return [Object] whatever +on_close+ returns
def cleanup
  send(:on_close)
end

#declare_output_fields(declarer) ⇒ Object



95
96
97
# File 'lib/red_storm/dsl/bolt.rb', line 95

# Declares this component's output fields on +declarer+.
#
# The field list is read from the class (+self.class.fields+), wrapped in a
# +Fields+ object and passed to +declarer.declare+.
#
# @param declarer [#declare] receiver of the field declaration
# @return [Object] whatever +declarer.declare+ returns
def declare_output_fields(declarer)
  schema = Fields.new(self.class.fields)
  declarer.declare(schema)
end

#execute(tuple) ⇒ Object

Bolt proxy interface



74
75
76
77
78
79
80
81
# File 'lib/red_storm/dsl/bolt.rb', line 74

# Bolt proxy interface: processes one incoming tuple.
#
# Calls +on_receive+ with the tuple. If that returns a truthy value and the
# class reports +emit?+, the result is normalised into a list of value lists
# and each one is emitted:
#   * a non-Array result        -> one emit carrying that single value
#   * an Array of non-Arrays    -> one emit carrying those values
#   * an Array whose first element is an Array -> one emit per element
# Each emit is anchored to +tuple+ when the class reports +anchor?+.
#
# The tuple is acked (when the class reports +ack?+) only on this emit path;
# if +on_receive+ returns nil/false or emitting is disabled, nothing is
# acked here.
#
# @param tuple [Object] the incoming tuple
# @return [Object, nil] the collector's +ack+ result when an ack was sent,
#   otherwise nil
def execute(tuple)
  output = on_receive(tuple)
  return unless output && self.class.emit?

  batches =
    if output.is_a?(Array)
      output.first.is_a?(Array) ? output : [output]
    else
      [[output]]
    end

  batches.each do |values|
    if self.class.anchor?
      anchored_emit(tuple, *values)
    else
      unanchored_emit(*values)
    end
  end

  @collector.ack(tuple) if self.class.ack?
end

#fail(tuple) ⇒ Object



68
69
70
# File 'lib/red_storm/dsl/bolt.rb', line 68

# Marks +tuple+ as failed by delegating to the collector held in
# +@collector+.
#
# Note that within this class the name shadows Kernel#fail.
#
# @param tuple [Object] the tuple to fail
# @return [Object] whatever the collector's +fail+ returns
def fail(tuple)
  sink = @collector
  sink.fail(tuple)
end

#get_component_configuration ⇒ Object



99
100
101
102
103
# File 'lib/red_storm/dsl/bolt.rb', line 99

# Builds this component's configuration.
#
# A fresh +Configurator+ is created, the class's +configure_block+ is
# evaluated in its context (via +instance_exec+), and the configurator's
# resulting +config+ is returned.
#
# @return [Object] the value of +config+ on the new configurator
def get_component_configuration
  Configurator.new.tap { |builder| builder.instance_exec(&self.class.configure_block) }.config
end

#log ⇒ Object

DSL instance methods



52
53
54
# File 'lib/red_storm/dsl/bolt.rb', line 52

# DSL instance method: instance-level shortcut to the class-level logger.
#
# @return [Object] whatever +self.class.log+ returns
def log
  owner = self.class
  owner.log
end

#prepare(config, context, collector) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/red_storm/dsl/bolt.rb', line 83

# Stores the three objects handed in at start-up, then runs the +on_init+
# hook.
#
# The arguments are kept in +@config+, +@context+ and +@collector+ before
# +on_init+ is called, so the hook can already use them.
#
# @param config [Object] stored in +@config+
# @param context [Object] stored in +@context+
# @param collector [Object] stored in +@collector+; used by the emit/ack/fail helpers
# @return [Object] whatever +on_init+ returns
def prepare(config, context, collector)
  @collector, @context, @config = collector, context, config

  on_init
end

#unanchored_emit(*values) ⇒ Object



56
57
58
# File 'lib/red_storm/dsl/bolt.rb', line 56

# Emits +values+ through the collector without passing an anchor tuple.
#
# @param values [Array] the values to wrap in a +Values+ object
# @return [Object] whatever the collector's +emit+ returns
def unanchored_emit(*values)
  payload = Values.new(*values)
  @collector.emit(payload)
end