Class: RJR::Nodes::TCP

Inherits:
RJR::Node
Defined in:
lib/rjr/nodes/tcp.rb

Overview

TCP node definition. Listens for and invokes json-rpc requests via TCP sockets.

Clients should specify the hostname / port when listening for requests and when invoking them.

Examples:

Listening for json-rpc requests over tcp

# initialize node
server = RJR::Nodes::TCP.new :node_id => 'server', :host => 'localhost', :port => 7777

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') { |name|
  "Hello #{name}!"
}

# listen and block
server.listen
server.join

Invoking json-rpc requests over tcp

client = RJR::Nodes::TCP.new :node_id => 'client', :host => 'localhost', :port => 8888
puts client.invoke('jsonrpc://localhost:7777', 'hello', 'mo')
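
As a rough illustration of what travels over the socket for the calls above, the sketch below builds a JSON-RPC 2.0 request and a notification using only the standard library. The helper names `jsonrpc_request` and `jsonrpc_notification` are hypothetical and not part of RJR, which builds its messages through Messages::Request and Messages::Notification; the exact fields and headers RJR emits are an assumption here.

```ruby
require 'json'
require 'securerandom'

# Hypothetical helper (not RJR API): a JSON-RPC 2.0 request carries an id
# so the response can be matched back to the caller.
def jsonrpc_request(method, *args)
  { 'jsonrpc' => '2.0',
    'id'      => SecureRandom.uuid,
    'method'  => method,
    'params'  => args }
end

# Hypothetical helper (not RJR API): a notification has no id, which tells
# the receiver that no response is expected.
def jsonrpc_notification(method, *args)
  { 'jsonrpc' => '2.0',
    'method'  => method,
    'params'  => args }
end

request      = jsonrpc_request('hello', 'mo')
notification = jsonrpc_notification('hello', 'mo')

puts JSON.generate(request)
puts JSON.generate(notification)
```

The missing id is the only structural difference between the two messages, which matches the split between #invoke (waits for a response) and #notify (returns immediately).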

Constant Summary

RJR_NODE_TYPE =
:tcp
PERSISTENT_NODE =
true
INDIRECT_NODE =
false

Instance Attribute Summary

Attributes inherited from RJR::Node

#connection_event_handlers, #dispatcher, #message_headers, #node_id

Instance Method Summary

Methods inherited from RJR::Node

#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp

Constructor Details

#initialize(args = {}) ⇒ TCP

TCP initializer

Parameters:

  • args (Hash) (defaults to: {})

    the options to create the tcp node with

Options Hash (args):

  • :host (String)

    the hostname or IP address to listen on

  • :port (Integer)

    the port to listen on



# File 'lib/rjr/nodes/tcp.rb', line 113

def initialize(args = {})
  super(args)
  @host = args[:host]
  @port = args[:port]

  @connections      = []
  @connections_lock = Mutex.new
end

Instance Attribute Details

#connections ⇒ Object

Returns the value of attribute connections.



# File 'lib/rjr/nodes/tcp.rb', line 86

def connections
  @connections
end

Instance Method Details

#invoke(uri, rpc_method, *args) ⇒ Object

Instructs node to send rpc request, and wait for / return response.

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or from a callback, as this will block the message subscription used to receive responses.

Parameters:

  • uri (String)

    location of node to send request to, should be in format of jsonrpc://hostname:port or tcp://hostname:port

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with



# File 'lib/rjr/nodes/tcp.rb', line 154

def invoke(uri, rpc_method, *args)
  uri = URI.parse(uri)
  host,port = uri.host, uri.port

  message = Messages::Request.new :method => rpc_method,
                                  :args   => args,
                                  :headers => @message_headers
  connection = nil
  @@em.schedule {
    init_client(:host => host, :port => port,
                :rjr_node => self) { |c|
      connection = c
      c.send_msg message.to_s
    }
  }

  # TODO optional timeout for response ?
  result = wait_for_result(message)

  if result.size > 2
    raise Exception, result[2]
  end
  return result[1]
end
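
The URI handling at the top of #invoke can be tried in isolation. The sketch below uses only the standard library's URI.parse; the helper name `split_node_uri` and its error check are hypothetical additions for illustration and are not part of RJR.

```ruby
require 'uri'

# Hypothetical helper (not RJR API): split a node URI such as
# jsonrpc://hostname:port or tcp://hostname:port into host and port.
def split_node_uri(uri_string)
  uri = URI.parse(uri_string)
  # For schemes URI does not know, there is no default port,
  # so a URI without an explicit port yields a nil port.
  if uri.host.nil? || uri.port.nil?
    raise ArgumentError, "missing host or port in #{uri_string}"
  end
  [uri.host, uri.port]
end

host, port = split_node_uri('jsonrpc://localhost:7777')
puts "#{host}:#{port}"
```

Checking for a missing port up front is a design choice of this sketch; #invoke as listed passes whatever URI.parse returns straight to init_client.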

#listen ⇒ Object

Instructs node to start listening for and dispatching rpc requests

Implementation of RJR::Node#listen



# File 'lib/rjr/nodes/tcp.rb', line 136

def listen
  @@em.schedule {
    @@em.start_server @host, @port, TCPConnection, { :rjr_node => self }
  }
  self
end

#notify(uri, rpc_method, *args) ⇒ Object

Instructs node to send rpc notification (returns immediately / no response is generated)

Implementation of RJR::Node#notify

Parameters:

  • uri (String)

    location of node to send notification to, should be in format of jsonrpc://hostname:port

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with



# File 'lib/rjr/nodes/tcp.rb', line 187

def notify(uri, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  uri = URI.parse(uri)
  host,port = uri.host, uri.port

  invoked = false
  conn    = nil
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(:host => host, :port => port,
                :rjr_node => self) { |c|
      conn = c
      c.send_msg message.to_s
      # XXX, this should be invoked only when we are sure event
      # machine sent message. Shouldn't pose a problem unless event
      # machine is killed immediately after
      published_l.synchronize { invoked = true ; published_c.signal }
    }
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  #sleep 0.01 until conn.get_outbound_data_size == 0
  nil
end
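
The blocking behaviour of #notify rests on a Mutex / ConditionVariable handshake between the calling thread and the event loop. The sketch below reproduces that handshake with a plain Thread standing in for the EventMachine reactor; `publish_and_wait` is a hypothetical name, not part of RJR, and the block passed to it stands in for `c.send_msg`.

```ruby
require 'thread'

# Hypothetical helper (not RJR API): run the given block on another thread
# and block the caller until that thread signals completion.
def publish_and_wait
  lock      = Mutex.new
  cond      = ConditionVariable.new
  published = false

  worker = Thread.new do
    yield                               # stands in for sending the message
    lock.synchronize do
      published = true                  # set the flag before signalling
      cond.signal
    end
  end

  # Checking the flag under the lock covers the case where the worker
  # finished before the caller started waiting.
  lock.synchronize { cond.wait(lock) until published }
  worker.join
  published
end

sent   = []
result = publish_and_wait { sent << 'hello' }
puts "published=#{result} sent=#{sent.inspect}"
```

This sketch loops with `until` rather than the single `unless` check in the listing above, which additionally guards against spurious wakeups; that is a choice made here, not a description of RJR's behaviour.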

#send_msg(data, connection) ⇒ Object

Send data using specified connection

Implementation of RJR::Node#send_msg



# File 'lib/rjr/nodes/tcp.rb', line 129

def send_msg(data, connection)
  connection.send_msg(data)
end

#to_s ⇒ Object



# File 'lib/rjr/nodes/tcp.rb', line 122

def to_s
  "RJR::Nodes::TCP<#{@node_id},#{@host},#{@port}>"
end