Class: Ione::Rpc::Client
- Inherits: Object
- Defined in: lib/ione/rpc/client.rb
Overview
This is the base class of client peers.
You can either create a subclass and add your own high-level convenience methods for constructing and sending your custom requests, or you can create a standalone client object and call #send_request.
A subclass may optionally implement #initialize_connection to send a message immediately on a successful connection, and #choose_connection to decide which connection to use for a request.
The client will handle connections to multiple server peers, and automatically reconnect to them when they disconnect.
Instance Method Summary
- #add_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client>
  Add an additional host to connect to.
- #choose_connection(connections, request) ⇒ Object (protected)
  Override this method to implement custom request routing strategies.
- #connected? ⇒ Boolean
  A client is connected when it has at least one open connection.
- #connection_stats ⇒ Array<Hash>
  Returns an array of info and statistics about the currently open connections.
- #initialize(codec, options = {}) ⇒ Client (constructor)
  Create a new client with the specified codec and options.
- #initialize_connection(connection) ⇒ Ione::Future (protected)
  Override this method to send a request when a connection has been established, but before the future returned by #start resolves.
- #reconnect?(host, port, attempts) ⇒ Boolean (protected)
  Override this method to control if, and how many times, the client should attempt to reconnect on connection failures.
- #remove_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client>
  Remove a host and disconnect any connections to it.
- #send_request(request, connection = nil, timeout = nil) ⇒ Ione::Future<Object>
  Send a request to a server peer.
- #start ⇒ Ione::Future<Ione::Rpc::Client>
  Start the client and connect to all hosts.
- #stop ⇒ Ione::Future<Ione::Rpc::Client>
  Stop the client and close all connections.
Constructor Details
#initialize(codec, options = {}) ⇒ Client
Create a new client with the specified codec and options.
# File 'lib/ione/rpc/client.rb', line 37
def initialize(codec, options={})
  @codec = codec
  @lock = Mutex.new
  @connection_timeout = options[:connection_timeout] || 5
  @io_reactor = options[:io_reactor] || Io::IoReactor.new
  @max_channels = options[:max_channels] || 128
  @logger = options[:logger]
  @hosts = []
  @connections = []
  Array(options[:hosts]).each { |h| add_host(*h) }
end
Instance Method Details
#add_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client>
Add an additional host to connect to. This can be done either before or after the client is started.
# File 'lib/ione/rpc/client.rb', line 115
def add_host(hostname, port=nil)
  hostname, port = normalize_address(hostname, port)
  promise = nil
  @lock.synchronize do
    _, _, promise = @hosts.find { |h, p, _| h == hostname && p == port }
    if promise
      return promise.future
    else
      promise = Promise.new
      @hosts << [hostname, port, promise]
    end
  end
  if @io_reactor.running?
    promise.observe(connect(hostname, port))
  end
  promise.future.map(self)
end
#choose_connection(connections, request) ⇒ Object (protected)
Override this method to implement custom request routing strategies. Before a request is encoded and sent over a connection this method will be called with all available connections and the request object (i.e. the object passed to #send_request).
The default implementation picks a random connection.
The connection objects have a #host property that you can use if you want to do routing based on host.
# File 'lib/ione/rpc/client.rb', line 254
def choose_connection(connections, request)
  connections.sample
end
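As an illustration of routing based on host, the sketch below picks a connection deterministically from a key carried by the request, so that requests with the same key reach the same host for as long as the set of connections is unchanged. The `StickyRouter` class, the `Connection` struct and the `:key` field on the request are hypothetical stand-ins so the example runs without the gem; only the method signature and the #host property come from the documentation above. In a real client the method would be a protected override in a subclass of Ione::Rpc::Client.

```ruby
require 'zlib'

# Hypothetical stand-in for the connection objects passed to
# #choose_connection; only #host is taken from the documentation.
Connection = Struct.new(:host)

# Hypothetical stand-in for a client subclass holding the override.
class StickyRouter
  # Maps the request's :key (an assumed field) onto one of the available
  # connections. Sorting by host makes the choice independent of the
  # order in which the connections are listed.
  def choose_connection(connections, request)
    return nil if connections.empty?
    sorted = connections.sort_by(&:host)
    sorted[Zlib.crc32(request.fetch(:key).to_s) % sorted.size]
  end
end

connections = [Connection.new('node2.example.com'), Connection.new('node1.example.com')]
router = StickyRouter.new
first = router.choose_connection(connections, {key: 'user-42'})
second = router.choose_connection(connections.reverse, {key: 'user-42'})
puts first.host == second.host # prints true
```

Returning nil when no connections are available mirrors what the default `connections.sample` does for an empty array.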
#connected? ⇒ Boolean
A client is connected when it has at least one open connection.
# File 'lib/ione/rpc/client.rb', line 50
def connected?
  @lock.lock
  @connections.any?
ensure
  @lock.unlock
end
#connection_stats ⇒ Array<Hash>
Returns an array of info and statistics about the currently open connections.
Each open connection is represented by a hash which includes the keys
- :host and :port
- :max_channels: the maximum number of messages to send concurrently
- :active_channels: the number of sent messages that have not yet received a response
- :queued_messages: the number of messages that couldn't be sent immediately because all channels were occupied and thus had to be queued
- :sent_messages: the total number of messages sent since the connection was opened
- :received_responses: the total number of responses received since the connection was opened
- :timeouts: the number of sent messages that did not receive a response before their timeout expired
# File 'lib/ione/rpc/client.rb', line 76
def connection_stats
  @lock.lock
  @connections.map(&:stats)
ensure
  @lock.unlock
end
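One way to use the returned hashes is to aggregate the counters across connections, for example to feed a monitoring dashboard. The sketch below is a minimal illustration: `summarize_stats` and `COUNTER_KEYS` are hypothetical names, and the sample array holds made-up values shaped like the documented keys rather than real output.

```ruby
# The counter keys documented for each connection's stats hash.
COUNTER_KEYS = [
  :active_channels, :queued_messages, :sent_messages,
  :received_responses, :timeouts
].freeze

# Hypothetical helper: sums each counter over all connections.
# `stats` is an array of hashes such as #connection_stats returns.
def summarize_stats(stats)
  COUNTER_KEYS.each_with_object({}) do |key, totals|
    totals[key] = stats.sum { |s| s.fetch(key, 0) }
  end
end

# Made-up sample values, for illustration only.
sample = [
  {host: 'node1.example.com', port: 3333, max_channels: 128,
   active_channels: 2, queued_messages: 0, sent_messages: 10,
   received_responses: 8, timeouts: 0},
  {host: 'node2.example.com', port: 3333, max_channels: 128,
   active_channels: 1, queued_messages: 3, sent_messages: 25,
   received_responses: 23, timeouts: 1}
]

summary = summarize_stats(sample)
puts summary[:sent_messages] # prints 35
```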
#initialize_connection(connection) ⇒ Ione::Future (protected)
Override this method to send a request when a connection has been established, but before the future returned by #start resolves.
If you need to send a special message to initialize a connection, it is important that you send it to the right connection. To do this, pass the connection as the second argument to #send_request.
# File 'lib/ione/rpc/client.rb', line 232
def initialize_connection(connection)
  Future.resolved
end
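A sketch of such an override follows. To keep it runnable without the gem, `StubClient` is a hypothetical stand-in for Ione::Rpc::Client that only records the arguments passed to #send_request, and the `{type: 'startup'}` request is an invented message. The point it shows is that the connection received by #initialize_connection is forwarded as the second argument, so the message is not routed through #choose_connection.

```ruby
# Hypothetical stand-in for Ione::Rpc::Client; it records calls instead
# of sending anything over the network.
class StubClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_request(request, connection = nil, timeout = nil)
    @sent << [request, connection]
    :pending # placeholder; the real method returns an Ione::Future
  end
end

class GreetingClient < StubClient
  protected

  # Sends the startup message on the connection that was just
  # established by passing it explicitly to #send_request.
  def initialize_connection(connection)
    send_request({type: 'startup'}, connection)
  end
end

client = GreetingClient.new
# The real client calls this hook itself; it is invoked by hand here.
client.send(:initialize_connection, :connection_a)
p client.sent
```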
#reconnect?(host, port, attempts) ⇒ Boolean (protected)
Override this method to control if, and how many times, the client should attempt to reconnect on connection failures.
You can, for example, stop reconnecting after a certain number of attempts.
# File 'lib/ione/rpc/client.rb', line 271
def reconnect?(host, port, attempts)
  true
end
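Stopping after a fixed number of attempts could look like the sketch below. `BoundedReconnectClient` and `MAX_ATTEMPTS` are hypothetical names, the limit of 5 is arbitrary, and the code assumes that `attempts` counts the connection attempts made so far. In a real subclass of Ione::Rpc::Client the method would be protected; it is public here only so it can be called directly.

```ruby
# Hypothetical stand-in for a subclass of Ione::Rpc::Client.
class BoundedReconnectClient
  MAX_ATTEMPTS = 5 # arbitrary limit chosen for this sketch

  # Keeps reconnecting until MAX_ATTEMPTS attempts have been made,
  # assuming `attempts` is the number of attempts so far.
  def reconnect?(host, port, attempts)
    attempts < MAX_ATTEMPTS
  end
end

client = BoundedReconnectClient.new
puts client.reconnect?('node1.example.com', 3333, 1) # prints true
puts client.reconnect?('node1.example.com', 3333, 5) # prints false
```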
#remove_host(hostname, port = nil) ⇒ Ione::Future<Ione::Rpc::Client>
Remove a host and disconnect any connections to it. This can be done either before or after the client is started.
Returns a future that resolves to the client. The future resolves immediately; it is returned mostly to be consistent with #add_host.
# File 'lib/ione/rpc/client.rb', line 142
def remove_host(hostname, port=nil)
  hostname, port = normalize_address(hostname, port)
  @lock.synchronize do
    index = @hosts.index { |h, p, _| h == hostname && p == port }
    if index
      @hosts.delete_at(index)
      if (connection = @connections.find { |c| c.host == hostname && c.port == port })
        connection.close
      end
    end
  end
  Future.resolved(self)
end
#send_request(request, connection = nil, timeout = nil) ⇒ Ione::Future<Object>
Send a request to a server peer. The peer chosen is determined by the implementation of #choose_connection, which is random selection by default.
If a connection closes between the point where it was chosen and when the message was written to it, the request is retried on another connection. For all other errors the request is not retried and it is up to the caller to determine if the request is safe to retry.
If a logger has been specified the following will be logged:
- A warning when a connection has closed and the request will be retried
- A warning when a request fails for another reason
- A warning when there are no open connections
# File 'lib/ione/rpc/client.rb', line 181
def send_request(request, connection=nil, timeout=nil)
  if connection
    chosen_connection = connection
  else
    @lock.lock
    begin
      chosen_connection = choose_connection(@connections, request)
    ensure
      @lock.unlock
    end
  end
  if chosen_connection && !chosen_connection.closed?
    f = chosen_connection.send_message(request, timeout)
    f = f.fallback do |error|
      if error.is_a?(Rpc::ConnectionClosedError)
        @logger.warn('Request failed because the connection closed, retrying') if @logger
        send_request(request, connection, timeout)
      else
        Ione::Future.failed(error)
      end
    end
    f.on_failure do |error|
      @logger.warn('Request failed: %s' % error.message) if @logger
    end
    f
  elsif chosen_connection
    Future.failed(Rpc::RequestNotSentError.new('Not connected'))
  else
    Future.failed(Rpc::NoConnectionError.new('No connection'))
  end
rescue => e
  Future.failed(e)
end
#start ⇒ Ione::Future<Ione::Rpc::Client>
Start the client and connect to all hosts. This also starts the IO reactor if it was not already started.
The returned future resolves when all hosts have been connected to. If one or more hosts fail to connect, the client will periodically try again, and the future will not resolve until all of them have connected.
# File 'lib/ione/rpc/client.rb', line 92
def start
  @io_reactor.start.flat_map { connect_all }.map(self)
end
#stop ⇒ Ione::Future<Ione::Rpc::Client>
Stop the client and close all connections. This also stops the IO reactor if it has not already stopped.
# File 'lib/ione/rpc/client.rb', line 101
def stop
  @lock.synchronize { @connections = [] }
  @io_reactor.stop.map(self)
end