Class: Spark::Config

Inherits:
Object
Includes:
Helper::System
Defined in:
lib/spark/config.rb

Overview

Common configuration for RubySpark and Spark
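
Configuration is usually adjusted before the context is started. A minimal sketch, assuming the Spark.config block form from the gem's README (the block is evaluated against this config):

Spark.config do
  set_app_name 'MyApp'
  set 'spark.ruby.serializer.batch_size', 2048
end

Spark.start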

Constant Summary

TYPES =
{
  'spark.shuffle.spill' => :boolean,
  'spark.ruby.serializer.compress' => :boolean
}
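
Keys listed in TYPES are coerced back to Ruby types when read through #get; any other key returns the raw string. A small sketch:

config = Spark::Config.new
config.set('spark.shuffle.spill', true)  # stored internally as the string "true"
config.get('spark.shuffle.spill')        # => true, coerced via TYPES and parse_boolean
config.get('spark.app.name')             # => "RubySpark", not in TYPES, raw string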

Instance Method Summary

Methods included from Helper::System

included

Constructor Details

#initialize ⇒ Config

Initialize the Java SparkConf and load the default configuration.



# File 'lib/spark/config.rb', line 16

def initialize
  @spark_conf = SparkConf.new(true)
  set_default
  from_file(Spark::DEFAULT_CONFIG_FILE)
end

Instance Method Details

#[](key) ⇒ Object



# File 'lib/spark/config.rb', line 31

def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



# File 'lib/spark/config.rb', line 35

def []=(key, value)
  set(key, value)
end
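
Both operators simply delegate, so hash-style access is interchangeable with #get and #set:

config['spark.app.name'] = 'MyApp'  # same as config.set('spark.app.name', 'MyApp')
config['spark.app.name']            # => "MyApp", same as config.get('spark.app.name')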

#contains?(key) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/config.rb', line 104

def contains?(key)
  spark_conf.contains(key.to_s)
end
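
For example, with the defaults loaded by #set_default:

config.contains?('spark.app.name') # => true, set by set_default
config.contains?('no.such.key')    # => false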

#default_executor_command ⇒ Object

Command template which is applied when Scala wants to create a Ruby process (e.g. master, home request). The command is represented by '%s'.

Example:

bash --norc -i -c "export HOME=/home/user; cd; source .bashrc; %s"


# File 'lib/spark/config.rb', line 169

def default_executor_command
  ENV['SPARK_RUBY_EXECUTOR_COMMAND'] || '%s'
end
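
The template is normally supplied through the environment before Spark starts; the same pattern applies to the other SPARK_RUBY_* defaults below (hypothetical value):

ENV['SPARK_RUBY_EXECUTOR_COMMAND'] = 'bash --norc -i -c "export HOME=/home/user; cd; source .bashrc; %s"'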

#default_executor_options ⇒ Object

Options for every worker.

Example:

-J-Xmx512m


# File 'lib/spark/config.rb', line 178

def default_executor_options
  ENV['SPARK_RUBY_EXECUTOR_OPTIONS'] || ''
end

#default_serializer ⇒ Object



# File 'lib/spark/config.rb', line 151

def default_serializer
  ENV['SPARK_RUBY_SERIALIZER'] || Spark::Serializer::DEFAULT_SERIALIZER_NAME
end

#default_serializer_batch_size ⇒ Object



# File 'lib/spark/config.rb', line 159

def default_serializer_batch_size
  ENV['SPARK_RUBY_SERIALIZER_BATCH_SIZE'] || Spark::Serializer::DEFAULT_BATCH_SIZE
end

#default_serializer_compress ⇒ Object



# File 'lib/spark/config.rb', line 155

def default_serializer_compress
  ENV['SPARK_RUBY_SERIALIZER_COMPRESS'] || Spark::Serializer::DEFAULT_COMPRESS
end

#default_worker_type ⇒ Object

Type of worker.

Options:

  • process (default)
  • thread (experimental)



# File 'lib/spark/config.rb', line 198

def default_worker_type
  ENV['SPARK_RUBY_WORKER_TYPE'] || 'process'
end
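
Since #set_default copies this value into 'spark.ruby.worker.type', it can presumably also be overridden per config after the defaults are loaded:

config['spark.ruby.worker.type'] = 'thread' # experimental threaded workers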

#from_file(file) ⇒ Object



# File 'lib/spark/config.rb', line 22

def from_file(file)
  check_read_only

  if file && File.exist?(file)
    file = File.expand_path(file)
    RubyUtils.loadPropertiesFile(spark_conf, file)
  end
end
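
The file is handed to RubyUtils.loadPropertiesFile, so it follows the usual Spark properties format: one whitespace-separated key/value pair per line. A hypothetical file:

spark.master                     local[2]
spark.app.name                   MyApp
spark.ruby.serializer.batch_size 1024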

#get(key) ⇒ Object

Rescues NoSuchElementException; returns nil for missing keys.



# File 'lib/spark/config.rb', line 85

def get(key)
  value = spark_conf.get(key.to_s)

  case TYPES[key]
  when :boolean
    parse_boolean(value)
  when :integer
    parse_integer(value)
  else
    value
  end
rescue
  nil
end
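
The bare rescue means a missing key never raises; it simply surfaces as nil:

config.get('spark.ruby.worker.type') # => "process", set by set_default
config.get('no.such.key')            # => nil, NoSuchElementException is swallowed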

#get_all ⇒ Object Also known as: getAll



# File 'lib/spark/config.rb', line 100

def get_all
  Hash[spark_conf.getAll.map{|tuple| [tuple._1, tuple._2]}]
end

#load_executor_envs ⇒ Object

Load environment variables for executor from ENV.

Examples:

SPARK_RUBY_EXECUTOR_ENV_KEY1="1"
SPARK_RUBY_EXECUTOR_ENV_KEY2="2"


# File 'lib/spark/config.rb', line 208

def load_executor_envs
  prefix = 'SPARK_RUBY_EXECUTOR_ENV_'

  envs = ENV.select{|key, _| key.start_with?(prefix)}
  envs.each do |key, value|
    key = key.dup # ENV keys are frozen
    key.slice!(0, prefix.size)

    set("spark.ruby.executor.env.#{key}", value)
  end
end
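
The prefix is stripped and the remainder becomes part of the config key, so the variables above end up as:

config.load_executor_envs
config.get('spark.ruby.executor.env.KEY1') # => "1"
config.get('spark.ruby.executor.env.KEY2') # => "2"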

#parse_boolean(value) ⇒ Object



# File 'lib/spark/config.rb', line 121

def parse_boolean(value)
  case value
  when 'true'
    true
  when 'false'
    false
  end
end
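
Any value other than the literal strings 'true' and 'false' falls through the case and yields nil:

parse_boolean('true')  # => true
parse_boolean('false') # => false
parse_boolean('1')     # => nil, no else branch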

#parse_integer(value) ⇒ Object



# File 'lib/spark/config.rb', line 130

def parse_integer(value)
  value.to_i
end

#read_only? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark/config.rb', line 80

def read_only?
  Spark.started?
end

#set(key, value) ⇒ Object



# File 'lib/spark/config.rb', line 108

def set(key, value)
  check_read_only
  spark_conf.set(key.to_s, value.to_s)
end
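
Both key and value are stringified before reaching the Java SparkConf, so symbols and booleans are safe; a typed key is restored on read:

config.set(:'spark.shuffle.spill', false) # stored as ("spark.shuffle.spill", "false")
config.get('spark.shuffle.spill')         # => false, via TYPES coercion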

#set_app_name(name) ⇒ Object Also known as: setAppName



# File 'lib/spark/config.rb', line 113

def set_app_name(name)
  set('spark.app.name', name)
end

#set_default ⇒ Object

Defaults



# File 'lib/spark/config.rb', line 137

def set_default
  set_app_name('RubySpark')
  set_master('local[*]')
  set('spark.ruby.driver_home', Spark.home)
  set('spark.ruby.serializer', default_serializer)
  set('spark.ruby.serializer.compress', default_serializer_compress)
  set('spark.ruby.serializer.batch_size', default_serializer_batch_size)
  set('spark.ruby.executor.command', default_executor_command)
  set('spark.ruby.executor.options', default_executor_options)
  set('spark.ruby.worker.type', default_worker_type)
  load_executor_envs
  # set('spark.ruby.executor.install', default_executor_install)
end

#set_master(master) ⇒ Object Also known as: setMaster



# File 'lib/spark/config.rb', line 117

def set_master(master)
  set('spark.master', master)
end

#spark_conf ⇒ Object



# File 'lib/spark/config.rb', line 39

def spark_conf
  if Spark.started?
    # Get latest configuration
    Spark.context.jcontext.conf
  else
    @spark_conf
  end
end

#valid! ⇒ Object



# File 'lib/spark/config.rb', line 48

def valid!
  errors = []

  if !contains?('spark.app.name')
    errors << 'An application name must be set in your configuration.'
  end

  if !contains?('spark.master')
    errors << 'A master URL must be set in your configuration.'
  end

  if Spark::Serializer.find(get('spark.ruby.serializer')).nil?
    errors << 'Unknown serializer.'
  end

  scanned = get('spark.ruby.executor.command').scan('%s')

  if scanned.size == 0
    errors << "Executor command must contain '%s'."
  end

  if scanned.size > 1
    errors << "Executor command can contain only one '%s'."
  end

  if errors.any?
    errors.map!{|error| "- #{error}"}

    raise Spark::ConfigurationError, "Configuration is not valid:\r\n#{errors.join("\r\n")}"
  end
end
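
A configuration that breaks one of these rules raises; for instance, with a hypothetical command lacking the placeholder:

config['spark.ruby.executor.command'] = 'ruby' # no '%s'
config.valid!
# => Spark::ConfigurationError: Configuration is not valid:
#    - Executor command must contain '%s'.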