Class: RJR::Nodes::WS

Inherits:
RJR::Node
Defined in:
lib/rjr/nodes/ws.rb

Overview

Web socket node definition. Listens for and invokes json-rpc requests via web sockets.

Clients should specify the hostname / port when listening for and invoking requests.

Note that the RJR javascript client also supports sending / receiving json-rpc messages over web sockets.

Examples:

Listening for json-rpc requests over web sockets

# initialize node
server = RJR::Nodes::WS.new :node_id => 'server', :host => 'localhost', :port => 7777

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') do |name|
  "Hello #{name}!"
end

# listen, and block
server.listen
server.join

Invoking json-rpc requests over web sockets using rjr

client = RJR::Nodes::WS.new :node_id => 'client'
puts client.invoke('ws://localhost:7777', 'hello', 'mo')

Constant Summary

RJR_NODE_TYPE =
:ws
PERSISTENT_NODE =
true
INDIRECT_NODE =
false

Instance Attribute Summary

Attributes inherited from RJR::Node

#connection_event_handlers, #dispatcher, #message_headers, #node_id

Instance Method Summary

Methods inherited from RJR::Node

#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp

Constructor Details

#initialize(args = {}) ⇒ WS

WS initializer

Parameters:

  • args (Hash) (defaults to: {})

    the options to create the web socket node with

Options Hash (args):

  • :host (String)

    the hostname / ip to listen on

  • :port (Integer)

    the port to listen on



# File 'lib/rjr/nodes/ws.rb', line 89

def initialize(args = {})
  super(args)
  @host = args[:host]
  @port = args[:port]

  @connections = []
  @connections_lock = Mutex.new
end

Instance Method Details

#invoke(uri, rpc_method, *args) ⇒ Object

Instructs node to send rpc request, and wait for / return response

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or from a callback, as this will block the message subscription used to receive responses.
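The warning above can be illustrated without rjr or EventMachine. The following is only a sketch using the Ruby standard library: `loop_queue` and `loop_thread` are hypothetical stand-ins for the event loop, and `response` stands in for the channel on which a reply arrives. The blocking wait is issued from a separate worker thread, so the loop stays free to deliver the response.

```ruby
require 'thread'

# Stand-in for an event loop: one thread that runs queued jobs in order
loop_queue  = Queue.new
loop_thread = Thread.new do
  while (job = loop_queue.pop) != :stop
    job.call
  end
end

# Stand-in for the channel on which the response arrives
response = Queue.new

# The blocking call runs on a worker thread, not inside a loop job
worker = Thread.new do
  loop_queue << -> { response << 'Hello mo!' } # the loop delivers the reply
  response.pop                                  # the worker blocks, the loop does not
end

result = worker.value

# Shut the stand-in loop down cleanly
loop_queue << :stop
loop_thread.join
```

Had the `response.pop` been placed inside a job on `loop_queue`, the only thread able to deliver the reply would be the one waiting for it.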

Parameters:

  • uri (String)

    location of node to send request to, should be in format of ws://hostname:port

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with
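To make the expected `ws://hostname:port` format concrete, here is a minimal sketch that splits such a URI with Ruby's standard `uri` library. `ws_endpoint` is a hypothetical helper written for this illustration; it is not part of rjr, and this page does not show how the node itself parses the URI.

```ruby
require 'uri'

# Hypothetical helper (not part of rjr): split a ws:// URI into host and port
def ws_endpoint(uri)
  parsed = URI.parse(uri)
  unless parsed.scheme == 'ws'
    raise ArgumentError, "expected a ws:// URI, got #{uri}"
  end
  [parsed.host, parsed.port]
end

host, port = ws_endpoint('ws://localhost:7777')
```

With the URI from the example above, `host` is the string `localhost` and `port` is the integer `7777`.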



# File 'lib/rjr/nodes/ws.rb', line 135

def invoke(uri, rpc_method, *args)
  message = Messages::Request.new :method => rpc_method,
                                  :args   => args,
                                  :headers => @message_headers

  @@em.schedule {
    init_client(uri) do |c|
      c.stream { |msg| handle_message(msg.data, c) }

      c.send_msg message.to_s
    end
  }

  # TODO optional timeout for response ?
  result = wait_for_result(message)

  if result.size > 2
    raise Exception, result[2]
  end
  return result[1]
end
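The last lines of the listing return `result[1]` on success and raise `Exception` with `result[2]` when a third element is present. The sketch below isolates that logic in a hypothetical helper, `unpack_result`, to show one consequence for callers: a plain `rescue`, which only catches `StandardError`, will not catch the error. The array layout used here (an id first, then the result, then an optional error) is an assumption inferred from the indices in the listing.

```ruby
# Hypothetical helper mirroring the tail of #invoke shown above
def unpack_result(result)
  raise Exception, result[2] if result.size > 2
  result[1]
end

# Success: two elements, the second is the return value
ok = unpack_result(['id1', 'Hello mo!'])

# Failure: a third element carries the error message
caught = begin
  unpack_result(['id2', nil, 'method not found'])
rescue StandardError
  :standard_error # not reached, Exception is not a StandardError
rescue Exception => e
  e.message
end
```

Callers that want to handle remote errors would therefore need to rescue `Exception` explicitly, under the assumptions above.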

#listen ⇒ Object

Instruct Node to start listening for and dispatching rpc requests

Implementation of RJR::Node#listen



# File 'lib/rjr/nodes/ws.rb', line 112

def listen
  @@em.schedule do
    EventMachine::WebSocket.run(:host => @host, :port => @port) do |ws|
      ws.onopen    { }
      ws.onclose   {       @connection_event_handlers[:closed].each { |h| h.call self } }
      ws.onerror   { |e|   @connection_event_handlers[:error].each  { |h| h.call self } }
      ws.onmessage { |msg| handle_message(msg, ws) }
    end
  end
  self
end

#notify(uri, rpc_method, *args) ⇒ Object

Instructs node to send rpc notification (immediately returns / no response is generated)

Implementation of RJR::Node#notify

Parameters:

  • uri (String)

    location of node to send notification to, should be in format of ws://hostname:port

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with



# File 'lib/rjr/nodes/ws.rb', line 165

def notify(uri, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(uri) do |c|
      c.stream { |msg| handle_message(msg.data, c) }

      c.send_msg message.to_s

      # XXX same issue w/ tcp node, due to nature of event machine
      # we aren't guaranteed that message is actually written to socket
      # here, process must be kept alive until data is sent or will be lost
      published_l.synchronize { invoked = true ; published_c.signal }
    end
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  nil
end
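The listing above blocks the caller with a `Mutex` and a `ConditionVariable` until the scheduled block has run. The same handshake can be sketched with the standard library alone. Here a plain `Thread` is an assumed stand-in for the scheduled event loop block, and the names `lock`, `cond` and `done` are chosen for this illustration only.

```ruby
require 'thread'

lock = Mutex.new
cond = ConditionVariable.new
done = false

# Stand-in for the scheduled block that sends the message
worker = Thread.new do
  # ... the message would be handed to the socket here ...
  lock.synchronize do
    done = true
    cond.signal
  end
end

# Caller blocks until the worker has flagged completion.
# The flag is checked under the lock, so a signal sent before
# the wait begins is not lost.
lock.synchronize do
  cond.wait(lock) until done
end

worker.join
```

The sketch waits in an `until` loop where the listing uses `unless`. The loop is the more defensive form, since it rechecks the flag if the wait returns early.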

#send_msg(data, ws) ⇒ Object

Safely send data using the specified websocket

Implementation of RJR::Node#send_msg



# File 'lib/rjr/nodes/ws.rb', line 105

def send_msg(data, ws)
  @@em.schedule { ws.send(data) }
end

#to_s ⇒ Object



# File 'lib/rjr/nodes/ws.rb', line 98

def to_s
  "RJR::Nodes::WS<#{@node_id},#{@host},#{@port}>"
end