Class: Spark::Context

Inherits:
Object
Includes:
Helper::Logger, Helper::Parser, Helper::System
Defined in:
lib/spark/context.rb

Overview

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
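
Example (an illustrative sketch; it assumes the gem's usual Spark.start / Spark.sc entry points and uses the same $sc global as the examples below):

require 'ruby-spark'

Spark.start          # builds the context from the current Spark.config
$sc = Spark.sc       # the Spark::Context instance

$sc.parallelize(0..5).collect
# => [0, 1, 2, 3, 4, 5]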

Instance Attribute Summary

Instance Method Summary

Methods included from Helper::Logger

included

Methods included from Helper::Parser

included

Methods included from Helper::System

included

Constructor Details

#initialize ⇒ Context

Constructor for the Ruby context. Configuration is taken automatically from Spark; if the user starts the context first (without configuring), the config is set to its defaults.



# File 'lib/spark/context.rb', line 21

def initialize
  Spark.config.valid!
  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))

  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end

Instance Attribute Details

#jaccumulator ⇒ Object (readonly)

Returns the value of attribute jaccumulator.



# File 'lib/spark/context.rb', line 15

def jaccumulator
  @jaccumulator
end

#jcontext ⇒ Object (readonly)

Returns the value of attribute jcontext.



# File 'lib/spark/context.rb', line 15

def jcontext
  @jcontext
end

#temp_dir ⇒ Object (readonly)

Returns the value of attribute temp_dir.



# File 'lib/spark/context.rb', line 15

def temp_dir
  @temp_dir
end

Instance Method Details

#accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object

Create an Accumulator with the given initial value, using the given accum_param to define how values of that data type are added.

Example:

accum = $sc.accumulator(7)

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect

accum.value
# => 11


# File 'lib/spark/context.rb', line 188

def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end

#add_file(*files) ⇒ Object Also known as: addFile

Add a file to be downloaded with this Spark job on every node. The path of file passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use SparkFiles.get(file_name) with the filename to find its download location.

Example:

`echo 10 > test.txt`

$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]


# File 'lib/spark/context.rb', line 149

def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end

#broadcast(value) ⇒ Object

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.

Example:

broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]


# File 'lib/spark/context.rb', line 169

def broadcast(value)
  Spark::Broadcast.new(self, value)
end

#clear_call_site ⇒ Object Also known as: clearCallSite



# File 'lib/spark/context.rb', line 120

def clear_call_site
  jcontext.clearCallSite
end

#config(key = nil) ⇒ Object

Return a copy of this SparkContext’s configuration. The configuration cannot be changed at runtime.



# File 'lib/spark/context.rb', line 127

def config(key=nil)
  if key
    Spark.config.get(key)
  else
    Spark.config
  end
end
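
Example (illustrative; the returned app name is whatever was configured):

$sc.config('spark.app.name')
# => "RubySpark"

$sc.config
# => the whole Spark::Config object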

#default_batch_size ⇒ Object



# File 'lib/spark/context.rb', line 92

def default_batch_size
  size = config('spark.ruby.serializer.batch_size').to_i
  if size >= 1
    size
  else
    'auto'
  end
end
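
Example (an illustrative sketch; the result depends on the spark.ruby.serializer.batch_size setting):

# spark.ruby.serializer.batch_size = 1024
$sc.default_batch_size
# => 1024

# setting missing, zero or non-numeric
$sc.default_batch_size
# => "auto"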

#default_parallelism ⇒ Object Also known as: defaultParallelism

Default level of parallelism to use when not given by the user (e.g. for parallelize and makeRDD).



# File 'lib/spark/context.rb', line 63

def default_parallelism
  sc.defaultParallelism
end
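
Example (illustrative; the value depends on the master and configuration, e.g. local[4]):

$sc.default_parallelism
# => 4

$sc.parallelize(0..7).partitions_size
# => 4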

#default_serializer ⇒ Object

Default serializer

Batch -> Compress -> Basic



# File 'lib/spark/context.rb', line 71

def default_serializer
  # Basic
  serializer = Spark::Serializer.find!(config('spark.ruby.serializer')).new

  # Compress
  if config('spark.ruby.serializer.compress')
    serializer = Spark::Serializer.compressed(serializer)
  end

  # Batching
  batch_size = default_batch_size
  if batch_size == 'auto'
    serializer = Spark::Serializer.auto_batched(serializer)
  else
    serializer = Spark::Serializer.batched(serializer, batch_size)
  end

  # Finally, the resulting serializer is a "container" wrapping the serializers built above
  serializer
end
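
For illustration, an equivalent chain built by hand (assuming spark.ruby.serializer is 'marshal' and the batch size is 1024; both values are only examples):

basic      = Spark::Serializer.find!('marshal').new
compressed = Spark::Serializer.compressed(basic)         # only when spark.ruby.serializer.compress is set
serializer = Spark::Serializer.batched(compressed, 1024)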

#get_local_property(key) ⇒ Object Also known as: getLocalProperty

Get a local property set in this thread, or nil if it is missing.



# File 'lib/spark/context.rb', line 110

def get_local_property(key)
  jcontext.getLocalProperty(key)
end
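
Example (nil is returned for a key that was never set in this thread):

$sc.get_local_property('spark.scheduler.pool')
# => nil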

#inspect ⇒ Object



# File 'lib/spark/context.rb', line 41

def inspect
  result  = %{#<#{self.class.name}:0x#{object_id}\n}
  result << %{Tempdir: "#{temp_dir}">}
  result
end

#parallelize(data, num_slices = nil, serializer = nil) ⇒ Object

Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful: this method updates the data in place.

Parameters:

data

Range or Array

num_slices

number of slices

serializer

custom serializer (default: serializer based on configuration)

Examples:

$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]

$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]


# File 'lib/spark/context.rb', line 207

def parallelize(data, num_slices=nil, serializer=nil)
  num_slices ||= default_parallelism
  serializer ||= default_serializer

  serializer.check_each(data)

  # Through file
  file = Tempfile.new('to_parallelize', temp_dir)
  serializer.dump_to_io(data, file)
  file.close # not unlink
  jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)

  Spark::RDD.new(jrdd, self, serializer)
ensure
  file && file.unlink
end

#run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object Also known as: runJob

Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.

If partitions is not specified, this will run over all partitions.

Example:

rdd = $sc.parallelize(0..10, 5)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]


# File 'lib/spark/context.rb', line 278

def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end

#run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object Also known as: runJobWithCommand

Execute the given command on a specific set of partitions.



# File 'lib/spark/context.rb', line 284

def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  # File for result
  file = Tempfile.new('collect', temp_dir)

  mapped = rdd.new_rdd_from_command(command, *args)
  RubyRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local, file.path)

  mapped.collect_from_file(file)
end
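
For illustration, the run_job example above expressed through this method (run_job delegates here with Spark::Command::MapPartitions):

rdd = $sc.parallelize(0..10, 5)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]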

#sc ⇒ Object



# File 'lib/spark/context.rb', line 53

def sc
  @jcontext.sc
end

#set_call_site(site) ⇒ Object Also known as: setCallSite

Support function for API backtraces.



# File 'lib/spark/context.rb', line 116

def set_call_site(site)
  jcontext.setCallSite(site)
end
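
Example (a small sketch; 'Ruby' is the default set by the constructor, and any string describing the stage works):

$sc.set_call_site('MyJob')      # stages are now described as "MyJob"
$sc.parallelize(0..5).collect
$sc.clear_call_site             # back to the default call site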

#set_local_property(key, value) ⇒ Object Also known as: setLocalProperty

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.



# File 'lib/spark/context.rb', line 104

def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end
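
Example (illustrative; 'production' is an arbitrary fair scheduler pool name):

$sc.set_local_property('spark.scheduler.pool', 'production')
$sc.get_local_property('spark.scheduler.pool')
# => "production"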

#stop ⇒ Object



# File 'lib/spark/context.rb', line 47

def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end

#text_file(path, min_partitions = nil, encoding = Encoding::UTF_8, serializer = nil) ⇒ Object Also known as: textFile

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

Example:

f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close

$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]


# File 'lib/spark/context.rb', line 236

def text_file(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build { __text__(encoding) }

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, deserializer)
end

#ui ⇒ Object



# File 'lib/spark/context.rb', line 57

def ui
  sc.ui
end

#whole_text_files(path, min_partitions = nil, serializer = nil) ⇒ Object Also known as: wholeTextFiles

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned as a key-value pair, where the key is the path of the file and the value is its content.

Example:

dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close

$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]


# File 'lib/spark/context.rb', line 260

def whole_text_files(path, min_partitions=nil, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build{ __pair__(__text__, __text__) }

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end