Class: SimpleRPC::Client
- Inherits: Object (Object > SimpleRPC::Client)
- Defined in: lib/simplerpc/client.rb
Overview
The SimpleRPC client connects to a server, either persistently or on demand, and makes calls to its proxy object.
Once created, you should be able to interact with the client as if it were the remote object, i.e.:
require 'simplerpc/client'
# Connect
c = SimpleRPC::Client.new(:hostname => '127.0.0.1', :port => 27045)
# Make some calls directly
c.length # 2
c.call(:dup) # ["thing", "thing2"]
c.call(:class) # Array
# Get a proxy object
p = c.get_proxy
c.persist # always-on mode with 1 connection
p.dup # ["thing", "thing2"]
p.length # 2
p.class # Array
p.each{|x| puts x} # outputs "thing\nthing2\n"
# Disconnect from always-on mode
c.disconnect
Making Requests
Requests can be made on the client object as if it were local, and these will be proxied to the server. For methods that are clobbered locally (for example ‘.class’, which will return ‘SimpleRPC::Client’), you may use #call to send the request without local interaction:
c.class # SimpleRPC::Client
c.call(:class) # Array
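The difference between a clobbered method and a forwarded one can be sketched without a server. ToyClient below is a hypothetical stand-in, not SimpleRPC::Client: it forwards to a local object rather than a socket, and only illustrates why #call reaches the target when a plain method call does not.

```ruby
# ToyClient is a hypothetical stand-in for illustration only.
# It forwards to a local object instead of a remote server.
class ToyClient
  def initialize(target)
    @target = target
  end

  # Explicit entry point: always forwards, even for names that ToyClient
  # itself already responds to (such as :class or :dup).
  def call(m, *args, &block)
    @target.public_send(m, *args, &block)
  end

  # Only reached for methods ToyClient does not define itself.
  def method_missing(m, *args, &block)
    call(m, *args, &block)
  end

  def respond_to_missing?(m, include_private = false)
    @target.respond_to?(m, include_private) || super
  end
end

c = ToyClient.new(%w[thing thing2])
local_class  = c.class         # answered locally by Object#class
remote_class = c.call(:class)  # forwarded to the wrapped Array
remote_len   = c.length        # not defined locally, so forwarded
```

Any method inherited from Object (class, dup, inspect and so on) is answered locally in this sketch, which is the same reason the real client needs #call.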
Proxy Objects
Calling #get_proxy will return a dynamically-constructed object that lacks any methods other than remote ones—this means it will be almost indistinguishable from a local object:
p = c.get_proxy
p.class # Array
p.dup # ['thing', 'thing2']
This is an exceptionally seamless way of interacting, but you must retain the original client connection in order to call Client#disconnect or use always-on mode.
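A minimal sketch of the idea, assuming a local target in place of the remote object: ToyProxy is hypothetical and is not the class built by #get_proxy, but it shows how a BasicObject subclass lets nearly every call fall through to method_missing.

```ruby
# ToyProxy is hypothetical; the real proxy is constructed by Client#get_proxy.
# BasicObject defines very few methods, so almost everything is forwarded.
class ToyProxy < BasicObject
  def initialize(target)
    @target = target
  end

  def method_missing(m, *args, &block)
    @target.__send__(m, *args, &block)
  end
end

proxy = ToyProxy.new(%w[thing thing2])
proxy_class = proxy.class   # forwarded, because BasicObject has no #class
proxy_dup   = proxy.dup     # forwarded, returns a copy of the wrapped Array
# BasicObject still defines ==, so this compares identity and is not forwarded.
same = (proxy == %w[thing thing2])
```

The last line is one reason the proxy is "almost" rather than completely indistinguishable: the handful of methods BasicObject does define (==, !, equal?, __send__ and a few others) are still answered locally in this sketch.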
Blocks
Blocks are supported and run on the client side. A server object may yield any number of times. Note that if the client is single-threaded, it is not possible to make further calls from inside the block (if :threaded is on, this is perfectly acceptable).
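The yield exchange can be sketched with a scripted stand-in for the server. Everything here is an assumption made for illustration: FakeServer, remote_call and the two status constants are hypothetical names, and the loop only mirrors the shape of the #method_missing listing further down this page.

```ruby
# Hypothetical status markers; the real ones live in SocketProtocol.
REQUEST_SUCCESS = 0
REQUEST_YIELD   = 1

# A scripted stand-in for the server side of one call to something like #map.
class FakeServer
  def initialize(items)
    @items    = items
    @received = []
    @index    = 0
  end

  # Returns the next message the server would send: a yield request per
  # item, then a final success carrying the collected block results.
  def recv
    if @index < @items.length
      message = [REQUEST_YIELD, [@items[@index]]]
      @index += 1
      message
    else
      [REQUEST_SUCCESS, @received]
    end
  end

  # Accepts the value the client-side block produced.
  def send_block_result(value)
    @received << value
  end
end

# Client side: keep running the local block for as long as the server yields.
def remote_call(server)
  status, payload = server.recv
  while status == REQUEST_YIELD
    server.send_block_result(yield(*payload))
    status, payload = server.recv
  end
  payload
end

result = remote_call(FakeServer.new([1, 2, 3])) { |x| x * 10 }
```

Because the block runs between a receive and a send on the same connection, a single-threaded client has no free socket for a nested call, which is the restriction described above.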
Exceptions
Remote exceptions fired by the server during a call are returned as RemoteExceptions, and have the message and backtrace set as if you are on the remote server.
Network errors are exposed directly. The server will not close a pipe during an operation, so if using connect-on-demand you should only observe Errno::ECONNREFUSED exceptions. If using a persistent connection pool, you will encounter either Errno::ECONNREFUSED, Errno::ECONNRESET or EOFError as the serialiser attempts to read from the closed socket.
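One way to cope with these errors is a small retry wrapper. This is a sketch only: with_retries and NETWORK_ERRORS are hypothetical names, and the #method_missing listing further down this page wraps the same errors in ConnectionError, so depending on the version in use you may need to rescue that class instead.

```ruby
# The error classes named in the paragraph above.
NETWORK_ERRORS = [Errno::ECONNREFUSED, Errno::ECONNRESET, EOFError].freeze

# Hypothetical helper: runs the block, retrying when a network error occurs.
# Re-raises the last error once the attempt limit is reached.
def with_retries(attempts: 3, delay: 0)
  tries = 0
  begin
    tries += 1
    yield
  rescue *NETWORK_ERRORS
    raise if tries >= attempts
    sleep(delay)
    retry
  end
end

# Simulate a server that refuses the first two connections.
calls = 0
value = with_retries do
  calls += 1
  raise Errno::ECONNREFUSED if calls < 3
  :ok
end
```

Retrying is only safe for calls that are idempotent on the server, since a reset connection does not tell you whether the remote method already ran.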
Thread Safety
Clients are thread-safe and will block when controlling the always-on connection with #persist and #disconnect.
If :threaded is true, clients will support multiple connections to the server. If used in always-on mode, this means it will maintain one re-usable connection, and only spawn new ones if requested.
Modes
It is possible to use the client in two modes: always-on and connect-on-demand, controlled by calling #persist and #disconnect.
Always-on mode maintains a pool of connections to the server, and requests are preferentially sent over these (note that if you have threading off, it makes no sense to allocate more than one entry in the pool).
Connect-on-demand mode creates a connection when necessary. This mode is used whenever the client is not connected. There is a small performance hit to reconnecting each time, especially if you are using authentication.
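How the two modes can coexist is easier to see in a toy model. ToyPool below is hypothetical: it borrows the socket-to-Mutex hash shape from the #persist listing on this page, uses symbols in place of sockets, and leaves out the pool-wide mutex the real client uses to guard resizing.

```ruby
# ToyPool is a hypothetical model of "prefer the pool, else connect on demand".
class ToyPool
  def initialize(connections)
    @pool = {}
    connections.each { |conn| @pool[conn] = Mutex.new }
  end

  # Yields [connection, persistent]. Prefers an idle pooled connection and
  # falls back to a fresh one built by the given factory.
  def with_connection(factory)
    @pool.each do |conn, mutex|
      next unless mutex.try_lock   # skip connections that are in use
      begin
        return yield(conn, true)
      ensure
        mutex.unlock
      end
    end
    yield(factory.call, false)     # nothing idle: connect on demand
  end
end

pool = ToyPool.new([:pooled_a])
used = []
pool.with_connection(-> { :on_demand }) do |outer, outer_persistent|
  used << [outer, outer_persistent]
  # The only pooled connection is busy, so a nested request falls back.
  pool.with_connection(-> { :on_demand }) do |inner, inner_persistent|
    used << [inner, inner_persistent]
  end
end
```

Using try_lock rather than lock means a busy connection is skipped instead of waited on, so a request never stalls behind another one that happens to hold a pooled socket.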
Serialisation Formats
By default both client and server use Marshal. This has proven fast and general, and is capable of sending data directly over sockets.
The serialiser also supports MessagePack (the msgpack gem), and this yields a small performance increase at the expense of generality (restrictions on data type).
Note that JSON and YAML, though they support reading and writing to sockets, do not properly terminate their reads and cause the system to hang. These methods are both slow and limited by comparison anyway, and algorithms needed to support their use require relatively large memory usage. They may be supported in later versions.
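The property that matters here is that each Marshal read stops exactly at the end of one object. A minimal sketch using a pipe in place of a socket (the message contents are made up for illustration):

```ruby
reader, writer = IO.pipe
reader.binmode
writer.binmode

# Write two objects back to back with no separator between them.
Marshal.dump([:length, [], false], writer)
Marshal.dump({ 'status' => 'ok' }, writer)
writer.close

# Each load consumes exactly one object and leaves the rest in the stream.
first  = Marshal.load(reader)
second = Marshal.load(reader)

# Reading from a closed, drained stream raises EOFError.
hit_eof = begin
  Marshal.load(reader)
  false
rescue EOFError
  true
end
reader.close
```

The final EOFError is the same error the Exceptions section mentions for a persistent connection whose socket has been closed.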
Authentication
Setting the :password and :secret options will cause the client to attempt auth on connection. If this process succeeds, the client will then proceed as before, else the server will forcibly close the socket. If :fast_auth is on, the client cannot detect the failure, so the next read will raise an unpredictable data-loading exception from the serialiser. If :fast_auth is off (default), a SimpleRPC::AuthenticationError exception is raised.
Clients and servers do not tell one another to use auth (such a system would impact speed) so the results of using mismatched configurations are undefined.
The auth process is simple and not particularly secure, but is designed to deter casual connections and attacks. It uses a password that is sent encrypted against a salt sent by the server to prevent replay attacks. If you want more reliable security, use an SSH tunnel.
The performance impact of auth is small, and takes about the same time as a simple request. This can be mitigated by using always-on mode.
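The general shape of a salted challenge-response can be sketched as follows. This is not SimpleRPC's actual scheme, which lives in simplerpc/encryption and is not shown on this page; the HMAC construction and all three function names are assumptions chosen only to show why a per-connection salt defeats replay.

```ruby
require 'openssl'
require 'securerandom'

# Server side: issue a fresh random salt for every connection.
def new_challenge
  SecureRandom.hex(16)
end

# Client side: prove knowledge of the password without sending it in clear.
# HMAC-SHA256 is an assumption here, not the library's algorithm.
def challenge_response(password, secret, salt)
  digest = OpenSSL::Digest.new('SHA256')
  OpenSSL::HMAC.hexdigest(digest, secret, "#{salt}:#{password}")
end

# Server side: recompute the expected value and compare.
# A production system would use a constant-time comparison here.
def valid_response?(response, password, secret, salt)
  challenge_response(password, secret, salt) == response
end

salt     = new_challenge
response = challenge_response('hunter2', 'long-random-key', salt)

accepted = valid_response?(response, 'hunter2', 'long-random-key', salt)
# A captured response is useless against the next connection's salt.
replayed = valid_response?(response, 'hunter2', 'long-random-key', new_challenge)
```

The exchange costs one extra round trip per connection, which is consistent with the note above that auth takes about as long as a simple request.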
Instance Attribute Summary
-
#fast_auth ⇒ Object
Returns the value of attribute fast_auth.
-
#hostname ⇒ Object
readonly
Returns the value of attribute hostname.
-
#password ⇒ Object
writeonly
Sets the attribute password.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#secret ⇒ Object
writeonly
Sets the attribute secret.
-
#serialiser ⇒ Object
Returns the value of attribute serialiser.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Class Method Summary
-
.new_proxy(opts = {}) ⇒ Object
Connect to the remote server and return a proxy object and the client itself.
Instance Method Summary
-
#call(m, *args, &block) ⇒ Object
Call a method that is otherwise clobbered by the client object.
-
#connected? ⇒ Boolean
Is this client maintaining any persistent connections?
-
#disconnect ⇒ Object
Close all persistent connections to the server.
-
#get_proxy ⇒ Object
Returns a proxy object that is all but indistinguishable from the remote object.
-
#initialize(opts = {}) ⇒ Client
constructor
Create a new client for the network.
-
#method_missing(m, *args, &block) ⇒ Object
Calls RPC on the remote object.
-
#persist(pool_size = 1) ⇒ Object
Tell the client how many connections to persist.
Constructor Details
#initialize(opts = {}) ⇒ Client
Create a new client for the network. Takes an options hash, in which :port is required:
- :hostname
  The hostname to connect to. Defaults to localhost.
- :port
  The port to connect on. Required.
- :serialiser
  A class supporting #dump(object, io) and #load(io). Defaults to Marshal. I recommend using MessagePack if this is not fast enough.
- :timeout
  Socket timeout in seconds.
- :password
  The password clients need to connect.
- :secret
  The encryption key used during password authentication. Should be some long random string that matches the server’s. This should be ASCII-8bit encoded (it will be converted if not).
- :fast_auth
  Use a slightly faster auth system that is incapable of knowing if it has failed or not. By default this is off.
- :threaded
  Support multiple connections to the server (default is on). If off, threaded requests will queue in the client.
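The :serialiser contract is just two methods, so a custom one is short to write. The sketch below is hypothetical and has not been tested against SimpleRPC: FramedJSON is an invented name, and the length prefix is one possible way to give JSON the clean read termination that the Serialisation Formats section says it lacks.

```ruby
require 'json'
require 'stringio'

# Hypothetical serialiser: each JSON document is preceded by a 4-byte
# big-endian length so the reader knows exactly how many bytes to consume.
module FramedJSON
  def self.dump(object, io)
    payload = JSON.generate(object)
    io.write([payload.bytesize].pack('N'))
    io.write(payload)
  end

  def self.load(io)
    header = io.read(4)
    raise EOFError, 'stream closed' if header.nil? || header.bytesize < 4

    length  = header.unpack1('N')
    payload = io.read(length)
    raise EOFError, 'truncated frame' if payload.nil? || payload.bytesize < length

    JSON.parse(payload)
  end
end

# Round-trip two messages through an in-memory binary stream.
io = StringIO.new(String.new)
FramedJSON.dump(['length', [], false], io)
FramedJSON.dump({ 'ok' => true }, io)
io.rewind
first  = FramedJSON.load(io)
second = FramedJSON.load(io)
```

If it did work with the client, it would presumably be passed as :serialiser => FramedJSON on both ends. Note that JSON cannot carry symbols or arbitrary Ruby objects, so method names and return values would be restricted in the same way the MessagePack note describes.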
# File 'lib/simplerpc/client.rb', line 162
def initialize(opts = {})
  # Connection details
  @hostname = opts[:hostname] || '127.0.0.1'
  @port = opts[:port]
  raise 'Port required' unless @port
  timeout = opts[:timeout]

  # Support multiple connections at once?
  @threaded = !(opts[:threaded] == false)

  # Serialiser.
  @serialiser = opts[:serialiser] || Marshal

  # Auth system
  if opts[:password] && opts[:secret]
    require 'simplerpc/encryption'
    @password = opts[:password]
    @secret = opts[:secret]

    # Check for return from auth?
    @fast_auth = (opts[:fast_auth] == true)
  end

  # Threading uses @pool, single thread uses @s and @mutex
  if @threaded
    @pool_mutex = Mutex.new # Controls edits to the pool
    @pool = {}              # List of available sockets with
                            # accompanying mutices
  else
    @mutex = Mutex.new
    @s = nil
  end
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method.
#method_missing(m, *args, &block) ⇒ Object
Calls RPC on the remote object.
You should not need to call this directly (though you are welcome to).
# File 'lib/simplerpc/client.rb', line 332
def method_missing(m, *args, &block)
  # Records the server's return values.
  result = nil
  success = true

  # Get a socket preferentially from the pool,
  # and do the actual work
  _get_socket() do |s, persist|
    # send method name and arity
    SocketProtocol::Stream.send(s, [m, args, block_given?, persist], @serialiser)

    # Call with args
    success, result = SocketProtocol::Stream.recv(s, @serialiser)

    # Check if we should yield
    while success == SocketProtocol::REQUEST_YIELD do
      block_result = yield(*result)
      SocketProtocol::Stream.send(s, block_result, @serialiser)
      success, result = SocketProtocol::Stream.recv(s, @serialiser)
    end
  end

  # If it didn't succeed, treat the payload as an exception
  raise result unless success == SocketProtocol::REQUEST_SUCCESS
  return result

rescue EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::ECONNABORTED, Errno::EPIPE => e
  raise ConnectionError.new(e)
rescue StandardError => e
  raise FormatError.new(e)
end
Instance Attribute Details
#fast_auth ⇒ Object
Returns the value of attribute fast_auth.
# File 'lib/simplerpc/client.rb', line 142
def fast_auth
  @fast_auth
end
#hostname ⇒ Object (readonly)
Returns the value of attribute hostname.
# File 'lib/simplerpc/client.rb', line 141
def hostname
  @hostname
end
#password=(value) ⇒ Object (writeonly)
Sets the attribute password.
# File 'lib/simplerpc/client.rb', line 143
def password=(value)
  @password = value
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/simplerpc/client.rb', line 141
def port
  @port
end
#secret=(value) ⇒ Object (writeonly)
Sets the attribute secret.
# File 'lib/simplerpc/client.rb', line 143
def secret=(value)
  @secret = value
end
#serialiser ⇒ Object
Returns the value of attribute serialiser.
# File 'lib/simplerpc/client.rb', line 142
def serialiser
  @serialiser
end
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
# File 'lib/simplerpc/client.rb', line 141
def threaded
  @threaded
end
#timeout ⇒ Object
Returns the value of attribute timeout.
# File 'lib/simplerpc/client.rb', line 141
def timeout
  @timeout
end
Class Method Details
.new_proxy(opts = {}) ⇒ Object
Connect to the remote server and return two things:
- A proxy object for communicating with the server
- The client itself, for controlling the connection
All options are the same as #new.
# File 'lib/simplerpc/client.rb', line 219
def self.new_proxy(opts = {})
  client = self.new(opts)
  proxy = client.get_proxy
  return proxy, client
end
Instance Method Details
#call(m, *args, &block) ⇒ Object
Call a method that is otherwise clobbered by the client object, e.g.:
client.call(:dup) # return a copy of the server object
# File 'lib/simplerpc/client.rb', line 324
def call(m, *args, &block)
  method_missing(m, *args, &block)
end
#connected? ⇒ Boolean
Is this client maintaining any persistent connections?
Returns true/false if the client is single-threaded, or the number of active connections if the client is multi-threaded.
# File 'lib/simplerpc/client.rb', line 306
def connected?
  # If not threaded, simply check socket
  @mutex.synchronize { return _connected?(@s) } unless @threaded

  # if threaded, return pool length
  @pool_mutex.synchronize { return (@pool.length) }
end
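Because the multi-threaded form returns a count, and 0 is truthy in Ruby, a bare `if client.connected?` would always pass for a threaded client with an empty pool. A small helper can normalise both shapes to a strict boolean; persistent? is a hypothetical name and not part of the library.

```ruby
# Hypothetical helper: turn either return shape of #connected? into a boolean.
def persistent?(connected_value)
  case connected_value
  when Integer then connected_value > 0   # threaded client: pool size
  else !!connected_value                  # single-threaded: true/false/nil
  end
end

# What a threaded client with an empty pool would report.
pool_size = 0
naive  = pool_size ? true : false   # 0 is truthy in Ruby
strict = persistent?(pool_size)
```

With a real client this would be used as `persistent?(client.connected?)`.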
#disconnect ⇒ Object
Close all persistent connections to the server.
# File 'lib/simplerpc/client.rb', line 298
def disconnect
  persist(0)
end
#get_proxy ⇒ Object
Returns a proxy object that is all but indistinguishable from the remote object.
This allows you to pass the object around whilst retaining control over the RPC client (i.e. calling persist/disconnect).
The class returned extends BasicObject and is thus able to pass all calls through to the server.
# File 'lib/simplerpc/client.rb', line 376
def get_proxy
  # Construct a new class as a subclass of RemoteObject
  cls = Class.new(RemoteObject) do
    # Accept the originating client
    def initialize(client)
      @client = client
    end

    # And handle method_missing by calling the client
    def method_missing(m, *args, &block)
      @client.call(m, *args, &block)
    end
  end

  # Return a new class linked to us
  return cls.new(self)
end
#persist(pool_size = 1) ⇒ Object
Tell the client how many connections to persist.
If the client is single-threaded, this can either be 1 or 0. If the client is multi-threaded, it can be any positive integer value (or 0).
#persist(0) is equivalent to #disconnect.
# File 'lib/simplerpc/client.rb', line 237
def persist(pool_size = 1)
  # Check the pool size is positive
  raise 'Invalid pool size requested' if pool_size < 0

  # If not threaded, check pool size is valid and connect/disconnect
  # single socket
  unless @threaded
    raise 'Threading is disabled: pool size must be 1' if pool_size > 1

    # Set socket up
    @mutex.synchronize do
      if pool_size == 0
        _disconnect(@s)
        @s = nil
      else
        @s = _connect
      end
    end

    return
  end

  # If threaded, create a pool of sockets instead
  @pool_mutex.synchronize do
    # Resize the pool
    if pool_size > @pool.length
      # Allocate more pool space by simply
      # connecting more sockets
      (pool_size - @pool.length).times { @pool[_connect] = Mutex.new }
    else
      # remove from the pool by trying to remove available
      # sockets over and over until they are gone.
      #
      # This has the effect of waiting for clients to be done
      # with the socket, without hanging on any one mutex.
      while @pool.length > pool_size do
        # Go through and remove from the pool if unused.
        @pool.each do |s, m|
          if @pool.length > pool_size && m.try_lock
            _disconnect(s)
            @pool.delete(s)
          end
        end

        # Since we're spinning, delay for a while
        sleep(0.05)
      end
    end
  end
rescue EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::ECONNABORTED, Errno::EPIPE => e
  raise ConnectionError.new(e)
end