Class: Spark::RDD
- Inherits: Object
- Extended by:
- Forwardable
- Includes:
- Helper::Logger, Helper::Parser, Helper::Statistic
- Defined in:
- lib/spark/rdd.rb
Overview
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
Direct Known Subclasses
PipelinedRDD
Instance Attribute Summary collapse
-
#command ⇒ Object
readonly
Returns the value of attribute command.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#jrdd ⇒ Object
readonly
Returns the value of attribute jrdd.
Instance Method Summary collapse
-
#+(other) ⇒ Object
Operators.
-
#add_command(klass, *args) ⇒ Object
Command and serializer.
-
#add_library(*libraries) ⇒ Object
(also: #addLibrary, #require)
Add a Ruby library. Libraries will be included before computing.
-
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
-
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object
(also: #aggregateByKey)
Aggregate the values of each key, using given combine functions and a neutral zero value.
-
#bind(objects) ⇒ Object
Bind object to RDD.
-
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
- #cached? ⇒ Boolean
-
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
- #checkpointed? ⇒ Boolean
-
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
-
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
-
#collect(as_enum = false) ⇒ Object
Return an array that contains all of the elements in this RDD.
-
#collect_as_hash ⇒ Object
Collect this RDD of key-value pairs as a Hash.
- #collect_from_file(file, as_enum = false) ⇒ Object
-
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object
(also: #combineByKey)
Generic function to combine the elements for each key using a custom set of aggregation functions.
-
#compact ⇒ Object
Return a new RDD containing non-nil elements.
-
#config ⇒ Object
Variables and non-computing functions.
-
#count ⇒ Object
Return the number of values in this RDD.
- #default_reduce_partitions ⇒ Object (also: #defaultReducePartitions)
-
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD.
-
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
-
#first ⇒ Object
Return the first element in this RDD.
-
#flat_map(f) ⇒ Object
(also: #flatMap)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
-
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
-
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
-
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object
(also: #foldByKey)
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
-
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
-
#foreach_partition(f, options = {}) ⇒ Object
(also: #foreachPartition)
Applies a function f to each partition of this RDD.
-
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
-
#group_by(f, num_partitions = nil) ⇒ Object
(also: #groupBy)
Return an RDD of grouped items.
-
#group_by_key(num_partitions = nil) ⇒ Object
(also: #groupByKey)
Group the values for each key in the RDD into a single sequence.
-
#group_with(other, num_partitions = nil) ⇒ Object
(also: #groupWith)
The same functionality as cogroup, but it can group only two RDDs, and you can change num_partitions.
-
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets.
-
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
-
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
constructor
Initializes the RDD. This method is the root of every PipelinedRDD and is unique. If you call some operations on this class, they will be computed in Java.
- #inspect ⇒ Object
-
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one.
-
#key_by(f) ⇒ Object
(also: #keyBy)
Creates tuples of the elements in this RDD by applying function f to compute the key.
-
#keys ⇒ Object
Return an RDD with the first element (key) of each pair in a PairRDD.
-
#lookup(key) ⇒ Object
Return the list of values in the RDD for key `key`.
-
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
-
#map_partitions(f) ⇒ Object
(also: #mapPartitions)
Return a new RDD by applying a function to each partition of this RDD.
-
#map_partitions_with_index(f, options = {}) ⇒ Object
(also: #mapPartitionsWithIndex)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
-
#map_values(f) ⇒ Object
(also: #mapValues)
Pass each value in the key-value pair RDD through a map function without changing the keys.
-
#max ⇒ Object
Return the max of this RDD.
-
#mean ⇒ Object
Compute the mean of this RDD’s elements.
-
#min ⇒ Object
Return the min of this RDD.
-
#name ⇒ Object
Return the name of this RDD.
- #name=(value) ⇒ Object
- #new_rdd_from_command(klass, *args) ⇒ Object
-
#partition_by(num_partitions, partition_func = nil) ⇒ Object
(also: #partitionBy)
Return a copy of the RDD partitioned using the specified partitioner.
-
#partitions_size ⇒ Object
(also: #partitionsSize)
Number of partitions of this RDD (count of ParallelCollectionPartition).
-
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed.
-
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
-
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
-
#reduce_by_key(f, num_partitions = nil) ⇒ Object
(also: #reduceByKey)
Merge the values for each key using an associative reduce function.
-
#reserialize(new_serializer) ⇒ Object
Return a new RDD with different serializer.
-
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD.
-
#sample_stdev ⇒ Object
(also: #sampleStdev)
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
-
#sample_variance ⇒ Object
(also: #sampleVariance)
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
-
#set_name(value) ⇒ Object
(also: #setName)
Assign a name to this RDD.
-
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
-
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object
(also: #sortBy)
Sorts this RDD by the given key_function.
-
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object
(also: #sortByKey)
Sort the RDD by key.
-
#sort_by_value(ascending = true, num_partitions = nil) ⇒ Object
Sort the RDD by value.
-
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
-
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
-
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
-
#subtract_by_key(other, num_partitions = nil) ⇒ Object
(also: #subtractByKey)
Return each (key, value) pair in self RDD that has no pair with matching key in other RDD.
-
#sum ⇒ Object
Return the sum of this RDD.
-
#take(count) ⇒ Object
Take the first num elements of the RDD.
-
#take_sample(with_replacement, num, seed = nil) ⇒ Object
(also: #takeSample)
Return a fixed-size sampled subset of this RDD in an array.
- #to_java ⇒ Object
-
#union(other) ⇒ Object
Return the union of this RDD and another one.
-
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-
#values ⇒ Object
Return an RDD with the second element (value) of each pair in a PairRDD.
-
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Methods included from Helper::Statistic
#bisect_right, #compute_fraction, #determine_bounds, #upper_binomial_bound, #upper_poisson_bound
Methods included from Helper::Parser
Methods included from Helper::Logger
Constructor Details
#initialize(jrdd, context, serializer, deserializer = nil) ⇒ RDD
Initializes the RDD. This method is the root of every PipelinedRDD and is unique. If you call some operations on this class, they will be computed in Java.
Parameters:
- jrdd
-
org.apache.spark.api.java.JavaRDD
- context
- serializer
# File 'lib/spark/rdd.rb', line 27

def initialize(jrdd, context, serializer, deserializer=nil)
  @jrdd = jrdd
  @context = context
  @cached = false
  @checkpointed = false

  @command = Spark::CommandBuilder.new(serializer, deserializer)
end
Instance Attribute Details
#command ⇒ Object (readonly)
Returns the value of attribute command.
# File 'lib/spark/rdd.rb', line 11

def command
  @command
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/spark/rdd.rb', line 11

def context
  @context
end
#jrdd ⇒ Object (readonly)
Returns the value of attribute jrdd.
# File 'lib/spark/rdd.rb', line 11

def jrdd
  @jrdd
end
Instance Method Details
#+(other) ⇒ Object
Operators
# File 'lib/spark/rdd.rb', line 54

def +(other)
  self.union(other)
end
#add_command(klass, *args) ⇒ Object
Command and serializer
# File 'lib/spark/rdd.rb', line 62

def add_command(klass, *args)
  @command.deep_copy.add_command(klass, *args)
end
#add_library(*libraries) ⇒ Object Also known as: addLibrary, require
Add a Ruby library. Libraries will be included before computing.
Example:
rdd.add_library('pry').add_library('nio4r', 'distribution')
# File 'lib/spark/rdd.rb', line 72

def add_library(*libraries)
  @command.add_library(*libraries)
  self
end
#aggregate(zero_value, seq_op, comb_op) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”.
This function can return a different result type. We need one operation for merging.
Result must be an Array, otherwise Serializer Array’s zero value will be sent as multiple values and not just one.
Example:
# 1 2 3 4 5 => 15 + 1 = 16
# 6 7 8 9 10 => 40 + 1 = 41
# 16 * 41 = 656
seq = lambda{|x,y| x+y}
com = lambda{|x,y| x*y}
rdd = $sc.parallelize(1..10, 2)
rdd.aggregate(1, seq, com)
# => 656
# File 'lib/spark/rdd.rb', line 342

def aggregate(zero_value, seq_op, comb_op)
  _reduce(Spark::Command::Aggregate, seq_op, comb_op, zero_value)
end
#aggregate_by_key(zero_value, seq_func, comb_func, num_partitions = nil) ⇒ Object Also known as: aggregateByKey
Aggregate the values of each key, using given combine functions and a neutral zero value.
Example:
def combine(x,y)
x+y
end
def merge(x,y)
x*y
end
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]], 2)
rdd.aggregate_by_key(1, method(:combine), method(:merge))
# => [["b", 3], ["a", 16], ["c", 6]]
# File 'lib/spark/rdd.rb', line 1026

def aggregate_by_key(zero_value, seq_func, comb_func, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::CombineWithZero, zero_value, seq_func],
    [Spark::Command::CombineByKey::Merge, comb_func],
    num_partitions
  )
end
#bind(objects) ⇒ Object
Bind object to RDD
Example:
text = "test"
rdd = $sc.parallelize(0..5)
rdd = rdd.map(lambda{|x| x.to_s + " " + text})
rdd = rdd.bind(text: text)
rdd.collect
# => ["0 test", "1 test", "2 test", "3 test", "4 test", "5 test"]
# File 'lib/spark/rdd.rb', line 89

def bind(objects)
  unless objects.is_a?(Hash)
    raise ArgumentError, 'Argument must be a Hash.'
  end

  @command.bind(objects)
  self
end
#cache ⇒ Object
Persist this RDD with the default storage level MEMORY_ONLY_SER because of serialization.
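Example (a minimal sketch; assumes $sc is an initialized Spark::Context, as in the other examples):
rdd = $sc.parallelize(0..10)
rdd.cache
rdd.cached?
# => true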
# File 'lib/spark/rdd.rb', line 126

def cache
  persist('memory_only_ser')
end
#cached? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 153

def cached?
  @cached
end
#cartesian(other) ⇒ Object
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements `(a, b)` where `a` is in `self` and `b` is in `other`.
Example:
rdd1 = $sc.parallelize([1,2,3])
rdd2 = $sc.parallelize([4,5,6])
rdd1.cartesian(rdd2).collect
# => [[1, 4], [1, 5], [1, 6], [2, 4], [2, 5], [2, 6], [3, 4], [3, 5], [3, 6]]
# File 'lib/spark/rdd.rb', line 705

def cartesian(other)
  _deserializer = Spark::Serializer::Cartesian.new(self.deserializer, other.deserializer)

  new_jrdd = jrdd.cartesian(other.jrdd)
  RDD.new(new_jrdd, context, serializer, _deserializer)
end
#checkpointed? ⇒ Boolean
# File 'lib/spark/rdd.rb', line 157

def checkpointed?
  @checkpointed
end
#coalesce(num_partitions) ⇒ Object
Return a new RDD that is reduced into num_partitions partitions.
Example:
rdd = $sc.parallelize(0..10, 3)
rdd.coalesce(2).glom.collect
# => [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 683

def coalesce(num_partitions)
  if self.is_a?(PipelinedRDD)
    deser = @command.serializer
  else
    deser = @command.deserializer
  end

  new_jrdd = jrdd.coalesce(num_partitions)
  RDD.new(new_jrdd, context, @command.serializer, deser)
end
#cogroup(*others) ⇒ Object
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd3 = $sc.parallelize([["a", 7], ["a", 8], ["b", 9]])
rdd1.cogroup(rdd2, rdd3).collect
# => [["a", [1, 2, 4, 5, 7, 8]], ["b", [3, 6, 9]]]
# File 'lib/spark/rdd.rb', line 1057

def cogroup(*others)
  unioned = self
  others.each do |other|
    unioned = unioned.union(other)
  end

  unioned.group_by_key
end
#collect(as_enum = false) ⇒ Object
Return an array that contains all of the elements in this RDD. RJB raises an error if the stage is killed.
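Example (a minimal sketch; assumes $sc is an initialized Spark::Context, as in the other examples):
rdd = $sc.parallelize(0..3)
rdd.collect
# => [0, 1, 2, 3]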
# File 'lib/spark/rdd.rb', line 199

def collect(as_enum=false)
  file = Tempfile.new('collect', context.temp_dir)

  context.set_call_site(caller.first)
  RubyRDD.writeRDDToFile(jrdd.rdd, file.path)

  collect_from_file(file, as_enum)
rescue => e
  raise Spark::RDDError, e.message
ensure
  context.clear_call_site
end
#collect_as_hash ⇒ Object
Collect this RDD of key-value pairs as a Hash.
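Example (a minimal sketch; assumes $sc is an initialized Spark::Context and an RDD of key-value pairs):
rdd = $sc.parallelize([["a", 1], ["b", 2]])
rdd.collect_as_hash
# => {"a"=>1, "b"=>2}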
# File 'lib/spark/rdd.rb', line 232

def collect_as_hash
  Hash[collect]
end
#collect_from_file(file, as_enum = false) ⇒ Object
# File 'lib/spark/rdd.rb', line 212

def collect_from_file(file, as_enum=false)
  if self.is_a?(PipelinedRDD)
    klass = @command.serializer
  else
    klass = @command.deserializer
  end

  if as_enum
    result = klass.load_from_file(file)
  else
    result = klass.load_from_io(file).to_a
    file.close
    file.unlink
  end

  result
end
#combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions = nil) ⇒ Object Also known as: combineByKey
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List). Users provide three functions:
Parameters:
- create_combiner
-
which turns a V into a C (e.g., creates a one-element list)
- merge_value
-
to merge a V into a C (e.g., adds it to the end of a list)
- merge_combiners
-
to combine two C’s into a single one.
Example:
def combiner(x)
x
end
def merge(x,y)
x+y
end
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"], 2).map(lambda{|x| [x, 1]})
rdd.combine_by_key(method(:combiner), method(:merge), method(:merge)).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 959

def combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions=nil)
  _combine_by_key(
    [Spark::Command::CombineByKey::Combine, create_combiner, merge_value],
    [Spark::Command::CombineByKey::Merge, merge_combiners],
    num_partitions
  )
end
#compact ⇒ Object
Return a new RDD containing non-nil elements.
Example:
rdd = $sc.parallelize([1, nil, 2, nil, 3])
rdd.compact.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 661

def compact
  new_rdd_from_command(Spark::Command::Compact)
end
#config ⇒ Object
Variables and non-computing functions
# File 'lib/spark/rdd.rb', line 107

def config
  @context.config
end
#count ⇒ Object
Return the number of values in this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.count
# => 11
# File 'lib/spark/rdd.rb', line 386

def count
  # nil is for seq_op => it means the all result go directly to one worker for combine
  @count ||= self.map_partitions('lambda{|iterator| iterator.to_a.size }')
                 .aggregate(0, nil, 'lambda{|sum, item| sum + item }')
end
#default_reduce_partitions ⇒ Object Also known as: defaultReducePartitions
# File 'lib/spark/rdd.rb', line 111

def default_reduce_partitions
  config['spark.default.parallelism'] || partitions_size
end
#distinct ⇒ Object
Return a new RDD containing the distinct elements in this RDD. Ordering is not preserved because of reducing
Example:
rdd = $sc.parallelize([1,1,1,2,3])
rdd.distinct.collect
# => [1, 2, 3]
# File 'lib/spark/rdd.rb', line 720

def distinct
  self.map('lambda{|x| [x, nil]}')
      .reduce_by_key('lambda{|x,_| x}')
      .map('lambda{|x| x[0]}')
end
#filter(f) ⇒ Object
Return a new RDD containing only the elements that satisfy a predicate.
Example:
rdd = $sc.parallelize(0..10)
rdd.filter(lambda{|x| x.even?}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 650

def filter(f)
  new_rdd_from_command(Spark::Command::Filter, f)
end
#first ⇒ Object
Return the first element in this RDD.
Example:
rdd = $sc.parallelize(0..100)
rdd.first
# => 0
# File 'lib/spark/rdd.rb', line 290

def first
  self.take(1)[0]
end
#flat_map(f) ⇒ Object Also known as: flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
Example:
rdd = $sc.parallelize(0..5)
rdd.flat_map(lambda {|x| [x, 1]}).collect
# => [0, 1, 1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
# File 'lib/spark/rdd.rb', line 616

def flat_map(f)
  new_rdd_from_command(Spark::Command::FlatMap, f)
end
#flat_map_values(f) ⇒ Object
Pass each value in the key-value pair RDD through a flat_map function without changing the keys; this also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize([["a", [1,2]], ["b", [3]]])
rdd = rdd.flat_map_values(lambda{|x| x*2})
rdd.collect
# => [["a", 1], ["a", 2], ["a", 1], ["a", 2], ["b", 3], ["b", 3]]
# File 'lib/spark/rdd.rb', line 1218

def flat_map_values(f)
  new_rdd_from_command(Spark::Command::FlatMapValues, f)
end
#fold(zero_value, f) ⇒ Object
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.
The function f(x, y) is allowed to modify x and return it as its result value to avoid object allocation; however, it should not modify y.
Be careful, zero_value is applied to all stages. See example.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.fold(1, lambda{|sum, x| sum+x})
# => 58
# File 'lib/spark/rdd.rb', line 318

def fold(zero_value, f)
  self.aggregate(zero_value, f, f)
end
#fold_by_key(zero_value, f, num_partitions = nil) ⇒ Object Also known as: foldByKey
Merge the values for each key using an associative function f and a neutral `zero_value` which may be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).
Example:
rdd = $sc.parallelize([["a", 1], ["b", 2], ["a", 3], ["a", 4], ["c", 5]])
rdd.fold_by_key(1, lambda{|x,y| x+y})
# => [["a", 9], ["c", 6], ["b", 3]]
# File 'lib/spark/rdd.rb', line 1007

def fold_by_key(zero_value, f, num_partitions=nil)
  self.aggregate_by_key(zero_value, f, f, num_partitions)
end
#foreach(f, options = {}) ⇒ Object
Applies a function f to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreach(lambda{|x| puts x})
# => nil
# File 'lib/spark/rdd.rb', line 576

def foreach(f, options={})
  new_rdd_from_command(Spark::Command::Foreach, f).collect
  nil
end
#foreach_partition(f, options = {}) ⇒ Object Also known as: foreachPartition
Applies a function f to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.foreachPartition(lambda{|x| puts x.to_s})
# => nil
# File 'lib/spark/rdd.rb', line 588

def foreach_partition(f, options={})
  new_rdd_from_command(Spark::Command::ForeachPartition, f).collect
  nil
end
#glom ⇒ Object
Return an RDD created by coalescing all elements within each partition into an array.
Example:
rdd = $sc.parallelize(0..10, 3)
rdd.glom.collect
# => [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9, 10]]
# File 'lib/spark/rdd.rb', line 672

def glom
  new_rdd_from_command(Spark::Command::Glom)
end
#group_by(f, num_partitions = nil) ⇒ Object Also known as: groupBy
Return an RDD of grouped items.
Example:
rdd = $sc.parallelize(0..5)
rdd.group_by(lambda{|x| x%2}).collect
# => [[0, [0, 2, 4]], [1, [1, 3, 5]]]
# File 'lib/spark/rdd.rb', line 974

def group_by(f, num_partitions=nil)
  self.key_by(f).group_by_key(num_partitions)
end
#group_by_key(num_partitions = nil) ⇒ Object Also known as: groupByKey
Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduce_by_key or combine_by_key will provide much better performance.
Example:
rdd = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd.group_by_key.collect
# => [["a", [1, 2]], ["b", [3]]]
# File 'lib/spark/rdd.rb', line 989

def group_by_key(num_partitions=nil)
  create_combiner = 'lambda{|item| [item]}'
  merge_value = 'lambda{|combiner, item| combiner << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
end
#group_with(other, num_partitions = nil) ⇒ Object Also known as: groupWith
The same functionality as cogroup, but it can group only two RDDs, and you can change num_partitions.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3]])
rdd2 = $sc.parallelize([["a", 4], ["a", 5], ["b", 6]])
rdd1.group_with(rdd2).collect
# => [["a", [1, 2, 4, 5]], ["b", [3, 6]]]
# File 'lib/spark/rdd.rb', line 1043

def group_with(other, num_partitions=nil)
  self.union(other).group_by_key(num_partitions)
end
#histogram(buckets) ⇒ Object
Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1.
If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) insertion to O(1) per element (where n = # buckets).
Buckets must be sorted, must not contain any duplicates, and must have at least two elements.
Examples:
rdd = $sc.parallelize(0..50)
rdd.histogram(2)
# => [[0.0, 25.0, 50], [25, 26]]
rdd.histogram([0, 5, 25, 50])
# => [[0, 5, 25, 50], [5, 20, 26]]
rdd.histogram([0, 15, 30, 45, 60])
# => [[0, 15, 30, 45, 60], [15, 15, 15, 6]]
# File 'lib/spark/rdd.rb', line 476

def histogram(buckets)

  # -----------------------------------------------------------------------
  # Integer
  #
  if buckets.is_a?(Integer)

    # Validation
    if buckets < 1
      raise ArgumentError, "Bucket count must be >= 1, #{buckets} inserted."
    end

    # Filter invalid values
    # Nil and NaN
    func = 'lambda{|x|
      if x.nil? || (x.is_a?(Float) && x.nan?)
        false
      else
        true
      end
    }'
    filtered = self.filter(func)

    # Compute the minimum and the maximum
    func = 'lambda{|memo, item|
      [memo[0] < item[0] ? memo[0] : item[0],
       memo[1] > item[1] ? memo[1] : item[1]]
    }'
    min, max = filtered.map('lambda{|x| [x, x]}').reduce(func)

    # Min, max must be valid numbers
    if (min.is_a?(Float) && !min.finite?) || (max.is_a?(Float) && !max.finite?)
      raise Spark::RDDError, 'Histogram on either an empty RDD or RDD containing +/-infinity or NaN'
    end

    # Already finished
    if min == max || buckets == 1
      return [min, max], [filtered.count]
    end

    # Custom range
    begin
      span = max - min # increment
      buckets = (0...buckets).map do |x|
        min + (x * span) / buckets.to_f
      end
      buckets << max
    rescue NoMethodError
      raise Spark::RDDError, 'Can not generate buckets with non-number in RDD'
    end

    even = true

  # -----------------------------------------------------------------------
  # Array
  #
  elsif buckets.is_a?(Array)

    if buckets.size < 2
      raise ArgumentError, 'Buckets should have more than one value.'
    end

    if buckets.detect{|x| x.nil? || (x.is_a?(Float) && x.nan?)}
      raise ArgumentError, 'Can not have nil or nan numbers in buckets.'
    end

    if buckets.detect{|x| buckets.count(x) > 1}
      raise ArgumentError, 'Buckets should not contain duplicated values.'
    end

    if buckets.sort != buckets
      raise ArgumentError, 'Buckets must be sorted.'
    end

    even = false

  # -----------------------------------------------------------------------
  # Other
  #
  else
    raise Spark::RDDError, 'Buckets should be number or array.'
  end

  reduce_func = 'lambda{|memo, item|
    memo.size.times do |i|
      memo[i] += item[i]
    end
    memo
  }'

  return buckets, new_rdd_from_command(Spark::Command::Histogram, even, buckets).reduce(reduce_func)
end
#id ⇒ Object
A unique ID for this RDD (within its SparkContext).
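Example (a minimal sketch; the returned Integer is assigned by the SparkContext and depends on how many RDDs were created before):
rdd = $sc.parallelize(0..5)
rdd.id
# => e.g. 3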
# File 'lib/spark/rdd.rb', line 121

def id
  jrdd.id
end
#inspect ⇒ Object
# File 'lib/spark/rdd.rb', line 37

def inspect
  comms = @command.commands.join(' -> ')

  result = %{#<#{self.class.name}:0x#{object_id}}
  result << %{ (#{comms})} unless comms.empty?
  result << %{ (cached)} if cached?
  result << %{\n}
  result << %{ Serializer: "#{serializer}"\n}
  result << %{Deserializer: "#{deserializer}"}
  result << %{>}
  result
end
#intersection(other) ⇒ Object
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
Example:
rdd1 = $sc.parallelize([1,2,3,4,5])
rdd2 = $sc.parallelize([1,4,5,6,7])
rdd1.intersection(rdd2).collect
# => [1, 4, 5]
# File 'lib/spark/rdd.rb', line 785

def intersection(other)
  mapping_function = 'lambda{|item| [item, nil]}'
  filter_function = 'lambda{|(key, values)| values.size > 1}'

  self.map(mapping_function)
      .cogroup(other.map(mapping_function))
      .filter(filter_function)
      .keys
end
#key_by(f) ⇒ Object Also known as: keyBy
Creates tuples of the elements in this RDD by applying function f to compute the key.
Example:
rdd = $sc.parallelize(0..5)
rdd.key_by(lambda{|x| x%2}).collect
# => [[0, 0], [1, 1], [0, 2], [1, 3], [0, 4], [1, 5]]
# File 'lib/spark/rdd.rb', line 1190

def key_by(f)
  new_rdd_from_command(Spark::Command::KeyBy, f)
end
#keys ⇒ Object
Return an RDD with the first element (key) of each pair in a PairRDD.
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.keys.collect
# => [1, 3, 5]
# File 'lib/spark/rdd.rb', line 1229

def keys
  self.map('lambda{|(key, _)| key}')
end
#lookup(key) ⇒ Object
Return the list of values in the RDD for key `key`. TODO: add Partitioner for efficient searching.
Example:
rdd = $sc.parallelize(0..10)
rdd = rdd.group_by(lambda {|x| x%3})
rdd.lookup(2)
# => [[2, 5, 8]]
rdd = $sc.parallelize(0..10)
rdd = rdd.key_by(lambda{|x| x.even?})
rdd.lookup(true)
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 1258

def lookup(key)
  lookup_key = "lookup_key_#{object_id}"

  self.filter("lambda{|(key, _)| key == #{lookup_key}}")
      .bind(lookup_key => key)
      .values
      .collect
end
#map(f) ⇒ Object
Return a new RDD by applying a function to all elements of this RDD.
Example:
rdd = $sc.parallelize(0..5)
rdd.map(lambda {|x| x*2}).collect
# => [0, 2, 4, 6, 8, 10]
# File 'lib/spark/rdd.rb', line 604

def map(f)
  new_rdd_from_command(Spark::Command::Map, f)
end
#map_partitions(f) ⇒ Object Also known as: mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
Example:
rdd = $sc.parallelize(0..10, 2)
rdd.map_partitions(lambda{|part| part.reduce(:+)}).collect
# => [15, 40]
# File 'lib/spark/rdd.rb', line 627

def map_partitions(f)
  new_rdd_from_command(Spark::Command::MapPartitions, f)
end
#map_partitions_with_index(f, options = {}) ⇒ Object Also known as: mapPartitionsWithIndex
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
Example:
rdd = $sc.parallelize(0...4, 4)
rdd.map_partitions_with_index(lambda{|part, index| part.first * index}).collect
# => [0, 1, 4, 9]
# File 'lib/spark/rdd.rb', line 639

def map_partitions_with_index(f, options={})
  new_rdd_from_command(Spark::Command::MapPartitionsWithIndex, f)
end
#map_values(f) ⇒ Object Also known as: mapValues
Pass each value in the key-value pair RDD through a map function without changing the keys. This also retains the original RDD’s partitioning.
Example:
rdd = $sc.parallelize(["ruby", "scala", "java"])
rdd = rdd.map(lambda{|x| [x, x]})
rdd = rdd.map_values(lambda{|x| x.upcase})
rdd.collect
# => [["ruby", "RUBY"], ["scala", "SCALA"], ["java", "JAVA"]]
# File 'lib/spark/rdd.rb', line 1204

def map_values(f)
  new_rdd_from_command(Spark::Command::MapValues, f)
end
#max ⇒ Object
Return the max of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.max
# => 10
# File 'lib/spark/rdd.rb', line 353

def max
  self.reduce('lambda{|memo, item| memo > item ? memo : item }')
end
#mean ⇒ Object
Compute the mean of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).mean
# => 2.0
# File 'lib/spark/rdd.rb', line 404

def mean
  stats.mean
end
#min ⇒ Object
Return the min of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.min
# => 0
# File 'lib/spark/rdd.rb', line 364

def min
  self.reduce('lambda{|memo, item| memo < item ? memo : item }')
end
#name ⇒ Object
Return the name of this RDD.
# File 'lib/spark/rdd.rb', line 163

def name
  _name = jrdd.name
  _name && _name.encode(Encoding::UTF_8)
end
#name=(value) ⇒ Object
# File 'lib/spark/rdd.rb', line 175

def name=(value)
  set_name(value)
end
#new_rdd_from_command(klass, *args) ⇒ Object
# File 'lib/spark/rdd.rb', line 98

def new_rdd_from_command(klass, *args)
  comm = add_command(klass, *args)
  PipelinedRDD.new(self, comm)
end
#partition_by(num_partitions, partition_func = nil) ⇒ Object Also known as: partitionBy
Return a copy of the RDD partitioned using the specified partitioner.
Example:
rdd = $sc.parallelize(["1","2","3","4","5"]).map(lambda {|x| [x, 1]})
rdd.partitionBy(2).glom.collect
# => [[["3", 1], ["4", 1]], [["1", 1], ["2", 1], ["5", 1]]]
# File 'lib/spark/rdd.rb', line 802

def partition_by(num_partitions, partition_func=nil)
  num_partitions ||= default_reduce_partitions
  partition_func ||= 'lambda{|x| Spark::Digest.portable_hash(x.to_s)}'

  _partition_by(num_partitions, Spark::Command::PartitionBy::Basic, partition_func)
end
#partitions_size ⇒ Object Also known as: partitionsSize
Number of partitions of this RDD (count of ParallelCollectionPartition).
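Example (a minimal sketch; assumes $sc is an initialized Spark::Context, as in the other examples):
rdd = $sc.parallelize(0..10, 3)
rdd.partitions_size
# => 3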
# File 'lib/spark/rdd.rb', line 116

def partitions_size
  jrdd.rdd.partitions.size
end
#persist(new_level) ⇒ Object
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
See StorageLevel for type of new_level
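Example (a minimal sketch; 'memory_only_ser' is the level also used by #cache, other levels are listed in StorageLevel):
rdd = $sc.parallelize(0..10)
rdd.persist('memory_only_ser')
rdd.cached?
# => true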
# File 'lib/spark/rdd.rb', line 136

def persist(new_level)
  @cached = true
  jrdd.persist(Spark::StorageLevel.java_get(new_level))
  self
end
#pipe(*cmds) ⇒ Object
Return an RDD created by piping elements to a forked external process.
Cmds:
cmd = [env,] command... [,options]
env: hash
name => val : set the environment variable
name => nil : unset the environment variable
command...:
commandline : command line string which is passed to the standard shell
cmdname, arg1, ... : command name and one or more arguments (This form does
not use the shell. See below for caveats.)
[cmdname, argv0], arg1, ... : command name, argv[0] and zero or more arguments (no shell)
options: hash
See http://ruby-doc.org/core-2.2.0/Process.html#method-c-spawn
Examples:
$sc.parallelize(0..5).pipe('cat').collect
# => ["0", "1", "2", "3", "4", "5"]
rdd = $sc.parallelize(0..5)
rdd = rdd.pipe('cat', "awk '{print $1*10}'")
rdd = rdd.map(lambda{|x| x.to_i + 1})
rdd.collect
# => [1, 11, 21, 31, 41, 51]
# File 'lib/spark/rdd.rb', line 913

def pipe(*cmds)
  new_rdd_from_command(Spark::Command::Pipe, cmds)
end
#reduce(f) ⇒ Object
Reduces the elements of this RDD using the specified lambda or method.
Example:
rdd = $sc.parallelize(0..10)
rdd.reduce(lambda{|sum, x| sum+x})
# => 55
# File 'lib/spark/rdd.rb', line 301

def reduce(f)
  _reduce(Spark::Command::Reduce, f, f)
end
#reduce_by_key(f, num_partitions = nil) ⇒ Object Also known as: reduceByKey
Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with the existing partitioner/parallelism level.
Example:
rdd = $sc.parallelize(["a","b","c","a","b","c","a","c"]).map(lambda{|x| [x, 1]})
rdd.reduce_by_key(lambda{|x,y| x+y}).collect_as_hash
# => {"a"=>3, "b"=>2, "c"=>3}
# File 'lib/spark/rdd.rb', line 931

def reduce_by_key(f, num_partitions=nil)
  combine_by_key('lambda {|x| x}', f, f, num_partitions)
end
#reserialize(new_serializer) ⇒ Object
Return a new RDD with different serializer. This method is useful during union and join operations.
Example:
rdd = $sc.parallelize([1, 2, 3], nil, serializer: "marshal")
rdd = rdd.map(lambda{|x| x.to_s})
rdd.reserialize("oj").collect
# => ["1", "2", "3"]
# File 'lib/spark/rdd.rb', line 765

def reserialize(new_serializer)
  if serializer == new_serializer
    return self
  end

  new_command = @command.deep_copy
  new_command.serializer = new_serializer

  PipelinedRDD.new(self, new_command)
end
#sample(with_replacement, fraction, seed = nil) ⇒ Object
Return a sampled subset of this RDD. Operations are based on Poisson and Uniform distributions. TODO: replace Uniform with Bernoulli.
Examples:
rdd = $sc.parallelize(0..100)
rdd.sample(true, 10).collect
# => [17, 17, 22, 23, 51, 52, 62, 64, 69, 70, 96]
rdd.sample(false, 0.1).collect
# => [3, 5, 9, 32, 44, 55, 66, 68, 75, 80, 86, 91, 98]
# File 'lib/spark/rdd.rb', line 822

def sample(with_replacement, fraction, seed=nil)
  new_rdd_from_command(Spark::Command::Sample, with_replacement, fraction, seed)
end
#sample_stdev ⇒ Object Also known as: sampleStdev
Compute the sample standard deviation of this RDD’s elements (which corrects for bias in estimating the standard deviation by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_stdev
# => 1.0
# File 'lib/spark/rdd.rb', line 436

def sample_stdev
  stats.sample_stdev
end
#sample_variance ⇒ Object Also known as: sampleVariance
Compute the sample variance of this RDD’s elements (which corrects for bias in estimating the variance by dividing by N-1 instead of N).
Example:
$sc.parallelize([1, 2, 3]).sample_variance
# => 1.0
# File 'lib/spark/rdd.rb', line 447

def sample_variance
  stats.sample_variance
end
#set_name(value) ⇒ Object Also known as: setName
Assign a name to this RDD.
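Example (a minimal sketch; assumes $sc is an initialized Spark::Context, as in the other examples):
rdd = $sc.parallelize(0..5)
rdd.set_name('numbers')
rdd.name
# => "numbers"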
# File 'lib/spark/rdd.rb', line 170

def set_name(value)
  jrdd.setName(value)
  value
end
#shuffle(seed = nil) ⇒ Object
Return a shuffled RDD.
Example:
rdd = $sc.parallelize(0..10)
rdd.shuffle.collect
# => [3, 10, 6, 7, 8, 0, 4, 2, 9, 1, 5]
# File 'lib/spark/rdd.rb', line 733

def shuffle(seed=nil)
  seed ||= Random.new_seed

  new_rdd_from_command(Spark::Command::Shuffle, seed)
end
#sort_by(key_function = nil, ascending = true, num_partitions = nil) ⇒ Object Also known as: sortBy
Sorts this RDD by the given key_function.
This is a different implementation than Spark's: sort_by does not use the key_by method first. It can be slower but takes less memory, and you can always use map.sort_by_key instead.
Example:
rdd = $sc.parallelize(["aaaaaaa", "cc", "b", "eeee", "ddd"])
rdd.sort_by.collect
# => ["aaaaaaa", "b", "cc", "ddd", "eeee"]
rdd.sort_by(lambda{|x| x.size}).collect
# => ["b", "cc", "ddd", "eeee", "aaaaaaa"]
# File 'lib/spark/rdd.rb', line 1139

def sort_by(key_function=nil, ascending=true, num_partitions=nil)
  key_function ||= 'lambda{|x| x}'
  num_partitions ||= default_reduce_partitions

  command_klass = Spark::Command::SortByKey

  # Allow spill data to disk due to memory limit
  # spilling = config['spark.shuffle.spill'] || false
  spilling = false
  memory = ''

  # Set spilling to false if worker has unlimited memory
  if memory.empty?
    spilling = false
    memory = nil
  else
    memory = to_memory_size(memory)
  end

  # Sorting should do one worker
  if num_partitions == 1
    rdd = self
    rdd = rdd.coalesce(1) if partitions_size > 1
    return rdd.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
  end

  # Compute boundary of collection
  # Collection should be evenly distributed
  # 20.0 is from scala RangePartitioner (for roughly balanced output partitions)
  count = self.count
  sample_size = num_partitions * 20.0
  fraction = [sample_size / [count, 1].max, 1.0].min
  samples = self.sample(false, fraction, 1).map(key_function).collect
  samples.sort!
  # Reverse is much faster than reverse sort_by
  samples.reverse! if !ascending

  # Determine part bounds
  bounds = determine_bounds(samples, num_partitions)

  shuffled = _partition_by(num_partitions, Spark::Command::PartitionBy::Sorting, key_function, bounds, ascending, num_partitions)
  shuffled.new_rdd_from_command(command_klass, key_function, ascending, spilling, memory, serializer)
end
#sort_by_key(ascending = true, num_partitions = nil) ⇒ Object Also known as: sortByKey
Sort the RDD by key
Example:
rdd = $sc.parallelize([["c", 1], ["b", 2], ["a", 3]])
rdd.sort_by_key.collect
# => [["a", 3], ["b", 2], ["c", 1]]
# File 'lib/spark/rdd.rb', line 1109

def sort_by_key(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(key, _)| key}')
end
#sort_by_value(ascending = true, num_partitions = nil) ⇒ Object
Sort the RDD by value
Example:
rdd = $sc.parallelize([["a", 3], ["b", 1], ["c", 2]])
rdd.sort_by_value.collect
# => [["b", 1], ["c", 2], ["a", 3]]
# File 'lib/spark/rdd.rb', line 1120

def sort_by_value(ascending=true, num_partitions=nil)
  self.sort_by('lambda{|(_, value)| value}')
end
#stats ⇒ Object
Return a StatCounter object that captures the mean, variance and count of the RDD’s elements in one operation.
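Example (a minimal sketch; the StatCounter delegates shown are the same ones used by #mean and #stdev above):
stat = $sc.parallelize([1, 2, 3]).stats
stat.mean
# => 2.0
stat.stdev
# => 0.816...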
# File 'lib/spark/rdd.rb', line 394

def stats
  @stats ||= new_rdd_from_command(Spark::Command::Stats).reduce('lambda{|memo, item| memo.merge(item)}')
end
#stdev ⇒ Object
Compute the standard deviation of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).stdev
# => 0.816...
# File 'lib/spark/rdd.rb', line 424

def stdev
  stats.stdev
end
#subtract(other, num_partitions = nil) ⇒ Object
Return an RDD with the elements from self that are not in other.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["a", 2], ["c", 6]])
rdd1.subtract(rdd2).collect
# => [["a", 1], ["b", 3], ["c", 4]]
# File 'lib/spark/rdd.rb', line 1094

def subtract(other, num_partitions=nil)
  mapping_function = 'lambda{|x| [x,nil]}'

  self.map(mapping_function)
      .subtract_by_key(other.map(mapping_function), num_partitions)
      .keys
end
#subtract_by_key(other, num_partitions = nil) ⇒ Object Also known as: subtractByKey
Return each (key, value) pair in self RDD that has no pair with matching key in other RDD.
Example:
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["b", 5], ["c", 6]])
rdd1.subtract_by_key(rdd2).collect
# => [["a", 1], ["a", 2]]
# File 'lib/spark/rdd.rb', line 1075

def subtract_by_key(other, num_partitions=nil)
  create_combiner = 'lambda{|item| [[item]]}'
  merge_value = 'lambda{|combiner, item| combiner.first << item; combiner}'
  merge_combiners = 'lambda{|combiner_1, combiner_2| combiner_1 += combiner_2; combiner_1}'

  self.union(other)
      .combine_by_key(create_combiner, merge_value, merge_combiners, num_partitions)
      .filter('lambda{|(key,values)| values.size == 1}')
      .flat_map_values('lambda{|item| item.first}')
end
#sum ⇒ Object
Return the sum of this RDD
Example:
rdd = $sc.parallelize(0..10)
rdd.sum
# => 55
# File 'lib/spark/rdd.rb', line 375

def sum
  self.reduce('lambda{|sum, item| sum + item}')
end
#take(count) ⇒ Object
Take the first num elements of the RDD.
It works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
Example:
rdd = $sc.parallelize(0..100, 20)
rdd.take(5)
# => [0, 1, 2, 3, 4]
# File 'lib/spark/rdd.rb', line 247

def take(count)
  buffer = []

  parts_count = self.partitions_size
  # No parts was scanned, yet
  last_scanned = -1

  while buffer.empty?
    last_scanned += 1
    buffer += context.run_job_with_command(self, [last_scanned], true, Spark::Command::Take, 0, -1)
  end

  # Assumption. Depend on batch_size and how Spark divided data.
  items_per_part = buffer.size
  left = count - buffer.size

  while left > 0 && last_scanned < parts_count
    parts_to_take = (left.to_f/items_per_part).ceil
    parts_for_scanned = Array.new(parts_to_take) do
      last_scanned += 1
    end

    # We cannot take exact number of items because workers are isolated from each other.
    # => once you take e.g. 50% from last part and left is still > 0 then its very
    # difficult merge new items
    items = context.run_job_with_command(self, parts_for_scanned, true, Spark::Command::Take, left, last_scanned)
    buffer += items

    left = count - buffer.size
    # Average size of all parts
    items_per_part = [items_per_part, items.size].reduce(0){|sum, x| sum + x.to_f/2}
  end

  buffer.slice!(0, count)
end
#take_sample(with_replacement, num, seed = nil) ⇒ Object Also known as: takeSample
Return a fixed-size sampled subset of this RDD in an array
Examples:
rdd = $sc.parallelize(0..100)
rdd.take_sample(true, 10)
# => [90, 84, 74, 44, 27, 22, 72, 96, 80, 54]
rdd.take_sample(false, 10)
# => [5, 35, 30, 48, 22, 33, 40, 75, 42, 32]
# File 'lib/spark/rdd.rb', line 837

def take_sample(with_replacement, num, seed=nil)

  if num < 0
    raise Spark::RDDError, 'Size have to be greater than 0'
  elsif num == 0
    return []
  end

  # Taken from scala
  num_st_dev = 10.0

  # Number of items
  initial_count = self.count
  return [] if initial_count == 0

  # Create new generator
  seed ||= Random.new_seed
  rng = Random.new(seed)

  # Shuffle elements if requested num if greater than array size
  if !with_replacement && num >= initial_count
    return self.shuffle(seed).collect
  end

  # Max num
  max_sample_size = Integer::MAX - (num_st_dev * Math.sqrt(Integer::MAX)).to_i
  if num > max_sample_size
    raise Spark::RDDError, "Size can not be greate than #{max_sample_size}"
  end

  # Approximate fraction with tolerance
  fraction = compute_fraction(num, initial_count, with_replacement)

  # Compute first samled subset
  samples = self.sample(with_replacement, fraction, seed).collect

  # If the first sample didn't turn out large enough, keep trying to take samples;
  # this shouldn't happen often because we use a big multiplier for their initial size.
  index = 0
  while samples.size < num
    log_warning("Needed to re-sample due to insufficient sample size. Repeat #{index}")
    samples = self.sample(with_replacement, fraction, rng.rand(0..Integer::MAX)).collect
    index += 1
  end

  samples.shuffle!(random: rng)

  samples[0, num]
end
#to_java ⇒ Object
# File 'lib/spark/rdd.rb', line 179

def to_java
  marshal = Spark::Serializer.marshal

  if deserializer.batched?
    ser = deserializer.deep_copy
    ser.serializer = marshal
  else
    ser = Spark::Serializer.batched(marshal)
  end

  rdd = self.reserialize(ser)
  RubyRDD.toJava(rdd.jrdd, rdd.serializer.batched?)
end
#union(other) ⇒ Object
Return the union of this RDD and another one. Any identical elements will appear multiple times (use .distinct to eliminate them).
Example:
rdd = $sc.parallelize([1, 2, 3])
rdd.union(rdd).collect
# => [1, 2, 3, 1, 2, 3]
# File 'lib/spark/rdd.rb', line 747

def union(other)
  if self.serializer != other.serializer
    other = other.reserialize(serializer)
  end

  new_jrdd = jrdd.union(other.jrdd)
  RDD.new(new_jrdd, context, serializer, deserializer)
end
#unpersist(blocking = true) ⇒ Object
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
Parameters:
- blocking
-
whether to block until all blocks are deleted.
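Example (a minimal sketch; assumes $sc is an initialized Spark::Context, as in the other examples):
rdd = $sc.parallelize(0..10)
rdd.cache
rdd.unpersist
rdd.cached?
# => false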
# File 'lib/spark/rdd.rb', line 147

def unpersist(blocking=true)
  @cached = false
  jrdd.unpersist(blocking)
  self
end
#values ⇒ Object
Return an RDD with the second element (value) of each pair in a PairRDD.
Example:
rdd = $sc.parallelize([[1,2], [3,4], [5,6]])
rdd.values.collect
# => [2, 4, 6]
# File 'lib/spark/rdd.rb', line 1240

def values
  self.map('lambda{|(_, value)| value}')
end
#variance ⇒ Object
Compute the variance of this RDD’s elements.
Example:
$sc.parallelize([1, 2, 3]).variance
# => 0.666...
# File 'lib/spark/rdd.rb', line 414

def variance
  stats.variance
end