Class: Master::Base

Inherits:
Object
  • Object
show all
Includes:
Spark::Constant
Defined in:
lib/spark/worker/master.rb

Direct Known Subclasses

Process, Thread

Constant Summary

Constants included from Spark::Constant

Spark::Constant::ACCUMULATOR_ACK, Spark::Constant::CREATE_WORKER, Spark::Constant::DATA_EOF, Spark::Constant::KILL_WORKER, Spark::Constant::KILL_WORKER_AND_WAIT, Spark::Constant::SUCCESSFULLY_KILLED, Spark::Constant::UNSUCCESSFUL_KILLING, Spark::Constant::WORKER_DONE, Spark::Constant::WORKER_ERROR

Instance Method Summary collapse

Constructor Details

#initializeBase

Returns a new instance of Base.



32
33
34
35
36
# File 'lib/spark/worker/master.rb', line 32

def initialize
  @port = ARGV[1].to_s.strip.to_i
  @socket = TCPSocket.open('localhost', @port)
  @worker_arguments = @socket.read_string
end

Instance Method Details

#kill_worker_and_waitObject



60
61
62
63
64
65
66
# File 'lib/spark/worker/master.rb', line 60

def kill_worker_and_wait
  if kill_worker
    @socket.write_int(SUCCESSFULLY_KILLED)
  else
    @socket.write_int(UNSUCCESSFUL_KILLING)
  end
end

#receive_messageObject



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/spark/worker/master.rb', line 47

def receive_message
  command = @socket.read_int

  case command
  when CREATE_WORKER
    create_worker
  when KILL_WORKER
    kill_worker
  when KILL_WORKER_AND_WAIT
    kill_worker_and_wait
  end
end

#runObject



38
39
40
41
42
43
44
45
# File 'lib/spark/worker/master.rb', line 38

def run
  selector = NIO::Selector.new
  monitor = selector.register(@socket, :r)
  monitor.value = Proc.new { receive_message }
  loop {
    selector.select {|monitor| monitor.value.call}
  }
end