Class: RedStorm::SimpleSpout

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/simple_spout.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collector ⇒ Object (readonly)

Returns the value of attribute collector.



6
7
8
# File 'lib/red_storm/simple_spout.rb', line 6

# Reader for the @collector instance variable (assigned by #open from its
# +collector+ argument).
#
# @return [Object] the current value of @collector
def collector
  @collector
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



6
7
8
# File 'lib/red_storm/simple_spout.rb', line 6

# Reader for the @config instance variable (assigned by #open from its
# +config+ argument).
#
# @return [Object] the current value of @config
def config
  @config
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



6
7
8
# File 'lib/red_storm/simple_spout.rb', line 6

# Reader for the @context instance variable (assigned by #open from its
# +context+ argument).
#
# @return [Object] the current value of @context
def context
  @context
end

Class Method Details

.configure(&configure_block) ⇒ Object

DSL class methods



10
11
12
# File 'lib/red_storm/simple_spout.rb', line 10

# DSL hook: registers the block that describes this spout's component
# configuration.
#
# The given block is stored in @configure_block; when no block is supplied an
# empty lambda is stored instead, so a stored callable always exists after
# this call.
#
# @yield optional configuration block
# @return [Proc] the callable that was stored
def self.configure(&configure_block)
  @configure_block = configure_block || lambda {}
end

.log ⇒ Object



14
15
16
# File 'lib/red_storm/simple_spout.rb', line 14

# Class-level logger, created on first use and memoized in @log.
#
# The logger is obtained from Logger.getLogger using this class's name.
#
# @return [Object] the memoized logger
def self.log
  return @log if @log

  @log = Logger.getLogger(self.name)
end

.on_ack(method_name = nil, &on_ack_block) ⇒ Object



46
47
48
# File 'lib/red_storm/simple_spout.rb', line 46

# DSL hook: registers the handler for acknowledged messages.
#
# A given block is stored as-is in @on_ack_block. Without a block, a lambda is
# stored that dispatches through #send to +method_name+ (falling back to
# :on_ack) and forwards the message id.
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @yieldparam msg_id [Object] id of the acknowledged message
# @return [Proc] the handler that was stored
def self.on_ack(method_name = nil, &on_ack_block)
  @on_ack_block = on_ack_block || lambda { |msg_id| send(method_name || :on_ack, msg_id) }
end

.on_activate(method_name = nil, &on_activate_block) ⇒ Object



38
39
40
# File 'lib/red_storm/simple_spout.rb', line 38

# DSL hook: registers the handler run on activation.
#
# A given block is stored as-is in @on_activate_block. Without a block, a
# lambda is stored that dispatches through #send to +method_name+ (falling
# back to :on_activate).
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @return [Proc] the handler that was stored
def self.on_activate(method_name = nil, &on_activate_block)
  @on_activate_block = on_activate_block || lambda { send(method_name || :on_activate) }
end

.on_close(method_name = nil, &on_close_block) ⇒ Object



34
35
36
# File 'lib/red_storm/simple_spout.rb', line 34

# DSL hook: registers the handler run on close.
#
# A given block is stored as-is in @on_close_block. Without a block, a lambda
# is stored that dispatches through #send to +method_name+ (falling back to
# :on_close).
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @return [Proc] the handler that was stored
def self.on_close(method_name = nil, &on_close_block)
  @on_close_block = on_close_block || lambda { send(method_name || :on_close) }
end

.on_deactivate(method_name = nil, &on_deactivate_block) ⇒ Object



42
43
44
# File 'lib/red_storm/simple_spout.rb', line 42

# DSL hook: registers the handler run on deactivation.
#
# A given block is stored as-is in @on_deactivate_block. Without a block, a
# lambda is stored that dispatches through #send to +method_name+ (falling
# back to :on_deactivate).
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @return [Proc] the handler that was stored
def self.on_deactivate(method_name = nil, &on_deactivate_block)
  @on_deactivate_block = on_deactivate_block || lambda { send(method_name || :on_deactivate) }
end

.on_fail(method_name = nil, &on_fail_block) ⇒ Object



50
51
52
# File 'lib/red_storm/simple_spout.rb', line 50

# DSL hook: registers the handler for failed messages.
#
# A given block is stored as-is in @on_fail_block. Without a block, a lambda
# is stored that dispatches through #send to +method_name+ (falling back to
# :on_fail) and forwards the message id.
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @yieldparam msg_id [Object] id of the failed message
# @return [Proc] the handler that was stored
def self.on_fail(method_name = nil, &on_fail_block)
  @on_fail_block = on_fail_block || lambda { |msg_id| send(method_name || :on_fail, msg_id) }
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



30
31
32
# File 'lib/red_storm/simple_spout.rb', line 30

# DSL hook: registers the initialization handler.
#
# A given block is stored as-is in @on_init_block. Without a block, a lambda
# is stored that dispatches through #send to +method_name+ (falling back to
# :on_init).
#
# @param method_name [Symbol, String, nil] method to dispatch to when no block is given
# @return [Proc] the handler that was stored
def self.on_init(method_name = nil, &on_init_block)
  @on_init_block = on_init_block || lambda { send(method_name || :on_init) }
end

.on_send(*args, &on_send_block) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/red_storm/simple_spout.rb', line 22

# DSL hook: registers the handler that produces the next tuple, plus optional
# send options.
#
# Accepted call shapes: an optional method name followed by an optional
# trailing options Hash, and/or a block. The options Hash is merged (in place)
# into send_options. A given block is stored as-is in @on_send_block; without
# one, a lambda is stored that dispatches through #send to the method name
# (falling back to :on_send).
#
# @param args [Array] optional method name, optionally followed by an options Hash
# @return [Proc] the handler that was stored
def self.on_send(*args, &on_send_block)
  # The options Hash must be popped before reading the method name, otherwise
  # an options-only call would treat the Hash as the method name.
  extra_options = args.last.is_a?(Hash) ? args.pop : {}
  method_name = args.first

  self.send_options.merge!(extra_options)
  @on_send_block = on_send_block || lambda { send(method_name || :on_send) }
end

.output_fields(*fields) ⇒ Object



18
19
20
# File 'lib/red_storm/simple_spout.rb', line 18

# DSL hook: declares the names of the fields this spout outputs.
#
# Each argument is converted with #to_s and the resulting list is stored in
# @fields.
#
# @param fields [Array<#to_s>] field names
# @return [Array<String>] the stored field names
def self.output_fields(*fields)
  @fields = fields.map { |field| field.to_s }
end

Instance Method Details

#ack(msg_id) ⇒ Object



101
102
103
# File 'lib/red_storm/simple_spout.rb', line 101

# Runs the class's on_ack_block in the context of this instance, passing the
# message id.
#
# @param msg_id [Object] id of the acknowledged message
# @return [Object] whatever the handler returns
def ack(msg_id)
  handler = self.class.on_ack_block
  instance_exec(msg_id, &handler)
end

#activate ⇒ Object



89
90
91
# File 'lib/red_storm/simple_spout.rb', line 89

# Runs the class's on_activate_block in the context of this instance.
#
# @return [Object] whatever the handler returns
def activate
  handler = self.class.on_activate_block
  instance_exec(&handler)
end

#close ⇒ Object



85
86
87
# File 'lib/red_storm/simple_spout.rb', line 85

# Runs the class's on_close_block in the context of this instance.
#
# @return [Object] whatever the handler returns
def close
  handler = self.class.on_close_block
  instance_exec(&handler)
end

#deactivate ⇒ Object



93
94
95
# File 'lib/red_storm/simple_spout.rb', line 93

# Runs the class's on_deactivate_block in the context of this instance.
#
# @return [Object] whatever the handler returns
def deactivate
  handler = self.class.on_deactivate_block
  instance_exec(&handler)
end

#declare_output_fields(declarer) ⇒ Object



97
98
99
# File 'lib/red_storm/simple_spout.rb', line 97

# Declares this spout's output fields on the given declarer.
#
# Wraps the class-level field list (self.class.fields) in a Fields object and
# hands it to declarer.declare.
#
# @param declarer [#declare] object receiving the field declaration
# @return [Object] whatever declarer.declare returns
def declare_output_fields(declarer)
  field_names = self.class.fields
  declared_fields = Fields.new(field_names)
  declarer.declare(declared_fields)
end

#emit(*values) ⇒ Object

DSL instance methods



56
57
58
# File 'lib/red_storm/simple_spout.rb', line 56

# DSL instance method: emits a tuple built from the given values.
#
# The values are splatted into a new Values object, which is passed to
# @collector.emit.
#
# @param values [Array] tuple values
# @return [Object] whatever @collector.emit returns
def emit(*values)
  tuple = Values.new(*values)
  @collector.emit(tuple)
end

#fail(msg_id) ⇒ Object



105
106
107
# File 'lib/red_storm/simple_spout.rb', line 105

# Runs the class's on_fail_block in the context of this instance, passing the
# message id.
#
# @param msg_id [Object] id of the failed message
# @return [Object] whatever the handler returns
def fail(msg_id)
  handler = self.class.on_fail_block
  instance_exec(msg_id, &handler)
end

#get_component_configuration ⇒ Object



109
110
111
112
113
# File 'lib/red_storm/simple_spout.rb', line 109

# Builds this component's configuration.
#
# Creates a fresh Configurator, evaluates the class's configure_block with
# the configurator as self, and returns the configurator's resulting config.
#
# @return [Object] the value of Configurator#config after the block has run
def get_component_configuration
  Configurator.new.tap do |configurator|
    configurator.instance_exec(&self.class.configure_block)
  end.config
end

#log ⇒ Object



60
61
62
# File 'lib/red_storm/simple_spout.rb', line 60

# Instance-level convenience accessor that delegates to the class-level
# logger (self.class.log).
#
# @return [Object] whatever self.class.log returns
def log
  self.class.log
end

#next_tuple ⇒ Object

Spout proxy interface



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/red_storm/simple_spout.rb', line 66

# Spout proxy interface: produces (and possibly emits) the next tuple.
#
# The class's on_send_block is always run in the context of this instance.
# Its result is only acted on when self.class.emit? is truthy:
# - a truthy result is wrapped in an array, fully flattened, and emitted
#   through @collector as a Values tuple;
# - a nil/false result causes a 0.1 second sleep (presumably to avoid
#   spinning when there is nothing to emit — confirm intent).
#
# @return [Object, nil] the collector's emit result, the sleep result, or nil
#   when emitting is disabled
def next_tuple
  output = instance_exec(&self.class.on_send_block)
  return unless self.class.emit?

  if output
    # flatten is recursive: nested arrays in the output collapse into one
    # flat list of tuple values.
    @collector.emit(Values.new(*[output].flatten))
  else
    sleep(0.1)
  end
end

#open(config, context, collector) ⇒ Object



78
79
80
81
82
83
# File 'lib/red_storm/simple_spout.rb', line 78

# Spout proxy interface: stores the supplied objects and runs the init hook.
#
# Saves the arguments in @config, @context and @collector, then runs the
# class's on_init_block in the context of this instance.
#
# @param config [Object] stored in @config
# @param context [Object] stored in @context
# @param collector [Object] stored in @collector
# @return [Object] whatever the init handler returns
def open(config, context, collector)
  @config, @context, @collector = config, context, collector

  init_handler = self.class.on_init_block
  instance_exec(&init_handler)
end