Class: RJR::Nodes::WS
Overview
Web socket node definition, listen for and invoke json-rpc requests via web sockets
Clients should specify the hostname / port when listening for and invoking requests.
note the RJR javascript client also supports sending / receiving json-rpc messages over web sockets
Constant Summary collapse
- RJR_NODE_TYPE =
:ws
- PERSISTENT_NODE =
true
- INDIRECT_NODE =
false
Instance Attribute Summary
Attributes inherited from RJR::Node
#connection_event_handlers, #dispatcher, #message_headers, #node_id
Instance Method Summary collapse
-
#initialize(args = {}) ⇒ WS
constructor
WS initializer.
-
#invoke(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for / return response.
-
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
-
#notify(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated).
-
#send_msg(data, ws) ⇒ Object
Send data using specified websocket safely.
- #to_s ⇒ Object
Methods inherited from RJR::Node
#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp
Constructor Details
#initialize(args = {}) ⇒ WS
WS initializer
89 90 91 92 93 94 95 96 |
# File 'lib/rjr/nodes/ws.rb', line 89 def initialize(args = {}) super(args) @host = args[:host] @port = args[:port] @connections = [] @connections_lock = Mutex.new end |
Instance Method Details
#invoke(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for / return response
Implementation of RJR::Node#invoke
Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/rjr/nodes/ws.rb', line 135 def invoke(uri, rpc_method, *args) = Messages::Request.new :method => rpc_method, :args => args, :headers => @message_headers @@em.schedule { init_client(uri) do |c| c.stream { |msg| (msg.data, c) } c.send_msg .to_s end } # TODO optional timeout for response ? result = wait_for_result() if result.size > 2 raise Exception, result[2] end return result[1] end |
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests
Implementation of RJR::Node#listen
112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/rjr/nodes/ws.rb', line 112 def listen @@em.schedule do EventMachine::WebSocket.run(:host => @host, :port => @port) do |ws| ws.onopen { } ws.onclose { @connection_event_handlers[:closed].each { |h| h.call self } } ws.onerror { |e| @connection_event_handlers[:error].each { |h| h.call self } } ws. { |msg| (msg, ws) } end end self end |
#notify(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated)
Implementation of RJR::Node#notify
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/rjr/nodes/ws.rb', line 165 def notify(uri, rpc_method, *args) # will block until message is published published_l = Mutex.new published_c = ConditionVariable.new invoked = false = Messages::Notification.new :method => rpc_method, :args => args, :headers => @message_headers @@em.schedule { init_client(uri) do |c| c.stream { |msg| (msg.data, c) } c.send_msg .to_s # XXX same issue w/ tcp node, due to nature of event machine # we aren't guaranteed that message is actually written to socket # here, process must be kept alive until data is sent or will be lost published_l.synchronize { invoked = true ; published_c.signal } end } published_l.synchronize { published_c.wait published_l unless invoked } nil end |
#send_msg(data, ws) ⇒ Object
Send data using specified websocket safely
Implementation of RJR::Node#send_msg
105 106 107 |
# File 'lib/rjr/nodes/ws.rb', line 105 def send_msg(data, ws) @@em.schedule { ws.send(data) } end |
#to_s ⇒ Object
98 99 100 |
# File 'lib/rjr/nodes/ws.rb', line 98 def to_s "RJR::Nodes::WS<#{@node_id},#{@host},#{@port}>" end |