Class: Master::Base
Constant Summary
Spark::Constant::ACCUMULATOR_ACK, Spark::Constant::CREATE_WORKER, Spark::Constant::DATA_EOF, Spark::Constant::KILL_WORKER, Spark::Constant::KILL_WORKER_AND_WAIT, Spark::Constant::SUCCESSFULLY_KILLED, Spark::Constant::UNSUCCESSFUL_KILLING, Spark::Constant::WORKER_DONE, Spark::Constant::WORKER_ERROR
Instance Method Summary
collapse
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
32
33
34
35
36
|
# File 'lib/spark/worker/master.rb', line 32
def initialize
@port = ARGV[1].to_s.strip.to_i
@socket = TCPSocket.open('localhost', @port)
@worker_arguments = @socket.read_string
end
|
Instance Method Details
#kill_worker_and_wait ⇒ Object
60
61
62
63
64
65
66
|
# File 'lib/spark/worker/master.rb', line 60
def kill_worker_and_wait
if kill_worker
@socket.write_int(SUCCESSFULLY_KILLED)
else
@socket.write_int(UNSUCCESSFUL_KILLING)
end
end
|
#receive_message ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/spark/worker/master.rb', line 47
def receive_message
command = @socket.read_int
case command
when CREATE_WORKER
create_worker
when KILL_WORKER
kill_worker
when KILL_WORKER_AND_WAIT
kill_worker_and_wait
end
end
|
#run ⇒ Object
38
39
40
41
42
43
44
45
|
# File 'lib/spark/worker/master.rb', line 38
def run
selector = NIO::Selector.new
monitor = selector.register(@socket, :r)
monitor.value = Proc.new { receive_message }
loop {
selector.select {|monitor| monitor.value.call}
}
end
|