Class: Spark::Command

Inherits:
Object
  • Object
show all
Defined in:
lib/spark/command.rb

Overview

Container that holds all commands and other data the worker needs. Every RDD has its own copy of Command.

Defined Under Namespace

Classes: Aggregate, Base, CombineByKey, Compact, Filter, FlatMap, FlatMapValues, Foreach, ForeachPartition, Glom, Histogram, KeyBy, Map, MapPartitions, MapPartitionsWithIndex, MapValues, PartitionBy, Pipe, Reduce, Sample, Shuffle, SortByKey, Stats, Take

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Command

Returns a new instance of Command.



10
11
12
13
14
15
16
# File 'lib/spark/command.rb', line 10

# Builds an empty container.
#
# Serializer and deserializer start out unset (nil). The command pipeline
# and the list of libraries to require start out empty. There are no bound
# objects yet. Each instance gets its own fresh Array/Hash objects.
def initialize
  @serializer, @deserializer = nil, nil
  @commands = []
  @libraries = []
  @bound_objects = {}
end

Instance Attribute Details

#bound_objects ⇒ Object

Returns the value of attribute bound_objects.



8
9
10
# File 'lib/spark/command.rb', line 8

# Returns the bound objects. When this Command was restored via
# #marshal_load, they are deserialized on first access.
#
# #marshal_load keeps the bound objects only in serialized form
# (@serialized_bound_objects). They may depend on libraries that #execute
# requires before it asks for them. Returning @bound_objects directly would
# give nil after #marshal_load, and every command would receive nil as its
# __objects__. The result is memoized so all commands share one Hash.
#
# NOTE(review): assumes @serialized_bound_objects is a Marshal.dump string
# (as produced by +serialized_bound_objects+, defined elsewhere) — confirm.
# Marshal.load must only be given trusted data.
#
# @return [Hash] the bound objects; {} when none were bound or serialized
def bound_objects
  return @bound_objects if @bound_objects

  @bound_objects =
    if @serialized_bound_objects
      Marshal.load(@serialized_bound_objects)
    else
      {}
    end
end

#commands ⇒ Object

Returns the value of attribute commands.



8
9
10
# File 'lib/spark/command.rb', line 8

# @return [Array, nil] the commands #execute runs one after another
def commands; @commands; end

#deserializer ⇒ Object

Returns the value of attribute deserializer.



8
9
10
# File 'lib/spark/command.rb', line 8

# @return [Object, nil] the stored deserializer (nil until one is assigned)
def deserializer; @deserializer; end

#libraries ⇒ Object

Returns the value of attribute libraries.



8
9
10
# File 'lib/spark/command.rb', line 8

# @return [Array, nil] library names #execute passes to +require+ before running the commands
def libraries; @libraries; end

#serializer ⇒ Object

Returns the value of attribute serializer.



8
9
10
# File 'lib/spark/command.rb', line 8

# @return [Object, nil] the stored serializer (nil until one is assigned)
def serializer; @serializer; end

Instance Method Details

#execute(iterator, split_index) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/spark/command.rb', line 18

# Runs the whole command pipeline over one input.
#
# Steps, each over all commands before the next begins:
# 1. require every entry of #libraries;
# 2. hand each command the bound objects (+__objects__=+);
# 3. call +prepare+ on each command;
# 4. chain +execute+: each command receives what the previous one returned.
#
# @param iterator [Object] input handed to the first command
# @param split_index [Object] forwarded unchanged to every command's +execute+
# @return [Object] the last command's result, or +iterator+ itself when
#   there are no commands
def execute(iterator, split_index)
  libraries.each { |library| require library }

  @commands.each { |command| command.__objects__ = bound_objects }
  @commands.each(&:prepare)

  # Always keep each command's return value. Some commands change the data
  # in place, but others (e.g. reduce) return only a single value.
  @commands.inject(iterator) do |current, command|
    command.execute(current, split_index)
  end
end

#last ⇒ Object



41
42
43
# File 'lib/spark/command.rb', line 41

# @return [Object, nil] the final command of the pipeline, or nil when there are none
def last; @commands.last; end

#marshal_dump ⇒ Object

Bound objects can depend on a library that is loaded only during #execute. If they were deserialized together with the command, the worker would raise “undefined class/module”.



61
62
63
# File 'lib/spark/command.rb', line 61

# Marshal hook: the state to serialize, in the order #marshal_load reads it.
#
# The bound objects are not dumped directly. The value of
# +serialized_bound_objects+ (defined elsewhere; presumably an
# already-serialized blob) goes last instead. The bound objects may depend
# on a library that is loaded only during #execute, and loading them
# together with the command would make the worker raise
# "undefined class/module".
#
# @return [Array] [serializer, deserializer, commands, libraries, serialized bound objects]
def marshal_dump
  state = [@serializer, @deserializer, @commands, @libraries]
  state << serialized_bound_objects
end

#marshal_load(array) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/spark/command.rb', line 65

# Marshal hook: restores the state written by #marshal_dump.
#
# The first five elements are removed from +array+, in order. Missing
# elements leave the corresponding attribute nil. The bound objects are
# kept in serialized form in @serialized_bound_objects, because they may
# depend on libraries that are required only when #execute runs.
#
# @param array [Array] [serializer, deserializer, commands, libraries, serialized bound objects]
# @return [Object] the serialized bound objects that were just stored
def marshal_load(array)
  @serializer, @deserializer, @commands, @libraries = array.shift(4)
  @serialized_bound_objects = array.shift
end