Class: RedStorm::SimpleTopology

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/simple_topology.rb

Direct Known Subclasses

SimpleDRPCTopology

Defined Under Namespace

Classes: BoltDefinition, ComponentDefinition, SpoutDefinition

Constant Summary collapse

DEFAULT_SPOUT_PARALLELISM =
1
DEFAULT_BOLT_PARALLELISM =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cluster ⇒ Object (readonly)

LocalCluster reference usable in on_submit block, for example



11
12
13
# File 'lib/red_storm/simple_topology.rb', line 11

# Reader for the @cluster instance variable.
#
# #start assigns @cluster a new LocalCluster when it is called with
# env == :local; otherwise #start does not assign it, so this returns nil
# unless something else has set it.
def cluster
  @cluster
end

Class Method Details

.bolt(bolt_class, *args, &bolt_block) ⇒ Object

def self.bolt(bolt_class, constructor_args = [], options = {}, &bolt_block)



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/red_storm/simple_topology.rb', line 120

# Declares a bolt for this topology and appends its definition to
# self.components.
#
# Accepted call shapes (everything after bolt_class is optional):
#   bolt(bolt_class, constructor_args = [], options = {}) { ... }
#
# bolt_class       - the bolt class; also used to derive the default :id
#                    through self.underscore.
# constructor_args - last positional argument left once a trailing options
#                    Hash has been removed; defaults to [].
# options          - trailing Hash; :id and :parallelism override the
#                    defaults (underscored class name, DEFAULT_BOLT_PARALLELISM).
# bolt_block       - required; evaluated with instance_exec on the new
#                    BoltDefinition.
#
# Raises TopologyDefinitionError when no block is given.
# Returns the result of `self.components << definition`.
def self.bolt(bolt_class, *args, &bolt_block)
  set_topology_class!

  # A trailing Hash is treated as the options; whatever is then last is
  # taken as the constructor arguments.
  overrides = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop

  defaults = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}
  settings = defaults.merge(overrides)

  definition = BoltDefinition.new(bolt_class, constructor_args, settings[:id], settings[:parallelism])

  # The definition is built before this check because the error message
  # reports its class name and id.
  unless block_given?
    raise(TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required")
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end

.configure(name = nil, &configure_block) ⇒ Object



132
133
134
135
136
# File 'lib/red_storm/simple_topology.rb', line 132

# Records the topology name and/or the configuration block on the class.
#
# name            - optional; when truthy it is stored as a String in
#                   @topology_name. A nil/false name leaves any previous
#                   value untouched.
# configure_block - optional; when given it is stored in @configure_block.
#                   #start later runs the class's configure_block with
#                   instance_exec on a Configurator, passing env.
#
# Returns the stored block, or nil when no block was given.
def self.configure(name = nil, &configure_block)
  set_topology_class!

  if name
    @topology_name = name.to_s
  end

  @configure_block = configure_block unless configure_block.nil?
end

.log ⇒ Object



103
104
105
# File 'lib/red_storm/simple_topology.rb', line 103

# Returns the logger for this class, creating it on first use.
#
# The logger is obtained from Java::OrgApacheLog4j::Logger.getLogger using
# self.name and cached in @log, so later calls return the same object.
def self.log
  return @log if @log

  @log = Java::OrgApacheLog4j::Logger.getLogger(self.name)
end

.on_submit(method_name = nil, &submit_block) ⇒ Object



138
139
140
# File 'lib/red_storm/simple_topology.rb', line 138

# Registers the callback stored in @submit_block.
#
# method_name  - optional name of a method to invoke with env; only used
#                when no block is given.
# submit_block - optional block; takes precedence over method_name.
#
# #start runs the class's submit_block with instance_exec(env, ...) after the
# topology has been submitted. When only method_name is supplied the stored
# lambda calls self.send(method_name, env).
#
# NOTE(review): with neither argument the stored lambda would call
# send(nil, env) when invoked — confirm callers always pass one of the two.
#
# Returns the stored callable.
def self.on_submit(method_name = nil, &submit_block)
  @submit_block =
    if block_given?
      submit_block
    else
      lambda { |env| self.send(method_name, env) }
    end
end

.spout(spout_class, *args, &spout_block) ⇒ Object

def self.spout(spout_class, constructor_args = [], options = {}, &spout_block)



108
109
110
111
112
113
114
115
116
117
# File 'lib/red_storm/simple_topology.rb', line 108

# Declares a spout for this topology and appends its definition to
# self.components.
#
# Accepted call shapes (everything after spout_class is optional):
#   spout(spout_class, constructor_args = [], options = {}) { ... }
#
# spout_class      - the spout class; also used to derive the default :id
#                    through self.underscore.
# constructor_args - last positional argument left once a trailing options
#                    Hash has been removed; defaults to [].
# options          - trailing Hash; :id and :parallelism override the
#                    defaults (underscored class name, DEFAULT_SPOUT_PARALLELISM).
# spout_block      - optional; when given it is evaluated with instance_exec
#                    on the new SpoutDefinition.
#
# Returns the result of `self.components << definition`.
def self.spout(spout_class, *args, &spout_block)
  set_topology_class!

  # A trailing Hash is treated as the options; whatever is then last is
  # taken as the constructor arguments.
  overrides = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop

  defaults = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}
  settings = defaults.merge(overrides)

  definition = SpoutDefinition.new(spout_class, constructor_args, settings[:id], settings[:parallelism])
  definition.instance_exec(&spout_block) unless spout_block.nil?
  self.components << definition
end

Instance Method Details

#start(base_class_path, env) ⇒ Object

topology proxy interface



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/red_storm/simple_topology.rb', line 144

# Builds the topology from the class-level definitions, submits it, then
# runs the class's submit block.
#
# base_class_path - passed to each component's new_instance.
# env             - :local submits through a new LocalCluster (kept in
#                   @cluster); any other value submits through StormSubmitter.
#                   Also passed to the configure and submit blocks.
#
# Returns the value of the submit block.
def start(base_class_path, env)
  topology_class = self.class
  topology_class.resolve_ids!(topology_class.components)

  builder = TopologyBuilder.new

  topology_class.spouts.each do |spout|
    spout_declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java)
    spout_declarer.addConfigurations(spout.config)
  end

  topology_class.bolts.each do |bolt|
    bolt_declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java)
    bolt_declarer.addConfigurations(bolt.config)
    bolt.define_grouping(bolt_declarer)
  end

  # Seed the configuration with the JRuby compatibility mode option for Storm
  # workers (defaults to the current JRuby mode); the configure block runs
  # against the configurator afterwards.
  configurator = Configurator.new(
    {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
  )
  configurator.instance_exec(env, &topology_class.configure_block)

  # Only the local path creates and remembers a cluster instance.
  submitter =
    if env == :local
      @cluster = LocalCluster.new
    else
      StormSubmitter
    end

  submitter.submitTopology(topology_class.topology_name, configurator.config, builder.createTopology)
  instance_exec(env, &topology_class.submit_block)
end