Class: RedStorm::DSL::DRPCTopology
Constant Summary
Constants inherited
from Topology
Topology::DEFAULT_BOLT_PARALLELISM, Topology::DEFAULT_SPOUT_PARALLELISM
Instance Attribute Summary
Attributes inherited from Topology
#cluster
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Topology
bolt, build_topology, configure, log, on_submit
Class Method Details
.input_bolt(bolt_class, *args, &bolt_block) ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/red_storm/dsl/drpc_topology.rb', line 75
# Registers the input bolt definition for this DRPC topology.
#
# bolt_class - the bolt class to register.
# args       - optional trailing elements: a value forwarded as the bolt's
#              constructor arguments, followed by an options Hash
#              (:id, :parallelism). Only the last non-Hash element is used.
# bolt_block - required definition body; it is evaluated in the context of
#              the new InputBoltDefinition.
#
# Raises TopologyDefinitionError when no block is given.
# Returns the components collection after the definition is appended.
def self.input_bolt(bolt_class, *args, &bolt_block)
  set_topology_class!

  # A trailing Hash is treated as options; whatever is left last is the constructor args.
  options = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop

  defaults = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}
  settings = defaults.merge(options)

  definition = InputBoltDefinition.new(bolt_class, constructor_args, settings[:id], settings[:parallelism])
  unless block_given?
    raise(TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required")
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end
|
.spout ⇒ Object
44
45
46
|
# File 'lib/red_storm/dsl/drpc_topology.rb', line 44
# Always rejects spout declarations with TopologyDefinitionError.
#
# Any arguments (and any block) are accepted and ignored so that a DSL-style
# call such as `spout SomeSpout, :parallelism => 2` fails with the
# descriptive TopologyDefinitionError below rather than an ArgumentError
# about the argument count.
# NOTE(review): presumably this overrides the parent Topology DSL's `spout`,
# which takes a spout class and options - confirm against Topology.
#
# Raises TopologyDefinitionError unconditionally.
def self.spout(*_args)
  raise TopologyDefinitionError, "DRPC spout is already defined"
end
|
Instance Method Details
#start(base_class_path, env) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/red_storm/dsl/drpc_topology.rb', line 48
# Builds the linear DRPC topology from the class-level bolt definitions,
# applies configuration, and submits it.
#
# base_class_path - passed to each bolt definition's new_instance.
# env             - when :local, a LocalDRPC and a LocalCluster are created
#                   (the cluster is kept in @cluster) and the local topology
#                   is submitted to that cluster; any other value submits the
#                   remote topology through StormSubmitter.
#
# Returns the result of running the class's submit_block in this instance's
# context with (env, drpc), where drpc is nil unless env is :local.
def start(base_class_path, env)
  topology = self.class
  drpc_builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(topology.topology_name)

  # Add every declared bolt in order, then apply its config and grouping.
  topology.bolts.each do |definition|
    bolt_declarer = drpc_builder.addBolt(definition.new_instance(base_class_path), definition.parallelism.to_java)
    bolt_declarer.addConfigurations(definition.config)
    definition.define_grouping(bolt_declarer)
  end

  # Default worker JVM option, set before the user's configure block runs.
  child_opts = "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"
  base_config = {"topology.worker.childopts" => child_opts}
  configurator = Configurator.new(base_config)
  configurator.instance_exec(env, &topology.configure_block)

  local_drpc = nil
  if env == :local
    local_drpc = LocalDRPC.new
    @cluster = LocalCluster.new
    @cluster.submitTopology(topology.topology_name, configurator.config, drpc_builder.createLocalTopology(local_drpc))
  else
    StormSubmitter.submitTopology(topology.topology_name, configurator.config, drpc_builder.createRemoteTopology)
  end

  instance_exec(env, local_drpc, &topology.submit_block)
end
|