Class: Ione::Rpc::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ione/rpc/client.rb

Overview

This is the base class of client peers.

You can either create a subclass and add your own high-level convenience methods for constructing and sending your custom requests, or you can create a standalone client object and call #send_request.

A subclass may optionally implement #initialize_connection to send a message immediately on a successful connection, and #choose_connection to decide which connection to use for a request.

The client will handle connections to multiple server peers, and automatically reconnect to them when they disconnect.

Instance Method Summary collapse

Constructor Details

#initialize(codec, options = {}) ⇒ Client

Create a new client with the specified codec and options.

Parameters:

  • codec (Object)

    the protocol codec to use to encode requests and decode responses. See Ione::Rpc::Codec.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :hosts (Array<String>)

    the host (and ports) to connect to, specified either as an array of host (String) and port (Integer) pairs (e.g. [['host1', 1111], [host2, 2222]]) or an array of strings on the format host:port (e.g. ['host1:1111', 'host2:2222']).

  • :io_reactor (Ione::Io::IoReactor)

    use this option to make the client use an existing IO reactor and not create its own. Please note that #stop will still stop the reactor.

  • :connection_timeout (Integer) — default: 5

    the number of seconds to wait for connections to be established before failing.

  • :max_channels (Integer) — default: 128

    the maximum number of channels supported for each connection.

  • :logger (Logger)

    a logger conforming to the standard Ruby logger API that will be used to log significant events like request failures.



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/ione/rpc/client.rb', line 37

def initialize(codec, options={})
  @codec = codec
  @lock = Mutex.new
  @connection_timeout = options[:connection_timeout] || 5
  @io_reactor = options[:io_reactor] || Io::IoReactor.new
  @max_channels = options[:max_channels] || 128
  @logger = options[:logger]
  @hosts = []
  @connections = []
  Array(options[:hosts]).each { |h| add_host(*h) }
end

Instance Method Details

#add_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client>

Add an additional host to connect to. This can be done either before or after the client is started.

Parameters:

  • hostname (String)

    the host to connect to, or the host:port pair (in which case the port parameter should be nil).

  • port (Integer) (defaults to: nil)

    the host to connect to, or nil if the host is a string on the format host:port.

Returns:

  • (Ione::Future<Ione::Rpc::Client>)

    a future that resolves to the client when the host has been connected to.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/ione/rpc/client.rb', line 115

def add_host(hostname, port=nil)
  hostname, port = normalize_address(hostname, port)
  promise = nil
  @lock.synchronize do
    _, _, promise = @hosts.find { |h, p, _| h == hostname && p == port }
    if promise
      return promise.future
    else
      promise = Promise.new
      @hosts << [hostname, port, promise]
    end
  end
  if @io_reactor.running?
    promise.observe(connect(hostname, port))
  end
  promise.future.map(self)
end

#choose_connection(connections, request) ⇒ Object (protected)

Override this method to implement custom request routing strategies. Before a request is encoded and sent over a connection this method will be called with all available connections and the request object (i.e. the object passed to #send_request).

The default implementation picks a random connection.

The connection objects have a #host property that use if you want to do routing based on host.

Examples:

Routing messages consistently based on a property of the request

def choose_connection(connections, request)
  connections[request.some_property.hash % connections.size]
end

Parameters:

  • connections (Array<Object>)

    all the open connections.

  • request (Object)

    the request to be sent.

Returns:

  • (Object)

    the connection that should receive the request.



254
255
256
# File 'lib/ione/rpc/client.rb', line 254

def choose_connection(connections, request)
  connections.sample
end

#connected?Boolean

A client is connected when it has at least one open connection.

Returns:

  • (Boolean)


50
51
52
53
54
55
# File 'lib/ione/rpc/client.rb', line 50

def connected?
  @lock.lock
  @connections.any?
ensure
  @lock.unlock
end

#connection_statsArray<Hash>

Returns an array of info and statistics about the currently open connections.

Each open connection is represented by a hash which includes the keys

  • :host and :port
  • :max_channels: the maximum number of messages to send concurrently
  • :active_channels: the number of sent messages that have not yet received a response
  • :queued_messages: the number of messages that couldn't be sent immediately because all channels were occupied and thus had to be queued.
  • :sent_messages: the total number of messages sent since the connection was opened
  • :received_responses: the total number of responses received since the connection was opened
  • :timeouts: the number of sent messages that did not receive a response before their timeout expired

Returns:

  • (Array<Hash>)

    an array of hashes that each contain info and statistics from an open connection.



76
77
78
79
80
81
# File 'lib/ione/rpc/client.rb', line 76

def connection_stats
  @lock.lock
  @connections.map(&:stats)
ensure
  @lock.unlock
end

#initialize_connection(connection) ⇒ Ione::Future (protected)

Override this method to send a request when a connection has been established, but before the future returned by #start resolves.

It's important that if you need to send a special message to initialize a connection that you send it to the right connection. To do this pass the connection as second argument to #send_request, see the example below.

Examples:

Sending a startup request

def initialize_connection(connection)
  send_request(MyStartupRequest.new, connection)
end

Returns:

  • (Ione::Future)

    a future that resolves when the initialization is complete. If this future fails the connection fails.



232
233
234
# File 'lib/ione/rpc/client.rb', line 232

def initialize_connection(connection)
  Future.resolved
end

#reconnect?(host, port, attempts) ⇒ Boolean (protected)

Override this method to control if, and how many times, the client should attempt to reconnect on connection failures.

You can, for example, stop reconnecting after a certain number of attempts.

Parameters:

  • host (String)

    the host to connect to

  • port (Integer)

    the port to connect to

  • attempts (Integer)

    the number of attempts that have been made so far – when 1 or above a connection attempt has just failed, when 0 an open connection was abruptly closed and the question is whether or not to attempt to connect again.

Returns:

  • (Boolean)

    true if a connection attempt should be made, false otherwise.



271
272
273
# File 'lib/ione/rpc/client.rb', line 271

def reconnect?(host, port, attempts)
  true
end

#remove_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client] a future that resolves to the client (immediately, this is mostly to be consistent with #add_host)

Remove a host and disconnect any connections to it. This can be done either before or after the client is started.

Parameters:

  • hostname (String)

    the host to connect to, or the host:port pair (in which case the port parameter should be nil).

  • port (Integer) (defaults to: nil)

    the host to connect to, or nil if the host is a string on the format host:port.

Returns:

  • (Ione::Future<Ione::Rpc::Client] a future that resolves to the client (immediately, this is mostly to be consistent with #add_host))

    Ione::Future<Ione::Rpc::Client] a future that resolves to the client (immediately, this is mostly to be consistent with #add_host)



142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/ione/rpc/client.rb', line 142

def remove_host(hostname, port=nil)
  hostname, port = normalize_address(hostname, port)
  @lock.synchronize do
    index = @hosts.index { |h, p, _| h == hostname && p == port }
    if index
      @hosts.delete_at(index)
      if (connection = @connections.find { |c| c.host == hostname && c.port == port })
        connection.close
      end
    end
  end
  Future.resolved(self)
end

#send_request(request, connection = nil, timeout = nil) ⇒ Ione::Future<Object>

Send a request to a server peer. The peer chosen is determined by the Implementation of #choose_connection, which is random selection by default.

If a connection closes between the point where it was chosen and when the message was written to it, the request is retried on another connection. For all other errors the request is not retried and it is up to the caller to determine if the request is safe to retry.

If a logger has been specified the following will be logged:

  • A warning when a connection has closed and the request will be retried
  • A warning when a request fails for another reason
  • A warning when there are no open connections

Parameters:

  • request (Object)

    the request to send.

  • connection (Object) (defaults to: nil)

    the connection to send the request on. This parameter is internal and should only be used from #initialize_connection.

  • timeout (Object) (defaults to: nil)

    the maximum time in seconds to wait for a response before failing the returned future with a TimeoutError. There is no timeout by default.

Returns:

  • (Ione::Future<Object>)

    a future that resolves to the response from the server, or fails because there was an error while processing the request (this is not the same thing as the server sending an error response – that is protocol specific and up to the implementation to handle), or when there was no connection open.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/ione/rpc/client.rb', line 181

def send_request(request, connection=nil, timeout=nil)
  if connection
    chosen_connection = connection
  else
    @lock.lock
    begin
      chosen_connection = choose_connection(@connections, request)
    ensure
      @lock.unlock
    end
  end
  if chosen_connection && !chosen_connection.closed?
    f = chosen_connection.send_message(request, timeout)
    f = f.fallback do |error|
      if error.is_a?(Rpc::ConnectionClosedError)
        @logger.warn('Request failed because the connection closed, retrying') if @logger
        send_request(request, connection, timeout)
      else
        Ione::Future.failed(error)
      end
    end
    f.on_failure do |error|
      @logger.warn('Request failed: %s' % error.message) if @logger
    end
    f
  elsif chosen_connection
    Future.failed(Rpc::RequestNotSentError.new('Not connected'))
  else
    Future.failed(Rpc::NoConnectionError.new('No connection'))
  end
rescue => e
  Future.failed(e)
end

#startIone::Future<Ione::Rpc::Client>

Start the client and connect to all hosts. This also starts the IO reactor if it was not already started.

The returned future resolves when all hosts have been connected to, and if one or more fails to connect the client will periodically try again, and the future will not resolve until all of them have connected.

Returns:

  • (Ione::Future<Ione::Rpc::Client>)

    a future that resolves to the client when all hosts have been connected to.



92
93
94
# File 'lib/ione/rpc/client.rb', line 92

def start
  @io_reactor.start.flat_map { connect_all }.map(self)
end

#stopIone::Future<Ione::Rpc::Client>

Stop the client and close all connections. This also stops the IO reactor if it has not already stopped.

Returns:

  • (Ione::Future<Ione::Rpc::Client>)

    a future that resolves to the client when all connections have closed and the IO reactor has stopped.



101
102
103
104
# File 'lib/ione/rpc/client.rb', line 101

def stop
  @lock.synchronize { @connections = [] }
  @io_reactor.stop.map(self)
end