Class: DCell::Server

Inherits:
Object
  • Object
show all
Includes:
Celluloid::ZMQ
Defined in:
lib/dcell/server.rb

Overview

Servers handle incoming 0MQ traffic

Defined Under Namespace

Classes: InvalidMessageError

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Server

Bind to this node's 0MQ address (DCell.addr, in URL form à la tcp://host:port)



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/dcell/server.rb', line 9

# Set up the server: link to the node manager, bind a pull socket to the
# address reported by DCell.addr, then start the receive loop via `async`.
#
# @raise [IOError] re-raised after the socket has been closed when binding fails
def initialize
  # The gossip protocol is dependent on the node manager
  node_manager = Celluloid::Actor[:node_manager]
  link node_manager

  @addr = DCell.addr
  @socket = PullSocket.new
  begin
    @socket.bind @addr
  rescue IOError
    # Release the socket before letting the bind failure propagate
    @socket.close
    raise
  end

  # Kick off #run through the async proxy
  async.run
end

Instance Method Details

#close ⇒ Object



31
32
33
# File 'lib/dcell/server.rb', line 31

# Close the server's socket, if one has been assigned.
#
# @return the value of the socket's #close, or nil when there is no socket
def close
  return unless @socket

  @socket.close
end

#decode_message(message) ⇒ Object

Decode incoming messages



54
55
56
57
58
59
60
61
62
63
# File 'lib/dcell/server.rb', line 54

# Decode incoming messages
#
# Only Marshal payloads are recognised: the first two bytes must equal the
# running interpreter's Marshal major/minor version.
#
# @param message [String] raw message as read from the socket
# @return [Object] whatever Marshal.load reconstructs from the payload
# @raise [InvalidMessageError] if the message is not a String, does not start
#   with the Marshal version header, or cannot be unmarshalled
def decode_message(message)
  # A non-String (e.g. nil) cannot carry a Marshal header. Reject it with the
  # error type callers rescue rather than failing with NoMethodError below.
  unless message.is_a?(String)
    raise InvalidMessageError, "couldn't determine message format: #{message.inspect}"
  end

  marshal_header = [Marshal::MAJOR_VERSION, Marshal::MINOR_VERSION]
  unless message[0..1].unpack("CC") == marshal_header
    raise InvalidMessageError, "couldn't determine message format: #{message}"
  end

  begin
    # SECURITY(review): Marshal.load will instantiate arbitrary objects from
    # the payload. It is only safe when every peer that can reach this socket
    # is trusted; do not expose the bound address to untrusted networks.
    Marshal.load message
  rescue => ex
    raise InvalidMessageError, "invalid message: #{ex}"
  end
end

#handle_message(message) ⇒ Object

Handle incoming messages



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/dcell/server.rb', line 36

# Handle incoming messages
#
# Decodes the raw message and invokes #dispatch on the result. A message that
# fails decoding with InvalidMessageError is logged and dropped; any
# StandardError raised by #dispatch is logged instead of propagating.
#
# @param message raw message handed to #decode_message
# @return the value of #dispatch, the value of Logger.crash when dispatch
#   fails, or nil when the message could not be decoded
def handle_message(message)
  decoded = begin
    decode_message(message)
  rescue InvalidMessageError => error
    Logger.crash("couldn't decode message", error)
    return
  end

  # Kept as its own rescue scope so only dispatch failures are reported here
  begin
    decoded.dispatch
  rescue => error
    Logger.crash("DCell::Server: message dispatch failed", error)
  end
end

#run ⇒ Object

Wait for incoming 0MQ messages



27
28
29
# File 'lib/dcell/server.rb', line 27

# Wait for incoming 0MQ messages
#
# Loops without a terminating condition: each value read from the socket is
# passed to #handle_message through the async proxy. The loop only ends when
# something inside it raises.
def run
  while true
    async.handle_message(@socket.read)
  end
end

#terminate ⇒ Object

Terminate this server



66
67
68
69
# File 'lib/dcell/server.rb', line 66

# Terminate this server
#
# Closes the socket when one has been assigned, then defers to the inherited
# #terminate. The socket check keeps a missing socket from raising
# NoMethodError here and preventing the inherited termination from running.
#
# @return the value of the inherited #terminate
def terminate
  @socket.close if @socket
  super
end