Class: RedStorm::DSL::Topology
- Inherits:
-
Object
- Object
- RedStorm::DSL::Topology
show all
- Defined in:
- lib/red_storm/dsl/topology.rb
Defined Under Namespace
Classes: BoltDefinition, ComponentDefinition, SpoutDefinition
Constant Summary
collapse
- DEFAULT_SPOUT_PARALLELISM =
1
- DEFAULT_BOLT_PARALLELISM =
1
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
LocalCluster reference usable in on_submit block, for example.
Class Method Summary
collapse
-
.bolt(bolt_class, *args, &bolt_block) ⇒ Object
def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block).
-
.build_topology ⇒ Object
-
.configure(name = nil, &configure_block) ⇒ Object
-
.log ⇒ Object
-
.on_submit(method_name = nil, &submit_block) ⇒ Object
-
.spout(spout_class, *args, &spout_block) ⇒ Object
def self.spout(spout_class, contructor_args = [], options = {}, &spout_block).
Instance Method Summary
collapse
Instance Attribute Details
#cluster ⇒ Object
LocalCluster reference usable in on_submit block, for example
13
14
15
|
# File 'lib/red_storm/dsl/topology.rb', line 13
def cluster
@cluster
end
|
Class Method Details
.bolt(bolt_class, *args, &bolt_block) ⇒ Object
def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/red_storm/dsl/topology.rb', line 120
def self.bolt(bolt_class, *args, &bolt_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
|
.build_topology ⇒ Object
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/red_storm/dsl/topology.rb', line 142
def self.build_topology
resolve_ids!(components)
builder = TopologyBuilder.new
spouts.each do |spout|
declarer = builder.setSpout(spout.id, spout.new_instance, spout.parallelism.to_java)
declarer.addConfigurations(spout.config)
end
bolts.each do |bolt|
declarer = builder.setBolt(bolt.id, bolt.new_instance, bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
builder.createTopology
end
|
132
133
134
135
136
|
# File 'lib/red_storm/dsl/topology.rb', line 132
def self.configure(name = nil, &configure_block)
set_topology_class!
@topology_name = name.to_s if name
@configure_block = configure_block if block_given?
end
|
.log ⇒ Object
103
104
105
|
# File 'lib/red_storm/dsl/topology.rb', line 103
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
|
.on_submit(method_name = nil, &submit_block) ⇒ Object
138
139
140
|
# File 'lib/red_storm/dsl/topology.rb', line 138
def self.on_submit(method_name = nil, &submit_block)
@submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)}
end
|
.spout(spout_class, *args, &spout_block) ⇒ Object
def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/red_storm/dsl/topology.rb', line 108
def self.spout(spout_class, *args, &spout_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)
spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism])
spout.instance_exec(&spout_block) if block_given?
self.components << spout
end
|
Instance Method Details
#start(env) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/red_storm/dsl/topology.rb', line 158
def start(env)
topology = self.class.build_topology
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, topology)
instance_exec(env, &self.class.submit_block)
end
|