Class: Spark::Command::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/spark/command/base.rb

Overview

Spark::Command::Base

Parent for all commands (Map, FlatMap, Sort, …)

Direct Known Subclasses

CombineByKey::Base, PartitionBy::Base

Constant Summary collapse

# Options applied to every variable declared through `variable` unless the
# caller overrides them. Frozen so the shared defaults cannot be mutated by
# accident; `merge` still hands back a fresh, unfrozen Hash.
DEFAULT_VARIABLE_OPTIONS =
{
  type: Hash,
  function: true
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Base

Returns a new instance of Base.



13
14
15
16
17
# File 'lib/spark/command/base.rb', line 13

# Assigns the positional arguments to the declared variables, in the order
# the variables were declared. A variable without a matching argument is
# set to nil; surplus arguments are ignored.
#
# @param args [Array] values for the declared variables
def initialize(*args)
  remaining = args
  settings.variables.each_key do |variable_name|
    instance_variable_set("@#{variable_name}", remaining.shift)
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



150
151
152
153
154
155
156
# File 'lib/spark/command/base.rb', line 150

# Resolves calls to unknown methods against the bound objects.
#
# When `__objects__` is set and has a key equal to the method name, the
# stored object is returned (arguments and block are not used). Every
# other call falls through to the default behaviour.
#
# @param method [Symbol] name of the missing method
# @return [Object] the bound object stored under +method+
# @raise [NoMethodError] when no bound object matches
def method_missing(method, *args, &block)
  bound = __objects__
  return bound[method] if bound && bound.key?(method)

  super
end

# Keeps `respond_to?` / `method` in agreement with #method_missing: a name
# is reported as supported exactly when a bound object is stored under it.
#
# @param method [Symbol] name being queried
# @param include_private [Boolean] forwarded to the default implementation
# @return [Boolean]
def respond_to_missing?(method, include_private = false)
  bound = __objects__
  (bound && bound.key?(method)) || super
end

Instance Attribute Details

#__objects__ ⇒ Object

Bound objects



148
149
150
# File 'lib/spark/command/base.rb', line 148

# Reader for the bound objects stored in @__objects__.
#
# @return [Object, nil] the stored value, or nil when nothing was assigned
def __objects__
  instance_variable_get(:@__objects__)
end

Class Method Details

.error(message) ⇒ Object



23
24
25
# File 'lib/spark/command/base.rb', line 23

# Aborts with a command error.
#
# @param message [String] text carried by the exception
# @raise [Spark::CommandError] always
def self.error(message)
  raise(Spark::CommandError, message)
end

.init_settings ⇒ Object

Init empty settings



52
53
54
55
56
57
58
59
# File 'lib/spark/command/base.rb', line 52

# Creates the settings container on first use; later calls do nothing.
#
# The container is a one-field Struct whose `variables` member starts as
# an empty Hash.
#
# NOTE(review): @@settings is a class variable, and Ruby resolves class
# variables along the inheritance chain — confirm subclasses are meant to
# see an ancestor's container once that ancestor has been initialised.
#
# @return [Hash, nil] the new empty variables Hash on first call, else nil
def self.init_settings
  return if class_variable_defined?(:@@settings)

  container = Struct.new(:variables).new
  class_variable_set(:@@settings, container)
  settings.variables = {}
end

.settings ⇒ Object

Settings for command (variables)



42
43
44
45
# File 'lib/spark/command/base.rb', line 42

# Settings container of the command class (holds the declared variables).
# The container is created on first access.
#
# @return [Struct] object exposing `variables`
def self.settings
  init_settings # does nothing once the container exists
  container = class_variable_get(:@@settings)
  container
end

.variable(name, options = {}) ⇒ Object

New variable for command

Example:

class Map < Spark::Command::Base
  variable :map_function
end

command = Map.new(1)

command.instance_variables
# => [:@map_function]
command.instance_variable_get(:@map_function)
# => 1


76
77
78
79
80
81
82
# File 'lib/spark/command/base.rb', line 76

# Declares a new variable for the command.
#
# The given options are layered over DEFAULT_VARIABLE_OPTIONS and stored
# under +name+. Declaring the same name twice is reported through `error`.
#
# @param name [Symbol] variable name
# @param options [Hash] overrides for the default options
# @return [Hash] the options stored for the variable
def self.variable(name, options={})
  declared = settings.variables
  error "Function #{name} already exist." if declared.key?(name)

  settings.variables[name] = DEFAULT_VARIABLE_OPTIONS.merge(options)
end

Instance Method Details

#before_run ⇒ Object

This method is called before every execution.



141
142
# File 'lib/spark/command/base.rb', line 141

# Hook invoked by #execute before the command runs. It does nothing here;
# a command can override it.
#
# @return [nil]
def before_run
  nil
end

#error(message) ⇒ Object



27
28
29
# File 'lib/spark/command/base.rb', line 27

# Instance-level shortcut that forwards to the class-level `error`.
#
# @param message [String] text passed on to the class method
def error(message)
  command_class = self.class
  command_class.error(message)
end

#execute(iterator, split_index) ⇒ Object

Execute command for data and split index



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/spark/command/base.rb', line 89

# Runs the command for the given data and split index.
#
# `before_run` is called first. A lazy enumerator is passed untouched to
# `lazy_run` when the command responds to it; otherwise the data is
# materialised with `to_a` and passed to `run`.
#
# @param iterator [Enumerable] input data
# @param split_index [Object] index forwarded to `run` / `lazy_run`
# @return [Object] whatever `run` or `lazy_run` returns
def execute(iterator, split_index)
  # Hook defined on Base; commands may override it
  before_run

  # `run` (and optionally `lazy_run`) must be provided by the command itself
  stay_lazy = iterator.is_a?(Enumerator::Lazy) && respond_to?(:lazy_run)
  if stay_lazy
    lazy_run(iterator, split_index)
  else
    run(iterator.to_a, split_index)
  end
end

#log(message = nil) ⇒ Object



31
32
33
34
# File 'lib/spark/command/base.rb', line 31

# Prints a line to $stdout in the form
# `==> HH:MM:SS [ClassName] message` and flushes the stream.
#
# @param message [Object, nil] text to print after the class name
def log(message=nil)
  timestamp = Time.now.strftime("%H:%M:%S")
  $stdout.puts "==> #{timestamp} [#{self.class.name}] #{message}"
  $stdout.flush
end

#prepare ⇒ Object

This is called before execution. Execution is stopped if any command contains an error (e.g. a badly serialized lambda).

What does it do?

  • evaluate lambda

  • evaluate method

  • make new lambda



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/spark/command/base.rb', line 114

# Converts every variable flagged with `function: true` from its serialized
# description into a callable, then marks the command as prepared. Calling
# it again on a prepared command does nothing.
#
# Each description is read with `data[:type]`:
#
# * 'proc'   - `data[:content]` is evaluated and the result is stored
# * 'symbol' - `data[:content]` is turned into a lambda
# * 'method' - `data[:content]` is evaluated on this instance and the
#   method named `data[:name]` is stored as a lambda
#
# Any other type is reported through `error` instead of silently replacing
# the variable with nil.
#
# NOTE(review): `eval` / `instance_eval` execute whatever Ruby source is in
# `data[:content]` — confirm it only ever comes from a trusted origin.
def prepare
  return if prepared?

  to_function = settings.variables.select {|_, options| options[:function]}
  # Local names are left exactly as they were: `eval` below runs inside this
  # block's binding, so evaluated source can see these locals.
  to_function.each do |name, options|
    name = "@#{name}"
    data = instance_variable_get(name)

    case data[:type]
    when 'proc'
      result = eval(data[:content])
    when 'symbol'
      result = lambda(&data[:content])
    when 'method'
      # Method must be added to the instance, not to the Class
      instance_eval(data[:content])
      # Method will be available as Proc
      result = lambda(&method(data[:name]))
    else
      # Fail right here rather than storing nil and failing later on call
      error "Unknown function type #{data[:type].inspect} for #{name}."
    end

    instance_variable_set(name, result)
  end

  @prepared = true
end

#prepared? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/spark/command/base.rb', line 102

# Tells whether @prepared holds a truthy value.
#
# @return [Boolean] always strictly true or false
def prepared?
  @prepared ? true : false
end

#settings ⇒ Object



47
48
49
# File 'lib/spark/command/base.rb', line 47

# Instance-level access to the settings of the command's class.
#
# @return [Object] whatever the class-level `settings` returns
def settings
  command_class = self.class
  command_class.settings
end

#to_s ⇒ Object



19
20
21
# File 'lib/spark/command/base.rb', line 19

# Short name of the command: the class name without its namespaces.
#
# @return [String] last `::`-separated segment of the class name
def to_s
  qualified_name = self.class.name
  qualified_name.split('::').last
end