Class: RedStorm::DSL::Spout

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/dsl/spout.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collectorObject (readonly)

Returns the value of attribute collector.



12
13
14
# File 'lib/red_storm/dsl/spout.rb', line 12

def collector
  @collector
end

#configObject (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/red_storm/dsl/spout.rb', line 12

def config
  @config
end

#contextObject (readonly)

Returns the value of attribute context.



12
13
14
# File 'lib/red_storm/dsl/spout.rb', line 12

def context
  @context
end

Class Method Details

.configure(&configure_block) ⇒ Object

DSL class methods



16
17
18
# File 'lib/red_storm/dsl/spout.rb', line 16

def self.configure(&configure_block)
  @configure_block = block_given? ? configure_block : lambda {}
end

.logObject



20
21
22
# File 'lib/red_storm/dsl/spout.rb', line 20

def self.log
  @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end

.on_ack(method_name = nil, &on_ack_block) ⇒ Object



62
63
64
65
# File 'lib/red_storm/dsl/spout.rb', line 62

def self.on_ack(method_name = nil, &on_ack_block)
  body = block_given? ? on_ack_block : lambda {|msg_id| self.send((method_name || :on_ack).to_sym, msg_id)}
  define_method(:on_ack, body)
end

.on_activate(method_name = nil, &on_activate_block) ⇒ Object



50
51
52
53
54
# File 'lib/red_storm/dsl/spout.rb', line 50

def self.on_activate(method_name = nil, &on_activate_block)
  # @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)}
  body = block_given? ? on_activate_block : lambda {self.send((method_name || :on_activate).to_sym)}
  define_method(:on_activate, body)
end

.on_close(method_name = nil, &on_close_block) ⇒ Object



45
46
47
48
# File 'lib/red_storm/dsl/spout.rb', line 45

def self.on_close(method_name = nil, &on_close_block)
  body = block_given? ? on_close_block : lambda {self.send((method_name || :on_close).to_sym)}
  define_method(:on_close, body)
end

.on_deactivate(method_name = nil, &on_deactivate_block) ⇒ Object



56
57
58
59
60
# File 'lib/red_storm/dsl/spout.rb', line 56

def self.on_deactivate(method_name = nil, &on_deactivate_block)
  # @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)}
  body = block_given? ? on_deactivate_block : lambda {self.send((method_name || :on_deactivate).to_sym)}
  define_method(:on_deactivate, body)
end

.on_fail(method_name = nil, &on_fail_block) ⇒ Object



67
68
69
70
# File 'lib/red_storm/dsl/spout.rb', line 67

def self.on_fail(method_name = nil, &on_fail_block)
  body = block_given? ? on_fail_block : lambda {|msg_id| self.send((method_name || :on_fail).to_sym, msg_id)}
  define_method(:on_fail, body)
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



40
41
42
43
# File 'lib/red_storm/dsl/spout.rb', line 40

def self.on_init(method_name = nil, &on_init_block)
  body = block_given? ? on_init_block : lambda {self.send((method_name || :on_init).to_sym)}
  define_method(:on_init, body)
end

.on_send(*args, &on_send_block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/red_storm/dsl/spout.rb', line 28

def self.on_send(*args, &on_send_block)
  options = args.last.is_a?(Hash) ? args.pop : {}
  method_name = args.first

  self.send_options.merge!(options)

  # indirecting through a lambda defers the method lookup at invocation time
  # and the performance penalty is negligible
  body = block_given? ? on_send_block : lambda{self.send((method_name || :on_send).to_sym)}
  define_method(:on_send, body)
end

.output_fields(*fields) ⇒ Object



24
25
26
# File 'lib/red_storm/dsl/spout.rb', line 24

def self.output_fields(*fields)
  @fields = fields.map(&:to_s)
end

Instance Method Details

#ack(msg_id) ⇒ Object



131
132
133
# File 'lib/red_storm/dsl/spout.rb', line 131

def ack(msg_id)
  on_ack(msg_id)
end

#activateObject



119
120
121
# File 'lib/red_storm/dsl/spout.rb', line 119

def activate
  on_activate
end

#closeObject



115
116
117
# File 'lib/red_storm/dsl/spout.rb', line 115

def close
  on_close
end

#deactivateObject



123
124
125
# File 'lib/red_storm/dsl/spout.rb', line 123

def deactivate
  on_deactivate
end

#declare_output_fields(declarer) ⇒ Object



127
128
129
# File 'lib/red_storm/dsl/spout.rb', line 127

def declare_output_fields(declarer)
  declarer.declare(Fields.new(self.class.fields))
end

#fail(msg_id) ⇒ Object



135
136
137
# File 'lib/red_storm/dsl/spout.rb', line 135

def fail(msg_id)
  on_fail(msg_id)
end

#get_component_configurationObject



139
140
141
142
143
# File 'lib/red_storm/dsl/spout.rb', line 139

def get_component_configuration
  configurator = Configurator.new
  configurator.instance_exec(&self.class.configure_block)
  configurator.config
end

#logObject



83
84
85
# File 'lib/red_storm/dsl/spout.rb', line 83

def log
  self.class.log
end

#next_tupleObject

Spout proxy interface



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/red_storm/dsl/spout.rb', line 89

def next_tuple
  output = on_send

  if self.class.emit?
    if output
      values = [output].flatten
      if self.class.reliable?
        message_id = values.shift
        reliable_emit(message_id, *values)
      else
        unreliable_emit(*values)
      end
    else
      sleep(0.1)
    end
  end
end

#open(config, context, collector) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/red_storm/dsl/spout.rb', line 107

def open(config, context, collector)
  @collector = collector
  @context = context
  @config = config

  on_init
end

#reliable_emit(message_id, *values) ⇒ Object

DSL instance methods



74
75
76
# File 'lib/red_storm/dsl/spout.rb', line 74

def reliable_emit(message_id, *values)
  @collector.emit(Values.new(*values), message_id)
end

#unreliable_emit(*values) ⇒ Object Also known as: emit



78
79
80
# File 'lib/red_storm/dsl/spout.rb', line 78

def unreliable_emit(*values)
  @collector.emit(Values.new(*values))
end