Class: Spark::Accumulator::Server
- Inherits:
-
Object
- Object
- Spark::Accumulator::Server
- Defined in:
- lib/spark/accumulator.rb
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
Class Method Summary collapse
- .host ⇒ Object
- .port ⇒ Object
- .start ⇒ Object
- .stop ⇒ Object
Instance Method Summary collapse
- #handle_accept ⇒ Object
- #handle_connection(socket) ⇒ Object
-
#initialize ⇒ Server
constructor
A new instance of Server.
- #stop ⇒ Object
Constructor Details
#initialize ⇒ Server
Returns a new instance of Server.
221 222 223 224 225 226 227 228 |
# File 'lib/spark/accumulator.rb', line 221
#
# Builds the accumulator server: opens a TCPServer on port 0 (the OS picks a
# free port), remembers the host and port reported by that server, and starts
# the background accept loop via #handle_accept.
#
# NOTE(review): +hostname+ and +port+ are not methods of Ruby's stock
# TCPServer; presumably the project extends the socket classes elsewhere —
# confirm.
def initialize
  @threads = []
  @server = TCPServer.new(0)
  @host, @port = @server.hostname, @server.port
  handle_accept
end
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
201 202 203 |
# File 'lib/spark/accumulator.rb', line 201
#
# Read-only accessor for +@host+, which the constructor fills in from the
# listening server.
#
# @return [Object] the current value of +@host+ (nil if never assigned)
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
201 202 203 |
# File 'lib/spark/accumulator.rb', line 201
#
# Read-only accessor for +@port+, which the constructor fills in from the
# listening server.
#
# @return [Object] the current value of +@port+ (nil if never assigned)
def port
  @port
end
#server ⇒ Object (readonly)
Returns the value of attribute server.
201 202 203 |
# File 'lib/spark/accumulator.rb', line 201
#
# Read-only accessor for +@server+, the TCPServer created by the constructor.
#
# @return [Object] the current value of +@server+ (nil if never assigned)
def server
  @server
end
Class Method Details
.host ⇒ Object
211 212 213 214 |
# File 'lib/spark/accumulator.rb', line 211
#
# Host of the shared server instance. Calls .start first, so the server is
# created on demand if it does not exist yet.
#
# @return [Object] whatever the shared instance's #host returns
def self.host
  # .start returns the memoized instance, so the reader can be chained on it.
  start.host
end
.port ⇒ Object
216 217 218 219 |
# File 'lib/spark/accumulator.rb', line 216
#
# Port of the shared server instance. Calls .start first, so the server is
# created on demand if it does not exist yet.
#
# @return [Object] whatever the shared instance's #port returns
def self.port
  # .start returns the memoized instance, so the reader can be chained on it.
  start.port
end
.start ⇒ Object
203 204 205 |
# File 'lib/spark/accumulator.rb', line 203
#
# Returns the shared server instance, creating it on the first call and
# reusing the cached one afterwards.
#
# @return [Spark::Accumulator::Server] the memoized instance
def self.start
  return @instance if @instance

  @instance = Spark::Accumulator::Server.new
end
.stop ⇒ Object
207 208 209 |
# File 'lib/spark/accumulator.rb', line 207
#
# Stops the shared server instance, if one exists, and forgets it.
#
# The cached instance is cleared before stopping so that a later .start
# (or .host / .port, which call .start) builds a fresh server instead of
# handing back one whose threads have already been killed.
#
# @return [Object, nil] the result of the instance's #stop, or nil when no
#   instance had been started
def self.stop
  instance = @instance
  @instance = nil
  instance && instance.stop
end
Instance Method Details
#handle_accept ⇒ Object
236 237 238 239 240 241 242 243 |
# File 'lib/spark/accumulator.rb', line 236
#
# Starts a background thread that accepts connections on +@server+ forever
# and passes each accepted socket to #handle_connection. The thread is
# recorded in +@threads+.
#
# @return [Array<Thread>] the +@threads+ list, including the new thread
def handle_accept
  acceptor = Thread.new do
    loop do
      client = @server.accept
      handle_connection(client)
    end
  end

  @threads << acceptor
end
#handle_connection(socket) ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/spark/accumulator.rb', line 245
#
# Serves one client connection on its own thread, which is recorded in
# +@threads+.
#
# While the socket is open, the thread repeatedly reads an integer count and
# then that many data records. For each record, data[0] is looked up in
# Spark::Accumulator.instances; if an accumulator is found, data[1] is added
# to it, otherwise a warning is logged.
#
# The socket is closed in an +ensure+ so it is released when the loop exits
# through an exception or when the thread is killed. +closed?+ only reflects
# a local close, so the loop does not end by itself when the peer goes away.
# Any exception still propagates out of the thread as before.
#
# NOTE(review): +read_int+ / +read_data+ are not stock socket methods;
# presumably project IO extensions — confirm their behaviour at end of stream.
#
# @param socket [Object] accepted client socket; must respond to +closed?+,
#   +close+, +read_int+ and +read_data+
# @return [Array<Thread>] the +@threads+ list, including the new thread
def handle_connection(socket)
  @threads << Thread.new do
    begin
      until socket.closed?
        count = socket.read_int
        count.times do
          data = socket.read_data
          id = data[0]
          accum = Spark::Accumulator.instances[id]
          if accum
            accum.add(data[1])
          else
            Spark.logger.warn("Accumulator with id #{id} does not exist.")
          end
        end

        # http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock
        # socket.write_int(Spark::Constant::ACCUMULATOR_ACK)
      end
    ensure
      # Release the descriptor however the loop ended.
      socket.close unless socket.closed?
    end
  end
end
#stop ⇒ Object
230 231 232 233 234 |
# File 'lib/spark/accumulator.rb', line 230
#
# Shuts the server down: kills every thread recorded in +@threads+ (the
# accept loop and the per-connection workers) and closes the listening
# socket so the port is released instead of staying bound with nobody
# accepting on it.
#
# Shutdown stays best-effort: any StandardError raised on the way is
# swallowed and nil is returned.
#
# @return [Array<Thread>, nil] the list of killed threads, or nil if an
#   error occurred during shutdown
def stop
  killed = @threads.each(&:kill)
  @server.close if @server && !@server.closed?
  killed
rescue StandardError
  nil
end