Class: Spark::Accumulator::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/spark/accumulator.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Server

Returns a new instance of Server.



221
222
223
224
225
226
227
228
# File 'lib/spark/accumulator.rb', line 221

# Builds the accumulator server.
#
# Opens a TCPServer on port 0, remembers the host name and port reported by
# that server object, and starts the accept thread through #handle_accept.
def initialize
  @threads = []

  listener = TCPServer.new(0)
  @server = listener
  @host = listener.hostname
  @port = listener.port

  handle_accept
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



201
202
203
# File 'lib/spark/accumulator.rb', line 201

# Reader for the +@host+ instance variable.
#
# @return [Object] the current value of +@host+
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



201
202
203
# File 'lib/spark/accumulator.rb', line 201

# Reader for the +@port+ instance variable.
#
# @return [Object] the current value of +@port+
def port
  @port
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



201
202
203
# File 'lib/spark/accumulator.rb', line 201

# Reader for the +@server+ instance variable.
#
# @return [Object] the current value of +@server+
def server
  @server
end

Class Method Details

.host ⇒ Object



211
212
213
214
# File 'lib/spark/accumulator.rb', line 211

# Host of the shared server instance.
#
# Calls .start first, so the shared instance is created on first use.
#
# @return [Object] the +host+ of the shared instance
def self.host
  start.host
end

.port ⇒ Object



216
217
218
219
# File 'lib/spark/accumulator.rb', line 216

# Port of the shared server instance.
#
# Calls .start first, so the shared instance is created on first use.
#
# @return [Object] the +port+ of the shared instance
def self.port
  start.port
end

.start ⇒ Object



203
204
205
# File 'lib/spark/accumulator.rb', line 203

# Returns the shared server, creating it on the first call.
#
# @return [Spark::Accumulator::Server] the memoized instance
def self.start
  return @instance if @instance

  @instance = Spark::Accumulator::Server.new
end

.stop ⇒ Object



207
208
209
# File 'lib/spark/accumulator.rb', line 207

# Stops the shared server instance, if one exists.
#
# The memoized instance is forgotten before it is stopped, so a later call to
# .start (directly or through .host / .port) builds a fresh server instead of
# handing back one whose threads have already been killed.
#
# @return [Object, nil] the result of the instance's #stop, or nil when no
#   instance was running
def self.stop
  running = @instance
  return unless running

  # Clear first: even if #stop raises, the dead instance must not be reused.
  @instance = nil
  running.stop
end

Instance Method Details

#handle_accept ⇒ Object



236
237
238
239
240
241
242
243
# File 'lib/spark/accumulator.rb', line 236

# Spawns the thread that accepts incoming connections.
#
# The thread loops forever: each socket returned by +@server.accept+ is
# passed to #handle_connection. The thread is recorded in +@threads+.
#
# @return [Array<Thread>] the thread list, including the new accept thread
def handle_accept
  acceptor = Thread.new do
    loop do
      client = @server.accept
      handle_connection(client)
    end
  end

  @threads << acceptor
end

#handle_connection(socket) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/spark/accumulator.rb', line 245

# Serves one client connection on its own thread.
#
# While the socket is open the thread reads an integer count followed by that
# many records. Each record is indexed as +[0]+ (accumulator id) and +[1]+
# (value): the value is added to the accumulator registered under that id in
# +Spark::Accumulator.instances+, and a warning is logged when none exists.
#
# The socket is closed whenever the thread ends, whether through a read error
# or through Thread#kill, so the descriptor is not leaked.
#
# @param socket [#read_int, #read_data, #closed?, #close] accepted client socket
# @return [Array<Thread>] the thread list, including the new worker thread
def handle_connection(socket)
  @threads << Thread.new do
    begin
      until socket.closed?
        count = socket.read_int
        count.times do
          data = socket.read_data
          accum = Spark::Accumulator.instances[data[0]]
          if accum
            accum.add(data[1])
          else
            Spark.logger.warn("Accumulator with id #{data[0]} does not exist.")
          end
        end

        # http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock
        # socket.write_int(Spark::Constant::ACCUMULATOR_ACK)
      end
    ensure
      # Runs on errors and on Thread#kill alike; release the descriptor.
      socket.close unless socket.closed?
    end
  end
end

#stop ⇒ Object



230
231
232
233
234
# File 'lib/spark/accumulator.rb', line 230

# Shuts the server down: kills every tracked thread and closes the listening
# socket so the port is released.
#
# Thread shutdown stays best-effort: a StandardError raised while killing
# threads is swallowed and nil is returned. The listening socket is closed
# in either case. Calling #stop again is harmless.
#
# @return [Array<Thread>, nil] the thread list, or nil if killing failed
def stop
  @threads.each(&:kill)
rescue StandardError
  nil
ensure
  # Without this the listening socket outlives the accept thread.
  @server.close if @server && !@server.closed?
end