Class: RJR::Nodes::Unix

Inherits:
RJR::Node show all
Defined in:
lib/rjr/nodes/unix.rb

Overview

Unix node definition, listen for and invoke json-rpc requests via Unix Sockets

Clients should specify the socketname when listening for requests and when invoking them.

TODO client / server examples

Constant Summary collapse

RJR_NODE_TYPE =
:unix
PERSISTENT_NODE =
true
INDIRECT_NODE =
false

Instance Attribute Summary collapse

Attributes inherited from RJR::Node

#connection_event_handlers, #dispatcher, #message_headers, #node_id

Instance Method Summary collapse

Methods inherited from RJR::Node

#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp

Constructor Details

#initialize(args = {}) ⇒ Unix

Unix initializer

Parameters:

  • args (Hash) (defaults to: {})

    the options to create the unix node with

Options Hash (args):

  • :socketname (String)

    the name of the socket which to listen on



97
98
99
100
101
102
103
# File 'lib/rjr/nodes/unix.rb', line 97

def initialize(args = {})
   super(args)
   @socketname = args[:socketname]

   @connections = []
   @connections_lock = Mutex.new
end

Instance Attribute Details

#connectionsObject

Returns the value of attribute connections.



71
72
73
# File 'lib/rjr/nodes/unix.rb', line 71

def connections
  @connections
end

Instance Method Details

#invoke(socketname, rpc_method, *args) ⇒ Object

Instructs node to send rpc request, and wait for / return response.

Implementation of RJR::Node#invoke

Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses

Parameters:

  • socketname (String)

    name of socket which destination node is listening on

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method wtih



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/rjr/nodes/unix.rb', line 137

def invoke(socketname, rpc_method, *args)
  message = Messages::Request.new :method => rpc_method,
                                  :args   => args,
                                  :headers => @message_headers
  connection = nil
  @@em.schedule {
    init_client(:socketname => socketname,
                :rjr_node => self) { |c|
      connection = c
      c.send_msg message.to_s
    }
  }

  # TODO optional timeout for response ?
  result = wait_for_result(message)

  if result.size > 2
    raise Exception, result[2]
  end
  return result[1]
end

#listenObject

Instruct Node to start listening for and dispatching rpc requests

Implementation of RJR::Node#listen



119
120
121
122
123
124
# File 'lib/rjr/nodes/unix.rb', line 119

def listen
  @@em.schedule {
    @@em.start_unix_domain_server @socketname, nil, UnixConnection, { :rjr_node => self }
  }
  self
end

#notify(socketname, rpc_method, *args) ⇒ Object

Instructs node to send rpc notification (immadiately returns / no response is generated)

Implementation of RJR::Node#notify

destination node is listening on

Parameters:

  • socketname (String)

    name of socket which

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method wtih



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/rjr/nodes/unix.rb', line 167

def notify(socketname, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  conn    = nil
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(:socketname => socketname,
                :rjr_node => self) { |c|
      conn = c
      c.send_msg message.to_s
      # XXX, this should be invoked only when we are sure event
      # machine sent message. Shouldn't pose a problem unless event
      # machine is killed immediately after
      published_l.synchronize { invoked = true ; published_c.signal }
    }
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  #sleep 0.01 until conn.get_outbound_data_size == 0
  nil
end

#send_msg(data, connection) ⇒ Object

Send data using specified connection

Implementation of RJR::Node#send_msg



112
113
114
# File 'lib/rjr/nodes/unix.rb', line 112

def send_msg(data, connection)
  connection.send_msg(data)
end

#to_sObject



105
106
107
# File 'lib/rjr/nodes/unix.rb', line 105

def to_s
  "RJR::Nodes::Unix<#{@node_id},#{@socketname}>"
end