Class: Wukong::Storm::StormRunner

Inherits:
Local::LocalRunner
  • Object
show all
Includes:
Logging, StormInvocation
Defined in:
lib/wukong-storm/storm_runner.rb

Overview

Implements the runner for wu-storm.

Constant Summary collapse

DEFAULT_KAFKA_PORT =

The default port Kafka is assumed to be running on.

9092

Constants included from StormInvocation

Wukong::Storm::StormInvocation::TOPOLOGY_SUBMITTER_CLASS

Instance Method Summary collapse

Methods included from StormInvocation

#blob_input?, #blob_spout_options, #dataflow_name, #dataflow_options, #file_input?, #file_spout_options, #fully_qualified_class_name, #input_uri, #kafka_input?, #kafka_output?, #kafka_spout_options, #kafka_state_options, #native_storm_options, #output_uri, #s3_endpoint, #s3_input?, #s3_spout_options, #services_options, #spout_options, #state_options, #storm_kill_commandline, #storm_launch_commandline, #storm_runner, #storm_topology_options, #topology_name, #topology_options, #wu_bolt_commandline, #wukong_topology_submitter_jar

Instance Method Details

#kill_first?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/wukong-storm/storm_runner.rb', line 54

def kill_first?
  settings[:rm]
end

#runObject

Raises:

  • (Error)


81
82
83
84
85
86
87
88
89
90
91
# File 'lib/wukong-storm/storm_runner.rb', line 81

def run
  log_topology_structure_and_settings
  setup_run
  if kill_first?
    log.debug("Killing topology <#{topology_name}> and waiting <#{settings[:wait]}> seconds...")
    execute_command(storm_kill_commandline)
    sleep settings[:wait].to_i unless settings[:dry_run]
  end
  execute_command!(storm_launch_commandline)
  raise Error.new("Failed to launch topology #{topology_name}!") unless settings[:dry_run] || $?.success?
end

#validateObject

Raises:

  • (Error)


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/wukong-storm/storm_runner.rb', line 58

def validate
  begin
    super()
  rescue => e
    raise e if dataflow
    raise Error.new("Must provide a processor or dataflow to run, via either the --run option or as the first argument, or provide an explicit --bolt_command") unless settings[:bolt_command]
  end
  raise Error.new("An explicit --input URI is required to launch a dataflow")  if settings[:input].nil?  || settings[:input].empty?
  raise Error.new("An explicit --output URI is required to launch a dataflow") if settings[:output].nil? || settings[:output].empty?

  if kafka_input? || kafka_output?
    raise Error.new("Must provide a list of comma-separated Kafka hosts")          if settings[:kafka_hosts].nil? || settings[:kafka_hosts].empty?
  end
  
  if s3_input?
    raise Error.new("Must provide an S3 bucket and path")                     if input_uri.path.nil?        || input_uri.path.empty?
    raise Error.new("Must provide an AWS access key (settings[:aws_key])")    if settings[:aws_key].nil?    || settings[:aws_key].empty?
    raise Error.new("Must provide an AWS secret key (settings[:aws_secret])") if settings[:aws_secret].nil? || settings[:aws_secret].empty?
    raise Error.new("Invalid AWS region: <#{settings[:aws_region]}>") unless s3_endpoint
  end
  true
end