Class: RedStorm::DSL::Topology

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/dsl/topology.rb

Direct Known Subclasses

DRPCTopology

Defined Under Namespace

Classes: BoltDefinition, ComponentDefinition, SpoutDefinition

Constant Summary collapse

DEFAULT_SPOUT_PARALLELISM =
1
DEFAULT_BOLT_PARALLELISM =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cluster ⇒ Object (readonly)

LocalCluster reference usable in on_submit block, for example



13
14
15
# File 'lib/red_storm/dsl/topology.rb', line 13

# Reader for the @cluster instance variable.
#
# #start assigns a freshly created LocalCluster to @cluster when it is called
# with env == :local, which makes the cluster reachable from an on_submit
# block. Before that assignment (or for any other env) this returns nil.
#
# @return [Object, nil] the value currently stored in @cluster
def cluster
  @cluster
end

Class Method Details

.bolt(bolt_class, *args, &bolt_block) ⇒ Object

def self.bolt(bolt_class, constructor_args = [], options = {}, &bolt_block)



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/red_storm/dsl/topology.rb', line 120

# Declares a bolt for this topology and appends its definition to +components+.
#
# Accepted call shapes (positional arguments are read from the end of +args+):
#   bolt SomeBolt { ... }
#   bolt SomeBolt, constructor_args { ... }
#   bolt SomeBolt, constructor_args, :id => ..., :parallelism => ... { ... }
#
# A trailing Hash is taken as the options; the last remaining positional
# argument (if any) is taken as the constructor arguments, defaulting to [].
# Options are merged over the defaults :id => underscore(bolt_class) and
# :parallelism => DEFAULT_BOLT_PARALLELISM.
#
# @param bolt_class [Class] the bolt implementation class
# @param args [Array] optional constructor arguments followed by an optional options Hash
# @yield evaluated with instance_exec on the new BoltDefinition; mandatory
# @raise [TopologyDefinitionError] when no block is supplied
# @return [Object] the result of appending the definition to +components+
def self.bolt(bolt_class, *args, &bolt_block)
  set_topology_class!

  overrides = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop
  defaults = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}
  settings = defaults.merge(overrides)

  definition = BoltDefinition.new(bolt_class, constructor_args, settings[:id], settings[:parallelism])
  # The definition is built first because the error message reports its class name and id.
  if bolt_block.nil?
    raise(TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required")
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end

.build_topology ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/red_storm/dsl/topology.rb', line 142

# Assembles the Storm topology from the declared components.
#
# Steps, in order:
# 1. resolve_ids! is run over all components.
# 2. Every spout is registered on a new TopologyBuilder with its id, a fresh
#    instance and its parallelism (converted with to_java), then its config
#    is added to the returned declarer.
# 3. Every bolt is registered the same way, and additionally asked to define
#    its groupings on the declarer returned by setBolt.
#
# @return [Object] whatever TopologyBuilder#createTopology returns
def self.build_topology
  resolve_ids!(components)

  topology_builder = TopologyBuilder.new

  spouts.each do |spout_def|
    spout_declarer = topology_builder.setSpout(spout_def.id, spout_def.new_instance, spout_def.parallelism.to_java)
    spout_declarer.addConfigurations(spout_def.config)
  end

  bolts.each do |bolt_def|
    bolt_declarer = topology_builder.setBolt(bolt_def.id, bolt_def.new_instance, bolt_def.parallelism.to_java)
    bolt_declarer.addConfigurations(bolt_def.config)
    # Groupings are declared on the declarer returned by setBolt itself.
    bolt_def.define_grouping(bolt_declarer)
  end

  topology_builder.createTopology
end

.configure(name = nil, &configure_block) ⇒ Object



132
133
134
135
136
# File 'lib/red_storm/dsl/topology.rb', line 132

# Records the topology name and/or the configuration block for this class.
#
# Each piece is stored only when supplied, so a call without a name keeps any
# previously stored name and a call without a block keeps any previously
# stored block.
#
# @param name [#to_s, nil] topology name; stored as a String when truthy
# @yield the configuration block; #start evaluates the class's configure_block
#   on a Configurator with instance_exec, passing env
# @return [Proc, nil] the stored block when one was given, otherwise nil
def self.configure(name = nil, &configure_block)
  set_topology_class!

  if name
    @topology_name = name.to_s
  end

  @configure_block = configure_block if configure_block
end

.log ⇒ Object



103
104
105
# File 'lib/red_storm/dsl/topology.rb', line 103

# Class-level logger, created on first use and cached in @log.
#
# The logger is obtained from Log4j via Logger.getLogger, keyed by this
# class's name.
#
# @return [Object] the memoized logger
def self.log
  return @log if @log

  @log = Java::OrgApacheLog4j::Logger.getLogger(self.name)
end

.on_submit(method_name = nil, &submit_block) ⇒ Object



138
139
140
# File 'lib/red_storm/dsl/topology.rb', line 138

# Registers the hook to run once the topology has been submitted.
#
# Either a block or a method name must be given. When a block is given it is
# stored as-is and +method_name+ is ignored. Otherwise a lambda is stored
# that sends +method_name+ with env to whatever +self+ is when the hook runs.
# #start runs the class's submit_block with instance_exec, so that receiver
# is the topology instance.
#
# @param method_name [Symbol, String, nil] method to call with env when no block is given
# @yield [env] the hook body
# @raise [ArgumentError] when neither a method name nor a block is supplied;
#   without this check the stored lambda would fail with a TypeError only
#   when the hook runs, after the topology has already been submitted
# @return [Proc] the stored hook
def self.on_submit(method_name = nil, &submit_block)
  if submit_block.nil? && method_name.nil?
    raise ArgumentError, "on_submit requires a method name or a block"
  end

  # send (not public_send) so the named method may be private
  @submit_block = submit_block || lambda { |env| self.send(method_name, env) }
end

.spout(spout_class, *args, &spout_block) ⇒ Object

def self.spout(spout_class, constructor_args = [], options = {}, &spout_block)



108
109
110
111
112
113
114
115
116
117
# File 'lib/red_storm/dsl/topology.rb', line 108

# Declares a spout for this topology and appends its definition to +components+.
#
# Accepted call shapes (positional arguments are read from the end of +args+):
#   spout SomeSpout
#   spout SomeSpout, constructor_args
#   spout SomeSpout, constructor_args, :id => ..., :parallelism => ... { ... }
#
# A trailing Hash is taken as the options; the last remaining positional
# argument (if any) is taken as the constructor arguments, defaulting to [].
# Options are merged over the defaults :id => underscore(spout_class) and
# :parallelism => DEFAULT_SPOUT_PARALLELISM.
#
# @param spout_class [Class] the spout implementation class
# @param args [Array] optional constructor arguments followed by an optional options Hash
# @yield optional; evaluated with instance_exec on the new SpoutDefinition
# @return [Object] the result of appending the definition to +components+
def self.spout(spout_class, *args, &spout_block)
  set_topology_class!

  overrides = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop
  defaults = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}
  settings = defaults.merge(overrides)

  definition = SpoutDefinition.new(spout_class, constructor_args, settings[:id], settings[:parallelism])
  # Unlike bolts, a spout may be declared without a body.
  definition.instance_exec(&spout_block) if spout_block
  self.components << definition
end

Instance Method Details

#start(env) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/red_storm/dsl/topology.rb', line 158

# Builds, configures and submits the topology, then runs the on-submit hook.
#
# Sequence:
# 1. The class builds the Storm topology.
# 2. A Configurator is created with the default worker child opts (the JRuby
#    compatibility mode token reported by RedStorm) and the class's
#    configure_block is evaluated on it with env as its argument.
# 3. For env == :local a new LocalCluster is created, kept in @cluster and
#    used as the submitter; for any other env StormSubmitter is used.
# 4. The class's submit_block is evaluated on this instance with env.
#
# @param env [Symbol] :local selects a LocalCluster; anything else selects StormSubmitter
# @return [Object] the value returned by the submit block
def start(env)
  topology_class = self.class
  storm_topology = topology_class.build_topology

  # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
  worker_childopts = "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"
  configurator = Configurator.new({"topology.worker.childopts" => worker_childopts})
  configurator.instance_exec(env, &topology_class.configure_block)

  if env == :local
    # keep a reference so the on_submit hook can reach the local cluster
    @cluster = LocalCluster.new
    submitter = @cluster
  else
    submitter = StormSubmitter
  end

  submitter.submitTopology(topology_class.topology_name, configurator.config, storm_topology)
  instance_exec(env, &topology_class.submit_block)
end