Class: RedStorm::SimpleSpout

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/simple_spout.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collectorObject (readonly)

Returns the value of attribute collector.



7
8
9
# File 'lib/red_storm/simple_spout.rb', line 7

def collector
  @collector
end

#configObject (readonly)

Returns the value of attribute config.



7
8
9
# File 'lib/red_storm/simple_spout.rb', line 7

def config
  @config
end

#contextObject (readonly)

Returns the value of attribute context.



7
8
9
# File 'lib/red_storm/simple_spout.rb', line 7

def context
  @context
end

Class Method Details

.configure(&configure_block) ⇒ Object

DSL class methods



11
12
13
# File 'lib/red_storm/simple_spout.rb', line 11

def self.configure(&configure_block)
  @configure_block = block_given? ? configure_block : lambda {}
end

.logObject



15
16
17
# File 'lib/red_storm/simple_spout.rb', line 15

def self.log
  @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end

.on_ack(method_name = nil, &on_ack_block) ⇒ Object



47
48
49
# File 'lib/red_storm/simple_spout.rb', line 47

def self.on_ack(method_name = nil, &on_ack_block)
  @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)}
end

.on_activate(method_name = nil, &on_activate_block) ⇒ Object



39
40
41
# File 'lib/red_storm/simple_spout.rb', line 39

def self.on_activate(method_name = nil, &on_activate_block)
  @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)}
end

.on_close(method_name = nil, &on_close_block) ⇒ Object



35
36
37
# File 'lib/red_storm/simple_spout.rb', line 35

def self.on_close(method_name = nil, &on_close_block)
  @on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)}
end

.on_deactivate(method_name = nil, &on_deactivate_block) ⇒ Object



43
44
45
# File 'lib/red_storm/simple_spout.rb', line 43

def self.on_deactivate(method_name = nil, &on_deactivate_block)
  @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)}
end

.on_fail(method_name = nil, &on_fail_block) ⇒ Object



51
52
53
# File 'lib/red_storm/simple_spout.rb', line 51

def self.on_fail(method_name = nil, &on_fail_block)
  @on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)}
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



31
32
33
# File 'lib/red_storm/simple_spout.rb', line 31

def self.on_init(method_name = nil, &on_init_block)
  @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)}
end

.on_send(*args, &on_send_block) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/red_storm/simple_spout.rb', line 23

def self.on_send(*args, &on_send_block)
  options = args.last.is_a?(Hash) ? args.pop : {}
  method_name = args.first
  
  self.send_options.merge!(options)
  @on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)}
end

.output_fields(*fields) ⇒ Object



19
20
21
# File 'lib/red_storm/simple_spout.rb', line 19

def self.output_fields(*fields)
  @fields = fields.map(&:to_s)
end

Instance Method Details

#ack(msg_id) ⇒ Object



112
113
114
# File 'lib/red_storm/simple_spout.rb', line 112

def ack(msg_id)
  instance_exec(msg_id, &self.class.on_ack_block)
end

#activateObject



100
101
102
# File 'lib/red_storm/simple_spout.rb', line 100

def activate
  instance_exec(&self.class.on_activate_block)
end

#closeObject



96
97
98
# File 'lib/red_storm/simple_spout.rb', line 96

def close
  instance_exec(&self.class.on_close_block)
end

#deactivateObject



104
105
106
# File 'lib/red_storm/simple_spout.rb', line 104

def deactivate
  instance_exec(&self.class.on_deactivate_block)
end

#declare_output_fields(declarer) ⇒ Object



108
109
110
# File 'lib/red_storm/simple_spout.rb', line 108

def declare_output_fields(declarer)
  declarer.declare(Fields.new(self.class.fields))
end

#fail(msg_id) ⇒ Object



116
117
118
# File 'lib/red_storm/simple_spout.rb', line 116

def fail(msg_id)
  instance_exec(msg_id, &self.class.on_fail_block)
end

#get_component_configurationObject



120
121
122
123
124
# File 'lib/red_storm/simple_spout.rb', line 120

def get_component_configuration
  configurator = Configurator.new
  configurator.instance_exec(&self.class.configure_block)
  configurator.config
end

#logObject



66
67
68
# File 'lib/red_storm/simple_spout.rb', line 66

def log
  self.class.log
end

#next_tupleObject

Spout proxy interface



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/red_storm/simple_spout.rb', line 72

def next_tuple
  output = instance_exec(&self.class.on_send_block)
  if self.class.emit?
    if output
      values = [output].flatten
      if self.class.reliable?
        message_id = values.shift
        reliable_emit(message_id, *values)
      else
        unreliable_emit(*values)
      end
    else
      sleep(0.1)
    end
  end
end

#open(config, context, collector) ⇒ Object



89
90
91
92
93
94
# File 'lib/red_storm/simple_spout.rb', line 89

def open(config, context, collector)
  @collector = collector
  @context = context
  @config = config
  instance_exec(&self.class.on_init_block)
end

#reliable_emit(message_id, *values) ⇒ Object

DSL instance methods



57
58
59
# File 'lib/red_storm/simple_spout.rb', line 57

def reliable_emit(message_id, *values)
  @collector.emit(Values.new(*values), message_id) 
end

#unreliable_emit(*values) ⇒ Object Also known as: emit



61
62
63
# File 'lib/red_storm/simple_spout.rb', line 61

def unreliable_emit(*values)
  @collector.emit(Values.new(*values)) 
end