Class: Wukong::Storm::StormRunner
- Inherits:
- Local::LocalRunner
- Inheritance chain (base class first):
- Object
- Local::LocalRunner
- Wukong::Storm::StormRunner
- Includes:
- Logging, StormInvocation
- Defined in:
- lib/wukong-storm/storm_runner.rb
Overview
Implements the runner for wu-storm.
Constant Summary collapse
- DEFAULT_KAFKA_PORT = 9092
The default port Kafka is assumed to be running on.
Constants included from StormInvocation
Wukong::Storm::StormInvocation::TOPOLOGY_SUBMITTER_CLASS
Instance Method Summary collapse
Methods included from StormInvocation
#blob_input?, #blob_spout_options, #dataflow_name, #dataflow_options, #file_input?, #file_spout_options, #fully_qualified_class_name, #input_uri, #kafka_input?, #kafka_output?, #kafka_spout_options, #kafka_state_options, #native_storm_options, #output_uri, #s3_endpoint, #s3_input?, #s3_spout_options, #services_options, #spout_options, #state_options, #storm_kill_commandline, #storm_launch_commandline, #storm_runner, #storm_topology_options, #topology_name, #topology_options, #wu_bolt_commandline, #wukong_topology_submitter_jar
Instance Method Details
#kill_first? ⇒ Boolean
54 55 56 |
# File 'lib/wukong-storm/storm_runner.rb', line 54
#
# Reports whether the `:rm` setting is switched on.
#
# The raw setting is coerced to a strict boolean so this predicate never
# leaks `nil` (setting absent) or an arbitrary truthy object to callers.
#
# @return [Boolean] true when `settings[:rm]` is truthy, false otherwise
def kill_first?
  !!settings[:rm]
end
#run ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/wukong-storm/storm_runner.rb', line 81
#
# Runs the launch sequence for the topology.
#
# Steps, in order:
#   1. Logs the topology structure/settings and performs run setup.
#   2. If #kill_first? is true, executes the kill command line and then
#      sleeps for `settings[:wait]` seconds (the sleep is skipped when
#      `settings[:dry_run]` is set).
#   3. Executes the launch command line.
#   4. Unless `settings[:dry_run]` is set, checks the exit status of the
#      last child process and raises if it was not successful.
#
# @return [nil]
# @raise [Error] when not in dry-run mode and the launch did not report a
#   successful exit status (including the case where no exit status is
#   available at all)
def run
  log_topology_structure_and_settings
  setup_run

  if kill_first?
    log.debug("Killing topology <#{topology_name}> and waiting <#{settings[:wait]}> seconds...")
    execute_command(storm_kill_commandline)
    sleep settings[:wait].to_i unless settings[:dry_run]
  end

  execute_command!(storm_launch_commandline)
  return if settings[:dry_run]

  # Capture the status right away. `$?` is nil when no child process has
  # been run in this thread, so guard against calling #success? on nil and
  # treat a missing status as a failed launch.
  status = $?
  raise Error, "Failed to launch topology #{topology_name}!" unless status && status.success?
end
#validate ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/wukong-storm/storm_runner.rb', line 58
#
# Checks that the settings are sufficient to launch a topology.
#
# The parent class's validation runs first. If it fails, the failure is
# re-raised when a dataflow is present; otherwise it is tolerated only if
# an explicit `settings[:bolt_command]` was given. After that the input and
# output URIs, the Kafka hosts (when Kafka is used for input or output) and
# the S3 path and AWS credentials/region (when input comes from S3) are
# checked in that order.
#
# @return [true] when every check passes
# @raise [Error] describing the first check that fails
def validate
  # True when the named setting is nil or empty.
  unset = lambda { |key| settings[key].nil? || settings[key].empty? }

  begin
    super()
  rescue => failure
    raise failure if dataflow
    raise Error, "Must provide a processor or dataflow to run, via either the --run option or as the first argument, or provide an explicit --bolt_command" unless settings[:bolt_command]
  end

  raise Error, "An explicit --input URI is required to launch a dataflow" if unset.call(:input)
  raise Error, "An explicit --output URI is required to launch a dataflow" if unset.call(:output)

  if (kafka_input? || kafka_output?) && unset.call(:kafka_hosts)
    raise Error, "Must provide a list of comma-separated Kafka hosts"
  end

  if s3_input?
    s3_path = input_uri.path
    raise Error, "Must provide an S3 bucket and path" if s3_path.nil? || s3_path.empty?
    raise Error, "Must provide an AWS access key (settings[:aws_key])" if unset.call(:aws_key)
    raise Error, "Must provide an AWS secret key (settings[:aws_secret])" if unset.call(:aws_secret)
    raise Error, "Invalid AWS region: <#{settings[:aws_region]}>" unless s3_endpoint
  end

  true
end