Class: RedStorm::SimpleTopology
- Inherits:
-
Object
- Object
- RedStorm::SimpleTopology
- Defined in:
- lib/red_storm/simple_topology.rb
Direct Known Subclasses
Defined Under Namespace
Classes: BoltDefinition, ComponentDefinition, SpoutDefinition
Constant Summary collapse
- DEFAULT_SPOUT_PARALLELISM =
1
- DEFAULT_BOLT_PARALLELISM =
1
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
LocalCluster reference usable in on_submit block, for example.
Class Method Summary collapse
-
.bolt(bolt_class, *args, &bolt_block) ⇒ Object
def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block).
- .configure(name = nil, &configure_block) ⇒ Object
- .log ⇒ Object
- .on_submit(method_name = nil, &submit_block) ⇒ Object
-
.spout(spout_class, *args, &spout_block) ⇒ Object
def self.spout(spout_class, contructor_args = [], options = {}, &spout_block).
Instance Method Summary collapse
-
#start(base_class_path, env) ⇒ Object
topology proxy interface.
Instance Attribute Details
#cluster ⇒ Object (readonly)
LocalCluster reference usable in on_submit block, for example
11 12 13 |
# File 'lib/red_storm/simple_topology.rb', line 11 def cluster @cluster end |
Class Method Details
.bolt(bolt_class, *args, &bolt_block) ⇒ Object
def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/red_storm/simple_topology.rb', line 120 def self.bolt(bolt_class, *args, &bolt_block) set_topology_class! = args.last.is_a?(Hash) ? args.pop : {} contructor_args = !args.empty? ? args.pop : [] = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge() bolt = BoltDefinition.new(bolt_class, contructor_args, [:id], [:parallelism]) raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? bolt.instance_exec(&bolt_block) self.components << bolt end |
.configure(name = nil, &configure_block) ⇒ Object
132 133 134 135 136 |
# File 'lib/red_storm/simple_topology.rb', line 132 def self.configure(name = nil, &configure_block) set_topology_class! @topology_name = name.to_s if name @configure_block = configure_block if block_given? end |
.log ⇒ Object
103 104 105 |
# File 'lib/red_storm/simple_topology.rb', line 103 def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end |
.on_submit(method_name = nil, &submit_block) ⇒ Object
138 139 140 |
# File 'lib/red_storm/simple_topology.rb', line 138 def self.on_submit(method_name = nil, &submit_block) @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} end |
.spout(spout_class, *args, &spout_block) ⇒ Object
def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/red_storm/simple_topology.rb', line 108 def self.spout(spout_class, *args, &spout_block) set_topology_class! = args.last.is_a?(Hash) ? args.pop : {} contructor_args = !args.empty? ? args.pop : [] = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge() spout = SpoutDefinition.new(spout_class, contructor_args, [:id], [:parallelism]) spout.instance_exec(&spout_block) if block_given? self.components << spout end |
Instance Method Details
#start(base_class_path, env) ⇒ Object
topology proxy interface
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/red_storm/simple_topology.rb', line 144 def start(base_class_path, env) self.class.resolve_ids!(self.class.components) builder = TopologyBuilder.new self.class.spouts.each do |spout| declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java) declarer.addConfigurations(spout.config) end self.class.bolts.each do |bolt| declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java) declarer.addConfigurations(bolt.config) bolt.define_grouping(declarer) end # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} configurator = Configurator.new(defaults) configurator.instance_exec(env, &self.class.configure_block) submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) instance_exec(env, &self.class.submit_block) end |