Class: Spark::Context
- Inherits: Object
- Includes: Helper::Logger, Helper::Parser, Helper::System
- Defined in: lib/spark/context.rb
Overview
Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Instance Attribute Summary
- #jaccumulator ⇒ Object (readonly)
  Returns the value of attribute jaccumulator.
- #jcontext ⇒ Object (readonly)
  Returns the value of attribute jcontext.
- #temp_dir ⇒ Object (readonly)
  Returns the value of attribute temp_dir.
Instance Method Summary
- #accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object
  Create an Accumulator with the given initial value, using a given accum_param helper object to define how to add values of the data type if provided.
- #add_file(*files) ⇒ Object (also: #addFile)
  Add a file to be downloaded with this Spark job on every node.
- #broadcast(value) ⇒ Object
  Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions.
- #clear_call_site ⇒ Object (also: #clearCallSite)
- #config(key = nil) ⇒ Object
  Return a copy of this SparkContext’s configuration.
- #default_batch_size ⇒ Object
- #default_parallelism ⇒ Object (also: #defaultParallelism)
  Default level of parallelism to use when not given by the user (e.g. parallelize and makeRDD).
- #default_serializer ⇒ Object
  Default serializer.
- #get_local_property(key) ⇒ Object (also: #getLocalProperty)
  Get a local property set in this thread, or nil if it is missing.
- #initialize ⇒ Context (constructor)
  Constructor for the Ruby context.
- #inspect ⇒ Object
- #parallelize(data, num_slices = nil, serializer = nil) ⇒ Object
  Distribute a local Ruby collection to form an RDD. Be careful: this method can be slow and updates the data in place.
- #run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object (also: #runJob)
  Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.
- #run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object (also: #runJobWithCommand)
  Execute the given command on a specific set of partitions.
- #sc ⇒ Object
- #set_call_site(site) ⇒ Object (also: #setCallSite)
  Support function for API backtraces.
- #set_local_property(key, value) ⇒ Object (also: #setLocalProperty)
  Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
- #stop ⇒ Object
- #text_file(path, min_partitions = nil, encoding = Encoding::UTF_8, serializer = nil) ⇒ Object (also: #textFile)
  Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
- #ui ⇒ Object
- #whole_text_files(path, min_partitions = nil, serializer = nil) ⇒ Object (also: #wholeTextFiles)
  Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI.
Methods included from Helper::Logger
Methods included from Helper::Parser
Methods included from Helper::System
Methods included from Helper::Logger
Methods included from Helper::Parser
Methods included from Helper::System
Constructor Details
#initialize ⇒ Context
Constructor for the Ruby context. Configuration is taken automatically from Spark; the config is set to defaults if the user starts the context first.
# File 'lib/spark/context.rb', line 21

def initialize
  Spark.config.valid!
  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))

  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end
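A minimal usage sketch (not part of the original docs): building a context directly through this constructor. It assumes Spark.config already holds valid settings; the other examples on this page refer to the context through the global $sc.
Example (illustrative):
$sc = Spark::Context.new   # Spark.config must be valid before this call
$sc.parallelize(0..3).collect
# => [0, 1, 2, 3]
$sc.stop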
Instance Attribute Details
#jaccumulator ⇒ Object (readonly)
Returns the value of attribute jaccumulator.
# File 'lib/spark/context.rb', line 15

def jaccumulator
  @jaccumulator
end
#jcontext ⇒ Object (readonly)
Returns the value of attribute jcontext.
# File 'lib/spark/context.rb', line 15

def jcontext
  @jcontext
end
#temp_dir ⇒ Object (readonly)
Returns the value of attribute temp_dir.
# File 'lib/spark/context.rb', line 15

def temp_dir
  @temp_dir
end
Instance Method Details
#accumulator(value, accum_param = :+, zero_value = 0) ⇒ Object
Create an Accumulator with the given initial value, using a given accum_param helper object to define how to add values of the data type if provided.
Example:
accum = $sc.accumulator(7)
rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect
accum.value
# => 11
# File 'lib/spark/context.rb', line 188

def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end
#add_file(*files) ⇒ Object Also known as: addFile
Add a file to be downloaded with this Spark job on every node. The path of file passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
To access the file in Spark jobs, use SparkFiles.get(file_name) with the filename to find its download location.
Example:
`echo 10 > test.txt`
$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]
# File 'lib/spark/context.rb', line 149

def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end
#broadcast(value) ⇒ Object
Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Example:
broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')
rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]
# File 'lib/spark/context.rb', line 169

def broadcast(value)
  Spark::Broadcast.new(self, value)
end
#clear_call_site ⇒ Object Also known as: clearCallSite
# File 'lib/spark/context.rb', line 120

def clear_call_site
  jcontext.clearCallSite
end
#config(key = nil) ⇒ Object
Return a copy of this SparkContext’s configuration. The configuration cannot be changed at runtime.
# File 'lib/spark/context.rb', line 127

def config(key=nil)
  if key
    Spark.config.get(key)
  else
    Spark.config
  end
end
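A short usage sketch based on the source above: calling config without a key returns the whole Spark.config object, and with a key returns just that value (the value shown is illustrative).
Example (illustrative):
$sc.config                            # => the Spark.config object
$sc.config('spark.ruby.serializer')   # => "marshal" (whatever serializer your config names)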
#default_batch_size ⇒ Object
# File 'lib/spark/context.rb', line 92

def default_batch_size
  size = config('spark.ruby.serializer.batch_size').to_i
  if size >= 1
    size
  else
    'auto'
  end
end
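An illustrative sketch (assumed config values): the size is read from spark.ruby.serializer.batch_size; any value that does not convert to an integer of at least 1 falls back to 'auto'.
Example (illustrative):
# spark.ruby.serializer.batch_size = "1024"
$sc.default_batch_size   # => 1024
# spark.ruby.serializer.batch_size = "auto" (String#to_i gives 0, so the fallback is used)
$sc.default_batch_size   # => 'auto'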
#default_parallelism ⇒ Object Also known as: defaultParallelism
Default level of parallelism to use when not given by the user (e.g. parallelize and makeRDD).
# File 'lib/spark/context.rb', line 63

def default_parallelism
  sc.defaultParallelism
end
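A usage sketch with illustrative output: the value simply mirrors sc.defaultParallelism, so it depends on the master/cluster setup, and parallelize falls back to it when num_slices is not given.
Example (illustrative):
$sc.default_parallelism                  # => 2 (e.g. when running on local[2]; depends on your setup)
$sc.parallelize(0..9).partitions_size    # => 2 (num_slices defaulted to the value above)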
#default_serializer ⇒ Object
Default serializer
Batch -> Compress -> Basic
# File 'lib/spark/context.rb', line 71

def default_serializer
  # Basic
  serializer = Spark::Serializer.find!(config('spark.ruby.serializer')).new

  # Compress
  if config('spark.ruby.serializer.compress')
    serializer = Spark::Serializer.compressed(serializer)
  end

  # Batching
  batch_size = default_batch_size
  if batch_size == 'auto'
    serializer = Spark::Serializer.auto_batched(serializer)
  else
    serializer = Spark::Serializer.batched(serializer, batch_size)
  end

  # Finally, "container" contains serializers
  serializer
end
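A hedged sketch of assembling the same chain by hand with the serializer helpers used in the source above (Spark::Serializer.find!, .compressed, .auto_batched); this can be useful when you want to pass a custom serializer to parallelize or text_file.
Example (illustrative, assuming compression is enabled and batch size is 'auto'):
basic      = Spark::Serializer.find!($sc.config('spark.ruby.serializer')).new
compressed = Spark::Serializer.compressed(basic)
serializer = Spark::Serializer.auto_batched(compressed)
$sc.parallelize(0..5, nil, serializer).collect
# => [0, 1, 2, 3, 4, 5]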
#get_local_property(key) ⇒ Object Also known as: getLocalProperty
Get a local property set in this thread, or nil if it is missing.
# File 'lib/spark/context.rb', line 110

def get_local_property(key)
  jcontext.getLocalProperty(key)
end
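A small sketch (the key name is hypothetical): the method returns nil until the property has been set in the same thread with #set_local_property.
Example (illustrative):
$sc.get_local_property('custom.key')         # => nil
$sc.set_local_property('custom.key', '42')
$sc.get_local_property('custom.key')         # => "42"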
#inspect ⇒ Object
# File 'lib/spark/context.rb', line 41

def inspect
  result  = %{#<#{self.class.name}:0x#{object_id}\n}
  result << %{Tempdir: "#{temp_dir}">}
  result
end
#parallelize(data, num_slices = nil, serializer = nil) ⇒ Object
Distribute a local Ruby collection to form an RDD. Be careful: this method can be slow and updates the data in place.
Parameters:
- data: Range or Array
- num_slices: number of slices (default: default_parallelism)
- serializer: custom serializer (default: serializer based on configuration)
Examples:
$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]
$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]
# File 'lib/spark/context.rb', line 207

def parallelize(data, num_slices=nil, serializer=nil)
  num_slices ||= default_parallelism
  serializer ||= default_serializer

  serializer.check_each(data)

  # Through file
  file = Tempfile.new('to_parallelize', temp_dir)
  serializer.dump_to_io(data, file)
  file.close # not unlink

  jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)
  Spark::RDD.new(jrdd, self, serializer)
ensure
  file && file.unlink
end
#run_job(rdd, f, partitions = nil, allow_local = false) ⇒ Object Also known as: runJob
Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.
If partitions is not specified, this will run over all partitions.
Example:
rdd = $sc.parallelize(0..10, 5)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]
# File 'lib/spark/context.rb', line 278

def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end
#run_job_with_command(rdd, partitions, allow_local, command, *args) ⇒ Object Also known as: runJobWithCommand
Execute the given command on a specific set of partitions.
# File 'lib/spark/context.rb', line 284

def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  # File for result
  file = Tempfile.new('collect', temp_dir)

  mapped = rdd.new_rdd_from_command(command, *args)
  RubyRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local, file.path)

  mapped.collect_from_file(file)
end
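A hedged sketch mirroring how #run_job delegates to this method with Spark::Command::MapPartitions (see the #run_job source); the partitions and result match the #run_job example.
Example (illustrative):
rdd = $sc.parallelize(0..10, 5)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]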
#sc ⇒ Object
# File 'lib/spark/context.rb', line 53

def sc
  @jcontext.sc
end
#set_call_site(site) ⇒ Object Also known as: setCallSite
Support function for API backtraces.
# File 'lib/spark/context.rb', line 116

def set_call_site(site)
  jcontext.setCallSite(site)
end
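A brief sketch (the label is illustrative): the call site is used as the stage description, just as the constructor does with set_call_site('Ruby'), until it is cleared again.
Example (illustrative):
$sc.set_call_site('my ruby job')   # stages created from now on are described as 'my ruby job'
$sc.parallelize(0..5).collect
$sc.clear_call_site                # back to the default description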
#set_local_property(key, value) ⇒ Object Also known as: setLocalProperty
Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
# File 'lib/spark/context.rb', line 104

def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end
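A usage sketch using the standard Spark fair-scheduler property key spark.scheduler.pool (the key itself is not spelled out in this doc); the pool name is illustrative.
Example (illustrative):
$sc.set_local_property('spark.scheduler.pool', 'production')
$sc.get_local_property('spark.scheduler.pool')   # => "production"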
#stop ⇒ Object
# File 'lib/spark/context.rb', line 47

def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end
#text_file(path, min_partitions = nil, encoding = Encoding::UTF_8, serializer = nil) ⇒ Object Also known as: textFile
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Example:
f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close
$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]
# File 'lib/spark/context.rb', line 236

def text_file(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer = Spark::Serializer.build { __text__(encoding) }

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, deserializer)
end
#ui ⇒ Object
# File 'lib/spark/context.rb', line 57

def ui
  sc.ui
end
#whole_text_files(path, min_partitions = nil, serializer = nil) ⇒ Object Also known as: wholeTextFiles
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
Example:
dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close
$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]
# File 'lib/spark/context.rb', line 260

def whole_text_files(path, min_partitions=nil, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer = Spark::Serializer.build { __pair__(__text__, __text__) }

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end