Class: CitrusRpc::RpcServer::WsAcceptor

Inherits:
Object
  • Object
show all
Includes:
Utils::EventEmitter
Defined in:
lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb

Overview

WsAcceptor

Instance Method Summary collapse

Methods included from Utils::EventEmitter

#emit, #on, #once

Constructor Details

#initialize(args = {}, &block) ⇒ WsAcceptor

Create a new websocket acceptor

Parameters:

  • args (Hash) (defaults to: {})

    Options

Options Hash (args):

  • buffer_msg (Boolean)
  • interval (Integer)


24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb', line 24

# Create a new websocket acceptor in its initial, not-yet-listening state.
#
# @param [Hash] args Options
# @option args [Boolean] :buffer_msg stored as @buffer_msg
# @option args [Integer] :interval stored as @interval
# @param [Proc] block stored as @callback; it is not invoked here
def initialize(args = {}, &block)
  @buffer_msg, @interval = args[:buffer_msg], args[:interval]
  @callback = block

  # Per-connection tables; #listen's handlers key both by ws.signature.
  @wss = {}
  @msg_queues = {}

  # No server exists until #listen runs.
  @server = nil
  @listening = false
  @closed = false
end

Instance Method Details

#close ⇒ Object

Close the acceptor



79
80
81
82
83
84
# File 'lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb', line 79

# Close the acceptor
#
# Does nothing unless the acceptor is listening and has not already been
# closed. Otherwise passes @server to EM.stop_server, marks the acceptor as
# closed and emits :closed.
def close
  return if !@listening || @closed

  EM.stop_server(@server)
  @closed = true
  emit(:closed)
end

#listen(port) ⇒ Object

Listen on port

Parameters:

  • port (Integer)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/citrus-rpc/rpc-server/acceptors/ws_acceptor.rb', line 41

# Listen on port
#
# Starts a WebSocket::EventMachine::Server bound to 0.0.0.0 on the given port
# and installs the per-connection handlers:
# * onopen    - records the socket in @wss and emits :connection with the
#               socket signature and the peer host
# * onmessage - parses the message as JSON and dispatches to process_msgs
#               (Array payload) or process_msg (anything else)
# * onclose   - drops the socket's entries from @wss and @msg_queues
# * onerror   - re-emits the error as :error
#
# @param [Integer] port
# @raise [RuntimeError] if the acceptor is already listening
# @return [Boolean] true
def listen(port)
  # The comma matters: `raise RuntimeError 'msg'` parses as a call to a method
  # named RuntimeError and surfaces as NoMethodError instead.
  raise RuntimeError, 'acceptor double listen' if @listening

  begin
    @server = WebSocket::EventMachine::Server.start(:host => '0.0.0.0', :port => port) do |ws|
      ws.onopen do
        @wss[ws.signature] = ws
        _peer_port, peer_host = Socket.unpack_sockaddr_in(ws.get_peername)
        emit :connection, { :id => ws.signature, :ip => peer_host }
      end

      ws.onmessage do |msg, _type|
        begin
          pkg = JSON.parse(msg)
          if pkg.instance_of?(Array)
            process_msgs ws, pkg
          else
            process_msg ws, pkg
          end
        rescue
          # NOTE(review): parse and processing failures are dropped silently,
          # as in the original; confirm whether they should be emitted as
          # :error instead.
        end
      end

      ws.onclose do
        @wss.delete(ws.signature)
        @msg_queues.delete(ws.signature)
      end

      ws.onerror { |err| emit :error, err }
    end
  rescue => err
    emit :error, err
  end

  # NOTE(review): the filter is registered and @listening is set even when the
  # server failed to start above; confirm this is intended.
  on(:connection) { |obj| ip_filter obj }
  @listening = true
end